hopr_api/chain/
tickets.rs1use std::fmt::Formatter;
2
3use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
4use hopr_internal_types::prelude::AcknowledgedTicketStatus;
5pub use hopr_internal_types::prelude::{RedeemableTicket, VerifiedTicket};
6
7use crate::{
8 chain::ChainReceipt,
9 db::{HoprDbTicketOperations, TicketMarker, TicketSelector},
10};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct BatchRedemptionResult<E> {
17 pub successful: Vec<(VerifiedTicket, ChainReceipt)>,
20 pub rejected: Vec<(VerifiedTicket, String)>,
22 pub will_retry: Vec<(VerifiedTicket, E)>,
24}
25
26impl<E> Default for BatchRedemptionResult<E> {
27 fn default() -> Self {
28 Self {
29 successful: vec![],
30 rejected: vec![],
31 will_retry: vec![],
32 }
33 }
34}
35
36impl<E> std::fmt::Display for BatchRedemptionResult<E> {
37 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 write!(
39 f,
40 "redemption results - successful: {}, rejected: {}, retriable: {}",
41 self.successful.len(),
42 self.rejected.len(),
43 self.will_retry.len()
44 )
45 }
46}
47
48#[derive(Debug, thiserror::Error)]
50pub enum TicketRedeemError<E> {
51 #[error("redemption of ticket {0} was rejected due to: {1}")]
53 Rejected(VerifiedTicket, String),
54 #[error("processing error during redemption of ticket {0}: {1}")]
56 ProcessingError(VerifiedTicket, E),
57}
58
59#[async_trait::async_trait]
61#[auto_impl::auto_impl(&, Box, Arc)]
62pub trait ChainWriteTicketOperations {
63 type Error: std::error::Error + Send + Sync + 'static;
64 async fn redeem_ticket<'a>(
68 &'a self,
69 ticket: RedeemableTicket,
70 ) -> Result<
71 BoxFuture<'a, Result<(VerifiedTicket, ChainReceipt), TicketRedeemError<Self::Error>>>,
72 TicketRedeemError<Self::Error>,
73 >;
74
75 async fn redeem_tickets_via_selectors<Db, S, I>(
84 &self,
85 db: &Db,
86 selectors: I,
87 ) -> Result<BatchRedemptionResult<Self::Error>, Db::Error>
88 where
89 Db: HoprDbTicketOperations + Sync,
90 I: IntoIterator<Item = S> + Send,
91 S: Into<TicketSelector>,
92 {
93 let selectors = selectors
95 .into_iter()
96 .map(|sel| sel.into().with_state(AcknowledgedTicketStatus::Untouched))
97 .collect::<Vec<_>>();
98
99 let mut tickets = db
101 .update_ticket_states_and_fetch(selectors, AcknowledgedTicketStatus::BeingRedeemed)
102 .await?
103 .collect::<Vec<_>>()
104 .await;
105
106 if tickets.is_empty() {
107 return Ok(BatchRedemptionResult::default());
108 }
109
110 tickets.sort();
112
113 let futures = FuturesUnordered::new();
114 for ticket in tickets {
115 match self.redeem_ticket(ticket).await {
116 Ok(redeem_tracker) => futures.push(redeem_tracker),
117 Err(error) => futures.push(futures::future::err(error).boxed()),
118 }
119 }
120
121 Ok(futures
122 .fold(BatchRedemptionResult::default(), |mut res, item| async move {
123 match item {
124 Ok((ticket, receipt)) => {
125 if let Err(error) = db.mark_tickets_as([&ticket], TicketMarker::Redeemed).await {
126 tracing::error!(%error, "failed to mark ticket as redeemed");
127 }
128 res.successful.push((ticket, receipt));
129 }
130 Err(TicketRedeemError::Rejected(ticket, reason)) => {
131 if let Err(error) = db.mark_tickets_as([&ticket], TicketMarker::Rejected).await {
132 tracing::error!(%error, "failed to mark ticket as rejected");
133 }
134 res.rejected.push((ticket, reason));
135 }
136 Err(TicketRedeemError::ProcessingError(ticket, proc_error)) => {
137 if let Err(error) = db
138 .update_ticket_states([&ticket], AcknowledgedTicketStatus::Untouched)
139 .await
140 {
141 tracing::error!(%error, "failed to update ticket state to untouched");
142 }
143 res.will_retry.push((ticket, proc_error));
144 }
145 }
146 res
147 })
148 .await)
149 }
150}