1use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_chain_types::actions::Action;
28use hopr_crypto_types::types::Hash;
29use hopr_db_sql::{
30 api::{
31 info::DomainSeparator,
32 tickets::{HoprDbTicketOperations, TicketSelector},
33 },
34 channels::HoprDbChannelOperations,
35 prelude::HoprDbInfoOperations,
36};
37use hopr_internal_types::prelude::*;
38use hopr_primitive_types::prelude::*;
39use tracing::{debug, error, info, warn};
40
41use crate::{
42 ChainActions,
43 action_queue::PendingAction,
44 errors::{
45 ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket, WrongTicketState},
46 Result,
47 },
48};
49
50lazy_static::lazy_static! {
51 static ref EMPTY_TX_HASH: Hash = Hash::default();
53}
54
55#[async_trait]
57pub trait TicketRedeemActions {
58 async fn redeem_all_tickets(&self, min_value: HoprBalance, only_aggregated: bool) -> Result<Vec<PendingAction>>;
60
61 async fn redeem_tickets_with_counterparty(
63 &self,
64 counterparty: &Address,
65 min_value: HoprBalance,
66 only_aggregated: bool,
67 ) -> Result<Vec<PendingAction>>;
68
69 async fn redeem_tickets_in_channel(
71 &self,
72 channel: &ChannelEntry,
73 min_value: HoprBalance,
74 only_aggregated: bool,
75 ) -> Result<Vec<PendingAction>>;
76
77 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;
79
80 async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
83}
84
85#[async_trait]
86impl<Db> TicketRedeemActions for ChainActions<Db>
87where
88 Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
89{
90 #[tracing::instrument(level = "debug", skip(self))]
91 async fn redeem_all_tickets(&self, min_value: HoprBalance, only_aggregated: bool) -> Result<Vec<PendingAction>> {
92 let incoming_channels = self
93 .db
94 .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
95 .await?;
96 debug!(
97 channel_count = incoming_channels.len(),
98 "starting to redeem all tickets in channels to self"
99 );
100
101 let mut receivers: Vec<PendingAction> = vec![];
102
103 for incoming_channel in incoming_channels {
105 match self
106 .redeem_tickets_in_channel(&incoming_channel, min_value, only_aggregated)
107 .await
108 {
109 Ok(mut successful_txs) => {
110 receivers.append(&mut successful_txs);
111 }
112 Err(e) => {
113 warn!(
114 channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
115 error = %e,
116 "Failed to redeem tickets in channel",
117 );
118 }
119 }
120 }
121
122 Ok(receivers)
123 }
124
125 #[tracing::instrument(level = "debug", skip(self))]
126 async fn redeem_tickets_with_counterparty(
127 &self,
128 counterparty: &Address,
129 min_value: HoprBalance,
130 only_aggregated: bool,
131 ) -> Result<Vec<PendingAction>> {
132 let maybe_channel = self
133 .db
134 .get_channel_by_parties(None, counterparty, &self.self_address(), false)
135 .await?;
136 if let Some(channel) = maybe_channel {
137 self.redeem_tickets_in_channel(&channel, min_value, only_aggregated)
138 .await
139 } else {
140 Err(ChannelDoesNotExist)
141 }
142 }
143
144 #[tracing::instrument(level = "debug", skip(self))]
145 async fn redeem_tickets_in_channel(
146 &self,
147 channel: &ChannelEntry,
148 min_value: HoprBalance,
149 only_aggregated: bool,
150 ) -> Result<Vec<PendingAction>> {
151 self.redeem_tickets(
152 TicketSelector::from(channel)
153 .with_aggregated_only(only_aggregated)
154 .with_index_range(channel.ticket_index.as_u64()..)
155 .with_amount(min_value..)
156 .with_state(AcknowledgedTicketStatus::Untouched),
157 )
158 .await
159 }
160
161 #[tracing::instrument(level = "debug", skip(self))]
162 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
163 let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
164
165 info!(
166 count_redeemable_tickets, %selector,
167 "acknowledged tickets in channel that can be redeemed"
168 );
169
170 if count_redeemable_tickets == 0 {
172 return Ok(vec![]);
173 }
174
175 let channel_dst = self
176 .db
177 .get_indexer_data(None)
178 .await?
179 .domain_separator(DomainSeparator::Channel)
180 .ok_or(InvalidState("missing channel dst".into()))?;
181
182 let selector_id = selector.to_string();
183
184 let redeem_stream = self
186 .db
187 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
188 .await?
189 .collect::<Vec<_>>()
190 .await;
191
192 let mut receivers: Vec<PendingAction> = vec![];
193 for ack_ticket in redeem_stream {
194 let ticket_id = ack_ticket.to_string();
195
196 if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
197 let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
198 match action {
199 Ok(successful_tx) => {
200 receivers.push(successful_tx);
201 }
202 Err(e) => {
203 error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
204 }
205 }
206 } else {
207 error!("failed to extract redeemable ticket");
208 }
209 }
210
211 info!(
212 count = receivers.len(),
213 selector = selector_id,
214 "acknowledged tickets were submitted to redeem in channel",
215 );
216
217 Ok(receivers)
218 }
219
220 #[tracing::instrument(level = "debug", skip(self))]
221 async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
222 if let Some(channel) = self
223 .db
224 .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
225 .await?
226 {
227 if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
230 return Err(OldTicket);
231 }
232
233 debug!(%ack_ticket, %channel, "redeeming single ticket");
234
235 let selector = TicketSelector::from(&channel)
236 .with_index(ack_ticket.verified_ticket().index)
237 .with_state(AcknowledgedTicketStatus::Untouched);
238
239 let maybe_ticket = self
241 .db
242 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
243 .await?
244 .next()
245 .await;
246
247 if let Some(ticket) = maybe_ticket {
248 let channel_dst = self
249 .db
250 .get_indexer_data(None)
251 .await?
252 .domain_separator(DomainSeparator::Channel)
253 .ok_or(InvalidState("missing channel dst".into()))?;
254
255 let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
256
257 debug!(%ack_ticket, "ticket is redeemable");
258 Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
259 } else {
260 Err(WrongTicketState(ack_ticket.to_string()))
261 }
262 } else {
263 Err(ChannelDoesNotExist)
264 }
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 use futures::FutureExt;
271 use hex_literal::hex;
272 use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
273 use hopr_crypto_random::{Randomizable, random_bytes};
274 use hopr_crypto_types::prelude::*;
275 use hopr_db_sql::{
276 HoprDbGeneralModelOperations, TargetDb, api::info::DomainSeparator, db::HoprDb, errors::DbSqlError,
277 info::HoprDbInfoOperations,
278 };
279
280 use super::*;
281 use crate::{
282 action_queue::{ActionQueue, MockTransactionExecutor},
283 action_state::MockActionState,
284 };
285
286 lazy_static::lazy_static! {
287 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
288 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
289 static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
290 }
291
292 const PRICE_PER_PACKET: u128 = 10000000000000000u128; fn generate_random_ack_ticket(
295 idx: u64,
296 counterparty: &ChainKeypair,
297 channel_epoch: u32,
298 ) -> anyhow::Result<AcknowledgedTicket> {
299 let hk1 = HalfKey::random();
300 let hk2 = HalfKey::random();
301
302 let resp = Response::from_half_keys(&hk1, &hk2)?;
303
304 Ok(TicketBuilder::default()
305 .addresses(counterparty, &*ALICE)
306 .amount(U256::from(PRICE_PER_PACKET).div_f64(1.0f64)? * 5u32)
307 .index(idx)
308 .index_offset(1)
309 .win_prob(WinningProbability::ALWAYS)
310 .channel_epoch(channel_epoch)
311 .challenge(resp.to_challenge()?)
312 .build_signed(counterparty, &Hash::default())?
313 .into_acknowledged(resp))
314 }
315
316 async fn create_channel_with_ack_tickets(
317 db: HoprDb,
318 ticket_count: usize,
319 counterparty: &ChainKeypair,
320 channel_epoch: u32,
321 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
322 let ckp = counterparty.clone();
323 let db_clone = db.clone();
324 let channel = db
325 .begin_transaction()
326 .await?
327 .perform(|tx| {
328 Box::pin(async move {
329 db_clone
330 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
331 .await?;
332
333 let channel = ChannelEntry::new(
334 ckp.public().to_address(),
335 ALICE.public().to_address(),
336 0.into(),
337 U256::zero(),
338 ChannelStatus::Open,
339 channel_epoch.into(),
340 );
341 db_clone.upsert_channel(Some(tx), channel).await?;
342 Ok::<_, DbSqlError>(channel)
343 })
344 })
345 .await?;
346
347 let ckp = counterparty.clone();
348 let input_tickets = db
349 .begin_transaction_in_db(TargetDb::Tickets)
350 .await?
351 .perform(|tx| {
352 Box::pin(async move {
353 let mut input_tickets = Vec::new();
354 for i in 0..ticket_count {
355 let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
356 .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
357 db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
358 input_tickets.push(ack_ticket);
359 }
360 Ok::<_, DbSqlError>(input_tickets)
361 })
362 })
363 .await?;
364
365 Ok((channel, input_tickets))
366 }
367
368 #[tokio::test]
369 async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
370 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
371
372 let ticket_count = 5;
373 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
374
375 let (channel_from_bob, bob_tickets) =
377 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
378 let (channel_from_charlie, charlie_tickets) =
379 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
380
381 let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
383 let low_value_ack_ticket = TicketBuilder::default()
384 .addresses(&*CHARLIE, &*ALICE)
385 .amount(PRICE_PER_PACKET)
386 .index((ticket_count + 1) as u64)
387 .index_offset(1)
388 .win_prob(WinningProbability::ALWAYS)
389 .channel_epoch(4u32)
390 .challenge(resp.to_challenge()?)
391 .build_signed(&CHARLIE, &Hash::default())?
392 .into_acknowledged(resp);
393 db.upsert_ticket(None, low_value_ack_ticket).await?;
394
395 let mut indexer_action_tracker = MockActionState::new();
396 let mut seq2 = mockall::Sequence::new();
397
398 for tkt in bob_tickets.iter().cloned() {
399 indexer_action_tracker
400 .expect_register_expectation()
401 .once()
402 .in_sequence(&mut seq2)
403 .return_once(move |_| {
404 Ok(futures::future::ok(SignificantChainEvent {
405 tx_hash: random_hash,
406 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
407 })
408 .boxed())
409 });
410 }
411
412 for tkt in charlie_tickets.iter().cloned() {
413 indexer_action_tracker
414 .expect_register_expectation()
415 .once()
416 .in_sequence(&mut seq2)
417 .return_once(move |_| {
418 Ok(futures::future::ok(SignificantChainEvent {
419 tx_hash: random_hash,
420 event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
421 })
422 .boxed())
423 });
424 }
425
426 let mut tx_exec = MockTransactionExecutor::new();
427 let mut seq = mockall::Sequence::new();
428
429 tx_exec
431 .expect_redeem_ticket()
432 .times(ticket_count)
433 .in_sequence(&mut seq)
434 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
435 .returning(move |_| Ok(random_hash));
436
437 tx_exec
439 .expect_redeem_ticket()
440 .times(ticket_count)
441 .in_sequence(&mut seq)
442 .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
443 .returning(move |_| Ok(random_hash));
444
445 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
447 let tx_sender = tx_queue.new_sender();
448 tokio::task::spawn(async move {
449 tx_queue.start().await;
450 });
451
452 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
453
454 let confirmations = futures::future::try_join_all(
456 actions
457 .redeem_all_tickets((PRICE_PER_PACKET * 5).into(), false)
458 .await?
459 .into_iter(),
460 )
461 .await?;
462
463 assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
464 assert!(
465 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
466 "tx hashes must be equal"
467 );
468
469 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
470
471 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
472
473 assert!(
474 db_acks_bob
475 .into_iter()
476 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
477 "all bob's tickets must be in BeingRedeemed state"
478 );
479 assert!(
480 db_acks_charlie
481 .iter()
482 .take(ticket_count)
483 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
484 "all valuable charlie's tickets must be in BeingRedeemed state"
485 );
486 assert!(
487 db_acks_charlie
488 .iter()
489 .skip(ticket_count)
490 .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
491 "all non-valuable charlie's tickets must be in Untouched state"
492 );
493
494 Ok(())
495 }
496
497 #[tokio::test]
498 async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
499 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
500
501 let ticket_count = 5;
502 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
503
504 let (mut channel_from_bob, bob_tickets) =
506 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
507 let (channel_from_charlie, _) =
508 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
509
510 channel_from_bob.ticket_index = 1_u32.into();
512 db.upsert_channel(None, channel_from_bob).await?;
513
514 let mut indexer_action_tracker = MockActionState::new();
515 let mut seq2 = mockall::Sequence::new();
516
517 for tkt in bob_tickets.iter().skip(1).cloned() {
519 indexer_action_tracker
520 .expect_register_expectation()
521 .once()
522 .in_sequence(&mut seq2)
523 .return_once(move |_| {
524 Ok(futures::future::ok(SignificantChainEvent {
525 tx_hash: random_hash,
526 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
527 })
528 .boxed())
529 });
530 }
531
532 let mut tx_exec = MockTransactionExecutor::new();
533 let mut seq = mockall::Sequence::new();
534
535 tx_exec
537 .expect_redeem_ticket()
538 .times(ticket_count - 1)
539 .in_sequence(&mut seq)
540 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
541 .returning(move |_| Ok(random_hash));
542
543 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
545 let tx_sender = tx_queue.new_sender();
546 tokio::task::spawn(async move {
547 tx_queue.start().await;
548 });
549
550 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
551
552 let confirmations = futures::future::try_join_all(
553 actions
554 .redeem_tickets_with_counterparty(&BOB.public().to_address(), 0.into(), false)
555 .await?
556 .into_iter(),
557 )
558 .await?;
559
560 assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
562 assert!(
563 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
564 "tx hashes must be equal"
565 );
566
567 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
568
569 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
570
571 assert!(
572 db_acks_bob
573 .into_iter()
574 .take_while(|tkt| tkt.verified_ticket().index != 0)
575 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
576 "all bob's tickets must be in BeingRedeemed state"
577 );
578 assert!(
579 db_acks_charlie
580 .into_iter()
581 .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
582 "all charlie's tickets must be in Untouched state"
583 );
584
585 Ok(())
586 }
587
588 #[tokio::test]
589 async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
590 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
591
592 let ticket_count = 3;
593 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
594
595 let (channel_from_bob, mut tickets) =
596 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
597
598 tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
600 let selector = TicketSelector::from(&tickets[0]).with_no_state();
601 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
602 .await?;
603
604 tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
606 let selector = TicketSelector::from(&tickets[1]).with_no_state();
607 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
608 .await?;
609
610 let tickets_clone = tickets.clone();
612 let mut tx_exec = MockTransactionExecutor::new();
613 tx_exec
614 .expect_redeem_ticket()
615 .times(ticket_count - 2)
616 .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
617 .returning(move |_| Ok(random_hash));
618
619 let mut indexer_action_tracker = MockActionState::new();
620 for tkt in tickets.iter().skip(2).cloned() {
621 indexer_action_tracker
622 .expect_register_expectation()
623 .once()
624 .return_once(move |_| {
625 Ok(futures::future::ok(SignificantChainEvent {
626 tx_hash: random_hash,
627 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
628 })
629 .boxed())
630 });
631 }
632
633 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
635 let tx_sender = tx_queue.new_sender();
636 tokio::task::spawn(async move {
637 tx_queue.start().await;
638 });
639
640 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
641
642 let confirmations = futures::future::try_join_all(
643 actions
644 .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
645 .await?
646 .into_iter(),
647 )
648 .await?;
649
650 assert_eq!(
651 ticket_count - 2,
652 confirmations.len(),
653 "must redeem only redeemable tickets in channel"
654 );
655
656 assert!(
657 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
658 "cannot redeem a ticket that's being aggregated"
659 );
660
661 assert!(
662 actions.redeem_ticket(tickets[1].clone()).await.is_err(),
663 "cannot redeem a ticket that's being redeemed"
664 );
665
666 Ok(())
667 }
668
669 #[tokio::test]
670 async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
671 -> anyhow::Result<()> {
672 let ticket_count = 3;
673 let ticket_from_previous_epoch_count = 2;
674 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
675 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
676
677 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
679
680 let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
682 db.upsert_ticket(None, ticket.clone()).await?;
683 tickets.insert(0, ticket);
684
685 let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
686 db.upsert_ticket(None, ticket.clone()).await?;
687 tickets.insert(1, ticket);
688
689 let tickets_clone = tickets.clone();
690 let mut tx_exec = MockTransactionExecutor::new();
691 tx_exec
692 .expect_redeem_ticket()
693 .times(ticket_count - ticket_from_previous_epoch_count)
694 .withf(move |t| {
695 tickets_clone[ticket_from_previous_epoch_count..]
696 .iter()
697 .any(|tk| tk.ticket.eq(&t.ticket))
698 })
699 .returning(move |_| Ok(random_hash));
700
701 let mut indexer_action_tracker = MockActionState::new();
702 for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
703 indexer_action_tracker
704 .expect_register_expectation()
705 .once()
706 .return_once(move |_| {
707 Ok(futures::future::ok(SignificantChainEvent {
708 tx_hash: random_hash,
709 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
710 })
711 .boxed())
712 });
713 }
714
715 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
717 let tx_sender = tx_queue.new_sender();
718 tokio::task::spawn(async move {
719 tx_queue.start().await;
720 });
721
722 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
723
724 futures::future::join_all(
725 actions
726 .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
727 .await?
728 .into_iter(),
729 )
730 .await;
731
732 assert!(
733 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
734 "cannot redeem a ticket that's from the previous epoch"
735 );
736
737 Ok(())
738 }
739
740 #[tokio::test]
741 async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
742 let ticket_count = 3;
743 let ticket_from_next_epoch_count = 2;
744 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
745 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
746
747 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
749
750 let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
752 db.upsert_ticket(None, ticket.clone()).await?;
753 tickets.insert(0, ticket);
754
755 let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
756 db.upsert_ticket(None, ticket.clone()).await?;
757 tickets.insert(1, ticket);
758
759 let tickets_clone = tickets.clone();
760 let mut tx_exec = MockTransactionExecutor::new();
761 tx_exec
762 .expect_redeem_ticket()
763 .times(ticket_count - ticket_from_next_epoch_count)
764 .withf(move |t| {
765 tickets_clone[ticket_from_next_epoch_count..]
766 .iter()
767 .any(|tk| tk.ticket.eq(&t.ticket))
768 })
769 .returning(move |_| Ok(random_hash));
770
771 let mut indexer_action_tracker = MockActionState::new();
772 for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
773 indexer_action_tracker
774 .expect_register_expectation()
775 .once()
776 .return_once(move |_| {
777 Ok(futures::future::ok(SignificantChainEvent {
778 tx_hash: random_hash,
779 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
780 })
781 .boxed())
782 });
783 }
784
785 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
787 let tx_sender = tx_queue.new_sender();
788 tokio::task::spawn(async move {
789 tx_queue.start().await;
790 });
791
792 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
793
794 futures::future::join_all(
795 actions
796 .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
797 .await?
798 .into_iter(),
799 )
800 .await;
801
802 for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
803 assert!(
804 actions.redeem_ticket(ticket.clone()).await.is_err(),
805 "cannot redeem a ticket that's from the next epoch"
806 );
807 }
808
809 Ok(())
810 }
811
812 #[tokio::test]
813 async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
814 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
815 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
816
817 let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
818
819 let ticket = tickets.into_iter().next().unwrap();
820
821 let mut tx_exec = MockTransactionExecutor::new();
822 let ticket_clone = ticket.clone();
823 tx_exec
824 .expect_redeem_ticket()
825 .once()
826 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
827 .returning(move |_| Ok(random_hash));
828
829 let mut indexer_action_tracker = MockActionState::new();
830 let ticket_clone = ticket.clone();
831 indexer_action_tracker
832 .expect_register_expectation()
833 .once()
834 .return_once(move |_| {
835 Ok(futures::future::ok(SignificantChainEvent {
836 tx_hash: random_hash,
837 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
838 })
839 .boxed())
840 });
841
842 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
844 let tx_sender = tx_queue.new_sender();
845 tokio::task::spawn(async move {
846 tx_queue.start().await;
847 });
848
849 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
850
851 let confirmation = actions.redeem_ticket(ticket).await?.await?;
852
853 assert_eq!(confirmation.tx_hash, random_hash);
854
855 assert!(
856 db.get_tickets((&channel_from_bob).into())
857 .await?
858 .into_iter()
859 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
860 "all bob's tickets must be in BeingRedeemed state"
861 );
862
863 Ok(())
864 }
865
866 #[tokio::test]
867 async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
868 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
869 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
870
871 let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
872
873 channel_from_bob.ticket_index = 2_u32.into();
874 db.upsert_channel(None, channel_from_bob).await?;
875
876 let ticket = tickets.into_iter().next().unwrap();
877
878 let mut tx_exec = MockTransactionExecutor::new();
879 let ticket_clone = ticket.clone();
880 tx_exec
881 .expect_redeem_ticket()
882 .never()
883 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
884 .returning(move |_| Ok(random_hash));
885
886 let mut indexer_action_tracker = MockActionState::new();
887 let ticket_clone = ticket.clone();
888 indexer_action_tracker
889 .expect_register_expectation()
890 .never()
891 .return_once(move |_| {
892 Ok(futures::future::ok(SignificantChainEvent {
893 tx_hash: random_hash,
894 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
895 })
896 .boxed())
897 });
898
899 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
901 let tx_sender = tx_queue.new_sender();
902 tokio::task::spawn(async move {
903 tx_queue.start().await;
904 });
905
906 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
907
908 assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
909
910 Ok(())
911 }
912}