hopr_chain_indexer/
block.rs

1use std::{
2    path::Path,
3    sync::{
4        Arc,
5        atomic::{AtomicBool, AtomicU64, Ordering},
6    },
7};
8
9use alloy::sol_types::SolEvent;
10use futures::{
11    StreamExt,
12    future::AbortHandle,
13    stream::{self},
14};
15use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
16use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
17use hopr_chain_types::chain_events::SignificantChainEvent;
18use hopr_crypto_types::types::Hash;
19use hopr_db_api::logs::HoprDbLogOperations;
20use hopr_db_sql::{HoprDbGeneralModelOperations, info::HoprDbInfoOperations};
21use hopr_primitive_types::prelude::*;
22use tracing::{debug, error, info, trace};
23
24use crate::{
25    IndexerConfig,
26    errors::{CoreEthereumIndexerError, Result},
27    snapshot::{SnapshotInfo, SnapshotManager},
28    traits::ChainLogHandler,
29};
30
31#[cfg(all(feature = "prometheus", not(test)))]
32lazy_static::lazy_static! {
33    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
34        hopr_metrics::metrics::SimpleGauge::new(
35            "hopr_indexer_block_number",
36            "Current last processed block number by the indexer",
37    ).unwrap();
38    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
39        hopr_metrics::metrics::SimpleGauge::new(
40            "hopr_indexer_checksum",
41            "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum"
42    ).unwrap();
43    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
44        hopr_metrics::metrics::SimpleGauge::new(
45            "hopr_indexer_sync_progress",
46            "Sync progress of the historical data by the indexer",
47    ).unwrap();
48    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
49        hopr_metrics::metrics::MultiGauge::new(
50            "hopr_indexer_data_source",
51            "Current data source of the Indexer",
52            &["source"],
53    ).unwrap();
54
55}
56
57/// Indexer
58///
59/// Accepts the RPC operational functionality [hopr_chain_rpc::HoprIndexerRpcOperations]
60/// and provides the indexing operation resulting in and output of
61/// [hopr_chain_types::chain_events::SignificantChainEvent] streamed outside the indexer by the unbounded channel.
62///
63/// The roles of the indexer:
64/// 1. prime the RPC endpoint
65/// 2. request an RPC stream of changes to process
66/// 3. process block and log stream
67/// 4. ensure finalization by postponing processing until the head is far enough
68/// 5. store relevant data into the DB
69/// 6. pass the processing on to the business logic
70#[derive(Debug, Clone)]
71pub struct Indexer<T, U, Db>
72where
73    T: HoprIndexerRpcOperations + Send + 'static,
74    U: ChainLogHandler + Send + 'static,
75    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
76{
77    rpc: Option<T>,
78    db_processor: Option<U>,
79    db: Db,
80    cfg: IndexerConfig,
81    egress: async_channel::Sender<SignificantChainEvent>,
82    // If true (default), the indexer will panic if the event stream is terminated.
83    // Setting it to false is useful for testing.
84    panic_on_completion: bool,
85}
86
87impl<T, U, Db> Indexer<T, U, Db>
88where
89    T: HoprIndexerRpcOperations + Sync + Send + 'static,
90    U: ChainLogHandler + Send + Sync + 'static,
91    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
92{
93    pub fn new(
94        rpc: T,
95        db_processor: U,
96        db: Db,
97        cfg: IndexerConfig,
98        egress: async_channel::Sender<SignificantChainEvent>,
99    ) -> Self {
100        Self {
101            rpc: Some(rpc),
102            db_processor: Some(db_processor),
103            db,
104            cfg,
105            egress,
106            panic_on_completion: true,
107        }
108    }
109
110    /// Disables the panic on completion.
111    pub fn without_panic_on_completion(mut self) -> Self {
112        self.panic_on_completion = false;
113        self
114    }
115
116    pub async fn start(mut self) -> Result<AbortHandle>
117    where
118        T: HoprIndexerRpcOperations + 'static,
119        U: ChainLogHandler + 'static,
120        Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
121    {
122        if self.rpc.is_none() || self.db_processor.is_none() {
123            return Err(CoreEthereumIndexerError::ProcessError(
124                "indexer cannot start, missing components".into(),
125            ));
126        }
127
128        info!("Starting chain indexing");
129
130        let rpc = self.rpc.take().expect("rpc should be present");
131        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
132        let db = self.db.clone();
133        let tx_significant_events = self.egress.clone();
134        let panic_on_completion = self.panic_on_completion;
135
136        let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);
137
138        // Check that the contract addresses and topics are consistent with what is in the logs DB,
139        // or if the DB is empty, prime it with the given addresses and topics.
140        db.ensure_logs_origin(address_topics).await?;
141
142        let is_synced = Arc::new(AtomicBool::new(false));
143        let chain_head = Arc::new(AtomicU64::new(0));
144
145        // update the chain head once at startup to get a reference for initial syncing
146        // progress calculation
147        debug!("Updating chain head at indexer startup");
148        Self::update_chain_head(&rpc, chain_head.clone()).await;
149
150        // First, check whether fast sync is enabled and can be performed.
151        // If so:
152        //   1. Download the snapshot if the logs database is empty and the snapshot is enabled
153        //   2. Delete the existing indexed data
154        //   3. Reset fast sync progress
155        //   4. Run the fast sync process until completion
156        //   5. Finally, starting the rpc indexer.
157        let fast_sync_configured = self.cfg.fast_sync;
158        let index_empty = self.db.index_is_empty().await?;
159
160        // Pre-start operations to ensure the indexer is ready, including snapshot fetching
161        self.pre_start().await?;
162
163        #[derive(PartialEq, Eq)]
164        enum FastSyncMode {
165            None,
166            FromScratch,
167            Continue,
168        }
169
170        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
171            (true, false) => {
172                info!(
173                    "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
174                     unprocessed logs."
175                );
176                FastSyncMode::Continue
177            }
178            (false, true) => {
179                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
180                // Clean the last processed log from the Log DB, to allow full resync
181                self.db.clear_index_db(None).await?;
182                self.db.set_logs_unprocessed(None, None).await?;
183                FastSyncMode::None
184            }
185            (false, false) => {
186                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
187                FastSyncMode::None
188            }
189            (true, true) => {
190                info!("Fast sync is enabled, starting the fast sync process");
191                // To ensure a proper state, reset any auxiliary data in the database
192                self.db.clear_index_db(None).await?;
193                self.db.set_logs_unprocessed(None, None).await?;
194                FastSyncMode::FromScratch
195            }
196        };
197
198        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
199
200        // Perform the fast-sync if requested
201        if FastSyncMode::None != will_perform_fast_sync {
202            let processed = match will_perform_fast_sync {
203                FastSyncMode::FromScratch => None,
204                FastSyncMode::Continue => Some(false),
205                _ => unreachable!(),
206            };
207
208            #[cfg(all(feature = "prometheus", not(test)))]
209            {
210                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
211                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
212            }
213
214            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
215            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
216            let _head = chain_head.load(Ordering::Relaxed);
217            for block_number in log_block_numbers {
218                debug!(
219                    block_number,
220                    first_log_block_number = _first_log_block_number,
221                    head = _head,
222                    "computing processed logs"
223                );
224                // Do not pollute the logs with the fast-sync progress
225                Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;
226
227                #[cfg(all(feature = "prometheus", not(test)))]
228                {
229                    let progress =
230                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
231                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
232                }
233            }
234        }
235
236        info!("Building rpc indexer background process");
237
238        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
239            info!(
240                start_block = last_log.block_number,
241                start_checksum = last_log.checksum.unwrap(),
242                "Loaded indexer state",
243            );
244
245            if self.cfg.start_block_number < last_log.block_number {
246                // If some prior indexing took place already, avoid reprocessing
247                last_log.block_number + 1
248            } else {
249                self.cfg.start_block_number
250            }
251        } else {
252            self.cfg.start_block_number
253        };
254
255        info!(next_block_to_process, "Indexer start point");
256
257        let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable!(async move {
258            // Update the chain head once again
259            debug!("Updating chain head at indexer startup");
260            Self::update_chain_head(&rpc, chain_head.clone()).await;
261
262            #[cfg(all(feature = "prometheus", not(test)))]
263            {
264                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
265                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
266            }
267
268            let rpc_ref = &rpc;
269
270            let event_stream = rpc
271                .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
272                .expect("block stream should be constructible")
273                .then(|block| {
274                    let db = db.clone();
275                    let chain_head = chain_head.clone();
276                    let is_synced = is_synced.clone();
277                    let tx = tx.clone();
278                    let logs_handler = logs_handler.clone();
279
280                    async move {
281                        Self::calculate_sync_process(
282                            block.block_id,
283                            rpc_ref,
284                            db,
285                            chain_head.clone(),
286                            is_synced.clone(),
287                            next_block_to_process,
288                            tx.clone(),
289                            logs_handler.safe_address().into(),
290                            logs_handler.contract_addresses_map().channels.into(),
291                        )
292                        .await;
293
294                        block
295                    }
296                })
297                .filter_map(|block| {
298                    let db = db.clone();
299                    let logs_handler = logs_handler.clone();
300
301                    async move {
302                        debug!(%block, "storing logs from block");
303                        let logs = block.logs.clone();
304
305                        // Filter out the token contract logs because we do not need to store these
306                        // in the database.
307                        let logs_vec = logs
308                            .into_iter()
309                            .filter(|log| log.address != logs_handler.contract_addresses_map().token)
310                            .collect();
311
312                        match db.store_logs(logs_vec).await {
313                            Ok(store_results) => {
314                                if let Some(error) = store_results
315                                    .into_iter()
316                                    .filter(|r| r.is_err())
317                                    .map(|r| r.unwrap_err())
318                                    .next()
319                                {
320                                    error!(%block, %error, "failed to processed stored logs from block");
321                                    None
322                                } else {
323                                    Some(block)
324                                }
325                            }
326                            Err(error) => {
327                                error!(%block, %error, "failed to store logs from block");
328                                None
329                            }
330                        }
331                    }
332                })
333                .filter_map(|block| {
334                    let db = db.clone();
335                    let logs_handler = logs_handler.clone();
336                    let is_synced = is_synced.clone();
337                    async move {
338                        Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed)).await
339                    }
340                })
341                .flat_map(stream::iter);
342
343            futures::pin_mut!(event_stream);
344            while let Some(event) = event_stream.next().await {
345                trace!(%event, "processing on-chain event");
346                // Pass the events further only once we're fully synced
347                if is_synced.load(Ordering::Relaxed) {
348                    if let Err(error) = tx_significant_events.try_send(event) {
349                        error!(%error, "failed to pass a significant chain event further");
350                    }
351                }
352            }
353
354            if panic_on_completion {
355                panic!(
356                    "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
357                );
358            }
359        });
360
361        if rx.next().await.is_some() {
362            Ok(indexing_abort_handle)
363        } else {
364            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
365                "Error during indexing start".into(),
366            ))
367        }
368    }
369
370    pub async fn pre_start(&self) -> Result<()> {
371        let fast_sync_configured = self.cfg.fast_sync;
372        let index_empty = self.db.index_is_empty().await?;
373
374        // Check if we need to download snapshot before fast sync
375        let logs_db_has_data = self.has_logs_data().await?;
376
377        if fast_sync_configured && index_empty && !logs_db_has_data && self.cfg.enable_logs_snapshot {
378            info!("Logs database is empty, attempting to download logs snapshot...");
379
380            match self.download_snapshot().await {
381                Ok(snapshot_info) => {
382                    info!("Logs snapshot downloaded successfully: {:?}", snapshot_info);
383                }
384                Err(e) => {
385                    error!("Failed to download logs snapshot: {}. Continuing with regular sync.", e);
386                }
387            }
388        }
389
390        Ok(())
391    }
392
393    /// Generates specialized log filters for efficient blockchain event processing.
394    ///
395    /// This function creates a comprehensive set of log filters that optimize
396    /// indexer performance by categorizing filters based on contract types and
397    /// event relevance to the specific node.
398    ///
399    /// # Arguments
400    /// * `logs_handler` - Handler containing contract addresses and safe address
401    ///
402    /// # Returns
403    /// * `(FilterSet, Vec<(Address, Hash)>)` - A tuple containing:
404    ///   - `FilterSet`: Categorized filters for blockchain event processing.
405    ///   - `Vec<(Address, Hash)>`: A vector of address-topic pairs for logs origin validation.
406    ///
407    /// # Filter Categories
408    /// * `all` - Complete set of filters for normal operation
409    /// * `token` - Token-specific filters (Transfer, Approval events for safe)
410    /// * `no_token` - Non-token contract filters for initial sync optimization
411    fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
412        let safe_address = logs_handler.safe_address();
413        let addresses_no_token = logs_handler
414            .contract_addresses()
415            .into_iter()
416            .filter(|a| *a != logs_handler.contract_addresses_map().token)
417            .collect::<Vec<_>>();
418        let mut filter_base_addresses = vec![];
419        let mut filter_base_topics = vec![];
420        let mut address_topics = vec![];
421
422        addresses_no_token.iter().for_each(|address| {
423            let topics = logs_handler.contract_address_topics(*address);
424            if !topics.is_empty() {
425                filter_base_addresses.push(alloy::primitives::Address::from(*address));
426                filter_base_topics.extend(topics.clone());
427                for topic in topics.iter() {
428                    address_topics.push((*address, Hash::from(topic.0)))
429                }
430            }
431        });
432
433        let filter_base = alloy::rpc::types::Filter::new()
434            .address(filter_base_addresses)
435            .event_signature(filter_base_topics);
436        let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
437            logs_handler.contract_addresses_map().token,
438        ));
439
440        let filter_transfer_to = filter_token
441            .clone()
442            .event_signature(Transfer::SIGNATURE_HASH)
443            .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
444
445        let filter_transfer_from = filter_token
446            .clone()
447            .event_signature(Transfer::SIGNATURE_HASH)
448            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
449
450        let filter_approval = filter_token
451            .event_signature(Approval::SIGNATURE_HASH)
452            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
453            .topic2(alloy::primitives::B256::from_slice(
454                logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
455            ));
456
457        let set = FilterSet {
458            all: vec![
459                filter_base.clone(),
460                filter_transfer_from.clone(),
461                filter_transfer_to.clone(),
462                filter_approval.clone(),
463            ],
464            token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
465            no_token: vec![filter_base],
466        };
467
468        (set, address_topics)
469    }
470
471    /// Processes a block by its ID.
472    ///
473    /// This function retrieves logs for the given block ID and processes them using the database
474    /// and log handler.
475    ///
476    /// # Arguments
477    ///
478    /// * `db` - The database operations handler.
479    /// * `logs_handler` - The database log handler.
480    /// * `block_id` - The ID of the block to process.
481    ///
482    /// # Returns
483    ///
484    /// A `Result` containing an optional vector of significant chain events if the operation succeeds or an error if it
485    /// fails.
486    async fn process_block_by_id(
487        db: &Db,
488        logs_handler: &U,
489        block_id: u64,
490        is_synced: bool,
491    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
492    where
493        U: ChainLogHandler + 'static,
494        Db: HoprDbLogOperations + 'static,
495    {
496        let logs = db.get_logs(Some(block_id), Some(0)).await?;
497        let mut block = BlockWithLogs {
498            block_id,
499            ..Default::default()
500        };
501
502        for log in logs {
503            if log.block_number == block_id {
504                block.logs.insert(log);
505            } else {
506                error!(
507                    expected = block_id,
508                    actual = log.block_number,
509                    "block number mismatch in logs from database"
510                );
511                panic!("block number mismatch in logs from database")
512            }
513        }
514
515        Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
516    }
517
518    /// Processes a block and its logs.
519    ///
520    /// This function collects events from the block logs and updates the database with the processed logs.
521    ///
522    /// # Arguments
523    ///
524    /// * `db` - The database operations handler.
525    /// * `logs_handler` - The database log handler.
526    /// * `block` - The block with logs to process.
527    /// * `fetch_checksum_from_db` - A boolean indicating whether to fetch the checksum from the database.
528    ///
529    /// # Returns
530    ///
531    /// An optional vector of significant chain events if the operation succeeds.
532    async fn process_block(
533        db: &Db,
534        logs_handler: &U,
535        block: BlockWithLogs,
536        fetch_checksum_from_db: bool,
537        is_synced: bool,
538    ) -> Option<Vec<SignificantChainEvent>>
539    where
540        U: ChainLogHandler + 'static,
541        Db: HoprDbLogOperations + 'static,
542    {
543        let block_id = block.block_id;
544        let log_count = block.logs.len();
545        debug!(block_id, "processing events");
546
547        // FIXME: The block indexing and marking as processed should be done in a single
548        // transaction. This is difficult since currently this would be across databases.
549        let events = stream::iter(block.logs.clone())
550            .filter_map(|log| async move {
551                match logs_handler.collect_log_event(log.clone(), is_synced).await {
552                    Ok(data) => match db.set_log_processed(log).await {
553                        Ok(_) => data,
554                        Err(error) => {
555                            error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
556                            panic!("failed to mark log as processed, panicking to prevent data loss")
557                        }
558                    },
559                    Err(CoreEthereumIndexerError::ProcessError(error)) => {
560                        error!(block_id, %error, "failed to process log into event, continuing indexing");
561                        None
562                    }
563                    Err(error) => {
564                        error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
565                        panic!("failed to process log into event, panicking to prevent data loss")
566                    }
567                }
568            })
569            .collect::<Vec<SignificantChainEvent>>()
570            .await;
571
572        // if we made it this far, no errors occurred and we can update checksums and indexer state
573        match db.update_logs_checksums().await {
574            Ok(last_log_checksum) => {
575                let checksum = if fetch_checksum_from_db {
576                    let last_log = block.logs.into_iter().next_back()?;
577                    let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
578
579                    log.checksum?
580                } else {
581                    last_log_checksum.to_string()
582                };
583
584                if log_count != 0 {
585                    info!(
586                        block_number = block_id,
587                        log_count, last_log_checksum = ?checksum, "Indexer state update",
588                    );
589
590                    #[cfg(all(feature = "prometheus", not(test)))]
591                    {
592                        if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
593                            let low_4_bytes =
594                                hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
595                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
596                        } else {
597                            error!("Invalid checksum generated from logs");
598                        }
599                    }
600                }
601
602                // finally update the block number in the database to the last processed block
603                match db.set_indexer_state_info(None, block_id as u32).await {
604                    Ok(_) => {
605                        trace!(block_id, "updated indexer state info");
606                    }
607                    Err(error) => error!(block_id, %error, "failed to update indexer state info"),
608                }
609            }
610            Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
611        }
612
613        debug!(
614            block_id,
615            num_events = events.len(),
616            "processed significant chain events from block",
617        );
618
619        Some(events)
620    }
621
622    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
623    where
624        T: HoprIndexerRpcOperations + 'static,
625    {
626        match rpc.block_number().await {
627            Ok(head) => {
628                chain_head.store(head, Ordering::Relaxed);
629                debug!(head, "Updated chain head");
630                head
631            }
632            Err(error) => {
633                error!(%error, "Failed to fetch block number from RPC");
634                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
635            }
636        }
637    }
638
639    /// Calculates the synchronization progress.
640    ///
641    /// This function processes a block and updates synchronization metrics and state.
642    ///
643    /// # Arguments
644    ///
645    /// * `block` - The block with logs to process.
646    /// * `rpc` - The RPC operations handler.
647    /// * `chain_head` - The current chain head block number.
648    /// * `is_synced` - A boolean indicating whether the indexer is synced.
649    /// * `start_block` - The first block number to process.
650    /// * `tx` - A sender channel for synchronization notifications.
651    ///
652    /// # Returns
653    ///
654    /// The block which was provided as input.
655    #[allow(clippy::too_many_arguments)]
656    async fn calculate_sync_process(
657        current_block: u64,
658        rpc: &T,
659        db: Db,
660        chain_head: Arc<AtomicU64>,
661        is_synced: Arc<AtomicBool>,
662        next_block_to_process: u64,
663        mut tx: futures::channel::mpsc::Sender<()>,
664        safe_address: Option<Address>,
665        channels_address: Option<Address>,
666    ) where
667        T: HoprIndexerRpcOperations + 'static,
668        Db: HoprDbInfoOperations + Clone + Send + Sync + 'static,
669    {
670        #[cfg(all(feature = "prometheus", not(test)))]
671        {
672            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
673        }
674
675        let mut head = chain_head.load(Ordering::Relaxed);
676
677        // We only print out sync progress if we are not yet synced.
678        // Once synced, we don't print out progress anymore.
679        if !is_synced.load(Ordering::Relaxed) {
680            let mut block_difference = head.saturating_sub(next_block_to_process);
681
682            let progress = if block_difference == 0 {
683                // Before we call the sync complete, we check the chain again.
684                head = Self::update_chain_head(rpc, chain_head.clone()).await;
685                block_difference = head.saturating_sub(next_block_to_process);
686
687                if block_difference == 0 {
688                    1_f64
689                } else {
690                    (current_block - next_block_to_process) as f64 / block_difference as f64
691                }
692            } else {
693                (current_block - next_block_to_process) as f64 / block_difference as f64
694            };
695
696            info!(
697                progress = progress * 100_f64,
698                block = current_block,
699                head,
700                "Sync progress to last known head"
701            );
702
703            #[cfg(all(feature = "prometheus", not(test)))]
704            METRIC_INDEXER_SYNC_PROGRESS.set(progress);
705
706            if current_block >= head {
707                info!("indexer sync completed successfully");
708                is_synced.store(true, Ordering::Relaxed);
709
710                if let Some(safe_address) = safe_address {
711                    info!("updating safe balance from chain after indexer sync completed");
712                    match rpc.get_hopr_balance(safe_address).await {
713                        Ok(balance) => {
714                            if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
715                                error!(%error, "failed to update safe balance from chain after indexer sync completed");
716                            }
717                        }
718                        Err(error) => {
719                            error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
720                        }
721                    }
722                }
723
724                if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
725                    info!("updating safe allowance from chain after indexer sync completed");
726                    match rpc.get_hopr_allowance(safe_address, channels_address).await {
727                        Ok(allowance) => {
728                            if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
729                                error!(%error, "failed to update safe allowance from chain after indexer sync completed");
730                            }
731                        }
732                        Err(error) => {
733                            error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
734                        }
735                    }
736                }
737
738                if let Err(error) = tx.try_send(()) {
739                    error!(%error, "failed to notify about achieving indexer synchronization")
740                }
741            }
742        }
743    }
744
745    /// Checks if the logs database has any existing data.
746    ///
747    /// This method determines whether the database already contains logs, which helps
748    /// decide whether to download a snapshot for faster synchronization. It queries
749    /// the database for the total log count and returns an error if the query fails
750    /// (e.g., when the database doesn't exist yet).
751    ///
752    /// # Returns
753    ///
754    /// - `Ok(true)` if the database contains one or more logs
755    /// - `Ok(false)` if the database is empty
756    /// - `Err(e)` if the database cannot be queried
757    async fn has_logs_data(&self) -> Result<bool> {
758        self.db
759            .get_logs_count(None, None)
760            .await
761            .map(|count| count > 0)
762            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
763    }
764
765    /// Downloads and installs a database snapshot for faster initial synchronization.
766    ///
767    /// This method coordinates the snapshot download process by:
768    /// 1. Validating the indexer configuration
769    /// 2. Creating a snapshot manager instance
770    /// 3. Downloading and extracting the snapshot to the data directory
771    ///
772    /// Snapshots allow new nodes to quickly synchronize with the network by downloading
773    /// pre-built database files instead of fetching all historical logs from scratch.
774    ///
775    /// # Returns
776    ///
777    /// - `Ok(SnapshotInfo)` containing details about the downloaded snapshot
778    /// - `Err(CoreEthereumIndexerError::SnapshotError)` if validation or download fails
779    ///
780    /// # Prerequisites
781    ///
782    /// - Configuration must be valid (proper URL format, data directory set)
783    /// - Sufficient disk space must be available
784    /// - Network connectivity to the snapshot URL
785    pub async fn download_snapshot(&self) -> Result<SnapshotInfo> {
786        // Validate config before proceeding
787        if let Err(e) = self.cfg.validate() {
788            return Err(CoreEthereumIndexerError::SnapshotError(e.to_string()));
789        }
790
791        let snapshot_manager = SnapshotManager::with_db(self.db.clone())
792            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))?;
793
794        let data_dir = Path::new(&self.cfg.data_directory);
795
796        // The URL has been verified so we can just use it.
797        if let Some(url) = &self.cfg.logs_snapshot_url {
798            snapshot_manager
799                .download_and_setup_snapshot(url, data_dir)
800                .await
801                .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
802        } else {
803            Err(CoreEthereumIndexerError::SnapshotError(
804                "Logs snapshot URL is not configured".to_string(),
805            ))
806        }
807    }
808}
809
810#[cfg(test)]
811mod tests {
812    use std::{collections::BTreeSet, pin::Pin};
813
814    use alloy::{
815        dyn_abi::DynSolValue,
816        primitives::{Address as AlloyAddress, B256},
817        sol_types::SolEvent,
818    };
819    use async_trait::async_trait;
820    use futures::{Stream, join};
821    use hex_literal::hex;
822    use hopr_chain_rpc::BlockWithLogs;
823    use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
824    use hopr_crypto_types::{
825        keypairs::{Keypair, OffchainKeypair},
826        prelude::ChainKeypair,
827    };
828    use hopr_db_sql::{accounts::HoprDbAccountOperations, db::HoprDb};
829    use hopr_internal_types::account::{AccountEntry, AccountType};
830    use hopr_primitive_types::prelude::*;
831    use mockall::mock;
832    use multiaddr::Multiaddr;
833
834    use super::*;
835    use crate::traits::MockChainLogHandler;
836
837    lazy_static::lazy_static! {
838        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
839        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
840        static ref ALICE: Address = ALICE_KP.public().to_address();
841        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
842        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
843        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
844
845        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
846            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
847            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
848            multiaddresses: vec![Multiaddr::empty()],
849        };
850    }
851
852    fn build_announcement_logs(
853        address: Address,
854        size: usize,
855        block_number: u64,
856        starting_log_index: u64,
857    ) -> anyhow::Result<Vec<SerializableLog>> {
858        let mut logs: Vec<SerializableLog> = vec![];
859        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
860
861        for i in 0..size {
862            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
863            let tx_index: u64 = i as u64;
864            let log_index: u64 = starting_log_index + tx_index;
865
866            logs.push(SerializableLog {
867                address,
868                block_hash: block_hash.into(),
869                topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
870                data: DynSolValue::Tuple(vec![
871                    DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
872                    DynSolValue::String(test_multiaddr.to_string()),
873                ])
874                .abi_encode(),
875                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
876                tx_index,
877                block_number,
878                log_index,
879                ..Default::default()
880            });
881        }
882
883        Ok(logs)
884    }
885
886    mock! {
887        HoprIndexerOps {}     // Name of the mock struct, less the "Mock" prefix
888
889        #[async_trait]
890        impl HoprIndexerRpcOperations for HoprIndexerOps {
891            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
892            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
893            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
894            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
895
896            fn try_stream_logs<'a>(
897                &'a self,
898                start_block_number: u64,
899                filters: FilterSet,
900                is_synced: bool,
901            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
902        }
903    }
904
905    #[tokio::test]
906    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
907    -> anyhow::Result<()> {
908        let mut handlers = MockChainLogHandler::new();
909        let mut rpc = MockHoprIndexerOps::new();
910        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
911
912        let addr = Address::new(b"my address 123456789");
913        let topic = Hash::create(&[b"my topic"]);
914        db.ensure_logs_origin(vec![(addr, topic)]).await?;
915
916        handlers.expect_contract_addresses().return_const(vec![addr]);
917        handlers
918            .expect_contract_address_topics()
919            .withf(move |x| x == &addr)
920            .return_const(vec![B256::from_slice(topic.as_ref())]);
921        handlers
922            .expect_contract_address_topics()
923            .withf(move |x| x == &addr)
924            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
925        handlers
926            .expect_safe_address()
927            .return_const(Address::new(b"my safe address 1234"));
928        handlers
929            .expect_contract_addresses_map()
930            .return_const(ContractAddresses::default());
931
932        let head_block = 1000;
933        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
934
935        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
936        rpc.expect_try_stream_logs()
937            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
938            .return_once(move |_, _, _| Ok(Box::pin(rx)));
939
940        let indexer = Indexer::new(
941            rpc,
942            handlers,
943            db.clone(),
944            IndexerConfig::default(),
945            async_channel::unbounded().0,
946        )
947        .without_panic_on_completion();
948
949        let (indexing, _) = join!(indexer.start(), async move {
950            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
951            tx.close_channel()
952        });
953        assert!(indexing.is_err()); // terminated by the close channel
954
955        Ok(())
956    }
957
958    #[tokio::test]
959    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
960    {
961        let mut handlers = MockChainLogHandler::new();
962        let mut rpc = MockHoprIndexerOps::new();
963        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
964        let head_block = 1000;
965        let latest_block = 15u64;
966
967        let addr = Address::new(b"my address 123456789");
968        let topic = Hash::create(&[b"my topic"]);
969
970        handlers.expect_contract_addresses().return_const(vec![addr]);
971        handlers
972            .expect_contract_address_topics()
973            .withf(move |x| x == &addr)
974            .return_const(vec![B256::from_slice(topic.as_ref())]);
975        handlers
976            .expect_contract_address_topics()
977            .withf(move |x| x == &addr)
978            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
979        handlers
980            .expect_safe_address()
981            .return_const(Address::new(b"my safe address 1234"));
982        handlers
983            .expect_contract_addresses_map()
984            .return_const(ContractAddresses::default());
985
986        db.ensure_logs_origin(vec![(addr, topic)]).await?;
987
988        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
989
990        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
991        rpc.expect_try_stream_logs()
992            .once()
993            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
994            .return_once(move |_, _, _| Ok(Box::pin(rx)));
995
996        // insert and process latest block
997        let log_1 = SerializableLog {
998            address: Address::new(b"my address 123456789"),
999            topics: [Hash::create(&[b"my topic"]).into()].into(),
1000            data: [1, 2, 3, 4].into(),
1001            tx_index: 1u64,
1002            block_number: latest_block,
1003            block_hash: Hash::create(&[b"my block hash"]).into(),
1004            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1005            log_index: 1u64,
1006            removed: false,
1007            processed: Some(false),
1008            ..Default::default()
1009        };
1010        assert!(db.store_log(log_1.clone()).await.is_ok());
1011        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
1012        assert!(db.update_logs_checksums().await.is_ok());
1013
1014        let indexer = Indexer::new(
1015            rpc,
1016            handlers,
1017            db.clone(),
1018            IndexerConfig {
1019                fast_sync: false,
1020                ..Default::default()
1021            },
1022            async_channel::unbounded().0,
1023        )
1024        .without_panic_on_completion();
1025
1026        let (indexing, _) = join!(indexer.start(), async move {
1027            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1028            tx.close_channel()
1029        });
1030        assert!(indexing.is_err()); // terminated by the close channel
1031
1032        Ok(())
1033    }
1034
1035    #[tokio::test]
1036    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
1037        let mut handlers = MockChainLogHandler::new();
1038        let mut rpc = MockHoprIndexerOps::new();
1039        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1040
1041        let cfg = IndexerConfig::default();
1042
1043        let addr = Address::new(b"my address 123456789");
1044        handlers.expect_contract_addresses().return_const(vec![addr]);
1045        handlers
1046            .expect_contract_address_topics()
1047            .withf(move |x| x == &addr)
1048            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1049        handlers
1050            .expect_safe_address()
1051            .return_const(Address::new(b"my safe address 1234"));
1052        handlers
1053            .expect_contract_addresses_map()
1054            .return_const(ContractAddresses::default());
1055
1056        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1057        rpc.expect_try_stream_logs()
1058            .times(1)
1059            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1060            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1061
1062        let head_block = 1000;
1063        rpc.expect_block_number().returning(move || Ok(head_block));
1064
1065        rpc.expect_get_hopr_balance()
1066            .withf(move |x| x == &Address::new(b"my safe address 1234"))
1067            .returning(move |_| Ok(HoprBalance::default()));
1068
1069        rpc.expect_get_hopr_allowance()
1070            .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
1071            .returning(move |_, _| Ok(HoprBalance::default()));
1072
1073        let finalized_block = BlockWithLogs {
1074            block_id: head_block - 1,
1075            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
1076        };
1077        let head_allowing_finalization = BlockWithLogs {
1078            block_id: head_block,
1079            logs: BTreeSet::new(),
1080        };
1081
1082        // called once per block which is finalizable
1083        handlers
1084            .expect_collect_log_event()
1085            // .times(2)
1086            .times(finalized_block.logs.len())
1087            .returning(|_, _| Ok(None));
1088
1089        assert!(tx.start_send(finalized_block.clone()).is_ok());
1090        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
1091
1092        let indexer =
1093            Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
1094        let _ = join!(indexer.start(), async move {
1095            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1096            tx.close_channel()
1097        });
1098
1099        Ok(())
1100    }
1101
1102    #[test_log::test(tokio::test)]
1103    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
1104        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1105
1106        let addr = Address::new(b"my address 123456789");
1107        let topic = Hash::create(&[b"my topic"]);
1108
1109        // Run 1: Fast sync enabled, index empty
1110        {
1111            let logs = vec![
1112                build_announcement_logs(*BOB, 1, 1, 1)?,
1113                build_announcement_logs(*BOB, 1, 2, 1)?,
1114            ]
1115            .into_iter()
1116            .flatten()
1117            .collect::<Vec<_>>();
1118
1119            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1120
1121            for log in logs {
1122                assert!(db.store_log(log).await.is_ok());
1123            }
1124            assert!(db.update_logs_checksums().await.is_ok());
1125            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
1126            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1127
1128            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1129            let (tx_events, _) = async_channel::unbounded();
1130
1131            let head_block = 5;
1132            let mut rpc = MockHoprIndexerOps::new();
1133            rpc.expect_block_number().returning(move || Ok(head_block));
1134            rpc.expect_try_stream_logs()
1135                .times(1)
1136                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
1137                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1138
1139            let mut handlers = MockChainLogHandler::new();
1140            handlers.expect_contract_addresses().return_const(vec![addr]);
1141            handlers
1142                .expect_contract_address_topics()
1143                .withf(move |x| x == &addr)
1144                .return_const(vec![B256::from_slice(topic.as_ref())]);
1145            handlers
1146                .expect_collect_log_event()
1147                .times(2)
1148                .withf(move |l, _| [1, 2].contains(&l.block_number))
1149                .returning(|_, _| Ok(None));
1150            handlers
1151                .expect_contract_address_topics()
1152                .withf(move |x| x == &addr)
1153                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1154            handlers
1155                .expect_safe_address()
1156                .return_const(Address::new(b"my safe address 1234"));
1157            handlers
1158                .expect_contract_addresses_map()
1159                .return_const(ContractAddresses::default());
1160
1161            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
1162            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1163            let (indexing, _) = join!(indexer.start(), async move {
1164                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1165                tx.close_channel()
1166            });
1167            assert!(indexing.is_err()); // terminated by the close channel
1168
1169            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1170            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1171
1172            // At the end we need to simulate that the index is not empty,
1173            // thus storing some data.
1174            db.insert_account(
1175                None,
1176                AccountEntry {
1177                    public_key: *ALICE_OKP.public(),
1178                    chain_addr: *ALICE,
1179                    entry_type: AccountType::NotAnnounced,
1180                    published_at: 1,
1181                },
1182            )
1183            .await?;
1184            db.insert_account(
1185                None,
1186                AccountEntry {
1187                    public_key: *BOB_OKP.public(),
1188                    chain_addr: *BOB,
1189                    entry_type: AccountType::NotAnnounced,
1190                    published_at: 1,
1191                },
1192            )
1193            .await?;
1194        }
1195
1196        // Run 2: Fast sync enabled, index not empty, resume after 2 logs
1197        {
1198            let logs = vec![
1199                build_announcement_logs(*BOB, 1, 3, 1)?,
1200                build_announcement_logs(*BOB, 1, 4, 1)?,
1201            ]
1202            .into_iter()
1203            .flatten()
1204            .collect::<Vec<_>>();
1205
1206            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1207
1208            for log in logs {
1209                assert!(db.store_log(log).await.is_ok());
1210            }
1211            assert!(db.update_logs_checksums().await.is_ok());
1212            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1213            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1214
1215            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1216            let (tx_events, _) = async_channel::unbounded();
1217
1218            let head_block = 5;
1219            let mut rpc = MockHoprIndexerOps::new();
1220            rpc.expect_block_number().returning(move || Ok(head_block));
1221            rpc.expect_try_stream_logs()
1222                .times(1)
1223                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
1224                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1225
1226            let mut handlers = MockChainLogHandler::new();
1227            handlers.expect_contract_addresses().return_const(vec![addr]);
1228            handlers
1229                .expect_contract_address_topics()
1230                .withf(move |x| x == &addr)
1231                .return_const(vec![B256::from_slice(topic.as_ref())]);
1232
1233            handlers
1234                .expect_collect_log_event()
1235                .times(2)
1236                .withf(move |l, _| [3, 4].contains(&l.block_number))
1237                .returning(|_, _| Ok(None));
1238            handlers
1239                .expect_contract_address_topics()
1240                .withf(move |x| x == &addr)
1241                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1242            handlers
1243                .expect_safe_address()
1244                .return_const(Address::new(b"my safe address 1234"));
1245            handlers
1246                .expect_contract_addresses_map()
1247                .return_const(ContractAddresses::default());
1248
1249            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
1250            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1251            let (indexing, _) = join!(indexer.start(), async move {
1252                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1253                tx.close_channel()
1254            });
1255            assert!(indexing.is_err()); // terminated by the close channel
1256
1257            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
1258            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1259        }
1260
1261        Ok(())
1262    }
1263
1264    #[test_log::test(tokio::test)]
1265    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
1266        let mut handlers = MockChainLogHandler::new();
1267        let mut rpc = MockHoprIndexerOps::new();
1268        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1269
1270        let cfg = IndexerConfig::default();
1271
1272        // We don't want to index anything really
1273        let addr = Address::new(b"my address 123456789");
1274        handlers.expect_contract_addresses().return_const(vec![addr]);
1275        handlers
1276            .expect_contract_address_topics()
1277            .withf(move |x| x == &addr)
1278            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1279        handlers
1280            .expect_contract_address_topics()
1281            .withf(move |x| x == &addr)
1282            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1283        handlers
1284            .expect_safe_address()
1285            .return_const(Address::new(b"my safe address 1234"));
1286        handlers
1287            .expect_contract_addresses_map()
1288            .return_const(ContractAddresses::default());
1289
1290        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1291        // Expected to be called once starting at 0 and yield the respective blocks
1292        rpc.expect_try_stream_logs()
1293            .times(1)
1294            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1295            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1296        rpc.expect_get_hopr_balance()
1297            .once()
1298            .return_once(move |_| Ok(HoprBalance::zero()));
1299        rpc.expect_get_hopr_allowance()
1300            .once()
1301            .return_once(move |_, _| Ok(HoprBalance::zero()));
1302
1303        let head_block = 1000;
1304        let block_numbers = [head_block - 1, head_block, head_block + 1];
1305
1306        let blocks: Vec<BlockWithLogs> = block_numbers
1307            .iter()
1308            .map(|block_id| BlockWithLogs {
1309                block_id: *block_id,
1310                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
1311            })
1312            .collect();
1313
1314        for _ in 0..(blocks.len() as u64) {
1315            rpc.expect_block_number().returning(move || Ok(head_block));
1316        }
1317
1318        for block in blocks.iter() {
1319            assert!(tx.start_send(block.clone()).is_ok());
1320        }
1321
1322        // Generate the expected events to be able to process the blocks
1323        handlers
1324            .expect_collect_log_event()
1325            .times(1)
1326            .withf(move |l, _| block_numbers.contains(&l.block_number))
1327            .returning(|l, _| {
1328                let block_number = l.block_number;
1329                Ok(Some(SignificantChainEvent {
1330                    tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
1331                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1332                }))
1333            });
1334
1335        let (tx_events, rx_events) = async_channel::unbounded();
1336        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1337        indexer.start().await?;
1338
1339        // At this point we expect 2 events to arrive. The third event, which was generated first,
1340        // should be dropped because it was generated before the indexer was in sync with head.
1341        let _first = rx_events.recv();
1342        let _second = rx_events.recv();
1343        let third = rx_events.try_recv();
1344
1345        assert!(third.is_err());
1346
1347        Ok(())
1348    }
1349
1350    #[test_log::test(tokio::test)]
1351    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1352        let last_processed_block = 100_u64;
1353
1354        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1355
1356        let addr = Address::new(b"my address 123456789");
1357        let topic = Hash::create(&[b"my topic"]);
1358        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1359
1360        // insert and process latest block
1361        let log_1 = SerializableLog {
1362            address: Address::new(b"my address 123456789"),
1363            topics: [Hash::create(&[b"my topic"]).into()].into(),
1364            data: [1, 2, 3, 4].into(),
1365            tx_index: 1u64,
1366            block_number: last_processed_block,
1367            block_hash: Hash::create(&[b"my block hash"]).into(),
1368            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1369            log_index: 1u64,
1370            removed: false,
1371            processed: Some(false),
1372            ..Default::default()
1373        };
1374        assert!(db.store_log(log_1.clone()).await.is_ok());
1375        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1376        assert!(db.update_logs_checksums().await.is_ok());
1377
1378        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1379
1380        let mut rpc = MockHoprIndexerOps::new();
1381        rpc.expect_try_stream_logs()
1382            .once()
1383            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
1384            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1385
1386        rpc.expect_block_number()
1387            .times(3)
1388            .returning(move || Ok(last_processed_block + 1));
1389
1390        rpc.expect_get_hopr_balance()
1391            .once()
1392            .return_once(move |_| Ok(HoprBalance::zero()));
1393
1394        rpc.expect_get_hopr_allowance()
1395            .once()
1396            .return_once(move |_, _| Ok(HoprBalance::zero()));
1397
1398        let block = BlockWithLogs {
1399            block_id: last_processed_block + 1,
1400            logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
1401        };
1402
1403        tx.start_send(block)?;
1404
1405        let mut handlers = MockChainLogHandler::new();
1406        handlers.expect_contract_addresses().return_const(vec![addr]);
1407        handlers
1408            .expect_contract_address_topics()
1409            .withf(move |x| x == &addr)
1410            .return_const(vec![B256::from_slice(topic.as_ref())]);
1411        handlers
1412            .expect_contract_address_topics()
1413            .withf(move |x| x == &addr)
1414            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1415        handlers
1416            .expect_safe_address()
1417            .return_const(Address::new(b"my safe address 1234"));
1418        handlers
1419            .expect_contract_addresses_map()
1420            .return_const(ContractAddresses::default());
1421
1422        let indexer_cfg = IndexerConfig::new(0, false, false, None, "/tmp/test_data".to_string());
1423
1424        let (tx_events, _) = async_channel::unbounded();
1425        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1426        indexer.start().await?;
1427
1428        Ok(())
1429    }
1430}