hopr_chain_actions/
channels.rs

1//! This module contains the [ChannelActions] trait defining HOPR channels operations.
2//!
3//! An implementation of this trait is added to [ChainActions] which realizes the redemption
4//! operations via [ActionQueue](crate::action_queue::ActionQueue).
5//! There are 4 basic high-level on-chain functions in the [ChannelActions] trait:
6//! - [open_channel](ChannelActions::open_channel)
7//! - [fund_channel](ChannelActions::fund_channel)
8//! - [close_channel](ChannelActions::close_channel)
9//!
10//! All the functions do the necessary validations using the DB and then post the corresponding action
11//! into the [ActionQueue](crate::action_queue::ActionQueue).
12//! The functions return immediately, but provide futures that can be awaited in case the callers wishes to await the on-chain
13//! confirmation of the corresponding operation.
14//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent)
15//! by the Indexer.
16use async_trait::async_trait;
17use std::time::Duration;
18use tracing::{debug, error, info};
19
20use hopr_chain_types::actions::Action;
21use hopr_crypto_types::types::Hash;
22use hopr_db_sql::HoprDbAllOperations;
23use hopr_internal_types::prelude::*;
24use hopr_primitive_types::prelude::*;
25
26use crate::action_queue::PendingAction;
27use crate::errors::ChainActionsError::{
28    BalanceTooLow, ClosureTimeHasNotElapsed, InvalidArguments, InvalidState, NotEnoughAllowance, PeerAccessDenied,
29};
30use crate::errors::{
31    ChainActionsError::{ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist},
32    Result,
33};
34use crate::redeem::TicketRedeemActions;
35use crate::ChainActions;
36
37use hopr_platform::time::native::current_time;
38
39/// Gathers all channel related on-chain actions.
40#[async_trait]
41pub trait ChannelActions {
42    /// Opens a channel to the given `destination` with the given `amount` staked.
43    async fn open_channel(&self, destination: Address, amount: Balance) -> Result<PendingAction>;
44
45    /// Funds the given channel with the given `amount`
46    async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> Result<PendingAction>;
47
48    /// Closes the channel to counterparty in the given direction. Optionally can issue redeeming of all tickets in that channel,
49    /// in case the `direction` is [`ChannelDirection::Incoming`].
50    async fn close_channel(
51        &self,
52        counterparty: Address,
53        direction: ChannelDirection,
54        redeem_before_close: bool,
55    ) -> Result<PendingAction>;
56}
57
58#[async_trait]
59impl<Db> ChannelActions for ChainActions<Db>
60where
61    Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
62{
63    #[tracing::instrument(level = "debug", skip(self))]
64    async fn open_channel(&self, destination: Address, amount: Balance) -> Result<PendingAction> {
65        if self.self_address() == destination {
66            return Err(InvalidArguments("cannot open channel to self".into()));
67        }
68
69        if amount.eq(&amount.of_same("0")) || amount.balance_type() != BalanceType::HOPR {
70            return Err(InvalidArguments("invalid balance or balance type given".into()));
71        }
72
73        // Perform all checks
74        let db_clone = self.db.clone();
75        let self_addr = self.self_address();
76        self.db
77            .begin_transaction()
78            .await?
79            .perform(|tx| {
80                Box::pin(async move {
81                    let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
82                    debug!(%allowance, "current staking safe allowance");
83                    if allowance.lt(&amount) {
84                        return Err(NotEnoughAllowance);
85                    }
86
87                    let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
88                    debug!(balance = %hopr_balance, "current Safe HOPR balance");
89                    if hopr_balance.lt(&amount) {
90                        return Err(BalanceTooLow);
91                    }
92
93                    if db_clone.get_indexer_data(Some(tx)).await?.nr_enabled
94                        && !db_clone.is_allowed_in_network_registry(Some(tx), &destination).await?
95                    {
96                        return Err(PeerAccessDenied);
97                    }
98
99                    let maybe_channel = db_clone
100                        .get_channel_by_parties(Some(tx), &self_addr, &destination, false)
101                        .await?;
102                    if let Some(channel) = maybe_channel {
103                        debug!(%channel, "already found existing channel");
104                        if channel.status != ChannelStatus::Closed {
105                            error!(
106                                %destination,
107                                "channel to destination is already opened or pending to close"
108                            );
109                            return Err(ChannelAlreadyExists);
110                        }
111                    }
112                    Ok(())
113                })
114            })
115            .await?;
116
117        info!(%destination, %amount, "initiating channel open");
118        self.tx_sender.send(Action::OpenChannel(destination, amount)).await
119    }
120
121    #[tracing::instrument(level = "debug", skip(self))]
122    async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> Result<PendingAction> {
123        if amount.eq(&amount.of_same("0")) || amount.balance_type() != BalanceType::HOPR {
124            return Err(InvalidArguments("invalid balance or balance type given".into()));
125        }
126
127        let db_clone = self.db.clone();
128        let maybe_channel = self
129            .db
130            .begin_transaction()
131            .await?
132            .perform(|tx| {
133                Box::pin(async move {
134                    let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
135                    debug!(%allowance, "current staking safe allowance");
136                    if allowance.lt(&amount) {
137                        return Err(NotEnoughAllowance);
138                    }
139
140                    let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
141                    debug!(balance = %hopr_balance, "current Safe HOPR balance");
142                    if hopr_balance.lt(&amount) {
143                        return Err(BalanceTooLow);
144                    }
145
146                    Ok(db_clone.get_channel_by_id(Some(tx), &channel_id).await?)
147                })
148            })
149            .await?;
150
151        match maybe_channel {
152            Some(channel) => {
153                if channel.status == ChannelStatus::Open {
154                    info!("initiating funding of {channel} with {amount}");
155                    self.tx_sender.send(Action::FundChannel(channel, amount)).await
156                } else {
157                    Err(InvalidState(format!("channel {channel_id} is not opened")))
158                }
159            }
160            None => Err(ChannelDoesNotExist),
161        }
162    }
163
164    #[tracing::instrument(level = "debug", skip(self))]
165    async fn close_channel(
166        &self,
167        counterparty: Address,
168        direction: ChannelDirection,
169        redeem_before_close: bool,
170    ) -> Result<PendingAction> {
171        let maybe_channel = match direction {
172            ChannelDirection::Incoming => {
173                self.db
174                    .get_channel_by_parties(None, &counterparty, &self.self_address(), false)
175                    .await?
176            }
177            ChannelDirection::Outgoing => {
178                self.db
179                    .get_channel_by_parties(None, &self.self_address(), &counterparty, false)
180                    .await?
181            }
182        };
183
184        match maybe_channel {
185            Some(channel) => {
186                match channel.status {
187                    ChannelStatus::Closed => Err(ChannelAlreadyClosed),
188                    ChannelStatus::PendingToClose(_) => {
189                        let remaining_closure_time = channel.remaining_closure_time(current_time());
190                        info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
191                        match remaining_closure_time {
192                            Some(Duration::ZERO) => {
193                                info!(%channel, %direction, "initiating finalization of channel closure");
194                                self.tx_sender.send(Action::CloseChannel(channel, direction)).await
195                            }
196                            _ => Err(ClosureTimeHasNotElapsed(
197                                channel
198                                    .remaining_closure_time(current_time())
199                                    .expect("impossible: closure time has not passed but no remaining closure time")
200                                    .as_secs(),
201                            )),
202                        }
203                    }
204                    ChannelStatus::Open => {
205                        if redeem_before_close && direction == ChannelDirection::Incoming {
206                            // TODO: trigger aggregation
207                            // Do not await the redemption, just submit it to the queue
208                            let redeemed = self.redeem_tickets_in_channel(&channel, false).await?.len();
209                            info!(count = redeemed, %channel, "redeemed tickets before channel closing");
210                        }
211
212                        info!(%channel, ?direction, "initiating channel closure");
213                        self.tx_sender.send(Action::CloseChannel(channel, direction)).await
214                    }
215                }
216            }
217            None => Err(ChannelDoesNotExist),
218        }
219    }
220}
221#[cfg(test)]
222mod tests {
223    use crate::action_queue::{ActionQueue, MockTransactionExecutor};
224    use crate::action_state::MockActionState;
225    use crate::channels::ChannelActions;
226    use crate::errors::ChainActionsError;
227    use crate::ChainActions;
228    use futures::FutureExt;
229    use hex_literal::hex;
230    use hopr_chain_types::actions::Action;
231    use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
232    use hopr_crypto_random::random_bytes;
233    use hopr_crypto_types::prelude::*;
234    use hopr_db_sql::channels::HoprDbChannelOperations;
235    use hopr_db_sql::db::HoprDb;
236    use hopr_db_sql::HoprDbGeneralModelOperations;
237    use hopr_db_sql::{api::info::DomainSeparator, info::HoprDbInfoOperations};
238    use hopr_internal_types::prelude::*;
239    use hopr_primitive_types::prelude::*;
240    use lazy_static::lazy_static;
241    use mockall::Sequence;
242    use std::{
243        ops::{Add, Sub},
244        time::{Duration, SystemTime},
245    };
246
247    lazy_static! {
248        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
249            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
250        ))
251        .expect("lazy static keypair should be constructible");
252        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
253            "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
254        ))
255        .expect("lazy static keypair should be constructible");
256        static ref ALICE: Address = ALICE_KP.public().to_address();
257        static ref BOB: Address = BOB_KP.public().to_address();
258    }
259
260    #[async_std::test]
261    async fn test_open_channel() -> anyhow::Result<()> {
262        let stake = Balance::new(10_u32, BalanceType::HOPR);
263        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
264
265        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
266        let db_clone = db.clone();
267        db.begin_transaction()
268            .await?
269            .perform(|tx| {
270                Box::pin(async move {
271                    db_clone
272                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
273                        .await?;
274                    db_clone
275                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
276                        .await?;
277                    db_clone
278                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
279                        .await?;
280                    db_clone.set_network_registry_enabled(Some(tx), false).await
281                })
282            })
283            .await?;
284
285        let mut tx_exec = MockTransactionExecutor::new();
286        tx_exec
287            .expect_fund_channel()
288            .times(1)
289            .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
290            .returning(move |_, _| Ok(random_hash));
291
292        let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
293
294        let mut indexer_action_tracker = MockActionState::new();
295        indexer_action_tracker
296            .expect_register_expectation()
297            .once()
298            .returning(move |_| {
299                Ok(futures::future::ok(SignificantChainEvent {
300                    tx_hash: random_hash,
301                    event_type: ChainEventType::ChannelOpened(new_channel),
302                })
303                .boxed())
304            });
305
306        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
307
308        let tx_sender = tx_queue.new_sender();
309        async_std::task::spawn(async move { tx_queue.start().await });
310
311        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
312
313        let tx_res = actions.open_channel(*BOB, stake).await?.await?;
314
315        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
316        assert!(
317            matches!(tx_res.action, Action::OpenChannel(_, _)),
318            "must be open channel action"
319        );
320        assert!(
321            matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
322            "must correspond to open channel chain event"
323        );
324
325        Ok(())
326    }
327
328    #[async_std::test]
329    async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
330        let stake = Balance::new(10_u32, BalanceType::HOPR);
331
332        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
333
334        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
335        let db_clone = db.clone();
336        db.begin_transaction()
337            .await?
338            .perform(|tx| {
339                Box::pin(async move {
340                    db_clone
341                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
342                        .await?;
343                    db_clone
344                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
345                        .await?;
346                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
347                    db_clone
348                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
349                        .await?;
350                    db_clone.upsert_channel(Some(tx), channel).await
351                })
352            })
353            .await?;
354
355        let tx_queue = ActionQueue::new(
356            db.clone(),
357            MockActionState::new(),
358            MockTransactionExecutor::new(),
359            Default::default(),
360        );
361
362        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
363
364        assert!(
365            matches!(
366                actions
367                    .open_channel(*BOB, stake)
368                    .await
369                    .err()
370                    .expect("should be an error"),
371                ChainActionsError::ChannelAlreadyExists
372            ),
373            "should fail when channel exists"
374        );
375
376        Ok(())
377    }
378
379    #[async_std::test]
380    async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
381        let stake = Balance::new(10_u32, BalanceType::HOPR);
382
383        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
384        let db_clone = db.clone();
385        db.begin_transaction()
386            .await?
387            .perform(|tx| {
388                Box::pin(async move {
389                    db_clone
390                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
391                        .await?;
392                    db_clone
393                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
394                        .await?;
395                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
396                    db_clone
397                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
398                        .await
399                })
400            })
401            .await?;
402
403        let tx_queue = ActionQueue::new(
404            db.clone(),
405            MockActionState::new(),
406            MockTransactionExecutor::new(),
407            Default::default(),
408        );
409
410        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
411
412        assert!(
413            matches!(
414                actions
415                    .open_channel(*ALICE, stake)
416                    .await
417                    .err()
418                    .expect("should be an error"),
419                ChainActionsError::InvalidArguments(_)
420            ),
421            "should not create channel to self"
422        );
423        Ok(())
424    }
425
426    #[async_std::test]
427    async fn test_open_should_not_allow_invalid_balance() -> anyhow::Result<()> {
428        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
429        let db_clone = db.clone();
430        db.begin_transaction()
431            .await?
432            .perform(|tx| {
433                Box::pin(async move {
434                    db_clone
435                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
436                        .await?;
437                    db_clone
438                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
439                        .await?;
440                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
441                    db_clone
442                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
443                        .await
444                })
445            })
446            .await?;
447
448        let tx_queue = ActionQueue::new(
449            db.clone(),
450            MockActionState::new(),
451            MockTransactionExecutor::new(),
452            Default::default(),
453        );
454
455        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
456        let stake = Balance::new(10_u32, BalanceType::Native);
457        assert!(
458            matches!(
459                actions
460                    .open_channel(*BOB, stake)
461                    .await
462                    .err()
463                    .expect("should be an error"),
464                ChainActionsError::InvalidArguments(_)
465            ),
466            "should not allow invalid balance"
467        );
468
469        let stake = Balance::new(0_u32, BalanceType::HOPR);
470
471        assert!(
472            matches!(
473                actions
474                    .open_channel(*BOB, stake)
475                    .await
476                    .err()
477                    .expect("should be an error"),
478                ChainActionsError::InvalidArguments(_)
479            ),
480            "should not allow invalid balance"
481        );
482        Ok(())
483    }
484
485    #[async_std::test]
486    async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
487        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
488
489        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
490        let db_clone = db.clone();
491        db.begin_transaction()
492            .await?
493            .perform(|tx| {
494                Box::pin(async move {
495                    db_clone
496                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
497                        .await?;
498                    db_clone
499                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
500                        .await?;
501                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
502                    db_clone
503                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
504                        .await
505                })
506            })
507            .await?;
508
509        let tx_queue = ActionQueue::new(
510            db.clone(),
511            MockActionState::new(),
512            MockTransactionExecutor::new(),
513            Default::default(),
514        );
515
516        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
517
518        assert!(
519            matches!(
520                actions
521                    .open_channel(*BOB, stake)
522                    .await
523                    .err()
524                    .expect("should be an error"),
525                ChainActionsError::NotEnoughAllowance
526            ),
527            "should fail when not enough allowance"
528        );
529        Ok(())
530    }
531
532    #[async_std::test]
533    async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
534        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
535
536        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
537        let db_clone = db.clone();
538        db.begin_transaction()
539            .await?
540            .perform(|tx| {
541                Box::pin(async move {
542                    db_clone
543                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
544                        .await?;
545                    db_clone
546                        .set_safe_hopr_balance(Some(tx), Balance::new(1_u64, BalanceType::HOPR))
547                        .await?;
548                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
549                    db_clone
550                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
551                        .await
552                })
553            })
554            .await?;
555
556        let tx_queue = ActionQueue::new(
557            db.clone(),
558            MockActionState::new(),
559            MockTransactionExecutor::new(),
560            Default::default(),
561        );
562
563        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
564
565        assert!(
566            matches!(
567                actions
568                    .open_channel(*BOB, stake)
569                    .await
570                    .err()
571                    .expect("should be an error"),
572                ChainActionsError::BalanceTooLow
573            ),
574            "should fail when not enough token balance"
575        );
576        Ok(())
577    }
578
579    #[async_std::test]
580    async fn test_fund_channel() -> anyhow::Result<()> {
581        let stake = Balance::new(10_u32, BalanceType::HOPR);
582        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
583        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
584
585        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
586        let db_clone = db.clone();
587        db.begin_transaction()
588            .await?
589            .perform(|tx| {
590                Box::pin(async move {
591                    db_clone
592                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
593                        .await?;
594                    db_clone
595                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
596                        .await?;
597                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
598                    db_clone
599                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
600                        .await?;
601                    db_clone.upsert_channel(Some(tx), channel).await
602                })
603            })
604            .await?;
605
606        let mut tx_exec = MockTransactionExecutor::new();
607        tx_exec
608            .expect_fund_channel()
609            .times(1)
610            .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
611            .returning(move |_, _| Ok(random_hash));
612
613        let mut indexer_action_tracker = MockActionState::new();
614        indexer_action_tracker
615            .expect_register_expectation()
616            .once()
617            .returning(move |_| {
618                Ok(futures::future::ok(SignificantChainEvent {
619                    tx_hash: random_hash,
620                    event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
621                })
622                .boxed())
623            });
624
625        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
626        let tx_sender = tx_queue.new_sender();
627        async_std::task::spawn(async move {
628            tx_queue.start().await;
629        });
630
631        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
632
633        let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;
634
635        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
636        assert!(
637            matches!(tx_res.action, Action::FundChannel(_, _)),
638            "must be open channel action"
639        );
640        assert!(
641            matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
642            "must correspond to channel chain event"
643        );
644        Ok(())
645    }
646
647    #[async_std::test]
648    async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
649        let channel_id = generate_channel_id(&*ALICE, &*BOB);
650
651        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
652        let db_clone = db.clone();
653        db.begin_transaction()
654            .await?
655            .perform(|tx| {
656                Box::pin(async move {
657                    db_clone
658                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
659                        .await?;
660                    db_clone
661                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
662                        .await?;
663                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
664                    db_clone
665                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
666                        .await
667                })
668            })
669            .await?;
670
671        let tx_queue = ActionQueue::new(
672            db.clone(),
673            MockActionState::new(),
674            MockTransactionExecutor::new(),
675            Default::default(),
676        );
677
678        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
679        let stake = Balance::new(10_u32, BalanceType::HOPR);
680        assert!(
681            matches!(
682                actions
683                    .fund_channel(channel_id, stake)
684                    .await
685                    .err()
686                    .expect("should be an error"),
687                ChainActionsError::ChannelDoesNotExist
688            ),
689            "should fail when channel does not exist"
690        );
691        Ok(())
692    }
693
694    #[async_std::test]
695    async fn test_fund_should_not_allow_invalid_balance() -> anyhow::Result<()> {
696        let channel_id = generate_channel_id(&*ALICE, &*BOB);
697
698        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
699        let db_clone = db.clone();
700        db.begin_transaction()
701            .await?
702            .perform(|tx| {
703                Box::pin(async move {
704                    db_clone
705                        .set_safe_hopr_allowance(Some(tx), Balance::new(10_000_000_u64, BalanceType::HOPR))
706                        .await?;
707                    db_clone
708                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
709                        .await?;
710                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
711                    db_clone
712                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
713                        .await
714                })
715            })
716            .await?;
717
718        let tx_queue = ActionQueue::new(
719            db.clone(),
720            MockActionState::new(),
721            MockTransactionExecutor::new(),
722            Default::default(),
723        );
724
725        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
726        let stake = Balance::new(10_u32, BalanceType::Native);
727        assert!(
728            matches!(
729                actions
730                    .open_channel(*BOB, stake)
731                    .await
732                    .err()
733                    .expect("should be an error"),
734                ChainActionsError::InvalidArguments(_)
735            ),
736            "should not allow invalid balance"
737        );
738
739        let stake = Balance::new(0_u32, BalanceType::HOPR);
740        assert!(
741            matches!(
742                actions
743                    .fund_channel(channel_id, stake)
744                    .await
745                    .err()
746                    .expect("should be an error"),
747                ChainActionsError::InvalidArguments(_)
748            ),
749            "should not allow invalid balance"
750        );
751        Ok(())
752    }
753
754    #[async_std::test]
755    async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
756        let channel_id = generate_channel_id(&*ALICE, &*BOB);
757
758        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
759        let db_clone = db.clone();
760        db.begin_transaction()
761            .await?
762            .perform(|tx| {
763                Box::pin(async move {
764                    db_clone
765                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
766                        .await?;
767                    db_clone
768                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
769                        .await?;
770                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
771                    db_clone
772                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
773                        .await
774                })
775            })
776            .await?;
777
778        let tx_queue = ActionQueue::new(
779            db.clone(),
780            MockActionState::new(),
781            MockTransactionExecutor::new(),
782            Default::default(),
783        );
784
785        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
786        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
787        assert!(
788            matches!(
789                actions
790                    .fund_channel(channel_id, stake)
791                    .await
792                    .err()
793                    .expect("should be an error"),
794                ChainActionsError::NotEnoughAllowance
795            ),
796            "should fail when not enough allowance"
797        );
798        Ok(())
799    }
800
801    #[async_std::test]
802    async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
803        let channel_id = generate_channel_id(&*ALICE, &*BOB);
804
805        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
806        let db_clone = db.clone();
807        db.begin_transaction()
808            .await?
809            .perform(|tx| {
810                Box::pin(async move {
811                    db_clone
812                        .set_safe_hopr_allowance(Some(tx), Balance::new(100_000_u64, BalanceType::HOPR))
813                        .await?;
814                    db_clone
815                        .set_safe_hopr_balance(Some(tx), Balance::new(1_u64, BalanceType::HOPR))
816                        .await?;
817                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
818                    db_clone
819                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
820                        .await
821                })
822            })
823            .await?;
824
825        let tx_queue = ActionQueue::new(
826            db.clone(),
827            MockActionState::new(),
828            MockTransactionExecutor::new(),
829            Default::default(),
830        );
831
832        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
833        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
834        assert!(
835            matches!(
836                actions
837                    .fund_channel(channel_id, stake)
838                    .await
839                    .err()
840                    .expect("should be an error"),
841                ChainActionsError::BalanceTooLow
842            ),
843            "should fail when not enough balance"
844        );
845        Ok(())
846    }
847
848    #[async_std::test]
849    async fn test_close_channel_outgoing() -> anyhow::Result<()> {
850        let stake = Balance::new(10_u32, BalanceType::HOPR);
851        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
852
853        let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
854
855        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
856        let db_clone = db.clone();
857        db.begin_transaction()
858            .await?
859            .perform(|tx| {
860                Box::pin(async move {
861                    db_clone
862                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
863                        .await?;
864                    db_clone
865                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
866                        .await?;
867                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
868                    db_clone
869                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
870                        .await?;
871                    db_clone.upsert_channel(Some(tx), channel).await
872                })
873            })
874            .await?;
875
876        let mut tx_exec = MockTransactionExecutor::new();
877        let mut seq = Sequence::new();
878        tx_exec
879            .expect_initiate_outgoing_channel_closure()
880            .times(1)
881            .in_sequence(&mut seq)
882            .withf(move |dst| BOB.eq(dst))
883            .returning(move |_| Ok(random_hash));
884
885        tx_exec
886            .expect_finalize_outgoing_channel_closure()
887            .times(1)
888            .in_sequence(&mut seq)
889            .withf(move |dst| BOB.eq(dst))
890            .returning(move |_| Ok(random_hash));
891
892        let mut indexer_action_tracker = MockActionState::new();
893        let mut seq2 = Sequence::new();
894        indexer_action_tracker
895            .expect_register_expectation()
896            .once()
897            .in_sequence(&mut seq2)
898            .returning(move |_| {
899                Ok(futures::future::ok(SignificantChainEvent {
900                    tx_hash: random_hash,
901                    event_type: ChainEventType::ChannelClosureInitiated(channel),
902                })
903                .boxed())
904            });
905
906        indexer_action_tracker
907            .expect_register_expectation()
908            .once()
909            .in_sequence(&mut seq2)
910            .returning(move |_| {
911                Ok(futures::future::ok(SignificantChainEvent {
912                    tx_hash: random_hash,
913                    event_type: ChainEventType::ChannelClosed(channel),
914                })
915                .boxed())
916            });
917
918        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
919        let tx_sender = tx_queue.new_sender();
920        async_std::task::spawn(async move {
921            tx_queue.start().await;
922        });
923
924        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
925
926        let tx_res = actions
927            .close_channel(*BOB, ChannelDirection::Outgoing, false)
928            .await?
929            .await?;
930
931        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
932        assert!(
933            matches!(tx_res.action, Action::CloseChannel(_, _)),
934            "must be close channel action"
935        );
936        assert!(
937            matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
938            "must correspond to channel chain event"
939        );
940
941        // Transition the channel to the PendingToClose state with the closure time already elapsed
942        channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));
943
944        db.upsert_channel(None, channel).await?;
945
946        let tx_res = actions
947            .close_channel(*BOB, ChannelDirection::Outgoing, false)
948            .await?
949            .await?;
950
951        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
952        assert!(
953            matches!(tx_res.action, Action::CloseChannel(_, _)),
954            "must be close channel action"
955        );
956        assert!(
957            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
958            "must correspond to channel chain event"
959        );
960        Ok(())
961    }
962
963    #[async_std::test]
964    async fn test_close_channel_incoming() -> anyhow::Result<()> {
965        let stake = Balance::new(10_u32, BalanceType::HOPR);
966        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
967
968        let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());
969
970        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
971        let db_clone = db.clone();
972        db.begin_transaction()
973            .await?
974            .perform(|tx| {
975                Box::pin(async move {
976                    db_clone
977                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
978                        .await?;
979                    db_clone
980                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
981                        .await?;
982                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
983                    db_clone
984                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
985                        .await?;
986                    db_clone.upsert_channel(Some(tx), channel).await
987                })
988            })
989            .await?;
990
991        let mut tx_exec = MockTransactionExecutor::new();
992        let mut seq = Sequence::new();
993        tx_exec
994            .expect_close_incoming_channel()
995            .times(1)
996            .in_sequence(&mut seq)
997            .withf(move |dst| BOB.eq(dst))
998            .returning(move |_| Ok(random_hash));
999
1000        let mut indexer_action_tracker = MockActionState::new();
1001        indexer_action_tracker
1002            .expect_register_expectation()
1003            .returning(move |_| {
1004                Ok(futures::future::ok(SignificantChainEvent {
1005                    tx_hash: random_hash,
1006                    event_type: ChainEventType::ChannelClosed(channel),
1007                })
1008                .boxed())
1009            });
1010
1011        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
1012        let tx_sender = tx_queue.new_sender();
1013        async_std::task::spawn(async move {
1014            tx_queue.start().await;
1015        });
1016
1017        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
1018
1019        let tx_res = actions
1020            .close_channel(*BOB, ChannelDirection::Incoming, false)
1021            .await?
1022            .await?;
1023
1024        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
1025        assert!(
1026            matches!(tx_res.action, Action::CloseChannel(_, _)),
1027            "must be close channel action"
1028        );
1029        assert!(
1030            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
1031            "must correspond to channel chain event"
1032        );
1033        Ok(())
1034    }
1035
1036    #[async_std::test]
1037    async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
1038        let stake = Balance::new(10_u32, BalanceType::HOPR);
1039
1040        let channel = ChannelEntry::new(
1041            *ALICE,
1042            *BOB,
1043            stake,
1044            U256::zero(),
1045            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
1046            U256::zero(),
1047        );
1048
1049        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
1050        let db_clone = db.clone();
1051        db.begin_transaction()
1052            .await?
1053            .perform(|tx| {
1054                Box::pin(async move {
1055                    db_clone
1056                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
1057                        .await?;
1058                    db_clone
1059                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
1060                        .await?;
1061                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
1062                    db_clone
1063                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
1064                        .await?;
1065                    db_clone.upsert_channel(Some(tx), channel).await
1066                })
1067            })
1068            .await?;
1069
1070        let tx_queue = ActionQueue::new(
1071            db.clone(),
1072            MockActionState::new(),
1073            MockTransactionExecutor::new(),
1074            Default::default(),
1075        );
1076
1077        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
1078
1079        assert!(
1080            matches!(
1081                actions
1082                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
1083                    .await
1084                    .err()
1085                    .expect("should be an error"),
1086                ChainActionsError::ClosureTimeHasNotElapsed(_)
1087            ),
1088            "should fail when the channel closure period did not elapse"
1089        );
1090        Ok(())
1091    }
1092
1093    #[async_std::test]
1094    async fn test_should_not_close_nonexistent_channel() -> anyhow::Result<()> {
1095        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
1096        let db_clone = db.clone();
1097        db.begin_transaction()
1098            .await?
1099            .perform(|tx| {
1100                Box::pin(async move {
1101                    db_clone
1102                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
1103                        .await?;
1104                    db_clone
1105                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
1106                        .await?;
1107                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
1108                    db_clone
1109                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
1110                        .await
1111                })
1112            })
1113            .await?;
1114
1115        let tx_queue = ActionQueue::new(
1116            db.clone(),
1117            MockActionState::new(),
1118            MockTransactionExecutor::new(),
1119            Default::default(),
1120        );
1121        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
1122
1123        assert!(
1124            matches!(
1125                actions
1126                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
1127                    .await
1128                    .err()
1129                    .expect("should be an error"),
1130                ChainActionsError::ChannelDoesNotExist
1131            ),
1132            "should fail when channel does not exist"
1133        );
1134        Ok(())
1135    }
1136
1137    #[async_std::test]
1138    async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
1139        let stake = Balance::new(10_u32, BalanceType::HOPR);
1140        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());
1141
1142        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
1143        let db_clone = db.clone();
1144        db.begin_transaction()
1145            .await?
1146            .perform(|tx| {
1147                Box::pin(async move {
1148                    db_clone
1149                        .set_safe_hopr_allowance(Some(tx), Balance::new(1_000_u64, BalanceType::HOPR))
1150                        .await?;
1151                    db_clone
1152                        .set_safe_hopr_balance(Some(tx), Balance::new(5_000_000_u64, BalanceType::HOPR))
1153                        .await?;
1154                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
1155                    db_clone
1156                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
1157                        .await?;
1158                    db_clone.upsert_channel(Some(tx), channel).await
1159                })
1160            })
1161            .await?;
1162
1163        let tx_queue = ActionQueue::new(
1164            db.clone(),
1165            MockActionState::new(),
1166            MockTransactionExecutor::new(),
1167            Default::default(),
1168        );
1169
1170        let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
1171
1172        assert!(
1173            matches!(
1174                actions
1175                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
1176                    .await
1177                    .err()
1178                    .expect("should be an error"),
1179                ChainActionsError::ChannelAlreadyClosed
1180            ),
1181            "should fail when channel is already closed"
1182        );
1183        Ok(())
1184    }
1185}