hopr_db_api/
tickets.rs

1use std::collections::HashSet;
2use std::fmt::{Display, Formatter};
3use std::ops::{Bound, RangeBounds};
4use std::sync::atomic::AtomicU64;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::stream::BoxStream;
9
10use hopr_crypto_types::prelude::*;
11use hopr_internal_types::prelude::*;
12use hopr_primitive_types::prelude::*;
13
14use crate::errors::Result;
15
16/// Allows selecting a range of ticket indices in [TicketSelector].
17#[derive(Debug, Clone, PartialEq, Eq, Default)]
18pub enum TicketIndexSelector {
19    /// Selects no ticket index specifically.
20    /// This makes the [TicketSelector] less restrictive.
21    #[default]
22    None,
23    /// Selects a single ticket with the given index.
24    Single(u64),
25    /// Selects multiple tickets with the given indices.
26    Multiple(HashSet<u64>),
27    /// Selects multiple tickets with indices within the given range.
28    Range((Bound<u64>, Bound<u64>)),
29}
30
31impl Display for TicketIndexSelector {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        match &self {
34            TicketIndexSelector::None => write!(f, ""),
35            TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
36            TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
37            TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
38        }
39    }
40}
41
42/// Allows selecting multiple tickets (if `index` does not contain a single value)
43/// or a single ticket (with unitary `index`) in the given channel and epoch.
44/// The selection can be further restricted to select ticket only in the given `state`.
45#[derive(Clone, Debug, PartialEq, Eq)]
46pub struct TicketSelector {
47    /// Channel ID and Epoch pairs.
48    pub channel_identifiers: Vec<(Hash, U256)>,
49    /// If given, will select ticket(s) with the given indices
50    /// in the given channel and epoch.
51    /// See [TicketIndexSelector] for possible options.
52    pub index: TicketIndexSelector,
53    /// If given, the tickets are further restricted to the ones with a winning probability
54    /// in this range.
55    pub win_prob: (Bound<EncodedWinProb>, Bound<EncodedWinProb>),
56    /// If given, the tickets are further restricted to the ones with an amount
57    /// in this range.
58    pub amount: (Bound<Balance>, Bound<Balance>),
59    /// Further restriction to tickets with the given state.
60    pub state: Option<AcknowledgedTicketStatus>,
61    /// Further restrict to only aggregated tickets.
62    pub only_aggregated: bool,
63}
64
65impl Display for TicketSelector {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        let out = format!(
68            "ticket selector in {:?} {}{}{}{}{}",
69            self.channel_identifiers,
70            self.index,
71            self.state
72                .map(|state| format!(" in state {state}"))
73                .unwrap_or("".into()),
74            if self.only_aggregated { " only aggregated" } else { "" },
75            match &self.win_prob {
76                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
77                bounds => format!(" with winning probability in {bounds:?}"),
78            },
79            match &self.amount {
80                (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
81                bounds => format!(" with amount in {bounds:?}"),
82            },
83        );
84        write!(f, "{}", out.trim())
85    }
86}
87
88impl TicketSelector {
89    /// Create a new ticket selector given the `channel_id` and `epoch`.
90    pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
91        Self {
92            channel_identifiers: vec![(channel_id, epoch.into())],
93            index: TicketIndexSelector::None,
94            win_prob: (Bound::Unbounded, Bound::Unbounded),
95            amount: (Bound::Unbounded, Bound::Unbounded),
96            state: None,
97            only_aggregated: false,
98        }
99    }
100
101    /// Allows matching tickets on multiple channels, by adding the given
102    /// `channel_id` and `epoch` to the selector.
103    ///
104    /// This also nullifies any prior effect of any prior calls to [`TicketSelector::with_index`] or
105    /// [`TicketSelector::with_index_range`]
106    /// as ticket indices cannot be matched over multiple channels.
107    pub fn also_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
108        let mut ret = self.clone();
109        ret.index = TicketIndexSelector::None;
110        ret.channel_identifiers.push((channel_id, epoch.into()));
111        ret
112    }
113
114    /// Sets the selector to match only tickets on the given `channel_id` and `epoch`.
115    /// This nullifies any prior calls to [`TicketSelector::also_on_channel`].
116    pub fn just_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
117        let mut ret = self.clone();
118        ret.channel_identifiers = vec![(channel_id, epoch.into())];
119        ret
120    }
121
122    /// Checks if this selector operates only on a single channel.
123    ///
124    /// This will return `false` if [`TicketSelector::also_on_channel`] was called, and neither
125    /// the [`TicketSelector::with_index`] nor [`TicketSelector::with_index_range`] were
126    /// called subsequently.
127    pub fn is_single_channel(&self) -> bool {
128        self.channel_identifiers.len() == 1
129    }
130
131    /// If `false` is returned, the selector can fetch more than a single ticket.
132    pub fn is_unique(&self) -> bool {
133        self.is_single_channel()
134            && (matches!(&self.index, TicketIndexSelector::Single(_))
135                || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1))
136    }
137
138    /// Returns this instance with a ticket index set.
139    /// This method can be called multiple times to select multiple tickets.
140    /// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
141    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
142    pub fn with_index(mut self, index: u64) -> Self {
143        self.channel_identifiers.truncate(1);
144        self.index = match self.index {
145            TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
146            TicketIndexSelector::Single(existing) => {
147                TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
148            }
149            TicketIndexSelector::Multiple(mut existing) => {
150                existing.insert(index);
151                TicketIndexSelector::Multiple(existing)
152            }
153        };
154        self
155    }
156
157    /// Returns this instance with a ticket index upper bound set.
158    /// If [`TicketSelector::with_index`] was previously called, it will be replaced.
159    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
160    pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
161        self.channel_identifiers.truncate(1);
162        self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
163        self
164    }
165
166    /// Returns this instance with a ticket state set.
167    pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
168        self.state = Some(state);
169        self
170    }
171
172    /// Returns this instance without a ticket state set.
173    pub fn with_no_state(mut self) -> Self {
174        self.state = None;
175        self
176    }
177
178    /// Returns this instance with `only_aggregated` flag value.
179    pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
180        self.only_aggregated = only_aggregated;
181        self
182    }
183
184    /// Returns this instance with a winning probability range bounds set.
185    pub fn with_winning_probability<T: RangeBounds<EncodedWinProb>>(mut self, range: T) -> Self {
186        self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
187        self
188    }
189
190    /// Returns this instance with the ticket amount range bounds set.
191    pub fn with_amount<T: RangeBounds<Balance>>(mut self, range: T) -> Self {
192        self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
193        self
194    }
195}
196
197impl From<&AcknowledgedTicket> for TicketSelector {
198    fn from(value: &AcknowledgedTicket) -> Self {
199        Self {
200            channel_identifiers: vec![(
201                value.verified_ticket().channel_id,
202                value.verified_ticket().channel_epoch.into(),
203            )],
204            index: TicketIndexSelector::Single(value.verified_ticket().index),
205            win_prob: (Bound::Unbounded, Bound::Unbounded),
206            amount: (Bound::Unbounded, Bound::Unbounded),
207            state: Some(value.status),
208            only_aggregated: value.verified_ticket().index_offset > 1,
209        }
210    }
211}
212
213impl From<&RedeemableTicket> for TicketSelector {
214    fn from(value: &RedeemableTicket) -> Self {
215        Self {
216            channel_identifiers: vec![(
217                value.verified_ticket().channel_id,
218                value.verified_ticket().channel_epoch.into(),
219            )],
220            index: TicketIndexSelector::Single(value.verified_ticket().index),
221            win_prob: (Bound::Unbounded, Bound::Unbounded),
222            amount: (Bound::Unbounded, Bound::Unbounded),
223            state: None,
224            only_aggregated: value.verified_ticket().index_offset > 1,
225        }
226    }
227}
228
229impl From<&ChannelEntry> for TicketSelector {
230    fn from(value: &ChannelEntry) -> Self {
231        Self {
232            channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
233            index: TicketIndexSelector::None,
234            win_prob: (Bound::Unbounded, Bound::Unbounded),
235            amount: (Bound::Unbounded, Bound::Unbounded),
236            state: None,
237            only_aggregated: false,
238        }
239    }
240}
241
242impl From<ChannelEntry> for TicketSelector {
243    fn from(value: ChannelEntry) -> Self {
244        TicketSelector::from(&value)
245    }
246}
247
248/// Different markers for unredeemed tickets.
249/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
250#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
251#[strum(serialize_all = "lowercase")]
252pub enum TicketMarker {
253    Redeemed,
254    Rejected,
255    Neglected,
256}
257
258/// Prerequisites for the ticket aggregator.
259/// The prerequisites are **independent** of each other.
260/// If none of the prerequisites are given, they are considered satisfied.
261#[derive(Debug, Clone, Copy, PartialEq, Default)]
262pub struct AggregationPrerequisites {
263    /// Minimum number of tickets in the channel.
264    pub min_ticket_count: Option<usize>,
265    /// Minimum ratio between balance of unaggregated messages and channel stake.
266    /// I.e.: the condition is met if a sum of unaggregated ticket amounts divided by
267    /// the total channel stake is greater than `min_unaggregated_ratio`.
268    pub min_unaggregated_ratio: Option<f64>,
269}
270
271#[async_trait]
272pub trait HoprDbTicketOperations {
273    /// Retrieve acknowledged winning tickets, according to the given `selector`.
274    ///
275    /// The optional transaction `tx` must be in the database.
276    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>>;
277
278    /// Retrieve acknowledged winning tickets, according to the given `selector`.
279    ///
280    /// The optional transaction `tx` must be in the database.
281    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>>;
282
283    /// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
284    /// ticket statistics for each ticket's channel.
285    ///
286    /// Returns the number of marked tickets.
287    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize>;
288
289    /// Updates the ticket statistics according to the fact that the given ticket has
290    /// been rejected by the packet processing pipeline.
291    ///
292    /// This ticket is not yet stored in the ticket DB;
293    /// therefore, only the statistics in the corresponding channel are updated.
294    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()>;
295
296    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
297    ///
298    /// Returns the updated tickets in the new state.
299    async fn update_ticket_states_and_fetch<'a>(
300        &'a self,
301        selector: TicketSelector,
302        new_state: AcknowledgedTicketStatus,
303    ) -> Result<BoxStream<'a, AcknowledgedTicket>>;
304
305    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
306    async fn update_ticket_states(
307        &self,
308        selector: TicketSelector,
309        new_state: AcknowledgedTicketStatus,
310    ) -> Result<usize>;
311
312    /// Retrieves the ticket statistics for the given channel.
313    ///
314    /// If no channel is given, it retrieves aggregate ticket statistics for all channels.
315    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics>;
316
317    /// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
318    async fn reset_ticket_statistics(&self) -> Result<()>;
319
320    /// Counts the tickets matching the given `selector` and their total value.
321    ///
322    /// The optional transaction `tx` must be in the database.
323    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)>;
324
325    /// Sets the stored outgoing ticket index to `index`, only if the currently stored value
326    /// is less than `index`. This ensures the stored value can only be growing.
327    ///
328    /// Returns the old value.
329    ///
330    /// If the entry is not yet present for the given ID, it is initialized to 0.
331    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64>;
332
333    /// Resets the outgoing ticket index to 0 for the given channel id.
334    ///
335    /// Returns the old value before reset.
336    ///
337    /// If the entry is not yet present for the given ID, it is initialized to 0.
338    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;
339
340    /// Increments the outgoing ticket index in the given channel ID and returns the value before incrementing.
341    ///
342    /// If the entry is not yet present for the given ID, it is initialized to 0 and incremented.
343    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;
344
345    /// Gets the current outgoing ticket index for the given channel id.
346    ///
347    /// If the entry is not yet present for the given ID, it is initialized to 0.
348    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>>;
349
350    /// Compares outgoing ticket indices in the cache with the stored values
351    /// and updates the stored value where changed.
352    ///
353    /// Returns the number of updated ticket indices.
354    async fn persist_outgoing_ticket_indices(&self) -> Result<usize>;
355
356    /// Prepare a viable collection of tickets to be aggregated.
357    ///
358    /// Some preconditions for tickets apply. This callback will collect the aggregatable
359    /// tickets and marks them as being aggregated.
360    async fn prepare_aggregation_in_channel(
361        &self,
362        channel: &Hash,
363        prerequisites: AggregationPrerequisites,
364    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>>;
365
366    /// Perform a ticket aggregation rollback in the channel.
367    ///
368    /// If a ticket aggregation fails, this callback can be invoked to make sure that
369    /// resources are properly restored and cleaned up in the database, allowing further
370    /// aggregations.
371    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()>;
372
373    /// Replace the aggregated tickets locally with an aggregated ticket from the counterparty.
374    async fn process_received_aggregated_ticket(
375        &self,
376        aggregated_ticket: Ticket,
377        chain_keypair: &ChainKeypair,
378    ) -> Result<AcknowledgedTicket>;
379
380    /// Performs ticket aggregation as an issuing party of the given tickets.
381    async fn aggregate_tickets(
382        &self,
383        destination: OffchainPublicKey,
384        acked_tickets: Vec<TransferableWinningTicket>,
385        me: &ChainKeypair,
386    ) -> Result<VerifiedTicket>;
387
388    /// Fix next ticket state if its out-of-sync in all this node's channels.
389    async fn fix_channels_next_ticket_state(&self) -> Result<()>;
390}
391
392/// Can contain ticket statistics for a channel or aggregated ticket statistics for all channels.
393#[derive(Copy, Clone, Debug, PartialEq, Eq)]
394pub struct ChannelTicketStatistics {
395    pub winning_tickets: u128,
396    pub neglected_value: Balance,
397    pub redeemed_value: Balance,
398    pub unredeemed_value: Balance,
399    pub rejected_value: Balance,
400}
401
402impl Default for ChannelTicketStatistics {
403    fn default() -> Self {
404        Self {
405            winning_tickets: 0,
406            neglected_value: BalanceType::HOPR.zero(),
407            redeemed_value: BalanceType::HOPR.zero(),
408            unredeemed_value: BalanceType::HOPR.zero(),
409            rejected_value: BalanceType::HOPR.zero(),
410        }
411    }
412}