hopr_transport_bloom/
raw.rs

1use bloomfilter::Bloom;
2use hopr_crypto_random::random_bytes;
3use hopr_crypto_types::types::PacketTag;
4
5/// Bloom filter for packet tags to detect packet replays.
6///
7/// In addition, this structure also holds the number of items in the filter
8/// to determine if the filter needs to be refreshed. Once this happens, packet replays
9/// of past packets might be possible.
10#[derive(Debug, Clone)]
11#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
12pub struct TagBloomFilter {
13    bloom: SerializableBloomWrapper,
14    count: usize,
15    capacity: usize,
16}
17
18#[derive(Debug, Clone)]
19struct SerializableBloomWrapper(Bloom<PacketTag>);
20
21#[cfg(feature = "serde")]
22impl serde::Serialize for SerializableBloomWrapper {
23    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
24    where
25        S: serde::Serializer,
26    {
27        bloomfilter::serialize(&self.0, serializer)
28    }
29}
30
31#[cfg(feature = "serde")]
32impl<'de> serde::Deserialize<'de> for SerializableBloomWrapper {
33    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
34    where
35        D: serde::Deserializer<'de>,
36    {
37        bloomfilter::deserialize(deserializer).map(Self)
38    }
39}
40
41impl TagBloomFilter {
42    // The default maximum number of packet tags this Bloom filter can hold.
43    // After these many packets, the Bloom filter resets and packet replays are possible.
44    const DEFAULT_MAX_ITEMS: usize = 10_000_000;
45    // Allowed false positive rate. This amounts to 0.001% chance
46    const FALSE_POSITIVE_RATE: f64 = 0.00001_f64;
47
48    /// Returns the current number of items in this Bloom filter.
49    pub fn count(&self) -> usize {
50        self.count
51    }
52
53    pub fn capacity(&self) -> usize {
54        self.capacity
55    }
56
57    /// Puts a packet tag into the Bloom filter
58    pub fn set(&mut self, tag: &PacketTag) {
59        if self.count == self.capacity {
60            tracing::warn!("maximum number of items in the Bloom filter reached!");
61            self.bloom.0.clear();
62            self.count = 0;
63        }
64
65        self.bloom.0.set(tag);
66        self.count += 1;
67    }
68
69    /// Check if the packet tag is in the Bloom filter.
70    /// False positives are possible.
71    pub fn check(&self, tag: &PacketTag) -> bool {
72        self.bloom.0.check(tag)
73    }
74
75    /// Checks and sets a packet tag (if not present) in a single operation.
76    pub fn check_and_set(&mut self, tag: &PacketTag) -> bool {
77        // If we're at full capacity, we do only "check" and conditionally reset with the new entry
78        if self.count == self.capacity {
79            let is_present = self.bloom.0.check(tag);
80            if !is_present {
81                // There cannot be false negatives, so we can reset the filter
82                tracing::warn!("maximum number of items in the Bloom filter reached!");
83                self.bloom.0.clear();
84                self.bloom.0.set(tag);
85                self.count = 1;
86            }
87            is_present
88        } else {
89            // If not at full capacity, we can do check_and_set
90            let was_present = self.bloom.0.check_and_set(tag);
91            if !was_present {
92                self.count += 1;
93            }
94            was_present
95        }
96    }
97
98    fn with_capacity(size: usize) -> Self {
99        Self {
100            bloom: SerializableBloomWrapper(
101                Bloom::new_for_fp_rate_with_seed(size, Self::FALSE_POSITIVE_RATE, &random_bytes())
102                    .expect("bloom filter with the specified capacity is constructible"),
103            ),
104            count: 0,
105            capacity: size,
106        }
107    }
108}
109
110impl Default for TagBloomFilter {
111    fn default() -> Self {
112        Self::with_capacity(Self::DEFAULT_MAX_ITEMS)
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use hopr_crypto_types::types::PACKET_TAG_LENGTH;
119
120    use super::*;
121
122    const ZEROS_TAG: [u8; PACKET_TAG_LENGTH] = [0; PACKET_TAG_LENGTH];
123    const ONES_TAG: [u8; PACKET_TAG_LENGTH] = [1; PACKET_TAG_LENGTH];
124
125    #[cfg(feature = "serde")]
126    const TAGBLOOM_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
127        .with_little_endian()
128        .with_variable_int_encoding();
129
130    #[test]
131    #[cfg(feature = "serde")]
132    fn test_packet_tag_bloom_filter() -> anyhow::Result<()> {
133        let mut filter1 = TagBloomFilter::default();
134
135        let items = (0..10_000)
136            .map(|i| {
137                let mut ret = random_bytes::<{ hopr_crypto_types::types::PACKET_TAG_LENGTH }>();
138                ret[i % hopr_crypto_types::types::PACKET_TAG_LENGTH] = 0xaa; // ensure it is not completely just zeroes
139                ret
140            })
141            .collect::<Vec<_>>();
142
143        // Insert items into the BF
144        items.iter().for_each(|item| filter1.set(item));
145
146        assert_eq!(items.len(), filter1.count(), "invalid number of items in bf");
147
148        // Count the number of items in the BF (incl. false positives)
149        let match_count_1 = items.iter().filter(|item| filter1.check(item)).count();
150
151        let filter2: TagBloomFilter = bincode::serde::decode_from_slice(
152            &bincode::serde::encode_to_vec(&filter1, TAGBLOOM_BINCODE_CONFIGURATION)?,
153            TAGBLOOM_BINCODE_CONFIGURATION,
154        )?
155        .0;
156
157        // Count the number of items in the BF (incl. false positives)
158        let match_count_2 = items.iter().filter(|item| filter2.check(item)).count();
159
160        assert_eq!(
161            match_count_1, match_count_2,
162            "the number of false positives must be equal"
163        );
164        assert_eq!(filter1.count(), filter2.count(), "the number of items must be equal");
165
166        // All zeroes must be present in neither BF; we ensured we never insert a zero tag
167        assert!(!filter1.check(&ZEROS_TAG), "bf 1 must not contain zero tag");
168        assert!(!filter2.check(&ZEROS_TAG), "bf 2 must not contain zero tag");
169
170        Ok(())
171    }
172
173    #[test]
174    fn tag_bloom_filter_count() {
175        let mut filter = TagBloomFilter::default();
176        assert!(!filter.check_and_set(&ZEROS_TAG));
177        assert_eq!(1, filter.count());
178
179        assert!(filter.check_and_set(&ZEROS_TAG));
180        assert_eq!(1, filter.count());
181
182        assert!(!filter.check_and_set(&ONES_TAG));
183        assert_eq!(2, filter.count());
184
185        assert!(filter.check_and_set(&ZEROS_TAG));
186        assert_eq!(2, filter.count());
187    }
188
189    #[test]
190    fn tag_bloom_filter_wrap_around() {
191        let mut filter = TagBloomFilter::with_capacity(1000);
192        for _ in 1..filter.capacity() {
193            let mut tag: PacketTag = hopr_crypto_random::random_bytes();
194            tag[0] = 0xaa; // ensure it's not all zeroes
195            assert!(!filter.check_and_set(&tag));
196        }
197        // Not yet at full capacity
198        assert_eq!(filter.capacity() - 1, filter.count());
199
200        // This entry is not there yet
201        assert!(!filter.check_and_set(&ZEROS_TAG));
202
203        // Now the filter is at capacity and contains the previously inserted entry
204        assert_eq!(filter.capacity(), filter.count());
205        assert!(filter.check(&ZEROS_TAG));
206
207        // This will not clear out the filter, since the entry is there
208        assert!(filter.check_and_set(&ZEROS_TAG));
209        assert_eq!(filter.capacity(), filter.count());
210
211        // This will clear out the filter, since this other entry is definitely not there
212        assert!(!filter.check_and_set(&ONES_TAG));
213        assert_eq!(1, filter.count());
214        assert!(filter.check(&ONES_TAG));
215    }
216}