hopr_strategy/
channel_finalizer.rs

1use async_trait::async_trait;
2use futures::stream::FuturesUnordered;
3use futures::StreamExt;
4use serde::{Deserialize, Serialize};
5use serde_with::{serde_as, DurationSeconds};
6use std::fmt::{Display, Formatter};
7use std::ops::Sub;
8use std::time::Duration;
9use tracing::{debug, error, info};
10use validator::Validate;
11
12use hopr_chain_actions::channels::ChannelActions;
13use hopr_db_sql::accounts::HoprDbAccountOperations;
14use hopr_db_sql::channels::HoprDbChannelOperations;
15use hopr_internal_types::prelude::*;
16use hopr_platform::time::native::current_time;
17
18#[cfg(all(feature = "prometheus", not(test)))]
19use hopr_metrics::metrics::SimpleCounter;
20
21use crate::strategy::SingularStrategy;
22use crate::{errors, Strategy};
23
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter advanced by the closure finalizer strategy on each tick.
    ///
    /// Only compiled when the `prometheus` feature is enabled and the crate is
    /// not built for tests.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: SimpleCounter = {
        let name = "hopr_strategy_closure_auto_finalization_count";
        let description = "Count of channels where closure finalizing was initiated automatically";
        SimpleCounter::new(name, description).unwrap()
    };
}
32
/// Default value of [`ClosureFinalizerStrategyConfig::max_closure_overdue`]: 5 minutes (300 seconds).
#[inline]
fn default_max_closure_overdue() -> Duration {
    const MINUTES: u64 = 5;
    Duration::from_secs(MINUTES * 60)
}
37/// Contains configuration of the [ClosureFinalizerStrategy].
38#[serde_as]
39#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
40pub struct ClosureFinalizerStrategyConfig {
41    /// Do not attempt to finalize closure of channels that have
42    /// been overdue for closure for more than this period.
43    ///
44    /// Default is 300 seconds.
45    #[serde_as(as = "DurationSeconds<u64>")]
46    #[serde(default = "default_max_closure_overdue")]
47    #[default(default_max_closure_overdue())]
48    pub max_closure_overdue: Duration,
49}
50
51/// Strategy which runs per tick and finalizes `PendingToClose` channels
52/// which have elapsed the grace period.
53pub struct ClosureFinalizerStrategy<Db, A>
54where
55    Db: HoprDbChannelOperations + Clone + Send + Sync,
56    A: ChannelActions,
57{
58    db: Db,
59    cfg: ClosureFinalizerStrategyConfig,
60    hopr_chain_actions: A,
61}
62
63impl<Db, A> ClosureFinalizerStrategy<Db, A>
64where
65    Db: HoprDbChannelOperations + Clone + Send + Sync,
66    A: ChannelActions,
67{
68    /// Constructs the strategy.
69    pub fn new(cfg: ClosureFinalizerStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
70        Self {
71            db,
72            hopr_chain_actions,
73            cfg,
74        }
75    }
76}
77
78impl<Db, A> Display for ClosureFinalizerStrategy<Db, A>
79where
80    Db: HoprDbChannelOperations + Clone + Send + Sync,
81    A: ChannelActions,
82{
83    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84        write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
85    }
86}
87
88#[async_trait]
89impl<Db, A> SingularStrategy for ClosureFinalizerStrategy<Db, A>
90where
91    Db: HoprDbChannelOperations + HoprDbAccountOperations + Clone + Send + Sync,
92    A: ChannelActions + Send + Sync,
93{
94    async fn on_tick(&self) -> errors::Result<()> {
95        let ts_limit = current_time().sub(self.cfg.max_closure_overdue);
96
97        let outgoing_channels = self
98            .db
99            .get_outgoing_channels(None)
100            .await
101            .map_err(hopr_db_sql::api::errors::DbError::from)?;
102
103        let to_close = outgoing_channels
104            .iter()
105            .filter(|channel| {
106                matches!(channel.status, ChannelStatus::PendingToClose(ct) if ct > ts_limit)
107                    && channel.closure_time_passed(current_time())
108            })
109            .map(|channel| async {
110                let channel_cpy = *channel;
111                info!(channel = %channel_cpy, "channel closure finalizer: finalizing closure");
112                match self
113                    .hopr_chain_actions
114                    .close_channel(channel_cpy.destination, ChannelDirection::Outgoing, false)
115                    .await
116                {
117                    Ok(_) => {
118                        // Currently, we're not interested in awaiting the Close transactions to confirmation
119                        debug!("channel closure finalizer: finalizing closure of {channel_cpy}");
120                    }
121                    Err(e) => error!(%channel_cpy, error = %e, "channel closure finalizer: failed to finalize closure"),
122                }
123            })
124            .collect::<FuturesUnordered<_>>()
125            .count()
126            .await;
127
128        #[cfg(all(feature = "prometheus", not(test)))]
129        METRIC_COUNT_CLOSURE_FINALIZATIONS.increment_by(to_close as u64);
130
131        debug!("channel closure finalizer: initiated closure finalization of {to_close} channels");
132        Ok(())
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::ok;
    use futures::FutureExt;
    use hex_literal::hex;
    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
    use hopr_chain_types::actions::Action;
    use hopr_chain_types::chain_events::ChainEventType;
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::prelude::*;
    use hopr_db_sql::db::HoprDb;
    use hopr_db_sql::HoprDbGeneralModelOperations;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use mockall::mock;
    use std::ops::Add;
    use std::time::SystemTime;

    // Fixed identities used by the test: ALICE owns the node key, the others are channel destinations.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    // Mock of the on-chain channel actions, so the test can assert which channels get closed.
    mock! {
        ChannelAct { }
        #[async_trait]
        impl ChannelActions for ChannelAct {
            async fn open_channel(&self, destination: Address, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn close_channel(
                &self,
                counterparty: Address,
                direction: ChannelDirection,
                redeem_before_close: bool,
            ) -> hopr_chain_actions::errors::Result<PendingAction>;
        }
    }

    /// Builds a closure confirmation for `channel` carrying a random transaction hash.
    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
        ActionConfirmation {
            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
        }
    }

    /// Builds a channel from ALICE to `destination` with the given `status`,
    /// a balance of 10 HOPR and zeroed remaining numeric fields.
    fn alice_channel_to(destination: Address, status: ChannelStatus) -> ChannelEntry {
        ChannelEntry::new(
            *ALICE,
            destination,
            BalanceType::HOPR.balance(10),
            0.into(),
            status,
            0.into(),
        )
    }

    #[async_std::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;

        let max_closure_overdue = Duration::from_secs(600);

        // Open channel: must be left untouched
        let open = alice_channel_to(*BOB, ChannelStatus::Open);

        // Closure time still 60 seconds ahead: must not be finalized yet
        let not_yet_elapsed = alice_channel_to(
            *CHARLIE,
            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
        );

        // Closure time elapsed 60 seconds ago: the only channel that must be finalized
        let elapsed = alice_channel_to(
            *DAVE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
        );

        // Closure time elapsed twice the allowed overdue period ago: must be skipped
        let long_overdue = alice_channel_to(
            *EUGENE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
        );

        let tx_db = db.clone();
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    tx_db.upsert_channel(Some(tx), open).await?;
                    tx_db.upsert_channel(Some(tx), not_yet_elapsed).await?;
                    tx_db.upsert_channel(Some(tx), elapsed).await?;
                    tx_db.upsert_channel(Some(tx), long_overdue).await
                })
            })
            .await?;

        // Exactly one closure is expected, and it must target DAVE in the outgoing direction
        let mut channel_actions = MockChannelAct::new();
        channel_actions
            .expect_close_channel()
            .once()
            .withf(|addr, dir, _| DAVE.eq(addr) && ChannelDirection::Outgoing.eq(dir))
            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(elapsed)).boxed()));

        let strategy = ClosureFinalizerStrategy::new(
            ClosureFinalizerStrategyConfig { max_closure_overdue },
            db.clone(),
            channel_actions,
        );
        strategy.on_tick().await?;

        Ok(())
    }
}