hopr_db_node/
protocol.rs

1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_api::{
5    chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
6    db::*,
7};
8use hopr_crypto_packet::{errors::PacketError, prelude::*};
9use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
12use hopr_parallelize::cpu::spawn_fifo_blocking;
13use hopr_path::{Path, ValidatedPath};
14use hopr_primitive_types::prelude::*;
15use tracing::{instrument, trace, warn};
16
17use crate::{cache::SurbRingBuffer, db::HoprNodeDb, errors::NodeDbError};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
22        hopr_metrics::SimpleHistogram::new(
23            "hopr_tickets_incoming_win_probability",
24            "Observes the winning probabilities on incoming tickets",
25            vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
26        ).unwrap();
27    pub(crate) static ref METRIC_RECEIVED_ACKS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
28        "hopr_received_ack_count",
29        "Number of received acknowledgements",
30        &["valid"]
31    )
32    .unwrap();
33    pub(crate) static ref METRIC_SENT_ACKS: hopr_metrics::SimpleCounter =
34        hopr_metrics::SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();
35
36    pub(crate) static ref METRIC_TICKETS_COUNT: hopr_metrics::MultiCounter =
37        hopr_metrics::MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
38}
39
40const SLOW_OP_MS: u128 = 150;
41
42impl HoprNodeDb {
43    /// Validates a ticket from a forwarded packet and replaces it with a new ticket for the next hop.
44    #[instrument(level = "trace", skip_all, err)]
45    async fn validate_and_replace_ticket<R>(
46        &self,
47        mut fwd: HoprForwardedPacket,
48        resolver: &R,
49        outgoing_ticket_win_prob: WinningProbability,
50        outgoing_ticket_price: HoprBalance,
51    ) -> Result<HoprForwardedPacket, NodeDbError>
52    where
53        R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
54    {
55        let previous_hop_addr = resolver
56            .packet_key_to_chain_key(&fwd.previous_hop)
57            .await
58            .map_err(|e| NodeDbError::Other(e.into()))?
59            .ok_or_else(|| {
60                NodeDbError::LogicalError(format!(
61                    "failed to find channel key for packet key {} on previous hop",
62                    fwd.previous_hop.to_peerid_str()
63                ))
64            })?;
65
66        let next_hop_addr = resolver
67            .packet_key_to_chain_key(&fwd.outgoing.next_hop)
68            .await
69            .map_err(|e| NodeDbError::Other(e.into()))?
70            .ok_or_else(|| {
71                NodeDbError::LogicalError(format!(
72                    "failed to find channel key for packet key {} on next hop",
73                    fwd.outgoing.next_hop.to_peerid_str()
74                ))
75            })?;
76
77        let incoming_channel = resolver
78            .channel_by_parties(&previous_hop_addr, &self.me_address)
79            .await
80            .map_err(|e| NodeDbError::Other(e.into()))?
81            .ok_or_else(|| {
82                NodeDbError::LogicalError(format!(
83                    "no channel found for previous hop address '{previous_hop_addr}'"
84                ))
85            })?;
86
87        // Check: is the ticket in the packet really for the given channel?
88        if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
89            return Err(NodeDbError::LogicalError("invalid ticket for channel".into()));
90        }
91
92        let domain_separator = resolver
93            .domain_separators()
94            .await
95            .map_err(|e| NodeDbError::Other(e.into()))?
96            .channel;
97
98        // The ticket price from the oracle times my node's position on the
99        // path is the acceptable minimum
100        let minimum_ticket_price = resolver
101            .minimum_ticket_price()
102            .await
103            .map_err(|e| NodeDbError::Other(e.into()))?
104            .mul(U256::from(fwd.path_pos));
105
106        #[cfg(all(feature = "prometheus", not(test)))]
107        METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
108
109        let remaining_balance = incoming_channel
110            .balance
111            .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
112
113        // Here also the signature on the ticket gets validated,
114        // so afterward we are sure the source of the `channel`
115        // (which is equal to `previous_hop_addr`) has issued this
116        // ticket.
117        let start = std::time::Instant::now();
118        let win_prob = resolver
119            .minimum_incoming_ticket_win_prob()
120            .await
121            .map_err(|e| NodeDbError::Other(e.into()))?;
122        let verified_incoming_ticket = spawn_fifo_blocking(move || {
123            validate_unacknowledged_ticket(
124                fwd.outgoing.ticket,
125                &incoming_channel,
126                minimum_ticket_price,
127                win_prob,
128                remaining_balance,
129                &domain_separator,
130            )
131        })
132        .await?;
133
134        if start.elapsed().as_millis() > SLOW_OP_MS {
135            warn!("validate_unacknowledged_ticket took {} ms", start.elapsed().as_millis());
136        }
137
138        // We currently take the maximum of the win prob from the incoming ticket
139        // and the one configured on this node.
140        // Therefore, the winning probability can only increase along the path.
141        let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
142
143        // The ticket is now validated, let's place it into the acknowledgement waiting queue
144        self.caches
145            .unacked_tickets
146            .insert(
147                fwd.outgoing.ack_challenge,
148                PendingAcknowledgement::WaitingAsRelayer(
149                    verified_incoming_ticket.into_unacknowledged(fwd.own_key).into(),
150                ),
151            )
152            .await;
153
154        // NOTE: that the path position according to the ticket value
155        // may no longer match the path position from the packet header,
156        // because the price of the ticket may be set higher be the ticket
157        // issuer.
158
159        // Create the new ticket for the new packet
160        let ticket_builder = if fwd.path_pos > 1 {
161            // There must be a channel to the next node if it's not the final hop
162            let outgoing_channel = resolver
163                .channel_by_parties(&self.me_address, &next_hop_addr)
164                .await
165                .map_err(|e| NodeDbError::Other(e.into()))?
166                .ok_or(NodeDbError::LogicalError(format!(
167                    "channel to '{next_hop_addr}' not found",
168                )))?;
169
170            self.create_multihop_ticket(
171                outgoing_channel,
172                next_hop_addr,
173                fwd.path_pos,
174                outgoing_ticket_win_prob,
175                outgoing_ticket_price,
176            )
177            .await?
178        } else {
179            TicketBuilder::zero_hop().direction(&self.me_address, &next_hop_addr)
180        };
181
182        // Finally, replace the ticket in the outgoing packet with a new one
183        let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
184        let me_on_chain = self.me_onchain.clone();
185        fwd.outgoing.ticket = spawn_fifo_blocking(move || ticket_builder.build_signed(&me_on_chain, &domain_separator))
186            .await?
187            .leak();
188
189        Ok(fwd)
190    }
191
192    #[instrument(level = "trace", skip(self, ack, resolver), err)]
193    async fn find_ticket_to_acknowledge<R>(
194        &self,
195        ack: &VerifiedAcknowledgement,
196        resolver: &R,
197    ) -> Result<ResolvedAcknowledgement, NodeDbError>
198    where
199        R: ChainReadChannelOperations + ChainValues + Send + Sync,
200    {
201        let ack_half_key = *ack.ack_key_share();
202        let challenge = hopr_parallelize::cpu::spawn_blocking(move || ack_half_key.to_challenge()).await?;
203
204        let pending_ack = self.caches.unacked_tickets.remove(&challenge).await.ok_or_else(|| {
205            NodeDbError::AcknowledgementValidationError(format!(
206                "received unexpected acknowledgement for half key challenge {}",
207                ack.ack_key_share().to_hex()
208            ))
209        })?;
210
211        match pending_ack {
212            PendingAcknowledgement::WaitingAsSender => {
213                trace!("received acknowledgement as sender: first relayer has processed the packet");
214                Ok(ResolvedAcknowledgement::Sending(*ack))
215            }
216
217            PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
218                let maybe_channel_with_issuer = resolver
219                    .channel_by_parties(unacknowledged.ticket.verified_issuer(), &self.me_address)
220                    .await
221                    .map_err(|e| NodeDbError::Other(e.into()))?;
222
223                // Issuer's channel must have an epoch matching with the unacknowledged ticket
224                if maybe_channel_with_issuer
225                    .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
226                {
227                    let domain_separator = resolver
228                        .domain_separators()
229                        .await
230                        .map_err(|e| NodeDbError::Other(e.into()))?
231                        .channel;
232
233                    let chain_key = self.me_onchain.clone();
234                    hopr_parallelize::cpu::spawn_blocking(move || {
235                        // This explicitly checks whether the acknowledgement
236                        // solves the challenge on the ticket. It must be done before we
237                        // check that the ticket is winning, which is a lengthy operation
238                        // and should not be done for bogus unacknowledged tickets
239                        let ack_ticket = unacknowledged.acknowledge(&ack_half_key)?;
240
241                        if ack_ticket.is_winning(&chain_key, &domain_separator) {
242                            trace!("Found a winning ticket");
243                            Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
244                        } else {
245                            trace!("Found a losing ticket");
246                            Ok(ResolvedAcknowledgement::RelayingLoss(
247                                ack_ticket.ticket.verified_ticket().channel_id,
248                            ))
249                        }
250                    })
251                    .await
252                } else {
253                    Err(NodeDbError::LogicalError(format!(
254                        "no channel found for  address '{}' with a matching epoch",
255                        unacknowledged.ticket.verified_issuer()
256                    )))
257                }
258            }
259        }
260    }
261}
262
263#[async_trait]
264impl HoprDbProtocolOperations for HoprNodeDb {
265    type Error = NodeDbError;
266
267    #[instrument(level = "trace", skip(self, ack, resolver), err(Debug), ret)]
268    async fn handle_acknowledgement<R>(&self, ack: VerifiedAcknowledgement, resolver: &R) -> Result<(), NodeDbError>
269    where
270        R: ChainReadChannelOperations + ChainValues + Send + Sync,
271    {
272        let result = self.find_ticket_to_acknowledge(&ack, resolver).await?;
273        match &result {
274            ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
275                // If the ticket was a win, store it
276                self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
277
278                #[cfg(all(feature = "prometheus", not(test)))]
279                {
280                    METRIC_RECEIVED_ACKS.increment(&["true"]);
281                    METRIC_TICKETS_COUNT.increment(&["winning"]);
282
283                    let verified_ticket = ack_ticket.ticket.verified_ticket();
284                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
285                        &["unredeemed"],
286                        self.ticket_manager
287                            .unrealized_value(TicketSelector::new(
288                                verified_ticket.channel_id,
289                                verified_ticket.channel_epoch,
290                            ))
291                            .await?
292                            .amount()
293                            .as_u128() as f64,
294                    );
295                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["winning_count"], 1.0f64);
296                }
297            }
298            ResolvedAcknowledgement::RelayingLoss(_channel) => {
299                #[cfg(all(feature = "prometheus", not(test)))]
300                {
301                    METRIC_RECEIVED_ACKS.increment(&["true"]);
302                    METRIC_TICKETS_COUNT.increment(&["losing"]);
303                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.increment(&["losing_count"], 1.0f64);
304                }
305            }
306            ResolvedAcknowledgement::Sending(_) => {
307                #[cfg(all(feature = "prometheus", not(test)))]
308                METRIC_RECEIVED_ACKS.increment(&["true"]);
309            }
310        };
311
312        Ok(())
313    }
314
315    #[tracing::instrument(level = "trace", skip(self, matcher), err)]
316    async fn find_surb(&self, matcher: SurbMatcher) -> Result<FoundSurb, NodeDbError> {
317        let pseudonym = matcher.pseudonym();
318        let surbs_for_pseudonym = self
319            .caches
320            .surbs_per_pseudonym
321            .get(&pseudonym)
322            .await
323            .ok_or(NodeDbError::NoSurbAvailable("pseudonym not found".into()))?;
324
325        match matcher {
326            SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
327                sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
328                surb: popped_surb.surb,
329                remaining: popped_surb.remaining,
330            })?),
331            // The following code intentionally only checks the first SURB in the ring buffer
332            // and does not search the entire RB.
333            // This is because the exact match use-case is suited only for situations
334            // when there is a single SURB in the RB.
335            SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
336                .pop_one_if_has_id(&id.surb_id())
337                .map(|popped_surb| FoundSurb {
338                    sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
339                    surb: popped_surb.surb,
340                    remaining: popped_surb.remaining, // = likely 0
341                })?),
342        }
343    }
344
345    #[inline]
346    fn get_surb_config(&self) -> SurbCacheConfig {
347        SurbCacheConfig {
348            rb_capacity: self.cfg.surb_ring_buffer_size,
349            distress_threshold: self.cfg.surb_distress_threshold,
350        }
351    }
352
353    #[tracing::instrument(level = "trace", skip(self, data, resolver))]
354    async fn to_send_no_ack<R>(
355        &self,
356        data: Box<[u8]>,
357        destination: OffchainPublicKey,
358        resolver: &R,
359    ) -> Result<OutgoingPacket, NodeDbError>
360    where
361        R: ChainKeyOperations + ChainValues + Send + Sync,
362    {
363        let next_peer = resolver
364            .packet_key_to_chain_key(&destination)
365            .await
366            .map_err(|e| NodeDbError::Other(e.into()))?
367            .ok_or_else(|| {
368                NodeDbError::LogicalError(format!(
369                    "failed to find chain key for packet key {} on previous hop",
370                    destination.to_peerid_str()
371                ))
372            })?;
373
374        // No-ack packets are always sent as zero-hops with a random pseudonym
375        let pseudonym = HoprPseudonym::random();
376        let next_ticket = TicketBuilder::zero_hop().direction(&self.me_address, &next_peer);
377        let domain_separator = resolver
378            .domain_separators()
379            .await
380            .map_err(|e| NodeDbError::Other(e.into()))?
381            .channel;
382
383        // Construct the outgoing packet
384        let chain_key = self.me_onchain.clone();
385        let mapper = resolver.key_id_mapper_ref().clone();
386        let (packet, _) = spawn_fifo_blocking(move || {
387            HoprPacket::into_outgoing(
388                &data,
389                &pseudonym,
390                PacketRouting::NoAck::<ValidatedPath>(destination),
391                &chain_key,
392                next_ticket,
393                &mapper,
394                &domain_separator,
395                None, // NoAck messages currently do not have signals
396            )
397            .map_err(|e| {
398                NodeDbError::LogicalError(format!("failed to construct chain components for a no-ack packet: {e}"))
399            })
400        })
401        .await?;
402
403        if let Some(out) = packet.try_as_outgoing() {
404            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
405            transport_payload.extend_from_slice(out.packet.as_ref());
406            transport_payload.extend_from_slice(&out.ticket.into_encoded());
407
408            #[cfg(all(feature = "prometheus", not(test)))]
409            METRIC_SENT_ACKS.increment();
410
411            Ok(OutgoingPacket {
412                next_hop: out.next_hop,
413                ack_challenge: out.ack_challenge,
414                data: transport_payload.into_boxed_slice(),
415            })
416        } else {
417            Err(NodeDbError::LogicalError("must be an outgoing packet".into()))
418        }
419    }
420
421    #[tracing::instrument(level = "trace", skip(self, data, routing, resolver))]
422    async fn to_send<R>(
423        &self,
424        data: Box<[u8]>,
425        routing: ResolvedTransportRouting,
426        outgoing_ticket_win_prob: Option<WinningProbability>,
427        outgoing_ticket_price: Option<HoprBalance>,
428        signals: PacketSignals,
429        resolver: &R,
430    ) -> Result<OutgoingPacket, NodeDbError>
431    where
432        R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
433    {
434        // Get necessary packet routing values
435        let (next_peer, num_hops, pseudonym, routing) = match routing {
436            ResolvedTransportRouting::Forward {
437                pseudonym,
438                forward_path,
439                return_paths,
440            } => (
441                forward_path[0],
442                forward_path.num_hops(),
443                pseudonym,
444                PacketRouting::ForwardPath {
445                    forward_path,
446                    return_paths,
447                },
448            ),
449            ResolvedTransportRouting::Return(sender_id, surb) => {
450                let next = resolver
451                    .key_id_mapper_ref()
452                    .map_id_to_public(&surb.first_relayer)
453                    .ok_or(NodeDbError::LogicalError(format!(
454                        "failed to find public key for key id {}",
455                        surb.first_relayer
456                    )))?;
457
458                (
459                    next,
460                    surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
461                    sender_id.pseudonym(),
462                    PacketRouting::Surb(sender_id.surb_id(), surb),
463                )
464            }
465        };
466
467        let next_peer = resolver
468            .packet_key_to_chain_key(&next_peer)
469            .await
470            .map_err(|e| NodeDbError::Other(e.into()))?
471            .ok_or_else(|| {
472                NodeDbError::LogicalError(format!(
473                    "failed to find chain key for packet key {} on previous hop",
474                    next_peer.to_peerid_str()
475                ))
476            })?;
477
478        // Decide whether to create a multi-hop or a zero-hop ticket
479        let next_ticket = if num_hops > 1 {
480            let channel = resolver
481                .channel_by_parties(&self.me_address, &next_peer)
482                .await
483                .map_err(|e| NodeDbError::Other(e.into()))?
484                .ok_or(NodeDbError::LogicalError(format!("channel to '{next_peer}' not found")))?;
485
486            let (outgoing_ticket_win_prob, outgoing_ticket_price) =
487                determine_network_config(outgoing_ticket_win_prob, outgoing_ticket_price, resolver).await?;
488
489            self.create_multihop_ticket(
490                channel,
491                next_peer,
492                num_hops as u8,
493                outgoing_ticket_win_prob,
494                outgoing_ticket_price,
495            )
496            .await?
497        } else {
498            TicketBuilder::zero_hop().direction(&self.me_address, &next_peer)
499        };
500
501        let domain_separator = resolver
502            .domain_separators()
503            .await
504            .map_err(|e| NodeDbError::Other(e.into()))?
505            .channel;
506
507        // Construct the outgoing packet
508        let chain_key = self.me_onchain.clone();
509        let mapper = resolver.key_id_mapper_ref().clone();
510        let (packet, openers) = spawn_fifo_blocking(move || {
511            HoprPacket::into_outgoing(
512                &data,
513                &pseudonym,
514                routing,
515                &chain_key,
516                next_ticket,
517                &mapper,
518                &domain_separator,
519                signals,
520            )
521            .map_err(|e| NodeDbError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
522        })
523        .await?;
524
525        // Store the reply openers under the given SenderId
526        // This is a no-op for reply packets
527        openers.into_iter().for_each(|(surb_id, opener)| {
528            self.caches
529                .insert_pseudonym_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
530        });
531
532        if let Some(out) = packet.try_as_outgoing() {
533            self.caches
534                .unacked_tickets
535                .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
536                .await;
537
538            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
539            transport_payload.extend_from_slice(out.packet.as_ref());
540            transport_payload.extend_from_slice(&out.ticket.into_encoded());
541
542            Ok(OutgoingPacket {
543                next_hop: out.next_hop,
544                ack_challenge: out.ack_challenge,
545                data: transport_payload.into_boxed_slice(),
546            })
547        } else {
548            Err(NodeDbError::LogicalError("must be an outgoing packet".into()))
549        }
550    }
551
552    #[tracing::instrument(level = "trace", skip_all, fields(sender = %sender), err)]
553    async fn from_recv<R>(
554        &self,
555        data: Box<[u8]>,
556        pkt_keypair: &OffchainKeypair,
557        sender: OffchainPublicKey,
558        outgoing_ticket_win_prob: Option<WinningProbability>,
559        outgoing_ticket_price: Option<HoprBalance>,
560        resolver: &R,
561    ) -> Result<IncomingPacket, IncomingPacketError<NodeDbError>>
562    where
563        R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
564    {
565        let offchain_keypair = pkt_keypair.clone();
566        let caches = self.caches.clone();
567        let start = std::time::Instant::now();
568        let mapper = resolver.key_id_mapper_ref().clone();
569        let packet = spawn_fifo_blocking(move || {
570            HoprPacket::from_incoming(&data, &offchain_keypair, sender, &mapper, |p| {
571                caches.extract_pseudonym_opener(p)
572            })
573            .map_err(|error| match &error {
574                PacketError::PacketDecodingError(_) | PacketError::SphinxError(_) => {
575                    // Such packets should not be acknowledged as this might indicate an adversarial behavior
576                    // or incorrect protocol format
577                    IncomingPacketError::Undecodable(NodeDbError::PacketError(error))
578                }
579                _ => IncomingPacketError::ProcessingError(NodeDbError::PacketError(error)),
580            })
581        })
582        .await?;
583        if start.elapsed().as_millis() > SLOW_OP_MS {
584            warn!(
585                peer = sender.to_peerid_str(),
586                "from_incoming took {} ms",
587                start.elapsed().as_millis()
588            );
589        }
590
591        match packet {
592            HoprPacket::Final(incoming) => {
593                // Extract additional information from the packet that will be passed upwards
594                let info = AuxiliaryPacketInfo {
595                    packet_signals: incoming.signals,
596                    num_surbs: incoming.surbs.len(),
597                };
598
599                // Store all incoming SURBs if any
600                if !incoming.surbs.is_empty() {
601                    self.caches
602                        .surbs_per_pseudonym
603                        .entry_by_ref(&incoming.sender)
604                        .or_insert_with(futures::future::lazy(|_| {
605                            SurbRingBuffer::new(self.cfg.surb_ring_buffer_size)
606                        }))
607                        .await
608                        .value()
609                        .push(incoming.surbs)
610                        .map_err(IncomingPacketError::ProcessingError)?;
611
612                    trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
613                }
614
615                Ok(match incoming.ack_key {
616                    None => {
617                        // The contained payload represents an Acknowledgement
618                        IncomingPacket::Acknowledgement {
619                            packet_tag: incoming.packet_tag,
620                            previous_hop: incoming.previous_hop,
621                            ack: incoming
622                                .plain_text
623                                .as_ref()
624                                .try_into()
625                                .map_err(|e| IncomingPacketError::ProcessingError(NodeDbError::TypeError(e)))?,
626                        }
627                    }
628                    Some(ack_key) => IncomingPacket::Final {
629                        packet_tag: incoming.packet_tag,
630                        previous_hop: incoming.previous_hop,
631                        sender: incoming.sender,
632                        plain_text: incoming.plain_text,
633                        ack_key,
634                        info,
635                    },
636                })
637            }
638            HoprPacket::Forwarded(fwd) => {
639                let (outgoing_ticket_win_prob, outgoing_ticket_price) =
640                    determine_network_config(outgoing_ticket_win_prob, outgoing_ticket_price, resolver)
641                        .await
642                        .map_err(IncomingPacketError::ProcessingError)?;
643
644                let start = std::time::Instant::now();
645                let validation_res = self
646                    .validate_and_replace_ticket(*fwd, resolver, outgoing_ticket_win_prob, outgoing_ticket_price)
647                    .await;
648                if start.elapsed().as_millis() > SLOW_OP_MS {
649                    warn!(
650                        peer = sender.to_peerid_str(),
651                        "validate_and_replace_ticket took {} ms",
652                        start.elapsed().as_millis()
653                    );
654                }
655                match validation_res {
656                    Ok(fwd) => {
657                        let mut payload = Vec::with_capacity(HoprPacket::SIZE);
658                        payload.extend_from_slice(fwd.outgoing.packet.as_ref());
659                        payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
660
661                        Ok(IncomingPacket::Forwarded {
662                            packet_tag: fwd.packet_tag,
663                            previous_hop: fwd.previous_hop,
664                            next_hop: fwd.outgoing.next_hop,
665                            data: payload.into_boxed_slice(),
666                            ack_key: fwd.ack_key,
667                        })
668                    }
669                    Err(NodeDbError::TicketValidationError(boxed_error)) => {
670                        let (rejected_ticket, error) = *boxed_error;
671                        let rejected_value = rejected_ticket.amount;
672                        warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
673
674                        self.mark_unsaved_ticket_rejected(&rejected_ticket)
675                            .await
676                            .map_err(|e| {
677                                NodeDbError::TicketValidationError(Box::new((
678                                    rejected_ticket.clone(),
679                                    format!("during validation error '{error}' update another error occurred: {e}"),
680                                )))
681                            })
682                            .map_err(IncomingPacketError::ProcessingError)?;
683
684                        #[cfg(all(feature = "prometheus", not(test)))]
685                        METRIC_TICKETS_COUNT.increment(&["rejected"]);
686
687                        Err(IncomingPacketError::ProcessingError(
688                            NodeDbError::TicketValidationError(Box::new((rejected_ticket, error))),
689                        ))
690                    }
691                    Err(e) => Err(IncomingPacketError::ProcessingError(e)),
692                }
693            }
694            HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(NodeDbError::LogicalError(
695                "cannot receive an outgoing packet".into(),
696            ))),
697        }
698    }
699}
700
701async fn determine_network_config<R>(
702    configure_outgoing_wp: Option<WinningProbability>,
703    configured_outgoing_price: Option<HoprBalance>,
704    resolver: &R,
705) -> Result<(WinningProbability, HoprBalance), NodeDbError>
706where
707    R: ChainValues,
708{
709    // This operation hits the cache unless the new value is fetched for the first time
710    // NOTE: as opposed to the winning probability, the ticket price does not have
711    // a reasonable default, and therefore the operation fails
712    let network_ticket_price = resolver
713        .minimum_ticket_price()
714        .await
715        .map_err(|e| NodeDbError::LogicalError(format!("failed to determine current network ticket price: {e}")))?;
716    let outgoing_ticket_price = configured_outgoing_price.unwrap_or(network_ticket_price);
717
718    // This operation hits the cache unless the new value is fetched for the first time
719    let network_win_prob = resolver
720        .minimum_incoming_ticket_win_prob()
721        .await
722        .inspect_err(|error| tracing::error!(%error, "failed to determine current network winning probability"))
723        .ok();
724
725    // If no explicit winning probability is configured, use the network value
726    // or 1 if the network value was not determined.
727    // This code does not take the max from those, as it is the upper layer's responsibility
728    // to ensure the configured value is not smaller than the network value.
729    let outgoing_ticket_win_prob = configure_outgoing_wp.or(network_win_prob).unwrap_or_default(); // Absolute default WinningProbability is 1.0
730
731    Ok((outgoing_ticket_win_prob, outgoing_ticket_price))
732}
733
734impl HoprNodeDb {
735    #[tracing::instrument(level = "trace", skip(self, channel))]
736    async fn create_multihop_ticket(
737        &self,
738        channel: ChannelEntry,
739        destination: Address,
740        current_path_pos: u8,
741        winning_prob: WinningProbability,
742        ticket_price: HoprBalance,
743    ) -> Result<TicketBuilder, NodeDbError> {
744        // The next ticket is worth: price * remaining hop count / winning probability
745        let amount = HoprBalance::from(
746            ticket_price
747                .amount()
748                .mul(U256::from(current_path_pos - 1))
749                .div_f64(winning_prob.into())
750                .map_err(|e| {
751                    NodeDbError::LogicalError(format!(
752                        "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
753                    ))
754                })?,
755        );
756
757        if channel.balance.lt(&amount) {
758            return Err(NodeDbError::LogicalError(format!(
759                "out of funds: {} with counterparty {destination} has balance {} < {amount}",
760                channel.get_id(),
761                channel.balance
762            )));
763        }
764
765        let ticket_builder = TicketBuilder::default()
766            .direction(&self.me_address, &destination)
767            .balance(amount)
768            .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
769            .index_offset(1) // unaggregated always have index_offset == 1
770            .win_prob(winning_prob)
771            .channel_epoch(channel.channel_epoch.as_u32());
772
773        Ok(ticket_builder)
774    }
775}