1use std::time::Duration;
17
18use async_trait::async_trait;
19use hopr_chain_types::actions::Action;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::HoprDbAllOperations;
22use hopr_internal_types::prelude::*;
23use hopr_platform::time::native::current_time;
24use hopr_primitive_types::prelude::*;
25use tracing::{debug, error, info};
26
27use crate::{
28 ChainActions,
29 action_queue::PendingAction,
30 errors::{
31 ChainActionsError::{
32 BalanceTooLow, ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist, ClosureTimeHasNotElapsed,
33 InvalidArguments, InvalidChannelStake, InvalidState, NotEnoughAllowance, PeerAccessDenied,
34 },
35 Result,
36 },
37 redeem::TicketRedeemActions,
38};
39
40#[async_trait]
42pub trait ChannelActions {
43 async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction>;
45
46 async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction>;
48
49 async fn close_channel(
52 &self,
53 counterparty: Address,
54 direction: ChannelDirection,
55 redeem_before_close: bool,
56 ) -> Result<PendingAction>;
57}
58
59#[async_trait]
60impl<Db> ChannelActions for ChainActions<Db>
61where
62 Db: HoprDbAllOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
63{
64 #[tracing::instrument(level = "debug", skip(self))]
65 async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction> {
66 if self.self_address() == destination {
67 return Err(InvalidArguments("cannot open channel to self".into()));
68 }
69
70 if amount.is_zero() {
71 return Err(InvalidArguments("invalid balance or balance type given".into()));
72 }
73
74 let db_clone = self.db.clone();
76 let self_addr = self.self_address();
77 self.db
78 .begin_transaction()
79 .await?
80 .perform(|tx| {
81 Box::pin(async move {
82 let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
83 debug!(%allowance, "current staking safe allowance");
84 if allowance < amount {
85 return Err(NotEnoughAllowance);
86 }
87
88 let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
89 debug!(balance = %hopr_balance, "current Safe HOPR balance");
90 if hopr_balance < amount {
91 return Err(BalanceTooLow);
92 }
93
94 if HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) < amount {
95 return Err(InvalidChannelStake);
96 }
97
98 if db_clone.get_indexer_data(Some(tx)).await?.nr_enabled
99 && !db_clone.is_allowed_in_network_registry(Some(tx), &destination).await?
100 {
101 return Err(PeerAccessDenied);
102 }
103
104 let maybe_channel = db_clone
105 .get_channel_by_parties(Some(tx), &self_addr, &destination, false)
106 .await?;
107 if let Some(channel) = maybe_channel {
108 debug!(%channel, "already found existing channel");
109 if channel.status != ChannelStatus::Closed {
110 error!(
111 %destination,
112 "channel to destination is already opened or pending to close"
113 );
114 return Err(ChannelAlreadyExists);
115 }
116 }
117 Ok(())
118 })
119 })
120 .await?;
121
122 info!(%destination, %amount, "initiating channel open");
123 self.tx_sender.send(Action::OpenChannel(destination, amount)).await
124 }
125
126 #[tracing::instrument(level = "debug", skip(self))]
127 async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction> {
128 if amount.is_zero() {
129 return Err(InvalidArguments("invalid balance or balance type given".into()));
130 }
131
132 let db_clone = self.db.clone();
133 let maybe_channel = self
134 .db
135 .begin_transaction()
136 .await?
137 .perform(|tx| {
138 Box::pin(async move {
139 let allowance = db_clone.get_safe_hopr_allowance(Some(tx)).await?;
140 debug!(%allowance, "current staking safe allowance");
141 if allowance.lt(&amount) {
142 return Err(NotEnoughAllowance);
143 }
144
145 let hopr_balance = db_clone.get_safe_hopr_balance(Some(tx)).await?;
146 debug!(balance = %hopr_balance, "current Safe HOPR balance");
147 if hopr_balance.lt(&amount) {
148 return Err(BalanceTooLow);
149 }
150
151 Ok(db_clone.get_channel_by_id(Some(tx), &channel_id).await?)
152 })
153 })
154 .await?;
155
156 match maybe_channel {
157 Some(channel) => {
158 if channel.status == ChannelStatus::Open {
159 if channel.balance + amount > HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) {
160 return Err(InvalidChannelStake);
161 }
162
163 info!("initiating funding of {channel} with {amount}");
164 self.tx_sender.send(Action::FundChannel(channel, amount)).await
165 } else {
166 Err(InvalidState(format!("channel {channel_id} is not opened")))
167 }
168 }
169 None => Err(ChannelDoesNotExist),
170 }
171 }
172
173 #[tracing::instrument(level = "debug", skip(self))]
174 async fn close_channel(
175 &self,
176 counterparty: Address,
177 direction: ChannelDirection,
178 redeem_before_close: bool,
179 ) -> Result<PendingAction> {
180 let maybe_channel = match direction {
181 ChannelDirection::Incoming => {
182 self.db
183 .get_channel_by_parties(None, &counterparty, &self.self_address(), false)
184 .await?
185 }
186 ChannelDirection::Outgoing => {
187 self.db
188 .get_channel_by_parties(None, &self.self_address(), &counterparty, false)
189 .await?
190 }
191 };
192
193 match maybe_channel {
194 Some(channel) => {
195 match channel.status {
196 ChannelStatus::Closed => Err(ChannelAlreadyClosed),
197 ChannelStatus::PendingToClose(_) => {
198 let remaining_closure_time = channel.remaining_closure_time(current_time());
199 info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
200 match remaining_closure_time {
201 Some(Duration::ZERO) => {
202 info!(%channel, %direction, "initiating finalization of channel closure");
203 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
204 }
205 _ => Err(ClosureTimeHasNotElapsed(
206 channel
207 .remaining_closure_time(current_time())
208 .expect("impossible: closure time has not passed but no remaining closure time")
209 .as_secs(),
210 )),
211 }
212 }
213 ChannelStatus::Open => {
214 if redeem_before_close && direction == ChannelDirection::Incoming {
215 let redeemed = self.redeem_tickets_in_channel(&channel, false).await?.len();
218 info!(count = redeemed, %channel, "redeemed tickets before channel closing");
219 }
220
221 info!(%channel, ?direction, "initiating channel closure");
222 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
223 }
224 }
225 }
226 None => Err(ChannelDoesNotExist),
227 }
228 }
229}
230#[cfg(test)]
231mod tests {
232 use std::{
233 ops::{Add, Sub},
234 time::{Duration, SystemTime},
235 };
236
237 use futures::FutureExt;
238 use hex_literal::hex;
239 use hopr_chain_types::{
240 actions::Action,
241 chain_events::{ChainEventType, SignificantChainEvent},
242 };
243 use hopr_crypto_random::random_bytes;
244 use hopr_crypto_types::prelude::*;
245 use hopr_db_sql::{
246 HoprDbGeneralModelOperations, api::info::DomainSeparator, channels::HoprDbChannelOperations, db::HoprDb,
247 errors::DbSqlError, info::HoprDbInfoOperations,
248 };
249 use hopr_internal_types::prelude::*;
250 use hopr_primitive_types::prelude::*;
251 use lazy_static::lazy_static;
252 use mockall::Sequence;
253
254 use crate::{
255 ChainActions,
256 action_queue::{ActionQueue, MockTransactionExecutor},
257 action_state::MockActionState,
258 channels::ChannelActions,
259 errors::ChainActionsError,
260 };
261
262 lazy_static! {
263 static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
264 "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
265 ))
266 .expect("lazy static keypair should be constructible");
267 static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
268 "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
269 ))
270 .expect("lazy static keypair should be constructible");
271 static ref ALICE: Address = ALICE_KP.public().to_address();
272 static ref BOB: Address = BOB_KP.public().to_address();
273 }
274
275 async fn init_db(
276 db: &HoprDb,
277 safe_balance: HoprBalance,
278 safe_allowance: HoprBalance,
279 channel: Option<ChannelEntry>,
280 ) -> anyhow::Result<()> {
281 let db_clone = db.clone();
282 Ok(db
283 .begin_transaction()
284 .await?
285 .perform(|tx| {
286 Box::pin(async move {
287 db_clone.set_safe_hopr_allowance(Some(tx), safe_allowance).await?;
288 db_clone.set_safe_hopr_balance(Some(tx), safe_balance).await?;
289 db_clone.set_network_registry_enabled(Some(tx), false).await?;
290 db_clone
291 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
292 .await?;
293
294 if let Some(channel) = channel {
295 db_clone.upsert_channel(Some(tx), channel).await?;
296 }
297
298 Ok::<_, DbSqlError>(())
299 })
300 })
301 .await?)
302 }
303
304 #[tokio::test]
305 async fn test_open_channel() -> anyhow::Result<()> {
306 let stake: HoprBalance = 10_u32.into();
307 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
308
309 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
310 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
311
312 let mut tx_exec = MockTransactionExecutor::new();
313 tx_exec
314 .expect_fund_channel()
315 .times(1)
316 .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
317 .returning(move |_, _| Ok(random_hash));
318
319 let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
320
321 let mut indexer_action_tracker = MockActionState::new();
322 indexer_action_tracker
323 .expect_register_expectation()
324 .once()
325 .returning(move |_| {
326 Ok(futures::future::ok(SignificantChainEvent {
327 tx_hash: random_hash,
328 event_type: ChainEventType::ChannelOpened(new_channel),
329 })
330 .boxed())
331 });
332
333 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
334
335 let tx_sender = tx_queue.new_sender();
336 tokio::task::spawn(async move { tx_queue.start().await });
337
338 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
339
340 let tx_res = actions.open_channel(*BOB, stake).await?.await?;
341
342 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
343 assert!(
344 matches!(tx_res.action, Action::OpenChannel(_, _)),
345 "must be open channel action"
346 );
347 assert!(
348 matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
349 "must correspond to open channel chain event"
350 );
351
352 Ok(())
353 }
354
355 #[tokio::test]
356 async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
357 let stake = 10_u32.into();
358
359 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
360
361 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
362 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
363
364 let tx_queue = ActionQueue::new(
365 db.clone(),
366 MockActionState::new(),
367 MockTransactionExecutor::new(),
368 Default::default(),
369 );
370
371 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
372
373 assert!(
374 matches!(
375 actions
376 .open_channel(*BOB, stake)
377 .await
378 .err()
379 .expect("should be an error"),
380 ChainActionsError::ChannelAlreadyExists
381 ),
382 "should fail when channel exists"
383 );
384
385 Ok(())
386 }
387
388 #[tokio::test]
389 async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
390 let stake = 10_u32.into();
391
392 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
393 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
394
395 let tx_queue = ActionQueue::new(
396 db.clone(),
397 MockActionState::new(),
398 MockTransactionExecutor::new(),
399 Default::default(),
400 );
401
402 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
403
404 assert!(
405 matches!(
406 actions
407 .open_channel(*ALICE, stake)
408 .await
409 .err()
410 .expect("should be an error"),
411 ChainActionsError::InvalidArguments(_)
412 ),
413 "should not create channel to self"
414 );
415 Ok(())
416 }
417
418 #[tokio::test]
419 async fn test_should_not_open_channel_with_too_big_stake() -> anyhow::Result<()> {
420 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
421 init_db(&db, U256::max_value().into(), U256::max_value().into(), None).await?;
422
423 let tx_queue = ActionQueue::new(
424 db.clone(),
425 MockActionState::new(),
426 MockTransactionExecutor::new(),
427 Default::default(),
428 );
429
430 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
431
432 assert!(
433 matches!(
434 actions
435 .open_channel(*BOB, (ChannelEntry::MAX_CHANNEL_BALANCE + 1).into())
436 .await
437 .err()
438 .expect("should be an error"),
439 ChainActionsError::InvalidChannelStake
440 ),
441 "should not create channel with too big stake"
442 );
443 Ok(())
444 }
445
446 #[tokio::test]
447 async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
448 let stake = 10_000_u32.into();
449
450 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
451 init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
452
453 let tx_queue = ActionQueue::new(
454 db.clone(),
455 MockActionState::new(),
456 MockTransactionExecutor::new(),
457 Default::default(),
458 );
459
460 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
461
462 assert!(
463 matches!(
464 actions
465 .open_channel(*BOB, stake)
466 .await
467 .err()
468 .expect("should be an error"),
469 ChainActionsError::NotEnoughAllowance
470 ),
471 "should fail when not enough allowance"
472 );
473 Ok(())
474 }
475
476 #[tokio::test]
477 async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
478 let stake = 10_000_u32.into();
479
480 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
481 init_db(&db, 1_u64.into(), 10_000_000_u64.into(), None).await?;
482
483 let tx_queue = ActionQueue::new(
484 db.clone(),
485 MockActionState::new(),
486 MockTransactionExecutor::new(),
487 Default::default(),
488 );
489
490 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
491
492 assert!(
493 matches!(
494 actions
495 .open_channel(*BOB, stake)
496 .await
497 .err()
498 .expect("should be an error"),
499 ChainActionsError::BalanceTooLow
500 ),
501 "should fail when not enough token balance"
502 );
503 Ok(())
504 }
505
506 #[tokio::test]
507 async fn test_fund_channel() -> anyhow::Result<()> {
508 let stake = 10_u32.into();
509 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
510 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
511
512 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
513 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
514
515 let mut tx_exec = MockTransactionExecutor::new();
516 tx_exec
517 .expect_fund_channel()
518 .times(1)
519 .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
520 .returning(move |_, _| Ok(random_hash));
521
522 let mut indexer_action_tracker = MockActionState::new();
523 indexer_action_tracker
524 .expect_register_expectation()
525 .once()
526 .returning(move |_| {
527 Ok(futures::future::ok(SignificantChainEvent {
528 tx_hash: random_hash,
529 event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
530 })
531 .boxed())
532 });
533
534 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
535 let tx_sender = tx_queue.new_sender();
536 tokio::task::spawn(async move {
537 tx_queue.start().await;
538 });
539
540 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
541
542 let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;
543
544 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
545 assert!(
546 matches!(tx_res.action, Action::FundChannel(_, _)),
547 "must be open channel action"
548 );
549 assert!(
550 matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
551 "must correspond to channel chain event"
552 );
553 Ok(())
554 }
555
556 #[tokio::test]
557 async fn test_fund_channel_should_not_over_fund() -> anyhow::Result<()> {
558 let channel = ChannelEntry::new(
559 *ALICE,
560 *BOB,
561 HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE),
562 U256::zero(),
563 ChannelStatus::Open,
564 U256::zero(),
565 );
566
567 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
568 init_db(&db, U256::max_value().into(), U256::max_value().into(), Some(channel)).await?;
569
570 let tx_queue = ActionQueue::new(
571 db.clone(),
572 MockActionState::new(),
573 MockTransactionExecutor::new(),
574 Default::default(),
575 );
576
577 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
578
579 assert!(
580 matches!(
581 actions
582 .fund_channel(channel.get_id(), 1.into())
583 .await
584 .err()
585 .expect("should be an error"),
586 ChainActionsError::InvalidChannelStake
587 ),
588 "should fail channel stake is too high"
589 );
590 Ok(())
591 }
592
593 #[tokio::test]
594 async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
595 let channel_id = generate_channel_id(&ALICE, &BOB);
596
597 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
598 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
599
600 let tx_queue = ActionQueue::new(
601 db.clone(),
602 MockActionState::new(),
603 MockTransactionExecutor::new(),
604 Default::default(),
605 );
606
607 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
608 let stake = 10_u32.into();
609 assert!(
610 matches!(
611 actions
612 .fund_channel(channel_id, stake)
613 .await
614 .err()
615 .expect("should be an error"),
616 ChainActionsError::ChannelDoesNotExist
617 ),
618 "should fail when channel does not exist"
619 );
620 Ok(())
621 }
622
623 #[tokio::test]
624 async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
625 let channel_id = generate_channel_id(&ALICE, &BOB);
626
627 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
628 init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
629
630 let tx_queue = ActionQueue::new(
631 db.clone(),
632 MockActionState::new(),
633 MockTransactionExecutor::new(),
634 Default::default(),
635 );
636
637 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
638 let stake = 10_000_u32.into();
639 assert!(
640 matches!(
641 actions
642 .fund_channel(channel_id, stake)
643 .await
644 .err()
645 .expect("should be an error"),
646 ChainActionsError::NotEnoughAllowance
647 ),
648 "should fail when not enough allowance"
649 );
650 Ok(())
651 }
652
653 #[tokio::test]
654 async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
655 let channel_id = generate_channel_id(&ALICE, &BOB);
656
657 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
658 init_db(&db, 1_u64.into(), 100_000_u64.into(), None).await?;
659
660 let tx_queue = ActionQueue::new(
661 db.clone(),
662 MockActionState::new(),
663 MockTransactionExecutor::new(),
664 Default::default(),
665 );
666
667 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
668 let stake = 10_000_u32.into();
669 assert!(
670 matches!(
671 actions
672 .fund_channel(channel_id, stake)
673 .await
674 .err()
675 .expect("should be an error"),
676 ChainActionsError::BalanceTooLow
677 ),
678 "should fail when not enough balance"
679 );
680 Ok(())
681 }
682
683 #[tokio::test]
684 async fn test_close_channel_outgoing() -> anyhow::Result<()> {
685 let stake = 10_u32.into();
686 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
687
688 let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
689
690 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
691 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
692
693 let mut tx_exec = MockTransactionExecutor::new();
694 let mut seq = Sequence::new();
695 tx_exec
696 .expect_initiate_outgoing_channel_closure()
697 .times(1)
698 .in_sequence(&mut seq)
699 .withf(move |dst| BOB.eq(dst))
700 .returning(move |_| Ok(random_hash));
701
702 tx_exec
703 .expect_finalize_outgoing_channel_closure()
704 .times(1)
705 .in_sequence(&mut seq)
706 .withf(move |dst| BOB.eq(dst))
707 .returning(move |_| Ok(random_hash));
708
709 let mut indexer_action_tracker = MockActionState::new();
710 let mut seq2 = Sequence::new();
711 indexer_action_tracker
712 .expect_register_expectation()
713 .once()
714 .in_sequence(&mut seq2)
715 .returning(move |_| {
716 Ok(futures::future::ok(SignificantChainEvent {
717 tx_hash: random_hash,
718 event_type: ChainEventType::ChannelClosureInitiated(channel),
719 })
720 .boxed())
721 });
722
723 indexer_action_tracker
724 .expect_register_expectation()
725 .once()
726 .in_sequence(&mut seq2)
727 .returning(move |_| {
728 Ok(futures::future::ok(SignificantChainEvent {
729 tx_hash: random_hash,
730 event_type: ChainEventType::ChannelClosed(channel),
731 })
732 .boxed())
733 });
734
735 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
736 let tx_sender = tx_queue.new_sender();
737 tokio::task::spawn(async move {
738 tx_queue.start().await;
739 });
740
741 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
742
743 let tx_res = actions
744 .close_channel(*BOB, ChannelDirection::Outgoing, false)
745 .await?
746 .await?;
747
748 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
749 assert!(
750 matches!(tx_res.action, Action::CloseChannel(_, _)),
751 "must be close channel action"
752 );
753 assert!(
754 matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
755 "must correspond to channel chain event"
756 );
757
758 channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));
760
761 db.upsert_channel(None, channel).await?;
762
763 let tx_res = actions
764 .close_channel(*BOB, ChannelDirection::Outgoing, false)
765 .await?
766 .await?;
767
768 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
769 assert!(
770 matches!(tx_res.action, Action::CloseChannel(_, _)),
771 "must be close channel action"
772 );
773 assert!(
774 matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
775 "must correspond to channel chain event"
776 );
777 Ok(())
778 }
779
780 #[tokio::test]
781 async fn test_close_channel_incoming() -> anyhow::Result<()> {
782 let stake = 10_u32.into();
783 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
784
785 let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());
786
787 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
788 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
789
790 let mut tx_exec = MockTransactionExecutor::new();
791 let mut seq = Sequence::new();
792 tx_exec
793 .expect_close_incoming_channel()
794 .times(1)
795 .in_sequence(&mut seq)
796 .withf(move |dst| BOB.eq(dst))
797 .returning(move |_| Ok(random_hash));
798
799 let mut indexer_action_tracker = MockActionState::new();
800 indexer_action_tracker
801 .expect_register_expectation()
802 .returning(move |_| {
803 Ok(futures::future::ok(SignificantChainEvent {
804 tx_hash: random_hash,
805 event_type: ChainEventType::ChannelClosed(channel),
806 })
807 .boxed())
808 });
809
810 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
811 let tx_sender = tx_queue.new_sender();
812 tokio::task::spawn(async move {
813 tx_queue.start().await;
814 });
815
816 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_sender.clone());
817
818 let tx_res = actions
819 .close_channel(*BOB, ChannelDirection::Incoming, false)
820 .await?
821 .await?;
822
823 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
824 assert!(
825 matches!(tx_res.action, Action::CloseChannel(_, _)),
826 "must be close channel action"
827 );
828 assert!(
829 matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
830 "must correspond to channel chain event"
831 );
832 Ok(())
833 }
834
835 #[tokio::test]
836 async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
837 let stake = 10_u32.into();
838
839 let channel = ChannelEntry::new(
840 *ALICE,
841 *BOB,
842 stake,
843 U256::zero(),
844 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
845 U256::zero(),
846 );
847
848 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
849 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
850
851 let tx_queue = ActionQueue::new(
852 db.clone(),
853 MockActionState::new(),
854 MockTransactionExecutor::new(),
855 Default::default(),
856 );
857
858 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
859
860 assert!(
861 matches!(
862 actions
863 .close_channel(*BOB, ChannelDirection::Outgoing, false)
864 .await
865 .err()
866 .expect("should be an error"),
867 ChainActionsError::ClosureTimeHasNotElapsed(_)
868 ),
869 "should fail when the channel closure period did not elapse"
870 );
871 Ok(())
872 }
873
874 #[tokio::test]
875 async fn test_should_not_close_nonexistent_channel() -> anyhow::Result<()> {
876 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
877 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), None).await?;
878
879 let tx_queue = ActionQueue::new(
880 db.clone(),
881 MockActionState::new(),
882 MockTransactionExecutor::new(),
883 Default::default(),
884 );
885 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
886
887 assert!(
888 matches!(
889 actions
890 .close_channel(*BOB, ChannelDirection::Outgoing, false)
891 .await
892 .err()
893 .expect("should be an error"),
894 ChainActionsError::ChannelDoesNotExist
895 ),
896 "should fail when channel does not exist"
897 );
898 Ok(())
899 }
900
901 #[tokio::test]
902 async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
903 let stake = 10_u32.into();
904 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());
905
906 let db = HoprDb::new_in_memory(ALICE_KP.clone()).await?;
907 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
908
909 let tx_queue = ActionQueue::new(
910 db.clone(),
911 MockActionState::new(),
912 MockTransactionExecutor::new(),
913 Default::default(),
914 );
915
916 let actions = ChainActions::new(&ALICE_KP, db.clone(), tx_queue.new_sender());
917
918 assert!(
919 matches!(
920 actions
921 .close_channel(*BOB, ChannelDirection::Outgoing, false)
922 .await
923 .err()
924 .expect("should be an error"),
925 ChainActionsError::ChannelAlreadyClosed
926 ),
927 "should fail when channel is already closed"
928 );
929 Ok(())
930 }
931}