Skip to main content

hopr_ticket_manager/
traits.rs

1use hopr_api::chain::{ChannelId, HoprBalance, RedeemableTicket, VerifiedTicket};
2
3/// Allows loading and saving outgoing ticket indices.
4pub trait OutgoingIndexStore {
5    type Error: std::error::Error + Send + Sync + 'static;
6    /// Loads the last used outgoing ticket index for the given channel and epoch.
7    ///
8    /// If the index is not found, returns `Ok(None)`.
9    fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error>;
10    /// Saves the last used outgoing ticket index for the given channel and epoch.
11    fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error>;
12    /// Deletes the outgoing ticket index for the given channel and epoch.
13    fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error>;
14    /// Iterate over all channel IDs and epochs of outgoing ticket indices in the storage.
15    ///
16    /// The iterator may yield the values in arbitrary order.
17    fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error>;
18}
19
20/// Allows loading ticket queues from a storage.
21///
22/// This trait reflects the possibility of a node to relay packets for receiving tickets.
23pub trait TicketQueueStore {
24    /// Type of per-channel incoming ticket queues.
25    type Queue: TicketQueue;
26    /// Opens or creates a new queue in storage for the given channel.
27    fn open_or_create_queue(
28        &mut self,
29        channel_id: &ChannelId,
30    ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error>;
31    /// Deletes the queue for the given channel.
32    ///
33    /// Returns any tickets left-over in this queue if it existed.
34    /// Returned tickets are no longer redeemable.
35    fn delete_queue(
36        &mut self,
37        channel_id: &ChannelId,
38    ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error>;
39    /// Iterate over all channel IDs of ticket queues in the storage.
40    fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error>;
41}
42
43/// Backend for the incoming ticket storage (double-ended) queue.
44///
45/// This trait defines the operations that an incoming ticket queue for a specific channel must support.
46/// A queue can only store redeemable tickets from the same channel but can contain tickets from different channel
47/// epochs.
48///
49/// The implementations must honor the natural ordering of tickets.
50pub trait TicketQueue {
51    type Error: std::error::Error + Send + Sync + 'static;
52    /// Number of tickets in the queue.
53    fn len(&self) -> Result<usize, Self::Error>;
54    /// Indicates whether the queue is empty.
55    fn is_empty(&self) -> Result<bool, Self::Error> {
56        Ok(self.len()? == 0)
57    }
58    /// Add a ticket to the queue.
59    fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error>;
60    /// Remove and return the next ticket in-order from the queue.
61    ///
62    /// This extracts the next ticket from the queue (after redeeming or neglecting it).
63    fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error>;
64    /// Return the next ticket in-order from the queue without removing it.
65    ///
66    /// This is the next ticket that can be extracted from the queue (redeemed or neglected).
67    fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error>;
68    /// Iterate over all tickets in the queue in **arbitrary** order.
69    ///
70    /// This is not the order in which tickets are redeemed or neglected (see [`TicketQueue::pop`] or
71    /// [`TicketQueue::peek`]).
72    fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error>;
73    /// Computes the total value of tickets of the given epoch (and optionally minimum given index)
74    /// in this queue.
75    ///
76    /// The default implementation simply [iterates](TicketQueue::iter_unordered) the queue
77    /// and sums the total value of matching tickets. Implementations can provide more effective methods.
78    fn total_value(&self, epoch: u32, min_index: Option<u64>) -> Result<HoprBalance, Self::Error> {
79        default_total_value(self, epoch, min_index)
80    }
81    /// Drains all the remaining tickets from the queue, rendering them no longer redeemable.
82    ///
83    /// The drained tickets are still ordered according to their natural ordering.
84    fn drain(&mut self) -> Result<Vec<VerifiedTicket>, Self::Error> {
85        let mut tickets = Vec::new();
86        while let Some(ticket) = self.pop()? {
87            tickets.push(ticket.ticket);
88        }
89        Ok(tickets)
90    }
91}
92
93pub(crate) fn default_total_value<Q: TicketQueue + ?Sized>(
94    queue: &Q,
95    epoch: u32,
96    min_index: Option<u64>,
97) -> Result<HoprBalance, Q::Error> {
98    let min_index = min_index.unwrap_or(0);
99    Ok(queue
100        .iter_unordered()?
101        .filter_map(|res| {
102            res.inspect_err(|error| tracing::error!(%error, "failed to obtain ticket from the queue"))
103                .ok()
104                .filter(|t| t.verified_ticket().channel_epoch == epoch && t.verified_ticket().index >= min_index)
105                .map(|t| t.verified_ticket().amount)
106        })
107        .sum())
108}
109
110// Contains general utilities and test helpers for ticket queue implementations.
111#[cfg(test)]
112pub mod tests {
113    use std::ops::RangeBounds;
114
115    use hopr_api::{
116        chain::{ChannelId, HoprBalance, RedeemableTicket, WinningProbability},
117        types::{crypto::prelude::*, crypto_random::Randomizable, internal::prelude::TicketBuilder},
118    };
119    use rand::prelude::SliceRandom;
120
121    use crate::{OutgoingIndexStore, TicketQueue, TicketQueueStore};
122
123    const TICKET_VALUE: u64 = 10;
124
125    pub fn generate_owned_tickets(
126        issuer: &ChainKeypair,
127        recipient: &ChainKeypair,
128        count: usize,
129        epochs: impl RangeBounds<u32> + Iterator<Item = u32>,
130    ) -> anyhow::Result<Vec<RedeemableTicket>> {
131        let mut tickets = Vec::new();
132        for epoch in epochs {
133            for i in 0..count {
134                let hk1 = HalfKey::random();
135                let hk2 = HalfKey::random();
136
137                let ticket = TicketBuilder::default()
138                    .counterparty(recipient)
139                    .index(i as u64)
140                    .channel_epoch(epoch)
141                    .win_prob(WinningProbability::ALWAYS)
142                    .amount(TICKET_VALUE)
143                    .challenge(Challenge::from_hint_and_share(
144                        &hk1.to_challenge()?,
145                        &hk2.to_challenge()?,
146                    )?)
147                    .build_signed(issuer, &Default::default())?
148                    .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
149                    .into_redeemable(recipient, &Default::default())?;
150
151                tickets.push(ticket);
152            }
153        }
154
155        tickets.sort();
156        Ok(tickets)
157    }
158
159    pub fn generate_tickets() -> anyhow::Result<Vec<RedeemableTicket>> {
160        generate_owned_tickets(&ChainKeypair::random(), &ChainKeypair::random(), 5, 1..=2)
161    }
162
163    pub fn fill_queue<Q: TicketQueue, I: Iterator<Item = RedeemableTicket>>(
164        queue: &mut Q,
165        iter: I,
166    ) -> anyhow::Result<()> {
167        for ticket in iter {
168            queue.push(ticket)?;
169        }
170        Ok(())
171    }
172
173    pub fn queue_maintains_natural_ticket_order<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
174        let mut tickets = generate_tickets()?;
175        tickets.shuffle(&mut rand::rng());
176
177        fill_queue(&mut queue, tickets.iter().copied())?;
178
179        assert!(!queue.is_empty()?);
180        assert_eq!(tickets.len(), queue.len()?);
181
182        tickets.sort();
183        assert_eq!(Some(tickets[0]), queue.peek()?);
184
185        let mut collected_tickets = Vec::new();
186        while let Some(ticket) = queue.pop()? {
187            collected_tickets.push(ticket);
188        }
189
190        assert_eq!(collected_tickets, tickets);
191
192        Ok(())
193    }
194
195    pub fn queue_returns_all_tickets<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
196        let mut tickets = generate_tickets()?;
197        tickets.sort();
198
199        fill_queue(&mut queue, tickets.iter().copied())?;
200
201        let mut collected_tickets = queue.iter_unordered()?.filter_map(|r| r.ok()).collect::<Vec<_>>();
202        collected_tickets.sort();
203
204        assert_eq!(tickets, collected_tickets);
205        Ok(())
206    }
207
208    pub fn queue_is_empty_when_drained<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
209        fill_queue(&mut queue, generate_tickets()?.into_iter())?;
210        assert!(!queue.is_empty()?);
211
212        while queue.pop()?.is_some() {}
213        assert!(queue.is_empty()?);
214        assert_eq!(0, queue.len()?);
215        assert_eq!(0, queue.iter_unordered()?.count());
216        assert_eq!(0, queue.drain()?.len());
217        assert_eq!(None, queue.peek()?);
218
219        Ok(())
220    }
221
222    pub fn queue_returns_empty_iterator_when_drained<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
223        fill_queue(&mut queue, generate_tickets()?.into_iter())?;
224        assert!(!queue.is_empty()?);
225
226        while queue.pop()?.is_some() {}
227        assert_eq!(queue.iter_unordered()?.count(), 0);
228        assert!(queue.is_empty()?);
229
230        let mut pushed_tickets = generate_tickets()?;
231        fill_queue(&mut queue, pushed_tickets.iter().copied())?;
232        assert!(!queue.is_empty()?);
233        let dropped_tickets = queue.drain()?;
234        assert!(queue.is_empty()?);
235        assert_eq!(queue.iter_unordered()?.count(), 0);
236
237        pushed_tickets.sort();
238        let pushed_tickets = pushed_tickets.into_iter().map(|t| t.ticket).collect::<Vec<_>>();
239        assert_eq!(pushed_tickets, dropped_tickets);
240
241        Ok(())
242    }
243
244    pub fn queue_returns_correct_total_ticket_value<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
245        let tickets = generate_tickets()?;
246        fill_queue(&mut queue, tickets.iter().copied())?;
247        let all_tickets_value: HoprBalance = tickets.iter().map(|ticket| ticket.verified_ticket().amount).sum();
248
249        let expected_total_value: HoprBalance = tickets
250            .into_iter()
251            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2)
252            .map(|ticket| ticket.verified_ticket().amount)
253            .sum();
254        let actual_total_value = queue.total_value(2, None)?;
255        assert_eq!(expected_total_value, actual_total_value);
256
257        let drained_tickets = queue.drain()?;
258        assert_eq!(
259            all_tickets_value,
260            drained_tickets
261                .iter()
262                .map(|ticket| ticket.verified_ticket().amount)
263                .sum()
264        );
265
266        let actual_total_value = queue.total_value(2, None)?;
267        assert_eq!(HoprBalance::zero(), actual_total_value);
268
269        Ok(())
270    }
271
272    pub fn queue_returns_correct_total_ticket_value_with_min_index<Q: TicketQueue>(mut queue: Q) -> anyhow::Result<()> {
273        let tickets = generate_tickets()?;
274        fill_queue(&mut queue, tickets.iter().copied())?;
275
276        let expected_total_value: HoprBalance = tickets
277            .into_iter()
278            .filter(|ticket| ticket.verified_ticket().channel_epoch == 2 && ticket.verified_ticket().index >= 2)
279            .map(|ticket| ticket.verified_ticket().amount)
280            .sum();
281        let actual_total_value = queue.total_value(2, Some(2))?;
282
283        assert_eq!(expected_total_value, actual_total_value);
284        Ok(())
285    }
286
287    pub fn ticket_store_should_create_new_queue_for_channel<S: TicketQueueStore>(mut store: S) -> anyhow::Result<()> {
288        assert_eq!(0, store.iter_queues()?.count());
289
290        let _ = store.open_or_create_queue(&Default::default())?;
291        let queues = store.iter_queues()?.collect::<Vec<_>>();
292        assert_eq!(1, queues.len());
293        assert!(queues.contains(&Default::default()));
294
295        Ok(())
296    }
297
298    pub fn ticket_store_should_open_existing_queue_for_channel<S: TicketQueueStore>(
299        mut store: S,
300    ) -> anyhow::Result<()> {
301        assert_eq!(0, store.iter_queues()?.count());
302
303        let mut tickets = generate_tickets()?;
304        let mut queue = store.open_or_create_queue(&Default::default())?;
305        fill_queue(&mut queue, tickets.iter().copied())?;
306        tickets.sort();
307
308        let queue = store.open_or_create_queue(&Default::default())?;
309        let opened_tickets = queue.iter_unordered()?.filter_map(|r| r.ok()).collect::<Vec<_>>();
310
311        assert_eq!(tickets, opened_tickets);
312
313        Ok(())
314    }
315
316    pub fn ticket_store_should_delete_existing_queue_for_channel<S: TicketQueueStore>(
317        mut store: S,
318    ) -> anyhow::Result<()> {
319        assert_eq!(0, store.iter_queues()?.count());
320
321        let _ = store.open_or_create_queue(&Default::default())?;
322        let queues = store.iter_queues()?.collect::<Vec<_>>();
323        assert_eq!(1, queues.len());
324        assert!(queues.contains(&Default::default()));
325
326        let tickets = store.delete_queue(&Default::default())?;
327        let queues = store.iter_queues()?.collect::<Vec<_>>();
328        assert_eq!(0, queues.len());
329        assert!(tickets.is_empty());
330
331        Ok(())
332    }
333
334    pub fn ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets<S: TicketQueueStore>(
335        mut store: S,
336    ) -> anyhow::Result<()> {
337        assert_eq!(0, store.iter_queues()?.count());
338
339        let mut queue = store.open_or_create_queue(&Default::default())?;
340        let queues = store.iter_queues()?.collect::<Vec<_>>();
341        assert_eq!(1, queues.len());
342        assert!(queues.contains(&Default::default()));
343        let mut tickets = generate_tickets()?;
344        fill_queue(&mut queue, tickets.iter().copied())?;
345
346        let deleted_tickets = store.delete_queue(&Default::default())?;
347        let queues = store.iter_queues()?.collect::<Vec<_>>();
348        assert_eq!(0, queues.len());
349
350        tickets.sort();
351        let pushed_tickets = tickets.into_iter().map(|t| t.ticket).collect::<Vec<_>>();
352        assert_eq!(pushed_tickets, deleted_tickets);
353
354        Ok(())
355    }
356
357    pub fn ticket_store_should_iterate_existing_queues_for_channel<S: TicketQueueStore>(
358        mut store: S,
359    ) -> anyhow::Result<()> {
360        assert_eq!(0, store.iter_queues()?.count());
361
362        let c1 = ChannelId::create(&[b"beef"]);
363        let _ = store.open_or_create_queue(&c1)?;
364        let queues = store.iter_queues()?.collect::<Vec<_>>();
365        assert_eq!(1, queues.len());
366        assert!(queues.contains(&c1));
367
368        let c2 = ChannelId::create(&[b"feed"]);
369        let _ = store.open_or_create_queue(&c2)?;
370        let queues = store.iter_queues()?.collect::<Vec<_>>();
371
372        assert_eq!(2, queues.len());
373        assert!(queues.contains(&c1));
374        assert!(queues.contains(&c2));
375
376        Ok(())
377    }
378
379    pub fn ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel<S: TicketQueueStore>(
380        mut store: S,
381    ) -> anyhow::Result<()> {
382        assert_eq!(0, store.iter_queues()?.count());
383        assert!(store.delete_queue(&Default::default()).is_ok());
384
385        Ok(())
386    }
387
388    pub fn out_index_store_should_load_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
389        mut store: S,
390    ) -> anyhow::Result<()> {
391        store.save_outgoing_index(&Default::default(), 1, 1)?;
392        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
393        assert_eq!(Some(1), loaded);
394        Ok(())
395    }
396
397    pub fn out_index_store_should_not_load_non_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
398        store: S,
399    ) -> anyhow::Result<()> {
400        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
401        assert_eq!(None, loaded);
402        Ok(())
403    }
404
405    pub fn out_index_store_should_delete_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
406        mut store: S,
407    ) -> anyhow::Result<()> {
408        store.save_outgoing_index(&Default::default(), 1, 1)?;
409        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
410        assert_eq!(Some(1), loaded);
411        store.delete_outgoing_index(&Default::default(), 1)?;
412        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
413        assert_eq!(None, loaded);
414        Ok(())
415    }
416
417    pub fn out_index_store_should_store_new_index_for_channel_epoch<S: OutgoingIndexStore>(
418        mut store: S,
419    ) -> anyhow::Result<()> {
420        store.save_outgoing_index(&Default::default(), 1, 1)?;
421        store.save_outgoing_index(&Default::default(), 2, 1)?;
422
423        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
424        assert_eq!(Some(1), loaded);
425
426        let loaded = store.load_outgoing_index(&Default::default(), 2)?;
427        assert_eq!(Some(1), loaded);
428        Ok(())
429    }
430
431    pub fn out_index_should_update_existing_index_for_channel_epoch<S: OutgoingIndexStore>(
432        mut store: S,
433    ) -> anyhow::Result<()> {
434        store.save_outgoing_index(&Default::default(), 1, 1)?;
435        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
436        assert_eq!(Some(1), loaded);
437
438        store.save_outgoing_index(&Default::default(), 1, 2)?;
439
440        let loaded = store.load_outgoing_index(&Default::default(), 1)?;
441        assert_eq!(Some(2), loaded);
442        Ok(())
443    }
444
445    pub fn out_index_store_should_iterate_existing_indices_for_channel_epoch<S: OutgoingIndexStore>(
446        mut store: S,
447    ) -> anyhow::Result<()> {
448        let other = Hash::create(&[b"other"]);
449
450        store.save_outgoing_index(&Default::default(), 1, 1)?;
451        store.save_outgoing_index(&Default::default(), 2, 1)?;
452        store.save_outgoing_index(&other, 3, 1)?;
453
454        let mut values = store.iter_outgoing_indices()?.collect::<Vec<_>>();
455        values.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
456
457        assert_eq!(
458            vec![(ChannelId::default(), 1), (ChannelId::default(), 2), (other, 3)],
459            values
460        );
461
462        Ok(())
463    }
464}