1use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_api::db::{HoprDbTicketOperations, TicketSelector};
28use hopr_chain_types::actions::Action;
29use hopr_crypto_types::types::Hash;
30use hopr_db_sql::prelude::{HoprDbChannelOperations, HoprDbInfoOperations};
31use hopr_internal_types::prelude::*;
32use tracing::{debug, error, info};
33
34use crate::{
35 ChainActions,
36 action_queue::PendingAction,
37 errors::{
38 ChainActionsError,
39 ChainActionsError::{ChannelDoesNotExist, NodeDbError, OldTicket, WrongTicketState},
40 Result,
41 },
42};
43
lazy_static::lazy_static! {
    /// Lazily initialised [`Hash::default()`] value.
    ///
    /// NOTE(review): judging by the name this is a placeholder for "no transaction hash";
    /// it is not referenced in the visible part of this file — confirm it is still needed.
    static ref EMPTY_TX_HASH: Hash = Hash::default();
}
48
/// On-chain ticket redemption operations.
///
/// Both methods return [`PendingAction`]s, which the tests in this file `await` to obtain
/// the confirmation of the submitted redeem transaction.
#[async_trait]
pub trait TicketRedeemActions {
    /// Redeems the acknowledged tickets matched by `selector`.
    ///
    /// Returns one [`PendingAction`] per ticket whose redeem action was submitted.
    /// The `ChainActions` implementation in this file restricts the selector to tickets
    /// in the `Untouched` state and returns an empty vector when nothing matches.
    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;

    /// Redeems the single acknowledged ticket `ack`.
    ///
    /// The `ChainActions` implementation in this file fails with `ChannelDoesNotExist`,
    /// `OldTicket` or `WrongTicketState` when the ticket cannot be redeemed.
    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
}
59
60#[async_trait]
61impl<Db> TicketRedeemActions for ChainActions<Db>
62where
63 Db: HoprDbTicketOperations + Sync + 'static,
64{
65 #[tracing::instrument(level = "debug", skip(self))]
66 async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
67 let selector = selector.with_state(AcknowledgedTicketStatus::Untouched);
72
73 let (count_redeemable_tickets, _) = self
74 .node_db
75 .get_tickets_value(selector.clone())
76 .await
77 .map_err(|e| NodeDbError(e.into()))?;
78
79 info!(
80 count_redeemable_tickets, %selector,
81 "acknowledged tickets in channel that can be redeemed"
82 );
83
84 if count_redeemable_tickets == 0 {
86 return Ok(vec![]);
87 }
88
89 let channel_dst = self
90 .index_db
91 .get_indexer_data(None)
92 .await?
93 .channels_dst
94 .ok_or(ChainActionsError::MissingDomainSeparator)?;
95
96 let selector_id = selector.to_string();
97
98 let mut tickets_to_redeem = self
100 .node_db
101 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
102 .await
103 .map_err(|e| NodeDbError(e.into()))?
104 .collect::<Vec<_>>()
105 .await;
106
107 tickets_to_redeem.sort();
109
110 let mut receivers: Vec<PendingAction> = vec![];
111 for ack_ticket in tickets_to_redeem {
112 let ticket_id = ack_ticket.to_string();
113
114 if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
115 let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
116 match action {
117 Ok(successful_tx) => {
118 receivers.push(successful_tx);
119 }
120 Err(e) => {
121 error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
122 }
123 }
124 } else {
125 error!("failed to extract redeemable ticket");
126 }
127 }
128
129 info!(
130 count = receivers.len(),
131 selector = selector_id,
132 "acknowledged tickets were submitted to redeem in channel",
133 );
134
135 Ok(receivers)
136 }
137
138 #[tracing::instrument(level = "debug", skip(self))]
139 async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
140 if let Some(channel) = self
141 .index_db
142 .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
143 .await?
144 {
145 if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
148 return Err(OldTicket);
149 }
150
151 debug!(%ack_ticket, %channel, "redeeming single ticket");
152
153 let selector = TicketSelector::from(&channel)
154 .with_index(ack_ticket.verified_ticket().index)
155 .with_state(AcknowledgedTicketStatus::Untouched);
156
157 let maybe_ticket = self
159 .node_db
160 .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
161 .await
162 .map_err(|e| NodeDbError(e.into()))?
163 .next()
164 .await;
165
166 if let Some(ticket) = maybe_ticket {
167 let channel_dst = self
168 .index_db
169 .get_indexer_data(None)
170 .await?
171 .channels_dst
172 .ok_or(ChainActionsError::MissingDomainSeparator)?;
173
174 let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
175
176 debug!(%ack_ticket, "ticket is redeemable");
177 Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
178 } else {
179 Err(WrongTicketState(ack_ticket.to_string()))
180 }
181 } else {
182 Err(ChannelDoesNotExist)
183 }
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use futures::FutureExt;
190 use hex_literal::hex;
191 use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
192 use hopr_crypto_random::{Randomizable, random_bytes};
193 use hopr_crypto_types::prelude::*;
194 use hopr_db_node::HoprNodeDb;
195 use hopr_db_sql::{
196 HoprDbGeneralModelOperations, HoprIndexerDb,
197 errors::DbSqlError,
198 info::HoprDbInfoOperations,
199 prelude::{DomainSeparator, HoprDbChannelOperations},
200 };
201 use hopr_primitive_types::{
202 balance::HoprBalance,
203 prelude::{BytesRepresentable, U256, UnitaryFloatOps},
204 };
205
206 use super::*;
207 use crate::{
208 action_queue::{ActionQueue, MockTransactionExecutor},
209 action_state::MockActionState,
210 };
211
    // Fixed chain keypairs used by the tests, built from hard-coded secrets so the
    // addresses are the same on every run.
    lazy_static::lazy_static! {
        /// Keypair used for both databases and for `ChainActions` in the tests below.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        /// Counterparty keypair used to create channels and tickets towards ALICE.
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
        /// Second counterparty keypair used to create channels and tickets towards ALICE.
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
    }
217
218 const PRICE_PER_PACKET: u128 = 10000000000000000u128; fn generate_random_ack_ticket(
221 idx: u64,
222 counterparty: &ChainKeypair,
223 channel_epoch: u32,
224 ) -> anyhow::Result<AcknowledgedTicket> {
225 let hk1 = HalfKey::random();
226 let hk2 = HalfKey::random();
227
228 let resp = Response::from_half_keys(&hk1, &hk2)?;
229
230 Ok(TicketBuilder::default()
231 .addresses(counterparty, &*ALICE)
232 .amount(U256::from(PRICE_PER_PACKET).div_f64(1.0f64)? * 5u32)
233 .index(idx)
234 .index_offset(1)
235 .win_prob(WinningProbability::ALWAYS)
236 .channel_epoch(channel_epoch)
237 .challenge(resp.to_challenge()?)
238 .build_signed(counterparty, &Hash::default())?
239 .into_acknowledged(resp))
240 }
241
242 async fn create_channel_with_ack_tickets(
243 db: HoprIndexerDb,
244 node_db: HoprNodeDb,
245 ticket_count: usize,
246 counterparty: &ChainKeypair,
247 channel_epoch: u32,
248 ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
249 let ckp = counterparty.clone();
250 let db_clone = db.clone();
251 let channel = db
252 .begin_transaction()
253 .await?
254 .perform(|tx| {
255 Box::pin(async move {
256 db_clone
257 .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
258 .await?;
259
260 let channel = ChannelEntry::new(
261 ckp.public().to_address(),
262 ALICE.public().to_address(),
263 0.into(),
264 U256::zero(),
265 ChannelStatus::Open,
266 channel_epoch.into(),
267 );
268 db_clone.upsert_channel(Some(tx), channel).await?;
269 Ok::<_, DbSqlError>(channel)
270 })
271 })
272 .await?;
273
274 let ckp = counterparty.clone();
275
276 let mut input_tickets = Vec::new();
277 for i in 0..ticket_count {
278 let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
279 .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
280 node_db.upsert_ticket(ack_ticket.clone()).await?;
281 input_tickets.push(ack_ticket);
282 }
283
284 Ok((channel, input_tickets))
285 }
286
    /// Redeems tickets from two channels (Bob → Alice and Charlie → Alice) with a single
    /// selector that also filters by minimum amount, and checks that only the tickets
    /// meeting that amount end up in the `BeingRedeemed` state.
    #[tokio::test]
    async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
        // Every mocked transaction and chain event reports this hash.
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());

        let ticket_count = 5;
        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;

        let (channel_from_bob, bob_tickets) =
            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;
        let (channel_from_charlie, charlie_tickets) =
            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &CHARLIE, 4u32).await?;

        // One extra Charlie ticket worth only `PRICE_PER_PACKET`, which is below the
        // `PRICE_PER_PACKET * 5` lower bound of the selector used further down.
        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
        let low_value_ack_ticket = TicketBuilder::default()
            .addresses(&*CHARLIE, &*ALICE)
            .amount(PRICE_PER_PACKET)
            .index((ticket_count + 1) as u64)
            .index_offset(1)
            .win_prob(WinningProbability::ALWAYS)
            .channel_epoch(4u32)
            .challenge(resp.to_challenge()?)
            .build_signed(&CHARLIE, &Hash::default())?
            .into_acknowledged(resp);
        node_db.upsert_ticket(low_value_ack_ticket).await?;

        // The action state mock must see one expectation per ticket, in order:
        // all of Bob's tickets first, then all of Charlie's.
        let mut indexer_action_tracker = MockActionState::new();
        let mut seq2 = mockall::Sequence::new();

        for tkt in bob_tickets.iter().cloned() {
            indexer_action_tracker
                .expect_register_expectation()
                .once()
                .in_sequence(&mut seq2)
                .return_once(move |_| {
                    Ok(futures::future::ok(SignificantChainEvent {
                        tx_hash: random_hash,
                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
                    })
                    .boxed())
                });
        }

        for tkt in charlie_tickets.iter().cloned() {
            indexer_action_tracker
                .expect_register_expectation()
                .once()
                .in_sequence(&mut seq2)
                .return_once(move |_| {
                    Ok(futures::future::ok(SignificantChainEvent {
                        tx_hash: random_hash,
                        event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
                    })
                    .boxed())
                });
        }

        // The transaction executor mock is sequenced the same way: `ticket_count`
        // redemptions of Bob's tickets followed by `ticket_count` of Charlie's.
        let mut tx_exec = MockTransactionExecutor::new();
        let mut seq = mockall::Sequence::new();

        tx_exec
            .expect_redeem_ticket()
            .times(ticket_count)
            .in_sequence(&mut seq)
            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
            .returning(move |_| Ok(random_hash));

        tx_exec
            .expect_redeem_ticket()
            .times(ticket_count)
            .in_sequence(&mut seq)
            .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
            .returning(move |_| Ok(random_hash));

        // Run the action queue in the background so submitted actions get processed.
        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
        let tx_sender = tx_queue.new_sender();
        tokio::task::spawn(async move {
            tx_queue.start().await;
        });

        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());

        // Redeem on both channels at once, restricted to tickets worth at least
        // `PRICE_PER_PACKET * 5`, then wait for every confirmation.
        let confirmations = futures::future::try_join_all(
            actions
                .redeem_tickets(
                    TicketSelector::from(&channel_from_bob)
                        .also_on_channel_entry(&channel_from_charlie)
                        .with_amount(HoprBalance::from(PRICE_PER_PACKET * 5)..),
                )
                .await?
                .into_iter(),
        )
        .await?;

        assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
        assert!(
            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
            "tx hashes must be equal"
        );

        let db_acks_bob = node_db
            .stream_tickets(Some((&channel_from_bob).into()))
            .await?
            .collect::<Vec<_>>()
            .await;

        let db_acks_charlie = node_db
            .stream_tickets(Some((&channel_from_charlie).into()))
            .await?
            .collect::<Vec<_>>()
            .await;

        assert!(
            db_acks_bob
                .into_iter()
                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
            "all bob's tickets must be in BeingRedeemed state"
        );
        // NOTE(review): the `take`/`skip` split below assumes the stream yields Charlie's
        // tickets with the low-value one last — confirm the ordering of `stream_tickets`.
        assert!(
            db_acks_charlie
                .iter()
                .take(ticket_count)
                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
            "all valuable charlie's tickets must be in BeingRedeemed state"
        );
        assert!(
            db_acks_charlie
                .iter()
                .skip(ticket_count)
                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
            "all non-valuable charlie's tickets must be in Untouched state"
        );

        Ok(())
    }
428
429 #[tokio::test]
430 async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
431 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
432
433 let ticket_count = 5;
434 let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
435 let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
436
437 let (mut channel_from_bob, bob_tickets) =
439 create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;
440 let (channel_from_charlie, _) =
441 create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &CHARLIE, 4u32).await?;
442
443 channel_from_bob.ticket_index = 1_u32.into();
445 db.upsert_channel(None, channel_from_bob).await?;
446
447 let mut indexer_action_tracker = MockActionState::new();
448 let mut seq2 = mockall::Sequence::new();
449
450 for tkt in bob_tickets.iter().skip(1).cloned() {
452 indexer_action_tracker
453 .expect_register_expectation()
454 .once()
455 .in_sequence(&mut seq2)
456 .return_once(move |_| {
457 Ok(futures::future::ok(SignificantChainEvent {
458 tx_hash: random_hash,
459 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
460 })
461 .boxed())
462 });
463 }
464
465 let mut tx_exec = MockTransactionExecutor::new();
466 let mut seq = mockall::Sequence::new();
467
468 tx_exec
470 .expect_redeem_ticket()
471 .times(ticket_count - 1)
472 .in_sequence(&mut seq)
473 .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
474 .returning(move |_| Ok(random_hash));
475
476 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
478 let tx_sender = tx_queue.new_sender();
479 tokio::task::spawn(tx_queue.start());
480
481 let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
482
483 let confirmations = futures::future::try_join_all(
484 actions
485 .redeem_tickets(TicketSelector::from(&channel_from_bob).with_index_range(1..))
486 .await?
487 .into_iter(),
488 )
489 .await?;
490
491 assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
493 assert!(
494 confirmations.into_iter().all(|c| c.tx_hash == random_hash),
495 "tx hashes must be equal"
496 );
497
498 let db_acks_bob = node_db
499 .stream_tickets(Some((&channel_from_bob).into()))
500 .await?
501 .collect::<Vec<_>>()
502 .await;
503
504 let db_acks_charlie = node_db
505 .stream_tickets(Some((&channel_from_charlie).into()))
506 .await?
507 .collect::<Vec<_>>()
508 .await;
509
510 assert!(
511 db_acks_bob
512 .into_iter()
513 .take_while(|tkt| tkt.verified_ticket().index != 0)
514 .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
515 "all bob's tickets must be in BeingRedeemed state"
516 );
517 assert!(
518 db_acks_charlie
519 .into_iter()
520 .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
521 "all charlie's tickets must be in Untouched state"
522 );
523
524 Ok(())
525 }
526
    /// Marks one ticket as `BeingAggregated` and another as `BeingRedeemed`, then checks
    /// that bulk redemption only picks up the remaining ticket and that redeeming either
    /// of the two marked tickets individually fails.
    #[tokio::test]
    async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
        // Every mocked transaction and chain event reports this hash.
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());

        let ticket_count = 3;
        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;

        let (channel_from_bob, mut tickets) =
            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;

        // Ticket 0: set `BeingAggregated` both on the local copy and in the database.
        tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
        let selector = TicketSelector::from(&tickets[0]).with_no_state();
        node_db
            .update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
            .await?;

        // Ticket 1: set `BeingRedeemed` both on the local copy and in the database.
        tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
        let selector = TicketSelector::from(&tickets[1]).with_no_state();
        node_db
            .update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
            .await?;

        // Only the tickets from position 2 onwards may reach the transaction executor.
        let tickets_clone = tickets.clone();
        let mut tx_exec = MockTransactionExecutor::new();
        tx_exec
            .expect_redeem_ticket()
            .times(ticket_count - 2)
            .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
            .returning(move |_| Ok(random_hash));

        let mut indexer_action_tracker = MockActionState::new();
        for tkt in tickets.iter().skip(2).cloned() {
            indexer_action_tracker
                .expect_register_expectation()
                .once()
                .return_once(move |_| {
                    Ok(futures::future::ok(SignificantChainEvent {
                        tx_hash: random_hash,
                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
                    })
                    .boxed())
                });
        }

        // Run the action queue in the background so submitted actions get processed.
        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
        let tx_sender = tx_queue.new_sender();
        tokio::task::spawn(tx_queue.start());

        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());

        let confirmations = futures::future::try_join_all(
            actions
                .redeem_tickets(TicketSelector::from(&channel_from_bob))
                .await?
                .into_iter(),
        )
        .await?;

        assert_eq!(
            ticket_count - 2,
            confirmations.len(),
            "must redeem only redeemable tickets in channel"
        );

        assert!(
            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
            "cannot redeem a ticket that's being aggregated"
        );

        assert!(
            actions.redeem_ticket(tickets[1].clone()).await.is_err(),
            "cannot redeem a ticket that's being redeemed"
        );

        Ok(())
    }
608
609 #[tokio::test]
610 async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
611 -> anyhow::Result<()> {
612 let ticket_count = 3;
613 let ticket_from_previous_epoch_count = 2;
614 let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
615 let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
616 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
617
618 let (channel_from_bob, mut tickets) =
620 create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 4u32).await?;
621
622 let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
624 node_db.upsert_ticket(ticket.clone()).await?;
625 tickets.insert(0, ticket);
626
627 let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
628 node_db.upsert_ticket(ticket.clone()).await?;
629 tickets.insert(1, ticket);
630
631 let tickets_clone = tickets.clone();
632 let mut tx_exec = MockTransactionExecutor::new();
633 tx_exec
634 .expect_redeem_ticket()
635 .times(ticket_count - ticket_from_previous_epoch_count)
636 .withf(move |t| {
637 tickets_clone[ticket_from_previous_epoch_count..]
638 .iter()
639 .any(|tk| tk.ticket.eq(&t.ticket))
640 })
641 .returning(move |_| Ok(random_hash));
642
643 let mut indexer_action_tracker = MockActionState::new();
644 for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
645 indexer_action_tracker
646 .expect_register_expectation()
647 .once()
648 .return_once(move |_| {
649 Ok(futures::future::ok(SignificantChainEvent {
650 tx_hash: random_hash,
651 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
652 })
653 .boxed())
654 });
655 }
656
657 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
659 let tx_sender = tx_queue.new_sender();
660 tokio::task::spawn(async move {
661 tx_queue.start().await;
662 });
663
664 let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
665
666 futures::future::join_all(
667 actions
668 .redeem_tickets(TicketSelector::from(&channel_from_bob))
669 .await?
670 .into_iter(),
671 )
672 .await;
673
674 assert!(
675 actions.redeem_ticket(tickets[0].clone()).await.is_err(),
676 "cannot redeem a ticket that's from the previous epoch"
677 );
678
679 Ok(())
680 }
681
682 #[tokio::test]
683 async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
684 let ticket_count = 3;
685 let ticket_from_next_epoch_count = 2;
686 let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
687 let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
688 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
689
690 let (channel_from_bob, mut tickets) =
692 create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 4u32).await?;
693
694 let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
696 node_db.upsert_ticket(ticket.clone()).await?;
697 tickets.insert(0, ticket);
698
699 let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
700 node_db.upsert_ticket(ticket.clone()).await?;
701 tickets.insert(1, ticket);
702
703 let tickets_clone = tickets.clone();
704 let mut tx_exec = MockTransactionExecutor::new();
705 tx_exec
706 .expect_redeem_ticket()
707 .times(ticket_count - ticket_from_next_epoch_count)
708 .withf(move |t| {
709 tickets_clone[ticket_from_next_epoch_count..]
710 .iter()
711 .any(|tk| tk.ticket.eq(&t.ticket))
712 })
713 .returning(move |_| Ok(random_hash));
714
715 let mut indexer_action_tracker = MockActionState::new();
716 for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
717 indexer_action_tracker
718 .expect_register_expectation()
719 .once()
720 .return_once(move |_| {
721 Ok(futures::future::ok(SignificantChainEvent {
722 tx_hash: random_hash,
723 event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
724 })
725 .boxed())
726 });
727 }
728
729 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
731 let tx_sender = tx_queue.new_sender();
732 tokio::task::spawn(async move {
733 tx_queue.start().await;
734 });
735
736 let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
737
738 futures::future::join_all(
739 actions
740 .redeem_tickets(TicketSelector::from(&channel_from_bob))
741 .await?
742 .into_iter(),
743 )
744 .await;
745
746 for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
747 assert!(
748 actions.redeem_ticket(ticket.clone()).await.is_err(),
749 "cannot redeem a ticket that's from the next epoch"
750 );
751 }
752
753 Ok(())
754 }
755
756 #[tokio::test]
757 async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
758 let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
759 let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
760 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
761
762 let (channel_from_bob, tickets) =
763 create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 1u32).await?;
764
765 let ticket = tickets.into_iter().next().unwrap();
766
767 let mut tx_exec = MockTransactionExecutor::new();
768 let ticket_clone = ticket.clone();
769 tx_exec
770 .expect_redeem_ticket()
771 .once()
772 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
773 .returning(move |_| Ok(random_hash));
774
775 let mut indexer_action_tracker = MockActionState::new();
776 let ticket_clone = ticket.clone();
777 indexer_action_tracker
778 .expect_register_expectation()
779 .once()
780 .return_once(move |_| {
781 Ok(futures::future::ok(SignificantChainEvent {
782 tx_hash: random_hash,
783 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
784 })
785 .boxed())
786 });
787
788 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
790 let tx_sender = tx_queue.new_sender();
791 tokio::task::spawn(async move {
792 tx_queue.start().await;
793 });
794
795 let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
796
797 let confirmation = actions.redeem_ticket(ticket).await?.await?;
798
799 assert_eq!(confirmation.tx_hash, random_hash);
800
801 assert!(
802 node_db
803 .stream_tickets(Some((&channel_from_bob).into()))
804 .await?
805 .all(|tkt| futures::future::ready(tkt.status == AcknowledgedTicketStatus::BeingRedeemed))
806 .await,
807 "all bob's tickets must be in BeingRedeemed state"
808 );
809
810 Ok(())
811 }
812
813 #[tokio::test]
814 async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
815 let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
816 let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
817 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
818
819 let (mut channel_from_bob, tickets) =
820 create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 1u32).await?;
821
822 channel_from_bob.ticket_index = 2_u32.into();
823 db.upsert_channel(None, channel_from_bob).await?;
824
825 let ticket = tickets.into_iter().next().unwrap();
826
827 let mut tx_exec = MockTransactionExecutor::new();
828 let ticket_clone = ticket.clone();
829 tx_exec
830 .expect_redeem_ticket()
831 .never()
832 .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
833 .returning(move |_| Ok(random_hash));
834
835 let mut indexer_action_tracker = MockActionState::new();
836 let ticket_clone = ticket.clone();
837 indexer_action_tracker
838 .expect_register_expectation()
839 .never()
840 .return_once(move |_| {
841 Ok(futures::future::ok(SignificantChainEvent {
842 tx_hash: random_hash,
843 event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
844 })
845 .boxed())
846 });
847
848 let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
850 let tx_sender = tx_queue.new_sender();
851 tokio::task::spawn(async move {
852 tx_queue.start().await;
853 });
854
855 let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
856
857 assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
858
859 Ok(())
860 }
861}