hopr_chain_rpc/
indexer.rs

1//! Extends the [RpcOperations] type with functionality needed by the Indexer component.
2//!
//! The required functionality is defined in the [HoprIndexerRpcOperations] trait,
4//! which is implemented for [RpcOperations] hereof.
5//! The primary goal is to provide a stream of [BlockWithLogs] filtered by the given [LogFilter]
//! as the new matching blocks are mined in the underlying blockchain. The stream also allows collecting
//! historical blockchain data.
8//!
9//! For details on the Indexer see the `chain-indexer` crate.
10use std::pin::Pin;
11
12use alloy::{providers::Provider, rpc::types::Filter};
13use async_stream::stream;
14use async_trait::async_trait;
15use futures::{Stream, StreamExt, stream::BoxStream};
16use hopr_primitive_types::prelude::*;
17use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
18use tracing::{debug, error, trace, warn};
19
20use crate::{
21    BlockWithLogs, FilterSet, HoprIndexerRpcOperations, Log,
22    errors::{Result, RpcError, RpcError::FilterIsEmpty},
23    rpc::RpcOperations,
24    transport::HttpRequestor,
25};
26
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge that is set to the latest chain head block number each time the log stream
    /// successfully polls the chain head.
    static ref METRIC_RPC_CHAIN_HEAD: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_chain_head_block_number",
            "Current block number of chain head",
    )
    // Name the metric in the panic message so a failure here is attributable at start-up.
    .expect("failed to create the hopr_chain_head_block_number gauge");
}
35
36/// Splits a block range into smaller chunks and applies filters to each chunk.
37///
38/// This function takes a range of blocks and divides it into smaller sub-ranges
39/// for concurrent processing. Each sub-range gets a copy of all the provided filters
40/// with the appropriate block range applied.
41///
42/// # Arguments
43/// * `filters` - Vector of log filters to apply to each chunk
44/// * `from_block` - Starting block number
45/// * `to_block` - Ending block number
46/// * `max_chunk_size` - Maximum number of blocks per chunk
47///
48/// # Returns
49/// * `impl Stream<Item = Vec<Filter>>` - Stream of filter vectors, one per chunk
50fn split_range<'a>(
51    filters: Vec<Filter>,
52    from_block: u64,
53    to_block: u64,
54    max_chunk_size: u64,
55) -> BoxStream<'a, Vec<Filter>> {
56    assert!(from_block <= to_block, "invalid block range");
57    assert!(max_chunk_size > 0, "chunk size must be greater than 0");
58
59    futures::stream::unfold((from_block, to_block), move |(start, to)| {
60        if start <= to {
61            let end = to_block.min(start + max_chunk_size - 1);
62            let ranged_filters = filters
63                .iter()
64                .cloned()
65                .map(|f| f.from_block(start).to_block(end))
66                .collect::<Vec<_>>();
67            futures::future::ready(Some((ranged_filters, (end + 1, to))))
68        } else {
69            futures::future::ready(None)
70        }
71    })
72    .boxed()
73}
74
75// impl<P: JsonRpcClient + 'static, R: HttpRequestor + 'static> RpcOperations<P, R> {
76impl<R: HttpRequestor + 'static + Clone> RpcOperations<R> {
77    /// Retrieves logs in the given range (`from_block` and `to_block` are inclusive).
78    fn stream_logs(&self, filters: Vec<Filter>, from_block: u64, to_block: u64) -> BoxStream<'_, Result<Log>> {
79        let fetch_ranges = split_range(filters, from_block, to_block, self.cfg.max_block_range_fetch_size);
80
81        debug!(
82            "polling logs from blocks #{from_block} - #{to_block} (via {:?} chunks)",
83            (to_block - from_block) / self.cfg.max_block_range_fetch_size + 1
84        );
85
86        fetch_ranges
87            .then(move |subrange_filters| async move {
88                let mut results = futures::stream::iter(subrange_filters)
89                    .then_concurrent(|filter| async move {
90                        let prov_clone = self.provider.clone();
91
92                        match prov_clone.get_logs(&filter).await {
93                            Ok(logs) => Ok(logs),
94                            Err(e) => {
95                                error!(
96                                    from = ?filter.get_from_block(),
97                                    to = ?filter.get_to_block(),
98                                    error = %e,
99                                    "failed to fetch logs in block subrange"
100                                );
101                                Err(e)
102                            }
103                        }
104                    })
105                    .flat_map(|result| {
106                        futures::stream::iter(match result {
107                            Ok(logs) => logs.into_iter().map(|log| Ok(Log::try_from(log)?)).collect::<Vec<_>>(),
108                            Err(e) => vec![Err(RpcError::from(e))],
109                        })
110                    })
111                    .collect::<Vec<_>>()
112                    .await;
113
114                // at this point we need to ensure logs are ordered by block number since that is
115                // expected by the indexer
116                results.sort_by(|a, b| {
117                    if let Ok(a) = a {
118                        if let Ok(b) = b {
119                            a.block_number.cmp(&b.block_number)
120                        } else {
121                            std::cmp::Ordering::Greater
122                        }
123                    } else {
124                        std::cmp::Ordering::Less
125                    }
126                });
127
128                futures::stream::iter(results)
129            })
130            .flatten()
131            .boxed()
132    }
133}
134
135#[async_trait]
136impl<R: HttpRequestor + 'static + Clone> HoprIndexerRpcOperations for RpcOperations<R> {
137    async fn block_number(&self) -> Result<u64> {
138        self.get_block_number().await
139    }
140
141    async fn get_hopr_allowance(&self, owner: Address, spender: Address) -> Result<HoprBalance> {
142        self.get_hopr_allowance(owner, spender).await
143    }
144
145    async fn get_xdai_balance(&self, address: Address) -> Result<XDaiBalance> {
146        self.get_xdai_balance(address).await
147    }
148
149    async fn get_hopr_balance(&self, address: Address) -> Result<HoprBalance> {
150        self.get_hopr_balance(address).await
151    }
152
153    fn try_stream_logs<'a>(
154        &'a self,
155        start_block_number: u64,
156        filters: FilterSet,
157        is_synced: bool,
158    ) -> Result<Pin<Box<dyn Stream<Item = BlockWithLogs> + Send + 'a>>> {
159        if filters.all.is_empty() {
160            return Err(FilterIsEmpty);
161        }
162
163        let log_filters = if !is_synced {
164            // Because we are not synced yet, we will not get logs for the token contract.
165            // These are only relevant for the indexer if we are synced.
166            filters.no_token
167        } else {
168            filters.all
169        };
170
171        Ok(Box::pin(stream! {
172            // On first iteration use the given block number as start
173            let mut from_block = start_block_number;
174
175            const MAX_LOOP_FAILURES: usize = 5;
176            const MAX_RPC_PAST_BLOCKS: usize = 50;
177            let mut count_failures = 0;
178
179            'outer: loop {
180                match self.block_number().await {
181                    Ok(latest_block) => {
182                        if from_block > latest_block {
183                            let past_diff = from_block - latest_block;
184                            if from_block == start_block_number {
185                                // If on first iteration the start block is in the future, just set
186                                // it to the latest
187                                from_block = latest_block;
188                            } else if past_diff <= MAX_RPC_PAST_BLOCKS as u64 {
189                                // If we came here early (we tolerate only off-by MAX_RPC_PAST_BLOCKS), wait some more
190                                debug!(last_block = latest_block, start_block = start_block_number, blocks_diff = past_diff, "Indexer premature request. Block not found yet in RPC provider.");
191                                futures_timer::Delay::new(past_diff as u32 * self.cfg.expected_block_time / 3).await;
192                                continue;
193                            } else {
194                                // This is a hard-failure on later iterations which is unrecoverable
195                                panic!("indexer start block number {from_block} is greater than the chain latest block number {latest_block} (diff {past_diff}) =>
196                                possible causes: chain reorg, RPC provider out of sync, corrupted DB =>
197                                possible solutions: change the RPC provider, reinitialize the DB");
198                            }
199                        }
200
201
202                        #[cfg(all(feature = "prometheus", not(test)))]
203                        METRIC_RPC_CHAIN_HEAD.set(latest_block as f64);
204
205                        let mut retrieved_logs = self.stream_logs(log_filters.clone(), from_block, latest_block);
206
207                        trace!(from_block, to_block = latest_block, "processing batch");
208
209                        let mut current_block_log = BlockWithLogs { block_id: from_block, ..Default::default()};
210
211                        loop {
212                            match retrieved_logs.next().await {
213                                Some(Ok(log)) => {
214                                    // This in general should not happen, but handle such a case to be safe
215                                    if log.block_number > latest_block {
216                                        warn!(%log, latest_block, "got log that has not yet reached the finalized tip");
217                                        break;
218                                    }
219
220                                    // This should not happen, thus panic.
221                                    if current_block_log.block_id > log.block_number {
222                                        error!(log_block_id = log.block_number, current_block_log.block_id, "received log from a previous block");
223                                        panic!("The on-chain logs are not ordered by block number. This is a critical error.");
224                                    }
225
226                                    // This assumes the logs are arriving ordered by blocks when fetching a range
227                                    if current_block_log.block_id < log.block_number {
228                                        debug!(block = %current_block_log, "completed block, moving to next");
229                                        yield current_block_log;
230
231                                        current_block_log = BlockWithLogs::default();
232                                        current_block_log.block_id = log.block_number;
233                                    }
234
235                                    debug!("retrieved {log}");
236                                    current_block_log.logs.insert(log.into());
237                                },
238                                None => {
239                                    break;
240                                },
241                                Some(Err(e)) => {
242                                    error!(error=%e, "failed to process blocks");
243                                    count_failures += 1;
244
245                                    if count_failures < MAX_LOOP_FAILURES {
246                                        // Continue the outer loop, which throws away the current block
247                                        // that may be incomplete due to this error.
248                                        // We will start at this block again to re-query it.
249                                        from_block = current_block_log.block_id;
250                                        continue 'outer;
251                                    } else {
252                                        panic!("!!! Cannot advance the chain indexing due to unrecoverable RPC errors.
253
254                                        The RPC provider does not seem to be working correctly.
255
256                                        The last encountered error was: {e}");
257                                    }
258                                }
259                            }
260                        }
261
262                        // Yield everything we've collected until this point
263                        debug!(block = %current_block_log, "completed block, processing batch finished");
264                        yield current_block_log;
265                        from_block = latest_block + 1;
266                        count_failures = 0;
267                    }
268
269                    Err(e) => error!(error = %e, "failed to obtain current block number from chain")
270                }
271
272                futures_timer::Delay::new(self.cfg.expected_block_time).await;
273            }
274        }))
275    }
276}
277
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::{
        primitives::{B256, U256},
        rpc::{client::ClientBuilder, types::Filter},
        sol_types::SolEvent,
        transports::{http::ReqwestTransport, layers::RetryBackoffLayer},
    };
    use anyhow::Context;
    use futures::StreamExt;
    use hopr_async_runtime::prelude::{sleep, spawn};
    use hopr_bindings::{
        hoprchannelsevents::HoprChannelsEvents::{ChannelBalanceIncreased, ChannelOpened},
        hoprtoken::HoprToken::{Approval, Transfer},
    };
    use hopr_chain_types::{ContractAddresses, ContractInstances};
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
    use tokio::time::timeout;
    use tracing::debug;

    use crate::{
        BlockWithLogs, FilterSet, HoprIndexerRpcOperations,
        client::create_rpc_client_to_anvil,
        errors::RpcError,
        indexer::split_range,
        rpc::{RpcOperations, RpcOperationsConfig},
    };

    /// Builds a filter that matches a single event type emitted by a single contract.
    fn event_filter(contract: impl Into<alloy::primitives::Address>, event_signature: B256) -> Filter {
        Filter::new().address(contract.into()).event_signature(event_signature)
    }

    /// Returns the `(from_block, to_block)` bounds shared by all the given filters.
    ///
    /// Fails if any filter lacks a numeric bound, if the bounds differ between the filters,
    /// or if no filters were given.
    fn filter_bounds(filters: &[Filter]) -> anyhow::Result<(u64, u64)> {
        // `None` until the first filter provides the reference bounds. A dedicated `Option` is used
        // rather than a `(0, 0)` sentinel, because `(0, 0)` is a legitimate block range.
        let mut reference: Option<(u64, u64)> = None;

        for filter in filters {
            let from = filter
                .block_option
                .get_from_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;
            let to = filter
                .block_option
                .get_to_block()
                .context("a value should be present")?
                .as_number()
                .context("a value should be convertible")?;

            match reference {
                None => reference = Some((from, to)),
                Some(bounds) if bounds != (from, to) => {
                    anyhow::bail!("range bounds are not equal across all filters");
                }
                Some(_) => {}
            }
        }

        reference.context("at least one filter is required to determine the range bounds")
    }

    #[tokio::test]
    async fn test_split_range() -> anyhow::Result<()> {
        let filters = vec![Filter::default()];
        let ranges = split_range(filters.clone(), 0, 10, 2).collect::<Vec<_>>().await;

        assert_eq!(6, ranges.len());
        assert_eq!((0, 1), filter_bounds(&ranges[0])?);
        assert_eq!((2, 3), filter_bounds(&ranges[1])?);
        assert_eq!((4, 5), filter_bounds(&ranges[2])?);
        assert_eq!((6, 7), filter_bounds(&ranges[3])?);
        assert_eq!((8, 9), filter_bounds(&ranges[4])?);
        assert_eq!((10, 10), filter_bounds(&ranges[5])?);

        let ranges = split_range(filters.clone(), 0, 0, 1).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);

        let ranges = split_range(filters.clone(), 0, 3, 1).collect::<Vec<_>>().await;
        assert_eq!(4, ranges.len());
        assert_eq!((0, 0), filter_bounds(&ranges[0])?);
        assert_eq!((1, 1), filter_bounds(&ranges[1])?);
        assert_eq!((2, 2), filter_bounds(&ranges[2])?);
        assert_eq!((3, 3), filter_bounds(&ranges[3])?);

        let ranges = split_range(filters.clone(), 0, 3, 10).collect::<Vec<_>>().await;
        assert_eq!(1, ranges.len());
        assert_eq!((0, 3), filter_bounds(&ranges[0])?);

        Ok(())
    }

    #[tokio::test]
    async fn test_should_get_block_number() -> anyhow::Result<()> {
        let expected_block_time = Duration::from_secs(1);
        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            finality: 2,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        // Wait until contracts deployments are final
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let b1 = rpc.block_number().await?;

        // Two block times must be enough for at least one new block to be produced
        sleep(expected_block_time * 2).await;

        let b2 = rpc.block_number().await?;

        assert!(b2 > b1, "block number should increase");

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_all_logs_when_opening_channel() -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        // Deploy contracts
        let contract_instances = {
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let filter_token_approval = event_filter(contract_addrs.token, Approval::SIGNATURE_HASH);
        let filter_token_transfer = event_filter(contract_addrs.token, Transfer::SIGNATURE_HASH);
        let filter_channels_opened = event_filter(contract_addrs.channels, ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased =
            event_filter(contract_addrs.channels, ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_token_approval.clone(),
                filter_token_transfer.clone(),
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![filter_token_approval, filter_token_transfer],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        // Wait until contracts deployments are final
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        // Spawn the stream and wait for the first block that carries the expected number of logs
        let count_filtered_topics = 2;
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .next()
                    .await,
            )
        });

        // Fund a channel while the stream is running in the background
        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs) // Give up after 30 seconds
            .await???;

        // The stream is opened with `is_synced = false`, so only the `no_token` filters apply:
        // the retrieved block must contain the 2 channel events
        let last_block_logs = retrieved_logs.context("a log should be present")?.logs;

        let channel_open_filter = ChannelOpened::SIGNATURE_HASH;
        let channel_balance_filter = ChannelBalanceIncreased::SIGNATURE_HASH;

        debug!(
            "channel_open_filter: {:?} - {:?}",
            channel_open_filter,
            channel_open_filter.0.to_vec()
        );
        debug!(
            "channel_balance_filter: {:?} - {:?}",
            channel_balance_filter,
            channel_balance_filter.0.to_vec()
        );
        debug!("logs: {:#?}", last_block_logs);

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter.into())),
            "must contain channel open"
        );
        assert!(
            last_block_logs.iter().any(
                |log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter.into())
            ),
            "must contain channel balance increase"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_try_stream_logs_should_contain_only_channel_logs_when_filtered_on_funding_channel()
    -> anyhow::Result<()> {
        let _ = env_logger::builder().is_test(true).try_init();

        let expected_block_time = Duration::from_secs(1);

        let anvil = hopr_chain_types::utils::create_anvil(Some(expected_block_time));
        let chain_key_0 = ChainKeypair::from_secret(anvil.keys()[0].to_bytes().as_ref())?;
        let chain_key_1 = ChainKeypair::from_secret(anvil.keys()[1].to_bytes().as_ref())?;

        // Deploy contracts
        let contract_instances = {
            let client = create_rpc_client_to_anvil(&anvil, &chain_key_0);
            ContractInstances::deploy_for_testing(client, &chain_key_0).await?
        };

        let tokens_minted_at =
            hopr_chain_types::utils::mint_tokens(contract_instances.token.clone(), U256::from(1000_u128))
                .await?
                .unwrap();
        debug!("tokens were minted at block {tokens_minted_at}");

        let contract_addrs = ContractAddresses::from(&contract_instances);

        let cfg = RpcOperationsConfig {
            tx_polling_interval: Duration::from_millis(10),
            contract_addrs,
            expected_block_time,
            finality: 2,
            gas_oracle_url: None,
            ..RpcOperationsConfig::default()
        };

        let transport_client = ReqwestTransport::new(anvil.endpoint_url());

        let rpc_client = ClientBuilder::default()
            .layer(RetryBackoffLayer::new(2, 100, 100))
            .transport(transport_client.clone(), transport_client.guess_local());

        // Wait until contracts deployments are final
        sleep((1 + cfg.finality) * expected_block_time).await;

        let rpc = RpcOperations::new(rpc_client, transport_client.client().clone(), &chain_key_0, cfg, None)?;

        let filter_channels_opened = event_filter(contract_addrs.channels, ChannelOpened::SIGNATURE_HASH);
        let filter_channels_balance_increased =
            event_filter(contract_addrs.channels, ChannelBalanceIncreased::SIGNATURE_HASH);

        let log_filter = FilterSet {
            all: vec![
                filter_channels_opened.clone(),
                filter_channels_balance_increased.clone(),
            ],
            token: vec![],
            no_token: vec![filter_channels_opened, filter_channels_balance_increased],
        };

        debug!("{:#?}", contract_addrs);
        debug!("{:#?}", log_filter);

        // Spawn the stream and collect the first block that carries the expected number of logs
        let count_filtered_topics = 2;
        let retrieved_logs = spawn(async move {
            Ok::<_, RpcError>(
                rpc.try_stream_logs(1, log_filter, false)?
                    .skip_while(|b| futures::future::ready(b.len() != count_filtered_topics))
                    .take(1)
                    .collect::<Vec<BlockWithLogs>>()
                    .await,
            )
        });

        // Fund a channel while the stream is running in the background
        let _ = hopr_chain_types::utils::fund_channel(
            chain_key_1.public().to_address(),
            contract_instances.token,
            contract_instances.channels,
            U256::from(1_u128),
        )
        .await;

        let retrieved_logs = timeout(Duration::from_secs(30), retrieved_logs) // Give up after 30 seconds
            .await???;

        // The retrieved block must contain both channel events
        let last_block_logs = retrieved_logs
            .into_iter()
            .next()
            .context("a value should be present")?
            .logs;

        let channel_open_filter: [u8; 32] = ChannelOpened::SIGNATURE_HASH.0;
        let channel_balance_filter: [u8; 32] = ChannelBalanceIncreased::SIGNATURE_HASH.0;

        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_open_filter)),
            "must contain channel open"
        );
        assert!(
            last_block_logs
                .iter()
                .any(|log| log.address == contract_addrs.channels && log.topics.contains(&channel_balance_filter)),
            "must contain channel balance increase"
        );

        Ok(())
    }
}