1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_api::{
5 chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
6 db::*,
7};
8use hopr_crypto_packet::{errors::PacketError, prelude::*};
9use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
12use hopr_parallelize::cpu::spawn_fifo_blocking;
13use hopr_path::{Path, ValidatedPath};
14use hopr_primitive_types::prelude::*;
15use tracing::{instrument, trace, warn};
16
17use crate::{cache::SurbRingBuffer, db::HoprNodeDb, errors::NodeDbError};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21 static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
22 hopr_metrics::SimpleHistogram::new(
23 "hopr_tickets_incoming_win_probability",
24 "Observes the winning probabilities on incoming tickets",
25 vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
26 ).unwrap();
27 pub(crate) static ref METRIC_RECEIVED_ACKS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
28 "hopr_received_ack_count",
29 "Number of received acknowledgements",
30 &["valid"]
31 )
32 .unwrap();
33 pub(crate) static ref METRIC_SENT_ACKS: hopr_metrics::SimpleCounter =
34 hopr_metrics::SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();
35
36 pub(crate) static ref METRIC_TICKETS_COUNT: hopr_metrics::MultiCounter =
37 hopr_metrics::MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
38}
39
40const SLOW_OP_MS: u128 = 150;
41
42impl HoprNodeDb {
43 #[instrument(level = "trace", skip_all, err)]
45 async fn validate_and_replace_ticket<R>(
46 &self,
47 mut fwd: HoprForwardedPacket,
48 resolver: &R,
49 outgoing_ticket_win_prob: WinningProbability,
50 outgoing_ticket_price: HoprBalance,
51 ) -> Result<HoprForwardedPacket, NodeDbError>
52 where
53 R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
54 {
55 let previous_hop_addr = resolver
56 .packet_key_to_chain_key(&fwd.previous_hop)
57 .await
58 .map_err(|e| NodeDbError::Other(e.into()))?
59 .ok_or_else(|| {
60 NodeDbError::LogicalError(format!(
61 "failed to find channel key for packet key {} on previous hop",
62 fwd.previous_hop.to_peerid_str()
63 ))
64 })?;
65
66 let next_hop_addr = resolver
67 .packet_key_to_chain_key(&fwd.outgoing.next_hop)
68 .await
69 .map_err(|e| NodeDbError::Other(e.into()))?
70 .ok_or_else(|| {
71 NodeDbError::LogicalError(format!(
72 "failed to find channel key for packet key {} on next hop",
73 fwd.outgoing.next_hop.to_peerid_str()
74 ))
75 })?;
76
77 let incoming_channel = resolver
78 .channel_by_parties(&previous_hop_addr, &self.me_address)
79 .await
80 .map_err(|e| NodeDbError::Other(e.into()))?
81 .ok_or_else(|| {
82 NodeDbError::LogicalError(format!(
83 "no channel found for previous hop address '{previous_hop_addr}'"
84 ))
85 })?;
86
87 if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
89 return Err(NodeDbError::LogicalError("invalid ticket for channel".into()));
90 }
91
92 let domain_separator = resolver
93 .domain_separators()
94 .await
95 .map_err(|e| NodeDbError::Other(e.into()))?
96 .channel;
97
98 let minimum_ticket_price = resolver
101 .minimum_ticket_price()
102 .await
103 .map_err(|e| NodeDbError::Other(e.into()))?
104 .mul(U256::from(fwd.path_pos));
105
106 #[cfg(all(feature = "prometheus", not(test)))]
107 METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
108
109 let remaining_balance = incoming_channel
110 .balance
111 .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
112
113 let start = std::time::Instant::now();
118 let win_prob = resolver
119 .minimum_incoming_ticket_win_prob()
120 .await
121 .map_err(|e| NodeDbError::Other(e.into()))?;
122 let verified_incoming_ticket = spawn_fifo_blocking(move || {
123 validate_unacknowledged_ticket(
124 fwd.outgoing.ticket,
125 &incoming_channel,
126 minimum_ticket_price,
127 win_prob,
128 remaining_balance,
129 &domain_separator,
130 )
131 })
132 .await?;
133
134 if start.elapsed().as_millis() > SLOW_OP_MS {
135 warn!("validate_unacknowledged_ticket took {} ms", start.elapsed().as_millis());
136 }
137
138 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
142
143 self.caches
145 .unacked_tickets
146 .insert(
147 fwd.outgoing.ack_challenge,
148 PendingAcknowledgement::WaitingAsRelayer(
149 verified_incoming_ticket.into_unacknowledged(fwd.own_key).into(),
150 ),
151 )
152 .await;
153
154 let ticket_builder = if fwd.path_pos > 1 {
161 let outgoing_channel = resolver
163 .channel_by_parties(&self.me_address, &next_hop_addr)
164 .await
165 .map_err(|e| NodeDbError::Other(e.into()))?
166 .ok_or(NodeDbError::LogicalError(format!(
167 "channel to '{next_hop_addr}' not found",
168 )))?;
169
170 self.create_multihop_ticket(
171 outgoing_channel,
172 next_hop_addr,
173 fwd.path_pos,
174 outgoing_ticket_win_prob,
175 outgoing_ticket_price,
176 )
177 .await?
178 } else {
179 TicketBuilder::zero_hop().direction(&self.me_address, &next_hop_addr)
180 };
181
182 let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
184 let me_on_chain = self.me_onchain.clone();
185 fwd.outgoing.ticket = spawn_fifo_blocking(move || ticket_builder.build_signed(&me_on_chain, &domain_separator))
186 .await?
187 .leak();
188
189 Ok(fwd)
190 }
191
192 #[instrument(level = "trace", skip(self, ack, resolver), err)]
193 async fn find_ticket_to_acknowledge<R>(
194 &self,
195 ack: &VerifiedAcknowledgement,
196 resolver: &R,
197 ) -> Result<ResolvedAcknowledgement, NodeDbError>
198 where
199 R: ChainReadChannelOperations + ChainValues + Send + Sync,
200 {
201 let ack_half_key = *ack.ack_key_share();
202 let challenge = hopr_parallelize::cpu::spawn_blocking(move || ack_half_key.to_challenge()).await?;
203
204 let pending_ack = self.caches.unacked_tickets.remove(&challenge).await.ok_or_else(|| {
205 NodeDbError::AcknowledgementValidationError(format!(
206 "received unexpected acknowledgement for half key challenge {}",
207 ack.ack_key_share().to_hex()
208 ))
209 })?;
210
211 match pending_ack {
212 PendingAcknowledgement::WaitingAsSender => {
213 trace!("received acknowledgement as sender: first relayer has processed the packet");
214 Ok(ResolvedAcknowledgement::Sending(*ack))
215 }
216
217 PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
218 let maybe_channel_with_issuer = resolver
219 .channel_by_parties(unacknowledged.ticket.verified_issuer(), &self.me_address)
220 .await
221 .map_err(|e| NodeDbError::Other(e.into()))?;
222
223 if maybe_channel_with_issuer
225 .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
226 {
227 let domain_separator = resolver
228 .domain_separators()
229 .await
230 .map_err(|e| NodeDbError::Other(e.into()))?
231 .channel;
232
233 let chain_key = self.me_onchain.clone();
234 hopr_parallelize::cpu::spawn_blocking(move || {
235 let ack_ticket = unacknowledged.acknowledge(&ack_half_key)?;
240
241 if ack_ticket.is_winning(&chain_key, &domain_separator) {
242 trace!("Found a winning ticket");
243 Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
244 } else {
245 trace!("Found a losing ticket");
246 Ok(ResolvedAcknowledgement::RelayingLoss(
247 ack_ticket.ticket.verified_ticket().channel_id,
248 ))
249 }
250 })
251 .await
252 } else {
253 Err(NodeDbError::LogicalError(format!(
254 "no channel found for address '{}' with a matching epoch",
255 unacknowledged.ticket.verified_issuer()
256 )))
257 }
258 }
259 }
260 }
261}
262
263#[async_trait]
264impl HoprDbProtocolOperations for HoprNodeDb {
265 type Error = NodeDbError;
266
267 #[instrument(level = "trace", skip(self, ack, resolver), err(Debug), ret)]
268 async fn handle_acknowledgement<R>(&self, ack: VerifiedAcknowledgement, resolver: &R) -> Result<(), NodeDbError>
269 where
270 R: ChainReadChannelOperations + ChainValues + Send + Sync,
271 {
272 let result = self.find_ticket_to_acknowledge(&ack, resolver).await?;
273 match &result {
274 ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
275 self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
277
278 #[cfg(all(feature = "prometheus", not(test)))]
279 {
280 METRIC_RECEIVED_ACKS.increment(&["true"]);
281 METRIC_TICKETS_COUNT.increment(&["winning"]);
282
283 let verified_ticket = ack_ticket.ticket.verified_ticket();
284 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
285 &["unredeemed"],
286 self.ticket_manager
287 .unrealized_value(TicketSelector::new(
288 verified_ticket.channel_id,
289 verified_ticket.channel_epoch,
290 ))
291 .await?
292 .amount()
293 .as_u128() as f64,
294 );
295 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["winning_count"], 1.0f64);
296 }
297 }
298 ResolvedAcknowledgement::RelayingLoss(_channel) => {
299 #[cfg(all(feature = "prometheus", not(test)))]
300 {
301 METRIC_RECEIVED_ACKS.increment(&["true"]);
302 METRIC_TICKETS_COUNT.increment(&["losing"]);
303 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["losing_count"], 1.0f64);
304 }
305 }
306 ResolvedAcknowledgement::Sending(_) => {
307 #[cfg(all(feature = "prometheus", not(test)))]
308 METRIC_RECEIVED_ACKS.increment(&["true"]);
309 }
310 };
311
312 Ok(())
313 }
314
315 #[tracing::instrument(level = "trace", skip(self, matcher), err)]
316 async fn find_surb(&self, matcher: SurbMatcher) -> Result<FoundSurb, NodeDbError> {
317 let pseudonym = matcher.pseudonym();
318 let surbs_for_pseudonym = self
319 .caches
320 .surbs_per_pseudonym
321 .get(&pseudonym)
322 .await
323 .ok_or(NodeDbError::NoSurbAvailable("pseudonym not found".into()))?;
324
325 match matcher {
326 SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
327 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
328 surb: popped_surb.surb,
329 remaining: popped_surb.remaining,
330 })?),
331 SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
336 .pop_one_if_has_id(&id.surb_id())
337 .map(|popped_surb| FoundSurb {
338 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
339 surb: popped_surb.surb,
340 remaining: popped_surb.remaining, })?),
342 }
343 }
344
345 #[inline]
346 fn get_surb_config(&self) -> SurbCacheConfig {
347 SurbCacheConfig {
348 rb_capacity: self.cfg.surb_ring_buffer_size,
349 distress_threshold: self.cfg.surb_distress_threshold,
350 }
351 }
352
353 #[tracing::instrument(level = "trace", skip(self, data, resolver))]
354 async fn to_send_no_ack<R>(
355 &self,
356 data: Box<[u8]>,
357 destination: OffchainPublicKey,
358 resolver: &R,
359 ) -> Result<OutgoingPacket, NodeDbError>
360 where
361 R: ChainKeyOperations + ChainValues + Send + Sync,
362 {
363 let next_peer = resolver
364 .packet_key_to_chain_key(&destination)
365 .await
366 .map_err(|e| NodeDbError::Other(e.into()))?
367 .ok_or_else(|| {
368 NodeDbError::LogicalError(format!(
369 "failed to find chain key for packet key {} on previous hop",
370 destination.to_peerid_str()
371 ))
372 })?;
373
374 let pseudonym = HoprPseudonym::random();
376 let next_ticket = TicketBuilder::zero_hop().direction(&self.me_address, &next_peer);
377 let domain_separator = resolver
378 .domain_separators()
379 .await
380 .map_err(|e| NodeDbError::Other(e.into()))?
381 .channel;
382
383 let chain_key = self.me_onchain.clone();
385 let mapper = resolver.key_id_mapper_ref().clone();
386 let (packet, _) = spawn_fifo_blocking(move || {
387 HoprPacket::into_outgoing(
388 &data,
389 &pseudonym,
390 PacketRouting::NoAck::<ValidatedPath>(destination),
391 &chain_key,
392 next_ticket,
393 &mapper,
394 &domain_separator,
395 None, )
397 .map_err(|e| {
398 NodeDbError::LogicalError(format!("failed to construct chain components for a no-ack packet: {e}"))
399 })
400 })
401 .await?;
402
403 if let Some(out) = packet.try_as_outgoing() {
404 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
405 transport_payload.extend_from_slice(out.packet.as_ref());
406 transport_payload.extend_from_slice(&out.ticket.into_encoded());
407
408 #[cfg(all(feature = "prometheus", not(test)))]
409 METRIC_SENT_ACKS.increment();
410
411 Ok(OutgoingPacket {
412 next_hop: out.next_hop,
413 ack_challenge: out.ack_challenge,
414 data: transport_payload.into_boxed_slice(),
415 })
416 } else {
417 Err(NodeDbError::LogicalError("must be an outgoing packet".into()))
418 }
419 }
420
421 #[tracing::instrument(level = "trace", skip(self, data, routing, resolver))]
422 async fn to_send<R>(
423 &self,
424 data: Box<[u8]>,
425 routing: ResolvedTransportRouting,
426 outgoing_ticket_win_prob: Option<WinningProbability>,
427 outgoing_ticket_price: Option<HoprBalance>,
428 signals: PacketSignals,
429 resolver: &R,
430 ) -> Result<OutgoingPacket, NodeDbError>
431 where
432 R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
433 {
434 let (next_peer, num_hops, pseudonym, routing) = match routing {
436 ResolvedTransportRouting::Forward {
437 pseudonym,
438 forward_path,
439 return_paths,
440 } => (
441 forward_path[0],
442 forward_path.num_hops(),
443 pseudonym,
444 PacketRouting::ForwardPath {
445 forward_path,
446 return_paths,
447 },
448 ),
449 ResolvedTransportRouting::Return(sender_id, surb) => {
450 let next = resolver
451 .key_id_mapper_ref()
452 .map_id_to_public(&surb.first_relayer)
453 .ok_or(NodeDbError::LogicalError(format!(
454 "failed to find public key for key id {}",
455 surb.first_relayer
456 )))?;
457
458 (
459 next,
460 surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
461 sender_id.pseudonym(),
462 PacketRouting::Surb(sender_id.surb_id(), surb),
463 )
464 }
465 };
466
467 let next_peer = resolver
468 .packet_key_to_chain_key(&next_peer)
469 .await
470 .map_err(|e| NodeDbError::Other(e.into()))?
471 .ok_or_else(|| {
472 NodeDbError::LogicalError(format!(
473 "failed to find chain key for packet key {} on previous hop",
474 next_peer.to_peerid_str()
475 ))
476 })?;
477
478 let next_ticket = if num_hops > 1 {
480 let channel = resolver
481 .channel_by_parties(&self.me_address, &next_peer)
482 .await
483 .map_err(|e| NodeDbError::Other(e.into()))?
484 .ok_or(NodeDbError::LogicalError(format!("channel to '{next_peer}' not found")))?;
485
486 let (outgoing_ticket_win_prob, outgoing_ticket_price) =
487 determine_network_config(outgoing_ticket_win_prob, outgoing_ticket_price, resolver).await?;
488
489 self.create_multihop_ticket(
490 channel,
491 next_peer,
492 num_hops as u8,
493 outgoing_ticket_win_prob,
494 outgoing_ticket_price,
495 )
496 .await?
497 } else {
498 TicketBuilder::zero_hop().direction(&self.me_address, &next_peer)
499 };
500
501 let domain_separator = resolver
502 .domain_separators()
503 .await
504 .map_err(|e| NodeDbError::Other(e.into()))?
505 .channel;
506
507 let chain_key = self.me_onchain.clone();
509 let mapper = resolver.key_id_mapper_ref().clone();
510 let (packet, openers) = spawn_fifo_blocking(move || {
511 HoprPacket::into_outgoing(
512 &data,
513 &pseudonym,
514 routing,
515 &chain_key,
516 next_ticket,
517 &mapper,
518 &domain_separator,
519 signals,
520 )
521 .map_err(|e| NodeDbError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
522 })
523 .await?;
524
525 openers.into_iter().for_each(|(surb_id, opener)| {
528 self.caches
529 .insert_pseudonym_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
530 });
531
532 if let Some(out) = packet.try_as_outgoing() {
533 self.caches
534 .unacked_tickets
535 .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
536 .await;
537
538 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
539 transport_payload.extend_from_slice(out.packet.as_ref());
540 transport_payload.extend_from_slice(&out.ticket.into_encoded());
541
542 Ok(OutgoingPacket {
543 next_hop: out.next_hop,
544 ack_challenge: out.ack_challenge,
545 data: transport_payload.into_boxed_slice(),
546 })
547 } else {
548 Err(NodeDbError::LogicalError("must be an outgoing packet".into()))
549 }
550 }
551
552 #[tracing::instrument(level = "trace", skip_all, fields(sender = %sender), err)]
553 async fn from_recv<R>(
554 &self,
555 data: Box<[u8]>,
556 pkt_keypair: &OffchainKeypair,
557 sender: OffchainPublicKey,
558 outgoing_ticket_win_prob: Option<WinningProbability>,
559 outgoing_ticket_price: Option<HoprBalance>,
560 resolver: &R,
561 ) -> Result<IncomingPacket, IncomingPacketError<NodeDbError>>
562 where
563 R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
564 {
565 let offchain_keypair = pkt_keypair.clone();
566 let caches = self.caches.clone();
567 let start = std::time::Instant::now();
568 let mapper = resolver.key_id_mapper_ref().clone();
569 let packet = spawn_fifo_blocking(move || {
570 HoprPacket::from_incoming(&data, &offchain_keypair, sender, &mapper, |p| {
571 caches.extract_pseudonym_opener(p)
572 })
573 .map_err(|error| match &error {
574 PacketError::PacketDecodingError(_) | PacketError::SphinxError(_) => {
575 IncomingPacketError::Undecodable(NodeDbError::PacketError(error))
578 }
579 _ => IncomingPacketError::ProcessingError(NodeDbError::PacketError(error)),
580 })
581 })
582 .await?;
583 if start.elapsed().as_millis() > SLOW_OP_MS {
584 warn!(
585 peer = sender.to_peerid_str(),
586 "from_incoming took {} ms",
587 start.elapsed().as_millis()
588 );
589 }
590
591 match packet {
592 HoprPacket::Final(incoming) => {
593 let info = AuxiliaryPacketInfo {
595 packet_signals: incoming.signals,
596 num_surbs: incoming.surbs.len(),
597 };
598
599 if !incoming.surbs.is_empty() {
601 self.caches
602 .surbs_per_pseudonym
603 .entry_by_ref(&incoming.sender)
604 .or_insert_with(futures::future::lazy(|_| {
605 SurbRingBuffer::new(self.cfg.surb_ring_buffer_size)
606 }))
607 .await
608 .value()
609 .push(incoming.surbs)
610 .map_err(IncomingPacketError::ProcessingError)?;
611
612 trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
613 }
614
615 Ok(match incoming.ack_key {
616 None => {
617 IncomingPacket::Acknowledgement {
619 packet_tag: incoming.packet_tag,
620 previous_hop: incoming.previous_hop,
621 ack: incoming
622 .plain_text
623 .as_ref()
624 .try_into()
625 .map_err(|e| IncomingPacketError::ProcessingError(NodeDbError::TypeError(e)))?,
626 }
627 }
628 Some(ack_key) => IncomingPacket::Final {
629 packet_tag: incoming.packet_tag,
630 previous_hop: incoming.previous_hop,
631 sender: incoming.sender,
632 plain_text: incoming.plain_text,
633 ack_key,
634 info,
635 },
636 })
637 }
638 HoprPacket::Forwarded(fwd) => {
639 let (outgoing_ticket_win_prob, outgoing_ticket_price) =
640 determine_network_config(outgoing_ticket_win_prob, outgoing_ticket_price, resolver)
641 .await
642 .map_err(IncomingPacketError::ProcessingError)?;
643
644 let start = std::time::Instant::now();
645 let validation_res = self
646 .validate_and_replace_ticket(*fwd, resolver, outgoing_ticket_win_prob, outgoing_ticket_price)
647 .await;
648 if start.elapsed().as_millis() > SLOW_OP_MS {
649 warn!(
650 peer = sender.to_peerid_str(),
651 "validate_and_replace_ticket took {} ms",
652 start.elapsed().as_millis()
653 );
654 }
655 match validation_res {
656 Ok(fwd) => {
657 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
658 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
659 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
660
661 Ok(IncomingPacket::Forwarded {
662 packet_tag: fwd.packet_tag,
663 previous_hop: fwd.previous_hop,
664 next_hop: fwd.outgoing.next_hop,
665 data: payload.into_boxed_slice(),
666 ack_key: fwd.ack_key,
667 })
668 }
669 Err(NodeDbError::TicketValidationError(boxed_error)) => {
670 let (rejected_ticket, error) = *boxed_error;
671 let rejected_value = rejected_ticket.amount;
672 warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
673
674 self.mark_unsaved_ticket_rejected(&rejected_ticket)
675 .await
676 .map_err(|e| {
677 NodeDbError::TicketValidationError(Box::new((
678 rejected_ticket.clone(),
679 format!("during validation error '{error}' update another error occurred: {e}"),
680 )))
681 })
682 .map_err(IncomingPacketError::ProcessingError)?;
683
684 #[cfg(all(feature = "prometheus", not(test)))]
685 METRIC_TICKETS_COUNT.increment(&["rejected"]);
686
687 Err(IncomingPacketError::ProcessingError(
688 NodeDbError::TicketValidationError(Box::new((rejected_ticket, error))),
689 ))
690 }
691 Err(e) => Err(IncomingPacketError::ProcessingError(e)),
692 }
693 }
694 HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(NodeDbError::LogicalError(
695 "cannot receive an outgoing packet".into(),
696 ))),
697 }
698 }
699}
700
701async fn determine_network_config<R>(
702 configure_outgoing_wp: Option<WinningProbability>,
703 configured_outgoing_price: Option<HoprBalance>,
704 resolver: &R,
705) -> Result<(WinningProbability, HoprBalance), NodeDbError>
706where
707 R: ChainValues,
708{
709 let network_ticket_price = resolver
713 .minimum_ticket_price()
714 .await
715 .map_err(|e| NodeDbError::LogicalError(format!("failed to determine current network ticket price: {e}")))?;
716 let outgoing_ticket_price = configured_outgoing_price.unwrap_or(network_ticket_price);
717
718 let network_win_prob = resolver
720 .minimum_incoming_ticket_win_prob()
721 .await
722 .inspect_err(|error| tracing::error!(%error, "failed to determine current network winning probability"))
723 .ok();
724
725 let outgoing_ticket_win_prob = configure_outgoing_wp.or(network_win_prob).unwrap_or_default(); Ok((outgoing_ticket_win_prob, outgoing_ticket_price))
732}
733
734impl HoprNodeDb {
735 #[tracing::instrument(level = "trace", skip(self, channel))]
736 async fn create_multihop_ticket(
737 &self,
738 channel: ChannelEntry,
739 destination: Address,
740 current_path_pos: u8,
741 winning_prob: WinningProbability,
742 ticket_price: HoprBalance,
743 ) -> Result<TicketBuilder, NodeDbError> {
744 let amount = HoprBalance::from(
746 ticket_price
747 .amount()
748 .mul(U256::from(current_path_pos - 1))
749 .div_f64(winning_prob.into())
750 .map_err(|e| {
751 NodeDbError::LogicalError(format!(
752 "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
753 ))
754 })?,
755 );
756
757 if channel.balance.lt(&amount) {
758 return Err(NodeDbError::LogicalError(format!(
759 "out of funds: {} with counterparty {destination} has balance {} < {amount}",
760 channel.get_id(),
761 channel.balance
762 )));
763 }
764
765 let ticket_builder = TicketBuilder::default()
766 .direction(&self.me_address, &destination)
767 .balance(amount)
768 .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
769 .index_offset(1) .win_prob(winning_prob)
771 .channel_epoch(channel.channel_epoch.as_u32());
772
773 Ok(ticket_builder)
774 }
775}