1use futures::{stream, FutureExt, StreamExt};
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4use tracing::{debug, error, info, trace};
5
6use hopr_async_runtime::prelude::{spawn, JoinHandle};
7use hopr_chain_rpc::{BlockWithLogs, HoprIndexerRpcOperations, LogFilter};
8use hopr_chain_types::chain_events::SignificantChainEvent;
9use hopr_crypto_types::types::Hash;
10use hopr_db_api::logs::HoprDbLogOperations;
11use hopr_db_sql::info::HoprDbInfoOperations;
12use hopr_db_sql::HoprDbGeneralModelOperations;
13
14#[cfg(all(feature = "prometheus", not(test)))]
15use hopr_primitive_types::prelude::ToHex;
16
17use crate::{
18 errors::{CoreEthereumIndexerError, Result},
19 traits::ChainLogHandler,
20 IndexerConfig,
21};
22
23#[cfg(all(feature = "prometheus", not(test)))]
24lazy_static::lazy_static! {
25 static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
26 hopr_metrics::metrics::SimpleGauge::new(
27 "hopr_indexer_block_number",
28 "Current last processed block number by the indexer",
29 ).unwrap();
30 static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
31 hopr_metrics::metrics::SimpleGauge::new(
32 "hopr_indexer_checksum",
33 "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum"
34 ).unwrap();
35 static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
36 hopr_metrics::metrics::SimpleGauge::new(
37 "hopr_indexer_sync_progress",
38 "Sync progress of the historical data by the indexer",
39 ).unwrap();
40 static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
41 hopr_metrics::metrics::MultiGauge::new(
42 "hopr_indexer_data_source",
43 "Current data source of the Indexer",
44 &["source"],
45 ).unwrap();
46
47}
48
49#[derive(Debug, Clone)]
63pub struct Indexer<T, U, Db>
64where
65 T: HoprIndexerRpcOperations + Send + 'static,
66 U: ChainLogHandler + Send + 'static,
67 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
68{
69 rpc: Option<T>,
70 db_processor: Option<U>,
71 db: Db,
72 cfg: IndexerConfig,
73 egress: async_channel::Sender<SignificantChainEvent>,
74 panic_on_completion: bool,
77}
78
79impl<T, U, Db> Indexer<T, U, Db>
80where
81 T: HoprIndexerRpcOperations + Sync + Send + 'static,
82 U: ChainLogHandler + Send + Sync + 'static,
83 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
84{
85 pub fn new(
86 rpc: T,
87 db_processor: U,
88 db: Db,
89 cfg: IndexerConfig,
90 egress: async_channel::Sender<SignificantChainEvent>,
91 ) -> Self {
92 Self {
93 rpc: Some(rpc),
94 db_processor: Some(db_processor),
95 db,
96 cfg,
97 egress,
98 panic_on_completion: true,
99 }
100 }
101
102 pub fn without_panic_on_completion(mut self) -> Self {
104 self.panic_on_completion = false;
105 self
106 }
107
108 pub async fn start(mut self) -> Result<JoinHandle<()>>
109 where
110 T: HoprIndexerRpcOperations + 'static,
111 U: ChainLogHandler + 'static,
112 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
113 {
114 if self.rpc.is_none() || self.db_processor.is_none() {
115 return Err(CoreEthereumIndexerError::ProcessError(
116 "indexer cannot start, missing components".into(),
117 ));
118 }
119
120 info!("Starting chain indexing");
121
122 let rpc = self.rpc.take().expect("rpc should be present");
123 let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
124 let db = self.db.clone();
125 let tx_significant_events = self.egress.clone();
126 let panic_on_completion = self.panic_on_completion;
127
128 let mut addresses = vec![];
130 let mut topics = vec![];
131 let mut address_topics = vec![];
132 logs_handler.contract_addresses().iter().for_each(|address| {
133 let contract_topics = logs_handler.contract_address_topics(*address);
134 if !contract_topics.is_empty() {
135 addresses.push(*address);
136 for topic in contract_topics {
137 address_topics.push((*address, Hash::from(topic)));
138 topics.push(topic);
139 }
140 }
141 });
142
143 db.ensure_logs_origin(address_topics).await?;
146
147 let log_filter = LogFilter {
148 address: addresses,
149 topics: topics.into_iter().map(Hash::from).collect(),
150 };
151
152 let is_synced = Arc::new(AtomicBool::new(false));
153 let chain_head = Arc::new(AtomicU64::new(0));
154
155 debug!("Updating chain head at indexer startup");
158 Self::update_chain_head(&rpc, chain_head.clone()).await;
159
160 let fast_sync_configured = self.cfg.fast_sync;
167 let index_empty = self.db.index_is_empty().await?;
168
169 #[derive(PartialEq, Eq)]
170 enum FastSyncMode {
171 None,
172 FromScratch,
173 Continue,
174 }
175
176 let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
177 (true, false) => {
178 info!("Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing unprocessed logs.");
179 FastSyncMode::Continue
180 }
181 (false, true) => {
182 info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
183 self.db.clear_index_db(None).await?;
185 self.db.set_logs_unprocessed(None, None).await?;
186 FastSyncMode::None
187 }
188 (false, false) => {
189 info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
190 FastSyncMode::None
191 }
192 (true, true) => {
193 info!("Fast sync is enabled, starting the fast sync process");
194 self.db.clear_index_db(None).await?;
196 self.db.set_logs_unprocessed(None, None).await?;
197 FastSyncMode::FromScratch
198 }
199 };
200
201 let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
202
203 if FastSyncMode::None != will_perform_fast_sync {
205 let processed = match will_perform_fast_sync {
206 FastSyncMode::FromScratch => None,
207 FastSyncMode::Continue => Some(false),
208 _ => unreachable!(),
209 };
210
211 #[cfg(all(feature = "prometheus", not(test)))]
212 {
213 METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
214 METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
215 }
216
217 let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
218 let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
219 let _head = chain_head.load(Ordering::Relaxed);
220 for block_number in log_block_numbers {
221 Self::process_block_by_id(&db, &logs_handler, block_number).await?;
223 #[cfg(all(feature = "prometheus", not(test)))]
224 {
225 let progress =
226 (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
227 METRIC_INDEXER_SYNC_PROGRESS.set(progress);
228 }
229 }
230 }
231
232 info!("Building rpc indexer background process");
233
234 let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
235 info!(
236 start_block = last_log.block_number,
237 start_checksum = last_log.checksum.unwrap(),
238 "Loaded indexer state",
239 );
240
241 if self.cfg.start_block_number < last_log.block_number {
242 last_log.block_number + 1
244 } else {
245 self.cfg.start_block_number
246 }
247 } else {
248 self.cfg.start_block_number
249 };
250
251 info!(next_block_to_process, "Indexer start point");
252
253 let indexing_proc = spawn(async move {
254 debug!("Updating chain head at indexer startup");
256 Self::update_chain_head(&rpc, chain_head.clone()).await;
257
258 #[cfg(all(feature = "prometheus", not(test)))]
259 {
260 METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
261 METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
262 }
263
264 let event_stream = rpc
265 .try_stream_logs(next_block_to_process, log_filter)
266 .expect("block stream should be constructible")
267 .then(|block| {
268 Self::calculate_sync_process(
269 block.block_id,
270 &rpc,
271 chain_head.clone(),
272 is_synced.clone(),
273 next_block_to_process,
274 tx.clone(),
275 )
276 .map(|_| block)
277 })
278 .filter_map(|block| {
279 let db = db.clone();
280
281 async move {
282 debug!(%block, "storing logs from block");
283 let logs = block.logs.clone();
284 let logs_vec = logs.into_iter().collect();
285 match db.store_logs(logs_vec).await {
286 Ok(store_results) => {
287 if let Some(error) = store_results
288 .into_iter()
289 .filter(|r| r.is_err())
290 .map(|r| r.unwrap_err())
291 .next()
292 {
293 error!(%block, %error, "failed to processed stored logs from block");
294 None
295 } else {
296 Some(block)
297 }
298 }
299 Err(error) => {
300 error!(%block, %error, "failed to store logs from block");
301 None
302 }
303 }
304 }
305 })
306 .filter_map(|block| Self::process_block(&db, &logs_handler, block, false))
307 .flat_map(stream::iter);
308
309 futures::pin_mut!(event_stream);
310 while let Some(event) = event_stream.next().await {
311 trace!(%event, "processing on-chain event");
312 if is_synced.load(Ordering::Relaxed) {
314 if let Err(error) = tx_significant_events.try_send(event) {
315 error!(%error, "failed to pass a significant chain event further");
316 }
317 }
318 }
319
320 if panic_on_completion {
321 panic!(
322 "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
323 );
324 }
325 });
326
327 if rx.next().await.is_some() {
328 Ok(indexing_proc)
329 } else {
330 Err(crate::errors::CoreEthereumIndexerError::ProcessError(
331 "Error during indexing start".into(),
332 ))
333 }
334 }
335
336 async fn process_block_by_id(
351 db: &Db,
352 logs_handler: &U,
353 block_id: u64,
354 ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
355 where
356 U: ChainLogHandler + 'static,
357 Db: HoprDbLogOperations + 'static,
358 {
359 let logs = db.get_logs(Some(block_id), Some(0)).await?;
360 let mut block = BlockWithLogs {
361 block_id,
362 ..Default::default()
363 };
364
365 for log in logs {
366 if log.block_number == block_id {
367 block.logs.insert(log);
368 } else {
369 error!(
370 expected = block_id,
371 actual = log.block_number,
372 "block number mismatch in logs from database"
373 );
374 panic!("block number mismatch in logs from database")
375 }
376 }
377
378 Ok(Self::process_block(db, logs_handler, block, true).await)
379 }
380
381 async fn process_block(
396 db: &Db,
397 logs_handler: &U,
398 block: BlockWithLogs,
399 fetch_checksum_from_db: bool,
400 ) -> Option<Vec<SignificantChainEvent>>
401 where
402 U: ChainLogHandler + 'static,
403 Db: HoprDbLogOperations + 'static,
404 {
405 let block_id = block.block_id;
406 let log_count = block.logs.len();
407 debug!(block_id, "processing events");
408
409 match logs_handler.collect_block_events(block.clone()).await {
412 Ok(events) => {
413 match db.set_logs_processed(Some(block_id), Some(0)).await {
414 Ok(_) => match db.update_logs_checksums().await {
415 Ok(last_log_checksum) => {
416 let checksum = if fetch_checksum_from_db {
417 let last_log = block.logs.into_iter().next_back().unwrap();
418 let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
419
420 log.checksum
421 } else {
422 Some(last_log_checksum.to_string())
423 };
424
425 if log_count != 0 {
426 info!(
427 block_number = block_id,
428 log_count, last_log_checksum = ?checksum, "Indexer state update",
429 );
430
431 #[cfg(all(feature = "prometheus", not(test)))]
432 {
433 if let Some(last_log_checksum) = checksum {
434 if let Ok(checksum_hash) = Hash::from_hex(last_log_checksum.as_str()) {
435 let low_4_bytes = hopr_primitive_types::prelude::U256::from_big_endian(
436 checksum_hash.as_ref(),
437 )
438 .low_u32();
439 METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
440 } else {
441 error!("Invalid checksum generated from logs");
442 }
443 }
444 }
445 }
446
447 match db.set_indexer_state_info(None, block_id as u32).await {
450 Ok(_) => {
451 trace!(block_id, "updated indexer state info");
452 }
453 Err(error) => error!(block_id, %error, "failed to update indexer state info"),
454 }
455 }
456 Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
457 },
458 Err(error) => error!(block_id, %error, "failed to mark logs from block as processed"),
459 }
460
461 debug!(
462 block_id,
463 num_events = events.len(),
464 "processed significant chain events from block",
465 );
466
467 Some(events)
468 }
469 Err(error) => {
470 error!(block_id, %error, "failed to process logs from block into events");
471 None
472 }
473 }
474 }
475
476 async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
477 where
478 T: HoprIndexerRpcOperations + 'static,
479 {
480 match rpc.block_number().await {
481 Ok(head) => {
482 chain_head.store(head, Ordering::Relaxed);
483 debug!(head, "Updated chain head");
484 head
485 }
486 Err(error) => {
487 error!(%error, "Failed to fetch block number from RPC");
488 panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
489 }
490 }
491 }
492
493 async fn calculate_sync_process(
511 current_block: u64,
512 rpc: &T,
513 chain_head: Arc<AtomicU64>,
514 is_synced: Arc<AtomicBool>,
515 start_block: u64,
516 mut tx: futures::channel::mpsc::Sender<()>,
517 ) where
518 T: HoprIndexerRpcOperations + 'static,
519 {
520 #[cfg(all(feature = "prometheus", not(test)))]
521 {
522 METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
523 }
524
525 let mut head = chain_head.load(Ordering::Relaxed);
526
527 if !is_synced.load(Ordering::Relaxed) {
530 let mut block_difference = head.saturating_sub(start_block);
531
532 let progress = if block_difference == 0 {
533 head = Self::update_chain_head(rpc, chain_head.clone()).await;
535 block_difference = head.saturating_sub(start_block);
536
537 if block_difference == 0 {
538 1_f64
539 } else {
540 (current_block - start_block) as f64 / block_difference as f64
541 }
542 } else {
543 (current_block - start_block) as f64 / block_difference as f64
544 };
545
546 info!(
547 progress = progress * 100_f64,
548 block = current_block,
549 head,
550 "Sync progress to last known head"
551 );
552
553 #[cfg(all(feature = "prometheus", not(test)))]
554 METRIC_INDEXER_SYNC_PROGRESS.set(progress);
555
556 if current_block >= head {
557 info!("indexer sync completed successfully");
558 is_synced.store(true, Ordering::Relaxed);
559 if let Err(e) = tx.try_send(()) {
560 error!(error = %e, "failed to notify about achieving indexer synchronization")
561 }
562 }
563 }
564 }
565}
566
567#[cfg(test)]
568mod tests {
569 use async_trait::async_trait;
570 use ethers::{
571 abi::{encode, Token},
572 contract::EthEvent,
573 };
574 use futures::{join, Stream};
575 use hex_literal::hex;
576 use mockall::mock;
577 use multiaddr::Multiaddr;
578 use std::collections::BTreeSet;
579 use std::pin::Pin;
580
581 use hopr_bindings::hopr_announcements::AddressAnnouncementFilter;
582 use hopr_chain_rpc::BlockWithLogs;
583 use hopr_chain_types::chain_events::ChainEventType;
584 use hopr_crypto_types::keypairs::{Keypair, OffchainKeypair};
585 use hopr_crypto_types::prelude::ChainKeypair;
586 use hopr_db_sql::accounts::HoprDbAccountOperations;
587 use hopr_db_sql::db::HoprDb;
588 use hopr_internal_types::account::{AccountEntry, AccountType};
589 use hopr_primitive_types::prelude::*;
590
591 use crate::traits::MockChainLogHandler;
592
593 use super::*;
594
595 lazy_static::lazy_static! {
596 static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
597 static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
598 static ref ALICE: Address = ALICE_KP.public().to_address();
599 static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
600 static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
601 static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
602
603 static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
604 peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
605 address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
606 multiaddresses: vec![Multiaddr::empty()],
607 };
608 }
609
610 fn build_announcement_logs(
611 address: Address,
612 size: usize,
613 block_number: u64,
614 log_index: U256,
615 ) -> anyhow::Result<Vec<SerializableLog>> {
616 let mut logs: Vec<SerializableLog> = vec![];
617 let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
618
619 for i in 0..size {
620 let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
621 logs.push(SerializableLog {
622 address,
623 block_hash: block_hash.into(),
624 topics: vec![AddressAnnouncementFilter::signature().into()],
625 data: encode(&[
626 Token::Address(ethers::abi::Address::from_slice(address.as_ref())),
627 Token::String(test_multiaddr.to_string()),
628 ])
629 .into(),
630 tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
631 tx_index: 0,
632 block_number,
633 log_index: log_index.as_u64(),
634 ..Default::default()
635 });
636 }
637
638 Ok(logs)
639 }
640
641 mock! {
642 HoprIndexerOps {} #[async_trait]
645 impl HoprIndexerRpcOperations for HoprIndexerOps {
646 async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
647
648 fn try_stream_logs<'a>(
649 &'a self,
650 start_block_number: u64,
651 filter: LogFilter,
652 ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
653 }
654 }
655
656 #[async_std::test]
657 async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found(
658 ) -> anyhow::Result<()> {
659 let mut handlers = MockChainLogHandler::new();
660 let mut rpc = MockHoprIndexerOps::new();
661 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
662
663 let addr = Address::new(b"my address 123456789");
664 let topic = Hash::create(&[b"my topic"]);
665 db.ensure_logs_origin(vec![(addr, topic)]).await?;
666
667 handlers.expect_contract_addresses().return_const(vec![addr]);
668 handlers
669 .expect_contract_address_topics()
670 .withf(move |x| x == &addr)
671 .return_const(vec![topic.into()]);
672
673 let head_block = 1000;
674 rpc.expect_block_number().return_once(move || Ok(head_block));
675
676 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
677 rpc.expect_try_stream_logs()
678 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
679 .return_once(move |_, _| Ok(Box::pin(rx)));
680
681 let indexer = Indexer::new(
682 rpc,
683 handlers,
684 db.clone(),
685 IndexerConfig::default(),
686 async_channel::unbounded().0,
687 )
688 .without_panic_on_completion();
689
690 let (indexing, _) = join!(indexer.start(), async move {
691 async_std::task::sleep(std::time::Duration::from_millis(200)).await;
692 tx.close_channel()
693 });
694 assert!(indexing.is_err()); Ok(())
697 }
698
699 #[test_log::test(async_std::test)]
700 async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
701 {
702 let mut handlers = MockChainLogHandler::new();
703 let mut rpc = MockHoprIndexerOps::new();
704 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
705 let head_block = 1000;
706 let latest_block = 15u64;
707
708 let addr = Address::new(b"my address 123456789");
709 let topic = Hash::create(&[b"my topic"]);
710
711 handlers.expect_contract_addresses().return_const(vec![addr]);
712 handlers
713 .expect_contract_address_topics()
714 .withf(move |x| x == &addr)
715 .return_const(vec![topic.into()]);
716 db.ensure_logs_origin(vec![(addr, topic)]).await?;
717
718 rpc.expect_block_number().return_once(move || Ok(head_block));
719
720 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
721 rpc.expect_try_stream_logs()
722 .once()
723 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == latest_block + 1)
724 .return_once(move |_, _| Ok(Box::pin(rx)));
725
726 let log_1 = SerializableLog {
728 address: Address::new(b"my address 123456789"),
729 topics: [Hash::create(&[b"my topic"]).into()].into(),
730 data: [1, 2, 3, 4].into(),
731 tx_index: 1u64,
732 block_number: latest_block,
733 block_hash: Hash::create(&[b"my block hash"]).into(),
734 tx_hash: Hash::create(&[b"my tx hash"]).into(),
735 log_index: 1u64,
736 removed: false,
737 processed: Some(false),
738 ..Default::default()
739 };
740 assert!(db.store_log(log_1.clone()).await.is_ok());
741 assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
742 assert!(db.update_logs_checksums().await.is_ok());
743
744 let indexer = Indexer::new(
745 rpc,
746 handlers,
747 db.clone(),
748 IndexerConfig {
749 fast_sync: false,
750 ..Default::default()
751 },
752 async_channel::unbounded().0,
753 )
754 .without_panic_on_completion();
755
756 let (indexing, _) = join!(indexer.start(), async move {
757 async_std::task::sleep(std::time::Duration::from_millis(200)).await;
758 tx.close_channel()
759 });
760 assert!(indexing.is_err()); Ok(())
763 }
764
765 #[async_std::test]
766 async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
767 let mut handlers = MockChainLogHandler::new();
768 let mut rpc = MockHoprIndexerOps::new();
769 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
770
771 let cfg = IndexerConfig::default();
772
773 let addr = Address::new(b"my address 123456789");
774 handlers.expect_contract_addresses().return_const(vec![addr]);
775 handlers
776 .expect_contract_address_topics()
777 .withf(move |x| x == &addr)
778 .return_const(vec![Hash::create(&[b"my topic"]).into()]);
779
780 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
781 rpc.expect_try_stream_logs()
782 .times(1)
783 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
784 .return_once(move |_, _| Ok(Box::pin(rx)));
785
786 let head_block = 1000;
787 rpc.expect_block_number().returning(move || Ok(head_block));
788
789 let finalized_block = BlockWithLogs {
790 block_id: head_block - 1,
791 logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, U256::from(23u8))?),
792 };
793 let head_allowing_finalization = BlockWithLogs {
794 block_id: head_block,
795 logs: BTreeSet::new(),
796 };
797
798 handlers
799 .expect_collect_block_events()
800 .times(finalized_block.logs.len())
801 .returning(|_| Ok(vec![]));
802
803 assert!(tx.start_send(finalized_block.clone()).is_ok());
804 assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
805
806 let indexer =
807 Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
808 let _ = join!(indexer.start(), async move {
809 async_std::task::sleep(std::time::Duration::from_millis(200)).await;
810 tx.close_channel()
811 });
812
813 Ok(())
814 }
815
816 #[test_log::test(async_std::test)]
817 async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
818 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
819
820 let addr = Address::new(b"my address 123456789");
821 let topic = Hash::create(&[b"my topic"]);
822
823 {
825 let logs = vec![
826 build_announcement_logs(*BOB, 1, 1, U256::from(1u8))?,
827 build_announcement_logs(*BOB, 1, 2, U256::from(1u8))?,
828 ]
829 .into_iter()
830 .flatten()
831 .collect::<Vec<_>>();
832
833 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
834
835 for log in logs {
836 assert!(db.store_log(log).await.is_ok());
837 }
838 assert!(db.update_logs_checksums().await.is_ok());
839 assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
840 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
841
842 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
843 let (tx_events, _) = async_channel::unbounded();
844
845 let head_block = 5;
846 let mut rpc = MockHoprIndexerOps::new();
847 rpc.expect_block_number().returning(move || Ok(head_block));
848 rpc.expect_try_stream_logs()
849 .times(1)
850 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 3)
851 .return_once(move |_, _| Ok(Box::pin(rx)));
852
853 let mut handlers = MockChainLogHandler::new();
854 handlers.expect_contract_addresses().return_const(vec![addr]);
855 handlers
856 .expect_contract_address_topics()
857 .withf(move |x| x == &addr)
858 .return_const(vec![topic.into()]);
859 handlers
860 .expect_collect_block_events()
861 .times(2)
862 .withf(move |b| [1, 2].contains(&b.block_id))
863 .returning(|_| Ok(vec![]));
864
865 let indexer_cfg = IndexerConfig {
866 start_block_number: 0,
867 fast_sync: true,
868 };
869 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
870 let (indexing, _) = join!(indexer.start(), async move {
871 async_std::task::sleep(std::time::Duration::from_millis(200)).await;
872 tx.close_channel()
873 });
874 assert!(indexing.is_err()); assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
877 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
878
879 db.insert_account(
882 None,
883 AccountEntry::new(*ALICE_OKP.public(), *ALICE, AccountType::NotAnnounced).into(),
884 )
885 .await?;
886 db.insert_account(
887 None,
888 AccountEntry::new(*BOB_OKP.public(), *BOB, AccountType::NotAnnounced).into(),
889 )
890 .await?;
891 }
892
893 {
895 let logs = vec![
896 build_announcement_logs(*BOB, 1, 3, U256::from(1u8))?,
897 build_announcement_logs(*BOB, 1, 4, U256::from(1u8))?,
898 ]
899 .into_iter()
900 .flatten()
901 .collect::<Vec<_>>();
902
903 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
904
905 for log in logs {
906 assert!(db.store_log(log).await.is_ok());
907 }
908 assert!(db.update_logs_checksums().await.is_ok());
909 assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
910 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
911
912 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
913 let (tx_events, _) = async_channel::unbounded();
914
915 let head_block = 5;
916 let mut rpc = MockHoprIndexerOps::new();
917 rpc.expect_block_number().returning(move || Ok(head_block));
918 rpc.expect_try_stream_logs()
919 .times(1)
920 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 5)
921 .return_once(move |_, _| Ok(Box::pin(rx)));
922
923 let mut handlers = MockChainLogHandler::new();
924 handlers.expect_contract_addresses().return_const(vec![addr]);
925 handlers
926 .expect_contract_address_topics()
927 .withf(move |x| x == &addr)
928 .return_const(vec![topic.into()]);
929
930 handlers
931 .expect_collect_block_events()
932 .times(2)
933 .withf(move |b| [3, 4].contains(&b.block_id))
934 .returning(|_| Ok(vec![]));
935
936 let indexer_cfg = IndexerConfig {
937 start_block_number: 0,
938 fast_sync: true,
939 };
940 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
941 let (indexing, _) = join!(indexer.start(), async move {
942 async_std::task::sleep(std::time::Duration::from_millis(200)).await;
943 tx.close_channel()
944 });
945 assert!(indexing.is_err()); assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
948 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
949 }
950
951 Ok(())
952 }
953
954 #[test_log::test(async_std::test)]
955 async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
956 let mut handlers = MockChainLogHandler::new();
957 let mut rpc = MockHoprIndexerOps::new();
958 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
959
960 let cfg = IndexerConfig::default();
961
962 let addr = Address::new(b"my address 123456789");
964 handlers.expect_contract_addresses().return_const(vec![addr]);
965 handlers
966 .expect_contract_address_topics()
967 .withf(move |x| x == &addr)
968 .return_const(vec![Hash::create(&[b"my topic"]).into()]);
969
970 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
971 rpc.expect_try_stream_logs()
973 .times(1)
974 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == 0)
975 .return_once(move |_, _| Ok(Box::pin(rx)));
976
977 let head_block = 1000;
978 let block_numbers = vec![head_block - 1, head_block, head_block + 1];
979
980 let blocks: Vec<BlockWithLogs> = block_numbers
981 .iter()
982 .map(|block_id| BlockWithLogs {
983 block_id: *block_id,
984 logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, U256::from(23u8)).unwrap()),
985 })
986 .collect();
987
988 for _ in 0..(blocks.len() as u64) {
989 rpc.expect_block_number().returning(move || Ok(head_block));
990 }
991
992 for block in blocks.iter() {
993 assert!(tx.start_send(block.clone()).is_ok());
994 }
995
996 handlers
998 .expect_collect_block_events()
999 .times(1)
1000 .withf(move |b| block_numbers.contains(&b.block_id))
1001 .returning(|b| {
1002 let block_id = b.block_id;
1003 Ok(vec![SignificantChainEvent {
1004 tx_hash: Hash::create(&[format!("my tx hash {block_id}").as_bytes()]),
1005 event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1006 }])
1007 });
1008
1009 let (tx_events, rx_events) = async_channel::unbounded();
1010 let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1011 indexer.start().await?;
1012
1013 let _first = rx_events.recv();
1016 let _second = rx_events.recv();
1017 let third = rx_events.try_recv();
1018
1019 assert!(third.is_err());
1020
1021 Ok(())
1022 }
1023
1024 #[test_log::test(async_std::test)]
1025 async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1026 let last_processed_block = 100_u64;
1027
1028 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1029
1030 let addr = Address::new(b"my address 123456789");
1031 let topic = Hash::create(&[b"my topic"]);
1032 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1033
1034 let log_1 = SerializableLog {
1036 address: Address::new(b"my address 123456789"),
1037 topics: [Hash::create(&[b"my topic"]).into()].into(),
1038 data: [1, 2, 3, 4].into(),
1039 tx_index: 1u64,
1040 block_number: last_processed_block,
1041 block_hash: Hash::create(&[b"my block hash"]).into(),
1042 tx_hash: Hash::create(&[b"my tx hash"]).into(),
1043 log_index: 1u64,
1044 removed: false,
1045 processed: Some(false),
1046 ..Default::default()
1047 };
1048 assert!(db.store_log(log_1.clone()).await.is_ok());
1049 assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1050 assert!(db.update_logs_checksums().await.is_ok());
1051
1052 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1053
1054 let mut rpc = MockHoprIndexerOps::new();
1055 rpc.expect_try_stream_logs()
1056 .once()
1057 .withf(move |x: &u64, _y: &hopr_chain_rpc::LogFilter| *x == last_processed_block + 1)
1058 .return_once(move |_, _| Ok(Box::pin(rx)));
1059
1060 rpc.expect_block_number()
1061 .times(3)
1062 .returning(move || Ok(last_processed_block + 1));
1063
1064 let block = BlockWithLogs {
1065 block_id: last_processed_block + 1,
1066 logs: BTreeSet::from_iter(build_announcement_logs(
1067 *ALICE,
1068 1,
1069 last_processed_block + 1,
1070 U256::from(23u8),
1071 )?),
1072 };
1073
1074 tx.start_send(block)?;
1075
1076 let mut handlers = MockChainLogHandler::new();
1077 handlers.expect_contract_addresses().return_const(vec![addr]);
1078 handlers
1079 .expect_contract_address_topics()
1080 .withf(move |x| x == &addr)
1081 .return_const(vec![topic.into()]);
1082
1083 let indexer_cfg = IndexerConfig {
1084 start_block_number: 0,
1085 fast_sync: false,
1086 };
1087
1088 let (tx_events, _) = async_channel::unbounded();
1089 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1090 indexer.start().await?;
1091
1092 Ok(())
1093 }
1094}