hopr_strategy/
channel_finalizer.rs

1use std::{
2    fmt::{Display, Formatter},
3    ops::Sub,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use hopr_api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector, Utc};
10use hopr_internal_types::prelude::*;
11use serde::{Deserialize, Serialize};
12use serde_with::{DurationSeconds, serde_as};
13use tracing::{debug, error, info};
14use validator::Validate;
15
16use crate::{Strategy, errors, strategy::SingularStrategy};
17
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter incremented by the closure finalizer each time a `close_channel`
    /// call made from `on_tick` returns successfully.
    ///
    /// Only compiled with the `prometheus` feature and outside of tests.
    /// Creating the counter is unwrapped, so a failure to create it panics
    /// on first access.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = {
        let name = "hopr_strategy_closure_auto_finalization_count";
        let description = "Count of channels where closure finalizing was initiated automatically";
        hopr_metrics::SimpleCounter::new(name, description).unwrap()
    };
}
26
/// Returns the default value of `max_closure_overdue`: 300 seconds (five minutes).
///
/// Used both as the serde default and as the `SmartDefault` value of
/// `ClosureFinalizerStrategyConfig::max_closure_overdue`.
#[inline]
fn default_max_closure_overdue() -> Duration {
    /// Default overdue limit, expressed in whole seconds.
    const DEFAULT_MAX_CLOSURE_OVERDUE_SECS: u64 = 5 * 60;

    Duration::from_secs(DEFAULT_MAX_CLOSURE_OVERDUE_SECS)
}
31
32/// Contains configuration of the [`ClosureFinalizerStrategy`].
33#[serde_as]
34#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
35pub struct ClosureFinalizerStrategyConfig {
36    /// Do not attempt to finalize closure of channels that have
37    /// been overdue for closure for more than this period.
38    ///
39    /// Default is 300 seconds.
40    #[serde_as(as = "DurationSeconds<u64>")]
41    #[serde(default = "default_max_closure_overdue")]
42    #[default(default_max_closure_overdue())]
43    pub max_closure_overdue: Duration,
44}
45
46/// Strategy which runs per tick and finalizes `PendingToClose` channels
47/// which have elapsed the grace period.
48pub struct ClosureFinalizerStrategy<A> {
49    cfg: ClosureFinalizerStrategyConfig,
50    hopr_chain_actions: A,
51}
52
53impl<A> ClosureFinalizerStrategy<A> {
54    /// Constructs the strategy.
55    pub fn new(cfg: ClosureFinalizerStrategyConfig, hopr_chain_actions: A) -> Self {
56        Self {
57            hopr_chain_actions,
58            cfg,
59        }
60    }
61}
62
63impl<A> Display for ClosureFinalizerStrategy<A> {
64    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65        write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
66    }
67}
68
69#[async_trait]
70impl<A> SingularStrategy for ClosureFinalizerStrategy<A>
71where
72    A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync,
73{
74    async fn on_tick(&self) -> errors::Result<()> {
75        let now = Utc::now();
76        let mut outgoing_channels = self
77            .hopr_chain_actions
78            .stream_channels(
79                ChannelSelector::default()
80                    .with_source(*self.hopr_chain_actions.me())
81                    .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
82                    .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
83            )
84            .await
85            .map_err(|e| errors::StrategyError::Other(e.into()))?;
86
87        while let Some(channel) = outgoing_channels.next().await {
88            info!(%channel, "channel closure finalizer: finalizing closure");
89            match self.hopr_chain_actions.close_channel(&channel.get_id()).await {
90                Ok(_) => {
91                    // Currently, we're not interested in awaiting the Close transactions to confirmation
92                    debug!(%channel, "channel closure finalizer: finalizing closure");
93                    #[cfg(all(feature = "prometheus", not(test)))]
94                    METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
95                }
96                Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
97            }
98        }
99
100        debug!("channel closure finalizer: initiated closure finalization done");
101        Ok(())
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::{collections::Bound, time::SystemTime};

    use hex_literal::hex;
    use hopr_api::chain::ChainReceipt;
    use hopr_crypto_types::prelude::*;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;

    use super::*;
    use crate::tests::{MockChainActions, MockTestActions};

    // Fixed identities for the tests in this module. ALICE acts as the local node
    // (the value returned by the mocked `me()`); the others are plain addresses.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    /// Checks that a tick queries ALICE's outgoing `PendingToClose` channels with an
    /// inclusive closure-time window exactly `max_closure_overdue` wide, and that the
    /// single channel returned by that query is closed exactly once.
    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let cfg = ClosureFinalizerStrategyConfig {
            max_closure_overdue: Duration::from_secs(600),
        };
        let expected_window = cfg.max_closure_overdue;

        // Closure time elapsed 60 seconds ago: this channel should get finalized.
        let closure_time = SystemTime::now().sub(Duration::from_secs(60));
        let overdue_channel = ChannelEntry::new(
            *ALICE,
            *DAVE,
            10.into(),
            0.into(),
            ChannelStatus::PendingToClose(closure_time),
            0.into(),
        );

        let mut actions = MockTestActions::new();
        actions.expect_me().return_const(*ALICE);

        actions
            .expect_stream_channels()
            .once()
            .withf(move |selector| {
                // Both bounds must be inclusive and exactly `expected_window` apart.
                let window_matches = match selector.closure_time_range {
                    (Bound::Included(start), Bound::Included(end)) => {
                        end.sub(start).to_std().is_ok_and(|width| width == expected_window)
                    }
                    _ => false,
                };

                selector.source == Some(*ALICE)
                    && selector.destination.is_none()
                    && selector.allowed_states == &[ChannelStatusDiscriminants::PendingToClose]
                    && window_matches
            })
            .return_once(move |_| futures::stream::iter([overdue_channel]).boxed());

        actions
            .expect_close_channel()
            .once()
            .with(mockall::predicate::eq(overdue_channel.get_id()))
            .return_once(|_| Ok((ChannelStatus::Closed, ChainReceipt::default())));

        let strategy = ClosureFinalizerStrategy::new(cfg, MockChainActions(actions.into()));
        strategy.on_tick().await?;

        Ok(())
    }
}