// hopr_ticket_manager/backend/redb.rs
use std::str::FromStr;
2
3use hopr_api::{
4 chain::{ChannelId, RedeemableTicket, VerifiedTicket},
5 types::primitive::prelude::BytesRepresentable,
6};
7use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition, TableHandle};
8
9use crate::{OutgoingIndexStore, TicketQueue, TicketQueueStore};
10
11const OUT_IDX_TABLE: TableDefinition<([u8; ChannelId::SIZE], u32), u64> = TableDefinition::new("channel_out_index");
12
13#[derive(Debug)]
17pub struct RedbStore {
18 db: std::sync::Arc<redb::Database>,
19 _tmp: Option<tempfile::NamedTempFile>,
20}
21
22impl RedbStore {
23 pub fn new(path: impl AsRef<std::path::Path>) -> Result<Self, RedbStoreError> {
25 let db = std::sync::Arc::new(redb::Database::create(path)?);
26 let tx = db.begin_write()?;
27 tx.open_table(OUT_IDX_TABLE)?;
28 tx.commit()?;
29 Ok(Self { db, _tmp: None })
30 }
31
32 pub fn new_temp() -> Result<Self, RedbStoreError> {
36 let tempfile = tempfile::NamedTempFile::new()?;
37 tracing::debug!(path = ?tempfile.path(), "redb store created");
38 RedbStore::new(tempfile.path()).map(|mut store| {
39 store._tmp = Some(tempfile);
40 store
41 })
42 }
43}
44
45impl OutgoingIndexStore for RedbStore {
46 type Error = RedbStoreError;
47
48 fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error> {
49 let tx = self.db.begin_read()?;
50 let table = tx.open_table(OUT_IDX_TABLE)?;
51 Ok(table.get(((*channel_id).into(), epoch))?.map(|v| v.value()))
52 }
53
54 fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error> {
55 let tx = self.db.begin_write()?;
56 {
57 let mut table = tx.open_table(OUT_IDX_TABLE)?;
58 table.insert(((*channel_id).into(), epoch), index)?;
59 }
60 tx.commit()?;
61 Ok(())
62 }
63
64 fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
65 let tx = self.db.begin_write()?;
66 {
67 let mut table = tx.open_table(OUT_IDX_TABLE)?;
68 table.remove(((*channel_id).into(), epoch))?;
69 }
70 tx.commit()?;
71 Ok(())
72 }
73
74 fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error> {
75 let tx = self.db.begin_read()?;
76 let table = tx.open_table(OUT_IDX_TABLE)?;
77 Ok(table
78 .iter()?
79 .filter_map(|v| {
80 v.inspect_err(|error| tracing::error!(%error, "outgoing index corrupted in redb storage"))
81 .ok()
82 })
83 .map(|(k, _)| (k.value().0.into(), k.value().1))
84 .collect::<Vec<_>>()
85 .into_iter())
86 }
87}
88
89const TABLE_QUEUE_NAME_PREFIX: &str = "ctq_";
90
91type TicketTableDef<'a> = TableDefinition<'a, u128, Vec<u8>>;
92
93#[inline]
94fn make_index(ticket: &RedeemableTicket) -> u128 {
95 ((ticket.verified_ticket().channel_epoch as u128) << 64) | ticket.verified_ticket().index as u128
96}
97
98impl TicketQueueStore for RedbStore {
99 type Queue = RedbTicketQueue;
100
101 fn open_or_create_queue(
102 &mut self,
103 channel_id: &ChannelId,
104 ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error> {
105 {
106 let tx = self.db.begin_write()?;
107 tx.open_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
108 tx.commit()?;
109 }
110
111 Ok(RedbTicketQueue {
112 db: std::sync::Arc::downgrade(&self.db),
113 channel_id: *channel_id,
114 })
115 }
116
117 fn delete_queue(
118 &mut self,
119 channel_id: &ChannelId,
120 ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error> {
121 let tx = self.db.begin_write()?;
122 let mut ret = Vec::new();
123 {
124 let mut table = tx.open_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
126 while let Some((_, ticket)) = table.pop_first()? {
127 let ticket: RedeemableTicket = postcard::from_bytes(&ticket.value())?;
128 ret.push(ticket.ticket); }
130 }
131 tx.delete_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
132 tx.commit()?;
133
134 Ok(ret)
135 }
136
137 fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error> {
138 let tx = self.db.begin_read()?;
139 Ok(tx
140 .list_tables()?
141 .filter_map(|t| {
142 t.name()
143 .strip_prefix(TABLE_QUEUE_NAME_PREFIX)
144 .and_then(|c| ChannelId::from_str(c).ok())
145 })
146 .collect::<Vec<_>>()
147 .into_iter())
148 }
149}
150
151pub struct RedbTicketQueue {
154 db: std::sync::Weak<redb::Database>,
155 channel_id: ChannelId,
156}
157
158impl TicketQueue for RedbTicketQueue {
159 type Error = RedbStoreError;
160
161 fn len(&self) -> Result<usize, Self::Error> {
162 if let Some(db) = self.db.upgrade() {
163 let tx = db.begin_read()?;
164 let table = tx.open_table(TicketTableDef::new(&format!(
165 "{TABLE_QUEUE_NAME_PREFIX}{}",
166 self.channel_id
167 )))?;
168 Ok(table.len()? as usize)
169 } else {
170 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
171 }
172 }
173
174 fn is_empty(&self) -> Result<bool, Self::Error> {
175 if let Some(db) = self.db.upgrade() {
176 let tx = db.begin_read()?;
177 let table = tx.open_table(TicketTableDef::new(&format!(
178 "{TABLE_QUEUE_NAME_PREFIX}{}",
179 self.channel_id
180 )))?;
181 Ok(table.is_empty()?)
182 } else {
183 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
184 }
185 }
186
187 fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
188 if let Some(db) = self.db.upgrade() {
189 let tx = db.begin_write()?;
190 {
191 let mut table = tx.open_table(TicketTableDef::new(&format!(
192 "{TABLE_QUEUE_NAME_PREFIX}{}",
193 self.channel_id
194 )))?;
195 table.insert(make_index(&ticket), postcard::to_stdvec(&ticket)?)?;
196 }
197 tx.commit()?;
198 Ok(())
199 } else {
200 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
201 }
202 }
203
204 fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
205 if let Some(db) = self.db.upgrade() {
206 let tx = db.begin_write()?;
207 let maybe_ticket = {
208 let mut table = tx.open_table(TicketTableDef::new(&format!(
209 "{TABLE_QUEUE_NAME_PREFIX}{}",
210 self.channel_id
211 )))?;
212 table.pop_first()?.map(|(_, v)| v.value())
213 };
214 tx.commit()?;
217 if let Some(ticket_bytes) = maybe_ticket {
218 Ok(Some(postcard::from_bytes(&ticket_bytes)?))
219 } else {
220 Ok(None)
221 }
222 } else {
223 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
224 }
225 }
226
227 fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
228 if let Some(db) = self.db.upgrade() {
229 let tx = db.begin_read()?;
230 let table = tx.open_table(TicketTableDef::new(&format!(
231 "{TABLE_QUEUE_NAME_PREFIX}{}",
232 self.channel_id
233 )))?;
234 let ticket_bytes = table.first()?.map(|(_, v)| v.value());
235 if let Some(ticket_bytes) = ticket_bytes {
236 Ok(Some(postcard::from_bytes(&ticket_bytes)?))
237 } else {
238 Ok(None)
239 }
240 } else {
241 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
242 }
243 }
244
245 fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
246 if let Some(db) = self.db.upgrade() {
247 let tx = db.begin_read()?;
248 let table = tx.open_table(TicketTableDef::new(&format!(
249 "{TABLE_QUEUE_NAME_PREFIX}{}",
250 self.channel_id
251 )))?;
252 Ok(table
253 .iter()?
254 .map(|result| {
255 result.map_err(RedbStoreError::from).and_then(|(_, v)| {
256 postcard::from_bytes::<RedeemableTicket>(&v.value()).map_err(RedbStoreError::from)
257 })
258 })
259 .collect::<Vec<Result<RedeemableTicket, RedbStoreError>>>()
260 .into_iter())
261 } else {
262 Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
263 }
264 }
265}
266
267#[derive(Debug, thiserror::Error)]
269pub enum RedbStoreError {
270 #[error("database error: {0}")]
271 Database(#[from] redb::Error),
272 #[error("serialization error: {0}")]
273 Serialization(#[from] postcard::Error),
274 #[error("I/O error: {0}")]
275 Io(#[from] std::io::Error),
276 #[error(transparent)]
277 Other(#[from] anyhow::Error),
278}
279
280impl From<redb::DatabaseError> for RedbStoreError {
281 fn from(error: redb::DatabaseError) -> Self {
282 Self::Database(error.into())
283 }
284}
285
286impl From<redb::TransactionError> for RedbStoreError {
287 fn from(error: redb::TransactionError) -> Self {
288 Self::Database(error.into())
289 }
290}
291
292impl From<redb::TableError> for RedbStoreError {
293 fn from(error: redb::TableError) -> Self {
294 Self::Database(error.into())
295 }
296}
297
298impl From<redb::StorageError> for RedbStoreError {
299 fn from(error: redb::StorageError) -> Self {
300 Self::Database(error.into())
301 }
302}
303
304impl From<redb::CommitError> for RedbStoreError {
305 fn from(error: redb::CommitError) -> Self {
306 Self::Database(error.into())
307 }
308}
309
#[cfg(test)]
mod tests {
    //! Runs the shared store/queue test suites from `crate::traits::tests` against
    //! the redb backend.
    //!
    //! Every test uses `RedbStore::new_temp()`, which keeps the temporary file guard
    //! inside the store. Passing a `NamedTempFile` by value into `RedbStore::new`
    //! would drop the guard as soon as the database has been opened, i.e. while the
    //! database is still in use.

    use super::*;
    use crate::traits::tests::*;

    #[test]
    fn redb_queue_maintains_natural_ticket_order() -> anyhow::Result<()> {
        // The store is bound to a local so it outlives the queue's weak database handle.
        let mut store = RedbStore::new_temp()?;
        queue_maintains_natural_ticket_order(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_all_tickets() -> anyhow::Result<()> {
        let mut store = RedbStore::new_temp()?;
        queue_returns_all_tickets(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_is_empty_when_drained() -> anyhow::Result<()> {
        let mut store = RedbStore::new_temp()?;
        queue_is_empty_when_drained(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_empty_iterator_when_drained() -> anyhow::Result<()> {
        let mut store = RedbStore::new_temp()?;
        queue_returns_empty_iterator_when_drained(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_correct_total_ticket_value() -> anyhow::Result<()> {
        let mut store = RedbStore::new_temp()?;
        queue_returns_correct_total_ticket_value(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_correct_total_ticket_value_with_min_index() -> anyhow::Result<()> {
        let mut store = RedbStore::new_temp()?;
        queue_returns_correct_total_ticket_value_with_min_index(store.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_out_index_store_should_load_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_load_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_not_load_non_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_not_load_non_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_store_new_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_store_new_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_delete_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_delete_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_should_update_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_should_update_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_iterate_existing_indices_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_iterate_existing_indices_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_create_new_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_create_new_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_open_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_open_existing_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_delete_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_delete_existing_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets() -> anyhow::Result<()> {
        ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_iterate_existing_queues_for_channel() -> anyhow::Result<()> {
        ticket_store_should_iterate_existing_queues_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel(RedbStore::new_temp()?)
    }
}