Skip to main content

hopr_chain_connector/backend/
tempdb.rs

1use hopr_api::{
2    chain::HoprKeyIdent,
3    types::{
4        crypto::prelude::OffchainPublicKey,
5        internal::{
6            account::AccountEntry,
7            channels::{ChannelEntry, ChannelId},
8        },
9        primitive::prelude::{Address, BytesRepresentable},
10    },
11};
12use redb::{ReadableDatabase, TableDefinition};
13
14/// Errors from the temporary database backend.
15#[derive(Debug, thiserror::Error)]
16pub enum TempDbError {
17    #[error("database error: {0}")]
18    Database(#[from] redb::Error),
19    #[error("serialization error: {0}")]
20    Serialization(#[from] postcard::Error),
21    #[error("I/O error: {0}")]
22    Io(#[from] std::io::Error),
23}
24
25impl From<redb::DatabaseError> for TempDbError {
26    fn from(error: redb::DatabaseError) -> Self {
27        Self::Database(error.into())
28    }
29}
30
31impl From<redb::TransactionError> for TempDbError {
32    fn from(error: redb::TransactionError) -> Self {
33        Self::Database(error.into())
34    }
35}
36
37impl From<redb::TableError> for TempDbError {
38    fn from(error: redb::TableError) -> Self {
39        Self::Database(error.into())
40    }
41}
42
43impl From<redb::StorageError> for TempDbError {
44    fn from(error: redb::StorageError) -> Self {
45        Self::Database(error.into())
46    }
47}
48
49impl From<redb::CommitError> for TempDbError {
50    fn from(error: redb::CommitError) -> Self {
51        Self::Database(error.into())
52    }
53}
54
55/// A backend that is implemented via [`redb`](https://docs.rs/redb/latest/redb/) database stored in a temporary file.
56///
57/// The database file is dropped once the last instance is dropped.
58#[derive(Clone)]
59pub struct TempDbBackend {
60    // `db` is declared before `_tmp` so the database is closed before the temp file is deleted
61    // (Rust drops fields in declaration order).
62    db: std::sync::Arc<redb::Database>,
63    _tmp: std::sync::Arc<tempfile::NamedTempFile>,
64}
65
66impl TempDbBackend {
67    pub fn new() -> Result<Self, TempDbError> {
68        let file = tempfile::NamedTempFile::new()?;
69
70        tracing::info!(path = %file.path().display(), "opened temporary redb database");
71
72        let db = redb::Database::create(file.path())?;
73
74        // Create all tables eagerly so that read-only lookups on a fresh
75        // database return Ok(None) instead of a TableError.
76        {
77            let write_tx = db.begin_write()?;
78            write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
79            write_tx.open_table(CHANNELS_TABLE_DEF)?;
80            write_tx.open_table(ADDRESS_TO_ID)?;
81            write_tx.open_table(KEY_TO_ID)?;
82            write_tx.commit()?;
83        }
84
85        Ok(Self {
86            db: std::sync::Arc::new(db),
87            _tmp: std::sync::Arc::new(file),
88        })
89    }
90}
91
92const ACCOUNTS_TABLE_DEF: TableDefinition<u32, Vec<u8>> = TableDefinition::new("id_accounts");
93const CHANNELS_TABLE_DEF: TableDefinition<[u8; ChannelId::SIZE], Vec<u8>> = TableDefinition::new("id_channels");
94const ADDRESS_TO_ID: TableDefinition<[u8; Address::SIZE], u32> = TableDefinition::new("address_to_id");
95const KEY_TO_ID: TableDefinition<[u8; OffchainPublicKey::SIZE], u32> = TableDefinition::new("key_to_id");
96
97impl super::Backend for TempDbBackend {
98    type Error = TempDbError;
99
100    fn insert_account(&self, account: AccountEntry) -> Result<Option<AccountEntry>, Self::Error> {
101        let write_tx = self.db.begin_write()?;
102        let old_value = {
103            let mut accounts = write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
104            let old_value = accounts
105                .insert(u32::from(account.key_id), postcard::to_allocvec(&account)?)?
106                .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
107                .transpose()?;
108
109            let mut address_to_id = write_tx.open_table(ADDRESS_TO_ID)?;
110            let mut key_to_id = write_tx.open_table(KEY_TO_ID)?;
111
112            // Remove old account entry references not to create stale mappings if keys changed
113            if let Some(old_entry) = &old_value {
114                let chain_addr: [u8; Address::SIZE] = old_entry.chain_addr.into();
115                let packet_addr: [u8; OffchainPublicKey::SIZE] = old_entry.public_key.into();
116                address_to_id.remove(&chain_addr)?;
117                key_to_id.remove(&packet_addr)?;
118            }
119
120            let chain_addr: [u8; Address::SIZE] = account.chain_addr.into();
121            address_to_id.insert(chain_addr, u32::from(account.key_id))?;
122
123            let packet_addr: [u8; OffchainPublicKey::SIZE] = account.public_key.into();
124            key_to_id.insert(packet_addr, u32::from(account.key_id))?;
125
126            old_value
127        };
128        write_tx.commit()?;
129
130        tracing::debug!(new = %account, old = ?old_value, "upserted account");
131        Ok(old_value)
132    }
133
134    fn insert_channel(&self, channel: ChannelEntry) -> Result<Option<ChannelEntry>, Self::Error> {
135        let write_tx = self.db.begin_write()?;
136        let old_value = {
137            let mut channels = write_tx.open_table(CHANNELS_TABLE_DEF)?;
138            let channel_id: [u8; ChannelId::SIZE] = channel.get_id().into();
139            channels
140                .insert(channel_id, postcard::to_allocvec(&channel)?)?
141                .map(|v| postcard::from_bytes::<ChannelEntry>(&v.value()))
142                .transpose()?
143        };
144        write_tx.commit()?;
145
146        tracing::debug!(new = %channel, old = ?old_value, "upserted channel");
147        Ok(old_value)
148    }
149
150    fn get_account_by_id(&self, id: &HoprKeyIdent) -> Result<Option<AccountEntry>, Self::Error> {
151        let read_tx = self.db.begin_read()?;
152        let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
153        accounts
154            .get(u32::from(*id))?
155            .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
156            .transpose()
157            .map_err(TempDbError::from)
158    }
159
160    fn get_account_by_key(&self, key: &OffchainPublicKey) -> Result<Option<AccountEntry>, Self::Error> {
161        let read_tx = self.db.begin_read()?;
162        let keys_to_id = read_tx.open_table(KEY_TO_ID)?;
163        let packet_addr: [u8; OffchainPublicKey::SIZE] = (*key).into();
164        let Some(id) = keys_to_id.get(packet_addr)?.map(|v| v.value()) else {
165            return Ok(None);
166        };
167        let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
168        accounts
169            .get(id)?
170            .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
171            .transpose()
172            .map_err(TempDbError::from)
173    }
174
175    fn get_account_by_address(&self, chain_key: &Address) -> Result<Option<AccountEntry>, Self::Error> {
176        let read_tx = self.db.begin_read()?;
177        let address_to_id = read_tx.open_table(ADDRESS_TO_ID)?;
178        let chain_key: [u8; Address::SIZE] = (*chain_key).into();
179        let Some(id) = address_to_id.get(chain_key)?.map(|v| v.value()) else {
180            return Ok(None);
181        };
182        let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
183        accounts
184            .get(id)?
185            .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
186            .transpose()
187            .map_err(TempDbError::from)
188    }
189
190    fn get_channel_by_id(&self, id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
191        let read_tx = self.db.begin_read()?;
192        let channels = read_tx.open_table(CHANNELS_TABLE_DEF)?;
193        let id: [u8; ChannelId::SIZE] = (*id).into();
194        channels
195            .get(id)?
196            .map(|v| postcard::from_bytes::<ChannelEntry>(&v.value()))
197            .transpose()
198            .map_err(TempDbError::from)
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use hopr_api::types::{
205        crypto::keypairs::{ChainKeypair, Keypair, OffchainKeypair},
206        internal::{
207            account::AccountType,
208            channels::{ChannelStatus, generate_channel_id},
209        },
210        primitive::balance::HoprBalance,
211    };
212
213    use super::*;
214    use crate::{Backend, backend::tests::test_backend};
215
216    #[test]
217    fn test_tempdb() -> anyhow::Result<()> {
218        let backend = TempDbBackend::new()?;
219        test_backend(backend)
220    }
221
222    #[test]
223    fn upsert_cleans_up_stale_index_entries() -> anyhow::Result<()> {
224        let backend = TempDbBackend::new()?;
225
226        let kp_a = OffchainKeypair::random();
227        let cp = ChainKeypair::random();
228
229        let account = AccountEntry {
230            public_key: (*kp_a.public()),
231            chain_addr: cp.public().to_address(),
232            entry_type: AccountType::NotAnnounced,
233            safe_address: None,
234            key_id: 1.into(),
235        };
236        backend.insert_account(account)?;
237
238        // Update same ID with a different offchain key
239        let kp_b = OffchainKeypair::random();
240        let updated = AccountEntry {
241            public_key: (*kp_b.public()),
242            chain_addr: cp.public().to_address(),
243            entry_type: AccountType::NotAnnounced,
244            safe_address: None,
245            key_id: 1.into(),
246        };
247        backend.insert_account(updated.clone())?;
248
249        // Old key should no longer resolve
250        assert!(backend.get_account_by_key(kp_a.public())?.is_none());
251        // New key should resolve
252        assert_eq!(backend.get_account_by_key(kp_b.public())?, Some(updated));
253
254        Ok(())
255    }
256
257    #[test]
258    fn upsert_cleans_up_stale_address_entries() -> anyhow::Result<()> {
259        let backend = TempDbBackend::new()?;
260
261        let kp = OffchainKeypair::random();
262        let cp_a = ChainKeypair::random();
263
264        let account = AccountEntry {
265            public_key: (*kp.public()),
266            chain_addr: cp_a.public().to_address(),
267            entry_type: AccountType::NotAnnounced,
268            safe_address: None,
269            key_id: 1.into(),
270        };
271        backend.insert_account(account)?;
272
273        // Update same ID with a different chain address
274        let cp_b = ChainKeypair::random();
275        let updated = AccountEntry {
276            public_key: (*kp.public()),
277            chain_addr: cp_b.public().to_address(),
278            entry_type: AccountType::NotAnnounced,
279            safe_address: None,
280            key_id: 1.into(),
281        };
282        backend.insert_account(updated.clone())?;
283
284        // Old address should no longer resolve
285        assert!(backend.get_account_by_address(&cp_a.public().to_address())?.is_none());
286        // New address should resolve
287        assert_eq!(
288            backend.get_account_by_address(&cp_b.public().to_address())?,
289            Some(updated)
290        );
291
292        Ok(())
293    }
294
295    #[test]
296    fn channel_upsert_returns_old_value() -> anyhow::Result<()> {
297        let backend = TempDbBackend::new()?;
298
299        let src = Address::new(&[1u8; 20]);
300        let dst = Address::new(&[2u8; 20]);
301
302        let channel_v1 = ChannelEntry::builder()
303            .between(src, dst)
304            .balance(HoprBalance::new_base(100))
305            .ticket_index(1)
306            .epoch(1)
307            .status(ChannelStatus::Open)
308            .build()?;
309
310        let first_insert = backend.insert_channel(channel_v1)?;
311        assert!(first_insert.is_none(), "first insert should return None");
312
313        let channel_v2 = ChannelEntry::builder()
314            .between(src, dst)
315            .balance(HoprBalance::new_base(200))
316            .ticket_index(2)
317            .status(ChannelStatus::Open)
318            .epoch(2)
319            .build()?;
320
321        let second_insert = backend.insert_channel(channel_v2)?;
322        assert_eq!(second_insert, Some(channel_v1), "second insert should return old value");
323
324        Ok(())
325    }
326
327    #[test]
328    fn lookup_nonexistent_returns_none() -> anyhow::Result<()> {
329        let backend = TempDbBackend::new()?;
330
331        let kp = OffchainKeypair::random();
332        let cp = ChainKeypair::random();
333        let channel_id = generate_channel_id(&Address::new(&[1u8; 20]), &Address::new(&[2u8; 20]));
334
335        assert!(backend.get_account_by_id(&42.into())?.is_none());
336        assert!(backend.get_account_by_key(kp.public())?.is_none());
337        assert!(backend.get_account_by_address(&cp.public().to_address())?.is_none());
338        assert!(backend.get_channel_by_id(&channel_id)?.is_none());
339
340        Ok(())
341    }
342
343    #[test]
344    fn index_lookup_uses_single_transaction() -> anyhow::Result<()> {
345        let backend = TempDbBackend::new()?;
346
347        let kp = OffchainKeypair::random();
348        let cp = ChainKeypair::random();
349
350        let account = AccountEntry {
351            public_key: (*kp.public()),
352            chain_addr: cp.public().to_address(),
353            entry_type: AccountType::NotAnnounced,
354            safe_address: None,
355            key_id: 5.into(),
356        };
357        backend.insert_account(account.clone())?;
358
359        // Verify lookups via both index paths return the correct account
360        let by_key = backend.get_account_by_key(kp.public())?;
361        assert_eq!(by_key, Some(account.clone()));
362
363        let by_addr = backend.get_account_by_address(&cp.public().to_address())?;
364        assert_eq!(by_addr, Some(account));
365
366        Ok(())
367    }
368}