Skip to main content

hopr_strategy/
channel_finalizer.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    ops::Sub,
4    sync::Arc,
5    time::Duration,
6};
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use hopr_lib::api::{
11    chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
12    node::HasChainApi,
13    types::{internal::prelude::ChannelStatusDiscriminants, primitive::prelude::Utc},
14};
15use serde::{Deserialize, Serialize};
16use tracing::{debug, error, info};
17use validator::Validate;
18
19use crate::{errors, strategy::Strategy as StrategyTrait};
20
// Telemetry counter for this strategy. It is compiled only when the `telemetry`
// feature is enabled and never in test builds. `on_tick` increments it once for
// every channel whose close transaction was submitted successfully.
//
// The `unwrap()` runs inside the lazy initializer, so a failure to create the
// counter panics on first access of the static rather than at program start.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_strategy_closure_auto_finalization_count",
        "Count of channels where closure finalizing was initiated automatically"
    )
    .unwrap();
}
29
/// Default value of [`ClosureFinalizerStrategyConfig::max_closure_overdue`]: five minutes.
///
/// Referenced by both the `serde` default and the `SmartDefault` attribute of that field.
#[inline]
fn default_max_closure_overdue() -> Duration {
    const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
    FIVE_MINUTES
}
34
/// Contains configuration of the [`ClosureFinalizerStrategy`].
///
/// The only setting is the width of the time window (ending "now") within which a
/// channel's closure time must fall for the strategy to attempt finalizing it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct ClosureFinalizerStrategyConfig {
    /// Do not attempt to finalize closure of channels that have
    /// been overdue for closure for more than this period.
    ///
    /// On every tick the strategy selects channels whose closure time lies in
    /// `now - max_closure_overdue ..= now`.
    ///
    /// (De)serialized through `humantime_serde`.
    ///
    /// Default is 300 seconds.
    #[serde(default = "default_max_closure_overdue", with = "humantime_serde")]
    #[default(default_max_closure_overdue())]
    pub max_closure_overdue: Duration,
}
46
47/// Builder for [`ClosureFinalizerStrategy`].
48///
49/// Call [`new`](ClosureFinalizerStrategy::new) with the strategy configuration,
50/// then [`build`](ClosureFinalizerStrategy::build) to wire in a node and obtain a
51/// runnable `Box<dyn Strategy + Send>`.
52pub struct ClosureFinalizerStrategy {
53    cfg: ClosureFinalizerStrategyConfig,
54    interval: Duration,
55}
56
57impl ClosureFinalizerStrategy {
58    /// Create a new builder with the given configuration.
59    pub fn new(cfg: ClosureFinalizerStrategyConfig, interval: Duration) -> Self {
60        Self { cfg, interval }
61    }
62
63    /// Wire in a node and return a running-ready strategy.
64    ///
65    /// The generic `N` is erased at construction time; the returned
66    /// `Box<dyn Strategy + Send>` can be held and spawned without knowledge
67    /// of the concrete node type.
68    pub fn build<N>(self, node: Arc<N>) -> Box<dyn StrategyTrait + Send>
69    where
70        N: HasChainApi + Send + Sync + 'static,
71        N::ChainApi: ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
72    {
73        Box::new(ClosureFinalizerStrategyInner {
74            node,
75            cfg: self.cfg,
76            interval: self.interval,
77        })
78    }
79}
80
81/// Private generic runner — constructed by [`ClosureFinalizerStrategy::build`].
82struct ClosureFinalizerStrategyInner<N: HasChainApi> {
83    node: Arc<N>,
84    cfg: ClosureFinalizerStrategyConfig,
85    interval: Duration,
86}
87
88impl<N> ClosureFinalizerStrategyInner<N>
89where
90    N: HasChainApi + Send + Sync + 'static,
91    <N as HasChainApi>::ChainApi:
92        ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
93{
94    async fn on_tick(&self) -> errors::Result<()> {
95        let now = Utc::now();
96        let chain = self.node.chain_api();
97        let mut outgoing_channels = chain
98            .stream_channels(
99                ChannelSelector::default()
100                    .with_source(*chain.me())
101                    .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
102                    .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
103            )
104            .map_err(|e| errors::StrategyError::Other(e.into()))?;
105
106        while let Some(channel) = outgoing_channels.next().await {
107            info!(%channel, "channel closure finalizer: finalizing closure");
108            match self.node.chain_api().close_channel(channel.get_id()).await {
109                Ok(_) => {
110                    debug!(%channel, "channel closure finalizer: submitted close transaction");
111                    #[cfg(all(feature = "telemetry", not(test)))]
112                    METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
113                }
114                Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
115            }
116        }
117
118        debug!("channel closure finalizer: initiated closure finalization done");
119        Ok(())
120    }
121}
122
123impl<N: HasChainApi> Debug for ClosureFinalizerStrategyInner<N> {
124    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
125        write!(f, "ClosureFinalizerStrategy({:?})", self.cfg)
126    }
127}
128
129impl<N: HasChainApi> Display for ClosureFinalizerStrategyInner<N> {
130    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131        write!(f, "closure_finalizer")
132    }
133}
134
135#[async_trait]
136impl<N> StrategyTrait for ClosureFinalizerStrategyInner<N>
137where
138    N: HasChainApi + Send + Sync + 'static,
139    <N as HasChainApi>::ChainApi:
140        ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
141{
142    async fn run(&mut self) -> errors::Result<()> {
143        // Run the first scan immediately at startup without waiting for the initial interval.
144        if let Err(e) = self.on_tick().await {
145            tracing::error!(%e, "closure finalizer tick failed");
146        }
147
148        let tick_stream = futures_time::stream::interval(self.interval.into()).map(|_| ());
149
150        futures::pin_mut!(tick_stream);
151        while tick_stream.next().await.is_some() {
152            if let Err(e) = self.on_tick().await {
153                tracing::error!(%e, "closure finalizer tick failed");
154            }
155        }
156
157        Ok(())
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use std::{ops::Add, sync::Arc, time::SystemTime};
164
165    use futures::StreamExt;
166    use futures_time::future::FutureExt;
167    use hex_literal::hex;
168    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
169    use hopr_lib::api::{
170        chain::{ChainEvent, ChainEvents, HoprChainApi},
171        node::{ComponentStatus, ComponentStatusReporter, EventWaitResult, HasChainApi, NodeOnchainIdentity},
172        types::{
173            crypto::{keypairs::Keypair, prelude::ChainKeypair},
174            internal::prelude::{ChannelEntry, ChannelStatus},
175            primitive::prelude::{Address, BytesRepresentable, HoprBalance, XDaiBalance},
176        },
177    };
178    use lazy_static::lazy_static;
179
180    use super::*;
181
    // Fixed identities shared by the tests in this module.
    lazy_static! {
        // Key pair built from a hard-coded secret; the tests pass it to the chain connector.
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        // Address derived from `ALICE_KP`; source of every channel in the fixtures.
        static ref ALICE: Address = ALICE_KP.public().to_address();
        // Hard-coded addresses used as channel destinations / generated accounts.
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }
193
    /// Wraps a chain API implementor as a minimal node for strategy tests.
    ///
    /// Only `chain_api` and `status` delegate to the wrapped value; `identity`
    /// returns a default identity and `wait_for_on_chain_event` is left unimplemented.
    struct ChainNode<C>(C);

    impl<C> HasChainApi for ChainNode<C>
    where
        C: HoprChainApi + ComponentStatusReporter + Clone + Send + Sync + 'static,
    {
        type ChainApi = C;
        type ChainError = <C as HoprChainApi>::ChainError;

        /// Returns a lazily created default identity.
        fn identity(&self) -> &NodeOnchainIdentity {
            // A `static` inside a generic function is a single item shared by every
            // instantiation, so all `ChainNode` values hand out the same identity.
            static IDENTITY: std::sync::OnceLock<NodeOnchainIdentity> = std::sync::OnceLock::new();
            IDENTITY.get_or_init(NodeOnchainIdentity::default)
        }

        /// Returns the wrapped chain API.
        fn chain_api(&self) -> &C {
            &self.0
        }

        /// Reports the component status of the wrapped chain API.
        fn status(&self) -> ComponentStatus {
            self.0.component_status()
        }

        /// Not exercised by the tests in this module; panics if called.
        fn wait_for_on_chain_event<F>(
            &self,
            _predicate: F,
            _context: String,
            _timeout: std::time::Duration,
        ) -> EventWaitResult<<C as HoprChainApi>::ChainError, <C as HoprChainApi>::ChainError>
        where
            F: Fn(&ChainEvent) -> bool + Send + Sync + 'static,
        {
            unimplemented!("tests do not call wait_for_on_chain_event")
        }
    }
229
230    #[tokio::test]
231    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
232        let max_closure_overdue = Duration::from_secs(600);
233
234        let channel_to_be_closed = ChannelEntry::builder()
235            .between(*ALICE, *DAVE)
236            .amount(10)
237            .ticket_index(0)
238            .status(ChannelStatus::PendingToClose(
239                SystemTime::now().sub(Duration::from_secs(60)),
240            ))
241            .epoch(1)
242            .build()?;
243
244        let blokli_sim = BlokliTestStateBuilder::default()
245            .with_generated_accounts(
246                &[&*ALICE, &*BOB, &*CHARLIE, &*DAVE, &*EUGENE],
247                false,
248                XDaiBalance::new_base(1),
249                HoprBalance::new_base(1000),
250            )
251            .with_channels([
252                ChannelEntry::builder()
253                    .between(*ALICE, *BOB)
254                    .amount(10)
255                    .ticket_index(0)
256                    .status(ChannelStatus::Open)
257                    .epoch(0)
258                    .build()?,
259                ChannelEntry::builder()
260                    .between(*ALICE, *CHARLIE)
261                    .amount(10)
262                    .ticket_index(0)
263                    .status(ChannelStatus::PendingToClose(
264                        SystemTime::now().add(Duration::from_secs(60)),
265                    ))
266                    .epoch(1)
267                    .build()?,
268                channel_to_be_closed,
269                ChannelEntry::builder()
270                    .between(*ALICE, *EUGENE)
271                    .amount(10)
272                    .ticket_index(0)
273                    .status(ChannelStatus::PendingToClose(
274                        SystemTime::now().sub(max_closure_overdue * 2),
275                    ))
276                    .epoch(1)
277                    .build()?,
278            ])
279            .build_dynamic_client([1; Address::SIZE].into());
280
281        let mut chain_connector =
282            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
283                .await?;
284        chain_connector.connect().await?;
285        let chain_connector = Arc::new(chain_connector);
286        let events = chain_connector.subscribe()?;
287
288        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };
289
290        let strat = ClosureFinalizerStrategyInner {
291            node: Arc::new(ChainNode(Arc::clone(&chain_connector))),
292            cfg,
293            interval: Duration::from_secs(60),
294        };
295        strat.on_tick().await?;
296
297        events
298            .filter(|event| {
299                futures::future::ready(
300                    matches!(event, ChainEvent::ChannelClosed(c) if channel_to_be_closed.get_id() == c.get_id()),
301                )
302            })
303            .next()
304            .timeout(futures_time::time::Duration::from_secs(2))
305            .await?;
306
307        Ok(())
308    }
309
310    /// Tests the public builder API: `ClosureFinalizerStrategy::new(...).build(node)` must
311    /// return a `Box<dyn Strategy + Send>` with the expected Display string.
312    #[tokio::test]
313    async fn test_build_returns_strategy_trait_object() -> anyhow::Result<()> {
314        let blokli_sim = BlokliTestStateBuilder::default()
315            .with_generated_accounts(
316                &[&*ALICE, &*BOB],
317                false,
318                XDaiBalance::new_base(1),
319                HoprBalance::new_base(1000),
320            )
321            .with_channels([])
322            .build_dynamic_client([1; Address::SIZE].into());
323
324        let mut chain_connector =
325            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
326                .await?;
327        chain_connector.connect().await?;
328        let node = Arc::new(ChainNode(Arc::new(chain_connector)));
329
330        let strategy: Box<dyn crate::strategy::Strategy + Send> = super::ClosureFinalizerStrategy::new(
331            ClosureFinalizerStrategyConfig::default(),
332            std::time::Duration::from_secs(60),
333        )
334        .build(node);
335
336        assert_eq!(strategy.to_string(), "closure_finalizer");
337        // Verify the box is Send (compile-time check via trait object)
338        fn assert_send<T: Send>(_: T) {}
339        assert_send(strategy);
340
341        Ok(())
342    }
343}