hopr_strategy/
channel_finalizer.rs1use std::{
2 fmt::{Display, Formatter},
3 ops::Sub,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use hopr_lib::{
10 ChannelStatusDiscriminants, Utc,
11 api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
12};
13use serde::{Deserialize, Serialize};
14use tracing::{debug, error, info};
15use validator::Validate;
16
17use crate::{Strategy, errors, strategy::SingularStrategy};
18
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Counter incremented by the strategy every time a channel closure
    /// finalization was submitted successfully.
    ///
    /// Only compiled in when the `telemetry` feature is enabled and the crate
    /// is not built for tests.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new(
            "hopr_strategy_closure_auto_finalization_count",
            "Count of channels where closure finalizing was initiated automatically",
        )
        .unwrap();
}
27
/// Returns the default value of
/// `ClosureFinalizerStrategyConfig::max_closure_overdue`: five minutes.
#[inline]
fn default_max_closure_overdue() -> Duration {
    // 5 minutes, expressed in seconds.
    const DEFAULT_OVERDUE_SECS: u64 = 5 * 60;
    Duration::from_secs(DEFAULT_OVERDUE_SECS)
}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
35pub struct ClosureFinalizerStrategyConfig {
36 #[serde(default = "default_max_closure_overdue", with = "humantime_serde")]
41 #[default(default_max_closure_overdue())]
42 pub max_closure_overdue: Duration,
43}
44
45pub struct ClosureFinalizerStrategy<A> {
48 cfg: ClosureFinalizerStrategyConfig,
49 hopr_chain_actions: A,
50}
51
52impl<A> ClosureFinalizerStrategy<A> {
53 pub fn new(cfg: ClosureFinalizerStrategyConfig, hopr_chain_actions: A) -> Self {
55 Self {
56 hopr_chain_actions,
57 cfg,
58 }
59 }
60}
61
62impl<A> Display for ClosureFinalizerStrategy<A> {
63 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
64 write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
65 }
66}
67
68#[async_trait]
69impl<A> SingularStrategy for ClosureFinalizerStrategy<A>
70where
71 A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync,
72{
73 async fn on_tick(&self) -> errors::Result<()> {
74 let now = Utc::now();
75 let mut outgoing_channels = self
76 .hopr_chain_actions
77 .stream_channels(
78 ChannelSelector::default()
79 .with_source(*self.hopr_chain_actions.me())
80 .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
81 .with_closure_time_range(now.sub(self.cfg.max_closure_overdue)..=now),
82 )
83 .await
84 .map_err(|e| errors::StrategyError::Other(e.into()))?;
85
86 while let Some(channel) = outgoing_channels.next().await {
87 info!(%channel, "channel closure finalizer: finalizing closure");
88 match self.hopr_chain_actions.close_channel(channel.get_id()).await {
89 Ok(_) => {
90 debug!(%channel, "channel closure finalizer: finalizing closure");
92 #[cfg(all(feature = "telemetry", not(test)))]
93 METRIC_COUNT_CLOSURE_FINALIZATIONS.increment();
94 }
95 Err(e) => error!(%channel, error = %e, "channel closure finalizer: failed to finalize closure"),
96 }
97 }
98
99 debug!("channel closure finalizer: initiated closure finalization done");
100 Ok(())
101 }
102}
103
#[cfg(test)]
mod tests {
    use std::{ops::Add, time::SystemTime};

    use futures_time::future::FutureExt;
    use hex_literal::hex;
    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
    use hopr_lib::{
        Address, BytesRepresentable, ChainKeypair, ChannelEntry, ChannelStatus, HoprBalance, Keypair, XDaiBalance,
        api::chain::{ChainEvent, ChainEvents},
    };
    use lazy_static::lazy_static;

    use super::*;

    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    /// Runs a single strategy tick against a simulated chain state holding four
    /// outgoing channels of Alice and waits for the `ChannelClosed` event of the
    /// only channel whose closure time lies inside the configured window.
    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let max_closure_overdue = Duration::from_secs(600);

        // Closure time elapsed 60 seconds ago, i.e. within `max_closure_overdue`:
        // this is the channel the strategy is expected to close.
        let channel_to_be_closed = ChannelEntry::builder()
            .between(*ALICE, *DAVE)
            .amount(10)
            .ticket_index(0)
            .status(ChannelStatus::PendingToClose(
                SystemTime::now().sub(Duration::from_secs(60)),
            ))
            .epoch(1)
            .build()?;

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHARLIE, &*DAVE, &*EUGENE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([
                // Open channel: not in the `PendingToClose` state the strategy selects.
                ChannelEntry::builder()
                    .between(*ALICE, *BOB)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::Open)
                    .epoch(0)
                    .build()?,
                // Closure time is still 60 seconds in the future: outside the selected range.
                ChannelEntry::builder()
                    .between(*ALICE, *CHARLIE)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::PendingToClose(
                        SystemTime::now().add(Duration::from_secs(60)),
                    ))
                    .epoch(1)
                    .build()?,
                channel_to_be_closed,
                // Closure time elapsed twice `max_closure_overdue` ago: outside the selected range.
                ChannelEntry::builder()
                    .between(*ALICE, *EUGENE)
                    .amount(10)
                    .ticket_index(0)
                    .status(ChannelStatus::PendingToClose(
                        SystemTime::now().sub(max_closure_overdue * 2),
                    ))
                    .epoch(1)
                    .build()?,
            ])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&ALICE_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        // Subscribe before ticking the strategy, as in the original test flow.
        let events = chain_connector.subscribe()?;

        let cfg = ClosureFinalizerStrategyConfig { max_closure_overdue };

        let strat = ClosureFinalizerStrategy::new(cfg, chain_connector);
        strat.on_tick().await?;

        let closure_event = events
            .filter(|event| {
                futures::future::ready(
                    matches!(event, ChainEvent::ChannelClosed(c) if channel_to_be_closed.get_id() == c.get_id()),
                )
            })
            .next()
            .timeout(futures_time::time::Duration::from_secs(2))
            .await?;

        // `next()` yields `None` when the event stream terminates; without this
        // check the test would pass even though no closure event was observed.
        assert!(
            closure_event.is_some(),
            "chain event stream ended without a ChannelClosed event for the channel expected to be finalized"
        );

        Ok(())
    }
}