Skip to main content

hopr_ticket_manager/backend/
mod.rs

1mod memory;
2
3#[cfg(feature = "redb")]
4mod redb;
5
6use std::ops::{AddAssign, SubAssign};
7
8use hopr_api::chain::{HoprBalance, RedeemableTicket};
9pub use memory::*;
10#[cfg(feature = "redb")]
11pub use redb::*;
12
13use crate::{TicketQueue, traits::default_total_value};
14
15/// Adapter for [`TicketQueue`] that caches the total ticket value per channel epoch.
16///
17/// The cache value is updated with each `push` and `pop` operation.
18/// The [`total_value`](TicketQueue::total_value) method
19/// returns the cached value if available, otherwise it delegates to the underlying queue.
20/// If `min_index` is provided, the cache is bypassed and the underlying queue is queried directly.
21///
22/// All other calls are simply delegated to the underlying queue.
23///
24/// The implementation uses `hashbrown::HashMap` for efficient key-value storage.
25#[derive(Clone, Debug)]
26pub struct ValueCachedQueue<Q> {
27    queue: Q,
28    // Caches total ticket value per channel epoch
29    value_cache: hashbrown::HashMap<u32, HoprBalance>,
30}
31
32impl<Q: TicketQueue> ValueCachedQueue<Q> {
33    pub fn new(queue: Q) -> Result<Self, Q::Error> {
34        let mut value_cache = hashbrown::HashMap::<u32, HoprBalance>::new();
35        // Load all pre-existing ticket values into the cache
36        queue.iter_unordered()?.filter_map(|res| res.ok()).for_each(|ticket| {
37            value_cache
38                .entry(ticket.verified_ticket().channel_epoch)
39                .or_default()
40                .add_assign(ticket.verified_ticket().amount);
41        });
42
43        Ok(Self { queue, value_cache })
44    }
45}
46
47impl<Q: TicketQueue> TicketQueue for ValueCachedQueue<Q> {
48    type Error = Q::Error;
49
50    fn len(&self) -> Result<usize, Self::Error> {
51        self.queue.len()
52    }
53
54    fn is_empty(&self) -> Result<bool, Self::Error> {
55        self.queue.is_empty()
56    }
57
58    fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
59        self.value_cache
60            .entry(ticket.verified_ticket().channel_epoch)
61            .or_default()
62            .add_assign(ticket.verified_ticket().amount);
63        self.queue.push(ticket)
64    }
65
66    fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
67        let ticket = self.queue.pop()?;
68        if let Some(ticket) = &ticket {
69            // NOTE: that all the arithmetic operations on the `HoprBalance` type are naturally saturating.
70            self.value_cache
71                .entry(ticket.verified_ticket().channel_epoch)
72                .or_default()
73                .sub_assign(ticket.verified_ticket().amount);
74        }
75        Ok(ticket)
76    }
77
78    fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
79        self.queue.peek()
80    }
81
82    fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
83        self.queue.iter_unordered()
84    }
85
86    fn total_value(&self, epoch: u32, min_index: Option<u64>) -> Result<HoprBalance, Self::Error> {
87        if min_index.is_none()
88            && let Some(value) = self.value_cache.get(&epoch)
89        {
90            return Ok(*value);
91        }
92
93        default_total_value(&self.queue, epoch, min_index)
94    }
95}
96
#[cfg(test)]
pub mod tests {
    use std::ops::AddAssign;

    use hopr_api::chain::HoprBalance;

    use crate::{
        backend::{ValueCachedQueue, memory},
        traits::tests::*,
    };

    // The tests below run the shared `TicketQueue` checks from `traits::tests`
    // against a `ValueCachedQueue` wrapping an initially empty in-memory queue.

    #[test]
    fn value_cached_queue_maintains_natural_ticket_order() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_maintains_natural_ticket_order(queue)
    }

    #[test]
    fn value_cached_queue_returns_all_tickets() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_returns_all_tickets(queue)
    }

    #[test]
    fn value_cached_queue_is_empty_when_drained() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_is_empty_when_drained(queue)
    }

    #[test]
    fn value_cached_queue_returns_empty_iterator_when_drained() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_returns_empty_iterator_when_drained(queue)
    }

    #[test]
    fn value_cached_queue_returns_correct_total_ticket_value() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_returns_correct_total_ticket_value(queue)
    }

    #[test]
    fn value_cached_queue_returns_correct_total_ticket_value_with_min_index() -> anyhow::Result<()> {
        let queue = ValueCachedQueue::new(memory::MemoryTicketQueue::default())?;
        queue_returns_correct_total_ticket_value_with_min_index(queue)
    }

    /// Wrapping an already filled queue must seed the cache with the per-epoch totals.
    #[test]
    fn value_cache_queue_populates_cache_with_existing_tickets() -> anyhow::Result<()> {
        let tickets = generate_tickets()?;
        let mut prefilled = memory::MemoryTicketQueue::default();
        fill_queue(&mut prefilled, tickets.iter().copied())?;

        // Independently sum up the expected value for each channel epoch.
        let mut expected = hashbrown::HashMap::<u32, HoprBalance>::new();
        for ticket in tickets {
            let epoch = ticket.verified_ticket().channel_epoch;
            let amount = ticket.verified_ticket().amount;
            expected.entry(epoch).or_default().add_assign(amount);
        }

        let cached = ValueCachedQueue::new(prefilled)?;
        assert_eq!(expected, cached.value_cache);

        Ok(())
    }
}