1use std::ops::{Mul, Sub};
2
3use async_trait::async_trait;
4use hopr_crypto_packet::prelude::*;
5use hopr_crypto_types::{crypto_traits::Randomizable, prelude::*};
6use hopr_db_api::{
7 errors::Result,
8 prelude::{AuxiliaryPacketInfo, DbError, SurbCacheConfig},
9 protocol::{FoundSurb, HoprDbProtocolOperations, IncomingPacket, OutgoingPacket, ResolvedAcknowledgement},
10 resolver::HoprDbResolverOperations,
11};
12use hopr_internal_types::prelude::*;
13#[cfg(all(feature = "prometheus", not(test)))]
14use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
15use hopr_network_types::prelude::{ResolvedTransportRouting, SurbMatcher};
16use hopr_parallelize::cpu::spawn_fifo_blocking;
17use hopr_path::{Path, PathAddressResolver, ValidatedPath, errors::PathError};
18use hopr_primitive_types::prelude::*;
19use tracing::{instrument, trace, warn};
20
21use crate::{
22 cache::SurbRingBuffer, channels::HoprDbChannelOperations, db::HoprDb, errors::DbSqlError,
23 info::HoprDbInfoOperations, prelude::HoprDbTicketOperations,
24};
25
// Prometheus metrics used by the packet and ticket processing in this file.
// They are compiled only with the `prometheus` feature enabled and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Histogram of winning probabilities seen on incoming tickets; it is fed in
    // `validate_and_replace_ticket` before the ticket itself is validated.
    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
        hopr_metrics::SimpleHistogram::new(
            "hopr_tickets_incoming_win_probability",
            "Observes the winning probabilities on incoming tickets",
            vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
        ).unwrap();

    // Counter of received acknowledgements, labelled by `valid`.
    // NOTE(review): within this file only the "true" label value is ever incremented.
    pub(crate) static ref METRIC_RECEIVED_ACKS: MultiCounter = MultiCounter::new(
        "hopr_received_ack_count",
        "Number of received acknowledgements",
        &["valid"]
    )
    .unwrap();

    // Counter incremented once per packet successfully built by `to_send_no_ack`.
    pub(crate) static ref METRIC_SENT_ACKS: SimpleCounter =
        SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();

    // Counter of acknowledged tickets, labelled by `type`.
    // NOTE(review): despite the help text, `handle_acknowledgement` records both
    // "winning" and "losing" tickets under this metric.
    pub(crate) static ref METRIC_TICKETS_COUNT: MultiCounter =
        MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
}
48
/// Duration threshold in milliseconds; operations timed in this file that take longer
/// than this are reported with a `warn!` log entry.
const SLOW_OP_MS: u128 = 150;
50
51impl HoprDb {
52 #[instrument(level = "trace", skip_all, err)]
54 async fn validate_and_replace_ticket(
55 &self,
56 mut fwd: HoprForwardedPacket,
57 me: &ChainKeypair,
58 outgoing_ticket_win_prob: WinningProbability,
59 outgoing_ticket_price: HoprBalance,
60 ) -> std::result::Result<HoprForwardedPacket, DbSqlError> {
61 let previous_hop_addr = self.resolve_chain_key(&fwd.previous_hop).await?.ok_or_else(|| {
62 DbSqlError::LogicalError(format!(
63 "failed to find channel key for packet key {} on previous hop",
64 fwd.previous_hop.to_peerid_str()
65 ))
66 })?;
67
68 let next_hop_addr = self.resolve_chain_key(&fwd.outgoing.next_hop).await?.ok_or_else(|| {
69 DbSqlError::LogicalError(format!(
70 "failed to find channel key for packet key {} on next hop",
71 fwd.outgoing.next_hop.to_peerid_str()
72 ))
73 })?;
74
75 let incoming_channel = self
76 .get_channel_by_parties(None, &previous_hop_addr, &self.me_onchain, true)
77 .await?
78 .ok_or_else(|| {
79 DbSqlError::LogicalError(format!(
80 "no channel found for previous hop address '{previous_hop_addr}'"
81 ))
82 })?;
83
84 if !fwd.outgoing.ticket.channel_id.eq(&incoming_channel.get_id()) {
86 return Err(DbSqlError::LogicalError("invalid ticket for channel".into()));
87 }
88
89 let chain_data = self.get_indexer_data(None).await?;
90 let domain_separator = chain_data
91 .channels_dst
92 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
93
94 let minimum_ticket_price = chain_data
97 .ticket_price
98 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
99 .mul(U256::from(fwd.path_pos));
100
101 #[cfg(all(feature = "prometheus", not(test)))]
102 METRIC_INCOMING_WIN_PROB.observe(fwd.outgoing.ticket.win_prob().as_f64());
103
104 let remaining_balance = incoming_channel
105 .balance
106 .sub(self.ticket_manager.unrealized_value((&incoming_channel).into()).await?);
107
108 let start = std::time::Instant::now();
113 let verified_incoming_ticket = spawn_fifo_blocking(move || {
114 validate_unacknowledged_ticket(
115 fwd.outgoing.ticket,
116 &incoming_channel,
117 minimum_ticket_price,
118 chain_data.minimum_incoming_ticket_winning_prob,
119 remaining_balance,
120 &domain_separator,
121 )
122 })
123 .await?;
124 if start.elapsed().as_millis() > SLOW_OP_MS {
125 warn!("validate_unacknowledged_ticket took {} ms", start.elapsed().as_millis());
126 }
127
128 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
132
133 self.caches
135 .unacked_tickets
136 .insert(
137 fwd.outgoing.ack_challenge,
138 PendingAcknowledgement::WaitingAsRelayer(
139 verified_incoming_ticket.into_unacknowledged(fwd.own_key).into(),
140 ),
141 )
142 .await;
143
144 let ticket_builder = if fwd.path_pos > 1 {
151 let outgoing_channel = self
153 .get_channel_by_parties(None, &self.me_onchain, &next_hop_addr, true)
154 .await?
155 .ok_or(DbSqlError::LogicalError(format!(
156 "channel to '{next_hop_addr}' not found",
157 )))?;
158
159 self.create_multihop_ticket(
160 outgoing_channel,
161 self.me_onchain,
162 next_hop_addr,
163 fwd.path_pos,
164 outgoing_ticket_win_prob,
165 outgoing_ticket_price,
166 )
167 .await?
168 } else {
169 TicketBuilder::zero_hop().direction(&self.me_onchain, &next_hop_addr)
170 };
171
172 let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
174 let me_clone = me.clone();
175 fwd.outgoing.ticket = spawn_fifo_blocking(move || ticket_builder.build_signed(&me_clone, &domain_separator))
176 .await?
177 .leak();
178
179 Ok(fwd)
180 }
181
182 #[instrument(level = "trace", skip(self, ack), err)]
183 async fn find_ticket_to_acknowledge(
184 &self,
185 ack: &VerifiedAcknowledgement,
186 ) -> std::result::Result<ResolvedAcknowledgement, DbSqlError> {
187 let ack_half_key = *ack.ack_key_share();
188 let challenge = hopr_parallelize::cpu::spawn_blocking(move || ack_half_key.to_challenge()).await?;
189
190 let pending_ack = self.caches.unacked_tickets.remove(&challenge).await.ok_or_else(|| {
191 DbSqlError::AcknowledgementValidationError(format!(
192 "received unexpected acknowledgement for half key challenge {}",
193 ack.ack_key_share().to_hex()
194 ))
195 })?;
196
197 match pending_ack {
198 PendingAcknowledgement::WaitingAsSender => {
199 trace!("received acknowledgement as sender: first relayer has processed the packet");
200 Ok(ResolvedAcknowledgement::Sending(*ack))
201 }
202
203 PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
204 let maybe_channel_with_issuer = self
205 .get_channel_by_parties(None, unacknowledged.ticket.verified_issuer(), &self.me_onchain, true)
206 .await?;
207
208 if maybe_channel_with_issuer
210 .is_some_and(|c| c.channel_epoch.as_u32() == unacknowledged.verified_ticket().channel_epoch)
211 {
212 let domain_separator = self
213 .get_indexer_data(None)
214 .await?
215 .channels_dst
216 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
217
218 let myself = self.clone();
219 let ack = *ack;
220 hopr_parallelize::cpu::spawn_blocking(move || {
221 let ack_ticket = unacknowledged.acknowledge(ack.ack_key_share())?;
226
227 if ack_ticket.is_winning(&myself.chain_key, &domain_separator) {
228 trace!("Found a winning ticket");
229 Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
230 } else {
231 trace!("Found a losing ticket");
232 Ok(ResolvedAcknowledgement::RelayingLoss(
233 ack_ticket.ticket.verified_ticket().channel_id,
234 ))
235 }
236 })
237 .await
238 } else {
239 Err(DbSqlError::LogicalError(format!(
240 "no channel found for address '{}' with a matching epoch",
241 unacknowledged.ticket.verified_issuer()
242 )))
243 }
244 }
245 }
246 }
247}
248
249#[async_trait]
250impl HoprDbProtocolOperations for HoprDb {
251 #[instrument(level = "trace", skip(self, ack), err(Debug), ret)]
252 async fn handle_acknowledgement(&self, ack: VerifiedAcknowledgement) -> Result<()> {
253 let result = self.find_ticket_to_acknowledge(&ack).await?;
254 match &result {
255 ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
256 self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
258
259 #[cfg(all(feature = "prometheus", not(test)))]
260 {
261 METRIC_RECEIVED_ACKS.increment(&["true"]);
262 METRIC_TICKETS_COUNT.increment(&["winning"]);
263
264 let verified_ticket = ack_ticket.ticket.verified_ticket();
265 let channel = verified_ticket.channel_id.to_string();
266 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
267 &[&channel, "unredeemed"],
268 self.ticket_manager
269 .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
270 verified_ticket.channel_id,
271 verified_ticket.channel_epoch,
272 ))
273 .await?
274 .amount()
275 .as_u128() as f64,
276 );
277 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
278 .increment(&[&channel, "winning_count"], 1.0f64);
279 }
280 }
281 ResolvedAcknowledgement::RelayingLoss(_channel) => {
282 #[cfg(all(feature = "prometheus", not(test)))]
283 {
284 METRIC_RECEIVED_ACKS.increment(&["true"]);
285 METRIC_TICKETS_COUNT.increment(&["losing"]);
286 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
287 .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
288 }
289 }
290 ResolvedAcknowledgement::Sending(_) => {
291 #[cfg(all(feature = "prometheus", not(test)))]
292 METRIC_RECEIVED_ACKS.increment(&["true"]);
293 }
294 };
295
296 Ok(())
297 }
298
299 async fn get_network_winning_probability(&self) -> Result<WinningProbability> {
300 Ok(self
301 .get_indexer_data(None)
302 .await
303 .map(|data| data.minimum_incoming_ticket_winning_prob)?)
304 }
305
306 async fn get_network_ticket_price(&self) -> Result<HoprBalance> {
307 Ok(self.get_indexer_data(None).await.and_then(|data| {
308 data.ticket_price
309 .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
310 })?)
311 }
312
313 #[tracing::instrument(level = "trace", skip(self, matcher), err)]
314 async fn find_surb(&self, matcher: SurbMatcher) -> Result<FoundSurb> {
315 let pseudonym = matcher.pseudonym();
316 let surbs_for_pseudonym = self
317 .caches
318 .surbs_per_pseudonym
319 .get(&pseudonym)
320 .await
321 .ok_or(DbError::NoSurbAvailable("pseudonym not found".into()))?;
322
323 match matcher {
324 SurbMatcher::Pseudonym(_) => Ok(surbs_for_pseudonym.pop_one().map(|popped_surb| FoundSurb {
325 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
326 surb: popped_surb.surb,
327 remaining: popped_surb.remaining,
328 })?),
329 SurbMatcher::Exact(id) => Ok(surbs_for_pseudonym
334 .pop_one_if_has_id(&id.surb_id())
335 .map(|popped_surb| FoundSurb {
336 sender_id: HoprSenderId::from_pseudonym_and_id(&pseudonym, popped_surb.id),
337 surb: popped_surb.surb,
338 remaining: popped_surb.remaining, })?),
340 }
341 }
342
343 #[inline]
344 fn get_surb_config(&self) -> SurbCacheConfig {
345 SurbCacheConfig {
346 rb_capacity: self.cfg.surb_ring_buffer_size,
347 distress_threshold: self.cfg.surb_distress_threshold,
348 }
349 }
350
351 #[tracing::instrument(level = "trace", skip(self, data))]
352 async fn to_send_no_ack(&self, data: Box<[u8]>, destination: OffchainPublicKey) -> Result<OutgoingPacket> {
353 let next_peer = self.resolve_chain_key(&destination).await?.ok_or_else(|| {
354 DbSqlError::LogicalError(format!(
355 "failed to find chain key for packet key {} on previous hop",
356 destination.to_peerid_str()
357 ))
358 })?;
359
360 let pseudonym = HoprPseudonym::random();
362 let next_ticket = TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer);
363
364 let domain_separator = self
365 .get_indexer_data(None)
366 .await?
367 .channels_dst
368 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
369
370 let myself = self.clone();
372 let (packet, _) = spawn_fifo_blocking(move || {
373 HoprPacket::into_outgoing(
374 &data,
375 &pseudonym,
376 PacketRouting::NoAck::<ValidatedPath>(destination),
377 &myself.chain_key,
378 next_ticket,
379 &myself.caches.key_id_mapper,
380 &domain_separator,
381 None, )
383 .map_err(|e| {
384 DbSqlError::LogicalError(format!("failed to construct chain components for a no-ack packet: {e}"))
385 })
386 })
387 .await?;
388
389 if let Some(out) = packet.try_as_outgoing() {
390 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
391 transport_payload.extend_from_slice(out.packet.as_ref());
392 transport_payload.extend_from_slice(&out.ticket.into_encoded());
393
394 #[cfg(all(feature = "prometheus", not(test)))]
395 METRIC_SENT_ACKS.increment();
396
397 Ok(OutgoingPacket {
398 next_hop: out.next_hop,
399 ack_challenge: out.ack_challenge,
400 data: transport_payload.into_boxed_slice(),
401 })
402 } else {
403 Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
404 }
405 }
406
407 #[tracing::instrument(level = "trace", skip(self, data, routing))]
408 async fn to_send(
409 &self,
410 data: Box<[u8]>,
411 routing: ResolvedTransportRouting,
412 outgoing_ticket_win_prob: WinningProbability,
413 outgoing_ticket_price: HoprBalance,
414 signals: PacketSignals,
415 ) -> Result<OutgoingPacket> {
416 let (next_peer, num_hops, pseudonym, routing) = match routing {
418 ResolvedTransportRouting::Forward {
419 pseudonym,
420 forward_path,
421 return_paths,
422 } => (
423 forward_path[0],
424 forward_path.num_hops(),
425 pseudonym,
426 PacketRouting::ForwardPath {
427 forward_path,
428 return_paths,
429 },
430 ),
431 ResolvedTransportRouting::Return(sender_id, surb) => {
432 let next = self
433 .caches
434 .key_id_mapper
435 .map_id_to_public(&surb.first_relayer)
436 .ok_or(DbSqlError::MissingAccount)?;
437
438 (
439 next,
440 surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
441 sender_id.pseudonym(),
442 PacketRouting::Surb(sender_id.surb_id(), surb),
443 )
444 }
445 };
446
447 let next_peer = self.resolve_chain_key(&next_peer).await?.ok_or_else(|| {
448 DbSqlError::LogicalError(format!(
449 "failed to find chain key for packet key {} on previous hop",
450 next_peer.to_peerid_str()
451 ))
452 })?;
453
454 let next_ticket = if num_hops > 1 {
456 let channel = self
457 .get_channel_by_parties(None, &self.me_onchain, &next_peer, true)
458 .await?
459 .ok_or(DbSqlError::LogicalError(format!("channel to '{next_peer}' not found")))?;
460
461 self.create_multihop_ticket(
462 channel,
463 self.me_onchain,
464 next_peer,
465 num_hops as u8,
466 outgoing_ticket_win_prob,
467 outgoing_ticket_price,
468 )
469 .await?
470 } else {
471 TicketBuilder::zero_hop().direction(&self.me_onchain, &next_peer)
472 };
473
474 let domain_separator = self
475 .get_indexer_data(None)
476 .await?
477 .channels_dst
478 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the domain separator".into()))?;
479
480 let myself = self.clone();
482 let (packet, openers) = spawn_fifo_blocking(move || {
483 HoprPacket::into_outgoing(
484 &data,
485 &pseudonym,
486 routing,
487 &myself.chain_key,
488 next_ticket,
489 &myself.caches.key_id_mapper,
490 &domain_separator,
491 signals,
492 )
493 .map_err(|e| DbSqlError::LogicalError(format!("failed to construct chain components for a packet: {e}")))
494 })
495 .await?;
496
497 openers.into_iter().for_each(|(surb_id, opener)| {
500 self.caches
501 .insert_pseudonym_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener)
502 });
503
504 if let Some(out) = packet.try_as_outgoing() {
505 self.caches
506 .unacked_tickets
507 .insert(out.ack_challenge, PendingAcknowledgement::WaitingAsSender)
508 .await;
509
510 let mut transport_payload = Vec::with_capacity(HoprPacket::SIZE);
511 transport_payload.extend_from_slice(out.packet.as_ref());
512 transport_payload.extend_from_slice(&out.ticket.into_encoded());
513
514 Ok(OutgoingPacket {
515 next_hop: out.next_hop,
516 ack_challenge: out.ack_challenge,
517 data: transport_payload.into_boxed_slice(),
518 })
519 } else {
520 Err(DbSqlError::LogicalError("must be an outgoing packet".into()).into())
521 }
522 }
523
524 #[tracing::instrument(level = "trace", skip_all, fields(sender = %sender), err)]
525 async fn from_recv(
526 &self,
527 data: Box<[u8]>,
528 pkt_keypair: &OffchainKeypair,
529 sender: OffchainPublicKey,
530 outgoing_ticket_win_prob: WinningProbability,
531 outgoing_ticket_price: HoprBalance,
532 ) -> Result<IncomingPacket> {
533 let offchain_keypair = pkt_keypair.clone();
534 let myself = self.clone();
535
536 let start = std::time::Instant::now();
537 let packet = spawn_fifo_blocking(move || {
538 HoprPacket::from_incoming(&data, &offchain_keypair, sender, &myself.caches.key_id_mapper, |p| {
539 myself.caches.extract_pseudonym_opener(p)
540 })
541 .map_err(|error| match error {
542 errors::PacketError::PacketDecodingError(_) | errors::PacketError::SphinxError(_) => {
543 DbSqlError::PossibleAdversaryError(format!("possibly adversarial behavior: {error}"))
544 }
545 _ => DbSqlError::LogicalError(format!("failed to construct an incoming packet: {error}")),
546 })
547 })
548 .await?;
549 if start.elapsed().as_millis() > SLOW_OP_MS {
550 warn!(
551 peer = sender.to_peerid_str(),
552 "from_incoming took {} ms",
553 start.elapsed().as_millis()
554 );
555 }
556
557 match packet {
558 HoprPacket::Final(incoming) => {
559 let info = AuxiliaryPacketInfo {
561 packet_signals: incoming.signals,
562 num_surbs: incoming.surbs.len(),
563 };
564
565 if !incoming.surbs.is_empty() {
567 self.caches
568 .surbs_per_pseudonym
569 .entry_by_ref(&incoming.sender)
570 .or_insert_with(futures::future::lazy(|_| {
571 SurbRingBuffer::new(self.cfg.surb_ring_buffer_size)
572 }))
573 .await
574 .value()
575 .push(incoming.surbs)?;
576
577 trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
578 }
579
580 Ok(match incoming.ack_key {
581 None => {
582 IncomingPacket::Acknowledgement {
584 packet_tag: incoming.packet_tag,
585 previous_hop: incoming.previous_hop,
586 ack: incoming
587 .plain_text
588 .as_ref()
589 .try_into()
590 .map_err(|_| DbSqlError::DecodingError)?,
591 }
592 }
593 Some(ack_key) => IncomingPacket::Final {
594 packet_tag: incoming.packet_tag,
595 previous_hop: incoming.previous_hop,
596 sender: incoming.sender,
597 plain_text: incoming.plain_text,
598 ack_key,
599 info,
600 },
601 })
602 }
603 HoprPacket::Forwarded(fwd) => {
604 let start = std::time::Instant::now();
605 let validation_res = self
606 .validate_and_replace_ticket(*fwd, &self.chain_key, outgoing_ticket_win_prob, outgoing_ticket_price)
607 .await;
608 if start.elapsed().as_millis() > SLOW_OP_MS {
609 warn!(
610 peer = sender.to_peerid_str(),
611 "validate_and_replace_ticket took {} ms",
612 start.elapsed().as_millis()
613 );
614 }
615 match validation_res {
616 Ok(fwd) => {
617 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
618 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
619 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
620
621 Ok(IncomingPacket::Forwarded {
622 packet_tag: fwd.packet_tag,
623 previous_hop: fwd.previous_hop,
624 next_hop: fwd.outgoing.next_hop,
625 data: payload.into_boxed_slice(),
626 ack_key: fwd.ack_key,
627 })
628 }
629 Err(DbSqlError::TicketValidationError(boxed_error)) => {
630 let (rejected_ticket, error) = *boxed_error;
631 let rejected_value = rejected_ticket.amount;
632 warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
633
634 self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
635 DbSqlError::TicketValidationError(Box::new((
636 rejected_ticket.clone(),
637 format!("during validation error '{error}' update another error occurred: {e}"),
638 )))
639 })?;
640
641 Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))).into())
642 }
643 Err(e) => Err(e.into()),
644 }
645 }
646 HoprPacket::Outgoing(_) => Err(DbSqlError::LogicalError("cannot receive an outgoing packet".into()).into()),
647 }
648 }
649}
650
651impl HoprDb {
652 #[tracing::instrument(level = "trace", skip(self, channel))]
653 async fn create_multihop_ticket(
654 &self,
655 channel: ChannelEntry,
656 me_onchain: Address,
657 destination: Address,
658 current_path_pos: u8,
659 winning_prob: WinningProbability,
660 ticket_price: HoprBalance,
661 ) -> crate::errors::Result<TicketBuilder> {
662 let amount = HoprBalance::from(
664 ticket_price
665 .amount()
666 .mul(U256::from(current_path_pos - 1))
667 .div_f64(winning_prob.into())
668 .map_err(|e| {
669 DbSqlError::LogicalError(format!(
670 "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
671 ))
672 })?,
673 );
674
675 if channel.balance.lt(&amount) {
676 return Err(DbSqlError::LogicalError(format!(
677 "out of funds: {} with counterparty {destination} has balance {} < {amount}",
678 channel.get_id(),
679 channel.balance
680 )));
681 }
682
683 let ticket_builder = TicketBuilder::default()
684 .direction(&me_onchain, &destination)
685 .balance(amount)
686 .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
687 .index_offset(1) .win_prob(winning_prob)
689 .channel_epoch(channel.channel_epoch.as_u32());
690
691 Ok(ticket_builder)
692 }
693}
694
695#[async_trait::async_trait]
696impl PathAddressResolver for HoprDb {
697 async fn resolve_transport_address(
698 &self,
699 address: &Address,
700 ) -> std::result::Result<Option<OffchainPublicKey>, PathError> {
701 self.resolve_packet_key(address)
702 .await
703 .map_err(|_| PathError::UnknownPeer(address.to_string()))
704 }
705
706 async fn resolve_chain_address(&self, key: &OffchainPublicKey) -> std::result::Result<Option<Address>, PathError> {
707 self.resolve_chain_key(key)
708 .await
709 .map_err(|_| PathError::UnknownPeer(key.to_string()))
710 }
711}