hopr_chain_connector/backend/
tempdb.rs1use hopr_api::{
2 chain::HoprKeyIdent,
3 types::{
4 crypto::prelude::OffchainPublicKey,
5 internal::{
6 account::AccountEntry,
7 channels::{ChannelEntry, ChannelId},
8 },
9 primitive::prelude::{Address, BytesRepresentable},
10 },
11};
12use redb::{ReadableDatabase, TableDefinition};
13
14#[derive(Debug, thiserror::Error)]
16pub enum TempDbError {
17 #[error("database error: {0}")]
18 Database(#[from] redb::Error),
19 #[error("serialization error: {0}")]
20 Serialization(#[from] postcard::Error),
21 #[error("I/O error: {0}")]
22 Io(#[from] std::io::Error),
23}
24
25impl From<redb::DatabaseError> for TempDbError {
26 fn from(error: redb::DatabaseError) -> Self {
27 Self::Database(error.into())
28 }
29}
30
31impl From<redb::TransactionError> for TempDbError {
32 fn from(error: redb::TransactionError) -> Self {
33 Self::Database(error.into())
34 }
35}
36
37impl From<redb::TableError> for TempDbError {
38 fn from(error: redb::TableError) -> Self {
39 Self::Database(error.into())
40 }
41}
42
43impl From<redb::StorageError> for TempDbError {
44 fn from(error: redb::StorageError) -> Self {
45 Self::Database(error.into())
46 }
47}
48
49impl From<redb::CommitError> for TempDbError {
50 fn from(error: redb::CommitError) -> Self {
51 Self::Database(error.into())
52 }
53}
54
55#[derive(Clone)]
59pub struct TempDbBackend {
60 db: std::sync::Arc<redb::Database>,
63 _tmp: std::sync::Arc<tempfile::NamedTempFile>,
64}
65
66impl TempDbBackend {
67 pub fn new() -> Result<Self, TempDbError> {
68 let file = tempfile::NamedTempFile::new()?;
69
70 tracing::info!(path = %file.path().display(), "opened temporary redb database");
71
72 let db = redb::Database::create(file.path())?;
73
74 {
77 let write_tx = db.begin_write()?;
78 write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
79 write_tx.open_table(CHANNELS_TABLE_DEF)?;
80 write_tx.open_table(ADDRESS_TO_ID)?;
81 write_tx.open_table(KEY_TO_ID)?;
82 write_tx.commit()?;
83 }
84
85 Ok(Self {
86 db: std::sync::Arc::new(db),
87 _tmp: std::sync::Arc::new(file),
88 })
89 }
90}
91
92const ACCOUNTS_TABLE_DEF: TableDefinition<u32, Vec<u8>> = TableDefinition::new("id_accounts");
93const CHANNELS_TABLE_DEF: TableDefinition<[u8; ChannelId::SIZE], Vec<u8>> = TableDefinition::new("id_channels");
94const ADDRESS_TO_ID: TableDefinition<[u8; Address::SIZE], u32> = TableDefinition::new("address_to_id");
95const KEY_TO_ID: TableDefinition<[u8; OffchainPublicKey::SIZE], u32> = TableDefinition::new("key_to_id");
96
97impl super::Backend for TempDbBackend {
98 type Error = TempDbError;
99
100 fn insert_account(&self, account: AccountEntry) -> Result<Option<AccountEntry>, Self::Error> {
101 let write_tx = self.db.begin_write()?;
102 let old_value = {
103 let mut accounts = write_tx.open_table(ACCOUNTS_TABLE_DEF)?;
104 let old_value = accounts
105 .insert(u32::from(account.key_id), postcard::to_allocvec(&account)?)?
106 .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
107 .transpose()?;
108
109 let mut address_to_id = write_tx.open_table(ADDRESS_TO_ID)?;
110 let mut key_to_id = write_tx.open_table(KEY_TO_ID)?;
111
112 if let Some(old_entry) = &old_value {
114 let chain_addr: [u8; Address::SIZE] = old_entry.chain_addr.into();
115 let packet_addr: [u8; OffchainPublicKey::SIZE] = old_entry.public_key.into();
116 address_to_id.remove(&chain_addr)?;
117 key_to_id.remove(&packet_addr)?;
118 }
119
120 let chain_addr: [u8; Address::SIZE] = account.chain_addr.into();
121 address_to_id.insert(chain_addr, u32::from(account.key_id))?;
122
123 let packet_addr: [u8; OffchainPublicKey::SIZE] = account.public_key.into();
124 key_to_id.insert(packet_addr, u32::from(account.key_id))?;
125
126 old_value
127 };
128 write_tx.commit()?;
129
130 tracing::debug!(new = %account, old = ?old_value, "upserted account");
131 Ok(old_value)
132 }
133
134 fn insert_channel(&self, channel: ChannelEntry) -> Result<Option<ChannelEntry>, Self::Error> {
135 let write_tx = self.db.begin_write()?;
136 let old_value = {
137 let mut channels = write_tx.open_table(CHANNELS_TABLE_DEF)?;
138 let channel_id: [u8; ChannelId::SIZE] = channel.get_id().into();
139 channels
140 .insert(channel_id, postcard::to_allocvec(&channel)?)?
141 .map(|v| postcard::from_bytes::<ChannelEntry>(&v.value()))
142 .transpose()?
143 };
144 write_tx.commit()?;
145
146 tracing::debug!(new = %channel, old = ?old_value, "upserted channel");
147 Ok(old_value)
148 }
149
150 fn get_account_by_id(&self, id: &HoprKeyIdent) -> Result<Option<AccountEntry>, Self::Error> {
151 let read_tx = self.db.begin_read()?;
152 let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
153 accounts
154 .get(u32::from(*id))?
155 .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
156 .transpose()
157 .map_err(TempDbError::from)
158 }
159
160 fn get_account_by_key(&self, key: &OffchainPublicKey) -> Result<Option<AccountEntry>, Self::Error> {
161 let read_tx = self.db.begin_read()?;
162 let keys_to_id = read_tx.open_table(KEY_TO_ID)?;
163 let packet_addr: [u8; OffchainPublicKey::SIZE] = (*key).into();
164 let Some(id) = keys_to_id.get(packet_addr)?.map(|v| v.value()) else {
165 return Ok(None);
166 };
167 let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
168 accounts
169 .get(id)?
170 .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
171 .transpose()
172 .map_err(TempDbError::from)
173 }
174
175 fn get_account_by_address(&self, chain_key: &Address) -> Result<Option<AccountEntry>, Self::Error> {
176 let read_tx = self.db.begin_read()?;
177 let address_to_id = read_tx.open_table(ADDRESS_TO_ID)?;
178 let chain_key: [u8; Address::SIZE] = (*chain_key).into();
179 let Some(id) = address_to_id.get(chain_key)?.map(|v| v.value()) else {
180 return Ok(None);
181 };
182 let accounts = read_tx.open_table(ACCOUNTS_TABLE_DEF)?;
183 accounts
184 .get(id)?
185 .map(|v| postcard::from_bytes::<AccountEntry>(&v.value()))
186 .transpose()
187 .map_err(TempDbError::from)
188 }
189
190 fn get_channel_by_id(&self, id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
191 let read_tx = self.db.begin_read()?;
192 let channels = read_tx.open_table(CHANNELS_TABLE_DEF)?;
193 let id: [u8; ChannelId::SIZE] = (*id).into();
194 channels
195 .get(id)?
196 .map(|v| postcard::from_bytes::<ChannelEntry>(&v.value()))
197 .transpose()
198 .map_err(TempDbError::from)
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use hopr_api::types::{
205 crypto::keypairs::{ChainKeypair, Keypair, OffchainKeypair},
206 internal::{
207 account::AccountType,
208 channels::{ChannelStatus, generate_channel_id},
209 },
210 primitive::balance::HoprBalance,
211 };
212
213 use super::*;
214 use crate::{Backend, backend::tests::test_backend};
215
216 #[test]
217 fn test_tempdb() -> anyhow::Result<()> {
218 let backend = TempDbBackend::new()?;
219 test_backend(backend)
220 }
221
222 #[test]
223 fn upsert_cleans_up_stale_index_entries() -> anyhow::Result<()> {
224 let backend = TempDbBackend::new()?;
225
226 let kp_a = OffchainKeypair::random();
227 let cp = ChainKeypair::random();
228
229 let account = AccountEntry {
230 public_key: (*kp_a.public()),
231 chain_addr: cp.public().to_address(),
232 entry_type: AccountType::NotAnnounced,
233 safe_address: None,
234 key_id: 1.into(),
235 };
236 backend.insert_account(account)?;
237
238 let kp_b = OffchainKeypair::random();
240 let updated = AccountEntry {
241 public_key: (*kp_b.public()),
242 chain_addr: cp.public().to_address(),
243 entry_type: AccountType::NotAnnounced,
244 safe_address: None,
245 key_id: 1.into(),
246 };
247 backend.insert_account(updated.clone())?;
248
249 assert!(backend.get_account_by_key(kp_a.public())?.is_none());
251 assert_eq!(backend.get_account_by_key(kp_b.public())?, Some(updated));
253
254 Ok(())
255 }
256
257 #[test]
258 fn upsert_cleans_up_stale_address_entries() -> anyhow::Result<()> {
259 let backend = TempDbBackend::new()?;
260
261 let kp = OffchainKeypair::random();
262 let cp_a = ChainKeypair::random();
263
264 let account = AccountEntry {
265 public_key: (*kp.public()),
266 chain_addr: cp_a.public().to_address(),
267 entry_type: AccountType::NotAnnounced,
268 safe_address: None,
269 key_id: 1.into(),
270 };
271 backend.insert_account(account)?;
272
273 let cp_b = ChainKeypair::random();
275 let updated = AccountEntry {
276 public_key: (*kp.public()),
277 chain_addr: cp_b.public().to_address(),
278 entry_type: AccountType::NotAnnounced,
279 safe_address: None,
280 key_id: 1.into(),
281 };
282 backend.insert_account(updated.clone())?;
283
284 assert!(backend.get_account_by_address(&cp_a.public().to_address())?.is_none());
286 assert_eq!(
288 backend.get_account_by_address(&cp_b.public().to_address())?,
289 Some(updated)
290 );
291
292 Ok(())
293 }
294
295 #[test]
296 fn channel_upsert_returns_old_value() -> anyhow::Result<()> {
297 let backend = TempDbBackend::new()?;
298
299 let src = Address::new(&[1u8; 20]);
300 let dst = Address::new(&[2u8; 20]);
301
302 let channel_v1 = ChannelEntry::builder()
303 .between(src, dst)
304 .balance(HoprBalance::new_base(100))
305 .ticket_index(1)
306 .epoch(1)
307 .status(ChannelStatus::Open)
308 .build()?;
309
310 let first_insert = backend.insert_channel(channel_v1)?;
311 assert!(first_insert.is_none(), "first insert should return None");
312
313 let channel_v2 = ChannelEntry::builder()
314 .between(src, dst)
315 .balance(HoprBalance::new_base(200))
316 .ticket_index(2)
317 .status(ChannelStatus::Open)
318 .epoch(2)
319 .build()?;
320
321 let second_insert = backend.insert_channel(channel_v2)?;
322 assert_eq!(second_insert, Some(channel_v1), "second insert should return old value");
323
324 Ok(())
325 }
326
327 #[test]
328 fn lookup_nonexistent_returns_none() -> anyhow::Result<()> {
329 let backend = TempDbBackend::new()?;
330
331 let kp = OffchainKeypair::random();
332 let cp = ChainKeypair::random();
333 let channel_id = generate_channel_id(&Address::new(&[1u8; 20]), &Address::new(&[2u8; 20]));
334
335 assert!(backend.get_account_by_id(&42.into())?.is_none());
336 assert!(backend.get_account_by_key(kp.public())?.is_none());
337 assert!(backend.get_account_by_address(&cp.public().to_address())?.is_none());
338 assert!(backend.get_channel_by_id(&channel_id)?.is_none());
339
340 Ok(())
341 }
342
343 #[test]
344 fn index_lookup_uses_single_transaction() -> anyhow::Result<()> {
345 let backend = TempDbBackend::new()?;
346
347 let kp = OffchainKeypair::random();
348 let cp = ChainKeypair::random();
349
350 let account = AccountEntry {
351 public_key: (*kp.public()),
352 chain_addr: cp.public().to_address(),
353 entry_type: AccountType::NotAnnounced,
354 safe_address: None,
355 key_id: 5.into(),
356 };
357 backend.insert_account(account.clone())?;
358
359 let by_key = backend.get_account_by_key(kp.public())?;
361 assert_eq!(by_key, Some(account.clone()));
362
363 let by_addr = backend.get_account_by_address(&cp.public().to_address())?;
364 assert_eq!(by_addr, Some(account));
365
366 Ok(())
367 }
368}