hopr_ticket_manager/backend/
memory.rs1use hopr_api::chain::{ChannelId, RedeemableTicket, VerifiedTicket};
2
3use crate::{
4 OutgoingIndexStore,
5 traits::{TicketQueue, TicketQueueStore},
6};
7
8#[derive(Clone, Debug, Default)]
12pub struct MemoryStore {
13 tickets: std::collections::HashMap<ChannelId, MemoryTicketQueue>,
14 out_indices: std::collections::HashMap<(ChannelId, u32), u64>,
15}
16
17impl TicketQueueStore for MemoryStore {
18 type Queue = MemoryTicketQueue;
19
20 fn open_or_create_queue(
21 &mut self,
22 channel_id: &ChannelId,
23 ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error> {
24 Ok(self.tickets.entry(*channel_id).or_default().clone())
25 }
26
27 fn delete_queue(
28 &mut self,
29 channel_id: &ChannelId,
30 ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error> {
31 if let Some(mut queue) = self.tickets.remove(channel_id) {
32 Ok(queue.drain()?)
33 } else {
34 Ok(Vec::with_capacity(0))
35 }
36 }
37
38 fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error> {
39 Ok(self.tickets.keys().copied())
40 }
41}
42
43impl OutgoingIndexStore for MemoryStore {
44 type Error = std::convert::Infallible;
45
46 fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error> {
47 Ok(self.out_indices.get(&(*channel_id, epoch)).copied())
48 }
49
50 fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error> {
51 self.out_indices.insert((*channel_id, epoch), index);
52 Ok(())
53 }
54
55 fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
56 self.out_indices.remove(&(*channel_id, epoch));
57 Ok(())
58 }
59
60 fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error> {
61 Ok(self.out_indices.keys().copied())
62 }
63}
64
65#[derive(Clone, Debug, Default)]
71pub struct MemoryTicketQueue(
72 std::sync::Arc<parking_lot::RwLock<std::collections::BinaryHeap<std::cmp::Reverse<RedeemableTicket>>>>,
73);
74
75impl TicketQueue for MemoryTicketQueue {
76 type Error = std::convert::Infallible;
77
78 fn len(&self) -> Result<usize, Self::Error> {
79 Ok(self.0.read().len())
80 }
81
82 fn is_empty(&self) -> Result<bool, Self::Error> {
83 Ok(self.0.read().is_empty())
84 }
85
86 fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
87 self.0.write().push(std::cmp::Reverse(ticket));
88 Ok(())
89 }
90
91 fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
92 Ok(self.0.write().pop().map(|ticket| ticket.0))
93 }
94
95 fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
96 Ok(self.0.read().peek().cloned().map(|ticket| ticket.0))
97 }
98
99 fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
100 Ok(self
101 .0
102 .read()
103 .iter()
104 .cloned()
105 .map(|t| Ok(t.0))
106 .collect::<Vec<_>>()
107 .into_iter())
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traits::tests::*;

    /// Generates one `#[test]` per `name => shared_case` pair.
    ///
    /// Each generated test hands a default-constructed `$subject` to the shared,
    /// backend-agnostic test case imported from `crate::traits::tests`.
    macro_rules! shared_suite {
        ($subject:ty: $($test_name:ident => $shared_case:ident),+ $(,)?) => {
            $(
                #[test]
                fn $test_name() -> anyhow::Result<()> {
                    $shared_case(<$subject>::default())
                }
            )+
        };
    }

    // Ticket queue behaviour.
    shared_suite!(MemoryTicketQueue:
        memory_queue_maintains_natural_ticket_order
            => queue_maintains_natural_ticket_order,
        memory_queue_returns_all_tickets
            => queue_returns_all_tickets,
        memory_queue_is_empty_when_drained
            => queue_is_empty_when_drained,
        memory_queue_returns_empty_iterator_when_drained
            => queue_returns_empty_iterator_when_drained,
        memory_queue_returns_correct_total_ticket_value
            => queue_returns_correct_total_ticket_value,
        memory_queue_returns_correct_total_ticket_value_with_min_index
            => queue_returns_correct_total_ticket_value_with_min_index,
    );

    // Outgoing index store and ticket queue store behaviour.
    shared_suite!(MemoryStore:
        memory_out_index_store_should_load_existing_index_for_channel_epoch
            => out_index_store_should_load_existing_index_for_channel_epoch,
        memory_out_index_store_should_not_load_non_existing_index_for_channel_epoch
            => out_index_store_should_not_load_non_existing_index_for_channel_epoch,
        memory_out_index_store_should_store_new_index_for_channel_epoch
            => out_index_store_should_store_new_index_for_channel_epoch,
        memory_out_index_store_should_delete_existing_index_for_channel_epoch
            => out_index_store_should_delete_existing_index_for_channel_epoch,
        memory_out_index_should_update_existing_index_for_channel_epoch
            => out_index_should_update_existing_index_for_channel_epoch,
        memory_out_index_store_should_iterate_existing_indices_for_channel_epoch
            => out_index_store_should_iterate_existing_indices_for_channel_epoch,
        memory_ticket_store_should_create_new_queue_for_channel
            => ticket_store_should_create_new_queue_for_channel,
        memory_ticket_store_should_open_existing_queue_for_channel
            => ticket_store_should_open_existing_queue_for_channel,
        memory_ticket_store_should_delete_existing_queue_for_channel
            => ticket_store_should_delete_existing_queue_for_channel,
        memory_ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets
            => ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets,
        memory_ticket_store_should_iterate_existing_queues_for_channel
            => ticket_store_should_iterate_existing_queues_for_channel,
        memory_ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel
            => ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel,
    );
}