hopr_transport_bloom/
raw.rs1use bloomfilter::Bloom;
2use hopr_crypto_random::random_bytes;
3use hopr_crypto_types::types::PacketTag;
4
5#[derive(Debug, Clone)]
11#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
12pub struct TagBloomFilter {
13 bloom: SerializableBloomWrapper,
14 count: usize,
15 capacity: usize,
16}
17
18#[derive(Debug, Clone)]
19struct SerializableBloomWrapper(Bloom<PacketTag>);
20
#[cfg(feature = "serde")]
impl serde::Serialize for SerializableBloomWrapper {
    /// Serializes the wrapped Bloom filter by handing it to `bloomfilter::serialize`.
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
        // Borrow the inner filter out of the newtype; the wrapper adds no data of its own.
        let Self(inner) = self;
        bloomfilter::serialize(inner, serializer)
    }
}
30
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for SerializableBloomWrapper {
    /// Restores the wrapped Bloom filter via `bloomfilter::deserialize` and wraps it
    /// back into the newtype. Any deserializer error is passed through unchanged.
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
        let restored: std::result::Result<Bloom<PacketTag>, D::Error> = bloomfilter::deserialize(deserializer);
        restored.map(SerializableBloomWrapper)
    }
}
40
41impl TagBloomFilter {
42 const DEFAULT_MAX_ITEMS: usize = 10_000_000;
45 const FALSE_POSITIVE_RATE: f64 = 0.00001_f64;
47
48 pub fn count(&self) -> usize {
50 self.count
51 }
52
53 pub fn capacity(&self) -> usize {
54 self.capacity
55 }
56
57 pub fn set(&mut self, tag: &PacketTag) {
59 if self.count == self.capacity {
60 tracing::warn!("maximum number of items in the Bloom filter reached!");
61 self.bloom.0.clear();
62 self.count = 0;
63 }
64
65 self.bloom.0.set(tag);
66 self.count += 1;
67 }
68
69 pub fn check(&self, tag: &PacketTag) -> bool {
72 self.bloom.0.check(tag)
73 }
74
75 pub fn check_and_set(&mut self, tag: &PacketTag) -> bool {
77 if self.count == self.capacity {
79 let is_present = self.bloom.0.check(tag);
80 if !is_present {
81 tracing::warn!("maximum number of items in the Bloom filter reached!");
83 self.bloom.0.clear();
84 self.bloom.0.set(tag);
85 self.count = 1;
86 }
87 is_present
88 } else {
89 let was_present = self.bloom.0.check_and_set(tag);
91 if !was_present {
92 self.count += 1;
93 }
94 was_present
95 }
96 }
97
98 fn with_capacity(size: usize) -> Self {
99 Self {
100 bloom: SerializableBloomWrapper(
101 Bloom::new_for_fp_rate_with_seed(size, Self::FALSE_POSITIVE_RATE, &random_bytes())
102 .expect("bloom filter with the specified capacity is constructible"),
103 ),
104 count: 0,
105 capacity: size,
106 }
107 }
108}
109
110impl Default for TagBloomFilter {
111 fn default() -> Self {
112 Self::with_capacity(Self::DEFAULT_MAX_ITEMS)
113 }
114}
115
#[cfg(test)]
mod tests {
    use hopr_crypto_types::types::PACKET_TAG_LENGTH;

    use super::*;

    /// Tag consisting only of `0x00` bytes.
    const ZEROS_TAG: [u8; PACKET_TAG_LENGTH] = [0; PACKET_TAG_LENGTH];
    /// Tag consisting only of `0x01` bytes.
    const ONES_TAG: [u8; PACKET_TAG_LENGTH] = [1; PACKET_TAG_LENGTH];

    /// Bincode configuration used for the serialization round-trip test.
    #[cfg(feature = "serde")]
    const TAGBLOOM_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
        .with_little_endian()
        .with_variable_int_encoding();

    /// A filter must behave the same before and after a serialization round trip.
    #[test]
    #[cfg(feature = "serde")]
    fn test_packet_tag_bloom_filter() -> anyhow::Result<()> {
        let mut filter1 = TagBloomFilter::default();

        // Random tags, each with one byte forced to 0xaa so that none can equal ZEROS_TAG.
        let items: Vec<_> = (0..10_000)
            .map(|i| {
                let mut ret = random_bytes::<{ PACKET_TAG_LENGTH }>();
                ret[i % PACKET_TAG_LENGTH] = 0xaa;
                ret
            })
            .collect();

        for item in &items {
            filter1.set(item);
        }

        assert_eq!(items.len(), filter1.count(), "invalid number of items in bf");

        let match_count_1 = items.iter().filter(|tag| filter1.check(tag)).count();

        // Round trip: encode the filter and decode it into a second instance.
        let encoded = bincode::serde::encode_to_vec(&filter1, TAGBLOOM_BINCODE_CONFIGURATION)?;
        let (filter2, _): (TagBloomFilter, usize) =
            bincode::serde::decode_from_slice(&encoded, TAGBLOOM_BINCODE_CONFIGURATION)?;

        let match_count_2 = items.iter().filter(|tag| filter2.check(tag)).count();

        assert_eq!(
            match_count_1, match_count_2,
            "the number of false positives must be equal"
        );
        assert_eq!(filter1.count(), filter2.count(), "the number of items must be equal");

        // The all-zero tag was never inserted into either filter.
        assert!(!filter1.check(&ZEROS_TAG), "bf 1 must not contain zero tag");
        assert!(!filter2.check(&ZEROS_TAG), "bf 2 must not contain zero tag");

        Ok(())
    }

    /// Only tags that were not present before may increase the count.
    #[test]
    fn tag_bloom_filter_count() {
        let mut filter = TagBloomFilter::default();

        // (tag, expected `check_and_set` result, expected count afterwards)
        let steps = [
            (&ZEROS_TAG, false, 1_usize),
            (&ZEROS_TAG, true, 1),
            (&ONES_TAG, false, 2),
            (&ZEROS_TAG, true, 2),
        ];

        for &(tag, expected_present, expected_count) in &steps {
            assert_eq!(expected_present, filter.check_and_set(tag));
            assert_eq!(expected_count, filter.count());
        }
    }

    /// A full filter is cleared once a tag that is not present gets inserted.
    #[test]
    fn tag_bloom_filter_wrap_around() {
        let mut filter = TagBloomFilter::with_capacity(1000);

        // Fill the filter up to one entry below its capacity.
        let one_below_capacity = filter.capacity() - 1;
        for _ in 0..one_below_capacity {
            let mut tag: PacketTag = random_bytes();
            tag[0] = 0xaa;
            assert!(!filter.check_and_set(&tag));
        }
        assert_eq!(one_below_capacity, filter.count());

        // The last free slot goes to the all-zero tag.
        assert!(!filter.check_and_set(&ZEROS_TAG));

        assert_eq!(filter.capacity(), filter.count());
        assert!(filter.check(&ZEROS_TAG));

        // Re-inserting a present tag into the full filter changes nothing.
        assert!(filter.check_and_set(&ZEROS_TAG));
        assert_eq!(filter.capacity(), filter.count());

        // A tag that is not present wipes the full filter and becomes its only entry.
        assert!(!filter.check_and_set(&ONES_TAG));
        assert_eq!(1, filter.count());
        assert!(filter.check(&ONES_TAG));
    }
}