hopr_chain_indexer/
block.rs

1use std::{
2    path::Path,
3    sync::{
4        Arc,
5        atomic::{AtomicBool, AtomicU64, Ordering},
6    },
7};
8
9use alloy::sol_types::SolEvent;
10use futures::{
11    FutureExt, StreamExt,
12    future::AbortHandle,
13    stream::{self},
14};
15use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
16use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
17use hopr_chain_types::chain_events::SignificantChainEvent;
18use hopr_crypto_types::types::Hash;
19use hopr_db_sql::{HoprIndexerDb, info::HoprDbInfoOperations, logs::HoprDbLogOperations};
20use hopr_primitive_types::prelude::*;
21use tracing::{debug, error, info, trace};
22
23use crate::{
24    IndexerConfig,
25    errors::{CoreEthereumIndexerError, Result},
26    snapshot::{SnapshotInfo, SnapshotManager},
27    traits::ChainLogHandler,
28};
29
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge with the number of the block most recently seen by the indexer.
    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_indexer_block_number",
        "Current last processed block number by the indexer",
    )
    .unwrap();

    /// Gauge with the low 32 bits of the latest log checksum.
    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_indexer_checksum",
        "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum",
    )
    .unwrap();

    /// Gauge with the synchronization progress expressed as a fraction.
    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_indexer_sync_progress",
        "Sync progress of the historical data by the indexer",
    )
    .unwrap();

    /// Labelled gauge marking which data source ("fast-sync" or "rpc") is currently active.
    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_indexer_data_source",
        "Current data source of the Indexer",
        &["source"],
    )
    .unwrap();
}
55
56/// Indexer
57///
58/// Accepts the RPC operational functionality [hopr_chain_rpc::HoprIndexerRpcOperations]
59/// and provides the indexing operation resulting in and output of
60/// [hopr_chain_types::chain_events::SignificantChainEvent] streamed outside the indexer by the unbounded channel.
61///
62/// The roles of the indexer:
63/// 1. prime the RPC endpoint
64/// 2. request an RPC stream of changes to process
65/// 3. process block and log stream
66/// 4. ensure finalization by postponing processing until the head is far enough
67/// 5. store relevant data into the DB
68/// 6. pass the processing on to the business logic
69#[derive(Debug, Clone)]
70pub struct Indexer<T, U>
71where
72    T: HoprIndexerRpcOperations + Send + 'static,
73    U: ChainLogHandler + Send + 'static,
74{
75    rpc: Option<T>,
76    db_processor: Option<U>,
77    db: HoprIndexerDb,
78    cfg: IndexerConfig,
79    egress: futures::channel::mpsc::Sender<SignificantChainEvent>,
80    // If true (default), the indexer will panic if the event stream is terminated.
81    // Setting it to false is useful for testing.
82    panic_on_completion: bool,
83}
84
85impl<T, U> Indexer<T, U>
86where
87    T: HoprIndexerRpcOperations + Sync + Send + 'static,
88    U: ChainLogHandler + Send + Sync + 'static,
89{
90    pub fn new(
91        rpc: T,
92        db_processor: U,
93        db: HoprIndexerDb,
94        cfg: IndexerConfig,
95        egress: futures::channel::mpsc::Sender<SignificantChainEvent>,
96    ) -> Self {
97        Self {
98            rpc: Some(rpc),
99            db_processor: Some(db_processor),
100            db,
101            cfg,
102            egress,
103            panic_on_completion: true,
104        }
105    }
106
107    /// Disables the panic on completion.
108    pub fn without_panic_on_completion(mut self) -> Self {
109        self.panic_on_completion = false;
110        self
111    }
112
113    pub async fn start(mut self) -> Result<AbortHandle>
114    where
115        T: HoprIndexerRpcOperations + 'static,
116        U: ChainLogHandler + 'static,
117    {
118        if self.rpc.is_none() || self.db_processor.is_none() {
119            return Err(CoreEthereumIndexerError::ProcessError(
120                "indexer cannot start, missing components".into(),
121            ));
122        }
123
124        info!("Starting chain indexing");
125
126        let rpc = self.rpc.take().expect("rpc should be present");
127        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
128        let db = self.db.clone();
129        let mut tx_significant_events = self.egress.clone();
130        let panic_on_completion = self.panic_on_completion;
131
132        let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);
133
134        // Check that the contract addresses and topics are consistent with what is in the logs DB,
135        // or if the DB is empty, prime it with the given addresses and topics.
136        db.ensure_logs_origin(address_topics).await?;
137
138        let is_synced = Arc::new(AtomicBool::new(false));
139        let chain_head = Arc::new(AtomicU64::new(0));
140
141        // update the chain head once at startup to get a reference for initial syncing
142        // progress calculation
143        debug!("Updating chain head at indexer startup");
144        Self::update_chain_head(&rpc, chain_head.clone()).await;
145
146        // First, check whether fast sync is enabled and can be performed.
147        // If so:
148        //   1. Download the snapshot if the logs database is empty and the snapshot is enabled
149        //   2. Delete the existing indexed data
150        //   3. Reset fast sync progress
151        //   4. Run the fast sync process until completion
152        //   5. Finally, starting the rpc indexer.
153        let fast_sync_configured = self.cfg.fast_sync;
154        let index_empty = self.db.index_is_empty().await?;
155
156        // Pre-start operations to ensure the indexer is ready, including snapshot fetching
157        self.pre_start().await?;
158
159        #[derive(PartialEq, Eq)]
160        enum FastSyncMode {
161            None,
162            FromScratch,
163            Continue,
164        }
165
166        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
167            (true, false) => {
168                info!(
169                    "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
170                     unprocessed logs."
171                );
172                FastSyncMode::Continue
173            }
174            (false, true) => {
175                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
176                // Clean the last processed log from the Log DB, to allow full resync
177                self.db.clear_index_db(None).await?;
178                self.db.set_logs_unprocessed(None, None).await?;
179                FastSyncMode::None
180            }
181            (false, false) => {
182                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
183                FastSyncMode::None
184            }
185            (true, true) => {
186                info!("Fast sync is enabled, starting the fast sync process");
187                // To ensure a proper state, reset any auxiliary data in the database
188                self.db.clear_index_db(None).await?;
189                self.db.set_logs_unprocessed(None, None).await?;
190                FastSyncMode::FromScratch
191            }
192        };
193
194        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
195
196        // Perform the fast-sync if requested
197        if FastSyncMode::None != will_perform_fast_sync {
198            let processed = match will_perform_fast_sync {
199                FastSyncMode::FromScratch => None,
200                FastSyncMode::Continue => Some(false),
201                _ => unreachable!(),
202            };
203
204            #[cfg(all(feature = "prometheus", not(test)))]
205            {
206                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
207                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
208            }
209
210            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
211            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
212            let _head = chain_head.load(Ordering::Relaxed);
213            for block_number in log_block_numbers {
214                debug!(
215                    block_number,
216                    first_log_block_number = _first_log_block_number,
217                    head = _head,
218                    "computing processed logs"
219                );
220                // Do not pollute the logs with the fast-sync progress
221                Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;
222
223                #[cfg(all(feature = "prometheus", not(test)))]
224                {
225                    let progress =
226                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
227                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
228                }
229            }
230        }
231
232        info!("Building rpc indexer background process");
233
234        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
235            info!(
236                start_block = last_log.block_number,
237                start_checksum = last_log.checksum.unwrap(),
238                "Loaded indexer state",
239            );
240
241            if self.cfg.start_block_number < last_log.block_number {
242                // If some prior indexing took place already, avoid reprocessing
243                last_log.block_number + 1
244            } else {
245                self.cfg.start_block_number
246            }
247        } else {
248            self.cfg.start_block_number
249        };
250
251        info!(next_block_to_process, "Indexer start point");
252
253        let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable!(
254            async move {
255                // Update the chain head once again
256                debug!("Updating chain head at indexer startup");
257                Self::update_chain_head(&rpc, chain_head.clone()).await;
258
259                #[cfg(all(feature = "prometheus", not(test)))]
260                {
261                    METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
262                    METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
263                }
264
265                let rpc_ref = &rpc;
266
267                let event_stream = rpc
268                    .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
269                    .expect("block stream should be constructible")
270                    .then(|block| {
271                        let db = db.clone();
272                        let chain_head = chain_head.clone();
273                        let is_synced = is_synced.clone();
274                        let tx = tx.clone();
275                        let logs_handler = logs_handler.clone();
276
277                        async move {
278                            Self::calculate_sync_process(
279                                block.block_id,
280                                rpc_ref,
281                                db,
282                                chain_head.clone(),
283                                is_synced.clone(),
284                                next_block_to_process,
285                                tx.clone(),
286                                logs_handler.safe_address().into(),
287                                logs_handler.contract_addresses_map().channels.into(),
288                            )
289                            .await;
290
291                            block
292                        }
293                    })
294                    .filter_map(|block| {
295                        let db = db.clone();
296                        let logs_handler = logs_handler.clone();
297
298                        async move {
299                            debug!(%block, "storing logs from block");
300                            let logs = block.logs.clone();
301
302                            // Filter out the token contract logs because we do not need to store these
303                            // in the database.
304                            let logs_vec = logs
305                                .into_iter()
306                                .filter(|log| log.address != logs_handler.contract_addresses_map().token)
307                                .collect();
308
309                            match db.store_logs(logs_vec).await {
310                                Ok(store_results) => {
311                                    if let Some(error) = store_results
312                                        .into_iter()
313                                        .filter(|r| r.is_err())
314                                        .map(|r| r.unwrap_err())
315                                        .next()
316                                    {
317                                        error!(%block, %error, "failed to processed stored logs from block");
318                                        None
319                                    } else {
320                                        Some(block)
321                                    }
322                                }
323                                Err(error) => {
324                                    error!(%block, %error, "failed to store logs from block");
325                                    None
326                                }
327                            }
328                        }
329                    })
330                    .filter_map(|block| {
331                        let db = db.clone();
332                        let logs_handler = logs_handler.clone();
333                        let is_synced = is_synced.clone();
334                        async move {
335                            Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed))
336                                .await
337                        }
338                    })
339                    .flat_map(stream::iter);
340
341                futures::pin_mut!(event_stream);
342                while let Some(event) = event_stream.next().await {
343                    trace!(%event, "processing on-chain event");
344                    // Pass the events further only once we're fully synced
345                    if is_synced.load(Ordering::Relaxed) {
346                        if let Err(error) = tx_significant_events.try_send(event) {
347                            error!(%error, "failed to pass a significant chain event further");
348                        }
349                    }
350                }
351
352                if panic_on_completion {
353                    panic!(
354                        "Indexer event stream has been terminated. This error may be caused by a failed RPC \
355                         connection."
356                    );
357                }
358            }
359            .inspect(|_| tracing::warn!(task = "indexer", "long-running background task finished"))
360        );
361
362        if rx.next().await.is_some() {
363            Ok(indexing_abort_handle)
364        } else {
365            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
366                "Error during indexing start".into(),
367            ))
368        }
369    }
370
371    pub async fn pre_start(&self) -> Result<()> {
372        let fast_sync_configured = self.cfg.fast_sync;
373        let index_empty = self.db.index_is_empty().await?;
374
375        // Check if we need to download snapshot before fast sync
376        let logs_db_has_data = self.has_logs_data().await?;
377
378        if fast_sync_configured && index_empty && !logs_db_has_data && self.cfg.enable_logs_snapshot {
379            info!("Logs database is empty, attempting to download logs snapshot...");
380
381            match self.download_snapshot().await {
382                Ok(snapshot_info) => {
383                    info!("Logs snapshot downloaded successfully: {:?}", snapshot_info);
384                }
385                Err(e) => {
386                    error!("Failed to download logs snapshot: {}. Continuing with regular sync.", e);
387                }
388            }
389        }
390
391        Ok(())
392    }
393
394    /// Generates specialized log filters for efficient blockchain event processing.
395    ///
396    /// This function creates a comprehensive set of log filters that optimize
397    /// indexer performance by categorizing filters based on contract types and
398    /// event relevance to the specific node.
399    ///
400    /// # Arguments
401    /// * `logs_handler` - Handler containing contract addresses and safe address
402    ///
403    /// # Returns
404    /// * `(FilterSet, Vec<(Address, Hash)>)` - A tuple containing:
405    ///   - `FilterSet`: Categorized filters for blockchain event processing.
406    ///   - `Vec<(Address, Hash)>`: A vector of address-topic pairs for logs origin validation.
407    ///
408    /// # Filter Categories
409    /// * `all` - Complete set of filters for normal operation
410    /// * `token` - Token-specific filters (Transfer, Approval events for safe)
411    /// * `no_token` - Non-token contract filters for initial sync optimization
412    fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
413        let safe_address = logs_handler.safe_address();
414        let addresses_no_token = logs_handler
415            .contract_addresses()
416            .into_iter()
417            .filter(|a| *a != logs_handler.contract_addresses_map().token)
418            .collect::<Vec<_>>();
419        let mut filter_base_addresses = vec![];
420        let mut filter_base_topics = vec![];
421        let mut address_topics = vec![];
422
423        addresses_no_token.iter().for_each(|address| {
424            let topics = logs_handler.contract_address_topics(*address);
425            if !topics.is_empty() {
426                filter_base_addresses.push(alloy::primitives::Address::from(*address));
427                filter_base_topics.extend(topics.clone());
428                for topic in topics.iter() {
429                    address_topics.push((*address, Hash::from(topic.0)))
430                }
431            }
432        });
433
434        let filter_base = alloy::rpc::types::Filter::new()
435            .address(filter_base_addresses)
436            .event_signature(filter_base_topics);
437        let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
438            logs_handler.contract_addresses_map().token,
439        ));
440
441        let filter_transfer_to = filter_token
442            .clone()
443            .event_signature(Transfer::SIGNATURE_HASH)
444            .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
445
446        let filter_transfer_from = filter_token
447            .clone()
448            .event_signature(Transfer::SIGNATURE_HASH)
449            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
450
451        let filter_approval = filter_token
452            .event_signature(Approval::SIGNATURE_HASH)
453            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
454            .topic2(alloy::primitives::B256::from_slice(
455                logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
456            ));
457
458        let set = FilterSet {
459            all: vec![
460                filter_base.clone(),
461                filter_transfer_from.clone(),
462                filter_transfer_to.clone(),
463                filter_approval.clone(),
464            ],
465            token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
466            no_token: vec![filter_base],
467        };
468
469        (set, address_topics)
470    }
471
472    /// Processes a block by its ID.
473    ///
474    /// This function retrieves logs for the given block ID and processes them using the database
475    /// and log handler.
476    ///
477    /// # Arguments
478    ///
479    /// * `db` - The database operations handler.
480    /// * `logs_handler` - The database log handler.
481    /// * `block_id` - The ID of the block to process.
482    ///
483    /// # Returns
484    ///
485    /// A `Result` containing an optional vector of significant chain events if the operation succeeds or an error if it
486    /// fails.
487    async fn process_block_by_id(
488        db: &HoprIndexerDb,
489        logs_handler: &U,
490        block_id: u64,
491        is_synced: bool,
492    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
493    where
494        U: ChainLogHandler + 'static,
495    {
496        let logs = db.get_logs(Some(block_id), Some(0)).await?;
497        let mut block = BlockWithLogs {
498            block_id,
499            ..Default::default()
500        };
501
502        for log in logs {
503            if log.block_number == block_id {
504                block.logs.insert(log);
505            } else {
506                error!(
507                    expected = block_id,
508                    actual = log.block_number,
509                    "block number mismatch in logs from database"
510                );
511                panic!("block number mismatch in logs from database")
512            }
513        }
514
515        Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
516    }
517
518    /// Processes a block and its logs.
519    ///
520    /// This function collects events from the block logs and updates the database with the processed logs.
521    ///
522    /// # Arguments
523    ///
524    /// * `db` - The database operations handler.
525    /// * `logs_handler` - The database log handler.
526    /// * `block` - The block with logs to process.
527    /// * `fetch_checksum_from_db` - A boolean indicating whether to fetch the checksum from the database.
528    ///
529    /// # Returns
530    ///
531    /// An optional vector of significant chain events if the operation succeeds.
532    async fn process_block(
533        db: &HoprIndexerDb,
534        logs_handler: &U,
535        block: BlockWithLogs,
536        fetch_checksum_from_db: bool,
537        is_synced: bool,
538    ) -> Option<Vec<SignificantChainEvent>>
539    where
540        U: ChainLogHandler + 'static,
541    {
542        let block_id = block.block_id;
543        let log_count = block.logs.len();
544        debug!(block_id, "processing events");
545
546        // FIXME: The block indexing and marking as processed should be done in a single
547        // transaction. This is difficult since currently this would be across databases.
548        let events = stream::iter(block.logs.clone())
549            .filter_map(|log| async move {
550                match logs_handler.collect_log_event(log.clone(), is_synced).await {
551                    Ok(data) => match db.set_log_processed(log).await {
552                        Ok(_) => data,
553                        Err(error) => {
554                            error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
555                            panic!("failed to mark log as processed, panicking to prevent data loss")
556                        }
557                    },
558                    Err(CoreEthereumIndexerError::ProcessError(error)) => {
559                        error!(block_id, %error, "failed to process log into event, continuing indexing");
560                        None
561                    }
562                    Err(error) => {
563                        error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
564                        panic!("failed to process log into event, panicking to prevent data loss")
565                    }
566                }
567            })
568            .collect::<Vec<SignificantChainEvent>>()
569            .await;
570
571        // if we made it this far, no errors occurred and we can update checksums and indexer state
572        match db.update_logs_checksums().await {
573            Ok(last_log_checksum) => {
574                let checksum = if fetch_checksum_from_db {
575                    let last_log = block.logs.into_iter().next_back()?;
576                    let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
577
578                    log.checksum?
579                } else {
580                    last_log_checksum.to_string()
581                };
582
583                if log_count != 0 {
584                    info!(
585                        block_number = block_id,
586                        log_count, last_log_checksum = ?checksum, "Indexer state update",
587                    );
588
589                    #[cfg(all(feature = "prometheus", not(test)))]
590                    {
591                        if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
592                            let low_4_bytes =
593                                hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
594                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
595                        } else {
596                            error!("Invalid checksum generated from logs");
597                        }
598                    }
599                }
600
601                // finally update the block number in the database to the last processed block
602                match db.set_indexer_state_info(None, block_id as u32).await {
603                    Ok(_) => {
604                        trace!(block_id, "updated indexer state info");
605                    }
606                    Err(error) => error!(block_id, %error, "failed to update indexer state info"),
607                }
608            }
609            Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
610        }
611
612        debug!(
613            block_id,
614            num_events = events.len(),
615            "processed significant chain events from block",
616        );
617
618        Some(events)
619    }
620
621    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
622    where
623        T: HoprIndexerRpcOperations + 'static,
624    {
625        match rpc.block_number().await {
626            Ok(head) => {
627                chain_head.store(head, Ordering::Relaxed);
628                debug!(head, "Updated chain head");
629                head
630            }
631            Err(error) => {
632                error!(%error, "Failed to fetch block number from RPC");
633                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
634            }
635        }
636    }
637
638    /// Calculates the synchronization progress.
639    ///
640    /// This function processes a block and updates synchronization metrics and state.
641    ///
642    /// # Arguments
643    ///
644    /// * `block` - The block with logs to process.
645    /// * `rpc` - The RPC operations handler.
646    /// * `chain_head` - The current chain head block number.
647    /// * `is_synced` - A boolean indicating whether the indexer is synced.
648    /// * `start_block` - The first block number to process.
649    /// * `tx` - A sender channel for synchronization notifications.
650    ///
651    /// # Returns
652    ///
653    /// The block which was provided as input.
654    #[allow(clippy::too_many_arguments)]
655    async fn calculate_sync_process(
656        current_block: u64,
657        rpc: &T,
658        db: HoprIndexerDb,
659        chain_head: Arc<AtomicU64>,
660        is_synced: Arc<AtomicBool>,
661        next_block_to_process: u64,
662        mut tx: futures::channel::mpsc::Sender<()>,
663        safe_address: Option<Address>,
664        channels_address: Option<Address>,
665    ) where
666        T: HoprIndexerRpcOperations + 'static,
667    {
668        #[cfg(all(feature = "prometheus", not(test)))]
669        {
670            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
671        }
672
673        let mut head = chain_head.load(Ordering::Relaxed);
674
675        // We only print out sync progress if we are not yet synced.
676        // Once synced, we don't print out progress anymore.
677        if !is_synced.load(Ordering::Relaxed) {
678            let mut block_difference = head.saturating_sub(next_block_to_process);
679
680            let progress = if block_difference == 0 {
681                // Before we call the sync complete, we check the chain again.
682                head = Self::update_chain_head(rpc, chain_head.clone()).await;
683                block_difference = head.saturating_sub(next_block_to_process);
684
685                if block_difference == 0 {
686                    1_f64
687                } else {
688                    (current_block - next_block_to_process) as f64 / block_difference as f64
689                }
690            } else {
691                (current_block - next_block_to_process) as f64 / block_difference as f64
692            };
693
694            info!(
695                progress = progress * 100_f64,
696                block = current_block,
697                head,
698                "Sync progress to last known head"
699            );
700
701            #[cfg(all(feature = "prometheus", not(test)))]
702            METRIC_INDEXER_SYNC_PROGRESS.set(progress);
703
704            if current_block >= head {
705                info!("indexer sync completed successfully");
706                is_synced.store(true, Ordering::Relaxed);
707
708                if let Some(safe_address) = safe_address {
709                    info!("updating safe balance from chain after indexer sync completed");
710                    match rpc.get_hopr_balance(safe_address).await {
711                        Ok(balance) => {
712                            if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
713                                error!(%error, "failed to update safe balance from chain after indexer sync completed");
714                            }
715                        }
716                        Err(error) => {
717                            error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
718                        }
719                    }
720                }
721
722                if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
723                    info!("updating safe allowance from chain after indexer sync completed");
724                    match rpc.get_hopr_allowance(safe_address, channels_address).await {
725                        Ok(allowance) => {
726                            if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
727                                error!(%error, "failed to update safe allowance from chain after indexer sync completed");
728                            }
729                        }
730                        Err(error) => {
731                            error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
732                        }
733                    }
734                }
735
736                if let Err(error) = tx.try_send(()) {
737                    error!(%error, "failed to notify about achieving indexer synchronization")
738                }
739            }
740        }
741    }
742
743    /// Checks if the logs database has any existing data.
744    ///
745    /// This method determines whether the database already contains logs, which helps
746    /// decide whether to download a snapshot for faster synchronization. It queries
747    /// the database for the total log count and returns an error if the query fails
748    /// (e.g., when the database doesn't exist yet).
749    ///
750    /// # Returns
751    ///
752    /// - `Ok(true)` if the database contains one or more logs
753    /// - `Ok(false)` if the database is empty
754    /// - `Err(e)` if the database cannot be queried
755    async fn has_logs_data(&self) -> Result<bool> {
756        self.db
757            .get_logs_count(None, None)
758            .await
759            .map(|count| count > 0)
760            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
761    }
762
763    /// Downloads and installs a database snapshot for faster initial synchronization.
764    ///
765    /// This method coordinates the snapshot download process by:
766    /// 1. Validating the indexer configuration
767    /// 2. Creating a snapshot manager instance
768    /// 3. Downloading and extracting the snapshot to the data directory
769    ///
770    /// Snapshots allow new nodes to quickly synchronize with the network by downloading
771    /// pre-built database files instead of fetching all historical logs from scratch.
772    ///
773    /// # Returns
774    ///
775    /// - `Ok(SnapshotInfo)` containing details about the downloaded snapshot
776    /// - `Err(CoreEthereumIndexerError::SnapshotError)` if validation or download fails
777    ///
778    /// # Prerequisites
779    ///
780    /// - Configuration must be valid (proper URL format, data directory set)
781    /// - Sufficient disk space must be available
782    /// - Network connectivity to the snapshot URL
783    pub async fn download_snapshot(&self) -> Result<SnapshotInfo> {
784        // Validate config before proceeding
785        if let Err(e) = self.cfg.validate() {
786            return Err(CoreEthereumIndexerError::SnapshotError(e.to_string()));
787        }
788
789        let snapshot_manager = SnapshotManager::with_db(self.db.clone())
790            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))?;
791
792        let data_dir = Path::new(&self.cfg.data_directory);
793
794        // The URL has been verified so we can just use it.
795        if let Some(url) = &self.cfg.logs_snapshot_url {
796            snapshot_manager
797                .download_and_setup_snapshot(url, data_dir)
798                .await
799                .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
800        } else {
801            Err(CoreEthereumIndexerError::SnapshotError(
802                "Logs snapshot URL is not configured".to_string(),
803            ))
804        }
805    }
806}
807
808#[cfg(test)]
809mod tests {
810    use std::{collections::BTreeSet, pin::Pin};
811
812    use alloy::{
813        dyn_abi::DynSolValue,
814        primitives::{Address as AlloyAddress, B256},
815        sol_types::SolEvent,
816    };
817    use async_trait::async_trait;
818    use futures::{Stream, join, pin_mut};
819    use hex_literal::hex;
820    use hopr_chain_rpc::BlockWithLogs;
821    use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
822    use hopr_crypto_types::{
823        keypairs::{Keypair, OffchainKeypair},
824        prelude::ChainKeypair,
825    };
826    use hopr_db_sql::{HoprIndexerDb, accounts::HoprDbAccountOperations};
827    use hopr_internal_types::account::{AccountEntry, AccountType};
828    use hopr_primitive_types::prelude::*;
829    use mockall::mock;
830    use multiaddr::Multiaddr;
831
832    use super::*;
833    use crate::traits::MockChainLogHandler;
834
835    lazy_static::lazy_static! {
836        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
837        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
838        static ref ALICE: Address = ALICE_KP.public().to_address();
839        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
840        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
841        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
842
843        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
844            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
845            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
846            multiaddresses: vec![Multiaddr::empty()],
847        };
848    }
849
850    fn build_announcement_logs(
851        address: Address,
852        size: usize,
853        block_number: u64,
854        starting_log_index: u64,
855    ) -> anyhow::Result<Vec<SerializableLog>> {
856        let mut logs: Vec<SerializableLog> = vec![];
857        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
858
859        for i in 0..size {
860            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
861            let tx_index: u64 = i as u64;
862            let log_index: u64 = starting_log_index + tx_index;
863
864            logs.push(SerializableLog {
865                address,
866                block_hash: block_hash.into(),
867                topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
868                data: DynSolValue::Tuple(vec![
869                    DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
870                    DynSolValue::String(test_multiaddr.to_string()),
871                ])
872                .abi_encode(),
873                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
874                tx_index,
875                block_number,
876                log_index,
877                ..Default::default()
878            });
879        }
880
881        Ok(logs)
882    }
883
884    mock! {
885        HoprIndexerOps {}     // Name of the mock struct, less the "Mock" prefix
886
887        #[async_trait]
888        impl HoprIndexerRpcOperations for HoprIndexerOps {
889            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
890            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
891            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
892            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
893
894            fn try_stream_logs<'a>(
895                &'a self,
896                start_block_number: u64,
897                filters: FilterSet,
898                is_synced: bool,
899            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
900        }
901    }
902
903    #[tokio::test]
904    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
905    -> anyhow::Result<()> {
906        let mut handlers = MockChainLogHandler::new();
907        let mut rpc = MockHoprIndexerOps::new();
908        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
909
910        let addr = Address::new(b"my address 123456789");
911        let topic = Hash::create(&[b"my topic"]);
912        db.ensure_logs_origin(vec![(addr, topic)]).await?;
913
914        handlers.expect_contract_addresses().return_const(vec![addr]);
915        handlers
916            .expect_contract_address_topics()
917            .withf(move |x| x == &addr)
918            .return_const(vec![B256::from_slice(topic.as_ref())]);
919        handlers
920            .expect_contract_address_topics()
921            .withf(move |x| x == &addr)
922            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
923        handlers
924            .expect_safe_address()
925            .return_const(Address::new(b"my safe address 1234"));
926        handlers
927            .expect_contract_addresses_map()
928            .return_const(ContractAddresses::default());
929
930        let head_block = 1000;
931        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
932
933        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
934        rpc.expect_try_stream_logs()
935            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
936            .return_once(move |_, _, _| Ok(Box::pin(rx)));
937
938        let indexer = Indexer::new(
939            rpc,
940            handlers,
941            db.clone(),
942            IndexerConfig::default(),
943            futures::channel::mpsc::channel(1000).0,
944        )
945        .without_panic_on_completion();
946
947        let (indexing, _) = join!(indexer.start(), async move {
948            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
949            tx.close_channel()
950        });
951        assert!(indexing.is_err()); // terminated by the close channel
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
958    {
959        let mut handlers = MockChainLogHandler::new();
960        let mut rpc = MockHoprIndexerOps::new();
961        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
962        let head_block = 1000;
963        let latest_block = 15u64;
964
965        let addr = Address::new(b"my address 123456789");
966        let topic = Hash::create(&[b"my topic"]);
967
968        handlers.expect_contract_addresses().return_const(vec![addr]);
969        handlers
970            .expect_contract_address_topics()
971            .withf(move |x| x == &addr)
972            .return_const(vec![B256::from_slice(topic.as_ref())]);
973        handlers
974            .expect_contract_address_topics()
975            .withf(move |x| x == &addr)
976            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
977        handlers
978            .expect_safe_address()
979            .return_const(Address::new(b"my safe address 1234"));
980        handlers
981            .expect_contract_addresses_map()
982            .return_const(ContractAddresses::default());
983
984        db.ensure_logs_origin(vec![(addr, topic)]).await?;
985
986        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
987
988        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
989        rpc.expect_try_stream_logs()
990            .once()
991            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
992            .return_once(move |_, _, _| Ok(Box::pin(rx)));
993
994        // insert and process latest block
995        let log_1 = SerializableLog {
996            address: Address::new(b"my address 123456789"),
997            topics: [Hash::create(&[b"my topic"]).into()].into(),
998            data: [1, 2, 3, 4].into(),
999            tx_index: 1u64,
1000            block_number: latest_block,
1001            block_hash: Hash::create(&[b"my block hash"]).into(),
1002            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1003            log_index: 1u64,
1004            removed: false,
1005            processed: Some(false),
1006            ..Default::default()
1007        };
1008        assert!(db.store_log(log_1.clone()).await.is_ok());
1009        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
1010        assert!(db.update_logs_checksums().await.is_ok());
1011
1012        let indexer = Indexer::new(
1013            rpc,
1014            handlers,
1015            db.clone(),
1016            IndexerConfig {
1017                fast_sync: false,
1018                ..Default::default()
1019            },
1020            futures::channel::mpsc::channel(1000).0,
1021        )
1022        .without_panic_on_completion();
1023
1024        let (indexing, _) = join!(indexer.start(), async move {
1025            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1026            tx.close_channel()
1027        });
1028        assert!(indexing.is_err()); // terminated by the close channel
1029
1030        Ok(())
1031    }
1032
1033    #[tokio::test]
1034    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
1035        let mut handlers = MockChainLogHandler::new();
1036        let mut rpc = MockHoprIndexerOps::new();
1037        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1038
1039        let cfg = IndexerConfig::default();
1040
1041        let addr = Address::new(b"my address 123456789");
1042        handlers.expect_contract_addresses().return_const(vec![addr]);
1043        handlers
1044            .expect_contract_address_topics()
1045            .withf(move |x| x == &addr)
1046            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1047        handlers
1048            .expect_safe_address()
1049            .return_const(Address::new(b"my safe address 1234"));
1050        handlers
1051            .expect_contract_addresses_map()
1052            .return_const(ContractAddresses::default());
1053
1054        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1055        rpc.expect_try_stream_logs()
1056            .times(1)
1057            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1058            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1059
1060        let head_block = 1000;
1061        rpc.expect_block_number().returning(move || Ok(head_block));
1062
1063        rpc.expect_get_hopr_balance()
1064            .withf(move |x| x == &Address::new(b"my safe address 1234"))
1065            .returning(move |_| Ok(HoprBalance::default()));
1066
1067        rpc.expect_get_hopr_allowance()
1068            .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
1069            .returning(move |_, _| Ok(HoprBalance::default()));
1070
1071        let finalized_block = BlockWithLogs {
1072            block_id: head_block - 1,
1073            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
1074        };
1075        let head_allowing_finalization = BlockWithLogs {
1076            block_id: head_block,
1077            logs: BTreeSet::new(),
1078        };
1079
1080        // called once per block which is finalizable
1081        handlers
1082            .expect_collect_log_event()
1083            // .times(2)
1084            .times(finalized_block.logs.len())
1085            .returning(|_, _| Ok(None));
1086
1087        assert!(tx.start_send(finalized_block.clone()).is_ok());
1088        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
1089
1090        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, futures::channel::mpsc::channel(1000).0)
1091            .without_panic_on_completion();
1092        let _ = join!(indexer.start(), async move {
1093            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1094            tx.close_channel()
1095        });
1096
1097        Ok(())
1098    }
1099
1100    #[test_log::test(tokio::test)]
1101    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
1102        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1103
1104        let addr = Address::new(b"my address 123456789");
1105        let topic = Hash::create(&[b"my topic"]);
1106
1107        // Run 1: Fast sync enabled, index empty
1108        {
1109            let logs = vec![
1110                build_announcement_logs(*BOB, 1, 1, 1)?,
1111                build_announcement_logs(*BOB, 1, 2, 1)?,
1112            ]
1113            .into_iter()
1114            .flatten()
1115            .collect::<Vec<_>>();
1116
1117            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1118
1119            for log in logs {
1120                assert!(db.store_log(log).await.is_ok());
1121            }
1122            assert!(db.update_logs_checksums().await.is_ok());
1123            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
1124            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1125
1126            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1127            let (tx_events, _) = futures::channel::mpsc::channel(1000);
1128
1129            let head_block = 5;
1130            let mut rpc = MockHoprIndexerOps::new();
1131            rpc.expect_block_number().returning(move || Ok(head_block));
1132            rpc.expect_try_stream_logs()
1133                .times(1)
1134                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
1135                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1136
1137            let mut handlers = MockChainLogHandler::new();
1138            handlers.expect_contract_addresses().return_const(vec![addr]);
1139            handlers
1140                .expect_contract_address_topics()
1141                .withf(move |x| x == &addr)
1142                .return_const(vec![B256::from_slice(topic.as_ref())]);
1143            handlers
1144                .expect_collect_log_event()
1145                .times(2)
1146                .withf(move |l, _| [1, 2].contains(&l.block_number))
1147                .returning(|_, _| Ok(None));
1148            handlers
1149                .expect_contract_address_topics()
1150                .withf(move |x| x == &addr)
1151                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1152            handlers
1153                .expect_safe_address()
1154                .return_const(Address::new(b"my safe address 1234"));
1155            handlers
1156                .expect_contract_addresses_map()
1157                .return_const(ContractAddresses::default());
1158
1159            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
1160            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1161            let (indexing, _) = join!(indexer.start(), async move {
1162                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1163                tx.close_channel()
1164            });
1165            assert!(indexing.is_err()); // terminated by the close channel
1166
1167            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1168            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1169
1170            // At the end we need to simulate that the index is not empty,
1171            // thus storing some data.
1172            db.insert_account(
1173                None,
1174                AccountEntry {
1175                    public_key: *ALICE_OKP.public(),
1176                    chain_addr: *ALICE,
1177                    entry_type: AccountType::NotAnnounced,
1178                    published_at: 1,
1179                },
1180            )
1181            .await?;
1182            db.insert_account(
1183                None,
1184                AccountEntry {
1185                    public_key: *BOB_OKP.public(),
1186                    chain_addr: *BOB,
1187                    entry_type: AccountType::NotAnnounced,
1188                    published_at: 1,
1189                },
1190            )
1191            .await?;
1192        }
1193
1194        // Run 2: Fast sync enabled, index not empty, resume after 2 logs
1195        {
1196            let logs = vec![
1197                build_announcement_logs(*BOB, 1, 3, 1)?,
1198                build_announcement_logs(*BOB, 1, 4, 1)?,
1199            ]
1200            .into_iter()
1201            .flatten()
1202            .collect::<Vec<_>>();
1203
1204            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1205
1206            for log in logs {
1207                assert!(db.store_log(log).await.is_ok());
1208            }
1209            assert!(db.update_logs_checksums().await.is_ok());
1210            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1211            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1212
1213            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1214            let (tx_events, _) = futures::channel::mpsc::channel(1000);
1215
1216            let head_block = 5;
1217            let mut rpc = MockHoprIndexerOps::new();
1218            rpc.expect_block_number().returning(move || Ok(head_block));
1219            rpc.expect_try_stream_logs()
1220                .times(1)
1221                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
1222                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1223
1224            let mut handlers = MockChainLogHandler::new();
1225            handlers.expect_contract_addresses().return_const(vec![addr]);
1226            handlers
1227                .expect_contract_address_topics()
1228                .withf(move |x| x == &addr)
1229                .return_const(vec![B256::from_slice(topic.as_ref())]);
1230
1231            handlers
1232                .expect_collect_log_event()
1233                .times(2)
1234                .withf(move |l, _| [3, 4].contains(&l.block_number))
1235                .returning(|_, _| Ok(None));
1236            handlers
1237                .expect_contract_address_topics()
1238                .withf(move |x| x == &addr)
1239                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1240            handlers
1241                .expect_safe_address()
1242                .return_const(Address::new(b"my safe address 1234"));
1243            handlers
1244                .expect_contract_addresses_map()
1245                .return_const(ContractAddresses::default());
1246
1247            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
1248            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1249            let (indexing, _) = join!(indexer.start(), async move {
1250                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1251                tx.close_channel()
1252            });
1253            assert!(indexing.is_err()); // terminated by the close channel
1254
1255            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
1256            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1257        }
1258
1259        Ok(())
1260    }
1261
1262    #[test_log::test(tokio::test)]
1263    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
1264        let mut handlers = MockChainLogHandler::new();
1265        let mut rpc = MockHoprIndexerOps::new();
1266        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1267
1268        let cfg = IndexerConfig::default();
1269
1270        // We don't want to index anything really
1271        let addr = Address::new(b"my address 123456789");
1272        handlers.expect_contract_addresses().return_const(vec![addr]);
1273        handlers
1274            .expect_contract_address_topics()
1275            .withf(move |x| x == &addr)
1276            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1277        handlers
1278            .expect_contract_address_topics()
1279            .withf(move |x| x == &addr)
1280            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1281        handlers
1282            .expect_safe_address()
1283            .return_const(Address::new(b"my safe address 1234"));
1284        handlers
1285            .expect_contract_addresses_map()
1286            .return_const(ContractAddresses::default());
1287
1288        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1289        // Expected to be called once starting at 0 and yield the respective blocks
1290        rpc.expect_try_stream_logs()
1291            .times(1)
1292            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1293            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1294        rpc.expect_get_hopr_balance()
1295            .once()
1296            .return_once(move |_| Ok(HoprBalance::zero()));
1297        rpc.expect_get_hopr_allowance()
1298            .once()
1299            .return_once(move |_, _| Ok(HoprBalance::zero()));
1300
1301        let head_block = 1000;
1302        let block_numbers = [head_block - 1, head_block, head_block + 1];
1303
1304        let blocks: Vec<BlockWithLogs> = block_numbers
1305            .iter()
1306            .map(|block_id| BlockWithLogs {
1307                block_id: *block_id,
1308                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
1309            })
1310            .collect();
1311
1312        for _ in 0..(blocks.len() as u64) {
1313            rpc.expect_block_number().returning(move || Ok(head_block));
1314        }
1315
1316        for block in blocks.iter() {
1317            assert!(tx.start_send(block.clone()).is_ok());
1318        }
1319
1320        // Generate the expected events to be able to process the blocks
1321        handlers
1322            .expect_collect_log_event()
1323            .times(3)
1324            .withf(move |l, _| block_numbers.contains(&l.block_number))
1325            .returning(|l, _| {
1326                let block_number = l.block_number;
1327                Ok(Some(SignificantChainEvent {
1328                    tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
1329                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1330                }))
1331            });
1332
1333        let (tx_events, rx_events) = futures::channel::mpsc::channel(1000);
1334        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1335        indexer.start().await?;
1336
1337        // At this point we expect 2 events to arrive. The third event, which was generated first,
1338        // should be dropped because it was generated before the indexer was in sync with head.
1339        pin_mut!(rx_events);
1340        tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
1341            .await?
1342            .unwrap();
1343        tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
1344            .await?
1345            .unwrap();
1346
1347        // Must time out
1348        assert!(
1349            tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
1350                .await
1351                .is_err()
1352        );
1353
1354        Ok(())
1355    }
1356
1357    #[test_log::test(tokio::test)]
1358    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1359        let last_processed_block = 100_u64;
1360
1361        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
1362
1363        let addr = Address::new(b"my address 123456789");
1364        let topic = Hash::create(&[b"my topic"]);
1365        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1366
1367        // insert and process latest block
1368        let log_1 = SerializableLog {
1369            address: Address::new(b"my address 123456789"),
1370            topics: [Hash::create(&[b"my topic"]).into()].into(),
1371            data: [1, 2, 3, 4].into(),
1372            tx_index: 1u64,
1373            block_number: last_processed_block,
1374            block_hash: Hash::create(&[b"my block hash"]).into(),
1375            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1376            log_index: 1u64,
1377            removed: false,
1378            processed: Some(false),
1379            ..Default::default()
1380        };
1381        assert!(db.store_log(log_1.clone()).await.is_ok());
1382        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1383        assert!(db.update_logs_checksums().await.is_ok());
1384
1385        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1386
1387        let mut rpc = MockHoprIndexerOps::new();
1388        rpc.expect_try_stream_logs()
1389            .once()
1390            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
1391            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1392
1393        rpc.expect_block_number()
1394            .times(3)
1395            .returning(move || Ok(last_processed_block + 1));
1396
1397        rpc.expect_get_hopr_balance()
1398            .once()
1399            .return_once(move |_| Ok(HoprBalance::zero()));
1400
1401        rpc.expect_get_hopr_allowance()
1402            .once()
1403            .return_once(move |_, _| Ok(HoprBalance::zero()));
1404
1405        let block = BlockWithLogs {
1406            block_id: last_processed_block + 1,
1407            logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
1408        };
1409
1410        tx.start_send(block)?;
1411
1412        let mut handlers = MockChainLogHandler::new();
1413        handlers.expect_contract_addresses().return_const(vec![addr]);
1414        handlers
1415            .expect_contract_address_topics()
1416            .withf(move |x| x == &addr)
1417            .return_const(vec![B256::from_slice(topic.as_ref())]);
1418        handlers
1419            .expect_contract_address_topics()
1420            .withf(move |x| x == &addr)
1421            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1422        handlers
1423            .expect_safe_address()
1424            .return_const(Address::new(b"my safe address 1234"));
1425        handlers
1426            .expect_contract_addresses_map()
1427            .return_const(ContractAddresses::default());
1428
1429        let indexer_cfg = IndexerConfig::new(0, false, false, None, "/tmp/test_data".to_string());
1430
1431        let (tx_events, _) = futures::channel::mpsc::channel(1000);
1432        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1433        indexer.start().await?;
1434
1435        Ok(())
1436    }
1437}