use std::{
    path::Path,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
};

use alloy::sol_types::SolEvent;
use futures::{
    FutureExt, StreamExt,
    future::AbortHandle,
    stream,
};
use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
use hopr_chain_types::chain_events::SignificantChainEvent;
use hopr_crypto_types::types::Hash;
use hopr_db_sql::{HoprIndexerDb, info::HoprDbInfoOperations, logs::HoprDbLogOperations};
use hopr_primitive_types::prelude::*;
use tracing::{debug, error, info, trace};

use crate::{
    IndexerConfig,
    errors::{CoreEthereumIndexerError, Result},
    snapshot::{SnapshotInfo, SnapshotManager},
    traits::ChainLogHandler,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_indexer_block_number",
            "Latest block number processed by the indexer",
        ).unwrap();
    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_indexer_checksum",
            "Unsigned integer representing the low 32 bits of the indexer checksum",
        ).unwrap();
    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_indexer_sync_progress",
            "Sync progress of historical data by the indexer",
        ).unwrap();
    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_indexer_data_source",
            "Current data source of the indexer",
            &["source"],
        ).unwrap();
}

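/// Chain indexer that ingests on-chain logs and turns them into significant chain events.
///
/// The indexer streams logs from an RPC provider (`T`), persists them in the logs database,
/// lets the chain log handler (`U`) process them, and forwards the resulting
/// [`SignificantChainEvent`]s through the `egress` channel once the node is synced.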
#[derive(Debug, Clone)]
pub struct Indexer<T, U>
where
    T: HoprIndexerRpcOperations + Send + 'static,
    U: ChainLogHandler + Send + 'static,
{
    rpc: Option<T>,
    db_processor: Option<U>,
    db: HoprIndexerDb,
    cfg: IndexerConfig,
    egress: futures::channel::mpsc::Sender<SignificantChainEvent>,
    panic_on_completion: bool,
}

impl<T, U> Indexer<T, U>
where
    T: HoprIndexerRpcOperations + Sync + Send + 'static,
    U: ChainLogHandler + Send + Sync + 'static,
{
    pub fn new(
        rpc: T,
        db_processor: U,
        db: HoprIndexerDb,
        cfg: IndexerConfig,
        egress: futures::channel::mpsc::Sender<SignificantChainEvent>,
    ) -> Self {
        Self {
            rpc: Some(rpc),
            db_processor: Some(db_processor),
            db,
            cfg,
            egress,
            panic_on_completion: true,
        }
    }
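    /// Disables the default behavior of panicking when the indexer event stream terminates.
    /// Useful in tests, where the stream is closed deliberately.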
    pub fn without_panic_on_completion(mut self) -> Self {
        self.panic_on_completion = false;
        self
    }
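    /// Starts the indexer and returns an [`AbortHandle`] for the spawned background task.
    ///
    /// Performs an optional fast sync over logs already stored in the database, then streams
    /// new logs from the RPC provider. The returned future resolves once the indexer has
    /// caught up with the chain head.
    ///
    /// A minimal usage sketch (assuming concrete `rpc`, `logs_handler`, `db` and `cfg` values
    /// are available; the names here are illustrative):
    ///
    /// ```ignore
    /// let (events_tx, events_rx) = futures::channel::mpsc::channel(1024);
    /// let indexer = Indexer::new(rpc, logs_handler, db, cfg, events_tx);
    /// // Resolves once the indexer has synced up to the chain head.
    /// let abort_handle = indexer.start().await?;
    /// // ... consume significant chain events from `events_rx` ...
    /// abort_handle.abort();
    /// ```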
    pub async fn start(mut self) -> Result<AbortHandle>
    where
        T: HoprIndexerRpcOperations + 'static,
        U: ChainLogHandler + 'static,
    {
        if self.rpc.is_none() || self.db_processor.is_none() {
            return Err(CoreEthereumIndexerError::ProcessError(
                "indexer cannot start, missing components".into(),
            ));
        }

        info!("Starting chain indexing");

        let rpc = self.rpc.take().expect("rpc should be present");
        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
        let db = self.db.clone();
        let mut tx_significant_events = self.egress.clone();
        let panic_on_completion = self.panic_on_completion;

        let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);

        db.ensure_logs_origin(address_topics).await?;

        let is_synced = Arc::new(AtomicBool::new(false));
        let chain_head = Arc::new(AtomicU64::new(0));

        debug!("Updating chain head at indexer startup");
        Self::update_chain_head(&rpc, chain_head.clone()).await;

        let fast_sync_configured = self.cfg.fast_sync;
        let index_empty = self.db.index_is_empty().await?;

        self.pre_start().await?;

        #[derive(PartialEq, Eq)]
        enum FastSyncMode {
            None,
            FromScratch,
            Continue,
        }

        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
            (true, false) => {
                info!(
                    "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
                     unprocessed logs."
                );
                FastSyncMode::Continue
            }
            (false, true) => {
                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
                self.db.clear_index_db(None).await?;
                self.db.set_logs_unprocessed(None, None).await?;
                FastSyncMode::None
            }
            (false, false) => {
                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
                FastSyncMode::None
            }
            (true, true) => {
                info!("Fast sync is enabled, starting the fast sync process");
                self.db.clear_index_db(None).await?;
                self.db.set_logs_unprocessed(None, None).await?;
                FastSyncMode::FromScratch
            }
        };

        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);

        if FastSyncMode::None != will_perform_fast_sync {
            let processed = match will_perform_fast_sync {
                FastSyncMode::FromScratch => None,
                FastSyncMode::Continue => Some(false),
                _ => unreachable!(),
            };

            #[cfg(all(feature = "prometheus", not(test)))]
            {
                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
            }

            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
            let _head = chain_head.load(Ordering::Relaxed);
            for block_number in log_block_numbers {
                debug!(
                    block_number,
                    first_log_block_number = _first_log_block_number,
                    head = _head,
                    "processing stored logs during fast sync"
                );
                Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;

                #[cfg(all(feature = "prometheus", not(test)))]
                {
                    let progress =
                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
                }
            }
        }

        info!("Building rpc indexer background process");

        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
            info!(
                start_block = last_log.block_number,
                start_checksum = ?last_log.checksum,
                "Loaded indexer state",
            );

            if self.cfg.start_block_number < last_log.block_number {
                last_log.block_number + 1
            } else {
                self.cfg.start_block_number
            }
        } else {
            self.cfg.start_block_number
        };

        info!(next_block_to_process, "Indexer start point");

        let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable!(
            async move {
                debug!("Updating chain head in indexer background task");
                Self::update_chain_head(&rpc, chain_head.clone()).await;

                #[cfg(all(feature = "prometheus", not(test)))]
                {
                    METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
                    METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
                }

                let rpc_ref = &rpc;

                let event_stream = rpc
                    .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
                    .expect("block stream should be constructible")
                    .then(|block| {
                        let db = db.clone();
                        let chain_head = chain_head.clone();
                        let is_synced = is_synced.clone();
                        let tx = tx.clone();
                        let logs_handler = logs_handler.clone();

                        async move {
                            Self::calculate_sync_process(
                                block.block_id,
                                rpc_ref,
                                db,
                                chain_head.clone(),
                                is_synced.clone(),
                                next_block_to_process,
                                tx.clone(),
                                logs_handler.safe_address().into(),
                                logs_handler.contract_addresses_map().channels.into(),
                            )
                            .await;

                            block
                        }
                    })
                    .filter_map(|block| {
                        let db = db.clone();
                        let logs_handler = logs_handler.clone();

                        async move {
                            debug!(%block, "storing logs from block");
                            let logs = block.logs.clone();

                            let logs_vec = logs
                                .into_iter()
                                .filter(|log| log.address != logs_handler.contract_addresses_map().token)
                                .collect();

                            match db.store_logs(logs_vec).await {
                                Ok(store_results) => {
                                    if let Some(error) = store_results.into_iter().find_map(|r| r.err()) {
                                        error!(%block, %error, "failed to process stored logs from block");
                                        None
                                    } else {
                                        Some(block)
                                    }
                                }
                                Err(error) => {
                                    error!(%block, %error, "failed to store logs from block");
                                    None
                                }
                            }
                        }
                    })
                    .filter_map(|block| {
                        let db = db.clone();
                        let logs_handler = logs_handler.clone();
                        let is_synced = is_synced.clone();
                        async move {
                            Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed))
                                .await
                        }
                    })
                    .flat_map(stream::iter);

                futures::pin_mut!(event_stream);
                while let Some(event) = event_stream.next().await {
                    trace!(%event, "processing on-chain event");
                    if is_synced.load(Ordering::Relaxed) {
                        if let Err(error) = tx_significant_events.try_send(event) {
                            error!(%error, "failed to pass a significant chain event further");
                        }
                    }
                }

                if panic_on_completion {
                    panic!(
                        "Indexer event stream has been terminated. This error may be caused by a failed RPC \
                         connection."
                    );
                }
            }
            .inspect(|_| tracing::warn!(task = "indexer", "long-running background task finished"))
        );

        if rx.next().await.is_some() {
            Ok(indexing_abort_handle)
        } else {
            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
                "Error during indexing start".into(),
            ))
        }
    }
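    /// Prepares the node for indexing before the main process starts.
    ///
    /// If fast sync is enabled, both the index and logs databases are empty, and logs
    /// snapshots are enabled in the configuration, this attempts to download a logs
    /// snapshot. A failed download is logged and indexing falls back to a regular sync.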
    pub async fn pre_start(&self) -> Result<()> {
        let fast_sync_configured = self.cfg.fast_sync;
        let index_empty = self.db.index_is_empty().await?;

        let logs_db_has_data = self.has_logs_data().await?;

        if fast_sync_configured && index_empty && !logs_db_has_data && self.cfg.enable_logs_snapshot {
            info!("Logs database is empty, attempting to download logs snapshot...");

            match self.download_snapshot().await {
                Ok(snapshot_info) => {
                    info!("Logs snapshot downloaded successfully: {:?}", snapshot_info);
                }
                Err(e) => {
                    error!("Failed to download logs snapshot: {}. Continuing with regular sync.", e);
                }
            }
        }

        Ok(())
    }
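    /// Builds the RPC log filters and the `(address, topic)` pairs used to anchor the logs
    /// database origin.
    ///
    /// Non-token contracts are filtered by their known event topics. For the token contract,
    /// only `Transfer` events to or from the safe address and `Approval` events granted by
    /// the safe to the channels contract are of interest, which keeps the log volume low.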
    fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
        let safe_address = logs_handler.safe_address();
        let addresses_no_token = logs_handler
            .contract_addresses()
            .into_iter()
            .filter(|a| *a != logs_handler.contract_addresses_map().token)
            .collect::<Vec<_>>();
        let mut filter_base_addresses = vec![];
        let mut filter_base_topics = vec![];
        let mut address_topics = vec![];

        addresses_no_token.iter().for_each(|address| {
            let topics = logs_handler.contract_address_topics(*address);
            if !topics.is_empty() {
                filter_base_addresses.push(alloy::primitives::Address::from(*address));
                filter_base_topics.extend(topics.clone());
                for topic in topics.iter() {
                    address_topics.push((*address, Hash::from(topic.0)))
                }
            }
        });

        let filter_base = alloy::rpc::types::Filter::new()
            .address(filter_base_addresses)
            .event_signature(filter_base_topics);
        let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
            logs_handler.contract_addresses_map().token,
        ));

        let filter_transfer_to = filter_token
            .clone()
            .event_signature(Transfer::SIGNATURE_HASH)
            .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));

        let filter_transfer_from = filter_token
            .clone()
            .event_signature(Transfer::SIGNATURE_HASH)
            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));

        let filter_approval = filter_token
            .event_signature(Approval::SIGNATURE_HASH)
            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
            .topic2(alloy::primitives::B256::from_slice(
                logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
            ));

        let set = FilterSet {
            all: vec![
                filter_base.clone(),
                filter_transfer_from.clone(),
                filter_transfer_to.clone(),
                filter_approval.clone(),
            ],
            token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
            no_token: vec![filter_base],
        };

        (set, address_topics)
    }
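    /// Loads all logs stored for the given block id from the database and processes them
    /// as a single [`BlockWithLogs`], panicking on a block number mismatch to avoid
    /// corrupting the index.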
    async fn process_block_by_id(
        db: &HoprIndexerDb,
        logs_handler: &U,
        block_id: u64,
        is_synced: bool,
    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
    where
        U: ChainLogHandler + 'static,
    {
        let logs = db.get_logs(Some(block_id), Some(0)).await?;
        let mut block = BlockWithLogs {
            block_id,
            ..Default::default()
        };

        for log in logs {
            if log.block_number == block_id {
                block.logs.insert(log);
            } else {
                error!(
                    expected = block_id,
                    actual = log.block_number,
                    "block number mismatch in logs from database"
                );
                panic!("block number mismatch in logs from database")
            }
        }

        Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
    }
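    /// Processes all logs of a single block into significant chain events.
    ///
    /// Each log is handed to the log handler and then marked as processed in the database;
    /// afterwards the log checksums and the indexer state info are updated. Failures that
    /// could lead to data loss cause a panic, while per-log processing errors are logged
    /// and skipped.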
    async fn process_block(
        db: &HoprIndexerDb,
        logs_handler: &U,
        block: BlockWithLogs,
        fetch_checksum_from_db: bool,
        is_synced: bool,
    ) -> Option<Vec<SignificantChainEvent>>
    where
        U: ChainLogHandler + 'static,
    {
        let block_id = block.block_id;
        let log_count = block.logs.len();
        debug!(block_id, "processing events");

        let events = stream::iter(block.logs.clone())
            .filter_map(|log| async move {
                match logs_handler.collect_log_event(log.clone(), is_synced).await {
                    Ok(data) => match db.set_log_processed(log).await {
                        Ok(_) => data,
                        Err(error) => {
                            error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
                            panic!("failed to mark log as processed, panicking to prevent data loss")
                        }
                    },
                    Err(CoreEthereumIndexerError::ProcessError(error)) => {
                        error!(block_id, %error, "failed to process log into event, continuing indexing");
                        None
                    }
                    Err(error) => {
                        error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
                        panic!("failed to process log into event, panicking to prevent data loss")
                    }
                }
            })
            .collect::<Vec<SignificantChainEvent>>()
            .await;

        match db.update_logs_checksums().await {
            Ok(last_log_checksum) => {
                let checksum = if fetch_checksum_from_db {
                    let last_log = block.logs.into_iter().next_back()?;
                    let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;

                    log.checksum?
                } else {
                    last_log_checksum.to_string()
                };

                if log_count != 0 {
                    info!(
                        block_number = block_id,
                        log_count, last_log_checksum = ?checksum, "Indexer state update",
                    );

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
                            let low_4_bytes =
                                hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
                        } else {
                            error!("Invalid checksum generated from logs");
                        }
                    }
                }

                match db.set_indexer_state_info(None, block_id as u32).await {
                    Ok(_) => {
                        trace!(block_id, "updated indexer state info");
                    }
                    Err(error) => error!(block_id, %error, "failed to update indexer state info"),
                }
            }
            Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
        }

        debug!(
            block_id,
            num_events = events.len(),
            "processed significant chain events from block",
        );

        Some(events)
    }
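    /// Fetches the current block number from the RPC provider and stores it in `chain_head`.
    ///
    /// Panics if the block number cannot be fetched, since indexing cannot continue without
    /// a known chain head.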
    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
    where
        T: HoprIndexerRpcOperations + 'static,
    {
        match rpc.block_number().await {
            Ok(head) => {
                chain_head.store(head, Ordering::Relaxed);
                debug!(head, "Updated chain head");
                head
            }
            Err(error) => {
                error!(%error, "Failed to fetch block number from RPC");
                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
            }
        }
    }
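    /// Tracks sync progress for the current block and flips the `is_synced` flag once the
    /// indexer catches up with the chain head.
    ///
    /// On sync completion it refreshes the safe HOPR balance and allowance from chain
    /// (when the respective addresses are known) and notifies `tx` so that
    /// [`Indexer::start`] can return.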
    #[allow(clippy::too_many_arguments)]
    async fn calculate_sync_process(
        current_block: u64,
        rpc: &T,
        db: HoprIndexerDb,
        chain_head: Arc<AtomicU64>,
        is_synced: Arc<AtomicBool>,
        next_block_to_process: u64,
        mut tx: futures::channel::mpsc::Sender<()>,
        safe_address: Option<Address>,
        channels_address: Option<Address>,
    ) where
        T: HoprIndexerRpcOperations + 'static,
    {
        #[cfg(all(feature = "prometheus", not(test)))]
        {
            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
        }

        let mut head = chain_head.load(Ordering::Relaxed);

        if !is_synced.load(Ordering::Relaxed) {
            let mut block_difference = head.saturating_sub(next_block_to_process);

            let progress = if block_difference == 0 {
                head = Self::update_chain_head(rpc, chain_head.clone()).await;
                block_difference = head.saturating_sub(next_block_to_process);

                if block_difference == 0 {
                    1_f64
                } else {
                    (current_block - next_block_to_process) as f64 / block_difference as f64
                }
            } else {
                (current_block - next_block_to_process) as f64 / block_difference as f64
            };

            info!(
                progress = progress * 100_f64,
                block = current_block,
                head,
                "Sync progress to last known head"
            );

            #[cfg(all(feature = "prometheus", not(test)))]
            METRIC_INDEXER_SYNC_PROGRESS.set(progress);

            if current_block >= head {
                info!("indexer sync completed successfully");
                is_synced.store(true, Ordering::Relaxed);

                if let Some(safe_address) = safe_address {
                    info!("updating safe balance from chain after indexer sync completed");
                    match rpc.get_hopr_balance(safe_address).await {
                        Ok(balance) => {
                            if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
                                error!(%error, "failed to update safe balance from chain after indexer sync completed");
                            }
                        }
                        Err(error) => {
                            error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
                        }
                    }
                }

                if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
                    info!("updating safe allowance from chain after indexer sync completed");
                    match rpc.get_hopr_allowance(safe_address, channels_address).await {
                        Ok(allowance) => {
                            if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
                                error!(%error, "failed to update safe allowance from chain after indexer sync completed");
                            }
                        }
                        Err(error) => {
                            error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
                        }
                    }
                }

                if let Err(error) = tx.try_send(()) {
                    error!(%error, "failed to notify about achieving indexer synchronization")
                }
            }
        }
    }
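    /// Returns `true` when the logs database already contains at least one log entry.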
    async fn has_logs_data(&self) -> Result<bool> {
        self.db
            .get_logs_count(None, None)
            .await
            .map(|count| count > 0)
            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
    }
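    /// Downloads and installs a logs snapshot into the configured data directory.
    ///
    /// Requires a valid configuration with `logs_snapshot_url` set; any validation,
    /// download, or setup failure is surfaced as a [`CoreEthereumIndexerError::SnapshotError`].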
    pub async fn download_snapshot(&self) -> Result<SnapshotInfo> {
        if let Err(e) = self.cfg.validate() {
            return Err(CoreEthereumIndexerError::SnapshotError(e.to_string()));
        }

        let snapshot_manager = SnapshotManager::with_db(self.db.clone())
            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))?;

        let data_dir = Path::new(&self.cfg.data_directory);

        if let Some(url) = &self.cfg.logs_snapshot_url {
            snapshot_manager
                .download_and_setup_snapshot(url, data_dir)
                .await
                .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
        } else {
            Err(CoreEthereumIndexerError::SnapshotError(
                "Logs snapshot URL is not configured".to_string(),
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, pin::Pin};

    use alloy::{
        dyn_abi::DynSolValue,
        primitives::{Address as AlloyAddress, B256},
        sol_types::SolEvent,
    };
    use async_trait::async_trait;
    use futures::{Stream, join, pin_mut};
    use hex_literal::hex;
    use hopr_chain_rpc::BlockWithLogs;
    use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
    use hopr_crypto_types::{
        keypairs::{Keypair, OffchainKeypair},
        prelude::ChainKeypair,
    };
    use hopr_db_sql::{HoprIndexerDb, accounts::HoprDbAccountOperations};
    use hopr_internal_types::account::{AccountEntry, AccountType};
    use hopr_primitive_types::prelude::*;
    use mockall::mock;
    use multiaddr::Multiaddr;

    use super::*;
    use crate::traits::MockChainLogHandler;

    lazy_static::lazy_static! {
        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();

        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
            multiaddresses: vec![Multiaddr::empty()],
        };
    }

    fn build_announcement_logs(
        address: Address,
        size: usize,
        block_number: u64,
        starting_log_index: u64,
    ) -> anyhow::Result<Vec<SerializableLog>> {
        let mut logs: Vec<SerializableLog> = vec![];
        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);

        for i in 0..size {
            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
            let tx_index: u64 = i as u64;
            let log_index: u64 = starting_log_index + tx_index;

            logs.push(SerializableLog {
                address,
                block_hash: block_hash.into(),
                topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
                data: DynSolValue::Tuple(vec![
                    DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
                    DynSolValue::String(test_multiaddr.to_string()),
                ])
                .abi_encode(),
                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
                tx_index,
                block_number,
                log_index,
                ..Default::default()
            });
        }

        Ok(logs)
    }

    mock! {
        HoprIndexerOps {}

        #[async_trait]
        impl HoprIndexerRpcOperations for HoprIndexerOps {
            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

            fn try_stream_logs<'a>(
                &'a self,
                start_block_number: u64,
                filters: FilterSet,
                is_synced: bool,
            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
        }
    }

    #[tokio::test]
    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
    -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);
        db.ensure_logs_origin(vec![(addr, topic)]).await?;

        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let head_block = 1000;
        rpc.expect_block_number().times(2).returning(move || Ok(head_block));

        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let indexer = Indexer::new(
            rpc,
            handlers,
            db.clone(),
            IndexerConfig::default(),
            futures::channel::mpsc::channel(1000).0,
        )
        .without_panic_on_completion();

        let (indexing, _) = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });
        assert!(indexing.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
    {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;
        let head_block = 1000;
        let latest_block = 15u64;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);

        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        db.ensure_logs_origin(vec![(addr, topic)]).await?;

        rpc.expect_block_number().times(2).returning(move || Ok(head_block));

        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .once()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: latest_block,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };
        assert!(db.store_log(log_1.clone()).await.is_ok());
        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
        assert!(db.update_logs_checksums().await.is_ok());

        let indexer = Indexer::new(
            rpc,
            handlers,
            db.clone(),
            IndexerConfig {
                fast_sync: false,
                ..Default::default()
            },
            futures::channel::mpsc::channel(1000).0,
        )
        .without_panic_on_completion();

        let (indexing, _) = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });
        assert!(indexing.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let cfg = IndexerConfig::default();

        let addr = Address::new(b"my address 123456789");
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .times(1)
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let head_block = 1000;
        rpc.expect_block_number().returning(move || Ok(head_block));

        rpc.expect_get_hopr_balance()
            .withf(move |x| x == &Address::new(b"my safe address 1234"))
            .returning(move |_| Ok(HoprBalance::default()));

        rpc.expect_get_hopr_allowance()
            .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
            .returning(move |_, _| Ok(HoprBalance::default()));

        let finalized_block = BlockWithLogs {
            block_id: head_block - 1,
            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
        };
        let head_allowing_finalization = BlockWithLogs {
            block_id: head_block,
            logs: BTreeSet::new(),
        };

        handlers
            .expect_collect_log_event()
            .times(finalized_block.logs.len())
            .returning(|_, _| Ok(None));

        assert!(tx.start_send(finalized_block.clone()).is_ok());
        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());

        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, futures::channel::mpsc::channel(1000).0)
            .without_panic_on_completion();
        let _ = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);

        {
            let logs = vec![
                build_announcement_logs(*BOB, 1, 1, 1)?,
                build_announcement_logs(*BOB, 1, 2, 1)?,
            ]
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

            for log in logs {
                assert!(db.store_log(log).await.is_ok());
            }
            assert!(db.update_logs_checksums().await.is_ok());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);

            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
            let (tx_events, _) = futures::channel::mpsc::channel(1000);

            let head_block = 5;
            let mut rpc = MockHoprIndexerOps::new();
            rpc.expect_block_number().returning(move || Ok(head_block));
            rpc.expect_try_stream_logs()
                .times(1)
                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
                .return_once(move |_, _, _| Ok(Box::pin(rx)));

            let mut handlers = MockChainLogHandler::new();
            handlers.expect_contract_addresses().return_const(vec![addr]);
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(topic.as_ref())]);
            handlers
                .expect_collect_log_event()
                .times(2)
                .withf(move |l, _| [1, 2].contains(&l.block_number))
                .returning(|_, _| Ok(None));
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
            handlers
                .expect_safe_address()
                .return_const(Address::new(b"my safe address 1234"));
            handlers
                .expect_contract_addresses_map()
                .return_const(ContractAddresses::default());

            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
            let (indexing, _) = join!(indexer.start(), async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                tx.close_channel()
            });
            assert!(indexing.is_err());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);

            db.insert_account(
                None,
                AccountEntry {
                    public_key: *ALICE_OKP.public(),
                    chain_addr: *ALICE,
                    entry_type: AccountType::NotAnnounced,
                    published_at: 1,
                },
            )
            .await?;
            db.insert_account(
                None,
                AccountEntry {
                    public_key: *BOB_OKP.public(),
                    chain_addr: *BOB,
                    entry_type: AccountType::NotAnnounced,
                    published_at: 1,
                },
            )
            .await?;
        }

        {
            let logs = vec![
                build_announcement_logs(*BOB, 1, 3, 1)?,
                build_announcement_logs(*BOB, 1, 4, 1)?,
            ]
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

            for log in logs {
                assert!(db.store_log(log).await.is_ok());
            }
            assert!(db.update_logs_checksums().await.is_ok());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);

            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
            let (tx_events, _) = futures::channel::mpsc::channel(1000);

            let head_block = 5;
            let mut rpc = MockHoprIndexerOps::new();
            rpc.expect_block_number().returning(move || Ok(head_block));
            rpc.expect_try_stream_logs()
                .times(1)
                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
                .return_once(move |_, _, _| Ok(Box::pin(rx)));

            let mut handlers = MockChainLogHandler::new();
            handlers.expect_contract_addresses().return_const(vec![addr]);
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(topic.as_ref())]);

            handlers
                .expect_collect_log_event()
                .times(2)
                .withf(move |l, _| [3, 4].contains(&l.block_number))
                .returning(|_, _| Ok(None));
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
            handlers
                .expect_safe_address()
                .return_const(Address::new(b"my safe address 1234"));
            handlers
                .expect_contract_addresses_map()
                .return_const(ContractAddresses::default());

            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
            let (indexing, _) = join!(indexer.start(), async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                tx.close_channel()
            });
            assert!(indexing.is_err());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
        }

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let cfg = IndexerConfig::default();

        let addr = Address::new(b"my address 123456789");
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .times(1)
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));
        rpc.expect_get_hopr_balance()
            .once()
            .return_once(move |_| Ok(HoprBalance::zero()));
        rpc.expect_get_hopr_allowance()
            .once()
            .return_once(move |_, _| Ok(HoprBalance::zero()));

        let head_block = 1000;
        let block_numbers = [head_block - 1, head_block, head_block + 1];

        let blocks: Vec<BlockWithLogs> = block_numbers
            .iter()
            .map(|block_id| BlockWithLogs {
                block_id: *block_id,
                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
            })
            .collect();

        for _ in 0..(blocks.len() as u64) {
            rpc.expect_block_number().returning(move || Ok(head_block));
        }

        for block in blocks.iter() {
            assert!(tx.start_send(block.clone()).is_ok());
        }

        handlers
            .expect_collect_log_event()
            .times(3)
            .withf(move |l, _| block_numbers.contains(&l.block_number))
            .returning(|l, _| {
                let block_number = l.block_number;
                Ok(Some(SignificantChainEvent {
                    tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
                }))
            });

        let (tx_events, rx_events) = futures::channel::mpsc::channel(1000);
        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
        indexer.start().await?;

        pin_mut!(rx_events);
        tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
            .await?
            .unwrap();
        tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
            .await?
            .unwrap();

        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(100), rx_events.next())
                .await
                .is_err()
        );

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
        let last_processed_block = 100_u64;

        let db = HoprIndexerDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);
        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: last_processed_block,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };
        assert!(db.store_log(log_1.clone()).await.is_ok());
        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
        assert!(db.update_logs_checksums().await.is_ok());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();

        let mut rpc = MockHoprIndexerOps::new();
        rpc.expect_try_stream_logs()
            .once()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        rpc.expect_block_number()
            .times(3)
            .returning(move || Ok(last_processed_block + 1));

        rpc.expect_get_hopr_balance()
            .once()
            .return_once(move |_| Ok(HoprBalance::zero()));

        rpc.expect_get_hopr_allowance()
            .once()
            .return_once(move |_, _| Ok(HoprBalance::zero()));

        let block = BlockWithLogs {
            block_id: last_processed_block + 1,
            logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
        };

        tx.start_send(block)?;

        let mut handlers = MockChainLogHandler::new();
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let indexer_cfg = IndexerConfig::new(0, false, false, None, "/tmp/test_data".to_string());

        let (tx_events, _) = futures::channel::mpsc::channel(1000);
        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
        indexer.start().await?;

        Ok(())
    }
}