1use std::time::Duration;
17
18use async_trait::async_trait;
19use hopr_chain_types::actions::Action;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::prelude::{HoprDbChannelOperations, HoprDbInfoOperations};
22use hopr_internal_types::prelude::*;
23use hopr_platform::time::native::current_time;
24use hopr_primitive_types::prelude::*;
25use tracing::{debug, error, info};
26
27use crate::{
28 ChainActions,
29 action_queue::PendingAction,
30 errors::{
31 ChainActionsError::{
32 BalanceTooLow, ChannelAlreadyClosed, ChannelAlreadyExists, ChannelDoesNotExist, ClosureTimeHasNotElapsed,
33 InvalidArguments, InvalidChannelStake, InvalidState, NotEnoughAllowance,
34 },
35 Result,
36 },
37};
38
39#[async_trait]
41pub trait ChannelActions {
42 async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction>;
44
45 async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction>;
47
48 async fn close_channel(&self, channel_entry: ChannelEntry) -> Result<PendingAction>;
51}
52
53#[async_trait]
54impl<Db: Sync> ChannelActions for ChainActions<Db> {
55 #[tracing::instrument(level = "debug", skip(self))]
56 async fn open_channel(&self, destination: Address, amount: HoprBalance) -> Result<PendingAction> {
57 if self.self_address() == destination {
58 return Err(InvalidArguments("cannot open channel to self".into()));
59 }
60
61 if amount.is_zero() {
62 return Err(InvalidArguments("invalid balance or balance type given".into()));
63 }
64
65 let allowance: HoprBalance = self.index_db.get_safe_hopr_allowance(None).await?;
67 debug!(%allowance, "current staking safe allowance");
68 if allowance < amount {
69 return Err(NotEnoughAllowance);
70 }
71
72 let hopr_balance: HoprBalance = self.index_db.get_safe_hopr_balance(None).await?;
73 debug!(balance = %hopr_balance, "current Safe HOPR balance");
74 if hopr_balance < amount {
75 return Err(BalanceTooLow);
76 }
77
78 if HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) < amount {
79 return Err(InvalidChannelStake);
80 }
81
82 let maybe_channel = self
83 .index_db
84 .get_channel_by_parties(None, &self.self_address(), &destination, false)
85 .await?;
86 if let Some(channel) = maybe_channel {
87 debug!(%channel, "already found existing channel");
88 if channel.status != ChannelStatus::Closed {
89 error!(
90 %destination,
91 "channel to destination is already opened or pending to close"
92 );
93 return Err(ChannelAlreadyExists);
94 }
95 }
96
97 info!(%destination, %amount, "initiating channel open");
98 self.tx_sender.send(Action::OpenChannel(destination, amount)).await
99 }
100
101 #[tracing::instrument(level = "debug", skip(self))]
102 async fn fund_channel(&self, channel_id: Hash, amount: HoprBalance) -> Result<PendingAction> {
103 if amount.is_zero() {
104 return Err(InvalidArguments("invalid balance or balance type given".into()));
105 }
106
107 let allowance: HoprBalance = self.index_db.get_safe_hopr_allowance(None).await?;
108 debug!(%allowance, "current staking safe allowance");
109 if allowance.lt(&amount) {
110 return Err(NotEnoughAllowance);
111 }
112
113 let hopr_balance: HoprBalance = self.index_db.get_safe_hopr_balance(None).await?;
114 debug!(balance = %hopr_balance, "current Safe HOPR balance");
115 if hopr_balance.lt(&amount) {
116 return Err(BalanceTooLow);
117 }
118
119 match self.index_db.get_channel_by_id(None, &channel_id).await? {
120 Some(channel) => {
121 if channel.status == ChannelStatus::Open {
122 if channel.balance + amount > HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE) {
123 return Err(InvalidChannelStake);
124 }
125
126 info!("initiating funding of {channel} with {amount}");
127 self.tx_sender.send(Action::FundChannel(channel, amount)).await
128 } else {
129 Err(InvalidState(format!("channel {channel_id} is not opened")))
130 }
131 }
132 None => Err(ChannelDoesNotExist),
133 }
134 }
135
136 #[tracing::instrument(level = "debug", skip(self))]
137 async fn close_channel(&self, channel: ChannelEntry) -> Result<PendingAction> {
138 let direction = channel.direction(&self.me).ok_or(ChannelDoesNotExist)?; match channel.status {
141 ChannelStatus::Closed => Err(ChannelAlreadyClosed),
142 ChannelStatus::PendingToClose(_) => {
143 let remaining_closure_time = channel.remaining_closure_time(current_time());
144 info!(%channel, ?remaining_closure_time, "remaining closure time update for a channel");
145 match remaining_closure_time {
146 Some(Duration::ZERO) => {
147 info!(%channel, %direction, "initiating finalization of channel closure");
148 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
149 }
150 _ => Err(ClosureTimeHasNotElapsed(
151 channel
152 .remaining_closure_time(current_time())
153 .expect("impossible: closure time has not passed but no remaining closure time")
154 .as_secs(),
155 )),
156 }
157 }
158 ChannelStatus::Open => {
159 info!(%channel, ?direction, "initiating channel closure");
160 self.tx_sender.send(Action::CloseChannel(channel, direction)).await
161 }
162 }
163 }
164}
165#[cfg(test)]
166mod tests {
167 use std::{
168 ops::{Add, Sub},
169 time::{Duration, SystemTime},
170 };
171
172 use futures::FutureExt;
173 use hex_literal::hex;
174 use hopr_chain_types::{
175 actions::Action,
176 chain_events::{ChainEventType, SignificantChainEvent},
177 };
178 use hopr_crypto_random::random_bytes;
179 use hopr_crypto_types::prelude::*;
180 use hopr_db_node::HoprNodeDb;
181 use hopr_db_sql::{
182 HoprDbGeneralModelOperations, HoprIndexerDb, channels::HoprDbChannelOperations, errors::DbSqlError,
183 info::HoprDbInfoOperations, prelude::DomainSeparator,
184 };
185 use hopr_internal_types::prelude::*;
186 use hopr_primitive_types::prelude::*;
187 use lazy_static::lazy_static;
188 use mockall::Sequence;
189
190 use crate::{
191 ChainActions,
192 action_queue::{ActionQueue, MockTransactionExecutor},
193 action_state::MockActionState,
194 channels::ChannelActions,
195 errors::ChainActionsError,
196 };
197
198 lazy_static! {
199 static ref ALICE_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
200 "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
201 ))
202 .expect("lazy static keypair should be constructible");
203 static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
204 "48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c"
205 ))
206 .expect("lazy static keypair should be constructible");
207 static ref ALICE: Address = ALICE_KP.public().to_address();
208 static ref BOB: Address = BOB_KP.public().to_address();
209 }
210
211 async fn init_db(
212 db: &HoprIndexerDb,
213 safe_balance: HoprBalance,
214 safe_allowance: HoprBalance,
215 channel: Option<ChannelEntry>,
216 ) -> anyhow::Result<()> {
217 let db_clone = db.clone();
218 Ok(db
219 .begin_transaction()
220 .await?
221 .perform(|tx| {
222 Box::pin(async move {
223 db_clone.set_safe_hopr_allowance(Some(tx), safe_allowance).await?;
224 db_clone.set_safe_hopr_balance(Some(tx), safe_balance).await?;
225 db_clone
226 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
227 .await?;
228
229 if let Some(channel) = channel {
230 db_clone.upsert_channel(Some(tx), channel).await?;
231 }
232
233 Ok::<_, DbSqlError>(())
234 })
235 })
236 .await?)
237 }
238
239 #[tokio::test]
240 async fn test_open_channel() -> anyhow::Result<()> {
241 let stake: HoprBalance = 10_u32.into();
242 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
243
244 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
245 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
246 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
247
248 let mut tx_exec = MockTransactionExecutor::new();
249 tx_exec
250 .expect_fund_channel()
251 .times(1)
252 .withf(move |dst, balance| BOB.eq(dst) && stake.eq(balance))
253 .returning(move |_, _| Ok(random_hash));
254
255 let new_channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
256
257 let mut indexer_action_tracker = MockActionState::new();
258 indexer_action_tracker
259 .expect_register_expectation()
260 .once()
261 .returning(move |_| {
262 Ok(futures::future::ok(SignificantChainEvent {
263 tx_hash: random_hash,
264 event_type: ChainEventType::ChannelOpened(new_channel),
265 })
266 .boxed())
267 });
268
269 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
270
271 let tx_sender = tx_queue.new_sender();
272 tokio::task::spawn(async move { tx_queue.start().await });
273
274 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
275
276 let tx_res = actions.open_channel(*BOB, stake).await?.await?;
277
278 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
279 assert!(
280 matches!(tx_res.action, Action::OpenChannel(_, _)),
281 "must be open channel action"
282 );
283 assert!(
284 matches!(tx_res.event, Some(ChainEventType::ChannelOpened(_))),
285 "must correspond to open channel chain event"
286 );
287
288 Ok(())
289 }
290
291 #[tokio::test]
292 async fn test_should_not_open_channel_again() -> anyhow::Result<()> {
293 let stake = 10_u32.into();
294
295 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
296
297 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
298 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
299 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
300
301 let tx_queue = ActionQueue::new(
302 node_db.clone(),
303 MockActionState::new(),
304 MockTransactionExecutor::new(),
305 Default::default(),
306 );
307
308 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
309
310 assert!(
311 matches!(
312 actions
313 .open_channel(*BOB, stake)
314 .await
315 .err()
316 .expect("should be an error"),
317 ChainActionsError::ChannelAlreadyExists
318 ),
319 "should fail when channel exists"
320 );
321
322 Ok(())
323 }
324
325 #[tokio::test]
326 async fn test_should_not_open_channel_to_self() -> anyhow::Result<()> {
327 let stake = 10_u32.into();
328
329 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
330 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
331 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
332
333 let tx_queue = ActionQueue::new(
334 node_db.clone(),
335 MockActionState::new(),
336 MockTransactionExecutor::new(),
337 Default::default(),
338 );
339
340 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
341
342 assert!(
343 matches!(
344 actions
345 .open_channel(*ALICE, stake)
346 .await
347 .err()
348 .expect("should be an error"),
349 ChainActionsError::InvalidArguments(_)
350 ),
351 "should not create channel to self"
352 );
353 Ok(())
354 }
355
356 #[tokio::test]
357 async fn test_should_not_open_channel_with_too_big_stake() -> anyhow::Result<()> {
358 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
359 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
360 init_db(&db, U256::max_value().into(), U256::max_value().into(), None).await?;
361
362 let tx_queue = ActionQueue::new(
363 node_db.clone(),
364 MockActionState::new(),
365 MockTransactionExecutor::new(),
366 Default::default(),
367 );
368
369 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
370
371 assert!(
372 matches!(
373 actions
374 .open_channel(*BOB, (ChannelEntry::MAX_CHANNEL_BALANCE + 1).into())
375 .await
376 .err()
377 .expect("should be an error"),
378 ChainActionsError::InvalidChannelStake
379 ),
380 "should not create channel with too big stake"
381 );
382 Ok(())
383 }
384
385 #[tokio::test]
386 async fn test_should_not_open_if_not_enough_allowance() -> anyhow::Result<()> {
387 let stake = 10_000_u32.into();
388
389 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
390 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
391 init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
392
393 let tx_queue = ActionQueue::new(
394 node_db.clone(),
395 MockActionState::new(),
396 MockTransactionExecutor::new(),
397 Default::default(),
398 );
399
400 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
401
402 assert!(
403 matches!(
404 actions
405 .open_channel(*BOB, stake)
406 .await
407 .err()
408 .expect("should be an error"),
409 ChainActionsError::NotEnoughAllowance
410 ),
411 "should fail when not enough allowance"
412 );
413 Ok(())
414 }
415
416 #[tokio::test]
417 async fn test_should_not_open_if_not_enough_token_balance() -> anyhow::Result<()> {
418 let stake = 10_000_u32.into();
419
420 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
421 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
422 init_db(&db, 1_u64.into(), 10_000_000_u64.into(), None).await?;
423
424 let tx_queue = ActionQueue::new(
425 node_db.clone(),
426 MockActionState::new(),
427 MockTransactionExecutor::new(),
428 Default::default(),
429 );
430
431 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
432
433 assert!(
434 matches!(
435 actions
436 .open_channel(*BOB, stake)
437 .await
438 .err()
439 .expect("should be an error"),
440 ChainActionsError::BalanceTooLow
441 ),
442 "should fail when not enough token balance"
443 );
444 Ok(())
445 }
446
447 #[tokio::test]
448 async fn test_fund_channel() -> anyhow::Result<()> {
449 let stake = 10_u32.into();
450 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
451 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
452
453 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
454 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
455 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), Some(channel)).await?;
456
457 let mut tx_exec = MockTransactionExecutor::new();
458 tx_exec
459 .expect_fund_channel()
460 .times(1)
461 .withf(move |dest, balance| channel.destination.eq(dest) && stake.eq(balance))
462 .returning(move |_, _| Ok(random_hash));
463
464 let mut indexer_action_tracker = MockActionState::new();
465 indexer_action_tracker
466 .expect_register_expectation()
467 .once()
468 .returning(move |_| {
469 Ok(futures::future::ok(SignificantChainEvent {
470 tx_hash: random_hash,
471 event_type: ChainEventType::ChannelBalanceIncreased(channel, stake),
472 })
473 .boxed())
474 });
475
476 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
477 let tx_sender = tx_queue.new_sender();
478 tokio::task::spawn(async move {
479 tx_queue.start().await;
480 });
481
482 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
483
484 let tx_res = actions.fund_channel(channel.get_id(), stake).await?.await?;
485
486 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
487 assert!(
488 matches!(tx_res.action, Action::FundChannel(_, _)),
489 "must be open channel action"
490 );
491 assert!(
492 matches!(tx_res.event, Some(ChainEventType::ChannelBalanceIncreased(_, _))),
493 "must correspond to channel chain event"
494 );
495 Ok(())
496 }
497
498 #[tokio::test]
499 async fn test_fund_channel_should_not_over_fund() -> anyhow::Result<()> {
500 let channel = ChannelEntry::new(
501 *ALICE,
502 *BOB,
503 HoprBalance::from(ChannelEntry::MAX_CHANNEL_BALANCE),
504 U256::zero(),
505 ChannelStatus::Open,
506 U256::zero(),
507 );
508
509 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
510 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
511 init_db(&db, U256::max_value().into(), U256::max_value().into(), Some(channel)).await?;
512
513 let tx_queue = ActionQueue::new(
514 node_db.clone(),
515 MockActionState::new(),
516 MockTransactionExecutor::new(),
517 Default::default(),
518 );
519
520 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
521
522 assert!(
523 matches!(
524 actions
525 .fund_channel(channel.get_id(), 1.into())
526 .await
527 .err()
528 .expect("should be an error"),
529 ChainActionsError::InvalidChannelStake
530 ),
531 "should fail channel stake is too high"
532 );
533 Ok(())
534 }
535
536 #[tokio::test]
537 async fn test_should_not_fund_nonexistent_channel() -> anyhow::Result<()> {
538 let channel_id = generate_channel_id(&ALICE, &BOB);
539
540 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
541 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
542 init_db(&db, 5_000_000_u64.into(), 10_000_000_u64.into(), None).await?;
543
544 let tx_queue = ActionQueue::new(
545 node_db.clone(),
546 MockActionState::new(),
547 MockTransactionExecutor::new(),
548 Default::default(),
549 );
550
551 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
552 let stake = 10_u32.into();
553 assert!(
554 matches!(
555 actions
556 .fund_channel(channel_id, stake)
557 .await
558 .err()
559 .expect("should be an error"),
560 ChainActionsError::ChannelDoesNotExist
561 ),
562 "should fail when channel does not exist"
563 );
564 Ok(())
565 }
566
567 #[tokio::test]
568 async fn test_should_not_fund_if_not_enough_allowance() -> anyhow::Result<()> {
569 let channel_id = generate_channel_id(&ALICE, &BOB);
570
571 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
572 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
573 init_db(&db, 5_000_000_u64.into(), 1_000_u64.into(), None).await?;
574
575 let tx_queue = ActionQueue::new(
576 node_db.clone(),
577 MockActionState::new(),
578 MockTransactionExecutor::new(),
579 Default::default(),
580 );
581
582 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
583 let stake = 10_000_u32.into();
584 assert!(
585 matches!(
586 actions
587 .fund_channel(channel_id, stake)
588 .await
589 .err()
590 .expect("should be an error"),
591 ChainActionsError::NotEnoughAllowance
592 ),
593 "should fail when not enough allowance"
594 );
595 Ok(())
596 }
597
598 #[tokio::test]
599 async fn test_should_not_fund_if_not_enough_balance() -> anyhow::Result<()> {
600 let channel_id = generate_channel_id(&ALICE, &BOB);
601
602 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
603 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
604 init_db(&db, 1_u64.into(), 100_000_u64.into(), None).await?;
605
606 let tx_queue = ActionQueue::new(
607 node_db.clone(),
608 MockActionState::new(),
609 MockTransactionExecutor::new(),
610 Default::default(),
611 );
612
613 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
614 let stake = 10_000_u32.into();
615 assert!(
616 matches!(
617 actions
618 .fund_channel(channel_id, stake)
619 .await
620 .err()
621 .expect("should be an error"),
622 ChainActionsError::BalanceTooLow
623 ),
624 "should fail when not enough balance"
625 );
626 Ok(())
627 }
628
629 #[tokio::test]
630 async fn test_close_channel_outgoing() -> anyhow::Result<()> {
631 let stake = 10_u32.into();
632 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
633
634 let mut channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Open, U256::zero());
635
636 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
637 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
638 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
639
640 let mut tx_exec = MockTransactionExecutor::new();
641 let mut seq = Sequence::new();
642 tx_exec
643 .expect_initiate_outgoing_channel_closure()
644 .times(1)
645 .in_sequence(&mut seq)
646 .withf(move |dst| BOB.eq(dst))
647 .returning(move |_| Ok(random_hash));
648
649 tx_exec
650 .expect_finalize_outgoing_channel_closure()
651 .times(1)
652 .in_sequence(&mut seq)
653 .withf(move |dst| BOB.eq(dst))
654 .returning(move |_| Ok(random_hash));
655
656 let mut indexer_action_tracker = MockActionState::new();
657 let mut seq2 = Sequence::new();
658 indexer_action_tracker
659 .expect_register_expectation()
660 .once()
661 .in_sequence(&mut seq2)
662 .returning(move |_| {
663 Ok(futures::future::ok(SignificantChainEvent {
664 tx_hash: random_hash,
665 event_type: ChainEventType::ChannelClosureInitiated(channel),
666 })
667 .boxed())
668 });
669
670 indexer_action_tracker
671 .expect_register_expectation()
672 .once()
673 .in_sequence(&mut seq2)
674 .returning(move |_| {
675 Ok(futures::future::ok(SignificantChainEvent {
676 tx_hash: random_hash,
677 event_type: ChainEventType::ChannelClosed(channel),
678 })
679 .boxed())
680 });
681
682 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
683 let tx_sender = tx_queue.new_sender();
684 tokio::task::spawn(async move {
685 tx_queue.start().await;
686 });
687
688 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
689
690 let tx_res = actions.close_channel(channel.clone()).await?.await?;
691
692 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
693 assert!(
694 matches!(tx_res.action, Action::CloseChannel(_, _)),
695 "must be close channel action"
696 );
697 assert!(
698 matches!(tx_res.event, Some(ChainEventType::ChannelClosureInitiated(_))),
699 "must correspond to channel chain event"
700 );
701
702 channel.status = ChannelStatus::PendingToClose(SystemTime::now().sub(Duration::from_secs(10)));
704
705 db.upsert_channel(None, channel).await?;
706
707 let tx_res = actions.close_channel(channel.clone()).await?.await?;
708
709 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
710 assert!(
711 matches!(tx_res.action, Action::CloseChannel(_, _)),
712 "must be close channel action"
713 );
714 assert!(
715 matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
716 "must correspond to channel chain event"
717 );
718 Ok(())
719 }
720
721 #[tokio::test]
722 async fn test_close_channel_incoming() -> anyhow::Result<()> {
723 let stake = 10_u32.into();
724 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
725
726 let channel = ChannelEntry::new(*BOB, *ALICE, stake, U256::zero(), ChannelStatus::Open, U256::zero());
727
728 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
729 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
730 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
731
732 let mut tx_exec = MockTransactionExecutor::new();
733 let mut seq = Sequence::new();
734 tx_exec
735 .expect_close_incoming_channel()
736 .times(1)
737 .in_sequence(&mut seq)
738 .withf(move |dst| BOB.eq(dst))
739 .returning(move |_| Ok(random_hash));
740
741 let mut indexer_action_tracker = MockActionState::new();
742 indexer_action_tracker
743 .expect_register_expectation()
744 .returning(move |_| {
745 Ok(futures::future::ok(SignificantChainEvent {
746 tx_hash: random_hash,
747 event_type: ChainEventType::ChannelClosed(channel),
748 })
749 .boxed())
750 });
751
752 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
753 let tx_sender = tx_queue.new_sender();
754 tokio::task::spawn(async move {
755 tx_queue.start().await;
756 });
757
758 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_sender.clone());
759
760 let tx_res = actions.close_channel(channel.clone()).await?.await?;
761
762 assert_eq!(tx_res.tx_hash, random_hash, "tx hashes must be equal");
763 assert!(
764 matches!(tx_res.action, Action::CloseChannel(_, _)),
765 "must be close channel action"
766 );
767 assert!(
768 matches!(tx_res.event, Some(ChainEventType::ChannelClosed(_))),
769 "must correspond to channel chain event"
770 );
771 Ok(())
772 }
773
774 #[tokio::test]
775 async fn test_should_not_close_when_closure_time_did_not_elapse() -> anyhow::Result<()> {
776 let stake = 10_u32.into();
777
778 let channel = ChannelEntry::new(
779 *ALICE,
780 *BOB,
781 stake,
782 U256::zero(),
783 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
784 U256::zero(),
785 );
786
787 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
788 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
789 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
790
791 let tx_queue = ActionQueue::new(
792 node_db.clone(),
793 MockActionState::new(),
794 MockTransactionExecutor::new(),
795 Default::default(),
796 );
797
798 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
799
800 assert!(
801 matches!(
802 actions
803 .close_channel(channel.clone())
804 .await
805 .err()
806 .expect("should be an error"),
807 ChainActionsError::ClosureTimeHasNotElapsed(_)
808 ),
809 "should fail when the channel closure period did not elapse"
810 );
811 Ok(())
812 }
813
814 #[tokio::test]
815 async fn test_should_not_close_foreign_channel() -> anyhow::Result<()> {
816 let channel = ChannelEntry::new(
817 Address::from([0x01; 20]),
818 Address::from([0x02; 20]),
819 10_u32.into(),
820 U256::zero(),
821 ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100))),
822 U256::zero(),
823 );
824
825 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
826 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
827 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), None).await?;
828
829 let tx_queue = ActionQueue::new(
830 node_db.clone(),
831 MockActionState::new(),
832 MockTransactionExecutor::new(),
833 Default::default(),
834 );
835 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
836
837 assert!(
838 matches!(
839 actions.close_channel(channel).await.err().expect("should be an error"),
840 ChainActionsError::ChannelDoesNotExist
841 ),
842 "should fail when channel does not exist"
843 );
844 Ok(())
845 }
846
847 #[tokio::test]
848 async fn test_should_not_close_closed_channel() -> anyhow::Result<()> {
849 let stake = 10_u32.into();
850 let channel = ChannelEntry::new(*ALICE, *BOB, stake, U256::zero(), ChannelStatus::Closed, U256::zero());
851
852 let db = HoprIndexerDb::new_in_memory(ALICE_KP.clone()).await?;
853 let node_db = HoprNodeDb::new_in_memory(ALICE_KP.clone()).await?;
854 init_db(&db, 5_000_000_u64.into(), 1000_u64.into(), Some(channel)).await?;
855
856 let tx_queue = ActionQueue::new(
857 node_db.clone(),
858 MockActionState::new(),
859 MockTransactionExecutor::new(),
860 Default::default(),
861 );
862
863 let actions = ChainActions::new(&ALICE_KP, db.clone(), node_db.clone(), tx_queue.new_sender());
864
865 assert!(
866 matches!(
867 actions
868 .close_channel(channel.clone())
869 .await
870 .err()
871 .expect("should be an error"),
872 ChainActionsError::ChannelAlreadyClosed
873 ),
874 "should fail when channel is already closed"
875 );
876 Ok(())
877 }
878}