hopr_chain_indexer/
block.rs

1use futures::{stream, FutureExt, StreamExt};
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4use tracing::{debug, error, info, trace};
5
6use hopr_async_runtime::prelude::{spawn, JoinHandle};
7use hopr_chain_rpc::{BlockWithLogs, HoprIndexerRpcOperations, LogFilter};
8use hopr_chain_types::chain_events::SignificantChainEvent;
9use hopr_crypto_types::types::Hash;
10use hopr_db_api::logs::HoprDbLogOperations;
11use hopr_db_sql::info::HoprDbInfoOperations;
12use hopr_db_sql::HoprDbGeneralModelOperations;
13
14#[cfg(all(feature = "prometheus", not(test)))]
15use hopr_primitive_types::prelude::ToHex;
16
17use crate::{
18    errors::{CoreEthereumIndexerError, Result},
19    traits::ChainLogHandler,
20    IndexerConfig,
21};
22
// Metrics published by the indexer. They are compiled in only when the `prometheus` feature is
// enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Block number of the block most recently seen by the sync progress calculation.
    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_block_number",
            "Current last processed block number by the indexer",
    ).unwrap();
    // Low 32 bits of the checksum of the last processed log, updated after a block with logs
    // has been processed.
    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_checksum",
            "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum"
    ).unwrap();
    // Sync progress as a fraction; written both by the fast sync loop and by the RPC sync.
    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_sync_progress",
            "Sync progress of the historical data by the indexer",
    ).unwrap();
    // Labelled by `source` ("fast-sync" or "rpc"); the active source is set to 1.0 and the
    // other one to 0.0.
    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
        hopr_metrics::metrics::MultiGauge::new(
            "hopr_indexer_data_source",
            "Current data source of the Indexer",
            &["source"],
    ).unwrap();

}
48
/// Indexer
///
/// Accepts the RPC operational functionality [hopr_chain_rpc::HoprIndexerRpcOperations]
/// and provides the indexing operation resulting in an output of
/// [hopr_chain_types::chain_events::SignificantChainEvent] values that are passed outside
/// the indexer through the `egress` channel.
///
/// The roles of the indexer:
/// 1. prime the RPC endpoint
/// 2. request an RPC stream of changes to process
/// 3. process block and log stream
/// 4. ensure finalization by postponing processing until the head is far enough
/// 5. store relevant data into the DB
/// 6. pass the processing on to the business logic
#[derive(Debug, Clone)]
pub struct Indexer<T, U, Db>
where
    T: HoprIndexerRpcOperations + Send + 'static,
    U: ChainLogHandler + Send + 'static,
    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
{
    /// RPC client; taken out of the struct when the indexer is started.
    rpc: Option<T>,
    /// Handler that turns block logs into significant chain events; taken out of the struct
    /// when the indexer is started.
    db_processor: Option<U>,
    /// Database used for storing logs and the indexer state.
    db: Db,
    /// Indexer configuration (`fast_sync` and `start_block_number` are read at start-up).
    cfg: IndexerConfig,
    /// Channel through which significant chain events are passed on once the indexer is synced.
    egress: async_channel::Sender<SignificantChainEvent>,
    /// If true (default), the indexer will panic if the event stream is terminated.
    /// Setting it to false is useful for testing.
    panic_on_completion: bool,
}
78
79impl<T, U, Db> Indexer<T, U, Db>
80where
81    T: HoprIndexerRpcOperations + Sync + Send + 'static,
82    U: ChainLogHandler + Send + Sync + 'static,
83    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
84{
85    pub fn new(
86        rpc: T,
87        db_processor: U,
88        db: Db,
89        cfg: IndexerConfig,
90        egress: async_channel::Sender<SignificantChainEvent>,
91    ) -> Self {
92        Self {
93            rpc: Some(rpc),
94            db_processor: Some(db_processor),
95            db,
96            cfg,
97            egress,
98            panic_on_completion: true,
99        }
100    }
101
102    /// Disables the panic on completion.
103    pub fn without_panic_on_completion(mut self) -> Self {
104        self.panic_on_completion = false;
105        self
106    }
107
108    pub async fn start(mut self) -> Result<JoinHandle<()>>
109    where
110        T: HoprIndexerRpcOperations + 'static,
111        U: ChainLogHandler + 'static,
112        Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
113    {
114        if self.rpc.is_none() || self.db_processor.is_none() {
115            return Err(CoreEthereumIndexerError::ProcessError(
116                "indexer cannot start, missing components".into(),
117            ));
118        }
119
120        info!("Starting chain indexing");
121
122        let rpc = self.rpc.take().expect("rpc should be present");
123        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
124        let db = self.db.clone();
125        let tx_significant_events = self.egress.clone();
126        let panic_on_completion = self.panic_on_completion;
127
128        // we skip on addresses which have no topics
129        let mut addresses = vec![];
130        let mut topics = vec![];
131        let mut address_topics = vec![];
132        logs_handler.contract_addresses().iter().for_each(|address| {
133            let contract_topics = logs_handler.contract_address_topics(*address);
134            if !contract_topics.is_empty() {
135                addresses.push(*address);
136                for topic in contract_topics {
137                    address_topics.push((*address, Hash::from(topic)));
138                    topics.push(topic);
139                }
140            }
141        });
142
143        // Check that the contract addresses and topics are consistent with what is in the logs DB,
144        // or if the DB is empty, prime it with the given addresses and topics.
145        db.ensure_logs_origin(address_topics).await?;
146
147        let log_filter = LogFilter {
148            address: addresses,
149            topics: topics.into_iter().map(Hash::from).collect(),
150        };
151
152        let is_synced = Arc::new(AtomicBool::new(false));
153        let chain_head = Arc::new(AtomicU64::new(0));
154
155        // update the chain head once at startup to get a reference for initial syncing
156        // progress calculation
157        debug!("Updating chain head at indexer startup");
158        Self::update_chain_head(&rpc, chain_head.clone()).await;
159
160        // First, check whether fast sync is enabled and can be performed.
161        // If so:
162        //   1. Delete the existing indexed data
163        //   2. Reset fast sync progress
164        //   3. Run the fast sync process until completion
165        //   4. Finally, starting the rpc indexer.
166        let fast_sync_configured = self.cfg.fast_sync;
167        let index_empty = self.db.index_is_empty().await?;
168
169        #[derive(PartialEq, Eq)]
170        enum FastSyncMode {
171            None,
172            FromScratch,
173            Continue,
174        }
175
176        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
177            (true, false) => {
178                info!("Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing unprocessed logs.");
179                FastSyncMode::Continue
180            }
181            (false, true) => {
182                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
183                // Clean the last processed log from the Log DB, to allow full resync
184                self.db.clear_index_db(None).await?;
185                self.db.set_logs_unprocessed(None, None).await?;
186                FastSyncMode::None
187            }
188            (false, false) => {
189                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
190                FastSyncMode::None
191            }
192            (true, true) => {
193                info!("Fast sync is enabled, starting the fast sync process");
194                // To ensure a proper state, reset any auxiliary data in the database
195                self.db.clear_index_db(None).await?;
196                self.db.set_logs_unprocessed(None, None).await?;
197                FastSyncMode::FromScratch
198            }
199        };
200
201        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
202
203        // Perform the fast-sync if requested
204        if FastSyncMode::None != will_perform_fast_sync {
205            let processed = match will_perform_fast_sync {
206                FastSyncMode::FromScratch => None,
207                FastSyncMode::Continue => Some(false),
208                _ => unreachable!(),
209            };
210
211            #[cfg(all(feature = "prometheus", not(test)))]
212            {
213                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
214                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
215            }
216
217            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
218            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
219            let _head = chain_head.load(Ordering::Relaxed);
220            for block_number in log_block_numbers {
221                // Do not pollute the logs with the fast-sync progress
222                Self::process_block_by_id(&db, &logs_handler, block_number).await?;
223                #[cfg(all(feature = "prometheus", not(test)))]
224                {
225                    let progress =
226                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
227                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
228                }
229            }
230        }
231
232        info!("Building rpc indexer background process");
233
234        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
235            info!(
236                start_block = last_log.block_number,
237                start_checksum = last_log.checksum.unwrap(),
238                "Loaded indexer state",
239            );
240
241            if self.cfg.start_block_number < last_log.block_number {
242                // If some prior indexing took place already, avoid reprocessing
243                last_log.block_number + 1
244            } else {
245                self.cfg.start_block_number
246            }
247        } else {
248            self.cfg.start_block_number
249        };
250
251        info!(next_block_to_process, "Indexer start point");
252
253        let indexing_proc = spawn(async move {
254            // Update the chain head once again
255            debug!("Updating chain head at indexer startup");
256            Self::update_chain_head(&rpc, chain_head.clone()).await;
257
258            #[cfg(all(feature = "prometheus", not(test)))]
259            {
260                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
261                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
262            }
263
264            let event_stream = rpc
265                .try_stream_logs(next_block_to_process, log_filter)
266                .expect("block stream should be constructible")
267                .then(|block| {
268                    Self::calculate_sync_process(
269                        block.block_id,
270                        &rpc,
271                        chain_head.clone(),
272                        is_synced.clone(),
273                        next_block_to_process,
274                        tx.clone(),
275                    )
276                    .map(|_| block)
277                })
278                .filter_map(|block| {
279                    let db = db.clone();
280
281                    async move {
282                        debug!(%block, "storing logs from block");
283                        let logs = block.logs.clone();
284                        let logs_vec = logs.into_iter().collect();
285                        match db.store_logs(logs_vec).await {
286                            Ok(store_results) => {
287                                if let Some(error) = store_results
288                                    .into_iter()
289                                    .filter(|r| r.is_err())
290                                    .map(|r| r.unwrap_err())
291                                    .next()
292                                {
293                                    error!(%block, %error, "failed to processed stored logs from block");
294                                    None
295                                } else {
296                                    Some(block)
297                                }
298                            }
299                            Err(error) => {
300                                error!(%block, %error, "failed to store logs from block");
301                                None
302                            }
303                        }
304                    }
305                })
306                .filter_map(|block| Self::process_block(&db, &logs_handler, block, false))
307                .flat_map(stream::iter);
308
309            futures::pin_mut!(event_stream);
310            while let Some(event) = event_stream.next().await {
311                trace!(%event, "processing on-chain event");
312                // Pass the events further only once we're fully synced
313                if is_synced.load(Ordering::Relaxed) {
314                    if let Err(error) = tx_significant_events.try_send(event) {
315                        error!(%error, "failed to pass a significant chain event further");
316                    }
317                }
318            }
319
320            if panic_on_completion {
321                panic!(
322                    "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
323                );
324            }
325        });
326
327        if rx.next().await.is_some() {
328            Ok(indexing_proc)
329        } else {
330            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
331                "Error during indexing start".into(),
332            ))
333        }
334    }
335
336    /// Processes a block by its ID.
337    ///
338    /// This function retrieves logs for the given block ID and processes them using the database
339    /// and log handler.
340    ///
341    /// # Arguments
342    ///
343    /// * `db` - The database operations handler.
344    /// * `logs_handler` - The database log handler.
345    /// * `block_id` - The ID of the block to process.
346    ///
347    /// # Returns
348    ///
349    /// A `Result` containing an optional vector of significant chain events if the operation succeeds or an error if it fails.
350    async fn process_block_by_id(
351        db: &Db,
352        logs_handler: &U,
353        block_id: u64,
354    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
355    where
356        U: ChainLogHandler + 'static,
357        Db: HoprDbLogOperations + 'static,
358    {
359        let logs = db.get_logs(Some(block_id), Some(0)).await?;
360        let mut block = BlockWithLogs {
361            block_id,
362            ..Default::default()
363        };
364
365        for log in logs {
366            if log.block_number == block_id {
367                block.logs.insert(log);
368            } else {
369                error!(
370                    expected = block_id,
371                    actual = log.block_number,
372                    "block number mismatch in logs from database"
373                );
374                panic!("block number mismatch in logs from database")
375            }
376        }
377
378        Ok(Self::process_block(db, logs_handler, block, true).await)
379    }
380
381    /// Processes a block and its logs.
382    ///
383    /// This function collects events from the block logs and updates the database with the processed logs.
384    ///
385    /// # Arguments
386    ///
387    /// * `db` - The database operations handler.
388    /// * `logs_handler` - The database log handler.
389    /// * `block` - The block with logs to process.
390    /// * `fetch_checksum_from_db` - A boolean indicating whether to fetch the checksum from the database.
391    ///
392    /// # Returns
393    ///
394    /// An optional vector of significant chain events if the operation succeeds.
395    async fn process_block(
396        db: &Db,
397        logs_handler: &U,
398        block: BlockWithLogs,
399        fetch_checksum_from_db: bool,
400    ) -> Option<Vec<SignificantChainEvent>>
401    where
402        U: ChainLogHandler + 'static,
403        Db: HoprDbLogOperations + 'static,
404    {
405        let block_id = block.block_id;
406        let log_count = block.logs.len();
407        debug!(block_id, "processing events");
408
409        // FIXME: The block indexing and marking as processed should be done in a single
410        // transaction. This is difficult since currently this would be across databases.
411        match logs_handler.collect_block_events(block.clone()).await {
412            Ok(events) => {
413                match db.set_logs_processed(Some(block_id), Some(0)).await {
414                    Ok(_) => match db.update_logs_checksums().await {
415                        Ok(last_log_checksum) => {
416                            let checksum = if fetch_checksum_from_db {
417                                let last_log = block.logs.into_iter().next_back().unwrap();
418                                let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
419
420                                log.checksum
421                            } else {
422                                Some(last_log_checksum.to_string())
423                            };
424
425                            if log_count != 0 {
426                                info!(
427                                    block_number = block_id,
428                                    log_count, last_log_checksum = ?checksum, "Indexer state update",
429                                );
430
431                                #[cfg(all(feature = "prometheus", not(test)))]
432                                {
433                                    if let Some(last_log_checksum) = checksum {
434                                        if let Ok(checksum_hash) = Hash::from_hex(last_log_checksum.as_str()) {
435                                            let low_4_bytes = hopr_primitive_types::prelude::U256::from_big_endian(
436                                                checksum_hash.as_ref(),
437                                            )
438                                            .low_u32();
439                                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
440                                        } else {
441                                            error!("Invalid checksum generated from logs");
442                                        }
443                                    }
444                                }
445                            }
446
447                            // finally update the block number in the database to the last
448                            // processed block
449                            match db.set_indexer_state_info(None, block_id as u32).await {
450                                Ok(_) => {
451                                    trace!(block_id, "updated indexer state info");
452                                }
453                                Err(error) => error!(block_id, %error, "failed to update indexer state info"),
454                            }
455                        }
456                        Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
457                    },
458                    Err(error) => error!(block_id, %error, "failed to mark logs from block as processed"),
459                }
460
461                debug!(
462                    block_id,
463                    num_events = events.len(),
464                    "processed significant chain events from block",
465                );
466
467                Some(events)
468            }
469            Err(error) => {
470                error!(block_id, %error, "failed to process logs from block into events");
471                None
472            }
473        }
474    }
475
476    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
477    where
478        T: HoprIndexerRpcOperations + 'static,
479    {
480        match rpc.block_number().await {
481            Ok(head) => {
482                chain_head.store(head, Ordering::Relaxed);
483                debug!(head, "Updated chain head");
484                head
485            }
486            Err(error) => {
487                error!(%error, "Failed to fetch block number from RPC");
488                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
489            }
490        }
491    }
492
493    /// Calculates the synchronization progress.
494    ///
495    /// This function processes a block and updates synchronization metrics and state.
496    ///
497    /// # Arguments
498    ///
499    /// * `prefix` - A string prefix for logging purposes.
500    /// * `block` - The block with logs to process.
501    /// * `rpc` - The RPC operations handler.
502    /// * `chain_head` - The current chain head block number.
503    /// * `is_synced` - A boolean indicating whether the indexer is synced.
504    /// * `start_block` - The first block number to process.
505    /// * `tx` - A sender channel for synchronization notifications.
506    ///
507    /// # Returns
508    ///
509    /// The block which was provided as input.
510    async fn calculate_sync_process(
511        current_block: u64,
512        rpc: &T,
513        chain_head: Arc<AtomicU64>,
514        is_synced: Arc<AtomicBool>,
515        start_block: u64,
516        mut tx: futures::channel::mpsc::Sender<()>,
517    ) where
518        T: HoprIndexerRpcOperations + 'static,
519    {
520        #[cfg(all(feature = "prometheus", not(test)))]
521        {
522            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
523        }
524
525        let mut head = chain_head.load(Ordering::Relaxed);
526
527        // We only print out sync progress if we are not yet synced.
528        // Once synced, we don't print out progress anymore.
529        if !is_synced.load(Ordering::Relaxed) {
530            let mut block_difference = head.saturating_sub(start_block);
531
532            let progress = if block_difference == 0 {
533                // Before we call the sync complete, we check the chain again.
534                head = Self::update_chain_head(rpc, chain_head.clone()).await;
535                block_difference = head.saturating_sub(start_block);
536
537                if block_difference == 0 {
538                    1_f64
539                } else {
540                    (current_block - start_block) as f64 / block_difference as f64
541                }
542            } else {
543                (current_block - start_block) as f64 / block_difference as f64
544            };
545
546            info!(
547                progress = progress * 100_f64,
548                block = current_block,
549                head,
550                "Sync progress to last known head"
551            );
552
553            #[cfg(all(feature = "prometheus", not(test)))]
554            METRIC_INDEXER_SYNC_PROGRESS.set(progress);
555
556            if current_block >= head {
557                info!("indexer sync completed successfully");
558                is_synced.store(true, Ordering::Relaxed);
559                if let Err(e) = tx.try_send(()) {
560                    error!(error = %e, "failed to notify about achieving indexer synchronization")
561                }
562            }
563        }
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use async_trait::async_trait;
570    use ethers::{
571        abi::{encode, Token},
572        contract::EthEvent,
573    };
574    use futures::{join, Stream};
575    use hex_literal::hex;
576    use mockall::mock;
577    use multiaddr::Multiaddr;
578    use std::collections::BTreeSet;
579    use std::pin::Pin;
580
581    use hopr_bindings::hopr_announcements::AddressAnnouncementFilter;
582    use hopr_chain_rpc::BlockWithLogs;
583    use hopr_chain_types::chain_events::ChainEventType;
584    use hopr_crypto_types::keypairs::{Keypair, OffchainKeypair};
585    use hopr_crypto_types::prelude::ChainKeypair;
586    use hopr_db_sql::accounts::HoprDbAccountOperations;
587    use hopr_db_sql::db::HoprDb;
588    use hopr_internal_types::account::{AccountEntry, AccountType};
589    use hopr_primitive_types::prelude::*;
590
591    use crate::traits::MockChainLogHandler;
592
593    use super::*;
594
    // Shared fixtures of the indexer tests: keypairs, addresses and a sample chain event.
    lazy_static::lazy_static! {
        // Alice: a random offchain keypair and a chain keypair built from a fixed secret
        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        // Alice's address is derived from her chain keypair
        static ref ALICE: Address = ALICE_KP.public().to_address();
        // Bob: a random offchain keypair and a fixed address
        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        // Chris: a fixed address only
        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();

        // An announcement event with a fixed peer and address and a single empty multiaddress
        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
            multiaddresses: vec![Multiaddr::empty()],
        };
    }
609
610    fn build_announcement_logs(
611        address: Address,
612        size: usize,
613        block_number: u64,
614        log_index: U256,
615    ) -> anyhow::Result<Vec<SerializableLog>> {
616        let mut logs: Vec<SerializableLog> = vec![];
617        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
618
619        for i in 0..size {
620            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
621            logs.push(SerializableLog {
622                address,
623                block_hash: block_hash.into(),
624                topics: vec![AddressAnnouncementFilter::signature().into()],
625                data: encode(&[
626                    Token::Address(ethers::abi::Address::from_slice(address.as_ref())),
627                    Token::String(test_multiaddr.to_string()),
628                ])
629                .into(),
630                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
631                tx_index: 0,
632                block_number,
633                log_index: log_index.as_u64(),
634                ..Default::default()
635            });
636        }
637
638        Ok(logs)
639    }
640
    // Generates `MockHoprIndexerOps`, a mock implementation of `HoprIndexerRpcOperations`
    // whose `block_number` and `try_stream_logs` calls are configured by the individual tests.
    mock! {
        HoprIndexerOps {}     // Name of the mock struct, less the "Mock" prefix

        #[async_trait]
        impl HoprIndexerRpcOperations for HoprIndexerOps {
            // Mocked source of the chain head block number
            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;

            // Mocked source of the stream of blocks with logs
            fn try_stream_logs<'a>(
                &'a self,
                start_block_number: u64,
                filter: LogFilter,
            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
        }
    }
655
656    #[async_std::test]
657    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found(
658    ) -> anyhow::Result<()> {
659        let mut handlers = MockChainLogHandler::new();
660        let mut rpc = MockHoprIndexerOps::new();
661        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
662
663        let addr = Address::new(b"my address 123456789");
664        let topic = Hash::create(&[b"my topic"]);
665        db.ensure_logs_origin(vec![(addr, topic)]).await?;
666
667        handlers.expect_contract_addresses().return_const(vec![addr]);
668        handlers
669            .expect_contract_address_topics()
670            .withf(move |x| x == &addr)
671            .return_const(vec![topic.into()]);
672
673        let head_block = 1000;
674        rpc.expect_block_number().return_once(move || Ok(head_block));
675
676        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
677        rpc.expect_try_stream_logs()
678            .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
679            .return_once(move |_, _| Ok(Box::pin(rx)));
680
681        let indexer = Indexer::new(
682            rpc,
683            handlers,
684            db.clone(),
685            IndexerConfig::default(),
686            async_channel::unbounded().0,
687        )
688        .without_panic_on_completion();
689
690        let (indexing, _) = join!(indexer.start(), async move {
691            async_std::task::sleep(std::time::Duration::from_millis(200)).await;
692            tx.close_channel()
693        });
694        assert!(indexing.is_err()); // terminated by the close channel
695
696        Ok(())
697    }
698
699    #[test_log::test(async_std::test)]
700    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
701    {
702        let mut handlers = MockChainLogHandler::new();
703        let mut rpc = MockHoprIndexerOps::new();
704        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
705        let head_block = 1000;
706        let latest_block = 15u64;
707
708        let addr = Address::new(b"my address 123456789");
709        let topic = Hash::create(&[b"my topic"]);
710
711        handlers.expect_contract_addresses().return_const(vec![addr]);
712        handlers
713            .expect_contract_address_topics()
714            .withf(move |x| x == &addr)
715            .return_const(vec![topic.into()]);
716        db.ensure_logs_origin(vec![(addr, topic)]).await?;
717
718        rpc.expect_block_number().return_once(move || Ok(head_block));
719
720        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
721        rpc.expect_try_stream_logs()
722            .once()
723            .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == latest_block + 1)
724            .return_once(move |_, _| Ok(Box::pin(rx)));
725
726        // insert and process latest block
727        let log_1 = SerializableLog {
728            address: Address::new(b"my address 123456789"),
729            topics: [Hash::create(&[b"my topic"]).into()].into(),
730            data: [1, 2, 3, 4].into(),
731            tx_index: 1u64,
732            block_number: latest_block,
733            block_hash: Hash::create(&[b"my block hash"]).into(),
734            tx_hash: Hash::create(&[b"my tx hash"]).into(),
735            log_index: 1u64,
736            removed: false,
737            processed: Some(false),
738            ..Default::default()
739        };
740        assert!(db.store_log(log_1.clone()).await.is_ok());
741        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
742        assert!(db.update_logs_checksums().await.is_ok());
743
744        let indexer = Indexer::new(
745            rpc,
746            handlers,
747            db.clone(),
748            IndexerConfig {
749                fast_sync: false,
750                ..Default::default()
751            },
752            async_channel::unbounded().0,
753        )
754        .without_panic_on_completion();
755
756        let (indexing, _) = join!(indexer.start(), async move {
757            async_std::task::sleep(std::time::Duration::from_millis(200)).await;
758            tx.close_channel()
759        });
760        assert!(indexing.is_err()); // terminated by the close channel
761
762        Ok(())
763    }
764
765    #[async_std::test]
766    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
767        let mut handlers = MockChainLogHandler::new();
768        let mut rpc = MockHoprIndexerOps::new();
769        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
770
771        let cfg = IndexerConfig::default();
772
773        let addr = Address::new(b"my address 123456789");
774        handlers.expect_contract_addresses().return_const(vec![addr]);
775        handlers
776            .expect_contract_address_topics()
777            .withf(move |x| x == &addr)
778            .return_const(vec![Hash::create(&[b"my topic"]).into()]);
779
780        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
781        rpc.expect_try_stream_logs()
782            .times(1)
783            .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
784            .return_once(move |_, _| Ok(Box::pin(rx)));
785
786        let head_block = 1000;
787        rpc.expect_block_number().returning(move || Ok(head_block));
788
789        let finalized_block = BlockWithLogs {
790            block_id: head_block - 1,
791            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, U256::from(23u8))?),
792        };
793        let head_allowing_finalization = BlockWithLogs {
794            block_id: head_block,
795            logs: BTreeSet::new(),
796        };
797
798        handlers
799            .expect_collect_block_events()
800            .times(finalized_block.logs.len())
801            .returning(|_| Ok(vec![]));
802
803        assert!(tx.start_send(finalized_block.clone()).is_ok());
804        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
805
806        let indexer =
807            Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
808        let _ = join!(indexer.start(), async move {
809            async_std::task::sleep(std::time::Duration::from_millis(200)).await;
810            tx.close_channel()
811        });
812
813        Ok(())
814    }
815
816    #[test_log::test(async_std::test)]
817    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
818        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
819
820        let addr = Address::new(b"my address 123456789");
821        let topic = Hash::create(&[b"my topic"]);
822
823        // Run 1: Fast sync enabled, index empty
824        {
825            let logs = vec![
826                build_announcement_logs(*BOB, 1, 1, U256::from(1u8))?,
827                build_announcement_logs(*BOB, 1, 2, U256::from(1u8))?,
828            ]
829            .into_iter()
830            .flatten()
831            .collect::<Vec<_>>();
832
833            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
834
835            for log in logs {
836                assert!(db.store_log(log).await.is_ok());
837            }
838            assert!(db.update_logs_checksums().await.is_ok());
839            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
840            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
841
842            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
843            let (tx_events, _) = async_channel::unbounded();
844
845            let head_block = 5;
846            let mut rpc = MockHoprIndexerOps::new();
847            rpc.expect_block_number().returning(move || Ok(head_block));
848            rpc.expect_try_stream_logs()
849                .times(1)
850                .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 3)
851                .return_once(move |_, _| Ok(Box::pin(rx)));
852
853            let mut handlers = MockChainLogHandler::new();
854            handlers.expect_contract_addresses().return_const(vec![addr]);
855            handlers
856                .expect_contract_address_topics()
857                .withf(move |x| x == &addr)
858                .return_const(vec![topic.into()]);
859            handlers
860                .expect_collect_block_events()
861                .times(2)
862                .withf(move |b| [1, 2].contains(&b.block_id))
863                .returning(|_| Ok(vec![]));
864
865            let indexer_cfg = IndexerConfig {
866                start_block_number: 0,
867                fast_sync: true,
868            };
869            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
870            let (indexing, _) = join!(indexer.start(), async move {
871                async_std::task::sleep(std::time::Duration::from_millis(200)).await;
872                tx.close_channel()
873            });
874            assert!(indexing.is_err()); // terminated by the close channel
875
876            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
877            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
878
879            // At the end we need to simulate that the index is not empty,
880            // thus storing some data.
881            db.insert_account(
882                None,
883                AccountEntry::new(*ALICE_OKP.public(), *ALICE, AccountType::NotAnnounced).into(),
884            )
885            .await?;
886            db.insert_account(
887                None,
888                AccountEntry::new(*BOB_OKP.public(), *BOB, AccountType::NotAnnounced).into(),
889            )
890            .await?;
891        }
892
893        // Run 2: Fast sync enabled, index not empty, resume after 2 logs
894        {
895            let logs = vec![
896                build_announcement_logs(*BOB, 1, 3, U256::from(1u8))?,
897                build_announcement_logs(*BOB, 1, 4, U256::from(1u8))?,
898            ]
899            .into_iter()
900            .flatten()
901            .collect::<Vec<_>>();
902
903            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
904
905            for log in logs {
906                assert!(db.store_log(log).await.is_ok());
907            }
908            assert!(db.update_logs_checksums().await.is_ok());
909            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
910            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
911
912            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
913            let (tx_events, _) = async_channel::unbounded();
914
915            let head_block = 5;
916            let mut rpc = MockHoprIndexerOps::new();
917            rpc.expect_block_number().returning(move || Ok(head_block));
918            rpc.expect_try_stream_logs()
919                .times(1)
920                .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 5)
921                .return_once(move |_, _| Ok(Box::pin(rx)));
922
923            let mut handlers = MockChainLogHandler::new();
924            handlers.expect_contract_addresses().return_const(vec![addr]);
925            handlers
926                .expect_contract_address_topics()
927                .withf(move |x| x == &addr)
928                .return_const(vec![topic.into()]);
929
930            handlers
931                .expect_collect_block_events()
932                .times(2)
933                .withf(move |b| [3, 4].contains(&b.block_id))
934                .returning(|_| Ok(vec![]));
935
936            let indexer_cfg = IndexerConfig {
937                start_block_number: 0,
938                fast_sync: true,
939            };
940            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
941            let (indexing, _) = join!(indexer.start(), async move {
942                async_std::task::sleep(std::time::Duration::from_millis(200)).await;
943                tx.close_channel()
944            });
945            assert!(indexing.is_err()); // terminated by the close channel
946
947            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
948            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
949        }
950
951        Ok(())
952    }
953
954    #[test_log::test(async_std::test)]
955    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
956        let mut handlers = MockChainLogHandler::new();
957        let mut rpc = MockHoprIndexerOps::new();
958        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
959
960        let cfg = IndexerConfig::default();
961
962        // We don't want to index anything really
963        let addr = Address::new(b"my address 123456789");
964        handlers.expect_contract_addresses().return_const(vec![addr]);
965        handlers
966            .expect_contract_address_topics()
967            .withf(move |x| x == &addr)
968            .return_const(vec![Hash::create(&[b"my topic"]).into()]);
969
970        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
971        // Expected to be called once starting at 0 and yield the respective blocks
972        rpc.expect_try_stream_logs()
973            .times(1)
974            .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
975            .return_once(move |_, _| Ok(Box::pin(rx)));
976
977        let head_block = 1000;
978        let block_numbers = vec![head_block - 1, head_block, head_block + 1];
979
980        let blocks: Vec<BlockWithLogs> = block_numbers
981            .iter()
982            .map(|block_id| BlockWithLogs {
983                block_id: *block_id,
984                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, U256::from(23u8)).unwrap()),
985            })
986            .collect();
987
988        for _ in 0..(blocks.len() as u64) {
989            rpc.expect_block_number().returning(move || Ok(head_block));
990        }
991
992        for block in blocks.iter() {
993            assert!(tx.start_send(block.clone()).is_ok());
994        }
995
996        // Generate the expected events to be able to process the blocks
997        handlers
998            .expect_collect_block_events()
999            .times(1)
1000            .withf(move |b| block_numbers.contains(&b.block_id))
1001            .returning(|b| {
1002                let block_id = b.block_id;
1003                Ok(vec![SignificantChainEvent {
1004                    tx_hash: Hash::create(&[format!("my tx hash {block_id}").as_bytes()]),
1005                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1006                }])
1007            });
1008
1009        let (tx_events, rx_events) = async_channel::unbounded();
1010        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1011        indexer.start().await?;
1012
1013        // At this point we expect 2 events to arrive. The third event, which was generated first,
1014        // should be dropped because it was generated before the indexer was in sync with head.
1015        let _first = rx_events.recv();
1016        let _second = rx_events.recv();
1017        let third = rx_events.try_recv();
1018
1019        assert!(third.is_err());
1020
1021        Ok(())
1022    }
1023
1024    #[test_log::test(async_std::test)]
1025    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1026        let last_processed_block = 100_u64;
1027
1028        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1029
1030        let addr = Address::new(b"my address 123456789");
1031        let topic = Hash::create(&[b"my topic"]);
1032        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1033
1034        // insert and process latest block
1035        let log_1 = SerializableLog {
1036            address: Address::new(b"my address 123456789"),
1037            topics: [Hash::create(&[b"my topic"]).into()].into(),
1038            data: [1, 2, 3, 4].into(),
1039            tx_index: 1u64,
1040            block_number: last_processed_block,
1041            block_hash: Hash::create(&[b"my block hash"]).into(),
1042            tx_hash: Hash::create(&[b"my tx hash"]).into(),
1043            log_index: 1u64,
1044            removed: false,
1045            processed: Some(false),
1046            ..Default::default()
1047        };
1048        assert!(db.store_log(log_1.clone()).await.is_ok());
1049        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1050        assert!(db.update_logs_checksums().await.is_ok());
1051
1052        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1053
1054        let mut rpc = MockHoprIndexerOps::new();
1055        rpc.expect_try_stream_logs()
1056            .once()
1057            .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == last_processed_block + 1)
1058            .return_once(move |_, _| Ok(Box::pin(rx)));
1059
1060        rpc.expect_block_number()
1061            .times(3)
1062            .returning(move || Ok(last_processed_block + 1));
1063
1064        let block = BlockWithLogs {
1065            block_id: last_processed_block + 1,
1066            logs: BTreeSet::from_iter(build_announcement_logs(
1067                *ALICE,
1068                1,
1069                last_processed_block + 1,
1070                U256::from(23u8),
1071            )?),
1072        };
1073
1074        tx.start_send(block)?;
1075
1076        let mut handlers = MockChainLogHandler::new();
1077        handlers.expect_contract_addresses().return_const(vec![addr]);
1078        handlers
1079            .expect_contract_address_topics()
1080            .withf(move |x| x == &addr)
1081            .return_const(vec![topic.into()]);
1082
1083        let indexer_cfg = IndexerConfig {
1084            start_block_number: 0,
1085            fast_sync: false,
1086        };
1087
1088        let (tx_events, _) = async_channel::unbounded();
1089        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1090        indexer.start().await?;
1091
1092        Ok(())
1093    }
1094}