1use std::{sync::Arc, time::Duration};
2
3use hopr_api::types::internal::{prelude::HoprPseudonym, routing::SurbMatcher};
4use hopr_crypto_packet::prelude::*;
5use moka::notification::RemovalCause;
6use ringbuffer::RingBuffer;
7use validator::ValidationError;
8
9use crate::{FoundSurb, traits::SurbStore};
10
// --- Time-based lower bounds -------------------------------------------------

/// Smallest accepted `pseudonyms_lifetime`; also the floor applied to the
/// idle time of the pseudonym-keyed caches when the store is built.
const MINIMUM_SURB_LIFETIME: Duration = Duration::from_secs(30);
/// Smallest accepted `reply_opener_lifetime`; also the floor applied to the
/// time-to-live of individual reply openers.
const MINIMUM_OPENER_LIFETIME: Duration = Duration::from_secs(60);

// --- Size-based lower bounds -------------------------------------------------

/// Floor applied to the capacity of the top-level (pseudonym-keyed) reply opener cache.
const MINIMUM_OPENER_PSEUDONYMS: usize = 1_000;
/// Floor applied to the capacity of the reply opener cache created for a single pseudonym.
const MINIMUM_OPENERS_PER_PSEUDONYM: usize = 1_000;
/// Floor applied to the capacity of the top-level (pseudonym-keyed) SURB cache.
const MINIMUM_SURBS_PER_PSEUDONYM: usize = 1_000;
/// Floor applied to the capacity of each per-pseudonym SURB ring buffer.
const MIN_SURB_RB_CAPACITY: usize = 1_024;
17
18fn validate_pseudonyms_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
19 if lifetime < &MINIMUM_SURB_LIFETIME {
20 Err(ValidationError::new("pseudonyms_lifetime is too low"))
21 } else {
22 Ok(())
23 }
24}
25
26fn validate_reply_opener_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
27 if lifetime < &MINIMUM_OPENER_LIFETIME {
28 Err(ValidationError::new("reply_opener_lifetime is too low"))
29 } else {
30 Ok(())
31 }
32}
33
/// Default for `SurbStoreConfig::rb_capacity` (used by both `SmartDefault` and serde).
fn default_rb_capacity() -> usize {
    const DEFAULT_RB_CAPACITY: usize = 15_000;
    DEFAULT_RB_CAPACITY
}
37
/// Default for `SurbStoreConfig::distress_threshold` (used by both `SmartDefault` and serde).
fn default_distress_threshold() -> usize {
    const DEFAULT_DISTRESS_THRESHOLD: usize = 500;
    DEFAULT_DISTRESS_THRESHOLD
}
41
/// Default for `SurbStoreConfig::max_openers_per_pseudonym` (used by both `SmartDefault` and serde).
fn default_max_openers_per_pseudonym() -> usize {
    const DEFAULT_MAX_OPENERS_PER_PSEUDONYM: usize = 100_000;
    DEFAULT_MAX_OPENERS_PER_PSEUDONYM
}
45
/// Default for `SurbStoreConfig::max_pseudonyms` (used by both `SmartDefault` and serde).
fn default_max_pseudonyms() -> usize {
    const DEFAULT_MAX_PSEUDONYMS: usize = 10_000;
    DEFAULT_MAX_PSEUDONYMS
}
49
/// Default for `SurbStoreConfig::pseudonyms_lifetime`: 10 minutes.
fn default_pseudonyms_lifetime() -> Duration {
    const MINUTES: u64 = 10;
    Duration::from_secs(MINUTES * 60)
}
53
/// Default for `SurbStoreConfig::reply_opener_lifetime`: 1 hour.
fn default_reply_opener_lifetime() -> Duration {
    const MINUTES: u64 = 60;
    Duration::from_secs(MINUTES * 60)
}
57
/// Configuration of the [`MemorySurbStore`].
///
/// Every field has a default (via `SmartDefault`, and via `serde(default)` when
/// the `serde` feature is enabled) and a validation rule (via `validator::Validate`).
/// With `serde` enabled, unknown fields are rejected on deserialization.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, smart_default::SmartDefault, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct SurbStoreConfig {
    /// Capacity of the SURB ring buffer created for each pseudonym.
    ///
    /// Default is 15 000; validation requires at least 1024.
    #[default(default_rb_capacity())]
    #[validate(range(min = 1024, message = "rb_capacity must be at least 1024"))]
    #[cfg_attr(feature = "serde", serde(default = "default_rb_capacity"))]
    pub rb_capacity: usize,
    /// Distress threshold.
    ///
    /// Default is 500; validation requires at least 10.
    ///
    /// NOTE(review): this value is not read anywhere in this file; presumably it is
    /// the remaining-SURB count at which a consumer signals distress — confirm
    /// against the code that reads it.
    #[default(default_distress_threshold())]
    #[validate(range(min = 10, message = "distress_threshold must be at least 10"))]
    #[cfg_attr(feature = "serde", serde(default = "default_distress_threshold"))]
    pub distress_threshold: usize,
    /// Maximum number of reply openers cached for a single pseudonym.
    ///
    /// Default is 100 000; validation requires at least 100.
    /// Note that `MemorySurbStore::insert_reply_opener` additionally raises values
    /// below `MINIMUM_OPENERS_PER_PSEUDONYM` (1000) to that floor.
    #[default(default_max_openers_per_pseudonym())]
    #[validate(range(min = 100, message = "max_openers_per_pseudonym must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_openers_per_pseudonym"))]
    pub max_openers_per_pseudonym: usize,
    /// Maximum number of distinct pseudonyms tracked by the store.
    ///
    /// Default is 10 000; validation requires at least 100.
    /// Note that `MemorySurbStore::new` additionally raises values below 1000
    /// to that floor when sizing the pseudonym-keyed SURB cache.
    #[default(default_max_pseudonyms())]
    #[validate(range(min = 100, message = "max_pseudonyms must be at least 100"))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_pseudonyms"))]
    pub max_pseudonyms: usize,
    /// How long a pseudonym's entry may stay idle before it is dropped.
    ///
    /// Used as the time-to-idle of both pseudonym-keyed caches in `MemorySurbStore`.
    /// Default is 600 seconds; validation requires at least `MINIMUM_SURB_LIFETIME`
    /// (30 seconds). With `serde`, the value is (de)serialized via `humantime_serde`.
    #[default(default_pseudonyms_lifetime())]
    #[validate(custom(function = "validate_pseudonyms_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_pseudonyms_lifetime", with = "humantime_serde")
    )]
    pub pseudonyms_lifetime: Duration,
    /// How long an individual reply opener is kept after insertion.
    ///
    /// Used as the time-to-live of the per-pseudonym reply opener caches.
    /// Default is 3600 seconds; validation requires at least `MINIMUM_OPENER_LIFETIME`
    /// (60 seconds). With `serde`, the value is (de)serialized via `humantime_serde`.
    #[default(default_reply_opener_lifetime())]
    #[validate(custom(function = "validate_reply_opener_lifetime"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_reply_opener_lifetime", with = "humantime_serde")
    )]
    pub reply_opener_lifetime: Duration,
}
155
/// In-memory implementation of [`SurbStore`] backed by `moka` caches.
///
/// Holds two pseudonym-keyed caches: one with reply openers and one with SURBs.
#[derive(Clone)]
pub struct MemorySurbStore {
    /// Two-level cache of reply openers: pseudonym -> (SURB ID -> reply opener).
    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
    /// Ring buffer of received SURBs for each pseudonym.
    surbs_per_pseudonym: moka::sync::Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
    /// Configuration the store was created with; read when sizing lazily created
    /// per-pseudonym structures.
    cfg: Arc<SurbStoreConfig>,
}
167
168impl MemorySurbStore {
169 pub fn new(cfg: SurbStoreConfig) -> Self {
171 Self {
172 pseudonym_openers: moka::sync::Cache::builder()
176 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
177 .eviction_policy(moka::policy::EvictionPolicy::lru())
178 .eviction_listener(|sender_id, _reply_opener, cause| {
179 tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
180 })
181 .max_capacity(cfg.max_openers_per_pseudonym.max(MINIMUM_OPENER_PSEUDONYMS) as u64)
182 .build(),
183 surbs_per_pseudonym: moka::sync::Cache::builder()
186 .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
187 .eviction_policy(moka::policy::EvictionPolicy::lru())
188 .eviction_listener(|pseudonym, _reply_opener, cause| {
189 tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
190 })
191 .max_capacity(cfg.max_pseudonyms.max(MINIMUM_SURBS_PER_PSEUDONYM) as u64)
192 .build(),
193 cfg: cfg.into(),
194 }
195 }
196}
197
198impl Default for MemorySurbStore {
199 fn default() -> Self {
200 Self::new(SurbStoreConfig::default())
201 }
202}
203
204#[async_trait::async_trait]
205impl SurbStore for MemorySurbStore {
206 #[tracing::instrument(skip_all, level = "trace", fields(?matcher), ret)]
207 fn find_surb(&self, matcher: SurbMatcher) -> Option<FoundSurb> {
208 let pseudonym = matcher.pseudonym();
209 let surbs_for_pseudonym = self.surbs_per_pseudonym.get(&pseudonym)?;
210
211 match matcher {
212 SurbMatcher::Pseudonym(_) => surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
213 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
214 surb: popped_surb.surb,
215 remaining: popped_surb.remaining,
216 }),
217 SurbMatcher::Exact(id) => {
222 surbs_for_pseudonym
223 .pop_one_if_has_id(&id.surb_id())
224 .map(|popped_surb| FoundSurb {
225 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
226 surb: popped_surb.surb,
227 remaining: popped_surb.remaining, })
229 }
230 }
231 }
232
233 #[tracing::instrument(skip_all, level = "trace", fields(%pseudonym, num_surbs = surbs.len()))]
234 fn insert_surbs(&self, pseudonym: HoprPseudonym, surbs: Vec<(HoprSurbId, HoprSurb)>) -> usize {
235 self.surbs_per_pseudonym
236 .entry_by_ref(&pseudonym)
237 .or_insert_with(|| SurbRingBuffer::new(self.cfg.rb_capacity.max(MIN_SURB_RB_CAPACITY)))
238 .value()
239 .push(surbs)
240 }
241
242 #[tracing::instrument(skip_all, level = "trace", fields(?sender_id))]
243 fn insert_reply_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
244 let opener_lifetime = self.cfg.reply_opener_lifetime.max(MINIMUM_OPENER_LIFETIME);
245 let max_openers_per_pseudonym = self.cfg.max_openers_per_pseudonym.max(MINIMUM_OPENERS_PER_PSEUDONYM);
246 self.pseudonym_openers
247 .get_with(sender_id.pseudonym(), move || {
248 moka::sync::Cache::builder()
249 .time_to_live(opener_lifetime)
250 .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
251 if cause != RemovalCause::Explicit {
252 tracing::warn!(
253 pseudonym = %sender_id.pseudonym(),
254 surb_id = hex::encode(id.as_slice()),
255 ?cause,
256 "evicting reply opener for sender id"
257 );
258 }
259 })
260 .max_capacity(max_openers_per_pseudonym as u64)
261 .build()
262 })
263 .insert(sender_id.surb_id(), opener);
264 }
265
266 #[tracing::instrument(skip_all, level = "trace", fields(?sender_id), ret)]
267 fn find_reply_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
268 self.pseudonym_openers
269 .get(&sender_id.pseudonym())
270 .and_then(|cache| cache.remove(&sender_id.surb_id()))
271 }
272}
273
/// A SURB that has been removed from a [`SurbRingBuffer`].
#[derive(Debug, Clone)]
pub struct PoppedSurb<S> {
    /// ID of the popped SURB.
    pub id: HoprSurbId,
    /// The popped SURB itself.
    pub surb: S,
    /// Number of SURBs left in the ring buffer right after this one was popped.
    pub remaining: usize,
}
284
/// Ring buffer of `(SURB ID, SURB)` pairs guarded by a mutex behind an `Arc`,
/// so that clones of this type refer to the same underlying buffer.
#[derive(Clone, Debug)]
pub struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<ringbuffer::AllocRingBuffer<(HoprSurbId, S)>>>);
291
292impl<S> SurbRingBuffer<S> {
293 pub fn new(capacity: usize) -> Self {
294 Self(Arc::new(parking_lot::Mutex::new(ringbuffer::AllocRingBuffer::new(
295 capacity,
296 ))))
297 }
298
299 pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> usize {
303 let mut rb = self.0.lock();
304 rb.extend(surbs);
305 rb.len()
306 }
307
308 pub fn pop_one(&self) -> Option<PoppedSurb<S>> {
310 let mut rb = self.0.lock();
311 let (id, surb) = rb.dequeue()?;
312 Some(PoppedSurb {
313 id,
314 surb,
315 remaining: rb.len(),
316 })
317 }
318
319 pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Option<PoppedSurb<S>> {
321 let mut rb = self.0.lock();
322
323 if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
324 let (id, surb) = rb.dequeue()?;
325 Some(PoppedSurb {
326 id,
327 surb,
328 remaining: rb.len(),
329 })
330 } else {
331 None
332 }
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Pops the next SURB from `rb`, turning an empty buffer into a test error.
    fn pop_expected<S>(rb: &SurbRingBuffer<S>) -> anyhow::Result<PoppedSurb<S>> {
        rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))
    }

    #[test]
    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(3);

        // One more push than the capacity: the first item is expected to be gone.
        for id_byte in 1u8..=4 {
            rb.push([([id_byte; 8], 0)]);
        }

        let expected = [([2u8; 8], 2usize), ([3u8; 8], 1), ([4u8; 8], 0)];
        for (expected_id, expected_remaining) in expected {
            let popped = pop_expected(&rb)?;
            assert_eq!(expected_id, popped.id);
            assert_eq!(expected_remaining, popped.remaining);
        }

        assert!(rb.pop_one().is_none());

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        assert_eq!(1, rb.push([([1u8; 8], 0)]));
        assert_eq!(2, rb.push([([2u8; 8], 0)]));

        let first = pop_expected(&rb)?;
        assert_eq!([1u8; 8], first.id);
        assert_eq!(1, first.remaining);

        let second = pop_expected(&rb)?;
        assert_eq!([2u8; 8], second.id);
        assert_eq!(0, second.remaining);

        // Pushing a batch must preserve the order within the batch.
        assert_eq!(2, rb.push([([1u8; 8], 0), ([2u8; 8], 0)]));

        assert_eq!([1u8; 8], pop_expected(&rb)?.id);
        assert_eq!([2u8; 8], pop_expected(&rb)?.id);

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);
        rb.push([([1u8; 8], 0)]);

        // A mismatching ID must leave the item in place...
        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_none());

        // ...so that the matching ID can still pop it.
        let popped = rb
            .pop_one_if_has_id(&[1u8; 8])
            .ok_or(anyhow::anyhow!("expected pop"))?;
        assert_eq!([1u8; 8], popped.id);

        Ok(())
    }
}