hopr_strategy/
channel_finalizer.rs

1use std::{
2    fmt::{Display, Formatter},
3    ops::Sub,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use hopr_lib::{
10    ChannelStatusDiscriminants, Utc,
11    exports::api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
12};
13use serde::{Deserialize, Serialize};
14use tracing::{debug, error, info};
15use validator::Validate;
16
17use crate::{Strategy, errors, strategy::SingularStrategy};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
22        "hopr_strategy_closure_auto_finalization_count",
23        "Count of channels where closure finalizing was initiated automatically"
24    )
25    .unwrap();
26}
27
28#[inline]
29fn default_max_closure_overdue() -> Duration {
30    Duration::from_secs(300)
31}
32
33/// Contains configuration of the [`ClosureFinalizerStrategy`].
34#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
35pub struct ClosureFinalizerStrategyConfig {
36    /// Do not attempt to finalize closure of channels that have
37    /// been overdue for closure for more than this period.
38    ///
39    /// Default is 300 seconds.
40    #[serde(default = "default_max_closure_overdue", with = "humantime_serde")]
41    #[default(default_max_closure_overdue())]
42    pub max_closure_overdue: Duration,
43}
44
45/// Strategy which runs per tick and finalizes `PendingToClose` channels
46/// which have elapsed the grace period.
47pub struct ClosureFinalizerStrategy<A> {
48    cfg: ClosureFinalizerStrategyConfig,
49    hopr_chain_actions: A,
50}
51
52impl<A> ClosureFinalizerStrategy<A> {
53    /// Constructs the strategy.
54    pub fn new(cfg: ClosureFinalizerStrategyConfig, hopr_chain_actions: A) -> Self {
55        Self {
56            hopr_chain_actions,
57            cfg,
58        }
59    }
60}
61
62impl<A> Display for ClosureFinalizerStrategy<A> {
63    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64        write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
65    }
66}
67
68#[async_trait]
69impl<A> SingularStrategy for ClosureFinalizerStrategy<A>
70where
71    A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync,
72{
73    async fn on_tick(&self) -> errors::Result<()> {
74        let now = Utc::now();
75        let mut outgoing_channels = self
76            .hopr_chain_actions
77            .stream_channels(
78                ChannelSelector::default()
79                    .with_source(*self.hopr_chain_actions.me())
80                    .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
81                    .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
82            )
83            .await
84            .map_err(|e| errors::StrategyError::Other(e.into()))?;
85
86        while let Some(channel) = outgoing_channels.next().await {
87            info!(%channel, "channel closure finalizer: finalizing closure");
88            match self.hopr_chain_actions.close_channel(channel.get_id()).await {
89                Ok(_) => {
90                    // Currently, we're not interested in awaiting the Close transactions to confirmation
91                    debug!(%channel, "channel closure finalizer: finalizing closure");
92                    #[cfg(all(feature = "prometheus", not(test)))]
93                    METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
94                }
95                Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
96            }
97        }
98
99        debug!("channel closure finalizer: initiated closure finalization done");
100        Ok(())
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use std::{ops::Add, time::SystemTime};
107
108    use futures_time::future::FutureExt;
109    use hex_literal::hex;
110    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
111    use hopr_lib::{
112        Address, BytesRepresentable, ChainKeypair, ChannelEntry, ChannelStatus, HoprBalance, Keypair, XDaiBalance,
113        exports::api::chain::{ChainEvent, ChainEvents},
114    };
115    use lazy_static::lazy_static;
116
117    use super::*;
118
119    lazy_static! {
120        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
121            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
122        ))
123        .expect("lazy static keypair should be valid");
124        static ref ALICE: Address = ALICE_KP.public().to_address();
125        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
126        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
127        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
128        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
129    }
130
131    #[tokio::test]
132    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
133        let max_closure_overdue = Duration::from_secs(600);
134
135        let channel_to_be_closed = ChannelEntry::new(
136            *ALICE,
137            *DAVE,
138            10.into(),
139            0,
140            ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
141            1,
142        );
143
144        let blokli_sim = BlokliTestStateBuilder::default()
145            .with_generated_accounts(
146                &[&*ALICE, &*BOB, &*CHARLIE, &*DAVE, &*EUGENE],
147                false,
148                XDaiBalance::new_base(1),
149                HoprBalance::new_base(1000),
150            )
151            .with_channels([
152                // Should leave this channel opened
153                ChannelEntry::new(*ALICE, *BOB, 10.into(), 0, ChannelStatus::Open, 0),
154                // Should leave this unfinalized, because the channel closure period has not yet elapsed
155                ChannelEntry::new(
156                    *ALICE,
157                    *CHARLIE,
158                    10.into(),
159                    0,
160                    ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
161                    1,
162                ),
163                // Should finalize closure of this channel
164                channel_to_be_closed,
165                // Should leave this unfinalized, because the channel closure is long overdue
166                ChannelEntry::new(
167                    *ALICE,
168                    *EUGENE,
169                    10.into(),
170                    0,
171                    ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
172                    1,
173                ),
174            ])
175            .build_dynamic_client([1; Address::SIZE].into());
176
177        let mut chain_connector =
178            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
179                .await?;
180        chain_connector.connect(Duration::from_secs(3)).await?;
181        let events = chain_connector.subscribe()?;
182
183        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };
184
185        let strat = ClosureFinalizerStrategy::new(cfg, chain_connector);
186        strat.on_tick().await?;
187
188        events
189            .filter(|event| {
190                futures::future::ready(
191                    matches!(event, ChainEvent::ChannelClosed(c) if channel_to_be_closed.get_id() == c.get_id()),
192                )
193            })
194            .next()
195            .timeout(futures_time::time::Duration::from_secs(2))
196            .await?;
197
198        // Cannot do snapshot testing here, since the execution is time-dependent
199
200        Ok(())
201    }
202}