hopr_chain_actions/
redeem.rs

1//! This module contains the [TicketRedeemActions] trait defining actions regarding
2//! ticket redemption.
3//!
4//! An implementation of this trait is added to [ChainActions] which realizes the redemption
5//! operations via [ActionQueue](crate::action_queue::ActionQueue).
6//!
7//! There are 4 functions that can be used to redeem tickets in the [TicketRedeemActions] trait:
8//! - [redeem_all_tickets](TicketRedeemActions::redeem_all_tickets)
9//! - [redeem_tickets_in_channel](TicketRedeemActions::redeem_tickets_in_channel)
10//! - [redeem_tickets_with_counterparty](TicketRedeemActions::redeem_tickets_with_counterparty)
11//! - [redeem_ticket](TicketRedeemActions::redeem_ticket)
12//!
13//! Each method first checks if the tickets are redeemable.
14//! (= they are not marked as [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) or
15//! [BeingAggregated](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingAggregated) in the DB),
16//! If they are redeemable, their state is changed to
17//! [BeingRedeemed](hopr_internal_types::tickets::AcknowledgedTicketStatus::BeingRedeemed) (while having acquired the exclusive DB write lock).
18//! Subsequently, the ticket in such a state is transmitted into the [ActionQueue](crate::action_queue::ActionQueue) so the redemption is soon executed on-chain.
19//! The functions return immediately but provide futures that can be awaited in case the callers wish to await the on-chain
20//! confirmation of each ticket redemption.
21//!
22//! See the details in [ActionQueue](crate::action_queue::ActionQueue) on how the confirmation is realized by awaiting the respective [SignificantChainEvent](hopr_chain_types::chain_events::SignificantChainEvent).
23//! by the Indexer.
24use async_trait::async_trait;
25use futures::StreamExt;
26use hopr_chain_types::actions::Action;
27use hopr_crypto_types::types::Hash;
28use hopr_db_sql::api::info::DomainSeparator;
29use hopr_db_sql::api::tickets::{HoprDbTicketOperations, TicketSelector};
30use hopr_db_sql::channels::HoprDbChannelOperations;
31use hopr_db_sql::prelude::HoprDbInfoOperations;
32use hopr_internal_types::prelude::*;
33use hopr_primitive_types::prelude::*;
34use tracing::{debug, error, info, warn};
35
36use crate::action_queue::PendingAction;
37use crate::errors::ChainActionsError::{ChannelDoesNotExist, InvalidState, OldTicket};
38use crate::errors::{ChainActionsError::WrongTicketState, Result};
39use crate::ChainActions;
40
41lazy_static::lazy_static! {
42    /// Used as a placeholder when the redeem transaction has not yet been published on-chain
43    static ref EMPTY_TX_HASH: Hash = Hash::default();
44}
45
46/// Gathers all the ticket redemption-related on-chain calls.
47#[async_trait]
48pub trait TicketRedeemActions {
49    /// Redeems all redeemable tickets in all channels.
50    async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>>;
51
52    /// Redeems all redeemable tickets in the incoming channel from the given counterparty.
53    async fn redeem_tickets_with_counterparty(
54        &self,
55        counterparty: &Address,
56        only_aggregated: bool,
57    ) -> Result<Vec<PendingAction>>;
58
59    /// Redeems all redeemable tickets in the given channel.
60    async fn redeem_tickets_in_channel(
61        &self,
62        channel: &ChannelEntry,
63        only_aggregated: bool,
64    ) -> Result<Vec<PendingAction>>;
65
66    /// Redeems all tickets based on the given [`TicketSelector`].
67    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>>;
68
69    /// Tries to redeem the given ticket. If the ticket is not redeemable, returns an error.
70    /// Otherwise, the transaction hash of the on-chain redemption is returned.
71    async fn redeem_ticket(&self, ack: AcknowledgedTicket) -> Result<PendingAction>;
72}
73
74#[async_trait]
75impl<Db> TicketRedeemActions for ChainActions<Db>
76where
77    Db: HoprDbChannelOperations + HoprDbTicketOperations + HoprDbInfoOperations + Clone + Send + Sync + std::fmt::Debug,
78{
79    #[tracing::instrument(level = "debug", skip(self))]
80    async fn redeem_all_tickets(&self, only_aggregated: bool) -> Result<Vec<PendingAction>> {
81        let incoming_channels = self
82            .db
83            .get_channels_via(None, ChannelDirection::Incoming, &self.self_address())
84            .await?;
85        debug!(
86            channel_count = incoming_channels.len(),
87            "starting to redeem all tickets in channels to self"
88        );
89
90        let mut receivers: Vec<PendingAction> = vec![];
91
92        // Must be synchronous because underlying Ethereum transactions are sequential
93        for incoming_channel in incoming_channels {
94            match self.redeem_tickets_in_channel(&incoming_channel, only_aggregated).await {
95                Ok(mut successful_txs) => {
96                    receivers.append(&mut successful_txs);
97                }
98                Err(e) => {
99                    warn!(
100                        channel = %generate_channel_id(&incoming_channel.source, &incoming_channel.destination),
101                        error = %e,
102                        "Failed to redeem tickets in channel",
103                    );
104                }
105            }
106        }
107
108        Ok(receivers)
109    }
110
111    #[tracing::instrument(level = "debug", skip(self))]
112    async fn redeem_tickets_with_counterparty(
113        &self,
114        counterparty: &Address,
115        only_aggregated: bool,
116    ) -> Result<Vec<PendingAction>> {
117        let maybe_channel = self
118            .db
119            .get_channel_by_parties(None, counterparty, &self.self_address(), false)
120            .await?;
121        if let Some(channel) = maybe_channel {
122            self.redeem_tickets_in_channel(&channel, only_aggregated).await
123        } else {
124            Err(ChannelDoesNotExist)
125        }
126    }
127
128    #[tracing::instrument(level = "debug", skip(self))]
129    async fn redeem_tickets_in_channel(
130        &self,
131        channel: &ChannelEntry,
132        only_aggregated: bool,
133    ) -> Result<Vec<PendingAction>> {
134        self.redeem_tickets(
135            TicketSelector::from(channel)
136                .with_aggregated_only(only_aggregated)
137                .with_index_range(channel.ticket_index.as_u64()..)
138                .with_state(AcknowledgedTicketStatus::Untouched),
139        )
140        .await
141    }
142
143    #[tracing::instrument(level = "debug", skip(self))]
144    async fn redeem_tickets(&self, selector: TicketSelector) -> Result<Vec<PendingAction>> {
145        let (count_redeemable_tickets, _) = self.db.get_tickets_value(selector.clone()).await?;
146
147        info!(
148            count_redeemable_tickets, %selector,
149            "acknowledged tickets in channel that can be redeemed"
150        );
151
152        // Return fast if there are no redeemable tickets
153        if count_redeemable_tickets == 0 {
154            return Ok(vec![]);
155        }
156
157        let channel_dst = self
158            .db
159            .get_indexer_data(None)
160            .await?
161            .domain_separator(DomainSeparator::Channel)
162            .ok_or(InvalidState("missing channel dst".into()))?;
163
164        let selector_id = selector.to_string();
165
166        // Collect here, so we don't hold-up the stream open for too long
167        let redeem_stream = self
168            .db
169            .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
170            .await?
171            .collect::<Vec<_>>()
172            .await;
173
174        let mut receivers: Vec<PendingAction> = vec![];
175        for ack_ticket in redeem_stream {
176            let ticket_id = ack_ticket.to_string();
177
178            if let Ok(redeemable) = ack_ticket.into_redeemable(&self.chain_key, &channel_dst) {
179                let action = self.tx_sender.send(Action::RedeemTicket(redeemable)).await;
180                match action {
181                    Ok(successful_tx) => {
182                        receivers.push(successful_tx);
183                    }
184                    Err(e) => {
185                        error!(ticket_id, error = %e, "Failed to submit transaction that redeems ticket",);
186                    }
187                }
188            } else {
189                error!("failed to extract redeemable ticket");
190            }
191        }
192
193        info!(
194            count = receivers.len(),
195            selector = selector_id,
196            "acknowledged tickets were submitted to redeem in channel",
197        );
198
199        Ok(receivers)
200    }
201
202    #[tracing::instrument(level = "debug", skip(self))]
203    async fn redeem_ticket(&self, ack_ticket: AcknowledgedTicket) -> Result<PendingAction> {
204        if let Some(channel) = self
205            .db
206            .get_channel_by_id(None, &ack_ticket.verified_ticket().channel_id)
207            .await?
208        {
209            // Check if not trying to redeem a ticket that cannot be redeemed.
210            // Such tickets are automatically cleaned up (neglected) after successful redemption.
211            if ack_ticket.verified_ticket().index < channel.ticket_index.as_u64() {
212                return Err(OldTicket);
213            }
214
215            debug!(%ack_ticket, %channel, "redeeming single ticket");
216
217            let selector = TicketSelector::from(&channel)
218                .with_index(ack_ticket.verified_ticket().index)
219                .with_state(AcknowledgedTicketStatus::Untouched);
220
221            // Do not hold up the stream open for too long
222            let maybe_ticket = self
223                .db
224                .update_ticket_states_and_fetch(selector, AcknowledgedTicketStatus::BeingRedeemed)
225                .await?
226                .next()
227                .await;
228
229            if let Some(ticket) = maybe_ticket {
230                let channel_dst = self
231                    .db
232                    .get_indexer_data(None)
233                    .await?
234                    .domain_separator(DomainSeparator::Channel)
235                    .ok_or(InvalidState("missing channel dst".into()))?;
236
237                let redeemable = ticket.into_redeemable(&self.chain_key, &channel_dst)?;
238
239                debug!(%ack_ticket, "ticket is redeemable");
240                Ok(self.tx_sender.send(Action::RedeemTicket(redeemable)).await?)
241            } else {
242                Err(WrongTicketState(ack_ticket.to_string()))
243            }
244        } else {
245            Err(ChannelDoesNotExist)
246        }
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253    use futures::FutureExt;
254    use hex_literal::hex;
255    use hopr_chain_types::chain_events::ChainEventType::TicketRedeemed;
256    use hopr_chain_types::chain_events::SignificantChainEvent;
257    use hopr_crypto_random::random_bytes;
258    use hopr_crypto_types::prelude::*;
259    use hopr_db_sql::api::info::DomainSeparator;
260    use hopr_db_sql::db::HoprDb;
261    use hopr_db_sql::errors::DbSqlError;
262    use hopr_db_sql::info::HoprDbInfoOperations;
263    use hopr_db_sql::{HoprDbGeneralModelOperations, TargetDb};
264
265    use crate::action_queue::{ActionQueue, MockTransactionExecutor};
266    use crate::action_state::MockActionState;
267
268    lazy_static::lazy_static! {
269        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be constructible");
270        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be constructible");
271        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
272    }
273
274    fn generate_random_ack_ticket(
275        idx: u64,
276        counterparty: &ChainKeypair,
277        channel_epoch: u32,
278    ) -> anyhow::Result<AcknowledgedTicket> {
279        let hk1 = HalfKey::random();
280        let hk2 = HalfKey::random();
281
282        let cp1: CurvePoint = hk1.to_challenge().try_into()?;
283        let cp2: CurvePoint = hk2.to_challenge().try_into()?;
284        let cp_sum = CurvePoint::combine(&[&cp1, &cp2]);
285
286        let price_per_packet: U256 = 10000000000000000u128.into(); // 0.01 HOPR
287
288        Ok(TicketBuilder::default()
289            .addresses(counterparty, &*ALICE)
290            .amount(price_per_packet.div_f64(1.0f64)? * 5u32)
291            .index(idx)
292            .index_offset(1)
293            .win_prob(1.0)
294            .channel_epoch(channel_epoch)
295            .challenge(Challenge::from(cp_sum).to_ethereum_challenge())
296            .build_signed(counterparty, &Hash::default())?
297            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?))
298    }
299
300    async fn create_channel_with_ack_tickets(
301        db: HoprDb,
302        ticket_count: usize,
303        counterparty: &ChainKeypair,
304        channel_epoch: u32,
305    ) -> anyhow::Result<(ChannelEntry, Vec<AcknowledgedTicket>)> {
306        let ckp = counterparty.clone();
307        let db_clone = db.clone();
308        let channel = db
309            .begin_transaction()
310            .await?
311            .perform(|tx| {
312                Box::pin(async move {
313                    db_clone
314                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Default::default())
315                        .await?;
316
317                    let channel = ChannelEntry::new(
318                        ckp.public().to_address(),
319                        ALICE.public().to_address(),
320                        Balance::zero(BalanceType::HOPR),
321                        U256::zero(),
322                        ChannelStatus::Open,
323                        channel_epoch.into(),
324                    );
325                    db_clone.upsert_channel(Some(tx), channel).await?;
326                    Ok::<_, DbSqlError>(channel)
327                })
328            })
329            .await?;
330
331        let ckp = counterparty.clone();
332        let input_tickets = db
333            .begin_transaction_in_db(TargetDb::Tickets)
334            .await?
335            .perform(|tx| {
336                Box::pin(async move {
337                    let mut input_tickets = Vec::new();
338                    for i in 0..ticket_count {
339                        let ack_ticket = generate_random_ack_ticket(i as u64, &ckp, channel_epoch)
340                            .map_err(|e| hopr_db_sql::errors::DbSqlError::LogicalError(e.to_string()))?;
341                        db.upsert_ticket(Some(tx), ack_ticket.clone()).await?;
342                        input_tickets.push(ack_ticket);
343                    }
344                    Ok::<_, DbSqlError>(input_tickets)
345                })
346            })
347            .await?;
348
349        Ok((channel, input_tickets))
350    }
351
352    #[async_std::test]
353    async fn test_ticket_redeem_flow() -> anyhow::Result<()> {
354        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
355
356        let ticket_count = 5;
357        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
358
359        // All the tickets can be redeemed because they are issued with the same channel epoch
360        let (channel_from_bob, bob_tickets) =
361            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
362        let (channel_from_charlie, charlie_tickets) =
363            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
364
365        let mut indexer_action_tracker = MockActionState::new();
366        let mut seq2 = mockall::Sequence::new();
367
368        for tkt in bob_tickets.iter().cloned() {
369            indexer_action_tracker
370                .expect_register_expectation()
371                .once()
372                .in_sequence(&mut seq2)
373                .return_once(move |_| {
374                    Ok(futures::future::ok(SignificantChainEvent {
375                        tx_hash: random_hash,
376                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
377                    })
378                    .boxed())
379                });
380        }
381
382        for tkt in charlie_tickets.iter().cloned() {
383            indexer_action_tracker
384                .expect_register_expectation()
385                .once()
386                .in_sequence(&mut seq2)
387                .return_once(move |_| {
388                    Ok(futures::future::ok(SignificantChainEvent {
389                        tx_hash: random_hash,
390                        event_type: TicketRedeemed(channel_from_charlie, Some(tkt)),
391                    })
392                    .boxed())
393                });
394        }
395
396        let mut tx_exec = MockTransactionExecutor::new();
397        let mut seq = mockall::Sequence::new();
398
399        // Expect all Bob's tickets get redeemed first
400        tx_exec
401            .expect_redeem_ticket()
402            .times(ticket_count)
403            .in_sequence(&mut seq)
404            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
405            .returning(move |_| Ok(random_hash));
406
407        // and then all Charlie's tickets get redeemed
408        tx_exec
409            .expect_redeem_ticket()
410            .times(ticket_count)
411            .in_sequence(&mut seq)
412            .withf(move |t| charlie_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
413            .returning(move |_| Ok(random_hash));
414
415        // Start the ActionQueue with the mock TransactionExecutor
416        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
417        let tx_sender = tx_queue.new_sender();
418        async_std::task::spawn(async move {
419            tx_queue.start().await;
420        });
421
422        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
423
424        let confirmations = futures::future::try_join_all(actions.redeem_all_tickets(false).await?.into_iter()).await?;
425
426        assert_eq!(2 * ticket_count, confirmations.len(), "must have all confirmations");
427        assert!(
428            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
429            "tx hashes must be equal"
430        );
431
432        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
433
434        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
435
436        assert!(
437            db_acks_bob
438                .into_iter()
439                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
440            "all bob's tickets must be in BeingRedeemed state"
441        );
442        assert!(
443            db_acks_charlie
444                .into_iter()
445                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
446            "all charlie's tickets must be in BeingRedeemed state"
447        );
448
449        Ok(())
450    }
451
452    #[async_std::test]
453    async fn test_ticket_redeem_in_channel() -> anyhow::Result<()> {
454        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
455
456        let ticket_count = 5;
457        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
458
459        // All the tickets can be redeemed because they are issued with the same channel epoch
460        let (mut channel_from_bob, bob_tickets) =
461            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
462        let (channel_from_charlie, _) =
463            create_channel_with_ack_tickets(db.clone(), ticket_count, &CHARLIE, 4u32).await?;
464
465        // Tickets with index 0 will be skipped, as that is already past
466        channel_from_bob.ticket_index = 1_u32.into();
467        db.upsert_channel(None, channel_from_bob.clone()).await?;
468
469        let mut indexer_action_tracker = MockActionState::new();
470        let mut seq2 = mockall::Sequence::new();
471
472        for tkt in bob_tickets.iter().cloned() {
473            indexer_action_tracker
474                .expect_register_expectation()
475                .once()
476                .in_sequence(&mut seq2)
477                .return_once(move |_| {
478                    Ok(futures::future::ok(SignificantChainEvent {
479                        tx_hash: random_hash,
480                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
481                    })
482                    .boxed())
483                });
484        }
485
486        // Expect only Bob's tickets to get redeemed
487        let mut tx_exec = MockTransactionExecutor::new();
488        tx_exec
489            .expect_redeem_ticket()
490            .times(ticket_count - 1)
491            .withf(move |t| bob_tickets.iter().any(|tk| tk.ticket.eq(&t.ticket)))
492            .returning(move |_| Ok(random_hash));
493
494        // Start the ActionQueue with the mock TransactionExecutor
495        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
496        let tx_sender = tx_queue.new_sender();
497        async_std::task::spawn(async move {
498            tx_queue.start().await;
499        });
500
501        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
502
503        let confirmations = futures::future::try_join_all(
504            actions
505                .redeem_tickets_with_counterparty(&BOB.public().to_address(), false)
506                .await?
507                .into_iter(),
508        )
509        .await?;
510
511        // First ticket is skipped, because its index is lower than the index on the channel entry
512        assert_eq!(ticket_count - 1, confirmations.len(), "must have all confirmations");
513        assert!(
514            confirmations.into_iter().all(|c| c.tx_hash == random_hash),
515            "tx hashes must be equal"
516        );
517
518        let db_acks_bob = db.get_tickets((&channel_from_bob).into()).await?;
519
520        let db_acks_charlie = db.get_tickets((&channel_from_charlie).into()).await?;
521
522        assert!(
523            db_acks_bob
524                .into_iter()
525                .take_while(|tkt| tkt.verified_ticket().index != 0)
526                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
527            "all bob's tickets must be in BeingRedeemed state"
528        );
529        assert!(
530            db_acks_charlie
531                .into_iter()
532                .all(|tkt| tkt.status == AcknowledgedTicketStatus::Untouched),
533            "all charlie's tickets must be in Untouched state"
534        );
535
536        Ok(())
537    }
538
539    #[async_std::test]
540    async fn test_redeem_must_not_work_for_tickets_being_aggregated_and_being_redeemed() -> anyhow::Result<()> {
541        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
542
543        let ticket_count = 3;
544        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
545
546        let (channel_from_bob, mut tickets) =
547            create_channel_with_ack_tickets(db.clone(), ticket_count, &BOB, 4u32).await?;
548
549        // Make the first ticket unredeemable
550        tickets[0].status = AcknowledgedTicketStatus::BeingAggregated;
551        let selector = TicketSelector::from(&tickets[0]).with_no_state();
552        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingAggregated)
553            .await?;
554
555        // Make the second ticket unredeemable
556        tickets[1].status = AcknowledgedTicketStatus::BeingRedeemed;
557        let selector = TicketSelector::from(&tickets[1]).with_no_state();
558        db.update_ticket_states(selector, AcknowledgedTicketStatus::BeingRedeemed)
559            .await?;
560
561        // Expect only the redeemable tickets get redeemed
562        let tickets_clone = tickets.clone();
563        let mut tx_exec = MockTransactionExecutor::new();
564        tx_exec
565            .expect_redeem_ticket()
566            .times(ticket_count - 2)
567            .withf(move |t| tickets_clone[2..].iter().any(|tk| tk.ticket.eq(&t.ticket)))
568            .returning(move |_| Ok(random_hash));
569
570        let mut indexer_action_tracker = MockActionState::new();
571        for tkt in tickets.iter().skip(2).cloned() {
572            indexer_action_tracker
573                .expect_register_expectation()
574                .once()
575                .return_once(move |_| {
576                    Ok(futures::future::ok(SignificantChainEvent {
577                        tx_hash: random_hash,
578                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
579                    })
580                    .boxed())
581                });
582        }
583
584        // Start the ActionQueue with the mock TransactionExecutor
585        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
586        let tx_sender = tx_queue.new_sender();
587        async_std::task::spawn(async move {
588            tx_queue.start().await;
589        });
590
591        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
592
593        let confirmations = futures::future::try_join_all(
594            actions
595                .redeem_tickets_in_channel(&channel_from_bob, false)
596                .await?
597                .into_iter(),
598        )
599        .await?;
600
601        assert_eq!(
602            ticket_count - 2,
603            confirmations.len(),
604            "must redeem only redeemable tickets in channel"
605        );
606
607        assert!(
608            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
609            "cannot redeem a ticket that's being aggregated"
610        );
611
612        assert!(
613            actions.redeem_ticket(tickets[1].clone()).await.is_err(),
614            "cannot redeem a ticket that's being redeemed"
615        );
616
617        Ok(())
618    }
619
620    #[async_std::test]
621    async fn test_redeem_must_not_work_for_tickets_of_previous_epoch_being_aggregated_and_being_redeemed(
622    ) -> anyhow::Result<()> {
623        let ticket_count = 3;
624        let ticket_from_previous_epoch_count = 2;
625        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
626        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
627
628        // Create 1 ticket in Epoch 4
629        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
630
631        // Insert another 2 tickets in Epoch 3
632        let ticket = generate_random_ack_ticket(0, &BOB, 3)?;
633        db.upsert_ticket(None, ticket.clone()).await?;
634        tickets.insert(0, ticket);
635
636        let ticket = generate_random_ack_ticket(1, &BOB, 3)?;
637        db.upsert_ticket(None, ticket.clone()).await?;
638        tickets.insert(1, ticket);
639
640        let tickets_clone = tickets.clone();
641        let mut tx_exec = MockTransactionExecutor::new();
642        tx_exec
643            .expect_redeem_ticket()
644            .times(ticket_count - ticket_from_previous_epoch_count)
645            .withf(move |t| {
646                tickets_clone[ticket_from_previous_epoch_count..]
647                    .iter()
648                    .any(|tk| tk.ticket.eq(&t.ticket))
649            })
650            .returning(move |_| Ok(random_hash));
651
652        let mut indexer_action_tracker = MockActionState::new();
653        for tkt in tickets.iter().skip(ticket_from_previous_epoch_count).cloned() {
654            indexer_action_tracker
655                .expect_register_expectation()
656                .once()
657                .return_once(move |_| {
658                    Ok(futures::future::ok(SignificantChainEvent {
659                        tx_hash: random_hash,
660                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
661                    })
662                    .boxed())
663                });
664        }
665
666        // Start the ActionQueue with the mock TransactionExecutor
667        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
668        let tx_sender = tx_queue.new_sender();
669        async_std::task::spawn(async move {
670            tx_queue.start().await;
671        });
672
673        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
674
675        futures::future::join_all(
676            actions
677                .redeem_tickets_in_channel(&channel_from_bob, false)
678                .await?
679                .into_iter(),
680        )
681        .await;
682
683        assert!(
684            actions.redeem_ticket(tickets[0].clone()).await.is_err(),
685            "cannot redeem a ticket that's from the previous epoch"
686        );
687
688        Ok(())
689    }
690
691    #[async_std::test]
692    async fn test_redeem_must_not_work_for_tickets_of_next_epoch_being_redeemed() -> anyhow::Result<()> {
693        let ticket_count = 4;
694        let ticket_from_next_epoch_count = 2;
695        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
696        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
697
698        // Create 1 ticket in Epoch 4
699        let (channel_from_bob, mut tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 4u32).await?;
700
701        // Insert another 2 tickets in Epoch 5
702        let ticket = generate_random_ack_ticket(0, &BOB, 5)?;
703        db.upsert_ticket(None, ticket.clone()).await?;
704        tickets.insert(0, ticket);
705
706        let ticket = generate_random_ack_ticket(1, &BOB, 5)?;
707        db.upsert_ticket(None, ticket.clone()).await?;
708        tickets.insert(1, ticket);
709
710        let tickets_clone = tickets.clone();
711        let mut tx_exec = MockTransactionExecutor::new();
712        tx_exec
713            .expect_redeem_ticket()
714            .times(ticket_count - ticket_from_next_epoch_count)
715            .withf(move |t| {
716                tickets_clone[ticket_from_next_epoch_count..]
717                    .iter()
718                    .any(|tk| tk.ticket.eq(&t.ticket))
719            })
720            .returning(move |_| Ok(random_hash));
721
722        let mut indexer_action_tracker = MockActionState::new();
723        for tkt in tickets.iter().skip(ticket_from_next_epoch_count).cloned() {
724            indexer_action_tracker
725                .expect_register_expectation()
726                .once()
727                .return_once(move |_| {
728                    Ok(futures::future::ok(SignificantChainEvent {
729                        tx_hash: random_hash,
730                        event_type: TicketRedeemed(channel_from_bob, Some(tkt)),
731                    })
732                    .boxed())
733                });
734        }
735
736        // Start the ActionQueue with the mock TransactionExecutor
737        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
738        let tx_sender = tx_queue.new_sender();
739        async_std::task::spawn(async move {
740            tx_queue.start().await;
741        });
742
743        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
744
745        futures::future::join_all(
746            actions
747                .redeem_tickets_in_channel(&channel_from_bob, false)
748                .await?
749                .into_iter(),
750        )
751        .await;
752
753        for unredeemable_index in 0..ticket_from_next_epoch_count {
754            assert!(
755                actions
756                    .redeem_ticket(tickets[unredeemable_index].clone())
757                    .await
758                    .is_err(),
759                "cannot redeem a ticket that's from the next epoch"
760            );
761        }
762
763        Ok(())
764    }
765
766    #[async_std::test]
767    async fn test_should_redeem_single_ticket() -> anyhow::Result<()> {
768        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
769        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
770
771        let (channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
772
773        let ticket = tickets.into_iter().next().unwrap();
774
775        let mut tx_exec = MockTransactionExecutor::new();
776        let ticket_clone = ticket.clone();
777        tx_exec
778            .expect_redeem_ticket()
779            .once()
780            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
781            .returning(move |_| Ok(random_hash));
782
783        let mut indexer_action_tracker = MockActionState::new();
784        let ticket_clone = ticket.clone();
785        indexer_action_tracker
786            .expect_register_expectation()
787            .once()
788            .return_once(move |_| {
789                Ok(futures::future::ok(SignificantChainEvent {
790                    tx_hash: random_hash,
791                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
792                })
793                .boxed())
794            });
795
796        // Start the ActionQueue with the mock TransactionExecutor
797        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
798        let tx_sender = tx_queue.new_sender();
799        async_std::task::spawn(async move {
800            tx_queue.start().await;
801        });
802
803        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
804
805        let confirmation = actions.redeem_ticket(ticket).await?.await?;
806
807        assert_eq!(confirmation.tx_hash, random_hash);
808
809        assert!(
810            db.get_tickets((&channel_from_bob).into())
811                .await?
812                .into_iter()
813                .all(|tkt| tkt.status == AcknowledgedTicketStatus::BeingRedeemed),
814            "all bob's tickets must be in BeingRedeemed state"
815        );
816
817        Ok(())
818    }
819
820    #[async_std::test]
821    async fn test_should_not_redeem_single_ticket_with_lower_index_than_channel_index() -> anyhow::Result<()> {
822        let db = HoprDb::new_in_memory(ALICE.clone()).await?;
823        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
824
825        let (mut channel_from_bob, tickets) = create_channel_with_ack_tickets(db.clone(), 1, &BOB, 1u32).await?;
826
827        channel_from_bob.ticket_index = 2_u32.into();
828        db.upsert_channel(None, channel_from_bob.clone()).await?;
829
830        let ticket = tickets.into_iter().next().unwrap();
831
832        let mut tx_exec = MockTransactionExecutor::new();
833        let ticket_clone = ticket.clone();
834        tx_exec
835            .expect_redeem_ticket()
836            .never()
837            .withf(move |t| ticket_clone.ticket.eq(&t.ticket))
838            .returning(move |_| Ok(random_hash));
839
840        let mut indexer_action_tracker = MockActionState::new();
841        let ticket_clone = ticket.clone();
842        indexer_action_tracker
843            .expect_register_expectation()
844            .never()
845            .return_once(move |_| {
846                Ok(futures::future::ok(SignificantChainEvent {
847                    tx_hash: random_hash,
848                    event_type: TicketRedeemed(channel_from_bob, Some(ticket_clone)),
849                })
850                .boxed())
851            });
852
853        // Start the ActionQueue with the mock TransactionExecutor
854        let tx_queue = ActionQueue::new(db.clone(), indexer_action_tracker, tx_exec, Default::default());
855        let tx_sender = tx_queue.new_sender();
856        async_std::task::spawn(async move {
857            tx_queue.start().await;
858        });
859
860        let actions = ChainActions::new(&ALICE, db.clone(), tx_sender.clone());
861
862        assert!(matches!(actions.redeem_ticket(ticket).await, Err(OldTicket)));
863
864        Ok(())
865    }
866}