hopr_db_sql/
protocol.rs

1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_crypto_packet::prelude::*;
5use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
6use hopr_db_api::{
7    errors::Result,
8    prelude::{AuxiliaryPacketInfo, DbError, SurbCacheConfig},
9    protocol::{FoundSurb, HoprDbProtocolOperations, IncomingPacket, OutgoingPacket, ResolvedAcknowledgement},
10    resolver::HoprDbResolverOperations,
11};
12use hopr_internal_types::prelude::*;
13#[cfg(all(feature = "prometheus", not(test)))]
14use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
15use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
16use hopr_parallelize::cpu::spawn_fifo_blocking;
17use hopr_path::{Path, PathAddressResolver, ValidatedPath, errors::PathError};
18use hopr_primitive_types::prelude::*;
19use tracing::{instrument, trace, warn};
20
21use crate::{
22    cache::SurbRingBuffer, channels::HoprDbChannelOperations, db::HoprDb, errors::DbSqlError,
23    info::HoprDbInfoOperations, prelude::HoprDbTicketOperations,
24};
25
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram fed with the winning probability of every incoming ticket seen during forwarding.
    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_tickets_incoming_win_probability",
        "Observes the winning probabilities on incoming tickets",
        vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
    )
    .unwrap();

    /// Counter of received acknowledgements, partitioned by the `valid` label.
    pub(crate) static ref METRIC_RECEIVED_ACKS: MultiCounter =
        MultiCounter::new("hopr_received_ack_count", "Number of received acknowledgements", &["valid"]).unwrap();

    /// Counter incremented for every constructed no-acknowledgement packet.
    pub(crate) static ref METRIC_SENT_ACKS: SimpleCounter = SimpleCounter::new(
        "hopr_sent_acks_count",
        "Number of sent message acknowledgements",
    )
    .unwrap();

    /// Counter of acknowledged tickets, partitioned by the `type` label
    /// (this file uses the values `winning` and `losing`).
    pub(crate) static ref METRIC_TICKETS_COUNT: MultiCounter = MultiCounter::new(
        "hopr_tickets_count",
        "Number of winning tickets",
        &["type"],
    )
    .unwrap();
}
48
/// Duration threshold in milliseconds: operations in this file that take longer are
/// reported with a warning. Typed as `u128` to compare directly with `Duration::as_millis()`.
const SLOW_OP_MS: u128 = 150;
50
51impl HoprDb {
52    /// Validates a ticket from a forwarded packet and replaces it with a new ticket for the next hop.
53    #[instrument(level = "trace", skip_all, err)]
54    async fn validate_and_replace_ticket(
55        &self,
56        mut fwd: HoprForwardedPacket,
57        me: &ChainKeypair,
58        outgoing_ticket_win_prob: WinningProbability,
59        outgoing_ticket_price: HoprBalance,
60    ) -> std::result::Result<HoprForwardedPacket, DbSqlError> {
61        let previous_hop_addr = self.resolve_chain_key(&fwd.previous_hop).await?.ok_or_else(|| {
62            DbSqlError::LogicalError(format!(
63                "failed to find channel key for packet key {} on previous hop",
64                fwd.previous_hop.to_peerid_str()
65            ))
66        })?;
67
68        let next_hop_addr = self.resolve_chain_key(&fwd.outgoing.next_hop).await?.ok_or_else(|| {
69            DbSqlError::LogicalError(format!(
70                "failed to find channel key for packet key {} on next hop",
71                fwd.outgoing.next_hop.to_peerid_str()
72            ))
73        })?;
74
75        let incoming_channel = self
76            .get_channel_by_parties(None, &previous_hop_addr, &self.me_onchain, true)
77            .await?
78            .ok_or_else(|| {
79                DbSqlError::LogicalError(format!(
80                    "no channel found for previous hop address '{previous_hop_addr}'"
81                ))
82            })?;
83
84        // Check: is the ticket in the packet really for the given channel?
85        if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
86            return Err(DbSqlError::LogicalError("invalid ticket for channel".into()));
87        }
88
89        let chain_data = self.get_indexer_data(None).await?;
90        let domain_separator = chain_data
91            .channels_dst
92            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
93
94        // The ticket price from the oracle times my node's position on the
95        // path is the acceptable minimum
96        let minimum_ticket_price = chain_data
97            .ticket_price
98            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
99            .mul(U256::from(fwd.path_pos));
100
101        #[cfg(all(feature = "prometheus", not(test)))]
102        METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
103
104        let remaining_balance = incoming_channel
105            .balance
106            .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
107
108        // Here also the signature on the ticket gets validated,
109        // so afterward we are sure the source of the `channel`
110        // (which is equal to `previous_hop_addr`) has issued this
111        // ticket.
112        let start = std::time::Instant::now();
113        let verified_incoming_ticket = spawn_fifo_blocking(move || {
114            validate_unacknowledged_ticket(
115                fwd.outgoing.ticket,
116                &incoming_channel,
117                minimum_ticket_price,
118                chain_data.minimum_incoming_ticket_winning_prob,
119                remaining_balance,
120                &domain_separator,
121            )
122        })
123        .await?;
124        if start.elapsed().as_millis() > SLOW_OP_MS {
125            warn!("validate_unacknowledged_ticket took {} ms", start.elapsed().as_millis());
126        }
127
128        // We currently take the maximum of the win prob from the incoming ticket
129        // and the one configured on this node.
130        // Therefore, the winning probability can only increase along the path.
131        let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
132
133        // The ticket is now validated, let's place it into the acknowledgement waiting queue
134        self.caches
135            .unacked_tickets
136            .insert(
137                fwd.outgoing.ack_challenge,
138                PendingAcknowledgement::WaitingAsRelayer(
139                    verified_incoming_ticket.into_unacknowledged(fwd.own_key).into(),
140                ),
141            )
142            .await;
143
144        // NOTE: that the path position according to the ticket value
145        // may no longer match the path position from the packet header,
146        // because the price of the ticket may be set higher be the ticket
147        // issuer.
148
149        // Create the new ticket for the new packet
150        let ticket_builder = if fwd.path_pos > 1 {
151            // There must be a channel to the next node if it's not the final hop
152            let outgoing_channel = self
153                .get_channel_by_parties(None, &self.me_onchain, &next_hop_addr, true)
154                .await?
155                .ok_or(DbSqlError::LogicalError(format!(
156                    "channel to '{next_hop_addr}' not found",
157                )))?;
158
159            self.create_multihop_ticket(
160                outgoing_channel,
161                self.me_onchain,
162                next_hop_addr,
163                fwd.path_pos,
164                outgoing_ticket_win_prob,
165                outgoing_ticket_price,
166            )
167            .await?
168        } else {
169            TicketBuilder::zero_hop().direction(&self.me_onchain, &next_hop_addr)
170        };
171
172        // Finally, replace the ticket in the outgoing packet with a new one
173        let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
174        let me_clone = me.clone();
175        fwd.outgoing.ticket = spawn_fifo_blocking(move || ticket_builder.build_signed(&me_clone, &domain_separator))
176            .await?
177            .leak();
178
179        Ok(fwd)
180    }
181
182    #[instrument(level = "trace", skip(self, ack), err)]
183    async fn find_ticket_to_acknowledge(
184        &self,
185        ack: &VerifiedAcknowledgement,
186    ) -> std::result::Result<ResolvedAcknowledgement, DbSqlError> {
187        let ack_half_key = *ack.ack_key_share();
188        let challenge = hopr_parallelize::cpu::spawn_blocking(move || ack_half_key.to_challenge()).await?;
189
190        let pending_ack = self.caches.unacked_tickets.remove(&challenge).await.ok_or_else(|| {
191            DbSqlError::AcknowledgementValidationError(format!(
192                "received unexpected acknowledgement for half key challenge {}",
193                ack.ack_key_share().to_hex()
194            ))
195        })?;
196
197        match pending_ack {
198            PendingAcknowledgement::WaitingAsSender => {
199                trace!("received acknowledgement as sender: first relayer has processed the packet");
200                Ok(ResolvedAcknowledgement::Sending(*ack))
201            }
202
203            PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
204                let maybe_channel_with_issuer = self
205                    .get_channel_by_parties(None, unacknowledged.ticket.verified_issuer(), &self.me_onchain, true)
206                    .await?;
207
208                // Issuer's channel must have an epoch matching with the unacknowledged ticket
209                if maybe_channel_with_issuer
210                    .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
211                {
212                    let domain_separator = self
213                        .get_indexer_data(None)
214                        .await?
215                        .channels_dst
216                        .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
217
218                    let myself = self.clone();
219                    let ack = *ack;
220                    hopr_parallelize::cpu::spawn_blocking(move || {
221                        // This explicitly checks whether the acknowledgement
222                        // solves the challenge on the ticket. It must be done before we
223                        // check that the ticket is winning, which is a lengthy operation
224                        // and should not be done for bogus unacknowledged tickets
225                        let ack_ticket = unacknowledged.acknowledge(ack.ack_key_share())?;
226
227                        if ack_ticket.is_winning(&myself.chain_key, &domain_separator) {
228                            trace!("Found a winning ticket");
229                            Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
230                        } else {
231                            trace!("Found a losing ticket");
232                            Ok(ResolvedAcknowledgement::RelayingLoss(
233                                ack_ticket.ticket.verified_ticket().channel_id,
234                            ))
235                        }
236                    })
237                    .await
238                } else {
239                    Err(DbSqlError::LogicalError(format!(
240                        "no channel found for  address '{}' with a matching epoch",
241                        unacknowledged.ticket.verified_issuer()
242                    )))
243                }
244            }
245        }
246    }
247}
248
249#[async_trait]
250impl HoprDbProtocolOperations for HoprDb {
251    #[instrument(level = "trace", skip(self, ack), err(Debug), ret)]
252    async fn handle_acknowledgement(&self, ack: VerifiedAcknowledgement) -> Result<()> {
253        let result = self.find_ticket_to_acknowledge(&ack).await?;
254        match &result {
255            ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
256                // If the ticket was a win, store it
257                self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
258
259                #[cfg(all(feature = "prometheus", not(test)))]
260                {
261                    METRIC_RECEIVED_ACKS.increment(&["true"]);
262                    METRIC_TICKETS_COUNT.increment(&["winning"]);
263
264                    let verified_ticket = ack_ticket.ticket.verified_ticket();
265                    let channel = verified_ticket.channel_id.to_string();
266                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
267                        &[&channel, "unredeemed"],
268                        self.ticket_manager
269                            .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
270                                verified_ticket.channel_id,
271                                verified_ticket.channel_epoch,
272                            ))
273                            .await?
274                            .amount()
275                            .as_u128() as f64,
276                    );
277                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
278                        .increment(&[&channel, "winning_count"], 1.0f64);
279                }
280            }
281            ResolvedAcknowledgement::RelayingLoss(_channel) => {
282                #[cfg(all(feature = "prometheus", not(test)))]
283                {
284                    METRIC_RECEIVED_ACKS.increment(&["true"]);
285                    METRIC_TICKETS_COUNT.increment(&["losing"]);
286                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
287                        .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
288                }
289            }
290            ResolvedAcknowledgement::Sending(_) => {
291                #[cfg(all(feature = "prometheus", not(test)))]
292                METRIC_RECEIVED_ACKS.increment(&["true"]);
293            }
294        };
295
296        Ok(())
297    }
298
299    async fn get_network_winning_probability(&self) -> Result<WinningProbability> {
300        Ok(self
301            .get_indexer_data(None)
302            .await
303            .map(|data| data.minimum_incoming_ticket_winning_prob)?)
304    }
305
306    async fn get_network_ticket_price(&self) -> Result<HoprBalance> {
307        Ok(self.get_indexer_data(None).await.and_then(|data| {
308            data.ticket_price
309                .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
310        })?)
311    }
312
313    #[tracing::instrument(level = "trace", skip(self, matcher), err)]
314    async fn find_surb(&self, matcher: SurbMatcher) -> Result<FoundSurb> {
315        let pseudonym = matcher.pseudonym();
316        let surbs_for_pseudonym = self
317            .caches
318            .surbs_per_pseudonym
319            .get(&pseudonym)
320            .await
321            .ok_or(DbError::NoSurbAvailable("pseudonym not found".into()))?;
322
323        match matcher {
324            SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
325                sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
326                surb: popped_surb.surb,
327                remaining: popped_surb.remaining,
328            })?),
329            // The following code intentionally only checks the first SURB in the ring buffer
330            // and does not search the entire RB.
331            // This is because the exact match use-case is suited only for situations
332            // when there is a single SURB in the RB.
333            SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
334                .pop_one_if_has_id(&id.surb_id())
335                .map(|popped_surb| FoundSurb {
336                    sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
337                    surb: popped_surb.surb,
338                    remaining: popped_surb.remaining, // = likely 0
339                })?),
340        }
341    }
342
343    #[inline]
344    fn get_surb_config(&self) -> SurbCacheConfig {
345        SurbCacheConfig {
346            rb_capacity: self.cfg.surb_ring_buffer_size,
347            distress_threshold: self.cfg.surb_distress_threshold,
348        }
349    }
350
351    #[tracing::instrument(level = "trace", skip(self, data))]
352    async fn to_send_no_ack(&self, data: Box<[u8]>, destination: OffchainPublicKey) -> Result<OutgoingPacket> {
353        let next_peer = self.resolve_chain_key(&destination).await?.ok_or_else(|| {
354            DbSqlError::LogicalError(format!(
355                "failed to find chain key for packet key {} on previous hop",
356                destination.to_peerid_str()
357            ))
358        })?;
359
360        // No-ack packets are always sent as zero-hops with a random pseudonym
361        let pseudonym = HoprPseudonym::random();
362        let next_ticket = TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer);
363
364        let domain_separator = self
365            .get_indexer_data(None)
366            .await?
367            .channels_dst
368            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
369
370        // Construct the outgoing packet
371        let myself = self.clone();
372        let (packet, _) = spawn_fifo_blocking(move || {
373            HoprPacket::into_outgoing(
374                &data,
375                &pseudonym,
376                PacketRouting::NoAck::<ValidatedPath>(destination),
377                &myself.chain_key,
378                next_ticket,
379                &myself.caches.key_id_mapper,
380                &domain_separator,
381                None, // NoAck messages currently do not have signals
382            )
383            .map_err(|e| {
384                DbSqlError::LogicalError(format!("failed to construct chain components for a no-ack packet: {e}"))
385            })
386        })
387        .await?;
388
389        if let Some(out) = packet.try_as_outgoing() {
390            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
391            transport_payload.extend_from_slice(out.packet.as_ref());
392            transport_payload.extend_from_slice(&out.ticket.into_encoded());
393
394            #[cfg(all(feature = "prometheus", not(test)))]
395            METRIC_SENT_ACKS.increment();
396
397            Ok(OutgoingPacket {
398                next_hop: out.next_hop,
399                ack_challenge: out.ack_challenge,
400                data: transport_payload.into_boxed_slice(),
401            })
402        } else {
403            Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
404        }
405    }
406
407    #[tracing::instrument(level = "trace", skip(self, data, routing))]
408    async fn to_send(
409        &self,
410        data: Box<[u8]>,
411        routing: ResolvedTransportRouting,
412        outgoing_ticket_win_prob: WinningProbability,
413        outgoing_ticket_price: HoprBalance,
414        signals: PacketSignals,
415    ) -> Result<OutgoingPacket> {
416        // Get necessary packet routing values
417        let (next_peer, num_hops, pseudonym, routing) = match routing {
418            ResolvedTransportRouting::Forward {
419                pseudonym,
420                forward_path,
421                return_paths,
422            } => (
423                forward_path[0],
424                forward_path.num_hops(),
425                pseudonym,
426                PacketRouting::ForwardPath {
427                    forward_path,
428                    return_paths,
429                },
430            ),
431            ResolvedTransportRouting::Return(sender_id, surb) => {
432                let next = self
433                    .caches
434                    .key_id_mapper
435                    .map_id_to_public(&surb.first_relayer)
436                    .ok_or(DbSqlError::MissingAccount)?;
437
438                (
439                    next,
440                    surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
441                    sender_id.pseudonym(),
442                    PacketRouting::Surb(sender_id.surb_id(), surb),
443                )
444            }
445        };
446
447        let next_peer = self.resolve_chain_key(&next_peer).await?.ok_or_else(|| {
448            DbSqlError::LogicalError(format!(
449                "failed to find chain key for packet key {} on previous hop",
450                next_peer.to_peerid_str()
451            ))
452        })?;
453
454        // Decide whether to create a multi-hop or a zero-hop ticket
455        let next_ticket = if num_hops > 1 {
456            let channel = self
457                .get_channel_by_parties(None, &self.me_onchain, &next_peer, true)
458                .await?
459                .ok_or(DbSqlError::LogicalError(format!("channel to '{next_peer}' not found")))?;
460
461            self.create_multihop_ticket(
462                channel,
463                self.me_onchain,
464                next_peer,
465                num_hops as u8,
466                outgoing_ticket_win_prob,
467                outgoing_ticket_price,
468            )
469            .await?
470        } else {
471            TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer)
472        };
473
474        let domain_separator = self
475            .get_indexer_data(None)
476            .await?
477            .channels_dst
478            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
479
480        // Construct the outgoing packet
481        let myself = self.clone();
482        let (packet, openers) = spawn_fifo_blocking(move || {
483            HoprPacket::into_outgoing(
484                &data,
485                &pseudonym,
486                routing,
487                &myself.chain_key,
488                next_ticket,
489                &myself.caches.key_id_mapper,
490                &domain_separator,
491                signals,
492            )
493            .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
494        })
495        .await?;
496
497        // Store the reply openers under the given SenderId
498        // This is a no-op for reply packets
499        openers.into_iter().for_each(|(surb_id, opener)| {
500            self.caches
501                .insert_pseudonym_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
502        });
503
504        if let Some(out) = packet.try_as_outgoing() {
505            self.caches
506                .unacked_tickets
507                .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
508                .await;
509
510            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
511            transport_payload.extend_from_slice(out.packet.as_ref());
512            transport_payload.extend_from_slice(&out.ticket.into_encoded());
513
514            Ok(OutgoingPacket {
515                next_hop: out.next_hop,
516                ack_challenge: out.ack_challenge,
517                data: transport_payload.into_boxed_slice(),
518            })
519        } else {
520            Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
521        }
522    }
523
524    #[tracing::instrument(level = "trace", skip_all, fields(sender = %sender), err)]
525    async fn from_recv(
526        &self,
527        data: Box<[u8]>,
528        pkt_keypair: &OffchainKeypair,
529        sender: OffchainPublicKey,
530        outgoing_ticket_win_prob: WinningProbability,
531        outgoing_ticket_price: HoprBalance,
532    ) -> Result<IncomingPacket> {
533        let offchain_keypair = pkt_keypair.clone();
534        let myself = self.clone();
535
536        let start = std::time::Instant::now();
537        let packet = spawn_fifo_blocking(move || {
538            HoprPacket::from_incoming(&data, &offchain_keypair, sender, &myself.caches.key_id_mapper, |p| {
539                myself.caches.extract_pseudonym_opener(p)
540            })
541            .map_err(|error| match error {
542                errors::PacketError::PacketDecodingError(_) | errors::PacketError::SphinxError(_) => {
543                    DbSqlError::PossibleAdversaryError(format!("possibly adversarial behavior: {error}"))
544                }
545                _ => DbSqlError::LogicalError(format!("failed to construct an incoming packet: {error}")),
546            })
547        })
548        .await?;
549        if start.elapsed().as_millis() > SLOW_OP_MS {
550            warn!(
551                peer = sender.to_peerid_str(),
552                "from_incoming took {} ms",
553                start.elapsed().as_millis()
554            );
555        }
556
557        match packet {
558            HoprPacket::Final(incoming) => {
559                // Extract additional information from the packet that will be passed upwards
560                let info = AuxiliaryPacketInfo {
561                    packet_signals: incoming.signals,
562                    num_surbs: incoming.surbs.len(),
563                };
564
565                // Store all incoming SURBs if any
566                if !incoming.surbs.is_empty() {
567                    self.caches
568                        .surbs_per_pseudonym
569                        .entry_by_ref(&incoming.sender)
570                        .or_insert_with(futures::future::lazy(|_| {
571                            SurbRingBuffer::new(self.cfg.surb_ring_buffer_size)
572                        }))
573                        .await
574                        .value()
575                        .push(incoming.surbs)?;
576
577                    trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
578                }
579
580                Ok(match incoming.ack_key {
581                    None => {
582                        // The contained payload represents an Acknowledgement
583                        IncomingPacket::Acknowledgement {
584                            packet_tag: incoming.packet_tag,
585                            previous_hop: incoming.previous_hop,
586                            ack: incoming
587                                .plain_text
588                                .as_ref()
589                                .try_into()
590                                .map_err(|_| DbSqlError::DecodingError)?,
591                        }
592                    }
593                    Some(ack_key) => IncomingPacket::Final {
594                        packet_tag: incoming.packet_tag,
595                        previous_hop: incoming.previous_hop,
596                        sender: incoming.sender,
597                        plain_text: incoming.plain_text,
598                        ack_key,
599                        info,
600                    },
601                })
602            }
603            HoprPacket::Forwarded(fwd) => {
604                let start = std::time::Instant::now();
605                let validation_res = self
606                    .validate_and_replace_ticket(*fwd, &self.chain_key, outgoing_ticket_win_prob, outgoing_ticket_price)
607                    .await;
608                if start.elapsed().as_millis() > SLOW_OP_MS {
609                    warn!(
610                        peer = sender.to_peerid_str(),
611                        "validate_and_replace_ticket took {} ms",
612                        start.elapsed().as_millis()
613                    );
614                }
615                match validation_res {
616                    Ok(fwd) => {
617                        let mut payload = Vec::with_capacity(HoprPacket::SIZE);
618                        payload.extend_from_slice(fwd.outgoing.packet.as_ref());
619                        payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
620
621                        Ok(IncomingPacket::Forwarded {
622                            packet_tag: fwd.packet_tag,
623                            previous_hop: fwd.previous_hop,
624                            next_hop: fwd.outgoing.next_hop,
625                            data: payload.into_boxed_slice(),
626                            ack_key: fwd.ack_key,
627                        })
628                    }
629                    Err(DbSqlError::TicketValidationError(boxed_error)) => {
630                        let (rejected_ticket, error) = *boxed_error;
631                        let rejected_value = rejected_ticket.amount;
632                        warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
633
634                        self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
635                            DbSqlError::TicketValidationError(Box::new((
636                                rejected_ticket.clone(),
637                                format!("during validation error '{error}' update another error occurred: {e}"),
638                            )))
639                        })?;
640
641                        Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))).into())
642                    }
643                    Err(e) => Err(e.into()),
644                }
645            }
646            HoprPacket::Outgoing(_) => Err(DbSqlError::LogicalError("cannot receive an outgoing packet".into()).into()),
647        }
648    }
649}
650
651impl HoprDb {
652    #[tracing::instrument(level = "trace", skip(self, channel))]
653    async fn create_multihop_ticket(
654        &self,
655        channel: ChannelEntry,
656        me_onchain: Address,
657        destination: Address,
658        current_path_pos: u8,
659        winning_prob: WinningProbability,
660        ticket_price: HoprBalance,
661    ) -> crate::errors::Result<TicketBuilder> {
662        // The next ticket is worth: price * remaining hop count / winning probability
663        let amount = HoprBalance::from(
664            ticket_price
665                .amount()
666                .mul(U256::from(current_path_pos - 1))
667                .div_f64(winning_prob.into())
668                .map_err(|e| {
669                    DbSqlError::LogicalError(format!(
670                        "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
671                    ))
672                })?,
673        );
674
675        if channel.balance.lt(&amount) {
676            return Err(DbSqlError::LogicalError(format!(
677                "out of funds: {} with counterparty {destination} has balance {} < {amount}",
678                channel.get_id(),
679                channel.balance
680            )));
681        }
682
683        let ticket_builder = TicketBuilder::default()
684            .direction(&me_onchain, &destination)
685            .balance(amount)
686            .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
687            .index_offset(1) // unaggregated always have index_offset == 1
688            .win_prob(winning_prob)
689            .channel_epoch(channel.channel_epoch.as_u32());
690
691        Ok(ticket_builder)
692    }
693}
694
695#[async_trait::async_trait]
696impl PathAddressResolver for HoprDb {
697    async fn resolve_transport_address(
698        &self,
699        address: &Address,
700    ) -> std::result::Result<Option<OffchainPublicKey>, PathError> {
701        self.resolve_packet_key(address)
702            .await
703            .map_err(|_| PathError::UnknownPeer(address.to_string()))
704    }
705
706    async fn resolve_chain_address(&self, key: &OffchainPublicKey) -> std::result::Result<Option<Address>, PathError> {
707        self.resolve_chain_key(key)
708            .await
709            .map_err(|_| PathError::UnknownPeer(key.to_string()))
710    }
711}