hopr_db_sql/
cache.rs

1use std::{
2    sync::{Arc, Mutex, atomic::AtomicU64},
3    time::Duration,
4};
5
6use dashmap::{DashMap, Entry};
7use hopr_crypto_packet::{
8    HoprSphinxHeaderSpec, HoprSphinxSuite, HoprSurb, ReplyOpener,
9    prelude::{HoprSenderId, HoprSurbId},
10};
11use hopr_crypto_types::prelude::*;
12use hopr_db_api::{
13    info::{IndexerData, SafeInfo},
14    prelude::DbError,
15};
16use hopr_internal_types::prelude::*;
17use hopr_primitive_types::{
18    balance::HoprBalance,
19    prelude::{Address, KeyIdent, U256},
20};
21use moka::{Expiry, future::Cache};
22use ringbuffer::{AllocRingBuffer, RingBuffer};
23
24use crate::errors::DbSqlError;
25
26/// Lists all singular data that can be cached and
27/// cannot be represented by a key. These values can be cached for the long term.
28#[derive(Debug, Clone, strum::EnumDiscriminants)]
29#[strum_discriminants(derive(Hash))]
30pub enum CachedValue {
31    /// Cached [IndexerData].
32    IndexerDataCache(IndexerData),
33    /// Cached [SafeInfo].
34    SafeInfoCache(Option<SafeInfo>),
35}
36
37impl TryFrom<CachedValue> for IndexerData {
38    type Error = DbSqlError;
39
40    fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
41        match value {
42            CachedValue::IndexerDataCache(data) => Ok(data),
43            _ => Err(DbSqlError::DecodingError),
44        }
45    }
46}
47
48impl TryFrom<CachedValue> for Option<SafeInfo> {
49    type Error = DbSqlError;
50
51    fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
52        match value {
53            CachedValue::SafeInfoCache(data) => Ok(data),
54            _ => Err(DbSqlError::DecodingError),
55        }
56    }
57}
58
59struct ExpiryNever;
60
61impl<K, V> Expiry<K, V> for ExpiryNever {
62    fn expire_after_create(&self, _key: &K, _value: &V, _current_time: std::time::Instant) -> Option<Duration> {
63        None
64    }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub(crate) struct ChannelParties(pub(crate) Address, pub(crate) Address);
69
70/// Ring buffer containing SURBs along with their IDs.
71/// All these SURBs usually belong to the same pseudonym.
72#[derive(Clone, Debug)]
73pub(crate) struct SurbRingBuffer(Arc<Mutex<AllocRingBuffer<(HoprSurbId, HoprSurb)>>>);
74
75impl Default for SurbRingBuffer {
76    fn default() -> Self {
77        // With the current packet size, this is roughly for 7 MB of data budget in SURBs
78        Self::new(10_000)
79    }
80}
81
82impl SurbRingBuffer {
83    pub fn new(capacity: usize) -> Self {
84        Self(Arc::new(Mutex::new(AllocRingBuffer::new(capacity))))
85    }
86
87    /// Push all SURBs with their IDs into the RB.
88    pub fn push<I: IntoIterator<Item = (HoprSurbId, HoprSurb)>>(&self, surbs: I) -> Result<(), DbError> {
89        self.0
90            .lock()
91            .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?
92            .extend(surbs);
93        Ok(())
94    }
95
96    /// Pop the latest SURB and its IDs from the RB.
97    pub fn pop_one(&self) -> Result<(HoprSurbId, HoprSurb), DbError> {
98        self.0
99            .lock()
100            .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?
101            .dequeue()
102            .ok_or(DbError::NoSurbAvailable("no more surbs".into()))
103    }
104
105    /// Check if the next SURB has the given ID and pop it from the RB.
106    pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Result<(HoprSurbId, HoprSurb), DbError> {
107        let mut rb = self
108            .0
109            .lock()
110            .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?;
111        if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
112            rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))
113        } else {
114            Err(DbError::NoSurbAvailable("surb does not match the given id".into()))
115        }
116    }
117}
118
119/// Contains all caches used by the [crate::db::HoprDb].
120#[derive(Debug)]
121pub struct HoprDbCaches {
122    pub(crate) single_values: Cache<CachedValueDiscriminants, CachedValue>,
123    pub(crate) unacked_tickets: Cache<HalfKeyChallenge, PendingAcknowledgement>,
124    pub(crate) ticket_index: Cache<Hash, Arc<AtomicU64>>,
125    // key is (channel_id, channel_epoch) to ensure calculation of unrealized value does not
126    // include tickets from other epochs
127    pub(crate) unrealized_value: Cache<(Hash, U256), HoprBalance>,
128    pub(crate) chain_to_offchain: Cache<Address, Option<OffchainPublicKey>>,
129    pub(crate) offchain_to_chain: Cache<OffchainPublicKey, Option<Address>>,
130    pub(crate) src_dst_to_channel: Cache<ChannelParties, Option<ChannelEntry>>,
131    // KeyIdMapper must be synchronous because it is used from a sync context.
132    pub(crate) key_id_mapper: CacheKeyMapper,
133    pub(crate) pseudonym_openers: moka::sync::Cache<HoprSenderId, ReplyOpener>,
134    pub(crate) surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer>,
135}
136
137impl Default for HoprDbCaches {
138    fn default() -> Self {
139        let single_values = Cache::builder().time_to_idle(Duration::from_secs(1800)).build();
140
141        let unacked_tickets = Cache::builder()
142            .time_to_live(Duration::from_secs(30))
143            .max_capacity(1_000_000_000)
144            .build();
145
146        let ticket_index = Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build();
147
148        let unrealized_value = Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build();
149
150        let chain_to_offchain = Cache::builder()
151            .time_to_idle(Duration::from_secs(600))
152            .max_capacity(100_000)
153            .build();
154
155        let offchain_to_chain = Cache::builder()
156            .time_to_idle(Duration::from_secs(600))
157            .max_capacity(100_000)
158            .build();
159
160        let src_dst_to_channel = Cache::builder()
161            .time_to_live(Duration::from_secs(600))
162            .max_capacity(10_000)
163            .build();
164
165        // SURB openers are indexed by entire Sender IDs (Pseudonym + SURB ID)
166        // and therefore, there's more but with a shorter lifetime
167        let pseudonym_openers = moka::sync::Cache::builder()
168            .time_to_live(Duration::from_secs(60))
169            .max_capacity(100_000)
170            .build();
171
172        // SURBs are indexed only by Pseudonyms, which have longer lifetimes.
173        // For each Pseudonym, there's an RB of SURBs and their IDs.
174        let surbs_per_pseudonym = Cache::builder()
175            .time_to_idle(Duration::from_secs(600))
176            .max_capacity(10_000)
177            .build();
178
179        Self {
180            single_values,
181            unacked_tickets,
182            ticket_index,
183            unrealized_value,
184            chain_to_offchain,
185            offchain_to_chain,
186            src_dst_to_channel,
187            pseudonym_openers,
188            surbs_per_pseudonym,
189            key_id_mapper: CacheKeyMapper::with_capacity(10_000),
190        }
191    }
192}
193
194impl HoprDbCaches {
195    /// Invalidates all caches.
196    pub fn invalidate_all(&self) {
197        self.single_values.invalidate_all();
198        self.unacked_tickets.invalidate_all();
199        self.unrealized_value.invalidate_all();
200        self.chain_to_offchain.invalidate_all();
201        self.offchain_to_chain.invalidate_all();
202        self.src_dst_to_channel.invalidate_all();
203        // NOTE: key_id_mapper intentionally not invalidated
204    }
205}
206
207#[derive(Debug)]
208pub(crate) struct CacheKeyMapper(
209    DashMap<KeyIdent<4>, OffchainPublicKey>,
210    DashMap<OffchainPublicKey, KeyIdent<4>>,
211);
212
213impl CacheKeyMapper {
214    pub fn with_capacity(capacity: usize) -> Self {
215        Self(DashMap::with_capacity(capacity), DashMap::with_capacity(capacity))
216    }
217
218    /// Creates key id mapping for a public key of an [account](AccountEntry).
219    ///
220    /// Does nothing if the binding already exists. Returns error if an existing binding
221    /// is not consistent.
222    pub fn update_key_id_binding(&self, account: &AccountEntry) -> Result<(), DbSqlError> {
223        let id = account.key_id();
224        let key = account.public_key;
225
226        // Lock entries in the maps to avoid concurrent modifications
227        let id_entry = self.0.entry(id);
228        let key_entry = self.1.entry(key);
229
230        match (id_entry, key_entry) {
231            (Entry::Vacant(v_id), Entry::Vacant(v_key)) => {
232                v_id.insert_entry(key);
233                v_key.insert_entry(id);
234                tracing::debug!(%id, %key, "inserted key-id binding");
235                Ok(())
236            }
237            (Entry::Occupied(v_id), Entry::Occupied(v_key)) => {
238                // Check if the existing binding is consistent with the new one.
239                if v_id.get() != v_key.key() {
240                    Err(DbSqlError::LogicalError(format!(
241                        "attempt to insert key {key} with key-id {id}, but key-id already maps to key {} while {} is \
242                         expected",
243                        v_id.get(),
244                        v_key.key(),
245                    )))
246                } else {
247                    Ok(())
248                }
249            }
250            // This only happens on re-announcements:
251            // The re-announcement uses the same packet key and chain-key, but the block number (published at)
252            // is different, and therefore the id_entry will be vacant.
253            (Entry::Vacant(_), Entry::Occupied(v_key)) => {
254                tracing::debug!(
255                    "attempt to insert key {key} with key-id {id} failed because key is already set as {}",
256                    v_key.get()
257                );
258                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
259            }
260            // This should never happen.
261            (Entry::Occupied(v_id), Entry::Vacant(_)) => {
262                tracing::debug!(
263                    "attempt to insert key {key} with key-id {id} failed because key-id is already set as {}",
264                    v_id.get()
265                );
266                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
267            }
268        }
269    }
270}
271
272impl hopr_crypto_packet::KeyIdMapper<HoprSphinxSuite, HoprSphinxHeaderSpec> for CacheKeyMapper {
273    fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<KeyIdent> {
274        self.1.get(key).map(|k| *k.value())
275    }
276
277    fn map_id_to_public(&self, id: &KeyIdent) -> Option<OffchainPublicKey> {
278        self.0.get(id).map(|k| *k.value())
279    }
280}