1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
3use hopr_crypto_packet::prelude::*;
4use hopr_crypto_types::crypto_traits::Randomizable;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8use hopr_protocol_hopr::{FoundSurb, SurbStore};
9use tracing::trace;
10
11use crate::errors::HoprTransportError;
12
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram observed once per successfully resolved path.
    ///
    /// Only compiled when the `prometheus` feature is enabled and the crate is
    /// not being built for tests.
    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        vec![0.0, 1.0, 2.0, 3.0, 4.0]
    )
    // Construction uses only static arguments, so a failure here is a programming error;
    // state that explicitly instead of panicking with a bare `unwrap`.
    .expect("the hopr_path_length histogram must be constructible from its static definition");
}
21
22#[derive(Clone)]
23pub(crate) struct PathPlanner<Surb, R, S> {
24 pub(crate) surb_store: Surb,
25 resolver: R,
26 selector: S,
27 me: Address,
28}
29
30impl<Surb, R, S> PathPlanner<Surb, R, S>
31where
32 Surb: SurbStore + Send + Sync + 'static,
33 R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
34 S: PathSelector + Send + Sync,
35{
36 pub(crate) fn new(me: Address, surb_store: Surb, resolver: R, selector: S) -> Self {
37 Self {
38 surb_store,
39 resolver,
40 selector,
41 me,
42 }
43 }
44
45 async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
46 match node_id {
47 NodeId::Chain(addr) => Ok(*addr),
48 NodeId::Offchain(key) => self
49 .resolver
50 .packet_key_to_chain_key(key)
51 .await
52 .map_err(|e| {
53 HoprTransportError::Other(anyhow::anyhow!("failed to resolve offchain key to chain key: {e}"))
54 })?
55 .ok_or(HoprTransportError::Other(anyhow::anyhow!(
56 "failed to resolve offchain key to chain key: no chain key found"
57 ))),
58 }
59 }
60
61 #[tracing::instrument(level = "trace", skip(self))]
62 async fn resolve_path(
63 &self,
64 source: NodeId,
65 destination: NodeId,
66 options: RoutingOptions,
67 ) -> crate::errors::Result<ValidatedPath> {
68 let resolver = ChainPathResolver::from(&self.resolver);
69 let path = match options {
70 RoutingOptions::IntermediatePath(path) => {
71 trace!(?path, "resolving a specific path");
72
73 ValidatedPath::new(
74 source,
75 path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
76 &resolver,
77 )
78 .await?
79 }
80 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
81 trace!(hops = 0, "resolving zero-hop path");
82
83 ValidatedPath::new(source, vec![destination], &resolver).await?
84 }
85 RoutingOptions::Hops(hops) => {
86 trace!(%hops, "resolving path using hop count");
87
88 let dst = self.resolve_node_id_to_addr(&destination).await?;
89
90 let cp = self
91 .selector
92 .select_path(
93 self.resolve_node_id_to_addr(&source).await?,
94 dst,
95 hops.into(),
96 hops.into(),
97 )
98 .await?;
99
100 ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
101 }
102 };
103
104 #[cfg(all(feature = "prometheus", not(test)))]
105 {
106 use hopr_internal_types::path::Path;
107 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
108 }
109
110 trace!(%path, "validated resolved path");
111
112 Ok(path)
113 }
114
115 #[tracing::instrument(level = "trace", skip(self))]
116 pub(crate) async fn resolve_routing(
117 &self,
118 size_hint: usize,
119 max_surbs: usize,
120 routing: DestinationRouting,
121 ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
122 match routing {
123 DestinationRouting::Forward {
124 destination,
125 pseudonym,
126 forward_options,
127 return_options,
128 } => {
129 let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
130
131 let return_paths = if let Some(return_options) = return_options {
132 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
134 trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
135
136 (0..num_possible_surbs)
137 .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
138 .collect::<FuturesUnordered<_>>()
139 .try_collect::<Vec<ValidatedPath>>()
140 .await?
141 } else {
142 vec![]
143 };
144
145 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
146
147 Ok((
148 ResolvedTransportRouting::Forward {
149 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
150 forward_path,
151 return_paths,
152 },
153 None,
154 ))
155 }
156 DestinationRouting::Return(matcher) => {
157 let FoundSurb {
158 sender_id,
159 surb,
160 remaining,
161 } = self
162 .surb_store
163 .find_surb(matcher)
164 .await
165 .ok_or(HoprTransportError::Api("no surb".into()))?;
166 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
167 }
168 }
169 }
170}