use std::pin::Pin;

use alloy::{providers::Provider, rpc::types::Filter};
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt, stream::BoxStream};
#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::SimpleGauge;
use hopr_primitive_types::prelude::*;
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::{debug, error, trace, warn};

use crate::{
    BlockWithLogs, FilterSet, HoprIndexerRpcOperations, Log,
    errors::{Result, RpcError, RpcError::FilterIsEmpty},
    rpc::RpcOperations,
    transport::HttpRequestor,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_RPC_CHAIN_HEAD: SimpleGauge =
        SimpleGauge::new(
            "hopr_chain_head_block_number",
            "Current block number of chain head",
        ).unwrap();
}

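/// Splits the inclusive block range `[from_block, to_block]` into consecutive chunks of at
/// most `max_chunk_size` blocks and, for each chunk, yields a copy of every input filter
/// constrained to that sub-range.
///
/// Panics if `from_block > to_block` or if `max_chunk_size` is zero.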
fn split_range<'a>(
    filters: Vec<Filter>,
    from_block: u64,
    to_block: u64,
    max_chunk_size: u64,
) -> BoxStream<'a, Vec<Filter>> {
    assert!(from_block <= to_block, "invalid block range");
    assert!(max_chunk_size > 0, "chunk size must be greater than 0");

    futures::stream::unfold((from_block, to_block), move |(start, to)| {
        if start <= to {
            let end = to_block.min(start + max_chunk_size - 1);
            let ranged_filters = filters
                .iter()
                .cloned()
                .map(|f| f.from_block(start).to_block(end))
                .collect::<Vec<_>>();
            futures::future::ready(Some((ranged_filters, (end + 1, to))))
        } else {
            futures::future::ready(None)
        }
    })
    .boxed()
}

impl<R: HttpRequestor + 'static + Clone> RpcOperations<R> {
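    /// Streams logs matching the given filters over the inclusive block range
    /// `[from_block, to_block]`.
    ///
    /// The range is split into chunks of at most `cfg.max_block_range_fetch_size` blocks.
    /// Within each chunk, the filters are queried concurrently and the retrieved logs are
    /// sorted by block number before being emitted; failed sub-range fetches surface as
    /// `Err` items in the stream.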
    fn stream_logs(&self, filters: Vec<Filter>, from_block: u64, to_block: u64) -> BoxStream<Result<Log>> {
        let fetch_ranges = split_range(filters, from_block, to_block, self.cfg.max_block_range_fetch_size);

        debug!(
            "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
            (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
        );

        fetch_ranges
            .then(move |subrange_filters| async move {
                let mut results = futures::stream::iter(subrange_filters)
                    .then_concurrent(|filter| async move {
                        let prov_clone = self.provider.clone();

                        match prov_clone.get_logs(&filter).await {
                            Ok(logs) => Ok(logs),
                            Err(e) => {
                                error!(
                                    from = ?filter.get_from_block(),
                                    to = ?filter.get_to_block(),
                                    error = %e,
                                    "failed to fetch logs in block subrange"
                                );
                                Err(e)
                            }
                        }
                    })
                    .flat_map(|result| {
                        futures::stream::iter(match result {
                            Ok(logs) => logs.into_iter().map(|log| Ok(Log::try_from(log)?)).collect::<Vec<_>>(),
                            Err(e) => vec![Err(RpcError::from(e))],
                        })
                    })
                    .collect::<Vec<_>>()
                    .await;

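                // The per-filter queries above run concurrently, so logs may arrive out of
                // order; sort them by block number (errors first) before emitting them.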
                results.sort_by(|a, b| match (a, b) {
                    (Ok(a), Ok(b)) => a.block_number.cmp(&b.block_number),
                    (Ok(_), Err(_)) => std::cmp::Ordering::Greater,
                    (Err(_), Ok(_)) => std::cmp::Ordering::Less,
                    (Err(_), Err(_)) => std::cmp::Ordering::Equal,
                });

                futures::stream::iter(results)
            })
            .flatten()
            .boxed()
    }
}

#[async_trait]
impl<R: HttpRequestor + 'static + Clone> HoprIndexerRpcOperations for RpcOperations<R> {
    async fn block_number(&self) -> Result<u64> {
        self.get_block_number().await
    }

    async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> Result<HoprBalance> {
        self.get_hopr_allowance(owner, spender).await
    }

    async fn get_xdai_balance(&self, address: Address) -> Result<XDaiBalance> {
        self.get_xdai_balance(address).await
    }

    async fn get_hopr_balance(&self, address: Address) -> Result<HoprBalance> {
        self.get_hopr_balance(address).await
    }

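    /// Produces an endless stream of [`BlockWithLogs`], starting at `start_block_number`.
    ///
    /// While `is_synced` is `false`, only the non-token filters (`filters.no_token`) are
    /// queried; otherwise all filters are used. The chain head is polled roughly every
    /// `cfg.expected_block_time`; logs of newly seen blocks are grouped per block and each
    /// completed block is yielded. Transient RPC failures restart the batch from the last
    /// incomplete block, and after `MAX_LOOP_FAILURES` consecutive failures the stream panics.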
    fn try_stream_logs<'a>(
        &'a self,
        start_block_number: u64,
        filters: FilterSet,
        is_synced: bool,
    ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
        if filters.all.is_empty() {
            return Err(FilterIsEmpty);
        }

        let log_filters = if !is_synced {
            filters.no_token
        } else {
            filters.all
        };

        Ok(Box::pin(stream! {
            let mut from_block = start_block_number;

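            // MAX_LOOP_FAILURES: consecutive RPC failures tolerated before the stream panics.
            // MAX_RPC_PAST_BLOCKS: how far ahead of the reported chain head the requested
            // block may be before this is treated as an unrecoverable inconsistency rather
            // than a provider that has not caught up yet.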
            const MAX_LOOP_FAILURES: usize = 5;
            const MAX_RPC_PAST_BLOCKS: usize = 50;
            let mut count_failures = 0;

            'outer: loop {
                match self.block_number().await {
                    Ok(latest_block) => {
                        if from_block > latest_block {
                            let past_diff = from_block - latest_block;
                            if from_block == start_block_number {
                                from_block = latest_block;
                            } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
                                debug!(
                                    last_block = latest_block,
                                    start_block = start_block_number,
                                    blocks_diff = past_diff,
                                    "Indexer premature request. Block not found yet in RPC provider."
                                );
                                futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
                                continue;
                            } else {
                                panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
                                possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
                                possible solutions: change the RPC provider, reinitialize the DB");
                            }
                        }

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);

                        let mut retrieved_logs = self.stream_logs(log_filters.clone(), from_block, latest_block);

                        trace!(from_block, to_block = latest_block, "processing batch");

                        let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default() };

                        loop {
                            match retrieved_logs.next().await {
                                Some(Ok(log)) => {
                                    if log.block_number > latest_block {
                                        warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
                                        break;
                                    }

                                    if current_block_log.block_id > log.block_number {
                                        error!(
                                            log_block_id = log.block_number,
                                            current_block_log.block_id,
                                            "received log from a previous block"
                                        );
                                        panic!("The on-chain logs are not ordered by block number. This is a critical error.");
                                    }

                                    if current_block_log.block_id < log.block_number {
                                        debug!(block = %current_block_log, "completed block, moving to next");
                                        yield current_block_log;

                                        current_block_log = BlockWithLogs::default();
                                        current_block_log.block_id = log.block_number;
                                    }

                                    debug!("retrieved {log}");
                                    current_block_log.logs.insert(log.into());
                                },
                                None => {
                                    break;
                                },
                                Some(Err(e)) => {
                                    error!(error = %e, "failed to process blocks");
                                    count_failures += 1;

                                    if count_failures < MAX_LOOP_FAILURES {
                                        from_block = current_block_log.block_id;
                                        continue 'outer;
                                    } else {
                                        panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.

                                        The RPC provider does not seem to be working correctly.

                                        The last encountered error was: {e}");
                                    }
                                }
                            }
                        }

                        debug!(block = %current_block_log, "completed block, processing batch finished");
                        yield current_block_log;
                        from_block = latest_block + 1;
                        count_failures = 0;
                    }

                    Err(e) => error!(error = %e, "failed to obtain current block number from chain")
                }

                futures_timer::Delay::new(self.cfg.expected_block_time).await;
            }
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::{
        primitives::U256,
        rpc::{client::ClientBuilder, types::Filter},
        sol_types::SolEvent,
        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
    };
    use anyhow::Context;
    use futures::StreamExt;
    use hopr_async_runtime::prelude::{sleep, spawn};
    use hopr_bindings::{
        hoprchannelsevents::HoprChannelsEvents::{ChannelBalanceIncreased, ChannelOpened},
        hoprtoken::HoprToken::{Approval, Transfer},
    };
    use hopr_chain_types::{ContractAddresses, ContractInstances};
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
    use tokio::time::timeout;
    use tracing::debug;

    use crate::{
        BlockWithLogs, FilterSet, HoprIndexerRpcOperations,
        client::create_rpc_client_to_anvil,
        errors::RpcError,
        indexer::split_range,
        rpc::{RpcOperations, RpcOperationsConfig},
    };

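    // Extracts the (from_block, to_block) bounds shared by all filters in a chunk produced
    // by `split_range`, failing if the filters do not agree on the same range.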
    fn filter_bounds(filters: &[Filter]) -> anyhow::Result<(u64, u64)> {
        let bounds = filters.iter().try_fold((0, 0), |acc, filter| {
            let from = filter
                .block_option
                .get_from_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;
            let to = filter
                .block_option
                .get_to_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;
            let next = (from, to);

            match acc {
                (0, 0) => Ok(next),
                acc => {
                    if acc != next {
                        anyhow::bail!("range bounds are not equal across all filters");
                    }
                    Ok(acc)
                }
            }
        })?;

        Ok(bounds)
    }

    #[tokio::test]
    async fn test_split_range() -> anyhow::Result<()> {
        let filters = vec![Filter::default()];
        let ranges = split_range(filters.clone(), 0, 10, 2).collect::<Vec<_>>().await;

        assert_eq!(6, ranges.len());
        assert_eq!((0, 1), filter_bounds(&ranges[0])?);
        assert_eq!((2, 3), filter_bounds(&ranges[1])?);
        assert_eq!((4, 5), filter_bounds(&ranges[2])?);
        assert_eq!((6, 7), filter_bounds(&ranges[3])?);
        assert_eq!((8, 9), filter_bounds(&ranges[4])?);
        assert_eq!((10, 10), filter_bounds(&ranges[5])?);

        let ranges = split_range(filters.clone(), 0, 0, 1).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);

        let ranges = split_range(filters.clone(), 0, 3, 1).collect::<Vec<_>>().await;
        assert_eq!(4, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
        assert_eq!((1, 1), filter_bounds(&ranges[1])?);
        assert_eq!((2, 2), filter_bounds(&ranges[2])?);
        assert_eq!((3, 3), filter_bounds(&ranges[3])?);

        let ranges = split_range(filters.clone(), 0, 3, 10).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 3), filter_bounds(&ranges[0])?);

        Ok(())
    }

    #[tokio::test]
    async fn test_should_get_block_number() -> anyhow::Result<()> {
        let expected_block_time = Duration::from_secs(1);
        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            finality: 2,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let b1 = rpc.block_number().await?;

        sleep(expected_block_time * 2).await;

        let b2 = rpc.block_number().await?;

        assert!(b2 > b1, "block number should increase");

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        let contract_instances = {
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let filter_token_approval = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.token))
            .event_signature(Approval::SIGNATURE_HASH);
        let filter_token_transfer = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.token))
            .event_signature(Transfer::SIGNATURE_HASH);
        let filter_channels_opened = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_token_approval.clone(),
                filter_token_transfer.clone(),
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![filter_token_approval, filter_token_transfer],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let count_filtered_topics = 2;
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .next()
                    .await,
            )
        });

        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs).await???;

        let last_block_logs = retrieved_logs
            .into_iter()
            .next_back()
            .context("a log should be present")?
            .clone()
            .logs;

        let channel_open_filter = ChannelOpened::SIGNATURE_HASH;
        let channel_balance_filter = ChannelBalanceIncreased::SIGNATURE_HASH;

        debug!(
            "channel_open_filter: {:?} - {:?}",
            channel_open_filter,
            channel_open_filter.0.to_vec()
        );
        debug!(
            "channel_balance_filter: {:?} - {:?}",
            channel_balance_filter,
            channel_balance_filter.0.to_vec()
        );
        debug!("logs: {:#?}", last_block_logs);

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
            "must contain channel open"
        );
        assert!(
            last_block_logs.iter().any(
                |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
            ),
            "must contain channel balance increase"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel()
    -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        let contract_instances = {
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            finality: 2,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let filter_channels_opened = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        let count_filtered_topics = 2;
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .take(1)
                    .collect::<Vec<BlockWithLogs>>()
                    .await,
            )
        });

        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs).await???;

        let last_block_logs = retrieved_logs
            .first()
            .context("a value should be present")?
            .clone()
            .logs;

        let channel_open_filter: [u8; 32] = ChannelOpened::SIGNATURE_HASH.0;
        let channel_balance_filter: [u8; 32] = ChannelBalanceIncreased::SIGNATURE_HASH.0;

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
            "must contain channel open"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
            "must contain channel balance increase"
        );

        Ok(())
    }
}