1use std::sync::{
2 Arc,
3 atomic::{AtomicBool, AtomicU64, Ordering},
4};
5
6use alloy::sol_types::SolEvent;
7use futures::{
8 StreamExt,
9 future::AbortHandle,
10 stream::{self},
11};
12use hopr_bindings::hoprtoken::HoprToken::{Approval, Transfer};
13use hopr_chain_rpc::{BlockWithLogs, FilterSet, HoprIndexerRpcOperations};
14use hopr_chain_types::chain_events::SignificantChainEvent;
15use hopr_crypto_types::types::Hash;
16use hopr_db_api::logs::HoprDbLogOperations;
17use hopr_db_sql::{HoprDbGeneralModelOperations, info::HoprDbInfoOperations};
18use hopr_primitive_types::prelude::*;
19use tracing::{debug, error, info, trace};
20
21use crate::{
22 IndexerConfig,
23 errors::{CoreEthereumIndexerError, Result},
24 traits::ChainLogHandler,
25};
26
27#[cfg(all(feature = "prometheus", not(test)))]
28lazy_static::lazy_static! {
29 static ref METRIC_INDEXER_CURRENT_BLOCK: hopr_metrics::metrics::SimpleGauge =
30 hopr_metrics::metrics::SimpleGauge::new(
31 "hopr_indexer_block_number",
32 "Current last processed block number by the indexer",
33 ).unwrap();
34 static ref METRIC_INDEXER_CHECKSUM: hopr_metrics::metrics::SimpleGauge =
35 hopr_metrics::metrics::SimpleGauge::new(
36 "hopr_indexer_checksum",
37 "Contains an unsigned integer that represents the low 32-bits of the Indexer checksum"
38 ).unwrap();
39 static ref METRIC_INDEXER_SYNC_PROGRESS: hopr_metrics::metrics::SimpleGauge =
40 hopr_metrics::metrics::SimpleGauge::new(
41 "hopr_indexer_sync_progress",
42 "Sync progress of the historical data by the indexer",
43 ).unwrap();
44 static ref METRIC_INDEXER_SYNC_SOURCE: hopr_metrics::metrics::MultiGauge =
45 hopr_metrics::metrics::MultiGauge::new(
46 "hopr_indexer_data_source",
47 "Current data source of the Indexer",
48 &["source"],
49 ).unwrap();
50
51}
52
53#[derive(Debug, Clone)]
67pub struct Indexer<T, U, Db>
68where
69 T: HoprIndexerRpcOperations + Send + 'static,
70 U: ChainLogHandler + Send + 'static,
71 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
72{
73 rpc: Option<T>,
74 db_processor: Option<U>,
75 db: Db,
76 cfg: IndexerConfig,
77 egress: async_channel::Sender<SignificantChainEvent>,
78 panic_on_completion: bool,
81}
82
83impl<T, U, Db> Indexer<T, U, Db>
84where
85 T: HoprIndexerRpcOperations + Sync + Send + 'static,
86 U: ChainLogHandler + Send + Sync + 'static,
87 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
88{
89 pub fn new(
90 rpc: T,
91 db_processor: U,
92 db: Db,
93 cfg: IndexerConfig,
94 egress: async_channel::Sender<SignificantChainEvent>,
95 ) -> Self {
96 Self {
97 rpc: Some(rpc),
98 db_processor: Some(db_processor),
99 db,
100 cfg,
101 egress,
102 panic_on_completion: true,
103 }
104 }
105
106 pub fn without_panic_on_completion(mut self) -> Self {
108 self.panic_on_completion = false;
109 self
110 }
111
112 pub async fn start(mut self) -> Result<AbortHandle>
113 where
114 T: HoprIndexerRpcOperations + 'static,
115 U: ChainLogHandler + 'static,
116 Db: HoprDbGeneralModelOperations + HoprDbInfoOperations + HoprDbLogOperations + Clone + Send + Sync + 'static,
117 {
118 if self.rpc.is_none() || self.db_processor.is_none() {
119 return Err(CoreEthereumIndexerError::ProcessError(
120 "indexer cannot start, missing components".into(),
121 ));
122 }
123
124 info!("Starting chain indexing");
125
126 let rpc = self.rpc.take().expect("rpc should be present");
127 let logs_handler = Arc::new(self.db_processor.take().expect("db_processor should be present"));
128 let db = self.db.clone();
129 let tx_significant_events = self.egress.clone();
130 let panic_on_completion = self.panic_on_completion;
131
132 let (log_filters, address_topics) = Self::generate_log_filters(&logs_handler);
133
134 db.ensure_logs_origin(address_topics).await?;
137
138 let is_synced = Arc::new(AtomicBool::new(false));
139 let chain_head = Arc::new(AtomicU64::new(0));
140
141 debug!("Updating chain head at indexer startup");
144 Self::update_chain_head(&rpc, chain_head.clone()).await;
145
146 let fast_sync_configured = self.cfg.fast_sync;
153 let index_empty = self.db.index_is_empty().await?;
154
155 #[derive(PartialEq, Eq)]
156 enum FastSyncMode {
157 None,
158 FromScratch,
159 Continue,
160 }
161
162 let will_perform_fast_sync = match (fast_sync_configured, index_empty) {
163 (true, false) => {
164 info!(
165 "Fast sync is enabled, but the index database is not empty. Fast sync will continue on existing \
166 unprocessed logs."
167 );
168 FastSyncMode::Continue
169 }
170 (false, true) => {
171 info!("Fast sync is disabled, but the index database is empty. Doing a full re-sync.");
172 self.db.clear_index_db(None).await?;
174 self.db.set_logs_unprocessed(None, None).await?;
175 FastSyncMode::None
176 }
177 (false, false) => {
178 info!("Fast sync is disabled and the index database is not empty. Continuing normal sync.");
179 FastSyncMode::None
180 }
181 (true, true) => {
182 info!("Fast sync is enabled, starting the fast sync process");
183 self.db.clear_index_db(None).await?;
185 self.db.set_logs_unprocessed(None, None).await?;
186 FastSyncMode::FromScratch
187 }
188 };
189
190 let (tx, mut rx) = futures::channel::mpsc::channel::<()>(1);
191
192 if FastSyncMode::None != will_perform_fast_sync {
194 let processed = match will_perform_fast_sync {
195 FastSyncMode::FromScratch => None,
196 FastSyncMode::Continue => Some(false),
197 _ => unreachable!(),
198 };
199
200 #[cfg(all(feature = "prometheus", not(test)))]
201 {
202 METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 1.0);
203 METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 0.0);
204 }
205
206 let log_block_numbers = self.db.get_logs_block_numbers(None, None, processed).await?;
207 let _first_log_block_number = log_block_numbers.first().copied().unwrap_or(0);
208 let _head = chain_head.load(Ordering::Relaxed);
209 for block_number in log_block_numbers {
210 debug!(
211 block_number,
212 first_log_block_number = _first_log_block_number,
213 head = _head,
214 "computing processed logs"
215 );
216 Self::process_block_by_id(&db, &logs_handler, block_number, is_synced.load(Ordering::Relaxed)).await?;
218
219 #[cfg(all(feature = "prometheus", not(test)))]
220 {
221 let progress =
222 (block_number - _first_log_block_number) as f64 / (_head - _first_log_block_number) as f64;
223 METRIC_INDEXER_SYNC_PROGRESS.set(progress);
224 }
225 }
226 }
227
228 info!("Building rpc indexer background process");
229
230 let next_block_to_process = if let Some(last_log) = self.db.get_last_checksummed_log().await? {
231 info!(
232 start_block = last_log.block_number,
233 start_checksum = last_log.checksum.unwrap(),
234 "Loaded indexer state",
235 );
236
237 if self.cfg.start_block_number < last_log.block_number {
238 last_log.block_number + 1
240 } else {
241 self.cfg.start_block_number
242 }
243 } else {
244 self.cfg.start_block_number
245 };
246
247 info!(next_block_to_process, "Indexer start point");
248
249 let indexing_abort_handle = hopr_async_runtime::spawn_as_abortable(async move {
250 debug!("Updating chain head at indexer startup");
252 Self::update_chain_head(&rpc, chain_head.clone()).await;
253
254 #[cfg(all(feature = "prometheus", not(test)))]
255 {
256 METRIC_INDEXER_SYNC_SOURCE.set(&["fast-sync"], 0.0);
257 METRIC_INDEXER_SYNC_SOURCE.set(&["rpc"], 1.0);
258 }
259
260 let rpc_ref = &rpc;
261
262 let event_stream = rpc
263 .try_stream_logs(next_block_to_process, log_filters, is_synced.load(Ordering::Relaxed))
264 .expect("block stream should be constructible")
265 .then(|block| {
266 let db = db.clone();
267 let chain_head = chain_head.clone();
268 let is_synced = is_synced.clone();
269 let tx = tx.clone();
270 let logs_handler = logs_handler.clone();
271
272 async move {
273 Self::calculate_sync_process(
274 block.block_id,
275 rpc_ref,
276 db,
277 chain_head.clone(),
278 is_synced.clone(),
279 next_block_to_process,
280 tx.clone(),
281 logs_handler.safe_address().into(),
282 logs_handler.contract_addresses_map().channels.into(),
283 )
284 .await;
285
286 block
287 }
288 })
289 .filter_map(|block| {
290 let db = db.clone();
291 let logs_handler = logs_handler.clone();
292
293 async move {
294 debug!(%block, "storing logs from block");
295 let logs = block.logs.clone();
296
297 let logs_vec = logs
300 .into_iter()
301 .filter(|log| log.address != logs_handler.contract_addresses_map().token)
302 .collect();
303
304 match db.store_logs(logs_vec).await {
305 Ok(store_results) => {
306 if let Some(error) = store_results
307 .into_iter()
308 .filter(|r| r.is_err())
309 .map(|r| r.unwrap_err())
310 .next()
311 {
312 error!(%block, %error, "failed to processed stored logs from block");
313 None
314 } else {
315 Some(block)
316 }
317 }
318 Err(error) => {
319 error!(%block, %error, "failed to store logs from block");
320 None
321 }
322 }
323 }
324 })
325 .filter_map(|block| {
326 let db = db.clone();
327 let logs_handler = logs_handler.clone();
328 let is_synced = is_synced.clone();
329 async move {
330 Self::process_block(&db, &logs_handler, block, false, is_synced.load(Ordering::Relaxed)).await
331 }
332 })
333 .flat_map(stream::iter);
334
335 futures::pin_mut!(event_stream);
336 while let Some(event) = event_stream.next().await {
337 trace!(%event, "processing on-chain event");
338 if is_synced.load(Ordering::Relaxed) {
340 if let Err(error) = tx_significant_events.try_send(event) {
341 error!(%error, "failed to pass a significant chain event further");
342 }
343 }
344 }
345
346 if panic_on_completion {
347 panic!(
348 "Indexer event stream has been terminated. This error may be caused by a failed RPC connection."
349 );
350 }
351 });
352
353 if rx.next().await.is_some() {
354 Ok(indexing_abort_handle)
355 } else {
356 Err(crate::errors::CoreEthereumIndexerError::ProcessError(
357 "Error during indexing start".into(),
358 ))
359 }
360 }
361
362 fn generate_log_filters(logs_handler: &U) -> (FilterSet, Vec<(Address, Hash)>) {
381 let safe_address = logs_handler.safe_address();
382 let addresses_no_token = logs_handler
383 .contract_addresses()
384 .into_iter()
385 .filter(|a| *a != logs_handler.contract_addresses_map().token)
386 .collect::<Vec<_>>();
387 let mut filter_base_addresses = vec![];
388 let mut filter_base_topics = vec![];
389 let mut address_topics = vec![];
390
391 addresses_no_token.iter().for_each(|address| {
392 let topics = logs_handler.contract_address_topics(*address);
393 if !topics.is_empty() {
394 filter_base_addresses.push(alloy::primitives::Address::from(*address));
395 filter_base_topics.extend(topics.clone());
396 for topic in topics.iter() {
397 address_topics.push((*address, Hash::from(topic.0)))
398 }
399 }
400 });
401
402 let filter_base = alloy::rpc::types::Filter::new()
403 .address(filter_base_addresses)
404 .event_signature(filter_base_topics);
405 let filter_token = alloy::rpc::types::Filter::new().address(alloy::primitives::Address::from(
406 logs_handler.contract_addresses_map().token,
407 ));
408
409 let filter_transfer_to = filter_token
410 .clone()
411 .event_signature(Transfer::SIGNATURE_HASH)
412 .topic2(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
413
414 let filter_transfer_from = filter_token
415 .clone()
416 .event_signature(Transfer::SIGNATURE_HASH)
417 .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()));
418
419 let filter_approval = filter_token
420 .event_signature(Approval::SIGNATURE_HASH)
421 .topic1(alloy::primitives::B256::from_slice(safe_address.to_bytes32().as_ref()))
422 .topic2(alloy::primitives::B256::from_slice(
423 logs_handler.contract_addresses_map().channels.to_bytes32().as_ref(),
424 ));
425
426 let set = FilterSet {
427 all: vec![
428 filter_base.clone(),
429 filter_transfer_from.clone(),
430 filter_transfer_to.clone(),
431 filter_approval.clone(),
432 ],
433 token: vec![filter_transfer_from, filter_transfer_to, filter_approval],
434 no_token: vec![filter_base],
435 };
436
437 (set, address_topics)
438 }
439
440 async fn process_block_by_id(
456 db: &Db,
457 logs_handler: &U,
458 block_id: u64,
459 is_synced: bool,
460 ) -> crate::errors::Result<Option<Vec<SignificantChainEvent>>>
461 where
462 U: ChainLogHandler + 'static,
463 Db: HoprDbLogOperations + 'static,
464 {
465 let logs = db.get_logs(Some(block_id), Some(0)).await?;
466 let mut block = BlockWithLogs {
467 block_id,
468 ..Default::default()
469 };
470
471 for log in logs {
472 if log.block_number == block_id {
473 block.logs.insert(log);
474 } else {
475 error!(
476 expected = block_id,
477 actual = log.block_number,
478 "block number mismatch in logs from database"
479 );
480 panic!("block number mismatch in logs from database")
481 }
482 }
483
484 Ok(Self::process_block(db, logs_handler, block, true, is_synced).await)
485 }
486
487 async fn process_block(
502 db: &Db,
503 logs_handler: &U,
504 block: BlockWithLogs,
505 fetch_checksum_from_db: bool,
506 is_synced: bool,
507 ) -> Option<Vec<SignificantChainEvent>>
508 where
509 U: ChainLogHandler + 'static,
510 Db: HoprDbLogOperations + 'static,
511 {
512 let block_id = block.block_id;
513 let log_count = block.logs.len();
514 debug!(block_id, "processing events");
515
516 let events = stream::iter(block.logs.clone())
519 .filter_map(|log| async move {
520 match logs_handler.collect_log_event(log.clone(), is_synced).await {
521 Ok(data) => match db.set_log_processed(log).await {
522 Ok(_) => data,
523 Err(error) => {
524 error!(block_id, %error, "failed to mark log as processed, panicking to prevent data loss");
525 panic!("failed to mark log as processed, panicking to prevent data loss")
526 }
527 },
528 Err(error) => {
529 error!(block_id, %error, "failed to process log into event, panicking to prevent data loss");
530 panic!("failed to process log into event, panicking to prevent data loss")
531 }
532 }
533 })
534 .collect::<Vec<SignificantChainEvent>>()
535 .await;
536
537 match db.update_logs_checksums().await {
539 Ok(last_log_checksum) => {
540 let checksum = if fetch_checksum_from_db {
541 let last_log = block.logs.into_iter().next_back()?;
542 let log = db.get_log(block_id, last_log.tx_index, last_log.log_index).await.ok()?;
543
544 log.checksum?
545 } else {
546 last_log_checksum.to_string()
547 };
548
549 if log_count != 0 {
550 info!(
551 block_number = block_id,
552 log_count, last_log_checksum = ?checksum, "Indexer state update",
553 );
554
555 #[cfg(all(feature = "prometheus", not(test)))]
556 {
557 if let Ok(checksum_hash) = Hash::from_hex(checksum.as_str()) {
558 let low_4_bytes =
559 hopr_primitive_types::prelude::U256::from_big_endian(checksum_hash.as_ref()).low_u32();
560 METRIC_INDEXER_CHECKSUM.set(low_4_bytes.into());
561 } else {
562 error!("Invalid checksum generated from logs");
563 }
564 }
565 }
566
567 match db.set_indexer_state_info(None, block_id as u32).await {
569 Ok(_) => {
570 trace!(block_id, "updated indexer state info");
571 }
572 Err(error) => error!(block_id, %error, "failed to update indexer state info"),
573 }
574 }
575 Err(error) => error!(block_id, %error, "failed to update checksums for logs from block"),
576 }
577
578 debug!(
579 block_id,
580 num_events = events.len(),
581 "processed significant chain events from block",
582 );
583
584 Some(events)
585 }
586
587 async fn update_chain_head(rpc: &T, chain_head: Arc<AtomicU64>) -> u64
588 where
589 T: HoprIndexerRpcOperations + 'static,
590 {
591 match rpc.block_number().await {
592 Ok(head) => {
593 chain_head.store(head, Ordering::Relaxed);
594 debug!(head, "Updated chain head");
595 head
596 }
597 Err(error) => {
598 error!(%error, "Failed to fetch block number from RPC");
599 panic!("Failed to fetch block number from RPC, cannot continue indexing due to {error}")
600 }
601 }
602 }
603
604 #[allow(clippy::too_many_arguments)]
621 async fn calculate_sync_process(
622 current_block: u64,
623 rpc: &T,
624 db: Db,
625 chain_head: Arc<AtomicU64>,
626 is_synced: Arc<AtomicBool>,
627 next_block_to_process: u64,
628 mut tx: futures::channel::mpsc::Sender<()>,
629 safe_address: Option<Address>,
630 channels_address: Option<Address>,
631 ) where
632 T: HoprIndexerRpcOperations + 'static,
633 Db: HoprDbInfoOperations + Clone + Send + Sync + 'static,
634 {
635 #[cfg(all(feature = "prometheus", not(test)))]
636 {
637 METRIC_INDEXER_CURRENT_BLOCK.set(current_block as f64);
638 }
639
640 let mut head = chain_head.load(Ordering::Relaxed);
641
642 if !is_synced.load(Ordering::Relaxed) {
645 let mut block_difference = head.saturating_sub(next_block_to_process);
646
647 let progress = if block_difference == 0 {
648 head = Self::update_chain_head(rpc, chain_head.clone()).await;
650 block_difference = head.saturating_sub(next_block_to_process);
651
652 if block_difference == 0 {
653 1_f64
654 } else {
655 (current_block - next_block_to_process) as f64 / block_difference as f64
656 }
657 } else {
658 (current_block - next_block_to_process) as f64 / block_difference as f64
659 };
660
661 info!(
662 progress = progress * 100_f64,
663 block = current_block,
664 head,
665 "Sync progress to last known head"
666 );
667
668 #[cfg(all(feature = "prometheus", not(test)))]
669 METRIC_INDEXER_SYNC_PROGRESS.set(progress);
670
671 if current_block >= head {
672 info!("indexer sync completed successfully");
673 is_synced.store(true, Ordering::Relaxed);
674
675 if let Some(safe_address) = safe_address {
676 info!("updating safe balance from chain after indexer sync completed");
677 match rpc.get_hopr_balance(safe_address).await {
678 Ok(balance) => {
679 if let Err(error) = db.set_safe_hopr_balance(None, balance).await {
680 error!(%error, "failed to update safe balance from chain after indexer sync completed");
681 }
682 }
683 Err(error) => {
684 error!(%error, "failed to fetch safe balance from chain after indexer sync completed");
685 }
686 }
687 }
688
689 if let Some((channels_address, safe_address)) = channels_address.zip(safe_address) {
690 info!("updating safe allowance from chain after indexer sync completed");
691 match rpc.get_hopr_allowance(safe_address, channels_address).await {
692 Ok(allowance) => {
693 if let Err(error) = db.set_safe_hopr_allowance(None, allowance).await {
694 error!(%error, "failed to update safe allowance from chain after indexer sync completed");
695 }
696 }
697 Err(error) => {
698 error!(%error, "failed to fetch safe allowance from chain after indexer sync completed");
699 }
700 }
701 }
702
703 if let Err(error) = tx.try_send(()) {
704 error!(%error, "failed to notify about achieving indexer synchronization")
705 }
706 }
707 }
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use std::{collections::BTreeSet, pin::Pin};
714
715 use alloy::{
716 dyn_abi::DynSolValue,
717 primitives::{Address as AlloyAddress, B256},
718 sol_types::SolEvent,
719 };
720 use async_trait::async_trait;
721 use futures::{Stream, join};
722 use hex_literal::hex;
723 use hopr_chain_rpc::BlockWithLogs;
724 use hopr_chain_types::{ContractAddresses, chain_events::ChainEventType};
725 use hopr_crypto_types::{
726 keypairs::{Keypair, OffchainKeypair},
727 prelude::ChainKeypair,
728 };
729 use hopr_db_sql::{accounts::HoprDbAccountOperations, db::HoprDb};
730 use hopr_internal_types::account::{AccountEntry, AccountType};
731 use hopr_primitive_types::prelude::*;
732 use mockall::mock;
733 use multiaddr::Multiaddr;
734
735 use super::*;
736 use crate::traits::MockChainLogHandler;
737
738 lazy_static::lazy_static! {
739 static ref ALICE_OKP: OffchainKeypair = OffchainKeypair::random();
740 static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
741 static ref ALICE: Address = ALICE_KP.public().to_address();
742 static ref BOB_OKP: OffchainKeypair = OffchainKeypair::random();
743 static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
744 static ref CHRIS: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
745
746 static ref RANDOM_ANNOUNCEMENT_CHAIN_EVENT: ChainEventType = ChainEventType::Announcement {
747 peer: (*OffchainKeypair::from_secret(&hex!("14d2d952715a51aadbd4cc6bfac9aa9927182040da7b336d37d5bb7247aa7566")).expect("lazy static keypair should be constructible").public()).into(),
748 address: hex!("2f4b7662a192b8125bbf51cfbf1bf5cc00b2c8e5").into(),
749 multiaddresses: vec![Multiaddr::empty()],
750 };
751 }
752
753 fn build_announcement_logs(
754 address: Address,
755 size: usize,
756 block_number: u64,
757 starting_log_index: u64,
758 ) -> anyhow::Result<Vec<SerializableLog>> {
759 let mut logs: Vec<SerializableLog> = vec![];
760 let block_hash = Hash::create(&[format!("my block hash {block_number}").as_bytes()]);
761
762 for i in 0..size {
763 let test_multiaddr: Multiaddr = format!("/ip4/1.2.3.4/tcp/{}", 1000 + i).parse()?;
764 let tx_index: u64 = i as u64;
765 let log_index: u64 = starting_log_index + tx_index;
766
767 logs.push(SerializableLog {
768 address,
769 block_hash: block_hash.into(),
770 topics: vec![hopr_bindings::hoprannouncementsevents::HoprAnnouncementsEvents::AddressAnnouncement::SIGNATURE_HASH.into()],
771 data: DynSolValue::Tuple(vec![
772 DynSolValue::Address(AlloyAddress::from_slice(address.as_ref())),
773 DynSolValue::String(test_multiaddr.to_string()),
774 ])
775 .abi_encode(),
776 tx_hash: Hash::create(&[format!("my tx hash {i}").as_bytes()]).into(),
777 tx_index,
778 block_number,
779 log_index,
780 ..Default::default()
781 });
782 }
783
784 Ok(logs)
785 }
786
787 mock! {
788 HoprIndexerOps {} #[async_trait]
791 impl HoprIndexerRpcOperations for HoprIndexerOps {
792 async fn block_number(&self) -> hopr_chain_rpc::errors::Result<u64>;
793 async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
794 async fn get_xdai_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<XDaiBalance>;
795 async fn get_hopr_balance(&self, address: Address) -> hopr_chain_rpc::errors::Result<HoprBalance>;
796
797 fn try_stream_logs<'a>(
798 &'a self,
799 start_block_number: u64,
800 filters: FilterSet,
801 is_synced: bool,
802 ) -> hopr_chain_rpc::errors::Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>>;
803 }
804 }
805
806 #[tokio::test]
807 async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_none_if_none_is_found()
808 -> anyhow::Result<()> {
809 let mut handlers = MockChainLogHandler::new();
810 let mut rpc = MockHoprIndexerOps::new();
811 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
812
813 let addr = Address::new(b"my address 123456789");
814 let topic = Hash::create(&[b"my topic"]);
815 db.ensure_logs_origin(vec![(addr, topic)]).await?;
816
817 handlers.expect_contract_addresses().return_const(vec![addr]);
818 handlers
819 .expect_contract_address_topics()
820 .withf(move |x| x == &addr)
821 .return_const(vec![B256::from_slice(topic.as_ref())]);
822 handlers
823 .expect_contract_address_topics()
824 .withf(move |x| x == &addr)
825 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
826 handlers
827 .expect_safe_address()
828 .return_const(Address::new(b"my safe address 1234"));
829 handlers
830 .expect_contract_addresses_map()
831 .return_const(ContractAddresses::default());
832
833 let head_block = 1000;
834 rpc.expect_block_number().times(2).returning(move || Ok(head_block));
835
836 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
837 rpc.expect_try_stream_logs()
838 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
839 .return_once(move |_, _, _| Ok(Box::pin(rx)));
840
841 let indexer = Indexer::new(
842 rpc,
843 handlers,
844 db.clone(),
845 IndexerConfig::default(),
846 async_channel::unbounded().0,
847 )
848 .without_panic_on_completion();
849
850 let (indexing, _) = join!(indexer.start(), async move {
851 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
852 tx.close_channel()
853 });
854 assert!(indexing.is_err()); Ok(())
857 }
858
859 #[tokio::test]
860 async fn test_indexer_should_check_the_db_for_last_processed_block_and_supply_it_when_found() -> anyhow::Result<()>
861 {
862 let mut handlers = MockChainLogHandler::new();
863 let mut rpc = MockHoprIndexerOps::new();
864 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
865 let head_block = 1000;
866 let latest_block = 15u64;
867
868 let addr = Address::new(b"my address 123456789");
869 let topic = Hash::create(&[b"my topic"]);
870
871 handlers.expect_contract_addresses().return_const(vec![addr]);
872 handlers
873 .expect_contract_address_topics()
874 .withf(move |x| x == &addr)
875 .return_const(vec![B256::from_slice(topic.as_ref())]);
876 handlers
877 .expect_contract_address_topics()
878 .withf(move |x| x == &addr)
879 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
880 handlers
881 .expect_safe_address()
882 .return_const(Address::new(b"my safe address 1234"));
883 handlers
884 .expect_contract_addresses_map()
885 .return_const(ContractAddresses::default());
886
887 db.ensure_logs_origin(vec![(addr, topic)]).await?;
888
889 rpc.expect_block_number().times(2).returning(move || Ok(head_block));
890
891 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
892 rpc.expect_try_stream_logs()
893 .once()
894 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == latest_block + 1)
895 .return_once(move |_, _, _| Ok(Box::pin(rx)));
896
897 let log_1 = SerializableLog {
899 address: Address::new(b"my address 123456789"),
900 topics: [Hash::create(&[b"my topic"]).into()].into(),
901 data: [1, 2, 3, 4].into(),
902 tx_index: 1u64,
903 block_number: latest_block,
904 block_hash: Hash::create(&[b"my block hash"]).into(),
905 tx_hash: Hash::create(&[b"my tx hash"]).into(),
906 log_index: 1u64,
907 removed: false,
908 processed: Some(false),
909 ..Default::default()
910 };
911 assert!(db.store_log(log_1.clone()).await.is_ok());
912 assert!(db.set_logs_processed(Some(latest_block), Some(0)).await.is_ok());
913 assert!(db.update_logs_checksums().await.is_ok());
914
915 let indexer = Indexer::new(
916 rpc,
917 handlers,
918 db.clone(),
919 IndexerConfig {
920 fast_sync: false,
921 ..Default::default()
922 },
923 async_channel::unbounded().0,
924 )
925 .without_panic_on_completion();
926
927 let (indexing, _) = join!(indexer.start(), async move {
928 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
929 tx.close_channel()
930 });
931 assert!(indexing.is_err()); Ok(())
934 }
935
936 #[tokio::test]
937 async fn test_indexer_should_pass_blocks_that_are_finalized() -> anyhow::Result<()> {
938 let mut handlers = MockChainLogHandler::new();
939 let mut rpc = MockHoprIndexerOps::new();
940 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
941
942 let cfg = IndexerConfig::default();
943
944 let addr = Address::new(b"my address 123456789");
945 handlers.expect_contract_addresses().return_const(vec![addr]);
946 handlers
947 .expect_contract_address_topics()
948 .withf(move |x| x == &addr)
949 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
950 handlers
951 .expect_safe_address()
952 .return_const(Address::new(b"my safe address 1234"));
953 handlers
954 .expect_contract_addresses_map()
955 .return_const(ContractAddresses::default());
956
957 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
958 rpc.expect_try_stream_logs()
959 .times(1)
960 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
961 .return_once(move |_, _, _| Ok(Box::pin(rx)));
962
963 let head_block = 1000;
964 rpc.expect_block_number().returning(move || Ok(head_block));
965
966 rpc.expect_get_hopr_balance()
967 .withf(move |x| x == &Address::new(b"my safe address 1234"))
968 .returning(move |_| Ok(HoprBalance::default()));
969
970 rpc.expect_get_hopr_allowance()
971 .withf(move |x, y| x == &Address::new(b"my safe address 1234") && y == &Address::from([0; 20]))
972 .returning(move |_, _| Ok(HoprBalance::default()));
973
974 let finalized_block = BlockWithLogs {
975 block_id: head_block - 1,
976 logs: BTreeSet::from_iter(build_announcement_logs(*BOB, 4, head_block - 1, 23)?),
977 };
978 let head_allowing_finalization = BlockWithLogs {
979 block_id: head_block,
980 logs: BTreeSet::new(),
981 };
982
983 handlers
985 .expect_collect_log_event()
986 .times(finalized_block.logs.len())
988 .returning(|_, _| Ok(None));
989
990 assert!(tx.start_send(finalized_block.clone()).is_ok());
991 assert!(tx.start_send(head_allowing_finalization.clone()).is_ok());
992
993 let indexer =
994 Indexer::new(rpc, handlers, db.clone(), cfg, async_channel::unbounded().0).without_panic_on_completion();
995 let _ = join!(indexer.start(), async move {
996 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
997 tx.close_channel()
998 });
999
1000 Ok(())
1001 }
1002
1003 #[test_log::test(tokio::test)]
1004 async fn test_indexer_fast_sync_full_with_resume() -> anyhow::Result<()> {
1005 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1006
1007 let addr = Address::new(b"my address 123456789");
1008 let topic = Hash::create(&[b"my topic"]);
1009
1010 {
1012 let logs = vec![
1013 build_announcement_logs(*BOB, 1, 1, 1)?,
1014 build_announcement_logs(*BOB, 1, 2, 1)?,
1015 ]
1016 .into_iter()
1017 .flatten()
1018 .collect::<Vec<_>>();
1019
1020 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1021
1022 for log in logs {
1023 assert!(db.store_log(log).await.is_ok());
1024 }
1025 assert!(db.update_logs_checksums().await.is_ok());
1026 assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 0);
1027 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1028
1029 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1030 let (tx_events, _) = async_channel::unbounded();
1031
1032 let head_block = 5;
1033 let mut rpc = MockHoprIndexerOps::new();
1034 rpc.expect_block_number().returning(move || Ok(head_block));
1035 rpc.expect_try_stream_logs()
1036 .times(1)
1037 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 3)
1038 .return_once(move |_, _, _| Ok(Box::pin(rx)));
1039
1040 let mut handlers = MockChainLogHandler::new();
1041 handlers.expect_contract_addresses().return_const(vec![addr]);
1042 handlers
1043 .expect_contract_address_topics()
1044 .withf(move |x| x == &addr)
1045 .return_const(vec![B256::from_slice(topic.as_ref())]);
1046 handlers
1047 .expect_collect_log_event()
1048 .times(2)
1049 .withf(move |l, _| [1, 2].contains(&l.block_number))
1050 .returning(|_, _| Ok(None));
1051 handlers
1052 .expect_contract_address_topics()
1053 .withf(move |x| x == &addr)
1054 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1055 handlers
1056 .expect_safe_address()
1057 .return_const(Address::new(b"my safe address 1234"));
1058 handlers
1059 .expect_contract_addresses_map()
1060 .return_const(ContractAddresses::default());
1061
1062 let indexer_cfg = IndexerConfig {
1063 start_block_number: 0,
1064 fast_sync: true,
1065 };
1066 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1067 let (indexing, _) = join!(indexer.start(), async move {
1068 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1069 tx.close_channel()
1070 });
1071 assert!(indexing.is_err()); assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1074 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1075
1076 db.insert_account(
1079 None,
1080 AccountEntry {
1081 public_key: *ALICE_OKP.public(),
1082 chain_addr: *ALICE,
1083 entry_type: AccountType::NotAnnounced,
1084 published_at: 1,
1085 },
1086 )
1087 .await?;
1088 db.insert_account(
1089 None,
1090 AccountEntry {
1091 public_key: *BOB_OKP.public(),
1092 chain_addr: *BOB,
1093 entry_type: AccountType::NotAnnounced,
1094 published_at: 1,
1095 },
1096 )
1097 .await?;
1098 }
1099
1100 {
1102 let logs = vec![
1103 build_announcement_logs(*BOB, 1, 3, 1)?,
1104 build_announcement_logs(*BOB, 1, 4, 1)?,
1105 ]
1106 .into_iter()
1107 .flatten()
1108 .collect::<Vec<_>>();
1109
1110 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1111
1112 for log in logs {
1113 assert!(db.store_log(log).await.is_ok());
1114 }
1115 assert!(db.update_logs_checksums().await.is_ok());
1116 assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 2);
1117 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 2);
1118
1119 let (tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1120 let (tx_events, _) = async_channel::unbounded();
1121
1122 let head_block = 5;
1123 let mut rpc = MockHoprIndexerOps::new();
1124 rpc.expect_block_number().returning(move || Ok(head_block));
1125 rpc.expect_try_stream_logs()
1126 .times(1)
1127 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 5)
1128 .return_once(move |_, _, _| Ok(Box::pin(rx)));
1129
1130 let mut handlers = MockChainLogHandler::new();
1131 handlers.expect_contract_addresses().return_const(vec![addr]);
1132 handlers
1133 .expect_contract_address_topics()
1134 .withf(move |x| x == &addr)
1135 .return_const(vec![B256::from_slice(topic.as_ref())]);
1136
1137 handlers
1138 .expect_collect_log_event()
1139 .times(2)
1140 .withf(move |l, _| [3, 4].contains(&l.block_number))
1141 .returning(|_, _| Ok(None));
1142 handlers
1143 .expect_contract_address_topics()
1144 .withf(move |x| x == &addr)
1145 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1146 handlers
1147 .expect_safe_address()
1148 .return_const(Address::new(b"my safe address 1234"));
1149 handlers
1150 .expect_contract_addresses_map()
1151 .return_const(ContractAddresses::default());
1152
1153 let indexer_cfg = IndexerConfig {
1154 start_block_number: 0,
1155 fast_sync: true,
1156 };
1157 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1158 let (indexing, _) = join!(indexer.start(), async move {
1159 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1160 tx.close_channel()
1161 });
1162 assert!(indexing.is_err()); assert_eq!(db.get_logs_block_numbers(None, None, Some(true)).await?.len(), 4);
1165 assert_eq!(db.get_logs_block_numbers(None, None, Some(false)).await?.len(), 0);
1166 }
1167
1168 Ok(())
1169 }
1170
1171 #[test_log::test(tokio::test)]
1172 async fn test_indexer_should_yield_back_once_the_past_events_are_indexed() -> anyhow::Result<()> {
1173 let mut handlers = MockChainLogHandler::new();
1174 let mut rpc = MockHoprIndexerOps::new();
1175 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1176
1177 let cfg = IndexerConfig::default();
1178
1179 let addr = Address::new(b"my address 123456789");
1181 handlers.expect_contract_addresses().return_const(vec![addr]);
1182 handlers
1183 .expect_contract_address_topics()
1184 .withf(move |x| x == &addr)
1185 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1186 handlers
1187 .expect_contract_address_topics()
1188 .withf(move |x| x == &addr)
1189 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1190 handlers
1191 .expect_safe_address()
1192 .return_const(Address::new(b"my safe address 1234"));
1193 handlers
1194 .expect_contract_addresses_map()
1195 .return_const(ContractAddresses::default());
1196
1197 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1198 rpc.expect_try_stream_logs()
1200 .times(1)
1201 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == 0)
1202 .return_once(move |_, _, _| Ok(Box::pin(rx)));
1203 rpc.expect_get_hopr_balance()
1204 .once()
1205 .return_once(move |_| Ok(HoprBalance::zero()));
1206 rpc.expect_get_hopr_allowance()
1207 .once()
1208 .return_once(move |_, _| Ok(HoprBalance::zero()));
1209
1210 let head_block = 1000;
1211 let block_numbers = [head_block - 1, head_block, head_block + 1];
1212
1213 let blocks: Vec<BlockWithLogs> = block_numbers
1214 .iter()
1215 .map(|block_id| BlockWithLogs {
1216 block_id: *block_id,
1217 logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, *block_id, 23).unwrap()),
1218 })
1219 .collect();
1220
1221 for _ in 0..(blocks.len() as u64) {
1222 rpc.expect_block_number().returning(move || Ok(head_block));
1223 }
1224
1225 for block in blocks.iter() {
1226 assert!(tx.start_send(block.clone()).is_ok());
1227 }
1228
1229 handlers
1231 .expect_collect_log_event()
1232 .times(1)
1233 .withf(move |l, _| block_numbers.contains(&l.block_number))
1234 .returning(|l, _| {
1235 let block_number = l.block_number;
1236 Ok(Some(SignificantChainEvent {
1237 tx_hash: Hash::create(&[format!("my tx hash {block_number}").as_bytes()]),
1238 event_type: RANDOM_ANNOUNCEMENT_CHAIN_EVENT.clone(),
1239 }))
1240 });
1241
1242 let (tx_events, rx_events) = async_channel::unbounded();
1243 let indexer = Indexer::new(rpc, handlers, db.clone(), cfg, tx_events).without_panic_on_completion();
1244 indexer.start().await?;
1245
1246 let _first = rx_events.recv();
1249 let _second = rx_events.recv();
1250 let third = rx_events.try_recv();
1251
1252 assert!(third.is_err());
1253
1254 Ok(())
1255 }
1256
1257 #[test_log::test(tokio::test)]
1258 async fn test_indexer_should_not_reprocess_last_processed_block() -> anyhow::Result<()> {
1259 let last_processed_block = 100_u64;
1260
1261 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
1262
1263 let addr = Address::new(b"my address 123456789");
1264 let topic = Hash::create(&[b"my topic"]);
1265 assert!(db.ensure_logs_origin(vec![(addr, topic)]).await.is_ok());
1266
1267 let log_1 = SerializableLog {
1269 address: Address::new(b"my address 123456789"),
1270 topics: [Hash::create(&[b"my topic"]).into()].into(),
1271 data: [1, 2, 3, 4].into(),
1272 tx_index: 1u64,
1273 block_number: last_processed_block,
1274 block_hash: Hash::create(&[b"my block hash"]).into(),
1275 tx_hash: Hash::create(&[b"my tx hash"]).into(),
1276 log_index: 1u64,
1277 removed: false,
1278 processed: Some(false),
1279 ..Default::default()
1280 };
1281 assert!(db.store_log(log_1.clone()).await.is_ok());
1282 assert!(db.set_logs_processed(Some(last_processed_block), Some(0)).await.is_ok());
1283 assert!(db.update_logs_checksums().await.is_ok());
1284
1285 let (mut tx, rx) = futures::channel::mpsc::unbounded::<BlockWithLogs>();
1286
1287 let mut rpc = MockHoprIndexerOps::new();
1288 rpc.expect_try_stream_logs()
1289 .once()
1290 .withf(move |x: &u64, _y: &FilterSet, _: &bool| *x == last_processed_block + 1)
1291 .return_once(move |_, _, _| Ok(Box::pin(rx)));
1292
1293 rpc.expect_block_number()
1294 .times(3)
1295 .returning(move || Ok(last_processed_block + 1));
1296
1297 rpc.expect_get_hopr_balance()
1298 .once()
1299 .return_once(move |_| Ok(HoprBalance::zero()));
1300
1301 rpc.expect_get_hopr_allowance()
1302 .once()
1303 .return_once(move |_, _| Ok(HoprBalance::zero()));
1304
1305 let block = BlockWithLogs {
1306 block_id: last_processed_block + 1,
1307 logs: BTreeSet::from_iter(build_announcement_logs(*ALICE, 1, last_processed_block + 1, 23)?),
1308 };
1309
1310 tx.start_send(block)?;
1311
1312 let mut handlers = MockChainLogHandler::new();
1313 handlers.expect_contract_addresses().return_const(vec![addr]);
1314 handlers
1315 .expect_contract_address_topics()
1316 .withf(move |x| x == &addr)
1317 .return_const(vec![B256::from_slice(topic.as_ref())]);
1318 handlers
1319 .expect_contract_address_topics()
1320 .withf(move |x| x == &addr)
1321 .return_const(vec![B256::from_slice(Hash::create(&[b"my topic"]).as_ref())]);
1322 handlers
1323 .expect_safe_address()
1324 .return_const(Address::new(b"my safe address 1234"));
1325 handlers
1326 .expect_contract_addresses_map()
1327 .return_const(ContractAddresses::default());
1328
1329 let indexer_cfg = IndexerConfig {
1330 start_block_number: 0,
1331 fast_sync: false,
1332 };
1333
1334 let (tx_events, _) = async_channel::unbounded();
1335 let indexer = Indexer::new(rpc, handlers, db.clone(), indexer_cfg, tx_events).without_panic_on_completion();
1336 indexer.start().await?;
1337
1338 Ok(())
1339 }
1340}