1use std::sync::Arc;
2
3use futures::StreamExt;
4use hopr_api::{
5 PeerId,
6 ct::{DestinationRouting, NetworkGraphView, TrafficGeneration},
7 network::{NetworkObservations, NetworkView},
8};
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::protocol::HoprPseudonym;
12use hopr_network_types::types::RoutingOptions;
13use rand::seq::SliceRandom;
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16use validator::Validate;
17
/// Configuration of the immediate-neighbor prober.
///
/// With the `serde` feature enabled, both durations are (de)serialized through
/// `humantime_serde`, missing fields fall back to their defaults and unknown
/// fields are rejected. `Validate` is derived, but no field-level validation
/// rules are declared on this struct.
#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
pub struct ProberConfig {
    /// Delay slept before every probing round (see `ImmediateNeighborProber::build`).
    ///
    /// Defaults to the value returned by `default_probing_interval` (5 seconds).
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_probing_interval", with = "humantime_serde")
    )]
    #[default(default_probing_interval())]
    pub interval: std::time::Duration,

    /// Threshold for re-checking peers.
    ///
    /// Defaults to the value returned by `default_recheck_threshold` (60 seconds).
    ///
    /// NOTE(review): nothing in this file applies the threshold yet;
    /// `ImmediateNeighborChannelGraph::nodes` reads its own copy of the value but
    /// ignores it. Confirm the intended semantics before relying on it.
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_recheck_threshold", with = "humantime_serde")
    )]
    #[default(default_recheck_threshold())]
    pub recheck_threshold: std::time::Duration,
}
38
/// Default delay between probing rounds (5 seconds); returned by `default_probing_interval`.
const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
41
/// Default peer re-check threshold (60 seconds); returned by `default_recheck_threshold`.
const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);
44
/// Returns the default value of `ProberConfig::interval`.
///
/// Exists as a function because both the serde `default = "..."` attribute and the
/// `#[default(...)]` attribute on the field refer to it by name.
#[inline]
const fn default_probing_interval() -> std::time::Duration {
    DEFAULT_REPEATED_PROBING_DELAY
}
49
/// Returns the default value of `ProberConfig::recheck_threshold`.
///
/// Exists as a function because both the serde `default = "..."` attribute and the
/// `#[default(...)]` attribute on the field refer to it by name.
#[inline]
const fn default_recheck_threshold() -> std::time::Duration {
    DEFAULT_PROBE_RECHECK_THRESHOLD
}
54
55pub struct ImmediateNeighborProber {
56 cfg: ProberConfig,
57}
58
59impl ImmediateNeighborProber {
60 pub fn new(cfg: ProberConfig) -> Self {
61 Self { cfg }
62 }
63}
64
65impl TrafficGeneration for ImmediateNeighborProber {
66 fn build<U>(self, network_graph: U) -> impl futures::Stream<Item = DestinationRouting> + Send
67 where
68 U: NetworkGraphView + Send + Sync + 'static,
69 {
70 let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
72 .time_to_live(std::time::Duration::from_secs(600))
73 .max_capacity(100_000)
74 .build();
75
76 let cfg = self.cfg;
77
78 futures::stream::repeat(())
79 .filter_map(move |_| {
80 let nodes = network_graph.nodes();
81
82 async move {
83 hopr_async_runtime::prelude::sleep(cfg.interval).await;
84 Some(nodes)
85 }
86 })
87 .flatten()
88 .filter_map(move |peer| {
89 let cache_peer_routing = cache_peer_routing.clone();
90
91 async move {
92 cache_peer_routing
93 .try_get_with(peer, async move {
94 Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
95 destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
96 pseudonym: Some(HoprPseudonym::random()),
97 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
98 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
99 })
100 })
101 .await
102 .ok()
103 }
104 })
105 }
106}
107
/// Network graph restricted to immediate neighbors, backed by a shared network handle.
///
/// Cloning is cheap: the clone shares the same `network` instance through the [`Arc`].
#[derive(Clone)]
pub struct ImmediateNeighborChannelGraph<T> {
    /// Shared handle to the underlying network component.
    network: Arc<T>,
    /// Re-check threshold supplied at construction time.
    recheck_threshold: std::time::Duration,
}

impl<T> ImmediateNeighborChannelGraph<T> {
    /// Wraps `network` in an [`Arc`] and stores it together with `recheck_threshold`.
    pub fn new(network: T, recheck_threshold: std::time::Duration) -> Self {
        let network = Arc::new(network);

        Self {
            network,
            recheck_threshold,
        }
    }
}
122
123#[async_trait::async_trait]
124impl<T> hopr_api::ct::NetworkGraphUpdate for ImmediateNeighborChannelGraph<T>
125where
126 T: NetworkObservations + Send + Sync,
127{
128 async fn record<N, P>(
129 &self,
130 telemetry: std::result::Result<hopr_api::ct::Telemetry<N, P>, hopr_api::ct::TrafficGenerationError<P>>,
131 ) where
132 N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
133 P: hopr_api::ct::MeasurablePath + Send + Clone,
134 {
135 match telemetry {
136 Ok(hopr_api::ct::Telemetry::Neighbor(telemetry)) => {
137 tracing::trace!(
138 peer = %telemetry.peer(),
139 latency_ms = telemetry.rtt().as_millis(),
140 "neighbor probe successful"
141 );
142 hopr_api::network::NetworkObservations::update(
143 self.network.as_ref(),
144 telemetry.peer(),
145 Ok(telemetry.rtt() / 2),
146 );
147 }
148 Ok(hopr_api::ct::Telemetry::Loopback(_)) => {
149 tracing::warn!(
150 reason = "feature not implemented",
151 "loopback path telemetry not supported yet"
152 );
153 }
154 Err(hopr_api::ct::TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
155 hopr_api::network::NetworkObservations::update(self.network.as_ref(), &peer, Err(()));
156 }
157 Err(hopr_api::ct::TrafficGenerationError::ProbeLoopbackTimeout(_)) => {
158 tracing::warn!(
159 reason = "feature not implemented",
160 "loopback path telemetry not supported yet"
161 );
162 }
163 }
164 }
165}
166
167#[async_trait::async_trait]
168impl<T> hopr_api::ct::NetworkGraphView for ImmediateNeighborChannelGraph<T>
169where
170 T: NetworkView + Send + Sync + Clone + 'static,
171{
172 fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
173 let fetcher = self.network.clone();
174 let _recheck_threshold = self.recheck_threshold; Box::pin(async_stream::stream! {
177 let mut peers: Vec<PeerId> = fetcher.discovered_peers().into_iter().collect();
178 peers.shuffle(&mut hopr_crypto_random::rng()); for peer in peers {
181 yield peer;
182 }
183 })
184 }
185
186 async fn find_routes(&self, _destination: &PeerId, _length: usize) -> Vec<DestinationRouting> {
187 vec![]
188 }
189}
190
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::{StreamExt, pin_mut};
    use hopr_api::{
        Multiaddr,
        network::{Health, Observable},
    };
    use hopr_internal_types::NodeId;
    use tokio::time::timeout;

    use super::*;

    /// Upper bound for waiting on a single stream item in these tests.
    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);

    // Mock of a per-peer observation record; only needed as the return type of
    // `observations_for` in the network mock below.
    mockall::mock! {
        Observed {}

        impl Observable for Observed {
            fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>);

            fn last_update(&self) -> std::time::Duration;

            fn average_latency(&self) -> Option<std::time::Duration>;

            fn average_probe_rate(&self) -> f64;

            fn score(&self) -> f64;
        }
    }

    // Mock of the network component backing `ImmediateNeighborChannelGraph`.
    mockall::mock! {
        ScanInteraction {}

        #[async_trait::async_trait]
        impl hopr_api::network::NetworkObservations for ScanInteraction {
            fn update(&self, peer: &PeerId, result: std::result::Result<std::time::Duration, ()>);
        }

        #[async_trait::async_trait]
        impl hopr_api::network::NetworkView for ScanInteraction {
            fn listening_as(&self) -> HashSet<Multiaddr>;

            fn multiaddress_of(&self, peer: &PeerId) -> Option<HashSet<Multiaddr>>;

            fn discovered_peers(&self) -> std::collections::HashSet<PeerId> ;

            fn connected_peers(&self) -> HashSet<PeerId>;

            #[allow(refining_impl_trait)]
            fn observations_for(&self, peer: &PeerId) -> Option<MockObserved>;

            fn health(&self) -> Health;
        }

        impl Clone for ScanInteraction {
            fn clone(&self) -> Self;
        }
    }

    #[tokio::test]
    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        // At least one probing round must actually query the peers; otherwise the
        // timeout below would pass without exercising the prober at all.
        fetcher.expect_discovered_peers().times(1..).returning(HashSet::new);

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: ProberConfig::default().recheck_threshold,
        };

        // The probing interval must be well below TINY_TIMEOUT. With the default
        // interval (5 s) the timeout would fire before the first round even started,
        // making the assertion vacuous.
        let prober = ImmediateNeighborProber::new(ProberConfig {
            interval: std::time::Duration::from_millis(1),
            ..Default::default()
        });
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());

        Ok(())
    }

    lazy_static::lazy_static! {
        /// Fixed set of nine random peers shared by the ordering test.
        static ref RANDOM_PEERS: HashSet<PeerId> = (1..10).map(|_| {
            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>()).unwrap().into();
            peer
        }).collect::<HashSet<_>>();
    }

    #[tokio::test]
    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
        let mut fetcher = MockScanInteraction::new();
        fetcher.expect_discovered_peers().returning(|| RANDOM_PEERS.clone());

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: ProberConfig::default().recheck_threshold,
        };

        let prober = ImmediateNeighborProber::new(ProberConfig {
            interval: std::time::Duration::from_millis(1),
            ..Default::default()
        });
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        let actual = timeout(
            TINY_TIMEOUT * 20,
            stream
                .take(RANDOM_PEERS.len())
                .map(|routing| match routing {
                    DestinationRouting::Forward { destination, .. } => {
                        if let NodeId::Offchain(peer_key) = destination.as_ref() {
                            PeerId::from(peer_key)
                        } else {
                            panic!("expected offchain destination, got chain address");
                        }
                    }
                    _ => panic!("expected Forward routing"),
                })
                .collect::<Vec<_>>(),
        )
        .await?;

        assert_eq!(actual.len(), RANDOM_PEERS.len());
        // The first round must cover exactly the discovered peers, each of them once.
        assert_eq!(actual.iter().copied().collect::<HashSet<_>>(), *RANDOM_PEERS);
        // The emitted order must differ from the set's own iteration order.
        assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == b));

        Ok(())
    }

    #[tokio::test]
    async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
        let cfg = ProberConfig {
            interval: std::time::Duration::from_millis(1),
            recheck_threshold: std::time::Duration::from_millis(1000),
        };

        let mut fetcher = MockScanInteraction::new();
        // Two rounds with one fresh peer each, followed by empty rounds only.
        fetcher.expect_discovered_peers().times(2).returning(|| {
            let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
                .unwrap()
                .into();
            HashSet::from([peer])
        });
        fetcher.expect_discovered_peers().returning(HashSet::new);

        let channel_graph = ImmediateNeighborChannelGraph {
            network: Arc::new(fetcher),
            recheck_threshold: cfg.recheck_threshold,
        };

        let prober = ImmediateNeighborProber::new(cfg);
        let stream = prober.build(channel_graph);
        pin_mut!(stream);

        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());

        Ok(())
    }
}