Skip to main content

hopr_ticket_manager/backend/
memory.rs

1use hopr_api::chain::{ChannelId, RedeemableTicket, VerifiedTicket};
2
3use crate::{
4    OutgoingIndexStore,
5    traits::{TicketQueue, TicketQueueStore},
6};
7
8/// Simple non-persistent ticket queue store backed by a `HashMap` and [`MemoryTicketQueue`].
9///
10/// Useful for non-persistent and testing scenarios.
11#[derive(Clone, Debug, Default)]
12pub struct MemoryStore {
13    tickets: std::collections::HashMap<ChannelId, MemoryTicketQueue>,
14    out_indices: std::collections::HashMap<(ChannelId, u32), u64>,
15}
16
17impl TicketQueueStore for MemoryStore {
18    type Queue = MemoryTicketQueue;
19
20    fn open_or_create_queue(
21        &mut self,
22        channel_id: &ChannelId,
23    ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error> {
24        Ok(self.tickets.entry(*channel_id).or_default().clone())
25    }
26
27    fn delete_queue(
28        &mut self,
29        channel_id: &ChannelId,
30    ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error> {
31        if let Some(mut queue) = self.tickets.remove(channel_id) {
32            Ok(queue.drain()?)
33        } else {
34            Ok(Vec::with_capacity(0))
35        }
36    }
37
38    fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error> {
39        Ok(self.tickets.keys().copied())
40    }
41}
42
43impl OutgoingIndexStore for MemoryStore {
44    type Error = std::convert::Infallible;
45
46    fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error> {
47        Ok(self.out_indices.get(&(*channel_id, epoch)).copied())
48    }
49
50    fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error> {
51        self.out_indices.insert((*channel_id, epoch), index);
52        Ok(())
53    }
54
55    fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
56        self.out_indices.remove(&(*channel_id, epoch));
57        Ok(())
58    }
59
60    fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error> {
61        Ok(self.out_indices.keys().copied())
62    }
63}
64
65/// Simple in-memory ticket queue implementation using a binary heap.
66///
67/// This is suitable for testing where ticket persistence is not required.
68///
69/// The implementation might not be particularly efficient for production use.
70#[derive(Clone, Debug, Default)]
71pub struct MemoryTicketQueue(
72    std::sync::Arc<parking_lot::RwLock<std::collections::BinaryHeap<std::cmp::Reverse<RedeemableTicket>>>>,
73);
74
75impl TicketQueue for MemoryTicketQueue {
76    type Error = std::convert::Infallible;
77
78    fn len(&self) -> Result<usize, Self::Error> {
79        Ok(self.0.read().len())
80    }
81
82    fn is_empty(&self) -> Result<bool, Self::Error> {
83        Ok(self.0.read().is_empty())
84    }
85
86    fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
87        self.0.write().push(std::cmp::Reverse(ticket));
88        Ok(())
89    }
90
91    fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
92        Ok(self.0.write().pop().map(|ticket| ticket.0))
93    }
94
95    fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
96        Ok(self.0.read().peek().cloned().map(|ticket| ticket.0))
97    }
98
99    fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
100        Ok(self
101            .0
102            .read()
103            .iter()
104            .cloned()
105            .map(|t| Ok(t.0))
106            .collect::<Vec<_>>()
107            .into_iter())
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114    use crate::traits::tests::*;
115
116    #[test]
117    fn memory_queue_maintains_natural_ticket_order() -> anyhow::Result<()> {
118        queue_maintains_natural_ticket_order(MemoryTicketQueue::default())
119    }
120
121    #[test]
122    fn memory_queue_returns_all_tickets() -> anyhow::Result<()> {
123        queue_returns_all_tickets(MemoryTicketQueue::default())
124    }
125    #[test]
126    fn memory_queue_is_empty_when_drained() -> anyhow::Result<()> {
127        queue_is_empty_when_drained(MemoryTicketQueue::default())
128    }
129
130    #[test]
131    fn memory_queue_returns_empty_iterator_when_drained() -> anyhow::Result<()> {
132        queue_returns_empty_iterator_when_drained(MemoryTicketQueue::default())
133    }
134    #[test]
135    fn memory_queue_returns_correct_total_ticket_value() -> anyhow::Result<()> {
136        queue_returns_correct_total_ticket_value(MemoryTicketQueue::default())
137    }
138
139    #[test]
140    fn memory_queue_returns_correct_total_ticket_value_with_min_index() -> anyhow::Result<()> {
141        queue_returns_correct_total_ticket_value_with_min_index(MemoryTicketQueue::default())
142    }
143
144    #[test]
145    fn memory_out_index_store_should_load_existing_index_for_channel_epoch() -> anyhow::Result<()> {
146        out_index_store_should_load_existing_index_for_channel_epoch(MemoryStore::default())
147    }
148
149    #[test]
150    fn memory_out_index_store_should_not_load_non_existing_index_for_channel_epoch() -> anyhow::Result<()> {
151        out_index_store_should_not_load_non_existing_index_for_channel_epoch(MemoryStore::default())
152    }
153
154    #[test]
155    fn memory_out_index_store_should_store_new_index_for_channel_epoch() -> anyhow::Result<()> {
156        out_index_store_should_store_new_index_for_channel_epoch(MemoryStore::default())
157    }
158
159    #[test]
160    fn memory_out_index_store_should_delete_existing_index_for_channel_epoch() -> anyhow::Result<()> {
161        out_index_store_should_delete_existing_index_for_channel_epoch(MemoryStore::default())
162    }
163
164    #[test]
165    fn memory_out_index_should_update_existing_index_for_channel_epoch() -> anyhow::Result<()> {
166        out_index_should_update_existing_index_for_channel_epoch(MemoryStore::default())
167    }
168
169    #[test]
170    fn memory_out_index_store_should_iterate_existing_indices_for_channel_epoch() -> anyhow::Result<()> {
171        out_index_store_should_iterate_existing_indices_for_channel_epoch(MemoryStore::default())
172    }
173
174    #[test]
175    fn memory_ticket_store_should_create_new_queue_for_channel() -> anyhow::Result<()> {
176        ticket_store_should_create_new_queue_for_channel(MemoryStore::default())
177    }
178
179    #[test]
180    fn memory_ticket_store_should_open_existing_queue_for_channel() -> anyhow::Result<()> {
181        ticket_store_should_open_existing_queue_for_channel(MemoryStore::default())
182    }
183
184    #[test]
185    fn memory_ticket_store_should_delete_existing_queue_for_channel() -> anyhow::Result<()> {
186        ticket_store_should_delete_existing_queue_for_channel(MemoryStore::default())
187    }
188
189    #[test]
190    fn memory_ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets() -> anyhow::Result<()>
191    {
192        ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets(MemoryStore::default())
193    }
194
195    #[test]
196    fn memory_ticket_store_should_iterate_existing_queues_for_channel() -> anyhow::Result<()> {
197        ticket_store_should_iterate_existing_queues_for_channel(MemoryStore::default())
198    }
199
200    #[test]
201    fn memory_ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel() -> anyhow::Result<()> {
202        ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel(MemoryStore::default())
203    }
204}