Skip to main content

hopr_protocol_hopr/
surb_store.rs

1use std::{sync::Arc, time::Duration};
2
3use hopr_api::types::internal::{prelude::HoprPseudonym, routing::SurbMatcher};
4use hopr_crypto_packet::prelude::*;
5use moka::notification::RemovalCause;
6use ringbuffer::RingBuffer;
7use validator::ValidationError;
8
9use crate::{FoundSurb, traits::SurbStore};
10
// Hard lower bounds applied at runtime by `MemorySurbStore`, regardless of what
// the (possibly unvalidated) configuration says.

/// Shortest accepted pseudonym lifetime; also the floor for the idle time of both
/// pseudonym-keyed caches.
const MINIMUM_SURB_LIFETIME: Duration = Duration::from_secs(30);
/// Shortest accepted lifetime of a single reply opener.
const MINIMUM_OPENER_LIFETIME: Duration = Duration::from_secs(60);

/// Floor for the capacity of the outer (pseudonym-keyed) reply opener cache.
const MINIMUM_OPENER_PSEUDONYMS: usize = 1000;
/// Floor for the capacity of each per-pseudonym reply opener cache.
///
/// NOTE(review): the config validation accepts `max_openers_per_pseudonym >= 100`,
/// yet values below 1000 are raised to this floor - confirm which bound is intended.
const MINIMUM_OPENERS_PER_PSEUDONYM: usize = 1000;
/// Floor for the capacity of the pseudonym-keyed SURB cache.
///
/// NOTE(review): despite its name, this bounds the number of pseudonyms
/// (`max_pseudonyms`), not the number of SURBs per pseudonym - confirm the naming.
const MINIMUM_SURBS_PER_PSEUDONYM: usize = 1000;
/// Floor for the capacity of a single per-pseudonym SURB ring buffer.
const MIN_SURB_RB_CAPACITY: usize = 1024;
17
18fn validate_pseudonyms_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
19    if lifetime < &MINIMUM_SURB_LIFETIME {
20        Err(ValidationError::new("pseudonyms_lifetime is too low"))
21    } else {
22        Ok(())
23    }
24}
25
26fn validate_reply_opener_lifetime(lifetime: &Duration) -> Result<(), ValidationError> {
27    if lifetime < &MINIMUM_OPENER_LIFETIME {
28        Err(ValidationError::new("reply_opener_lifetime is too low"))
29    } else {
30        Ok(())
31    }
32}
33
/// Default of `SurbStoreConfig::rb_capacity`: 15 000 SURBs per pseudonym.
fn default_rb_capacity() -> usize {
    const DEFAULT_RB_CAPACITY: usize = 15_000;
    DEFAULT_RB_CAPACITY
}
37
/// Default of `SurbStoreConfig::distress_threshold`: 500 SURBs.
fn default_distress_threshold() -> usize {
    const DEFAULT_DISTRESS_THRESHOLD: usize = 500;
    DEFAULT_DISTRESS_THRESHOLD
}
41
/// Default of `SurbStoreConfig::max_openers_per_pseudonym`: 100 000 reply openers.
fn default_max_openers_per_pseudonym() -> usize {
    const DEFAULT_MAX_OPENERS_PER_PSEUDONYM: usize = 100_000;
    DEFAULT_MAX_OPENERS_PER_PSEUDONYM
}
45
/// Default of `SurbStoreConfig::max_pseudonyms`: 10 000 distinct pseudonyms.
fn default_max_pseudonyms() -> usize {
    const DEFAULT_MAX_PSEUDONYMS: usize = 10_000;
    DEFAULT_MAX_PSEUDONYMS
}
49
/// Default of `SurbStoreConfig::pseudonyms_lifetime`: 10 minutes (600 seconds).
fn default_pseudonyms_lifetime() -> Duration {
    const TEN_MINUTES_IN_SECS: u64 = 10 * 60;
    Duration::from_secs(TEN_MINUTES_IN_SECS)
}
53
/// Default of `SurbStoreConfig::reply_opener_lifetime`: 1 hour (3600 seconds).
fn default_reply_opener_lifetime() -> Duration {
    const ONE_HOUR_IN_SECS: u64 = 60 * 60;
    Duration::from_secs(ONE_HOUR_IN_SECS)
}
57
58/// Configuration for the SURB cache.
59///
60/// The configuration options affect both the sending side (SURB creator) and the
61/// replying side (SURB consumer).
62///
63/// In the classical scenario (`Entry - Relay 1 -... - Exit`), the sending side is
64/// the `Entry` and the replying side is the `Exit`.
65#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, smart_default::SmartDefault, validator::Validate)]
66#[cfg_attr(
67    feature = "serde",
68    derive(serde::Deserialize, serde::Serialize),
69    serde(deny_unknown_fields)
70)]
71pub struct SurbStoreConfig {
72    /// Size of the SURB ring buffer per pseudonym.
73    ///
74    /// Affects only the replying side.
75    ///
76    /// This indicates how many SURBs can be at most held to be used to send a reply
77    /// back to the sending side.
78    ///
79    /// Default is 15 000.
80    #[default(default_rb_capacity())]
81    #[validate(range(min = 1024, message = "rb_capacity must be at least 1024"))]
82    #[cfg_attr(feature = "serde", serde(default = "default_rb_capacity"))]
83    pub rb_capacity: usize,
84    /// Threshold for the number of SURBs in the ring buffer, below which it is
85    /// considered low ("SURB distress").
86    ///
87    /// Default is 500.
88    #[default(default_distress_threshold())]
89    #[validate(range(min = 10, message = "distress_threshold must be at least 10"))]
90    #[cfg_attr(feature = "serde", serde(default = "default_distress_threshold"))]
91    pub distress_threshold: usize,
92    /// Maximum number of reply openers (SURB counterparts) per pseudonym.
93    ///
94    /// Affects only the sending side when decrypting a received reply.
95    ///
96    /// This mostly affects Sessions, as they use a fixed pseudonym.
97    /// It reflects how many reply openers the initiator-side of a Session can hold,
98    /// until the oldest ones are dropped. If the other party uses a SURB corresponding
99    /// to a dropped reply opener, the reply message will be undecryptable by the initiator-side.
100    ///
101    /// Default is 100 000.
102    #[default(default_max_openers_per_pseudonym())]
103    #[validate(range(min = 100, message = "max_openers_per_pseudonym must be at least 100"))]
104    #[cfg_attr(feature = "serde", serde(default = "default_max_openers_per_pseudonym"))]
105    pub max_openers_per_pseudonym: usize,
106    /// The maximum number of distinct pseudonyms for which we hold a SURB ringbuffer.
107    ///
108    /// Affects only the replying side.
109    ///
110    /// For each pseudonym, there is a ring-buffer with capacity `rb_capacity`.
111    ///
112    /// Default is 10 000.
113    #[default(default_max_pseudonyms())]
114    #[validate(range(min = 100, message = "max_pseudonyms must be at least 100"))]
115    #[cfg_attr(feature = "serde", serde(default = "default_max_pseudonyms"))]
116    pub max_pseudonyms: usize,
117    /// Maximum lifetime of ring-buffer for each pseudonym.
118    ///
119    /// # Effects on sending side
120    /// This is the period for which we hold all reply openers for a pseudonym.
121    /// If no more messages carrying SURBs are sent during this period, the entire stash of
122    /// reply openers is dropped. Preventing receiving any more replies for that pseudonym.
123    ///
124    /// # Effects on replying side
125    /// If a pseudonym has not received any SURBs for this period,
126    /// the entire ring buffer with `rb_capacity` (= all SURBs for this pseudonym) is dropped.
127    /// Preventing from sending any more replies for that pseudonym.
128    ///
129    /// Default is 600 seconds.
130    #[default(default_pseudonyms_lifetime())]
131    #[validate(custom(function = "validate_pseudonyms_lifetime"))]
132    #[cfg_attr(
133        feature = "serde",
134        serde(default = "default_pseudonyms_lifetime", with = "humantime_serde")
135    )]
136    pub pseudonyms_lifetime: Duration,
137    /// Maximum lifetime of a reply opener.
138    ///
139    /// Affects only the sending side.
140    ///
141    /// A reply opener is distinguished using [`HoprSurbId`] and a pseudonym it belongs to.
142    /// If a reply opener is not used to decrypt the received packet within this period,
143    /// it is dropped. If the replying side uses the corresponding SURB to send a reply,
144    /// it won't be possible to decrypt it when received.
145    ///
146    /// Default is 3600 seconds.
147    #[default(default_reply_opener_lifetime())]
148    #[validate(custom(function = "validate_reply_opener_lifetime"))]
149    #[cfg_attr(
150        feature = "serde",
151        serde(default = "default_reply_opener_lifetime", with = "humantime_serde")
152    )]
153    pub reply_opener_lifetime: Duration,
154}
155
156/// Basic [`SurbStore`] implementation based on an in-memory cache.
157///
158/// This SURB store offers no persistence, and all SURBs and Reply Openers are lost once dropped.
159///
160/// The instance can be cheaply cloned.
161#[derive(Clone)]
162pub struct MemorySurbStore {
163    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
164    surbs_per_pseudonym: moka::sync::Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
165    cfg: Arc<SurbStoreConfig>,
166}
167
168impl MemorySurbStore {
169    /// Creates a new instance with the given configuration.
170    pub fn new(cfg: SurbStoreConfig) -> Self {
171        Self {
172            // Reply openers are indexed by entire Sender IDs (Pseudonym + SURB ID)
173            // in a cascade fashion, allowing the entire batches (by Pseudonym) to be evicted
174            // if not used.
175            pseudonym_openers: moka::sync::Cache::builder()
176                .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
177                .eviction_policy(moka::policy::EvictionPolicy::lru())
178                .eviction_listener(|sender_id, _reply_opener, cause| {
179                    tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
180                })
181                .max_capacity(cfg.max_openers_per_pseudonym.max(MINIMUM_OPENER_PSEUDONYMS) as u64)
182                .build(),
183            // SURBs are indexed only by Pseudonyms, which have longer lifetimes.
184            // For each Pseudonym, there's an RB of SURBs and their IDs.
185            surbs_per_pseudonym: moka::sync::Cache::builder()
186                .time_to_idle(cfg.pseudonyms_lifetime.max(MINIMUM_SURB_LIFETIME))
187                .eviction_policy(moka::policy::EvictionPolicy::lru())
188                .eviction_listener(|pseudonym, _reply_opener, cause| {
189                    tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
190                })
191                .max_capacity(cfg.max_pseudonyms.max(MINIMUM_SURBS_PER_PSEUDONYM) as u64)
192                .build(),
193            cfg: cfg.into(),
194        }
195    }
196}
197
198impl Default for MemorySurbStore {
199    fn default() -> Self {
200        Self::new(SurbStoreConfig::default())
201    }
202}
203
204#[async_trait::async_trait]
205impl SurbStore for MemorySurbStore {
206    #[tracing::instrument(skip_all, level = "trace", fields(?matcher), ret)]
207    fn find_surb(&self, matcher: SurbMatcher) -> Option<FoundSurb> {
208        let pseudonym = matcher.pseudonym();
209        let surbs_for_pseudonym = self.surbs_per_pseudonym.get(&pseudonym)?;
210
211        match matcher {
212            SurbMatcher::Pseudonym(_) => surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
213                sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
214                surb: popped_surb.surb,
215                remaining: popped_surb.remaining,
216            }),
217            // The following code intentionally only checks the first SURB in the ring buffer
218            // and does not search the entire RB.
219            // This is because the exact match use-case is suited only for situations
220            // when there is a single SURB in the RB.
221            SurbMatcher::Exact(id) => {
222                surbs_for_pseudonym
223                    .pop_one_if_has_id(&id.surb_id())
224                    .map(|popped_surb| FoundSurb {
225                        sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
226                        surb: popped_surb.surb,
227                        remaining: popped_surb.remaining, // = likely 0
228                    })
229            }
230        }
231    }
232
233    #[tracing::instrument(skip_all, level = "trace", fields(%pseudonym, num_surbs = surbs.len()))]
234    fn insert_surbs(&self, pseudonym: HoprPseudonym, surbs: Vec<(HoprSurbId, HoprSurb)>) -> usize {
235        self.surbs_per_pseudonym
236            .entry_by_ref(&pseudonym)
237            .or_insert_with(|| SurbRingBuffer::new(self.cfg.rb_capacity.max(MIN_SURB_RB_CAPACITY)))
238            .value()
239            .push(surbs)
240    }
241
242    #[tracing::instrument(skip_all, level = "trace", fields(?sender_id))]
243    fn insert_reply_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
244        let opener_lifetime = self.cfg.reply_opener_lifetime.max(MINIMUM_OPENER_LIFETIME);
245        let max_openers_per_pseudonym = self.cfg.max_openers_per_pseudonym.max(MINIMUM_OPENERS_PER_PSEUDONYM);
246        self.pseudonym_openers
247            .get_with(sender_id.pseudonym(), move || {
248                moka::sync::Cache::builder()
249                    .time_to_live(opener_lifetime)
250                    .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
251                        if cause != RemovalCause::Explicit {
252                            tracing::warn!(
253                                pseudonym = %sender_id.pseudonym(),
254                                surb_id = hex::encode(id.as_slice()),
255                                ?cause,
256                                "evicting reply opener for sender id"
257                            );
258                        }
259                    })
260                    .max_capacity(max_openers_per_pseudonym as u64)
261                    .build()
262            })
263            .insert(sender_id.surb_id(), opener);
264    }
265
266    #[tracing::instrument(skip_all, level = "trace", fields(?sender_id), ret)]
267    fn find_reply_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
268        self.pseudonym_openers
269            .get(&sender_id.pseudonym())
270            .and_then(|cache| cache.remove(&sender_id.surb_id()))
271    }
272}
273
274/// Represents a single SURB along with its ID popped from the [`SurbRingBuffer`].
275#[derive(Debug, Clone)]
276pub struct PoppedSurb<S> {
277    /// Complete SURB sender ID.
278    pub id: HoprSurbId,
279    /// The popped SURB.
280    pub surb: S,
281    /// Number of SURBs left in the RB after the pop.
282    pub remaining: usize,
283}
284
285/// Ring buffer containing SURBs along with their IDs.
286///
287/// All these SURBs usually belong to the same pseudonym and are therefore identified
288/// only by the [`HoprSurbId`].
289#[derive(Clone, Debug)]
290pub struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<ringbuffer::AllocRingBuffer<(HoprSurbId, S)>>>);
291
292impl<S> SurbRingBuffer<S> {
293    pub fn new(capacity: usize) -> Self {
294        Self(Arc::new(parking_lot::Mutex::new(ringbuffer::AllocRingBuffer::new(
295            capacity,
296        ))))
297    }
298
299    /// Push all SURBs with their IDs into the RB.
300    ///
301    /// Returns the total number of elements in the RB after the push.
302    pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> usize {
303        let mut rb = self.0.lock();
304        rb.extend(surbs);
305        rb.len()
306    }
307
308    /// Pop the latest SURB and its IDs from the RB.
309    pub fn pop_one(&self) -> Option<PoppedSurb<S>> {
310        let mut rb = self.0.lock();
311        let (id, surb) = rb.dequeue()?;
312        Some(PoppedSurb {
313            id,
314            surb,
315            remaining: rb.len(),
316        })
317    }
318
319    /// Check if the next SURB has the given ID and pop it from the RB.
320    pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Option<PoppedSurb<S>> {
321        let mut rb = self.0.lock();
322
323        if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
324            let (id, surb) = rb.dequeue()?;
325            Some(PoppedSurb {
326                id,
327                surb,
328                remaining: rb.len(),
329            })
330        } else {
331            None
332        }
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Pops the next SURB, turning an unexpectedly empty buffer into a test error.
    fn expect_pop<S>(rb: &SurbRingBuffer<S>) -> anyhow::Result<PoppedSurb<S>> {
        rb.pop_one().ok_or_else(|| anyhow::anyhow!("expected pop"))
    }

    #[test]
    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(3);

        // One more push than the capacity allows: the first SURB must be overwritten.
        for id_byte in 1..=4u8 {
            rb.push([([id_byte; 8], 0)]);
        }

        for (expected_id_byte, expected_remaining) in [(2u8, 2usize), (3, 1), (4, 0)] {
            let popped = expect_pop(&rb)?;
            assert_eq!([expected_id_byte; 8], popped.id);
            assert_eq!(expected_remaining, popped.remaining);
        }

        assert!(rb.pop_one().is_none());

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        // Single pushes report the growing length.
        assert_eq!(1, rb.push([([1u8; 8], 0)]));
        assert_eq!(2, rb.push([([2u8; 8], 0)]));

        let first = expect_pop(&rb)?;
        assert_eq!([1u8; 8], first.id);
        assert_eq!(1, first.remaining);

        let second = expect_pop(&rb)?;
        assert_eq!([2u8; 8], second.id);
        assert_eq!(0, second.remaining);

        // A batch push keeps the order of the batch.
        assert_eq!(2, rb.push([([1u8; 8], 0), ([2u8; 8], 0)]));

        assert_eq!([1u8; 8], expect_pop(&rb)?.id);
        assert_eq!([2u8; 8], expect_pop(&rb)?.id);

        Ok(())
    }

    #[test]
    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
        let rb = SurbRingBuffer::new(5);

        rb.push([([1u8; 8], 0)]);

        // A mismatching ID must leave the SURB in place.
        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_none());

        let popped = rb
            .pop_one_if_has_id(&[1u8; 8])
            .ok_or_else(|| anyhow::anyhow!("expected pop"))?;
        assert_eq!([1u8; 8], popped.id);

        Ok(())
    }
}