Skip to main content

hopr_transport/
helpers.rs

1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::{
3    chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations},
4    types::{crypto::crypto_traits::Randomizable, internal::prelude::*, primitive::prelude::*},
5};
6use hopr_crypto_packet::prelude::*;
7use hopr_protocol_hopr::{FoundSurb, SurbStore};
8use tracing::trace;
9
10use crate::errors::HoprTransportError;
11
// Histogram fed by `PathPlanner::resolve_path` with the length of every resolved path.
// Compiled only when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram =
        hopr_metrics::SimpleHistogram::new(
            "hopr_path_length",
            "Distribution of number of hops of sent messages",
            vec![0.0, 1.0, 2.0, 3.0, 4.0],
        )
        .unwrap();
}
20
21#[derive(Clone)]
22pub(crate) struct PathPlanner<Surb, R, S> {
23    pub(crate) surb_store: Surb,
24    resolver: R,
25    selector: S,
26    me: Address,
27}
28
29impl<Surb, R, S> PathPlanner<Surb, R, S>
30where
31    Surb: SurbStore + Send + Sync + 'static,
32    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
33    S: PathSelector + Send + Sync,
34{
35    pub(crate) fn new(me: Address, surb_store: Surb, resolver: R, selector: S) -> Self {
36        Self {
37            surb_store,
38            resolver,
39            selector,
40            me,
41        }
42    }
43
44    async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
45        match node_id {
46            NodeId::Chain(addr) => Ok(*addr),
47            NodeId::Offchain(key) => self
48                .resolver
49                .packet_key_to_chain_key(key)
50                .await
51                .map_err(|e| {
52                    HoprTransportError::Other(anyhow::anyhow!("failed to resolve offchain key to chain key: {e}"))
53                })?
54                .ok_or(HoprTransportError::Other(anyhow::anyhow!(
55                    "failed to resolve offchain key to chain key: no chain key found"
56                ))),
57        }
58    }
59
60    #[tracing::instrument(level = "trace", skip(self))]
61    async fn resolve_path(
62        &self,
63        source: NodeId,
64        destination: NodeId,
65        options: RoutingOptions,
66    ) -> crate::errors::Result<ValidatedPath> {
67        let resolver = ChainPathResolver::from(&self.resolver);
68        let path = match options {
69            RoutingOptions::IntermediatePath(path) => {
70                trace!(?path, "resolving a specific path");
71
72                ValidatedPath::new(
73                    source,
74                    path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
75                    &resolver,
76                )
77                .await?
78            }
79            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
80                trace!(hops = 0, "resolving zero-hop path");
81
82                ValidatedPath::new(source, vec![destination], &resolver).await?
83            }
84            RoutingOptions::Hops(hops) => {
85                trace!(%hops, "resolving path using hop count");
86
87                let dst = self.resolve_node_id_to_addr(&destination).await?;
88
89                let cp = self
90                    .selector
91                    .select_path(
92                        self.resolve_node_id_to_addr(&source).await?,
93                        dst,
94                        hops.into(),
95                        hops.into(),
96                    )
97                    .await?;
98
99                ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
100            }
101        };
102
103        #[cfg(all(feature = "prometheus", not(test)))]
104        {
105            use hopr_api::types::internal::path::Path;
106            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
107        }
108
109        trace!(%path, "validated resolved path");
110
111        Ok(path)
112    }
113
114    #[tracing::instrument(level = "trace", skip(self))]
115    pub(crate) async fn resolve_routing(
116        &self,
117        size_hint: usize,
118        max_surbs: usize,
119        routing: DestinationRouting,
120    ) -> crate::errors::Result<(ResolvedTransportRouting<HoprSurb>, Option<usize>)> {
121        match routing {
122            DestinationRouting::Forward {
123                destination,
124                pseudonym,
125                forward_options,
126                return_options,
127            } => {
128                let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
129
130                let return_paths = if let Some(return_options) = return_options {
131                    // Safeguard for the correct number of SURBs
132                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
133                    trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
134
135                    (0..num_possible_surbs)
136                        .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
137                        .collect::<FuturesUnordered<_>>()
138                        .try_collect::<Vec<ValidatedPath>>()
139                        .await?
140                } else {
141                    vec![]
142                };
143
144                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
145
146                Ok((
147                    ResolvedTransportRouting::Forward {
148                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
149                        forward_path,
150                        return_paths,
151                    },
152                    None,
153                ))
154            }
155            DestinationRouting::Return(matcher) => {
156                let FoundSurb {
157                    sender_id,
158                    surb,
159                    remaining,
160                } = self
161                    .surb_store
162                    .find_surb(matcher)
163                    .await
164                    .ok_or(HoprTransportError::Api(format!(
165                        "no surb for pseudonym {}",
166                        matcher.pseudonym()
167                    )))?;
168                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
169            }
170        }
171    }
172}