hopr_chain_actions/
redeem.rs

1//! This module contains the [TicketRedeemActions] trait defining actions regarding
2//! ticket redemption.
3//!
4//! An implementation of this trait is added to [ChainActions] which realizes the redemption
5//! operations via [ActionQueue](crate::action_queue::ActionQueue).
6//!
//! There are 5 functions that can be used to redeem tickets in the [TicketRedeemActions] trait:
//! - [redeem_all_tickets](TicketRedeemActions::redeem_all_tickets)
//! - [redeem_tickets_in_channel](TicketRedeemActions::redeem_tickets_in_channel)
//! - [redeem_tickets_with_counterparty](TicketRedeemActions::redeem_tickets_with_counterparty)
//! - [redeem_tickets](TicketRedeemActions::redeem_tickets)
//! - [redeem_ticket](TicketRedeemActions::redeem_ticket)
12//!
//! Each method first checks if the tickets are redeemable
//! (= they are not marked as [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) or
//! [BeingAggregated](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingAggregated) in the DB).
16//! If they are redeemable, their state is changed to
17//! [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) (while having acquired the
18//! exclusive DB write lock). Subsequently, the ticket in such a state is transmitted into the
19//! [ActionQueue](crate::action_queue::ActionQueue) so the redemption is soon executed on-chain. The functions return
20//! immediately but provide futures that can be awaited in case the callers wish to await the on-chain confirmation of
21//! each ticket redemption.
22//!
//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting
//! the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent) by the Indexer.
25use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_chain_types::actions::Action;
28use hopr_crypto_types::types::Hash;
29use hopr_db_sql::{
30    api::{
31        info::DomainSeparator,
32        tickets::{HoprDbTicketOperations, TicketSelector},
33    },
34    channels::HoprDbChannelOperations,
35    prelude::HoprDbInfoOperations,
36};
37use hopr_internal_types::prelude::*;
38use hopr_primitive_types::prelude::*;
39use tracing::{debug, error, info, warn};
40
41use crate::{
42    ChainActions,
43    action_queue::PendingAction,
44    errors::{
45        ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket, WrongTicketState},
46        Result,
47    },
48};
49
lazy_static::lazy_static! {
    /// Used as a placeholder when the redeem transaction has not yet been published on-chain.
    ///
    /// Lazily initialized to `Hash::default()`.
    // NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
    static ref EMPTY_TX_HASH: Hash = Hash::default();
}
54
/// Gathers all the ticket redemption-related on-chain calls.
///
/// The methods do not wait for the on-chain confirmation themselves: they return
/// [`PendingAction`]s, which the caller may await to obtain the confirmation of the
/// respective redemption.
#[async_trait]
pub trait TicketRedeemActions {
    /// Redeems all redeemable tickets in all channels.
    ///
    /// `only_aggregated` is presumably meant to restrict the redemption to aggregated
    /// tickets only (the implementation in this file forwards it to
    /// `TicketSelector::with_aggregated_only`).
    ///
    /// Returns one [`PendingAction`] per redemption that has been submitted.
    async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>>;

    /// Redeems all redeemable tickets in the incoming channel from the given counterparty.
    ///
    /// See [`TicketRedeemActions::redeem_all_tickets`] for the meaning of `only_aggregated`.
    async fn redeem_tickets_with_counterparty(
        &self,
        counterparty: &Address,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems all redeemable tickets in the given channel.
    ///
    /// See [`TicketRedeemActions::redeem_all_tickets`] for the meaning of `only_aggregated`.
    async fn redeem_tickets_in_channel(
        &self,
        channel: &ChannelEntry,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems all tickets based on the given [`TicketSelector`].
    ///
    /// Returns one [`PendingAction`] per redemption that has been submitted.
    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;

    /// Tries to redeem the given ticket. If the ticket is not redeemable, returns an error.
    /// Otherwise, the [`PendingAction`] of the submitted redemption is returned; it can be
    /// awaited to obtain the confirmation of the on-chain redemption.
    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
}
82
83#[async_trait]
84impl<Db> TicketRedeemActions for ChainActions<Db>
85where
86    Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
87{
88    #[tracing::instrument(level = "debug", skip(self))]
89    async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>> {
90        let incoming_channels = self
91            .db
92            .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
93            .await?;
94        debug!(
95            channel_count = incoming_channels.len(),
96            "starting to redeem all tickets in channels to self"
97        );
98
99        let mut receivers: Vec<PendingAction> = vec![];
100
101        // Must be synchronous because underlying Ethereum transactions are sequential
102        for incoming_channel in incoming_channels {
103            match self.redeem_tickets_in_channel(&incoming_channel, only_aggregated).await {
104                Ok(mut successful_txs) => {
105                    receivers.append(&mut successful_txs);
106                }
107                Err(e) => {
108                    warn!(
109                        channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
110                        error = %e,
111                        "Failed to redeem tickets in channel",
112                    );
113                }
114            }
115        }
116
117        Ok(receivers)
118    }
119
120    #[tracing::instrument(level = "debug", skip(self))]
121    async fn redeem_tickets_with_counterparty(
122        &self,
123        counterparty: &Address,
124        only_aggregated: bool,
125    ) -> Result<Vec<PendingAction>> {
126        let maybe_channel = self
127            .db
128            .get_channel_by_parties(None, counterparty, &self.self_address(), false)
129            .await?;
130        if let Some(channel) = maybe_channel {
131            self.redeem_tickets_in_channel(&channel, only_aggregated).await
132        } else {
133            Err(ChannelDoesNotExist)
134        }
135    }
136
137    #[tracing::instrument(level = "debug", skip(self))]
138    async fn redeem_tickets_in_channel(
139        &self,
140        channel: &ChannelEntry,
141        only_aggregated: bool,
142    ) -> Result<Vec<PendingAction>> {
143        self.redeem_tickets(
144            TicketSelector::from(channel)
145                .with_aggregated_only(only_aggregated)
146                .with_index_range(channel.ticket_index.as_u64()..)
147                .with_state(AcknowledgedTicketStatus::Untouched),
148        )
149        .await
150    }
151
152    #[tracing::instrument(level = "debug", skip(self))]
153    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
154        let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
155
156        info!(
157            count_redeemable_tickets, %selector,
158            "acknowledged tickets in channel that can be redeemed"
159        );
160
161        // Return fast if there are no redeemable tickets
162        if count_redeemable_tickets == 0 {
163            return Ok(vec![]);
164        }
165
166        let channel_dst = self
167            .db
168            .get_indexer_data(None)
169            .await?
170            .domain_separator(DomainSeparator::Channel)
171            .ok_or(InvalidState("missing channel dst".into()))?;
172
173        let selector_id = selector.to_string();
174
175        // Collect here, so we don't hold-up the stream open for too long
176        let redeem_stream = self
177            .db
178            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
179            .await?
180            .collect::<Vec<_>>()
181            .await;
182
183        let mut receivers: Vec<PendingAction> = vec![];
184        for ack_ticket in redeem_stream {
185            let ticket_id = ack_ticket.to_string();
186
187            if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
188                let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
189                match action {
190                    Ok(successful_tx) => {
191                        receivers.push(successful_tx);
192                    }
193                    Err(e) => {
194                        error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
195                    }
196                }
197            } else {
198                error!("failed to extract redeemable ticket");
199            }
200        }
201
202        info!(
203            count = receivers.len(),
204            selector = selector_id,
205            "acknowledged tickets were submitted to redeem in channel",
206        );
207
208        Ok(receivers)
209    }
210
211    #[tracing::instrument(level = "debug", skip(self))]
212    async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
213        if let Some(channel) = self
214            .db
215            .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
216            .await?
217        {
218            // Check if not trying to redeem a ticket that cannot be redeemed.
219            // Such tickets are automatically cleaned up (neglected) after successful redemption.
220            if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
221                return Err(OldTicket);
222            }
223
224            debug!(%ack_ticket, %channel, "redeeming single ticket");
225
226            let selector = TicketSelector::from(&channel)
227                .with_index(ack_ticket.verified_ticket().index)
228                .with_state(AcknowledgedTicketStatus::Untouched);
229
230            // Do not hold up the stream open for too long
231            let maybe_ticket = self
232                .db
233                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
234                .await?
235                .next()
236                .await;
237
238            if let Some(ticket) = maybe_ticket {
239                let channel_dst = self
240                    .db
241                    .get_indexer_data(None)
242                    .await?
243                    .domain_separator(DomainSeparator::Channel)
244                    .ok_or(InvalidState("missing channel dst".into()))?;
245
246                let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
247
248                debug!(%ack_ticket, "ticket is redeemable");
249                Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
250            } else {
251                Err(WrongTicketState(ack_ticket.to_string()))
252            }
253        } else {
254            Err(ChannelDoesNotExist)
255        }
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    use futures::FutureExt;
262    use hex_literal::hex;
263    use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
264    use hopr_crypto_random::{Randomizable, random_bytes};
265    use hopr_crypto_types::prelude::*;
266    use hopr_db_sql::{
267        HoprDbGeneralModelOperations, TargetDb, api::info::DomainSeparator, db::HoprDb, errors::DbSqlError,
268        info::HoprDbInfoOperations,
269    };
270
271    use super::*;
272    use crate::{
273        action_queue::{ActionQueue, MockTransactionExecutor},
274        action_state::MockActionState,
275    };
276
    lazy_static::lazy_static! {
        // Fixed chain keypairs shared by the tests below.
        // ALICE is the node under test: the in-memory DB and `ChainActions` are created with her key
        // and she is the destination of all generated tickets and channels.
        // BOB and CHARLIE act as the counterparties issuing tickets to ALICE.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
    }
282
283    fn generate_random_ack_ticket(
284        idx: u64,
285        counterparty: &ChainKeypair,
286        channel_epoch: u32,
287    ) -> anyhow::Result<AcknowledgedTicket> {
288        let hk1 = HalfKey::random();
289        let hk2 = HalfKey::random();
290
291        let cp1: CurvePoint = hk1.to_challenge().try_into()?;
292        let cp2: CurvePoint = hk2.to_challenge().try_into()?;
293        let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
294
295        let price_per_packet: U256 = 10000000000000000u128.into(); // 0.01 HOPR
296
297        Ok(TicketBuilder::default()
298            .addresses(counterparty, &*ALICE)
299            .amount(price_per_packet.div_f64(1.0f64)? * 5u32)
300            .index(idx)
301            .index_offset(1)
302            .win_prob(WinningProbability::ALWAYS)
303            .channel_epoch(channel_epoch)
304            .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
305            .build_signed(counterparty, &Hash::default())?
306            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
307    }
308
309    async fn create_channel_with_ack_tickets(
310        db: HoprDb,
311        ticket_count: usize,
312        counterparty: &ChainKeypair,
313        channel_epoch: u32,
314    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
315        let ckp = counterparty.clone();
316        let db_clone = db.clone();
317        let channel = db
318            .begin_transaction()
319            .await?
320            .perform(|tx| {
321                Box::pin(async move {
322                    db_clone
323                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
324                        .await?;
325
326                    let channel = ChannelEntry::new(
327                        ckp.public().to_address(),
328                        ALICE.public().to_address(),
329                        0.into(),
330                        U256::zero(),
331                        ChannelStatus::Open,
332                        channel_epoch.into(),
333                    );
334                    db_clone.upsert_channel(Some(tx), channel).await?;
335                    Ok::<_, DbSqlError>(channel)
336                })
337            })
338            .await?;
339
340        let ckp = counterparty.clone();
341        let input_tickets = db
342            .begin_transaction_in_db(TargetDb::Tickets)
343            .await?
344            .perform(|tx| {
345                Box::pin(async move {
346                    let mut input_tickets = Vec::new();
347                    for i in 0..ticket_count {
348                        let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
349                            .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
350                        db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
351                        input_tickets.push(ack_ticket);
352                    }
353                    Ok::<_, DbSqlError>(input_tickets)
354                })
355            })
356            .await?;
357
358        Ok((channel, input_tickets))
359    }
360
361    #[tokio::test]
362    async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
363        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
364
365        let ticket_count = 5;
366        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
367
368        // All the tickets can be redeemed because they are issued with the same channel epoch
369        let (channel_from_bob, bob_tickets) =
370            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
371        let (channel_from_charlie, charlie_tickets) =
372            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
373
374        let mut indexer_action_tracker = MockActionState::new();
375        let mut seq2 = mockall::Sequence::new();
376
377        for tkt in bob_tickets.iter().cloned() {
378            indexer_action_tracker
379                .expect_register_expectation()
380                .once()
381                .in_sequence(&mut seq2)
382                .return_once(move |_| {
383                    Ok(futures::future::ok(SignificantChainEvent {
384                        tx_hash: random_hash,
385                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
386                    })
387                    .boxed())
388                });
389        }
390
391        for tkt in charlie_tickets.iter().cloned() {
392            indexer_action_tracker
393                .expect_register_expectation()
394                .once()
395                .in_sequence(&mut seq2)
396                .return_once(move |_| {
397                    Ok(futures::future::ok(SignificantChainEvent {
398                        tx_hash: random_hash,
399                        event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
400                    })
401                    .boxed())
402                });
403        }
404
405        let mut tx_exec = MockTransactionExecutor::new();
406        let mut seq = mockall::Sequence::new();
407
408        // Expect all Bob's tickets get redeemed first
409        tx_exec
410            .expect_redeem_ticket()
411            .times(ticket_count)
412            .in_sequence(&mut seq)
413            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
414            .returning(move |_| Ok(random_hash));
415
416        // and then all Charlie's tickets get redeemed
417        tx_exec
418            .expect_redeem_ticket()
419            .times(ticket_count)
420            .in_sequence(&mut seq)
421            .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
422            .returning(move |_| Ok(random_hash));
423
424        // Start the ActionQueue with the mock TransactionExecutor
425        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
426        let tx_sender = tx_queue.new_sender();
427        tokio::task::spawn(async move {
428            tx_queue.start().await;
429        });
430
431        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
432
433        let confirmations = futures::future::try_join_all(actions.redeem_all_tickets(false).await?.into_iter()).await?;
434
435        assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
436        assert!(
437            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
438            "tx hashes must be equal"
439        );
440
441        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
442
443        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
444
445        assert!(
446            db_acks_bob
447                .into_iter()
448                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
449            "all bob's tickets must be in BeingRedeemed state"
450        );
451        assert!(
452            db_acks_charlie
453                .into_iter()
454                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
455            "all charlie's tickets must be in BeingRedeemed state"
456        );
457
458        Ok(())
459    }
460
461    #[tokio::test]
462    async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
463        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
464
465        let ticket_count = 5;
466        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
467
468        // All the tickets can be redeemed because they are issued with the same channel epoch
469        let (mut channel_from_bob, bob_tickets) =
470            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
471        let (channel_from_charlie, _) =
472            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
473
474        // Tickets with index 0 will be skipped, as that is already past
475        channel_from_bob.ticket_index = 1_u32.into();
476        db.upsert_channel(None, channel_from_bob).await?;
477
478        let mut indexer_action_tracker = MockActionState::new();
479        let mut seq2 = mockall::Sequence::new();
480
481        // Skipping ticket with index 0
482        for tkt in bob_tickets.iter().skip(1).cloned() {
483            indexer_action_tracker
484                .expect_register_expectation()
485                .once()
486                .in_sequence(&mut seq2)
487                .return_once(move |_| {
488                    Ok(futures::future::ok(SignificantChainEvent {
489                        tx_hash: random_hash,
490                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
491                    })
492                    .boxed())
493                });
494        }
495
496        let mut tx_exec = MockTransactionExecutor::new();
497        let mut seq = mockall::Sequence::new();
498
499        // Expect only Bob's tickets to get redeemed
500        tx_exec
501            .expect_redeem_ticket()
502            .times(ticket_count - 1)
503            .in_sequence(&mut seq)
504            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
505            .returning(move |_| Ok(random_hash));
506
507        // Start the ActionQueue with the mock TransactionExecutor
508        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
509        let tx_sender = tx_queue.new_sender();
510        tokio::task::spawn(async move {
511            tx_queue.start().await;
512        });
513
514        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
515
516        let confirmations = futures::future::try_join_all(
517            actions
518                .redeem_tickets_with_counterparty(&BOB.public().to_address(), false)
519                .await?
520                .into_iter(),
521        )
522        .await?;
523
524        // First ticket is skipped, because its index is lower than the index on the channel entry
525        assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
526        assert!(
527            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
528            "tx hashes must be equal"
529        );
530
531        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
532
533        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
534
535        assert!(
536            db_acks_bob
537                .into_iter()
538                .take_while(|tkt| tkt.verified_ticket().index != 0)
539                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
540            "all bob's tickets must be in BeingRedeemed state"
541        );
542        assert!(
543            db_acks_charlie
544                .into_iter()
545                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
546            "all charlie's tickets must be in Untouched state"
547        );
548
549        Ok(())
550    }
551
552    #[tokio::test]
553    async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
554        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
555
556        let ticket_count = 3;
557        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
558
559        let (channel_from_bob, mut tickets) =
560            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
561
562        // Make the first ticket unredeemable
563        tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
564        let selector = TicketSelector::from(&tickets[0]).with_no_state();
565        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
566            .await?;
567
568        // Make the second ticket unredeemable
569        tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
570        let selector = TicketSelector::from(&tickets[1]).with_no_state();
571        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
572            .await?;
573
574        // Expect only the redeemable tickets get redeemed
575        let tickets_clone = tickets.clone();
576        let mut tx_exec = MockTransactionExecutor::new();
577        tx_exec
578            .expect_redeem_ticket()
579            .times(ticket_count - 2)
580            .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
581            .returning(move |_| Ok(random_hash));
582
583        let mut indexer_action_tracker = MockActionState::new();
584        for tkt in tickets.iter().skip(2).cloned() {
585            indexer_action_tracker
586                .expect_register_expectation()
587                .once()
588                .return_once(move |_| {
589                    Ok(futures::future::ok(SignificantChainEvent {
590                        tx_hash: random_hash,
591                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
592                    })
593                    .boxed())
594                });
595        }
596
597        // Start the ActionQueue with the mock TransactionExecutor
598        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
599        let tx_sender = tx_queue.new_sender();
600        tokio::task::spawn(async move {
601            tx_queue.start().await;
602        });
603
604        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
605
606        let confirmations = futures::future::try_join_all(
607            actions
608                .redeem_tickets_in_channel(&channel_from_bob, false)
609                .await?
610                .into_iter(),
611        )
612        .await?;
613
614        assert_eq!(
615            ticket_count - 2,
616            confirmations.len(),
617            "must redeem only redeemable tickets in channel"
618        );
619
620        assert!(
621            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
622            "cannot redeem a ticket that's being aggregated"
623        );
624
625        assert!(
626            actions.redeem_ticket(tickets[1].clone()).await.is_err(),
627            "cannot redeem a ticket that's being redeemed"
628        );
629
630        Ok(())
631    }
632
633    #[tokio::test]
634    async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
635    -> anyhow::Result<()> {
636        let ticket_count = 3;
637        let ticket_from_previous_epoch_count = 2;
638        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
639        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
640
641        // Create 1 ticket in Epoch 4
642        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
643
644        // Insert another 2 tickets in Epoch 3
645        let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
646        db.upsert_ticket(None, ticket.clone()).await?;
647        tickets.insert(0, ticket);
648
649        let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
650        db.upsert_ticket(None, ticket.clone()).await?;
651        tickets.insert(1, ticket);
652
653        let tickets_clone = tickets.clone();
654        let mut tx_exec = MockTransactionExecutor::new();
655        tx_exec
656            .expect_redeem_ticket()
657            .times(ticket_count - ticket_from_previous_epoch_count)
658            .withf(move |t| {
659                tickets_clone[ticket_from_previous_epoch_count..]
660                    .iter()
661                    .any(|tk| tk.ticket.eq(&t.ticket))
662            })
663            .returning(move |_| Ok(random_hash));
664
665        let mut indexer_action_tracker = MockActionState::new();
666        for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
667            indexer_action_tracker
668                .expect_register_expectation()
669                .once()
670                .return_once(move |_| {
671                    Ok(futures::future::ok(SignificantChainEvent {
672                        tx_hash: random_hash,
673                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
674                    })
675                    .boxed())
676                });
677        }
678
679        // Start the ActionQueue with the mock TransactionExecutor
680        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
681        let tx_sender = tx_queue.new_sender();
682        tokio::task::spawn(async move {
683            tx_queue.start().await;
684        });
685
686        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
687
688        futures::future::join_all(
689            actions
690                .redeem_tickets_in_channel(&channel_from_bob, false)
691                .await?
692                .into_iter(),
693        )
694        .await;
695
696        assert!(
697            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
698            "cannot redeem a ticket that's from the previous epoch"
699        );
700
701        Ok(())
702    }
703
704    #[tokio::test]
705    async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
706        let ticket_count = 3;
707        let ticket_from_next_epoch_count = 2;
708        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
709        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
710
711        // Create 1 ticket in Epoch 4
712        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
713
714        // Insert another 2 tickets in Epoch 5
715        let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
716        db.upsert_ticket(None, ticket.clone()).await?;
717        tickets.insert(0, ticket);
718
719        let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
720        db.upsert_ticket(None, ticket.clone()).await?;
721        tickets.insert(1, ticket);
722
723        let tickets_clone = tickets.clone();
724        let mut tx_exec = MockTransactionExecutor::new();
725        tx_exec
726            .expect_redeem_ticket()
727            .times(ticket_count - ticket_from_next_epoch_count)
728            .withf(move |t| {
729                tickets_clone[ticket_from_next_epoch_count..]
730                    .iter()
731                    .any(|tk| tk.ticket.eq(&t.ticket))
732            })
733            .returning(move |_| Ok(random_hash));
734
735        let mut indexer_action_tracker = MockActionState::new();
736        for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
737            indexer_action_tracker
738                .expect_register_expectation()
739                .once()
740                .return_once(move |_| {
741                    Ok(futures::future::ok(SignificantChainEvent {
742                        tx_hash: random_hash,
743                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
744                    })
745                    .boxed())
746                });
747        }
748
749        // Start the ActionQueue with the mock TransactionExecutor
750        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
751        let tx_sender = tx_queue.new_sender();
752        tokio::task::spawn(async move {
753            tx_queue.start().await;
754        });
755
756        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
757
758        futures::future::join_all(
759            actions
760                .redeem_tickets_in_channel(&channel_from_bob, false)
761                .await?
762                .into_iter(),
763        )
764        .await;
765
766        for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
767            assert!(
768                actions.redeem_ticket(ticket.clone()).await.is_err(),
769                "cannot redeem a ticket that's from the next epoch"
770            );
771        }
772
773        Ok(())
774    }
775
776    #[tokio::test]
777    async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
778        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
779        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
780
781        let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
782
783        let ticket = tickets.into_iter().next().unwrap();
784
785        let mut tx_exec = MockTransactionExecutor::new();
786        let ticket_clone = ticket.clone();
787        tx_exec
788            .expect_redeem_ticket()
789            .once()
790            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
791            .returning(move |_| Ok(random_hash));
792
793        let mut indexer_action_tracker = MockActionState::new();
794        let ticket_clone = ticket.clone();
795        indexer_action_tracker
796            .expect_register_expectation()
797            .once()
798            .return_once(move |_| {
799                Ok(futures::future::ok(SignificantChainEvent {
800                    tx_hash: random_hash,
801                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
802                })
803                .boxed())
804            });
805
806        // Start the ActionQueue with the mock TransactionExecutor
807        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
808        let tx_sender = tx_queue.new_sender();
809        tokio::task::spawn(async move {
810            tx_queue.start().await;
811        });
812
813        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
814
815        let confirmation = actions.redeem_ticket(ticket).await?.await?;
816
817        assert_eq!(confirmation.tx_hash, random_hash);
818
819        assert!(
820            db.get_tickets((&channel_from_bob).into())
821                .await?
822                .into_iter()
823                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
824            "all bob's tickets must be in BeingRedeemed state"
825        );
826
827        Ok(())
828    }
829
830    #[tokio::test]
831    async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
832        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
833        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
834
835        let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
836
837        channel_from_bob.ticket_index = 2_u32.into();
838        db.upsert_channel(None, channel_from_bob).await?;
839
840        let ticket = tickets.into_iter().next().unwrap();
841
842        let mut tx_exec = MockTransactionExecutor::new();
843        let ticket_clone = ticket.clone();
844        tx_exec
845            .expect_redeem_ticket()
846            .never()
847            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
848            .returning(move |_| Ok(random_hash));
849
850        let mut indexer_action_tracker = MockActionState::new();
851        let ticket_clone = ticket.clone();
852        indexer_action_tracker
853            .expect_register_expectation()
854            .never()
855            .return_once(move |_| {
856                Ok(futures::future::ok(SignificantChainEvent {
857                    tx_hash: random_hash,
858                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
859                })
860                .boxed())
861            });
862
863        // Start the ActionQueue with the mock TransactionExecutor
864        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
865        let tx_sender = tx_queue.new_sender();
866        tokio::task::spawn(async move {
867            tx_queue.start().await;
868        });
869
870        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
871
872        assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
873
874        Ok(())
875    }
876}