Skip to main content

hopr_strategy/
auto_funding.rs

1//! ## Auto Funding Strategy
2//! This strategy listens for channel state change events to check whether a channel has dropped below
3//! `min_stake_threshold` HOPR. If this happens, the strategy issues a **fund channel** transaction to re-stake the
4//! channel with `funding_amount` HOPR.
5//!
6//! Additionally, the strategy periodically scans all outgoing open channels on each tick and funds
7//! any that have fallen below the threshold. This catches channels opened with low balance and
8//! channels that were underfunded when the node started.
9//!
10//! ### In-flight tracking
11//! To prevent duplicate funding when multiple balance-decrease events arrive in quick succession,
12//! the strategy maintains a set of channel IDs currently being funded.
13//! A channel is added to the set when a funding tx is successfully enqueued, and removed when:
14//! - The spawned confirmation task observes a transaction failure,
15//! - A balance increase event is observed for that channel (indicating the funding confirmed), or
16//! - An `on_tick` scan finds the channel's balance has risen above the threshold.
17//!
18//! ### Metrics
19//! - `hopr_strategy_auto_funding_funding_count` — incremented when a funding tx is successfully enqueued
20//! - `hopr_strategy_auto_funding_failure_count` — incremented when a funding tx fails to enqueue or confirm
21//!
22//! For details on default parameters see [AutoFundingStrategyConfig].
23use std::{
24    fmt::{Debug, Display, Formatter},
25    sync::Arc,
26};
27
28use async_trait::async_trait;
29use dashmap::DashSet;
30use futures::StreamExt;
31use hopr_lib::{
32    ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, ChannelStatusDiscriminants, HoprBalance,
33    api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
34};
35use serde::{Deserialize, Serialize};
36use serde_with::{DisplayFromStr, serde_as};
37use tracing::{debug, info, warn};
38use validator::{Validate, ValidationError};
39
40use crate::{
41    Strategy,
42    errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
43    strategy::SingularStrategy,
44};
45
46#[cfg(all(feature = "prometheus", not(test)))]
47lazy_static::lazy_static! {
48    static ref METRIC_COUNT_AUTO_FUNDINGS: hopr_metrics::SimpleCounter =
49        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_funding_funding_count", "Count of initiated automatic fundings").unwrap();
50    static ref METRIC_COUNT_AUTO_FUNDING_FAILURES: hopr_metrics::SimpleCounter =
51        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_funding_failure_count", "Count of failed automatic funding attempts").unwrap();
52}
53
54/// Validates that [`AutoFundingStrategyConfig::funding_amount`] is non-zero.
55fn validate_funding_amount(amount: &HoprBalance) -> std::result::Result<(), ValidationError> {
56    if amount.is_zero() {
57        return Err(ValidationError::new("funding_amount must be greater than zero"));
58    }
59    Ok(())
60}
61
62/// Configuration for `AutoFundingStrategy`
63#[serde_as]
64#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
65pub struct AutoFundingStrategyConfig {
66    /// Minimum stake that a channel's balance must not go below.
67    ///
68    /// Default is 1 wxHOPR
69    #[serde_as(as = "DisplayFromStr")]
70    #[default(HoprBalance::new_base(1))]
71    pub min_stake_threshold: HoprBalance,
72
73    /// Funding amount. Must be greater than zero.
74    ///
75    /// Defaults to 10 wxHOPR.
76    #[serde_as(as = "DisplayFromStr")]
77    #[default(HoprBalance::new_base(10))]
78    #[validate(custom(function = "validate_funding_amount"))]
79    pub funding_amount: HoprBalance,
80}
81
82/// The `AutoFundingStrategy` automatically funds a channel that
83/// dropped its staked balance below the configured threshold.
84///
85/// Tracks channels with in-flight funding transactions to prevent duplicate
86/// funding when multiple balance-decrease events arrive in quick succession.
87pub struct AutoFundingStrategy<A> {
88    hopr_chain_actions: Arc<A>,
89    cfg: AutoFundingStrategyConfig,
90    /// Channels with in-flight funding transactions. Entries are removed when:
91    /// - The spawned confirmation task observes a tx failure,
92    /// - A balance increase is observed via `on_own_channel_changed`, or
93    /// - `on_tick` finds the balance above threshold.
94    in_flight: Arc<DashSet<ChannelId>>,
95}
96
97impl<A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync + 'static> AutoFundingStrategy<A> {
98    pub fn new(cfg: AutoFundingStrategyConfig, hopr_chain_actions: A) -> Self {
99        if cfg.funding_amount.le(&cfg.min_stake_threshold) {
100            warn!(
101                funding_amount = %cfg.funding_amount,
102                min_stake_threshold = %cfg.min_stake_threshold,
103                "funding_amount is not greater than min_stake_threshold; \
104                 successful funding may re-trigger the threshold check"
105            );
106        }
107        Self {
108            cfg,
109            hopr_chain_actions: Arc::new(hopr_chain_actions),
110            in_flight: Arc::new(DashSet::new()),
111        }
112    }
113
114    /// Attempt to fund a channel if it is not already in-flight.
115    /// Returns `Ok(())` if the channel was skipped (already in-flight)
116    /// or if a funding task was spawned. The actual `fund_channel` call
117    /// happens inside the spawned task so the confirmation future is `'static`.
118    async fn try_fund_channel(&self, channel: &ChannelEntry) -> crate::errors::Result<()> {
119        let channel_id = *channel.get_id();
120
121        // Atomically check and mark as in-flight
122        if !self.in_flight.insert(channel_id) {
123            debug!(%channel, "skipping channel with in-flight funding");
124            return Ok(());
125        }
126
127        info!(
128            %channel,
129            balance = %channel.balance,
130            threshold = %self.cfg.min_stake_threshold,
131            "stake on channel at or below threshold"
132        );
133
134        let chain_actions = Arc::clone(&self.hopr_chain_actions);
135        let funding_amount = self.cfg.funding_amount;
136        let in_flight = Arc::clone(&self.in_flight);
137
138        hopr_async_runtime::prelude::spawn(async move {
139            match chain_actions.fund_channel(&channel_id, funding_amount).await {
140                Ok(confirmation) => {
141                    #[cfg(all(feature = "prometheus", not(test)))]
142                    METRIC_COUNT_AUTO_FUNDINGS.increment();
143
144                    info!(%channel_id, %funding_amount, "issued re-staking of channel");
145
146                    if let Err(e) = confirmation.await {
147                        warn!(%channel_id, error = %e, "funding transaction failed");
148                        in_flight.remove(&channel_id);
149
150                        #[cfg(all(feature = "prometheus", not(test)))]
151                        METRIC_COUNT_AUTO_FUNDING_FAILURES.increment();
152                    }
153                    // On success: the ChannelBalanceIncreased event will clear the
154                    // in-flight entry via on_own_channel_changed.
155                }
156                Err(e) => {
157                    warn!(%channel_id, error = %e, "failed to enqueue funding transaction");
158                    in_flight.remove(&channel_id);
159
160                    #[cfg(all(feature = "prometheus", not(test)))]
161                    METRIC_COUNT_AUTO_FUNDING_FAILURES.increment();
162                }
163            }
164        });
165
166        Ok(())
167    }
168}
169
170impl<A> Debug for AutoFundingStrategy<A> {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        write!(f, "{:?}", Strategy::AutoFunding(self.cfg))
173    }
174}
175
176impl<A> Display for AutoFundingStrategy<A> {
177    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178        write!(f, "{}", Strategy::AutoFunding(self.cfg))
179    }
180}
181
182#[async_trait]
183impl<A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync + 'static> SingularStrategy
184    for AutoFundingStrategy<A>
185{
186    /// Periodically scans all outgoing open channels and funds any with balance at or below
187    /// the configured threshold. Skips channels that already have in-flight funding transactions.
188    ///
189    /// This handles two cases that event-driven funding misses:
190    /// - Channels opened with balance already below threshold (only a `ChannelOpened` event is emitted, which doesn't
191    ///   trigger balance-based funding)
192    /// - Channels that were underfunded when the node started or restarted (no events are replayed to the strategy at
193    ///   startup)
194    async fn on_tick(&self) -> crate::errors::Result<()> {
195        let mut channels = self
196            .hopr_chain_actions
197            .stream_channels(
198                ChannelSelector::default()
199                    .with_source(*self.hopr_chain_actions.me())
200                    .with_allowed_states(&[ChannelStatusDiscriminants::Open]),
201            )
202            .await
203            .map_err(|e| StrategyError::Other(e.into()))?;
204
205        while let Some(channel) = channels.next().await {
206            if channel.balance.le(&self.cfg.min_stake_threshold) {
207                if let Err(e) = self.try_fund_channel(&channel).await {
208                    warn!(%channel, error = %e, "on_tick: failed to fund channel");
209                }
210            } else {
211                // Channel is above threshold; clear any stale in-flight entry
212                self.in_flight.remove(channel.get_id());
213            }
214        }
215
216        debug!("auto-funding on_tick scan complete");
217        Ok(())
218    }
219
220    async fn on_own_channel_changed(
221        &self,
222        channel: &ChannelEntry,
223        direction: ChannelDirection,
224        change: ChannelChange,
225    ) -> crate::errors::Result<()> {
226        // Can only auto-fund outgoing channels
227        if direction != ChannelDirection::Outgoing {
228            return Ok(());
229        }
230
231        if let ChannelChange::Balance { left: old, right: new } = change {
232            // If balance increased, clear in-flight state for this channel
233            if new > old && self.in_flight.remove(channel.get_id()).is_some() {
234                debug!(%channel, "cleared in-flight funding state after balance increase");
235            }
236
237            if new.le(&self.cfg.min_stake_threshold) && channel.status == ChannelStatus::Open {
238                self.try_fund_channel(channel).await?;
239            }
240            Ok(())
241        } else {
242            Err(CriteriaNotSatisfied)
243        }
244    }
245}
246
247#[cfg(test)]
248mod tests {
249    use std::str::FromStr;
250
251    use futures::StreamExt;
252    use futures_time::future::FutureExt;
253    use hex_literal::hex;
254    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
255    use hopr_lib::{
256        Address, BytesRepresentable, ChainKeypair, Keypair, XDaiBalance,
257        api::chain::{ChainEvent, ChainEvents},
258    };
259
260    use super::*;
261    use crate::{
262        auto_funding::{AutoFundingStrategy, AutoFundingStrategyConfig},
263        strategy::SingularStrategy,
264    };
265
266    lazy_static::lazy_static! {
267        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
268            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
269        ))
270        .expect("lazy static keypair should be valid");
271
272        static ref ALICE: Address = hex!("18f8ae833c85c51fbeba29cef9fbfb53b3bad950").into();
273        static ref BOB: Address = BOB_KP.public().to_address();
274        static ref CHRIS: Address = hex!("b6021e0860dd9d96c9ff0a73e2e5ba3a466ba234").into();
275        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
276    }
277
278    #[test_log::test(tokio::test)]
279    async fn test_auto_funding_strategy() -> anyhow::Result<()> {
280        let stake_limit = HoprBalance::from(7_u32);
281        let fund_amount = HoprBalance::from(5_u32);
282
283        let c1 = ChannelEntry::new(*ALICE, *BOB, 10_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);
284
285        let c2 = ChannelEntry::new(*BOB, *CHRIS, 5_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);
286
287        let c3 = ChannelEntry::new(
288            *CHRIS,
289            *DAVE,
290            5_u32.into(),
291            0_u32.into(),
292            ChannelStatus::PendingToClose(
293                chrono::DateTime::<chrono::Utc>::from_str("2025-11-10T00:00:00+00:00")?.into(),
294            ),
295            0,
296        );
297
298        let blokli_sim = BlokliTestStateBuilder::default()
299            .with_generated_accounts(
300                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
301                false,
302                XDaiBalance::new_base(1),
303                HoprBalance::new_base(1000),
304            )
305            .with_channels([c1, c2, c3])
306            .build_dynamic_client([1; Address::SIZE].into());
307
308        let snapshot = blokli_sim.snapshot();
309
310        let mut chain_connector =
311            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
312                .await?;
313        chain_connector.connect().await?;
314        let events = chain_connector.subscribe()?;
315
316        let cfg = AutoFundingStrategyConfig {
317            min_stake_threshold: stake_limit,
318            funding_amount: fund_amount,
319        };
320
321        let afs = AutoFundingStrategy::new(cfg, chain_connector);
322        afs.on_own_channel_changed(
323            &c1,
324            ChannelDirection::Outgoing,
325            ChannelChange::Balance {
326                left: HoprBalance::zero(),
327                right: c1.balance,
328            },
329        )
330        .await?;
331
332        afs.on_own_channel_changed(
333            &c2,
334            ChannelDirection::Outgoing,
335            ChannelChange::Balance {
336                left: HoprBalance::zero(),
337                right: c2.balance,
338            },
339        )
340        .await?;
341
342        afs.on_own_channel_changed(
343            &c3,
344            ChannelDirection::Outgoing,
345            ChannelChange::Balance {
346                left: HoprBalance::zero(),
347                right: c3.balance,
348            },
349        )
350        .await?;
351
352        events
353            .filter(|event| futures::future::ready(matches!(event, ChainEvent::ChannelBalanceIncreased(c, amount) if c.get_id() == c2.get_id() && amount == &fund_amount)))
354            .next()
355            .timeout(futures_time::time::Duration::from_secs(2))
356            .await?;
357
358        insta::assert_yaml_snapshot!(*snapshot.refresh());
359
360        Ok(())
361    }
362
363    #[test]
364    fn test_config_validation_rejects_zero_funding_amount() {
365        let cfg = AutoFundingStrategyConfig {
366            min_stake_threshold: HoprBalance::new_base(1),
367            funding_amount: HoprBalance::zero(),
368        };
369        assert!(
370            cfg.validate().is_err(),
371            "config with zero funding_amount should fail validation"
372        );
373    }
374
375    #[test]
376    fn test_config_validation_accepts_valid_config() {
377        let cfg = AutoFundingStrategyConfig {
378            min_stake_threshold: HoprBalance::new_base(1),
379            funding_amount: HoprBalance::new_base(10),
380        };
381        assert!(
382            cfg.validate().is_ok(),
383            "config with valid funding_amount should pass validation"
384        );
385    }
386
387    #[test]
388    fn test_default_config_passes_validation() {
389        let cfg = AutoFundingStrategyConfig::default();
390        assert!(cfg.validate().is_ok(), "default config should pass validation");
391    }
392
393    #[test_log::test(tokio::test)]
394    async fn test_on_tick_funds_underfunded_channels() -> anyhow::Result<()> {
395        let stake_limit = HoprBalance::from(7_u32);
396        let fund_amount = HoprBalance::from(5_u32);
397
398        // BOB -> CHRIS channel with balance below threshold
399        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);
400
401        // BOB -> DAVE channel with balance above threshold
402        let c2 = ChannelEntry::new(*BOB, *DAVE, 10_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);
403
404        let blokli_sim = BlokliTestStateBuilder::default()
405            .with_generated_accounts(
406                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
407                false,
408                XDaiBalance::new_base(1),
409                HoprBalance::new_base(1000),
410            )
411            .with_channels([c1, c2])
412            .build_dynamic_client([1; Address::SIZE].into());
413
414        let mut chain_connector =
415            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
416                .await?;
417        chain_connector.connect().await?;
418        let events = chain_connector.subscribe()?;
419
420        let cfg = AutoFundingStrategyConfig {
421            min_stake_threshold: stake_limit,
422            funding_amount: fund_amount,
423        };
424
425        let afs = AutoFundingStrategy::new(cfg, chain_connector);
426
427        // on_tick should scan channels and fund c1 (below threshold) but not c2 (above threshold)
428        afs.on_tick().await?;
429
430        // Expect a ChannelBalanceIncreased event for c1
431        events
432            .filter(|event| {
433                futures::future::ready(
434                    matches!(event, ChainEvent::ChannelBalanceIncreased(c, amount) if c.get_id() == c1.get_id() && amount == &fund_amount),
435                )
436            })
437            .next()
438            .timeout(futures_time::time::Duration::from_secs(2))
439            .await?;
440
441        Ok(())
442    }
443
444    #[test_log::test(tokio::test)]
445    async fn test_in_flight_prevents_duplicate_funding() -> anyhow::Result<()> {
446        let stake_limit = HoprBalance::from(7_u32);
447        let fund_amount = HoprBalance::from(5_u32);
448
449        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);
450
451        let blokli_sim = BlokliTestStateBuilder::default()
452            .with_generated_accounts(
453                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
454                false,
455                XDaiBalance::new_base(1),
456                HoprBalance::new_base(1000),
457            )
458            .with_channels([c1])
459            .build_dynamic_client([1; Address::SIZE].into());
460
461        let mut chain_connector =
462            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
463                .await?;
464        chain_connector.connect().await?;
465        let _events = chain_connector.subscribe()?;
466
467        let cfg = AutoFundingStrategyConfig {
468            min_stake_threshold: stake_limit,
469            funding_amount: fund_amount,
470        };
471
472        let afs = AutoFundingStrategy::new(cfg, chain_connector);
473
474        // First call should trigger funding
475        afs.on_own_channel_changed(
476            &c1,
477            ChannelDirection::Outgoing,
478            ChannelChange::Balance {
479                left: HoprBalance::from(10_u32),
480                right: c1.balance,
481            },
482        )
483        .await?;
484
485        // Verify the channel is in the in-flight set
486        assert!(
487            afs.in_flight.contains(c1.get_id()),
488            "channel should be in the in-flight set after funding"
489        );
490
491        // Second call with same balance should be skipped due to in-flight tracking.
492        // This returns Ok(()) rather than triggering another funding tx.
493        afs.on_own_channel_changed(
494            &c1,
495            ChannelDirection::Outgoing,
496            ChannelChange::Balance {
497                left: HoprBalance::from(10_u32),
498                right: c1.balance,
499            },
500        )
501        .await?;
502
503        // The in-flight set should still contain exactly one entry (unchanged)
504        assert_eq!(
505            afs.in_flight.len(),
506            1,
507            "in-flight set should still have exactly one entry"
508        );
509        assert!(
510            afs.in_flight.contains(c1.get_id()),
511            "channel should still be in the in-flight set"
512        );
513
514        Ok(())
515    }
516
517    #[test_log::test(tokio::test)]
518    async fn test_balance_increase_clears_in_flight() -> anyhow::Result<()> {
519        let stake_limit = HoprBalance::from(7_u32);
520        let fund_amount = HoprBalance::from(5_u32);
521
522        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);
523
524        let blokli_sim = BlokliTestStateBuilder::default()
525            .with_generated_accounts(
526                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
527                false,
528                XDaiBalance::new_base(1),
529                HoprBalance::new_base(1000),
530            )
531            .with_channels([c1])
532            .build_dynamic_client([1; Address::SIZE].into());
533
534        let mut chain_connector =
535            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
536                .await?;
537        chain_connector.connect().await?;
538        let _events = chain_connector.subscribe()?;
539
540        let cfg = AutoFundingStrategyConfig {
541            min_stake_threshold: stake_limit,
542            funding_amount: fund_amount,
543        };
544
545        let afs = AutoFundingStrategy::new(cfg, chain_connector);
546
547        // Trigger funding (balance decrease below threshold)
548        afs.on_own_channel_changed(
549            &c1,
550            ChannelDirection::Outgoing,
551            ChannelChange::Balance {
552                left: HoprBalance::from(10_u32),
553                right: c1.balance,
554            },
555        )
556        .await?;
557
558        // Verify channel is in-flight
559        assert!(afs.in_flight.contains(c1.get_id()));
560
561        // Simulate balance increase event (funding confirmed)
562        let funded_channel = ChannelEntry::new(
563            *BOB,
564            *CHRIS,
565            (3_u32 + 5_u32).into(),
566            0_u32.into(),
567            ChannelStatus::Open,
568            0,
569        );
570
571        afs.on_own_channel_changed(
572            &funded_channel,
573            ChannelDirection::Outgoing,
574            ChannelChange::Balance {
575                left: HoprBalance::from(3_u32),
576                right: HoprBalance::from(8_u32),
577            },
578        )
579        .await?;
580
581        // Verify channel is no longer in-flight
582        assert!(
583            !afs.in_flight.contains(c1.get_id()),
584            "channel should be cleared from in-flight after balance increase"
585        );
586
587        Ok(())
588    }
589
590    #[test_log::test(tokio::test)]
591    async fn test_on_tick_skips_in_flight_channels() -> anyhow::Result<()> {
592        let stake_limit = HoprBalance::from(7_u32);
593        let fund_amount = HoprBalance::from(5_u32);
594
595        // BOB -> CHRIS channel with balance below threshold
596        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);
597
598        let blokli_sim = BlokliTestStateBuilder::default()
599            .with_generated_accounts(
600                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
601                false,
602                XDaiBalance::new_base(1),
603                HoprBalance::new_base(1000),
604            )
605            .with_channels([c1])
606            .build_dynamic_client([1; Address::SIZE].into());
607
608        let mut chain_connector =
609            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
610                .await?;
611        chain_connector.connect().await?;
612        let _events = chain_connector.subscribe()?;
613
614        let cfg = AutoFundingStrategyConfig {
615            min_stake_threshold: stake_limit,
616            funding_amount: fund_amount,
617        };
618
619        let afs = AutoFundingStrategy::new(cfg, chain_connector);
620
621        // Fund via on_own_channel_changed to populate in-flight set
622        afs.on_own_channel_changed(
623            &c1,
624            ChannelDirection::Outgoing,
625            ChannelChange::Balance {
626                left: HoprBalance::from(10_u32),
627                right: c1.balance,
628            },
629        )
630        .await?;
631
632        // Verify the channel is in-flight
633        assert!(
634            afs.in_flight.contains(c1.get_id()),
635            "channel should be in the in-flight set after funding"
636        );
637
638        // on_tick should skip c1 because it is already in-flight
639        afs.on_tick().await?;
640
641        // Verify the in-flight set still has exactly one entry (channel was not re-funded)
642        assert_eq!(
643            afs.in_flight.len(),
644            1,
645            "in-flight set should still have exactly one entry"
646        );
647        assert!(
648            afs.in_flight.contains(c1.get_id()),
649            "channel should still be in the in-flight set"
650        );
651
652        Ok(())
653    }
654}