1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::{
3 chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations},
4 types::{crypto::crypto_traits::Randomizable, internal::prelude::*, primitive::prelude::*},
5};
6use hopr_crypto_packet::prelude::*;
7use hopr_protocol_hopr::{FoundSurb, SurbStore};
8use tracing::trace;
9
10use crate::errors::HoprTransportError;
11
12#[cfg(all(feature = "prometheus", not(test)))]
13lazy_static::lazy_static! {
14 static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
15 "hopr_path_length",
16 "Distribution of number of hops of sent messages",
17 vec![0.0, 1.0, 2.0, 3.0, 4.0]
18 ).unwrap();
19}
20
21#[derive(Clone)]
22pub(crate) struct PathPlanner<Surb, R, S> {
23 pub(crate) surb_store: Surb,
24 resolver: R,
25 selector: S,
26 me: Address,
27}
28
29impl<Surb, R, S> PathPlanner<Surb, R, S>
30where
31 Surb: SurbStore + Send + Sync + 'static,
32 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
33 S: PathSelector + Send + Sync,
34{
35 pub(crate) fn new(me: Address, surb_store: Surb, resolver: R, selector: S) -> Self {
36 Self {
37 surb_store,
38 resolver,
39 selector,
40 me,
41 }
42 }
43
44 async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
45 match node_id {
46 NodeId::Chain(addr) => Ok(*addr),
47 NodeId::Offchain(key) => self
48 .resolver
49 .packet_key_to_chain_key(key)
50 .await
51 .map_err(|e| {
52 HoprTransportError::Other(anyhow::anyhow!("failed to resolve offchain key to chain key: {e}"))
53 })?
54 .ok_or(HoprTransportError::Other(anyhow::anyhow!(
55 "failed to resolve offchain key to chain key: no chain key found"
56 ))),
57 }
58 }
59
60 #[tracing::instrument(level = "trace", skip(self))]
61 async fn resolve_path(
62 &self,
63 source: NodeId,
64 destination: NodeId,
65 options: RoutingOptions,
66 ) -> crate::errors::Result<ValidatedPath> {
67 let resolver = ChainPathResolver::from(&self.resolver);
68 let path = match options {
69 RoutingOptions::IntermediatePath(path) => {
70 trace!(?path, "resolving a specific path");
71
72 ValidatedPath::new(
73 source,
74 path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
75 &resolver,
76 )
77 .await?
78 }
79 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
80 trace!(hops = 0, "resolving zero-hop path");
81
82 ValidatedPath::new(source, vec![destination], &resolver).await?
83 }
84 RoutingOptions::Hops(hops) => {
85 trace!(%hops, "resolving path using hop count");
86
87 let dst = self.resolve_node_id_to_addr(&destination).await?;
88
89 let cp = self
90 .selector
91 .select_path(
92 self.resolve_node_id_to_addr(&source).await?,
93 dst,
94 hops.into(),
95 hops.into(),
96 )
97 .await?;
98
99 ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
100 }
101 };
102
103 #[cfg(all(feature = "prometheus", not(test)))]
104 {
105 use hopr_api::types::internal::path::Path;
106 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
107 }
108
109 trace!(%path, "validated resolved path");
110
111 Ok(path)
112 }
113
114 #[tracing::instrument(level = "trace", skip(self))]
115 pub(crate) async fn resolve_routing(
116 &self,
117 size_hint: usize,
118 max_surbs: usize,
119 routing: DestinationRouting,
120 ) -> crate::errors::Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
121 match routing {
122 DestinationRouting::Forward {
123 destination,
124 pseudonym,
125 forward_options,
126 return_options,
127 } => {
128 let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
129
130 let return_paths = if let Some(return_options) = return_options {
131 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
133 trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
134
135 (0..num_possible_surbs)
136 .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
137 .collect::<FuturesUnordered<_>>()
138 .try_collect::<Vec<ValidatedPath>>()
139 .await?
140 } else {
141 vec![]
142 };
143
144 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
145
146 Ok((
147 ResolvedTransportRouting::Forward {
148 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
149 forward_path,
150 return_paths,
151 },
152 None,
153 ))
154 }
155 DestinationRouting::Return(matcher) => {
156 let FoundSurb {
157 sender_id,
158 surb,
159 remaining,
160 } = self
161 .surb_store
162 .find_surb(matcher)
163 .await
164 .ok_or(HoprTransportError::Api(format!(
165 "no surb for pseudonym {}",
166 matcher.pseudonym()
167 )))?;
168 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
169 }
170 }
171 }
172}