1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{chain::ChainReadChannelOperations, db::HoprDbTicketOperations};
5use hopr_crypto_types::prelude::*;
6use hopr_internal_types::prelude::*;
7#[cfg(feature = "rayon")]
8use hopr_parallelize::cpu::rayon::prelude::*;
9use hopr_primitive_types::balance::HoprBalance;
10use validator::ValidationError;
11
12use crate::{
13 HoprProtocolError, ResolvedAcknowledgement, TicketAcknowledgementError, TicketTracker,
14 UnacknowledgedTicketProcessor,
15};
16
17const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
18fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
19 if timeout < &MIN_UNACK_TICKET_TIMEOUT {
20 Err(ValidationError::new("unack_ticket_timeout too low"))
21 } else {
22 Ok(())
23 }
24}
25
26const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
27
28fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
29 if retention < &MIN_OUTGOING_INDEX_RETENTION {
30 Err(ValidationError::new("outgoing_index_cache_retention too low"))
31 } else {
32 Ok(())
33 }
34}
35
/// Default for `HoprTicketProcessorConfig::outgoing_index_cache_retention`: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    const DEFAULT_RETENTION_SECS: u64 = 10;

    std::time::Duration::from_secs(DEFAULT_RETENTION_SECS)
}
39
/// Default for `HoprTicketProcessorConfig::unack_ticket_timeout`: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;

    std::time::Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
43
/// Default for `HoprTicketProcessorConfig::max_unack_tickets`: ten million tickets.
fn default_max_unack_tickets() -> usize {
    const DEFAULT_MAX_UNACK_TICKETS: usize = 10_000_000;

    DEFAULT_MAX_UNACK_TICKETS
}
47
/// Always yields `true`.
///
/// Exists so that a `true` default can be referenced by name from the `#[default(...)]`
/// and `serde(default = "...")` attributes of the configuration struct.
fn just_true() -> bool {
    const ENABLED: bool = true;

    ENABLED
}
51
/// Configuration of the [`HoprTicketProcessor`].
///
/// All fields have defaults, both for `Default::default()` and (with the `serde` feature)
/// for deserialization; unknown fields are rejected when deserializing.
#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct HoprTicketProcessorConfig {
    /// How long a ticket is kept while waiting for its acknowledgement.
    ///
    /// Used as the time-to-live of entries in each per-peer cache of unacknowledged tickets
    /// and as the time-to-idle of the per-peer caches themselves.
    ///
    /// Validation requires at least 10 seconds. Default is 30 seconds.
    #[default(default_unack_ticket_timeout())]
    #[validate(custom(function = "validate_unack_ticket_timeout"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
    )]
    pub unack_ticket_timeout: std::time::Duration,
    /// Capacity of the unacknowledged ticket cache created for each individual peer.
    ///
    /// Validation requires at least 100. Default is 10 000 000.
    #[default(default_max_unack_tickets())]
    #[validate(range(min = 100))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
    pub max_unack_tickets: usize,
    /// Time-to-idle of entries in the outgoing ticket index cache.
    ///
    /// The periodic synchronization of that cache to the database runs at half of this period.
    ///
    /// Validation requires at least 10 seconds. Default is 10 seconds.
    #[default(default_outgoing_index_retention())]
    #[validate(custom(function = "validate_outgoing_index_retention"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_outgoing_index_retention", with = "humantime_serde")
    )]
    pub outgoing_index_cache_retention: std::time::Duration,
    /// Selects how received acknowledgements are verified.
    ///
    /// When `true`, all acknowledgements of a call are passed to `Acknowledgement::verify_batch`;
    /// when `false`, each acknowledgement is verified individually.
    ///
    /// Default is `true`.
    #[default(just_true())]
    #[cfg_attr(feature = "serde", serde(default = "just_true"))]
    pub use_batch_verification: bool,
}
101
/// Ticket processor that tracks unacknowledged tickets per peer and outgoing ticket
/// indices per channel, backed by a chain API (`Chain`) and a ticket database (`Db`).
#[derive(Clone)]
pub struct HoprTicketProcessor<Chain, Db> {
    /// Tickets awaiting acknowledgement: one inner cache per next-hop peer, keyed by the
    /// challenge the expected acknowledgement has to solve.
    unacknowledged_tickets:
        moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
    /// Next outgoing ticket index for each `(channel id, channel epoch)` pair.
    out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
    /// Database used to load and persist outgoing ticket indices and to query ticket values.
    db: Db,
    /// Chain API used to look up the channel a ticket was issued on.
    chain_api: Chain,
    /// Chain key pair of this node; used for channel lookups and to turn acknowledged
    /// tickets into redeemable ones.
    chain_key: ChainKeypair,
    /// Domain separator passed along when turning acknowledged tickets into redeemable ones.
    channels_dst: Hash,
    /// Configuration this processor was created with.
    cfg: HoprTicketProcessorConfig,
}
114
115impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
116 pub fn new(
118 chain_api: Chain,
119 db: Db,
120 chain_key: ChainKeypair,
121 channels_dst: Hash,
122 cfg: HoprTicketProcessorConfig,
123 ) -> Self {
124 Self {
125 out_ticket_index: moka::future::Cache::builder()
126 .time_to_idle(cfg.outgoing_index_cache_retention)
127 .max_capacity(10_000)
128 .build(),
129 unacknowledged_tickets: moka::future::Cache::builder()
130 .time_to_idle(cfg.unack_ticket_timeout)
131 .max_capacity(100_000)
132 .build(),
133 chain_api,
134 db,
135 chain_key,
136 channels_dst,
137 cfg,
138 }
139 }
140}
141
142impl<Chain, Db> HoprTicketProcessor<Chain, Db>
143where
144 Db: HoprDbTicketOperations + Clone + Send + 'static,
145{
146 pub fn outgoing_index_sync_task(
152 &self,
153 reg: futures::future::AbortRegistration,
154 ) -> impl Future<Output = ()> + use<Db, Chain> {
155 let index_save_stream = futures::stream::Abortable::new(
156 futures_time::stream::interval(futures_time::time::Duration::from(
157 self.cfg.outgoing_index_cache_retention.div_f32(2.0),
158 )),
159 reg,
160 );
161
162 let db = self.db.clone();
163 let out_ticket_index = self.out_ticket_index.clone();
164
165 index_save_stream
166 .for_each(move |_| {
167 let db = db.clone();
168 let out_ticket_index = out_ticket_index.clone();
169 async move {
170 for (channel_key, out_idx) in out_ticket_index.iter() {
173 if let Err(error) = db
174 .update_outgoing_ticket_index(
175 &channel_key.0,
176 channel_key.1,
177 out_idx.load(std::sync::atomic::Ordering::SeqCst),
178 )
179 .await
180 {
181 tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
182 }
183 }
184 tracing::trace!("synced outgoing ticket indices to db");
185 }
186 })
187 }
188}
189
190#[async_trait::async_trait]
191impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
192where
193 Chain: ChainReadChannelOperations + Send + Sync,
194 Db: Send + Sync,
195{
196 type Error = HoprProtocolError;
197
198 #[tracing::instrument(skip(self, next_hop, challenge, ticket), level = "trace", fields(next_hop = next_hop.to_peerid_str()))]
199 async fn insert_unacknowledged_ticket(
200 &self,
201 next_hop: &OffchainPublicKey,
202 challenge: HalfKeyChallenge,
203 ticket: UnacknowledgedTicket,
204 ) -> Result<(), Self::Error> {
205 tracing::trace!(%ticket, "received unacknowledged ticket");
206 self.unacknowledged_tickets
207 .get_with_by_ref(next_hop, async {
208 moka::future::Cache::builder()
209 .time_to_live(self.cfg.unack_ticket_timeout)
210 .max_capacity(self.cfg.max_unack_tickets as u64)
211 .build()
212 })
213 .await
214 .insert(challenge, ticket)
215 .await;
216
217 Ok(())
218 }
219
220 #[tracing::instrument(skip_all, level = "trace", fields(peer = peer.to_peerid_str()))]
221 async fn acknowledge_tickets(
222 &self,
223 peer: OffchainPublicKey,
224 acks: Vec<Acknowledgement>,
225 ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
226 if !self.unacknowledged_tickets.contains_key(&peer) {
231 tracing::trace!("not awaiting any acknowledgement from peer");
232 return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
233 }
234 let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
235 tracing::trace!("not awaiting any acknowledgement from peer");
236 return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
237 };
238
239 let use_batch_verify = self.cfg.use_batch_verification;
241 let half_keys_challenges = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
242 if use_batch_verify {
243 let acks = Acknowledgement::verify_batch(acks.into_iter().map(|ack| (peer, ack)));
246
247 #[cfg(feature = "rayon")]
248 let iter = acks.into_par_iter();
249
250 #[cfg(not(feature = "rayon"))]
251 let iter = acks.into_iter();
252
253 iter.map(|verified| {
254 verified
255 .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
256 })
257 .filter_map(|res| {
258 res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
259 .ok()
260 })
261 .collect::<Vec<_>>()
262 } else {
263 #[cfg(feature = "rayon")]
264 let iter = acks.into_par_iter();
265
266 #[cfg(not(feature = "rayon"))]
267 let iter = acks.into_iter();
268
269 iter.map(|ack| {
270 ack.verify(&peer)
271 .and_then(|verified| Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?)))
272 })
273 .filter_map(|res| {
274 res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
275 .ok()
276 })
277 .collect::<Vec<_>>()
278 }
279 })
280 .await;
281
282 let mut unack_tickets = Vec::with_capacity(half_keys_challenges.len());
284 for (half_key, challenge) in half_keys_challenges {
285 let Some(unack_ticket) = awaiting_ack_from_peer.remove(&challenge).await else {
286 tracing::debug!(%challenge, "received acknowledgement for unknown ticket");
287 continue;
288 };
289
290 let issuer_channel = match self
291 .chain_api
292 .channel_by_parties(unack_ticket.ticket.verified_issuer(), self.chain_key.as_ref())
293 .await
294 {
295 Ok(Some(channel)) => {
296 if channel.channel_epoch != unack_ticket.verified_ticket().channel_epoch {
297 tracing::error!(%unack_ticket, "received acknowledgement for ticket issued in a different epoch");
298 continue;
299 }
300 channel
301 }
302 Ok(None) => {
303 tracing::error!(%unack_ticket, "received acknowledgement for ticket issued for unknown channel");
304 continue;
305 }
306 Err(error) => {
307 tracing::error!(%error, %unack_ticket, "failed to resolve channel for unacknowledged ticket");
308 continue;
309 }
310 };
311
312 unack_tickets.push((issuer_channel, half_key, unack_ticket));
313 }
314
315 let domain_separator = self.channels_dst;
316 let chain_key = self.chain_key.clone();
317 Ok(hopr_parallelize::cpu::spawn_fifo_blocking(move || {
318 #[cfg(feature = "rayon")]
319 let iter = unack_tickets.into_par_iter();
320
321 #[cfg(not(feature = "rayon"))]
322 let iter = unack_tickets.into_iter();
323
324 iter.filter_map(|(channel, half_key, unack_ticket)| {
325 let Ok(ack_ticket) = unack_ticket.acknowledge(&half_key) else {
331 tracing::error!(%unack_ticket, "failed to acknowledge ticket");
332 return None;
333 };
334
335 match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
338 Ok(redeemable) => {
339 tracing::debug!(%channel, "found winning ticket");
340 Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable)))
341 }
342 Err(CoreTypesError::TicketNotWinning) => {
343 tracing::trace!(%channel, "found losing ticket");
344 Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
345 }
346 Err(error) => {
347 tracing::error!(%error, %channel, "error when acknowledging ticket");
348 Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
349 }
350 }
351 })
352 .collect::<Vec<_>>()
353 })
354 .await)
355 }
356}
357
358#[async_trait::async_trait]
359impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
360where
361 Chain: Send + Sync,
362 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
363{
364 type Error = Arc<Db::Error>;
365
366 async fn next_outgoing_ticket_index(&self, channel_id: &ChannelId, epoch: u32) -> Result<u64, Self::Error> {
367 let channel_id = *channel_id;
368 self.out_ticket_index
369 .try_get_with((channel_id, epoch), async {
370 self.db
371 .get_or_create_outgoing_ticket_index(&channel_id, epoch)
372 .await
373 .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
374 })
375 .await
376 .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
377 }
378
379 async fn incoming_channel_unrealized_balance(
380 &self,
381 channel_id: &ChannelId,
382 epoch: u32,
383 ) -> Result<HoprBalance, Self::Error> {
384 self.db.get_tickets_value(channel_id, epoch).await.map_err(Into::into)
387 }
388}
389
#[cfg(test)]
mod tests {
    use hopr_crypto_random::Randomizable;

    use super::*;
    use crate::utils::*;

    /// Tickets registered as awaiting an acknowledgement from a peer must all be resolved
    /// once that peer delivers the matching acknowledgements.
    #[tokio::test]
    async fn ticket_processor_should_acknowledge_previously_inserted_tickets() -> anyhow::Result<()> {
        const NUM_TICKETS: usize = 5;

        let blokli_client = create_blokli_client()?;
        let node = create_node(1, &blokli_client).await?;

        let processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        // The peer whose acknowledgements are awaited.
        let acknowledger = &PEERS[2].1;

        let mut acks = Vec::with_capacity(NUM_TICKETS);
        for ticket_index in 0..NUM_TICKETS {
            let own_share = HalfKey::random();
            let ack_share = HalfKey::random();
            let ticket_challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;

            let pending_ticket = TicketBuilder::default()
                .counterparty(&PEERS[1].0)
                .index(ticket_index as u64)
                .channel_epoch(1)
                .amount(10_u32)
                .challenge(ticket_challenge)
                .build_signed(&PEERS[0].0, &Hash::default())?
                .into_unacknowledged(own_share);

            processor
                .insert_unacknowledged_ticket(acknowledger.public(), ack_share.to_challenge()?, pending_ticket)
                .await?;

            acks.push(VerifiedAcknowledgement::new(ack_share, acknowledger).leak());
        }

        let resolutions = processor.acknowledge_tickets(*acknowledger.public(), acks).await?;

        assert_eq!(NUM_TICKETS, resolutions.len());
        for resolution in &resolutions {
            assert!(matches!(resolution, ResolvedAcknowledgement::RelayingWin(_)));
        }

        Ok(())
    }

    /// Acknowledgements from a peer with no pending tickets must be rejected as unexpected.
    #[tokio::test]
    async fn ticket_processor_should_reject_acknowledgements_from_unexpected_sender() -> anyhow::Result<()> {
        const NUM_ACKS: usize = 5;

        let blokli_client = create_blokli_client()?;
        let node = create_node(1, &blokli_client).await?;

        let processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        let sender = &PEERS[2].1;

        // No ticket was inserted for the sender, so none of these acknowledgements is expected.
        let acks = (0..NUM_ACKS)
            .map(|_| VerifiedAcknowledgement::new(HalfKey::random(), sender).leak())
            .collect::<Vec<_>>();

        let outcome = processor.acknowledge_tickets(*sender.public(), acks).await;
        assert!(matches!(
            outcome,
            Err(TicketAcknowledgementError::UnexpectedAcknowledgement)
        ));

        Ok(())
    }
}
475}