1use std::collections::HashSet;
2use std::fmt::{Display, Formatter};
3use std::ops::{Bound, RangeBounds};
4use std::sync::atomic::AtomicU64;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::stream::BoxStream;
9
10use hopr_crypto_types::prelude::*;
11use hopr_internal_types::prelude::*;
12use hopr_primitive_types::prelude::*;
13
14use crate::errors::Result;
15
/// Describes which ticket indices a [`TicketSelector`] refers to.
///
/// The default value is [`TicketIndexSelector::None`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TicketIndexSelector {
    /// No index constraint is set. The `Display` implementation renders this as an empty string.
    #[default]
    None,
    /// Exactly one ticket index.
    Single(u64),
    /// A set of ticket indices.
    Multiple(HashSet<u64>),
    /// A span of ticket indices, stored as a `(lower, upper)` pair of bounds.
    Range((Bound<u64>, Bound<u64>)),
}
30
31impl Display for TicketIndexSelector {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 match &self {
34 TicketIndexSelector::None => write!(f, ""),
35 TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
36 TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
37 TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
38 }
39 }
40}
41
/// A set of criteria describing which tickets an operation should apply to.
///
/// Instances are normally created via [`TicketSelector::new`] or one of the `From`
/// conversions and then refined with the `with_*` builder methods.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketSelector {
    /// `(channel id, channel epoch)` pairs the selector refers to.
    pub channel_identifiers: Vec<(Hash, U256)>,
    /// Constraint on the ticket index; the index-setting builders keep only the first
    /// channel identifier when they set this.
    pub index: TicketIndexSelector,
    /// `(lower, upper)` bounds on the encoded winning probability;
    /// both `Unbounded` means no constraint is rendered by `Display`.
    pub win_prob: (Bound<EncodedWinProb>, Bound<EncodedWinProb>),
    /// `(lower, upper)` bounds on the ticket amount;
    /// both `Unbounded` means no constraint is rendered by `Display`.
    pub amount: (Bound<Balance>, Bound<Balance>),
    /// Ticket status to select on, or `None` when no status is set.
    pub state: Option<AcknowledgedTicketStatus>,
    /// Flag rendered as "only aggregated" by `Display`.
    /// NOTE(review): presumably restricts the selection to aggregated tickets — confirm
    /// against the code that evaluates the selector.
    pub only_aggregated: bool,
}
64
65impl Display for TicketSelector {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 let out = format!(
68 "ticket selector in {:?} {}{}{}{}{}",
69 self.channel_identifiers,
70 self.index,
71 self.state
72 .map(|state| format!(" in state {state}"))
73 .unwrap_or("".into()),
74 if self.only_aggregated { " only aggregated" } else { "" },
75 match &self.win_prob {
76 (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
77 bounds => format!(" with winning probability in {bounds:?}"),
78 },
79 match &self.amount {
80 (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
81 bounds => format!(" with amount in {bounds:?}"),
82 },
83 );
84 write!(f, "{}", out.trim())
85 }
86}
87
88impl TicketSelector {
89 pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
91 Self {
92 channel_identifiers: vec![(channel_id, epoch.into())],
93 index: TicketIndexSelector::None,
94 win_prob: (Bound::Unbounded, Bound::Unbounded),
95 amount: (Bound::Unbounded, Bound::Unbounded),
96 state: None,
97 only_aggregated: false,
98 }
99 }
100
101 pub fn also_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
108 let mut ret = self.clone();
109 ret.index = TicketIndexSelector::None;
110 ret.channel_identifiers.push((channel_id, epoch.into()));
111 ret
112 }
113
114 pub fn just_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
117 let mut ret = self.clone();
118 ret.channel_identifiers = vec![(channel_id, epoch.into())];
119 ret
120 }
121
122 pub fn is_single_channel(&self) -> bool {
128 self.channel_identifiers.len() == 1
129 }
130
131 pub fn is_unique(&self) -> bool {
133 self.is_single_channel()
134 && (matches!(&self.index, TicketIndexSelector::Single(_))
135 || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1))
136 }
137
138 pub fn with_index(mut self, index: u64) -> Self {
143 self.channel_identifiers.truncate(1);
144 self.index = match self.index {
145 TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
146 TicketIndexSelector::Single(existing) => {
147 TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
148 }
149 TicketIndexSelector::Multiple(mut existing) => {
150 existing.insert(index);
151 TicketIndexSelector::Multiple(existing)
152 }
153 };
154 self
155 }
156
157 pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
161 self.channel_identifiers.truncate(1);
162 self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
163 self
164 }
165
166 pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
168 self.state = Some(state);
169 self
170 }
171
172 pub fn with_no_state(mut self) -> Self {
174 self.state = None;
175 self
176 }
177
178 pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
180 self.only_aggregated = only_aggregated;
181 self
182 }
183
184 pub fn with_winning_probability<T: RangeBounds<EncodedWinProb>>(mut self, range: T) -> Self {
186 self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
187 self
188 }
189
190 pub fn with_amount<T: RangeBounds<Balance>>(mut self, range: T) -> Self {
192 self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
193 self
194 }
195}
196
197impl From<&AcknowledgedTicket> for TicketSelector {
198 fn from(value: &AcknowledgedTicket) -> Self {
199 Self {
200 channel_identifiers: vec![(
201 value.verified_ticket().channel_id,
202 value.verified_ticket().channel_epoch.into(),
203 )],
204 index: TicketIndexSelector::Single(value.verified_ticket().index),
205 win_prob: (Bound::Unbounded, Bound::Unbounded),
206 amount: (Bound::Unbounded, Bound::Unbounded),
207 state: Some(value.status),
208 only_aggregated: value.verified_ticket().index_offset > 1,
209 }
210 }
211}
212
213impl From<&RedeemableTicket> for TicketSelector {
214 fn from(value: &RedeemableTicket) -> Self {
215 Self {
216 channel_identifiers: vec![(
217 value.verified_ticket().channel_id,
218 value.verified_ticket().channel_epoch.into(),
219 )],
220 index: TicketIndexSelector::Single(value.verified_ticket().index),
221 win_prob: (Bound::Unbounded, Bound::Unbounded),
222 amount: (Bound::Unbounded, Bound::Unbounded),
223 state: None,
224 only_aggregated: value.verified_ticket().index_offset > 1,
225 }
226 }
227}
228
229impl From<&ChannelEntry> for TicketSelector {
230 fn from(value: &ChannelEntry) -> Self {
231 Self {
232 channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
233 index: TicketIndexSelector::None,
234 win_prob: (Bound::Unbounded, Bound::Unbounded),
235 amount: (Bound::Unbounded, Bound::Unbounded),
236 state: None,
237 only_aggregated: false,
238 }
239 }
240}
241
242impl From<ChannelEntry> for TicketSelector {
243 fn from(value: ChannelEntry) -> Self {
244 TicketSelector::from(&value)
245 }
246}
247
/// Marker that can be applied to tickets via `HoprDbTicketOperations::mark_tickets_as`.
///
/// The derived `strum::Display` renders the variant names in lowercase
/// (`serialize_all = "lowercase"`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
#[strum(serialize_all = "lowercase")]
pub enum TicketMarker {
    /// Displayed as `redeemed`.
    Redeemed,
    /// Displayed as `rejected`.
    Rejected,
    /// Displayed as `neglected`.
    Neglected,
}
257
/// Optional prerequisites passed to `HoprDbTicketOperations::prepare_aggregation_in_channel`.
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct AggregationPrerequisites {
    /// Optional minimum ticket count.
    /// NOTE(review): presumably the minimum number of tickets required in the channel before
    /// aggregation is prepared — confirm against the implementation.
    pub min_ticket_count: Option<usize>,
    /// Optional minimum unaggregated ratio.
    /// NOTE(review): the exact definition of the ratio (numerator/denominator, valid range) is
    /// not visible here — confirm against the implementation.
    pub min_unaggregated_ratio: Option<f64>,
}
270
/// Asynchronous database operations related to tickets.
///
/// Only the method signatures are declared here; the precise semantics are defined by the
/// implementors. Remarks marked `NOTE(review)` are inferred from names and signatures and
/// should be confirmed against an implementation.
#[async_trait]
pub trait HoprDbTicketOperations {
    /// Returns acknowledged tickets without applying any selector.
    /// NOTE(review): presumably every ticket stored in the database — confirm.
    async fn get_all_tickets(&self) -> Result<Vec<AcknowledgedTicket>>;

    /// Returns the acknowledged tickets for the given `selector`.
    async fn get_tickets(&self, selector: TicketSelector) -> Result<Vec<AcknowledgedTicket>>;

    /// Applies the `mark_as` marker to the tickets given by `selector`.
    /// NOTE(review): the returned `usize` is presumably the number of marked tickets — confirm.
    async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize>;

    /// Records the given `ticket` as rejected.
    /// NOTE(review): per the name, the ticket itself is presumably not stored in the
    /// database — confirm.
    async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<()>;

    /// Sets `new_state` on the tickets given by `selector` and returns a stream of
    /// acknowledged tickets. The stream may borrow from `self` for the lifetime `'a`.
    /// NOTE(review): presumably the stream yields the updated tickets — confirm.
    async fn update_ticket_states_and_fetch<'a>(
        &'a self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<BoxStream<'a, AcknowledgedTicket>>;

    /// Sets `new_state` on the tickets given by `selector` without returning the tickets.
    /// NOTE(review): the returned `usize` is presumably the number of updated tickets — confirm.
    async fn update_ticket_states(
        &self,
        selector: TicketSelector,
        new_state: AcknowledgedTicketStatus,
    ) -> Result<usize>;

    /// Returns ticket statistics, optionally for a specific channel.
    /// NOTE(review): `None` presumably means statistics across all channels — confirm.
    async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics>;

    /// Resets the ticket statistics.
    async fn reset_ticket_statistics(&self) -> Result<()>;

    /// Returns a `(usize, Balance)` pair for the tickets given by `selector`.
    /// NOTE(review): presumably the ticket count and their total value — confirm.
    async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, Balance)>;

    /// Compare-and-set operation on the outgoing ticket index of the given channel.
    /// NOTE(review): the comparison rule and the meaning of the returned `u64` are not visible
    /// here — confirm.
    async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64>;

    /// Resets the outgoing ticket index of the given channel.
    /// NOTE(review): the meaning of the returned `u64` is not visible here — confirm.
    async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;

    /// Increments the outgoing ticket index of the given channel.
    /// NOTE(review): whether the returned `u64` is the value before or after the increment is
    /// not visible here — confirm.
    async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64>;

    /// Returns a shared atomic handle (`Arc<AtomicU64>`) to the outgoing ticket index of the
    /// given channel.
    async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>>;

    /// Persists the outgoing ticket indices.
    /// NOTE(review): the returned `usize` is presumably the number of persisted indices — confirm.
    async fn persist_outgoing_ticket_indices(&self) -> Result<usize>;

    /// Prepares ticket aggregation in the given `channel`, subject to `prerequisites`.
    ///
    /// On success returns either `None` or a triple of an off-chain public key, a list of
    /// transferable winning tickets and a hash.
    /// NOTE(review): `None` presumably means there is nothing to aggregate; the roles of the
    /// key and the hash are not visible here — confirm.
    async fn prepare_aggregation_in_channel(
        &self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> Result<Option<(OffchainPublicKey, Vec<TransferableWinningTicket>, Hash)>>;

    /// Rolls back a prepared aggregation in the given `channel`.
    async fn rollback_aggregation_in_channel(&self, channel: Hash) -> Result<()>;

    /// Processes an aggregated ticket that was received, using `chain_keypair`, and returns
    /// the resulting acknowledged ticket.
    async fn process_received_aggregated_ticket(
        &self,
        aggregated_ticket: Ticket,
        chain_keypair: &ChainKeypair,
    ) -> Result<AcknowledgedTicket>;

    /// Aggregates the given `acked_tickets` for `destination` using the chain keypair `me`
    /// and returns a verified ticket.
    async fn aggregate_tickets(
        &self,
        destination: OffchainPublicKey,
        acked_tickets: Vec<TransferableWinningTicket>,
        me: &ChainKeypair,
    ) -> Result<VerifiedTicket>;

    /// NOTE(review): per the name, repairs the "next ticket" state of channels; the exact
    /// behavior is not visible here — confirm.
    async fn fix_channels_next_ticket_state(&self) -> Result<()>;
}
391
/// Ticket statistics as returned by `HoprDbTicketOperations::get_ticket_statistics`.
///
/// The `Default` implementation sets the counter to 0 and all values to a zero HOPR balance.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ChannelTicketStatistics {
    /// Count of winning tickets.
    pub winning_tickets: u128,
    /// Value of neglected tickets.
    pub neglected_value: Balance,
    /// Value of redeemed tickets.
    pub redeemed_value: Balance,
    /// Value of tickets not yet redeemed.
    pub unredeemed_value: Balance,
    /// Value of rejected tickets.
    pub rejected_value: Balance,
}
401
402impl Default for ChannelTicketStatistics {
403 fn default() -> Self {
404 Self {
405 winning_tickets: 0,
406 neglected_value: BalanceType::HOPR.zero(),
407 redeemed_value: BalanceType::HOPR.zero(),
408 unredeemed_value: BalanceType::HOPR.zero(),
409 rejected_value: BalanceType::HOPR.zero(),
410 }
411 }
412}