1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{chain::ChainReadChannelOperations, db::HoprDbTicketOperations};
5use hopr_crypto_types::prelude::*;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::balance::HoprBalance;
8use validator::ValidationError;
9
10use crate::{HoprProtocolError, ResolvedAcknowledgement, TicketTracker, UnacknowledgedTicketProcessor};
11
12const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
13fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
14 if timeout < &MIN_UNACK_TICKET_TIMEOUT {
15 Err(ValidationError::new("unack_ticket_timeout too low"))
16 } else {
17 Ok(())
18 }
19}
20
21const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
22
23fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
24 if retention < &MIN_OUTGOING_INDEX_RETENTION {
25 Err(ValidationError::new("outgoing_index_cache_retention too low"))
26 } else {
27 Ok(())
28 }
29}
30
/// Default value of `outgoing_index_cache_retention`: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    const DEFAULT_RETENTION_SECS: u64 = 10;

    std::time::Duration::from_secs(DEFAULT_RETENTION_SECS)
}
34
/// Default value of `unack_ticket_timeout`: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;

    std::time::Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
38
/// Default value of `max_unack_tickets`: ten million.
fn default_max_unack_tickets() -> usize {
    const DEFAULT_MAX_UNACK_TICKETS: usize = 10_000_000;

    DEFAULT_MAX_UNACK_TICKETS
}
42
43#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
46#[cfg_attr(
47 feature = "serde",
48 derive(serde::Deserialize, serde::Serialize),
49 serde(deny_unknown_fields)
50)]
51pub struct HoprTicketProcessorConfig {
52 #[default(default_unack_ticket_timeout())]
58 #[validate(custom(function = "validate_unack_ticket_timeout"))]
59 #[cfg_attr(
60 feature = "serde",
61 serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
62 )]
63 pub unack_ticket_timeout: std::time::Duration,
64 #[default(default_max_unack_tickets())]
70 #[validate(range(min = 100))]
71 #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
72 pub max_unack_tickets: usize,
73 #[default(default_outgoing_index_retention())]
77 #[validate(custom(function = "validate_outgoing_index_retention"))]
78 #[cfg_attr(
79 feature = "serde",
80 serde(default = "default_outgoing_index_retention", with = "humantime_serde")
81 )]
82 pub outgoing_index_cache_retention: std::time::Duration,
83}
84
85#[derive(Clone)]
87pub struct HoprTicketProcessor<Chain, Db> {
88 unacknowledged_tickets:
89 moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
90 out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
91 db: Db,
92 chain_api: Chain,
93 chain_key: ChainKeypair,
94 channels_dst: Hash,
95 cfg: HoprTicketProcessorConfig,
96}
97
98impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
99 pub fn new(
101 chain_api: Chain,
102 db: Db,
103 chain_key: ChainKeypair,
104 channels_dst: Hash,
105 cfg: HoprTicketProcessorConfig,
106 ) -> Self {
107 Self {
108 out_ticket_index: moka::future::Cache::builder()
109 .time_to_idle(cfg.outgoing_index_cache_retention)
110 .max_capacity(10_000)
111 .build(),
112 unacknowledged_tickets: moka::future::Cache::builder()
113 .time_to_idle(cfg.unack_ticket_timeout * 2)
114 .max_capacity(100_000)
115 .build(),
116 chain_api,
117 db,
118 chain_key,
119 channels_dst,
120 cfg,
121 }
122 }
123}
124
125impl<Chain, Db> HoprTicketProcessor<Chain, Db>
126where
127 Db: HoprDbTicketOperations + Clone + Send + 'static,
128{
129 pub fn outgoing_index_sync_task(
135 &self,
136 reg: futures::future::AbortRegistration,
137 ) -> impl Future<Output = ()> + use<Db, Chain> {
138 let index_save_stream = futures::stream::Abortable::new(
139 futures_time::stream::interval(futures_time::time::Duration::from(
140 self.cfg.outgoing_index_cache_retention.div_f32(2.0),
141 )),
142 reg,
143 );
144
145 let db = self.db.clone();
146 let out_ticket_index = self.out_ticket_index.clone();
147
148 index_save_stream
149 .for_each(move |_| {
150 let db = db.clone();
151 let out_ticket_index = out_ticket_index.clone();
152 async move {
153 for (channel_key, out_idx) in out_ticket_index.iter() {
156 if let Err(error) = db
157 .update_outgoing_ticket_index(
158 &channel_key.0,
159 channel_key.1,
160 out_idx.load(std::sync::atomic::Ordering::SeqCst),
161 )
162 .await
163 {
164 tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
165 }
166 }
167 tracing::trace!("synced outgoing ticket indices to db");
168 }
169 })
170 }
171}
172
173#[async_trait::async_trait]
174impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
175where
176 Chain: ChainReadChannelOperations + Send + Sync,
177 Db: Send + Sync,
178{
179 type Error = HoprProtocolError;
180
181 async fn insert_unacknowledged_ticket(
182 &self,
183 next_hop: &OffchainPublicKey,
184 challenge: HalfKeyChallenge,
185 ticket: UnacknowledgedTicket,
186 ) -> Result<(), Self::Error> {
187 tracing::trace!(%ticket, "received unacknowledged ticket");
188 self.unacknowledged_tickets
189 .get_with_by_ref(next_hop, async {
190 moka::future::Cache::builder()
191 .time_to_live(self.cfg.unack_ticket_timeout)
192 .max_capacity(self.cfg.max_unack_tickets as u64)
193 .build()
194 })
195 .await
196 .insert(challenge, ticket)
197 .await;
198
199 Ok(())
200 }
201
202 async fn acknowledge_ticket(
203 &self,
204 peer: OffchainPublicKey,
205 ack: Acknowledgement,
206 ) -> Result<Option<ResolvedAcknowledgement>, Self::Error> {
207 let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
209 tracing::trace!(%peer, "not awaiting any acknowledgement from peer");
210 return Ok(None);
211 };
212
213 let (half_key, challenge) = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
215 ack.verify(&peer)
216 .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
217 })
218 .await?;
219
220 let unacknowledged = awaiting_ack_from_peer
222 .remove(&challenge)
223 .await
224 .ok_or_else(|| HoprProtocolError::UnacknowledgedTicketNotFound(challenge))?;
225
226 let issuer_channel = self
228 .chain_api
229 .channel_by_parties(unacknowledged.ticket.verified_issuer(), self.chain_key.as_ref())
230 .await
231 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
232 .filter(|c| c.channel_epoch == unacknowledged.verified_ticket().channel_epoch)
233 .ok_or(HoprProtocolError::ChannelNotFound(
234 *unacknowledged.ticket.verified_issuer(),
235 *self.chain_key.as_ref(),
236 ))?;
237
238 let domain_separator = self.channels_dst;
239 let chain_key = self.chain_key.clone();
240 let channel_id = *issuer_channel.get_id();
241 hopr_parallelize::cpu::spawn_fifo_blocking(move || {
242 let ack_ticket = unacknowledged.acknowledge(&half_key)?;
247
248 match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
251 Ok(redeemable) => {
252 tracing::debug!(%issuer_channel, "found winning ticket");
253 Ok(Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable))))
254 }
255 Err(CoreTypesError::TicketNotWinning) => {
256 tracing::trace!(%issuer_channel, "found losing ticket");
257 Ok(Some(ResolvedAcknowledgement::RelayingLoss(channel_id)))
258 }
259 Err(error) => {
260 tracing::error!(%error, %issuer_channel, "error when acknowledging ticket");
261 Ok(Some(ResolvedAcknowledgement::RelayingLoss(channel_id)))
262 }
263 }
264 })
265 .await
266 }
267}
268
269#[async_trait::async_trait]
270impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
271where
272 Chain: Send + Sync,
273 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
274{
275 type Error = Arc<Db::Error>;
276
277 async fn next_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<u64, Self::Error> {
278 let channel_id = *channel_id;
279 self.out_ticket_index
280 .try_get_with((channel_id, epoch), async {
281 self.db
282 .get_or_create_outgoing_ticket_index(&channel_id, epoch)
283 .await
284 .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
285 })
286 .await
287 .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
288 }
289
290 async fn incoming_channel_unrealized_balance(
291 &self,
292 channel_id: &ChannelId,
293 epoch: u32,
294 ) -> Result<HoprBalance, Self::Error> {
295 self.db.get_tickets_value(channel_id, epoch).await.map_err(Into::into)
298 }
299}