1use std::{
2 sync::{Arc, atomic::AtomicU64},
3 time::Duration,
4};
5
6use dashmap::{DashMap, Entry};
7use hopr_crypto_packet::{
8 HoprSphinxHeaderSpec, HoprSphinxSuite, HoprSurb, ReplyOpener,
9 prelude::{HoprSenderId, HoprSurbId},
10};
11use hopr_crypto_types::prelude::*;
12use hopr_db_api::{
13 info::{IndexerData, SafeInfo},
14 prelude::DbError,
15};
16use hopr_internal_types::prelude::*;
17use hopr_primitive_types::{
18 balance::HoprBalance,
19 prelude::{Address, KeyIdent, U256},
20};
21use moka::{Expiry, future::Cache, notification::RemovalCause};
22use ringbuffer::{AllocRingBuffer, RingBuffer};
23
24use crate::errors::DbSqlError;
25
26#[derive(Debug, Clone, strum::EnumDiscriminants)]
29#[strum_discriminants(derive(Hash))]
30pub enum CachedValue {
31 IndexerDataCache(IndexerData),
33 SafeInfoCache(Option<SafeInfo>),
35}
36
37impl TryFrom<CachedValue> for IndexerData {
38 type Error = DbSqlError;
39
40 fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
41 match value {
42 CachedValue::IndexerDataCache(data) => Ok(data),
43 _ => Err(DbSqlError::DecodingError),
44 }
45 }
46}
47
48impl TryFrom<CachedValue> for Option<SafeInfo> {
49 type Error = DbSqlError;
50
51 fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
52 match value {
53 CachedValue::SafeInfoCache(data) => Ok(data),
54 _ => Err(DbSqlError::DecodingError),
55 }
56 }
57}
58
59struct ExpiryNever;
60
61impl<K, V> Expiry<K, V> for ExpiryNever {
62 fn expire_after_create(&self, _key: &K, _value: &V, _current_time: std::time::Instant) -> Option<Duration> {
63 None
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub(crate) struct ChannelParties(pub(crate) Address, pub(crate) Address);
69
70#[derive(Debug, Clone)]
72pub struct PoppedSurb<S> {
73 pub id: HoprSurbId,
75 pub surb: S,
77 pub remaining: usize,
79}
80
81#[derive(Clone, Debug)]
86pub(crate) struct SurbRingBuffer<S>(Arc<parking_lot::Mutex<AllocRingBuffer<(HoprSurbId, S)>>>);
87
88impl<S> Default for SurbRingBuffer<S> {
89 fn default() -> Self {
90 Self::new(10_000)
92 }
93}
94
95impl<S> SurbRingBuffer<S> {
96 pub fn new(capacity: usize) -> Self {
97 Self(Arc::new(parking_lot::Mutex::new(AllocRingBuffer::new(capacity))))
98 }
99
100 pub fn push<I: IntoIterator<Item = (HoprSurbId, S)>>(&self, surbs: I) -> Result<usize, DbError> {
104 let mut rb = self.0.lock();
105
106 rb.extend(surbs);
107 Ok(rb.len())
108 }
109
110 pub fn pop_one(&self) -> Result<PoppedSurb<S>, DbError> {
112 let mut rb = self.0.lock();
113
114 let (id, surb) = rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))?;
115 Ok(PoppedSurb {
116 id,
117 surb,
118 remaining: rb.len(),
119 })
120 }
121
122 pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Result<PoppedSurb<S>, DbError> {
124 let mut rb = self.0.lock();
125
126 if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
127 let (id, surb) = rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))?;
128 Ok(PoppedSurb {
129 id,
130 surb,
131 remaining: rb.len(),
132 })
133 } else {
134 Err(DbError::NoSurbAvailable("surb does not match the given id".into()))
135 }
136 }
137}
138
139#[derive(Debug)]
141pub struct HoprDbCaches {
142 pub(crate) single_values: Cache<CachedValueDiscriminants, CachedValue>,
143 pub(crate) unacked_tickets: Cache<HalfKeyChallenge, PendingAcknowledgement>,
144 pub(crate) ticket_index: Cache<Hash, Arc<AtomicU64>>,
145 pub(crate) unrealized_value: Cache<(Hash, U256), HoprBalance>,
148 pub(crate) chain_to_offchain: Cache<Address, Option<OffchainPublicKey>>,
149 pub(crate) offchain_to_chain: Cache<OffchainPublicKey, Option<Address>>,
150 pub(crate) src_dst_to_channel: Cache<ChannelParties, Option<ChannelEntry>>,
151 pub(crate) key_id_mapper: CacheKeyMapper,
153 pseudonym_openers: moka::sync::Cache<HoprPseudonym, moka::sync::Cache<HoprSurbId, ReplyOpener>>,
154 pub(crate) surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer<HoprSurb>>,
155}
156
157impl Default for HoprDbCaches {
158 fn default() -> Self {
159 Self {
160 single_values: Cache::builder().time_to_idle(Duration::from_secs(1800)).build(),
161 unacked_tickets: Cache::builder()
162 .time_to_live(Duration::from_secs(30))
163 .max_capacity(1_000_000_000)
164 .build(),
165 ticket_index: Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build(),
166 unrealized_value: Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build(),
167 chain_to_offchain: Cache::builder()
168 .time_to_idle(Duration::from_secs(600))
169 .max_capacity(100_000)
170 .build(),
171 offchain_to_chain: Cache::builder()
172 .time_to_idle(Duration::from_secs(600))
173 .max_capacity(100_000)
174 .build(),
175 src_dst_to_channel: Cache::builder()
176 .time_to_live(Duration::from_secs(600))
177 .max_capacity(10_000)
178 .build(),
179 pseudonym_openers: moka::sync::Cache::builder()
183 .time_to_idle(Duration::from_secs(600))
184 .eviction_policy(moka::policy::EvictionPolicy::lru())
185 .eviction_listener(|sender_id, _reply_opener, cause| {
186 tracing::warn!(?sender_id, ?cause, "evicting reply opener for pseudonym");
187 })
188 .max_capacity(10_000)
189 .build(),
190 surbs_per_pseudonym: Cache::builder()
193 .time_to_idle(Duration::from_secs(600))
194 .eviction_policy(moka::policy::EvictionPolicy::lru())
195 .eviction_listener(|pseudonym, _reply_opener, cause| {
196 tracing::warn!(%pseudonym, ?cause, "evicting surb for pseudonym");
197 })
198 .max_capacity(10_000)
199 .build(),
200 key_id_mapper: CacheKeyMapper::with_capacity(10_000),
201 }
202 }
203}
204
205impl HoprDbCaches {
206 pub(crate) fn insert_pseudonym_opener(&self, sender_id: HoprSenderId, opener: ReplyOpener) {
207 self.pseudonym_openers
208 .get_with(sender_id.pseudonym(), move || {
209 moka::sync::Cache::builder()
210 .time_to_live(Duration::from_secs(3600))
211 .eviction_listener(move |id: Arc<HoprSurbId>, _, cause| {
212 if cause != RemovalCause::Explicit {
213 tracing::warn!(pseudonym = %sender_id.pseudonym(), surb_id = hex::encode(id.as_slice()), ?cause, "evicting reply opener for sender id");
214 }
215 })
216 .max_capacity(100_000)
217 .build()
218 })
219 .insert(sender_id.surb_id(), opener);
220 }
221
222 pub(crate) fn extract_pseudonym_opener(&self, sender_id: &HoprSenderId) -> Option<ReplyOpener> {
223 self.pseudonym_openers
224 .get(&sender_id.pseudonym())
225 .and_then(|cache| cache.remove(&sender_id.surb_id()))
226 }
227
228 #[allow(dead_code)]
230 pub(crate) fn invalidate_pseudonym_openers(&self, pseudonym: &HoprPseudonym) {
231 self.pseudonym_openers.invalidate(pseudonym);
232 }
233
234 pub fn invalidate_all(&self) {
236 self.single_values.invalidate_all();
237 self.unacked_tickets.invalidate_all();
238 self.unrealized_value.invalidate_all();
239 self.chain_to_offchain.invalidate_all();
240 self.offchain_to_chain.invalidate_all();
241 self.src_dst_to_channel.invalidate_all();
242 }
244}
245
246#[derive(Debug)]
247pub(crate) struct CacheKeyMapper(
248 DashMap<KeyIdent<4>, OffchainPublicKey>,
249 DashMap<OffchainPublicKey, KeyIdent<4>>,
250);
251
252impl CacheKeyMapper {
253 pub fn with_capacity(capacity: usize) -> Self {
254 Self(DashMap::with_capacity(capacity), DashMap::with_capacity(capacity))
255 }
256
257 pub fn update_key_id_binding(&self, account: &AccountEntry) -> Result<(), DbSqlError> {
262 let id = account.key_id();
263 let key = account.public_key;
264
265 let id_entry = self.0.entry(id);
267 let key_entry = self.1.entry(key);
268
269 match (id_entry, key_entry) {
270 (Entry::Vacant(v_id), Entry::Vacant(v_key)) => {
271 v_id.insert_entry(key);
272 v_key.insert_entry(id);
273 tracing::debug!(%id, %key, "inserted key-id binding");
274 Ok(())
275 }
276 (Entry::Occupied(v_id), Entry::Occupied(v_key)) => {
277 if v_id.get() != v_key.key() {
279 Err(DbSqlError::LogicalError(format!(
280 "attempt to insert key {key} with key-id {id}, but key-id already maps to key {} while {} is \
281 expected",
282 v_id.get(),
283 v_key.key(),
284 )))
285 } else {
286 Ok(())
287 }
288 }
289 (Entry::Vacant(_), Entry::Occupied(v_key)) => {
293 tracing::debug!(
294 "attempt to insert key {key} with key-id {id} failed because key is already set as {}",
295 v_key.get()
296 );
297 Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
298 }
299 (Entry::Occupied(v_id), Entry::Vacant(_)) => {
301 tracing::debug!(
302 "attempt to insert key {key} with key-id {id} failed because key-id is already set as {}",
303 v_id.get()
304 );
305 Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
306 }
307 }
308 }
309}
310
311impl hopr_crypto_packet::KeyIdMapper<HoprSphinxSuite, HoprSphinxHeaderSpec> for CacheKeyMapper {
312 fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<KeyIdent> {
313 self.1.get(key).map(|k| *k.value())
314 }
315
316 fn map_id_to_public(&self, id: &KeyIdent) -> Option<OffchainPublicKey> {
317 self.0.get(id).map(|k| *k.value())
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324
325 #[test]
326 fn surb_ring_buffer_must_drop_items_when_capacity_is_reached() -> anyhow::Result<()> {
327 let rb = SurbRingBuffer::new(3);
328 rb.push([([1u8; 8], 0)])?;
329 rb.push([([2u8; 8], 0)])?;
330 rb.push([([3u8; 8], 0)])?;
331 rb.push([([4u8; 8], 0)])?;
332
333 let popped = rb.pop_one()?;
334 assert_eq!([2u8; 8], popped.id);
335 assert_eq!(2, popped.remaining);
336
337 let popped = rb.pop_one()?;
338 assert_eq!([3u8; 8], popped.id);
339 assert_eq!(1, popped.remaining);
340
341 let popped = rb.pop_one()?;
342 assert_eq!([4u8; 8], popped.id);
343 assert_eq!(0, popped.remaining);
344
345 assert!(rb.pop_one().is_err());
346
347 Ok(())
348 }
349
350 #[test]
351 fn surb_ring_buffer_must_be_fifo() -> anyhow::Result<()> {
352 let rb = SurbRingBuffer::new(5);
353
354 let len = rb.push([([1u8; 8], 0)])?;
355 assert_eq!(1, len);
356
357 let len = rb.push([([2u8; 8], 0)])?;
358 assert_eq!(2, len);
359
360 let popped = rb.pop_one()?;
361 assert_eq!([1u8; 8], popped.id);
362 assert_eq!(1, popped.remaining);
363
364 let popped = rb.pop_one()?;
365 assert_eq!([2u8; 8], popped.id);
366 assert_eq!(0, popped.remaining);
367
368 let len = rb.push([([1u8; 8], 0), ([2u8; 8], 0)])?;
369 assert_eq!(2, len);
370
371 assert_eq!([1u8; 8], rb.pop_one()?.id);
372 assert_eq!([2u8; 8], rb.pop_one()?.id);
373
374 Ok(())
375 }
376
377 #[test]
378 fn surb_ring_buffer_must_not_pop_if_id_does_not_match() -> anyhow::Result<()> {
379 let rb = SurbRingBuffer::new(5);
380
381 rb.push([([1u8; 8], 0)])?;
382
383 assert!(rb.pop_one_if_has_id(&[2u8; 8]).is_err());
384 assert_eq!([1u8; 8], rb.pop_one_if_has_id(&[1u8; 8])?.id);
385
386 Ok(())
387 }
388}