hopr_transport/
helpers.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use futures::{StreamExt, TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::HoprPacket;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, prelude::FoundSurb};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::{
11    prelude::{ResolvedTransportRouting, RoutingOptions},
12    types::DestinationRouting,
13};
14use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
15use hopr_primitive_types::{prelude::HoprBalance, primitives::Address};
16use hopr_protocol_app::v1::{ApplicationData, ApplicationFlag, ApplicationFlags};
17use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
18use tracing::trace;
19
20use crate::constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
21
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram registered under the name `hopr_path_length`.
    ///
    /// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
    /// Initialization panics if the histogram cannot be created.
    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = {
        // One bucket boundary per hop count from 0 to 4.
        let buckets = vec![0.0, 1.0, 2.0, 3.0, 4.0];

        hopr_metrics::metrics::SimpleHistogram::new(
            "hopr_path_length",
            "Distribution of number of hops of sent messages",
            buckets,
        )
        .unwrap()
    };
}
30
/// Two-state eligibility marker for a peer.
///
/// The type is a plain `Copy` enum that can be compared for equality and printed with `Debug`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerEligibility {
    /// The peer is eligible.
    Eligible,
    /// The peer is not eligible.
    Ineligible,
}
36
37impl From<NetworkRegistryStatus> for PeerEligibility {
38    fn from(value: NetworkRegistryStatus) -> Self {
39        match value {
40            NetworkRegistryStatus::Allowed => Self::Eligible,
41            NetworkRegistryStatus::Denied => Self::Ineligible,
42        }
43    }
44}
45
46/// Ticket statistics data exposed by the ticket mechanism.
47#[derive(Debug, Copy, Clone, PartialEq)]
48pub struct TicketStatistics {
49    pub winning_count: u128,
50    pub unredeemed_value: HoprBalance,
51    pub redeemed_value: HoprBalance,
52    pub neglected_value: HoprBalance,
53    pub rejected_value: HoprBalance,
54}
55
/// Resolves routing requests into validated packet paths.
///
/// Bundles everything path resolution needs: a database handle, the shared channel graph,
/// a path selector and the address of the local node.
#[derive(Clone)]
pub(crate) struct PathPlanner<T, S> {
    /// Database handle; passed as the address resolver when validating paths and
    /// queried for SURBs when resolving return routing.
    db: T,
    /// Channel graph shared behind an async read-write lock; read-locked while a path is resolved.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    /// Selector asked for a path when routing is specified by a non-zero hop count.
    selector: S,
    /// Address of this node: the source of forward paths and the destination of return paths.
    me: Address,
}
63
/// Concurrency limit used by `run_packet_planner` when the `HOPR_PACKET_PLANNER_CONCURRENCY`
/// environment variable is unset or cannot be parsed.
const DEFAULT_PACKET_PLANNER_CONCURRENCY: usize = 10;
65
66impl<T, S> PathPlanner<T, S>
67where
68    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
69    S: PathSelector + Send + Sync,
70{
71    pub(crate) fn new(
72        me: Address,
73        db: T,
74        selector: S,
75        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
76    ) -> Self {
77        Self {
78            db,
79            channel_graph,
80            selector,
81            me,
82        }
83    }
84
85    pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
86        self.channel_graph.clone()
87    }
88
89    #[tracing::instrument(level = "trace", skip(self))]
90    async fn resolve_path(
91        &self,
92        source: Address,
93        destination: Address,
94        options: RoutingOptions,
95    ) -> crate::errors::Result<ValidatedPath> {
96        let cg = self.channel_graph.read_arc().await;
97        let path = match options {
98            RoutingOptions::IntermediatePath(path) => {
99                trace!(?path, "resolving a specific path");
100
101                ValidatedPath::new(
102                    source,
103                    ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
104                    &cg,
105                    &self.db,
106                )
107                .await?
108            }
109            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
110                trace!(hops = 0, "resolving zero-hop path");
111
112                ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
113            }
114            RoutingOptions::Hops(hops) => {
115                trace!(%hops, "resolving path using hop count");
116
117                let cp = self
118                    .selector
119                    .select_path(source, destination, hops.into(), hops.into())
120                    .await?;
121
122                ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
123            }
124        };
125
126        #[cfg(all(feature = "prometheus", not(test)))]
127        {
128            use hopr_path::Path;
129            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
130        }
131
132        trace!(%path, "validated resolved path");
133
134        Ok(path)
135    }
136
137    #[tracing::instrument(level = "trace", skip(self))]
138    pub(crate) async fn resolve_routing(
139        &self,
140        size_hint: usize,
141        routing: DestinationRouting,
142    ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
143        match routing {
144            DestinationRouting::Forward {
145                destination,
146                pseudonym,
147                forward_options,
148                return_options,
149            } => {
150                let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
151
152                let return_paths = if let Some(return_options) = return_options {
153                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint);
154                    trace!(%destination, %num_possible_surbs, data_len = size_hint, "resolving packet return paths");
155
156                    (0..num_possible_surbs)
157                        .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
158                        .collect::<FuturesUnordered<_>>()
159                        .try_collect::<Vec<ValidatedPath>>()
160                        .await?
161                } else {
162                    vec![]
163                };
164
165                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
166
167                Ok((
168                    ResolvedTransportRouting::Forward {
169                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
170                        forward_path,
171                        return_paths,
172                    },
173                    None,
174                ))
175            }
176            DestinationRouting::Return(matcher) => {
177                let FoundSurb {
178                    sender_id,
179                    surb,
180                    remaining,
181                } = self.db.find_surb(matcher).await?;
182                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
183            }
184        }
185    }
186}
187
188// TODO: consider making this a `with` decorator for `MsgSender`
189// ^^ This requires
190//   a) `MsgSender` to be a `Sink`
191//   b) Dropping the `Clone` requirement on `Sink` that is given into `SessionManager`
192// However the DestinationRouting resolution concurrency (from for_each_concurrent) would be lost.
193pub(crate) fn run_packet_planner<T, S>(
194    planner: PathPlanner<T, S>,
195    packet_sender: MsgSender<Sender<SendMsgInput>>,
196) -> Sender<(DestinationRouting, ApplicationData)>
197where
198    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
199    S: PathSelector + Clone + Send + Sync + 'static,
200{
201    let (tx, rx) =
202        futures::channel::mpsc::channel::<(DestinationRouting, ApplicationData)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
203
204    let planner_concurrency = std::env::var("HOPR_PACKET_PLANNER_CONCURRENCY")
205        .ok()
206        .and_then(|v| v.parse().ok())
207        .unwrap_or(DEFAULT_PACKET_PLANNER_CONCURRENCY);
208
209    let distress_threshold = planner.db.get_surb_config().distress_threshold;
210    hopr_async_runtime::prelude::spawn(rx.for_each_concurrent(planner_concurrency, move |(routing, data)| {
211        let planner = planner.clone();
212        let packet_sender = packet_sender.clone();
213        async move {
214            match planner.resolve_routing(data.len(), routing).await {
215                Ok((resolved, rem_surbs)) => {
216                    // Set the SURB distress/out-of-SURBs flag if applicable.
217                    // These flags are translated into HOPR protocol packet signals.
218                    let flags: ApplicationFlags = match rem_surbs {
219                        Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
220                            ApplicationFlag::SurbDistress.into()
221                        }
222                        Some(0) => ApplicationFlag::OutOfSurbs.into(),
223                        _ => data.flags - (ApplicationFlag::OutOfSurbs | ApplicationFlag::SurbDistress),
224                    };
225
226                    // The awaiter here is intentionally dropped,
227                    // since we do not intend to be notified about packet delivery to the first hop
228                    if let Err(error) = packet_sender.send_packet(data.with_flags(flags), resolved).await {
229                        tracing::error!(%error, "failed to enqueue packet for sending");
230                    }
231                }
232                Err(error) => tracing::error!(%error, "failed to resolve path for routing"),
233            }
234        }
235    }));
236
237    tx
238}