hopr_transport_protocol/
bloom.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use std::sync::Arc;

use async_lock::RwLock;
use hopr_internal_types::protocol::TagBloomFilter;
use hopr_platform::file::native::{read_file, write};
use tracing::{debug, error, info};

/// A [`TagBloomFilter`] kept behind a shared async read-write lock, paired with
/// the file path it is loaded from and saved to.
///
/// Cloning this wrapper clones the `Arc`, so all clones operate on the same
/// underlying filter instance.
#[derive(Debug, Clone)]
pub struct WrappedTagBloomFilter {
    /// Location of the serialized filter; read in `new` and written in `save`.
    path: String,
    /// The filter itself, shared between clones and guarded by an async `RwLock`.
    tbf: Arc<RwLock<TagBloomFilter>>,
}

impl WrappedTagBloomFilter {
    pub fn new(path: String) -> Self {
        let tbf = read_file(&path)
            .and_then(|data| {
                debug!(path = &path, "Found and loading a tag Bloom filter");
                TagBloomFilter::from_bytes(&data)
                    .map_err(|e| hopr_platform::error::PlatformError::GeneralError(e.to_string()))
            })
            .unwrap_or_else(|_| {
                debug!(path = &path, "No tag Bloom filter found, using empty");
                TagBloomFilter::default()
            });

        Self {
            path,
            tbf: Arc::new(RwLock::new(tbf)),
        }
    }

    pub async fn with_write_lock<T>(&self, f: impl FnOnce(&mut TagBloomFilter) -> T) -> T {
        let mut tbf = self.tbf.write().await;
        f(&mut tbf)
    }

    pub async fn save(&self) {
        let bloom = self.tbf.read().await.clone(); // Clone to immediately release the lock

        if let Err(e) = write(&self.path, bloom.to_bytes()) {
            error!(erorr = %e, "Tag Bloom filter save failed")
        } else {
            info!("Tag Bloom filter saved successfully")
        };
    }
}