hopr_transport/
helpers.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use futures::{FutureExt, StreamExt, TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::*;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, prelude::FoundSurb};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::prelude::*;
11use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
12use hopr_primitive_types::prelude::*;
13use hopr_protocol_app::prelude::*;
14use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
15use tracing::trace;
16
17use crate::constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = hopr_metrics::metrics::SimpleHistogram::new(
22        "hopr_path_length",
23        "Distribution of number of hops of sent messages",
24        vec![0.0, 1.0, 2.0, 3.0, 4.0]
25    ).unwrap();
26}
27
28#[derive(Debug, Copy, Clone, PartialEq, Eq)]
29pub enum PeerEligibility {
30    Eligible,
31    Ineligible,
32}
33
34impl From<NetworkRegistryStatus> for PeerEligibility {
35    fn from(value: NetworkRegistryStatus) -> Self {
36        match value {
37            NetworkRegistryStatus::Allowed => Self::Eligible,
38            NetworkRegistryStatus::Denied => Self::Ineligible,
39        }
40    }
41}
42
43/// Ticket statistics data exposed by the ticket mechanism.
44#[derive(Debug, Copy, Clone, PartialEq)]
45pub struct TicketStatistics {
46    pub winning_count: u128,
47    pub unredeemed_value: HoprBalance,
48    pub redeemed_value: HoprBalance,
49    pub neglected_value: HoprBalance,
50    pub rejected_value: HoprBalance,
51}
52
53#[derive(Clone)]
54pub(crate) struct PathPlanner<T, S> {
55    db: T,
56    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
57    selector: S,
58    me: Address,
59}
60
61const DEFAULT_PACKET_PLANNER_CONCURRENCY: usize = 10;
62
63impl<T, S> PathPlanner<T, S>
64where
65    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
66    S: PathSelector + Send + Sync,
67{
68    pub(crate) fn new(
69        me: Address,
70        db: T,
71        selector: S,
72        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
73    ) -> Self {
74        Self {
75            db,
76            channel_graph,
77            selector,
78            me,
79        }
80    }
81
82    pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
83        self.channel_graph.clone()
84    }
85
86    #[tracing::instrument(level = "trace", skip(self))]
87    async fn resolve_path(
88        &self,
89        source: Address,
90        destination: Address,
91        options: RoutingOptions,
92    ) -> crate::errors::Result<ValidatedPath> {
93        let cg = self.channel_graph.read_arc().await;
94        let path = match options {
95            RoutingOptions::IntermediatePath(path) => {
96                trace!(?path, "resolving a specific path");
97
98                ValidatedPath::new(
99                    source,
100                    ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
101                    &cg,
102                    &self.db,
103                )
104                .await?
105            }
106            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
107                trace!(hops = 0, "resolving zero-hop path");
108
109                ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
110            }
111            RoutingOptions::Hops(hops) => {
112                trace!(%hops, "resolving path using hop count");
113
114                let cp = self
115                    .selector
116                    .select_path(source, destination, hops.into(), hops.into())
117                    .await?;
118
119                ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
120            }
121        };
122
123        #[cfg(all(feature = "prometheus", not(test)))]
124        {
125            use hopr_path::Path;
126            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
127        }
128
129        trace!(%path, "validated resolved path");
130
131        Ok(path)
132    }
133
134    #[tracing::instrument(level = "trace", skip(self))]
135    pub(crate) async fn resolve_routing(
136        &self,
137        size_hint: usize,
138        max_surbs: usize,
139        routing: DestinationRouting,
140    ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
141        match routing {
142            DestinationRouting::Forward {
143                destination,
144                pseudonym,
145                forward_options,
146                return_options,
147            } => {
148                let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
149
150                let return_paths = if let Some(return_options) = return_options {
151                    // Safeguard for the correct number of SURBs
152                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
153                    trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
154
155                    (0..num_possible_surbs)
156                        .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
157                        .collect::<FuturesUnordered<_>>()
158                        .try_collect::<Vec<ValidatedPath>>()
159                        .await?
160                } else {
161                    vec![]
162                };
163
164                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
165
166                Ok((
167                    ResolvedTransportRouting::Forward {
168                        pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
169                        forward_path,
170                        return_paths,
171                    },
172                    None,
173                ))
174            }
175            DestinationRouting::Return(matcher) => {
176                let FoundSurb {
177                    sender_id,
178                    surb,
179                    remaining,
180                } = self.db.find_surb(matcher).await?;
181                Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
182            }
183        }
184    }
185}
186
187// TODO: consider making this a `with` decorator for `MsgSender`
188// ^^ This requires
189//   a) `MsgSender` to be a `Sink`
190//   b) Dropping the `Clone` requirement on `Sink` that is given into `SessionManager`
191// However the DestinationRouting resolution concurrency (from for_each_concurrent) would be lost.
192// Therefore, this will likely make sense when the path planner is behind some sort of a mutex,
193// where concurrent resolution would not make sense.
194pub(crate) fn run_packet_planner<T, S>(
195    planner: PathPlanner<T, S>,
196    packet_sender: MsgSender<Sender<SendMsgInput>>,
197) -> Sender<(DestinationRouting, ApplicationDataOut)>
198where
199    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
200    S: PathSelector + Clone + Send + Sync + 'static,
201{
202    let (tx, rx) =
203        futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
204
205    let planner_concurrency = std::env::var("HOPR_PACKET_PLANNER_CONCURRENCY")
206        .ok()
207        .and_then(|v| v.parse().ok())
208        .unwrap_or(DEFAULT_PACKET_PLANNER_CONCURRENCY);
209
210    let distress_threshold = planner.db.get_surb_config().distress_threshold;
211    hopr_async_runtime::prelude::spawn(
212        rx.for_each_concurrent(planner_concurrency, move |(routing, mut data)| {
213            let planner = planner.clone();
214            let packet_sender = packet_sender.clone();
215            async move {
216                let max_surbs = data.estimate_surbs_with_msg();
217
218                match planner.resolve_routing(data.data.total_len(), max_surbs, routing).await {
219                    Ok((resolved, rem_surbs)) => {
220                        // Set the SURB distress/out-of-SURBs flag if applicable.
221                        // These flags are translated into HOPR protocol packet signals and are
222                        // applicable only on the return path.
223                        let mut signals_to_dst = data
224                            .packet_info
225                            .as_ref()
226                            .map(|info| info.signals_to_destination)
227                            .unwrap_or_default();
228
229                        if resolved.is_return() {
230                            signals_to_dst = match rem_surbs {
231                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
232                                    signals_to_dst | PacketSignal::SurbDistress
233                                }
234                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
235                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
236                            };
237                        } else {
238                            // Unset these flags as they make no sense on the forward path.
239                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
240                        }
241
242                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
243
244                        // The awaiter here is intentionally dropped,
245                        // since we do not intend to be notified about packet delivery to the first hop
246                        if let Err(error) = packet_sender.send_packet(data, resolved).await {
247                            tracing::error!(%error, "failed to enqueue packet for sending");
248                        }
249                    }
250                    Err(error) => tracing::error!(%error, "failed to resolve path for routing"),
251                }
252            }
253        })
254        .inspect(|_| tracing::info!(task = "packet planner", "long-running background task finished")),
255    );
256
257    tx
258}