1use hopr_api::chain::{ChannelId, HoprBalance, RedeemableTicket, VerifiedTicket};
2
3pub trait OutgoingIndexStore {
5 type Error: std::error::Error + Send + Sync + 'static;
6 fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error>;
10 fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error>;
12 fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error>;
14 fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error>;
18}
19
20pub trait TicketQueueStore {
24 type Queue: TicketQueue;
26 fn open_or_create_queue(
28 &mut self,
29 channel_id: &ChannelId,
30 ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error>;
31 fn delete_queue(
36 &mut self,
37 channel_id: &ChannelId,
38 ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error>;
39 fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error>;
41}
42
43pub trait TicketQueue {
51 type Error: std::error::Error + Send + Sync + 'static;
52 fn len(&self) -> Result<usize, Self::Error>;
54 fn is_empty(&self) -> Result<bool, Self::Error> {
56 Ok(self.len()? == 0)
57 }
58 fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error>;
60 fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error>;
64 fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error>;
68 fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error>;
73 fn total_value(&self, epoch: u32, min_index: Option<u64>) -> Result<HoprBalance, Self::Error> {
79 default_total_value(self, epoch, min_index)
80 }
81 fn drain(&mut self) -> Result<Vec<VerifiedTicket>, Self::Error> {
85 let mut tickets = Vec::new();
86 while let Some(ticket) = self.pop()? {
87 tickets.push(ticket.ticket);
88 }
89 Ok(tickets)
90 }
91}
92
93pub(crate) fn default_total_value<Q: TicketQueue + ?Sized>(
94 queue: &Q,
95 epoch: u32,
96 min_index: Option<u64>,
97) -> Result<HoprBalance, Q::Error> {
98 let min_index = min_index.unwrap_or(0);
99 Ok(queue
100 .iter_unordered()?
101 .filter_map(|res| {
102 res.inspect_err(|error| tracing::error!(%error, "failed to obtain ticket from the queue"))
103 .ok()
104 .filter(|t| t.verified_ticket().channel_epoch == epoch && t.verified_ticket().index >= min_index)
105 .map(|t| t.verified_ticket().amount)
106 })
107 .sum())
108}
109
110#[cfg(test)]
112pub mod tests {
113 use std::ops::RangeBounds;
114
115 use hopr_api::{
116 chain::{ChannelId, HoprBalance, RedeemableTicket, WinningProbability},
117 types::{crypto::prelude::*, crypto_random::Randomizable, internal::prelude::TicketBuilder},
118 };
119 use rand::prelude::SliceRandom;
120
121 use crate::{OutgoingIndexStore, TicketQueue, TicketQueueStore};
122
123 const TICKET_VALUE: u64 = 10;
124
125 pub fn generate_owned_tickets(
126 issuer: &ChainKeypair,
127 recipient: &ChainKeypair,
128 count: usize,
129 epochs: impl RangeBounds<u32> + Iterator<Item = u32>,
130 ) -> anyhow::Result<Vec<RedeemableTicket>> {
131 let mut tickets = Vec::new();
132 for epoch in epochs {
133 for i in 0..count {
134 let hk1 = HalfKey::random();
135 let hk2 = HalfKey::random();
136
137 let ticket = TicketBuilder::default()
138 .counterparty(recipient)
139 .index(i as u64)
140 .channel_epoch(epoch)
141 .win_prob(WinningProbability::ALWAYS)
142 .amount(TICKET_VALUE)
143 .challenge(Challenge::from_hint_and_share(
144 &hk1.to_challenge()?,
145 &hk2.to_challenge()?,
146 )?)
147 .build_signed(issuer, &Default::default())?
148 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
149 .into_redeemable(recipient, &Default::default())?;
150
151 tickets.push(ticket);
152 }
153 }
154
155 tickets.sort();
156 Ok(tickets)
157 }
158
159 pub fn generate_tickets() -> anyhow::Result<Vec<RedeemableTicket>> {
160 generate_owned_tickets(&ChainKeypair::random(), &ChainKeypair::random(), 5, 1..=2)
161 }
162
163 pub fn fill_queue<Q: TicketQueue, I: Iterator<Item = RedeemableTicket>>(
164 queue: &mut Q,
165 iter: I,
166 ) -> anyhow::Result<()> {
167 for ticket in iter {
168 queue.push(ticket)?;
169 }
170 Ok(())
171 }
172
173 pub fn queue_maintains_natural_ticket_order<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
174 let mut tickets = generate_tickets()?;
175 tickets.shuffle(&mut rand::rng());
176
177 fill_queue(&mut queue, tickets.iter().copied())?;
178
179 assert!(!queue.is_empty()?);
180 assert_eq!(tickets.len(), queue.len()?);
181
182 tickets.sort();
183 assert_eq!(Some(tickets[0]), queue.peek()?);
184
185 let mut collected_tickets = Vec::new();
186 while let Some(ticket) = queue.pop()? {
187 collected_tickets.push(ticket);
188 }
189
190 assert_eq!(collected_tickets, tickets);
191
192 Ok(())
193 }
194
195 pub fn queue_returns_all_tickets<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
196 let mut tickets = generate_tickets()?;
197 tickets.sort();
198
199 fill_queue(&mut queue, tickets.iter().copied())?;
200
201 let mut collected_tickets = queue.iter_unordered()?.filter_map(|r| r.ok()).collect::<Vec<_>>();
202 collected_tickets.sort();
203
204 assert_eq!(tickets, collected_tickets);
205 Ok(())
206 }
207
208 pub fn queue_is_empty_when_drained<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
209 fill_queue(&mut queue, generate_tickets()?.into_iter())?;
210 assert!(!queue.is_empty()?);
211
212 while queue.pop()?.is_some() {}
213 assert!(queue.is_empty()?);
214 assert_eq!(0, queue.len()?);
215 assert_eq!(0, queue.iter_unordered()?.count());
216 assert_eq!(0, queue.drain()?.len());
217 assert_eq!(None, queue.peek()?);
218
219 Ok(())
220 }
221
222 pub fn queue_returns_empty_iterator_when_drained<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
223 fill_queue(&mut queue, generate_tickets()?.into_iter())?;
224 assert!(!queue.is_empty()?);
225
226 while queue.pop()?.is_some() {}
227 assert_eq!(queue.iter_unordered()?.count(), 0);
228 assert!(queue.is_empty()?);
229
230 let mut pushed_tickets = generate_tickets()?;
231 fill_queue(&mut queue, pushed_tickets.iter().copied())?;
232 assert!(!queue.is_empty()?);
233 let dropped_tickets = queue.drain()?;
234 assert!(queue.is_empty()?);
235 assert_eq!(queue.iter_unordered()?.count(), 0);
236
237 pushed_tickets.sort();
238 let pushed_tickets = pushed_tickets.into_iter().map(|t| t.ticket).collect::<Vec<_>>();
239 assert_eq!(pushed_tickets, dropped_tickets);
240
241 Ok(())
242 }
243
244 pub fn queue_returns_correct_total_ticket_value<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
245 let tickets = generate_tickets()?;
246 fill_queue(&mut queue, tickets.iter().copied())?;
247 let all_tickets_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
248
249 let expected_total_value: HoprBalance = tickets
250 .into_iter()
251 .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
252 .map(|ticket| ticket.verified_ticket().amount)
253 .sum();
254 let actual_total_value = queue.total_value(2, None)?;
255 assert_eq!(expected_total_value, actual_total_value);
256
257 let drained_tickets = queue.drain()?;
258 assert_eq!(
259 all_tickets_value,
260 drained_tickets
261 .iter()
262 .map(|ticket| ticket.verified_ticket().amount)
263 .sum()
264 );
265
266 let actual_total_value = queue.total_value(2, None)?;
267 assert_eq!(HoprBalance::zero(), actual_total_value);
268
269 Ok(())
270 }
271
272 pub fn queue_returns_correct_total_ticket_value_with_min_index<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
273 let tickets = generate_tickets()?;
274 fill_queue(&mut queue, tickets.iter().copied())?;
275
276 let expected_total_value: HoprBalance = tickets
277 .into_iter()
278 .filter(|ticket| ticket.verified_ticket().channel_epoch == 2 && ticket.verified_ticket().index >= 2)
279 .map(|ticket| ticket.verified_ticket().amount)
280 .sum();
281 let actual_total_value = queue.total_value(2, Some(2))?;
282
283 assert_eq!(expected_total_value, actual_total_value);
284 Ok(())
285 }
286
287 pub fn ticket_store_should_create_new_queue_for_channel<S: TicketQueueStore>(mut store: S) -> anyhow::Result<()> {
288 assert_eq!(0, store.iter_queues()?.count());
289
290 let _ = store.open_or_create_queue(&Default::default())?;
291 let queues = store.iter_queues()?.collect::<Vec<_>>();
292 assert_eq!(1, queues.len());
293 assert!(queues.contains(&Default::default()));
294
295 Ok(())
296 }
297
298 pub fn ticket_store_should_open_existing_queue_for_channel<S: TicketQueueStore>(
299 mut store: S,
300 ) -> anyhow::Result<()> {
301 assert_eq!(0, store.iter_queues()?.count());
302
303 let mut tickets = generate_tickets()?;
304 let mut queue = store.open_or_create_queue(&Default::default())?;
305 fill_queue(&mut queue, tickets.iter().copied())?;
306 tickets.sort();
307
308 let queue = store.open_or_create_queue(&Default::default())?;
309 let opened_tickets = queue.iter_unordered()?.filter_map(|r| r.ok()).collect::<Vec<_>>();
310
311 assert_eq!(tickets, opened_tickets);
312
313 Ok(())
314 }
315
316 pub fn ticket_store_should_delete_existing_queue_for_channel<S: TicketQueueStore>(
317 mut store: S,
318 ) -> anyhow::Result<()> {
319 assert_eq!(0, store.iter_queues()?.count());
320
321 let _ = store.open_or_create_queue(&Default::default())?;
322 let queues = store.iter_queues()?.collect::<Vec<_>>();
323 assert_eq!(1, queues.len());
324 assert!(queues.contains(&Default::default()));
325
326 let tickets = store.delete_queue(&Default::default())?;
327 let queues = store.iter_queues()?.collect::<Vec<_>>();
328 assert_eq!(0, queues.len());
329 assert!(tickets.is_empty());
330
331 Ok(())
332 }
333
334 pub fn ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets<S: TicketQueueStore>(
335 mut store: S,
336 ) -> anyhow::Result<()> {
337 assert_eq!(0, store.iter_queues()?.count());
338
339 let mut queue = store.open_or_create_queue(&Default::default())?;
340 let queues = store.iter_queues()?.collect::<Vec<_>>();
341 assert_eq!(1, queues.len());
342 assert!(queues.contains(&Default::default()));
343 let mut tickets = generate_tickets()?;
344 fill_queue(&mut queue, tickets.iter().copied())?;
345
346 let deleted_tickets = store.delete_queue(&Default::default())?;
347 let queues = store.iter_queues()?.collect::<Vec<_>>();
348 assert_eq!(0, queues.len());
349
350 tickets.sort();
351 let pushed_tickets = tickets.into_iter().map(|t| t.ticket).collect::<Vec<_>>();
352 assert_eq!(pushed_tickets, deleted_tickets);
353
354 Ok(())
355 }
356
357 pub fn ticket_store_should_iterate_existing_queues_for_channel<S: TicketQueueStore>(
358 mut store: S,
359 ) -> anyhow::Result<()> {
360 assert_eq!(0, store.iter_queues()?.count());
361
362 let c1 = ChannelId::create(&[b"beef"]);
363 let _ = store.open_or_create_queue(&c1)?;
364 let queues = store.iter_queues()?.collect::<Vec<_>>();
365 assert_eq!(1, queues.len());
366 assert!(queues.contains(&c1));
367
368 let c2 = ChannelId::create(&[b"feed"]);
369 let _ = store.open_or_create_queue(&c2)?;
370 let queues = store.iter_queues()?.collect::<Vec<_>>();
371
372 assert_eq!(2, queues.len());
373 assert!(queues.contains(&c1));
374 assert!(queues.contains(&c2));
375
376 Ok(())
377 }
378
379 pub fn ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel<S: TicketQueueStore>(
380 mut store: S,
381 ) -> anyhow::Result<()> {
382 assert_eq!(0, store.iter_queues()?.count());
383 assert!(store.delete_queue(&Default::default()).is_ok());
384
385 Ok(())
386 }
387
388 pub fn out_index_store_should_load_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
389 mut store: S,
390 ) -> anyhow::Result<()> {
391 store.save_outgoing_index(&Default::default(), 1, 1)?;
392 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
393 assert_eq!(Some(1), loaded);
394 Ok(())
395 }
396
397 pub fn out_index_store_should_not_load_non_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
398 store: S,
399 ) -> anyhow::Result<()> {
400 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
401 assert_eq!(None, loaded);
402 Ok(())
403 }
404
405 pub fn out_index_store_should_delete_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
406 mut store: S,
407 ) -> anyhow::Result<()> {
408 store.save_outgoing_index(&Default::default(), 1, 1)?;
409 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
410 assert_eq!(Some(1), loaded);
411 store.delete_outgoing_index(&Default::default(), 1)?;
412 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
413 assert_eq!(None, loaded);
414 Ok(())
415 }
416
417 pub fn out_index_store_should_store_new_index_for_channel_epoch<S: OutgoingIndexStore>(
418 mut store: S,
419 ) -> anyhow::Result<()> {
420 store.save_outgoing_index(&Default::default(), 1, 1)?;
421 store.save_outgoing_index(&Default::default(), 2, 1)?;
422
423 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
424 assert_eq!(Some(1), loaded);
425
426 let loaded = store.load_outgoing_index(&Default::default(), 2)?;
427 assert_eq!(Some(1), loaded);
428 Ok(())
429 }
430
431 pub fn out_index_should_update_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
432 mut store: S,
433 ) -> anyhow::Result<()> {
434 store.save_outgoing_index(&Default::default(), 1, 1)?;
435 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
436 assert_eq!(Some(1), loaded);
437
438 store.save_outgoing_index(&Default::default(), 1, 2)?;
439
440 let loaded = store.load_outgoing_index(&Default::default(), 1)?;
441 assert_eq!(Some(2), loaded);
442 Ok(())
443 }
444
445 pub fn out_index_store_should_iterate_existing_indices_for_channel_epoch<S: OutgoingIndexStore>(
446 mut store: S,
447 ) -> anyhow::Result<()> {
448 let other = Hash::create(&[b"other"]);
449
450 store.save_outgoing_index(&Default::default(), 1, 1)?;
451 store.save_outgoing_index(&Default::default(), 2, 1)?;
452 store.save_outgoing_index(&other, 3, 1)?;
453
454 let mut values = store.iter_outgoing_indices()?.collect::<Vec<_>>();
455 values.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
456
457 assert_eq!(
458 vec![(ChannelId::default(), 1), (ChannelId::default(), 2), (other, 3)],
459 values
460 );
461
462 Ok(())
463 }
464}