1use std::{
2 fmt::{Debug, Display, Formatter},
3 ops::Sub,
4 sync::Arc,
5 time::Duration,
6};
7
8use async_trait::async_trait;
9use futures::StreamExt;
10use hopr_lib::api::{
11 chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
12 node::HasChainApi,
13 types::{internal::prelude::ChannelStatusDiscriminants, primitive::prelude::Utc},
14};
15use serde::{Deserialize, Serialize};
16use tracing::{debug, error, info};
17use validator::Validate;
18
19use crate::{errors, strategy::Strategy as StrategyTrait};
20
// Telemetry counter for this strategy. It is compiled only when the `telemetry`
// feature is enabled and the crate is not being built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Incremented by `on_tick` each time a `close_channel` call returns `Ok`.
    // The `unwrap` panics on first access if the counter cannot be created.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_strategy_closure_auto_finalization_count",
        "Count of channels where closure finalizing was initiated automatically"
    )
    .unwrap();
}
29
/// Default for `ClosureFinalizerStrategyConfig::max_closure_overdue`: five minutes.
///
/// Used both as the `serde` default and as the `SmartDefault` value of that field.
#[inline]
fn default_max_closure_overdue() -> Duration {
    const FIVE_MINUTES_IN_SECS: u64 = 5 * 60;
    Duration::from_secs(FIVE_MINUTES_IN_SECS)
}
34
/// Configuration of the closure finalizer strategy.
///
/// The type is `serde`-(de)serializable and its `Default` value is generated by `SmartDefault`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct ClosureFinalizerStrategyConfig {
    /// Width of the look-back window used when selecting channels on a tick.
    ///
    /// On each tick, only channels whose closure time lies within
    /// `[now - max_closure_overdue, now]` are selected for finalization.
    ///
    /// (De)serialized through `humantime_serde`; defaults to the value returned by
    /// `default_max_closure_overdue()`.
    #[serde(default = "default_max_closure_overdue", with = "humantime_serde")]
    #[default(default_max_closure_overdue())]
    pub max_closure_overdue: Duration,
}
46
/// Not-yet-bound closure finalizer strategy.
///
/// Holds only the configuration and the tick interval; `build` combines them with a node
/// to produce the runnable strategy object.
pub struct ClosureFinalizerStrategy {
    /// Channel selection parameters applied on every tick.
    cfg: ClosureFinalizerStrategyConfig,
    /// Period between two consecutive ticks of the built strategy.
    interval: Duration,
}
56
57impl ClosureFinalizerStrategy {
58 pub fn new(cfg: ClosureFinalizerStrategyConfig, interval: Duration) -> Self {
60 Self { cfg, interval }
61 }
62
63 pub fn build<N>(self, node: Arc<N>) -> Box<dyn StrategyTrait + Send>
69 where
70 N: HasChainApi + Send + Sync + 'static,
71 N::ChainApi: ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
72 {
73 Box::new(ClosureFinalizerStrategyInner {
74 node,
75 cfg: self.cfg,
76 interval: self.interval,
77 })
78 }
79}
80
/// Runnable form of the closure finalizer strategy, bound to a node that exposes a chain API.
///
/// Produced by `ClosureFinalizerStrategy::build`.
struct ClosureFinalizerStrategyInner<N: HasChainApi> {
    /// Node whose chain API is used to list channels and to submit channel closures.
    node: Arc<N>,
    /// Channel selection parameters applied on every tick.
    cfg: ClosureFinalizerStrategyConfig,
    /// Period of the interval stream that drives `run`.
    interval: Duration,
}
87
88impl<N> ClosureFinalizerStrategyInner<N>
89where
90 N: HasChainApi + Send + Sync + 'static,
91 <N as HasChainApi>::ChainApi:
92 ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
93{
94 async fn on_tick(&self) -> errors::Result<()> {
95 let now = Utc::now();
96 let chain = self.node.chain_api();
97 let mut outgoing_channels = chain
98 .stream_channels(
99 ChannelSelector::default()
100 .with_source(*chain.me())
101 .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
102 .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
103 )
104 .map_err(|e| errors::StrategyError::Other(e.into()))?;
105
106 while let Some(channel) = outgoing_channels.next().await {
107 info!(%channel, "channel closure finalizer: finalizing closure");
108 match self.node.chain_api().close_channel(channel.get_id()).await {
109 Ok(_) => {
110 debug!(%channel, "channel closure finalizer: submitted close transaction");
111 #[cfg(all(feature = "telemetry", not(test)))]
112 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
113 }
114 Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
115 }
116 }
117
118 debug!("channel closure finalizer: initiated closure finalization done");
119 Ok(())
120 }
121}
122
123impl<N: HasChainApi> Debug for ClosureFinalizerStrategyInner<N> {
124 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
125 write!(f, "ClosureFinalizerStrategy({:?})", self.cfg)
126 }
127}
128
129impl<N: HasChainApi> Display for ClosureFinalizerStrategyInner<N> {
130 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131 write!(f, "closure_finalizer")
132 }
133}
134
135#[async_trait]
136impl<N> StrategyTrait for ClosureFinalizerStrategyInner<N>
137where
138 N: HasChainApi + Send + Sync + 'static,
139 <N as HasChainApi>::ChainApi:
140 ChainReadChannelOperations + ChainWriteChannelOperations + Clone + Send + Sync + 'static,
141{
142 async fn run(&mut self) -> errors::Result<()> {
143 if let Err(e) = self.on_tick().await {
145 tracing::error!(%e, "closure finalizer tick failed");
146 }
147
148 let tick_stream = futures_time::stream::interval(self.interval.into()).map(|_| ());
149
150 futures::pin_mut!(tick_stream);
151 while tick_stream.next().await.is_some() {
152 if let Err(e) = self.on_tick().await {
153 tracing::error!(%e, "closure finalizer tick failed");
154 }
155 }
156
157 Ok(())
158 }
159}
160
#[cfg(test)]
mod tests {
    use std::{ops::Add, sync::Arc, time::SystemTime};

    use futures::StreamExt;
    use futures_time::future::FutureExt;
    use hex_literal::hex;
    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
    use hopr_lib::api::{
        chain::{ChainEvent, ChainEvents, HoprChainApi},
        node::{ComponentStatus, ComponentStatusReporter, EventWaitResult, HasChainApi, NodeOnchainIdentity},
        types::{
            crypto::{keypairs::Keypair, prelude::ChainKeypair},
            internal::prelude::{ChannelEntry, ChannelStatus},
            primitive::prelude::{Address, BytesRepresentable, HoprBalance, XDaiBalance},
        },
    };
    use lazy_static::lazy_static;

    use super::*;

    // Fixed test identities. ALICE is the only one with a keypair, because the chain
    // connector under test is created from `ALICE_KP`.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    /// Minimal `HasChainApi` implementation that simply wraps a chain connector.
    struct ChainNode<C>(C);

    impl<C> HasChainApi for ChainNode<C>
    where
        C: HoprChainApi + ComponentStatusReporter + Clone + Send + Sync + 'static,
    {
        type ChainApi = C;
        type ChainError = <C as HoprChainApi>::ChainError;

        /// Returns a lazily initialised, process-wide default identity.
        fn identity(&self) -> &NodeOnchainIdentity {
            static IDENTITY: std::sync::OnceLock<NodeOnchainIdentity> = std::sync::OnceLock::new();
            IDENTITY.get_or_init(NodeOnchainIdentity::default)
        }

        fn chain_api(&self) -> &C {
            &self.0
        }

        fn status(&self) -> ComponentStatus {
            self.0.component_status()
        }

        /// Not needed by the strategy under test; panics if called.
        fn wait_for_on_chain_event<F>(
            &self,
            _predicate: F,
            _context: String,
            _timeout: std::time::Duration,
        ) -> EventWaitResult<<C as HoprChainApi>::ChainError, <C as HoprChainApi>::ChainError>
        where
            F: Fn(&ChainEvent) -> bool + Send + Sync + 'static,
        {
            unimplemented!("tests do not call wait_for_on_chain_event")
        }
    }

    /// Runs a single tick against four ALICE channels and waits for the `ChannelClosed`
    /// event of the one whose closure time elapsed within the configured window.
    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let max_closure_overdue = Duration::from_secs(600);

        // Closure time elapsed 60 s ago, i.e. inside the 600 s window: expected to be closed.
        let channel_to_be_closed = ChannelEntry::builder()
            .between(*ALICE, *DAVE)
            .amount(10)
            .ticket_index(0)
            .status(ChannelStatus::PendingToClose(
                SystemTime::now().sub(Duration::from_secs(60)),
            ))
            .epoch(1)
            .build()?;

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHARLIE, &*DAVE, &*EUGENE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([
                // Open channel: not in the `PendingToClose` state.
                ChannelEntry::builder()
                    .between(*ALICE, *BOB)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::Open)
                    .epoch(0)
                    .build()?,
                // Closure time is still 60 s in the future.
                ChannelEntry::builder()
                    .between(*ALICE, *CHARLIE)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::PendingToClose(
                        SystemTime::now().add(Duration::from_secs(60)),
                    ))
                    .epoch(1)
                    .build()?,
                channel_to_be_closed,
                // Closure time elapsed twice the allowed overdue period ago: outside the window.
                ChannelEntry::builder()
                    .between(*ALICE, *EUGENE)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::PendingToClose(
                        SystemTime::now().sub(max_closure_overdue * 2),
                    ))
                    .epoch(1)
                    .build()?,
            ])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let chain_connector = Arc::new(chain_connector);
        // Subscribe before ticking so that the closure event cannot be missed.
        let events = chain_connector.subscribe()?;

        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };

        let strat = ClosureFinalizerStrategyInner {
            node: Arc::new(ChainNode(Arc::clone(&chain_connector))),
            cfg,
            interval: Duration::from_secs(60),
        };
        strat.on_tick().await?;

        let closed_event = events
            .filter(|event| {
                futures::future::ready(
                    matches!(event, ChainEvent::ChannelClosed(c) if channel_to_be_closed.get_id() == c.get_id()),
                )
            })
            .next()
            .timeout(futures_time::time::Duration::from_secs(2))
            .await?;

        // `next()` resolves to `None` when the event stream ends; without this check the
        // test would pass even though the expected closure was never observed.
        assert!(
            closed_event.is_some(),
            "event stream ended before the expected channel closure was observed"
        );

        Ok(())
    }

    /// Checks that `build` yields a `Send` strategy trait object with the expected display name.
    #[tokio::test]
    async fn test_build_returns_strategy_trait_object() -> anyhow::Result<()> {
        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let node = Arc::new(ChainNode(Arc::new(chain_connector)));

        let strategy: Box<dyn crate::strategy::Strategy + Send> = super::ClosureFinalizerStrategy::new(
            ClosureFinalizerStrategyConfig::default(),
            std::time::Duration::from_secs(60),
        )
        .build(node);

        assert_eq!(strategy.to_string(), "closure_finalizer");
        // Compile-time check that the boxed strategy can be moved across threads.
        fn assert_send<T: Send>(_: T) {}
        assert_send(strategy);

        Ok(())
    }
}