hopr_chain_rpc/indexer.rs

//! Extends the [RpcOperations] type with functionality needed by the Indexer component.
//!
//! The required functionality is defined in the [HoprIndexerRpcOperations] trait,
//! which is implemented for [RpcOperations] here.
//! The primary goal is to provide a stream of [BlockWithLogs] filtered by the given [LogFilter]
//! as new matching blocks are mined on the underlying blockchain. The stream can also be used to collect
//! historical blockchain data.
//!
//! For details on the Indexer, see the `chain-indexer` crate.
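//!
//! A minimal consumption sketch (illustrative only; it assumes an already-constructed
//! [RpcOperations] instance `rpc` and a non-empty [LogFilter] `filter`):
//!
//! ```ignore
//! use futures::StreamExt;
//!
//! // Stream blocks (with their matching logs) starting from block 1.
//! let mut blocks = rpc.try_stream_logs(1, filter)?;
//! while let Some(block_with_logs) = blocks.next().await {
//!     println!("block #{} carried {} logs", block_with_logs.block_id, block_with_logs.len());
//! }
//! ```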
use async_stream::stream;
use async_trait::async_trait;
use ethers::providers::{JsonRpcClient, Middleware};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use tracing::{debug, error, trace, warn};

use crate::errors::{Result, RpcError, RpcError::FilterIsEmpty};
use crate::rpc::RpcOperations;
use crate::{BlockWithLogs, HoprIndexerRpcOperations, HttpRequestor, Log, LogFilter};

#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::SimpleGauge;

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_RPC_CHAIN_HEAD: SimpleGauge =
        SimpleGauge::new(
            "hopr_chain_head_block_number",
            "Current block number of chain head",
    ).unwrap();
}

/// Splits the range between `from_block` and `to_block` (inclusive)
/// into chunks of at most `max_chunk_size` blocks and creates an [ethers::types::Filter] for each chunk
/// using the given [LogFilter].
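///
/// A sketch of the chunking behaviour (mirrors the unit tests below):
///
/// ```ignore
/// // A range 0..=10 with a maximum chunk size of 3 yields four sub-filters,
/// // covering 0..=2, 3..=5, 6..=8 and 9..=10 respectively.
/// let ranges = split_range(LogFilter::default(), 0, 10, 3).collect::<Vec<_>>().await;
/// assert_eq!(4, ranges.len());
/// ```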
fn split_range<'a>(
    filter: LogFilter,
    from_block: u64,
    to_block: u64,
    max_chunk_size: u64,
) -> BoxStream<'a, ethers::types::Filter> {
    assert!(from_block <= to_block, "invalid block range");
    assert!(max_chunk_size > 0, "chunk size must be greater than 0");

    futures::stream::unfold((from_block, to_block), move |(start, to)| {
        if start <= to {
            let end = to_block.min(start + max_chunk_size - 1);
            let filter = ethers::types::Filter::from(filter.clone())
                .from_block(start)
                .to_block(end);
            futures::future::ready(Some((filter, (end + 1, to))))
        } else {
            futures::future::ready(None)
        }
    })
    .boxed()
}

impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> RpcOperations<P, R> {
    /// Retrieves logs in the given range (`from_block` and `to_block` are inclusive).
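    ///
    /// The requested range is split into chunks of at most `max_block_range_fetch_size` blocks
    /// (see `split_range`), and the logs of each chunk are fetched with a separate `eth_getLogs` request.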
    fn stream_logs(&self, filter: LogFilter, from_block: u64, to_block: u64) -> BoxStream<Result<Log>> {
        let fetch_ranges = split_range(filter, from_block, to_block, self.cfg.max_block_range_fetch_size);

        debug!(
            "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
            (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
        );

        fetch_ranges
            .then(|subrange| {
                let prov_clone = self.provider.clone();

                async move {
                    trace!(
                        from = ?subrange.get_from_block(),
                        to = ?subrange.get_to_block(),
                        "fetching logs in block subrange"
                    );
                    match prov_clone.get_logs(&subrange).await {
                        Ok(logs) => Ok(logs),
                        Err(e) => {
                            error!(
                                from = ?subrange.get_from_block(),
                                to = ?subrange.get_to_block(),
                                error = %e,
                                "failed to fetch logs in block subrange"
                            );
                            Err(e)
                        }
                    }
                }
            })
            .flat_map(|result| {
                futures::stream::iter(match result {
                    Ok(logs) => logs.into_iter().map(|log| Ok(Log::from(log))).collect::<Vec<_>>(),
                    Err(e) => vec![Err(RpcError::from(e))],
                })
            })
            .boxed()
    }
}

#[async_trait]
impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> HoprIndexerRpcOperations for RpcOperations<P, R> {
    async fn block_number(&self) -> Result<u64> {
        self.get_block_number().await
    }

    fn try_stream_logs<'a>(
        &'a self,
        start_block_number: u64,
        filter: LogFilter,
    ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
        if filter.is_empty() {
            return Err(FilterIsEmpty);
        }

        Ok(Box::pin(stream! {
            // On the first iteration, use the given block number as the start
            let mut from_block = start_block_number;

            // Maximum number of consecutive log-streaming failures tolerated before giving up
            const MAX_LOOP_FAILURES: usize = 5;
            // Maximum number of blocks the RPC provider's head may lag behind the next block to index
            // before the state is considered unrecoverable
            const MAX_RPC_PAST_BLOCKS: usize = 50;
            let mut count_failures = 0;

            'outer: loop {
                match self.block_number().await {
                    Ok(latest_block) => {
                        if from_block > latest_block {
                            let past_diff = from_block - latest_block;
                            if from_block == start_block_number {
                                // If on first iteration the start block is in the future, just set
                                // it to the latest
                                from_block = latest_block;
                            } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
                                // We got here too early (a lag of at most MAX_RPC_PAST_BLOCKS blocks is tolerated), so wait a bit longer
                                debug!(last_block = latest_block, start_block = start_block_number, blocks_diff = past_diff, "Indexer premature request. Block not found yet in RPC provider.");
                                futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
                                continue;
                            } else {
                                // This is a hard failure on later iterations, which is unrecoverable
                                panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
                                possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
                                possible solutions: change the RPC provider, reinitialize the DB");
                            }
                        }

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);

                        let mut retrieved_logs = self.stream_logs(filter.clone(), from_block, latest_block);

                        trace!(from_block, to_block = latest_block, "processing batch");

                        let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default()};

                        loop {
                            match retrieved_logs.next().await {
                                Some(Ok(log)) => {
                                    // This in general should not happen, but handle such a case to be safe
                                    if log.block_number > latest_block {
                                        warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
                                        break;
                                    }

                                    // This should not happen, thus panic.
                                    if current_block_log.block_id > log.block_number {
                                        error!(log_block_id = log.block_number, current_block_log.block_id, "received log from a previous block");
                                        panic!("The on-chain logs are not ordered by block number. This is a critical error.");
                                    }

                                    // This assumes the logs are arriving ordered by blocks when fetching a range
                                    if current_block_log.block_id < log.block_number {
                                        debug!(block = %current_block_log, "completed block, moving to next");
                                        yield current_block_log;

                                        current_block_log = BlockWithLogs::default();
                                        current_block_log.block_id = log.block_number;
                                    }

                                    debug!("retrieved {log}");
                                    current_block_log.logs.insert(log.into());
                                },
                                None => {
                                    break;
                                },
                                Some(Err(e)) => {
                                    error!(error = %e, "failed to process blocks");
                                    count_failures += 1;

                                    if count_failures < MAX_LOOP_FAILURES {
                                        // Continue the outer loop, which throws away the current block
                                        // that may be incomplete due to this error.
                                        // We will start at this block again to re-query it.
                                        from_block = current_block_log.block_id;
                                        continue 'outer;
                                    } else {
                                        panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.

                                        The RPC provider does not seem to be working correctly.

                                        The last encountered error was: {e}");
                                    }
                                }
                            }
                        }

                        // Yield everything we've collected until this point
                        debug!(block = %current_block_log, "completed block, processing batch finished");
                        yield current_block_log;
                        from_block = latest_block + 1;
                        count_failures = 0;
                    }

                    Err(e) => error!(error = %e, "failed to obtain current block number from chain")
                }

                futures_timer::Delay::new(self.cfg.expected_block_time).await;
            }
        }))
    }
}

#[cfg(test)]
mod tests {
    use anyhow::Context;
    use async_std::prelude::FutureExt;
    use ethers::contract::EthEvent;
    use futures::StreamExt;
    use std::time::Duration;
    use tracing::debug;

    use hopr_async_runtime::prelude::{sleep, spawn};
    use hopr_bindings::hopr_channels::*;
    use hopr_bindings::hopr_token::{ApprovalFilter, TransferFilter};
    use hopr_chain_types::{ContractAddresses, ContractInstances};
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};

    use crate::client::surf_client::SurfRequestor;
    use crate::client::{create_rpc_client_to_anvil, JsonRpcProviderClient, SimpleJsonRpcRetryPolicy};
    use crate::errors::RpcError;
    use crate::indexer::split_range;
    use crate::rpc::{RpcOperations, RpcOperationsConfig};
    use crate::{BlockWithLogs, HoprIndexerRpcOperations, LogFilter};

    fn filter_bounds(filter: &ethers::types::Filter) -> anyhow::Result<(u64, u64)> {
        Ok((
            filter
                .block_option
                .get_from_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?
                .as_u64(),
            filter
                .block_option
                .get_to_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?
                .as_u64(),
        ))
    }

    #[async_std::test]
    async fn test_split_range() -> anyhow::Result<()> {
        let ranges = split_range(LogFilter::default(), 0, 10, 2).collect::<Vec<_>>().await;

        assert_eq!(6, ranges.len());
        assert_eq!((0, 1), filter_bounds(&ranges[0])?);
        assert_eq!((2, 3), filter_bounds(&ranges[1])?);
        assert_eq!((4, 5), filter_bounds(&ranges[2])?);
        assert_eq!((6, 7), filter_bounds(&ranges[3])?);
        assert_eq!((8, 9), filter_bounds(&ranges[4])?);
        assert_eq!((10, 10), filter_bounds(&ranges[5])?);

        let ranges = split_range(LogFilter::default(), 0, 0, 2).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);

        let ranges = split_range(LogFilter::default(), 0, 0, 1).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);

        let ranges = split_range(LogFilter::default(), 0, 3, 1).collect::<Vec<_>>().await;
        assert_eq!(4, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
        assert_eq!((1, 1), filter_bounds(&ranges[1])?);
        assert_eq!((2, 2), filter_bounds(&ranges[2])?);
        assert_eq!((3, 3), filter_bounds(&ranges[3])?);

        let ranges = split_range(LogFilter::default(), 0, 3, 10).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 3), filter_bounds(&ranges[0])?);

        Ok(())
    }

    #[async_std::test]
    async fn test_should_get_block_number() -> anyhow::Result<()> {
        let expected_block_time = Duration::from_secs(1);
        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;

        let client = JsonRpcProviderClient::new(
            &anvil.endpoint(),
            SurfRequestor::default(),
            SimpleJsonRpcRetryPolicy::default(),
        );

        let cfg = RpcOperationsConfig {
            finality: 2,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        // Wait until the chain has produced a few finalized blocks
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;

        let b1 = rpc.block_number().await?;

        sleep(expected_block_time * 2).await;

        let b2 = rpc.block_number().await?;

        assert!(b2 > b1, "block number should increase");

        Ok(())
    }

    #[async_std::test]
    async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        // Deploy contracts
        let contract_instances = {
            let client = create_rpc_client_to_anvil(SurfRequestor::default(), &anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), 1000_u128.into()).await;
        debug!("tokens were minted at block {tokens_minted_at}");

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        let client = JsonRpcProviderClient::new(
            &anvil.endpoint(),
            SurfRequestor::default(),
            SimpleJsonRpcRetryPolicy::default(),
        );

        // Wait until contracts deployments are final
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;

        let log_filter = LogFilter {
            address: vec![contract_addrs.token, contract_addrs.channels],
            topics: vec![
                TransferFilter::signature().into(),
                ApprovalFilter::signature().into(),
                ChannelOpenedFilter::signature().into(),
                ChannelBalanceIncreasedFilter::signature().into(),
            ],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        // Spawn stream
        let count_filtered_topics = log_filter.topics.len();
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .take(1)
                    .collect::<Vec<BlockWithLogs>>()
                    .await,
            )
        });

        // Spawn channel funding
        hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            1_u128.into(),
        )
        .await;

        let retrieved_logs = retrieved_logs
            .timeout(Duration::from_secs(30)) // Give up after 30 seconds
            .await??;

        // The last block must contain all 4 events
        let last_block_logs = retrieved_logs.last().context("a log should be present")?.clone().logs;
        let channel_open_filter = ChannelOpenedFilter::signature();
        let channel_balance_filter = ChannelBalanceIncreasedFilter::signature();
        let approval_filter = ApprovalFilter::signature();
        let transfer_filter = TransferFilter::signature();

        debug!(
            "channel_open_filter: {:?} - {:?}",
            channel_open_filter,
            channel_open_filter.as_ref().to_vec()
        );
        debug!(
            "channel_balance_filter: {:?} - {:?}",
            channel_balance_filter,
            channel_balance_filter.as_ref().to_vec()
        );
        debug!(
            "approval_filter: {:?} - {:?}",
            approval_filter,
            approval_filter.as_ref().to_vec()
        );
        debug!(
            "transfer_filter: {:?} - {:?}",
            transfer_filter,
            transfer_filter.as_ref().to_vec()
        );
        debug!("logs: {:#?}", last_block_logs);

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
            "must contain channel open"
        );
        assert!(
            last_block_logs.iter().any(
                |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
            ),
            "must contain channel balance increase"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.token && log.topics.contains(&approval_filter.into())),
            "must contain token approval"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.token && log.topics.contains(&transfer_filter.into())),
            "must contain token transfer"
        );

        Ok(())
    }

    #[async_std::test]
    async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel(
    ) -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        // Deploy contracts
        let contract_instances = {
            let client = create_rpc_client_to_anvil(SurfRequestor::default(), &anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), 1000_u128.into()).await;
        debug!("tokens were minted at block {tokens_minted_at}");

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            finality: 2,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        let client = JsonRpcProviderClient::new(
            &anvil.endpoint(),
            SurfRequestor::default(),
            SimpleJsonRpcRetryPolicy::default(),
        );

        // Wait until contracts deployments are final
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(client, SurfRequestor::default(), &chain_key_0, cfg)?;

        let log_filter = LogFilter {
            address: vec![contract_addrs.channels],
            topics: vec![
                ChannelOpenedFilter::signature().into(),
                ChannelBalanceIncreasedFilter::signature().into(),
            ],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        // Spawn stream
        let count_filtered_topics = log_filter.topics.len();
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .take(1)
                    .collect::<Vec<BlockWithLogs>>()
                    .await,
            )
        });

        // Spawn channel funding
        hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            1_u128.into(),
        )
        .await;

        let retrieved_logs = retrieved_logs
            .timeout(Duration::from_secs(30)) // Give up after 30 seconds
            .await??;

        // The first retrieved block must contain both events
        let last_block_logs = retrieved_logs
            .first()
            .context("a value should be present")?
            .clone()
            .logs;
        let channel_open_filter: [u8; 32] = ChannelOpenedFilter::signature().into();
        let channel_balance_filter: [u8; 32] = ChannelBalanceIncreasedFilter::signature().into();

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
            "must contain channel open"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
            "must contain channel balance increase"
        );

        Ok(())
    }
}