hopr_transport/
helpers.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use futures::{TryStreamExt, stream::FuturesUnordered};
5use hopr_api::{
6    chain::ChainKeyOperations,
7    db::{FoundSurb, HoprDbProtocolOperations},
8};
9use hopr_chain_types::chain_events::NetworkRegistryStatus;
10use hopr_crypto_packet::prelude::*;
11use hopr_crypto_types::{crypto_traits::Randomizable, prelude::OffchainPublicKey};
12use hopr_internal_types::prelude::*;
13use hopr_network_types::prelude::*;
14use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, errors::PathError, selectors::PathSelector};
15use hopr_primitive_types::prelude::*;
16use tracing::trace;
17
18use crate::errors::HoprTransportError;
19
// Histogram of resolved path lengths. It is compiled in only when the `prometheus` feature is
// enabled and the crate is not built for tests, and it is fed from `PathPlanner::resolve_path`,
// which observes `num_hops() - 1` for every path it resolves.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // The `unwrap` makes a failed histogram construction panic when the static is first used.
    // NOTE(review): the trailing `vec![..]` is presumably the list of bucket boundaries — confirm
    // against `hopr_metrics::SimpleHistogram::new`.
    static ref METRIC_PATH_LENGTH: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        vec![0.0, 1.0, 2.0, 3.0, 4.0]
    ).unwrap();
}
28
29#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30pub enum PeerEligibility {
31    Eligible,
32    Ineligible,
33}
34
35impl From<NetworkRegistryStatus> for PeerEligibility {
36    fn from(value: NetworkRegistryStatus) -> Self {
37        match value {
38            NetworkRegistryStatus::Allowed => Self::Eligible,
39            NetworkRegistryStatus::Denied => Self::Ineligible,
40        }
41    }
42}
43
/// Ticket statistics data exposed by the ticket mechanism.
///
/// A plain `Copy` snapshot of one counter and four balances. Nothing in this file computes the
/// values, so the per-field notes below are inferred from the field names only.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct TicketStatistics {
    /// Number of winning tickets (inferred from the name — confirm at the producer).
    pub winning_count: u128,
    /// Value of tickets that have not been redeemed yet (inferred from the name).
    pub unredeemed_value: HoprBalance,
    /// Value of tickets that have been redeemed (inferred from the name).
    pub redeemed_value: HoprBalance,
    /// Value of tickets that were neglected (inferred from the name — TODO confirm exact meaning).
    pub neglected_value: HoprBalance,
    /// Value of tickets that were rejected (inferred from the name — TODO confirm exact meaning).
    pub rejected_value: HoprBalance,
}
53
/// Turns requested routing options into concrete, validated packet routes.
///
/// The trait bounds on `Db`, `R` and `S` are declared on the `impl` block rather than here.
#[derive(Clone)]
pub(crate) struct PathPlanner<Db, R, S> {
    /// Database handle; `resolve_routing` uses it to look up a SURB via `find_surb`.
    db: Db,
    /// Converts between on-chain addresses and off-chain packet keys.
    resolver: R,
    /// Shared channel graph; read-locked by `resolve_path` while paths are validated.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    /// Path selection strategy, used when the caller only specifies a non-zero hop count.
    selector: S,
    /// Chain address of this node: the source of forward paths and the destination of return paths.
    me: Address,
}
62
63struct ChainPathResolver<'c, R>(&'c R);
64
65#[async_trait::async_trait]
66impl<'c, R: ChainKeyOperations + Sync> PathAddressResolver for ChainPathResolver<'c, R> {
67    async fn resolve_transport_address(&self, address: &Address) -> Result<Option<OffchainPublicKey>, PathError> {
68        self.0
69            .chain_key_to_packet_key(address)
70            .await
71            .map_err(|e| PathError::UnknownPeer(format!("{address}: {e}")))
72    }
73
74    async fn resolve_chain_address(&self, key: &OffchainPublicKey) -> Result<Option<Address>, PathError> {
75        self.0
76            .packet_key_to_chain_key(key)
77            .await
78            .map_err(|e| PathError::UnknownPeer(format!("{key}: {e}")))
79    }
80}
81
82impl<Db, R, S> PathPlanner<Db, R, S>
83where
84    Db: HoprDbProtocolOperations + Send + Sync + 'static,
85    R: ChainKeyOperations + Send + Sync + 'static,
86    S: PathSelector + Send + Sync,
87{
88    pub(crate) fn new(
89        me: Address,
90        db: Db,
91        resolver: R,
92        selector: S,
93        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
94    ) -> Self {
95        Self {
96            db,
97            resolver,
98            channel_graph,
99            selector,
100            me,
101        }
102    }
103
104    pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
105        self.channel_graph.clone()
106    }
107
108    async fn resolve_node_id_to_addr(&self, node_id: &NodeId) -> crate::errors::Result<Address> {
109        match node_id {
110            NodeId::Chain(addr) => Ok(*addr),
111            NodeId::Offchain(key) => self
112                .resolver
113                .packet_key_to_chain_key(key)
114                .await
115                .map_err(|e| {
116                    HoprTransportError::Other(format!("failed to resolve offchain key to chain key: {e}").into())
117                })?
118                .ok_or(HoprTransportError::Other(
119                    "failed to resolve offchain key to chain key: no chain key found".into(),
120                )),
121        }
122    }
123
124    #[tracing::instrument(level = "trace", skip(self))]
125    async fn resolve_path(
126        &self,
127        source: NodeId,
128        destination: NodeId,
129        options: RoutingOptions,
130    ) -> crate::errors::Result<ValidatedPath> {
131        let cg = self.channel_graph.read_arc().await;
132        let path = match options {
133            RoutingOptions::IntermediatePath(path) => {
134                trace!(?path, "resolving a specific path");
135
136                ValidatedPath::new(
137                    source,
138                    path.into_iter().chain(std::iter::once(destination)).collect::<Vec<_>>(),
139                    &cg,
140                    &ChainPathResolver(&self.resolver),
141                )
142                .await?
143            }
144            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
145                trace!(hops = 0, "resolving zero-hop path");
146
147                ValidatedPath::new(source, vec![destination], &cg, &ChainPathResolver(&self.resolver)).await?
148            }
149            RoutingOptions::Hops(hops) => {
150                trace!(%hops, "resolving path using hop count");
151
152                let dst = self.resolve_node_id_to_addr(&destination).await?;
153
154                let cp = self
155                    .selector
156                    .select_path(
157                        self.resolve_node_id_to_addr(&source).await?,
158                        dst,
159                        hops.into(),
160                        hops.into(),
161                    )
162                    .await?;
163
164                ValidatedPath::new(
165                    source,
166                    ChainPath::from_channel_path(cp, dst),
167                    &cg,
168                    &ChainPathResolver(&self.resolver),
169                )
170                .await?
171            }
172        };
173
174        #[cfg(all(feature = "prometheus", not(test)))]
175        {
176            use hopr_path::Path;
177            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
178        }
179
180        trace!(%path, "validated resolved path");
181
182        Ok(path)
183    }
184
185    #[tracing::instrument(level = "trace", skip(self))]
186    pub(crate) async fn resolve_routing(
187        &self,
188        size_hint: usize,
189        max_surbs: usize,
190        routing: DestinationRouting,
191    ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
192        match routing {
193            DestinationRouting::Forward {
194                destination,
195                pseudonym,
196                forward_options,
197                return_options,
198            } => {
199                let forward_path = self.resolve_path(self.me.into(), *destination, forward_options).await?;
200
201                let return_paths = if let Some(return_options) = return_options {
202                    // Safeguard for the correct number of SURBs
203                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
204                    trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
205
206                    (0..num_possible_surbs)
207                        .map(|_| self.resolve_path(*destination, self.me.into(), return_options.clone()))
208                        .collect::<FuturesUnordered<_>>()
209                        .try_collect::<Vec<ValidatedPath>>()
210                        .await?
211                } else {
212                    vec![]
213                };
214
215                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
216
217                Ok((
218                    ResolvedTransportRouting::Forward {
219                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
220                        forward_path,
221                        return_paths,
222                    },
223                    None,
224                ))
225            }
226            DestinationRouting::Return(matcher) => {
227                let FoundSurb {
228                    sender_id,
229                    surb,
230                    remaining,
231                } = self
232                    .db
233                    .find_surb(matcher)
234                    .await
235                    .map_err(|e| HoprTransportError::Other(e.into()))?;
236                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
237            }
238        }
239    }
240}