1use std::{
2 collections::HashSet,
3 fmt::{Display, Formatter},
4 ops::{Bound, RangeBounds},
5 sync::{Arc, atomic::AtomicU64},
6};
7
8use futures::stream::BoxStream;
9use hopr_crypto_types::prelude::*;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12
13#[derive(Debug, Clone, PartialEq, Eq, Default)]
15pub enum TicketIndexSelector {
16 #[default]
19 None,
20 Single(u64),
22 Multiple(HashSet<u64>),
24 Range((Bound<u64>, Bound<u64>)),
26}
27
28impl Display for TicketIndexSelector {
29 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30 match &self {
31 TicketIndexSelector::None => write!(f, ""),
32 TicketIndexSelector::Single(idx) => write!(f, "with index {idx}"),
33 TicketIndexSelector::Multiple(indices) => write!(f, "with indices {indices:?}"),
34 TicketIndexSelector::Range((lb, ub)) => write!(f, "with indices in {lb:?}..{ub:?}"),
35 }
36 }
37}
38
39#[derive(Clone, Debug)]
41pub struct TicketSelector {
42 pub channel_identifiers: Vec<(Hash, U256)>,
44 pub index: TicketIndexSelector,
49 pub win_prob: (Bound<WinningProbability>, Bound<WinningProbability>),
52 pub amount: (Bound<HoprBalance>, Bound<HoprBalance>),
55 pub state: Option<AcknowledgedTicketStatus>,
57 pub only_aggregated: bool,
59}
60
61impl Display for TicketSelector {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 let out = format!(
64 "ticket selector in {:?} {}{}{}{}{}",
65 self.channel_identifiers,
66 self.index,
67 self.state
68 .map(|state| format!(" in state {state}"))
69 .unwrap_or("".into()),
70 if self.only_aggregated { " only aggregated" } else { "" },
71 match &self.win_prob {
72 (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
73 bounds => format!(" with winning probability in {bounds:?}"),
74 },
75 match &self.amount {
76 (Bound::Unbounded, Bound::Unbounded) => "".to_string(),
77 bounds => format!(" with amount in {bounds:?}"),
78 },
79 );
80 write!(f, "{}", out.trim())
81 }
82}
83
84fn approx_cmp_bounds(b1: Bound<WinningProbability>, b2: Bound<WinningProbability>) -> bool {
85 match (b1, b2) {
86 (Bound::Unbounded, Bound::Unbounded) => true,
87 (Bound::Included(a), Bound::Included(b)) => b.approx_eq(&a),
88 (Bound::Excluded(a), Bound::Excluded(b)) => b.approx_eq(&a),
89 _ => false,
90 }
91}
92
93impl PartialEq for TicketSelector {
94 fn eq(&self, other: &Self) -> bool {
95 self.channel_identifiers == other.channel_identifiers
96 && self.index == other.index
97 && self.state == other.state
98 && self.only_aggregated == other.only_aggregated
99 && self.amount == other.amount
100 && approx_cmp_bounds(self.win_prob.0, other.win_prob.0)
101 && approx_cmp_bounds(self.win_prob.1, other.win_prob.1)
102 }
103}
104
105impl TicketSelector {
106 pub fn new<T: Into<U256>>(channel_id: Hash, epoch: T) -> Self {
108 Self {
109 channel_identifiers: vec![(channel_id, epoch.into())],
110 index: TicketIndexSelector::None,
111 win_prob: (Bound::Unbounded, Bound::Unbounded),
112 amount: (Bound::Unbounded, Bound::Unbounded),
113 state: None,
114 only_aggregated: false,
115 }
116 }
117
118 #[must_use]
125 pub fn also_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
126 let mut ret = self.clone();
127 ret.index = TicketIndexSelector::None;
128 ret.channel_identifiers.push((channel_id, epoch.into()));
129 ret
130 }
131
132 #[must_use]
134 pub fn also_on_channel_entry(self, entry: &ChannelEntry) -> Self {
135 self.also_on_channel(entry.get_id(), entry.channel_epoch)
136 }
137
138 #[must_use]
142 pub fn just_on_channel<T: Into<U256>>(self, channel_id: Hash, epoch: T) -> Self {
143 let mut ret = self.clone();
144 ret.channel_identifiers = vec![(channel_id, epoch.into())];
145 ret
146 }
147
148 pub fn is_single_channel(&self) -> bool {
154 self.channel_identifiers.len() == 1
155 }
156
157 pub fn is_unique(&self) -> bool {
159 self.is_single_channel()
160 && (matches!(&self.index, TicketIndexSelector::Single(_))
161 || matches!(&self.index, TicketIndexSelector::Multiple(indices) if indices.len() == 1))
162 }
163
164 #[must_use]
170 pub fn with_index(mut self, index: u64) -> Self {
171 self.channel_identifiers.truncate(1);
172 self.index = match self.index {
173 TicketIndexSelector::None | TicketIndexSelector::Range(_) => TicketIndexSelector::Single(index),
174 TicketIndexSelector::Single(existing) => {
175 TicketIndexSelector::Multiple(HashSet::from_iter([existing, index]))
176 }
177 TicketIndexSelector::Multiple(mut existing) => {
178 existing.insert(index);
179 TicketIndexSelector::Multiple(existing)
180 }
181 };
182 self
183 }
184
185 #[must_use]
189 pub fn with_index_range<T: RangeBounds<u64>>(mut self, index_bound: T) -> Self {
190 self.channel_identifiers.truncate(1);
191 self.index = TicketIndexSelector::Range((index_bound.start_bound().cloned(), index_bound.end_bound().cloned()));
192 self
193 }
194
195 #[must_use]
197 pub fn with_state(mut self, state: AcknowledgedTicketStatus) -> Self {
198 self.state = Some(state);
199 self
200 }
201
202 #[must_use]
204 pub fn with_no_state(mut self) -> Self {
205 self.state = None;
206 self
207 }
208
209 #[must_use]
211 pub fn with_aggregated_only(mut self, only_aggregated: bool) -> Self {
212 self.only_aggregated = only_aggregated;
213 self
214 }
215
216 #[must_use]
218 pub fn with_winning_probability<T: RangeBounds<WinningProbability>>(mut self, range: T) -> Self {
219 self.win_prob = (range.start_bound().cloned(), range.end_bound().cloned());
220 self
221 }
222
223 #[must_use]
225 pub fn with_amount<T: RangeBounds<HoprBalance>>(mut self, range: T) -> Self {
226 self.amount = (range.start_bound().cloned(), range.end_bound().cloned());
227 self
228 }
229}
230
231impl From<&AcknowledgedTicket> for TicketSelector {
232 fn from(value: &AcknowledgedTicket) -> Self {
233 Self {
234 channel_identifiers: vec![(
235 value.verified_ticket().channel_id,
236 value.verified_ticket().channel_epoch.into(),
237 )],
238 index: TicketIndexSelector::Single(value.verified_ticket().index),
239 win_prob: (Bound::Unbounded, Bound::Unbounded),
240 amount: (Bound::Unbounded, Bound::Unbounded),
241 state: Some(value.status),
242 only_aggregated: value.verified_ticket().index_offset > 1,
243 }
244 }
245}
246
247impl From<&RedeemableTicket> for TicketSelector {
248 fn from(value: &RedeemableTicket) -> Self {
249 Self {
250 channel_identifiers: vec![(
251 value.verified_ticket().channel_id,
252 value.verified_ticket().channel_epoch.into(),
253 )],
254 index: TicketIndexSelector::Single(value.verified_ticket().index),
255 win_prob: (Bound::Unbounded, Bound::Unbounded),
256 amount: (Bound::Unbounded, Bound::Unbounded),
257 state: None,
258 only_aggregated: value.verified_ticket().index_offset > 1,
259 }
260 }
261}
262
263impl From<&ChannelEntry> for TicketSelector {
264 fn from(value: &ChannelEntry) -> Self {
265 Self {
266 channel_identifiers: vec![(value.get_id(), value.channel_epoch)],
267 index: TicketIndexSelector::None,
268 win_prob: (Bound::Unbounded, Bound::Unbounded),
269 amount: (Bound::Unbounded, Bound::Unbounded),
270 state: None,
271 only_aggregated: false,
272 }
273 }
274}
275
276impl From<ChannelEntry> for TicketSelector {
277 fn from(value: ChannelEntry) -> Self {
278 TicketSelector::from(&value)
279 }
280}
281
282#[derive(Debug, Copy, Clone, PartialEq, Eq, strum::Display)]
285#[strum(serialize_all = "lowercase")]
286pub enum TicketMarker {
287 Redeemed,
288 Rejected,
289 Neglected,
290}
291
292#[async_trait::async_trait]
294pub trait HoprDbTicketOperations {
295 type Error: std::error::Error + Send + Sync + 'static;
296
297 async fn stream_tickets<'c>(
301 &'c self,
302 selector: Option<TicketSelector>,
303 ) -> Result<BoxStream<'c, AcknowledgedTicket>, Self::Error>;
304
305 async fn mark_tickets_as(&self, selector: TicketSelector, mark_as: TicketMarker) -> Result<usize, Self::Error>;
310
311 async fn mark_unsaved_ticket_rejected(&self, ticket: &Ticket) -> Result<(), Self::Error>;
317
318 async fn update_ticket_states_and_fetch<'a>(
322 &'a self,
323 selector: TicketSelector,
324 new_state: AcknowledgedTicketStatus,
325 ) -> Result<BoxStream<'a, AcknowledgedTicket>, Self::Error>;
326
327 async fn update_ticket_states(
329 &self,
330 selector: TicketSelector,
331 new_state: AcknowledgedTicketStatus,
332 ) -> Result<usize, Self::Error>;
333
334 async fn get_ticket_statistics(&self, channel_id: Option<Hash>) -> Result<ChannelTicketStatistics, Self::Error>;
338
339 async fn reset_ticket_statistics(&self) -> Result<(), Self::Error>;
341
342 async fn get_tickets_value(&self, selector: TicketSelector) -> Result<(usize, HoprBalance), Self::Error>;
346
347 async fn compare_and_set_outgoing_ticket_index(&self, channel_id: Hash, index: u64) -> Result<u64, Self::Error>;
354
355 async fn reset_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, Self::Error>;
361
362 async fn increment_outgoing_ticket_index(&self, channel_id: Hash) -> Result<u64, Self::Error>;
366
367 async fn get_outgoing_ticket_index(&self, channel_id: Hash) -> Result<Arc<AtomicU64>, Self::Error>;
371
372 async fn persist_outgoing_ticket_indices(&self) -> Result<usize, Self::Error>;
377}
378
379#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
381pub struct ChannelTicketStatistics {
382 pub winning_tickets: u128,
383 pub neglected_value: HoprBalance,
384 pub redeemed_value: HoprBalance,
385 pub unredeemed_value: HoprBalance,
386 pub rejected_value: HoprBalance,
387}