hopr_db_api/
tickets.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;

use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;

use crate::errors::Result;

/// Describes which ticket indices a [TicketSelector] should match.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TicketIndexSelector {
    /// Puts no restriction on the ticket index.
    /// This makes the [TicketSelector] less restrictive.
    #[default]
    None,
    /// Matches exactly one ticket index.
    Single(u64),
    /// Matches every ticket index contained in the set.
    Multiple(HashSet<u64>),
    /// Matches every ticket index between the given (lower, upper) bounds.
    Range((Bound<u64>, Bound<u64>)),
}

impl Display for TicketIndexSelector {
    /// Renders a human-readable description of the index restriction.
    ///
    /// Nothing is written for [`TicketIndexSelector::None`]; the set and the range bounds
    /// are rendered using their `Debug` representation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            // No restriction: there is nothing to describe.
            Self::None => Ok(()),
            Self::Single(idx) => write!(f, "with index {idx}"),
            Self::Multiple(indices) => write!(f, "with indices {indices:?}"),
            Self::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
        }
    }
}

/// Selection criteria for tickets.
///
/// Allows selecting multiple tickets (if `index` does not contain a single value)
/// or a single ticket (with unitary `index`) in the given channel and epoch.
/// More than one channel and epoch pair can be listed in `channel_identifiers`.
/// The selection can be further restricted to select tickets only in the given `state`,
/// within the given winning probability or amount bounds, or to aggregated tickets only.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketSelector {
    /// Channel ID and Epoch pairs.
    pub channel_identifiers: Vec<(Hash, U256)>,
    /// If given, will select ticket(s) with the given indices
    /// in the given channel and epoch.
    /// See [TicketIndexSelector] for possible options.
    pub index: TicketIndexSelector,
    /// If given, the tickets are further restricted to the ones with a winning probability
    /// in this range, expressed as a (lower bound, upper bound) pair.
    // NOTE(review): a fully unbounded pair presumably means "no restriction" — confirm in the DB implementation.
    pub win_prob: (Bound<EncodedWinProb>, Bound<EncodedWinProb>),
    /// If given, the tickets are further restricted to the ones with an amount
    /// in this range, expressed as a (lower bound, upper bound) pair.
    // NOTE(review): a fully unbounded pair presumably means "no restriction" — confirm in the DB implementation.
    pub amount: (Bound<Balance>, Bound<Balance>),
    /// Further restriction to tickets with the given state.
    pub state: Option<AcknowledgedTicketStatus>,
    /// Further restrict to only aggregated tickets.
    pub only_aggregated: bool,
}

impl Display for TicketSelector {
    /// Renders a human-readable description of the selector.
    ///
    /// The channel identifiers are always printed; every other criterion contributes
    /// a fragment only when it is set. The assembled text is trimmed, which removes
    /// the trailing space left behind when no optional fragment is present.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let state_part = match &self.state {
            Some(state) => format!(" in state {state}"),
            None => String::new(),
        };

        let aggregated_part = if self.only_aggregated { " only aggregated" } else { "" };

        // Bounds are only mentioned when at least one side is actually bounded.
        let win_prob_part = match &self.win_prob {
            (Bound::Unbounded, Bound::Unbounded) => String::new(),
            bounds => format!(" with winning probability in {bounds:?}"),
        };
        let amount_part = match &self.amount {
            (Bound::Unbounded, Bound::Unbounded) => String::new(),
            bounds => format!(" with amount in {bounds:?}"),
        };

        let rendered = format!(
            "ticket selector in {:?} {}{}{}{}{}",
            self.channel_identifiers, self.index, state_part, aggregated_part, win_prob_part, amount_part,
        );
        f.write_str(rendered.trim())
    }
}

impl TicketSelector {
    /// Creates a new ticket selector for the given `channel_id` and `epoch`.
    ///
    /// The new selector has no index, state, winning probability or amount restriction
    /// and is not limited to aggregated tickets.
    pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
        Self {
            channel_identifiers: vec![(channel_id, epoch.into())],
            index: TicketIndexSelector::None,
            win_prob: (Bound::Unbounded, Bound::Unbounded),
            amount: (Bound::Unbounded, Bound::Unbounded),
            state: None,
            only_aggregated: false,
        }
    }

    /// Allows matching tickets on multiple channels, by adding the given
    /// `channel_id` and `epoch` to the selector.
    ///
    /// This also nullifies any prior effect of any prior calls to [`TicketSelector::with_index`] or
    /// [`TicketSelector::with_index_range`]
    /// as ticket indices cannot be matched over multiple channels.
    pub fn also_on_channel<T: Into<U256>>(mut self, channel_id: Hash, epoch: T) -> Self {
        // `self` is already owned, so it is modified in place instead of being cloned.
        self.index = TicketIndexSelector::None;
        self.channel_identifiers.push((channel_id, epoch.into()));
        self
    }

    /// Sets the selector to match only tickets on the given `channel_id` and `epoch`.
    /// This nullifies any prior calls to [`TicketSelector::also_on_channel`].
    ///
    /// All other criteria (index, state, bounds, aggregation flag) are left untouched.
    pub fn just_on_channel<T: Into<U256>>(mut self, channel_id: Hash, epoch: T) -> Self {
        // `self` is already owned, so it is modified in place instead of being cloned.
        self.channel_identifiers = vec![(channel_id, epoch.into())];
        self
    }

    /// Checks if this selector operates only on a single channel.
    ///
    /// This will return `false` if [`TicketSelector::also_on_channel`] was called, and none of
    /// [`TicketSelector::with_index`], [`TicketSelector::with_index_range`] or
    /// [`TicketSelector::just_on_channel`] were called subsequently.
    pub fn is_single_channel(&self) -> bool {
        self.channel_identifiers.len() == 1
    }

    /// If `false` is returned, the selector can fetch more than a single ticket.
    ///
    /// Returns `true` only for a single-channel selector whose index is either
    /// [`TicketIndexSelector::Single`] or a [`TicketIndexSelector::Multiple`] holding exactly one index.
    /// A [`TicketIndexSelector::Range`] is never considered unique, even if it spans a single index.
    pub fn is_unique(&self) -> bool {
        self.is_single_channel()
            && match &self.index {
                TicketIndexSelector::Single(_) => true,
                TicketIndexSelector::Multiple(indices) => indices.len() == 1,
                TicketIndexSelector::None | TicketIndexSelector::Range(_) => false,
            }
    }

    /// Returns this instance with a ticket index set.
    /// This method can be called multiple times to select multiple tickets.
    /// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
    pub fn with_index(mut self, index: u64) -> Self {
        // Indices cannot be matched across channels: keep only the first channel.
        self.channel_identifiers.truncate(1);
        self.index = match self.index {
            TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
            TicketIndexSelector::Single(existing) => {
                TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
            }
            TicketIndexSelector::Multiple(mut existing) => {
                existing.insert(index);
                TicketIndexSelector::Multiple(existing)
            }
        };
        self
    }

    /// Returns this instance with a ticket index range set.
    /// Both the start and the end bound of `index_bound` are taken over.
    /// If [`TicketSelector::with_index`] was previously called, it will be replaced.
    /// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
    pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
        // Indices cannot be matched across channels: keep only the first channel.
        self.channel_identifiers.truncate(1);
        self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
        self
    }

    /// Returns this instance with a ticket state set.
    pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
        self.state = Some(state);
        self
    }

    /// Returns this instance without a ticket state set.
    pub fn with_no_state(mut self) -> Self {
        self.state = None;
        self
    }

    /// Returns this instance with `only_aggregated` flag value.
    pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
        self.only_aggregated = only_aggregated;
        self
    }

    /// Returns this instance with a winning probability range bounds set.
    pub fn with_winning_probability<T: RangeBounds<EncodedWinProb>>(mut self, range: T) -> Self {
        self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
        self
    }

    /// Returns this instance with the ticket amount range bounds set.
    pub fn with_amount<T: RangeBounds<Balance>>(mut self, range: T) -> Self {
        self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
        self
    }
}

impl From<&AcknowledgedTicket> for TicketSelector {
    fn from(value: &AcknowledgedTicket) -> Self {
        Self {
            channel_identifiers: vec![(
                value.verified_ticket().channel_id,
                value.verified_ticket().channel_epoch.into(),
            )],
            index: TicketIndexSelector::Single(value.verified_ticket().index),
            win_prob: (Bound::Unbounded, Bound::Unbounded),
            amount: (Bound::Unbounded, Bound::Unbounded),
            state: Some(value.status),
            only_aggregated: value.verified_ticket().index_offset > 1,
        }
    }
}

impl From<&RedeemableTicket> for TicketSelector {
    fn from(value: &RedeemableTicket) -> Self {
        Self {
            channel_identifiers: vec![(
                value.verified_ticket().channel_id,
                value.verified_ticket().channel_epoch.into(),
            )],
            index: TicketIndexSelector::Single(value.verified_ticket().index),
            win_prob: (Bound::Unbounded, Bound::Unbounded),
            amount: (Bound::Unbounded, Bound::Unbounded),
            state: None,
            only_aggregated: value.verified_ticket().index_offset > 1,
        }
    }
}

impl From<&ChannelEntry> for TicketSelector {
    fn from(value: &ChannelEntry) -> Self {
        Self {
            channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
            index: TicketIndexSelector::None,
            win_prob: (Bound::Unbounded, Bound::Unbounded),
            amount: (Bound::Unbounded, Bound::Unbounded),
            state: None,
            only_aggregated: false,
        }
    }
}

impl From<ChannelEntry> for TicketSelector {
    fn from(value: ChannelEntry) -> Self {
        TicketSelector::from(&value)
    }
}

/// Different markers for unredeemed tickets.
/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
///
/// The derived `Display` renders the variant names in lowercase
/// (`redeemed`, `rejected`, `neglected`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
#[strum(serialize_all = "lowercase")]
pub enum TicketMarker {
    /// Marks the tickets as redeemed.
    Redeemed,
    /// Marks the tickets as rejected.
    Rejected,
    /// Marks the tickets as neglected.
    Neglected,
}

/// Prerequisites for the ticket aggregator.
/// The prerequisites are **independent** of each other.
/// If none of the prerequisites are given, they are considered satisfied.
// `Eq` is not derived because of the `f64` field.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct AggregationPrerequisites {
    /// Minimum number of tickets in the channel.
    pub min_ticket_count: Option<usize>,
    /// Minimum ratio between the balance of unaggregated tickets and the channel stake.
    /// I.e.: the condition is met if a sum of unaggregated ticket amounts divided by
    /// the total channel stake is greater than `min_unaggregated_ratio`.
    pub min_unaggregated_ratio: Option<f64>,
}

/// Database operations concerning acknowledged tickets, ticket statistics,
/// outgoing ticket indices and ticket aggregation.
#[async_trait]
pub trait HoprDbTicketOperations {
    /// Retrieves all acknowledged winning tickets.
    ///
    /// Unlike [`HoprDbTicketOperations::get_tickets`], this method takes no selector.
    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>>;

    /// Retrieves acknowledged winning tickets, according to the given `selector`.
    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>>;

    /// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
    /// ticket statistics for each ticket's channel.
    ///
    /// Returns the number of marked tickets.
    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize>;

    /// Updates the ticket statistics according to the fact that the given ticket has
    /// been rejected by the packet processing pipeline.
    ///
    /// This ticket is not yet stored in the ticket DB;
    /// therefore, only the statistics in the corresponding channel are updated.
    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()>;

    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
    ///
    /// Returns the updated tickets in the new state.
    async fn update_ticket_states_and_fetch<'a>(
        &'a self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<BoxStream<'a, AcknowledgedTicket>>;

    /// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of updated tickets — confirm
    /// against the implementation.
    async fn update_ticket_states(
        &self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<usize>;

    /// Retrieves the ticket statistics for the given channel.
    ///
    /// If no channel is given, it retrieves aggregate ticket statistics for all channels.
    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics>;

    /// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
    async fn reset_ticket_statistics(&self) -> Result<()>;

    /// Counts the tickets matching the given `selector` and their total value.
    ///
    /// Returns the ticket count and the total value as a pair.
    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)>;

    /// Sets the stored outgoing ticket index to `index`, only if the currently stored value
    /// is less than `index`. This ensures the stored value can only be growing.
    ///
    /// Returns the old value.
    ///
    /// If the entry is not yet present for the given ID, it is initialized to 0.
    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64>;

    /// Resets the outgoing ticket index to 0 for the given channel id.
    ///
    /// Returns the old value before reset.
    ///
    /// If the entry is not yet present for the given ID, it is initialized to 0.
    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;

    /// Increments the outgoing ticket index in the given channel ID and returns the value before incrementing.
    ///
    /// If the entry is not yet present for the given ID, it is initialized to 0 and incremented.
    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;

    /// Gets the current outgoing ticket index for the given channel id.
    ///
    /// The index is returned as a shared atomic counter.
    ///
    /// If the entry is not yet present for the given ID, it is initialized to 0.
    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>>;

    /// Compares outgoing ticket indices in the cache with the stored values
    /// and updates the stored value where changed.
    ///
    /// Returns the number of updated ticket indices.
    async fn persist_outgoing_ticket_indices(&self) -> Result<usize>;

    /// Prepare a viable collection of tickets to be aggregated.
    ///
    /// Some preconditions for tickets apply. This callback will collect the aggregatable
    /// tickets and marks them as being aggregated.
    ///
    /// NOTE(review): the meaning of the returned tuple members and of the `None` case
    /// (presumably "nothing to aggregate") is not visible here — confirm against the implementation.
    async fn prepare_aggregation_in_channel(
        &self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>>;

    /// Perform a ticket aggregation rollback in the channel.
    ///
    /// If a ticket aggregation fails, this callback can be invoked to make sure that
    /// resources are properly restored and cleaned up in the database, allowing further
    /// aggregations.
    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()>;

    /// Replace the aggregated tickets locally with an aggregated ticket from the counterparty.
    ///
    /// On success, an [`AcknowledgedTicket`] is returned.
    async fn process_received_aggregated_ticket(
        &self,
        aggregated_ticket: Ticket,
        chain_keypair: &ChainKeypair,
    ) -> Result<AcknowledgedTicket>;

    /// Performs ticket aggregation as an issuing party of the given tickets.
    ///
    /// On success, a [`VerifiedTicket`] is returned.
    async fn aggregate_tickets(
        &self,
        destination: OffchainPublicKey,
        acked_tickets: Vec<TransferableWinningTicket>,
        me: &ChainKeypair,
    ) -> Result<VerifiedTicket>;
}

/// Can contain ticket statistics for a channel or aggregated ticket statistics for all channels.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ChannelTicketStatistics {
    /// Number of winning tickets.
    // NOTE(review): presumably counts every winning ticket seen, not only those currently stored — confirm.
    pub winning_tickets: u128,
    /// Value of neglected tickets.
    // NOTE(review): presumably tickets marked with `TicketMarker::Neglected` — confirm.
    pub neglected_value: Balance,
    /// Value of redeemed tickets.
    // NOTE(review): presumably tickets marked with `TicketMarker::Redeemed` — confirm.
    pub redeemed_value: Balance,
    /// Value of unredeemed tickets.
    // NOTE(review): presumably tickets still stored and not yet marked — confirm.
    pub unredeemed_value: Balance,
    /// Value of rejected tickets.
    // NOTE(review): presumably tickets marked with `TicketMarker::Rejected` — confirm.
    pub rejected_value: Balance,
}

impl Default for ChannelTicketStatistics {
    /// Creates empty statistics: no winning tickets and every value set to
    /// the result of `BalanceType::HOPR.zero()`.
    fn default() -> Self {
        // `Balance` is `Copy` (this struct derives `Copy`), so a single zero value is reused.
        let zero_hopr = BalanceType::HOPR.zero();
        Self {
            winning_tickets: 0,
            neglected_value: zero_hopr,
            redeemed_value: zero_hopr,
            unredeemed_value: zero_hopr,
            rejected_value: zero_hopr,
        }
    }
}