hopr_db_sql/
cache.rs

1use std::{
2    sync::{Arc, atomic::AtomicU64},
3    time::Duration,
4};
5
6use dashmap::{DashMap, Entry};
7use hopr_crypto_packet::{
8    HoprSphinxHeaderSpec, HoprSphinxSuite, HoprSurb, ReplyOpener,
9    prelude::{HoprSenderId, HoprSurbId},
10};
11use hopr_crypto_types::prelude::*;
12use hopr_db_api::{
13    info::{IndexerData, SafeInfo},
14    prelude::DbError,
15};
16use hopr_internal_types::prelude::*;
17use hopr_primitive_types::{
18    balance::HoprBalance,
19    prelude::{Address, KeyIdent, U256},
20};
21use moka::{Expiry, future::Cache, notification::RemovalCause};
22use ringbuffer::{AllocRingBuffer, RingBuffer};
23
24use crate::errors::DbSqlError;
25
26/// Lists all singular data that can be cached and
27/// cannot be represented by a key. These values can be cached for the long term.
28#[derive(Debug, Clone, strum::EnumDiscriminants)]
29#[strum_discriminants(derive(Hash))]
30pub enum CachedValue {
31    /// Cached [IndexerData].
32    IndexerDataCache(IndexerData),
33    /// Cached [SafeInfo].
34    SafeInfoCache(Option<SafeInfo>),
35}
36
37impl TryFrom<CachedValue> for IndexerData {
38    type Error = DbSqlError;
39
40    fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
41        match value {
42            CachedValue::IndexerDataCache(data) => Ok(data),
43            _ => Err(DbSqlError::DecodingError),
44        }
45    }
46}
47
48impl TryFrom<CachedValue> for Option<SafeInfo> {
49    type Error = DbSqlError;
50
51    fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
52        match value {
53            CachedValue::SafeInfoCache(data) => Ok(data),
54            _ => Err(DbSqlError::DecodingError),
55        }
56    }
57}
58
59struct ExpiryNever;
60
61impl<K, V> Expiry<K, V> for ExpiryNever {
62    fn expire_after_create(&self, _key: &K, _value: &V, _current_time: std::time::Instant) -> Option<Duration> {
63        None
64    }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub(crate) struct ChannelParties(pub(crate) Address, pub(crate) Address);
69
70/// Represents a single SURB along with its ID popped from the [`SurbRingBuffer`].
71#[derive(Debug, Clone)]
72pub struct PoppedSurb<S> {
73    /// Complete SURB sender ID.
74    pub id: HoprSurbId,
75    /// The popped SURB.
76    pub surb: S,
77    /// Number of SURBs left in the RB after the pop.
78    pub remaining: usize,
79}
80
81/// Ring buffer containing SURBs along with their IDs.
82///
83/// All these SURBs usually belong to the same pseudonym and are therefore identified
84/// only by the [`HoprSurbId`].
85#[derive(Clone, Debug)]
86pub(crate) struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<AllocRingBuffer<(HoprSurbId, S)>>>);
87
88impl<S> Default for SurbRingBuffer<S> {
89    fn default() -> Self {
90        // With the current packet size, this is almost 10 MB of data budget in SURBs
91        Self::new(10_000)
92    }
93}
94
95impl<S> SurbRingBuffer<S> {
96    pub fn new(capacity: usize) -> Self {
97        Self(Arc::new(parking_lot::Mutex::new(AllocRingBuffer::new(capacity))))
98    }
99
100    /// Push all SURBs with their IDs into the RB.
101    ///
102    /// Returns the total number of elements in the RB after the push.
103    pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> Result<usize, DbError> {
104        let mut rb = self.0.lock();
105
106        rb.extend(surbs);
107        Ok(rb.len())
108    }
109
110    /// Pop the latest SURB and its IDs from the RB.
111    pub fn pop_one(&self) -> Result<PoppedSurb<S>, DbError> {
112        let mut rb = self.0.lock();
113
114        let (id, surb) = rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))?;
115        Ok(PoppedSurb {
116            id,
117            surb,
118            remaining: rb.len(),
119        })
120    }
121
122    /// Check if the next SURB has the given ID and pop it from the RB.
123    pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Result<PoppedSurb<S>, DbError> {
124        let mut rb = self.0.lock();
125
126        if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
127            let (id, surb) = rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))?;
128            Ok(PoppedSurb {
129                id,
130                surb,
131                remaining: rb.len(),
132            })
133        } else {
134            Err(DbError::NoSurbAvailable("surb does not match the given id".into()))
135        }
136    }
137}
138
139/// Contains all caches used by the [crate::db::HoprDb].
140#[derive(Debug)]
141pub struct HoprDbCaches {
142    pub(crate) single_values: Cache<CachedValueDiscriminants, CachedValue>,
143    pub(crate) unacked_tickets: Cache<HalfKeyChallenge, PendingAcknowledgement>,
144    pub(crate) ticket_index: Cache<Hash, Arc<AtomicU64>>,
145    // key is (channel_id, channel_epoch) to ensure calculation of unrealized value does not
146    // include tickets from other epochs
147    pub(crate) unrealized_value: Cache<(Hash, U256), HoprBalance>,
148    pub(crate) chain_to_offchain: Cache<Address, Option<OffchainPublicKey>>,
149    pub(crate) offchain_to_chain: Cache<OffchainPublicKey, Option<Address>>,
150    pub(crate) src_dst_to_channel: Cache<ChannelParties, Option<ChannelEntry>>,
151    // KeyIdMapper must be synchronous because it is used from a sync context.
152    pub(crate) key_id_mapper: CacheKeyMapper,
153    pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
154    pub(crate) surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
155}
156
157impl Default for HoprDbCaches {
158    fn default() -> Self {
159        Self {
160            single_values: Cache::builder().time_to_idle(Duration::from_secs(1800)).build(),
161            unacked_tickets: Cache::builder()
162                .time_to_live(Duration::from_secs(30))
163                .max_capacity(1_000_000_000)
164                .build(),
165            ticket_index: Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build(),
166            unrealized_value: Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build(),
167            chain_to_offchain: Cache::builder()
168                .time_to_idle(Duration::from_secs(600))
169                .max_capacity(100_000)
170                .build(),
171            offchain_to_chain: Cache::builder()
172                .time_to_idle(Duration::from_secs(600))
173                .max_capacity(100_000)
174                .build(),
175            src_dst_to_channel: Cache::builder()
176                .time_to_live(Duration::from_secs(600))
177                .max_capacity(10_000)
178                .build(),
179            // Reply openers are indexed by entire Sender IDs (Pseudonym + SURB ID)
180            // in a cascade fashion, allowing the entire batches (by Pseudonym) to be evicted
181            // if not used.
182            pseudonym_openers: moka::sync::Cache::builder()
183                .time_to_idle(Duration::from_secs(600))
184                .eviction_policy(moka::policy::EvictionPolicy::lru())
185                .eviction_listener(|sender_id, _reply_opener, cause| {
186                    tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
187                })
188                .max_capacity(10_000)
189                .build(),
190            // SURBs are indexed only by Pseudonyms, which have longer lifetimes.
191            // For each Pseudonym, there's an RB of SURBs and their IDs.
192            surbs_per_pseudonym: Cache::builder()
193                .time_to_idle(Duration::from_secs(600))
194                .eviction_policy(moka::policy::EvictionPolicy::lru())
195                .eviction_listener(|pseudonym, _reply_opener, cause| {
196                    tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
197                })
198                .max_capacity(10_000)
199                .build(),
200            key_id_mapper: CacheKeyMapper::with_capacity(10_000),
201        }
202    }
203}
204
205impl HoprDbCaches {
206    pub(crate) fn insert_pseudonym_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
207        self.pseudonym_openers
208            .get_with(sender_id.pseudonym(), move || {
209                moka::sync::Cache::builder()
210                    .time_to_live(Duration::from_secs(3600))
211                    .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
212                        if cause != RemovalCause::Explicit {
213                            tracing::warn!(pseudonym = %sender_id.pseudonym(), surb_id = hex::encode(id.as_slice()), ?cause, "evicting reply opener for sender id");
214                        }
215                    })
216                    .max_capacity(100_000)
217                    .build()
218            })
219            .insert(sender_id.surb_id(), opener);
220    }
221
222    pub(crate) fn extract_pseudonym_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
223        self.pseudonym_openers
224            .get(&sender_id.pseudonym())
225            .and_then(|cache| cache.remove(&sender_id.surb_id()))
226    }
227
228    // For future use by the SessionManager
229    #[allow(dead_code)]
230    pub(crate) fn invalidate_pseudonym_openers(&self, pseudonym: &HoprPseudonym) {
231        self.pseudonym_openers.invalidate(pseudonym);
232    }
233
234    /// Invalidates all caches.
235    pub fn invalidate_all(&self) {
236        self.single_values.invalidate_all();
237        self.unacked_tickets.invalidate_all();
238        self.unrealized_value.invalidate_all();
239        self.chain_to_offchain.invalidate_all();
240        self.offchain_to_chain.invalidate_all();
241        self.src_dst_to_channel.invalidate_all();
242        // NOTE: key_id_mapper intentionally not invalidated
243    }
244}
245
246#[derive(Debug)]
247pub(crate) struct CacheKeyMapper(
248    DashMap<KeyIdent<4>, OffchainPublicKey>,
249    DashMap<OffchainPublicKey, KeyIdent<4>>,
250);
251
252impl CacheKeyMapper {
253    pub fn with_capacity(capacity: usize) -> Self {
254        Self(DashMap::with_capacity(capacity), DashMap::with_capacity(capacity))
255    }
256
257    /// Creates key id mapping for a public key of an [account](AccountEntry).
258    ///
259    /// Does nothing if the binding already exists. Returns error if an existing binding
260    /// is not consistent.
261    pub fn update_key_id_binding(&self, account: &AccountEntry) -> Result<(), DbSqlError> {
262        let id = account.key_id();
263        let key = account.public_key;
264
265        // Lock entries in the maps to avoid concurrent modifications
266        let id_entry = self.0.entry(id);
267        let key_entry = self.1.entry(key);
268
269        match (id_entry, key_entry) {
270            (Entry::Vacant(v_id), Entry::Vacant(v_key)) => {
271                v_id.insert_entry(key);
272                v_key.insert_entry(id);
273                tracing::debug!(%id, %key, "inserted key-id binding");
274                Ok(())
275            }
276            (Entry::Occupied(v_id), Entry::Occupied(v_key)) => {
277                // Check if the existing binding is consistent with the new one.
278                if v_id.get() != v_key.key() {
279                    Err(DbSqlError::LogicalError(format!(
280                        "attempt to insert key {key} with key-id {id}, but key-id already maps to key {} while {} is \
281                         expected",
282                        v_id.get(),
283                        v_key.key(),
284                    )))
285                } else {
286                    Ok(())
287                }
288            }
289            // This only happens on re-announcements:
290            // The re-announcement uses the same packet key and chain-key, but the block number (published at)
291            // is different, and therefore the id_entry will be vacant.
292            (Entry::Vacant(_), Entry::Occupied(v_key)) => {
293                tracing::debug!(
294                    "attempt to insert key {key} with key-id {id} failed because key is already set as {}",
295                    v_key.get()
296                );
297                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
298            }
299            // This should never happen.
300            (Entry::Occupied(v_id), Entry::Vacant(_)) => {
301                tracing::debug!(
302                    "attempt to insert key {key} with key-id {id} failed because key-id is already set as {}",
303                    v_id.get()
304                );
305                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
306            }
307        }
308    }
309}
310
311impl hopr_crypto_packet::KeyIdMapper<HoprSphinxSuite, HoprSphinxHeaderSpec> for CacheKeyMapper {
312    fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<KeyIdent> {
313        self.1.get(key).map(|k| *k.value())
314    }
315
316    fn map_id_to_public(&self, id: &KeyIdent) -> Option<OffchainPublicKey> {
317        self.0.get(id).map(|k| *k.value())
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use super::*;
324
325    #[test]
326    fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
327        let rb = SurbRingBuffer::new(3);
328        rb.push([([1u8; 8], 0)])?;
329        rb.push([([2u8; 8], 0)])?;
330        rb.push([([3u8; 8], 0)])?;
331        rb.push([([4u8; 8], 0)])?;
332
333        let popped = rb.pop_one()?;
334        assert_eq!([2u8; 8], popped.id);
335        assert_eq!(2, popped.remaining);
336
337        let popped = rb.pop_one()?;
338        assert_eq!([3u8; 8], popped.id);
339        assert_eq!(1, popped.remaining);
340
341        let popped = rb.pop_one()?;
342        assert_eq!([4u8; 8], popped.id);
343        assert_eq!(0, popped.remaining);
344
345        assert!(rb.pop_one().is_err());
346
347        Ok(())
348    }
349
350    #[test]
351    fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
352        let rb = SurbRingBuffer::new(5);
353
354        let len = rb.push([([1u8; 8], 0)])?;
355        assert_eq!(1, len);
356
357        let len = rb.push([([2u8; 8], 0)])?;
358        assert_eq!(2, len);
359
360        let popped = rb.pop_one()?;
361        assert_eq!([1u8; 8], popped.id);
362        assert_eq!(1, popped.remaining);
363
364        let popped = rb.pop_one()?;
365        assert_eq!([2u8; 8], popped.id);
366        assert_eq!(0, popped.remaining);
367
368        let len = rb.push([([1u8; 8], 0), ([2u8; 8], 0)])?;
369        assert_eq!(2, len);
370
371        assert_eq!([1u8; 8], rb.pop_one()?.id);
372        assert_eq!([2u8; 8], rb.pop_one()?.id);
373
374        Ok(())
375    }
376
377    #[test]
378    fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
379        let rb = SurbRingBuffer::new(5);
380
381        rb.push([([1u8; 8], 0)])?;
382
383        assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_err());
384        assert_eq!([1u8; 8], rb.pop_one_if_has_id(&[1u8; 8])?.id);
385
386        Ok(())
387    }
388}