1use async_lock::RwLock;
2use futures::channel::mpsc::Sender;
3use std::sync::{Arc, OnceLock};
4use tracing::trace;
5
6use hopr_chain_types::chain_events::NetworkRegistryStatus;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_db_sql::HoprDbAllOperations;
9use hopr_internal_types::protocol::ApplicationData;
10use hopr_network_types::prelude::RoutingOptions;
11use hopr_path::{path::TransportPath, selectors::PathSelector};
12use hopr_primitive_types::primitives::Address;
13use hopr_transport_identity::PeerId;
14use hopr_transport_protocol::msg::processor::{MsgSender, SendMsgInput};
15use hopr_transport_session::{
16 errors::{SessionManagerError, TransportSessionError},
17 traits::SendMsg,
18};
19
// Histogram registered under the name `hopr_path_length`; it is observed once per
// successfully resolved path in `PathPlanner::resolve_path`.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = hopr_metrics::metrics::SimpleHistogram::new(
        "hopr_path_length",
        "Distribution of number of hops of sent messages",
        // Bucket boundaries handed to the histogram constructor.
        vec![0.0, 1.0, 2.0, 3.0, 4.0]
    ).unwrap();
}
28
29use crate::{constants::RESERVED_SESSION_TAG_UPPER_LIMIT, errors::HoprTransportError};
30
/// Two-state eligibility flag of a peer.
///
/// Values can be obtained from a `NetworkRegistryStatus` through the `From`
/// conversion defined in this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PeerEligibility {
    /// The peer is eligible.
    Eligible,
    /// The peer is not eligible.
    Ineligible,
}
36
37impl From<NetworkRegistryStatus> for PeerEligibility {
38 fn from(value: NetworkRegistryStatus) -> Self {
39 match value {
40 NetworkRegistryStatus::Allowed => Self::Eligible,
41 NetworkRegistryStatus::Denied => Self::Ineligible,
42 }
43 }
44}
45
46#[derive(Debug, Copy, Clone, PartialEq)]
48pub struct TicketStatistics {
49 pub winning_count: u128,
50 pub unredeemed_value: hopr_primitive_types::primitives::Balance,
51 pub redeemed_value: hopr_primitive_types::primitives::Balance,
52 pub neglected_value: hopr_primitive_types::primitives::Balance,
53 pub rejected_value: hopr_primitive_types::primitives::Balance,
54}
55
/// Resolves a `TransportPath` towards a destination peer.
///
/// Bundles everything `resolve_path` needs: a database handle, the shared
/// channel graph, a path selector and an address.
#[derive(Clone)]
pub(crate) struct PathPlanner<T, S> {
    /// Database handle; used for key translation and for turning peers/chain paths into a transport path.
    db: T,
    /// Shared channel graph; read-locked while an explicitly given path is resolved.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    /// Selector asked for a path when only a hop count is requested.
    selector: S,
    /// Address passed as the first argument to the selector
    /// (presumably this node's own chain address — confirm).
    me: Address,
}
63
64impl<T, S> PathPlanner<T, S>
65where
66 T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
67 S: PathSelector + Send + Sync,
68{
69 pub(crate) fn new(
70 db: T,
71 selector: S,
72 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
73 me: Address,
74 ) -> Self {
75 Self {
76 db,
77 channel_graph,
78 me,
79 selector,
80 }
81 }
82
83 pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
84 self.channel_graph.clone()
85 }
86
87 #[tracing::instrument(level = "trace", skip(self))]
88 pub(crate) async fn resolve_path(
89 &self,
90 destination: PeerId,
91 options: RoutingOptions,
92 ) -> crate::errors::Result<TransportPath> {
93 let path = match options {
94 RoutingOptions::IntermediatePath(path) => {
95 let complete_path = Vec::from_iter(path.into_iter().chain([destination]));
96 trace!(full_path = ?complete_path, "resolved a specific path");
97
98 let cg = self.channel_graph.read().await;
99
100 TransportPath::resolve(complete_path, &self.db, &cg)
101 .await
102 .map(|(p, _)| p)?
103 }
104 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
105 trace!(hops = 0, %destination, "resolved zero-hop path");
106 TransportPath::direct(destination)
107 }
108 RoutingOptions::Hops(hops) => {
109 trace!(%hops, "resolved path using hop count");
110
111 let pk = OffchainPublicKey::try_from(destination)?;
112
113 if let Some(chain_key) = self
114 .db
115 .translate_key(None, pk)
116 .await
117 .map_err(hopr_db_sql::api::errors::DbError::from)?
118 {
119 let target_chain_key: Address = chain_key.try_into()?;
120 let cp = self
121 .selector
122 .select_path(self.me, target_chain_key, hops.into(), hops.into())
123 .await?;
124
125 let full_path = cp.into_path(&self.db, target_chain_key).await?;
126 trace!(%full_path, "resolved automatic path");
127
128 full_path
129 } else {
130 return Err(HoprTransportError::Api(
131 "send msg: unknown destination peer id encountered".to_owned(),
132 ));
133 }
134 }
135 };
136
137 #[cfg(all(feature = "prometheus", not(test)))]
138 {
139 use hopr_path::path::Path;
140 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.hops().len() - 1) as f64);
141 }
142
143 Ok(path)
144 }
145}
146
/// Sends application data by resolving a path with a `PathPlanner` and handing the
/// packet to the message sender stored in `process_packet_send`.
#[derive(Clone)]
pub(crate) struct MessageSender<T, S> {
    /// Shared, set-once slot holding the packet sender; while it is unset,
    /// `send_message` fails with `SessionManagerError::NotStarted`.
    pub process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    /// Planner used to resolve the path for every message being sent.
    pub resolver: PathPlanner<T, S>,
}
152
153impl<T, S> MessageSender<T, S>
154where
155 T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
156 S: PathSelector + Send + Sync,
157{
158 pub fn new(
159 process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
160 resolver: PathPlanner<T, S>,
161 ) -> Self {
162 Self {
163 process_packet_send,
164 resolver,
165 }
166 }
167}
168
169#[async_trait::async_trait]
170impl<T, S> SendMsg for MessageSender<T, S>
171where
172 T: HoprDbAllOperations + std::fmt::Debug + Send + Sync + 'static,
173 S: PathSelector + Send + Sync,
174{
175 #[tracing::instrument(level = "debug", skip(self, data))]
176 async fn send_message(
177 &self,
178 data: ApplicationData,
179 destination: PeerId,
180 options: RoutingOptions,
181 ) -> std::result::Result<(), TransportSessionError> {
182 data.application_tag
183 .is_some_and(|application_tag| application_tag < RESERVED_SESSION_TAG_UPPER_LIMIT)
184 .then_some(())
185 .ok_or(TransportSessionError::Tag)?;
186
187 let path = self
188 .resolver
189 .resolve_path(destination, options)
190 .await
191 .map_err(|_| TransportSessionError::Path)?;
192
193 self.process_packet_send
194 .get()
195 .ok_or_else(|| SessionManagerError::NotStarted)?
196 .send_packet(data, path)
197 .await
198 .map_err(|_| TransportSessionError::Closed)?
199 .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
200 .await
201 .map_err(|_e| TransportSessionError::Timeout)?;
202
203 trace!("Packet sent to the outgoing queue");
204
205 Ok(())
206 }
207}