1use async_trait::async_trait;
25use futures::StreamExt;
26use hopr_chain_types::actions::Action;
27use hopr_crypto_types::types::Hash;
28use hopr_db_sql::api::info::DomainSeparator;
29use hopr_db_sql::api::tickets::{HoprDbTicketOperations, TicketSelector};
30use hopr_db_sql::channels::HoprDbChannelOperations;
31use hopr_db_sql::prelude::HoprDbInfoOperations;
32use hopr_internal_types::prelude::*;
33use hopr_primitive_types::prelude::*;
34use tracing::{debug, error, info, warn};
35
36use crate::action_queue::PendingAction;
37use crate::errors::ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket};
38use crate::errors::{ChainActionsError::WrongTicketState, Result};
39use crate::ChainActions;
40
41lazy_static::lazy_static! {
42 static ref EMPTY_TX_HASH: Hash = Hash::default();
44}
45
46#[async_trait]
48pub trait TicketRedeemActions {
49 async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>>;
51
52 async fn redeem_tickets_with_counterparty(
54 &self,
55 counterparty: &Address,
56 only_aggregated: bool,
57 ) -> Result<Vec<PendingAction>>;
58
59 async fn redeem_tickets_in_channel(
61 &self,
62 channel: &ChannelEntry,
63 only_aggregated: bool,
64 ) -> Result<Vec<PendingAction>>;
65
66 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;
68
69 async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
72}
73
74#[async_trait]
75impl<Db> TicketRedeemActions for ChainActions<Db>
76where
77 Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
78{
79 #[tracing::instrument(level = "debug", skip(self))]
80 async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>> {
81 let incoming_channels = self
82 .db
83 .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
84 .await?;
85 debug!(
86 channel_count = incoming_channels.len(),
87 "starting to redeem all tickets in channels to self"
88 );
89
90 let mut receivers: Vec<PendingAction> = vec![];
91
92 for incoming_channel in incoming_channels {
94 match self.redeem_tickets_in_channel(&incoming_channel, only_aggregated).await {
95 Ok(mut successful_txs) => {
96 receivers.append(&mut successful_txs);
97 }
98 Err(e) => {
99 warn!(
100 channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
101 error = %e,
102 "Failed to redeem tickets in channel",
103 );
104 }
105 }
106 }
107
108 Ok(receivers)
109 }
110
111 #[tracing::instrument(level = "debug", skip(self))]
112 async fn redeem_tickets_with_counterparty(
113 &self,
114 counterparty: &Address,
115 only_aggregated: bool,
116 ) -> Result<Vec<PendingAction>> {
117 let maybe_channel = self
118 .db
119 .get_channel_by_parties(None, counterparty, &self.self_address(), false)
120 .await?;
121 if let Some(channel) = maybe_channel {
122 self.redeem_tickets_in_channel(&channel, only_aggregated).await
123 } else {
124 Err(ChannelDoesNotExist)
125 }
126 }
127
128 #[tracing::instrument(level = "debug", skip(self))]
129 async fn redeem_tickets_in_channel(
130 &self,
131 channel: &ChannelEntry,
132 only_aggregated: bool,
133 ) -> Result<Vec<PendingAction>> {
134 self.redeem_tickets(
135 TicketSelector::from(channel)
136 .with_aggregated_only(only_aggregated)
137 .with_index_range(channel.ticket_index.as_u64()..)
138 .with_state(AcknowledgedTicketStatus::Untouched),
139 )
140 .await
141 }
142
143 #[tracing::instrument(level = "debug", skip(self))]
144 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
145 let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
146
147 info!(
148 count_redeemable_tickets, %selector,
149 "acknowledged tickets in channel that can be redeemed"
150 );
151
152 if count_redeemable_tickets == 0 {
154 return Ok(vec![]);
155 }
156
157 let channel_dst = self
158 .db
159 .get_indexer_data(None)
160 .await?
161 .domain_separator(DomainSeparator::Channel)
162 .ok_or(InvalidState("missing channel dst".into()))?;
163
164 let selector_id = selector.to_string();
165
166 let redeem_stream = self
168 .db
169 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
170 .await?
171 .collect::<Vec<_>>()
172 .await;
173
174 let mut receivers: Vec<PendingAction> = vec![];
175 for ack_ticket in redeem_stream {
176 let ticket_id = ack_ticket.to_string();
177
178 if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
179 let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
180 match action {
181 Ok(successful_tx) => {
182 receivers.push(successful_tx);
183 }
184 Err(e) => {
185 error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
186 }
187 }
188 } else {
189 error!("failed to extract redeemable ticket");
190 }
191 }
192
193 info!(
194 count = receivers.len(),
195 selector = selector_id,
196 "acknowledged tickets were submitted to redeem in channel",
197 );
198
199 Ok(receivers)
200 }
201
202 #[tracing::instrument(level = "debug", skip(self))]
203 async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
204 if let Some(channel) = self
205 .db
206 .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
207 .await?
208 {
209 if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
212 return Err(OldTicket);
213 }
214
215 debug!(%ack_ticket, %channel, "redeeming single ticket");
216
217 let selector = TicketSelector::from(&channel)
218 .with_index(ack_ticket.verified_ticket().index)
219 .with_state(AcknowledgedTicketStatus::Untouched);
220
221 let maybe_ticket = self
223 .db
224 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
225 .await?
226 .next()
227 .await;
228
229 if let Some(ticket) = maybe_ticket {
230 let channel_dst = self
231 .db
232 .get_indexer_data(None)
233 .await?
234 .domain_separator(DomainSeparator::Channel)
235 .ok_or(InvalidState("missing channel dst".into()))?;
236
237 let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
238
239 debug!(%ack_ticket, "ticket is redeemable");
240 Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
241 } else {
242 Err(WrongTicketState(ack_ticket.to_string()))
243 }
244 } else {
245 Err(ChannelDoesNotExist)
246 }
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use super::*;
253 use futures::FutureExt;
254 use hex_literal::hex;
255 use hopr_chain_types::chain_events::ChainEventType::TicketRedeemed;
256 use hopr_chain_types::chain_events::SignificantChainEvent;
257 use hopr_crypto_random::random_bytes;
258 use hopr_crypto_types::prelude::*;
259 use hopr_db_sql::api::info::DomainSeparator;
260 use hopr_db_sql::db::HoprDb;
261 use hopr_db_sql::errors::DbSqlError;
262 use hopr_db_sql::info::HoprDbInfoOperations;
263 use hopr_db_sql::{HoprDbGeneralModelOperations, TargetDb};
264
265 use crate::action_queue::{ActionQueue, MockTransactionExecutor};
266 use crate::action_state::MockActionState;
267
268 lazy_static::lazy_static! {
269 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
270 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
271 static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
272 }
273
274 fn generate_random_ack_ticket(
275 idx: u64,
276 counterparty: &ChainKeypair,
277 channel_epoch: u32,
278 ) -> anyhow::Result<AcknowledgedTicket> {
279 let hk1 = HalfKey::random();
280 let hk2 = HalfKey::random();
281
282 let cp1: CurvePoint = hk1.to_challenge().try_into()?;
283 let cp2: CurvePoint = hk2.to_challenge().try_into()?;
284 let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
285
286 let price_per_packet: U256 = 10000000000000000u128.into(); Ok(TicketBuilder::default()
289 .addresses(counterparty, &*ALICE)
290 .amount(price_per_packet.div_f64(1.0f64)? * 5u32)
291 .index(idx)
292 .index_offset(1)
293 .win_prob(1.0)
294 .channel_epoch(channel_epoch)
295 .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
296 .build_signed(counterparty, &Hash::default())?
297 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
298 }
299
300 async fn create_channel_with_ack_tickets(
301 db: HoprDb,
302 ticket_count: usize,
303 counterparty: &ChainKeypair,
304 channel_epoch: u32,
305 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
306 let ckp = counterparty.clone();
307 let db_clone = db.clone();
308 let channel = db
309 .begin_transaction()
310 .await?
311 .perform(|tx| {
312 Box::pin(async move {
313 db_clone
314 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
315 .await?;
316
317 let channel = ChannelEntry::new(
318 ckp.public().to_address(),
319 ALICE.public().to_address(),
320 Balance::zero(BalanceType::HOPR),
321 U256::zero(),
322 ChannelStatus::Open,
323 channel_epoch.into(),
324 );
325 db_clone.upsert_channel(Some(tx), channel).await?;
326 Ok::<_, DbSqlError>(channel)
327 })
328 })
329 .await?;
330
331 let ckp = counterparty.clone();
332 let input_tickets = db
333 .begin_transaction_in_db(TargetDb::Tickets)
334 .await?
335 .perform(|tx| {
336 Box::pin(async move {
337 let mut input_tickets = Vec::new();
338 for i in 0..ticket_count {
339 let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
340 .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
341 db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
342 input_tickets.push(ack_ticket);
343 }
344 Ok::<_, DbSqlError>(input_tickets)
345 })
346 })
347 .await?;
348
349 Ok((channel, input_tickets))
350 }
351
352 #[async_std::test]
353 async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
354 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
355
356 let ticket_count = 5;
357 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
358
359 let (channel_from_bob, bob_tickets) =
361 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
362 let (channel_from_charlie, charlie_tickets) =
363 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
364
365 let mut indexer_action_tracker = MockActionState::new();
366 let mut seq2 = mockall::Sequence::new();
367
368 for tkt in bob_tickets.iter().cloned() {
369 indexer_action_tracker
370 .expect_register_expectation()
371 .once()
372 .in_sequence(&mut seq2)
373 .return_once(move |_| {
374 Ok(futures::future::ok(SignificantChainEvent {
375 tx_hash: random_hash,
376 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
377 })
378 .boxed())
379 });
380 }
381
382 for tkt in charlie_tickets.iter().cloned() {
383 indexer_action_tracker
384 .expect_register_expectation()
385 .once()
386 .in_sequence(&mut seq2)
387 .return_once(move |_| {
388 Ok(futures::future::ok(SignificantChainEvent {
389 tx_hash: random_hash,
390 event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
391 })
392 .boxed())
393 });
394 }
395
396 let mut tx_exec = MockTransactionExecutor::new();
397 let mut seq = mockall::Sequence::new();
398
399 tx_exec
401 .expect_redeem_ticket()
402 .times(ticket_count)
403 .in_sequence(&mut seq)
404 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
405 .returning(move |_| Ok(random_hash));
406
407 tx_exec
409 .expect_redeem_ticket()
410 .times(ticket_count)
411 .in_sequence(&mut seq)
412 .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
413 .returning(move |_| Ok(random_hash));
414
415 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
417 let tx_sender = tx_queue.new_sender();
418 async_std::task::spawn(async move {
419 tx_queue.start().await;
420 });
421
422 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
423
424 let confirmations = futures::future::try_join_all(actions.redeem_all_tickets(false).await?.into_iter()).await?;
425
426 assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
427 assert!(
428 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
429 "tx hashes must be equal"
430 );
431
432 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
433
434 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
435
436 assert!(
437 db_acks_bob
438 .into_iter()
439 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
440 "all bob's tickets must be in BeingRedeemed state"
441 );
442 assert!(
443 db_acks_charlie
444 .into_iter()
445 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
446 "all charlie's tickets must be in BeingRedeemed state"
447 );
448
449 Ok(())
450 }
451
452 #[async_std::test]
453 async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
454 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
455
456 let ticket_count = 5;
457 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
458
459 let (mut channel_from_bob, bob_tickets) =
461 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
462 let (channel_from_charlie, _) =
463 create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
464
465 channel_from_bob.ticket_index = 1_u32.into();
467 db.upsert_channel(None, channel_from_bob.clone()).await?;
468
469 let mut indexer_action_tracker = MockActionState::new();
470 let mut seq2 = mockall::Sequence::new();
471
472 for tkt in bob_tickets.iter().cloned() {
473 indexer_action_tracker
474 .expect_register_expectation()
475 .once()
476 .in_sequence(&mut seq2)
477 .return_once(move |_| {
478 Ok(futures::future::ok(SignificantChainEvent {
479 tx_hash: random_hash,
480 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
481 })
482 .boxed())
483 });
484 }
485
486 let mut tx_exec = MockTransactionExecutor::new();
488 tx_exec
489 .expect_redeem_ticket()
490 .times(ticket_count - 1)
491 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
492 .returning(move |_| Ok(random_hash));
493
494 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
496 let tx_sender = tx_queue.new_sender();
497 async_std::task::spawn(async move {
498 tx_queue.start().await;
499 });
500
501 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
502
503 let confirmations = futures::future::try_join_all(
504 actions
505 .redeem_tickets_with_counterparty(&BOB.public().to_address(), false)
506 .await?
507 .into_iter(),
508 )
509 .await?;
510
511 assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
513 assert!(
514 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
515 "tx hashes must be equal"
516 );
517
518 let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
519
520 let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
521
522 assert!(
523 db_acks_bob
524 .into_iter()
525 .take_while(|tkt| tkt.verified_ticket().index != 0)
526 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
527 "all bob's tickets must be in BeingRedeemed state"
528 );
529 assert!(
530 db_acks_charlie
531 .into_iter()
532 .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
533 "all charlie's tickets must be in Untouched state"
534 );
535
536 Ok(())
537 }
538
539 #[async_std::test]
540 async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
541 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
542
543 let ticket_count = 3;
544 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
545
546 let (channel_from_bob, mut tickets) =
547 create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
548
549 tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
551 let selector = TicketSelector::from(&tickets[0]).with_no_state();
552 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
553 .await?;
554
555 tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
557 let selector = TicketSelector::from(&tickets[1]).with_no_state();
558 db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
559 .await?;
560
561 let tickets_clone = tickets.clone();
563 let mut tx_exec = MockTransactionExecutor::new();
564 tx_exec
565 .expect_redeem_ticket()
566 .times(ticket_count - 2)
567 .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
568 .returning(move |_| Ok(random_hash));
569
570 let mut indexer_action_tracker = MockActionState::new();
571 for tkt in tickets.iter().skip(2).cloned() {
572 indexer_action_tracker
573 .expect_register_expectation()
574 .once()
575 .return_once(move |_| {
576 Ok(futures::future::ok(SignificantChainEvent {
577 tx_hash: random_hash,
578 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
579 })
580 .boxed())
581 });
582 }
583
584 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
586 let tx_sender = tx_queue.new_sender();
587 async_std::task::spawn(async move {
588 tx_queue.start().await;
589 });
590
591 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
592
593 let confirmations = futures::future::try_join_all(
594 actions
595 .redeem_tickets_in_channel(&channel_from_bob, false)
596 .await?
597 .into_iter(),
598 )
599 .await?;
600
601 assert_eq!(
602 ticket_count - 2,
603 confirmations.len(),
604 "must redeem only redeemable tickets in channel"
605 );
606
607 assert!(
608 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
609 "cannot redeem a ticket that's being aggregated"
610 );
611
612 assert!(
613 actions.redeem_ticket(tickets[1].clone()).await.is_err(),
614 "cannot redeem a ticket that's being redeemed"
615 );
616
617 Ok(())
618 }
619
620 #[async_std::test]
621 async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed(
622 ) -> anyhow::Result<()> {
623 let ticket_count = 3;
624 let ticket_from_previous_epoch_count = 2;
625 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
626 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
627
628 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
630
631 let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
633 db.upsert_ticket(None, ticket.clone()).await?;
634 tickets.insert(0, ticket);
635
636 let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
637 db.upsert_ticket(None, ticket.clone()).await?;
638 tickets.insert(1, ticket);
639
640 let tickets_clone = tickets.clone();
641 let mut tx_exec = MockTransactionExecutor::new();
642 tx_exec
643 .expect_redeem_ticket()
644 .times(ticket_count - ticket_from_previous_epoch_count)
645 .withf(move |t| {
646 tickets_clone[ticket_from_previous_epoch_count..]
647 .iter()
648 .any(|tk| tk.ticket.eq(&t.ticket))
649 })
650 .returning(move |_| Ok(random_hash));
651
652 let mut indexer_action_tracker = MockActionState::new();
653 for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
654 indexer_action_tracker
655 .expect_register_expectation()
656 .once()
657 .return_once(move |_| {
658 Ok(futures::future::ok(SignificantChainEvent {
659 tx_hash: random_hash,
660 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
661 })
662 .boxed())
663 });
664 }
665
666 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
668 let tx_sender = tx_queue.new_sender();
669 async_std::task::spawn(async move {
670 tx_queue.start().await;
671 });
672
673 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
674
675 futures::future::join_all(
676 actions
677 .redeem_tickets_in_channel(&channel_from_bob, false)
678 .await?
679 .into_iter(),
680 )
681 .await;
682
683 assert!(
684 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
685 "cannot redeem a ticket that's from the previous epoch"
686 );
687
688 Ok(())
689 }
690
691 #[async_std::test]
692 async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
693 let ticket_count = 4;
694 let ticket_from_next_epoch_count = 2;
695 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
696 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
697
698 let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
700
701 let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
703 db.upsert_ticket(None, ticket.clone()).await?;
704 tickets.insert(0, ticket);
705
706 let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
707 db.upsert_ticket(None, ticket.clone()).await?;
708 tickets.insert(1, ticket);
709
710 let tickets_clone = tickets.clone();
711 let mut tx_exec = MockTransactionExecutor::new();
712 tx_exec
713 .expect_redeem_ticket()
714 .times(ticket_count - ticket_from_next_epoch_count)
715 .withf(move |t| {
716 tickets_clone[ticket_from_next_epoch_count..]
717 .iter()
718 .any(|tk| tk.ticket.eq(&t.ticket))
719 })
720 .returning(move |_| Ok(random_hash));
721
722 let mut indexer_action_tracker = MockActionState::new();
723 for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
724 indexer_action_tracker
725 .expect_register_expectation()
726 .once()
727 .return_once(move |_| {
728 Ok(futures::future::ok(SignificantChainEvent {
729 tx_hash: random_hash,
730 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
731 })
732 .boxed())
733 });
734 }
735
736 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
738 let tx_sender = tx_queue.new_sender();
739 async_std::task::spawn(async move {
740 tx_queue.start().await;
741 });
742
743 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
744
745 futures::future::join_all(
746 actions
747 .redeem_tickets_in_channel(&channel_from_bob, false)
748 .await?
749 .into_iter(),
750 )
751 .await;
752
753 for unredeemable_index in 0..ticket_from_next_epoch_count {
754 assert!(
755 actions
756 .redeem_ticket(tickets[unredeemable_index].clone())
757 .await
758 .is_err(),
759 "cannot redeem a ticket that's from the next epoch"
760 );
761 }
762
763 Ok(())
764 }
765
766 #[async_std::test]
767 async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
768 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
769 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
770
771 let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
772
773 let ticket = tickets.into_iter().next().unwrap();
774
775 let mut tx_exec = MockTransactionExecutor::new();
776 let ticket_clone = ticket.clone();
777 tx_exec
778 .expect_redeem_ticket()
779 .once()
780 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
781 .returning(move |_| Ok(random_hash));
782
783 let mut indexer_action_tracker = MockActionState::new();
784 let ticket_clone = ticket.clone();
785 indexer_action_tracker
786 .expect_register_expectation()
787 .once()
788 .return_once(move |_| {
789 Ok(futures::future::ok(SignificantChainEvent {
790 tx_hash: random_hash,
791 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
792 })
793 .boxed())
794 });
795
796 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
798 let tx_sender = tx_queue.new_sender();
799 async_std::task::spawn(async move {
800 tx_queue.start().await;
801 });
802
803 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
804
805 let confirmation = actions.redeem_ticket(ticket).await?.await?;
806
807 assert_eq!(confirmation.tx_hash, random_hash);
808
809 assert!(
810 db.get_tickets((&channel_from_bob).into())
811 .await?
812 .into_iter()
813 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
814 "all bob's tickets must be in BeingRedeemed state"
815 );
816
817 Ok(())
818 }
819
820 #[async_std::test]
821 async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
822 let db = HoprDb::new_in_memory(ALICE.clone()).await?;
823 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
824
825 let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
826
827 channel_from_bob.ticket_index = 2_u32.into();
828 db.upsert_channel(None, channel_from_bob.clone()).await?;
829
830 let ticket = tickets.into_iter().next().unwrap();
831
832 let mut tx_exec = MockTransactionExecutor::new();
833 let ticket_clone = ticket.clone();
834 tx_exec
835 .expect_redeem_ticket()
836 .never()
837 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
838 .returning(move |_| Ok(random_hash));
839
840 let mut indexer_action_tracker = MockActionState::new();
841 let ticket_clone = ticket.clone();
842 indexer_action_tracker
843 .expect_register_expectation()
844 .never()
845 .return_once(move |_| {
846 Ok(futures::future::ok(SignificantChainEvent {
847 tx_hash: random_hash,
848 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
849 })
850 .boxed())
851 });
852
853 let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
855 let tx_sender = tx_queue.new_sender();
856 async_std::task::spawn(async move {
857 tx_queue.start().await;
858 });
859
860 let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
861
862 assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
863
864 Ok(())
865 }
866}