hopr_transport/
helpers.rs

1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations};
3use hopr_crypto_packet::prelude::*;
4use hopr_crypto_types::crypto_traits::Randomizable;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8use hopr_protocol_hopr::{FoundSurb, SurbStore};
9use tracing::trace;
10
11use crate::errors::HoprTransportError;
12
13#[cfg(all(feature = "prometheus", not(test)))]
14lazy_static::lazy_static! {
15    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
16        "hopr_path_length",
17        "Distribution of number of hops of sent messages",
18        vec![0.0, 1.0, 2.0, 3.0, 4.0]
19    ).unwrap();
20}
21
22#[derive(Clone)]
23pub(crate) struct PathPlanner<Surb, R, S> {
24    pub(crate) surb_store: Surb,
25    resolver: R,
26    selector: S,
27    me: Address,
28}
29
30impl<Surb, R, S> PathPlanner<Surb, R, S>
31where
32    Surb: SurbStore + Send + Sync + 'static,
33    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
34    S: PathSelector + Send + Sync,
35{
36    pub(crate) fn new(me: Address, surb_store: Surb, resolver: R, selector: S) -> Self {
37        Self {
38            surb_store,
39            resolver,
40            selector,
41            me,
42        }
43    }
44
45    async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
46        match node_id {
47            NodeId::Chain(addr) => Ok(*addr),
48            NodeId::Offchain(key) => self
49                .resolver
50                .packet_key_to_chain_key(key)
51                .await
52                .map_err(|e| {
53                    HoprTransportError::Other(anyhow::anyhow!("failed to resolve offchain key to chain key: {e}"))
54                })?
55                .ok_or(HoprTransportError::Other(anyhow::anyhow!(
56                    "failed to resolve offchain key to chain key: no chain key found"
57                ))),
58        }
59    }
60
61    #[tracing::instrument(level = "trace", skip(self))]
62    async fn resolve_path(
63        &self,
64        source: NodeId,
65        destination: NodeId,
66        options: RoutingOptions,
67    ) -> crate::errors::Result<ValidatedPath> {
68        let resolver = ChainPathResolver::from(&self.resolver);
69        let path = match options {
70            RoutingOptions::IntermediatePath(path) => {
71                trace!(?path, "resolving a specific path");
72
73                ValidatedPath::new(
74                    source,
75                    path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
76                    &resolver,
77                )
78                .await?
79            }
80            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
81                trace!(hops = 0, "resolving zero-hop path");
82
83                ValidatedPath::new(source, vec![destination], &resolver).await?
84            }
85            RoutingOptions::Hops(hops) => {
86                trace!(%hops, "resolving path using hop count");
87
88                let dst = self.resolve_node_id_to_addr(&destination).await?;
89
90                let cp = self
91                    .selector
92                    .select_path(
93                        self.resolve_node_id_to_addr(&source).await?,
94                        dst,
95                        hops.into(),
96                        hops.into(),
97                    )
98                    .await?;
99
100                ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
101            }
102        };
103
104        #[cfg(all(feature = "prometheus", not(test)))]
105        {
106            use hopr_internal_types::path::Path;
107            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
108        }
109
110        trace!(%path, "validated resolved path");
111
112        Ok(path)
113    }
114
115    #[tracing::instrument(level = "trace", skip(self))]
116    pub(crate) async fn resolve_routing(
117        &self,
118        size_hint: usize,
119        max_surbs: usize,
120        routing: DestinationRouting,
121    ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
122        match routing {
123            DestinationRouting::Forward {
124                destination,
125                pseudonym,
126                forward_options,
127                return_options,
128            } => {
129                let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
130
131                let return_paths = if let Some(return_options) = return_options {
132                    // Safeguard for the correct number of SURBs
133                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
134                    trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
135
136                    (0..num_possible_surbs)
137                        .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
138                        .collect::<FuturesUnordered<_>>()
139                        .try_collect::<Vec<ValidatedPath>>()
140                        .await?
141                } else {
142                    vec![]
143                };
144
145                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
146
147                Ok((
148                    ResolvedTransportRouting::Forward {
149                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
150                        forward_path,
151                        return_paths,
152                    },
153                    None,
154                ))
155            }
156            DestinationRouting::Return(matcher) => {
157                let FoundSurb {
158                    sender_id,
159                    surb,
160                    remaining,
161                } = self
162                    .surb_store
163                    .find_surb(matcher)
164                    .await
165                    .ok_or(HoprTransportError::Api("no surb".into()))?;
166                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
167            }
168        }
169    }
170}