hopr_db_api/tickets.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::BoxStream;
use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;
use crate::errors::Result;
/// Allows selecting a range of ticket indices in [TicketSelector].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TicketIndexSelector {
/// Selects no ticket index specifically.
/// This makes the [TicketSelector] less restrictive.
#[default]
None,
/// Selects a single ticket with the given index.
Single(u64),
/// Selects multiple tickets with the given indices.
Multiple(HashSet<u64>),
/// Selects multiple tickets with indices within the given range.
Range((Bound<u64>, Bound<u64>)),
}
impl Display for TicketIndexSelector {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self {
TicketIndexSelector::None => write!(f, ""),
TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
}
}
}
/// Allows selecting multiple tickets (if `index` does not contain a single value)
/// or a single ticket (with unitary `index`) in the given channel and epoch.
/// The selection can be further restricted to select ticket only in the given `state`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketSelector {
/// Channel ID and Epoch pairs.
pub channel_identifiers: Vec<(Hash, U256)>,
/// If given, will select ticket(s) with the given indices
/// in the given channel and epoch.
/// See [TicketIndexSelector] for possible options.
pub index: TicketIndexSelector,
/// If given, the tickets are further restricted to the ones with a winning probability
/// in this range.
pub win_prob: (Bound<EncodedWinProb>, Bound<EncodedWinProb>),
/// If given, the tickets are further restricted to the ones with an amount
/// in this range.
pub amount: (Bound<Balance>, Bound<Balance>),
/// Further restriction to tickets with the given state.
pub state: Option<AcknowledgedTicketStatus>,
/// Further restrict to only aggregated tickets.
pub only_aggregated: bool,
}
impl Display for TicketSelector {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let out = format!(
"ticket selector in {:?} {}{}{}{}{}",
self.channel_identifiers,
self.index,
self.state
.map(|state| format!(" in state {state}"))
.unwrap_or("".into()),
if self.only_aggregated { " only aggregated" } else { "" },
match &self.win_prob {
(Bound::Unbounded, Bound::Unbounded) => "".to_string(),
bounds => format!(" with winning probability in {bounds:?}"),
},
match &self.amount {
(Bound::Unbounded, Bound::Unbounded) => "".to_string(),
bounds => format!(" with amount in {bounds:?}"),
},
);
write!(f, "{}", out.trim())
}
}
impl TicketSelector {
/// Create a new ticket selector given the `channel_id` and `epoch`.
pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
Self {
channel_identifiers: vec![(channel_id, epoch.into())],
index: TicketIndexSelector::None,
win_prob: (Bound::Unbounded, Bound::Unbounded),
amount: (Bound::Unbounded, Bound::Unbounded),
state: None,
only_aggregated: false,
}
}
/// Allows matching tickets on multiple channels, by adding the given
/// `channel_id` and `epoch` to the selector.
///
/// This also nullifies any prior effect of any prior calls to [`TicketSelector::with_index`] or
/// [`TicketSelector::with_index_range`]
/// as ticket indices cannot be matched over multiple channels.
pub fn also_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
let mut ret = self.clone();
ret.index = TicketIndexSelector::None;
ret.channel_identifiers.push((channel_id, epoch.into()));
ret
}
/// Sets the selector to match only tickets on the given `channel_id` and `epoch`.
/// This nullifies any prior calls to [`TicketSelector::also_on_channel`].
pub fn just_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
let mut ret = self.clone();
ret.channel_identifiers = vec![(channel_id, epoch.into())];
ret
}
/// Checks if this selector operates only on a single channel.
///
/// This will return `false` if [`TicketSelector::also_on_channel`] was called, and neither
/// the [`TicketSelector::with_index`] nor [`TicketSelector::with_index_range`] were
/// called subsequently.
pub fn is_single_channel(&self) -> bool {
self.channel_identifiers.len() == 1
}
/// If `false` is returned, the selector can fetch more than a single ticket.
pub fn is_unique(&self) -> bool {
self.is_single_channel()
&& (matches!(&self.index, TicketIndexSelector::Single(_))
|| matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1))
}
/// Returns this instance with a ticket index set.
/// This method can be called multiple times to select multiple tickets.
/// If [`TicketSelector::with_index_range`] was previously called, it will be replaced.
/// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
pub fn with_index(mut self, index: u64) -> Self {
self.channel_identifiers.truncate(1);
self.index = match self.index {
TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
TicketIndexSelector::Single(existing) => {
TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
}
TicketIndexSelector::Multiple(mut existing) => {
existing.insert(index);
TicketIndexSelector::Multiple(existing)
}
};
self
}
/// Returns this instance with a ticket index upper bound set.
/// If [`TicketSelector::with_index`] was previously called, it will be replaced.
/// If [`TicketSelector::also_on_channel`] was previously called, its effect will be nullified.
pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
self.channel_identifiers.truncate(1);
self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
self
}
/// Returns this instance with a ticket state set.
pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
self.state = Some(state);
self
}
/// Returns this instance without a ticket state set.
pub fn with_no_state(mut self) -> Self {
self.state = None;
self
}
/// Returns this instance with `only_aggregated` flag value.
pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
self.only_aggregated = only_aggregated;
self
}
/// Returns this instance with a winning probability range bounds set.
pub fn with_winning_probability<T: RangeBounds<EncodedWinProb>>(mut self, range: T) -> Self {
self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
self
}
/// Returns this instance with the ticket amount range bounds set.
pub fn with_amount<T: RangeBounds<Balance>>(mut self, range: T) -> Self {
self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
self
}
}
impl From<&AcknowledgedTicket> for TicketSelector {
fn from(value: &AcknowledgedTicket) -> Self {
Self {
channel_identifiers: vec![(
value.verified_ticket().channel_id,
value.verified_ticket().channel_epoch.into(),
)],
index: TicketIndexSelector::Single(value.verified_ticket().index),
win_prob: (Bound::Unbounded, Bound::Unbounded),
amount: (Bound::Unbounded, Bound::Unbounded),
state: Some(value.status),
only_aggregated: value.verified_ticket().index_offset > 1,
}
}
}
impl From<&RedeemableTicket> for TicketSelector {
fn from(value: &RedeemableTicket) -> Self {
Self {
channel_identifiers: vec![(
value.verified_ticket().channel_id,
value.verified_ticket().channel_epoch.into(),
)],
index: TicketIndexSelector::Single(value.verified_ticket().index),
win_prob: (Bound::Unbounded, Bound::Unbounded),
amount: (Bound::Unbounded, Bound::Unbounded),
state: None,
only_aggregated: value.verified_ticket().index_offset > 1,
}
}
}
impl From<&ChannelEntry> for TicketSelector {
fn from(value: &ChannelEntry) -> Self {
Self {
channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
index: TicketIndexSelector::None,
win_prob: (Bound::Unbounded, Bound::Unbounded),
amount: (Bound::Unbounded, Bound::Unbounded),
state: None,
only_aggregated: false,
}
}
}
impl From<ChannelEntry> for TicketSelector {
fn from(value: ChannelEntry) -> Self {
TicketSelector::from(&value)
}
}
/// Different markers for unredeemed tickets.
/// See [`HoprDbTicketOperations::mark_tickets_as`] for usage.
#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
#[strum(serialize_all = "lowercase")]
pub enum TicketMarker {
Redeemed,
Rejected,
Neglected,
}
/// Prerequisites for the ticket aggregator.
/// The prerequisites are **independent** of each other.
/// If none of the prerequisites are given, they are considered satisfied.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct AggregationPrerequisites {
/// Minimum number of tickets in the channel.
pub min_ticket_count: Option<usize>,
/// Minimum ratio between balance of unaggregated messages and channel stake.
/// I.e.: the condition is met if a sum of unaggregated ticket amounts divided by
/// the total channel stake is greater than `min_unaggregated_ratio`.
pub min_unaggregated_ratio: Option<f64>,
}
#[async_trait]
pub trait HoprDbTicketOperations {
/// Retrieve acknowledged winning tickets, according to the given `selector`.
///
/// The optional transaction `tx` must be in the database.
async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>>;
/// Retrieve acknowledged winning tickets, according to the given `selector`.
///
/// The optional transaction `tx` must be in the database.
async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>>;
/// Marks tickets as the given [`TicketMarker`], removing them from the DB and updating the
/// ticket statistics for each ticket's channel.
///
/// Returns the number of marked tickets.
async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize>;
/// Updates the ticket statistics according to the fact that the given ticket has
/// been rejected by the packet processing pipeline.
///
/// This ticket is not yet stored in the ticket DB;
/// therefore, only the statistics in the corresponding channel are updated.
async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()>;
/// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
///
/// Returns the updated tickets in the new state.
async fn update_ticket_states_and_fetch<'a>(
&'a self,
selector: TicketSelector,
new_state: AcknowledgedTicketStatus,
) -> Result<BoxStream<'a, AcknowledgedTicket>>;
/// Updates [state](AcknowledgedTicketStatus) of the tickets matching the given `selector`.
async fn update_ticket_states(
&self,
selector: TicketSelector,
new_state: AcknowledgedTicketStatus,
) -> Result<usize>;
/// Retrieves the ticket statistics for the given channel.
///
/// If no channel is given, it retrieves aggregate ticket statistics for all channels.
async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics>;
/// Resets the ticket statistics about neglected, rejected, and redeemed tickets.
async fn reset_ticket_statistics(&self) -> Result<()>;
/// Counts the tickets matching the given `selector` and their total value.
///
/// The optional transaction `tx` must be in the database.
async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)>;
/// Sets the stored outgoing ticket index to `index`, only if the currently stored value
/// is less than `index`. This ensures the stored value can only be growing.
///
/// Returns the old value.
///
/// If the entry is not yet present for the given ID, it is initialized to 0.
async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64>;
/// Resets the outgoing ticket index to 0 for the given channel id.
///
/// Returns the old value before reset.
///
/// If the entry is not yet present for the given ID, it is initialized to 0.
async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;
/// Increments the outgoing ticket index in the given channel ID and returns the value before incrementing.
///
/// If the entry is not yet present for the given ID, it is initialized to 0 and incremented.
async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;
/// Gets the current outgoing ticket index for the given channel id.
///
/// If the entry is not yet present for the given ID, it is initialized to 0.
async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>>;
/// Compares outgoing ticket indices in the cache with the stored values
/// and updates the stored value where changed.
///
/// Returns the number of updated ticket indices.
async fn persist_outgoing_ticket_indices(&self) -> Result<usize>;
/// Prepare a viable collection of tickets to be aggregated.
///
/// Some preconditions for tickets apply. This callback will collect the aggregatable
/// tickets and marks them as being aggregated.
async fn prepare_aggregation_in_channel(
&self,
channel: &Hash,
prerequisites: AggregationPrerequisites,
) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>>;
/// Perform a ticket aggregation rollback in the channel.
///
/// If a ticket aggregation fails, this callback can be invoked to make sure that
/// resources are properly restored and cleaned up in the database, allowing further
/// aggregations.
async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()>;
/// Replace the aggregated tickets locally with an aggregated ticket from the counterparty.
async fn process_received_aggregated_ticket(
&self,
aggregated_ticket: Ticket,
chain_keypair: &ChainKeypair,
) -> Result<AcknowledgedTicket>;
/// Performs ticket aggregation as an issuing party of the given tickets.
async fn aggregate_tickets(
&self,
destination: OffchainPublicKey,
acked_tickets: Vec<TransferableWinningTicket>,
me: &ChainKeypair,
) -> Result<VerifiedTicket>;
}
/// Can contain ticket statistics for a channel or aggregated ticket statistics for all channels.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ChannelTicketStatistics {
pub winning_tickets: u128,
pub neglected_value: Balance,
pub redeemed_value: Balance,
pub unredeemed_value: Balance,
pub rejected_value: Balance,
}
impl Default for ChannelTicketStatistics {
fn default() -> Self {
Self {
winning_tickets: 0,
neglected_value: BalanceType::HOPR.zero(),
redeemed_value: BalanceType::HOPR.zero(),
unredeemed_value: BalanceType::HOPR.zero(),
rejected_value: BalanceType::HOPR.zero(),
}
}
}