Skip to main content

hopr_ticket_manager/backend/
redb.rs

1use std::str::FromStr;
2
3use hopr_api::{
4    chain::{ChannelId, RedeemableTicket, VerifiedTicket},
5    types::primitive::prelude::BytesRepresentable,
6};
7use redb::{ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition, TableHandle};
8
9use crate::{OutgoingIndexStore, TicketQueue, TicketQueueStore};
10
/// Table holding the outgoing ticket index (`u64` value) keyed by the pair of
/// channel ID bytes and channel epoch.
const OUT_IDX_TABLE: TableDefinition<([u8; ChannelId::SIZE], u32), u64> = TableDefinition::new("channel_out_index");
12
/// Implementation of [`OutgoingIndexStore`] and [`TicketQueueStore`] using `redb` database and `postcard` serializer.
///
/// The store is intentionally not cloneable to allow the owner to have complete ownership of the database.
#[derive(Debug)]
pub struct RedbStore {
    /// Strong handle to the underlying database. Ticket queues opened from this store
    /// only receive weak references to it.
    db: std::sync::Arc<redb::Database>,
    /// Temporary file backing the database; set only by [`RedbStore::new_temp`].
    /// It is never read and is held solely so the file lives as long as the store.
    /// It is declared after `db`, so the database handle is dropped first.
    _tmp: Option<tempfile::NamedTempFile>,
}
21
22impl RedbStore {
23    /// Creates a new instance on the given path.
24    pub fn new(path: impl AsRef<std::path::Path>) -> Result<Self, RedbStoreError> {
25        let db = std::sync::Arc::new(redb::Database::create(path)?);
26        let tx = db.begin_write()?;
27        tx.open_table(OUT_IDX_TABLE)?;
28        tx.commit()?;
29        Ok(Self { db, _tmp: None })
30    }
31
32    /// Creates a new instance using a temporary file.
33    ///
34    /// The temporary file is automatically deleted when the store is dropped.
35    pub fn new_temp() -> Result<Self, RedbStoreError> {
36        let tempfile = tempfile::NamedTempFile::new()?;
37        tracing::debug!(path = ?tempfile.path(), "redb store created");
38        RedbStore::new(tempfile.path()).map(|mut store| {
39            store._tmp = Some(tempfile);
40            store
41        })
42    }
43}
44
45impl OutgoingIndexStore for RedbStore {
46    type Error = RedbStoreError;
47
48    fn load_outgoing_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<Option<u64>, Self::Error> {
49        let tx = self.db.begin_read()?;
50        let table = tx.open_table(OUT_IDX_TABLE)?;
51        Ok(table.get(((*channel_id).into(), epoch))?.map(|v| v.value()))
52    }
53
54    fn save_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32, index: u64) -> Result<(), Self::Error> {
55        let tx = self.db.begin_write()?;
56        {
57            let mut table = tx.open_table(OUT_IDX_TABLE)?;
58            table.insert(((*channel_id).into(), epoch), index)?;
59        }
60        tx.commit()?;
61        Ok(())
62    }
63
64    fn delete_outgoing_index(&mut self, channel_id: &ChannelId, epoch: u32) -> Result<(), Self::Error> {
65        let tx = self.db.begin_write()?;
66        {
67            let mut table = tx.open_table(OUT_IDX_TABLE)?;
68            table.remove(((*channel_id).into(), epoch))?;
69        }
70        tx.commit()?;
71        Ok(())
72    }
73
74    fn iter_outgoing_indices(&self) -> Result<impl Iterator<Item = (ChannelId, u32)>, Self::Error> {
75        let tx = self.db.begin_read()?;
76        let table = tx.open_table(OUT_IDX_TABLE)?;
77        Ok(table
78            .iter()?
79            .filter_map(|v| {
80                v.inspect_err(|error| tracing::error!(%error, "outgoing index corrupted in redb storage"))
81                    .ok()
82            })
83            .map(|(k, _)| (k.value().0.into(), k.value().1))
84            .collect::<Vec<_>>()
85            .into_iter())
86    }
87}
88
/// Name prefix shared by all per-channel ticket queue tables; the channel ID is appended to it.
const TABLE_QUEUE_NAME_PREFIX: &str = "ctq_";

/// Definition of a ticket queue table: `u128` keys (built by [`make_index`]) mapped to serialized ticket bytes.
type TicketTableDef<'a> = TableDefinition<'a, u128, Vec<u8>>;
92
93#[inline]
94fn make_index(ticket: &RedeemableTicket) -> u128 {
95    ((ticket.verified_ticket().channel_epoch as u128) << 64) | ticket.verified_ticket().index as u128
96}
97
98impl TicketQueueStore for RedbStore {
99    type Queue = RedbTicketQueue;
100
101    fn open_or_create_queue(
102        &mut self,
103        channel_id: &ChannelId,
104    ) -> Result<Self::Queue, <Self::Queue as TicketQueue>::Error> {
105        {
106            let tx = self.db.begin_write()?;
107            tx.open_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
108            tx.commit()?;
109        }
110
111        Ok(RedbTicketQueue {
112            db: std::sync::Arc::downgrade(&self.db),
113            channel_id: *channel_id,
114        })
115    }
116
117    fn delete_queue(
118        &mut self,
119        channel_id: &ChannelId,
120    ) -> Result<Vec<VerifiedTicket>, <Self::Queue as TicketQueue>::Error> {
121        let tx = self.db.begin_write()?;
122        let mut ret = Vec::new();
123        {
124            // Drain all the tickets from the queue first
125            let mut table = tx.open_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
126            while let Some((_, ticket)) = table.pop_first()? {
127                let ticket: RedeemableTicket = postcard::from_bytes(&ticket.value())?;
128                ret.push(ticket.ticket); // Make it unredeemable
129            }
130        }
131        tx.delete_table(TicketTableDef::new(&format!("{TABLE_QUEUE_NAME_PREFIX}{channel_id}")))?;
132        tx.commit()?;
133
134        Ok(ret)
135    }
136
137    fn iter_queues(&self) -> Result<impl Iterator<Item = ChannelId>, <Self::Queue as TicketQueue>::Error> {
138        let tx = self.db.begin_read()?;
139        Ok(tx
140            .list_tables()?
141            .filter_map(|t| {
142                t.name()
143                    .strip_prefix(TABLE_QUEUE_NAME_PREFIX)
144                    .and_then(|c| ChannelId::from_str(c).ok())
145            })
146            .collect::<Vec<_>>()
147            .into_iter())
148    }
149}
150
151/// Implementation of [`TicketQueue`] using `redb` database and `postcard` serializer,
152/// associated with the [`RedbStore`].
153pub struct RedbTicketQueue {
154    db: std::sync::Weak<redb::Database>,
155    channel_id: ChannelId,
156}
157
158impl TicketQueue for RedbTicketQueue {
159    type Error = RedbStoreError;
160
161    fn len(&self) -> Result<usize, Self::Error> {
162        if let Some(db) = self.db.upgrade() {
163            let tx = db.begin_read()?;
164            let table = tx.open_table(TicketTableDef::new(&format!(
165                "{TABLE_QUEUE_NAME_PREFIX}{}",
166                self.channel_id
167            )))?;
168            Ok(table.len()? as usize)
169        } else {
170            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
171        }
172    }
173
174    fn is_empty(&self) -> Result<bool, Self::Error> {
175        if let Some(db) = self.db.upgrade() {
176            let tx = db.begin_read()?;
177            let table = tx.open_table(TicketTableDef::new(&format!(
178                "{TABLE_QUEUE_NAME_PREFIX}{}",
179                self.channel_id
180            )))?;
181            Ok(table.is_empty()?)
182        } else {
183            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
184        }
185    }
186
187    fn push(&mut self, ticket: RedeemableTicket) -> Result<(), Self::Error> {
188        if let Some(db) = self.db.upgrade() {
189            let tx = db.begin_write()?;
190            {
191                let mut table = tx.open_table(TicketTableDef::new(&format!(
192                    "{TABLE_QUEUE_NAME_PREFIX}{}",
193                    self.channel_id
194                )))?;
195                table.insert(make_index(&ticket), postcard::to_stdvec(&ticket)?)?;
196            }
197            tx.commit()?;
198            Ok(())
199        } else {
200            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
201        }
202    }
203
204    fn pop(&mut self) -> Result<Option<RedeemableTicket>, Self::Error> {
205        if let Some(db) = self.db.upgrade() {
206            let tx = db.begin_write()?;
207            let maybe_ticket = {
208                let mut table = tx.open_table(TicketTableDef::new(&format!(
209                    "{TABLE_QUEUE_NAME_PREFIX}{}",
210                    self.channel_id
211                )))?;
212                table.pop_first()?.map(|(_, v)| v.value())
213            };
214            // Commit the pop operation here, even though the deserialization might fail later,
215            // so that we do not render the queue unusable by keeping the corrupted data in.
216            tx.commit()?;
217            if let Some(ticket_bytes) = maybe_ticket {
218                Ok(Some(postcard::from_bytes(&ticket_bytes)?))
219            } else {
220                Ok(None)
221            }
222        } else {
223            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
224        }
225    }
226
227    fn peek(&self) -> Result<Option<RedeemableTicket>, Self::Error> {
228        if let Some(db) = self.db.upgrade() {
229            let tx = db.begin_read()?;
230            let table = tx.open_table(TicketTableDef::new(&format!(
231                "{TABLE_QUEUE_NAME_PREFIX}{}",
232                self.channel_id
233            )))?;
234            let ticket_bytes = table.first()?.map(|(_, v)| v.value());
235            if let Some(ticket_bytes) = ticket_bytes {
236                Ok(Some(postcard::from_bytes(&ticket_bytes)?))
237            } else {
238                Ok(None)
239            }
240        } else {
241            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
242        }
243    }
244
245    fn iter_unordered(&self) -> Result<impl Iterator<Item = Result<RedeemableTicket, Self::Error>>, Self::Error> {
246        if let Some(db) = self.db.upgrade() {
247            let tx = db.begin_read()?;
248            let table = tx.open_table(TicketTableDef::new(&format!(
249                "{TABLE_QUEUE_NAME_PREFIX}{}",
250                self.channel_id
251            )))?;
252            Ok(table
253                .iter()?
254                .map(|result| {
255                    result.map_err(RedbStoreError::from).and_then(|(_, v)| {
256                        postcard::from_bytes::<RedeemableTicket>(&v.value()).map_err(RedbStoreError::from)
257                    })
258                })
259                .collect::<Vec<Result<RedeemableTicket, RedbStoreError>>>()
260                .into_iter())
261        } else {
262            Err(RedbStoreError::Database(redb::Error::DatabaseClosed))
263        }
264    }
265}
266
/// Errors returned by the [`RedbStore`].
#[derive(Debug, thiserror::Error)]
pub enum RedbStoreError {
    /// Error reported by `redb`. The more specific `redb` error types are first
    /// converted into [`redb::Error`] and then wrapped in this variant.
    #[error("database error: {0}")]
    Database(#[from] redb::Error),
    /// Error reported by `postcard` while serializing or deserializing a stored value.
    #[error("serialization error: {0}")]
    Serialization(#[from] postcard::Error),
    /// I/O error, e.g. from creating the temporary database file.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Any other error; it is displayed exactly as the wrapped error.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
279
280impl From<redb::DatabaseError> for RedbStoreError {
281    fn from(error: redb::DatabaseError) -> Self {
282        Self::Database(error.into())
283    }
284}
285
286impl From<redb::TransactionError> for RedbStoreError {
287    fn from(error: redb::TransactionError) -> Self {
288        Self::Database(error.into())
289    }
290}
291
292impl From<redb::TableError> for RedbStoreError {
293    fn from(error: redb::TableError) -> Self {
294        Self::Database(error.into())
295    }
296}
297
298impl From<redb::StorageError> for RedbStoreError {
299    fn from(error: redb::StorageError) -> Self {
300        Self::Database(error.into())
301    }
302}
303
304impl From<redb::CommitError> for RedbStoreError {
305    fn from(error: redb::CommitError) -> Self {
306        Self::Database(error.into())
307    }
308}
309
#[cfg(test)]
mod tests {
    //! Runs the shared store/queue test suite against [`RedbStore`].
    //!
    //! Every test uses [`RedbStore::new_temp`], so the temporary database file is owned
    //! by the store and stays on disk for as long as the database is open. Passing a
    //! `NamedTempFile` by value into [`RedbStore::new`] would instead drop the file
    //! guard (deleting the file) as soon as the database has been opened.

    use super::*;
    use crate::traits::tests::*;

    #[test]
    fn redb_queue_maintains_natural_ticket_order() -> anyhow::Result<()> {
        queue_maintains_natural_ticket_order(RedbStore::new_temp()?.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_all_tickets() -> anyhow::Result<()> {
        queue_returns_all_tickets(RedbStore::new_temp()?.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_is_empty_when_drained() -> anyhow::Result<()> {
        queue_is_empty_when_drained(RedbStore::new_temp()?.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_empty_iterator_when_drained() -> anyhow::Result<()> {
        queue_returns_empty_iterator_when_drained(RedbStore::new_temp()?.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_correct_total_ticket_value() -> anyhow::Result<()> {
        queue_returns_correct_total_ticket_value(RedbStore::new_temp()?.open_or_create_queue(&Default::default())?)
    }

    #[test]
    fn redb_queue_returns_correct_total_ticket_value_with_min_index() -> anyhow::Result<()> {
        queue_returns_correct_total_ticket_value_with_min_index(
            RedbStore::new_temp()?.open_or_create_queue(&Default::default())?,
        )
    }

    #[test]
    fn redb_out_index_store_should_load_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_load_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_not_load_non_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_not_load_non_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_store_new_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_store_new_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_delete_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_delete_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_should_update_existing_index_for_channel_epoch() -> anyhow::Result<()> {
        out_index_should_update_existing_index_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_out_index_store_should_iterate_existing_indices_for_channel_epoch() -> anyhow::Result<()> {
        out_index_store_should_iterate_existing_indices_for_channel_epoch(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_create_new_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_create_new_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_open_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_open_existing_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_delete_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_delete_existing_queue_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets() -> anyhow::Result<()> {
        ticket_store_should_delete_existing_queue_for_channel_and_return_neglected_tickets(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_iterate_existing_queues_for_channel() -> anyhow::Result<()> {
        ticket_store_should_iterate_existing_queues_for_channel(RedbStore::new_temp()?)
    }

    #[test]
    fn redb_ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel() -> anyhow::Result<()> {
        ticket_store_should_not_fail_to_delete_non_existing_queue_for_channel(RedbStore::new_temp()?)
    }
}