hopr_chain_connector/backend/
tempdb.rs1use hopr_api::chain::HoprKeyIdent;
2use hopr_crypto_types::prelude::OffchainPublicKey;
3use hopr_internal_types::{
4 account::AccountEntry,
5 channels::{ChannelEntry, ChannelId},
6};
7use hopr_primitive_types::prelude::{Address, BytesRepresentable};
8use redb::{ReadableDatabase, TableDefinition};
9
10#[derive(Clone)]
14pub struct TempDbBackend {
15 db: std::sync::Arc<redb::Database>,
16 _tmp: std::sync::Arc<tempfile::NamedTempFile>,
17}
18
19impl TempDbBackend {
20 pub fn new() -> Result<Self, std::io::Error> {
21 let file = tempfile::NamedTempFile::new().map_err(std::io::Error::other)?;
22
23 Ok(Self {
24 db: std::sync::Arc::new(redb::Database::create(file.path()).map_err(std::io::Error::other)?),
25 _tmp: std::sync::Arc::new(file),
26 })
27 }
28}
29
30const ACCOUNTS_TABLE_DEF: TableDefinition<u32, Vec<u8>> = TableDefinition::new("id_accounts");
31const CHANNELS_TABLE_DEF: TableDefinition<[u8; ChannelId::SIZE], Vec<u8>> = TableDefinition::new("id_channels");
32const ADDRESS_TO_ID: TableDefinition<[u8; Address::SIZE], u32> = TableDefinition::new("address_to_id");
33const KEY_TO_ID: TableDefinition<[u8; OffchainPublicKey::SIZE], u32> = TableDefinition::new("key_to_id");
34
35const BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
36 .with_little_endian()
37 .with_variable_int_encoding();
38
39impl super::Backend for TempDbBackend {
40 type Error = redb::Error;
41
42 fn insert_account(&self, account: AccountEntry) -> Result<Option<AccountEntry>, Self::Error> {
43 let write_tx = self.db.begin_write()?;
44 let old_value = {
45 let mut accounts = write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
46 let old_value = accounts
47 .insert(
48 u32::from(account.key_id),
49 bincode::serde::encode_to_vec(&account, BINCODE_CONFIGURATION)
50 .map_err(|e| redb::Error::Corrupted(format!("account encoding failed: {e}")))?,
51 )?
52 .map(|v| {
53 bincode::serde::decode_from_slice::<AccountEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
54 })
55 .transpose()
56 .map_err(|e| redb::Error::Corrupted(format!("account decoding failed: {e}")))?;
57
58 let mut address_to_id = write_tx.open_table(ADDRESS_TO_ID)?;
59 let mut key_to_id = write_tx.open_table(KEY_TO_ID)?;
60
61 if let Some(old_entry) = &old_value {
63 let chain_addr: [u8; Address::SIZE] = old_entry.chain_addr.into();
64 let packet_addr: [u8; OffchainPublicKey::SIZE] = old_entry.public_key.into();
65 address_to_id.remove(&chain_addr)?;
66 key_to_id.remove(&packet_addr)?;
67 }
68
69 let chain_addr: [u8; Address::SIZE] = account.chain_addr.into();
70 address_to_id.insert(chain_addr, u32::from(account.key_id))?;
71
72 let packet_addr: [u8; OffchainPublicKey::SIZE] = account.public_key.into();
73 key_to_id.insert(packet_addr, u32::from(account.key_id))?;
74
75 old_value
76 };
77 write_tx.commit()?;
78
79 tracing::debug!(new = %account, old = ?old_value, "upserted account");
80 Ok(old_value)
81 }
82
83 fn insert_channel(&self, channel: ChannelEntry) -> Result<Option<ChannelEntry>, Self::Error> {
84 let write_tx = self.db.begin_write()?;
85 let old_value = {
86 let mut channels = write_tx.open_table(CHANNELS_TABLE_DEF)?;
87 let channel_id: [u8; ChannelId::SIZE] = channel.get_id().into();
88 channels
89 .insert(
90 channel_id,
91 bincode::serde::encode_to_vec(channel, BINCODE_CONFIGURATION)
92 .map_err(|e| redb::Error::Corrupted(format!("channel encoding failed: {e}")))?,
93 )?
94 .map(|v| {
95 bincode::serde::decode_from_slice::<ChannelEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
96 })
97 .transpose()
98 .map_err(|e| redb::Error::Corrupted(format!("channel decoding failed: {e}")))?
99 };
100 write_tx.commit()?;
101
102 tracing::debug!(new = %channel, old = ?old_value, "upserted channel");
103 Ok(old_value)
104 }
105
106 fn get_account_by_id(&self, id: &HoprKeyIdent) -> Result<Option<AccountEntry>, Self::Error> {
107 let read_tx = self.db.begin_read()?;
108 let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
109 accounts
110 .get(u32::from(*id))?
111 .map(|v| {
112 bincode::serde::decode_from_slice::<AccountEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
113 })
114 .transpose()
115 .map_err(|e| redb::Error::Corrupted(format!("account decoding failed: {e}")))
116 }
117
118 fn get_account_by_key(&self, key: &OffchainPublicKey) -> Result<Option<AccountEntry>, Self::Error> {
119 let id = {
120 let read_tx = self.db.begin_read()?;
121 let keys_to_id = read_tx.open_table(KEY_TO_ID)?;
122 let packet_addr: [u8; OffchainPublicKey::SIZE] = (*key).into();
123 let Some(id) = keys_to_id.get(packet_addr)?.map(|v| v.value()) else {
124 return Ok(None);
125 };
126 id
127 };
128
129 self.get_account_by_id(&id.into())
130 }
131
132 fn get_account_by_address(&self, chain_key: &Address) -> Result<Option<AccountEntry>, Self::Error> {
133 let id = {
134 let read_tx = self.db.begin_read()?;
135 let address_to_id = read_tx.open_table(ADDRESS_TO_ID)?;
136 let chain_key: [u8; Address::SIZE] = (*chain_key).into();
137 let Some(id) = address_to_id.get(chain_key)?.map(|v| v.value()) else {
138 return Ok(None);
139 };
140 id
141 };
142
143 self.get_account_by_id(&id.into())
144 }
145
146 fn get_channel_by_id(&self, id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
147 let read_tx = self.db.begin_read()?;
148 let accounts = read_tx.open_table(CHANNELS_TABLE_DEF)?;
149 let id: [u8; ChannelId::SIZE] = (*id).into();
150 accounts
151 .get(id)?
152 .map(|v| {
153 bincode::serde::decode_from_slice::<ChannelEntry, _>(&v.value(), BINCODE_CONFIGURATION).map(|v| v.0)
154 })
155 .transpose()
156 .map_err(|e| redb::Error::Corrupted(format!("channel decoding failed: {e}")))
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::tests::test_backend;

    /// Runs `test_backend` against a freshly created [`TempDbBackend`].
    #[test]
    fn test_tempdb() -> anyhow::Result<()> {
        test_backend(TempDbBackend::new()?)
    }
}
170}