1use async_stream::stream;
11use async_trait::async_trait;
12use ethers::providers::{JsonRpcClient, Middleware};
13use futures::stream::BoxStream;
14use futures::{Stream, StreamExt};
15use std::pin::Pin;
16use tracing::{debug, error, trace, warn};
17
18use crate::errors::{Result, RpcError, RpcError::FilterIsEmpty};
19use crate::rpc::RpcOperations;
20use crate::{BlockWithLogs, HoprIndexerRpcOperations, HttpRequestor, Log, LogFilter};
21
22#[cfg(all(feature = "prometheus", not(test)))]
23use hopr_metrics::metrics::SimpleGauge;
24
25#[cfg(all(feature = "prometheus", not(test)))]
26lazy_static::lazy_static! {
27 static ref METRIC_RPC_CHAIN_HEAD: SimpleGauge =
28 SimpleGauge::new(
29 "hopr_chain_head_block_number",
30 "Current block number of chain head",
31 ).unwrap();
32}
33
34fn split_range<'a>(
38 filter: LogFilter,
39 from_block: u64,
40 to_block: u64,
41 max_chunk_size: u64,
42) -> BoxStream<'a, ethers::types::Filter> {
43 assert!(from_block <= to_block, "invalid block range");
44 assert!(max_chunk_size > 0, "chunk size must be greater than 0");
45
46 futures::stream::unfold((from_block, to_block), move |(start, to)| {
47 if start <= to {
48 let end = to_block.min(start + max_chunk_size - 1);
49 let filter = ethers::types::Filter::from(filter.clone())
50 .from_block(start)
51 .to_block(end);
52 futures::future::ready(Some((filter, (end + 1, to))))
53 } else {
54 futures::future::ready(None)
55 }
56 })
57 .boxed()
58}
59
60impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> RpcOperations<P, R> {
61 fn stream_logs(&self, filter: LogFilter, from_block: u64, to_block: u64) -> BoxStream<Result<Log>> {
63 let fetch_ranges = split_range(filter, from_block, to_block, self.cfg.max_block_range_fetch_size);
64
65 debug!(
66 "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
67 (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
68 );
69
70 fetch_ranges
71 .then(|subrange| {
72 let prov_clone = self.provider.clone();
73
74 async move {
75 trace!(
76 from = ?subrange.get_from_block(),
77 to = ?subrange.get_to_block(),
78 "fetching logs in block subrange"
79 );
80 match prov_clone.get_logs(&subrange).await {
81 Ok(logs) => Ok(logs),
82 Err(e) => {
83 error!(
84 from = ?subrange.get_from_block(),
85 to = ?subrange.get_to_block(),
86 error = %e,
87 "failed to fetch logs in block subrange"
88 );
89 Err(e)
90 }
91 }
92 }
93 })
94 .flat_map(|result| {
95 futures::stream::iter(match result {
96 Ok(logs) => logs.into_iter().map(|log| Ok(Log::from(log))).collect::<Vec<_>>(),
97 Err(e) => vec![Err(RpcError::from(e))],
98 })
99 })
100 .boxed()
101 }
102}
103
104#[async_trait]
105impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> HoprIndexerRpcOperations for RpcOperations<P, R> {
106 async fn block_number(&self) -> Result<u64> {
107 self.get_block_number().await
108 }
109
110 fn try_stream_logs<'a>(
111 &'a self,
112 start_block_number: u64,
113 filter: LogFilter,
114 ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
115 if filter.is_empty() {
116 return Err(FilterIsEmpty);
117 }
118
119 Ok(Box::pin(stream! {
120 let mut from_block = start_block_number;
122
123 const MAX_LOOP_FAILURES: usize = 5;
124 const MAX_RPC_PAST_BLOCKS: usize = 50;
125 let mut count_failures = 0;
126
127 'outer: loop {
128 match self.block_number().await {
129 Ok(latest_block) => {
130 if from_block > latest_block {
131 let past_diff = from_block - latest_block;
132 if from_block == start_block_number {
133 from_block = latest_block;
136 } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
137 debug!(last_block = latest_block, start_block = start_block_number, blocks_diff = past_diff, "Indexer premature request. Block not found yet in RPC provider.");
139 futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
140 continue;
141 } else {
142 panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
144 possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
145 possible solutions: change the RPC provider, reinitialize the DB");
146 }
147 }
148
149
150 #[cfg(all(feature = "prometheus", not(test)))]
151 METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);
152
153 let mut retrieved_logs = self.stream_logs(filter.clone(), from_block, latest_block);
154
155 trace!(from_block, to_block = latest_block, "processing batch");
156
157 let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default()};
158
159 loop {
160 match retrieved_logs.next().await {
161 Some(Ok(log)) => {
162 if log.block_number > latest_block {
164 warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
165 break;
166 }
167
168 if current_block_log.block_id > log.block_number {
170 error!(log_block_id = log.block_number, current_block_log.block_id, "received log from a previous block");
171 panic!("The on-chain logs are not ordered by block number. This is a critical error.");
172 }
173
174 if current_block_log.block_id < log.block_number {
176 debug!(block = %current_block_log, "completed block, moving to next");
177 yield current_block_log;
178
179 current_block_log = BlockWithLogs::default();
180 current_block_log.block_id = log.block_number;
181 }
182
183 debug!("retrieved {log}");
184 current_block_log.logs.insert(log.into());
185 },
186 None => {
187 break;
188 },
189 Some(Err(e)) => {
190 error!(error=%e, "failed to process blocks");
191 count_failures += 1;
192
193 if count_failures < MAX_LOOP_FAILURES {
194 from_block = current_block_log.block_id;
198 continue 'outer;
199 } else {
200 panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.
201
202 The RPC provider does not seem to be working correctly.
203
204 The last encountered error was: {e}");
205 }
206 }
207 }
208 }
209
210 debug!(block = %current_block_log, "completed block, processing batch finished");
212 yield current_block_log;
213 from_block = latest_block + 1;
214 count_failures = 0;
215 }
216
217 Err(e) => error!(error = %e, "failed to obtain current block number from chain")
218 }
219
220 futures_timer::Delay::new(self.cfg.expected_block_time).await;
221 }
222 }))
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use anyhow::Context;
229 use async_std::prelude::FutureExt;
230 use ethers::contract::EthEvent;
231 use futures::StreamExt;
232 use std::time::Duration;
233 use tracing::debug;
234
235 use hopr_async_runtime::prelude::{sleep, spawn};
236 use hopr_bindings::hopr_channels::*;
237 use hopr_bindings::hopr_token::{ApprovalFilter, TransferFilter};
238 use hopr_chain_types::{ContractAddresses, ContractInstances};
239 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
240
241 use crate::client::surf_client::SurfRequestor;
242 use crate::client::{create_rpc_client_to_anvil, JsonRpcProviderClient, SimpleJsonRpcRetryPolicy};
243 use crate::errors::RpcError;
244 use crate::indexer::split_range;
245 use crate::rpc::{RpcOperations, RpcOperationsConfig};
246 use crate::{BlockWithLogs, HoprIndexerRpcOperations, LogFilter};
247
248 fn filter_bounds(filter: ðers::types::Filter) -> anyhow::Result<(u64, u64)> {
249 Ok((
250 filter
251 .block_option
252 .get_from_block()
253 .context("a value should be present")?
254 .as_number()
255 .context("a value should be convertible")?
256 .as_u64(),
257 filter
258 .block_option
259 .get_to_block()
260 .context("a value should be present")?
261 .as_number()
262 .context("a value should be convertible")?
263 .as_u64(),
264 ))
265 }
266
267 #[async_std::test]
268 async fn test_split_range() -> anyhow::Result<()> {
269 let ranges = split_range(LogFilter::default(), 0, 10, 2).collect::<Vec<_>>().await;
270
271 assert_eq!(6, ranges.len());
272 assert_eq!((0, 1), filter_bounds(&ranges[0])?);
273 assert_eq!((2, 3), filter_bounds(&ranges[1])?);
274 assert_eq!((4, 5), filter_bounds(&ranges[2])?);
275 assert_eq!((6, 7), filter_bounds(&ranges[3])?);
276 assert_eq!((8, 9), filter_bounds(&ranges[4])?);
277 assert_eq!((10, 10), filter_bounds(&ranges[5])?);
278
279 let ranges = split_range(LogFilter::default(), 0, 0, 2).collect::<Vec<_>>().await;
280 assert_eq!(1, ranges.len());
281 assert_eq!((0, 0), filter_bounds(&ranges[0])?);
282
283 let ranges = split_range(LogFilter::default(), 0, 0, 1).collect::<Vec<_>>().await;
284 assert_eq!(1, ranges.len());
285 assert_eq!((0, 0), filter_bounds(&ranges[0])?);
286
287 let ranges = split_range(LogFilter::default(), 0, 3, 1).collect::<Vec<_>>().await;
288 assert_eq!(4, ranges.len());
289 assert_eq!((0, 0), filter_bounds(&ranges[0])?);
290 assert_eq!((1, 1), filter_bounds(&ranges[1])?);
291 assert_eq!((2, 2), filter_bounds(&ranges[2])?);
292 assert_eq!((3, 3), filter_bounds(&ranges[3])?);
293
294 let ranges = split_range(LogFilter::default(), 0, 3, 10).collect::<Vec<_>>().await;
295 assert_eq!(1, ranges.len());
296 assert_eq!((0, 3), filter_bounds(&ranges[0])?);
297
298 Ok(())
299 }
300
301 #[async_std::test]
302 async fn test_should_get_block_number() -> anyhow::Result<()> {
303 let expected_block_time = Duration::from_secs(1);
304 let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
305 let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
306
307 let client = JsonRpcProviderClient::new(
308 &anvil.endpoint(),
309 SurfRequestor::default(),
310 SimpleJsonRpcRetryPolicy::default(),
311 );
312
313 let cfg = RpcOperationsConfig {
314 finality: 2,
315 expected_block_time,
316 gas_oracle_url: None,
317 ..RpcOperationsConfig::default()
318 };
319
320 sleep((1 + cfg.finality) * expected_block_time).await;
322
323 let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;
324
325 let b1 = rpc.block_number().await?;
326
327 sleep(expected_block_time * 2).await;
328
329 let b2 = rpc.block_number().await?;
330
331 assert!(b2 > b1, "block number should increase");
332
333 Ok(())
334 }
335
336 #[async_std::test]
337 async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
338 let _ = env_logger::builder().is_test(true).try_init();
339
340 let expected_block_time = Duration::from_secs(1);
341
342 let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
343 let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
344 let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;
345
346 let contract_instances = {
348 let client = create_rpc_client_to_anvil(SurfRequestor::default(), &anvil, &chain_key_0);
349 ContractInstances::deploy_for_testing(client, &chain_key_0).await?
350 };
351
352 let tokens_minted_at =
353 hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), 1000_u128.into()).await;
354 debug!("tokens were minted at block {tokens_minted_at}");
355
356 let contract_addrs = ContractAddresses::from(&contract_instances);
357
358 let cfg = RpcOperationsConfig {
359 tx_polling_interval: Duration::from_millis(10),
360 contract_addrs,
361 expected_block_time,
362 gas_oracle_url: None,
363 ..RpcOperationsConfig::default()
364 };
365
366 let client = JsonRpcProviderClient::new(
367 &anvil.endpoint(),
368 SurfRequestor::default(),
369 SimpleJsonRpcRetryPolicy::default(),
370 );
371
372 sleep((1 + cfg.finality) * expected_block_time).await;
374
375 let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;
376
377 let log_filter = LogFilter {
378 address: vec![contract_addrs.token, contract_addrs.channels],
379 topics: vec![
380 TransferFilter::signature().into(),
381 ApprovalFilter::signature().into(),
382 ChannelOpenedFilter::signature().into(),
383 ChannelBalanceIncreasedFilter::signature().into(),
384 ],
385 };
386
387 debug!("{:#?}", contract_addrs);
388 debug!("{:#?}", log_filter);
389
390 let count_filtered_topics = log_filter.topics.len();
392 let retrieved_logs = spawn(async move {
393 Ok::<_, RpcError>(
394 rpc.try_stream_logs(1, log_filter)?
395 .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
396 .take(1)
397 .collect::<Vec<BlockWithLogs>>()
398 .await,
399 )
400 });
401
402 hopr_chain_types::utils::fund_channel(
404 chain_key_1.public().to_address(),
405 contract_instances.token,
406 contract_instances.channels,
407 1_u128.into(),
408 )
409 .await;
410
411 let retrieved_logs = retrieved_logs
412 .timeout(Duration::from_secs(30)) .await??;
414
415 let last_block_logs = retrieved_logs.last().context("a log should be present")?.clone().logs;
417 let channel_open_filter = ChannelOpenedFilter::signature();
418 let channel_balance_filter = ChannelBalanceIncreasedFilter::signature();
419 let approval_filter = ApprovalFilter::signature();
420 let transfer_filter = TransferFilter::signature();
421
422 debug!(
423 "channel_open_filter: {:?} - {:?}",
424 channel_open_filter,
425 channel_open_filter.as_ref().to_vec()
426 );
427 debug!(
428 "channel_balance_filter: {:?} - {:?}",
429 channel_balance_filter,
430 channel_balance_filter.as_ref().to_vec()
431 );
432 debug!(
433 "approval_filter: {:?} - {:?}",
434 approval_filter,
435 approval_filter.as_ref().to_vec()
436 );
437 debug!(
438 "transfer_filter: {:?} - {:?}",
439 transfer_filter,
440 transfer_filter.as_ref().to_vec()
441 );
442 debug!("logs: {:#?}", last_block_logs);
443
444 assert!(
445 last_block_logs
446 .iter()
447 .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
448 "must contain channel open"
449 );
450 assert!(
451 last_block_logs.iter().any(
452 |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
453 ),
454 "must contain channel balance increase"
455 );
456 assert!(
457 last_block_logs
458 .iter()
459 .any(|log| log.address == contract_addrs.token && log.topics.contains(&approval_filter.into())),
460 "must contain token approval"
461 );
462 assert!(
463 last_block_logs
464 .iter()
465 .any(|log| log.address == contract_addrs.token && log.topics.contains(&transfer_filter.into())),
466 "must contain token transfer"
467 );
468
469 Ok(())
470 }
471
472 #[async_std::test]
473 async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel(
474 ) -> anyhow::Result<()> {
475 let _ = env_logger::builder().is_test(true).try_init();
476
477 let expected_block_time = Duration::from_secs(1);
478
479 let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
480 let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
481 let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;
482
483 let contract_instances = {
485 let client = create_rpc_client_to_anvil(SurfRequestor::default(), &anvil, &chain_key_0);
486 ContractInstances::deploy_for_testing(client, &chain_key_0).await?
487 };
488
489 let tokens_minted_at =
490 hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), 1000_u128.into()).await;
491 debug!("tokens were minted at block {tokens_minted_at}");
492
493 let contract_addrs = ContractAddresses::from(&contract_instances);
494
495 let cfg = RpcOperationsConfig {
496 tx_polling_interval: Duration::from_millis(10),
497 contract_addrs,
498 expected_block_time,
499 finality: 2,
500 gas_oracle_url: None,
501 ..RpcOperationsConfig::default()
502 };
503
504 let client = JsonRpcProviderClient::new(
505 &anvil.endpoint(),
506 SurfRequestor::default(),
507 SimpleJsonRpcRetryPolicy::default(),
508 );
509
510 sleep((1 + cfg.finality) * expected_block_time).await;
512
513 let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;
514
515 let log_filter = LogFilter {
516 address: vec![contract_addrs.channels],
517 topics: vec![
518 ChannelOpenedFilter::signature().into(),
519 ChannelBalanceIncreasedFilter::signature().into(),
520 ],
521 };
522
523 debug!("{:#?}", contract_addrs);
524 debug!("{:#?}", log_filter);
525
526 let count_filtered_topics = log_filter.topics.len();
528 let retrieved_logs = spawn(async move {
529 Ok::<_, RpcError>(
530 rpc.try_stream_logs(1, log_filter)?
531 .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
532 .take(1)
533 .collect::<Vec<BlockWithLogs>>()
534 .await,
535 )
536 });
537
538 hopr_chain_types::utils::fund_channel(
540 chain_key_1.public().to_address(),
541 contract_instances.token,
542 contract_instances.channels,
543 1_u128.into(),
544 )
545 .await;
546
547 let retrieved_logs = retrieved_logs
548 .timeout(Duration::from_secs(30)) .await??;
550
551 let last_block_logs = retrieved_logs
553 .first()
554 .context("a value should be present")?
555 .clone()
556 .logs;
557 let channel_open_filter: [u8; 32] = ChannelOpenedFilter::signature().into();
558 let channel_balance_filter: [u8; 32] = ChannelBalanceIncreasedFilter::signature().into();
559
560 assert!(
561 last_block_logs
562 .iter()
563 .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
564 "must contain channel open"
565 );
566 assert!(
567 last_block_logs
568 .iter()
569 .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
570 "must contain channel balance increase"
571 );
572
573 Ok(())
574 }
575}