1use std::sync::Arc;
2
3use async_lock::RwLock;
4use futures::{FutureExt, StreamExt, TryStreamExt, channel::mpsc::Sender, stream::FuturesUnordered};
5use hopr_chain_types::chain_events::NetworkRegistryStatus;
6use hopr_crypto_packet::prelude::*;
7use hopr_crypto_types::crypto_traits::Randomizable;
8use hopr_db_sql::{HoprDbAllOperations, prelude::FoundSurb};
9use hopr_internal_types::prelude::*;
10use hopr_network_types::prelude::*;
11use hopr_path::{ChainPath, PathAddressResolver, ValidatedPath, selectors::PathSelector};
12use hopr_primitive_types::prelude::*;
13use hopr_protocol_app::prelude::*;
14use hopr_transport_protocol::processor::{MsgSender, SendMsgInput};
15use tracing::trace;
16
17use crate::constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21 static ref METRIC_PATH_LENGTH: hopr_metrics::metrics::SimpleHistogram = hopr_metrics::metrics::SimpleHistogram::new(
22 "hopr_path_length",
23 "Distribution of number of hops of sent messages",
24 vec![0.0, 1.0, 2.0, 3.0, 4.0]
25 ).unwrap();
26}
27
28#[derive(Debug, Copy, Clone, PartialEq, Eq)]
29pub enum PeerEligibility {
30 Eligible,
31 Ineligible,
32}
33
34impl From<NetworkRegistryStatus> for PeerEligibility {
35 fn from(value: NetworkRegistryStatus) -> Self {
36 match value {
37 NetworkRegistryStatus::Allowed => Self::Eligible,
38 NetworkRegistryStatus::Denied => Self::Ineligible,
39 }
40 }
41}
42
43#[derive(Debug, Copy, Clone, PartialEq)]
45pub struct TicketStatistics {
46 pub winning_count: u128,
47 pub unredeemed_value: HoprBalance,
48 pub redeemed_value: HoprBalance,
49 pub neglected_value: HoprBalance,
50 pub rejected_value: HoprBalance,
51}
52
53#[derive(Clone)]
54pub(crate) struct PathPlanner<T, S> {
55 db: T,
56 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
57 selector: S,
58 me: Address,
59}
60
61const DEFAULT_PACKET_PLANNER_CONCURRENCY: usize = 10;
62
63impl<T, S> PathPlanner<T, S>
64where
65 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Send + Sync + 'static,
66 S: PathSelector + Send + Sync,
67{
68 pub(crate) fn new(
69 me: Address,
70 db: T,
71 selector: S,
72 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
73 ) -> Self {
74 Self {
75 db,
76 channel_graph,
77 selector,
78 me,
79 }
80 }
81
82 pub(crate) fn channel_graph(&self) -> Arc<RwLock<hopr_path::channel_graph::ChannelGraph>> {
83 self.channel_graph.clone()
84 }
85
86 #[tracing::instrument(level = "trace", skip(self))]
87 async fn resolve_path(
88 &self,
89 source: Address,
90 destination: Address,
91 options: RoutingOptions,
92 ) -> crate::errors::Result<ValidatedPath> {
93 let cg = self.channel_graph.read_arc().await;
94 let path = match options {
95 RoutingOptions::IntermediatePath(path) => {
96 trace!(?path, "resolving a specific path");
97
98 ValidatedPath::new(
99 source,
100 ChainPath::new(path.into_iter().chain(std::iter::once(destination)))?,
101 &cg,
102 &self.db,
103 )
104 .await?
105 }
106 RoutingOptions::Hops(hops) if u32::from(hops) == 0 => {
107 trace!(hops = 0, "resolving zero-hop path");
108
109 ValidatedPath::new(source, ChainPath::direct(destination), &cg, &self.db).await?
110 }
111 RoutingOptions::Hops(hops) => {
112 trace!(%hops, "resolving path using hop count");
113
114 let cp = self
115 .selector
116 .select_path(source, destination, hops.into(), hops.into())
117 .await?;
118
119 ValidatedPath::new(source, ChainPath::from_channel_path(cp, destination), &cg, &self.db).await?
120 }
121 };
122
123 #[cfg(all(feature = "prometheus", not(test)))]
124 {
125 use hopr_path::Path;
126 hopr_metrics::SimpleHistogram::observe(&METRIC_PATH_LENGTH, (path.num_hops() - 1) as f64);
127 }
128
129 trace!(%path, "validated resolved path");
130
131 Ok(path)
132 }
133
134 #[tracing::instrument(level = "trace", skip(self))]
135 pub(crate) async fn resolve_routing(
136 &self,
137 size_hint: usize,
138 max_surbs: usize,
139 routing: DestinationRouting,
140 ) -> crate::errors::Result<(ResolvedTransportRouting, Option<usize>)> {
141 match routing {
142 DestinationRouting::Forward {
143 destination,
144 pseudonym,
145 forward_options,
146 return_options,
147 } => {
148 let forward_path = self.resolve_path(self.me, destination, forward_options).await?;
149
150 let return_paths = if let Some(return_options) = return_options {
151 let num_possible_surbs = HoprPacket::max_surbs_with_message(size_hint).min(max_surbs);
153 trace!(%destination, %num_possible_surbs, data_len = size_hint, max_surbs, "resolving packet return paths");
154
155 (0..num_possible_surbs)
156 .map(|_| self.resolve_path(destination, self.me, return_options.clone()))
157 .collect::<FuturesUnordered<_>>()
158 .try_collect::<Vec<ValidatedPath>>()
159 .await?
160 } else {
161 vec![]
162 };
163
164 trace!(%destination, num_surbs = return_paths.len(), data_len = size_hint, "resolved packet");
165
166 Ok((
167 ResolvedTransportRouting::Forward {
168 pseudonym: pseudonym.unwrap_or_else(HoprPseudonym::random),
169 forward_path,
170 return_paths,
171 },
172 None,
173 ))
174 }
175 DestinationRouting::Return(matcher) => {
176 let FoundSurb {
177 sender_id,
178 surb,
179 remaining,
180 } = self.db.find_surb(matcher).await?;
181 Ok((ResolvedTransportRouting::Return(sender_id, surb), Some(remaining)))
182 }
183 }
184 }
185}
186
187pub(crate) fn run_packet_planner<T, S>(
195 planner: PathPlanner<T, S>,
196 packet_sender: MsgSender<Sender<SendMsgInput>>,
197) -> Sender<(DestinationRouting, ApplicationDataOut)>
198where
199 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
200 S: PathSelector + Clone + Send + Sync + 'static,
201{
202 let (tx, rx) =
203 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
204
205 let planner_concurrency = std::env::var("HOPR_PACKET_PLANNER_CONCURRENCY")
206 .ok()
207 .and_then(|v| v.parse().ok())
208 .unwrap_or(DEFAULT_PACKET_PLANNER_CONCURRENCY);
209
210 let distress_threshold = planner.db.get_surb_config().distress_threshold;
211 hopr_async_runtime::prelude::spawn(
212 rx.for_each_concurrent(planner_concurrency, move |(routing, mut data)| {
213 let planner = planner.clone();
214 let packet_sender = packet_sender.clone();
215 async move {
216 let max_surbs = data.estimate_surbs_with_msg();
217
218 match planner.resolve_routing(data.data.total_len(), max_surbs, routing).await {
219 Ok((resolved, rem_surbs)) => {
220 let mut signals_to_dst = data
224 .packet_info
225 .as_ref()
226 .map(|info| info.signals_to_destination)
227 .unwrap_or_default();
228
229 if resolved.is_return() {
230 signals_to_dst = match rem_surbs {
231 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
232 signals_to_dst | PacketSignal::SurbDistress
233 }
234 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
235 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
236 };
237 } else {
238 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
240 }
241
242 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
243
244 if let Err(error) = packet_sender.send_packet(data, resolved).await {
247 tracing::error!(%error, "failed to enqueue packet for sending");
248 }
249 }
250 Err(error) => tracing::error!(%error, "failed to resolve path for routing"),
251 }
252 }
253 })
254 .inspect(|_| tracing::info!(task = "packet planner", "long-running background task finished")),
255 );
256
257 tx
258}