1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_crypto_packet::prelude::*;
5use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
6use hopr_db_api::{
7 errors::Result,
8 prelude::DbError,
9 protocol::{HoprDbProtocolOperations, IncomingPacket, OutgoingPacket, ResolvedAcknowledgement},
10 resolver::HoprDbResolverOperations,
11};
12use hopr_internal_types::prelude::*;
13#[cfg(all(feature = "prometheus", not(test)))]
14use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
15use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
16use hopr_parallelize::cpu::spawn_fifo_blocking;
17use hopr_path::{Path, PathAddressResolver, ValidatedPath, errors::PathError};
18use hopr_primitive_types::prelude::*;
19use tracing::{instrument, trace, warn};
20
21use crate::{
22 channels::HoprDbChannelOperations, db::HoprDb, errors::DbSqlError, info::HoprDbInfoOperations,
23 prelude::HoprDbTicketOperations,
24};
25
// Prometheus metrics used by the protocol operations in this file.
// They are compiled in only when the `prometheus` feature is enabled and the
// crate is not built for tests; every use site below carries the same `cfg` gate.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Histogram observed with the winning probability of each incoming ticket
    // while a forwarded packet's ticket is being validated.
    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
        hopr_metrics::SimpleHistogram::new(
            "hopr_tickets_incoming_win_probability",
            "Observes the winning probabilities on incoming tickets",
            vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
        ).unwrap();

    // Counter of received acknowledgements; the `valid` label is set to "true"
    // or "false" by the code in this file.
    pub(crate) static ref METRIC_RECEIVED_ACKS: MultiCounter = MultiCounter::new(
        "hopr_received_ack_count",
        "Number of received acknowledgements",
        &["valid"]
    )
    .unwrap();

    // Counter incremented in `to_send_no_ack` for every packet it successfully builds.
    pub(crate) static ref METRIC_SENT_ACKS: SimpleCounter =
        SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();

    // Counter of acknowledged tickets; the `type` label is set to "winning" or
    // "losing" by the code in this file.
    pub(crate) static ref METRIC_TICKETS_COUNT: MultiCounter =
        MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
}
48
49impl HoprDb {
50 async fn validate_and_replace_ticket(
52 &self,
53 mut fwd: HoprForwardedPacket,
54 me: &ChainKeypair,
55 outgoing_ticket_win_prob: WinningProbability,
56 outgoing_ticket_price: HoprBalance,
57 ) -> std::result::Result<HoprForwardedPacket, DbSqlError> {
58 let previous_hop_addr = self.resolve_chain_key(&fwd.previous_hop).await?.ok_or_else(|| {
59 DbSqlError::LogicalError(format!(
60 "failed to find channel key for packet key {} on previous hop",
61 fwd.previous_hop.to_peerid_str()
62 ))
63 })?;
64
65 let next_hop_addr = self.resolve_chain_key(&fwd.outgoing.next_hop).await?.ok_or_else(|| {
66 DbSqlError::LogicalError(format!(
67 "failed to find channel key for packet key {} on next hop",
68 fwd.outgoing.next_hop.to_peerid_str()
69 ))
70 })?;
71
72 let incoming_channel = self
73 .get_channel_by_parties(None, &previous_hop_addr, &self.me_onchain, true)
74 .await?
75 .ok_or_else(|| {
76 DbSqlError::LogicalError(format!(
77 "no channel found for previous hop address '{previous_hop_addr}'"
78 ))
79 })?;
80
81 if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
83 return Err(DbSqlError::LogicalError("invalid ticket for channel".into()));
84 }
85
86 let chain_data = self.get_indexer_data(None).await?;
87 let domain_separator = chain_data
88 .channels_dst
89 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
90
91 let minimum_ticket_price = chain_data
94 .ticket_price
95 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
96 .mul(U256::from(fwd.path_pos));
97
98 #[cfg(all(feature = "prometheus", not(test)))]
99 METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
100
101 let remaining_balance = incoming_channel
102 .balance
103 .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
104
105 let verified_incoming_ticket = spawn_fifo_blocking(move || {
110 validate_unacknowledged_ticket(
111 fwd.outgoing.ticket,
112 &incoming_channel,
113 minimum_ticket_price,
114 chain_data.minimum_incoming_ticket_winning_prob,
115 remaining_balance,
116 &domain_separator,
117 )
118 })
119 .await?;
120
121 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
125
126 self.caches
128 .unacked_tickets
129 .insert(
130 fwd.outgoing.ack_challenge,
131 PendingAcknowledgement::WaitingAsRelayer(verified_incoming_ticket.into_unacknowledged(fwd.own_key)),
132 )
133 .await;
134
135 let ticket_builder = if fwd.path_pos > 1 {
142 let outgoing_channel = self
144 .get_channel_by_parties(None, &self.me_onchain, &next_hop_addr, true)
145 .await?
146 .ok_or(DbSqlError::LogicalError(format!(
147 "channel to '{next_hop_addr}' not found",
148 )))?;
149
150 self.create_multihop_ticket(
151 outgoing_channel,
152 self.me_onchain,
153 next_hop_addr,
154 fwd.path_pos,
155 outgoing_ticket_win_prob,
156 outgoing_ticket_price,
157 )
158 .await?
159 } else {
160 TicketBuilder::zero_hop().direction(&self.me_onchain, &next_hop_addr)
161 };
162
163 fwd.outgoing.ticket = ticket_builder
165 .challenge(fwd.next_challenge)
166 .build_signed(me, &domain_separator)?
167 .leak();
168
169 Ok(fwd)
170 }
171
172 async fn validate_acknowledgement(
173 &self,
174 ack: &Acknowledgement,
175 ) -> std::result::Result<ResolvedAcknowledgement, DbSqlError> {
176 let pending_ack = self
177 .caches
178 .unacked_tickets
179 .remove(&ack.ack_challenge()?)
180 .await
181 .ok_or_else(|| {
182 DbSqlError::AcknowledgementValidationError(format!(
183 "received unexpected acknowledgement for half key challenge {}",
184 ack.to_hex()
185 ))
186 })?;
187
188 match pending_ack {
189 PendingAcknowledgement::WaitingAsSender => {
190 trace!("received acknowledgement as sender: first relayer has processed the packet");
191 Ok(ResolvedAcknowledgement::Sending(*ack))
192 }
193
194 PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
195 let maybe_channel_with_issuer = self
196 .get_channel_by_parties(None, unacknowledged.ticket.verified_issuer(), &self.me_onchain, true)
197 .await?;
198
199 if maybe_channel_with_issuer
201 .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
202 {
203 let domain_separator = self
204 .get_indexer_data(None)
205 .await?
206 .channels_dst
207 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
208
209 let myself = self.clone();
210 let ack = *ack;
211 hopr_parallelize::cpu::spawn_blocking(move || {
212 let ack_ticket = unacknowledged.acknowledge(&ack.ack_key_share()?)?;
217
218 if ack_ticket.is_winning(&myself.chain_key, &domain_separator) {
219 trace!("Found a winning ticket");
220 Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
221 } else {
222 trace!("Found a losing ticket");
223 Ok(ResolvedAcknowledgement::RelayingLoss(
224 ack_ticket.ticket.verified_ticket().channel_id,
225 ))
226 }
227 })
228 .await
229 } else {
230 Err(DbSqlError::LogicalError(format!(
231 "no channel found for address '{}' with a matching epoch",
232 unacknowledged.ticket.verified_issuer()
233 )))
234 }
235 }
236 }
237 }
238}
239
240#[async_trait]
241impl HoprDbProtocolOperations for HoprDb {
242 #[instrument(level = "trace", skip(self, ack))]
243 async fn handle_acknowledgement(&self, ack: Acknowledgement) -> Result<()> {
244 let result = self.validate_acknowledgement(&ack).await?;
245 match &result {
246 ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
247 self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
249
250 #[cfg(all(feature = "prometheus", not(test)))]
251 {
252 METRIC_RECEIVED_ACKS.increment(&["true"]);
253 METRIC_TICKETS_COUNT.increment(&["winning"]);
254
255 let verified_ticket = ack_ticket.ticket.verified_ticket();
256 let channel = verified_ticket.channel_id.to_string();
257 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
258 &[&channel, "unredeemed"],
259 self.ticket_manager
260 .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
261 verified_ticket.channel_id,
262 verified_ticket.channel_epoch,
263 ))
264 .await?
265 .amount()
266 .as_u128() as f64,
267 );
268 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
269 .increment(&[&channel, "winning_count"], 1.0f64);
270 }
271 }
272 ResolvedAcknowledgement::RelayingLoss(_channel) => {
273 #[cfg(all(feature = "prometheus", not(test)))]
274 {
275 METRIC_RECEIVED_ACKS.increment(&["true"]);
276 METRIC_TICKETS_COUNT.increment(&["losing"]);
277 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
278 .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
279 }
280 }
281 ResolvedAcknowledgement::Sending(_) => {
282 #[cfg(all(feature = "prometheus", not(test)))]
283 METRIC_RECEIVED_ACKS.increment(&["true"]);
284 }
285 };
286
287 Ok(())
288 }
289
290 async fn get_network_winning_probability(&self) -> Result<WinningProbability> {
291 Ok(self
292 .get_indexer_data(None)
293 .await
294 .map(|data| data.minimum_incoming_ticket_winning_prob)?)
295 }
296
297 async fn get_network_ticket_price(&self) -> Result<HoprBalance> {
298 Ok(self.get_indexer_data(None).await.and_then(|data| {
299 data.ticket_price
300 .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
301 })?)
302 }
303
304 async fn find_surb(&self, matcher: SurbMatcher) -> Result<(HoprSenderId, HoprSurb)> {
305 let pseudonym = matcher.pseudonym();
306 let surbs_for_pseudonym = self
307 .caches
308 .surbs_per_pseudonym
309 .get(&pseudonym)
310 .await
311 .ok_or(DbError::NoSurbAvailable("pseudonym not found".into()))?;
312
313 match matcher {
314 SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym
315 .pop_one()
316 .map(|(id, surb)| (HoprSenderId::from_pseudonym_and_id(&pseudonym, id), surb))?),
317 SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
322 .pop_one_if_has_id(&id.surb_id())
323 .map(|(id, surb)| (HoprSenderId::from_pseudonym_and_id(&pseudonym, id), surb))?),
324 }
325 }
326
327 #[tracing::instrument(level = "trace", skip(self, data))]
328 async fn to_send_no_ack(&self, data: Box<[u8]>, destination: OffchainPublicKey) -> Result<OutgoingPacket> {
329 let next_peer = self.resolve_chain_key(&destination).await?.ok_or_else(|| {
330 DbSqlError::LogicalError(format!(
331 "failed to find chain key for packet key {} on previous hop",
332 destination.to_peerid_str()
333 ))
334 })?;
335
336 let pseudonym = HoprPseudonym::random();
338 let next_ticket = TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer);
339
340 let domain_separator = self
341 .get_indexer_data(None)
342 .await?
343 .channels_dst
344 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
345
346 let myself = self.clone();
348 let (packet, _) = spawn_fifo_blocking(move || {
349 HoprPacket::into_outgoing(
350 &data,
351 &pseudonym,
352 PacketRouting::NoAck::<ValidatedPath>(destination),
353 &myself.chain_key,
354 next_ticket,
355 &myself.caches.key_id_mapper,
356 &domain_separator,
357 )
358 .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
359 })
360 .await?;
361
362 if let Some(out) = packet.try_as_outgoing() {
363 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
364 transport_payload.extend_from_slice(out.packet.as_ref());
365 transport_payload.extend_from_slice(&out.ticket.into_encoded());
366
367 #[cfg(all(feature = "prometheus", not(test)))]
368 METRIC_SENT_ACKS.increment();
369
370 Ok(OutgoingPacket {
371 next_hop: out.next_hop,
372 ack_challenge: out.ack_challenge,
373 data: transport_payload.into_boxed_slice(),
374 })
375 } else {
376 Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
377 }
378 }
379
380 #[tracing::instrument(level = "trace", skip(self, data, routing))]
381 async fn to_send(
382 &self,
383 data: Box<[u8]>,
384 routing: ResolvedTransportRouting,
385 outgoing_ticket_win_prob: WinningProbability,
386 outgoing_ticket_price: HoprBalance,
387 ) -> Result<OutgoingPacket> {
388 let (next_peer, num_hops, pseudonym, routing) = match routing {
390 ResolvedTransportRouting::Forward {
391 pseudonym,
392 forward_path,
393 return_paths,
394 } => (
395 forward_path[0],
396 forward_path.num_hops(),
397 pseudonym,
398 PacketRouting::ForwardPath {
399 forward_path,
400 return_paths,
401 },
402 ),
403 ResolvedTransportRouting::Return(sender_id, surb) => {
404 let next = self
405 .caches
406 .key_id_mapper
407 .map_id_to_public(&surb.first_relayer)
408 .ok_or(DbSqlError::MissingAccount)?;
409
410 (
411 next,
412 surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
413 sender_id.pseudonym(),
414 PacketRouting::Surb(sender_id.surb_id(), surb),
415 )
416 }
417 };
418
419 let next_peer = self.resolve_chain_key(&next_peer).await?.ok_or_else(|| {
420 DbSqlError::LogicalError(format!(
421 "failed to find chain key for packet key {} on previous hop",
422 next_peer.to_peerid_str()
423 ))
424 })?;
425
426 let next_ticket = if num_hops > 1 {
428 let channel = self
429 .get_channel_by_parties(None, &self.me_onchain, &next_peer, true)
430 .await?
431 .ok_or(DbSqlError::LogicalError(format!("channel to '{next_peer}' not found")))?;
432
433 self.create_multihop_ticket(
434 channel,
435 self.me_onchain,
436 next_peer,
437 num_hops as u8,
438 outgoing_ticket_win_prob,
439 outgoing_ticket_price,
440 )
441 .await?
442 } else {
443 TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer)
444 };
445
446 let domain_separator = self
447 .get_indexer_data(None)
448 .await?
449 .channels_dst
450 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
451
452 let myself = self.clone();
454 let (packet, openers) = spawn_fifo_blocking(move || {
455 HoprPacket::into_outgoing(
456 &data,
457 &pseudonym,
458 routing,
459 &myself.chain_key,
460 next_ticket,
461 &myself.caches.key_id_mapper,
462 &domain_separator,
463 )
464 .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
465 })
466 .await?;
467
468 openers.into_iter().for_each(|(surb_id, opener)| {
471 self.caches
472 .pseudonym_openers
473 .insert(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
474 });
475
476 if let Some(out) = packet.try_as_outgoing() {
477 self.caches
478 .unacked_tickets
479 .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
480 .await;
481
482 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
483 transport_payload.extend_from_slice(out.packet.as_ref());
484 transport_payload.extend_from_slice(&out.ticket.into_encoded());
485
486 Ok(OutgoingPacket {
487 next_hop: out.next_hop,
488 ack_challenge: out.ack_challenge,
489 data: transport_payload.into_boxed_slice(),
490 })
491 } else {
492 Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
493 }
494 }
495
496 #[tracing::instrument(level = "trace", skip(self, data, pkt_keypair, sender), fields(sender = %sender))]
497 async fn from_recv(
498 &self,
499 data: Box<[u8]>,
500 pkt_keypair: &OffchainKeypair,
501 sender: OffchainPublicKey,
502 outgoing_ticket_win_prob: WinningProbability,
503 outgoing_ticket_price: HoprBalance,
504 ) -> Result<Option<IncomingPacket>> {
505 let offchain_keypair = pkt_keypair.clone();
506 let myself = self.clone();
507
508 let packet = spawn_fifo_blocking(move || {
509 HoprPacket::from_incoming(&data, &offchain_keypair, sender, &myself.caches.key_id_mapper, |p| {
510 myself.caches.pseudonym_openers.remove(p)
511 })
512 .map_err(|e| DbSqlError::LogicalError(format!("failed to construct an incoming packet: {e}")))
513 })
514 .await?;
515
516 match packet {
517 HoprPacket::Final(incoming) => {
518 if !incoming.surbs.is_empty() {
520 let num_surbs = incoming.surbs.len();
521
522 self.caches
524 .surbs_per_pseudonym
525 .entry_by_ref(&incoming.sender)
526 .or_default() .await
528 .value()
529 .push(incoming.surbs)?;
530
531 tracing::trace!(pseudonym = %incoming.sender, num_surbs, "stored incoming surbs for pseudonym");
532 }
533
534 if let Some(ack_key) = incoming.ack_key {
535 Ok(Some(IncomingPacket::Final {
536 packet_tag: incoming.packet_tag,
537 previous_hop: incoming.previous_hop,
538 sender: incoming.sender,
539 plain_text: incoming.plain_text,
540 ack_key,
541 }))
542 } else {
543 let ack: Acknowledgement = incoming.plain_text.as_ref().try_into().map_err(|error| {
545 tracing::error!(%error, "failed to decode the acknowledgement");
546
547 #[cfg(all(feature = "prometheus", not(test)))]
548 METRIC_RECEIVED_ACKS.increment(&["false"]);
549
550 DbSqlError::DecodingError
551 })?;
552
553 let ack = ack
554 .validate(&incoming.previous_hop)
555 .map_err(|e| crate::errors::DbSqlError::AcknowledgementValidationError(e.to_string()))?;
556 self.handle_acknowledgement(ack).await?;
557
558 Ok(None)
559 }
560 }
561 HoprPacket::Forwarded(fwd) => {
562 match self
563 .validate_and_replace_ticket(*fwd, &self.chain_key, outgoing_ticket_win_prob, outgoing_ticket_price)
564 .await
565 {
566 Ok(fwd) => {
567 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
568 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
569 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
570
571 Ok(Some(IncomingPacket::Forwarded {
572 packet_tag: fwd.packet_tag,
573 previous_hop: fwd.previous_hop,
574 next_hop: fwd.outgoing.next_hop,
575 data: payload.into_boxed_slice(),
576 ack: Acknowledgement::new(fwd.ack_key, pkt_keypair),
577 }))
578 }
579 Err(DbSqlError::TicketValidationError(boxed_error)) => {
580 let (rejected_ticket, error) = *boxed_error;
581 let rejected_value = rejected_ticket.amount;
582 warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
583
584 self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
585 DbSqlError::TicketValidationError(Box::new((
586 rejected_ticket.clone(),
587 format!("during validation error '{error}' update another error occurred: {e}"),
588 )))
589 })?;
590
591 Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))).into())
592 }
593 Err(e) => Err(e.into()),
594 }
595 }
596 HoprPacket::Outgoing(_) => Err(DbSqlError::LogicalError("cannot receive an outgoing packet".into()).into()),
597 }
598 }
599}
600
601impl HoprDb {
602 async fn create_multihop_ticket(
603 &self,
604 channel: ChannelEntry,
605 me_onchain: Address,
606 destination: Address,
607 current_path_pos: u8,
608 winning_prob: WinningProbability,
609 ticket_price: HoprBalance,
610 ) -> crate::errors::Result<TicketBuilder> {
611 let amount = HoprBalance::from(
613 ticket_price
614 .amount()
615 .mul(U256::from(current_path_pos - 1))
616 .div_f64(winning_prob.into())
617 .map_err(|e| {
618 DbSqlError::LogicalError(format!(
619 "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
620 ))
621 })?,
622 );
623
624 if channel.balance.lt(&amount) {
625 return Err(DbSqlError::LogicalError(format!(
626 "out of funds: {} with counterparty {destination} has balance {} < {amount}",
627 channel.get_id(),
628 channel.balance
629 )));
630 }
631
632 let ticket_builder = TicketBuilder::default()
633 .direction(&me_onchain, &destination)
634 .balance(amount)
635 .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
636 .index_offset(1) .win_prob(winning_prob)
638 .channel_epoch(channel.channel_epoch.as_u32());
639
640 Ok(ticket_builder)
641 }
642}
643
644#[async_trait::async_trait]
645impl PathAddressResolver for HoprDb {
646 async fn resolve_transport_address(
647 &self,
648 address: &Address,
649 ) -> std::result::Result<Option<OffchainPublicKey>, PathError> {
650 self.resolve_packet_key(address)
651 .await
652 .map_err(|_| PathError::UnknownPeer(address.to_string()))
653 }
654
655 async fn resolve_chain_address(&self, key: &OffchainPublicKey) -> std::result::Result<Option<Address>, PathError> {
656 self.resolve_chain_key(key)
657 .await
658 .map_err(|_| PathError::UnknownPeer(key.to_string()))
659 }
660}