hopr_chain_rpc/
indexer.rs

1//! Extends the [RpcOperations] type with functionality needed by the Indexer component.
2//!
3//! The functionality required functionality is defined in the [HoprIndexerRpcOperations] trait,
4//! which is implemented for [RpcOperations] hereof.
5//! The primary goal is to provide a stream of [BlockWithLogs] filtered by the given [LogFilter]
6//! as the new matching blocks are mined in the underlying blockchain. The stream also allows to collect
7//! historical blockchain data.
8//!
9//! For details on the Indexer see the `chain-indexer` crate.
10use std::pin::Pin;
11
12use alloy::{providers::Provider, rpc::types::Filter};
13use async_stream::stream;
14use async_trait::async_trait;
15use futures::{Stream, StreamExt, stream::BoxStream};
16#[cfg(all(feature = "prometheus", not(test)))]
17use hopr_metrics::metrics::SimpleGauge;
18use hopr_primitive_types::prelude::*;
19use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
20use tracing::{debug, error, trace, warn};
21
22use crate::{
23    BlockWithLogs, FilterSet, HoprIndexerRpcOperations, Log,
24    errors::{Result, RpcError, RpcError::FilterIsEmpty},
25    rpc::RpcOperations,
26    transport::HttpRequestor,
27};
28
29#[cfg(all(feature = "prometheus", not(test)))]
30lazy_static::lazy_static! {
31    static ref METRIC_RPC_CHAIN_HEAD: SimpleGauge =
32        SimpleGauge::new(
33            "hopr_chain_head_block_number",
34            "Current block number of chain head",
35    ).unwrap();
36}
37
38/// Splits a block range into smaller chunks and applies filters to each chunk.
39///
40/// This function takes a range of blocks and divides it into smaller sub-ranges
41/// for concurrent processing. Each sub-range gets a copy of all the provided filters
42/// with the appropriate block range applied.
43///
44/// # Arguments
45/// * `filters` - Vector of log filters to apply to each chunk
46/// * `from_block` - Starting block number
47/// * `to_block` - Ending block number
48/// * `max_chunk_size` - Maximum number of blocks per chunk
49///
50/// # Returns
51/// * `impl Stream<Item = Vec<Filter>>` - Stream of filter vectors, one per chunk
52fn split_range<'a>(
53    filters: Vec<Filter>,
54    from_block: u64,
55    to_block: u64,
56    max_chunk_size: u64,
57) -> BoxStream<'a, Vec<Filter>> {
58    assert!(from_block <= to_block, "invalid block range");
59    assert!(max_chunk_size > 0, "chunk size must be greater than 0");
60
61    futures::stream::unfold((from_block, to_block), move |(start, to)| {
62        if start <= to {
63            let end = to_block.min(start + max_chunk_size - 1);
64            let ranged_filters = filters
65                .iter()
66                .cloned()
67                .map(|f| f.from_block(start).to_block(end))
68                .collect::<Vec<_>>();
69            futures::future::ready(Some((ranged_filters, (end + 1, to))))
70        } else {
71            futures::future::ready(None)
72        }
73    })
74    .boxed()
75}
76
77// impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> RpcOperations<P, R> {
78impl<R: HttpRequestor + 'static + Clone> RpcOperations<R> {
79    /// Retrieves logs in the given range (`from_block` and `to_block` are inclusive).
80    fn stream_logs(&self, filters: Vec<Filter>, from_block: u64, to_block: u64) -> BoxStream<Result<Log>> {
81        let fetch_ranges = split_range(filters, from_block, to_block, self.cfg.max_block_range_fetch_size);
82
83        debug!(
84            "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
85            (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
86        );
87
88        fetch_ranges
89            .then(move |subrange_filters| async move {
90                let mut results = futures::stream::iter(subrange_filters)
91                    .then_concurrent(|filter| async move {
92                        let prov_clone = self.provider.clone();
93
94                        match prov_clone.get_logs(&filter).await {
95                            Ok(logs) => Ok(logs),
96                            Err(e) => {
97                                error!(
98                                    from = ?filter.get_from_block(),
99                                    to = ?filter.get_to_block(),
100                                    error = %e,
101                                    "failed to fetch logs in block subrange"
102                                );
103                                Err(e)
104                            }
105                        }
106                    })
107                    .flat_map(|result| {
108                        futures::stream::iter(match result {
109                            Ok(logs) => logs.into_iter().map(|log| Ok(Log::try_from(log)?)).collect::<Vec<_>>(),
110                            Err(e) => vec![Err(RpcError::from(e))],
111                        })
112                    })
113                    .collect::<Vec<_>>()
114                    .await;
115
116                // at this point we need to ensure logs are ordered by block number since that is
117                // expected by the indexer
118                results.sort_by(|a, b| {
119                    if let Ok(a) = a {
120                        if let Ok(b) = b {
121                            a.block_number.cmp(&b.block_number)
122                        } else {
123                            std::cmp::Ordering::Greater
124                        }
125                    } else {
126                        std::cmp::Ordering::Less
127                    }
128                });
129
130                futures::stream::iter(results)
131            })
132            .flatten()
133            .boxed()
134    }
135}
136
137#[async_trait]
138impl<R: HttpRequestor + 'static + Clone> HoprIndexerRpcOperations for RpcOperations<R> {
139    async fn block_number(&self) -> Result<u64> {
140        self.get_block_number().await
141    }
142
143    async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> Result<HoprBalance> {
144        self.get_hopr_allowance(owner, spender).await
145    }
146
147    async fn get_xdai_balance(&self, address: Address) -> Result<XDaiBalance> {
148        self.get_xdai_balance(address).await
149    }
150
151    async fn get_hopr_balance(&self, address: Address) -> Result<HoprBalance> {
152        self.get_hopr_balance(address).await
153    }
154
155    fn try_stream_logs<'a>(
156        &'a self,
157        start_block_number: u64,
158        filters: FilterSet,
159        is_synced: bool,
160    ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
161        if filters.all.is_empty() {
162            return Err(FilterIsEmpty);
163        }
164
165        let log_filters = if !is_synced {
166            // Because we are not synced yet, we will not get logs for the token contract.
167            // These are only relevant for the indexer if we are synced.
168            filters.no_token
169        } else {
170            filters.all
171        };
172
173        Ok(Box::pin(stream! {
174            // On first iteration use the given block number as start
175            let mut from_block = start_block_number;
176
177            const MAX_LOOP_FAILURES: usize = 5;
178            const MAX_RPC_PAST_BLOCKS: usize = 50;
179            let mut count_failures = 0;
180
181            'outer: loop {
182                match self.block_number().await {
183                    Ok(latest_block) => {
184                        if from_block > latest_block {
185                            let past_diff = from_block - latest_block;
186                            if from_block == start_block_number {
187                                // If on first iteration the start block is in the future, just set
188                                // it to the latest
189                                from_block = latest_block;
190                            } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
191                                // If we came here early (we tolerate only off-by MAX_RPC_PAST_BLOCKS), wait some more
192                                debug!(last_block = latest_block, start_block = start_block_number, blocks_diff = past_diff, "Indexer premature request. Block not found yet in RPC provider.");
193                                futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
194                                continue;
195                            } else {
196                                // This is a hard-failure on later iterations which is unrecoverable
197                                panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
198                                possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
199                                possible solutions: change the RPC provider, reinitialize the DB");
200                            }
201                        }
202
203
204                        #[cfg(all(feature = "prometheus", not(test)))]
205                        METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);
206
207                        let mut retrieved_logs = self.stream_logs(log_filters.clone(), from_block, latest_block);
208
209                        trace!(from_block, to_block = latest_block, "processing batch");
210
211                        let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default()};
212
213                        loop {
214                            match retrieved_logs.next().await {
215                                Some(Ok(log)) => {
216                                    // This in general should not happen, but handle such a case to be safe
217                                    if log.block_number > latest_block {
218                                        warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
219                                        break;
220                                    }
221
222                                    // This should not happen, thus panic.
223                                    if current_block_log.block_id > log.block_number {
224                                        error!(log_block_id = log.block_number, current_block_log.block_id, "received log from a previous block");
225                                        panic!("The on-chain logs are not ordered by block number. This is a critical error.");
226                                    }
227
228                                    // This assumes the logs are arriving ordered by blocks when fetching a range
229                                    if current_block_log.block_id < log.block_number {
230                                        debug!(block = %current_block_log, "completed block, moving to next");
231                                        yield current_block_log;
232
233                                        current_block_log = BlockWithLogs::default();
234                                        current_block_log.block_id = log.block_number;
235                                    }
236
237                                    debug!("retrieved {log}");
238                                    current_block_log.logs.insert(log.into());
239                                },
240                                None => {
241                                    break;
242                                },
243                                Some(Err(e)) => {
244                                    error!(error=%e, "failed to process blocks");
245                                    count_failures += 1;
246
247                                    if count_failures < MAX_LOOP_FAILURES {
248                                        // Continue the outer loop, which throws away the current block
249                                        // that may be incomplete due to this error.
250                                        // We will start at this block again to re-query it.
251                                        from_block = current_block_log.block_id;
252                                        continue 'outer;
253                                    } else {
254                                        panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.
255
256                                        The RPC provider does not seem to be working correctly.
257
258                                        The last encountered error was: {e}");
259                                    }
260                                }
261                            }
262                        }
263
264                        // Yield everything we've collected until this point
265                        debug!(block = %current_block_log, "completed block, processing batch finished");
266                        yield current_block_log;
267                        from_block = latest_block + 1;
268                        count_failures = 0;
269                    }
270
271                    Err(e) => error!(error = %e, "failed to obtain current block number from chain")
272                }
273
274                futures_timer::Delay::new(self.cfg.expected_block_time).await;
275            }
276        }))
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use std::time::Duration;
283
284    use alloy::{
285        primitives::U256,
286        rpc::{client::ClientBuilder, types::Filter},
287        sol_types::SolEvent,
288        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
289    };
290    use anyhow::Context;
291    use futures::StreamExt;
292    use hopr_async_runtime::prelude::{sleep, spawn};
293    use hopr_bindings::{
294        hoprchannelsevents::HoprChannelsEvents::{ChannelBalanceIncreased, ChannelOpened},
295        hoprtoken::HoprToken::{Approval, Transfer},
296    };
297    use hopr_chain_types::{ContractAddresses, ContractInstances};
298    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
299    use tokio::time::timeout;
300    use tracing::debug;
301
302    use crate::{
303        BlockWithLogs, FilterSet, HoprIndexerRpcOperations,
304        client::create_rpc_client_to_anvil,
305        errors::RpcError,
306        indexer::split_range,
307        rpc::{RpcOperations, RpcOperationsConfig},
308    };
309
310    fn filter_bounds(filters: &[Filter]) -> anyhow::Result<(u64, u64)> {
311        let bounds = filters.iter().try_fold((0, 0), |acc, filter| {
312            let to = filter
313                .block_option
314                .get_from_block()
315                .context("a value should be present")?
316                .as_number()
317                .context("a value should be convertible")?;
318            let from = filter
319                .block_option
320                .get_to_block()
321                .context("a value should be present")?
322                .as_number()
323                .context("a value should be convertible")?;
324            let next = (to, from);
325
326            match acc {
327                (0, 0) => Ok(next), // First pair becomes the reference
328                acc => {
329                    if acc != next {
330                        anyhow::bail!("range bounds are not equal across all filters");
331                    }
332                    Ok(acc)
333                }
334            }
335        })?;
336
337        Ok(bounds)
338    }
339
340    #[tokio::test]
341    async fn test_split_range() -> anyhow::Result<()> {
342        let filters = vec![Filter::default()];
343        let ranges = split_range(filters.clone(), 0, 10, 2).collect::<Vec<_>>().await;
344
345        assert_eq!(6, ranges.len());
346        assert_eq!((0, 1), filter_bounds(&ranges[0])?);
347        assert_eq!((2, 3), filter_bounds(&ranges[1])?);
348        assert_eq!((4, 5), filter_bounds(&ranges[2])?);
349        assert_eq!((6, 7), filter_bounds(&ranges[3])?);
350        assert_eq!((8, 9), filter_bounds(&ranges[4])?);
351        assert_eq!((10, 10), filter_bounds(&ranges[5])?);
352
353        let ranges = split_range(filters.clone(), 0, 0, 1).collect::<Vec<_>>().await;
354        assert_eq!(1, ranges.len());
355        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
356
357        let ranges = split_range(filters.clone(), 0, 3, 1).collect::<Vec<_>>().await;
358        assert_eq!(4, ranges.len());
359        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
360        assert_eq!((1, 1), filter_bounds(&ranges[1])?);
361        assert_eq!((2, 2), filter_bounds(&ranges[2])?);
362        assert_eq!((3, 3), filter_bounds(&ranges[3])?);
363
364        let ranges = split_range(filters.clone(), 0, 3, 10).collect::<Vec<_>>().await;
365        assert_eq!(1, ranges.len());
366        assert_eq!((0, 3), filter_bounds(&ranges[0])?);
367
368        Ok(())
369    }
370
371    #[tokio::test]
372    async fn test_should_get_block_number() -> anyhow::Result<()> {
373        let expected_block_time = Duration::from_secs(1);
374        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
375        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
376
377        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
378
379        let rpc_client = ClientBuilder::default()
380            .layer(RetryBackoffLayer::new(2, 100, 100))
381            .transport(transport_client.clone(), transport_client.guess_local());
382
383        let cfg = RpcOperationsConfig {
384            finality: 2,
385            expected_block_time,
386            gas_oracle_url: None,
387            ..RpcOperationsConfig::default()
388        };
389
390        // Wait until contracts deployments are final
391        sleep((1 + cfg.finality) * expected_block_time).await;
392
393        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;
394
395        let b1 = rpc.block_number().await?;
396
397        sleep(expected_block_time * 2).await;
398
399        let b2 = rpc.block_number().await?;
400
401        assert!(b2 > b1, "block number should increase");
402
403        Ok(())
404    }
405
406    #[tokio::test]
407    async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
408        let _ = env_logger::builder().is_test(true).try_init();
409
410        let expected_block_time = Duration::from_secs(1);
411
412        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
413        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
414        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;
415
416        // Deploy contracts
417        let contract_instances = {
418            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
419            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
420        };
421
422        let contract_addrs = ContractAddresses::from(&contract_instances);
423
424        let filter_token_approval = alloy::rpc::types::Filter::new()
425            .address(alloy::primitives::Address::from(contract_addrs.token))
426            .event_signature(Approval::SIGNATURE_HASH);
427        let filter_token_transfer = alloy::rpc::types::Filter::new()
428            .address(alloy::primitives::Address::from(contract_addrs.token))
429            .event_signature(Transfer::SIGNATURE_HASH);
430        let filter_channels_opened = alloy::rpc::types::Filter::new()
431            .address(alloy::primitives::Address::from(contract_addrs.channels))
432            .event_signature(ChannelOpened::SIGNATURE_HASH);
433        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
434            .address(alloy::primitives::Address::from(contract_addrs.channels))
435            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);
436
437        let log_filter = FilterSet {
438            all: vec![
439                filter_token_approval.clone(),
440                filter_token_transfer.clone(),
441                filter_channels_opened.clone(),
442                filter_channels_balance_increased.clone(),
443            ],
444            token: vec![filter_token_approval, filter_token_transfer],
445            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
446        };
447
448        debug!("{:#?}", contract_addrs);
449        debug!("{:#?}", log_filter);
450
451        let tokens_minted_at =
452            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
453                .await?
454                .unwrap();
455        debug!("tokens were minted at block {tokens_minted_at}");
456
457        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
458
459        let rpc_client = ClientBuilder::default()
460            .layer(RetryBackoffLayer::new(2, 100, 100))
461            .transport(transport_client.clone(), transport_client.guess_local());
462
463        let cfg = RpcOperationsConfig {
464            tx_polling_interval: Duration::from_millis(10),
465            contract_addrs,
466            expected_block_time,
467            gas_oracle_url: None,
468            ..RpcOperationsConfig::default()
469        };
470
471        // Wait until contracts deployments are final
472        sleep((1 + cfg.finality) * expected_block_time).await;
473
474        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;
475
476        // Spawn stream
477        let count_filtered_topics = 2;
478        let retrieved_logs = spawn(async move {
479            Ok::<_, RpcError>(
480                rpc.try_stream_logs(1, log_filter, false)?
481                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
482                    .next()
483                    .await,
484            )
485        });
486
487        // Spawn channel funding
488        let _ = hopr_chain_types::utils::fund_channel(
489            chain_key_1.public().to_address(),
490            contract_instances.token,
491            contract_instances.channels,
492            U256::from(1_u128),
493        )
494        .await;
495
496        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs) // Give up after 30 seconds
497            .await???;
498
499        // The last block must contain all 4 events
500        let last_block_logs = retrieved_logs
501            .into_iter()
502            .next_back()
503            .context("a log should be present")?
504            .clone()
505            .logs;
506
507        let channel_open_filter = ChannelOpened::SIGNATURE_HASH;
508        let channel_balance_filter = ChannelBalanceIncreased::SIGNATURE_HASH;
509
510        debug!(
511            "channel_open_filter: {:?} - {:?}",
512            channel_open_filter,
513            channel_open_filter.0.to_vec()
514        );
515        debug!(
516            "channel_balance_filter: {:?} - {:?}",
517            channel_balance_filter,
518            channel_balance_filter.0.to_vec()
519        );
520        debug!("logs: {:#?}", last_block_logs);
521
522        assert!(
523            last_block_logs
524                .iter()
525                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
526            "must contain channel open"
527        );
528        assert!(
529            last_block_logs.iter().any(
530                |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
531            ),
532            "must contain channel balance increase"
533        );
534
535        Ok(())
536    }
537
538    #[tokio::test]
539    async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel()
540    -> anyhow::Result<()> {
541        let _ = env_logger::builder().is_test(true).try_init();
542
543        let expected_block_time = Duration::from_secs(1);
544
545        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
546        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
547        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;
548
549        // Deploy contracts
550        let contract_instances = {
551            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
552            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
553        };
554
555        let tokens_minted_at =
556            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
557                .await?
558                .unwrap();
559        debug!("tokens were minted at block {tokens_minted_at}");
560
561        let contract_addrs = ContractAddresses::from(&contract_instances);
562
563        let cfg = RpcOperationsConfig {
564            tx_polling_interval: Duration::from_millis(10),
565            contract_addrs,
566            expected_block_time,
567            finality: 2,
568            gas_oracle_url: None,
569            ..RpcOperationsConfig::default()
570        };
571
572        let transport_client = ReqwestTransport::new(anvil.endpoint_url());
573
574        let rpc_client = ClientBuilder::default()
575            .layer(RetryBackoffLayer::new(2, 100, 100))
576            .transport(transport_client.clone(), transport_client.guess_local());
577
578        // Wait until contracts deployments are final
579        sleep((1 + cfg.finality) * expected_block_time).await;
580
581        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;
582
583        let filter_channels_opened = alloy::rpc::types::Filter::new()
584            .address(alloy::primitives::Address::from(contract_addrs.channels))
585            .event_signature(ChannelOpened::SIGNATURE_HASH);
586        let filter_channels_balance_increased = alloy::rpc::types::Filter::new()
587            .address(alloy::primitives::Address::from(contract_addrs.channels))
588            .event_signature(ChannelBalanceIncreased::SIGNATURE_HASH);
589
590        let log_filter = FilterSet {
591            all: vec![
592                filter_channels_opened.clone(),
593                filter_channels_balance_increased.clone(),
594            ],
595            token: vec![],
596            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
597        };
598
599        debug!("{:#?}", contract_addrs);
600        debug!("{:#?}", log_filter);
601
602        // Spawn stream
603        let count_filtered_topics = 2;
604        let retrieved_logs = spawn(async move {
605            Ok::<_, RpcError>(
606                rpc.try_stream_logs(1, log_filter, false)?
607                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
608                    // .next()
609                    .take(1)
610                    .collect::<Vec<BlockWithLogs>>()
611                    .await,
612            )
613        });
614
615        // Spawn channel funding
616        let _ = hopr_chain_types::utils::fund_channel(
617            chain_key_1.public().to_address(),
618            contract_instances.token,
619            contract_instances.channels,
620            U256::from(1_u128),
621        )
622        .await;
623
624        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs) // Give up after 30 seconds
625            .await???;
626
627        // The last block must contain all 2 events
628        let last_block_logs = retrieved_logs
629            .first()
630            .context("a value should be present")?
631            .clone()
632            .logs;
633
634        let channel_open_filter: [u8; 32] = ChannelOpened::SIGNATURE_HASH.0;
635        let channel_balance_filter: [u8; 32] = ChannelBalanceIncreased::SIGNATURE_HASH.0;
636
637        assert!(
638            last_block_logs
639                .iter()
640                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
641            "must contain channel open"
642        );
643        assert!(
644            last_block_logs
645                .iter()
646                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
647            "must contain channel balance increase"
648        );
649
650        Ok(())
651    }
652}