1use std::{
2 fmt::{Display, Formatter},
3 ops::Sub,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use hopr_lib::{
10 ChannelStatusDiscriminants, Utc,
11 exports::api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
12};
13use serde::{Deserialize, Serialize};
14use tracing::{debug, error, info};
15use validator::Validate;
16
17use crate::{Strategy, errors, strategy::SingularStrategy};
18
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter incremented each time this strategy gets a successful result from
    /// initiating a channel closure finalization.
    ///
    /// Only compiled in when the `prometheus` feature is enabled and the crate is not built for tests.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_strategy_closure_auto_finalization_count",
        "Count of channels where closure finalizing was initiated automatically"
    )
    // Name the metric in the panic message so a failed initialization is easy to trace.
    .expect("failed to create the hopr_strategy_closure_auto_finalization_count metric");
}
27
/// Default value of [`ClosureFinalizerStrategyConfig::max_closure_overdue`]: 5 minutes.
#[inline]
fn default_max_closure_overdue() -> Duration {
    const DEFAULT_OVERDUE_SECS: u64 = 5 * 60;

    Duration::from_secs(DEFAULT_OVERDUE_SECS)
}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
35pub struct ClosureFinalizerStrategyConfig {
36 #[serde(default = "default_max_closure_overdue", with = "humantime_serde")]
41 #[default(default_max_closure_overdue())]
42 pub max_closure_overdue: Duration,
43}
44
45pub struct ClosureFinalizerStrategy<A> {
48 cfg: ClosureFinalizerStrategyConfig,
49 hopr_chain_actions: A,
50}
51
52impl<A> ClosureFinalizerStrategy<A> {
53 pub fn new(cfg: ClosureFinalizerStrategyConfig, hopr_chain_actions: A) -> Self {
55 Self {
56 hopr_chain_actions,
57 cfg,
58 }
59 }
60}
61
62impl<A> Display for ClosureFinalizerStrategy<A> {
63 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64 write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
65 }
66}
67
68#[async_trait]
69impl<A> SingularStrategy for ClosureFinalizerStrategy<A>
70where
71 A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync,
72{
73 async fn on_tick(&self) -> errors::Result<()> {
74 let now = Utc::now();
75 let mut outgoing_channels = self
76 .hopr_chain_actions
77 .stream_channels(
78 ChannelSelector::default()
79 .with_source(*self.hopr_chain_actions.me())
80 .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
81 .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
82 )
83 .await
84 .map_err(|e| errors::StrategyError::Other(e.into()))?;
85
86 while let Some(channel) = outgoing_channels.next().await {
87 info!(%channel, "channel closure finalizer: finalizing closure");
88 match self.hopr_chain_actions.close_channel(channel.get_id()).await {
89 Ok(_) => {
90 debug!(%channel, "channel closure finalizer: finalizing closure");
92 #[cfg(all(feature = "prometheus", not(test)))]
93 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
94 }
95 Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
96 }
97 }
98
99 debug!("channel closure finalizer: initiated closure finalization done");
100 Ok(())
101 }
102}
103
#[cfg(test)]
mod tests {
    use std::{ops::Add, time::SystemTime};

    use futures_time::future::FutureExt;
    use hex_literal::hex;
    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
    use hopr_lib::{
        Address, BytesRepresentable, ChainKeypair, ChannelEntry, ChannelStatus, HoprBalance, Keypair, XDaiBalance,
        exports::api::chain::{ChainEvent, ChainEvents},
    };
    use lazy_static::lazy_static;

    use super::*;

    lazy_static! {
        // ALICE is the node under test; the others are only channel counterparties.
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    /// Runs one strategy tick against a simulated chain state and waits for the
    /// `ChannelClosed` event of the single channel that is expected to be finalized.
    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let max_closure_overdue = Duration::from_secs(600);

        // Closure time elapsed 60 s ago, i.e. within the 600 s overdue window.
        let channel_to_be_closed = ChannelEntry::new(
            *ALICE,
            *DAVE,
            10.into(),
            0,
            ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
            1,
        );

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHARLIE, &*DAVE, &*EUGENE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([
                // Not in the `PendingToClose` state.
                ChannelEntry::new(*ALICE, *BOB, 10.into(), 0, ChannelStatus::Open, 0),
                // Closure time is still 60 s in the future.
                ChannelEntry::new(
                    *ALICE,
                    *CHARLIE,
                    10.into(),
                    0,
                    ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
                    1,
                ),
                channel_to_be_closed,
                // Closure time elapsed twice the allowed overdue period ago.
                ChannelEntry::new(
                    *ALICE,
                    *EUGENE,
                    10.into(),
                    0,
                    ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
                    1,
                ),
            ])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect(Duration::from_secs(3)).await?;
        let events = chain_connector.subscribe()?;

        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };

        let strat = ClosureFinalizerStrategy::new(cfg, chain_connector);
        strat.on_tick().await?;

        let closed_event = events
            .filter(|event| {
                futures::future::ready(
                    matches!(event, ChainEvent::ChannelClosed(c) if channel_to_be_closed.get_id() == c.get_id()),
                )
            })
            .next()
            .timeout(futures_time::time::Duration::from_secs(2))
            .await?;

        // `None` means the event stream ended without the expected event; without
        // this check the test would pass vacuously in that case.
        anyhow::ensure!(
            closed_event.is_some(),
            "event stream ended before the expected ChannelClosed event was observed"
        );

        Ok(())
    }
}