1use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_chain_types::actions::Action;
28use hopr_crypto_types::types::Hash;
29use hopr_db_sql::{
30 api::{
31 info::DomainSeparator,
32 tickets::{HoprDbTicketOperations, TicketSelector},
33 },
34 channels::HoprDbChannelOperations,
35 prelude::HoprDbInfoOperations,
36};
37use hopr_internal_types::prelude::*;
38use hopr_primitive_types::prelude::*;
39use tracing::{debug, error, info, warn};
40
41use crate::{
42 ChainActions,
43 action_queue::PendingAction,
44 errors::{
45 ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket, WrongTicketState},
46 Result,
47 },
48};
49
lazy_static::lazy_static! {
    // Lazily initialised `Hash::default()` value.
    // NOTE(review): nothing in the code visible here references this static; presumably it
    // serves as a placeholder transaction hash elsewhere — confirm before removing it.
    static ref EMPTY_TX_HASH: Hash = Hash::default();
}
54
/// Operations that submit acknowledged tickets for on-chain redemption.
///
/// Every method returns [`PendingAction`]s, i.e. handles of the submitted redeem actions that
/// the caller may await to obtain the confirmation of each action.
#[async_trait]
pub trait TicketRedeemActions {
    /// Redeems the tickets in all channels that are incoming to this node.
    ///
    /// When `only_aggregated` is set, the selection is restricted to aggregated tickets.
    async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>>;

    /// Redeems the tickets in the incoming channel from the given `counterparty`.
    ///
    /// When `only_aggregated` is set, the selection is restricted to aggregated tickets.
    async fn redeem_tickets_with_counterparty(
        &self,
        counterparty: &Address,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems the tickets in the given `channel`.
    ///
    /// When `only_aggregated` is set, the selection is restricted to aggregated tickets.
    async fn redeem_tickets_in_channel(
        &self,
        channel: &ChannelEntry,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems all the tickets matched by the given `selector`.
    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;

    /// Redeems a single acknowledged ticket.
    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
}
82
83#[async_trait]
84impl<Db> TicketRedeemActions for ChainActions<Db>
85where
86 Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
87{
88 #[tracing::instrument(level = "debug", skip(self))]
89 async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>> {
90 let incoming_channels = self
91 .db
92 .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
93 .await?;
94 debug!(
95 channel_count = incoming_channels.len(),
96 "starting to redeem all tickets in channels to self"
97 );
98
99 let mut receivers: Vec<PendingAction> = vec![];
100
101 for incoming_channel in incoming_channels {
103 match self.redeem_tickets_in_channel(&incoming_channel, only_aggregated).await {
104 Ok(mut successful_txs) => {
105 receivers.append(&mut successful_txs);
106 }
107 Err(e) => {
108 warn!(
109 channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
110 error = %e,
111 "Failed to redeem tickets in channel",
112 );
113 }
114 }
115 }
116
117 Ok(receivers)
118 }
119
120 #[tracing::instrument(level = "debug", skip(self))]
121 async fn redeem_tickets_with_counterparty(
122 &self,
123 counterparty: &Address,
124 only_aggregated: bool,
125 ) -> Result<Vec<PendingAction>> {
126 let maybe_channel = self
127 .db
128 .get_channel_by_parties(None, counterparty, &self.self_address(), false)
129 .await?;
130 if let Some(channel) = maybe_channel {
131 self.redeem_tickets_in_channel(&channel, only_aggregated).await
132 } else {
133 Err(ChannelDoesNotExist)
134 }
135 }
136
137 #[tracing::instrument(level = "debug", skip(self))]
138 async fn redeem_tickets_in_channel(
139 &self,
140 channel: &ChannelEntry,
141 only_aggregated: bool,
142 ) -> Result<Vec<PendingAction>> {
143 self.redeem_tickets(
144 TicketSelector::from(channel)
145 .with_aggregated_only(only_aggregated)
146 .with_index_range(channel.ticket_index.as_u64()..)
147 .with_state(AcknowledgedTicketStatus::Untouched),
148 )
149 .await
150 }
151
152 #[tracing::instrument(level = "debug", skip(self))]
153 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
154 let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
155
156 info!(
157 count_redeemable_tickets, %selector,
158 "acknowledged tickets in channel that can be redeemed"
159 );
160
161 if count_redeemable_tickets == 0 {
163 return Ok(vec![]);
164 }
165
166 let channel_dst = self
167 .db
168 .get_indexer_data(None)
169 .await?
170 .domain_separator(DomainSeparator::Channel)
171 .ok_or(InvalidState("missing channel dst".into()))?;
172
173 let selector_id = selector.to_string();
174
175 let redeem_stream = self
177 .db
178 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
179 .await?
180 .collect::<Vec<_>>()
181 .await;
182
183 let mut receivers: Vec<PendingAction> = vec![];
184 for ack_ticket in redeem_stream {
185 let ticket_id = ack_ticket.to_string();
186
187 if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
188 let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
189 match action {
190 Ok(successful_tx) => {
191 receivers.push(successful_tx);
192 }
193 Err(e) => {
194 error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
195 }
196 }
197 } else {
198 error!("failed to extract redeemable ticket");
199 }
200 }
201
202 info!(
203 count = receivers.len(),
204 selector = selector_id,
205 "acknowledged tickets were submitted to redeem in channel",
206 );
207
208 Ok(receivers)
209 }
210
211 #[tracing::instrument(level = "debug", skip(self))]
212 async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
213 if let Some(channel) = self
214 .db
215 .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
216 .await?
217 {
218 if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
221 return Err(OldTicket);
222 }
223
224 debug!(%ack_ticket, %channel, "redeeming single ticket");
225
226 let selector = TicketSelector::from(&channel)
227 .with_index(ack_ticket.verified_ticket().index)
228 .with_state(AcknowledgedTicketStatus::Untouched);
229
230 let maybe_ticket = self
232 .db
233 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
234 .await?
235 .next()
236 .await;
237
238 if let Some(ticket) = maybe_ticket {
239 let channel_dst = self
240 .db
241 .get_indexer_data(None)
242 .await?
243 .domain_separator(DomainSeparator::Channel)
244 .ok_or(InvalidState("missing channel dst".into()))?;
245
246 let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
247
248 debug!(%ack_ticket, "ticket is redeemable");
249 Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
250 } else {
251 Err(WrongTicketState(ack_ticket.to_string()))
252 }
253 } else {
254 Err(ChannelDoesNotExist)
255 }
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use futures::FutureExt;
262 use hex_literal::hex;
263 use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
264 use hopr_crypto_random::{Randomizable, random_bytes};
265 use hopr_crypto_types::prelude::*;
266 use hopr_db_sql::{
267 HoprDbGeneralModelOperations, TargetDb, api::info::DomainSeparator, db::HoprDb, errors::DbSqlError,
268 info::HoprDbInfoOperations,
269 };
270
271 use super::*;
272 use crate::{
273 action_queue::{ActionQueue, MockTransactionExecutor},
274 action_state::MockActionState,
275 };
276
    // Fixed chain keypairs of the test parties, built from hard-coded secret keys.
    // ALICE is the key the in-memory database and `ChainActions` are created with in the tests
    // below; BOB and CHARLIE act as the counterparties of the channels.
    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
    }
282
283 fn generate_random_ack_ticket(
284 idx: u64,
285 counterparty: &ChainKeypair,
286 channel_epoch: u32,
287 ) -> anyhow::Result<AcknowledgedTicket> {
288 let hk1 = HalfKey::random();
289 let hk2 = HalfKey::random();
290
291 let cp1: CurvePoint = hk1.to_challenge().try_into()?;
292 let cp2: CurvePoint = hk2.to_challenge().try_into()?;
293 let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
294
295 let price_per_packet: U256 = 10000000000000000u128.into(); Ok(TicketBuilder::default()
298 .addresses(counterparty, &*ALICE)
299 .amount(price_per_packet.div_f64(1.0f64)? * 5u32)
300 .index(idx)
301 .index_offset(1)
302 .win_prob(WinningProbability::ALWAYS)
303 .channel_epoch(channel_epoch)
304 .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
305 .build_signed(counterparty, &Hash::default())?
306 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
307 }
308
309 async fn create_channel_with_ack_tickets(
310 db: HoprDb,
311 ticket_count: usize,
312 counterparty: &ChainKeypair,
313 channel_epoch: u32,
314 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
315 let ckp = counterparty.clone();
316 let db_clone = db.clone();
317 let channel = db
318 .begin_transaction()
319 .await?
320 .perform(|tx| {
321 Box::pin(async move {
322 db_clone
323 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
324 .await?;
325
326 let channel = ChannelEntry::new(
327 ckp.public().to_address(),
328 ALICE.public().to_address(),
329 0.into(),
330 U256::zero(),
331 ChannelStatus::Open,
332 channel_epoch.into(),
333 );
334 db_clone.upsert_channel(Some(tx), channel).await?;
335 Ok::<_, DbSqlError>(channel)
336 })
337 })
338 .await?;
339
340 let ckp = counterparty.clone();
341 let input_tickets = db
342 .begin_transaction_in_db(TargetDb::Tickets)
343 .await?
344 .perform(|tx| {
345 Box::pin(async move {
346 let mut input_tickets = Vec::new();
347 for i in 0..ticket_count {
348 let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
349 .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
350 db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
351 input_tickets.push(ack_ticket);
352 }
353 Ok::<_, DbSqlError>(input_tickets)
354 })
355 })
356 .await?;
357
358 Ok((channel, input_tickets))
359 }
360
361 #[tokio::test]
362 async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
363 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
364
365 let ticket_count = 5;
366 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
367
368 let (channel_from_bob, bob_tickets) =
370 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
371 let (channel_from_charlie, charlie_tickets) =
372 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
373
374 let mut indexer_action_tracker = MockActionState::new();
375 let mut seq2 = mockall::Sequence::new();
376
377 for tkt in bob_tickets.iter().cloned() {
378 indexer_action_tracker
379 .expect_register_expectation()
380 .once()
381 .in_sequence(&mut seq2)
382 .return_once(move |_| {
383 Ok(futures::future::ok(SignificantChainEvent {
384 tx_hash: random_hash,
385 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
386 })
387 .boxed())
388 });
389 }
390
391 for tkt in charlie_tickets.iter().cloned() {
392 indexer_action_tracker
393 .expect_register_expectation()
394 .once()
395 .in_sequence(&mut seq2)
396 .return_once(move |_| {
397 Ok(futures::future::ok(SignificantChainEvent {
398 tx_hash: random_hash,
399 event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
400 })
401 .boxed())
402 });
403 }
404
405 let mut tx_exec = MockTransactionExecutor::new();
406 let mut seq = mockall::Sequence::new();
407
408 tx_exec
410 .expect_redeem_ticket()
411 .times(ticket_count)
412 .in_sequence(&mut seq)
413 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
414 .returning(move |_| Ok(random_hash));
415
416 tx_exec
418 .expect_redeem_ticket()
419 .times(ticket_count)
420 .in_sequence(&mut seq)
421 .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
422 .returning(move |_| Ok(random_hash));
423
424 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
426 let tx_sender = tx_queue.new_sender();
427 tokio::task::spawn(async move {
428 tx_queue.start().await;
429 });
430
431 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
432
433 let confirmations = futures::future::try_join_all(actions.redeem_all_tickets(false).await?.into_iter()).await?;
434
435 assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
436 assert!(
437 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
438 "tx hashes must be equal"
439 );
440
441 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
442
443 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
444
445 assert!(
446 db_acks_bob
447 .into_iter()
448 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
449 "all bob's tickets must be in BeingRedeemed state"
450 );
451 assert!(
452 db_acks_charlie
453 .into_iter()
454 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
455 "all charlie's tickets must be in BeingRedeemed state"
456 );
457
458 Ok(())
459 }
460
461 #[tokio::test]
462 async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
463 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
464
465 let ticket_count = 5;
466 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
467
468 let (mut channel_from_bob, bob_tickets) =
470 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
471 let (channel_from_charlie, _) =
472 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
473
474 channel_from_bob.ticket_index = 1_u32.into();
476 db.upsert_channel(None, channel_from_bob).await?;
477
478 let mut indexer_action_tracker = MockActionState::new();
479 let mut seq2 = mockall::Sequence::new();
480
481 for tkt in bob_tickets.iter().skip(1).cloned() {
483 indexer_action_tracker
484 .expect_register_expectation()
485 .once()
486 .in_sequence(&mut seq2)
487 .return_once(move |_| {
488 Ok(futures::future::ok(SignificantChainEvent {
489 tx_hash: random_hash,
490 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
491 })
492 .boxed())
493 });
494 }
495
496 let mut tx_exec = MockTransactionExecutor::new();
497 let mut seq = mockall::Sequence::new();
498
499 tx_exec
501 .expect_redeem_ticket()
502 .times(ticket_count - 1)
503 .in_sequence(&mut seq)
504 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
505 .returning(move |_| Ok(random_hash));
506
507 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
509 let tx_sender = tx_queue.new_sender();
510 tokio::task::spawn(async move {
511 tx_queue.start().await;
512 });
513
514 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
515
516 let confirmations = futures::future::try_join_all(
517 actions
518 .redeem_tickets_with_counterparty(&BOB.public().to_address(), false)
519 .await?
520 .into_iter(),
521 )
522 .await?;
523
524 assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
526 assert!(
527 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
528 "tx hashes must be equal"
529 );
530
531 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
532
533 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
534
535 assert!(
536 db_acks_bob
537 .into_iter()
538 .take_while(|tkt| tkt.verified_ticket().index != 0)
539 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
540 "all bob's tickets must be in BeingRedeemed state"
541 );
542 assert!(
543 db_acks_charlie
544 .into_iter()
545 .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
546 "all charlie's tickets must be in Untouched state"
547 );
548
549 Ok(())
550 }
551
552 #[tokio::test]
553 async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
554 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
555
556 let ticket_count = 3;
557 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
558
559 let (channel_from_bob, mut tickets) =
560 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
561
562 tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
564 let selector = TicketSelector::from(&tickets[0]).with_no_state();
565 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
566 .await?;
567
568 tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
570 let selector = TicketSelector::from(&tickets[1]).with_no_state();
571 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
572 .await?;
573
574 let tickets_clone = tickets.clone();
576 let mut tx_exec = MockTransactionExecutor::new();
577 tx_exec
578 .expect_redeem_ticket()
579 .times(ticket_count - 2)
580 .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
581 .returning(move |_| Ok(random_hash));
582
583 let mut indexer_action_tracker = MockActionState::new();
584 for tkt in tickets.iter().skip(2).cloned() {
585 indexer_action_tracker
586 .expect_register_expectation()
587 .once()
588 .return_once(move |_| {
589 Ok(futures::future::ok(SignificantChainEvent {
590 tx_hash: random_hash,
591 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
592 })
593 .boxed())
594 });
595 }
596
597 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
599 let tx_sender = tx_queue.new_sender();
600 tokio::task::spawn(async move {
601 tx_queue.start().await;
602 });
603
604 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
605
606 let confirmations = futures::future::try_join_all(
607 actions
608 .redeem_tickets_in_channel(&channel_from_bob, false)
609 .await?
610 .into_iter(),
611 )
612 .await?;
613
614 assert_eq!(
615 ticket_count - 2,
616 confirmations.len(),
617 "must redeem only redeemable tickets in channel"
618 );
619
620 assert!(
621 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
622 "cannot redeem a ticket that's being aggregated"
623 );
624
625 assert!(
626 actions.redeem_ticket(tickets[1].clone()).await.is_err(),
627 "cannot redeem a ticket that's being redeemed"
628 );
629
630 Ok(())
631 }
632
633 #[tokio::test]
634 async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
635 -> anyhow::Result<()> {
636 let ticket_count = 3;
637 let ticket_from_previous_epoch_count = 2;
638 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
639 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
640
641 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
643
644 let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
646 db.upsert_ticket(None, ticket.clone()).await?;
647 tickets.insert(0, ticket);
648
649 let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
650 db.upsert_ticket(None, ticket.clone()).await?;
651 tickets.insert(1, ticket);
652
653 let tickets_clone = tickets.clone();
654 let mut tx_exec = MockTransactionExecutor::new();
655 tx_exec
656 .expect_redeem_ticket()
657 .times(ticket_count - ticket_from_previous_epoch_count)
658 .withf(move |t| {
659 tickets_clone[ticket_from_previous_epoch_count..]
660 .iter()
661 .any(|tk| tk.ticket.eq(&t.ticket))
662 })
663 .returning(move |_| Ok(random_hash));
664
665 let mut indexer_action_tracker = MockActionState::new();
666 for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
667 indexer_action_tracker
668 .expect_register_expectation()
669 .once()
670 .return_once(move |_| {
671 Ok(futures::future::ok(SignificantChainEvent {
672 tx_hash: random_hash,
673 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
674 })
675 .boxed())
676 });
677 }
678
679 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
681 let tx_sender = tx_queue.new_sender();
682 tokio::task::spawn(async move {
683 tx_queue.start().await;
684 });
685
686 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
687
688 futures::future::join_all(
689 actions
690 .redeem_tickets_in_channel(&channel_from_bob, false)
691 .await?
692 .into_iter(),
693 )
694 .await;
695
696 assert!(
697 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
698 "cannot redeem a ticket that's from the previous epoch"
699 );
700
701 Ok(())
702 }
703
704 #[tokio::test]
705 async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
706 let ticket_count = 3;
707 let ticket_from_next_epoch_count = 2;
708 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
709 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
710
711 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
713
714 let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
716 db.upsert_ticket(None, ticket.clone()).await?;
717 tickets.insert(0, ticket);
718
719 let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
720 db.upsert_ticket(None, ticket.clone()).await?;
721 tickets.insert(1, ticket);
722
723 let tickets_clone = tickets.clone();
724 let mut tx_exec = MockTransactionExecutor::new();
725 tx_exec
726 .expect_redeem_ticket()
727 .times(ticket_count - ticket_from_next_epoch_count)
728 .withf(move |t| {
729 tickets_clone[ticket_from_next_epoch_count..]
730 .iter()
731 .any(|tk| tk.ticket.eq(&t.ticket))
732 })
733 .returning(move |_| Ok(random_hash));
734
735 let mut indexer_action_tracker = MockActionState::new();
736 for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
737 indexer_action_tracker
738 .expect_register_expectation()
739 .once()
740 .return_once(move |_| {
741 Ok(futures::future::ok(SignificantChainEvent {
742 tx_hash: random_hash,
743 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
744 })
745 .boxed())
746 });
747 }
748
749 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
751 let tx_sender = tx_queue.new_sender();
752 tokio::task::spawn(async move {
753 tx_queue.start().await;
754 });
755
756 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
757
758 futures::future::join_all(
759 actions
760 .redeem_tickets_in_channel(&channel_from_bob, false)
761 .await?
762 .into_iter(),
763 )
764 .await;
765
766 for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
767 assert!(
768 actions.redeem_ticket(ticket.clone()).await.is_err(),
769 "cannot redeem a ticket that's from the next epoch"
770 );
771 }
772
773 Ok(())
774 }
775
776 #[tokio::test]
777 async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
778 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
779 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
780
781 let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
782
783 let ticket = tickets.into_iter().next().unwrap();
784
785 let mut tx_exec = MockTransactionExecutor::new();
786 let ticket_clone = ticket.clone();
787 tx_exec
788 .expect_redeem_ticket()
789 .once()
790 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
791 .returning(move |_| Ok(random_hash));
792
793 let mut indexer_action_tracker = MockActionState::new();
794 let ticket_clone = ticket.clone();
795 indexer_action_tracker
796 .expect_register_expectation()
797 .once()
798 .return_once(move |_| {
799 Ok(futures::future::ok(SignificantChainEvent {
800 tx_hash: random_hash,
801 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
802 })
803 .boxed())
804 });
805
806 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
808 let tx_sender = tx_queue.new_sender();
809 tokio::task::spawn(async move {
810 tx_queue.start().await;
811 });
812
813 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
814
815 let confirmation = actions.redeem_ticket(ticket).await?.await?;
816
817 assert_eq!(confirmation.tx_hash, random_hash);
818
819 assert!(
820 db.get_tickets((&channel_from_bob).into())
821 .await?
822 .into_iter()
823 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
824 "all bob's tickets must be in BeingRedeemed state"
825 );
826
827 Ok(())
828 }
829
830 #[tokio::test]
831 async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
832 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
833 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
834
835 let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
836
837 channel_from_bob.ticket_index = 2_u32.into();
838 db.upsert_channel(None, channel_from_bob).await?;
839
840 let ticket = tickets.into_iter().next().unwrap();
841
842 let mut tx_exec = MockTransactionExecutor::new();
843 let ticket_clone = ticket.clone();
844 tx_exec
845 .expect_redeem_ticket()
846 .never()
847 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
848 .returning(move |_| Ok(random_hash));
849
850 let mut indexer_action_tracker = MockActionState::new();
851 let ticket_clone = ticket.clone();
852 indexer_action_tracker
853 .expect_register_expectation()
854 .never()
855 .return_once(move |_| {
856 Ok(futures::future::ok(SignificantChainEvent {
857 tx_hash: random_hash,
858 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
859 })
860 .boxed())
861 });
862
863 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
865 let tx_sender = tx_queue.new_sender();
866 tokio::task::spawn(async move {
867 tx_queue.start().await;
868 });
869
870 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
871
872 assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
873
874 Ok(())
875 }
876}