hopr_transport/
helpers.rs

1use std::sync::{Arc, OnceLock};
2
3use async_lock::RwLock;
4use futures::{TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::HoprPacket;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, api::prelude::DbError};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::{
11    prelude::{ResolvedTransportRouting, RoutingOptions},
12    types::DestinationRouting,
13};
14use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
15use hopr_primitive_types::{prelude::HoprBalance, primitives::Address};
16use hopr_transport_packet::prelude::ApplicationData;
17use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
18use hopr_transport_session::{
19    errors::{SessionManagerError, TransportSessionError},
20    traits::SendMsg,
21};
22use tracing::trace;
23
24use crate::errors::HoprTransportError;
25
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram registered as `hopr_path_length`.
    ///
    /// Only compiled when the `prometheus` feature is enabled and the crate is
    /// not being built for tests. Construction failure panics on first access.
    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram =
        hopr_metrics::metrics::SimpleHistogram::new(
            "hopr_path_length",
            "Distribution of number of hops of sent messages",
            vec![0.0, 1.0, 2.0, 3.0, 4.0],
        )
        .unwrap();
}
34
/// Whether a peer is eligible, as a two-state flag.
///
/// Values of this type are produced from a `NetworkRegistryStatus` via `From`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerEligibility {
    /// Counterpart of `NetworkRegistryStatus::Allowed`.
    Eligible,
    /// Counterpart of `NetworkRegistryStatus::Denied`.
    Ineligible,
}
40
41impl From<NetworkRegistryStatus> for PeerEligibility {
42    fn from(value: NetworkRegistryStatus) -> Self {
43        match value {
44            NetworkRegistryStatus::Allowed => Self::Eligible,
45            NetworkRegistryStatus::Denied => Self::Ineligible,
46        }
47    }
48}
49
50/// Ticket statistics data exposed by the ticket mechanism.
51#[derive(Debug, Copy, Clone, PartialEq)]
52pub struct TicketStatistics {
53    pub winning_count: u128,
54    pub unredeemed_value: HoprBalance,
55    pub redeemed_value: HoprBalance,
56    pub neglected_value: HoprBalance,
57    pub rejected_value: HoprBalance,
58}
59
60#[derive(Clone)]
61pub(crate) struct PathPlanner<T, S> {
62    db: T,
63    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
64    selector: S,
65    me: Address,
66}
67
68impl<T, S> PathPlanner<T, S>
69where
70    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
71    S: PathSelector + Send + Sync,
72{
73    pub(crate) fn new(
74        me: Address,
75        db: T,
76        selector: S,
77        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
78    ) -> Self {
79        Self {
80            db,
81            channel_graph,
82            selector,
83            me,
84        }
85    }
86
87    pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
88        self.channel_graph.clone()
89    }
90
91    #[tracing::instrument(level = "trace", skip(self))]
92    async fn resolve_path(
93        &self,
94        source: Address,
95        destination: Address,
96        options: RoutingOptions,
97    ) -> crate::errors::Result<ValidatedPath> {
98        let cg = self.channel_graph.read_arc().await;
99        let path = match options {
100            RoutingOptions::IntermediatePath(path) => {
101                trace!(?path, "resolving a specific path");
102
103                ValidatedPath::new(
104                    source,
105                    ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
106                    &cg,
107                    &self.db,
108                )
109                .await?
110            }
111            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
112                trace!(hops = 0, "resolving zero-hop path");
113
114                ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
115            }
116            RoutingOptions::Hops(hops) => {
117                trace!(%hops, "resolving path using hop count");
118
119                let cp = self
120                    .selector
121                    .select_path(source, destination, hops.into(), hops.into())
122                    .await?;
123
124                ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
125            }
126        };
127
128        #[cfg(all(feature = "prometheus", not(test)))]
129        {
130            use hopr_path::Path;
131            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
132        }
133
134        trace!(%path, "validated resolved path");
135
136        Ok(path)
137    }
138
139    #[tracing::instrument(level = "trace", skip(self))]
140    pub(crate) async fn resolve_routing(
141        &self,
142        size_hint: usize,
143        routing: DestinationRouting,
144    ) -> crate::errors::Result<ResolvedTransportRouting> {
145        match routing {
146            DestinationRouting::Forward {
147                destination,
148                pseudonym,
149                forward_options,
150                return_options,
151            } => {
152                let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
153
154                let return_paths = if let Some(return_options) = return_options {
155                    let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint);
156                    trace!(%destination, %num_possible_surbs, data_len = size_hint, "resolving packet return paths");
157
158                    (0..num_possible_surbs)
159                        .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
160                        .collect::<FuturesUnordered<_>>()
161                        .try_collect::<Vec<ValidatedPath>>()
162                        .await?
163                } else {
164                    vec![]
165                };
166
167                trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
168
169                Ok(ResolvedTransportRouting::Forward {
170                    pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
171                    forward_path,
172                    return_paths,
173                })
174            }
175            DestinationRouting::Return(matcher) => {
176                let (sender_id, surb) = self.db.find_surb(matcher).await?;
177                Ok(ResolvedTransportRouting::Return(sender_id, surb))
178            }
179        }
180    }
181}
182
183#[derive(Clone)]
184pub(crate) struct MessageSender<T, S> {
185    pub process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
186    pub resolver: PathPlanner<T, S>,
187}
188
189impl<T, S> MessageSender<T, S>
190where
191    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
192    S: PathSelector + Send + Sync,
193{
194    pub fn new(
195        process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
196        resolver: PathPlanner<T, S>,
197    ) -> Self {
198        Self {
199            process_packet_send,
200            resolver,
201        }
202    }
203}
204
205#[async_trait::async_trait]
206impl<T, S> SendMsg for MessageSender<T, S>
207where
208    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
209    S: PathSelector + Send + Sync,
210{
211    #[tracing::instrument(level = "debug", skip(self, data))]
212    async fn send_message(
213        &self,
214        data: ApplicationData,
215        routing: DestinationRouting,
216    ) -> std::result::Result<(), TransportSessionError> {
217        let routing = self
218            .resolver
219            .resolve_routing(data.len(), routing)
220            .await
221            .map_err(|error| {
222                tracing::error!(%error, "failed to resolve routing");
223                // Out of SURBs error gets a special distinction by the Session code
224                if let HoprTransportError::Db(DbError::NoSurbAvailable(_)) = error {
225                    TransportSessionError::OutOfSurbs
226                } else {
227                    TransportSessionError::Path
228                }
229            })?;
230
231        self.process_packet_send
232            .get()
233            .ok_or_else(|| SessionManagerError::NotStarted)?
234            .send_packet(data, routing)
235            .await
236            .map_err(|_| TransportSessionError::Closed)?
237            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
238            .await
239            .map_err(|error| {
240                tracing::error!(%error, "packet send error");
241                TransportSessionError::Timeout
242            })?;
243
244        trace!("packet sent to the outgoing queue");
245
246        Ok(())
247    }
248}