hopr_transport/
helpers.rs

1use async_lock::RwLock;
2use futures::channel::mpsc::Sender;
3use std::sync::{Arc, OnceLock};
4use tracing::trace;
5
6use hopr_chain_types::chain_events::NetworkRegistryStatus;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_db_sql::HoprDbAllOperations;
9use hopr_internal_types::protocol::ApplicationData;
10use hopr_network_types::prelude::RoutingOptions;
11use hopr_path::{path::TransportPath, selectors::PathSelector};
12use hopr_primitive_types::primitives::Address;
13use hopr_transport_identity::PeerId;
14use hopr_transport_protocol::msg::processor::{MsgSender, SendMsgInput};
15use hopr_transport_session::{
16    errors::{SessionManagerError, TransportSessionError},
17    traits::SendMsg,
18};
19
20#[cfg(all(feature = "prometheus", not(test)))]
21lazy_static::lazy_static! {
22    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = hopr_metrics::metrics::SimpleHistogram::new(
23        "hopr_path_length",
24        "Distribution of number of hops of sent messages",
25        vec![0.0, 1.0, 2.0, 3.0, 4.0]
26    ).unwrap();
27}
28
29use crate::{constants::RESERVED_SESSION_TAG_UPPER_LIMIT, errors::HoprTransportError};
30
31#[derive(Debug, Copy, Clone, PartialEq, Eq)]
32pub enum PeerEligibility {
33    Eligible,
34    Ineligible,
35}
36
37impl From<NetworkRegistryStatus> for PeerEligibility {
38    fn from(value: NetworkRegistryStatus) -> Self {
39        match value {
40            NetworkRegistryStatus::Allowed => Self::Eligible,
41            NetworkRegistryStatus::Denied => Self::Ineligible,
42        }
43    }
44}
45
46/// Ticket statistics data exposed by the ticket mechanism.
47#[derive(Debug, Copy, Clone, PartialEq)]
48pub struct TicketStatistics {
49    pub winning_count: u128,
50    pub unredeemed_value: hopr_primitive_types::primitives::Balance,
51    pub redeemed_value: hopr_primitive_types::primitives::Balance,
52    pub neglected_value: hopr_primitive_types::primitives::Balance,
53    pub rejected_value: hopr_primitive_types::primitives::Balance,
54}
55
56#[derive(Clone)]
57pub(crate) struct PathPlanner<T, S> {
58    db: T,
59    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
60    selector: S,
61    me: Address,
62}
63
64impl<T, S> PathPlanner<T, S>
65where
66    T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
67    S: PathSelector + Send + Sync,
68{
69    pub(crate) fn new(
70        db: T,
71        selector: S,
72        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
73        me: Address,
74    ) -> Self {
75        Self {
76            db,
77            channel_graph,
78            me,
79            selector,
80        }
81    }
82
83    pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
84        self.channel_graph.clone()
85    }
86
87    #[tracing::instrument(level = "trace", skip(self))]
88    pub(crate) async fn resolve_path(
89        &self,
90        destination: PeerId,
91        options: RoutingOptions,
92    ) -> crate::errors::Result<TransportPath> {
93        let path = match options {
94            RoutingOptions::IntermediatePath(path) => {
95                let complete_path = Vec::from_iter(path.into_iter().chain([destination]));
96                trace!(full_path = ?complete_path, "resolved a specific path");
97
98                let cg = self.channel_graph.read().await;
99
100                TransportPath::resolve(complete_path, &self.db, &cg)
101                    .await
102                    .map(|(p, _)| p)?
103            }
104            RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
105                trace!(hops = 0, %destination, "resolved zero-hop path");
106                TransportPath::direct(destination)
107            }
108            RoutingOptions::Hops(hops) => {
109                trace!(%hops, "resolved path using hop count");
110
111                let pk = OffchainPublicKey::try_from(destination)?;
112
113                if let Some(chain_key) = self
114                    .db
115                    .translate_key(None, pk)
116                    .await
117                    .map_err(hopr_db_sql::api::errors::DbError::from)?
118                {
119                    let target_chain_key: Address = chain_key.try_into()?;
120                    let cp = self
121                        .selector
122                        .select_path(self.me, target_chain_key, hops.into(), hops.into())
123                        .await?;
124
125                    let full_path = cp.into_path(&self.db, target_chain_key).await?;
126                    trace!(%full_path, "resolved automatic path");
127
128                    full_path
129                } else {
130                    return Err(HoprTransportError::Api(
131                        "send msg: unknown destination peer id encountered".to_owned(),
132                    ));
133                }
134            }
135        };
136
137        #[cfg(all(feature = "prometheus", not(test)))]
138        {
139            use hopr_path::path::Path;
140            hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.hops().len() - 1) as f64);
141        }
142
143        Ok(path)
144    }
145}
146
147#[derive(Clone)]
148pub(crate) struct MessageSender<T, S> {
149    pub process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
150    pub resolver: PathPlanner<T, S>,
151}
152
153impl<T, S> MessageSender<T, S>
154where
155    T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
156    S: PathSelector + Send + Sync,
157{
158    pub fn new(
159        process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
160        resolver: PathPlanner<T, S>,
161    ) -> Self {
162        Self {
163            process_packet_send,
164            resolver,
165        }
166    }
167}
168
169#[async_trait::async_trait]
170impl<T, S> SendMsg for MessageSender<T, S>
171where
172    T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
173    S: PathSelector + Send + Sync,
174{
175    #[tracing::instrument(level = "debug", skip(self, data))]
176    async fn send_message(
177        &self,
178        data: ApplicationData,
179        destination: PeerId,
180        options: RoutingOptions,
181    ) -> std::result::Result<(), TransportSessionError> {
182        data.application_tag
183            .is_some_and(|application_tag| application_tag < RESERVED_SESSION_TAG_UPPER_LIMIT)
184            .then_some(())
185            .ok_or(TransportSessionError::Tag)?;
186
187        let path = self
188            .resolver
189            .resolve_path(destination, options)
190            .await
191            .map_err(|_| TransportSessionError::Path)?;
192
193        self.process_packet_send
194            .get()
195            .ok_or_else(|| SessionManagerError::NotStarted)?
196            .send_packet(data, path)
197            .await
198            .map_err(|_| TransportSessionError::Closed)?
199            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
200            .await
201            .map_err(|_e| TransportSessionError::Timeout)?;
202
203        trace!("Packet sent to the outgoing queue");
204
205        Ok(())
206    }
207}