hopr_strategy/
channel_finalizer.rs

1use std::{
2    fmt::{Display, Formatter},
3    ops::Sub,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::{StreamExt, stream::FuturesUnordered};
9use hopr_chain_actions::channels::ChannelActions;
10use hopr_db_sql::{accounts::HoprDbAccountOperations, channels::HoprDbChannelOperations};
11use hopr_internal_types::prelude::*;
12#[cfg(all(feature = "prometheus", not(test)))]
13use hopr_metrics::metrics::SimpleCounter;
14use hopr_platform::time::native::current_time;
15use serde::{Deserialize, Serialize};
16use serde_with::{DurationSeconds, serde_as};
17use tracing::{debug, error, info};
18use validator::Validate;
19
20use crate::{Strategy, errors, strategy::SingularStrategy};
21
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Counter of channel closure finalizations started by the strategy in this file.
    // Creating the counter only fails on an invalid static definition, which is a
    // programming error, hence the `expect` with the invariant spelled out.
    static ref METRIC_COUNT_CLOSURE_FINALIZATIONS: SimpleCounter = SimpleCounter::new(
        "hopr_strategy_closure_auto_finalization_count",
        "Count of channels where closure finalizing was initiated automatically"
    )
    .expect("static closure finalization counter definition should be valid");
}
30
/// Supplies the default value of `max_closure_overdue`: 300 seconds.
#[inline]
fn default_max_closure_overdue() -> Duration {
    const DEFAULT_MAX_CLOSURE_OVERDUE_SECS: u64 = 300;
    Duration::from_secs(DEFAULT_MAX_CLOSURE_OVERDUE_SECS)
}
35/// Contains configuration of the [ClosureFinalizerStrategy].
36#[serde_as]
37#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
38pub struct ClosureFinalizerStrategyConfig {
39    /// Do not attempt to finalize closure of channels that have
40    /// been overdue for closure for more than this period.
41    ///
42    /// Default is 300 seconds.
43    #[serde_as(as = "DurationSeconds<u64>")]
44    #[serde(default = "default_max_closure_overdue")]
45    #[default(default_max_closure_overdue())]
46    pub max_closure_overdue: Duration,
47}
48
49/// Strategy which runs per tick and finalizes `PendingToClose` channels
50/// which have elapsed the grace period.
51pub struct ClosureFinalizerStrategy<Db, A>
52where
53    Db: HoprDbChannelOperations + Clone + Send + Sync,
54    A: ChannelActions,
55{
56    db: Db,
57    cfg: ClosureFinalizerStrategyConfig,
58    hopr_chain_actions: A,
59}
60
61impl<Db, A> ClosureFinalizerStrategy<Db, A>
62where
63    Db: HoprDbChannelOperations + Clone + Send + Sync,
64    A: ChannelActions,
65{
66    /// Constructs the strategy.
67    pub fn new(cfg: ClosureFinalizerStrategyConfig, db: Db, hopr_chain_actions: A) -> Self {
68        Self {
69            db,
70            hopr_chain_actions,
71            cfg,
72        }
73    }
74}
75
76impl<Db, A> Display for ClosureFinalizerStrategy<Db, A>
77where
78    Db: HoprDbChannelOperations + Clone + Send + Sync,
79    A: ChannelActions,
80{
81    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82        write!(f, "{}", Strategy::ClosureFinalizer(self.cfg))
83    }
84}
85
86#[async_trait]
87impl<Db, A> SingularStrategy for ClosureFinalizerStrategy<Db, A>
88where
89    Db: HoprDbChannelOperations + HoprDbAccountOperations + Clone + Send + Sync,
90    A: ChannelActions + Send + Sync,
91{
92    async fn on_tick(&self) -> errors::Result<()> {
93        let ts_limit = current_time().sub(self.cfg.max_closure_overdue);
94
95        let outgoing_channels = self
96            .db
97            .get_outgoing_channels(None)
98            .await
99            .map_err(hopr_db_sql::api::errors::DbError::from)?;
100
101        let to_close = outgoing_channels
102            .iter()
103            .filter(|channel| {
104                matches!(channel.status, ChannelStatus::PendingToClose(ct) if ct > ts_limit)
105                    && channel.closure_time_passed(current_time())
106            })
107            .map(|channel| async {
108                let channel_cpy = *channel;
109                info!(channel = %channel_cpy, "channel closure finalizer: finalizing closure");
110                match self
111                    .hopr_chain_actions
112                    .close_channel(channel_cpy.destination, ChannelDirection::Outgoing, false)
113                    .await
114                {
115                    Ok(_) => {
116                        // Currently, we're not interested in awaiting the Close transactions to confirmation
117                        debug!("channel closure finalizer: finalizing closure of {channel_cpy}");
118                    }
119                    Err(e) => error!(%channel_cpy, error = %e, "channel closure finalizer: failed to finalize closure"),
120                }
121            })
122            .collect::<FuturesUnordered<_>>()
123            .count()
124            .await;
125
126        #[cfg(all(feature = "prometheus", not(test)))]
127        METRIC_COUNT_CLOSURE_FINALIZATIONS.increment_by(to_close as u64);
128
129        debug!("channel closure finalizer: initiated closure finalization of {to_close} channels");
130        Ok(())
131    }
132}
133
#[cfg(test)]
mod tests {
    use std::{ops::Add, time::SystemTime};

    use futures::{FutureExt, future::ok};
    use hex_literal::hex;
    use hopr_chain_actions::action_queue::{ActionConfirmation, PendingAction};
    use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::prelude::*;
    use hopr_db_sql::{HoprDbGeneralModelOperations, db::HoprDb};
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use mockall::mock;

    use super::*;

    // Fixed identities used by the test: ALICE owns the node, the others are channel counterparties.
    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = hex!("3798fa65d6326d3813a0d33489ac35377f4496ef").into();
        static ref CHARLIE: Address = hex!("250eefb2586ab0873befe90b905126810960ee7c").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
        static ref EUGENE: Address = hex!("0c1da65d269f89b05e3775bf8fcd21a138e8cbeb").into();
    }

    // Mock of the on-chain channel actions, so the test can observe which closures get requested.
    mock! {
        ChannelAct { }
        #[async_trait]
        impl ChannelActions for ChannelAct {
            async fn open_channel(&self, destination: Address, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> hopr_chain_actions::errors::Result<PendingAction>;
            async fn close_channel(
                &self,
                counterparty: Address,
                direction: ChannelDirection,
                redeem_before_close: bool,
            ) -> hopr_chain_actions::errors::Result<PendingAction>;
        }
    }

    /// Builds a confirmation of an outgoing channel closure with a random transaction hash.
    fn mock_action_confirmation_closure(channel: ChannelEntry) -> ActionConfirmation {
        ActionConfirmation {
            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
            event: Some(ChainEventType::ChannelClosureInitiated(channel)),
            action: Action::CloseChannel(channel, ChannelDirection::Outgoing),
        }
    }

    #[tokio::test]
    async fn test_should_close_only_non_overdue_pending_to_close_channels_with_elapsed_closure() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;

        let max_closure_overdue = Duration::from_secs(600);

        // Every channel in this test goes from ALICE and differs only in counterparty and status.
        let outgoing_to = |destination: Address, status: ChannelStatus| {
            ChannelEntry::new(*ALICE, destination, 10.into(), 0.into(), status, 0.into())
        };

        // Open channel: must stay opened
        let c_open = outgoing_to(*BOB, ChannelStatus::Open);

        // Closure period has not elapsed yet: must stay unfinalized
        let c_pending = outgoing_to(
            *CHARLIE,
            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(60))),
        );

        // Closure period elapsed recently: the only channel whose closure must be finalized
        let c_pending_elapsed = outgoing_to(
            *DAVE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(60))),
        );

        // Closure is overdue for longer than the configured maximum: must stay unfinalized
        let c_pending_overdue = outgoing_to(
            *EUGENE,
            ChannelStatus::PendingToClose(SystemTime::now().sub(max_closure_overdue * 2)),
        );

        let tx_db = db.clone();
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    tx_db.upsert_channel(Some(tx), c_open).await?;
                    tx_db.upsert_channel(Some(tx), c_pending).await?;
                    tx_db.upsert_channel(Some(tx), c_pending_elapsed).await?;
                    tx_db.upsert_channel(Some(tx), c_pending_overdue).await
                })
            })
            .await?;

        // Exactly one closure is expected: the outgoing channel to DAVE
        let mut actions = MockChannelAct::new();
        actions
            .expect_close_channel()
            .once()
            .withf(|addr, dir, _| *addr == *DAVE && *dir == ChannelDirection::Outgoing)
            .return_once(move |_, _, _| Ok(ok(mock_action_confirmation_closure(c_pending_elapsed)).boxed()));

        let strategy = ClosureFinalizerStrategy::new(
            ClosureFinalizerStrategyConfig { max_closure_overdue },
            db.clone(),
            actions,
        );
        strategy.on_tick().await?;

        Ok(())
    }
}
253}