1use std::sync::Arc;
2
3use async_lock::RwLock;
4use futures::{StreamExt, TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::HoprPacket;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, prelude::FoundSurb};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::{
11 prelude::{ResolvedTransportRouting, RoutingOptions},
12 types::DestinationRouting,
13};
14use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
15use hopr_primitive_types::{prelude::HoprBalance, primitives::Address};
16use hopr_protocol_app::v1::{ApplicationData, ApplicationFlag, ApplicationFlags};
17use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
18use tracing::trace;
19
20use crate::constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
21
22#[cfg(all(feature = "prometheus", not(test)))]
23lazy_static::lazy_static! {
24 static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = hopr_metrics::metrics::SimpleHistogram::new(
25 "hopr_path_length",
26 "Distribution of number of hops of sent messages",
27 vec![0.0, 1.0, 2.0, 3.0, 4.0]
28 ).unwrap();
29}
30
31#[derive(Debug, Copy, Clone, PartialEq, Eq)]
32pub enum PeerEligibility {
33 Eligible,
34 Ineligible,
35}
36
37impl From<NetworkRegistryStatus> for PeerEligibility {
38 fn from(value: NetworkRegistryStatus) -> Self {
39 match value {
40 NetworkRegistryStatus::Allowed => Self::Eligible,
41 NetworkRegistryStatus::Denied => Self::Ineligible,
42 }
43 }
44}
45
46#[derive(Debug, Copy, Clone, PartialEq)]
48pub struct TicketStatistics {
49 pub winning_count: u128,
50 pub unredeemed_value: HoprBalance,
51 pub redeemed_value: HoprBalance,
52 pub neglected_value: HoprBalance,
53 pub rejected_value: HoprBalance,
54}
55
56#[derive(Clone)]
57pub(crate) struct PathPlanner<T, S> {
58 db: T,
59 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
60 selector: S,
61 me: Address,
62}
63
64const DEFAULT_PACKET_PLANNER_CONCURRENCY: usize = 10;
65
66impl<T, S> PathPlanner<T, S>
67where
68 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
69 S: PathSelector + Send + Sync,
70{
71 pub(crate) fn new(
72 me: Address,
73 db: T,
74 selector: S,
75 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
76 ) -> Self {
77 Self {
78 db,
79 channel_graph,
80 selector,
81 me,
82 }
83 }
84
85 pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
86 self.channel_graph.clone()
87 }
88
89 #[tracing::instrument(level = "trace", skip(self))]
90 async fn resolve_path(
91 &self,
92 source: Address,
93 destination: Address,
94 options: RoutingOptions,
95 ) -> crate::errors::Result<ValidatedPath> {
96 let cg = self.channel_graph.read_arc().await;
97 let path = match options {
98 RoutingOptions::IntermediatePath(path) => {
99 trace!(?path, "resolving a specific path");
100
101 ValidatedPath::new(
102 source,
103 ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
104 &cg,
105 &self.db,
106 )
107 .await?
108 }
109 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
110 trace!(hops = 0, "resolving zero-hop path");
111
112 ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
113 }
114 RoutingOptions::Hops(hops) => {
115 trace!(%hops, "resolving path using hop count");
116
117 let cp = self
118 .selector
119 .select_path(source, destination, hops.into(), hops.into())
120 .await?;
121
122 ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
123 }
124 };
125
126 #[cfg(all(feature = "prometheus", not(test)))]
127 {
128 use hopr_path::Path;
129 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
130 }
131
132 trace!(%path, "validated resolved path");
133
134 Ok(path)
135 }
136
137 #[tracing::instrument(level = "trace", skip(self))]
138 pub(crate) async fn resolve_routing(
139 &self,
140 size_hint: usize,
141 routing: DestinationRouting,
142 ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
143 match routing {
144 DestinationRouting::Forward {
145 destination,
146 pseudonym,
147 forward_options,
148 return_options,
149 } => {
150 let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
151
152 let return_paths = if let Some(return_options) = return_options {
153 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint);
154 trace!(%destination, %num_possible_surbs, data_len = size_hint, "resolving packet return paths");
155
156 (0..num_possible_surbs)
157 .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
158 .collect::<FuturesUnordered<_>>()
159 .try_collect::<Vec<ValidatedPath>>()
160 .await?
161 } else {
162 vec![]
163 };
164
165 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
166
167 Ok((
168 ResolvedTransportRouting::Forward {
169 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
170 forward_path,
171 return_paths,
172 },
173 None,
174 ))
175 }
176 DestinationRouting::Return(matcher) => {
177 let FoundSurb {
178 sender_id,
179 surb,
180 remaining,
181 } = self.db.find_surb(matcher).await?;
182 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
183 }
184 }
185 }
186}
187
188pub(crate) fn run_packet_planner<T, S>(
194 planner: PathPlanner<T, S>,
195 packet_sender: MsgSender<Sender<SendMsgInput>>,
196) -> Sender<(DestinationRouting, ApplicationData)>
197where
198 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
199 S: PathSelector + Clone + Send + Sync + 'static,
200{
201 let (tx, rx) =
202 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationData)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
203
204 let planner_concurrency = std::env::var("HOPR_PACKET_PLANNER_CONCURRENCY")
205 .ok()
206 .and_then(|v| v.parse().ok())
207 .unwrap_or(DEFAULT_PACKET_PLANNER_CONCURRENCY);
208
209 let distress_threshold = planner.db.get_surb_config().distress_threshold;
210 hopr_async_runtime::prelude::spawn(rx.for_each_concurrent(planner_concurrency, move |(routing, data)| {
211 let planner = planner.clone();
212 let packet_sender = packet_sender.clone();
213 async move {
214 match planner.resolve_routing(data.len(), routing).await {
215 Ok((resolved, rem_surbs)) => {
216 let flags: ApplicationFlags = match rem_surbs {
219 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
220 ApplicationFlag::SurbDistress.into()
221 }
222 Some(0) => ApplicationFlag::OutOfSurbs.into(),
223 _ => data.flags - (ApplicationFlag::OutOfSurbs | ApplicationFlag::SurbDistress),
224 };
225
226 if let Err(error) = packet_sender.send_packet(data.with_flags(flags), resolved).await {
229 tracing::error!(%error, "failed to enqueue packet for sending");
230 }
231 }
232 Err(error) => tracing::error!(%error, "failed to resolve path for routing"),
233 }
234 }
235 }));
236
237 tx
238}