1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::{
3 chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations},
4 db::{FoundSurb, HoprDbProtocolOperations},
5};
6use hopr_crypto_packet::prelude::*;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_internal_types::prelude::*;
9use hopr_network_types::prelude::*;
10use hopr_primitive_types::prelude::*;
11use tracing::trace;
12
13use crate::errors::HoprTransportError;
14
// Histogram of path lengths, registered only when the `prometheus` feature is enabled
// and the crate is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = {
        // One bucket per hop count from 0 up to 4.
        let hop_buckets = vec![0.0, 1.0, 2.0, 3.0, 4.0];

        hopr_metrics::SimpleHistogram::new(
            "hopr_path_length",
            "Distribution of number of hops of sent messages",
            hop_buckets,
        )
        .unwrap()
    };
}
23
24#[derive(Debug, Copy, Clone, PartialEq)]
26pub struct TicketStatistics {
27 pub winning_count: u128,
28 pub unredeemed_value: HoprBalance,
29 pub redeemed_value: HoprBalance,
30 pub neglected_value: HoprBalance,
31 pub rejected_value: HoprBalance,
32}
33
34#[derive(Clone)]
35pub(crate) struct PathPlanner<Db, R, S> {
36 db: Db,
37 resolver: R,
38 selector: S,
39 me: Address,
40}
41
42impl<Db, R, S> PathPlanner<Db, R, S>
43where
44 Db: HoprDbProtocolOperations + Send + Sync + 'static,
45 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
46 S: PathSelector + Send + Sync,
47{
48 pub(crate) fn new(me: Address, db: Db, resolver: R, selector: S) -> Self {
49 Self {
50 db,
51 resolver,
52 selector,
53 me,
54 }
55 }
56
57 async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
58 match node_id {
59 NodeId::Chain(addr) => Ok(*addr),
60 NodeId::Offchain(key) => self
61 .resolver
62 .packet_key_to_chain_key(key)
63 .await
64 .map_err(|e| {
65 HoprTransportError::Other(format!("failed to resolve offchain key to chain key: {e}").into())
66 })?
67 .ok_or(HoprTransportError::Other(
68 "failed to resolve offchain key to chain key: no chain key found".into(),
69 )),
70 }
71 }
72
73 #[tracing::instrument(level = "trace", skip(self))]
74 async fn resolve_path(
75 &self,
76 source: NodeId,
77 destination: NodeId,
78 options: RoutingOptions,
79 ) -> crate::errors::Result<ValidatedPath> {
80 let resolver = ChainPathResolver::from(&self.resolver);
81 let path = match options {
82 RoutingOptions::IntermediatePath(path) => {
83 trace!(?path, "resolving a specific path");
84
85 ValidatedPath::new(
86 source,
87 path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
88 &resolver,
89 )
90 .await?
91 }
92 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
93 trace!(hops = 0, "resolving zero-hop path");
94
95 ValidatedPath::new(source, vec![destination], &resolver).await?
96 }
97 RoutingOptions::Hops(hops) => {
98 trace!(%hops, "resolving path using hop count");
99
100 let dst = self.resolve_node_id_to_addr(&destination).await?;
101
102 let cp = self
103 .selector
104 .select_path(
105 self.resolve_node_id_to_addr(&source).await?,
106 dst,
107 hops.into(),
108 hops.into(),
109 )
110 .await?;
111
112 ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
113 }
114 };
115
116 #[cfg(all(feature = "prometheus", not(test)))]
117 {
118 use hopr_internal_types::path::Path;
119 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
120 }
121
122 trace!(%path, "validated resolved path");
123
124 Ok(path)
125 }
126
127 #[tracing::instrument(level = "trace", skip(self))]
128 pub(crate) async fn resolve_routing(
129 &self,
130 size_hint: usize,
131 max_surbs: usize,
132 routing: DestinationRouting,
133 ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
134 match routing {
135 DestinationRouting::Forward {
136 destination,
137 pseudonym,
138 forward_options,
139 return_options,
140 } => {
141 let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
142
143 let return_paths = if let Some(return_options) = return_options {
144 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
146 trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
147
148 (0..num_possible_surbs)
149 .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
150 .collect::<FuturesUnordered<_>>()
151 .try_collect::<Vec<ValidatedPath>>()
152 .await?
153 } else {
154 vec![]
155 };
156
157 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
158
159 Ok((
160 ResolvedTransportRouting::Forward {
161 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
162 forward_path,
163 return_paths,
164 },
165 None,
166 ))
167 }
168 DestinationRouting::Return(matcher) => {
169 let FoundSurb {
170 sender_id,
171 surb,
172 remaining,
173 } = self
174 .db
175 .find_surb(matcher)
176 .await
177 .map_err(|e| HoprTransportError::Other(e.into()))?;
178 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
179 }
180 }
181 }
182}