hopr_db_sql/
protocol.rs

1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_crypto_packet::prelude::*;
5use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
6use hopr_db_api::{
7    errors::Result,
8    prelude::DbError,
9    protocol::{HoprDbProtocolOperations, IncomingPacket, OutgoingPacket, ResolvedAcknowledgement},
10    resolver::HoprDbResolverOperations,
11};
12use hopr_internal_types::prelude::*;
13#[cfg(all(feature = "prometheus", not(test)))]
14use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
15use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
16use hopr_parallelize::cpu::spawn_fifo_blocking;
17use hopr_path::{Path, PathAddressResolver, ValidatedPath, errors::PathError};
18use hopr_primitive_types::prelude::*;
19use tracing::{instrument, trace, warn};
20
21use crate::{
22    channels::HoprDbChannelOperations, db::HoprDb, errors::DbSqlError, info::HoprDbInfoOperations,
23    prelude::HoprDbTicketOperations,
24};
25
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram fed with the winning probability of every incoming ticket seen while forwarding.
    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_tickets_incoming_win_probability",
        "Observes the winning probabilities on incoming tickets",
        vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
    )
    .unwrap();

    /// Counter of received acknowledgements; the `valid` label is set to `"true"` or `"false"`.
    pub(crate) static ref METRIC_RECEIVED_ACKS: MultiCounter =
        MultiCounter::new("hopr_received_ack_count", "Number of received acknowledgements", &["valid"]).unwrap();

    /// Counter incremented for every no-acknowledgement packet prepared for sending.
    pub(crate) static ref METRIC_SENT_ACKS: SimpleCounter = SimpleCounter::new(
        "hopr_sent_acks_count",
        "Number of sent message acknowledgements",
    )
    .unwrap();

    /// Counter of acknowledged tickets; the `type` label is set to `"winning"` or `"losing"`.
    pub(crate) static ref METRIC_TICKETS_COUNT: MultiCounter = MultiCounter::new(
        "hopr_tickets_count",
        "Number of winning tickets",
        &["type"],
    )
    .unwrap();
}
48
49impl HoprDb {
50    /// Validates a ticket from a forwarded packet and replaces it with a new ticket for the next hop.
51    async fn validate_and_replace_ticket(
52        &self,
53        mut fwd: HoprForwardedPacket,
54        me: &ChainKeypair,
55        outgoing_ticket_win_prob: WinningProbability,
56        outgoing_ticket_price: HoprBalance,
57    ) -> std::result::Result<HoprForwardedPacket, DbSqlError> {
58        let previous_hop_addr = self.resolve_chain_key(&fwd.previous_hop).await?.ok_or_else(|| {
59            DbSqlError::LogicalError(format!(
60                "failed to find channel key for packet key {} on previous hop",
61                fwd.previous_hop.to_peerid_str()
62            ))
63        })?;
64
65        let next_hop_addr = self.resolve_chain_key(&fwd.outgoing.next_hop).await?.ok_or_else(|| {
66            DbSqlError::LogicalError(format!(
67                "failed to find channel key for packet key {} on next hop",
68                fwd.outgoing.next_hop.to_peerid_str()
69            ))
70        })?;
71
72        let incoming_channel = self
73            .get_channel_by_parties(None, &previous_hop_addr, &self.me_onchain, true)
74            .await?
75            .ok_or_else(|| {
76                DbSqlError::LogicalError(format!(
77                    "no channel found for previous hop address '{previous_hop_addr}'"
78                ))
79            })?;
80
81        // Check: is the ticket in the packet really for the given channel?
82        if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
83            return Err(DbSqlError::LogicalError("invalid ticket for channel".into()));
84        }
85
86        let chain_data = self.get_indexer_data(None).await?;
87        let domain_separator = chain_data
88            .channels_dst
89            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
90
91        // The ticket price from the oracle times my node's position on the
92        // path is the acceptable minimum
93        let minimum_ticket_price = chain_data
94            .ticket_price
95            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
96            .mul(U256::from(fwd.path_pos));
97
98        #[cfg(all(feature = "prometheus", not(test)))]
99        METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
100
101        let remaining_balance = incoming_channel
102            .balance
103            .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
104
105        // Here also the signature on the ticket gets validated,
106        // so afterward we are sure the source of the `channel`
107        // (which is equal to `previous_hop_addr`) has issued this
108        // ticket.
109        let verified_incoming_ticket = spawn_fifo_blocking(move || {
110            validate_unacknowledged_ticket(
111                fwd.outgoing.ticket,
112                &incoming_channel,
113                minimum_ticket_price,
114                chain_data.minimum_incoming_ticket_winning_prob,
115                remaining_balance,
116                &domain_separator,
117            )
118        })
119        .await?;
120
121        // We currently take the maximum of the win prob from the incoming ticket
122        // and the one configured on this node.
123        // Therefore, the winning probability can only increase along the path.
124        let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
125
126        // The ticket is now validated, let's place it into the acknowledgement waiting queue
127        self.caches
128            .unacked_tickets
129            .insert(
130                fwd.outgoing.ack_challenge,
131                PendingAcknowledgement::WaitingAsRelayer(verified_incoming_ticket.into_unacknowledged(fwd.own_key)),
132            )
133            .await;
134
135        // NOTE: that the path position according to the ticket value
136        // may no longer match the path position from the packet header,
137        // because the price of the ticket may be set higher be the ticket
138        // issuer.
139
140        // Create the new ticket for the new packet
141        let ticket_builder = if fwd.path_pos > 1 {
142            // There must be a channel to the next node if it's not the final hop
143            let outgoing_channel = self
144                .get_channel_by_parties(None, &self.me_onchain, &next_hop_addr, true)
145                .await?
146                .ok_or(DbSqlError::LogicalError(format!(
147                    "channel to '{next_hop_addr}' not found",
148                )))?;
149
150            self.create_multihop_ticket(
151                outgoing_channel,
152                self.me_onchain,
153                next_hop_addr,
154                fwd.path_pos,
155                outgoing_ticket_win_prob,
156                outgoing_ticket_price,
157            )
158            .await?
159        } else {
160            TicketBuilder::zero_hop().direction(&self.me_onchain, &next_hop_addr)
161        };
162
163        // Finally, replace the ticket in the outgoing packet with a new one
164        fwd.outgoing.ticket = ticket_builder
165            .challenge(fwd.next_challenge)
166            .build_signed(me, &domain_separator)?
167            .leak();
168
169        Ok(fwd)
170    }
171
172    async fn validate_acknowledgement(
173        &self,
174        ack: &Acknowledgement,
175    ) -> std::result::Result<ResolvedAcknowledgement, DbSqlError> {
176        let pending_ack = self
177            .caches
178            .unacked_tickets
179            .remove(&ack.ack_challenge()?)
180            .await
181            .ok_or_else(|| {
182                DbSqlError::AcknowledgementValidationError(format!(
183                    "received unexpected acknowledgement for half key challenge {}",
184                    ack.to_hex()
185                ))
186            })?;
187
188        match pending_ack {
189            PendingAcknowledgement::WaitingAsSender => {
190                trace!("received acknowledgement as sender: first relayer has processed the packet");
191                Ok(ResolvedAcknowledgement::Sending(*ack))
192            }
193
194            PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
195                let maybe_channel_with_issuer = self
196                    .get_channel_by_parties(None, unacknowledged.ticket.verified_issuer(), &self.me_onchain, true)
197                    .await?;
198
199                // Issuer's channel must have an epoch matching with the unacknowledged ticket
200                if maybe_channel_with_issuer
201                    .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
202                {
203                    let domain_separator = self
204                        .get_indexer_data(None)
205                        .await?
206                        .channels_dst
207                        .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
208
209                    let myself = self.clone();
210                    let ack = *ack;
211                    hopr_parallelize::cpu::spawn_blocking(move || {
212                        // This explicitly checks whether the acknowledgement
213                        // solves the challenge on the ticket. It must be done before we
214                        // check that the ticket is winning, which is a lengthy operation
215                        // and should not be done for bogus unacknowledged tickets
216                        let ack_ticket = unacknowledged.acknowledge(&ack.ack_key_share()?)?;
217
218                        if ack_ticket.is_winning(&myself.chain_key, &domain_separator) {
219                            trace!("Found a winning ticket");
220                            Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
221                        } else {
222                            trace!("Found a losing ticket");
223                            Ok(ResolvedAcknowledgement::RelayingLoss(
224                                ack_ticket.ticket.verified_ticket().channel_id,
225                            ))
226                        }
227                    })
228                    .await
229                } else {
230                    Err(DbSqlError::LogicalError(format!(
231                        "no channel found for  address '{}' with a matching epoch",
232                        unacknowledged.ticket.verified_issuer()
233                    )))
234                }
235            }
236        }
237    }
238}
239
240#[async_trait]
241impl HoprDbProtocolOperations for HoprDb {
242    #[instrument(level = "trace", skip(self, ack))]
243    async fn handle_acknowledgement(&self, ack: Acknowledgement) -> Result<()> {
244        let result = self.validate_acknowledgement(&ack).await?;
245        match &result {
246            ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
247                // If the ticket was a win, store it
248                self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
249
250                #[cfg(all(feature = "prometheus", not(test)))]
251                {
252                    METRIC_RECEIVED_ACKS.increment(&["true"]);
253                    METRIC_TICKETS_COUNT.increment(&["winning"]);
254
255                    let verified_ticket = ack_ticket.ticket.verified_ticket();
256                    let channel = verified_ticket.channel_id.to_string();
257                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
258                        &[&channel, "unredeemed"],
259                        self.ticket_manager
260                            .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
261                                verified_ticket.channel_id,
262                                verified_ticket.channel_epoch,
263                            ))
264                            .await?
265                            .amount()
266                            .as_u128() as f64,
267                    );
268                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
269                        .increment(&[&channel, "winning_count"], 1.0f64);
270                }
271            }
272            ResolvedAcknowledgement::RelayingLoss(_channel) => {
273                #[cfg(all(feature = "prometheus", not(test)))]
274                {
275                    METRIC_RECEIVED_ACKS.increment(&["true"]);
276                    METRIC_TICKETS_COUNT.increment(&["losing"]);
277                    crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
278                        .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
279                }
280            }
281            ResolvedAcknowledgement::Sending(_) => {
282                #[cfg(all(feature = "prometheus", not(test)))]
283                METRIC_RECEIVED_ACKS.increment(&["true"]);
284            }
285        };
286
287        Ok(())
288    }
289
290    async fn get_network_winning_probability(&self) -> Result<WinningProbability> {
291        Ok(self
292            .get_indexer_data(None)
293            .await
294            .map(|data| data.minimum_incoming_ticket_winning_prob)?)
295    }
296
297    async fn get_network_ticket_price(&self) -> Result<HoprBalance> {
298        Ok(self.get_indexer_data(None).await.and_then(|data| {
299            data.ticket_price
300                .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
301        })?)
302    }
303
304    async fn find_surb(&self, matcher: SurbMatcher) -> Result<(HoprSenderId, HoprSurb)> {
305        let pseudonym = matcher.pseudonym();
306        let surbs_for_pseudonym = self
307            .caches
308            .surbs_per_pseudonym
309            .get(&pseudonym)
310            .await
311            .ok_or(DbError::NoSurbAvailable("pseudonym not found".into()))?;
312
313        match matcher {
314            SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym
315                .pop_one()
316                .map(|(id, surb)| (HoprSenderId::from_pseudonym_and_id(&pseudonym, id), surb))?),
317            // The following code intentionally only checks the first SURB in the ring buffer
318            // and does not search the entire RB.
319            // This is because the exact match use-case is suited only for situations
320            // when there is a single SURB.
321            SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
322                .pop_one_if_has_id(&id.surb_id())
323                .map(|(id, surb)| (HoprSenderId::from_pseudonym_and_id(&pseudonym, id), surb))?),
324        }
325    }
326
327    #[tracing::instrument(level = "trace", skip(self, data))]
328    async fn to_send_no_ack(&self, data: Box<[u8]>, destination: OffchainPublicKey) -> Result<OutgoingPacket> {
329        let next_peer = self.resolve_chain_key(&destination).await?.ok_or_else(|| {
330            DbSqlError::LogicalError(format!(
331                "failed to find chain key for packet key {} on previous hop",
332                destination.to_peerid_str()
333            ))
334        })?;
335
336        // No-ack packets are always sent as zero-hops with a random pseudonym
337        let pseudonym = HoprPseudonym::random();
338        let next_ticket = TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer);
339
340        let domain_separator = self
341            .get_indexer_data(None)
342            .await?
343            .channels_dst
344            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
345
346        // Construct the outgoing packet
347        let myself = self.clone();
348        let (packet, _) = spawn_fifo_blocking(move || {
349            HoprPacket::into_outgoing(
350                &data,
351                &pseudonym,
352                PacketRouting::NoAck::<ValidatedPath>(destination),
353                &myself.chain_key,
354                next_ticket,
355                &myself.caches.key_id_mapper,
356                &domain_separator,
357            )
358            .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
359        })
360        .await?;
361
362        if let Some(out) = packet.try_as_outgoing() {
363            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
364            transport_payload.extend_from_slice(out.packet.as_ref());
365            transport_payload.extend_from_slice(&out.ticket.into_encoded());
366
367            #[cfg(all(feature = "prometheus", not(test)))]
368            METRIC_SENT_ACKS.increment();
369
370            Ok(OutgoingPacket {
371                next_hop: out.next_hop,
372                ack_challenge: out.ack_challenge,
373                data: transport_payload.into_boxed_slice(),
374            })
375        } else {
376            Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
377        }
378    }
379
380    #[tracing::instrument(level = "trace", skip(self, data, routing))]
381    async fn to_send(
382        &self,
383        data: Box<[u8]>,
384        routing: ResolvedTransportRouting,
385        outgoing_ticket_win_prob: WinningProbability,
386        outgoing_ticket_price: HoprBalance,
387    ) -> Result<OutgoingPacket> {
388        // Get necessary packet routing values
389        let (next_peer, num_hops, pseudonym, routing) = match routing {
390            ResolvedTransportRouting::Forward {
391                pseudonym,
392                forward_path,
393                return_paths,
394            } => (
395                forward_path[0],
396                forward_path.num_hops(),
397                pseudonym,
398                PacketRouting::ForwardPath {
399                    forward_path,
400                    return_paths,
401                },
402            ),
403            ResolvedTransportRouting::Return(sender_id, surb) => {
404                let next = self
405                    .caches
406                    .key_id_mapper
407                    .map_id_to_public(&surb.first_relayer)
408                    .ok_or(DbSqlError::MissingAccount)?;
409
410                (
411                    next,
412                    surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
413                    sender_id.pseudonym(),
414                    PacketRouting::Surb(sender_id.surb_id(), surb),
415                )
416            }
417        };
418
419        let next_peer = self.resolve_chain_key(&next_peer).await?.ok_or_else(|| {
420            DbSqlError::LogicalError(format!(
421                "failed to find chain key for packet key {} on previous hop",
422                next_peer.to_peerid_str()
423            ))
424        })?;
425
426        // Decide whether to create a multi-hop or a zero-hop ticket
427        let next_ticket = if num_hops > 1 {
428            let channel = self
429                .get_channel_by_parties(None, &self.me_onchain, &next_peer, true)
430                .await?
431                .ok_or(DbSqlError::LogicalError(format!("channel to '{next_peer}' not found")))?;
432
433            self.create_multihop_ticket(
434                channel,
435                self.me_onchain,
436                next_peer,
437                num_hops as u8,
438                outgoing_ticket_win_prob,
439                outgoing_ticket_price,
440            )
441            .await?
442        } else {
443            TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer)
444        };
445
446        let domain_separator = self
447            .get_indexer_data(None)
448            .await?
449            .channels_dst
450            .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
451
452        // Construct the outgoing packet
453        let myself = self.clone();
454        let (packet, openers) = spawn_fifo_blocking(move || {
455            HoprPacket::into_outgoing(
456                &data,
457                &pseudonym,
458                routing,
459                &myself.chain_key,
460                next_ticket,
461                &myself.caches.key_id_mapper,
462                &domain_separator,
463            )
464            .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
465        })
466        .await?;
467
468        // Store the reply openers under the given SenderId
469        // This is a no-op for reply packets
470        openers.into_iter().for_each(|(surb_id, opener)| {
471            self.caches
472                .pseudonym_openers
473                .insert(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
474        });
475
476        if let Some(out) = packet.try_as_outgoing() {
477            self.caches
478                .unacked_tickets
479                .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
480                .await;
481
482            let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
483            transport_payload.extend_from_slice(out.packet.as_ref());
484            transport_payload.extend_from_slice(&out.ticket.into_encoded());
485
486            Ok(OutgoingPacket {
487                next_hop: out.next_hop,
488                ack_challenge: out.ack_challenge,
489                data: transport_payload.into_boxed_slice(),
490            })
491        } else {
492            Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
493        }
494    }
495
496    #[tracing::instrument(level = "trace", skip(self, data, pkt_keypair, sender), fields(sender = %sender))]
497    async fn from_recv(
498        &self,
499        data: Box<[u8]>,
500        pkt_keypair: &OffchainKeypair,
501        sender: OffchainPublicKey,
502        outgoing_ticket_win_prob: WinningProbability,
503        outgoing_ticket_price: HoprBalance,
504    ) -> Result<Option<IncomingPacket>> {
505        let offchain_keypair = pkt_keypair.clone();
506        let myself = self.clone();
507
508        let packet = spawn_fifo_blocking(move || {
509            HoprPacket::from_incoming(&data, &offchain_keypair, sender, &myself.caches.key_id_mapper, |p| {
510                myself.caches.pseudonym_openers.remove(p)
511            })
512            .map_err(|e| DbSqlError::LogicalError(format!("failed to construct an incoming packet: {e}")))
513        })
514        .await?;
515
516        match packet {
517            HoprPacket::Final(incoming) => {
518                // Store all incoming SURBs if any
519                if !incoming.surbs.is_empty() {
520                    let num_surbs = incoming.surbs.len();
521
522                    // TODO: make the capacity of the SURB RB configurable
523                    self.caches
524                        .surbs_per_pseudonym
525                        .entry_by_ref(&incoming.sender)
526                        .or_default() // Default capacity SurbRingBuffer
527                        .await
528                        .value()
529                        .push(incoming.surbs)?;
530
531                    tracing::trace!(pseudonym = %incoming.sender, num_surbs, "stored incoming surbs for pseudonym");
532                }
533
534                if let Some(ack_key) = incoming.ack_key {
535                    Ok(Some(IncomingPacket::Final {
536                        packet_tag: incoming.packet_tag,
537                        previous_hop: incoming.previous_hop,
538                        sender: incoming.sender,
539                        plain_text: incoming.plain_text,
540                        ack_key,
541                    }))
542                } else {
543                    // The contained payload represents an Acknowledgement
544                    let ack: Acknowledgement = incoming.plain_text.as_ref().try_into().map_err(|error| {
545                        tracing::error!(%error, "failed to decode the acknowledgement");
546
547                        #[cfg(all(feature = "prometheus", not(test)))]
548                        METRIC_RECEIVED_ACKS.increment(&["false"]);
549
550                        DbSqlError::DecodingError
551                    })?;
552
553                    let ack = ack
554                        .validate(&incoming.previous_hop)
555                        .map_err(|e| crate::errors::DbSqlError::AcknowledgementValidationError(e.to_string()))?;
556                    self.handle_acknowledgement(ack).await?;
557
558                    Ok(None)
559                }
560            }
561            HoprPacket::Forwarded(fwd) => {
562                match self
563                    .validate_and_replace_ticket(*fwd, &self.chain_key, outgoing_ticket_win_prob, outgoing_ticket_price)
564                    .await
565                {
566                    Ok(fwd) => {
567                        let mut payload = Vec::with_capacity(HoprPacket::SIZE);
568                        payload.extend_from_slice(fwd.outgoing.packet.as_ref());
569                        payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
570
571                        Ok(Some(IncomingPacket::Forwarded {
572                            packet_tag: fwd.packet_tag,
573                            previous_hop: fwd.previous_hop,
574                            next_hop: fwd.outgoing.next_hop,
575                            data: payload.into_boxed_slice(),
576                            ack: Acknowledgement::new(fwd.ack_key, pkt_keypair),
577                        }))
578                    }
579                    Err(DbSqlError::TicketValidationError(boxed_error)) => {
580                        let (rejected_ticket, error) = *boxed_error;
581                        let rejected_value = rejected_ticket.amount;
582                        warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
583
584                        self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
585                            DbSqlError::TicketValidationError(Box::new((
586                                rejected_ticket.clone(),
587                                format!("during validation error '{error}' update another error occurred: {e}"),
588                            )))
589                        })?;
590
591                        Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))).into())
592                    }
593                    Err(e) => Err(e.into()),
594                }
595            }
596            HoprPacket::Outgoing(_) => Err(DbSqlError::LogicalError("cannot receive an outgoing packet".into()).into()),
597        }
598    }
599}
600
601impl HoprDb {
602    async fn create_multihop_ticket(
603        &self,
604        channel: ChannelEntry,
605        me_onchain: Address,
606        destination: Address,
607        current_path_pos: u8,
608        winning_prob: WinningProbability,
609        ticket_price: HoprBalance,
610    ) -> crate::errors::Result<TicketBuilder> {
611        // The next ticket is worth: price * remaining hop count / winning probability
612        let amount = HoprBalance::from(
613            ticket_price
614                .amount()
615                .mul(U256::from(current_path_pos - 1))
616                .div_f64(winning_prob.into())
617                .map_err(|e| {
618                    DbSqlError::LogicalError(format!(
619                        "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
620                    ))
621                })?,
622        );
623
624        if channel.balance.lt(&amount) {
625            return Err(DbSqlError::LogicalError(format!(
626                "out of funds: {} with counterparty {destination} has balance {} < {amount}",
627                channel.get_id(),
628                channel.balance
629            )));
630        }
631
632        let ticket_builder = TicketBuilder::default()
633            .direction(&me_onchain, &destination)
634            .balance(amount)
635            .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
636            .index_offset(1) // unaggregated always have index_offset == 1
637            .win_prob(winning_prob)
638            .channel_epoch(channel.channel_epoch.as_u32());
639
640        Ok(ticket_builder)
641    }
642}
643
644#[async_trait::async_trait]
645impl PathAddressResolver for HoprDb {
646    async fn resolve_transport_address(
647        &self,
648        address: &Address,
649    ) -> std::result::Result<Option<OffchainPublicKey>, PathError> {
650        self.resolve_packet_key(address)
651            .await
652            .map_err(|_| PathError::UnknownPeer(address.to_string()))
653    }
654
655    async fn resolve_chain_address(&self, key: &OffchainPublicKey) -> std::result::Result<Option<Address>, PathError> {
656        self.resolve_chain_key(key)
657            .await
658            .map_err(|_| PathError::UnknownPeer(key.to_string()))
659    }
660}