1use std::sync::Arc;
2
3use futures::StreamExt;
4use hopr_api::{
5 PeerId,
6 ct::{DestinationRouting, NetworkGraphView, TrafficGeneration},
7 network::{NetworkObservations, NetworkView},
8};
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::protocol::HoprPseudonym;
12use hopr_network_types::types::RoutingOptions;
13use rand::seq::SliceRandom;
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16use validator::Validate;
17
18#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate)]
20#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(deny_unknown_fields))]
21pub struct ProberConfig {
22 #[cfg_attr(
24 feature = "serde",
25 serde(default = "default_probing_interval", with = "humantime_serde")
26 )]
27 #[default(default_probing_interval())]
28 pub interval: std::time::Duration,
29
30 #[cfg_attr(
32 feature = "serde",
33 serde(default = "default_recheck_threshold", with = "humantime_serde")
34 )]
35 #[default(default_recheck_threshold())]
36 pub recheck_threshold: std::time::Duration,
37}
38
39const DEFAULT_REPEATED_PROBING_DELAY: std::time::Duration = std::time::Duration::from_secs(5);
41
42const DEFAULT_PROBE_RECHECK_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);
44
45#[inline]
46const fn default_probing_interval() -> std::time::Duration {
47 DEFAULT_REPEATED_PROBING_DELAY
48}
49
50#[inline]
51const fn default_recheck_threshold() -> std::time::Duration {
52 DEFAULT_PROBE_RECHECK_THRESHOLD
53}
54
55pub struct ImmediateNeighborProber {
56 cfg: ProberConfig,
57}
58
59impl ImmediateNeighborProber {
60 pub fn new(cfg: ProberConfig) -> Self {
61 Self { cfg }
62 }
63}
64
65impl TrafficGeneration for ImmediateNeighborProber {
66 fn build<U>(self, network_graph: U) -> impl futures::Stream<Item = DestinationRouting> + Send
67 where
68 U: NetworkGraphView + Send + Sync + 'static,
69 {
70 let cache_peer_routing: moka::future::Cache<PeerId, DestinationRouting> = moka::future::Cache::builder()
72 .time_to_live(std::time::Duration::from_secs(600))
73 .max_capacity(100_000)
74 .build();
75
76 let cfg = self.cfg;
77
78 futures::stream::repeat(())
79 .filter_map(move |_| {
80 let nodes = network_graph.nodes();
81
82 async move {
83 hopr_async_runtime::prelude::sleep(cfg.interval).await;
84 Some(nodes)
85 }
86 })
87 .flatten()
88 .filter_map(move |peer| {
89 let cache_peer_routing = cache_peer_routing.clone();
90
91 async move {
92 cache_peer_routing
93 .try_get_with(peer, async move {
94 Ok::<DestinationRouting, anyhow::Error>(DestinationRouting::Forward {
95 destination: Box::new(OffchainPublicKey::from_peerid(&peer)?.into()),
96 pseudonym: Some(HoprPseudonym::random()),
97 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
98 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
99 })
100 })
101 .await
102 .ok()
103 }
104 })
105 }
106}
107
108#[derive(Clone)]
109pub struct ImmediateNeighborChannelGraph<T> {
110 network: Arc<T>,
111 recheck_threshold: std::time::Duration,
112}
113
114impl<T> ImmediateNeighborChannelGraph<T> {
115 pub fn new(network: T, recheck_threshold: std::time::Duration) -> Self {
116 Self {
117 network: Arc::new(network),
118 recheck_threshold,
119 }
120 }
121}
122
123#[async_trait::async_trait]
124impl<T> hopr_api::ct::NetworkGraphUpdate for ImmediateNeighborChannelGraph<T>
125where
126 T: NetworkObservations + Send + Sync,
127{
128 async fn record<N, P>(
129 &self,
130 telemetry: std::result::Result<hopr_api::ct::Telemetry<N, P>, hopr_api::ct::TrafficGenerationError<P>>,
131 ) where
132 N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
133 P: hopr_api::ct::MeasurablePath + Send + Clone,
134 {
135 match telemetry {
136 Ok(hopr_api::ct::Telemetry::Neighbor(telemetry)) => {
137 tracing::trace!(
138 peer = %telemetry.peer(),
139 latency_ms = telemetry.rtt().as_millis(),
140 "neighbor probe successful"
141 );
142 hopr_api::network::NetworkObservations::update(
143 self.network.as_ref(),
144 telemetry.peer(),
145 Ok(telemetry.rtt() / 2),
146 );
147 }
148 Ok(hopr_api::ct::Telemetry::Loopback(_)) => {
149 tracing::warn!(
150 reason = "feature not implemented",
151 "loopback path telemetry not supported yet"
152 );
153 }
154 Err(hopr_api::ct::TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
155 hopr_api::network::NetworkObservations::update(self.network.as_ref(), &peer, Err(()));
156 }
157 Err(hopr_api::ct::TrafficGenerationError::ProbeLoopbackTimeout(_)) => {
158 tracing::warn!(
159 reason = "feature not implemented",
160 "loopback path telemetry not supported yet"
161 );
162 }
163 }
164 }
165}
166
167#[async_trait::async_trait]
168impl<T> hopr_api::ct::NetworkGraphView for ImmediateNeighborChannelGraph<T>
169where
170 T: NetworkView + Send + Sync + Clone + 'static,
171{
172 fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
173 let fetcher = self.network.clone();
174 let _recheck_threshold = self.recheck_threshold; let mut rng = hopr_crypto_random::rng();
176
177 Box::pin(async_stream::stream! {
178 let mut peers: Vec<PeerId> = fetcher.discovered_peers().into_iter().collect();
179 peers.shuffle(&mut rng); for peer in peers {
182 yield peer;
183 }
184 })
185 }
186
187 async fn find_routes(&self, _destination: &PeerId, _length: usize) -> Vec<DestinationRouting> {
188 vec![]
189 }
190}
191
192#[cfg(test)]
193mod tests {
194 use std::collections::HashSet;
195
196 use futures::{StreamExt, pin_mut};
197 use hopr_api::{
198 Multiaddr,
199 network::{Health, Observable},
200 };
201 use hopr_internal_types::NodeId;
202 use tokio::time::timeout;
203
204 use super::*;
205
206 const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
207
208 mockall::mock! {
209 Observed {}
210
211 impl Observable for Observed {
212 fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>);
213
214 fn last_update(&self) -> std::time::Duration;
215
216 fn average_latency(&self) -> Option<std::time::Duration>;
217
218 fn average_probe_rate(&self) -> f64;
219
220 fn score(&self) -> f64;
221 }
222 }
223
224 mockall::mock! {
225 ScanInteraction {}
226
227 #[async_trait::async_trait]
228 impl hopr_api::network::NetworkObservations for ScanInteraction {
229 fn update(&self, peer: &PeerId, result: std::result::Result<std::time::Duration, ()>);
230 }
231
232 #[async_trait::async_trait]
233 impl hopr_api::network::NetworkView for ScanInteraction {
234 fn listening_as(&self) -> HashSet<Multiaddr>;
235
236 fn multiaddress_of(&self, peer: &PeerId) -> Option<HashSet<Multiaddr>>;
237
238 fn discovered_peers(&self) -> std::collections::HashSet<PeerId> ;
239
240 fn connected_peers(&self) -> HashSet<PeerId>;
241
242 #[allow(refining_impl_trait)]
243 fn observations_for(&self, peer: &PeerId) -> Option<MockObserved>;
244
245 fn health(&self) -> Health;
246 }
247
248 impl Clone for ScanInteraction {
249 fn clone(&self) -> Self;
250 }
251 }
252
253 #[tokio::test]
254 async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
255 let mut fetcher = MockScanInteraction::new();
256 fetcher.expect_discovered_peers().returning(|| HashSet::new());
257
258 let channel_graph = ImmediateNeighborChannelGraph {
259 network: Arc::new(fetcher),
260 recheck_threshold: ProberConfig::default().recheck_threshold,
261 };
262
263 let prober = ImmediateNeighborProber::new(Default::default());
264 let stream = prober.build(channel_graph);
265 pin_mut!(stream);
266
267 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
268
269 Ok(())
270 }
271
272 lazy_static::lazy_static! {
273 static ref RANDOM_PEERS: HashSet<PeerId> = (1..10).map(|_| {
274 let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>()).unwrap().into();
275 peer
276 }).collect::<HashSet<_>>();
277 }
278
279 #[tokio::test]
280 async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
281 let mut fetcher = MockScanInteraction::new();
282 fetcher.expect_discovered_peers().returning(|| RANDOM_PEERS.clone());
283
284 let channel_graph = ImmediateNeighborChannelGraph {
285 network: Arc::new(fetcher),
286 recheck_threshold: ProberConfig::default().recheck_threshold,
287 };
288
289 let prober = ImmediateNeighborProber::new(ProberConfig {
290 interval: std::time::Duration::from_millis(1),
291 ..Default::default()
292 });
293 let stream = prober.build(channel_graph);
294 pin_mut!(stream);
295
296 let actual = timeout(
297 TINY_TIMEOUT * 20,
298 stream
299 .take(RANDOM_PEERS.len())
300 .map(|routing| match routing {
301 DestinationRouting::Forward { destination, .. } => {
302 if let NodeId::Offchain(peer_key) = destination.as_ref() {
303 PeerId::from(peer_key)
304 } else {
305 panic!("expected offchain destination, got chain address");
306 }
307 }
308 _ => panic!("expected Forward routing"),
309 })
310 .collect::<Vec<_>>(),
311 )
312 .await?;
313
314 assert_eq!(actual.len(), RANDOM_PEERS.len());
315 assert!(!actual.iter().zip(RANDOM_PEERS.iter()).all(|(a, b)| a == b));
316
317 Ok(())
318 }
319
320 #[tokio::test]
321 async fn peers_should_be_generated_in_multiple_rounds_as_long_as_they_are_available() -> anyhow::Result<()> {
322 let cfg = ProberConfig {
323 interval: std::time::Duration::from_millis(1),
324 recheck_threshold: std::time::Duration::from_millis(1000),
325 ..Default::default()
326 };
327
328 let mut fetcher = MockScanInteraction::new();
329 fetcher.expect_discovered_peers().times(2).returning(|| {
330 let peer: PeerId = OffchainPublicKey::from_privkey(&hopr_crypto_random::random_bytes::<32>())
331 .unwrap()
332 .into();
333 HashSet::from([peer])
334 });
335 fetcher.expect_discovered_peers().returning(|| HashSet::new());
336
337 let channel_graph = ImmediateNeighborChannelGraph {
338 network: Arc::new(fetcher),
339 recheck_threshold: cfg.recheck_threshold,
340 };
341
342 let prober = ImmediateNeighborProber::new(cfg);
343 let stream = prober.build(channel_graph);
344 pin_mut!(stream);
345
346 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
347 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
348 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
349
350 Ok(())
351 }
352}