use std::{
    path::Path,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
};

use alloy::sol_types::SolEvent;
use futures::{
    StreamExt,
    future::AbortHandle,
    stream::{self},
};
use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
use hopr_chain_types::chain_events::SignificantChainEvent;
use hopr_crypto_types::types::Hash;
use hopr_db_api::logs::HoprDbLogOperations;
use hopr_db_sql::{HoprDbGeneralModelOperations, info::HoprDbInfoOperations};
use hopr_primitive_types::prelude::*;
use tracing::{debug, error, info, trace};

use crate::{
    IndexerConfig,
    errors::{CoreEthereumIndexerError, Result},
    snapshot::{SnapshotInfo, SnapshotManager},
    traits::ChainLogHandler,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_block_number",
            "Current last processed block number by the indexer",
        ).unwrap();
    static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_checksum",
            "Contains an unsigned integer that represents the low 32 bits of the Indexer checksum",
        ).unwrap();
    static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
        hopr_metrics::metrics::SimpleGauge::new(
            "hopr_indexer_sync_progress",
            "Sync progress of the historical data by the indexer",
        ).unwrap();
    static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
        hopr_metrics::metrics::MultiGauge::new(
            "hopr_indexer_data_source",
            "Current data source of the Indexer",
            &["source"],
        ).unwrap();
}

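/// Chain indexer.
///
/// The indexer consumes on-chain logs from an RPC endpoint, persists them in the
/// logs database, and forwards the resulting [`SignificantChainEvent`]s through the
/// `egress` channel. Depending on configuration and database state, it can first
/// perform a fast sync over already downloaded logs before switching to the live
/// RPC log stream.
///
/// A minimal wiring sketch (`rpc`, `handlers`, and `db` stand in for concrete
/// implementations of the respective traits and are assumed to exist):
///
/// ```ignore
/// let (tx_events, rx_events) = async_channel::unbounded();
/// let indexer = Indexer::new(rpc, handlers, db, IndexerConfig::default(), tx_events);
/// // `start` resolves once the indexer has caught up with the chain head.
/// let abort_handle = indexer.start().await?;
/// ```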
#[derive(Debug, Clone)]
pub struct Indexer<T, U, Db>
where
    T: HoprIndexerRpcOperations + Send + 'static,
    U: ChainLogHandler + Send + 'static,
    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
{
    rpc: Option<T>,
    db_processor: Option<U>,
    db: Db,
    cfg: IndexerConfig,
    egress: async_channel::Sender<SignificantChainEvent>,
    panic_on_completion: bool,
}

impl<T, U, Db> Indexer<T, U, Db>
where
    T: HoprIndexerRpcOperations + Sync + Send + 'static,
    U: ChainLogHandler + Send + Sync + 'static,
    Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
{
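    /// Creates a new indexer from its components.
    ///
    /// By default the spawned background task panics when its event stream terminates;
    /// see [`Indexer::without_panic_on_completion`] for disabling that behavior.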
    pub fn new(
        rpc: T,
        db_processor: U,
        db: Db,
        cfg: IndexerConfig,
        egress: async_channel::Sender<SignificantChainEvent>,
    ) -> Self {
        Self {
            rpc: Some(rpc),
            db_processor: Some(db_processor),
            db,
            cfg,
            egress,
            panic_on_completion: true,
        }
    }

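    /// Disables the panic that is otherwise raised when the indexer event stream
    /// terminates; primarily useful in tests that close the stream deliberately.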
    pub fn without_panic_on_completion(mut self) -> Self {
        self.panic_on_completion = false;
        self
    }

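    /// Starts indexing and returns an [`AbortHandle`] to the spawned background task.
    ///
    /// When applicable, a fast sync over already stored, unprocessed logs is performed
    /// first; afterwards the indexer follows the live RPC log stream. The returned
    /// future resolves only once the indexer has caught up with the known chain head.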
    pub async fn start(mut self) -> Result<AbortHandle>
    where
        T: HoprIndexerRpcOperations + 'static,
        U: ChainLogHandler + 'static,
        Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
    {
        if self.rpc.is_none() || self.db_processor.is_none() {
            return Err(CoreEthereumIndexerError::ProcessError(
                "indexer cannot start, missing components".into(),
            ));
        }

        info!("Starting chain indexing");

        let rpc = self.rpc.take().expect("rpc should be present");
        let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
        let db = self.db.clone();
        let tx_significant_events = self.egress.clone();
        let panic_on_completion = self.panic_on_completion;

        let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);

        db.ensure_logs_origin(address_topics).await?;

        let is_synced = Arc::new(AtomicBool::new(false));
        let chain_head = Arc::new(AtomicU64::new(0));

        debug!("Updating chain head at indexer startup");
        Self::update_chain_head(&rpc, chain_head.clone()).await;

        let fast_sync_configured = self.cfg.fast_sync;
        let index_empty = self.db.index_is_empty().await?;

        self.pre_start().await?;

        #[derive(PartialEq, Eq)]
        enum FastSyncMode {
            None,
            FromScratch,
            Continue,
        }

        let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
            (true, false) => {
                info!(
                    "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
                     unprocessed logs."
                );
                FastSyncMode::Continue
            }
            (false, true) => {
                info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
                self.db.clear_index_db(None).await?;
                self.db.set_logs_unprocessed(None, None).await?;
                FastSyncMode::None
            }
            (false, false) => {
                info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
                FastSyncMode::None
            }
            (true, true) => {
                info!("Fast sync is enabled, starting the fast sync process");
                self.db.clear_index_db(None).await?;
                self.db.set_logs_unprocessed(None, None).await?;
                FastSyncMode::FromScratch
            }
        };

        let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);

        if FastSyncMode::None != will_perform_fast_sync {
            let processed = match will_perform_fast_sync {
                FastSyncMode::FromScratch => None,
                FastSyncMode::Continue => Some(false),
                _ => unreachable!(),
            };

            #[cfg(all(feature = "prometheus", not(test)))]
            {
                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
            }

            let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
            let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
            let _head = chain_head.load(Ordering::Relaxed);
            for block_number in log_block_numbers {
                debug!(
                    block_number,
                    first_log_block_number = _first_log_block_number,
                    head = _head,
                    "computing processed logs"
                );
                Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;

                #[cfg(all(feature = "prometheus", not(test)))]
                {
                    let progress =
                        (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
                    METRIC_INDEXER_SYNC_PROGRESS.set(progress);
                }
            }
        }

        info!("Building rpc indexer background process");

        let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
            info!(
                start_block = last_log.block_number,
                start_checksum = last_log.checksum.unwrap(),
                "Loaded indexer state",
            );

            if self.cfg.start_block_number < last_log.block_number {
                last_log.block_number + 1
            } else {
                self.cfg.start_block_number
            }
        } else {
            self.cfg.start_block_number
        };

        info!(next_block_to_process, "Indexer start point");

        let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable!(async move {
            debug!("Updating chain head at indexer startup");
            Self::update_chain_head(&rpc, chain_head.clone()).await;

            #[cfg(all(feature = "prometheus", not(test)))]
            {
                METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
                METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
            }

            let rpc_ref = &rpc;

            let event_stream = rpc
                .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
                .expect("block stream should be constructible")
                .then(|block| {
                    let db = db.clone();
                    let chain_head = chain_head.clone();
                    let is_synced = is_synced.clone();
                    let tx = tx.clone();
                    let logs_handler = logs_handler.clone();

                    async move {
                        Self::calculate_sync_process(
                            block.block_id,
                            rpc_ref,
                            db,
                            chain_head.clone(),
                            is_synced.clone(),
                            next_block_to_process,
                            tx.clone(),
                            logs_handler.safe_address().into(),
                            logs_handler.contract_addresses_map().channels.into(),
                        )
                        .await;

                        block
                    }
                })
                .filter_map(|block| {
                    let db = db.clone();
                    let logs_handler = logs_handler.clone();

                    async move {
                        debug!(%block, "storing logs from block");
                        let logs = block.logs.clone();

                        let logs_vec = logs
                            .into_iter()
                            .filter(|log| log.address != logs_handler.contract_addresses_map().token)
                            .collect();

                        match db.store_logs(logs_vec).await {
                            Ok(store_results) => {
                                if let Some(error) = store_results
                                    .into_iter()
                                    .filter(|r| r.is_err())
                                    .map(|r| r.unwrap_err())
                                    .next()
                                {
                                    error!(%block, %error, "failed to process stored logs from block");
                                    None
                                } else {
                                    Some(block)
                                }
                            }
                            Err(error) => {
                                error!(%block, %error, "failed to store logs from block");
                                None
                            }
                        }
                    }
                })
                .filter_map(|block| {
                    let db = db.clone();
                    let logs_handler = logs_handler.clone();
                    let is_synced = is_synced.clone();
                    async move {
                        Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed)).await
                    }
                })
                .flat_map(stream::iter);

            futures::pin_mut!(event_stream);
            while let Some(event) = event_stream.next().await {
                trace!(%event, "processing on-chain event");
                if is_synced.load(Ordering::Relaxed) {
                    if let Err(error) = tx_significant_events.try_send(event) {
                        error!(%error, "failed to pass a significant chain event further");
                    }
                }
            }

            if panic_on_completion {
                panic!(
                    "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
                );
            }
        });

        if rx.next().await.is_some() {
            Ok(indexing_abort_handle)
        } else {
            Err(crate::errors::CoreEthereumIndexerError::ProcessError(
                "Error during indexing start".into(),
            ))
        }
    }

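    /// Hook executed before the indexing loop is built.
    ///
    /// If fast sync is enabled, the index database is empty, the logs database has no
    /// data yet, and logs snapshots are enabled, this attempts to download a logs
    /// snapshot. A failed download is logged and ignored, falling back to regular sync.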
    pub async fn pre_start(&self) -> Result<()> {
        let fast_sync_configured = self.cfg.fast_sync;
        let index_empty = self.db.index_is_empty().await?;

        let logs_db_has_data = self.has_logs_data().await?;

        if fast_sync_configured && index_empty && !logs_db_has_data && self.cfg.enable_logs_snapshot {
            info!("Logs database is empty, attempting to download logs snapshot...");

            match self.download_snapshot().await {
                Ok(snapshot_info) => {
                    info!("Logs snapshot downloaded successfully: {:?}", snapshot_info);
                }
                Err(e) => {
                    error!("Failed to download logs snapshot: {}. Continuing with regular sync.", e);
                }
            }
        }

        Ok(())
    }

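    /// Builds the RPC [`FilterSet`] plus the `(address, topic)` pairs used to verify
    /// the origin of logs already stored in the database.
    ///
    /// Token contract logs are filtered separately from the other contracts: only
    /// `Transfer` events to or from the safe address and `Approval` events of the safe
    /// for the channels contract are requested, so unrelated token activity never
    /// reaches the indexer.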
    fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
        let safe_address = logs_handler.safe_address();
        let addresses_no_token = logs_handler
            .contract_addresses()
            .into_iter()
            .filter(|a| *a != logs_handler.contract_addresses_map().token)
            .collect::<Vec<_>>();
        let mut filter_base_addresses = vec![];
        let mut filter_base_topics = vec![];
        let mut address_topics = vec![];

        addresses_no_token.iter().for_each(|address| {
            let topics = logs_handler.contract_address_topics(*address);
            if !topics.is_empty() {
                filter_base_addresses.push(alloy::primitives::Address::from(*address));
                filter_base_topics.extend(topics.clone());
                for topic in topics.iter() {
                    address_topics.push((*address, Hash::from(topic.0)));
                }
            }
        });

        let filter_base = alloy::rpc::types::Filter::new()
            .address(filter_base_addresses)
            .event_signature(filter_base_topics);
        let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
            logs_handler.contract_addresses_map().token,
        ));

        let filter_transfer_to = filter_token
            .clone()
            .event_signature(Transfer::SIGNATURE_HASH)
            .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));

        let filter_transfer_from = filter_token
            .clone()
            .event_signature(Transfer::SIGNATURE_HASH)
            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));

        let filter_approval = filter_token
            .event_signature(Approval::SIGNATURE_HASH)
            .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
            .topic2(alloy::primitives::B256::from_slice(
                logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
            ));

        let set = FilterSet {
            all: vec![
                filter_base.clone(),
                filter_transfer_from.clone(),
                filter_transfer_to.clone(),
                filter_approval.clone(),
            ],
            token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
            no_token: vec![filter_base],
        };

        (set, address_topics)
    }

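    /// Loads all stored logs for `block_id` from the database and processes them via
    /// [`Indexer::process_block`], panicking if the database returns a log with a
    /// mismatching block number, as that would indicate index corruption.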
    async fn process_block_by_id(
        db: &Db,
        logs_handler: &U,
        block_id: u64,
        is_synced: bool,
    ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
    where
        U: ChainLogHandler + 'static,
        Db: HoprDbLogOperations + 'static,
    {
        let logs = db.get_logs(Some(block_id), Some(0)).await?;
        let mut block = BlockWithLogs {
            block_id,
            ..Default::default()
        };

        for log in logs {
            if log.block_number == block_id {
                block.logs.insert(log);
            } else {
                error!(
                    expected = block_id,
                    actual = log.block_number,
                    "block number mismatch in logs from database"
                );
                panic!("block number mismatch in logs from database")
            }
        }

        Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
    }

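    /// Processes all logs of a single block into significant chain events, marking each
    /// log as processed and updating log checksums and the indexer state afterwards.
    ///
    /// Returns `None` when no checksum can be determined for the block, otherwise the
    /// (possibly empty) list of extracted events.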
    async fn process_block(
        db: &Db,
        logs_handler: &U,
        block: BlockWithLogs,
        fetch_checksum_from_db: bool,
        is_synced: bool,
    ) -> Option<Vec<SignificantChainEvent>>
    where
        U: ChainLogHandler + 'static,
        Db: HoprDbLogOperations + 'static,
    {
        let block_id = block.block_id;
        let log_count = block.logs.len();
        debug!(block_id, "processing events");

        let events = stream::iter(block.logs.clone())
            .filter_map(|log| async move {
                match logs_handler.collect_log_event(log.clone(), is_synced).await {
                    Ok(data) => match db.set_log_processed(log).await {
                        Ok(_) => data,
                        Err(error) => {
                            error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
                            panic!("failed to mark log as processed, panicking to prevent data loss")
                        }
                    },
                    Err(CoreEthereumIndexerError::ProcessError(error)) => {
                        error!(block_id, %error, "failed to process log into event, continuing indexing");
                        None
                    }
                    Err(error) => {
                        error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
                        panic!("failed to process log into event, panicking to prevent data loss")
                    }
                }
            })
            .collect::<Vec<SignificantChainEvent>>()
            .await;

        match db.update_logs_checksums().await {
            Ok(last_log_checksum) => {
                let checksum = if fetch_checksum_from_db {
                    let last_log = block.logs.into_iter().next_back()?;
                    let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;

                    log.checksum?
                } else {
                    last_log_checksum.to_string()
                };

                if log_count != 0 {
                    info!(
                        block_number = block_id,
                        log_count, last_log_checksum = ?checksum, "Indexer state update",
                    );

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
                            let low_4_bytes =
                                hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
                            METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
                        } else {
                            error!("Invalid checksum generated from logs");
                        }
                    }
                }

                match db.set_indexer_state_info(None, block_id as u32).await {
                    Ok(_) => {
                        trace!(block_id, "updated indexer state info");
                    }
                    Err(error) => error!(block_id, %error, "failed to update indexer state info"),
                }
            }
            Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
        }

        debug!(
            block_id,
            num_events = events.len(),
            "processed significant chain events from block",
        );

        Some(events)
    }

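    /// Fetches the latest block number from the RPC provider, stores it in `chain_head`,
    /// and returns it. Panics if the RPC call fails, since indexing cannot continue
    /// without a known chain head.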
    async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
    where
        T: HoprIndexerRpcOperations + 'static,
    {
        match rpc.block_number().await {
            Ok(head) => {
                chain_head.store(head, Ordering::Relaxed);
                debug!(head, "Updated chain head");
                head
            }
            Err(error) => {
                error!(%error, "Failed to fetch block number from RPC");
                panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
            }
        }
    }

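    /// Tracks sync progress for `current_block` and flips `is_synced` once the indexer
    /// reaches the chain head.
    ///
    /// On completion it refreshes the safe's HOPR balance and allowance from chain
    /// (when the respective addresses are provided) and signals readiness through `tx`,
    /// which unblocks the future returned by [`Indexer::start`].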
    #[allow(clippy::too_many_arguments)]
    async fn calculate_sync_process(
        current_block: u64,
        rpc: &T,
        db: Db,
        chain_head: Arc<AtomicU64>,
        is_synced: Arc<AtomicBool>,
        next_block_to_process: u64,
        mut tx: futures::channel::mpsc::Sender<()>,
        safe_address: Option<Address>,
        channels_address: Option<Address>,
    ) where
        T: HoprIndexerRpcOperations + 'static,
        Db: HoprDbInfoOperations + Clone + Send + Sync + 'static,
    {
        #[cfg(all(feature = "prometheus", not(test)))]
        {
            METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
        }

        let mut head = chain_head.load(Ordering::Relaxed);

        if !is_synced.load(Ordering::Relaxed) {
            let mut block_difference = head.saturating_sub(next_block_to_process);

            let progress = if block_difference == 0 {
                head = Self::update_chain_head(rpc, chain_head.clone()).await;
                block_difference = head.saturating_sub(next_block_to_process);

                if block_difference == 0 {
                    1_f64
                } else {
                    (current_block - next_block_to_process) as f64 / block_difference as f64
                }
            } else {
                (current_block - next_block_to_process) as f64 / block_difference as f64
            };

            info!(
                progress = progress * 100_f64,
                block = current_block,
                head,
                "Sync progress to last known head"
            );

            #[cfg(all(feature = "prometheus", not(test)))]
            METRIC_INDEXER_SYNC_PROGRESS.set(progress);

            if current_block >= head {
                info!("indexer sync completed successfully");
                is_synced.store(true, Ordering::Relaxed);

                if let Some(safe_address) = safe_address {
                    info!("updating safe balance from chain after indexer sync completed");
                    match rpc.get_hopr_balance(safe_address).await {
                        Ok(balance) => {
                            if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
                                error!(%error, "failed to update safe balance from chain after indexer sync completed");
                            }
                        }
                        Err(error) => {
                            error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
                        }
                    }
                }

                if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
                    info!("updating safe allowance from chain after indexer sync completed");
                    match rpc.get_hopr_allowance(safe_address, channels_address).await {
                        Ok(allowance) => {
                            if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
                                error!(%error, "failed to update safe allowance from chain after indexer sync completed");
                            }
                        }
                        Err(error) => {
                            error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
                        }
                    }
                }

                if let Err(error) = tx.try_send(()) {
                    error!(%error, "failed to notify about achieving indexer synchronization")
                }
            }
        }
    }

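    /// Returns `true` if the logs database already contains any logs.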
    async fn has_logs_data(&self) -> Result<bool> {
        self.db
            .get_logs_count(None, None)
            .await
            .map(|count| count > 0)
            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
    }

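    /// Validates the configuration and downloads the configured logs snapshot into the
    /// node's data directory, failing with a [`CoreEthereumIndexerError::SnapshotError`]
    /// when the configuration is invalid or no snapshot URL is set.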
    pub async fn download_snapshot(&self) -> Result<SnapshotInfo> {
        if let Err(e) = self.cfg.validate() {
            return Err(CoreEthereumIndexerError::SnapshotError(e.to_string()));
        }

        let snapshot_manager = SnapshotManager::with_db(self.db.clone())
            .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))?;

        let data_dir = Path::new(&self.cfg.data_directory);

        if let Some(url) = &self.cfg.logs_snapshot_url {
            snapshot_manager
                .download_and_setup_snapshot(url, data_dir)
                .await
                .map_err(|e| CoreEthereumIndexerError::SnapshotError(e.to_string()))
        } else {
            Err(CoreEthereumIndexerError::SnapshotError(
                "Logs snapshot URL is not configured".to_string(),
            ))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, pin::Pin};

    use alloy::{
        dyn_abi::DynSolValue,
        primitives::{Address as AlloyAddress, B256},
        sol_types::SolEvent,
    };
    use async_trait::async_trait;
    use futures::{Stream, join};
    use hex_literal::hex;
    use hopr_chain_rpc::BlockWithLogs;
    use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
    use hopr_crypto_types::{
        keypairs::{Keypair, OffchainKeypair},
        prelude::ChainKeypair,
    };
    use hopr_db_sql::{accounts::HoprDbAccountOperations, db::HoprDb};
    use hopr_internal_types::account::{AccountEntry, AccountType};
    use hopr_primitive_types::prelude::*;
    use mockall::mock;
    use multiaddr::Multiaddr;

    use super::*;
    use crate::traits::MockChainLogHandler;

    lazy_static::lazy_static! {
        static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();

        static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
            peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
            address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
            multiaddresses: vec![Multiaddr::empty()],
        };
    }

    fn build_announcement_logs(
        address: Address,
        size: usize,
        block_number: u64,
        starting_log_index: u64,
    ) -> anyhow::Result<Vec<SerializableLog>> {
        let mut logs: Vec<SerializableLog> = vec![];
        let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);

        for i in 0..size {
            let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
            let tx_index: u64 = i as u64;
            let log_index: u64 = starting_log_index + tx_index;

            logs.push(SerializableLog {
                address,
                block_hash: block_hash.into(),
                topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
                data: DynSolValue::Tuple(vec![
                    DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
                    DynSolValue::String(test_multiaddr.to_string()),
                ])
                .abi_encode(),
                tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
                tx_index,
                block_number,
                log_index,
                ..Default::default()
            });
        }

        Ok(logs)
    }

    mock! {
        HoprIndexerOps {}

        #[async_trait]
        impl HoprIndexerRpcOperations for HoprIndexerOps {
            async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
            async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
            async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
            async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;

            fn try_stream_logs<'a>(
                &'a self,
                start_block_number: u64,
                filters: FilterSet,
                is_synced: bool,
            ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
        }
    }

    #[tokio::test]
    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
    -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);
        db.ensure_logs_origin(vec![(addr, topic)]).await?;

        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let head_block = 1000;
        rpc.expect_block_number().times(2).returning(move || Ok(head_block));

        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let indexer = Indexer::new(
            rpc,
            handlers,
            db.clone(),
            IndexerConfig::default(),
            async_channel::unbounded().0,
        )
        .without_panic_on_completion();

        let (indexing, _) = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });
        assert!(indexing.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
    {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
        let head_block = 1000;
        let latest_block = 15u64;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);

        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        db.ensure_logs_origin(vec![(addr, topic)]).await?;

        rpc.expect_block_number().times(2).returning(move || Ok(head_block));

        let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .once()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: latest_block,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };
        assert!(db.store_log(log_1.clone()).await.is_ok());
        assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
        assert!(db.update_logs_checksums().await.is_ok());

        let indexer = Indexer::new(
            rpc,
            handlers,
            db.clone(),
            IndexerConfig {
                fast_sync: false,
                ..Default::default()
            },
            async_channel::unbounded().0,
        )
        .without_panic_on_completion();

        let (indexing, _) = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });
        assert!(indexing.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let cfg = IndexerConfig::default();

        let addr = Address::new(b"my address 123456789");
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .times(1)
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        let head_block = 1000;
        rpc.expect_block_number().returning(move || Ok(head_block));

        rpc.expect_get_hopr_balance()
            .withf(move |x| x == &Address::new(b"my safe address 1234"))
            .returning(move |_| Ok(HoprBalance::default()));

        rpc.expect_get_hopr_allowance()
            .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
            .returning(move |_, _| Ok(HoprBalance::default()));

        let finalized_block = BlockWithLogs {
            block_id: head_block - 1,
            logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
        };
        let head_allowing_finalization = BlockWithLogs {
            block_id: head_block,
            logs: BTreeSet::new(),
        };

        handlers
            .expect_collect_log_event()
            .times(finalized_block.logs.len())
            .returning(|_, _| Ok(None));

        assert!(tx.start_send(finalized_block.clone()).is_ok());
        assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());

        let indexer =
            Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
        let _ = join!(indexer.start(), async move {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            tx.close_channel()
        });

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);

        {
            let logs = vec![
                build_announcement_logs(*BOB, 1, 1, 1)?,
                build_announcement_logs(*BOB, 1, 2, 1)?,
            ]
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

            for log in logs {
                assert!(db.store_log(log).await.is_ok());
            }
            assert!(db.update_logs_checksums().await.is_ok());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);

            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
            let (tx_events, _) = async_channel::unbounded();

            let head_block = 5;
            let mut rpc = MockHoprIndexerOps::new();
            rpc.expect_block_number().returning(move || Ok(head_block));
            rpc.expect_try_stream_logs()
                .times(1)
                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
                .return_once(move |_, _, _| Ok(Box::pin(rx)));

            let mut handlers = MockChainLogHandler::new();
            handlers.expect_contract_addresses().return_const(vec![addr]);
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(topic.as_ref())]);
            handlers
                .expect_collect_log_event()
                .times(2)
                .withf(move |l, _| [1, 2].contains(&l.block_number))
                .returning(|_, _| Ok(None));
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
            handlers
                .expect_safe_address()
                .return_const(Address::new(b"my safe address 1234"));
            handlers
                .expect_contract_addresses_map()
                .return_const(ContractAddresses::default());

            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
            let (indexing, _) = join!(indexer.start(), async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                tx.close_channel()
            });
            assert!(indexing.is_err());

            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);

            db.insert_account(
                None,
                AccountEntry {
                    public_key: *ALICE_OKP.public(),
                    chain_addr: *ALICE,
                    entry_type: AccountType::NotAnnounced,
                    published_at: 1,
                },
            )
            .await?;
            db.insert_account(
                None,
                AccountEntry {
                    public_key: *BOB_OKP.public(),
                    chain_addr: *BOB,
                    entry_type: AccountType::NotAnnounced,
                    published_at: 1,
                },
            )
            .await?;
        }

        {
            let logs = vec![
                build_announcement_logs(*BOB, 1, 3, 1)?,
                build_announcement_logs(*BOB, 1, 4, 1)?,
            ]
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

            assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

            for log in logs {
                assert!(db.store_log(log).await.is_ok());
            }
            assert!(db.update_logs_checksums().await.is_ok());
            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);

            let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
            let (tx_events, _) = async_channel::unbounded();

            let head_block = 5;
            let mut rpc = MockHoprIndexerOps::new();
            rpc.expect_block_number().returning(move || Ok(head_block));
            rpc.expect_try_stream_logs()
                .times(1)
                .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
                .return_once(move |_, _, _| Ok(Box::pin(rx)));

            let mut handlers = MockChainLogHandler::new();
            handlers.expect_contract_addresses().return_const(vec![addr]);
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(topic.as_ref())]);

            handlers
                .expect_collect_log_event()
                .times(2)
                .withf(move |l, _| [3, 4].contains(&l.block_number))
                .returning(|_, _| Ok(None));
            handlers
                .expect_contract_address_topics()
                .withf(move |x| x == &addr)
                .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
            handlers
                .expect_safe_address()
                .return_const(Address::new(b"my safe address 1234"));
            handlers
                .expect_contract_addresses_map()
                .return_const(ContractAddresses::default());

            let indexer_cfg = IndexerConfig::new(0, true, false, None, "/tmp/test_data".to_string());
            let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
            let (indexing, _) = join!(indexer.start(), async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                tx.close_channel()
            });
            assert!(indexing.is_err());

            assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
            assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
        }

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
        let mut handlers = MockChainLogHandler::new();
        let mut rpc = MockHoprIndexerOps::new();
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let cfg = IndexerConfig::default();

        let addr = Address::new(b"my address 123456789");
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
        rpc.expect_try_stream_logs()
            .times(1)
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));
        rpc.expect_get_hopr_balance()
            .once()
            .return_once(move |_| Ok(HoprBalance::zero()));
        rpc.expect_get_hopr_allowance()
            .once()
            .return_once(move |_, _| Ok(HoprBalance::zero()));

        let head_block = 1000;
        let block_numbers = [head_block - 1, head_block, head_block + 1];

        let blocks: Vec<BlockWithLogs> = block_numbers
            .iter()
            .map(|block_id| BlockWithLogs {
                block_id: *block_id,
                logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
            })
            .collect();

        for _ in 0..(blocks.len() as u64) {
            rpc.expect_block_number().returning(move || Ok(head_block));
        }

        for block in blocks.iter() {
            assert!(tx.start_send(block.clone()).is_ok());
        }

        handlers
            .expect_collect_log_event()
            .times(1)
            .withf(move |l, _| block_numbers.contains(&l.block_number))
            .returning(|l, _| {
                let block_number = l.block_number;
                Ok(Some(SignificantChainEvent {
                    tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
                    event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
                }))
            });

        let (tx_events, rx_events) = async_channel::unbounded();
        let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
        indexer.start().await?;

        let _first = rx_events.recv();
        let _second = rx_events.recv();
        let third = rx_events.try_recv();

        assert!(third.is_err());

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
        let last_processed_block = 100_u64;

        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;

        let addr = Address::new(b"my address 123456789");
        let topic = Hash::create(&[b"my topic"]);
        assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: last_processed_block,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };
        assert!(db.store_log(log_1.clone()).await.is_ok());
        assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
        assert!(db.update_logs_checksums().await.is_ok());

        let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();

        let mut rpc = MockHoprIndexerOps::new();
        rpc.expect_try_stream_logs()
            .once()
            .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
            .return_once(move |_, _, _| Ok(Box::pin(rx)));

        rpc.expect_block_number()
            .times(3)
            .returning(move || Ok(last_processed_block + 1));

        rpc.expect_get_hopr_balance()
            .once()
            .return_once(move |_| Ok(HoprBalance::zero()));

        rpc.expect_get_hopr_allowance()
            .once()
            .return_once(move |_, _| Ok(HoprBalance::zero()));

        let block = BlockWithLogs {
            block_id: last_processed_block + 1,
            logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
        };

        tx.start_send(block)?;

        let mut handlers = MockChainLogHandler::new();
        handlers.expect_contract_addresses().return_const(vec![addr]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(topic.as_ref())]);
        handlers
            .expect_contract_address_topics()
            .withf(move |x| x == &addr)
            .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
        handlers
            .expect_safe_address()
            .return_const(Address::new(b"my safe address 1234"));
        handlers
            .expect_contract_addresses_map()
            .return_const(ContractAddresses::default());

        let indexer_cfg = IndexerConfig::new(0, false, false, None, "/tmp/test_data".to_string());

        let (tx_events, _) = async_channel::unbounded();
        let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
        indexer.start().await?;

        Ok(())
    }
}