hopr_api/db/
tickets.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::{Display, Formatter},
4    ops::{Bound, RangeBounds},
5};
6
7use futures::stream::BoxStream;
8use hopr_internal_types::prelude::*;
9use hopr_primitive_types::prelude::*;
10
11/// Allows selecting a range of ticket indices in [`TicketSelector`].
12#[derive(Debug, Clone, PartialEq, Eq, Default)]
13pub enum TicketIndexSelector {
14    /// Selects no ticket index specifically.
15    /// This makes the [`TicketSelector`] less restrictive.
16    #[default]
17    None,
18    /// Selects a single ticket with the given index.
19    Single(u64),
20    /// Selects multiple tickets with the given indices.
21    Multiple(HashSet<u64>),
22    /// Selects multiple tickets with indices within the given range.
23    Range((Bound<u64>, Bound<u64>)),
24}
25
26impl Display for TicketIndexSelector {
27    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
28        match &self {
29            TicketIndexSelector::None => write!(f, ""),
30            TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
31            TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
32            TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
33        }
34    }
35}
36
37/// Allows selecting tickets via [`HoprDbTicketOperations`].
38///
39/// The `TicketSelector` always allows selecting only tickets in a single channel.
40/// To select tickets across multiple channels, multiple `TicketSelector`s must be used.
41#[derive(Clone, Debug)]
42pub struct TicketSelector {
43    /// Channel ID and Epoch pair.
44    pub channel_identifier: (ChannelId, u32),
45    /// If given, will select ticket(s) with the given indices
46    /// in the given channel and epoch.
47    ///
48    /// See [`TicketIndexSelector`] for possible options.
49    pub index: TicketIndexSelector,
50    /// If given, the tickets are further restricted to the ones with a winning probability
51    /// in this range.
52    pub win_prob: (Bound<WinningProbability>, Bound<WinningProbability>),
53    /// If given, the tickets are further restricted to the ones with an amount
54    /// in this range.
55    pub amount: (Bound<HoprBalance>, Bound<HoprBalance>),
56    /// Further restriction to tickets with the given state.
57    pub state: Option<AcknowledgedTicketStatus>,
58}
59
60impl Display for TicketSelector {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        let out = format!(
63            "ticket selector in {:?} {}{}{}{}",
64            self.channel_identifier,
65            self.index,
66            self.state
67                .map(|state| format!(" in state {state}"))
68                .unwrap_or("".into()),
69            match &self.win_prob {
70                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
71                bounds => format!(" with winning probability in {bounds:?}"),
72            },
73            match &self.amount {
74                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
75                bounds => format!(" with amount in {bounds:?}"),
76            },
77        );
78        write!(f, "{}", out.trim())
79    }
80}
81
82fn approx_cmp_bounds(b1: Bound<WinningProbability>, b2: Bound<WinningProbability>) -> bool {
83    match (b1, b2) {
84        (Bound::Unbounded, Bound::Unbounded) => true,
85        (Bound::Included(a), Bound::Included(b)) => b.approx_eq(&a),
86        (Bound::Excluded(a), Bound::Excluded(b)) => b.approx_eq(&a),
87        _ => false,
88    }
89}
90
91impl PartialEq for TicketSelector {
92    fn eq(&self, other: &Self) -> bool {
93        self.channel_identifier == other.channel_identifier
94            && self.index == other.index
95            && self.state == other.state
96            && self.amount == other.amount
97            && approx_cmp_bounds(self.win_prob.0, other.win_prob.0)
98            && approx_cmp_bounds(self.win_prob.1, other.win_prob.1)
99    }
100}
101
102impl TicketSelector {
103    /// Create a new ticket selector given the `channel_id` and `epoch`.
104    pub fn new(channel_id: ChannelId, epoch: u32) -> Self {
105        Self {
106            channel_identifier: (channel_id, epoch),
107            index: TicketIndexSelector::None,
108            win_prob: (Bound::Unbounded, Bound::Unbounded),
109            amount: (Bound::Unbounded, Bound::Unbounded),
110            state: None,
111        }
112    }
113
114    /// If `false` is returned, the selector can fetch more than a single ticket.
115    pub fn is_unique(&self) -> bool {
116        matches!(&self.index, TicketIndexSelector::Single(_))
117            || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1)
118    }
119
120    /// Returns this instance with a ticket index set.
121    ///
122    /// This method can be called multiple times to select multiple tickets.
123    /// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
124    #[must_use]
125    pub fn with_index(mut self, index: u64) -> Self {
126        self.index = match self.index {
127            TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
128            TicketIndexSelector::Single(existing) => {
129                TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
130            }
131            TicketIndexSelector::Multiple(mut existing) => {
132                existing.insert(index);
133                TicketIndexSelector::Multiple(existing)
134            }
135        };
136        self
137    }
138
139    /// Removes all other restriction except for the channel ID and epoch.
140    #[must_use]
141    pub fn only_channel(mut self) -> Self {
142        self.index = TicketIndexSelector::None;
143        self.win_prob = (Bound::Unbounded, Bound::Unbounded);
144        self.amount = (Bound::Unbounded, Bound::Unbounded);
145        self.state = None;
146        self
147    }
148
149    /// Returns this instance with a ticket index upper bound set.
150    /// If [`TicketSelector::with_index`] was previously called, it will be replaced.
151    #[must_use]
152    pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
153        self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
154        self
155    }
156
157    /// Returns this instance with a ticket state set.
158    #[must_use]
159    pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
160        self.state = Some(state);
161        self
162    }
163
164    /// Returns this instance without a ticket state set.
165    #[must_use]
166    pub fn with_no_state(mut self) -> Self {
167        self.state = None;
168        self
169    }
170
171    /// Returns this instance with a winning probability range bounds set.
172    #[must_use]
173    pub fn with_winning_probability<T: RangeBounds<WinningProbability>>(mut self, range: T) -> Self {
174        self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
175        self
176    }
177
178    /// Returns this instance with the ticket amount range bounds set.
179    #[must_use]
180    pub fn with_amount<T: RangeBounds<HoprBalance>>(mut self, range: T) -> Self {
181        self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
182        self
183    }
184}
185
186impl From<&AcknowledgedTicket> for TicketSelector {
187    fn from(value: &AcknowledgedTicket) -> Self {
188        Self::from(&value.ticket)
189    }
190}
191
192impl From<&RedeemableTicket> for TicketSelector {
193    fn from(value: &RedeemableTicket) -> Self {
194        Self::from(&value.ticket)
195    }
196}
197
198impl From<&VerifiedTicket> for TicketSelector {
199    fn from(value: &VerifiedTicket) -> Self {
200        Self {
201            channel_identifier: (*value.channel_id(), value.verified_ticket().channel_epoch),
202            index: TicketIndexSelector::Single(value.verified_ticket().index),
203            win_prob: (Bound::Unbounded, Bound::Unbounded),
204            amount: (Bound::Unbounded, Bound::Unbounded),
205            state: None,
206        }
207    }
208}
209
210impl From<&ChannelEntry> for TicketSelector {
211    fn from(value: &ChannelEntry) -> Self {
212        Self {
213            channel_identifier: (*value.get_id(), value.channel_epoch),
214            index: TicketIndexSelector::None,
215            win_prob: (Bound::Unbounded, Bound::Unbounded),
216            amount: (Bound::Unbounded, Bound::Unbounded),
217            state: None,
218        }
219    }
220}
221
222impl From<ChannelEntry> for TicketSelector {
223    fn from(value: ChannelEntry) -> Self {
224        TicketSelector::from(&value)
225    }
226}
227
228/// Different markers for unredeemed tickets.
229/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
230#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display, Hash)]
231#[strum(serialize_all = "lowercase")]
232pub enum TicketMarker {
233    /// Ticket has been successfully redeemed on-chain.
234    Redeemed,
235    /// An invalid ticket has been rejected by the packet processing pipeline.
236    Rejected,
237    /// A winning ticket that was not redeemed on-chain (e.g.: due to the channel being closed)
238    Neglected,
239}
240
241/// Database operations for tickets.
242///
243/// The redeemable winning tickets enter the DB via [`HoprDb::insert_ticket`] and can only leave the DB
244/// when [marked](TicketMarker) via [`HoprDbTicketOperations::mark_tickets_as`]
245///
246/// The overall value of tickets in the DB and of those that left the DB is tracked
247/// via the [`ChannelTicketStatistics`] by calling the [`HoprDbTicketOperations::get_ticket_statistics`].
248///
249/// The statistics can also track tickets that were rejected before entering the DB,
250/// which can be done via [`HoprDbTicketOperations::mark_unsaved_ticket_rejected`].
251///
252/// NOTE: tickets that are not winning are NOT considered as rejected. Non-winning tickets
253/// are therefore not tracked in any statistics, also for performance reasons.
254#[async_trait::async_trait]
255#[auto_impl::auto_impl(&, Box, Arc)]
256pub trait HoprDbTicketOperations {
257    type Error: std::error::Error + Send + Sync + 'static;
258
259    /// Retrieve acknowledged winning tickets, according to the given `selectors`.
260    ///
261    /// If no selector is given, streams tickets in all channels.
262    async fn stream_tickets<'c, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
263        &'c self,
264        selectors: I,
265    ) -> Result<BoxStream<'c, RedeemableTicket>, Self::Error>;
266
267    /// Inserts a new winning ticket into the DB.
268    ///
269    /// Returns an error if the ticket already exists.
270    async fn insert_ticket(&self, ticket: RedeemableTicket) -> Result<(), Self::Error>;
271
272    /// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
273    /// ticket statistics for each ticket's channel.
274    ///
275    /// Returns the number of marked tickets.
276    async fn mark_tickets_as<S: Into<TicketSelector> + Send, I: IntoIterator<Item = S> + Send>(
277        &self,
278        selectors: I,
279        mark_as: TicketMarker,
280    ) -> Result<usize, Self::Error>;
281
282    /// Updates the ticket statistics according to the fact that the given ticket has
283    /// been rejected by the packet processing pipeline.
284    ///
285    /// This ticket is not yet stored in the ticket DB;
286    /// therefore, only the statistics in the corresponding channel are updated, and the overall
287    /// unrealized value in the respective channel does not change.
288    async fn mark_unsaved_ticket_rejected(&self, issuer: &Address, ticket: &Ticket) -> Result<(), Self::Error>;
289
290    /// Updates the [state](AcknowledgedTicketStatus) of the tickets matching the given `selectors`.
291    ///
292    /// The operation should prevent any concurrent changes to the tickets before the stream is fully
293    /// consumed.
294    ///
295    /// Returns the updated tickets in the new state.
296    async fn update_ticket_states_and_fetch<'a, S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
297        &'a self,
298        selectors: I,
299        new_state: AcknowledgedTicketStatus,
300    ) -> Result<BoxStream<'a, RedeemableTicket>, Self::Error>;
301
302    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
303    async fn update_ticket_states<S: Into<TicketSelector>, I: IntoIterator<Item = S> + Send>(
304        &self,
305        selectors: I,
306        new_state: AcknowledgedTicketStatus,
307    ) -> Result<usize, Self::Error>;
308
309    /// Retrieves the ticket statistics for the given channel.
310    ///
311    /// If no channel is given, it retrieves aggregate ticket statistics for all channels.
312    async fn get_ticket_statistics(
313        &self,
314        channel_id: Option<ChannelId>,
315    ) -> Result<ChannelTicketStatistics, Self::Error>;
316
317    /// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
318    async fn reset_ticket_statistics(&self) -> Result<(), Self::Error>;
319
320    /// Counts the total value of tickets matching the channel.
321    ///
322    /// Returns the total ticket value.
323    async fn get_tickets_value(&self, id: &ChannelId, epoch: u32) -> Result<HoprBalance, Self::Error>;
324
325    /// Gets the index of the next outgoing ticket for the given channel.
326    ///
327    /// If such an entry does not exist, it is initialized with 0 and `None` is returned.
328    async fn get_or_create_outgoing_ticket_index(
329        &self,
330        channel_id: &ChannelId,
331        epoch: u32,
332    ) -> Result<Option<u64>, Self::Error>;
333
334    /// Stores the ticket index of the next outgoing ticket for the given channel.
335    ///
336    /// Does nothing if the entry for the given channel and epoch does not exist.
337    /// Returns an error if the given `index` is less than the current index in the DB.
338    async fn update_outgoing_ticket_index(
339        &self,
340        channel_id: &ChannelId,
341        epoch: u32,
342        index: u64,
343    ) -> Result<(), Self::Error>;
344
345    /// Removes the outgoing ticket index for the given channel and epoch.
346    ///
347    /// Does nothing if the value did not exist
348    async fn remove_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error>;
349}
350
351/// Contains ticket statistics for one or more channels.
352#[derive(Clone, Debug, PartialEq, Eq)]
353pub struct ChannelTicketStatistics {
354    /// Total number of winning tickets.
355    pub winning_tickets: u128,
356    /// Values of tickets that were finalized and removed from the DB.
357    pub finalized_values: HashMap<TicketMarker, HoprBalance>,
358    /// The total value in unredeemed winning tickets still in the DB.
359    pub unredeemed_value: HoprBalance,
360}
361
362impl Default for ChannelTicketStatistics {
363    fn default() -> Self {
364        Self {
365            winning_tickets: 0,
366            finalized_values: HashMap::from([
367                (TicketMarker::Neglected, HoprBalance::zero()),
368                (TicketMarker::Rejected, HoprBalance::zero()),
369                (TicketMarker::Redeemed, HoprBalance::zero()),
370            ]),
371            unredeemed_value: HoprBalance::zero(),
372        }
373    }
374}
375
376impl ChannelTicketStatistics {
377    /// The total value of neglected tickets.
378    pub fn neglected_value(&self) -> HoprBalance {
379        self.finalized_values
380            .get(&TicketMarker::Neglected)
381            .copied()
382            .unwrap_or_default()
383    }
384
385    /// The total value of rejected tickets.
386    pub fn rejected_value(&self) -> HoprBalance {
387        self.finalized_values
388            .get(&TicketMarker::Rejected)
389            .copied()
390            .unwrap_or_default()
391    }
392
393    /// The total value of redeemed tickets.
394    pub fn redeemed_value(&self) -> HoprBalance {
395        self.finalized_values
396            .get(&TicketMarker::Redeemed)
397            .copied()
398            .unwrap_or_default()
399    }
400}