hopr_chain_actions/
channels.rs

1//! This module contains the [ChannelActions] trait defining HOPR channels operations.
2//!
3//! An implementation of this trait is added to [ChainActions] which realizes the redemption
4//! operations via [ActionQueue](crate::action_queue::ActionQueue).
5//! There are 4 basic high-level on-chain functions in the [ChannelActions] trait:
6//! - [open_channel](ChannelActions::open_channel)
7//! - [fund_channel](ChannelActions::fund_channel)
8//! - [close_channel](ChannelActions::close_channel)
9//!
10//! All the functions do the necessary validations using the DB and then post the corresponding action
11//! into the [ActionQueue](crate::action_queue::ActionQueue).
12//! The functions return immediately but provide futures that can be awaited in case the callers wishes to await the
13//! on-chain confirmation of the corresponding operation.
14//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting
15//! the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent) by the Indexer.
16use std::time::Duration;
17
18use async_trait::async_trait;
19use hopr_chain_types::actions::Action;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::prelude::{HoprDbChannelOperations, HoprDbInfoOperations};
22use hopr_internal_types::prelude::*;
23use hopr_platform::time::native::current_time;
24use hopr_primitive_types::prelude::*;
25use tracing::{debug, error, info};
26
27use crate::{
28    ChainActions,
29    action_queue::PendingAction,
30    errors::{
31        ChainActionsError::{
32            BalanceTooLow, ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist, ClosureTimeHasNotElapsed,
33            InvalidArguments, InvalidChannelStake, InvalidState, NotEnoughAllowance,
34        },
35        Result,
36    },
37};
38
39/// Gathers all channel-related on-chain actions.
40#[async_trait]
41pub trait ChannelActions {
42    /// Opens a channel to the given `destination` with the given `amount` staked.
43    async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction>;
44
45    /// Funds the given channel with the given `amount`
46    async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction>;
47
48    /// Closes the channel to counterparty in the given direction. Optionally can issue redeeming of all tickets in that
49    /// channel, in case the `direction` is [`ChannelDirection::Incoming`].
50    async fn close_channel(&self, channel_entry: ChannelEntry) -> Result<PendingAction>;
51}
52
53#[async_trait]
54impl<Db: Sync> ChannelActions for ChainActions<Db> {
55    #[tracing::instrument(level = "debug", skip(self))]
56    async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction> {
57        if self.self_address() == destination {
58            return Err(InvalidArguments("cannot open channel to self".into()));
59        }
60
61        if amount.is_zero() {
62            return Err(InvalidArguments("invalid balance or balance type given".into()));
63        }
64
65        // Perform all checks
66        let allowance: HoprBalance = self.index_db.get_safe_hopr_allowance(None).await?;
67        debug!(%allowance, "current staking safe allowance");
68        if allowance < amount {
69            return Err(NotEnoughAllowance);
70        }
71
72        let hopr_balance: HoprBalance = self.index_db.get_safe_hopr_balance(None).await?;
73        debug!(balance = %hopr_balance, "current Safe HOPR balance");
74        if hopr_balance < amount {
75            return Err(BalanceTooLow);
76        }
77
78        if HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) < amount {
79            return Err(InvalidChannelStake);
80        }
81
82        let maybe_channel = self
83            .index_db
84            .get_channel_by_parties(None, &self.self_address(), &destination, false)
85            .await?;
86        if let Some(channel) = maybe_channel {
87            debug!(%channel, "already found existing channel");
88            if channel.status != ChannelStatus::Closed {
89                error!(
90                    %destination,
91                    "channel to destination is already opened or pending to close"
92                );
93                return Err(ChannelAlreadyExists);
94            }
95        }
96
97        info!(%destination, %amount, "initiating channel open");
98        self.tx_sender.send(Action::OpenChannel(destination, amount)).await
99    }
100
101    #[tracing::instrument(level = "debug", skip(self))]
102    async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction> {
103        if amount.is_zero() {
104            return Err(InvalidArguments("invalid balance or balance type given".into()));
105        }
106
107        let allowance: HoprBalance = self.index_db.get_safe_hopr_allowance(None).await?;
108        debug!(%allowance, "current staking safe allowance");
109        if allowance.lt(&amount) {
110            return Err(NotEnoughAllowance);
111        }
112
113        let hopr_balance: HoprBalance = self.index_db.get_safe_hopr_balance(None).await?;
114        debug!(balance = %hopr_balance, "current Safe HOPR balance");
115        if hopr_balance.lt(&amount) {
116            return Err(BalanceTooLow);
117        }
118
119        match self.index_db.get_channel_by_id(None, &channel_id).await? {
120            Some(channel) => {
121                if channel.status == ChannelStatus::Open {
122                    if channel.balance + amount > HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) {
123                        return Err(InvalidChannelStake);
124                    }
125
126                    info!("initiating funding of {channel} with {amount}");
127                    self.tx_sender.send(Action::FundChannel(channel, amount)).await
128                } else {
129                    Err(InvalidState(format!("channel {channel_id} is not opened")))
130                }
131            }
132            None => Err(ChannelDoesNotExist),
133        }
134    }
135
136    #[tracing::instrument(level = "debug", skip(self))]
137    async fn close_channel(&self, channel: ChannelEntry) -> Result<PendingAction> {
138        let direction = channel.direction(&self.me).ok_or(ChannelDoesNotExist)?; // Cannot close channel that is not own
139
140        match channel.status {
141            ChannelStatus::Closed => Err(ChannelAlreadyClosed),
142            ChannelStatus::PendingToClose(_) => {
143                let remaining_closure_time = channel.remaining_closure_time(current_time());
144                info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
145                match remaining_closure_time {
146                    Some(Duration::ZERO) => {
147                        info!(%channel, %direction, "initiating finalization of channel closure");
148                        self.tx_sender.send(Action::CloseChannel(channel, direction)).await
149                    }
150                    _ => Err(ClosureTimeHasNotElapsed(
151                        channel
152                            .remaining_closure_time(current_time())
153                            .expect("impossible: closure time has not passed but no remaining closure time")
154                            .as_secs(),
155                    )),
156                }
157            }
158            ChannelStatus::Open => {
159                info!(%channel, ?direction, "initiating channel closure");
160                self.tx_sender.send(Action::CloseChannel(channel, direction)).await
161            }
162        }
163    }
164}
165#[cfg(test)]
166mod tests {
167    use std::{
168        ops::{Add, Sub},
169        time::{Duration, SystemTime},
170    };
171
172    use futures::FutureExt;
173    use hex_literal::hex;
174    use hopr_chain_types::{
175        actions::Action,
176        chain_events::{ChainEventType, SignificantChainEvent},
177    };
178    use hopr_crypto_random::random_bytes;
179    use hopr_crypto_types::prelude::*;
180    use hopr_db_node::HoprNodeDb;
181    use hopr_db_sql::{
182        HoprDbGeneralModelOperations, HoprIndexerDb, channels::HoprDbChannelOperations, errors::DbSqlError,
183        info::HoprDbInfoOperations, prelude::DomainSeparator,
184    };
185    use hopr_internal_types::prelude::*;
186    use hopr_primitive_types::prelude::*;
187    use lazy_static::lazy_static;
188    use mockall::Sequence;
189
190    use crate::{
191        ChainActions,
192        action_queue::{ActionQueue, MockTransactionExecutor},
193        action_state::MockActionState,
194        channels::ChannelActions,
195        errors::ChainActionsError,
196    };
197
198    lazy_static! {
199        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
200            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
201        ))
202        .expect("lazy static keypair should be constructible");
203        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
204            "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
205        ))
206        .expect("lazy static keypair should be constructible");
207        static ref ALICE: Address = ALICE_KP.public().to_address();
208        static ref BOB: Address = BOB_KP.public().to_address();
209    }
210
211    async fn init_db(
212        db: &HoprIndexerDb,
213        safe_balance: HoprBalance,
214        safe_allowance: HoprBalance,
215        channel: Option<ChannelEntry>,
216    ) -> anyhow::Result<()> {
217        let db_clone = db.clone();
218        Ok(db
219            .begin_transaction()
220            .await?
221            .perform(|tx| {
222                Box::pin(async move {
223                    db_clone.set_safe_hopr_allowance(Some(tx), safe_allowance).await?;
224                    db_clone.set_safe_hopr_balance(Some(tx), safe_balance).await?;
225                    db_clone
226                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
227                        .await?;
228
229                    if let Some(channel) = channel {
230                        db_clone.upsert_channel(Some(tx), channel).await?;
231                    }
232
233                    Ok::<_, DbSqlError>(())
234                })
235            })
236            .await?)
237    }
238
239    #[tokio::test]
240    async fn test_open_channel() -> anyhow::Result<()> {
241        let stake: HoprBalance = 10_u32.into();
242        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
243
244        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
245        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
246        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
247
248        let mut tx_exec = MockTransactionExecutor::new();
249        tx_exec
250            .expect_fund_channel()
251            .times(1)
252            .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
253            .returning(move |_, _| Ok(random_hash));
254
255        let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
256
257        let mut indexer_action_tracker = MockActionState::new();
258        indexer_action_tracker
259            .expect_register_expectation()
260            .once()
261            .returning(move |_| {
262                Ok(futures::future::ok(SignificantChainEvent {
263                    tx_hash: random_hash,
264                    event_type: ChainEventType::ChannelOpened(new_channel),
265                })
266                .boxed())
267            });
268
269        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
270
271        let tx_sender = tx_queue.new_sender();
272        tokio::task::spawn(async move { tx_queue.start().await });
273
274        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
275
276        let tx_res = actions.open_channel(*BOB, stake).await?.await?;
277
278        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
279        assert!(
280            matches!(tx_res.action, Action::OpenChannel(_, _)),
281            "must be open channel action"
282        );
283        assert!(
284            matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
285            "must correspond to open channel chain event"
286        );
287
288        Ok(())
289    }
290
291    #[tokio::test]
292    async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
293        let stake = 10_u32.into();
294
295        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
296
297        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
298        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
299        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
300
301        let tx_queue = ActionQueue::new(
302            node_db.clone(),
303            MockActionState::new(),
304            MockTransactionExecutor::new(),
305            Default::default(),
306        );
307
308        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
309
310        assert!(
311            matches!(
312                actions
313                    .open_channel(*BOB, stake)
314                    .await
315                    .err()
316                    .expect("should be an error"),
317                ChainActionsError::ChannelAlreadyExists
318            ),
319            "should fail when channel exists"
320        );
321
322        Ok(())
323    }
324
325    #[tokio::test]
326    async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
327        let stake = 10_u32.into();
328
329        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
330        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
331        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
332
333        let tx_queue = ActionQueue::new(
334            node_db.clone(),
335            MockActionState::new(),
336            MockTransactionExecutor::new(),
337            Default::default(),
338        );
339
340        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
341
342        assert!(
343            matches!(
344                actions
345                    .open_channel(*ALICE, stake)
346                    .await
347                    .err()
348                    .expect("should be an error"),
349                ChainActionsError::InvalidArguments(_)
350            ),
351            "should not create channel to self"
352        );
353        Ok(())
354    }
355
356    #[tokio::test]
357    async fn test_should_not_open_channel_with_too_big_stake() -> anyhow::Result<()> {
358        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
359        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
360        init_db(&db, U256::max_value().into(), U256::max_value().into(), None).await?;
361
362        let tx_queue = ActionQueue::new(
363            node_db.clone(),
364            MockActionState::new(),
365            MockTransactionExecutor::new(),
366            Default::default(),
367        );
368
369        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
370
371        assert!(
372            matches!(
373                actions
374                    .open_channel(*BOB, (ChannelEntry::MAX_CHANNEL_BALANCE + 1).into())
375                    .await
376                    .err()
377                    .expect("should be an error"),
378                ChainActionsError::InvalidChannelStake
379            ),
380            "should not create channel with too big stake"
381        );
382        Ok(())
383    }
384
385    #[tokio::test]
386    async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
387        let stake = 10_000_u32.into();
388
389        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
390        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
391        init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
392
393        let tx_queue = ActionQueue::new(
394            node_db.clone(),
395            MockActionState::new(),
396            MockTransactionExecutor::new(),
397            Default::default(),
398        );
399
400        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
401
402        assert!(
403            matches!(
404                actions
405                    .open_channel(*BOB, stake)
406                    .await
407                    .err()
408                    .expect("should be an error"),
409                ChainActionsError::NotEnoughAllowance
410            ),
411            "should fail when not enough allowance"
412        );
413        Ok(())
414    }
415
416    #[tokio::test]
417    async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
418        let stake = 10_000_u32.into();
419
420        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
421        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
422        init_db(&db, 1_u64.into(), 10_000_000_u64.into(), None).await?;
423
424        let tx_queue = ActionQueue::new(
425            node_db.clone(),
426            MockActionState::new(),
427            MockTransactionExecutor::new(),
428            Default::default(),
429        );
430
431        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
432
433        assert!(
434            matches!(
435                actions
436                    .open_channel(*BOB, stake)
437                    .await
438                    .err()
439                    .expect("should be an error"),
440                ChainActionsError::BalanceTooLow
441            ),
442            "should fail when not enough token balance"
443        );
444        Ok(())
445    }
446
447    #[tokio::test]
448    async fn test_fund_channel() -> anyhow::Result<()> {
449        let stake = 10_u32.into();
450        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
451        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
452
453        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
454        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
455        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
456
457        let mut tx_exec = MockTransactionExecutor::new();
458        tx_exec
459            .expect_fund_channel()
460            .times(1)
461            .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
462            .returning(move |_, _| Ok(random_hash));
463
464        let mut indexer_action_tracker = MockActionState::new();
465        indexer_action_tracker
466            .expect_register_expectation()
467            .once()
468            .returning(move |_| {
469                Ok(futures::future::ok(SignificantChainEvent {
470                    tx_hash: random_hash,
471                    event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
472                })
473                .boxed())
474            });
475
476        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
477        let tx_sender = tx_queue.new_sender();
478        tokio::task::spawn(async move {
479            tx_queue.start().await;
480        });
481
482        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
483
484        let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;
485
486        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
487        assert!(
488            matches!(tx_res.action, Action::FundChannel(_, _)),
489            "must be open channel action"
490        );
491        assert!(
492            matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
493            "must correspond to channel chain event"
494        );
495        Ok(())
496    }
497
498    #[tokio::test]
499    async fn test_fund_channel_should_not_over_fund() -> anyhow::Result<()> {
500        let channel = ChannelEntry::new(
501            *ALICE,
502            *BOB,
503            HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE),
504            U256::zero(),
505            ChannelStatus::Open,
506            U256::zero(),
507        );
508
509        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
510        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
511        init_db(&db, U256::max_value().into(), U256::max_value().into(), Some(channel)).await?;
512
513        let tx_queue = ActionQueue::new(
514            node_db.clone(),
515            MockActionState::new(),
516            MockTransactionExecutor::new(),
517            Default::default(),
518        );
519
520        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
521
522        assert!(
523            matches!(
524                actions
525                    .fund_channel(channel.get_id(), 1.into())
526                    .await
527                    .err()
528                    .expect("should be an error"),
529                ChainActionsError::InvalidChannelStake
530            ),
531            "should fail channel stake is too high"
532        );
533        Ok(())
534    }
535
536    #[tokio::test]
537    async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
538        let channel_id = generate_channel_id(&ALICE, &BOB);
539
540        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
541        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
542        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
543
544        let tx_queue = ActionQueue::new(
545            node_db.clone(),
546            MockActionState::new(),
547            MockTransactionExecutor::new(),
548            Default::default(),
549        );
550
551        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
552        let stake = 10_u32.into();
553        assert!(
554            matches!(
555                actions
556                    .fund_channel(channel_id, stake)
557                    .await
558                    .err()
559                    .expect("should be an error"),
560                ChainActionsError::ChannelDoesNotExist
561            ),
562            "should fail when channel does not exist"
563        );
564        Ok(())
565    }
566
567    #[tokio::test]
568    async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
569        let channel_id = generate_channel_id(&ALICE, &BOB);
570
571        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
572        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
573        init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
574
575        let tx_queue = ActionQueue::new(
576            node_db.clone(),
577            MockActionState::new(),
578            MockTransactionExecutor::new(),
579            Default::default(),
580        );
581
582        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
583        let stake = 10_000_u32.into();
584        assert!(
585            matches!(
586                actions
587                    .fund_channel(channel_id, stake)
588                    .await
589                    .err()
590                    .expect("should be an error"),
591                ChainActionsError::NotEnoughAllowance
592            ),
593            "should fail when not enough allowance"
594        );
595        Ok(())
596    }
597
598    #[tokio::test]
599    async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
600        let channel_id = generate_channel_id(&ALICE, &BOB);
601
602        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
603        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
604        init_db(&db, 1_u64.into(), 100_000_u64.into(), None).await?;
605
606        let tx_queue = ActionQueue::new(
607            node_db.clone(),
608            MockActionState::new(),
609            MockTransactionExecutor::new(),
610            Default::default(),
611        );
612
613        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
614        let stake = 10_000_u32.into();
615        assert!(
616            matches!(
617                actions
618                    .fund_channel(channel_id, stake)
619                    .await
620                    .err()
621                    .expect("should be an error"),
622                ChainActionsError::BalanceTooLow
623            ),
624            "should fail when not enough balance"
625        );
626        Ok(())
627    }
628
629    #[tokio::test]
630    async fn test_close_channel_outgoing() -> anyhow::Result<()> {
631        let stake = 10_u32.into();
632        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
633
634        let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
635
636        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
637        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
638        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
639
640        let mut tx_exec = MockTransactionExecutor::new();
641        let mut seq = Sequence::new();
642        tx_exec
643            .expect_initiate_outgoing_channel_closure()
644            .times(1)
645            .in_sequence(&mut seq)
646            .withf(move |dst| BOB.eq(dst))
647            .returning(move |_| Ok(random_hash));
648
649        tx_exec
650            .expect_finalize_outgoing_channel_closure()
651            .times(1)
652            .in_sequence(&mut seq)
653            .withf(move |dst| BOB.eq(dst))
654            .returning(move |_| Ok(random_hash));
655
656        let mut indexer_action_tracker = MockActionState::new();
657        let mut seq2 = Sequence::new();
658        indexer_action_tracker
659            .expect_register_expectation()
660            .once()
661            .in_sequence(&mut seq2)
662            .returning(move |_| {
663                Ok(futures::future::ok(SignificantChainEvent {
664                    tx_hash: random_hash,
665                    event_type: ChainEventType::ChannelClosureInitiated(channel),
666                })
667                .boxed())
668            });
669
670        indexer_action_tracker
671            .expect_register_expectation()
672            .once()
673            .in_sequence(&mut seq2)
674            .returning(move |_| {
675                Ok(futures::future::ok(SignificantChainEvent {
676                    tx_hash: random_hash,
677                    event_type: ChainEventType::ChannelClosed(channel),
678                })
679                .boxed())
680            });
681
682        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
683        let tx_sender = tx_queue.new_sender();
684        tokio::task::spawn(async move {
685            tx_queue.start().await;
686        });
687
688        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
689
690        let tx_res = actions.close_channel(channel.clone()).await?.await?;
691
692        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
693        assert!(
694            matches!(tx_res.action, Action::CloseChannel(_, _)),
695            "must be close channel action"
696        );
697        assert!(
698            matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
699            "must correspond to channel chain event"
700        );
701
702        // Transition the channel to the PendingToClose state with the closure time already elapsed
703        channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));
704
705        db.upsert_channel(None, channel).await?;
706
707        let tx_res = actions.close_channel(channel.clone()).await?.await?;
708
709        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
710        assert!(
711            matches!(tx_res.action, Action::CloseChannel(_, _)),
712            "must be close channel action"
713        );
714        assert!(
715            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
716            "must correspond to channel chain event"
717        );
718        Ok(())
719    }
720
721    #[tokio::test]
722    async fn test_close_channel_incoming() -> anyhow::Result<()> {
723        let stake = 10_u32.into();
724        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
725
726        let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());
727
728        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
729        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
730        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
731
732        let mut tx_exec = MockTransactionExecutor::new();
733        let mut seq = Sequence::new();
734        tx_exec
735            .expect_close_incoming_channel()
736            .times(1)
737            .in_sequence(&mut seq)
738            .withf(move |dst| BOB.eq(dst))
739            .returning(move |_| Ok(random_hash));
740
741        let mut indexer_action_tracker = MockActionState::new();
742        indexer_action_tracker
743            .expect_register_expectation()
744            .returning(move |_| {
745                Ok(futures::future::ok(SignificantChainEvent {
746                    tx_hash: random_hash,
747                    event_type: ChainEventType::ChannelClosed(channel),
748                })
749                .boxed())
750            });
751
752        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
753        let tx_sender = tx_queue.new_sender();
754        tokio::task::spawn(async move {
755            tx_queue.start().await;
756        });
757
758        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
759
760        let tx_res = actions.close_channel(channel.clone()).await?.await?;
761
762        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
763        assert!(
764            matches!(tx_res.action, Action::CloseChannel(_, _)),
765            "must be close channel action"
766        );
767        assert!(
768            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
769            "must correspond to channel chain event"
770        );
771        Ok(())
772    }
773
774    #[tokio::test]
775    async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
776        let stake = 10_u32.into();
777
778        let channel = ChannelEntry::new(
779            *ALICE,
780            *BOB,
781            stake,
782            U256::zero(),
783            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
784            U256::zero(),
785        );
786
787        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
788        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
789        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
790
791        let tx_queue = ActionQueue::new(
792            node_db.clone(),
793            MockActionState::new(),
794            MockTransactionExecutor::new(),
795            Default::default(),
796        );
797
798        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
799
800        assert!(
801            matches!(
802                actions
803                    .close_channel(channel.clone())
804                    .await
805                    .err()
806                    .expect("should be an error"),
807                ChainActionsError::ClosureTimeHasNotElapsed(_)
808            ),
809            "should fail when the channel closure period did not elapse"
810        );
811        Ok(())
812    }
813
814    #[tokio::test]
815    async fn test_should_not_close_foreign_channel() -> anyhow::Result<()> {
816        let channel = ChannelEntry::new(
817            Address::from([0x01; 20]),
818            Address::from([0x02; 20]),
819            10_u32.into(),
820            U256::zero(),
821            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
822            U256::zero(),
823        );
824
825        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
826        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
827        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), None).await?;
828
829        let tx_queue = ActionQueue::new(
830            node_db.clone(),
831            MockActionState::new(),
832            MockTransactionExecutor::new(),
833            Default::default(),
834        );
835        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
836
837        assert!(
838            matches!(
839                actions.close_channel(channel).await.err().expect("should be an error"),
840                ChainActionsError::ChannelDoesNotExist
841            ),
842            "should fail when channel does not exist"
843        );
844        Ok(())
845    }
846
847    #[tokio::test]
848    async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
849        let stake = 10_u32.into();
850        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());
851
852        let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
853        let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
854        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
855
856        let tx_queue = ActionQueue::new(
857            node_db.clone(),
858            MockActionState::new(),
859            MockTransactionExecutor::new(),
860            Default::default(),
861        );
862
863        let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
864
865        assert!(
866            matches!(
867                actions
868                    .close_channel(channel.clone())
869                    .await
870                    .err()
871                    .expect("should be an error"),
872                ChainActionsError::ChannelAlreadyClosed
873            ),
874            "should fail when channel is already closed"
875        );
876        Ok(())
877    }
878}