hopr_transport_bloom/
persistent.rs1use std::sync::Arc;
2
3use async_lock::RwLock;
4use hopr_crypto_types::types::PacketTag;
5use hopr_platform::file::native::{read_file, write};
6use tracing::{debug, error, info};
7
8use crate::raw::TagBloomFilter;
9
10#[derive(Debug, Clone)]
11pub struct WrappedTagBloomFilter {
12 path: String,
13 tbf: Arc<RwLock<TagBloomFilter>>,
14}
15
16impl WrappedTagBloomFilter {
17 const TAGBLOOM_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
18 .with_little_endian()
19 .with_variable_int_encoding();
20
21 pub fn new(path: String) -> Self {
22 let tbf = read_file(&path)
23 .and_then(|data| {
24 debug!(path = &path, "Found and loading a tag Bloom filter");
25 bincode::serde::decode_from_slice(&data, Self::TAGBLOOM_BINCODE_CONFIGURATION)
26 .map(|(f, _)| f)
27 .map_err(|e| hopr_platform::error::PlatformError::GeneralError(e.to_string()))
28 })
29 .unwrap_or_else(|_| {
30 debug!(path = &path, "No tag Bloom filter found, using empty");
31 TagBloomFilter::default()
32 });
33
34 Self {
35 path,
36 tbf: Arc::new(RwLock::new(tbf)),
37 }
38 }
39
40 #[tracing::instrument(level = "trace", skip(self, tag))]
44 pub async fn is_tag_replay(&self, tag: &PacketTag) -> bool {
45 self.with_write_lock(|inner: &mut TagBloomFilter| inner.check_and_set(tag))
46 .await
47 }
48
49 pub async fn with_write_lock<T>(&self, f: impl FnOnce(&mut TagBloomFilter) -> T) -> T {
50 let mut tbf = self.tbf.write_arc().await;
51 f(&mut tbf)
52 }
53
54 pub async fn save(&self) {
55 let bloom = self.tbf.read_arc().await.clone(); if let Err(e) = bincode::serde::encode_to_vec(&bloom, Self::TAGBLOOM_BINCODE_CONFIGURATION)
58 .map_err(|e| hopr_platform::error::PlatformError::GeneralError(e.to_string()))
59 .and_then(|d| write(&self.path, &d))
60 {
61 error!(error = %e, "Tag Bloom filter save failed")
62 } else {
63 info!("Tag Bloom filter saved successfully")
64 };
65 }
66}