hopr_chain_indexer/
block.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicBool, AtomicU64, Ordering},
4};
5
6use alloy::sol_types::SolEvent;
7use futures::{
8    StreamExt,
9    future::AbortHandle,
10    stream::{self},
11};
12use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
13use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
14use hopr_chain_types::chain_events::SignificantChainEvent;
15use hopr_crypto_types::types::Hash;
16use hopr_db_api::logs::HoprDbLogOperations;
17use hopr_db_sql::{HoprDbGeneralModelOperations, info::HoprDbInfoOperations};
18use hopr_primitive_types::prelude::*;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22    IndexerConfig,
23    errors::{CoreEthereumIndexerError, Result},
24    traits::ChainLogHandler,
25};
26
27#[cfg(all(feature = "prometheus", not(test)))]
28lazy_static::lazy_static! {
29    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
30        hopr_metrics::metrics::SimpleGauge::new(
31            "hopr_indexer_block_number",
32            "Current last processed block number by the indexer",
33    ).unwrap();
34    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
35        hopr_metrics::metrics::SimpleGauge::new(
36            "hopr_indexer_checksum",
37            "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum"
38    ).unwrap();
39    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
40        hopr_metrics::metrics::SimpleGauge::new(
41            "hopr_indexer_sync_progress",
42            "Sync progress of the historical data by the indexer",
43    ).unwrap();
44    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
45        hopr_metrics::metrics::MultiGauge::new(
46            "hopr_indexer_data_source",
47            "Current data source of the Indexer",
48            &["source"],
49    ).unwrap();
50
51}
52
53/// Indexer
54///
55/// Accepts the RPC operational functionality [hopr_chain_rpc::HoprIndexerRpcOperations]
56/// and provides the indexing operation resulting in and output of
57/// [hopr_chain_types::chain_events::SignificantChainEvent] streamed outside the indexer by the unbounded channel.
58///
59/// The roles of the indexer:
60/// 1. prime the RPC endpoint
61/// 2. request an RPC stream of changes to process
62/// 3. process block and log stream
63/// 4. ensure finalization by postponing processing until the head is far enough
64/// 5. store relevant data into the DB
65/// 6. pass the processing on to the business logic
66#[derive(Debug, Clone)]
67pub struct Indexer<T, U, Db>
68where
69    T: HoprIndexerRpcOperations + Send + 'static,
70    U: ChainLogHandler + Send + 'static,
71    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
72{
73    rpc: Option<T>,
74    db_processor: Option<U>,
75    db: Db,
76    cfg: IndexerConfig,
77    egress: async_channel::Sender<SignificantChainEvent>,
78    // If true (default), the indexer will panic if the event stream is terminated.
79    // Setting it to false is useful for testing.
80    panic_on_completion: bool,
81}
82
83impl<T, U, Db> Indexer<T, U, Db>
84where
85    T: HoprIndexerRpcOperations + Sync + Send + 'static,
86    U: ChainLogHandler + Send + Sync + 'static,
87    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
88{
89    pub fn new(
90        rpc: T,
91        db_processor: U,
92        db: Db,
93        cfg: IndexerConfig,
94        egress: async_channel::Sender<SignificantChainEvent>,
95    ) -> Self {
96        Self {
97            rpc: Some(rpc),
98            db_processor: Some(db_processor),
99            db,
100            cfg,
101            egress,
102            panic_on_completion: true,
103        }
104    }
105
106    /// Disables the panic on completion.
107    pub fn without_panic_on_completion(mut self) -> Self {
108        self.panic_on_completion = false;
109        self
110    }
111
112    pub async fn start(mut self) -> Result<AbortHandle>
113    where
114        T: HoprIndexerRpcOperations + 'static,
115        U: ChainLogHandler + 'static,
116        Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
117    {
118        if self.rpc.is_none() || self.db_processor.is_none() {
119            return Err(CoreEthereumIndexerError::ProcessError(
120                "indexer cannot start, missing components".into(),
121            ));
122        }
123
124        info!("Starting chain indexing");
125
126        let rpc = self.rpc.take().expect("rpc should be present");
127        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
128        let db = self.db.clone();
129        let tx_significant_events = self.egress.clone();
130        let panic_on_completion = self.panic_on_completion;
131
132        let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);
133
134        // Check that the contract addresses and topics are consistent with what is in the logs DB,
135        // or if the DB is empty, prime it with the given addresses and topics.
136        db.ensure_logs_origin(address_topics).await?;
137
138        let is_synced = Arc::new(AtomicBool::new(false));
139        let chain_head = Arc::new(AtomicU64::new(0));
140
141        // update the chain head once at startup to get a reference for initial syncing
142        // progress calculation
143        debug!("Updating chain head at indexer startup");
144        Self::update_chain_head(&rpc, chain_head.clone()).await;
145
146        // First, check whether fast sync is enabled and can be performed.
147        // If so:
148        //   1. Delete the existing indexed data
149        //   2. Reset fast sync progress
150        //   3. Run the fast sync process until completion
151        //   4. Finally, starting the rpc indexer.
152        let fast_sync_configured = self.cfg.fast_sync;
153        let index_empty = self.db.index_is_empty().await?;
154
155        #[derive(PartialEq, Eq)]
156        enum FastSyncMode {
157            None,
158            FromScratch,
159            Continue,
160        }
161
162        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
163            (true, false) => {
164                info!(
165                    "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
166                     unprocessed logs."
167                );
168                FastSyncMode::Continue
169            }
170            (false, true) => {
171                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
172                // Clean the last processed log from the Log DB, to allow full resync
173                self.db.clear_index_db(None).await?;
174                self.db.set_logs_unprocessed(None, None).await?;
175                FastSyncMode::None
176            }
177            (false, false) => {
178                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
179                FastSyncMode::None
180            }
181            (true, true) => {
182                info!("Fast sync is enabled, starting the fast sync process");
183                // To ensure a proper state, reset any auxiliary data in the database
184                self.db.clear_index_db(None).await?;
185                self.db.set_logs_unprocessed(None, None).await?;
186                FastSyncMode::FromScratch
187            }
188        };
189
190        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
191
192        // Perform the fast-sync if requested
193        if FastSyncMode::None != will_perform_fast_sync {
194            let processed = match will_perform_fast_sync {
195                FastSyncMode::FromScratch => None,
196                FastSyncMode::Continue => Some(false),
197                _ => unreachable!(),
198            };
199
200            #[cfg(all(feature = "prometheus", not(test)))]
201            {
202                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
203                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
204            }
205
206            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
207            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
208            let _head = chain_head.load(Ordering::Relaxed);
209            for block_number in log_block_numbers {
210                debug!(
211                    block_number,
212                    first_log_block_number = _first_log_block_number,
213                    head = _head,
214                    "computing processed logs"
215                );
216                // Do not pollute the logs with the fast-sync progress
217                Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;
218
219                #[cfg(all(feature = "prometheus", not(test)))]
220                {
221                    let progress =
222                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
223                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
224                }
225            }
226        }
227
228        info!("Building rpc indexer background process");
229
230        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
231            info!(
232                start_block = last_log.block_number,
233                start_checksum = last_log.checksum.unwrap(),
234                "Loaded indexer state",
235            );
236
237            if self.cfg.start_block_number < last_log.block_number {
238                // If some prior indexing took place already, avoid reprocessing
239                last_log.block_number + 1
240            } else {
241                self.cfg.start_block_number
242            }
243        } else {
244            self.cfg.start_block_number
245        };
246
247        info!(next_block_to_process, "Indexer start point");
248
249        let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable(async move {
250            // Update the chain head once again
251            debug!("Updating chain head at indexer startup");
252            Self::update_chain_head(&rpc, chain_head.clone()).await;
253
254            #[cfg(all(feature = "prometheus", not(test)))]
255            {
256                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
257                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
258            }
259
260            let rpc_ref = &rpc;
261
262            let event_stream = rpc
263                .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
264                .expect("block stream should be constructible")
265                .then(|block| {
266                    let db = db.clone();
267                    let chain_head = chain_head.clone();
268                    let is_synced = is_synced.clone();
269                    let tx = tx.clone();
270                    let logs_handler = logs_handler.clone();
271
272                    async move {
273                        Self::calculate_sync_process(
274                            block.block_id,
275                            rpc_ref,
276                            db,
277                            chain_head.clone(),
278                            is_synced.clone(),
279                            next_block_to_process,
280                            tx.clone(),
281                            logs_handler.safe_address().into(),
282                            logs_handler.contract_addresses_map().channels.into(),
283                        )
284                        .await;
285
286                        block
287                    }
288                })
289                .filter_map(|block| {
290                    let db = db.clone();
291                    let logs_handler = logs_handler.clone();
292
293                    async move {
294                        debug!(%block, "storing logs from block");
295                        let logs = block.logs.clone();
296
297                        // Filter out the token contract logs because we do not need to store these
298                        // in the database.
299                        let logs_vec = logs
300                            .into_iter()
301                            .filter(|log| log.address != logs_handler.contract_addresses_map().token)
302                            .collect();
303
304                        match db.store_logs(logs_vec).await {
305                            Ok(store_results) => {
306                                if let Some(error) = store_results
307                                    .into_iter()
308                                    .filter(|r| r.is_err())
309                                    .map(|r| r.unwrap_err())
310                                    .next()
311                                {
312                                    error!(%block, %error, "failed to processed stored logs from block");
313                                    None
314                                } else {
315                                    Some(block)
316                                }
317                            }
318                            Err(error) => {
319                                error!(%block, %error, "failed to store logs from block");
320                                None
321                            }
322                        }
323                    }
324                })
325                .filter_map(|block| {
326                    let db = db.clone();
327                    let logs_handler = logs_handler.clone();
328                    let is_synced = is_synced.clone();
329                    async move {
330                        Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed)).await
331                    }
332                })
333                .flat_map(stream::iter);
334
335            futures::pin_mut!(event_stream);
336            while let Some(event) = event_stream.next().await {
337                trace!(%event, "processing on-chain event");
338                // Pass the events further only once we're fully synced
339                if is_synced.load(Ordering::Relaxed) {
340                    if let Err(error) = tx_significant_events.try_send(event) {
341                        error!(%error, "failed to pass a significant chain event further");
342                    }
343                }
344            }
345
346            if panic_on_completion {
347                panic!(
348                    "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
349                );
350            }
351        });
352
353        if rx.next().await.is_some() {
354            Ok(indexing_abort_handle)
355        } else {
356            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
357                "Error during indexing start".into(),
358            ))
359        }
360    }
361
362    /// Generates specialized log filters for efficient blockchain event processing.
363    ///
364    /// This function creates a comprehensive set of log filters that optimize
365    /// indexer performance by categorizing filters based on contract types and
366    /// event relevance to the specific node.
367    ///
368    /// # Arguments
369    /// * `logs_handler` - Handler containing contract addresses and safe address
370    ///
371    /// # Returns
372    /// * `(FilterSet, Vec<(Address, Hash)>)` - A tuple containing:
373    ///   - `FilterSet`: Categorized filters for blockchain event processing.
374    ///   - `Vec<(Address, Hash)>`: A vector of address-topic pairs for logs origin validation.
375    ///
376    /// # Filter Categories
377    /// * `all` - Complete set of filters for normal operation
378    /// * `token` - Token-specific filters (Transfer, Approval events for safe)
379    /// * `no_token` - Non-token contract filters for initial sync optimization
380    fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
381        let safe_address = logs_handler.safe_address();
382        let addresses_no_token = logs_handler
383            .contract_addresses()
384            .into_iter()
385            .filter(|a| *a != logs_handler.contract_addresses_map().token)
386            .collect::<Vec<_>>();
387        let mut filter_base_addresses = vec![];
388        let mut filter_base_topics = vec![];
389        let mut address_topics = vec![];
390
391        addresses_no_token.iter().for_each(|address| {
392            let topics = logs_handler.contract_address_topics(*address);
393            if !topics.is_empty() {
394                filter_base_addresses.push(alloy::primitives::Address::from(*address));
395                filter_base_topics.extend(topics.clone());
396                for topic in topics.iter() {
397                    address_topics.push((*address, Hash::from(topic.0)))
398                }
399            }
400        });
401
402        let filter_base = alloy::rpc::types::Filter::new()
403            .address(filter_base_addresses)
404            .event_signature(filter_base_topics);
405        let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
406            logs_handler.contract_addresses_map().token,
407        ));
408
409        let filter_transfer_to = filter_token
410            .clone()
411            .event_signature(Transfer::SIGNATURE_HASH)
412            .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
413
414        let filter_transfer_from = filter_token
415            .clone()
416            .event_signature(Transfer::SIGNATURE_HASH)
417            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
418
419        let filter_approval = filter_token
420            .event_signature(Approval::SIGNATURE_HASH)
421            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
422            .topic2(alloy::primitives::B256::from_slice(
423                logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
424            ));
425
426        let set = FilterSet {
427            all: vec![
428                filter_base.clone(),
429                filter_transfer_from.clone(),
430                filter_transfer_to.clone(),
431                filter_approval.clone(),
432            ],
433            token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
434            no_token: vec![filter_base],
435        };
436
437        (set, address_topics)
438    }
439
440    /// Processes a block by its ID.
441    ///
442    /// This function retrieves logs for the given block ID and processes them using the database
443    /// and log handler.
444    ///
445    /// # Arguments
446    ///
447    /// * `db` - The database operations handler.
448    /// * `logs_handler` - The database log handler.
449    /// * `block_id` - The ID of the block to process.
450    ///
451    /// # Returns
452    ///
453    /// A `Result` containing an optional vector of significant chain events if the operation succeeds or an error if it
454    /// fails.
455    async fn process_block_by_id(
456        db: &Db,
457        logs_handler: &U,
458        block_id: u64,
459        is_synced: bool,
460    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
461    where
462        U: ChainLogHandler + 'static,
463        Db: HoprDbLogOperations + 'static,
464    {
465        let logs = db.get_logs(Some(block_id), Some(0)).await?;
466        let mut block = BlockWithLogs {
467            block_id,
468            ..Default::default()
469        };
470
471        for log in logs {
472            if log.block_number == block_id {
473                block.logs.insert(log);
474            } else {
475                error!(
476                    expected = block_id,
477                    actual = log.block_number,
478                    "block number mismatch in logs from database"
479                );
480                panic!("block number mismatch in logs from database")
481            }
482        }
483
484        Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
485    }
486
487    /// Processes a block and its logs.
488    ///
489    /// This function collects events from the block logs and updates the database with the processed logs.
490    ///
491    /// # Arguments
492    ///
493    /// * `db` - The database operations handler.
494    /// * `logs_handler` - The database log handler.
495    /// * `block` - The block with logs to process.
496    /// * `fetch_checksum_from_db` - A boolean indicating whether to fetch the checksum from the database.
497    ///
498    /// # Returns
499    ///
500    /// An optional vector of significant chain events if the operation succeeds.
501    async fn process_block(
502        db: &Db,
503        logs_handler: &U,
504        block: BlockWithLogs,
505        fetch_checksum_from_db: bool,
506        is_synced: bool,
507    ) -> Option<Vec<SignificantChainEvent>>
508    where
509        U: ChainLogHandler + 'static,
510        Db: HoprDbLogOperations + 'static,
511    {
512        let block_id = block.block_id;
513        let log_count = block.logs.len();
514        debug!(block_id, "processing events");
515
516        // FIXME: The block indexing and marking as processed should be done in a single
517        // transaction. This is difficult since currently this would be across databases.
518        let events = stream::iter(block.logs.clone())
519            .filter_map(|log| async move {
520                match logs_handler.collect_log_event(log.clone(), is_synced).await {
521                    Ok(data) => match db.set_log_processed(log).await {
522                        Ok(_) => data,
523                        Err(error) => {
524                            error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
525                            panic!("failed to mark log as processed, panicking to prevent data loss")
526                        }
527                    },
528                    Err(error) => {
529                        error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
530                        panic!("failed to process log into event, panicking to prevent data loss")
531                    }
532                }
533            })
534            .collect::<Vec<SignificantChainEvent>>()
535            .await;
536
537        // if we made it this far, no errors occurred and we can update checksums and indexer state
538        match db.update_logs_checksums().await {
539            Ok(last_log_checksum) => {
540                let checksum = if fetch_checksum_from_db {
541                    let last_log = block.logs.into_iter().next_back()?;
542                    let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
543
544                    log.checksum?
545                } else {
546                    last_log_checksum.to_string()
547                };
548
549                if log_count != 0 {
550                    info!(
551                        block_number = block_id,
552                        log_count, last_log_checksum = ?checksum, "Indexer state update",
553                    );
554
555                    #[cfg(all(feature = "prometheus", not(test)))]
556                    {
557                        if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
558                            let low_4_bytes =
559                                hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
560                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
561                        } else {
562                            error!("Invalid checksum generated from logs");
563                        }
564                    }
565                }
566
567                // finally update the block number in the database to the last processed block
568                match db.set_indexer_state_info(None, block_id as u32).await {
569                    Ok(_) => {
570                        trace!(block_id, "updated indexer state info");
571                    }
572                    Err(error) => error!(block_id, %error, "failed to update indexer state info"),
573                }
574            }
575            Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
576        }
577
578        debug!(
579            block_id,
580            num_events = events.len(),
581            "processed significant chain events from block",
582        );
583
584        Some(events)
585    }
586
587    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
588    where
589        T: HoprIndexerRpcOperations + 'static,
590    {
591        match rpc.block_number().await {
592            Ok(head) => {
593                chain_head.store(head, Ordering::Relaxed);
594                debug!(head, "Updated chain head");
595                head
596            }
597            Err(error) => {
598                error!(%error, "Failed to fetch block number from RPC");
599                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
600            }
601        }
602    }
603
604    /// Calculates the synchronization progress.
605    ///
606    /// This function processes a block and updates synchronization metrics and state.
607    ///
608    /// # Arguments
609    ///
610    /// * `block` - The block with logs to process.
611    /// * `rpc` - The RPC operations handler.
612    /// * `chain_head` - The current chain head block number.
613    /// * `is_synced` - A boolean indicating whether the indexer is synced.
614    /// * `start_block` - The first block number to process.
615    /// * `tx` - A sender channel for synchronization notifications.
616    ///
617    /// # Returns
618    ///
619    /// The block which was provided as input.
620    #[allow(clippy::too_many_arguments)]
621    async fn calculate_sync_process(
622        current_block: u64,
623        rpc: &T,
624        db: Db,
625        chain_head: Arc<AtomicU64>,
626        is_synced: Arc<AtomicBool>,
627        next_block_to_process: u64,
628        mut tx: futures::channel::mpsc::Sender<()>,
629        safe_address: Option<Address>,
630        channels_address: Option<Address>,
631    ) where
632        T: HoprIndexerRpcOperations + 'static,
633        Db: HoprDbInfoOperations + Clone + Send + Sync + 'static,
634    {
635        #[cfg(all(feature = "prometheus", not(test)))]
636        {
637            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
638        }
639
640        let mut head = chain_head.load(Ordering::Relaxed);
641
642        // We only print out sync progress if we are not yet synced.
643        // Once synced, we don't print out progress anymore.
644        if !is_synced.load(Ordering::Relaxed) {
645            let mut block_difference = head.saturating_sub(next_block_to_process);
646
647            let progress = if block_difference == 0 {
648                // Before we call the sync complete, we check the chain again.
649                head = Self::update_chain_head(rpc, chain_head.clone()).await;
650                block_difference = head.saturating_sub(next_block_to_process);
651
652                if block_difference == 0 {
653                    1_f64
654                } else {
655                    (current_block - next_block_to_process) as f64 / block_difference as f64
656                }
657            } else {
658                (current_block - next_block_to_process) as f64 / block_difference as f64
659            };
660
661            info!(
662                progress = progress * 100_f64,
663                block = current_block,
664                head,
665                "Sync progress to last known head"
666            );
667
668            #[cfg(all(feature = "prometheus", not(test)))]
669            METRIC_INDEXER_SYNC_PROGRESS.set(progress);
670
671            if current_block >= head {
672                info!("indexer sync completed successfully");
673                is_synced.store(true, Ordering::Relaxed);
674
675                if let Some(safe_address) = safe_address {
676                    info!("updating safe balance from chain after indexer sync completed");
677                    match rpc.get_hopr_balance(safe_address).await {
678                        Ok(balance) => {
679                            if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
680                                error!(%error, "failed to update safe balance from chain after indexer sync completed");
681                            }
682                        }
683                        Err(error) => {
684                            error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
685                        }
686                    }
687                }
688
689                if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
690                    info!("updating safe allowance from chain after indexer sync completed");
691                    match rpc.get_hopr_allowance(safe_address, channels_address).await {
692                        Ok(allowance) => {
693                            if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
694                                error!(%error, "failed to update safe allowance from chain after indexer sync completed");
695                            }
696                        }
697                        Err(error) => {
698                            error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
699                        }
700                    }
701                }
702
703                if let Err(error) = tx.try_send(()) {
704                    error!(%error, "failed to notify about achieving indexer synchronization")
705                }
706            }
707        }
708    }
709}
710
711#[cfg(test)]
712mod tests {
713    use std::{collections::BTreeSet, pin::Pin};
714
715    use alloy::{
716        dyn_abi::DynSolValue,
717        primitives::{Address as AlloyAddress, B256},
718        sol_types::SolEvent,
719    };
720    use async_trait::async_trait;
721    use futures::{Stream, join};
722    use hex_literal::hex;
723    use hopr_chain_rpc::BlockWithLogs;
724    use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
725    use hopr_crypto_types::{
726        keypairs::{Keypair, OffchainKeypair},
727        prelude::ChainKeypair,
728    };
729    use hopr_db_sql::{accounts::HoprDbAccountOperations, db::HoprDb};
730    use hopr_internal_types::account::{AccountEntry, AccountType};
731    use hopr_primitive_types::prelude::*;
732    use mockall::mock;
733    use multiaddr::Multiaddr;
734
735    use super::*;
736    use crate::traits::MockChainLogHandler;
737
738    lazy_static::lazy_static! {
739        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
740        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
741        static ref ALICE: Address = ALICE_KP.public().to_address();
742        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
743        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
744        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
745
746        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
747            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
748            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
749            multiaddresses: vec![Multiaddr::empty()],
750        };
751    }
752
753    fn build_announcement_logs(
754        address: Address,
755        size: usize,
756        block_number: u64,
757        starting_log_index: u64,
758    ) -> anyhow::Result<Vec<SerializableLog>> {
759        let mut logs: Vec<SerializableLog> = vec![];
760        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
761
762        for i in 0..size {
763            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
764            let tx_index: u64 = i as u64;
765            let log_index: u64 = starting_log_index + tx_index;
766
767            logs.push(SerializableLog {
768                address,
769                block_hash: block_hash.into(),
770                topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
771                data: DynSolValue::Tuple(vec![
772                    DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
773                    DynSolValue::String(test_multiaddr.to_string()),
774                ])
775                .abi_encode(),
776                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
777                tx_index,
778                block_number,
779                log_index,
780                ..Default::default()
781            });
782        }
783
784        Ok(logs)
785    }
786
787    mock! {
788        HoprIndexerOps {}     // Name of the mock struct, less the "Mock" prefix
789
790        #[async_trait]
791        impl HoprIndexerRpcOperations for HoprIndexerOps {
792            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
793            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
794            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
795            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
796
797            fn try_stream_logs<'a>(
798                &'a self,
799                start_block_number: u64,
800                filters: FilterSet,
801                is_synced: bool,
802            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
803        }
804    }
805
806    #[tokio::test]
807    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
808    -> anyhow::Result<()> {
809        let mut handlers = MockChainLogHandler::new();
810        let mut rpc = MockHoprIndexerOps::new();
811        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
812
813        let addr = Address::new(b"my address 123456789");
814        let topic = Hash::create(&[b"my topic"]);
815        db.ensure_logs_origin(vec![(addr, topic)]).await?;
816
817        handlers.expect_contract_addresses().return_const(vec![addr]);
818        handlers
819            .expect_contract_address_topics()
820            .withf(move |x| x == &addr)
821            .return_const(vec![B256::from_slice(topic.as_ref())]);
822        handlers
823            .expect_contract_address_topics()
824            .withf(move |x| x == &addr)
825            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
826        handlers
827            .expect_safe_address()
828            .return_const(Address::new(b"my safe address 1234"));
829        handlers
830            .expect_contract_addresses_map()
831            .return_const(ContractAddresses::default());
832
833        let head_block = 1000;
834        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
835
836        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
837        rpc.expect_try_stream_logs()
838            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
839            .return_once(move |_, _, _| Ok(Box::pin(rx)));
840
841        let indexer = Indexer::new(
842            rpc,
843            handlers,
844            db.clone(),
845            IndexerConfig::default(),
846            async_channel::unbounded().0,
847        )
848        .without_panic_on_completion();
849
850        let (indexing, _) = join!(indexer.start(), async move {
851            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
852            tx.close_channel()
853        });
854        assert!(indexing.is_err()); // terminated by the close channel
855
856        Ok(())
857    }
858
859    #[tokio::test]
860    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
861    {
862        let mut handlers = MockChainLogHandler::new();
863        let mut rpc = MockHoprIndexerOps::new();
864        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
865        let head_block = 1000;
866        let latest_block = 15u64;
867
868        let addr = Address::new(b"my address 123456789");
869        let topic = Hash::create(&[b"my topic"]);
870
871        handlers.expect_contract_addresses().return_const(vec![addr]);
872        handlers
873            .expect_contract_address_topics()
874            .withf(move |x| x == &addr)
875            .return_const(vec![B256::from_slice(topic.as_ref())]);
876        handlers
877            .expect_contract_address_topics()
878            .withf(move |x| x == &addr)
879            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
880        handlers
881            .expect_safe_address()
882            .return_const(Address::new(b"my safe address 1234"));
883        handlers
884            .expect_contract_addresses_map()
885            .return_const(ContractAddresses::default());
886
887        db.ensure_logs_origin(vec![(addr, topic)]).await?;
888
889        rpc.expect_block_number().times(2).returning(move || Ok(head_block));
890
891        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
892        rpc.expect_try_stream_logs()
893            .once()
894            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
895            .return_once(move |_, _, _| Ok(Box::pin(rx)));
896
897        // insert and process latest block
898        let log_1 = SerializableLog {
899            address: Address::new(b"my address 123456789"),
900            topics: [Hash::create(&[b"my topic"]).into()].into(),
901            data: [1, 2, 3, 4].into(),
902            tx_index: 1u64,
903            block_number: latest_block,
904            block_hash: Hash::create(&[b"my block hash"]).into(),
905            tx_hash: Hash::create(&[b"my tx hash"]).into(),
906            log_index: 1u64,
907            removed: false,
908            processed: Some(false),
909            ..Default::default()
910        };
911        assert!(db.store_log(log_1.clone()).await.is_ok());
912        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
913        assert!(db.update_logs_checksums().await.is_ok());
914
915        let indexer = Indexer::new(
916            rpc,
917            handlers,
918            db.clone(),
919            IndexerConfig {
920                fast_sync: false,
921                ..Default::default()
922            },
923            async_channel::unbounded().0,
924        )
925        .without_panic_on_completion();
926
927        let (indexing, _) = join!(indexer.start(), async move {
928            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
929            tx.close_channel()
930        });
931        assert!(indexing.is_err()); // terminated by the close channel
932
933        Ok(())
934    }
935
936    #[tokio::test]
937    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
938        let mut handlers = MockChainLogHandler::new();
939        let mut rpc = MockHoprIndexerOps::new();
940        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
941
942        let cfg = IndexerConfig::default();
943
944        let addr = Address::new(b"my address 123456789");
945        handlers.expect_contract_addresses().return_const(vec![addr]);
946        handlers
947            .expect_contract_address_topics()
948            .withf(move |x| x == &addr)
949            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
950        handlers
951            .expect_safe_address()
952            .return_const(Address::new(b"my safe address 1234"));
953        handlers
954            .expect_contract_addresses_map()
955            .return_const(ContractAddresses::default());
956
957        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
958        rpc.expect_try_stream_logs()
959            .times(1)
960            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
961            .return_once(move |_, _, _| Ok(Box::pin(rx)));
962
963        let head_block = 1000;
964        rpc.expect_block_number().returning(move || Ok(head_block));
965
966        rpc.expect_get_hopr_balance()
967            .withf(move |x| x == &Address::new(b"my safe address 1234"))
968            .returning(move |_| Ok(HoprBalance::default()));
969
970        rpc.expect_get_hopr_allowance()
971            .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
972            .returning(move |_, _| Ok(HoprBalance::default()));
973
974        let finalized_block = BlockWithLogs {
975            block_id: head_block - 1,
976            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
977        };
978        let head_allowing_finalization = BlockWithLogs {
979            block_id: head_block,
980            logs: BTreeSet::new(),
981        };
982
983        // called once per block which is finalizable
984        handlers
985            .expect_collect_log_event()
986            // .times(2)
987            .times(finalized_block.logs.len())
988            .returning(|_, _| Ok(None));
989
990        assert!(tx.start_send(finalized_block.clone()).is_ok());
991        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
992
993        let indexer =
994            Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
995        let _ = join!(indexer.start(), async move {
996            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
997            tx.close_channel()
998        });
999
1000        Ok(())
1001    }
1002
1003    #[test_log::test(tokio::test)]
1004    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
1005        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1006
1007        let addr = Address::new(b"my address 123456789");
1008        let topic = Hash::create(&[b"my topic"]);
1009
1010        // Run 1: Fast sync enabled, index empty
1011        {
1012            let logs = vec![
1013                build_announcement_logs(*BOB, 1, 1, 1)?,
1014                build_announcement_logs(*BOB, 1, 2, 1)?,
1015            ]
1016            .into_iter()
1017            .flatten()
1018            .collect::<Vec<_>>();
1019
1020            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1021
1022            for log in logs {
1023                assert!(db.store_log(log).await.is_ok());
1024            }
1025            assert!(db.update_logs_checksums().await.is_ok());
1026            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
1027            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1028
1029            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1030            let (tx_events, _) = async_channel::unbounded();
1031
1032            let head_block = 5;
1033            let mut rpc = MockHoprIndexerOps::new();
1034            rpc.expect_block_number().returning(move || Ok(head_block));
1035            rpc.expect_try_stream_logs()
1036                .times(1)
1037                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
1038                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1039
1040            let mut handlers = MockChainLogHandler::new();
1041            handlers.expect_contract_addresses().return_const(vec![addr]);
1042            handlers
1043                .expect_contract_address_topics()
1044                .withf(move |x| x == &addr)
1045                .return_const(vec![B256::from_slice(topic.as_ref())]);
1046            handlers
1047                .expect_collect_log_event()
1048                .times(2)
1049                .withf(move |l, _| [1, 2].contains(&l.block_number))
1050                .returning(|_, _| Ok(None));
1051            handlers
1052                .expect_contract_address_topics()
1053                .withf(move |x| x == &addr)
1054                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1055            handlers
1056                .expect_safe_address()
1057                .return_const(Address::new(b"my safe address 1234"));
1058            handlers
1059                .expect_contract_addresses_map()
1060                .return_const(ContractAddresses::default());
1061
1062            let indexer_cfg = IndexerConfig {
1063                start_block_number: 0,
1064                fast_sync: true,
1065            };
1066            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1067            let (indexing, _) = join!(indexer.start(), async move {
1068                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1069                tx.close_channel()
1070            });
1071            assert!(indexing.is_err()); // terminated by the close channel
1072
1073            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1074            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1075
1076            // At the end we need to simulate that the index is not empty,
1077            // thus storing some data.
1078            db.insert_account(
1079                None,
1080                AccountEntry {
1081                    public_key: *ALICE_OKP.public(),
1082                    chain_addr: *ALICE,
1083                    entry_type: AccountType::NotAnnounced,
1084                    published_at: 1,
1085                },
1086            )
1087            .await?;
1088            db.insert_account(
1089                None,
1090                AccountEntry {
1091                    public_key: *BOB_OKP.public(),
1092                    chain_addr: *BOB,
1093                    entry_type: AccountType::NotAnnounced,
1094                    published_at: 1,
1095                },
1096            )
1097            .await?;
1098        }
1099
1100        // Run 2: Fast sync enabled, index not empty, resume after 2 logs
1101        {
1102            let logs = vec![
1103                build_announcement_logs(*BOB, 1, 3, 1)?,
1104                build_announcement_logs(*BOB, 1, 4, 1)?,
1105            ]
1106            .into_iter()
1107            .flatten()
1108            .collect::<Vec<_>>();
1109
1110            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1111
1112            for log in logs {
1113                assert!(db.store_log(log).await.is_ok());
1114            }
1115            assert!(db.update_logs_checksums().await.is_ok());
1116            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1117            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1118
1119            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1120            let (tx_events, _) = async_channel::unbounded();
1121
1122            let head_block = 5;
1123            let mut rpc = MockHoprIndexerOps::new();
1124            rpc.expect_block_number().returning(move || Ok(head_block));
1125            rpc.expect_try_stream_logs()
1126                .times(1)
1127                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
1128                .return_once(move |_, _, _| Ok(Box::pin(rx)));
1129
1130            let mut handlers = MockChainLogHandler::new();
1131            handlers.expect_contract_addresses().return_const(vec![addr]);
1132            handlers
1133                .expect_contract_address_topics()
1134                .withf(move |x| x == &addr)
1135                .return_const(vec![B256::from_slice(topic.as_ref())]);
1136
1137            handlers
1138                .expect_collect_log_event()
1139                .times(2)
1140                .withf(move |l, _| [3, 4].contains(&l.block_number))
1141                .returning(|_, _| Ok(None));
1142            handlers
1143                .expect_contract_address_topics()
1144                .withf(move |x| x == &addr)
1145                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1146            handlers
1147                .expect_safe_address()
1148                .return_const(Address::new(b"my safe address 1234"));
1149            handlers
1150                .expect_contract_addresses_map()
1151                .return_const(ContractAddresses::default());
1152
1153            let indexer_cfg = IndexerConfig {
1154                start_block_number: 0,
1155                fast_sync: true,
1156            };
1157            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1158            let (indexing, _) = join!(indexer.start(), async move {
1159                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1160                tx.close_channel()
1161            });
1162            assert!(indexing.is_err()); // terminated by the close channel
1163
1164            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
1165            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1166        }
1167
1168        Ok(())
1169    }
1170
1171    #[test_log::test(tokio::test)]
1172    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
1173        let mut handlers = MockChainLogHandler::new();
1174        let mut rpc = MockHoprIndexerOps::new();
1175        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1176
1177        let cfg = IndexerConfig::default();
1178
1179        // We don't want to index anything really
1180        let addr = Address::new(b"my address 123456789");
1181        handlers.expect_contract_addresses().return_const(vec![addr]);
1182        handlers
1183            .expect_contract_address_topics()
1184            .withf(move |x| x == &addr)
1185            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1186        handlers
1187            .expect_contract_address_topics()
1188            .withf(move |x| x == &addr)
1189            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1190        handlers
1191            .expect_safe_address()
1192            .return_const(Address::new(b"my safe address 1234"));
1193        handlers
1194            .expect_contract_addresses_map()
1195            .return_const(ContractAddresses::default());
1196
1197        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1198        // Expected to be called once starting at 0 and yield the respective blocks
1199        rpc.expect_try_stream_logs()
1200            .times(1)
1201            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1202            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1203        rpc.expect_get_hopr_balance()
1204            .once()
1205            .return_once(move |_| Ok(HoprBalance::zero()));
1206        rpc.expect_get_hopr_allowance()
1207            .once()
1208            .return_once(move |_, _| Ok(HoprBalance::zero()));
1209
1210        let head_block = 1000;
1211        let block_numbers = [head_block - 1, head_block, head_block + 1];
1212
1213        let blocks: Vec<BlockWithLogs> = block_numbers
1214            .iter()
1215            .map(|block_id| BlockWithLogs {
1216                block_id: *block_id,
1217                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
1218            })
1219            .collect();
1220
1221        for _ in 0..(blocks.len() as u64) {
1222            rpc.expect_block_number().returning(move || Ok(head_block));
1223        }
1224
1225        for block in blocks.iter() {
1226            assert!(tx.start_send(block.clone()).is_ok());
1227        }
1228
1229        // Generate the expected events to be able to process the blocks
1230        handlers
1231            .expect_collect_log_event()
1232            .times(1)
1233            .withf(move |l, _| block_numbers.contains(&l.block_number))
1234            .returning(|l, _| {
1235                let block_number = l.block_number;
1236                Ok(Some(SignificantChainEvent {
1237                    tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
1238                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1239                }))
1240            });
1241
1242        let (tx_events, rx_events) = async_channel::unbounded();
1243        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1244        indexer.start().await?;
1245
1246        // At this point we expect 2 events to arrive. The third event, which was generated first,
1247        // should be dropped because it was generated before the indexer was in sync with head.
1248        let _first = rx_events.recv();
1249        let _second = rx_events.recv();
1250        let third = rx_events.try_recv();
1251
1252        assert!(third.is_err());
1253
1254        Ok(())
1255    }
1256
1257    #[test_log::test(tokio::test)]
1258    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1259        let last_processed_block = 100_u64;
1260
1261        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1262
1263        let addr = Address::new(b"my address 123456789");
1264        let topic = Hash::create(&[b"my topic"]);
1265        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1266
1267        // insert and process latest block
1268        let log_1 = SerializableLog {
1269            address: Address::new(b"my address 123456789"),
1270            topics: [Hash::create(&[b"my topic"]).into()].into(),
1271            data: [1, 2, 3, 4].into(),
1272            tx_index: 1u64,
1273            block_number: last_processed_block,
1274            block_hash: Hash::create(&[b"my block hash"]).into(),
1275            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1276            log_index: 1u64,
1277            removed: false,
1278            processed: Some(false),
1279            ..Default::default()
1280        };
1281        assert!(db.store_log(log_1.clone()).await.is_ok());
1282        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1283        assert!(db.update_logs_checksums().await.is_ok());
1284
1285        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1286
1287        let mut rpc = MockHoprIndexerOps::new();
1288        rpc.expect_try_stream_logs()
1289            .once()
1290            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
1291            .return_once(move |_, _, _| Ok(Box::pin(rx)));
1292
1293        rpc.expect_block_number()
1294            .times(3)
1295            .returning(move || Ok(last_processed_block + 1));
1296
1297        rpc.expect_get_hopr_balance()
1298            .once()
1299            .return_once(move |_| Ok(HoprBalance::zero()));
1300
1301        rpc.expect_get_hopr_allowance()
1302            .once()
1303            .return_once(move |_, _| Ok(HoprBalance::zero()));
1304
1305        let block = BlockWithLogs {
1306            block_id: last_processed_block + 1,
1307            logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
1308        };
1309
1310        tx.start_send(block)?;
1311
1312        let mut handlers = MockChainLogHandler::new();
1313        handlers.expect_contract_addresses().return_const(vec![addr]);
1314        handlers
1315            .expect_contract_address_topics()
1316            .withf(move |x| x == &addr)
1317            .return_const(vec![B256::from_slice(topic.as_ref())]);
1318        handlers
1319            .expect_contract_address_topics()
1320            .withf(move |x| x == &addr)
1321            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1322        handlers
1323            .expect_safe_address()
1324            .return_const(Address::new(b"my safe address 1234"));
1325        handlers
1326            .expect_contract_addresses_map()
1327            .return_const(ContractAddresses::default());
1328
1329        let indexer_cfg = IndexerConfig {
1330            start_block_number: 0,
1331            fast_sync: false,
1332        };
1333
1334        let (tx_events, _) = async_channel::unbounded();
1335        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1336        indexer.start().await?;
1337
1338        Ok(())
1339    }
1340}