1use std::{sync::Arc, time::Duration};
2
3use hopr_crypto_packet::prelude::*;
4use hopr_internal_types::prelude::HoprPseudonym;
5use hopr_network_types::prelude::SurbMatcher;
6use moka::{future::Cache, notification::RemovalCause};
7use ringbuffer::RingBuffer;
8use validator::ValidationError;
9
10use crate::{FoundSurb, traits::SurbStore};
11
/// Lower bound for `SurbStoreConfig::pseudonyms_lifetime`: shorter values fail validation and
/// are raised to this value when the store's caches are built.
const MINIMUM_SURB_LIFETIME: Duration = Duration::from_secs(30);
/// Floor applied to the capacity of the pseudonym-keyed reply opener cache.
const MINIMUM_OPENER_PSEUDONYMS: usize = 1000;
/// Floor applied to the capacity of each per-pseudonym reply opener cache.
const MINIMUM_OPENERS_PER_PSEUDONYM: usize = 1000;
/// Floor applied to the capacity of the pseudonym-keyed SURB cache.
// NOTE(review): despite its name, this bounds the number of cached pseudonyms (it is combined
// with `max_pseudonyms`), not the number of SURBs held per pseudonym.
const MINIMUM_SURBS_PER_PSEUDONYM: usize = 1000;
/// Lower bound for `SurbStoreConfig::reply_opener_lifetime`: shorter values fail validation and
/// are raised to this value when a per-pseudonym opener cache is created.
const MINIMUM_OPENER_LIFETIME: Duration = Duration::from_secs(60);
/// Floor applied to `SurbStoreConfig::rb_capacity` when a SURB ring buffer is created.
const MIN_SURB_RB_CAPACITY: usize = 1024;
18
19fn validate_pseudonyms_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
20 if lifetime < &MINIMUM_SURB_LIFETIME {
21 Err(ValidationError::new("pseudonyms_lifetime is too low"))
22 } else {
23 Ok(())
24 }
25}
26
27fn validate_reply_opener_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
28 if lifetime < &MINIMUM_OPENER_LIFETIME {
29 Err(ValidationError::new("reply_opener_lifetime is too low"))
30 } else {
31 Ok(())
32 }
33}
34
/// Default for `SurbStoreConfig::rb_capacity`: 15 000 ring buffer slots.
fn default_rb_capacity() -> usize {
    const DEFAULT_RB_CAPACITY: usize = 15_000;
    DEFAULT_RB_CAPACITY
}
38
/// Default for `SurbStoreConfig::distress_threshold`: 500.
fn default_distress_threshold() -> usize {
    const DEFAULT_DISTRESS_THRESHOLD: usize = 500;
    DEFAULT_DISTRESS_THRESHOLD
}
42
/// Default for `SurbStoreConfig::max_openers_per_pseudonym`: 100 000 reply openers.
fn default_max_openers_per_pseudonym() -> usize {
    const DEFAULT_MAX_OPENERS_PER_PSEUDONYM: usize = 100_000;
    DEFAULT_MAX_OPENERS_PER_PSEUDONYM
}
46
/// Default for `SurbStoreConfig::max_pseudonyms`: 10 000 pseudonyms.
fn default_max_pseudonyms() -> usize {
    const DEFAULT_MAX_PSEUDONYMS: usize = 10_000;
    DEFAULT_MAX_PSEUDONYMS
}
50
/// Default for `SurbStoreConfig::pseudonyms_lifetime`: 10 minutes.
fn default_pseudonyms_lifetime() -> Duration {
    const TEN_MINUTES_IN_SECS: u64 = 10 * 60;
    Duration::from_secs(TEN_MINUTES_IN_SECS)
}
54
/// Default for `SurbStoreConfig::reply_opener_lifetime`: 1 hour.
fn default_reply_opener_lifetime() -> Duration {
    const ONE_HOUR_IN_SECS: u64 = 60 * 60;
    Duration::from_secs(ONE_HOUR_IN_SECS)
}
58
/// Configuration of the in-memory SURB store (`MemorySurbStore`).
///
/// Every field has a default supplied by the matching `default_*` function and a lower limit
/// enforced through `validator::Validate`. With the `serde` feature enabled, unknown fields are
/// rejected on deserialization and missing fields fall back to their defaults.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, smart_default::SmartDefault, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct SurbStoreConfig {
    /// Capacity of the SURB ring buffer created for each pseudonym.
    ///
    /// Defaults to 15 000; validation requires at least 1024, and the store also raises smaller
    /// values to `MIN_SURB_RB_CAPACITY` when it creates a buffer.
    #[default(default_rb_capacity())]
    #[validate(range(min = 1024, message = "rb_capacity must be at least 1024"))]
    #[cfg_attr(feature = "serde", serde(default = "default_rb_capacity"))]
    pub rb_capacity: usize,
    /// Distress threshold; defaults to 500 and validation requires at least 10.
    // NOTE(review): not read anywhere in this file; presumably the number of remaining SURBs
    // below which a pseudonym is considered to be running low — confirm against the consumers
    // of this configuration.
    #[default(default_distress_threshold())]
    #[validate(range(min = 10, message = "distress_threshold must be at least 10"))]
    #[cfg_attr(feature = "serde", serde(default = "default_distress_threshold"))]
    pub distress_threshold: usize,
    /// Maximum number of reply openers kept for a single pseudonym.
    ///
    /// Defaults to 100 000. Validation accepts values from 100 upwards, but the store raises
    /// anything below `MINIMUM_OPENERS_PER_PSEUDONYM` (1000) when it creates the per-pseudonym
    /// opener cache.
    #[default(default_max_openers_per_pseudonym())]
    #[validate(range(min = 100, message = "max_openers_per_pseudonym must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_openers_per_pseudonym"))]
    pub max_openers_per_pseudonym: usize,
    /// Maximum number of pseudonyms whose SURBs are cached at the same time.
    ///
    /// Defaults to 10 000. Validation accepts values from 100 upwards, but the store raises
    /// anything below 1000 when it builds the cache.
    #[default(default_max_pseudonyms())]
    #[validate(range(min = 100, message = "max_pseudonyms must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_pseudonyms"))]
    pub max_pseudonyms: usize,
    /// Idle time after which the cached state of a pseudonym expires.
    ///
    /// Used as the time-to-idle of both pseudonym-keyed caches. Defaults to 10 minutes and must
    /// be at least `MINIMUM_SURB_LIFETIME` (30 seconds). With the `serde` feature it is
    /// (de)serialized through `humantime_serde`.
    #[default(default_pseudonyms_lifetime())]
    #[validate(custom(function = "validate_pseudonyms_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_pseudonyms_lifetime", with = "humantime_serde")
    )]
    pub pseudonyms_lifetime: Duration,
    /// Time-to-live of a single reply opener, counted from its insertion.
    ///
    /// Defaults to 1 hour and must be at least `MINIMUM_OPENER_LIFETIME` (60 seconds). With the
    /// `serde` feature it is (de)serialized through `humantime_serde`.
    #[default(default_reply_opener_lifetime())]
    #[validate(custom(function = "validate_reply_opener_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_reply_opener_lifetime", with = "humantime_serde")
    )]
    pub reply_opener_lifetime: Duration,
}
156
/// In-memory implementation of `SurbStore`, backed by `moka` caches.
///
/// SURBs and reply openers are both grouped by the pseudonym they belong to.
#[derive(Clone)]
pub struct MemorySurbStore {
    /// Reply openers: pseudonym → (SURB ID → reply opener), as two nested synchronous caches.
    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
    /// Received SURBs: pseudonym → ring buffer of SURBs, held in an asynchronous cache.
    surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
    /// Configuration the store was created with; read again whenever a per-pseudonym
    /// structure is created lazily.
    cfg: Arc<SurbStoreConfig>,
}
168
169impl MemorySurbStore {
170 pub fn new(cfg: SurbStoreConfig) -> Self {
172 Self {
173 pseudonym_openers: moka::sync::Cache::builder()
177 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
178 .eviction_policy(moka::policy::EvictionPolicy::lru())
179 .eviction_listener(|sender_id, _reply_opener, cause| {
180 tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
181 })
182 .max_capacity(cfg.max_openers_per_pseudonym.max(MINIMUM_OPENER_PSEUDONYMS) as u64)
183 .build(),
184 surbs_per_pseudonym: Cache::builder()
187 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
188 .eviction_policy(moka::policy::EvictionPolicy::lru())
189 .eviction_listener(|pseudonym, _reply_opener, cause| {
190 tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
191 })
192 .max_capacity(cfg.max_pseudonyms.max(MINIMUM_SURBS_PER_PSEUDONYM) as u64)
193 .build(),
194 cfg: cfg.into(),
195 }
196 }
197}
198
199impl Default for MemorySurbStore {
200 fn default() -> Self {
201 Self::new(SurbStoreConfig::default())
202 }
203}
204
205#[async_trait::async_trait]
206impl SurbStore for MemorySurbStore {
207 #[tracing::instrument(skip_all, level = "trace", fields(?matcher), ret)]
208 async fn find_surb(&self, matcher: SurbMatcher) -> Option<FoundSurb> {
209 let pseudonym = matcher.pseudonym();
210 let surbs_for_pseudonym = self.surbs_per_pseudonym.get(&pseudonym).await?;
211
212 match matcher {
213 SurbMatcher::Pseudonym(_) => surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
214 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
215 surb: popped_surb.surb,
216 remaining: popped_surb.remaining,
217 }),
218 SurbMatcher::Exact(id) => {
223 surbs_for_pseudonym
224 .pop_one_if_has_id(&id.surb_id())
225 .map(|popped_surb| FoundSurb {
226 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
227 surb: popped_surb.surb,
228 remaining: popped_surb.remaining, })
230 }
231 }
232 }
233
234 #[tracing::instrument(skip_all, level = "trace", fields(%pseudonym, num_surbs = surbs.len()))]
235 async fn insert_surbs(&self, pseudonym: HoprPseudonym, surbs: Vec<(HoprSurbId, HoprSurb)>) -> usize {
236 self.surbs_per_pseudonym
237 .entry_by_ref(&pseudonym)
238 .or_insert_with(futures::future::lazy(|_| {
239 SurbRingBuffer::new(self.cfg.rb_capacity.max(MIN_SURB_RB_CAPACITY))
240 }))
241 .await
242 .value()
243 .push(surbs)
244 }
245
246 #[tracing::instrument(skip_all, level = "trace", fields(?sender_id))]
247 fn insert_reply_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
248 let opener_lifetime = self.cfg.reply_opener_lifetime.max(MINIMUM_OPENER_LIFETIME);
249 let max_openers_per_pseudonym = self.cfg.max_openers_per_pseudonym.max(MINIMUM_OPENERS_PER_PSEUDONYM);
250 self.pseudonym_openers
251 .get_with(sender_id.pseudonym(), move || {
252 moka::sync::Cache::builder()
253 .time_to_live(opener_lifetime)
254 .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
255 if cause != RemovalCause::Explicit {
256 tracing::warn!(
257 pseudonym = %sender_id.pseudonym(),
258 surb_id = hex::encode(id.as_slice()),
259 ?cause,
260 "evicting reply opener for sender id"
261 );
262 }
263 })
264 .max_capacity(max_openers_per_pseudonym as u64)
265 .build()
266 })
267 .insert(sender_id.surb_id(), opener);
268 }
269
270 #[tracing::instrument(skip_all, level = "trace", fields(?sender_id), ret)]
271 fn find_reply_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
272 self.pseudonym_openers
273 .get(&sender_id.pseudonym())
274 .and_then(|cache| cache.remove(&sender_id.surb_id()))
275 }
276}
277
/// A SURB removed from a `SurbRingBuffer`, together with bookkeeping about the buffer.
#[derive(Debug, Clone)]
pub struct PoppedSurb<S> {
    /// ID under which the SURB was stored.
    pub id: HoprSurbId,
    /// The SURB itself.
    pub surb: S,
    /// Number of SURBs left in the buffer right after this one was removed.
    pub remaining: usize,
}
288
/// Ring buffer of `(SURB ID, SURB)` pairs guarded by a mutex.
///
/// The buffer lives behind an `Arc`, so clones of a `SurbRingBuffer` operate on the same
/// underlying storage. Entries come out in insertion order; once the buffer is full, newly
/// pushed entries displace the oldest ones (see the unit tests of this module).
#[derive(Clone, Debug)]
pub struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<ringbuffer::AllocRingBuffer<(HoprSurbId, S)>>>);
295
296impl<S> SurbRingBuffer<S> {
297 pub fn new(capacity: usize) -> Self {
298 Self(Arc::new(parking_lot::Mutex::new(ringbuffer::AllocRingBuffer::new(
299 capacity,
300 ))))
301 }
302
303 pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> usize {
307 let mut rb = self.0.lock();
308 rb.extend(surbs);
309 rb.len()
310 }
311
312 pub fn pop_one(&self) -> Option<PoppedSurb<S>> {
314 let mut rb = self.0.lock();
315 let (id, surb) = rb.dequeue()?;
316 Some(PoppedSurb {
317 id,
318 surb,
319 remaining: rb.len(),
320 })
321 }
322
323 pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Option<PoppedSurb<S>> {
325 let mut rb = self.0.lock();
326
327 if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
328 let (id, surb) = rb.dequeue()?;
329 Some(PoppedSurb {
330 id,
331 surb,
332 remaining: rb.len(),
333 })
334 } else {
335 None
336 }
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    /// Error reported whenever a pop that is expected to succeed yields nothing.
    fn missing_pop() -> anyhow::Error {
        anyhow::anyhow!("expected pop")
    }

    /// Pushing beyond the capacity must displace the oldest entries.
    #[test]
    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(3);
        for tag in 1u8..=4 {
            rb.push([([tag; 8], 0)]);
        }

        // Entry 1 was overwritten by entry 4, so popping starts at 2.
        for (expected_tag, expected_remaining) in [(2u8, 2usize), (3, 1), (4, 0)] {
            let popped = rb.pop_one().ok_or_else(missing_pop)?;
            assert_eq!([expected_tag; 8], popped.id);
            assert_eq!(expected_remaining, popped.remaining);
        }

        assert!(rb.pop_one().is_none());

        Ok(())
    }

    /// Entries must come out in the order they were pushed.
    #[test]
    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        assert_eq!(1, rb.push([([1u8; 8], 0)]));
        assert_eq!(2, rb.push([([2u8; 8], 0)]));

        let first = rb.pop_one().ok_or_else(missing_pop)?;
        assert_eq!([1u8; 8], first.id);
        assert_eq!(1, first.remaining);

        let second = rb.pop_one().ok_or_else(missing_pop)?;
        assert_eq!([2u8; 8], second.id);
        assert_eq!(0, second.remaining);

        // A batch push keeps the order of the batch.
        assert_eq!(2, rb.push([([1u8; 8], 0), ([2u8; 8], 0)]));

        assert_eq!([1u8; 8], rb.pop_one().ok_or_else(missing_pop)?.id);
        assert_eq!([2u8; 8], rb.pop_one().ok_or_else(missing_pop)?.id);

        Ok(())
    }

    /// A conditional pop must leave the buffer untouched when the front ID differs.
    #[test]
    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        rb.push([([1u8; 8], 0)]);

        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_none());

        let popped = rb.pop_one_if_has_id(&[1u8; 8]).ok_or_else(missing_pop)?;
        assert_eq!([1u8; 8], popped.id);

        Ok(())
    }
}