1use std::sync::{Arc, OnceLock};
2
3use async_lock::RwLock;
4use futures::{TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::HoprPacket;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, api::prelude::DbError};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::{
11 prelude::{ResolvedTransportRouting, RoutingOptions},
12 types::DestinationRouting,
13};
14use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
15use hopr_primitive_types::{prelude::HoprBalance, primitives::Address};
16use hopr_transport_packet::prelude::ApplicationData;
17use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
18use hopr_transport_session::{
19 errors::{SessionManagerError, TransportSessionError},
20 traits::SendMsg,
21};
22use tracing::trace;
23
24use crate::errors::HoprTransportError;
25
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Histogram of the hop counts of resolved paths. It is only compiled in when the
    // `prometheus` feature is enabled and the crate is not built for tests.
    static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = {
        // One bucket boundary per hop count from 0 to 4.
        let hop_count_buckets = vec![0.0, 1.0, 2.0, 3.0, 4.0];

        hopr_metrics::metrics::SimpleHistogram::new(
            "hopr_path_length",
            "Distribution of number of hops of sent messages",
            hop_count_buckets,
        )
        .unwrap()
    };
}
34
/// Two-state eligibility flag of a peer.
///
/// A value can be obtained from a [`NetworkRegistryStatus`] through the `From`
/// implementation on this type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeerEligibility {
    /// Counterpart of [`NetworkRegistryStatus::Allowed`].
    Eligible,
    /// Counterpart of [`NetworkRegistryStatus::Denied`].
    Ineligible,
}
40
41impl From<NetworkRegistryStatus> for PeerEligibility {
42 fn from(value: NetworkRegistryStatus) -> Self {
43 match value {
44 NetworkRegistryStatus::Allowed => Self::Eligible,
45 NetworkRegistryStatus::Denied => Self::Ineligible,
46 }
47 }
48}
49
/// Plain-data summary of ticket statistics: one counter and four [`HoprBalance`] amounts.
///
/// NOTE(review): the code that fills this structure is not visible in this file, so the
/// per-field descriptions below are derived from the field names only — confirm against
/// the producer.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct TicketStatistics {
    /// Presumably the number of winning tickets.
    pub winning_count: u128,
    /// Presumably the total value of tickets that have not been redeemed yet.
    pub unredeemed_value: HoprBalance,
    /// Presumably the total value of tickets that have been redeemed.
    pub redeemed_value: HoprBalance,
    /// Presumably the total value of neglected tickets.
    pub neglected_value: HoprBalance,
    /// Presumably the total value of rejected tickets.
    pub rejected_value: HoprBalance,
}
59
/// Resolves the routing of outgoing packets into validated paths.
///
/// `T` is the database handle and `S` is the path selection strategy; the trait bounds
/// they must satisfy are declared on the `impl` block, not on the struct.
#[derive(Clone)]
pub(crate) struct PathPlanner<T, S> {
    /// Database handle: passed to `ValidatedPath::new` as the resolver and queried for
    /// SURBs when resolving return routing.
    db: T,
    /// Shared channel graph; read-locked while a path is being resolved and validated.
    channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
    /// Strategy used to pick a path when only a hop count is requested.
    selector: S,
    /// Own address: the source of forward paths and the destination of return paths.
    me: Address,
}
67
68impl<T, S> PathPlanner<T, S>
69where
70 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
71 S: PathSelector + Send + Sync,
72{
73 pub(crate) fn new(
74 me: Address,
75 db: T,
76 selector: S,
77 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
78 ) -> Self {
79 Self {
80 db,
81 channel_graph,
82 selector,
83 me,
84 }
85 }
86
87 pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
88 self.channel_graph.clone()
89 }
90
91 #[tracing::instrument(level = "trace", skip(self))]
92 async fn resolve_path(
93 &self,
94 source: Address,
95 destination: Address,
96 options: RoutingOptions,
97 ) -> crate::errors::Result<ValidatedPath> {
98 let cg = self.channel_graph.read_arc().await;
99 let path = match options {
100 RoutingOptions::IntermediatePath(path) => {
101 trace!(?path, "resolving a specific path");
102
103 ValidatedPath::new(
104 source,
105 ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
106 &cg,
107 &self.db,
108 )
109 .await?
110 }
111 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
112 trace!(hops = 0, "resolving zero-hop path");
113
114 ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
115 }
116 RoutingOptions::Hops(hops) => {
117 trace!(%hops, "resolving path using hop count");
118
119 let cp = self
120 .selector
121 .select_path(source, destination, hops.into(), hops.into())
122 .await?;
123
124 ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
125 }
126 };
127
128 #[cfg(all(feature = "prometheus", not(test)))]
129 {
130 use hopr_path::Path;
131 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
132 }
133
134 trace!(%path, "validated resolved path");
135
136 Ok(path)
137 }
138
139 #[tracing::instrument(level = "trace", skip(self))]
140 pub(crate) async fn resolve_routing(
141 &self,
142 size_hint: usize,
143 routing: DestinationRouting,
144 ) -> crate::errors::Result<ResolvedTransportRouting> {
145 match routing {
146 DestinationRouting::Forward {
147 destination,
148 pseudonym,
149 forward_options,
150 return_options,
151 } => {
152 let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
153
154 let return_paths = if let Some(return_options) = return_options {
155 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint);
156 trace!(%destination, %num_possible_surbs, data_len = size_hint, "resolving packet return paths");
157
158 (0..num_possible_surbs)
159 .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
160 .collect::<FuturesUnordered<_>>()
161 .try_collect::<Vec<ValidatedPath>>()
162 .await?
163 } else {
164 vec![]
165 };
166
167 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
168
169 Ok(ResolvedTransportRouting::Forward {
170 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
171 forward_path,
172 return_paths,
173 })
174 }
175 DestinationRouting::Return(matcher) => {
176 let (sender_id, surb) = self.db.find_surb(matcher).await?;
177 Ok(ResolvedTransportRouting::Return(sender_id, surb))
178 }
179 }
180 }
181}
182
/// Sends application data as packets: resolves the routing through a [`PathPlanner`]
/// and hands the data over to the packet sending process.
#[derive(Clone)]
pub(crate) struct MessageSender<T, S> {
    /// Handle to the packet sending process. The cell may still be unset; in that case
    /// `send_message` fails with `SessionManagerError::NotStarted`.
    pub process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    /// Planner used to resolve the routing of each message before it is sent.
    pub resolver: PathPlanner<T, S>,
}
188
189impl<T, S> MessageSender<T, S>
190where
191 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
192 S: PathSelector + Send + Sync,
193{
194 pub fn new(
195 process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
196 resolver: PathPlanner<T, S>,
197 ) -> Self {
198 Self {
199 process_packet_send,
200 resolver,
201 }
202 }
203}
204
205#[async_trait::async_trait]
206impl<T, S> SendMsg for MessageSender<T, S>
207where
208 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
209 S: PathSelector + Send + Sync,
210{
211 #[tracing::instrument(level = "debug", skip(self, data))]
212 async fn send_message(
213 &self,
214 data: ApplicationData,
215 routing: DestinationRouting,
216 ) -> std::result::Result<(), TransportSessionError> {
217 let routing = self
218 .resolver
219 .resolve_routing(data.len(), routing)
220 .await
221 .map_err(|error| {
222 tracing::error!(%error, "failed to resolve routing");
223 if let HoprTransportError::Db(DbError::NoSurbAvailable(_)) = error {
225 TransportSessionError::OutOfSurbs
226 } else {
227 TransportSessionError::Path
228 }
229 })?;
230
231 self.process_packet_send
232 .get()
233 .ok_or_else(|| SessionManagerError::NotStarted)?
234 .send_packet(data, routing)
235 .await
236 .map_err(|_| TransportSessionError::Closed)?
237 .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
238 .await
239 .map_err(|error| {
240 tracing::error!(%error, "packet send error");
241 TransportSessionError::Timeout
242 })?;
243
244 trace!("packet sent to the outgoing queue");
245
246 Ok(())
247 }
248}