Skip to main content

hopr_ticket_manager/backend/
mod.rs

1mod memory;
2
3#[cfg(feature = "redb")]
4mod redb;
5
6use std::ops::{AddAssign, SubAssign};
7
8use hopr_api::chain::{HoprBalance, RedeemableTicket};
9pub use memory::*;
10#[cfg(feature = "redb")]
11pub use redb::*;
12
13use crate::{TicketQueue, traits::default_total_value};
14
15/// Adapter for [`TicketQueue`] that caches the total ticket value per channel epoch.
16///
17/// The cache value is updated with each `push` and `pop` operation.
18/// The [`total_value`](TicketQueue::total_value) method
19/// returns the cached value if available, otherwise it delegates to the underlying queue.
20/// If `min_index` is provided, the cache is bypassed and the underlying queue is queried directly.
21///
22/// All other calls are simply delegated to the underlying queue.
23///
24/// The implementation uses `hashbrown::HashMap` for efficient key-value storage.
25#[derive(Clone, Debug)]
26pub struct ValueCachedQueue<Q> {
27    queue: Q,
28    // Caches total ticket value per channel epoch
29    value_cache: hashbrown::HashMap<u32, HoprBalance>,
30}
31
32impl<Q: TicketQueue> ValueCachedQueue<Q> {
33    pub fn new(queue: Q) -> Result<Self, Q::Error> {
34        let mut value_cache = hashbrown::HashMap::<u32, HoprBalance>::new();
35        // Load all pre-existing ticket values into the cache
36        queue.iter_unordered()?.filter_map(|res| res.ok()).for_each(|ticket| {
37            value_cache
38                .entry(ticket.verified_ticket().channel_epoch)
39                .or_default()
40                .add_assign(ticket.verified_ticket().amount);
41        });
42
43        Ok(Self { queue, value_cache })
44    }
45}
46
47impl<Q: TicketQueue> TicketQueue for ValueCachedQueue<Q> {
48    type Error = Q::Error;
49
50    fn len(&self) -> Result<usize, Self::Error> {
51        self.queue.len()
52    }
53
54    fn is_empty(&self) -> Result<bool, Self::Error> {
55        self.queue.is_empty()
56    }
57
58    fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
59        self.value_cache
60            .entry(ticket.verified_ticket().channel_epoch)
61            .or_default()
62            .add_assign(ticket.verified_ticket().amount);
63        self.queue.push(ticket)
64    }
65
66    fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
67        let ticket = self.queue.pop()?;
68        if let Some(ticket) = &ticket {
69            // NOTE: that all the arithmetic operations on the `HoprBalance` type are naturally saturating.
70            self.value_cache
71                .entry(ticket.verified_ticket().channel_epoch)
72                .or_default()
73                .sub_assign(ticket.verified_ticket().amount);
74        }
75        Ok(ticket)
76    }
77
78    fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
79        self.queue.peek()
80    }
81
82    fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
83        self.queue.iter_unordered()
84    }
85
86    fn total_value(&self, epoch: u32, min_index: Option<u64>) -> Result<HoprBalance, Self::Error> {
87        if min_index.is_none()
88            && let Some(value) = self.value_cache.get(&epoch)
89        {
90            return Ok(*value);
91        }
92
93        default_total_value(&self.queue, epoch, min_index)
94    }
95}
96
#[cfg(test)]
pub mod tests {
    use std::ops::AddAssign;

    use hopr_api::chain::HoprBalance;

    use crate::{ValueCachedQueue, backend::memory, traits::tests::*};

    /// Builds a `ValueCachedQueue` on top of a default (empty) in-memory queue.
    fn empty_cached_queue() -> anyhow::Result<ValueCachedQueue<memory::MemoryTicketQueue>> {
        Ok(ValueCachedQueue::new(memory::MemoryTicketQueue::default())?)
    }

    #[test]
    fn value_cached_queue_maintains_natural_ticket_order() -> anyhow::Result<()> {
        queue_maintains_natural_ticket_order(empty_cached_queue()?)
    }

    #[test]
    fn value_cached_queue_returns_all_tickets() -> anyhow::Result<()> {
        queue_returns_all_tickets(empty_cached_queue()?)
    }

    #[test]
    fn value_cached_queue_is_empty_when_drained() -> anyhow::Result<()> {
        queue_is_empty_when_drained(empty_cached_queue()?)
    }

    #[test]
    fn value_cached_queue_returns_empty_iterator_when_drained() -> anyhow::Result<()> {
        queue_returns_empty_iterator_when_drained(empty_cached_queue()?)
    }

    #[test]
    fn value_cached_queue_returns_correct_total_ticket_value() -> anyhow::Result<()> {
        queue_returns_correct_total_ticket_value(empty_cached_queue()?)
    }

    #[test]
    fn value_cached_queue_returns_correct_total_ticket_value_with_min_index() -> anyhow::Result<()> {
        queue_returns_correct_total_ticket_value_with_min_index(empty_cached_queue()?)
    }

    /// Wrapping an already populated queue must seed the cache with the per-epoch totals
    /// of the tickets it contains.
    #[test]
    fn value_cache_queue_populates_cache_with_existing_tickets() -> anyhow::Result<()> {
        let tickets = generate_tickets()?;

        let mut prefilled = memory::MemoryTicketQueue::default();
        fill_queue(&mut prefilled, tickets.iter().copied())?;

        // Compute the expected totals independently of the queue implementation.
        let mut expected_totals = hashbrown::HashMap::<u32, HoprBalance>::new();
        for ticket in tickets {
            let verified = ticket.verified_ticket();
            expected_totals
                .entry(verified.channel_epoch)
                .or_default()
                .add_assign(verified.amount);
        }

        let cached = ValueCachedQueue::new(prefilled)?;
        assert_eq!(expected_totals, cached.value_cache);

        Ok(())
    }
}
156}