migration/
m20241112_000018_logs_add_index.rs

1use sea_orm_migration::prelude::*;
2
3#[derive(DeriveMigrationName)]
4pub struct Migration;
5
6#[async_trait::async_trait]
7impl MigrationTrait for Migration {
8    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9        // This index enables fast querying for the next unprocessed log.
10        // In a 300MB dataset it reduces the query time from 100ms to 1e-5s.
11        manager
12            .create_index(
13                sea_query::Index::create()
14                    .if_not_exists()
15                    .name("idx_unprocessed_log_status")
16                    .table(LogStatus::Table)
17                    .col((LogStatus::BlockNumber, IndexOrder::Asc))
18                    .col((LogStatus::TransactionIndex, IndexOrder::Asc))
19                    .col((LogStatus::LogIndex, IndexOrder::Asc))
20                    .and_where(Expr::col((LogStatus::Table, LogStatus::Checksum)).is_null())
21                    .unique()
22                    .to_owned(),
23            )
24            .await
25    }
26
27    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
28        manager
29            .drop_index(Index::drop().name("idx_unprocessed_log_status").to_owned())
30            .await
31    }
32}
33
34#[derive(DeriveIden)]
35enum LogStatus {
36    Table,
37    // Values to identify the log.
38    BlockNumber,
39    TransactionIndex,
40    LogIndex,
41    // Indicates whether the log has been processed.
42    #[allow(dead_code)]
43    Processed,
44    // Time when the log was processed.
45    #[allow(dead_code)]
46    ProcessedAt,
47    // Computed checksum of this log and previous logs
48    Checksum,
49}