hopr_chain_actions/
redeem.rs

//! This module contains the [TicketRedeemActions] trait defining actions regarding
//! ticket redemption.
//!
//! An implementation of this trait is added to [ChainActions] which realizes the redemption
//! operations via [ActionQueue](crate::action_queue::ActionQueue).
//!
//! There are 2 functions that can be used to redeem tickets in the [TicketRedeemActions] trait:
//! - [redeem_tickets](TicketRedeemActions::redeem_tickets)
//! - [redeem_ticket](TicketRedeemActions::redeem_ticket)
//!
//! Each method first checks if the tickets are redeemable
//! (= they are not marked as [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) or
//! [BeingAggregated](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingAggregated) in the DB).
//! If they are redeemable, their state is changed to
//! [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) (while having acquired the
//! exclusive DB write lock). Subsequently, the ticket in such a state is transmitted into the
//! [ActionQueue](crate::action_queue::ActionQueue) so the redemption is soon executed on-chain. The functions return
//! immediately but provide futures that can be awaited in case the callers wish to await the on-chain confirmation of
//! each ticket redemption.
//!
//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting
//! the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent) reported by the
//! Indexer.
25use async_trait::async_trait;
26use futures::StreamExt;
27use hopr_api::db::{HoprDbTicketOperations, TicketSelector};
28use hopr_chain_types::actions::Action;
29use hopr_crypto_types::types::Hash;
30use hopr_db_sql::prelude::{HoprDbChannelOperations, HoprDbInfoOperations};
31use hopr_internal_types::prelude::*;
32use tracing::{debug, error, info};
33
34use crate::{
35    ChainActions,
36    action_queue::PendingAction,
37    errors::{
38        ChainActionsError,
39        ChainActionsError::{ChannelDoesNotExist, NodeDbError, OldTicket, WrongTicketState},
40        Result,
41    },
42};
43
44lazy_static::lazy_static! {
45    /// Used as a placeholder when the redeem transaction has not yet been published on-chain
46    static ref EMPTY_TX_HASH: Hash = Hash::default();
47}
48
49/// Gathers all the ticket-redemption-related on-chain calls.
50#[async_trait]
51pub trait TicketRedeemActions {
52    /// Redeems all tickets based on the given [`TicketSelector`].
53    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;
54
55    /// Tries to redeem the given ticket. If the ticket is not redeemable, returns an error.
56    /// Otherwise, the transaction hash of the on-chain redemption is returned.
57    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
58}
59
60#[async_trait]
61impl<Db> TicketRedeemActions for ChainActions<Db>
62where
63    Db: HoprDbTicketOperations + Sync + 'static,
64{
65    #[tracing::instrument(level = "debug", skip(self))]
66    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
67        // Add safeguard so that we do not accidentally try to redeem tickets which
68        // are already in progress of some sort.
69        // Note that the caller is still responsible for taking care of the ticket index range
70        // not being less than the next ticket index on the corresponding channel entry.
71        let selector = selector.with_state(AcknowledgedTicketStatus::Untouched);
72
73        let (count_redeemable_tickets, _) = self
74            .node_db
75            .get_tickets_value(selector.clone())
76            .await
77            .map_err(|e| NodeDbError(e.into()))?;
78
79        info!(
80            count_redeemable_tickets, %selector,
81            "acknowledged tickets in channel that can be redeemed"
82        );
83
84        // Return fast if there are no redeemable tickets
85        if count_redeemable_tickets == 0 {
86            return Ok(vec![]);
87        }
88
89        let channel_dst = self
90            .index_db
91            .get_indexer_data(None)
92            .await?
93            .channels_dst
94            .ok_or(ChainActionsError::MissingDomainSeparator)?;
95
96        let selector_id = selector.to_string();
97
98        // Collect here, so we don't hold-up the stream open for too long
99        let mut tickets_to_redeem = self
100            .node_db
101            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
102            .await
103            .map_err(|e| NodeDbError(e.into()))?
104            .collect::<Vec<_>>()
105            .await;
106
107        // Make sure the tickets to be redeemed are sorted by index
108        tickets_to_redeem.sort();
109
110        let mut receivers: Vec<PendingAction> = vec![];
111        for ack_ticket in tickets_to_redeem {
112            let ticket_id = ack_ticket.to_string();
113
114            if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
115                let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
116                match action {
117                    Ok(successful_tx) => {
118                        receivers.push(successful_tx);
119                    }
120                    Err(e) => {
121                        error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
122                    }
123                }
124            } else {
125                error!("failed to extract redeemable ticket");
126            }
127        }
128
129        info!(
130            count = receivers.len(),
131            selector = selector_id,
132            "acknowledged tickets were submitted to redeem in channel",
133        );
134
135        Ok(receivers)
136    }
137
138    #[tracing::instrument(level = "debug", skip(self))]
139    async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
140        if let Some(channel) = self
141            .index_db
142            .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
143            .await?
144        {
145            // Check if not trying to redeem a ticket that cannot be redeemed.
146            // Such tickets are automatically cleaned up (neglected) after successful redemption.
147            if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
148                return Err(OldTicket);
149            }
150
151            debug!(%ack_ticket, %channel, "redeeming single ticket");
152
153            let selector = TicketSelector::from(&channel)
154                .with_index(ack_ticket.verified_ticket().index)
155                .with_state(AcknowledgedTicketStatus::Untouched);
156
157            // Do not hold up the stream open for too long
158            let maybe_ticket = self
159                .node_db
160                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
161                .await
162                .map_err(|e| NodeDbError(e.into()))?
163                .next()
164                .await;
165
166            if let Some(ticket) = maybe_ticket {
167                let channel_dst = self
168                    .index_db
169                    .get_indexer_data(None)
170                    .await?
171                    .channels_dst
172                    .ok_or(ChainActionsError::MissingDomainSeparator)?;
173
174                let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
175
176                debug!(%ack_ticket, "ticket is redeemable");
177                Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
178            } else {
179                Err(WrongTicketState(ack_ticket.to_string()))
180            }
181        } else {
182            Err(ChannelDoesNotExist)
183        }
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use futures::FutureExt;
190    use hex_literal::hex;
191    use hopr_chain_types::chain_events::{ChainEventType::TicketRedeemed, SignificantChainEvent};
192    use hopr_crypto_random::{Randomizable, random_bytes};
193    use hopr_crypto_types::prelude::*;
194    use hopr_db_node::HoprNodeDb;
195    use hopr_db_sql::{
196        HoprDbGeneralModelOperations, HoprIndexerDb,
197        errors::DbSqlError,
198        info::HoprDbInfoOperations,
199        prelude::{DomainSeparator, HoprDbChannelOperations},
200    };
201    use hopr_primitive_types::{
202        balance::HoprBalance,
203        prelude::{BytesRepresentable, U256, UnitaryFloatOps},
204    };
205
206    use super::*;
207    use crate::{
208        action_queue::{ActionQueue, MockTransactionExecutor},
209        action_state::MockActionState,
210    };
211
212    lazy_static::lazy_static! {
213        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
214        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
215        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
216    }
217
218    const PRICE_PER_PACKET: u128 = 10000000000000000u128; // 0.01 HOPR
219
220    fn generate_random_ack_ticket(
221        idx: u64,
222        counterparty: &ChainKeypair,
223        channel_epoch: u32,
224    ) -> anyhow::Result<AcknowledgedTicket> {
225        let hk1 = HalfKey::random();
226        let hk2 = HalfKey::random();
227
228        let resp = Response::from_half_keys(&hk1, &hk2)?;
229
230        Ok(TicketBuilder::default()
231            .addresses(counterparty, &*ALICE)
232            .amount(U256::from(PRICE_PER_PACKET).div_f64(1.0f64)? * 5u32)
233            .index(idx)
234            .index_offset(1)
235            .win_prob(WinningProbability::ALWAYS)
236            .channel_epoch(channel_epoch)
237            .challenge(resp.to_challenge()?)
238            .build_signed(counterparty, &Hash::default())?
239            .into_acknowledged(resp))
240    }
241
242    async fn create_channel_with_ack_tickets(
243        db: HoprIndexerDb,
244        node_db: HoprNodeDb,
245        ticket_count: usize,
246        counterparty: &ChainKeypair,
247        channel_epoch: u32,
248    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
249        let ckp = counterparty.clone();
250        let db_clone = db.clone();
251        let channel = db
252            .begin_transaction()
253            .await?
254            .perform(|tx| {
255                Box::pin(async move {
256                    db_clone
257                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
258                        .await?;
259
260                    let channel = ChannelEntry::new(
261                        ckp.public().to_address(),
262                        ALICE.public().to_address(),
263                        0.into(),
264                        U256::zero(),
265                        ChannelStatus::Open,
266                        channel_epoch.into(),
267                    );
268                    db_clone.upsert_channel(Some(tx), channel).await?;
269                    Ok::<_, DbSqlError>(channel)
270                })
271            })
272            .await?;
273
274        let ckp = counterparty.clone();
275
276        let mut input_tickets = Vec::new();
277        for i in 0..ticket_count {
278            let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
279                .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
280            node_db.upsert_ticket(ack_ticket.clone()).await?;
281            input_tickets.push(ack_ticket);
282        }
283
284        Ok((channel, input_tickets))
285    }
286
287    #[tokio::test]
288    async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
289        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
290
291        let ticket_count = 5;
292        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
293        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
294
295        // All the tickets can be redeemed because they are issued with the same channel epoch
296        let (channel_from_bob, bob_tickets) =
297            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;
298        let (channel_from_charlie, charlie_tickets) =
299            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &CHARLIE, 4u32).await?;
300
301        // Add extra ticket to Charlie that has very low value
302        let resp = Response::from_half_keys(&HalfKey::random(), &HalfKey::random())?;
303        let low_value_ack_ticket = TicketBuilder::default()
304            .addresses(&*CHARLIE, &*ALICE)
305            .amount(PRICE_PER_PACKET)
306            .index((ticket_count + 1) as u64)
307            .index_offset(1)
308            .win_prob(WinningProbability::ALWAYS)
309            .channel_epoch(4u32)
310            .challenge(resp.to_challenge()?)
311            .build_signed(&CHARLIE, &Hash::default())?
312            .into_acknowledged(resp);
313        node_db.upsert_ticket(low_value_ack_ticket).await?;
314
315        let mut indexer_action_tracker = MockActionState::new();
316        let mut seq2 = mockall::Sequence::new();
317
318        for tkt in bob_tickets.iter().cloned() {
319            indexer_action_tracker
320                .expect_register_expectation()
321                .once()
322                .in_sequence(&mut seq2)
323                .return_once(move |_| {
324                    Ok(futures::future::ok(SignificantChainEvent {
325                        tx_hash: random_hash,
326                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
327                    })
328                    .boxed())
329                });
330        }
331
332        for tkt in charlie_tickets.iter().cloned() {
333            indexer_action_tracker
334                .expect_register_expectation()
335                .once()
336                .in_sequence(&mut seq2)
337                .return_once(move |_| {
338                    Ok(futures::future::ok(SignificantChainEvent {
339                        tx_hash: random_hash,
340                        event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
341                    })
342                    .boxed())
343                });
344        }
345
346        let mut tx_exec = MockTransactionExecutor::new();
347        let mut seq = mockall::Sequence::new();
348
349        // Expect all Bob's tickets get redeemed first
350        tx_exec
351            .expect_redeem_ticket()
352            .times(ticket_count)
353            .in_sequence(&mut seq)
354            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
355            .returning(move |_| Ok(random_hash));
356
357        // And then all Charlie's tickets get redeemed except the one that does not meet the minimum value
358        tx_exec
359            .expect_redeem_ticket()
360            .times(ticket_count)
361            .in_sequence(&mut seq)
362            .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
363            .returning(move |_| Ok(random_hash));
364
365        // Start the ActionQueue with the mock TransactionExecutor
366        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
367        let tx_sender = tx_queue.new_sender();
368        tokio::task::spawn(async move {
369            tx_queue.start().await;
370        });
371
372        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
373
374        // Make sure that the low value ticket does not pass the min_value check
375        let confirmations = futures::future::try_join_all(
376            actions
377                .redeem_tickets(
378                    TicketSelector::from(&channel_from_bob)
379                        .also_on_channel_entry(&channel_from_charlie)
380                        .with_amount(HoprBalance::from(PRICE_PER_PACKET * 5)..),
381                )
382                .await?
383                .into_iter(),
384        )
385        .await?;
386
387        assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
388        assert!(
389            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
390            "tx hashes must be equal"
391        );
392
393        let db_acks_bob = node_db
394            .stream_tickets(Some((&channel_from_bob).into()))
395            .await?
396            .collect::<Vec<_>>()
397            .await;
398
399        let db_acks_charlie = node_db
400            .stream_tickets(Some((&channel_from_charlie).into()))
401            .await?
402            .collect::<Vec<_>>()
403            .await;
404
405        assert!(
406            db_acks_bob
407                .into_iter()
408                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
409            "all bob's tickets must be in BeingRedeemed state"
410        );
411        assert!(
412            db_acks_charlie
413                .iter()
414                .take(ticket_count)
415                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
416            "all valuable charlie's tickets must be in BeingRedeemed state"
417        );
418        assert!(
419            db_acks_charlie
420                .iter()
421                .skip(ticket_count)
422                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
423            "all non-valuable charlie's tickets must be in Untouched state"
424        );
425
426        Ok(())
427    }
428
429    #[tokio::test]
430    async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
431        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
432
433        let ticket_count = 5;
434        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
435        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
436
437        // All the tickets can be redeemed because they are issued with the same channel epoch
438        let (mut channel_from_bob, bob_tickets) =
439            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;
440        let (channel_from_charlie, _) =
441            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &CHARLIE, 4u32).await?;
442
443        // Tickets with index 0 will be skipped, as that is already past
444        channel_from_bob.ticket_index = 1_u32.into();
445        db.upsert_channel(None, channel_from_bob).await?;
446
447        let mut indexer_action_tracker = MockActionState::new();
448        let mut seq2 = mockall::Sequence::new();
449
450        // Skipping ticket with index 0
451        for tkt in bob_tickets.iter().skip(1).cloned() {
452            indexer_action_tracker
453                .expect_register_expectation()
454                .once()
455                .in_sequence(&mut seq2)
456                .return_once(move |_| {
457                    Ok(futures::future::ok(SignificantChainEvent {
458                        tx_hash: random_hash,
459                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
460                    })
461                    .boxed())
462                });
463        }
464
465        let mut tx_exec = MockTransactionExecutor::new();
466        let mut seq = mockall::Sequence::new();
467
468        // Expect only Bob's tickets to get redeemed
469        tx_exec
470            .expect_redeem_ticket()
471            .times(ticket_count - 1)
472            .in_sequence(&mut seq)
473            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
474            .returning(move |_| Ok(random_hash));
475
476        // Start the ActionQueue with the mock TransactionExecutor
477        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
478        let tx_sender = tx_queue.new_sender();
479        tokio::task::spawn(tx_queue.start());
480
481        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
482
483        let confirmations = futures::future::try_join_all(
484            actions
485                .redeem_tickets(TicketSelector::from(&channel_from_bob).with_index_range(1..))
486                .await?
487                .into_iter(),
488        )
489        .await?;
490
491        // First ticket is skipped, because its index is lower than the index on the channel entry
492        assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
493        assert!(
494            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
495            "tx hashes must be equal"
496        );
497
498        let db_acks_bob = node_db
499            .stream_tickets(Some((&channel_from_bob).into()))
500            .await?
501            .collect::<Vec<_>>()
502            .await;
503
504        let db_acks_charlie = node_db
505            .stream_tickets(Some((&channel_from_charlie).into()))
506            .await?
507            .collect::<Vec<_>>()
508            .await;
509
510        assert!(
511            db_acks_bob
512                .into_iter()
513                .take_while(|tkt| tkt.verified_ticket().index != 0)
514                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
515            "all bob's tickets must be in BeingRedeemed state"
516        );
517        assert!(
518            db_acks_charlie
519                .into_iter()
520                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
521            "all charlie's tickets must be in Untouched state"
522        );
523
524        Ok(())
525    }
526
527    #[tokio::test]
528    async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
529        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
530
531        let ticket_count = 3;
532        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
533        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
534
535        let (channel_from_bob, mut tickets) =
536            create_channel_with_ack_tickets(db.clone(), node_db.clone(), ticket_count, &BOB, 4u32).await?;
537
538        // Make the first ticket unredeemable
539        tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
540        let selector = TicketSelector::from(&tickets[0]).with_no_state();
541        node_db
542            .update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
543            .await?;
544
545        // Make the second ticket unredeemable
546        tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
547        let selector = TicketSelector::from(&tickets[1]).with_no_state();
548        node_db
549            .update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
550            .await?;
551
552        // Expect only the redeemable tickets get redeemed
553        let tickets_clone = tickets.clone();
554        let mut tx_exec = MockTransactionExecutor::new();
555        tx_exec
556            .expect_redeem_ticket()
557            .times(ticket_count - 2)
558            .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
559            .returning(move |_| Ok(random_hash));
560
561        let mut indexer_action_tracker = MockActionState::new();
562        for tkt in tickets.iter().skip(2).cloned() {
563            indexer_action_tracker
564                .expect_register_expectation()
565                .once()
566                .return_once(move |_| {
567                    Ok(futures::future::ok(SignificantChainEvent {
568                        tx_hash: random_hash,
569                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
570                    })
571                    .boxed())
572                });
573        }
574
575        // Start the ActionQueue with the mock TransactionExecutor
576        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
577        let tx_sender = tx_queue.new_sender();
578        tokio::task::spawn(tx_queue.start());
579
580        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
581
582        let confirmations = futures::future::try_join_all(
583            actions
584                .redeem_tickets(TicketSelector::from(&channel_from_bob))
585                .await?
586                .into_iter(),
587        )
588        .await?;
589
590        assert_eq!(
591            ticket_count - 2,
592            confirmations.len(),
593            "must redeem only redeemable tickets in channel"
594        );
595
596        assert!(
597            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
598            "cannot redeem a ticket that's being aggregated"
599        );
600
601        assert!(
602            actions.redeem_ticket(tickets[1].clone()).await.is_err(),
603            "cannot redeem a ticket that's being redeemed"
604        );
605
606        Ok(())
607    }
608
609    #[tokio::test]
610    async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed()
611    -> anyhow::Result<()> {
612        let ticket_count = 3;
613        let ticket_from_previous_epoch_count = 2;
614        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
615        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
616        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
617
618        // Create 1 ticket in Epoch 4
619        let (channel_from_bob, mut tickets) =
620            create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 4u32).await?;
621
622        // Insert another 2 tickets in Epoch 3
623        let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
624        node_db.upsert_ticket(ticket.clone()).await?;
625        tickets.insert(0, ticket);
626
627        let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
628        node_db.upsert_ticket(ticket.clone()).await?;
629        tickets.insert(1, ticket);
630
631        let tickets_clone = tickets.clone();
632        let mut tx_exec = MockTransactionExecutor::new();
633        tx_exec
634            .expect_redeem_ticket()
635            .times(ticket_count - ticket_from_previous_epoch_count)
636            .withf(move |t| {
637                tickets_clone[ticket_from_previous_epoch_count..]
638                    .iter()
639                    .any(|tk| tk.ticket.eq(&t.ticket))
640            })
641            .returning(move |_| Ok(random_hash));
642
643        let mut indexer_action_tracker = MockActionState::new();
644        for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
645            indexer_action_tracker
646                .expect_register_expectation()
647                .once()
648                .return_once(move |_| {
649                    Ok(futures::future::ok(SignificantChainEvent {
650                        tx_hash: random_hash,
651                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
652                    })
653                    .boxed())
654                });
655        }
656
657        // Start the ActionQueue with the mock TransactionExecutor
658        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
659        let tx_sender = tx_queue.new_sender();
660        tokio::task::spawn(async move {
661            tx_queue.start().await;
662        });
663
664        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
665
666        futures::future::join_all(
667            actions
668                .redeem_tickets(TicketSelector::from(&channel_from_bob))
669                .await?
670                .into_iter(),
671        )
672        .await;
673
674        assert!(
675            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
676            "cannot redeem a ticket that's from the previous epoch"
677        );
678
679        Ok(())
680    }
681
682    #[tokio::test]
683    async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
684        let ticket_count = 3;
685        let ticket_from_next_epoch_count = 2;
686        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
687        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
688        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
689
690        // Create 1 ticket in Epoch 4
691        let (channel_from_bob, mut tickets) =
692            create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 4u32).await?;
693
694        // Insert another 2 tickets in Epoch 5
695        let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
696        node_db.upsert_ticket(ticket.clone()).await?;
697        tickets.insert(0, ticket);
698
699        let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
700        node_db.upsert_ticket(ticket.clone()).await?;
701        tickets.insert(1, ticket);
702
703        let tickets_clone = tickets.clone();
704        let mut tx_exec = MockTransactionExecutor::new();
705        tx_exec
706            .expect_redeem_ticket()
707            .times(ticket_count - ticket_from_next_epoch_count)
708            .withf(move |t| {
709                tickets_clone[ticket_from_next_epoch_count..]
710                    .iter()
711                    .any(|tk| tk.ticket.eq(&t.ticket))
712            })
713            .returning(move |_| Ok(random_hash));
714
715        let mut indexer_action_tracker = MockActionState::new();
716        for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
717            indexer_action_tracker
718                .expect_register_expectation()
719                .once()
720                .return_once(move |_| {
721                    Ok(futures::future::ok(SignificantChainEvent {
722                        tx_hash: random_hash,
723                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
724                    })
725                    .boxed())
726                });
727        }
728
729        // Start the ActionQueue with the mock TransactionExecutor
730        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
731        let tx_sender = tx_queue.new_sender();
732        tokio::task::spawn(async move {
733            tx_queue.start().await;
734        });
735
736        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
737
738        futures::future::join_all(
739            actions
740                .redeem_tickets(TicketSelector::from(&channel_from_bob))
741                .await?
742                .into_iter(),
743        )
744        .await;
745
746        for ticket in tickets.iter().take(ticket_from_next_epoch_count) {
747            assert!(
748                actions.redeem_ticket(ticket.clone()).await.is_err(),
749                "cannot redeem a ticket that's from the next epoch"
750            );
751        }
752
753        Ok(())
754    }
755
756    #[tokio::test]
757    async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
758        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
759        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
760        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
761
762        let (channel_from_bob, tickets) =
763            create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 1u32).await?;
764
765        let ticket = tickets.into_iter().next().unwrap();
766
767        let mut tx_exec = MockTransactionExecutor::new();
768        let ticket_clone = ticket.clone();
769        tx_exec
770            .expect_redeem_ticket()
771            .once()
772            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
773            .returning(move |_| Ok(random_hash));
774
775        let mut indexer_action_tracker = MockActionState::new();
776        let ticket_clone = ticket.clone();
777        indexer_action_tracker
778            .expect_register_expectation()
779            .once()
780            .return_once(move |_| {
781                Ok(futures::future::ok(SignificantChainEvent {
782                    tx_hash: random_hash,
783                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
784                })
785                .boxed())
786            });
787
788        // Start the ActionQueue with the mock TransactionExecutor
789        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
790        let tx_sender = tx_queue.new_sender();
791        tokio::task::spawn(async move {
792            tx_queue.start().await;
793        });
794
795        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
796
797        let confirmation = actions.redeem_ticket(ticket).await?.await?;
798
799        assert_eq!(confirmation.tx_hash, random_hash);
800
801        assert!(
802            node_db
803                .stream_tickets(Some((&channel_from_bob).into()))
804                .await?
805                .all(|tkt| futures::future::ready(tkt.status == AcknowledgedTicketStatus::BeingRedeemed))
806                .await,
807            "all bob's tickets must be in BeingRedeemed state"
808        );
809
810        Ok(())
811    }
812
813    #[tokio::test]
814    async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
815        let db = HoprIndexerDb::new_in_memory(ALICE.clone()).await?;
816        let node_db = HoprNodeDb::new_in_memory(ALICE.clone()).await?;
817        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
818
819        let (mut channel_from_bob, tickets) =
820            create_channel_with_ack_tickets(db.clone(), node_db.clone(), 1, &BOB, 1u32).await?;
821
822        channel_from_bob.ticket_index = 2_u32.into();
823        db.upsert_channel(None, channel_from_bob).await?;
824
825        let ticket = tickets.into_iter().next().unwrap();
826
827        let mut tx_exec = MockTransactionExecutor::new();
828        let ticket_clone = ticket.clone();
829        tx_exec
830            .expect_redeem_ticket()
831            .never()
832            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
833            .returning(move |_| Ok(random_hash));
834
835        let mut indexer_action_tracker = MockActionState::new();
836        let ticket_clone = ticket.clone();
837        indexer_action_tracker
838            .expect_register_expectation()
839            .never()
840            .return_once(move |_| {
841                Ok(futures::future::ok(SignificantChainEvent {
842                    tx_hash: random_hash,
843                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
844                })
845                .boxed())
846            });
847
848        // Start the ActionQueue with the mock TransactionExecutor
849        let tx_queue = ActionQueue::new(node_db.clone(), indexer_action_tracker, tx_exec, Default::default());
850        let tx_sender = tx_queue.new_sender();
851        tokio::task::spawn(async move {
852            tx_queue.start().await;
853        });
854
855        let actions = ChainActions::new(&ALICE, db.clone(), node_db.clone(), tx_sender.clone());
856
857        assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
858
859        Ok(())
860    }
861}