1use async_trait::async_trait;
17use std::time::Duration;
18use tracing::{debug, error, info};
19
20use hopr_chain_types::actions::Action;
21use hopr_crypto_types::types::Hash;
22use hopr_db_sql::HoprDbAllOperations;
23use hopr_internal_types::prelude::*;
24use hopr_primitive_types::prelude::*;
25
26use crate::action_queue::PendingAction;
27use crate::errors::ChainActionsError::{
28 BalanceTooLow, ClosureTimeHasNotElapsed, InvalidArguments, InvalidState, NotEnoughAllowance, PeerAccessDenied,
29};
30use crate::errors::{
31 ChainActionsError::{ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist},
32 Result,
33};
34use crate::redeem::TicketRedeemActions;
35use crate::ChainActions;
36
37use hopr_platform::time::native::current_time;
38
/// On-chain operations on payment channels: opening, funding and closing.
///
/// Every method resolves to a [`PendingAction`] once the request has been accepted;
/// awaiting that pending action in turn (as the tests in this file do) yields the
/// outcome of the submitted action.
#[async_trait]
pub trait ChannelActions {
    /// Requests opening of a channel to `destination`, staking `amount` in it.
    ///
    /// The implementation for `ChainActions` in this file rejects the request with
    /// `InvalidArguments` when `destination` is the node's own address or when `amount`
    /// is zero or not a HOPR balance, and with `NotEnoughAllowance`, `BalanceTooLow`,
    /// `PeerAccessDenied` or `ChannelAlreadyExists` when the respective database-backed
    /// pre-check fails.
    async fn open_channel(&self, destination: Address, amount: Balance) -> Result<PendingAction>;

    /// Requests adding `amount` to the stake of the channel identified by `channel_id`.
    ///
    /// The implementation for `ChainActions` in this file requires a non-zero HOPR
    /// `amount`, sufficient allowance and balance, and an existing channel whose status
    /// is `Open`.
    async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> Result<PendingAction>;

    /// Requests closure of the channel shared with `counterparty` in the given `direction`.
    ///
    /// When `redeem_before_close` is set, the implementation for `ChainActions` in this
    /// file first redeems the tickets in the channel, but only for an `Open` channel in
    /// the `Incoming` direction.
    async fn close_channel(
        &self,
        counterparty: Address,
        direction: ChannelDirection,
        redeem_before_close: bool,
    ) -> Result<PendingAction>;
}
57
58#[async_trait]
59impl<Db> ChannelActions for ChainActions<Db>
60where
61 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
62{
63 #[tracing::instrument(level = "debug", skip(self))]
64 async fn open_channel(&self, destination: Address, amount: Balance) -> Result<PendingAction> {
65 if self.self_address() == destination {
66 return Err(InvalidArguments("cannot open channel to self".into()));
67 }
68
69 if amount.eq(&amount.of_same("0")) || amount.balance_type() != BalanceType::HOPR {
70 return Err(InvalidArguments("invalid balance or balance type given".into()));
71 }
72
73 let db_clone = self.db.clone();
75 let self_addr = self.self_address();
76 self.db
77 .begin_transaction()
78 .await?
79 .perform(|tx| {
80 Box::pin(async move {
81 let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
82 debug!(%allowance, "current staking safe allowance");
83 if allowance.lt(&amount) {
84 return Err(NotEnoughAllowance);
85 }
86
87 let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
88 debug!(balance = %hopr_balance, "current Safe HOPR balance");
89 if hopr_balance.lt(&amount) {
90 return Err(BalanceTooLow);
91 }
92
93 if db_clone.get_indexer_data(Some(tx)).await?.nr_enabled
94 && !db_clone.is_allowed_in_network_registry(Some(tx), &destination).await?
95 {
96 return Err(PeerAccessDenied);
97 }
98
99 let maybe_channel = db_clone
100 .get_channel_by_parties(Some(tx), &self_addr, &destination, false)
101 .await?;
102 if let Some(channel) = maybe_channel {
103 debug!(%channel, "already found existing channel");
104 if channel.status != ChannelStatus::Closed {
105 error!(
106 %destination,
107 "channel to destination is already opened or pending to close"
108 );
109 return Err(ChannelAlreadyExists);
110 }
111 }
112 Ok(())
113 })
114 })
115 .await?;
116
117 info!(%destination, %amount, "initiating channel open");
118 self.tx_sender.send(Action::OpenChannel(destination, amount)).await
119 }
120
121 #[tracing::instrument(level = "debug", skip(self))]
122 async fn fund_channel(&self, channel_id: Hash, amount: Balance) -> Result<PendingAction> {
123 if amount.eq(&amount.of_same("0")) || amount.balance_type() != BalanceType::HOPR {
124 return Err(InvalidArguments("invalid balance or balance type given".into()));
125 }
126
127 let db_clone = self.db.clone();
128 let maybe_channel = self
129 .db
130 .begin_transaction()
131 .await?
132 .perform(|tx| {
133 Box::pin(async move {
134 let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
135 debug!(%allowance, "current staking safe allowance");
136 if allowance.lt(&amount) {
137 return Err(NotEnoughAllowance);
138 }
139
140 let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
141 debug!(balance = %hopr_balance, "current Safe HOPR balance");
142 if hopr_balance.lt(&amount) {
143 return Err(BalanceTooLow);
144 }
145
146 Ok(db_clone.get_channel_by_id(Some(tx), &channel_id).await?)
147 })
148 })
149 .await?;
150
151 match maybe_channel {
152 Some(channel) => {
153 if channel.status == ChannelStatus::Open {
154 info!("initiating funding of {channel} with {amount}");
155 self.tx_sender.send(Action::FundChannel(channel, amount)).await
156 } else {
157 Err(InvalidState(format!("channel {channel_id} is not opened")))
158 }
159 }
160 None => Err(ChannelDoesNotExist),
161 }
162 }
163
164 #[tracing::instrument(level = "debug", skip(self))]
165 async fn close_channel(
166 &self,
167 counterparty: Address,
168 direction: ChannelDirection,
169 redeem_before_close: bool,
170 ) -> Result<PendingAction> {
171 let maybe_channel = match direction {
172 ChannelDirection::Incoming => {
173 self.db
174 .get_channel_by_parties(None, &counterparty, &self.self_address(), false)
175 .await?
176 }
177 ChannelDirection::Outgoing => {
178 self.db
179 .get_channel_by_parties(None, &self.self_address(), &counterparty, false)
180 .await?
181 }
182 };
183
184 match maybe_channel {
185 Some(channel) => {
186 match channel.status {
187 ChannelStatus::Closed => Err(ChannelAlreadyClosed),
188 ChannelStatus::PendingToClose(_) => {
189 let remaining_closure_time = channel.remaining_closure_time(current_time());
190 info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
191 match remaining_closure_time {
192 Some(Duration::ZERO) => {
193 info!(%channel, %direction, "initiating finalization of channel closure");
194 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
195 }
196 _ => Err(ClosureTimeHasNotElapsed(
197 channel
198 .remaining_closure_time(current_time())
199 .expect("impossible: closure time has not passed but no remaining closure time")
200 .as_secs(),
201 )),
202 }
203 }
204 ChannelStatus::Open => {
205 if redeem_before_close && direction == ChannelDirection::Incoming {
206 let redeemed = self.redeem_tickets_in_channel(&channel, false).await?.len();
209 info!(count = redeemed, %channel, "redeemed tickets before channel closing");
210 }
211
212 info!(%channel, ?direction, "initiating channel closure");
213 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
214 }
215 }
216 }
217 None => Err(ChannelDoesNotExist),
218 }
219 }
220}
#[cfg(test)]
mod tests {
    use crate::action_queue::{ActionQueue, MockTransactionExecutor};
    use crate::action_state::MockActionState;
    use crate::channels::ChannelActions;
    use crate::errors::ChainActionsError;
    use crate::ChainActions;
    use futures::FutureExt;
    use hex_literal::hex;
    use hopr_chain_types::actions::Action;
    use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::prelude::*;
    use hopr_db_sql::channels::HoprDbChannelOperations;
    use hopr_db_sql::db::HoprDb;
    use hopr_db_sql::HoprDbGeneralModelOperations;
    use hopr_db_sql::{api::info::DomainSeparator, info::HoprDbInfoOperations};
    use hopr_internal_types::prelude::*;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use mockall::Sequence;
    use std::{
        ops::{Add, Sub},
        time::{Duration, SystemTime},
    };

    lazy_static! {
        static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be constructible");
        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
        ))
        .expect("lazy static keypair should be constructible");
        static ref ALICE: Address = ALICE_KP.public().to_address();
        static ref BOB: Address = BOB_KP.public().to_address();
    }

    /// Creates an in-memory database owned by Alice and seeds it in one transaction:
    /// the Safe HOPR `allowance` and `balance`, a disabled network registry, the default
    /// channel domain separator and, if given, the `channel` entry.
    async fn create_db(allowance: u64, balance: u64, channel: Option<ChannelEntry>) -> anyhow::Result<HoprDb> {
        let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
        let db_clone = db.clone();
        db.begin_transaction()
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    db_clone
                        .set_safe_hopr_allowance(Some(tx), Balance::new(allowance, BalanceType::HOPR))
                        .await?;
                    db_clone
                        .set_safe_hopr_balance(Some(tx), Balance::new(balance, BalanceType::HOPR))
                        .await?;
                    db_clone.set_network_registry_enabled(Some(tx), false).await?;
                    db_clone
                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
                        .await?;
                    match channel {
                        Some(channel) => db_clone.upsert_channel(Some(tx), channel).await,
                        None => Ok(()),
                    }
                })
            })
            .await?;
        Ok(db)
    }

    /// Builds Alice's `ChainActions` on top of a queue that is never started and whose mocks
    /// carry no expectations; meant for tests where the action is rejected before being sent.
    fn idle_actions(db: HoprDb) -> ChainActions<HoprDb> {
        let tx_queue = ActionQueue::new(
            db.clone(),
            MockActionState::new(),
            MockTransactionExecutor::new(),
            Default::default(),
        );
        ChainActions::new(&ALICE_KP, db, tx_queue.new_sender())
    }

    /// Builds Alice's `ChainActions` on top of a queue driven by the given mocks and
    /// starts that queue in a background task.
    fn running_actions(
        db: HoprDb,
        indexer_action_tracker: MockActionState,
        tx_exec: MockTransactionExecutor,
    ) -> ChainActions<HoprDb> {
        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
        let tx_sender = tx_queue.new_sender();
        async_std::task::spawn(async move {
            tx_queue.start().await;
        });
        ChainActions::new(&ALICE_KP, db, tx_sender)
    }

    #[async_std::test]
    async fn test_open_channel() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());

        let db = create_db(10_000_000, 5_000_000, None).await?;

        // Opening a channel is expected to reach the executor as a funding call.
        let mut tx_exec = MockTransactionExecutor::new();
        tx_exec
            .expect_fund_channel()
            .times(1)
            .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
            .returning(move |_, _| Ok(random_hash));

        let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());

        let mut indexer_action_tracker = MockActionState::new();
        indexer_action_tracker
            .expect_register_expectation()
            .once()
            .returning(move |_| {
                Ok(futures::future::ok(SignificantChainEvent {
                    tx_hash: random_hash,
                    event_type: ChainEventType::ChannelOpened(new_channel),
                })
                .boxed())
            });

        let actions = running_actions(db, indexer_action_tracker, tx_exec);

        let tx_res = actions.open_channel(*BOB, stake).await?.await?;

        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
        assert!(
            matches!(tx_res.action, Action::OpenChannel(_, _)),
            "must be open channel action"
        );
        assert!(
            matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
            "must correspond to open channel chain event"
        );

        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());

        let db = create_db(10_000_000, 5_000_000, Some(channel)).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .open_channel(*BOB, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::ChannelAlreadyExists
            ),
            "should fail when channel exists"
        );

        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);

        let db = create_db(10_000_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .open_channel(*ALICE, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::InvalidArguments(_)
            ),
            "should not create channel to self"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_open_should_not_allow_invalid_balance() -> anyhow::Result<()> {
        let db = create_db(10_000_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        // Wrong balance type
        let stake = Balance::new(10_u32, BalanceType::Native);
        assert!(
            matches!(
                actions
                    .open_channel(*BOB, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::InvalidArguments(_)
            ),
            "should not allow invalid balance"
        );

        // Zero amount
        let stake = Balance::new(0_u32, BalanceType::HOPR);
        assert!(
            matches!(
                actions
                    .open_channel(*BOB, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::InvalidArguments(_)
            ),
            "should not allow invalid balance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
        let stake = Balance::new(10_000_u32, BalanceType::HOPR);

        let db = create_db(1_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .open_channel(*BOB, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::NotEnoughAllowance
            ),
            "should fail when not enough allowance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
        let stake = Balance::new(10_000_u32, BalanceType::HOPR);

        let db = create_db(10_000_000, 1, None).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .open_channel(*BOB, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::BalanceTooLow
            ),
            "should fail when not enough token balance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_fund_channel() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());

        let db = create_db(10_000_000, 5_000_000, Some(channel)).await?;

        let mut tx_exec = MockTransactionExecutor::new();
        tx_exec
            .expect_fund_channel()
            .times(1)
            .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
            .returning(move |_, _| Ok(random_hash));

        let mut indexer_action_tracker = MockActionState::new();
        indexer_action_tracker
            .expect_register_expectation()
            .once()
            .returning(move |_| {
                Ok(futures::future::ok(SignificantChainEvent {
                    tx_hash: random_hash,
                    event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
                })
                .boxed())
            });

        let actions = running_actions(db, indexer_action_tracker, tx_exec);

        let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;

        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
        assert!(
            matches!(tx_res.action, Action::FundChannel(_, _)),
            "must be fund channel action"
        );
        assert!(
            matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
            "must correspond to channel chain event"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
        let channel_id = generate_channel_id(&*ALICE, &*BOB);

        let db = create_db(10_000_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        let stake = Balance::new(10_u32, BalanceType::HOPR);
        assert!(
            matches!(
                actions
                    .fund_channel(channel_id, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::ChannelDoesNotExist
            ),
            "should fail when channel does not exist"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_fund_should_not_allow_invalid_balance() -> anyhow::Result<()> {
        let channel_id = generate_channel_id(&*ALICE, &*BOB);

        let db = create_db(10_000_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        // Wrong balance type: must be rejected by `fund_channel` itself.
        let stake = Balance::new(10_u32, BalanceType::Native);
        assert!(
            matches!(
                actions
                    .fund_channel(channel_id, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::InvalidArguments(_)
            ),
            "should not allow invalid balance"
        );

        // Zero amount
        let stake = Balance::new(0_u32, BalanceType::HOPR);
        assert!(
            matches!(
                actions
                    .fund_channel(channel_id, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::InvalidArguments(_)
            ),
            "should not allow invalid balance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
        let channel_id = generate_channel_id(&*ALICE, &*BOB);

        let db = create_db(1_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
        assert!(
            matches!(
                actions
                    .fund_channel(channel_id, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::NotEnoughAllowance
            ),
            "should fail when not enough allowance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
        let channel_id = generate_channel_id(&*ALICE, &*BOB);

        let db = create_db(100_000, 1, None).await?;
        let actions = idle_actions(db);

        let stake = Balance::new(10_000_u32, BalanceType::HOPR);
        assert!(
            matches!(
                actions
                    .fund_channel(channel_id, stake)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::BalanceTooLow
            ),
            "should fail when not enough balance"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_close_channel_outgoing() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());

        let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());

        let db = create_db(1_000, 5_000_000, Some(channel)).await?;

        // Closure of an outgoing channel is initiated first and finalized second.
        let mut tx_exec = MockTransactionExecutor::new();
        let mut seq = Sequence::new();
        tx_exec
            .expect_initiate_outgoing_channel_closure()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |dst| BOB.eq(dst))
            .returning(move |_| Ok(random_hash));

        tx_exec
            .expect_finalize_outgoing_channel_closure()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |dst| BOB.eq(dst))
            .returning(move |_| Ok(random_hash));

        let mut indexer_action_tracker = MockActionState::new();
        let mut seq2 = Sequence::new();
        indexer_action_tracker
            .expect_register_expectation()
            .once()
            .in_sequence(&mut seq2)
            .returning(move |_| {
                Ok(futures::future::ok(SignificantChainEvent {
                    tx_hash: random_hash,
                    event_type: ChainEventType::ChannelClosureInitiated(channel),
                })
                .boxed())
            });

        indexer_action_tracker
            .expect_register_expectation()
            .once()
            .in_sequence(&mut seq2)
            .returning(move |_| {
                Ok(futures::future::ok(SignificantChainEvent {
                    tx_hash: random_hash,
                    event_type: ChainEventType::ChannelClosed(channel),
                })
                .boxed())
            });

        let actions = running_actions(db.clone(), indexer_action_tracker, tx_exec);

        let tx_res = actions
            .close_channel(*BOB, ChannelDirection::Outgoing, false)
            .await?
            .await?;

        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
        assert!(
            matches!(tx_res.action, Action::CloseChannel(_, _)),
            "must be close channel action"
        );
        assert!(
            matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
            "must correspond to channel chain event"
        );

        // Put the channel into the pending-to-close state with a closure time in the past.
        channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));

        db.upsert_channel(None, channel).await?;

        let tx_res = actions
            .close_channel(*BOB, ChannelDirection::Outgoing, false)
            .await?
            .await?;

        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
        assert!(
            matches!(tx_res.action, Action::CloseChannel(_, _)),
            "must be close channel action"
        );
        assert!(
            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
            "must correspond to channel chain event"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_close_channel_incoming() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());

        let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());

        let db = create_db(1_000, 5_000_000, Some(channel)).await?;

        let mut tx_exec = MockTransactionExecutor::new();
        let mut seq = Sequence::new();
        tx_exec
            .expect_close_incoming_channel()
            .times(1)
            .in_sequence(&mut seq)
            .withf(move |dst| BOB.eq(dst))
            .returning(move |_| Ok(random_hash));

        let mut indexer_action_tracker = MockActionState::new();
        indexer_action_tracker
            .expect_register_expectation()
            .once()
            .returning(move |_| {
                Ok(futures::future::ok(SignificantChainEvent {
                    tx_hash: random_hash,
                    event_type: ChainEventType::ChannelClosed(channel),
                })
                .boxed())
            });

        let actions = running_actions(db, indexer_action_tracker, tx_exec);

        let tx_res = actions
            .close_channel(*BOB, ChannelDirection::Incoming, false)
            .await?
            .await?;

        assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
        assert!(
            matches!(tx_res.action, Action::CloseChannel(_, _)),
            "must be close channel action"
        );
        assert!(
            matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
            "must correspond to channel chain event"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);

        let channel = ChannelEntry::new(
            *ALICE,
            *BOB,
            stake,
            U256::zero(),
            ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
            U256::zero(),
        );

        let db = create_db(1_000, 5_000_000, Some(channel)).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::ClosureTimeHasNotElapsed(_)
            ),
            "should fail when the channel closure period did not elapse"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_close_nonexistent_channel() -> anyhow::Result<()> {
        let db = create_db(1_000, 5_000_000, None).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::ChannelDoesNotExist
            ),
            "should fail when channel does not exist"
        );
        Ok(())
    }

    #[async_std::test]
    async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
        let stake = Balance::new(10_u32, BalanceType::HOPR);
        let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());

        let db = create_db(1_000, 5_000_000, Some(channel)).await?;
        let actions = idle_actions(db);

        assert!(
            matches!(
                actions
                    .close_channel(*BOB, ChannelDirection::Outgoing, false)
                    .await
                    .err()
                    .expect("should be an error"),
                ChainActionsError::ChannelAlreadyClosed
            ),
            "should fail when channel is already closed"
        );
        Ok(())
    }
}