1use std::{sync::Arc, time::Duration};
2
3use hopr_crypto_packet::prelude::*;
4use hopr_internal_types::prelude::HoprPseudonym;
5use hopr_network_types::prelude::SurbMatcher;
6use moka::{future::Cache, notification::RemovalCause};
7use ringbuffer::RingBuffer;
8use validator::ValidationError;
9
10use crate::{FoundSurb, traits::SurbStore};
11
/// Lower bound applied to the configured pseudonym lifetime, both when the
/// configuration is validated and when the caches are built.
const MINIMUM_SURB_LIFETIME: Duration = Duration::from_secs(30);
/// Lower bound for the capacity of the pseudonym-keyed reply opener cache.
const MINIMUM_OPENER_PSEUDONYMS: usize = 1_000;
/// Lower bound for the capacity of each per-pseudonym reply opener cache.
const MINIMUM_OPENERS_PER_PSEUDONYM: usize = 1_000;
/// Lower bound for the capacity of the pseudonym-keyed SURB cache.
const MINIMUM_SURBS_PER_PSEUDONYM: usize = 1_000;
/// Lower bound applied to the configured reply opener lifetime, both when the
/// configuration is validated and when a per-pseudonym opener cache is built.
const MINIMUM_OPENER_LIFETIME: Duration = Duration::from_secs(60);
/// Lower bound for the capacity of a newly created per-pseudonym SURB ring buffer.
const MIN_SURB_RB_CAPACITY: usize = 1_024;
18
19fn validate_pseudonyms_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
20 if lifetime < &MINIMUM_SURB_LIFETIME {
21 Err(ValidationError::new("pseudonyms_lifetime is too low"))
22 } else {
23 Ok(())
24 }
25}
26
27fn validate_reply_opener_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
28 if lifetime < &MINIMUM_OPENER_LIFETIME {
29 Err(ValidationError::new("reply_opener_lifetime is too low"))
30 } else {
31 Ok(())
32 }
33}
34
/// Default value of `SurbStoreConfig::rb_capacity` (15 000).
fn default_rb_capacity() -> usize {
    const DEFAULT_RB_CAPACITY: usize = 15_000;
    DEFAULT_RB_CAPACITY
}
38
/// Default value of `SurbStoreConfig::distress_threshold` (500).
fn default_distress_threshold() -> usize {
    const DEFAULT_DISTRESS_THRESHOLD: usize = 500;
    DEFAULT_DISTRESS_THRESHOLD
}
42
/// Default value of `SurbStoreConfig::max_openers_per_pseudonym` (100 000).
fn default_max_openers_per_pseudonym() -> usize {
    const DEFAULT_MAX_OPENERS_PER_PSEUDONYM: usize = 100_000;
    DEFAULT_MAX_OPENERS_PER_PSEUDONYM
}
46
/// Default value of `SurbStoreConfig::max_pseudonyms` (10 000).
fn default_max_pseudonyms() -> usize {
    const DEFAULT_MAX_PSEUDONYMS: usize = 10_000;
    DEFAULT_MAX_PSEUDONYMS
}
50
/// Default value of `SurbStoreConfig::pseudonyms_lifetime` (10 minutes).
fn default_pseudonyms_lifetime() -> Duration {
    const TEN_MINUTES_IN_SECS: u64 = 10 * 60;
    Duration::from_secs(TEN_MINUTES_IN_SECS)
}
54
/// Default value of `SurbStoreConfig::reply_opener_lifetime` (1 hour).
fn default_reply_opener_lifetime() -> Duration {
    const ONE_HOUR_IN_SECS: u64 = 60 * 60;
    Duration::from_secs(ONE_HOUR_IN_SECS)
}
58
/// Configuration of the in-memory SURB store ([`MemorySurbStore`]).
///
/// Defaults come from the `default_*` functions of this module and are used
/// both by `Default` and, with the `serde` feature, for fields missing from
/// the deserialized input. Unknown fields are rejected on deserialization.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, smart_default::SmartDefault, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct SurbStoreConfig {
    /// Capacity of the SURB ring buffer created for each pseudonym.
    ///
    /// Validation requires at least 1024. Default is 15 000.
    #[default(default_rb_capacity())]
    #[validate(range(min = 1024, message = "rb_capacity must be at least 1024"))]
    #[cfg_attr(feature = "serde", serde(default = "default_rb_capacity"))]
    pub rb_capacity: usize,
    /// SURB count threshold.
    ///
    /// Validation requires at least 10. Default is 500.
    ///
    /// NOTE(review): this module only stores and validates the value; presumably
    /// it marks the number of remaining SURBs below which the supply is considered
    /// low. Confirm against the code that consumes it.
    #[default(default_distress_threshold())]
    #[validate(range(min = 10, message = "distress_threshold must be at least 10"))]
    #[cfg_attr(feature = "serde", serde(default = "default_distress_threshold"))]
    pub distress_threshold: usize,
    /// Maximum number of reply openers kept for a single pseudonym.
    ///
    /// Validation requires at least 100. Note that the store raises values
    /// below 1000 to 1000 when it creates a per-pseudonym opener cache.
    /// Default is 100 000.
    #[default(default_max_openers_per_pseudonym())]
    #[validate(range(min = 100, message = "max_openers_per_pseudonym must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_openers_per_pseudonym"))]
    pub max_openers_per_pseudonym: usize,
    /// Maximum number of pseudonyms the store keeps SURB ring buffers for.
    ///
    /// Validation requires at least 100. Note that the store raises values
    /// below 1000 to 1000 when it is constructed. Default is 10 000.
    #[default(default_max_pseudonyms())]
    #[validate(range(min = 100, message = "max_pseudonyms must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_pseudonyms"))]
    pub max_pseudonyms: usize,
    /// Idle time after which the entries of a pseudonym are dropped from the
    /// store's caches.
    ///
    /// Validation rejects values shorter than 30 seconds. With the `serde`
    /// feature the value is (de)serialized through `humantime_serde`.
    /// Default is 600 seconds.
    #[default(default_pseudonyms_lifetime())]
    #[validate(custom(function = "validate_pseudonyms_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_pseudonyms_lifetime", with = "humantime_serde")
    )]
    pub pseudonyms_lifetime: Duration,
    /// Time-to-live of an individual reply opener.
    ///
    /// Validation rejects values shorter than 60 seconds. With the `serde`
    /// feature the value is (de)serialized through `humantime_serde`.
    /// Default is 3600 seconds.
    #[default(default_reply_opener_lifetime())]
    #[validate(custom(function = "validate_reply_opener_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_reply_opener_lifetime", with = "humantime_serde")
    )]
    pub reply_opener_lifetime: Duration,
}
156
/// In-memory store of SURBs and reply openers, implementing [`SurbStore`].
///
/// NOTE(review): `Clone` is derived; moka caches are presumably shared handles,
/// so clones would operate on the same underlying data. Confirm against the
/// moka documentation.
#[derive(Clone)]
pub struct MemorySurbStore {
    /// Reply openers in two levels: pseudonym -> (SURB ID -> reply opener).
    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
    /// Ring buffer of SURBs held for each pseudonym.
    surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
    /// Configuration the store was created with.
    cfg: Arc<SurbStoreConfig>,
}
168
169impl MemorySurbStore {
170 pub fn new(cfg: SurbStoreConfig) -> Self {
172 Self {
173 pseudonym_openers: moka::sync::Cache::builder()
177 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
178 .eviction_policy(moka::policy::EvictionPolicy::lru())
179 .eviction_listener(|sender_id, _reply_opener, cause| {
180 tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
181 })
182 .max_capacity(cfg.max_openers_per_pseudonym.max(MINIMUM_OPENER_PSEUDONYMS) as u64)
183 .build(),
184 surbs_per_pseudonym: Cache::builder()
187 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
188 .eviction_policy(moka::policy::EvictionPolicy::lru())
189 .eviction_listener(|pseudonym, _reply_opener, cause| {
190 tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
191 })
192 .max_capacity(cfg.max_pseudonyms.max(MINIMUM_SURBS_PER_PSEUDONYM) as u64)
193 .build(),
194 cfg: cfg.into(),
195 }
196 }
197}
198
199impl Default for MemorySurbStore {
200 fn default() -> Self {
201 Self::new(SurbStoreConfig::default())
202 }
203}
204
205#[async_trait::async_trait]
206impl SurbStore for MemorySurbStore {
207 async fn find_surb(&self, matcher: SurbMatcher) -> Option<FoundSurb> {
208 let pseudonym = matcher.pseudonym();
209 let surbs_for_pseudonym = self.surbs_per_pseudonym.get(&pseudonym).await?;
210
211 match matcher {
212 SurbMatcher::Pseudonym(_) => surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
213 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
214 surb: popped_surb.surb,
215 remaining: popped_surb.remaining,
216 }),
217 SurbMatcher::Exact(id) => {
222 surbs_for_pseudonym
223 .pop_one_if_has_id(&id.surb_id())
224 .map(|popped_surb| FoundSurb {
225 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
226 surb: popped_surb.surb,
227 remaining: popped_surb.remaining, })
229 }
230 }
231 }
232
233 async fn insert_surbs(&self, pseudonym: HoprPseudonym, surbs: Vec<(HoprSurbId, HoprSurb)>) -> usize {
234 self.surbs_per_pseudonym
235 .entry_by_ref(&pseudonym)
236 .or_insert_with(futures::future::lazy(|_| {
237 SurbRingBuffer::new(self.cfg.rb_capacity.max(MIN_SURB_RB_CAPACITY))
238 }))
239 .await
240 .value()
241 .push(surbs)
242 }
243
244 fn insert_reply_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
245 let opener_lifetime = self.cfg.reply_opener_lifetime.max(MINIMUM_OPENER_LIFETIME);
246 let max_openers_per_pseudonym = self.cfg.max_openers_per_pseudonym.max(MINIMUM_OPENERS_PER_PSEUDONYM);
247 self.pseudonym_openers
248 .get_with(sender_id.pseudonym(), move || {
249 moka::sync::Cache::builder()
250 .time_to_live(opener_lifetime)
251 .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
252 if cause != RemovalCause::Explicit {
253 tracing::warn!(
254 pseudonym = %sender_id.pseudonym(),
255 surb_id = hex::encode(id.as_slice()),
256 ?cause,
257 "evicting reply opener for sender id"
258 );
259 }
260 })
261 .max_capacity(max_openers_per_pseudonym as u64)
262 .build()
263 })
264 .insert(sender_id.surb_id(), opener);
265 }
266
267 fn find_reply_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
268 self.pseudonym_openers
269 .get(&sender_id.pseudonym())
270 .and_then(|cache| cache.remove(&sender_id.surb_id()))
271 }
272}
273
/// A SURB removed from a [`SurbRingBuffer`].
#[derive(Debug, Clone)]
pub struct PoppedSurb<S> {
    /// ID of the removed SURB.
    pub id: HoprSurbId,
    /// The removed SURB itself.
    pub surb: S,
    /// Number of SURBs left in the ring buffer after this one was removed.
    pub remaining: usize,
}
284
/// Ring buffer of `(SURB ID, SURB)` pairs guarded by a mutex.
///
/// The buffer lives behind an `Arc`, so clones refer to the same underlying
/// buffer.
#[derive(Clone, Debug)]
pub struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<ringbuffer::AllocRingBuffer<(HoprSurbId, S)>>>);
291
292impl<S> SurbRingBuffer<S> {
293 pub fn new(capacity: usize) -> Self {
294 Self(Arc::new(parking_lot::Mutex::new(ringbuffer::AllocRingBuffer::new(
295 capacity,
296 ))))
297 }
298
299 pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> usize {
303 let mut rb = self.0.lock();
304 rb.extend(surbs);
305 rb.len()
306 }
307
308 pub fn pop_one(&self) -> Option<PoppedSurb<S>> {
310 let mut rb = self.0.lock();
311 let (id, surb) = rb.dequeue()?;
312 Some(PoppedSurb {
313 id,
314 surb,
315 remaining: rb.len(),
316 })
317 }
318
319 pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Option<PoppedSurb<S>> {
321 let mut rb = self.0.lock();
322
323 if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
324 let (id, surb) = rb.dequeue()?;
325 Some(PoppedSurb {
326 id,
327 surb,
328 remaining: rb.len(),
329 })
330 } else {
331 None
332 }
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Pops the next SURB, turning an unexpectedly empty buffer into a test error.
    fn must_pop(rb: &SurbRingBuffer<i32>) -> anyhow::Result<PoppedSurb<i32>> {
        rb.pop_one().ok_or_else(|| anyhow::anyhow!("expected pop"))
    }

    #[test]
    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(3);

        // Four pushes into a buffer of three: the first entry must be overwritten.
        for byte in 1u8..=4 {
            rb.push([([byte; 8], 0)]);
        }

        for (expected_byte, expected_remaining) in [(2u8, 2usize), (3, 1), (4, 0)] {
            let popped = must_pop(&rb)?;
            assert_eq!([expected_byte; 8], popped.id);
            assert_eq!(expected_remaining, popped.remaining);
        }

        assert!(rb.pop_one().is_none());

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        assert_eq!(1, rb.push([([1u8; 8], 0)]));
        assert_eq!(2, rb.push([([2u8; 8], 0)]));

        for (expected_byte, expected_remaining) in [(1u8, 1usize), (2, 0)] {
            let popped = must_pop(&rb)?;
            assert_eq!([expected_byte; 8], popped.id);
            assert_eq!(expected_remaining, popped.remaining);
        }

        // Refill the emptied buffer in one batch; order must still be preserved.
        assert_eq!(2, rb.push([([1u8; 8], 0), ([2u8; 8], 0)]));

        assert_eq!([1u8; 8], must_pop(&rb)?.id);
        assert_eq!([2u8; 8], must_pop(&rb)?.id);

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        rb.push([([1u8; 8], 0)]);

        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_none());

        let popped = rb
            .pop_one_if_has_id(&[1u8; 8])
            .ok_or_else(|| anyhow::anyhow!("expected pop"))?;
        assert_eq!([1u8; 8], popped.id);

        Ok(())
    }
}