hopr_api/db/
tickets.rs

1use std::{
2    collections::HashSet,
3    fmt::{Display, Formatter},
4    ops::{Bound, RangeBounds},
5    sync::{Arc, atomic::AtomicU64},
6};
7
8use futures::stream::BoxStream;
9use hopr_crypto_types::prelude::*;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12
13/// Allows selecting a range of ticket indices in [TicketSelector].
14#[derive(Debug, Clone, PartialEq, Eq, Default)]
15pub enum TicketIndexSelector {
16    /// Selects no ticket index specifically.
17    /// This makes the [TicketSelector] less restrictive.
18    #[default]
19    None,
20    /// Selects a single ticket with the given index.
21    Single(u64),
22    /// Selects multiple tickets with the given indices.
23    Multiple(HashSet<u64>),
24    /// Selects multiple tickets with indices within the given range.
25    Range((Bound<u64>, Bound<u64>)),
26}
27
28impl Display for TicketIndexSelector {
29    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30        match &self {
31            TicketIndexSelector::None => write!(f, ""),
32            TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
33            TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
34            TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
35        }
36    }
37}
38
39/// Allows selecting tickets via [`HoprDbTicketOperations`].
40#[derive(Clone, Debug)]
41pub struct TicketSelector {
42    /// Channel ID and Epoch pairs.
43    pub channel_identifiers: Vec<(Hash, U256)>,
44    /// If given, will select ticket(s) with the given indices
45    /// in the given channel and epoch.
46    ///
47    /// See [`TicketIndexSelector`] for possible options.
48    pub index: TicketIndexSelector,
49    /// If given, the tickets are further restricted to the ones with a winning probability
50    /// in this range.
51    pub win_prob: (Bound<WinningProbability>, Bound<WinningProbability>),
52    /// If given, the tickets are further restricted to the ones with an amount
53    /// in this range.
54    pub amount: (Bound<HoprBalance>, Bound<HoprBalance>),
55    /// Further restriction to tickets with the given state.
56    pub state: Option<AcknowledgedTicketStatus>,
57    /// Further restrict to only aggregated tickets.
58    pub only_aggregated: bool,
59}
60
61impl Display for TicketSelector {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        let out = format!(
64            "ticket selector in {:?} {}{}{}{}{}",
65            self.channel_identifiers,
66            self.index,
67            self.state
68                .map(|state| format!(" in state {state}"))
69                .unwrap_or("".into()),
70            if self.only_aggregated { " only aggregated" } else { "" },
71            match &self.win_prob {
72                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
73                bounds => format!(" with winning probability in {bounds:?}"),
74            },
75            match &self.amount {
76                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
77                bounds => format!(" with amount in {bounds:?}"),
78            },
79        );
80        write!(f, "{}", out.trim())
81    }
82}
83
84fn approx_cmp_bounds(b1: Bound<WinningProbability>, b2: Bound<WinningProbability>) -> bool {
85    match (b1, b2) {
86        (Bound::Unbounded, Bound::Unbounded) => true,
87        (Bound::Included(a), Bound::Included(b)) => b.approx_eq(&a),
88        (Bound::Excluded(a), Bound::Excluded(b)) => b.approx_eq(&a),
89        _ => false,
90    }
91}
92
93impl PartialEq for TicketSelector {
94    fn eq(&self, other: &Self) -> bool {
95        self.channel_identifiers == other.channel_identifiers
96            && self.index == other.index
97            && self.state == other.state
98            && self.only_aggregated == other.only_aggregated
99            && self.amount == other.amount
100            && approx_cmp_bounds(self.win_prob.0, other.win_prob.0)
101            && approx_cmp_bounds(self.win_prob.1, other.win_prob.1)
102    }
103}
104
105impl TicketSelector {
106    /// Create a new ticket selector given the `channel_id` and `epoch`.
107    pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
108        Self {
109            channel_identifiers: vec![(channel_id, epoch.into())],
110            index: TicketIndexSelector::None,
111            win_prob: (Bound::Unbounded, Bound::Unbounded),
112            amount: (Bound::Unbounded, Bound::Unbounded),
113            state: None,
114            only_aggregated: false,
115        }
116    }
117
118    /// Allows matching tickets on multiple channels by adding the given
119    /// `channel_id` and `epoch` to the selector.
120    ///
121    /// This also nullifies any prior effect of any prior calls to [`TicketSelector::with_index`] or
122    /// [`TicketSelector::with_index_range`]
123    /// as ticket indices cannot be matched over multiple channels.
124    #[must_use]
125    pub fn also_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
126        let mut ret = self.clone();
127        ret.index = TicketIndexSelector::None;
128        ret.channel_identifiers.push((channel_id, epoch.into()));
129        ret
130    }
131
132    /// Convenience version on [`TicketSelector::also_on_channel`] that accepts a [`ChannelEntry`].
133    #[must_use]
134    pub fn also_on_channel_entry(self, entry: &ChannelEntry) -> Self {
135        self.also_on_channel(entry.get_id(), entry.channel_epoch)
136    }
137
138    /// Sets the selector to match only tickets on the given `channel_id` and `epoch`.
139    ///
140    /// This nullifies any prior calls to [`TicketSelector::also_on_channel`].
141    #[must_use]
142    pub fn just_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
143        let mut ret = self.clone();
144        ret.channel_identifiers = vec![(channel_id, epoch.into())];
145        ret
146    }
147
148    /// Checks if this selector operates only on a single channel.
149    ///
150    /// This will return `false` if [`TicketSelector::also_on_channel`] was called, and neither
151    /// the [`TicketSelector::with_index`] nor [`TicketSelector::with_index_range`] were
152    /// called subsequently.
153    pub fn is_single_channel(&self) -> bool {
154        self.channel_identifiers.len() == 1
155    }
156
157    /// If `false` is returned, the selector can fetch more than a single ticket.
158    pub fn is_unique(&self) -> bool {
159        self.is_single_channel()
160            && (matches!(&self.index, TicketIndexSelector::Single(_))
161                || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1))
162    }
163
164    /// Returns this instance with a ticket index set.
165    ///
166    /// This method can be called multiple times to select multiple tickets.
167    /// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
168    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
169    #[must_use]
170    pub fn with_index(mut self, index: u64) -> Self {
171        self.channel_identifiers.truncate(1);
172        self.index = match self.index {
173            TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
174            TicketIndexSelector::Single(existing) => {
175                TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
176            }
177            TicketIndexSelector::Multiple(mut existing) => {
178                existing.insert(index);
179                TicketIndexSelector::Multiple(existing)
180            }
181        };
182        self
183    }
184
185    /// Returns this instance with a ticket index upper bound set.
186    /// If [`TicketSelector::with_index`] was previously called, it will be replaced.
187    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
188    #[must_use]
189    pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
190        self.channel_identifiers.truncate(1);
191        self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
192        self
193    }
194
195    /// Returns this instance with a ticket state set.
196    #[must_use]
197    pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
198        self.state = Some(state);
199        self
200    }
201
202    /// Returns this instance without a ticket state set.
203    #[must_use]
204    pub fn with_no_state(mut self) -> Self {
205        self.state = None;
206        self
207    }
208
209    /// Returns this instance with `only_aggregated` flag value.
210    #[must_use]
211    pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
212        self.only_aggregated = only_aggregated;
213        self
214    }
215
216    /// Returns this instance with a winning probability range bounds set.
217    #[must_use]
218    pub fn with_winning_probability<T: RangeBounds<WinningProbability>>(mut self, range: T) -> Self {
219        self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
220        self
221    }
222
223    /// Returns this instance with the ticket amount range bounds set.
224    #[must_use]
225    pub fn with_amount<T: RangeBounds<HoprBalance>>(mut self, range: T) -> Self {
226        self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
227        self
228    }
229}
230
231impl From<&AcknowledgedTicket> for TicketSelector {
232    fn from(value: &AcknowledgedTicket) -> Self {
233        Self {
234            channel_identifiers: vec![(
235                value.verified_ticket().channel_id,
236                value.verified_ticket().channel_epoch.into(),
237            )],
238            index: TicketIndexSelector::Single(value.verified_ticket().index),
239            win_prob: (Bound::Unbounded, Bound::Unbounded),
240            amount: (Bound::Unbounded, Bound::Unbounded),
241            state: Some(value.status),
242            only_aggregated: value.verified_ticket().index_offset > 1,
243        }
244    }
245}
246
247impl From<&RedeemableTicket> for TicketSelector {
248    fn from(value: &RedeemableTicket) -> Self {
249        Self {
250            channel_identifiers: vec![(
251                value.verified_ticket().channel_id,
252                value.verified_ticket().channel_epoch.into(),
253            )],
254            index: TicketIndexSelector::Single(value.verified_ticket().index),
255            win_prob: (Bound::Unbounded, Bound::Unbounded),
256            amount: (Bound::Unbounded, Bound::Unbounded),
257            state: None,
258            only_aggregated: value.verified_ticket().index_offset > 1,
259        }
260    }
261}
262
263impl From<&ChannelEntry> for TicketSelector {
264    fn from(value: &ChannelEntry) -> Self {
265        Self {
266            channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
267            index: TicketIndexSelector::None,
268            win_prob: (Bound::Unbounded, Bound::Unbounded),
269            amount: (Bound::Unbounded, Bound::Unbounded),
270            state: None,
271            only_aggregated: false,
272        }
273    }
274}
275
276impl From<ChannelEntry> for TicketSelector {
277    fn from(value: ChannelEntry) -> Self {
278        TicketSelector::from(&value)
279    }
280}
281
282/// Different markers for unredeemed tickets.
283/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
284#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
285#[strum(serialize_all = "lowercase")]
286pub enum TicketMarker {
287    Redeemed,
288    Rejected,
289    Neglected,
290}
291
292/// Database operations for tickets.
293#[async_trait::async_trait]
294pub trait HoprDbTicketOperations {
295    type Error: std::error::Error + Send + Sync + 'static;
296
297    /// Retrieve acknowledged winning tickets, according to the given `selector`.
298    ///
299    /// If no selector is given, streams tickets in all channels.
300    async fn stream_tickets<'c>(
301        &'c self,
302        selector: Option<TicketSelector>,
303    ) -> Result<BoxStream<'c, AcknowledgedTicket>, Self::Error>;
304
305    /// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
306    /// ticket statistics for each ticket's channel.
307    ///
308    /// Returns the number of marked tickets.
309    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize, Self::Error>;
310
311    /// Updates the ticket statistics according to the fact that the given ticket has
312    /// been rejected by the packet processing pipeline.
313    ///
314    /// This ticket is not yet stored in the ticket DB;
315    /// therefore, only the statistics in the corresponding channel are updated.
316    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<(), Self::Error>;
317
318    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
319    ///
320    /// Returns the updated tickets in the new state.
321    async fn update_ticket_states_and_fetch<'a>(
322        &'a self,
323        selector: TicketSelector,
324        new_state: AcknowledgedTicketStatus,
325    ) -> Result<BoxStream<'a, AcknowledgedTicket>, Self::Error>;
326
327    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
328    async fn update_ticket_states(
329        &self,
330        selector: TicketSelector,
331        new_state: AcknowledgedTicketStatus,
332    ) -> Result<usize, Self::Error>;
333
334    /// Retrieves the ticket statistics for the given channel.
335    ///
336    /// If no channel is given, it retrieves aggregate ticket statistics for all channels.
337    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics, Self::Error>;
338
339    /// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
340    async fn reset_ticket_statistics(&self) -> Result<(), Self::Error>;
341
342    /// Counts the tickets matching the given `selector` and their total value.
343    ///
344    /// The optional transaction `tx` must be in the database.
345    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance), Self::Error>;
346
347    /// Sets the stored outgoing ticket index to `index`, only if the currently stored value
348    /// is less than `index`. This ensures the stored value can only be growing.
349    ///
350    /// Returns the old value.
351    ///
352    /// If the entry is not yet present for the given ID, it is initialized to 0.
353    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64, Self::Error>;
354
355    /// Resets the outgoing ticket index to 0 for the given channel id.
356    ///
357    /// Returns the old value before reset.
358    ///
359    /// If the entry is not yet present for the given ID, it is initialized to 0.
360    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, Self::Error>;
361
362    /// Increments the outgoing ticket index in the given channel ID and returns the value before incrementing.
363    ///
364    /// If the entry is not yet present for the given ID, it is initialized to 0 and incremented.
365    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, Self::Error>;
366
367    /// Gets the current outgoing ticket index for the given channel id.
368    ///
369    /// If the entry is not yet present for the given ID, it is initialized to 0.
370    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>, Self::Error>;
371
372    /// Compares outgoing ticket indices in the cache with the stored values
373    /// and updates the stored value where changed.
374    ///
375    /// Returns the number of updated ticket indices.
376    async fn persist_outgoing_ticket_indices(&self) -> Result<usize, Self::Error>;
377}
378
379/// Can contain ticket statistics for a channel or aggregated ticket statistics for all channels.
380#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
381pub struct ChannelTicketStatistics {
382    pub winning_tickets: u128,
383    pub neglected_value: HoprBalance,
384    pub redeemed_value: HoprBalance,
385    pub unredeemed_value: HoprBalance,
386    pub rejected_value: HoprBalance,
387}