hopr_transport/
helpers.rs

1use futures::{TryStreamExt, stream::FuturesUnordered};
2use hopr_api::{
3    chain::{ChainKeyOperations, ChainPathResolver, ChainReadChannelOperations},
4    db::{FoundSurb, HoprDbProtocolOperations},
5};
6use hopr_crypto_packet::prelude::*;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_internal_types::prelude::*;
9use hopr_network_types::prelude::*;
10use hopr_primitive_types::prelude::*;
11use tracing::trace;
12
13use crate::errors::HoprTransportError;
14
15#[cfg(all(feature = "prometheus", not(test)))]
16lazy_static::lazy_static! {
17    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
18        "hopr_path_length",
19        "Distribution of number of hops of sent messages",
20        vec![0.0, 1.0, 2.0, 3.0, 4.0]
21    ).unwrap();
22}
23
24/// Ticket statistics data exposed by the ticket mechanism.
25#[derive(Debug, Copy, Clone, PartialEq)]
26pub struct TicketStatistics {
27    pub winning_count: u128,
28    pub unredeemed_value: HoprBalance,
29    pub redeemed_value: HoprBalance,
30    pub neglected_value: HoprBalance,
31    pub rejected_value: HoprBalance,
32}
33
34#[derive(Clone)]
35pub(crate) struct PathPlanner<Db, R, S> {
36    db: Db,
37    resolver: R,
38    selector: S,
39    me: Address,
40}
41
42impl<Db, R, S> PathPlanner<Db, R, S>
43where
44    Db: HoprDbProtocolOperations + Send + Sync + 'static,
45    R: ChainKeyOperations + ChainReadChannelOperations + Send + Sync + 'static,
46    S: PathSelector + Send + Sync,
47{
48    pub(crate) fn new(me: Address, db: Db, resolver: R, selector: S) -> Self {
49        Self {
50            db,
51            resolver,
52            selector,
53            me,
54        }
55    }
56
57    async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
58        match node_id {
59            NodeId::Chain(addr) => Ok(*addr),
60            NodeId::Offchain(key) => self
61                .resolver
62                .packet_key_to_chain_key(key)
63                .await
64                .map_err(|e| {
65                    HoprTransportError::Other(format!("failed to resolve offchain key to chain key: {e}").into())
66                })?
67                .ok_or(HoprTransportError::Other(
68                    "failed to resolve offchain key to chain key: no chain key found".into(),
69                )),
70        }
71    }
72
73    #[tracing::instrument(level = "trace", skip(self))]
74    async fn resolve_path(
75        &self,
76        source: NodeId,
77        destination: NodeId,
78        options: RoutingOptions,
79    ) -> crate::errors::Result<ValidatedPath> {
80        let resolver = ChainPathResolver::from(&self.resolver);
81        let path = match options {
82            RoutingOptions::IntermediatePath(path) => {
83                trace!(?path, "resolving a specific path");
84
85                ValidatedPath::new(
86                    source,
87                    path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
88                    &resolver,
89                )
90                .await?
91            }
92            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
93                trace!(hops = 0, "resolving zero-hop path");
94
95                ValidatedPath::new(source, vec![destination], &resolver).await?
96            }
97            RoutingOptions::Hops(hops) => {
98                trace!(%hops, "resolving path using hop count");
99
100                let dst = self.resolve_node_id_to_addr(&destination).await?;
101
102                let cp = self
103                    .selector
104                    .select_path(
105                        self.resolve_node_id_to_addr(&source).await?,
106                        dst,
107                        hops.into(),
108                        hops.into(),
109                    )
110                    .await?;
111
112                ValidatedPath::new(source, ChainPath::from_channel_path(cp, dst), &resolver).await?
113            }
114        };
115
116        #[cfg(all(feature = "prometheus", not(test)))]
117        {
118            use hopr_internal_types::path::Path;
119            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
120        }
121
122        trace!(%path, "validated resolved path");
123
124        Ok(path)
125    }
126
127    #[tracing::instrument(level = "trace", skip(self))]
128    pub(crate) async fn resolve_routing(
129        &self,
130        size_hint: usize,
131        max_surbs: usize,
132        routing: DestinationRouting,
133    ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
134        match routing {
135            DestinationRouting::Forward {
136                destination,
137                pseudonym,
138                forward_options,
139                return_options,
140            } => {
141                let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
142
143                let return_paths = if let Some(return_options) = return_options {
144                    // Safeguard for the correct number of SURBs
145                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
146                    trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
147
148                    (0..num_possible_surbs)
149                        .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
150                        .collect::<FuturesUnordered<_>>()
151                        .try_collect::<Vec<ValidatedPath>>()
152                        .await?
153                } else {
154                    vec![]
155                };
156
157                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
158
159                Ok((
160                    ResolvedTransportRouting::Forward {
161                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
162                        forward_path,
163                        return_paths,
164                    },
165                    None,
166                ))
167            }
168            DestinationRouting::Return(matcher) => {
169                let FoundSurb {
170                    sender_id,
171                    surb,
172                    remaining,
173                } = self
174                    .db
175                    .find_surb(matcher)
176                    .await
177                    .map_err(|e| HoprTransportError::Other(e.into()))?;
178                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
179            }
180        }
181    }
182}