1use async_trait::async_trait;
2use futures::stream::FuturesUnordered;
3use futures::StreamExt;
4use serde::{Deserialize, Serialize};
5use serde_with::{serde_as, DurationSeconds};
6use std::fmt::{Display, Formatter};
7use std::ops::Sub;
8use std::time::Duration;
9use tracing::{debug, error, info};
10use validator::Validate;
11
12use hopr_chain_actions::channels::ChannelActions;
13use hopr_db_sql::accounts::HoprDbAccountOperations;
14use hopr_db_sql::channels::HoprDbChannelOperations;
15use hopr_internal_types::prelude::*;
16use hopr_platform::time::native::current_time;
17
18#[cfg(all(feature = "prometheus", not(test)))]
19use hopr_metrics::metrics::SimpleCounter;
20
21use crate::strategy::SingularStrategy;
22use crate::{errors, Strategy};
23
// Prometheus counter for automatic closure finalizations.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: SimpleCounter = {
        let name = "hopr_strategy_closure_auto_finalization_count";
        let description = "Count of channels where closure finalizing was initiated automatically";
        // A failure to create the counter panics on first access of the static.
        SimpleCounter::new(name, description).unwrap()
    };
}
32
/// Default value of [`ClosureFinalizerStrategyConfig::max_closure_overdue`]: 300 seconds.
///
/// Used both as the serde default and as the `SmartDefault` value of the field.
#[inline]
fn default_max_closure_overdue() -> Duration {
    let default_overdue_secs: u64 = 300;
    Duration::from_secs(default_overdue_secs)
}
37#[serde_as]
39#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
40pub struct ClosureFinalizerStrategyConfig {
41 #[serde_as(as = "DurationSeconds<u64>")]
46 #[serde(default = "default_max_closure_overdue")]
47 #[default(default_max_closure_overdue())]
48 pub max_closure_overdue: Duration,
49}
50
51pub struct ClosureFinalizerStrategy<Db, A>
54where
55 Db: HoprDbChannelOperations + Clone + Send + Sync,
56 A: ChannelActions,
57{
58 db: Db,
59 cfg: ClosureFinalizerStrategyConfig,
60 hopr_chain_actions: A,
61}
62
63impl<Db, A> ClosureFinalizerStrategy<Db, A>
64where
65 Db: HoprDbChannelOperations + Clone + Send + Sync,
66 A: ChannelActions,
67{
68 pub fn new(cfg: ClosureFinalizerStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
70 Self {
71 db,
72 hopr_chain_actions,
73 cfg,
74 }
75 }
76}
77
78impl<Db, A> Display for ClosureFinalizerStrategy<Db, A>
79where
80 Db: HoprDbChannelOperations + Clone + Send + Sync,
81 A: ChannelActions,
82{
83 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84 write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
85 }
86}
87
88#[async_trait]
89impl<Db, A> SingularStrategy for ClosureFinalizerStrategy<Db, A>
90where
91 Db: HoprDbChannelOperations + HoprDbAccountOperations + Clone + Send + Sync,
92 A: ChannelActions + Send + Sync,
93{
94 async fn on_tick(&self) -> errors::Result<()> {
95 let ts_limit = current_time().sub(self.cfg.max_closure_overdue);
96
97 let outgoing_channels = self
98 .db
99 .get_outgoing_channels(None)
100 .await
101 .map_err(hopr_db_sql::api::errors::DbError::from)?;
102
103 let to_close = outgoing_channels
104 .iter()
105 .filter(|channel| {
106 matches!(channel.status, ChannelStatus::PendingToClose(ct) if ct > ts_limit)
107 && channel.closure_time_passed(current_time())
108 })
109 .map(|channel| async {
110 let channel_cpy = *channel;
111 info!(channel = %channel_cpy, "channel closure finalizer: finalizing closure");
112 match self
113 .hopr_chain_actions
114 .close_channel(channel_cpy.destination, ChannelDirection::Outgoing, false)
115 .await
116 {
117 Ok(_) => {
118 debug!("channel closure finalizer: finalizing closure of {channel_cpy}");
120 }
121 Err(e) => error!(%channel_cpy, error = %e, "channel closure finalizer: failed to finalize closure"),
122 }
123 })
124 .collect::<FuturesUnordered<_>>()
125 .count()
126 .await;
127
128 #[cfg(all(feature = "prometheus", not(test)))]
129 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment_by(to_close as u64);
130
131 debug!("channel closure finalizer: initiated closure finalization of {to_close} channels");
132 Ok(())
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::ok;
    use futures::FutureExt;
    use hex_literal::hex;
    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
    use hopr_chain_types::actions::Action;
    use hopr_chain_types::chain_events::ChainEventType;
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::prelude::*;
    use hopr_db_sql::db::HoprDb;
    use hopr_db_sql::HoprDbGeneralModelOperations;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use mockall::mock;
    use std::ops::Add;
    use std::time::SystemTime;

    // Fixed identities used by the test: ALICE is the local node (channel source),
    // the remaining addresses are channel destinations.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    // Mock of the on-chain channel actions, so the test can assert which closures get issued.
    mock! {
        ChannelAct { }
        #[async_trait]
        impl ChannelActions for ChannelAct {
            async fn open_channel(&self, destination: Address, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn close_channel(
                &self,
                counterparty: Address,
                direction: ChannelDirection,
                redeem_before_close: bool,
            ) -> hopr_chain_actions::errors::Result<PendingAction>;
        }
    }

    /// Builds a confirmation of an outgoing channel closure with a random transaction hash.
    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
        ActionConfirmation {
            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
        }
    }

    /// Builds a channel from ALICE to `destination` with a balance of 10 HOPR and the given status.
    fn outgoing_channel_to(destination: Address, status: ChannelStatus) -> ChannelEntry {
        ChannelEntry::new(
            *ALICE,
            destination,
            BalanceType::HOPR.balance(10),
            0.into(),
            status,
            0.into(),
        )
    }

    #[async_std::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;

        let max_closure_overdue = Duration::from_secs(600);

        // Open channel: must be left alone.
        let c_open = outgoing_channel_to(*BOB, ChannelStatus::Open);

        // Closure time still in the future: must be left alone.
        let c_pending = outgoing_channel_to(
            *CHARLIE,
            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
        );

        // Closure time elapsed a minute ago: the only channel expected to be closed.
        let c_pending_elapsed = outgoing_channel_to(
            *DAVE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
        );

        // Closure time elapsed longer ago than the allowed overdue: must be left alone.
        let c_pending_overdue = outgoing_channel_to(
            *EUGENE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
        );

        let tx_db = db.clone();
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    tx_db.upsert_channel(Some(tx), c_open).await?;
                    tx_db.upsert_channel(Some(tx), c_pending).await?;
                    tx_db.upsert_channel(Some(tx), c_pending_elapsed).await?;
                    tx_db.upsert_channel(Some(tx), c_pending_overdue).await
                })
            })
            .await?;

        // Exactly one closure is expected, and it must target DAVE's outgoing channel.
        let mut actions = MockChannelAct::new();
        actions
            .expect_close_channel()
            .once()
            .withf(|addr, dir, _| *addr == *DAVE && *dir == ChannelDirection::Outgoing)
            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(c_pending_elapsed)).boxed()));

        let strat = ClosureFinalizerStrategy::new(
            ClosureFinalizerStrategyConfig { max_closure_overdue },
            db.clone(),
            actions,
        );
        strat.on_tick().await?;

        Ok(())
    }
}