hopr_chain_connector/backend/
tempdb.rs

1use hopr_api::chain::HoprKeyIdent;
2use hopr_crypto_types::prelude::OffchainPublicKey;
3use hopr_internal_types::{
4    account::AccountEntry,
5    channels::{ChannelEntry, ChannelId},
6};
7use hopr_primitive_types::prelude::{Address, BytesRepresentable};
8use redb::{ReadableDatabase, TableDefinition};
9
10/// A backend that is implemented via [`redb`](https://docs.rs/redb/latest/redb/) database stored in a temporary file.
11///
12/// The database file is dropped once the last instance is dropped.
13#[derive(Clone)]
14pub struct TempDbBackend {
15    db: std::sync::Arc<redb::Database>,
16    _tmp: std::sync::Arc<tempfile::NamedTempFile>,
17}
18
19impl TempDbBackend {
20    pub fn new() -> Result<Self, std::io::Error> {
21        let file = tempfile::NamedTempFile::new().map_err(std::io::Error::other)?;
22
23        Ok(Self {
24            db: std::sync::Arc::new(redb::Database::create(file.path()).map_err(std::io::Error::other)?),
25            _tmp: std::sync::Arc::new(file),
26        })
27    }
28}
29
30const ACCOUNTS_TABLE_DEF: TableDefinition<u32, Vec<u8>> = TableDefinition::new("id_accounts");
31const CHANNELS_TABLE_DEF: TableDefinition<[u8; ChannelId::SIZE], Vec<u8>> = TableDefinition::new("id_channels");
32const ADDRESS_TO_ID: TableDefinition<[u8; Address::SIZE], u32> = TableDefinition::new("address_to_id");
33const KEY_TO_ID: TableDefinition<[u8; OffchainPublicKey::SIZE], u32> = TableDefinition::new("key_to_id");
34
35const BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
36    .with_little_endian()
37    .with_variable_int_encoding();
38
39impl super::Backend for TempDbBackend {
40    type Error = redb::Error;
41
42    fn insert_account(&self, account: AccountEntry) -> Result<Option<AccountEntry>, Self::Error> {
43        let write_tx = self.db.begin_write()?;
44        let old_value = {
45            let mut accounts = write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
46            let old_value = accounts
47                .insert(
48                    u32::from(account.key_id),
49                    bincode::serde::encode_to_vec(&account, BINCODE_CONFIGURATION)
50                        .map_err(|e| redb::Error::Corrupted(format!("account encoding failed: {e}")))?,
51                )?
52                .map(|v| {
53                    bincode::serde::decode_from_slice::<AccountEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
54                })
55                .transpose()
56                .map_err(|e| redb::Error::Corrupted(format!("account decoding failed: {e}")))?;
57
58            let mut address_to_id = write_tx.open_table(ADDRESS_TO_ID)?;
59            let mut key_to_id = write_tx.open_table(KEY_TO_ID)?;
60
61            // Remove old account entry references not to create stale mappings if keys changed
62            if let Some(old_entry) = &old_value {
63                let chain_addr: [u8; Address::SIZE] = old_entry.chain_addr.into();
64                let packet_addr: [u8; OffchainPublicKey::SIZE] = old_entry.public_key.into();
65                address_to_id.remove(&chain_addr)?;
66                key_to_id.remove(&packet_addr)?;
67            }
68
69            let chain_addr: [u8; Address::SIZE] = account.chain_addr.into();
70            address_to_id.insert(chain_addr, u32::from(account.key_id))?;
71
72            let packet_addr: [u8; OffchainPublicKey::SIZE] = account.public_key.into();
73            key_to_id.insert(packet_addr, u32::from(account.key_id))?;
74
75            old_value
76        };
77        write_tx.commit()?;
78
79        tracing::debug!(new = %account, old = ?old_value, "upserted account");
80        Ok(old_value)
81    }
82
83    fn insert_channel(&self, channel: ChannelEntry) -> Result<Option<ChannelEntry>, Self::Error> {
84        let write_tx = self.db.begin_write()?;
85        let old_value = {
86            let mut channels = write_tx.open_table(CHANNELS_TABLE_DEF)?;
87            let channel_id: [u8; ChannelId::SIZE] = channel.get_id().into();
88            channels
89                .insert(
90                    channel_id,
91                    bincode::serde::encode_to_vec(channel, BINCODE_CONFIGURATION)
92                        .map_err(|e| redb::Error::Corrupted(format!("channel encoding failed: {e}")))?,
93                )?
94                .map(|v| {
95                    bincode::serde::decode_from_slice::<ChannelEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
96                })
97                .transpose()
98                .map_err(|e| redb::Error::Corrupted(format!("channel decoding failed: {e}")))?
99        };
100        write_tx.commit()?;
101
102        tracing::debug!(new = %channel, old = ?old_value, "upserted channel");
103        Ok(old_value)
104    }
105
106    fn get_account_by_id(&self, id: &HoprKeyIdent) -> Result<Option<AccountEntry>, Self::Error> {
107        let read_tx = self.db.begin_read()?;
108        let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
109        accounts
110            .get(u32::from(*id))?
111            .map(|v| {
112                bincode::serde::decode_from_slice::<AccountEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
113            })
114            .transpose()
115            .map_err(|e| redb::Error::Corrupted(format!("account decoding failed: {e}")))
116    }
117
118    fn get_account_by_key(&self, key: &OffchainPublicKey) -> Result<Option<AccountEntry>, Self::Error> {
119        let id = {
120            let read_tx = self.db.begin_read()?;
121            let keys_to_id = read_tx.open_table(KEY_TO_ID)?;
122            let packet_addr: [u8; OffchainPublicKey::SIZE] = (*key).into();
123            let Some(id) = keys_to_id.get(packet_addr)?.map(|v| v.value()) else {
124                return Ok(None);
125            };
126            id
127        };
128
129        self.get_account_by_id(&id.into())
130    }
131
132    fn get_account_by_address(&self, chain_key: &Address) -> Result<Option<AccountEntry>, Self::Error> {
133        let id = {
134            let read_tx = self.db.begin_read()?;
135            let address_to_id = read_tx.open_table(ADDRESS_TO_ID)?;
136            let chain_key: [u8; Address::SIZE] = (*chain_key).into();
137            let Some(id) = address_to_id.get(chain_key)?.map(|v| v.value()) else {
138                return Ok(None);
139            };
140            id
141        };
142
143        self.get_account_by_id(&id.into())
144    }
145
146    fn get_channel_by_id(&self, id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
147        let read_tx = self.db.begin_read()?;
148        let accounts = read_tx.open_table(CHANNELS_TABLE_DEF)?;
149        let id: [u8; ChannelId::SIZE] = (*id).into();
150        accounts
151            .get(id)?
152            .map(|v| {
153                bincode::serde::decode_from_slice::<ChannelEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
154            })
155            .transpose()
156            .map_err(|e| redb::Error::Corrupted(format!("channel decoding failed: {e}")))
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::tests::test_backend;

    /// Runs the `test_backend` checks from `crate::backend::tests` against a freshly
    /// created [`TempDbBackend`].
    #[test]
    fn test_tempdb() -> anyhow::Result<()> {
        test_backend(TempDbBackend::new()?)
    }
}
170}