hopr_strategy/
channel_finalizer.rs1use std::{
2 fmt::{Display, Formatter},
3 ops::Sub,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use hopr_api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector, Utc};
10use hopr_internal_types::prelude::*;
11use serde::{Deserialize, Serialize};
12use serde_with::{DurationSeconds, serde_as};
13use tracing::{debug, error, info};
14use validator::Validate;
15
16use crate::{Strategy, errors, strategy::SingularStrategy};
17
// Prometheus counter bumped by `on_tick` after each successful `close_channel` call.
// Only compiled when the `prometheus` feature is enabled and the build is not a test build.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = {
        let name = "hopr_strategy_closure_auto_finalization_count";
        let description = "Count of channels where closure finalizing was initiated automatically";
        hopr_metrics::SimpleCounter::new(name, description).unwrap()
    };
}
26
/// Returns the default value of `max_closure_overdue`: 5 minutes (300 seconds).
#[inline]
fn default_max_closure_overdue() -> Duration {
    const FIVE_MINUTES_IN_SECS: u64 = 5 * 60;
    Duration::from_secs(FIVE_MINUTES_IN_SECS)
}
31
32#[serde_as]
34#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
35pub struct ClosureFinalizerStrategyConfig {
36 #[serde_as(as = "DurationSeconds<u64>")]
41 #[serde(default = "default_max_closure_overdue")]
42 #[default(default_max_closure_overdue())]
43 pub max_closure_overdue: Duration,
44}
45
46pub struct ClosureFinalizerStrategy<A> {
49 cfg: ClosureFinalizerStrategyConfig,
50 hopr_chain_actions: A,
51}
52
53impl<A> ClosureFinalizerStrategy<A> {
54 pub fn new(cfg: ClosureFinalizerStrategyConfig, hopr_chain_actions: A) -> Self {
56 Self {
57 hopr_chain_actions,
58 cfg,
59 }
60 }
61}
62
63impl<A> Display for ClosureFinalizerStrategy<A> {
64 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
66 }
67}
68
69#[async_trait]
70impl<A> SingularStrategy for ClosureFinalizerStrategy<A>
71where
72 A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync,
73{
74 async fn on_tick(&self) -> errors::Result<()> {
75 let now = Utc::now();
76 let mut outgoing_channels = self
77 .hopr_chain_actions
78 .stream_channels(
79 ChannelSelector::default()
80 .with_source(*self.hopr_chain_actions.me())
81 .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
82 .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
83 )
84 .await
85 .map_err(|e| errors::StrategyError::Other(e.into()))?;
86
87 while let Some(channel) = outgoing_channels.next().await {
88 info!(%channel, "channel closure finalizer: finalizing closure");
89 match self.hopr_chain_actions.close_channel(&channel.get_id()).await {
90 Ok(_) => {
91 debug!(%channel, "channel closure finalizer: finalizing closure");
93 #[cfg(all(feature = "prometheus", not(test)))]
94 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
95 }
96 Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
97 }
98 }
99
100 debug!("channel closure finalizer: initiated closure finalization done");
101 Ok(())
102 }
103}
104
#[cfg(test)]
mod tests {
    use std::{collections::Bound, time::SystemTime};

    use hex_literal::hex;
    use hopr_api::chain::ChainReceipt;
    use hopr_crypto_types::prelude::*;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;

    use super::*;
    use crate::tests::{MockChainActions, MockTestActions};

    // Fixed identities used by the tests; ALICE is the address the mock reports via `me()`.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    /// Verifies that `on_tick` asks for ALICE's `PendingToClose` channels within a closure-time
    /// window exactly `max_closure_overdue` wide, and closes the single channel the mock returns.
    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let max_closure_overdue = Duration::from_secs(600);
        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };

        // Channel from ALICE to DAVE whose closure time elapsed one minute ago.
        let elapsed_closure_time = SystemTime::now().sub(Duration::from_secs(60));
        let c_pending_elapsed = ChannelEntry::new(
            *ALICE,
            *DAVE,
            10.into(),
            0.into(),
            ChannelStatus::PendingToClose(elapsed_closure_time),
            0.into(),
        );
        let expected_channel_id = c_pending_elapsed.get_id();

        let mut mock = MockTestActions::new();
        mock.expect_me().return_const(*ALICE);

        // The strategy must query exactly once, with the expected selector.
        mock.expect_stream_channels()
            .once()
            .withf(move |selector| {
                let has_expected_window = matches!(
                    selector.closure_time_range,
                    (Bound::Included(a), Bound::Included(b))
                        if b.sub(a).to_std().is_ok_and(|d| d == max_closure_overdue)
                );

                selector.source == Some(*ALICE)
                    && selector.destination.is_none()
                    && selector.allowed_states == &[ChannelStatusDiscriminants::PendingToClose]
                    && has_expected_window
            })
            .return_once(move |_| futures::stream::iter([c_pending_elapsed]).boxed());

        // The returned channel must be closed exactly once.
        mock.expect_close_channel()
            .once()
            .with(mockall::predicate::eq(expected_channel_id))
            .return_once(|_| Ok((ChannelStatus::Closed, ChainReceipt::default())));

        let strat = ClosureFinalizerStrategy::new(cfg, MockChainActions(mock.into()));
        strat.on_tick().await?;

        Ok(())
    }
}