1use std::{
2 fmt::{Display, Formatter},
3 ops::Sub,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{StreamExt, stream::FuturesUnordered};
9use hopr_chain_actions::channels::ChannelActions;
10use hopr_db_sql::{accounts::HoprDbAccountOperations, channels::HoprDbChannelOperations};
11use hopr_internal_types::prelude::*;
12#[cfg(all(feature = "prometheus", not(test)))]
13use hopr_metrics::metrics::SimpleCounter;
14use hopr_platform::time::native::current_time;
15use serde::{Deserialize, Serialize};
16use serde_with::{DurationSeconds, serde_as};
17use tracing::{debug, error, info};
18use validator::Validate;
19
20use crate::{Strategy, errors, strategy::SingularStrategy};
21
22#[cfg(all(feature = "prometheus", not(test)))]
23lazy_static::lazy_static! {
24 static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: SimpleCounter = SimpleCounter::new(
25 "hopr_strategy_closure_auto_finalization_count",
26 "Count of channels where closure finalizing was initiated automatically"
27 )
28 .unwrap();
29}
30
31#[inline]
32fn default_max_closure_overdue() -> Duration {
33 Duration::from_secs(300)
34}
35#[serde_as]
37#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
38pub struct ClosureFinalizerStrategyConfig {
39 #[serde_as(as = "DurationSeconds<u64>")]
44 #[serde(default = "default_max_closure_overdue")]
45 #[default(default_max_closure_overdue())]
46 pub max_closure_overdue: Duration,
47}
48
49pub struct ClosureFinalizerStrategy<Db, A>
52where
53 Db: HoprDbChannelOperations + Clone + Send + Sync,
54 A: ChannelActions,
55{
56 db: Db,
57 cfg: ClosureFinalizerStrategyConfig,
58 hopr_chain_actions: A,
59}
60
61impl<Db, A> ClosureFinalizerStrategy<Db, A>
62where
63 Db: HoprDbChannelOperations + Clone + Send + Sync,
64 A: ChannelActions,
65{
66 pub fn new(cfg: ClosureFinalizerStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
68 Self {
69 db,
70 hopr_chain_actions,
71 cfg,
72 }
73 }
74}
75
76impl<Db, A> Display for ClosureFinalizerStrategy<Db, A>
77where
78 Db: HoprDbChannelOperations + Clone + Send + Sync,
79 A: ChannelActions,
80{
81 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82 write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
83 }
84}
85
86#[async_trait]
87impl<Db, A> SingularStrategy for ClosureFinalizerStrategy<Db, A>
88where
89 Db: HoprDbChannelOperations + HoprDbAccountOperations + Clone + Send + Sync,
90 A: ChannelActions + Send + Sync,
91{
92 async fn on_tick(&self) -> errors::Result<()> {
93 let ts_limit = current_time().sub(self.cfg.max_closure_overdue);
94
95 let outgoing_channels = self
96 .db
97 .get_outgoing_channels(None)
98 .await
99 .map_err(hopr_db_sql::api::errors::DbError::from)?;
100
101 let to_close = outgoing_channels
102 .iter()
103 .filter(|channel| {
104 matches!(channel.status, ChannelStatus::PendingToClose(ct) if ct > ts_limit)
105 && channel.closure_time_passed(current_time())
106 })
107 .map(|channel| async {
108 let channel_cpy = *channel;
109 info!(channel = %channel_cpy, "channel closure finalizer: finalizing closure");
110 match self
111 .hopr_chain_actions
112 .close_channel(channel_cpy.destination, ChannelDirection::Outgoing, false)
113 .await
114 {
115 Ok(_) => {
116 debug!("channel closure finalizer: finalizing closure of {channel_cpy}");
118 }
119 Err(e) => error!(%channel_cpy, error = %e, "channel closure finalizer: failed to finalize closure"),
120 }
121 })
122 .collect::<FuturesUnordered<_>>()
123 .count()
124 .await;
125
126 #[cfg(all(feature = "prometheus", not(test)))]
127 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment_by(to_close as u64);
128
129 debug!("channel closure finalizer: initiated closure finalization of {to_close} channels");
130 Ok(())
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use std::{ops::Add, time::SystemTime};
137
138 use futures::{FutureExt, future::ok};
139 use hex_literal::hex;
140 use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
141 use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
142 use hopr_crypto_random::random_bytes;
143 use hopr_crypto_types::prelude::*;
144 use hopr_db_sql::{HoprDbGeneralModelOperations, db::HoprDb};
145 use hopr_primitive_types::prelude::*;
146 use lazy_static::lazy_static;
147 use mockall::mock;
148
149 use super::*;
150
151 lazy_static! {
152 static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
153 "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
154 ))
155 .expect("lazy static keypair should be valid");
156 static ref ALICE: Address = ALICE_KP.public().to_address();
157 static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
158 static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
159 static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
160 static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
161 }
162
163 mock! {
164 ChannelAct { }
165 #[async_trait]
166 impl ChannelActions for ChannelAct {
167 async fn open_channel(&self, destination: Address, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
168 async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
169 async fn close_channel(
170 &self,
171 counterparty: Address,
172 direction: ChannelDirection,
173 redeem_before_close: bool,
174 ) -> hopr_chain_actions::errors::Result<PendingAction>;
175 }
176 }
177
178 fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
179 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
180 ActionConfirmation {
181 tx_hash: random_hash,
182 event: Some(ChainEventType::ChannelClosureInitiated(channel)),
183 action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
184 }
185 }
186
187 #[tokio::test]
188 async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
189 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
190
191 let max_closure_overdue = Duration::from_secs(600);
192
193 let c_open = ChannelEntry::new(*ALICE, *BOB, 10.into(), 0.into(), ChannelStatus::Open, 0.into());
195
196 let c_pending = ChannelEntry::new(
198 *ALICE,
199 *CHARLIE,
200 10.into(),
201 0.into(),
202 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
203 0.into(),
204 );
205
206 let c_pending_elapsed = ChannelEntry::new(
208 *ALICE,
209 *DAVE,
210 10.into(),
211 0.into(),
212 ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
213 0.into(),
214 );
215
216 let c_pending_overdue = ChannelEntry::new(
218 *ALICE,
219 *EUGENE,
220 10.into(),
221 0.into(),
222 ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
223 0.into(),
224 );
225
226 let db_clone = db.clone();
227 db.begin_transaction()
228 .await?
229 .perform(|tx| {
230 Box::pin(async move {
231 db_clone.upsert_channel(Some(tx), c_open).await?;
232 db_clone.upsert_channel(Some(tx), c_pending).await?;
233 db_clone.upsert_channel(Some(tx), c_pending_elapsed).await?;
234 db_clone.upsert_channel(Some(tx), c_pending_overdue).await
235 })
236 })
237 .await?;
238
239 let mut actions = MockChannelAct::new();
240 actions
241 .expect_close_channel()
242 .once()
243 .withf(|addr, dir, _| DAVE.eq(addr) && ChannelDirection::Outgoing.eq(dir))
244 .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(c_pending_elapsed)).boxed()));
245
246 let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };
247
248 let strat = ClosureFinalizerStrategy::new(cfg, db.clone(), actions);
249 strat.on_tick().await?;
250
251 Ok(())
252 }
253}