hopr_chain_actions/
channels.rs

1//! This module contains the [ChannelActions] trait defining HOPR channels operations.
2//!
//! An implementation of this trait is added to [ChainActions] which realizes the channel
//! operations via [ActionQueue](crate::action_queue::ActionQueue).
//! There are 3 basic high-level on-chain functions in the [ChannelActions] trait:
6//! - [open_channel](ChannelActions::open_channel)
7//! - [fund_channel](ChannelActions::fund_channel)
8//! - [close_channel](ChannelActions::close_channel)
9//!
10//! All the functions do the necessary validations using the DB and then post the corresponding action
11//! into the [ActionQueue](crate::action_queue::ActionQueue).
//! The functions return immediately but provide futures that can be awaited in case the caller wishes to await the
//! on-chain confirmation of the corresponding operation.
14//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting
15//! the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent) by the Indexer.
16use std::time::Duration;
17
18use async_trait::async_trait;
19use hopr_chain_types::actions::Action;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::HoprDbAllOperations;
22use hopr_internal_types::prelude::*;
23use hopr_platform::time::native::current_time;
24use hopr_primitive_types::prelude::*;
25use tracing::{debug, error, info};
26
27use crate::{
28    ChainActions,
29    action_queue::PendingAction,
30    errors::{
31        ChainActionsError::{
32            BalanceTooLow, ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist, ClosureTimeHasNotElapsed,
33            InvalidArguments, InvalidChannelStake, InvalidState, NotEnoughAllowance, PeerAccessDenied,
34        },
35        Result,
36    },
37    redeem::TicketRedeemActions,
38};
39
40/// Gathers all channel-related on-chain actions.
41#[async_trait]
42pub trait ChannelActions {
43    /// Opens a channel to the given `destination` with the given `amount` staked.
44    async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction>;
45
46    /// Funds the given channel with the given `amount`
47    async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction>;
48
49    /// Closes the channel to counterparty in the given direction. Optionally can issue redeeming of all tickets in that
50    /// channel, in case the `direction` is [`ChannelDirection::Incoming`].
51    async fn close_channel(
52        &self,
53        counterparty: Address,
54        direction: ChannelDirection,
55        redeem_before_close: bool,
56    ) -> Result<PendingAction>;
57}
58
59#[async_trait]
60impl<Db> ChannelActions for ChainActions<Db>
61where
62    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
63{
64    #[tracing::instrument(level = "debug", skip(self))]
65    async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction> {
66        if self.self_address() == destination {
67            return Err(InvalidArguments("cannot open channel to self".into()));
68        }
69
70        if amount.is_zero() {
71            return Err(InvalidArguments("invalid balance or balance type given".into()));
72        }
73
74        // Perform all checks
75        let db_clone = self.db.clone();
76        let self_addr = self.self_address();
77        self.db
78            .begin_transaction()
79            .await?
80            .perform(|tx| {
81                Box::pin(async move {
82                    let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
83                    debug!(%allowance, "current staking safe allowance");
84                    if allowance < amount {
85                        return Err(NotEnoughAllowance);
86                    }
87
88                    let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
89                    debug!(balance = %hopr_balance, "current Safe HOPR balance");
90                    if hopr_balance < amount {
91                        return Err(BalanceTooLow);
92                    }
93
94                    if HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) < amount {
95                        return Err(InvalidChannelStake);
96                    }
97
98                    if db_clone.get_indexer_data(Some(tx)).await?.nr_enabled
99                        && !db_clone.is_allowed_in_network_registry(Some(tx), &destination).await?
100                    {
101                        return Err(PeerAccessDenied);
102                    }
103
104                    let maybe_channel = db_clone
105                        .get_channel_by_parties(Some(tx), &self_addr, &destination, false)
106                        .await?;
107                    if let Some(channel) = maybe_channel {
108                        debug!(%channel, "already found existing channel");
109                        if channel.status != ChannelStatus::Closed {
110                            error!(
111                                %destination,
112                                "channel to destination is already opened or pending to close"
113                            );
114                            return Err(ChannelAlreadyExists);
115                        }
116                    }
117                    Ok(())
118                })
119            })
120            .await?;
121
122        info!(%destination, %amount, "initiating channel open");
123        self.tx_sender.send(Action::OpenChannel(destination, amount)).await
124    }
125
126    #[tracing::instrument(level = "debug", skip(self))]
127    async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction> {
128        if amount.is_zero() {
129            return Err(InvalidArguments("invalid balance or balance type given".into()));
130        }
131
132        let db_clone = self.db.clone();
133        let maybe_channel = self
134            .db
135            .begin_transaction()
136            .await?
137            .perform(|tx| {
138                Box::pin(async move {
139                    let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
140                    debug!(%allowance, "current staking safe allowance");
141                    if allowance.lt(&amount) {
142                        return Err(NotEnoughAllowance);
143                    }
144
145                    let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
146                    debug!(balance = %hopr_balance, "current Safe HOPR balance");
147                    if hopr_balance.lt(&amount) {
148                        return Err(BalanceTooLow);
149                    }
150
151                    Ok(db_clone.get_channel_by_id(Some(tx), &channel_id).await?)
152                })
153            })
154            .await?;
155
156        match maybe_channel {
157            Some(channel) => {
158                if channel.status == ChannelStatus::Open {
159                    if channel.balance + amount > HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) {
160                        return Err(InvalidChannelStake);
161                    }
162
163                    info!("initiating funding of {channel} with {amount}");
164                    self.tx_sender.send(Action::FundChannel(channel, amount)).await
165                } else {
166                    Err(InvalidState(format!("channel {channel_id} is not opened")))
167                }
168            }
169            None => Err(ChannelDoesNotExist),
170        }
171    }
172
173    #[tracing::instrument(level = "debug", skip(self))]
174    async fn close_channel(
175        &self,
176        counterparty: Address,
177        direction: ChannelDirection,
178        redeem_before_close: bool,
179    ) -> Result<PendingAction> {
180        let maybe_channel = match direction {
181            ChannelDirection::Incoming => {
182                self.db
183                    .get_channel_by_parties(None, &counterparty, &self.self_address(), false)
184                    .await?
185            }
186            ChannelDirection::Outgoing => {
187                self.db
188                    .get_channel_by_parties(None, &self.self_address(), &counterparty, false)
189                    .await?
190            }
191        };
192
193        match maybe_channel {
194            Some(channel) => {
195                match channel.status {
196                    ChannelStatus::Closed => Err(ChannelAlreadyClosed),
197                    ChannelStatus::PendingToClose(_) => {
198                        let remaining_closure_time = channel.remaining_closure_time(current_time());
199                        info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
200                        match remaining_closure_time {
201                            Some(Duration::ZERO) => {
202                                info!(%channel, %direction, "initiating finalization of channel closure");
203                                self.tx_sender.send(Action::CloseChannel(channel, direction)).await
204                            }
205                            _ => Err(ClosureTimeHasNotElapsed(
206                                channel
207                                    .remaining_closure_time(current_time())
208                                    .expect("impossible: closure time has not passed but no remaining closure time")
209                                    .as_secs(),
210                            )),
211                        }
212                    }
213                    ChannelStatus::Open => {
214                        if redeem_before_close && direction == ChannelDirection::Incoming {
215                            // TODO: trigger aggregation
216                            // Do not await the redemption, just submit it to the queue
217                            let redeemed = self.redeem_tickets_in_channel(&channel, false).await?.len();
218                            info!(count = redeemed, %channel, "redeemed tickets before channel closing");
219                        }
220
221                        info!(%channel, ?direction, "initiating channel closure");
222                        self.tx_sender.send(Action::CloseChannel(channel, direction)).await
223                    }
224                }
225            }
226            None => Err(ChannelDoesNotExist),
227        }
228    }
229}
230#[cfg(test)]
231mod tests {
232    use std::{
233        ops::{Add, Sub},
234        time::{Duration, SystemTime},
235    };
236
237    use futures::FutureExt;
238    use hex_literal::hex;
239    use hopr_chain_types::{
240        actions::Action,
241        chain_events::{ChainEventType, SignificantChainEvent},
242    };
243    use hopr_crypto_random::random_bytes;
244    use hopr_crypto_types::prelude::*;
245    use hopr_db_sql::{
246        HoprDbGeneralModelOperations, api::info::DomainSeparator, channels::HoprDbChannelOperations, db::HoprDb,
247        errors::DbSqlError, info::HoprDbInfoOperations,
248    };
249    use hopr_internal_types::prelude::*;
250    use hopr_primitive_types::prelude::*;
251    use lazy_static::lazy_static;
252    use mockall::Sequence;
253
254    use crate::{
255        ChainActions,
256        action_queue::{ActionQueue, MockTransactionExecutor},
257        action_state::MockActionState,
258        channels::ChannelActions,
259        errors::ChainActionsError,
260    };
261
262    lazy_static! {
263        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
264            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
265        ))
266        .expect("lazy static keypair should be constructible");
267        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
268            "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
269        ))
270        .expect("lazy static keypair should be constructible");
271        static ref ALICE: Address = ALICE_KP.public().to_address();
272        static ref BOB: Address = BOB_KP.public().to_address();
273    }
274
275    async fn init_db(
276        db: &HoprDb,
277        safe_balance: HoprBalance,
278        safe_allowance: HoprBalance,
279        channel: Option<ChannelEntry>,
280    ) -> anyhow::Result<()> {
281        let db_clone = db.clone();
282        Ok(db
283            .begin_transaction()
284            .await?
285            .perform(|tx| {
286                Box::pin(async move {
287                    db_clone.set_safe_hopr_allowance(Some(tx), safe_allowance).await?;
288                    db_clone.set_safe_hopr_balance(Some(tx), safe_balance).await?;
289                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
290                    db_clone
291                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
292                        .await?;
293
294                    if let Some(channel) = channel {
295                        db_clone.upsert_channel(Some(tx), channel).await?;
296                    }
297
298                    Ok::<_, DbSqlError>(())
299                })
300            })
301            .await?)
302    }
303
304    #[tokio::test]
305    async fn test_open_channel() -> anyhow::Result<()> {
306        let stake: HoprBalance = 10_u32.into();
307        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
308
309        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
310        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
311
312        let mut tx_exec = MockTransactionExecutor::new();
313        tx_exec
314            .expect_fund_channel()
315            .times(1)
316            .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
317            .returning(move |_, _| Ok(random_hash));
318
319        let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
320
321        let mut indexer_action_tracker = MockActionState::new();
322        indexer_action_tracker
323            .expect_register_expectation()
324            .once()
325            .returning(move |_| {
326                Ok(futures::future::ok(SignificantChainEvent {
327                    tx_hash: random_hash,
328                    event_type: ChainEventType::ChannelOpened(new_channel),
329                })
330                .boxed())
331            });
332
333        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
334
335        let tx_sender = tx_queue.new_sender();
336        tokio::task::spawn(async move { tx_queue.start().await });
337
338        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
339
340        let tx_res = actions.open_channel(*BOB, stake).await?.await?;
341
342        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
343        assert!(
344            matches!(tx_res.action, Action::OpenChannel(_, _)),
345            "must be open channel action"
346        );
347        assert!(
348            matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
349            "must correspond to open channel chain event"
350        );
351
352        Ok(())
353    }
354
355    #[tokio::test]
356    async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
357        let stake = 10_u32.into();
358
359        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
360
361        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
362        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
363
364        let tx_queue = ActionQueue::new(
365            db.clone(),
366            MockActionState::new(),
367            MockTransactionExecutor::new(),
368            Default::default(),
369        );
370
371        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
372
373        assert!(
374            matches!(
375                actions
376                    .open_channel(*BOB, stake)
377                    .await
378                    .err()
379                    .expect("should be an error"),
380                ChainActionsError::ChannelAlreadyExists
381            ),
382            "should fail when channel exists"
383        );
384
385        Ok(())
386    }
387
388    #[tokio::test]
389    async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
390        let stake = 10_u32.into();
391
392        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
393        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
394
395        let tx_queue = ActionQueue::new(
396            db.clone(),
397            MockActionState::new(),
398            MockTransactionExecutor::new(),
399            Default::default(),
400        );
401
402        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
403
404        assert!(
405            matches!(
406                actions
407                    .open_channel(*ALICE, stake)
408                    .await
409                    .err()
410                    .expect("should be an error"),
411                ChainActionsError::InvalidArguments(_)
412            ),
413            "should not create channel to self"
414        );
415        Ok(())
416    }
417
418    #[tokio::test]
419    async fn test_should_not_open_channel_with_too_big_stake() -> anyhow::Result<()> {
420        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
421        init_db(&db, U256::max_value().into(), U256::max_value().into(), None).await?;
422
423        let tx_queue = ActionQueue::new(
424            db.clone(),
425            MockActionState::new(),
426            MockTransactionExecutor::new(),
427            Default::default(),
428        );
429
430        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
431
432        assert!(
433            matches!(
434                actions
435                    .open_channel(*BOB, (ChannelEntry::MAX_CHANNEL_BALANCE + 1).into())
436                    .await
437                    .err()
438                    .expect("should be an error"),
439                ChainActionsError::InvalidChannelStake
440            ),
441            "should not create channel with too big stake"
442        );
443        Ok(())
444    }
445
446    #[tokio::test]
447    async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
448        let stake = 10_000_u32.into();
449
450        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
451        init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
452
453        let tx_queue = ActionQueue::new(
454            db.clone(),
455            MockActionState::new(),
456            MockTransactionExecutor::new(),
457            Default::default(),
458        );
459
460        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
461
462        assert!(
463            matches!(
464                actions
465                    .open_channel(*BOB, stake)
466                    .await
467                    .err()
468                    .expect("should be an error"),
469                ChainActionsError::NotEnoughAllowance
470            ),
471            "should fail when not enough allowance"
472        );
473        Ok(())
474    }
475
476    #[tokio::test]
477    async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
478        let stake = 10_000_u32.into();
479
480        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
481        init_db(&db, 1_u64.into(), 10_000_000_u64.into(), None).await?;
482
483        let tx_queue = ActionQueue::new(
484            db.clone(),
485            MockActionState::new(),
486            MockTransactionExecutor::new(),
487            Default::default(),
488        );
489
490        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
491
492        assert!(
493            matches!(
494                actions
495                    .open_channel(*BOB, stake)
496                    .await
497                    .err()
498                    .expect("should be an error"),
499                ChainActionsError::BalanceTooLow
500            ),
501            "should fail when not enough token balance"
502        );
503        Ok(())
504    }
505
506    #[tokio::test]
507    async fn test_fund_channel() -> anyhow::Result<()> {
508        let stake = 10_u32.into();
509        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
510        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
511
512        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
513        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
514
515        let mut tx_exec = MockTransactionExecutor::new();
516        tx_exec
517            .expect_fund_channel()
518            .times(1)
519            .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
520            .returning(move |_, _| Ok(random_hash));
521
522        let mut indexer_action_tracker = MockActionState::new();
523        indexer_action_tracker
524            .expect_register_expectation()
525            .once()
526            .returning(move |_| {
527                Ok(futures::future::ok(SignificantChainEvent {
528                    tx_hash: random_hash,
529                    event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
530                })
531                .boxed())
532            });
533
534        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
535        let tx_sender = tx_queue.new_sender();
536        tokio::task::spawn(async move {
537            tx_queue.start().await;
538        });
539
540        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
541
542        let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;
543
544        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
545        assert!(
546            matches!(tx_res.action, Action::FundChannel(_, _)),
547            "must be open channel action"
548        );
549        assert!(
550            matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
551            "must correspond to channel chain event"
552        );
553        Ok(())
554    }
555
556    #[tokio::test]
557    async fn test_fund_channel_should_not_over_fund() -> anyhow::Result<()> {
558        let channel = ChannelEntry::new(
559            *ALICE,
560            *BOB,
561            HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE),
562            U256::zero(),
563            ChannelStatus::Open,
564            U256::zero(),
565        );
566
567        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
568        init_db(&db, U256::max_value().into(), U256::max_value().into(), Some(channel)).await?;
569
570        let tx_queue = ActionQueue::new(
571            db.clone(),
572            MockActionState::new(),
573            MockTransactionExecutor::new(),
574            Default::default(),
575        );
576
577        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
578
579        assert!(
580            matches!(
581                actions
582                    .fund_channel(channel.get_id(), 1.into())
583                    .await
584                    .err()
585                    .expect("should be an error"),
586                ChainActionsError::InvalidChannelStake
587            ),
588            "should fail channel stake is too high"
589        );
590        Ok(())
591    }
592
593    #[tokio::test]
594    async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
595        let channel_id = generate_channel_id(&ALICE, &BOB);
596
597        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
598        init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
599
600        let tx_queue = ActionQueue::new(
601            db.clone(),
602            MockActionState::new(),
603            MockTransactionExecutor::new(),
604            Default::default(),
605        );
606
607        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
608        let stake = 10_u32.into();
609        assert!(
610            matches!(
611                actions
612                    .fund_channel(channel_id, stake)
613                    .await
614                    .err()
615                    .expect("should be an error"),
616                ChainActionsError::ChannelDoesNotExist
617            ),
618            "should fail when channel does not exist"
619        );
620        Ok(())
621    }
622
623    #[tokio::test]
624    async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
625        let channel_id = generate_channel_id(&ALICE, &BOB);
626
627        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
628        init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
629
630        let tx_queue = ActionQueue::new(
631            db.clone(),
632            MockActionState::new(),
633            MockTransactionExecutor::new(),
634            Default::default(),
635        );
636
637        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
638        let stake = 10_000_u32.into();
639        assert!(
640            matches!(
641                actions
642                    .fund_channel(channel_id, stake)
643                    .await
644                    .err()
645                    .expect("should be an error"),
646                ChainActionsError::NotEnoughAllowance
647            ),
648            "should fail when not enough allowance"
649        );
650        Ok(())
651    }
652
653    #[tokio::test]
654    async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
655        let channel_id = generate_channel_id(&ALICE, &BOB);
656
657        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
658        init_db(&db, 1_u64.into(), 100_000_u64.into(), None).await?;
659
660        let tx_queue = ActionQueue::new(
661            db.clone(),
662            MockActionState::new(),
663            MockTransactionExecutor::new(),
664            Default::default(),
665        );
666
667        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
668        let stake = 10_000_u32.into();
669        assert!(
670            matches!(
671                actions
672                    .fund_channel(channel_id, stake)
673                    .await
674                    .err()
675                    .expect("should be an error"),
676                ChainActionsError::BalanceTooLow
677            ),
678            "should fail when not enough balance"
679        );
680        Ok(())
681    }
682
683    #[tokio::test]
684    async fn test_close_channel_outgoing() -> anyhow::Result<()> {
685        let stake = 10_u32.into();
686        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
687
688        let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
689
690        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
691        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
692
693        let mut tx_exec = MockTransactionExecutor::new();
694        let mut seq = Sequence::new();
695        tx_exec
696            .expect_initiate_outgoing_channel_closure()
697            .times(1)
698            .in_sequence(&mut seq)
699            .withf(move |dst| BOB.eq(dst))
700            .returning(move |_| Ok(random_hash));
701
702        tx_exec
703            .expect_finalize_outgoing_channel_closure()
704            .times(1)
705            .in_sequence(&mut seq)
706            .withf(move |dst| BOB.eq(dst))
707            .returning(move |_| Ok(random_hash));
708
709        let mut indexer_action_tracker = MockActionState::new();
710        let mut seq2 = Sequence::new();
711        indexer_action_tracker
712            .expect_register_expectation()
713            .once()
714            .in_sequence(&mut seq2)
715            .returning(move |_| {
716                Ok(futures::future::ok(SignificantChainEvent {
717                    tx_hash: random_hash,
718                    event_type: ChainEventType::ChannelClosureInitiated(channel),
719                })
720                .boxed())
721            });
722
723        indexer_action_tracker
724            .expect_register_expectation()
725            .once()
726            .in_sequence(&mut seq2)
727            .returning(move |_| {
728                Ok(futures::future::ok(SignificantChainEvent {
729                    tx_hash: random_hash,
730                    event_type: ChainEventType::ChannelClosed(channel),
731                })
732                .boxed())
733            });
734
735        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
736        let tx_sender = tx_queue.new_sender();
737        tokio::task::spawn(async move {
738            tx_queue.start().await;
739        });
740
741        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
742
743        let tx_res = actions
744            .close_channel(*BOB, ChannelDirection::Outgoing, false)
745            .await?
746            .await?;
747
748        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
749        assert!(
750            matches!(tx_res.action, Action::CloseChannel(_, _)),
751            "must be close channel action"
752        );
753        assert!(
754            matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
755            "must correspond to channel chain event"
756        );
757
758        // Transition the channel to the PendingToClose state with the closure time already elapsed
759        channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));
760
761        db.upsert_channel(None, channel).await?;
762
763        let tx_res = actions
764            .close_channel(*BOB, ChannelDirection::Outgoing, false)
765            .await?
766            .await?;
767
768        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
769        assert!(
770            matches!(tx_res.action, Action::CloseChannel(_, _)),
771            "must be close channel action"
772        );
773        assert!(
774            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
775            "must correspond to channel chain event"
776        );
777        Ok(())
778    }
779
780    #[tokio::test]
781    async fn test_close_channel_incoming() -> anyhow::Result<()> {
782        let stake = 10_u32.into();
783        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
784
785        let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());
786
787        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
788        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
789
790        let mut tx_exec = MockTransactionExecutor::new();
791        let mut seq = Sequence::new();
792        tx_exec
793            .expect_close_incoming_channel()
794            .times(1)
795            .in_sequence(&mut seq)
796            .withf(move |dst| BOB.eq(dst))
797            .returning(move |_| Ok(random_hash));
798
799        let mut indexer_action_tracker = MockActionState::new();
800        indexer_action_tracker
801            .expect_register_expectation()
802            .returning(move |_| {
803                Ok(futures::future::ok(SignificantChainEvent {
804                    tx_hash: random_hash,
805                    event_type: ChainEventType::ChannelClosed(channel),
806                })
807                .boxed())
808            });
809
810        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
811        let tx_sender = tx_queue.new_sender();
812        tokio::task::spawn(async move {
813            tx_queue.start().await;
814        });
815
816        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
817
818        let tx_res = actions
819            .close_channel(*BOB, ChannelDirection::Incoming, false)
820            .await?
821            .await?;
822
823        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
824        assert!(
825            matches!(tx_res.action, Action::CloseChannel(_, _)),
826            "must be close channel action"
827        );
828        assert!(
829            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
830            "must correspond to channel chain event"
831        );
832        Ok(())
833    }
834
835    #[tokio::test]
836    async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
837        let stake = 10_u32.into();
838
839        let channel = ChannelEntry::new(
840            *ALICE,
841            *BOB,
842            stake,
843            U256::zero(),
844            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
845            U256::zero(),
846        );
847
848        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
849        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
850
851        let tx_queue = ActionQueue::new(
852            db.clone(),
853            MockActionState::new(),
854            MockTransactionExecutor::new(),
855            Default::default(),
856        );
857
858        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
859
860        assert!(
861            matches!(
862                actions
863                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
864                    .await
865                    .err()
866                    .expect("should be an error"),
867                ChainActionsError::ClosureTimeHasNotElapsed(_)
868            ),
869            "should fail when the channel closure period did not elapse"
870        );
871        Ok(())
872    }
873
874    #[tokio::test]
875    async fn test_should_not_close_nonexistent_channel() -> anyhow::Result<()> {
876        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
877        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), None).await?;
878
879        let tx_queue = ActionQueue::new(
880            db.clone(),
881            MockActionState::new(),
882            MockTransactionExecutor::new(),
883            Default::default(),
884        );
885        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
886
887        assert!(
888            matches!(
889                actions
890                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
891                    .await
892                    .err()
893                    .expect("should be an error"),
894                ChainActionsError::ChannelDoesNotExist
895            ),
896            "should fail when channel does not exist"
897        );
898        Ok(())
899    }
900
901    #[tokio::test]
902    async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
903        let stake = 10_u32.into();
904        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());
905
906        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
907        init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
908
909        let tx_queue = ActionQueue::new(
910            db.clone(),
911            MockActionState::new(),
912            MockTransactionExecutor::new(),
913            Default::default(),
914        );
915
916        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
917
918        assert!(
919            matches!(
920                actions
921                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
922                    .await
923                    .err()
924                    .expect("should be an error"),
925                ChainActionsError::ChannelAlreadyClosed
926            ),
927            "should fail when channel is already closed"
928        );
929        Ok(())
930    }
931}