hopr_chain_actions/
redeem.rs

1//! This module contains the [TicketRedeemActions] trait defining actions regarding
2//! ticket redemption.
3//!
4//! An implementation of this trait is added to [ChainActions] which realizes the redemption
5//! operations via [ActionQueue](crate::action_queue::ActionQueue).
6//!
//! There are 5 functions that can be used to redeem tickets in the [TicketRedeemActions] trait:
//! - [redeem_all_tickets](TicketRedeemActions::redeem_all_tickets)
//! - [redeem_tickets_in_channel](TicketRedeemActions::redeem_tickets_in_channel)
//! - [redeem_tickets_with_counterparty](TicketRedeemActions::redeem_tickets_with_counterparty)
//! - [redeem_tickets](TicketRedeemActions::redeem_tickets)
//! - [redeem_ticket](TicketRedeemActions::redeem_ticket)
12//!
//! Each method first checks if the tickets are redeemable
//! (i.e., they are not marked as [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) or
//! [BeingAggregated](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingAggregated) in the DB).
16//! If they are redeemable, their state is changed to
17//! [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) (while having acquired the
18//! exclusive DB write lock). Subsequently, the ticket in such a state is transmitted into the
19//! [ActionQueue](crate::action_queue::ActionQueue) so the redemption is soon executed on-chain. The functions return
20//! immediately but provide futures that can be awaited in case the callers wish to await the on-chain confirmation of
21//! each ticket redemption.
22//!
23//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting
//! the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent) by the Indexer.
25use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_chain_types::actions::Action;
28use hopr_crypto_types::types::Hash;
29use hopr_db_sql::{
30    api::{
31        info::DomainSeparator,
32        tickets::{HoprDbTicketOperations, TicketSelector},
33    },
34    channels::HoprDbChannelOperations,
35    prelude::HoprDbInfoOperations,
36};
37use hopr_internal_types::prelude::*;
38use hopr_primitive_types::prelude::*;
39use tracing::{debug, error, info, warn};
40
41use crate::{
42    ChainActions,
43    action_queue::PendingAction,
44    errors::{
45        ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket, WrongTicketState},
46        Result,
47    },
48};
49
lazy_static::lazy_static! {
    /// Used as a placeholder when the redeem transaction has not yet been published on-chain
    ///
    /// The value is the default [`Hash`], created lazily on first access.
    // NOTE(review): no use of this static is visible in this part of the file - confirm it is still needed.
    static ref EMPTY_TX_HASH: Hash = Hash::default();
}
54
/// Gathers all the ticket redemption-related on-chain calls.
///
/// The methods do not return the final outcome of a redemption. They return
/// [`PendingAction`]s, which callers may await if they want to wait for the
/// result of each submitted redemption.
#[async_trait]
pub trait TicketRedeemActions {
    /// Redeems all redeemable tickets in all channels.
    ///
    /// `min_value` and `only_aggregated` narrow down which tickets are picked; see
    /// [`redeem_tickets_in_channel`](TicketRedeemActions::redeem_tickets_in_channel).
    /// Returns one [`PendingAction`] per redemption that was submitted.
    async fn redeem_all_tickets(&self, min_value: HoprBalance, only_aggregated: bool) -> Result<Vec<PendingAction>>;

    /// Redeems all redeemable tickets in the incoming channel from the given counterparty.
    ///
    /// `min_value` and `only_aggregated` narrow down which tickets are picked; see
    /// [`redeem_tickets_in_channel`](TicketRedeemActions::redeem_tickets_in_channel).
    /// Returns one [`PendingAction`] per redemption that was submitted.
    async fn redeem_tickets_with_counterparty(
        &self,
        counterparty: &Address,
        min_value: HoprBalance,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems all redeemable tickets in the given channel.
    ///
    /// `min_value` is the lowest ticket amount that is still considered and `only_aggregated`
    /// is meant to limit the selection to aggregated tickets.
    /// Returns one [`PendingAction`] per redemption that was submitted.
    async fn redeem_tickets_in_channel(
        &self,
        channel: &ChannelEntry,
        min_value: HoprBalance,
        only_aggregated: bool,
    ) -> Result<Vec<PendingAction>>;

    /// Redeems all tickets based on the given [`TicketSelector`].
    ///
    /// Returns one [`PendingAction`] per redemption that was submitted.
    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;

    /// Tries to redeem the given ticket. If the ticket is not redeemable, returns an error.
    /// Otherwise, the [`PendingAction`] of the submitted on-chain redemption is returned,
    /// which can be awaited to learn the outcome of the redemption.
    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
}
84
85#[async_trait]
86impl<Db> TicketRedeemActions for ChainActions<Db>
87where
88    Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
89{
90    #[tracing::instrument(level = "debug", skip(self))]
91    async fn redeem_all_tickets(&self, min_value: HoprBalance, only_aggregated: bool) -> Result<Vec<PendingAction>> {
92        let incoming_channels = self
93            .db
94            .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
95            .await?;
96        debug!(
97            channel_count = incoming_channels.len(),
98            "starting to redeem all tickets in channels to self"
99        );
100
101        let mut receivers: Vec<PendingAction> = vec![];
102
103        // Must be synchronous because underlying Ethereum transactions are sequential
104        for incoming_channel in incoming_channels {
105            match self
106                .redeem_tickets_in_channel(&incoming_channel, min_value, only_aggregated)
107                .await
108            {
109                Ok(mut successful_txs) => {
110                    receivers.append(&mut successful_txs);
111                }
112                Err(e) => {
113                    warn!(
114                        channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
115                        error = %e,
116                        "Failed to redeem tickets in channel",
117                    );
118                }
119            }
120        }
121
122        Ok(receivers)
123    }
124
125    #[tracing::instrument(level = "debug", skip(self))]
126    async fn redeem_tickets_with_counterparty(
127        &self,
128        counterparty: &Address,
129        min_value: HoprBalance,
130        only_aggregated: bool,
131    ) -> Result<Vec<PendingAction>> {
132        let maybe_channel = self
133            .db
134            .get_channel_by_parties(None, counterparty, &self.self_address(), false)
135            .await?;
136        if let Some(channel) = maybe_channel {
137            self.redeem_tickets_in_channel(&channel, min_value, only_aggregated)
138                .await
139        } else {
140            Err(ChannelDoesNotExist)
141        }
142    }
143
144    #[tracing::instrument(level = "debug", skip(self))]
145    async fn redeem_tickets_in_channel(
146        &self,
147        channel: &ChannelEntry,
148        min_value: HoprBalance,
149        only_aggregated: bool,
150    ) -> Result<Vec<PendingAction>> {
151        self.redeem_tickets(
152            TicketSelector::from(channel)
153                .with_aggregated_only(only_aggregated)
154                .with_index_range(channel.ticket_index.as_u64()..)
155                .with_amount(min_value..)
156                .with_state(AcknowledgedTicketStatus::Untouched),
157        )
158        .await
159    }
160
161    #[tracing::instrument(level = "debug", skip(self))]
162    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
163        let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
164
165        info!(
166            count_redeemable_tickets, %selector,
167            "acknowledged tickets in channel that can be redeemed"
168        );
169
170        // Return fast if there are no redeemable tickets
171        if count_redeemable_tickets == 0 {
172            return Ok(vec![]);
173        }
174
175        let channel_dst = self
176            .db
177            .get_indexer_data(None)
178            .await?
179            .domain_separator(DomainSeparator::Channel)
180            .ok_or(InvalidState("missing channel dst".into()))?;
181
182        let selector_id = selector.to_string();
183
184        // Collect here, so we don't hold-up the stream open for too long
185        let redeem_stream = self
186            .db
187            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
188            .await?
189            .collect::<Vec<_>>()
190            .await;
191
192        let mut receivers: Vec<PendingAction> = vec![];
193        for ack_ticket in redeem_stream {
194            let ticket_id = ack_ticket.to_string();
195
196            if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
197                let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
198                match action {
199                    Ok(successful_tx) => {
200                        receivers.push(successful_tx);
201                    }
202                    Err(e) => {
203                        error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
204                    }
205                }
206            } else {
207                error!("failed to extract redeemable ticket");
208            }
209        }
210
211        info!(
212            count = receivers.len(),
213            selector = selector_id,
214            "acknowledged tickets were submitted to redeem in channel",
215        );
216
217        Ok(receivers)
218    }
219
220    #[tracing::instrument(level = "debug", skip(self))]
221    async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
222        if let Some(channel) = self
223            .db
224            .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
225            .await?
226        {
227            // Check if not trying to redeem a ticket that cannot be redeemed.
228            // Such tickets are automatically cleaned up (neglected) after successful redemption.
229            if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
230                return Err(OldTicket);
231            }
232
233            debug!(%ack_ticket, %channel, "redeeming single ticket");
234
235            let selector = TicketSelector::from(&channel)
236                .with_index(ack_ticket.verified_ticket().index)
237                .with_state(AcknowledgedTicketStatus::Untouched);
238
239            // Do not hold up the stream open for too long
240            let maybe_ticket = self
241                .db
242                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
243                .await?
244                .next()
245                .await;
246
247            if let Some(ticket) = maybe_ticket {
248                let channel_dst = self
249                    .db
250                    .get_indexer_data(None)
251                    .await?
252                    .domain_separator(DomainSeparator::Channel)
253                    .ok_or(InvalidState("missing channel dst".into()))?;
254
255                let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
256
257                debug!(%ack_ticket, "ticket is redeemable");
258                Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
259            } else {
260                Err(WrongTicketState(ack_ticket.to_string()))
261            }
262        } else {
263            Err(ChannelDoesNotExist)
264        }
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    use futures::FutureExt;
271    use hex_literal::hex;
272    use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
273    use hopr_crypto_random::{Randomizable, random_bytes};
274    use hopr_crypto_types::prelude::*;
275    use hopr_db_sql::{
276        HoprDbGeneralModelOperations, TargetDb, api::info::DomainSeparator, db::HoprDb, errors::DbSqlError,
277        info::HoprDbInfoOperations,
278    };
279
280    use super::*;
281    use crate::{
282        action_queue::{ActionQueue, MockTransactionExecutor},
283        action_state::MockActionState,
284    };
285
    lazy_static::lazy_static! {
        // Fixed chain keypairs, so the tests use the same identities on every run.
        // ALICE is the node under test: the DB and `ChainActions` are created with her key
        // and she is the destination of all test channels and tickets.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
        // BOB and CHARLIE act as counterparties that issue tickets to ALICE.
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
    }

    // Base amount from which the amounts of the test tickets are derived.
    const PRICE_PER_PACKET: u128 = 10000000000000000u128; // 0.01 HOPR
293
294    fn generate_random_ack_ticket(
295        idx: u64,
296        counterparty: &ChainKeypair,
297        channel_epoch: u32,
298    ) -> anyhow::Result<AcknowledgedTicket> {
299        let hk1 = HalfKey::random();
300        let hk2 = HalfKey::random();
301
302        let resp = Response::from_half_keys(&hk1, &hk2)?;
303
304        Ok(TicketBuilder::default()
305            .addresses(counterparty, &*ALICE)
306            .amount(U256::from(PRICE_PER_PACKET).div_f64(1.0f64)? * 5u32)
307            .index(idx)
308            .index_offset(1)
309            .win_prob(WinningProbability::ALWAYS)
310            .channel_epoch(channel_epoch)
311            .challenge(resp.to_challenge()?)
312            .build_signed(counterparty, &Hash::default())?
313            .into_acknowledged(resp))
314    }
315
316    async fn create_channel_with_ack_tickets(
317        db: HoprDb,
318        ticket_count: usize,
319        counterparty: &ChainKeypair,
320        channel_epoch: u32,
321    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
322        let ckp = counterparty.clone();
323        let db_clone = db.clone();
324        let channel = db
325            .begin_transaction()
326            .await?
327            .perform(|tx| {
328                Box::pin(async move {
329                    db_clone
330                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
331                        .await?;
332
333                    let channel = ChannelEntry::new(
334                        ckp.public().to_address(),
335                        ALICE.public().to_address(),
336                        0.into(),
337                        U256::zero(),
338                        ChannelStatus::Open,
339                        channel_epoch.into(),
340                    );
341                    db_clone.upsert_channel(Some(tx), channel).await?;
342                    Ok::<_, DbSqlError>(channel)
343                })
344            })
345            .await?;
346
347        let ckp = counterparty.clone();
348        let input_tickets = db
349            .begin_transaction_in_db(TargetDb::Tickets)
350            .await?
351            .perform(|tx| {
352                Box::pin(async move {
353                    let mut input_tickets = Vec::new();
354                    for i in 0..ticket_count {
355                        let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
356                            .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
357                        db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
358                        input_tickets.push(ack_ticket);
359                    }
360                    Ok::<_, DbSqlError>(input_tickets)
361                })
362            })
363            .await?;
364
365        Ok((channel, input_tickets))
366    }
367
368    #[tokio::test]
369    async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
370        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
371
372        let ticket_count = 5;
373        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
374
375        // All the tickets can be redeemed because they are issued with the same channel epoch
376        let (channel_from_bob, bob_tickets) =
377            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
378        let (channel_from_charlie, charlie_tickets) =
379            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
380
381        // Add extra ticket to Charlie that has very low value
382        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
383        let low_value_ack_ticket = TicketBuilder::default()
384            .addresses(&*CHARLIE, &*ALICE)
385            .amount(PRICE_PER_PACKET)
386            .index((ticket_count + 1) as u64)
387            .index_offset(1)
388            .win_prob(WinningProbability::ALWAYS)
389            .channel_epoch(4u32)
390            .challenge(resp.to_challenge()?)
391            .build_signed(&CHARLIE, &Hash::default())?
392            .into_acknowledged(resp);
393        db.upsert_ticket(None, low_value_ack_ticket).await?;
394
395        let mut indexer_action_tracker = MockActionState::new();
396        let mut seq2 = mockall::Sequence::new();
397
398        for tkt in bob_tickets.iter().cloned() {
399            indexer_action_tracker
400                .expect_register_expectation()
401                .once()
402                .in_sequence(&mut seq2)
403                .return_once(move |_| {
404                    Ok(futures::future::ok(SignificantChainEvent {
405                        tx_hash: random_hash,
406                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
407                    })
408                    .boxed())
409                });
410        }
411
412        for tkt in charlie_tickets.iter().cloned() {
413            indexer_action_tracker
414                .expect_register_expectation()
415                .once()
416                .in_sequence(&mut seq2)
417                .return_once(move |_| {
418                    Ok(futures::future::ok(SignificantChainEvent {
419                        tx_hash: random_hash,
420                        event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
421                    })
422                    .boxed())
423                });
424        }
425
426        let mut tx_exec = MockTransactionExecutor::new();
427        let mut seq = mockall::Sequence::new();
428
429        // Expect all Bob's tickets get redeemed first
430        tx_exec
431            .expect_redeem_ticket()
432            .times(ticket_count)
433            .in_sequence(&mut seq)
434            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
435            .returning(move |_| Ok(random_hash));
436
437        // And then all Charlie's tickets get redeemed except the one that does not meet the minimum value
438        tx_exec
439            .expect_redeem_ticket()
440            .times(ticket_count)
441            .in_sequence(&mut seq)
442            .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
443            .returning(move |_| Ok(random_hash));
444
445        // Start the ActionQueue with the mock TransactionExecutor
446        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
447        let tx_sender = tx_queue.new_sender();
448        tokio::task::spawn(async move {
449            tx_queue.start().await;
450        });
451
452        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
453
454        // Make sure that the low value ticket does not pass the min_value check
455        let confirmations = futures::future::try_join_all(
456            actions
457                .redeem_all_tickets((PRICE_PER_PACKET * 5).into(), false)
458                .await?
459                .into_iter(),
460        )
461        .await?;
462
463        assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
464        assert!(
465            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
466            "tx hashes must be equal"
467        );
468
469        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
470
471        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
472
473        assert!(
474            db_acks_bob
475                .into_iter()
476                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
477            "all bob's tickets must be in BeingRedeemed state"
478        );
479        assert!(
480            db_acks_charlie
481                .iter()
482                .take(ticket_count)
483                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
484            "all valuable charlie's tickets must be in BeingRedeemed state"
485        );
486        assert!(
487            db_acks_charlie
488                .iter()
489                .skip(ticket_count)
490                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
491            "all non-valuable charlie's tickets must be in Untouched state"
492        );
493
494        Ok(())
495    }
496
497    #[tokio::test]
498    async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
499        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
500
501        let ticket_count = 5;
502        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
503
504        // All the tickets can be redeemed because they are issued with the same channel epoch
505        let (mut channel_from_bob, bob_tickets) =
506            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
507        let (channel_from_charlie, _) =
508            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
509
510        // Tickets with index 0 will be skipped, as that is already past
511        channel_from_bob.ticket_index = 1_u32.into();
512        db.upsert_channel(None, channel_from_bob).await?;
513
514        let mut indexer_action_tracker = MockActionState::new();
515        let mut seq2 = mockall::Sequence::new();
516
517        // Skipping ticket with index 0
518        for tkt in bob_tickets.iter().skip(1).cloned() {
519            indexer_action_tracker
520                .expect_register_expectation()
521                .once()
522                .in_sequence(&mut seq2)
523                .return_once(move |_| {
524                    Ok(futures::future::ok(SignificantChainEvent {
525                        tx_hash: random_hash,
526                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
527                    })
528                    .boxed())
529                });
530        }
531
532        let mut tx_exec = MockTransactionExecutor::new();
533        let mut seq = mockall::Sequence::new();
534
535        // Expect only Bob's tickets to get redeemed
536        tx_exec
537            .expect_redeem_ticket()
538            .times(ticket_count - 1)
539            .in_sequence(&mut seq)
540            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
541            .returning(move |_| Ok(random_hash));
542
543        // Start the ActionQueue with the mock TransactionExecutor
544        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
545        let tx_sender = tx_queue.new_sender();
546        tokio::task::spawn(async move {
547            tx_queue.start().await;
548        });
549
550        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
551
552        let confirmations = futures::future::try_join_all(
553            actions
554                .redeem_tickets_with_counterparty(&BOB.public().to_address(), 0.into(), false)
555                .await?
556                .into_iter(),
557        )
558        .await?;
559
560        // First ticket is skipped, because its index is lower than the index on the channel entry
561        assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
562        assert!(
563            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
564            "tx hashes must be equal"
565        );
566
567        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
568
569        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
570
571        assert!(
572            db_acks_bob
573                .into_iter()
574                .take_while(|tkt| tkt.verified_ticket().index != 0)
575                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
576            "all bob's tickets must be in BeingRedeemed state"
577        );
578        assert!(
579            db_acks_charlie
580                .into_iter()
581                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
582            "all charlie's tickets must be in Untouched state"
583        );
584
585        Ok(())
586    }
587
588    #[tokio::test]
589    async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
590        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
591
592        let ticket_count = 3;
593        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
594
595        let (channel_from_bob, mut tickets) =
596            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
597
598        // Make the first ticket unredeemable
599        tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
600        let selector = TicketSelector::from(&tickets[0]).with_no_state();
601        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
602            .await?;
603
604        // Make the second ticket unredeemable
605        tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
606        let selector = TicketSelector::from(&tickets[1]).with_no_state();
607        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
608            .await?;
609
610        // Expect only the redeemable tickets get redeemed
611        let tickets_clone = tickets.clone();
612        let mut tx_exec = MockTransactionExecutor::new();
613        tx_exec
614            .expect_redeem_ticket()
615            .times(ticket_count - 2)
616            .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
617            .returning(move |_| Ok(random_hash));
618
619        let mut indexer_action_tracker = MockActionState::new();
620        for tkt in tickets.iter().skip(2).cloned() {
621            indexer_action_tracker
622                .expect_register_expectation()
623                .once()
624                .return_once(move |_| {
625                    Ok(futures::future::ok(SignificantChainEvent {
626                        tx_hash: random_hash,
627                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
628                    })
629                    .boxed())
630                });
631        }
632
633        // Start the ActionQueue with the mock TransactionExecutor
634        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
635        let tx_sender = tx_queue.new_sender();
636        tokio::task::spawn(async move {
637            tx_queue.start().await;
638        });
639
640        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
641
642        let confirmations = futures::future::try_join_all(
643            actions
644                .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
645                .await?
646                .into_iter(),
647        )
648        .await?;
649
650        assert_eq!(
651            ticket_count - 2,
652            confirmations.len(),
653            "must redeem only redeemable tickets in channel"
654        );
655
656        assert!(
657            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
658            "cannot redeem a ticket that's being aggregated"
659        );
660
661        assert!(
662            actions.redeem_ticket(tickets[1].clone()).await.is_err(),
663            "cannot redeem a ticket that's being redeemed"
664        );
665
666        Ok(())
667    }
668
669    #[tokio::test]
670    async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
671    -> anyhow::Result<()> {
672        let ticket_count = 3;
673        let ticket_from_previous_epoch_count = 2;
674        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
675        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
676
677        // Create 1 ticket in Epoch 4
678        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
679
680        // Insert another 2 tickets in Epoch 3
681        let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
682        db.upsert_ticket(None, ticket.clone()).await?;
683        tickets.insert(0, ticket);
684
685        let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
686        db.upsert_ticket(None, ticket.clone()).await?;
687        tickets.insert(1, ticket);
688
689        let tickets_clone = tickets.clone();
690        let mut tx_exec = MockTransactionExecutor::new();
691        tx_exec
692            .expect_redeem_ticket()
693            .times(ticket_count - ticket_from_previous_epoch_count)
694            .withf(move |t| {
695                tickets_clone[ticket_from_previous_epoch_count..]
696                    .iter()
697                    .any(|tk| tk.ticket.eq(&t.ticket))
698            })
699            .returning(move |_| Ok(random_hash));
700
701        let mut indexer_action_tracker = MockActionState::new();
702        for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
703            indexer_action_tracker
704                .expect_register_expectation()
705                .once()
706                .return_once(move |_| {
707                    Ok(futures::future::ok(SignificantChainEvent {
708                        tx_hash: random_hash,
709                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
710                    })
711                    .boxed())
712                });
713        }
714
715        // Start the ActionQueue with the mock TransactionExecutor
716        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
717        let tx_sender = tx_queue.new_sender();
718        tokio::task::spawn(async move {
719            tx_queue.start().await;
720        });
721
722        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
723
724        futures::future::join_all(
725            actions
726                .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
727                .await?
728                .into_iter(),
729        )
730        .await;
731
732        assert!(
733            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
734            "cannot redeem a ticket that's from the previous epoch"
735        );
736
737        Ok(())
738    }
739
740    #[tokio::test]
741    async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
742        let ticket_count = 3;
743        let ticket_from_next_epoch_count = 2;
744        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
745        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
746
747        // Create 1 ticket in Epoch 4
748        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
749
750        // Insert another 2 tickets in Epoch 5
751        let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
752        db.upsert_ticket(None, ticket.clone()).await?;
753        tickets.insert(0, ticket);
754
755        let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
756        db.upsert_ticket(None, ticket.clone()).await?;
757        tickets.insert(1, ticket);
758
759        let tickets_clone = tickets.clone();
760        let mut tx_exec = MockTransactionExecutor::new();
761        tx_exec
762            .expect_redeem_ticket()
763            .times(ticket_count - ticket_from_next_epoch_count)
764            .withf(move |t| {
765                tickets_clone[ticket_from_next_epoch_count..]
766                    .iter()
767                    .any(|tk| tk.ticket.eq(&t.ticket))
768            })
769            .returning(move |_| Ok(random_hash));
770
771        let mut indexer_action_tracker = MockActionState::new();
772        for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
773            indexer_action_tracker
774                .expect_register_expectation()
775                .once()
776                .return_once(move |_| {
777                    Ok(futures::future::ok(SignificantChainEvent {
778                        tx_hash: random_hash,
779                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
780                    })
781                    .boxed())
782                });
783        }
784
785        // Start the ActionQueue with the mock TransactionExecutor
786        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
787        let tx_sender = tx_queue.new_sender();
788        tokio::task::spawn(async move {
789            tx_queue.start().await;
790        });
791
792        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
793
794        futures::future::join_all(
795            actions
796                .redeem_tickets_in_channel(&channel_from_bob, 0.into(), false)
797                .await?
798                .into_iter(),
799        )
800        .await;
801
802        for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
803            assert!(
804                actions.redeem_ticket(ticket.clone()).await.is_err(),
805                "cannot redeem a ticket that's from the next epoch"
806            );
807        }
808
809        Ok(())
810    }
811
812    #[tokio::test]
813    async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
814        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
815        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
816
817        let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
818
819        let ticket = tickets.into_iter().next().unwrap();
820
821        let mut tx_exec = MockTransactionExecutor::new();
822        let ticket_clone = ticket.clone();
823        tx_exec
824            .expect_redeem_ticket()
825            .once()
826            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
827            .returning(move |_| Ok(random_hash));
828
829        let mut indexer_action_tracker = MockActionState::new();
830        let ticket_clone = ticket.clone();
831        indexer_action_tracker
832            .expect_register_expectation()
833            .once()
834            .return_once(move |_| {
835                Ok(futures::future::ok(SignificantChainEvent {
836                    tx_hash: random_hash,
837                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
838                })
839                .boxed())
840            });
841
842        // Start the ActionQueue with the mock TransactionExecutor
843        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
844        let tx_sender = tx_queue.new_sender();
845        tokio::task::spawn(async move {
846            tx_queue.start().await;
847        });
848
849        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
850
851        let confirmation = actions.redeem_ticket(ticket).await?.await?;
852
853        assert_eq!(confirmation.tx_hash, random_hash);
854
855        assert!(
856            db.get_tickets((&channel_from_bob).into())
857                .await?
858                .into_iter()
859                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
860            "all bob's tickets must be in BeingRedeemed state"
861        );
862
863        Ok(())
864    }
865
866    #[tokio::test]
867    async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
868        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
869        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
870
871        let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
872
873        channel_from_bob.ticket_index = 2_u32.into();
874        db.upsert_channel(None, channel_from_bob).await?;
875
876        let ticket = tickets.into_iter().next().unwrap();
877
878        let mut tx_exec = MockTransactionExecutor::new();
879        let ticket_clone = ticket.clone();
880        tx_exec
881            .expect_redeem_ticket()
882            .never()
883            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
884            .returning(move |_| Ok(random_hash));
885
886        let mut indexer_action_tracker = MockActionState::new();
887        let ticket_clone = ticket.clone();
888        indexer_action_tracker
889            .expect_register_expectation()
890            .never()
891            .return_once(move |_| {
892                Ok(futures::future::ok(SignificantChainEvent {
893                    tx_hash: random_hash,
894                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
895                })
896                .boxed())
897            });
898
899        // Start the ActionQueue with the mock TransactionExecutor
900        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
901        let tx_sender = tx_queue.new_sender();
902        tokio::task::spawn(async move {
903            tx_queue.start().await;
904        });
905
906        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
907
908        assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
909
910        Ok(())
911    }
912}