hopr_api/chain/
tickets.rs

1use std::fmt::Formatter;
2
3use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
4use hopr_internal_types::prelude::AcknowledgedTicketStatus;
5pub use hopr_internal_types::prelude::{RedeemableTicket, VerifiedTicket};
6
7use crate::{
8    chain::ChainReceipt,
9    db::{HoprDbTicketOperations, TicketMarker, TicketSelector},
10};
11
12/// Result of [`redeem_tickets_via_selector`].
13///
14/// Contains tickets that were successfully redeemed, rejected or left untouched.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct BatchRedemptionResult<E> {
17    /// Tickets which were successfully redeemed and
18    /// removed from the ticket database.
19    pub successful: Vec<(VerifiedTicket, ChainReceipt)>,
20    /// Tickets which were permanently rejected and removed from the ticket database.
21    pub rejected: Vec<(VerifiedTicket, String)>,
22    /// Tickets which could not be redeemed and will be retried later.
23    pub will_retry: Vec<(VerifiedTicket, E)>,
24}
25
26impl<E> Default for BatchRedemptionResult<E> {
27    fn default() -> Self {
28        Self {
29            successful: vec![],
30            rejected: vec![],
31            will_retry: vec![],
32        }
33    }
34}
35
36impl<E> std::fmt::Display for BatchRedemptionResult<E> {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        write!(
39            f,
40            "redemption results - successful: {}, rejected: {}, retriable: {}",
41            self.successful.len(),
42            self.rejected.len(),
43            self.will_retry.len()
44        )
45    }
46}
47
48/// Errors that can occur during ticket redemption.
49#[derive(Debug, thiserror::Error)]
50pub enum TicketRedeemError<E> {
51    /// Non-retryable error, the ticket should be discarded
52    #[error("redemption of ticket {0} was rejected due to: {1}")]
53    Rejected(VerifiedTicket, String),
54    /// Retryable error, the ticket redemption should be retried.
55    #[error("processing error during redemption of ticket {0}: {1}")]
56    ProcessingError(VerifiedTicket, E),
57}
58
59/// On-chain write operations with tickets.
60#[async_trait::async_trait]
61#[auto_impl::auto_impl(&, Box, Arc)]
62pub trait ChainWriteTicketOperations {
63    type Error: std::error::Error + Send + Sync + 'static;
64    /// Redeems a single ticket on-chain.
65    ///
66    /// The input `ticket` is always returned as [`VerifiedTicket`], either on success or failure.
67    async fn redeem_ticket<'a>(
68        &'a self,
69        ticket: RedeemableTicket,
70    ) -> Result<
71        BoxFuture<'a, Result<(VerifiedTicket, ChainReceipt), TicketRedeemError<Self::Error>>>,
72        TicketRedeemError<Self::Error>,
73    >;
74
75    /// Fetches a batch of tickets via [`selector`](TicketSelector) to [`HoprDbTicketOperations`]
76    /// and performs batched ticket redemption.
77    ///
78    /// The function takes care of properly marking the tickets in the DB as being redeemed and
79    /// also properly unmarking or removing them on redemption success or failure.
80    ///
81    /// The method waits until all matched tickets are either redeemed or fail to redeem,
82    /// reporting the results in the [`BatchRedemptionResult`] object.
83    async fn redeem_tickets_via_selectors<Db, S, I>(
84        &self,
85        db: &Db,
86        selectors: I,
87    ) -> Result<BatchRedemptionResult<Self::Error>, Db::Error>
88    where
89        Db: HoprDbTicketOperations + Sync,
90        I: IntoIterator<Item = S> + Send,
91        S: Into<TicketSelector>,
92    {
93        // Make sure the selector only matches untouched tickets
94        let selectors = selectors
95            .into_iter()
96            .map(|sel| sel.into().with_state(AcknowledgedTicketStatus::Untouched))
97            .collect::<Vec<_>>();
98
99        // Collect the tickets first so we don't hold up the DB connection
100        let mut tickets = db
101            .update_ticket_states_and_fetch(selectors, AcknowledgedTicketStatus::BeingRedeemed)
102            .await?
103            .collect::<Vec<_>>()
104            .await;
105
106        if tickets.is_empty() {
107            return Ok(BatchRedemptionResult::default());
108        }
109
110        // Make sure that the tickets are sorted
111        tickets.sort();
112
113        let futures = FuturesUnordered::new();
114        for ticket in tickets {
115            match self.redeem_ticket(ticket).await {
116                Ok(redeem_tracker) => futures.push(redeem_tracker),
117                Err(error) => futures.push(futures::future::err(error).boxed()),
118            }
119        }
120
121        Ok(futures
122            .fold(BatchRedemptionResult::default(), |mut res, item| async move {
123                match item {
124                    Ok((ticket, receipt)) => {
125                        if let Err(error) = db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await {
126                            tracing::error!(%error, "failed to mark ticket as redeemed");
127                        }
128                        res.successful.push((ticket, receipt));
129                    }
130                    Err(TicketRedeemError::Rejected(ticket, reason)) => {
131                        if let Err(error) = db.mark_tickets_as([&ticket], TicketMarker::Rejected).await {
132                            tracing::error!(%error, "failed to mark ticket as rejected");
133                        }
134                        res.rejected.push((ticket, reason));
135                    }
136                    Err(TicketRedeemError::ProcessingError(ticket, proc_error)) => {
137                        if let Err(error) = db
138                            .update_ticket_states([&ticket], AcknowledgedTicketStatus::Untouched)
139                            .await
140                        {
141                            tracing::error!(%error, "failed to update ticket state to untouched");
142                        }
143                        res.will_retry.push((ticket, proc_error));
144                    }
145                }
146                res
147            })
148            .await)
149    }
150}