hopr_api/db/
tickets.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    ops::{Bound, RangeBounds},
5};
6
7use futures::stream::BoxStream;
8use hopr_internal_types::prelude::*;
9use hopr_primitive_types::prelude::*;
10
/// Describes which ticket indices a [`TicketSelector`] should match.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum TicketIndexSelector {
    /// No restriction on the ticket index (the default).
    ///
    /// With this variant the [`TicketSelector`] is not narrowed down by index at all.
    #[default]
    None,
    /// Exactly one ticket index.
    Single(u64),
    /// An explicit set of ticket indices.
    Multiple(HashSet<u64>),
    /// All ticket indices falling between the given lower and upper bound.
    Range((Bound<u64>, Bound<u64>)),
}

impl Display for TicketIndexSelector {
    /// Renders a human-readable description of the index restriction.
    ///
    /// [`TicketIndexSelector::None`] renders as an empty string; the set and
    /// the range bounds are rendered using their `Debug` representation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::None => Ok(()),
            Self::Single(idx) => write!(f, "with index {idx}"),
            Self::Multiple(set) => write!(f, "with indices {set:?}"),
            Self::Range((lower, upper)) => write!(f, "with indices in {lower:?}..{upper:?}"),
        }
    }
}
36
37/// Allows selecting tickets via [`HoprDbTicketOperations`].
38///
39/// The `TicketSelector` always allows selecting only tickets in a single channel.
40/// To select tickets across multiple channels, multiple `TicketSelector`s must be used.
41#[derive(Clone, Debug)]
42pub struct TicketSelector {
43    /// Channel ID and Epoch pair.
44    pub channel_identifier: (ChannelId, u32),
45    /// If given, will select ticket(s) with the given indices
46    /// in the given channel and epoch.
47    ///
48    /// See [`TicketIndexSelector`] for possible options.
49    pub index: TicketIndexSelector,
50    /// If given, the tickets are further restricted to the ones with a winning probability
51    /// in this range.
52    pub win_prob: (Bound<WinningProbability>, Bound<WinningProbability>),
53    /// If given, the tickets are further restricted to the ones with an amount
54    /// in this range.
55    pub amount: (Bound<HoprBalance>, Bound<HoprBalance>),
56    /// Further restriction to tickets with the given state.
57    pub state: Option<AcknowledgedTicketStatus>,
58}
59
60impl Display for TicketSelector {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        let out = format!(
63            "ticket selector in {:?} {}{}{}{}",
64            self.channel_identifier,
65            self.index,
66            self.state
67                .map(|state| format!(" in state {state}"))
68                .unwrap_or("".into()),
69            match &self.win_prob {
70                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
71                bounds => format!(" with winning probability in {bounds:?}"),
72            },
73            match &self.amount {
74                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
75                bounds => format!(" with amount in {bounds:?}"),
76            },
77        );
78        write!(f, "{}", out.trim())
79    }
80}
81
82fn approx_cmp_bounds(b1: Bound<WinningProbability>, b2: Bound<WinningProbability>) -> bool {
83    match (b1, b2) {
84        (Bound::Unbounded, Bound::Unbounded) => true,
85        (Bound::Included(a), Bound::Included(b)) => b.approx_eq(&a),
86        (Bound::Excluded(a), Bound::Excluded(b)) => b.approx_eq(&a),
87        _ => false,
88    }
89}
90
91impl PartialEq for TicketSelector {
92    fn eq(&self, other: &Self) -> bool {
93        self.channel_identifier == other.channel_identifier
94            && self.index == other.index
95            && self.state == other.state
96            && self.amount == other.amount
97            && approx_cmp_bounds(self.win_prob.0, other.win_prob.0)
98            && approx_cmp_bounds(self.win_prob.1, other.win_prob.1)
99    }
100}
101
102impl TicketSelector {
103    /// Create a new ticket selector given the `channel_id` and `epoch`.
104    pub fn new(channel_id: ChannelId, epoch: u32) -> Self {
105        Self {
106            channel_identifier: (channel_id, epoch),
107            index: TicketIndexSelector::None,
108            win_prob: (Bound::Unbounded, Bound::Unbounded),
109            amount: (Bound::Unbounded, Bound::Unbounded),
110            state: None,
111        }
112    }
113
114    /// If `false` is returned, the selector can fetch more than a single ticket.
115    pub fn is_unique(&self) -> bool {
116        matches!(&self.index, TicketIndexSelector::Single(_))
117            || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1)
118    }
119
120    /// Returns this instance with a ticket index set.
121    ///
122    /// This method can be called multiple times to select multiple tickets.
123    /// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
124    #[must_use]
125    pub fn with_index(mut self, index: u64) -> Self {
126        self.index = match self.index {
127            TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
128            TicketIndexSelector::Single(existing) => {
129                TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
130            }
131            TicketIndexSelector::Multiple(mut existing) => {
132                existing.insert(index);
133                TicketIndexSelector::Multiple(existing)
134            }
135        };
136        self
137    }
138
139    /// Returns this instance with a ticket index upper bound set.
140    /// If [`TicketSelector::with_index`] was previously called, it will be replaced.
141    #[must_use]
142    pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
143        self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
144        self
145    }
146
147    /// Returns this instance with a ticket state set.
148    #[must_use]
149    pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
150        self.state = Some(state);
151        self
152    }
153
154    /// Returns this instance without a ticket state set.
155    #[must_use]
156    pub fn with_no_state(mut self) -> Self {
157        self.state = None;
158        self
159    }
160
161    /// Returns this instance with a winning probability range bounds set.
162    #[must_use]
163    pub fn with_winning_probability<T: RangeBounds<WinningProbability>>(mut self, range: T) -> Self {
164        self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
165        self
166    }
167
168    /// Returns this instance with the ticket amount range bounds set.
169    #[must_use]
170    pub fn with_amount<T: RangeBounds<HoprBalance>>(mut self, range: T) -> Self {
171        self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
172        self
173    }
174}
175
176impl From<&AcknowledgedTicket> for TicketSelector {
177    fn from(value: &AcknowledgedTicket) -> Self {
178        Self::from(&value.ticket)
179    }
180}
181
182impl From<&RedeemableTicket> for TicketSelector {
183    fn from(value: &RedeemableTicket) -> Self {
184        Self::from(&value.ticket)
185    }
186}
187
188impl From<&VerifiedTicket> for TicketSelector {
189    fn from(value: &VerifiedTicket) -> Self {
190        Self {
191            channel_identifier: (*value.channel_id(), value.verified_ticket().channel_epoch),
192            index: TicketIndexSelector::Single(value.verified_ticket().index),
193            win_prob: (Bound::Unbounded, Bound::Unbounded),
194            amount: (Bound::Unbounded, Bound::Unbounded),
195            state: None,
196        }
197    }
198}
199
200impl From<&ChannelEntry> for TicketSelector {
201    fn from(value: &ChannelEntry) -> Self {
202        Self {
203            channel_identifier: (*value.get_id(), value.channel_epoch),
204            index: TicketIndexSelector::None,
205            win_prob: (Bound::Unbounded, Bound::Unbounded),
206            amount: (Bound::Unbounded, Bound::Unbounded),
207            state: None,
208        }
209    }
210}
211
212impl From<ChannelEntry> for TicketSelector {
213    fn from(value: ChannelEntry) -> Self {
214        TicketSelector::from(&value)
215    }
216}
217
218/// Different markers for unredeemed tickets.
219/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
220#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, Hash)]
221#[strum(serialize_all = "lowercase")]
222pub enum TicketMarker {
223    /// Ticket has been successfully redeemed on-chain.
224    Redeemed,
225    /// An invalid ticket has been rejected by the packet processing pipeline.
226    Rejected,
227    /// A winning ticket that was not redeemed on-chain (e.g.: due to the channel being closed)
228    Neglected,
229}
230
231// TODO: refactor this trait further so that caching responsibility does not lie in the DB (#7575)
232/// Database operations for tickets.
233///
234/// The redeemable winning tickets enter the DB via [`HoprDb::insert_ticket`] and can only leave the DB
235/// when [marked](TicketMarker) via [`HoprDbTicketOperations::mark_tickets_as`]
236///
237/// The overall value of tickets in the DB and of those that left the DB is tracked
238/// via the [`ChannelTicketStatistics`] by calling the [`HoprDbTicketOperations::get_ticket_statistics`].
239///
240/// The statistics can also track tickets that were rejected before entering the DB,
241/// which can be done via [`HoprDbTicketOperations::mark_unsaved_ticket_rejected`].
242///
243/// NOTE: tickets that are not winning are NOT considered as rejected. Non-winning tickets
244/// are therefore not tracked in any statistics, also for performance reasons.
245#[async_trait::async_trait]
246#[auto_impl::auto_impl(&, Box, Arc)]
247pub trait HoprDbTicketOperations {
248    type Error: std::error::Error + Send + Sync + 'static;
249
250    /// Retrieve acknowledged winning tickets, according to the given `selectors`.
251    ///
252    /// If no selector is given, streams tickets in all channels.
253    async fn stream_tickets<'c, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
254        &'c self,
255        selectors: I,
256    ) -> Result<BoxStream<'c, RedeemableTicket>, Self::Error>;
257
258    /// Inserts a new winning ticket into the DB.
259    ///
260    /// Returns an error if the ticket already exists.
261    async fn insert_ticket(&self, ticket: RedeemableTicket) -> Result<(), Self::Error>;
262
263    /// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
264    /// ticket statistics for each ticket's channel.
265    ///
266    /// Returns the number of marked tickets.
267    async fn mark_tickets_as<S: Into<TicketSelector> + Send, I: IntoIterator<Item = S> + Send>(
268        &self,
269        selectors: I,
270        mark_as: TicketMarker,
271    ) -> Result<usize, Self::Error>;
272
273    /// Updates the ticket statistics according to the fact that the given ticket has
274    /// been rejected by the packet processing pipeline.
275    ///
276    /// This ticket is not yet stored in the ticket DB;
277    /// therefore, only the statistics in the corresponding channel are updated.
278    async fn mark_unsaved_ticket_rejected(&self, issuer: &Address, ticket: &Ticket) -> Result<(), Self::Error>;
279
280    /// Updates the [state](AcknowledgedTicketStatus) of the tickets matching the given `selectors`.
281    ///
282    /// Returns the updated tickets in the new state.
283    async fn update_ticket_states_and_fetch<'a, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
284        &'a self,
285        selectors: I,
286        new_state: AcknowledgedTicketStatus,
287    ) -> Result<BoxStream<'a, RedeemableTicket>, Self::Error>;
288
289    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
290    async fn update_ticket_states<S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
291        &self,
292        selectors: I,
293        new_state: AcknowledgedTicketStatus,
294    ) -> Result<usize, Self::Error>;
295
296    /// Retrieves the ticket statistics for the given channel.
297    ///
298    /// If no channel is given, it retrieves aggregate ticket statistics for all channels.
299    async fn get_ticket_statistics(
300        &self,
301        channel_id: Option<ChannelId>,
302    ) -> Result<ChannelTicketStatistics, Self::Error>;
303
304    /// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
305    async fn reset_ticket_statistics(&self) -> Result<(), Self::Error>;
306
307    /// Counts the total value of tickets matching the given `selector` on a single channel.
308    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance), Self::Error>;
309
310    // TODO: outgoing ticket index manipulations APIs will be refactored in #7575
311
312    /// Sets the stored outgoing ticket index to `index`, only if the currently stored value
313    /// is less than `index`. This ensures the stored value can only be growing.
314    ///
315    /// Returns the old value.
316    ///
317    /// If the entry is not yet present for the given ID, it is initialized to 0.
318    async fn compare_and_set_outgoing_ticket_index(
319        &self,
320        channel_id: &ChannelId,
321        index: u64,
322    ) -> Result<u64, Self::Error>;
323
324    /// Resets the outgoing ticket index to 0 for the given channel id.
325    ///
326    /// Returns the old value before reset.
327    ///
328    /// If the entry is not yet present for the given ID, it is initialized to 0.
329    async fn reset_outgoing_ticket_index(&self, channel_id: &ChannelId) -> Result<u64, Self::Error>;
330
331    /// Increments the outgoing ticket index in the given channel ID and returns the value before incrementing.
332    ///
333    /// If the entry is not yet present for the given ID, it is initialized to 0 and incremented.
334    async fn increment_outgoing_ticket_index(&self, channel_id: &ChannelId) -> Result<u64, Self::Error>;
335
336    /// Compares outgoing ticket indices in the cache with the stored values
337    /// and updates the stored value where changed.
338    ///
339    /// Returns the number of updated ticket indices.
340    async fn persist_outgoing_ticket_indices(&self) -> Result<usize, Self::Error>;
341}
342
343/// Contains ticket statistics for one or more channels.
344#[derive(Clone, Debug, PartialEq, Eq)]
345pub struct ChannelTicketStatistics {
346    /// Total number of winning tickets.
347    pub winning_tickets: u128,
348    /// Values of tickets that were finalized and removed from the DB.
349    pub finalized_values: HashMap<TicketMarker, HoprBalance>,
350    /// The total value in unredeemed winning tickets still in the DB.
351    pub unredeemed_value: HoprBalance,
352}
353
354impl Default for ChannelTicketStatistics {
355    fn default() -> Self {
356        Self {
357            winning_tickets: 0,
358            finalized_values: HashMap::from([
359                (TicketMarker::Neglected, HoprBalance::zero()),
360                (TicketMarker::Rejected, HoprBalance::zero()),
361                (TicketMarker::Redeemed, HoprBalance::zero()),
362            ]),
363            unredeemed_value: HoprBalance::zero(),
364        }
365    }
366}
367
368impl ChannelTicketStatistics {
369    /// The total value of neglected tickets.
370    pub fn neglected_value(&self) -> HoprBalance {
371        self.finalized_values
372            .get(&TicketMarker::Neglected)
373            .copied()
374            .unwrap_or_default()
375    }
376
377    /// The total value of rejected tickets.
378    pub fn rejected_value(&self) -> HoprBalance {
379        self.finalized_values
380            .get(&TicketMarker::Rejected)
381            .copied()
382            .unwrap_or_default()
383    }
384
385    /// The total value of redeemed tickets.
386    pub fn redeemed_value(&self) -> HoprBalance {
387        self.finalized_values
388            .get(&TicketMarker::Redeemed)
389            .copied()
390            .unwrap_or_default()
391    }
392}