use std::pin::Pin;

use alloy::{providers::Provider, rpc::types::Filter};
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt, stream::BoxStream};
use hopr_primitive_types::prelude::*;
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::{debug, error, trace, warn};

use crate::{
    BlockWithLogs, FilterSet, HoprIndexerRpcOperations, Log,
    errors::{Result, RpcError, RpcError::FilterIsEmpty},
    rpc::RpcOperations,
    transport::HttpRequestor,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_RPC_CHAIN_HEAD: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_chain_head_block_number",
            "Current block number of chain head",
        ).unwrap();
}

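/// Splits the inclusive block range `[from_block, to_block]` into consecutive sub-ranges of at
/// most `max_chunk_size` blocks and yields, for each sub-range, a copy of the given filters
/// restricted to that sub-range.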
fn split_range<'a>(
    filters: Vec<Filter>,
    from_block: u64,
    to_block: u64,
    max_chunk_size: u64,
) -> BoxStream<'a, Vec<Filter>> {
    assert!(from_block <= to_block, "invalid block range");
    assert!(max_chunk_size > 0, "chunk size must be greater than 0");

    futures::stream::unfold((from_block, to_block), move |(start, to)| {
        if start <= to {
            let end = to_block.min(start + max_chunk_size - 1);
            let ranged_filters = filters
                .iter()
                .cloned()
                .map(|f| f.from_block(start).to_block(end))
                .collect::<Vec<_>>();
            futures::future::ready(Some((ranged_filters, (end + 1, to))))
        } else {
            futures::future::ready(None)
        }
    })
    .boxed()
}

impl<R: HttpRequestor + 'static + Clone> RpcOperations<R> {
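    /// Retrieves logs matching the given filters over the block range `[from_block, to_block]`.
    ///
    /// The range is split into chunks of at most `max_block_range_fetch_size` blocks, the filters of each
    /// chunk are queried concurrently, and the logs of each chunk are sorted by block number before
    /// being emitted on the returned stream.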
    fn stream_logs(&self, filters: Vec<Filter>, from_block: u64, to_block: u64) -> BoxStream<'_, Result<Log>> {
        let fetch_ranges = split_range(filters, from_block, to_block, self.cfg.max_block_range_fetch_size);

        debug!(
            "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
            (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
        );

        fetch_ranges
            .then(move |subrange_filters| async move {
                let mut results = futures::stream::iter(subrange_filters)
                    .then_concurrent(|filter| async move {
                        let prov_clone = self.provider.clone();

                        match prov_clone.get_logs(&filter).await {
                            Ok(logs) => Ok(logs),
                            Err(e) => {
                                error!(
                                    from = ?filter.get_from_block(),
                                    to = ?filter.get_to_block(),
                                    error = %e,
                                    "failed to fetch logs in block subrange"
                                );
                                Err(e)
                            }
                        }
                    })
                    .flat_map(|result| {
                        futures::stream::iter(match result {
                            Ok(logs) => logs.into_iter().map(|log| Ok(Log::try_from(log)?)).collect::<Vec<_>>(),
                            Err(e) => vec![Err(RpcError::from(e))],
                        })
                    })
                    .collect::<Vec<_>>()
                    .await;

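                // Sort the logs of this chunk by block number, ordering failed fetches first
                // so that errors are surfaced to the consumer early.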
                results.sort_by(|a, b| {
                    if let Ok(a) = a {
                        if let Ok(b) = b {
                            a.block_number.cmp(&b.block_number)
                        } else {
                            std::cmp::Ordering::Greater
                        }
                    } else {
                        std::cmp::Ordering::Less
                    }
                });

                futures::stream::iter(results)
            })
            .flatten()
            .boxed()
    }
}

#[async_trait]
impl<R: HttpRequestor + 'static + Clone> HoprIndexerRpcOperations for RpcOperations<R> {
    async fn block_number(&self) -> Result<u64> {
        self.get_block_number().await
    }

    async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> Result<HoprBalance> {
        self.get_hopr_allowance(owner, spender).await
    }

    async fn get_xdai_balance(&self, address: Address) -> Result<XDaiBalance> {
        self.get_xdai_balance(address).await
    }

    async fn get_hopr_balance(&self, address: Address) -> Result<HoprBalance> {
        self.get_hopr_balance(address).await
    }

    fn try_stream_logs<'a>(
        &'a self,
        start_block_number: u64,
        filters: FilterSet,
        is_synced: bool,
    ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
        if filters.all.is_empty() {
            return Err(FilterIsEmpty);
        }

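        // Until the indexer is synced, only the non-token filters are queried;
        // once synced, the full filter set is used.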
        let log_filters = if !is_synced {
            filters.no_token
        } else {
            filters.all
        };

        Ok(Box::pin(stream! {
            let mut from_block = start_block_number;

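            // Maximum number of consecutive RPC failures tolerated before giving up, and the
            // maximum number of blocks the requested start may lie past the chain head before
            // the situation is treated as unrecoverable.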
            const MAX_LOOP_FAILURES: usize = 5;
            const MAX_RPC_PAST_BLOCKS: usize = 50;
            let mut count_failures = 0;

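            // Main polling loop: query the chain head and stream the logs of all blocks
            // between the last processed block and the current head.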
            'outer: loop {
                match self.block_number().await {
                    Ok(latest_block) => {
                        if from_block > latest_block {
                            let past_diff = from_block - latest_block;
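                            // On the very first iteration the requested start block may lie
                            // ahead of the current head; simply clamp it to the head.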
                            if from_block == start_block_number {
                                from_block = latest_block;
                            } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
                                debug!(last_block = latest_block, start_block = start_block_number, blocks_diff = past_diff, "Indexer premature request. Block not found yet in RPC provider.");
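                                // Give the RPC provider time to catch up before polling again.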
                                futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
                                continue;
                            } else {
                                panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
                                possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
                                possible solutions: change the RPC provider, reinitialize the DB");
                            }
                        }

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);

                        let mut retrieved_logs = self.stream_logs(log_filters.clone(), from_block, latest_block);

                        trace!(from_block, to_block = latest_block, "processing batch");

                        let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default() };

                        loop {
                            match retrieved_logs.next().await {
                                Some(Ok(log)) => {
                                    if log.block_number > latest_block {
                                        warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
                                        break;
                                    }

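                                    // Logs must arrive in non-decreasing block order;
                                    // anything else indicates a broken RPC provider.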
                                    if current_block_log.block_id > log.block_number {
                                        error!(log_block_id = log.block_number, current_block_log.block_id, "received log from a previous block");
                                        panic!("The on-chain logs are not ordered by block number. This is a critical error.");
                                    }

                                    if current_block_log.block_id < log.block_number {
                                        debug!(block = %current_block_log, "completed block, moving to next");
                                        yield current_block_log;

                                        current_block_log = BlockWithLogs::default();
                                        current_block_log.block_id = log.block_number;
                                    }

                                    debug!("retrieved {log}");
                                    current_block_log.logs.insert(log.into());
                                },
                                None => {
                                    break;
                                },
                                Some(Err(e)) => {
                                    error!(error=%e, "failed to process blocks");
                                    count_failures += 1;

                                    if count_failures < MAX_LOOP_FAILURES {
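                                        // Retry the whole batch starting from the block that was
                                        // being assembled, so its logs are fetched again.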
                                        from_block = current_block_log.block_id;
                                        continue 'outer;
                                    } else {
                                        panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.

                                        The RPC provider does not seem to be working correctly.

                                        The last encountered error was: {e}");
                                    }
                                }
                            }
                        }

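                        // The batch is exhausted: emit the last collected block and
                        // continue from the block following the processed range.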
                        debug!(block = %current_block_log, "completed block, processing batch finished");
                        yield current_block_log;
                        from_block = latest_block + 1;
                        count_failures = 0;
                    }

                    Err(e) => error!(error = %e, "failed to obtain current block number from chain")
                }

                futures_timer::Delay::new(self.cfg.expected_block_time).await;
            }
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::{
        primitives::U256,
        rpc::{client::ClientBuilder, types::Filter},
        sol_types::SolEvent,
        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
    };
    use anyhow::Context;
    use futures::StreamExt;
    use hopr_async_runtime::prelude::{sleep, spawn};
    use hopr_bindings::{
        hoprchannelsevents::HoprChannelsEvents::{ChannelBalanceIncreased, ChannelOpened},
        hoprtoken::HoprToken::{Approval, Transfer},
    };
    use hopr_chain_types::{ContractAddresses, ContractInstances};
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
    use tokio::time::timeout;
    use tracing::debug;

    use crate::{
        BlockWithLogs, FilterSet, HoprIndexerRpcOperations,
        client::create_rpc_client_to_anvil,
        errors::RpcError,
        indexer::split_range,
        rpc::{RpcOperations, RpcOperationsConfig},
    };

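    /// Returns the `(from, to)` block bounds shared by all given filters,
    /// failing if the bounds differ between any two filters.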
    fn filter_bounds(filters: &[Filter]) -> anyhow::Result<(u64, u64)> {
        let bounds = filters.iter().try_fold((0, 0), |acc, filter| {
            let from = filter
                .block_option
                .get_from_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;
            let to = filter
                .block_option
                .get_to_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;
            let next = (from, to);

            match acc {
                (0, 0) => Ok(next),
                acc => {
                    if acc != next {
                        anyhow::bail!("range bounds are not equal across all filters");
                    }
                    Ok(acc)
                }
            }
        })?;

        Ok(bounds)
    }

    #[tokio::test]
    async fn test_split_range() -> anyhow::Result<()> {
        let filters = vec![Filter::default()];
        let ranges = split_range(filters.clone(), 0, 10, 2).collect::<Vec<_>>().await;

        assert_eq!(6, ranges.len());
        assert_eq!((0, 1), filter_bounds(&ranges[0])?);
        assert_eq!((2, 3), filter_bounds(&ranges[1])?);
        assert_eq!((4, 5), filter_bounds(&ranges[2])?);
        assert_eq!((6, 7), filter_bounds(&ranges[3])?);
        assert_eq!((8, 9), filter_bounds(&ranges[4])?);
        assert_eq!((10, 10), filter_bounds(&ranges[5])?);

        let ranges = split_range(filters.clone(), 0, 0, 1).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);

        let ranges = split_range(filters.clone(), 0, 3, 1).collect::<Vec<_>>().await;
        assert_eq!(4, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
        assert_eq!((1, 1), filter_bounds(&ranges[1])?);
        assert_eq!((2, 2), filter_bounds(&ranges[2])?);
        assert_eq!((3, 3), filter_bounds(&ranges[3])?);

        let ranges = split_range(filters.clone(), 0, 3, 10).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 3), filter_bounds(&ranges[0])?);

        Ok(())
    }

    #[tokio::test]
    async fn test_should_get_block_number() -> anyhow::Result<()> {
        let expected_block_time = Duration::from_secs(1);
        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            finality: 2,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

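        // Wait until Anvil has produced enough blocks to satisfy the configured finality.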
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let b1 = rpc.block_number().await?;

        sleep(expected_block_time * 2).await;

        let b2 = rpc.block_number().await?;

        assert!(b2 > b1, "block number should increase");

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        let contract_instances = {
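            // Deploy the HOPR contract suite onto the local Anvil chain.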
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let filter_token_approval = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.token))
            .event_signature(Approval::SIGNATURE_HASH);
        let filter_token_transfer = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.token))
            .event_signature(Transfer::SIGNATURE_HASH);
        let filter_channels_opened = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_token_approval.clone(),
                filter_token_transfer.clone(),
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![filter_token_approval, filter_token_transfer],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

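        // Let the chain advance past the configured finality before creating the RPC operations.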
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let count_filtered_topics = 2;
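        // Spawn the indexer stream and wait for the first block that contains exactly two logs.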
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .next()
                    .await,
            )
        });

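        // Open and fund a channel to produce the on-chain events asserted below.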
        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs).await???;

        let last_block_logs = retrieved_logs
            .into_iter()
            .next_back()
            .context("a log should be present")?
            .clone()
            .logs;

        let channel_open_filter = ChannelOpened::SIGNATURE_HASH;
        let channel_balance_filter = ChannelBalanceIncreased::SIGNATURE_HASH;

        debug!(
            "channel_open_filter: {:?} - {:?}",
            channel_open_filter,
            channel_open_filter.0.to_vec()
        );
        debug!(
            "channel_balance_filter: {:?} - {:?}",
            channel_balance_filter,
            channel_balance_filter.0.to_vec()
        );
        debug!("logs: {:#?}", last_block_logs);

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
            "must contain channel open"
        );
        assert!(
            last_block_logs.iter().any(
                |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
            ),
            "must contain channel balance increase"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel()
    -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        let contract_instances = {
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            finality: 2,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let filter_channels_opened = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
            .address(alloy::primitives::Address::from(contract_addrs.channels))
            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        let count_filtered_topics = 2;
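        // Spawn the indexer stream and collect the first block containing exactly two logs;
        // only the channel events are expected here since the token filters are excluded.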
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .take(1)
                    .collect::<Vec<BlockWithLogs>>()
                    .await,
            )
        });

        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs).await???;

        let last_block_logs = retrieved_logs
            .first()
            .context("a value should be present")?
            .clone()
            .logs;

        let channel_open_filter: [u8; 32] = ChannelOpened::SIGNATURE_HASH.0;
        let channel_balance_filter: [u8; 32] = ChannelBalanceIncreased::SIGNATURE_HASH.0;

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
            "must contain channel open"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
            "must contain channel balance increase"
        );

        Ok(())
    }
}