hopr_protocol_hopr/
surb_store.rs

1use std::{sync::Arc, time::Duration};
2
3use hopr_crypto_packet::prelude::*;
4use hopr_internal_types::prelude::HoprPseudonym;
5use hopr_network_types::prelude::SurbMatcher;
6use moka::{future::Cache, notification::RemovalCause};
7use ringbuffer::RingBuffer;
8use validator::ValidationError;
9
10use crate::{FoundSurb, traits::SurbStore};
11
12const MINIMUM_SURB_LIFETIME: Duration = Duration::from_secs(30);
13const MINIMUM_OPENER_PSEUDONYMS: usize = 1000;
14const MINIMUM_OPENERS_PER_PSEUDONYM: usize = 1000;
15const MINIMUM_SURBS_PER_PSEUDONYM: usize = 1000;
16const MINIMUM_OPENER_LIFETIME: Duration = Duration::from_secs(60);
17const MIN_SURB_RB_CAPACITY: usize = 1024;
18
19fn validate_pseudonyms_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
20    if lifetime < &MINIMUM_SURB_LIFETIME {
21        Err(ValidationError::new("pseudonyms_lifetime is too low"))
22    } else {
23        Ok(())
24    }
25}
26
27fn validate_reply_opener_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
28    if lifetime < &MINIMUM_OPENER_LIFETIME {
29        Err(ValidationError::new("reply_opener_lifetime is too low"))
30    } else {
31        Ok(())
32    }
33}
34
35fn default_rb_capacity() -> usize {
36    15_000
37}
38
39fn default_distress_threshold() -> usize {
40    500
41}
42
43fn default_max_openers_per_pseudonym() -> usize {
44    100_000
45}
46
47fn default_max_pseudonyms() -> usize {
48    10_000
49}
50
51fn default_pseudonyms_lifetime() -> Duration {
52    Duration::from_secs(600)
53}
54
55fn default_reply_opener_lifetime() -> Duration {
56    Duration::from_secs(3600)
57}
58
59/// Configuration for the SURB cache.
60///
61/// The configuration options affect both the sending side (SURB creator) and the
62/// replying side (SURB consumer).
63///
64/// In the classical scenario (`Entry - Relay 1 -... - Exit`), the sending side is
65/// the `Entry` and the replying side is the `Exit`.
66#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, smart_default::SmartDefault, validator::Validate)]
67#[cfg_attr(
68    feature = "serde",
69    derive(serde::Deserialize, serde::Serialize),
70    serde(deny_unknown_fields)
71)]
72pub struct SurbStoreConfig {
73    /// Size of the SURB ring buffer per pseudonym.
74    ///
75    /// Affects only the replying side.
76    ///
77    /// This indicates how many SURBs can be at most held to be used to send a reply
78    /// back to the sending side.
79    ///
80    /// Default is 15 000.
81    #[default(default_rb_capacity())]
82    #[validate(range(min = 1024, message = "rb_capacity must be at least 1024"))]
83    #[cfg_attr(feature = "serde", serde(default = "default_rb_capacity"))]
84    pub rb_capacity: usize,
85    /// Threshold for the number of SURBs in the ring buffer, below which it is
86    /// considered low ("SURB distress").
87    ///
88    /// Default is 500.
89    #[default(default_distress_threshold())]
90    #[validate(range(min = 10, message = "distress_threshold must be at least 10"))]
91    #[cfg_attr(feature = "serde", serde(default = "default_distress_threshold"))]
92    pub distress_threshold: usize,
93    /// Maximum number of reply openers (SURB counterparts) per pseudonym.
94    ///
95    /// Affects only the sending side when decrypting a received reply.
96    ///
97    /// This mostly affects Sessions, as they use a fixed pseudonym.
98    /// It reflects how many reply openers the initiator-side of a Session can hold,
99    /// until the oldest ones are dropped. If the other party uses a SURB corresponding
100    /// to a dropped reply opener, the reply message will be undecryptable by the initiator-side.
101    ///
102    /// Default is 100 000.
103    #[default(default_max_openers_per_pseudonym())]
104    #[validate(range(min = 100, message = "max_openers_per_pseudonym must be at least 100"))]
105    #[cfg_attr(feature = "serde", serde(default = "default_max_openers_per_pseudonym"))]
106    pub max_openers_per_pseudonym: usize,
107    /// The maximum number of distinct pseudonyms for which we hold a SURB ringbuffer.
108    ///
109    /// Affects only the replying side.
110    ///
111    /// For each pseudonym, there is a ring-buffer with capacity `rb_capacity`.
112    ///
113    /// Default is 10 000.
114    #[default(default_max_pseudonyms())]
115    #[validate(range(min = 100, message = "max_pseudonyms must be at least 100"))]
116    #[cfg_attr(feature = "serde", serde(default = "default_max_pseudonyms"))]
117    pub max_pseudonyms: usize,
118    /// Maximum lifetime of ring-buffer for each pseudonym.
119    ///
120    /// # Effects on sending side
121    /// This is the period for which we hold all reply openers for a pseudonym.
122    /// If no more messages carrying SURBs are sent during this period, the entire stash of
123    /// reply openers is dropped. Preventing receiving any more replies for that pseudonym.
124    ///
125    /// # Effects on replying side
126    /// If a pseudonym has not received any SURBs for this period,
127    /// the entire ring buffer with `rb_capacity` (= all SURBs for this pseudonym) is dropped.
128    /// Preventing from sending any more replies for that pseudonym.
129    ///
130    /// Default is 600 seconds.
131    #[default(default_pseudonyms_lifetime())]
132    #[validate(custom(function = "validate_pseudonyms_lifetime"))]
133    #[cfg_attr(
134        feature = "serde",
135        serde(default = "default_pseudonyms_lifetime", with = "humantime_serde")
136    )]
137    pub pseudonyms_lifetime: Duration,
138    /// Maximum lifetime of a reply opener.
139    ///
140    /// Affects only the sending side.
141    ///
142    /// A reply opener is distinguished using [`HoprSurbId`] and a pseudonym it belongs to.
143    /// If a reply opener is not used to decrypt the received packet within this period,
144    /// it is dropped. If the replying side uses the corresponding SURB to send a reply,
145    /// it won't be possible to decrypt it when received.
146    ///
147    /// Default is 3600 seconds.
148    #[default(default_reply_opener_lifetime())]
149    #[validate(custom(function = "validate_reply_opener_lifetime"))]
150    #[cfg_attr(
151        feature = "serde",
152        serde(default = "default_reply_opener_lifetime", with = "humantime_serde")
153    )]
154    pub reply_opener_lifetime: Duration,
155}
156
157/// Basic [`SurbStore`] implementation based on an in-memory cache.
158///
159/// This SURB store offers no persistence, and all SURBs and Reply Openers are lost once dropped.
160///
161/// The instance can be cheaply cloned.
162#[derive(Clone)]
163pub struct MemorySurbStore {
164    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
165    surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
166    cfg: Arc<SurbStoreConfig>,
167}
168
169impl MemorySurbStore {
170    /// Creates a new instance with the given configuration.
171    pub fn new(cfg: SurbStoreConfig) -> Self {
172        Self {
173            // Reply openers are indexed by entire Sender IDs (Pseudonym + SURB ID)
174            // in a cascade fashion, allowing the entire batches (by Pseudonym) to be evicted
175            // if not used.
176            pseudonym_openers: moka::sync::Cache::builder()
177                .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
178                .eviction_policy(moka::policy::EvictionPolicy::lru())
179                .eviction_listener(|sender_id, _reply_opener, cause| {
180                    tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
181                })
182                .max_capacity(cfg.max_openers_per_pseudonym.max(MINIMUM_OPENER_PSEUDONYMS) as u64)
183                .build(),
184            // SURBs are indexed only by Pseudonyms, which have longer lifetimes.
185            // For each Pseudonym, there's an RB of SURBs and their IDs.
186            surbs_per_pseudonym: Cache::builder()
187                .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
188                .eviction_policy(moka::policy::EvictionPolicy::lru())
189                .eviction_listener(|pseudonym, _reply_opener, cause| {
190                    tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
191                })
192                .max_capacity(cfg.max_pseudonyms.max(MINIMUM_SURBS_PER_PSEUDONYM) as u64)
193                .build(),
194            cfg: cfg.into(),
195        }
196    }
197}
198
199impl Default for MemorySurbStore {
200    fn default() -> Self {
201        Self::new(SurbStoreConfig::default())
202    }
203}
204
205#[async_trait::async_trait]
206impl SurbStore for MemorySurbStore {
207    async fn find_surb(&self, matcher: SurbMatcher) -> Option<FoundSurb> {
208        let pseudonym = matcher.pseudonym();
209        let surbs_for_pseudonym = self.surbs_per_pseudonym.get(&pseudonym).await?;
210
211        match matcher {
212            SurbMatcher::Pseudonym(_) => surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
213                sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
214                surb: popped_surb.surb,
215                remaining: popped_surb.remaining,
216            }),
217            // The following code intentionally only checks the first SURB in the ring buffer
218            // and does not search the entire RB.
219            // This is because the exact match use-case is suited only for situations
220            // when there is a single SURB in the RB.
221            SurbMatcher::Exact(id) => {
222                surbs_for_pseudonym
223                    .pop_one_if_has_id(&id.surb_id())
224                    .map(|popped_surb| FoundSurb {
225                        sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
226                        surb: popped_surb.surb,
227                        remaining: popped_surb.remaining, // = likely 0
228                    })
229            }
230        }
231    }
232
233    async fn insert_surbs(&self, pseudonym: HoprPseudonym, surbs: Vec<(HoprSurbId, HoprSurb)>) -> usize {
234        self.surbs_per_pseudonym
235            .entry_by_ref(&pseudonym)
236            .or_insert_with(futures::future::lazy(|_| {
237                SurbRingBuffer::new(self.cfg.rb_capacity.max(MIN_SURB_RB_CAPACITY))
238            }))
239            .await
240            .value()
241            .push(surbs)
242    }
243
244    fn insert_reply_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
245        let opener_lifetime = self.cfg.reply_opener_lifetime.max(MINIMUM_OPENER_LIFETIME);
246        let max_openers_per_pseudonym = self.cfg.max_openers_per_pseudonym.max(MINIMUM_OPENERS_PER_PSEUDONYM);
247        self.pseudonym_openers
248            .get_with(sender_id.pseudonym(), move || {
249                moka::sync::Cache::builder()
250                    .time_to_live(opener_lifetime)
251                    .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
252                        if cause != RemovalCause::Explicit {
253                            tracing::warn!(
254                                pseudonym = %sender_id.pseudonym(),
255                                surb_id = hex::encode(id.as_slice()),
256                                ?cause,
257                                "evicting reply opener for sender id"
258                            );
259                        }
260                    })
261                    .max_capacity(max_openers_per_pseudonym as u64)
262                    .build()
263            })
264            .insert(sender_id.surb_id(), opener);
265    }
266
267    fn find_reply_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
268        self.pseudonym_openers
269            .get(&sender_id.pseudonym())
270            .and_then(|cache| cache.remove(&sender_id.surb_id()))
271    }
272}
273
274/// Represents a single SURB along with its ID popped from the [`SurbRingBuffer`].
275#[derive(Debug, Clone)]
276pub struct PoppedSurb<S> {
277    /// Complete SURB sender ID.
278    pub id: HoprSurbId,
279    /// The popped SURB.
280    pub surb: S,
281    /// Number of SURBs left in the RB after the pop.
282    pub remaining: usize,
283}
284
285/// Ring buffer containing SURBs along with their IDs.
286///
287/// All these SURBs usually belong to the same pseudonym and are therefore identified
288/// only by the [`HoprSurbId`].
289#[derive(Clone, Debug)]
290pub struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<ringbuffer::AllocRingBuffer<(HoprSurbId, S)>>>);
291
292impl<S> SurbRingBuffer<S> {
293    pub fn new(capacity: usize) -> Self {
294        Self(Arc::new(parking_lot::Mutex::new(ringbuffer::AllocRingBuffer::new(
295            capacity,
296        ))))
297    }
298
299    /// Push all SURBs with their IDs into the RB.
300    ///
301    /// Returns the total number of elements in the RB after the push.
302    pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> usize {
303        let mut rb = self.0.lock();
304        rb.extend(surbs);
305        rb.len()
306    }
307
308    /// Pop the latest SURB and its IDs from the RB.
309    pub fn pop_one(&self) -> Option<PoppedSurb<S>> {
310        let mut rb = self.0.lock();
311        let (id, surb) = rb.dequeue()?;
312        Some(PoppedSurb {
313            id,
314            surb,
315            remaining: rb.len(),
316        })
317    }
318
319    /// Check if the next SURB has the given ID and pop it from the RB.
320    pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Option<PoppedSurb<S>> {
321        let mut rb = self.0.lock();
322
323        if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
324            let (id, surb) = rb.dequeue()?;
325            Some(PoppedSurb {
326                id,
327                surb,
328                remaining: rb.len(),
329            })
330        } else {
331            None
332        }
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    #[test]
341    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
342        let rb = SurbRingBuffer::new(3);
343        rb.push([([1u8; 8], 0)]);
344        rb.push([([2u8; 8], 0)]);
345        rb.push([([3u8; 8], 0)]);
346        rb.push([([4u8; 8], 0)]);
347
348        let popped = rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?;
349        assert_eq!([2u8; 8], popped.id);
350        assert_eq!(2, popped.remaining);
351
352        let popped = rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?;
353        assert_eq!([3u8; 8], popped.id);
354        assert_eq!(1, popped.remaining);
355
356        let popped = rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?;
357        assert_eq!([4u8; 8], popped.id);
358        assert_eq!(0, popped.remaining);
359
360        assert!(rb.pop_one().is_none());
361
362        Ok(())
363    }
364
365    #[test]
366    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
367        let rb = SurbRingBuffer::new(5);
368
369        let len = rb.push([([1u8; 8], 0)]);
370        assert_eq!(1, len);
371
372        let len = rb.push([([2u8; 8], 0)]);
373        assert_eq!(2, len);
374
375        let popped = rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?;
376        assert_eq!([1u8; 8], popped.id);
377        assert_eq!(1, popped.remaining);
378
379        let popped = rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?;
380        assert_eq!([2u8; 8], popped.id);
381        assert_eq!(0, popped.remaining);
382
383        let len = rb.push([([1u8; 8], 0), ([2u8; 8], 0)]);
384        assert_eq!(2, len);
385
386        assert_eq!([1u8; 8], rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?.id);
387        assert_eq!([2u8; 8], rb.pop_one().ok_or(anyhow::anyhow!("expected pop"))?.id);
388
389        Ok(())
390    }
391
392    #[test]
393    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
394        let rb = SurbRingBuffer::new(5);
395
396        rb.push([([1u8; 8], 0)]);
397
398        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_none());
399        assert_eq!(
400            [1u8; 8],
401            rb.pop_one_if_has_id(&[1u8; 8])
402                .ok_or(anyhow::anyhow!("expected pop"))?
403                .id
404        );
405
406        Ok(())
407    }
408}