1use std::{
2 sync::{Arc, Mutex, atomic::AtomicU64},
3 time::Duration,
4};
5
6use dashmap::{DashMap, Entry};
7use hopr_crypto_packet::{
8 HoprSphinxHeaderSpec, HoprSphinxSuite, HoprSurb, ReplyOpener,
9 prelude::{HoprSenderId, HoprSurbId},
10};
11use hopr_crypto_types::prelude::*;
12use hopr_db_api::{
13 info::{IndexerData, SafeInfo},
14 prelude::DbError,
15};
16use hopr_internal_types::prelude::*;
17use hopr_primitive_types::{
18 balance::HoprBalance,
19 prelude::{Address, KeyIdent, U256},
20};
21use moka::{Expiry, future::Cache};
22use ringbuffer::{AllocRingBuffer, RingBuffer};
23
24use crate::errors::DbSqlError;
25
/// Union of all singleton values that can be stored in the `single_values` cache.
///
/// `strum::EnumDiscriminants` generates the companion `CachedValueDiscriminants`
/// enum, which serves as the cache key (see `HoprDbCaches::single_values`);
/// `Hash` is derived on it so it can be used as a `moka` cache key.
#[derive(Debug, Clone, strum::EnumDiscriminants)]
#[strum_discriminants(derive(Hash))]
pub enum CachedValue {
    /// Cached indexer data.
    IndexerDataCache(IndexerData),
    /// Cached safe info; the inner `Option` is cached as-is.
    SafeInfoCache(Option<SafeInfo>),
}
36
37impl TryFrom<CachedValue> for IndexerData {
38 type Error = DbSqlError;
39
40 fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
41 match value {
42 CachedValue::IndexerDataCache(data) => Ok(data),
43 _ => Err(DbSqlError::DecodingError),
44 }
45 }
46}
47
48impl TryFrom<CachedValue> for Option<SafeInfo> {
49 type Error = DbSqlError;
50
51 fn try_from(value: CachedValue) -> Result<Self, Self::Error> {
52 match value {
53 CachedValue::SafeInfoCache(data) => Ok(data),
54 _ => Err(DbSqlError::DecodingError),
55 }
56 }
57}
58
/// `moka` expiry policy under which entries never expire after creation.
///
/// Used for caches whose entries must live until explicitly evicted or
/// displaced by `max_capacity` (e.g. `ticket_index`, `unrealized_value`).
struct ExpiryNever;

impl<K, V> Expiry<K, V> for ExpiryNever {
    /// Returning `None` disables creation-based expiration entirely.
    fn expire_after_create(&self, _key: &K, _value: &V, _current_time: std::time::Instant) -> Option<Duration> {
        None
    }
}
66
/// Pair of on-chain addresses identifying a channel's two parties.
// NOTE(review): the order (source, destination) is assumed from the
// `src_dst_to_channel` cache name — confirm against call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct ChannelParties(pub(crate) Address, pub(crate) Address);
69
/// Thread-safe, fixed-capacity ring buffer of SURBs paired with their IDs.
///
/// Cloning is cheap: clones share the same underlying buffer via `Arc`.
#[derive(Clone, Debug)]
pub(crate) struct SurbRingBuffer(Arc<Mutex<AllocRingBuffer<(HoprSurbId, HoprSurb)>>>);
74
75impl Default for SurbRingBuffer {
76 fn default() -> Self {
77 Self::new(10_000)
79 }
80}
81
82impl SurbRingBuffer {
83 pub fn new(capacity: usize) -> Self {
84 Self(Arc::new(Mutex::new(AllocRingBuffer::new(capacity))))
85 }
86
87 pub fn push<I: IntoIterator<Item = (HoprSurbId, HoprSurb)>>(&self, surbs: I) -> Result<(), DbError> {
89 self.0
90 .lock()
91 .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?
92 .extend(surbs);
93 Ok(())
94 }
95
96 pub fn pop_one(&self) -> Result<(HoprSurbId, HoprSurb), DbError> {
98 self.0
99 .lock()
100 .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?
101 .dequeue()
102 .ok_or(DbError::NoSurbAvailable("no more surbs".into()))
103 }
104
105 pub fn pop_one_if_has_id(&self, id: &HoprSurbId) -> Result<(HoprSurbId, HoprSurb), DbError> {
107 let mut rb = self
108 .0
109 .lock()
110 .map_err(|_| DbError::LogicalError("failed to lock surbs".into()))?;
111 if rb.peek().is_some_and(|(surb_id, _)| surb_id == id) {
112 rb.dequeue().ok_or(DbError::NoSurbAvailable("no more surbs".into()))
113 } else {
114 Err(DbError::NoSurbAvailable("surb does not match the given id".into()))
115 }
116 }
117}
118
/// Collection of all in-memory caches used by the DB layer.
#[derive(Debug)]
pub struct HoprDbCaches {
    /// Singleton values, keyed by the `CachedValue` variant discriminant.
    pub(crate) single_values: Cache<CachedValueDiscriminants, CachedValue>,
    /// Acknowledgements pending per half-key challenge.
    pub(crate) unacked_tickets: Cache<HalfKeyChallenge, PendingAcknowledgement>,
    /// Per-channel ticket index counters (shared atomics).
    pub(crate) ticket_index: Cache<Hash, Arc<AtomicU64>>,
    /// Unrealized balance per (channel hash, epoch) pair.
    pub(crate) unrealized_value: Cache<(Hash, U256), HoprBalance>,
    /// Chain address → off-chain packet key resolution (negative results cached as `None`).
    pub(crate) chain_to_offchain: Cache<Address, Option<OffchainPublicKey>>,
    /// Off-chain packet key → chain address resolution (negative results cached as `None`).
    pub(crate) offchain_to_chain: Cache<OffchainPublicKey, Option<Address>>,
    /// Channel entries looked up by (source, destination) address pair.
    pub(crate) src_dst_to_channel: Cache<ChannelParties, Option<ChannelEntry>>,
    /// Bidirectional key-id ↔ packet-key mapping (see `CacheKeyMapper`).
    pub(crate) key_id_mapper: CacheKeyMapper,
    // Sync (not async) cache: ReplyOpener is presumably not usable from the
    // async cache here — TODO confirm the reason for the moka::sync choice.
    pub(crate) pseudonym_openers: moka::sync::Cache<HoprSenderId, ReplyOpener>,
    /// Ring buffers of received SURBs, one per pseudonym.
    pub(crate) surbs_per_pseudonym: Cache<HoprPseudonym, SurbRingBuffer>,
}
136
137impl Default for HoprDbCaches {
138 fn default() -> Self {
139 let single_values = Cache::builder().time_to_idle(Duration::from_secs(1800)).build();
140
141 let unacked_tickets = Cache::builder()
142 .time_to_live(Duration::from_secs(30))
143 .max_capacity(1_000_000_000)
144 .build();
145
146 let ticket_index = Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build();
147
148 let unrealized_value = Cache::builder().expire_after(ExpiryNever).max_capacity(10_000).build();
149
150 let chain_to_offchain = Cache::builder()
151 .time_to_idle(Duration::from_secs(600))
152 .max_capacity(100_000)
153 .build();
154
155 let offchain_to_chain = Cache::builder()
156 .time_to_idle(Duration::from_secs(600))
157 .max_capacity(100_000)
158 .build();
159
160 let src_dst_to_channel = Cache::builder()
161 .time_to_live(Duration::from_secs(600))
162 .max_capacity(10_000)
163 .build();
164
165 let pseudonym_openers = moka::sync::Cache::builder()
168 .time_to_live(Duration::from_secs(60))
169 .max_capacity(100_000)
170 .build();
171
172 let surbs_per_pseudonym = Cache::builder()
175 .time_to_idle(Duration::from_secs(600))
176 .max_capacity(10_000)
177 .build();
178
179 Self {
180 single_values,
181 unacked_tickets,
182 ticket_index,
183 unrealized_value,
184 chain_to_offchain,
185 offchain_to_chain,
186 src_dst_to_channel,
187 pseudonym_openers,
188 surbs_per_pseudonym,
189 key_id_mapper: CacheKeyMapper::with_capacity(10_000),
190 }
191 }
192}
193
impl HoprDbCaches {
    /// Invalidates all entries in the general-purpose lookup caches.
    ///
    /// NOTE(review): `ticket_index`, `key_id_mapper`, `pseudonym_openers` and
    /// `surbs_per_pseudonym` are NOT invalidated here — presumably intentional
    /// because they hold runtime-only protocol state rather than DB-derived
    /// data, but confirm with callers before relying on it.
    pub fn invalidate_all(&self) {
        self.single_values.invalidate_all();
        self.unacked_tickets.invalidate_all();
        self.unrealized_value.invalidate_all();
        self.chain_to_offchain.invalidate_all();
        self.offchain_to_chain.invalidate_all();
        self.src_dst_to_channel.invalidate_all();
    }
}
206
/// Bidirectional mapping between 4-byte key identifiers and off-chain packet keys.
///
/// Field `.0` maps id → key; field `.1` maps key → id. The two maps are kept
/// consistent by `update_key_id_binding`, which inserts into both atomically
/// (while holding both entries) or not at all.
#[derive(Debug)]
pub(crate) struct CacheKeyMapper(
    DashMap<KeyIdent<4>, OffchainPublicKey>,
    DashMap<OffchainPublicKey, KeyIdent<4>>,
);
212
impl CacheKeyMapper {
    /// Creates a mapper with both direction maps pre-sized to `capacity`.
    pub fn with_capacity(capacity: usize) -> Self {
        Self(DashMap::with_capacity(capacity), DashMap::with_capacity(capacity))
    }

    /// Creates a two-way binding between the account's key-id and its packet key,
    /// or verifies that an existing binding matches.
    ///
    /// Returns an error when either direction already maps to a different
    /// counterpart — i.e., bindings are effectively immutable once created.
    pub fn update_key_id_binding(&self, account: &AccountEntry) -> Result<(), DbSqlError> {
        let id = account.key_id();
        let key = account.public_key;

        // Both entries are acquired (and their shard locks held) before the
        // match, so the insert-into-both below is atomic w.r.t. other callers.
        // NOTE(review): acquisition order is always map .0 then map .1; this is
        // deadlock-safe among calls to this method, but confirm no other code
        // path locks these maps in the opposite order.
        let id_entry = self.0.entry(id);
        let key_entry = self.1.entry(key);

        match (id_entry, key_entry) {
            // Neither direction bound yet: create the binding in both maps.
            (Entry::Vacant(v_id), Entry::Vacant(v_key)) => {
                v_id.insert_entry(key);
                v_key.insert_entry(id);
                tracing::debug!(%id, %key, "inserted key-id binding");
                Ok(())
            }
            // Both directions bound: accept only if they point at each other.
            (Entry::Occupied(v_id), Entry::Occupied(v_key)) => {
                if v_id.get() != v_key.key() {
                    Err(DbSqlError::LogicalError(format!(
                        "attempt to insert key {key} with key-id {id}, but key-id already maps to key {} while {} is \
                         expected",
                        v_id.get(),
                        v_key.key(),
                    )))
                } else {
                    Ok(())
                }
            }
            // Key already bound to some other id: refuse to rebind.
            (Entry::Vacant(_), Entry::Occupied(v_key)) => {
                tracing::debug!(
                    "attempt to insert key {key} with key-id {id} failed because key is already set as {}",
                    v_key.get()
                );
                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
            }
            // Id already bound to some other key: refuse to rebind.
            (Entry::Occupied(v_id), Entry::Vacant(_)) => {
                tracing::debug!(
                    "attempt to insert key {key} with key-id {id} failed because key-id is already set as {}",
                    v_id.get()
                );
                Err(DbSqlError::LogicalError("inconsistent key-id binding".into()))
            }
        }
    }
}
271
272impl hopr_crypto_packet::KeyIdMapper<HoprSphinxSuite, HoprSphinxHeaderSpec> for CacheKeyMapper {
273 fn map_key_to_id(&self, key: &OffchainPublicKey) -> Option<KeyIdent> {
274 self.1.get(key).map(|k| *k.value())
275 }
276
277 fn map_id_to_public(&self, id: &KeyIdent) -> Option<OffchainPublicKey> {
278 self.0.get(id).map(|k| *k.value())
279 }
280}