1use futures::{StreamExt, stream::BoxStream};
2use hopr_api::{
3 ct::{CoverTrafficGeneration, ProbeRouting, ProbingTrafficGeneration},
4 graph::{
5 NetworkGraphTraverse, NetworkGraphView,
6 traits::{EdgeNetworkObservableRead, EdgeObservableRead},
7 },
8 types::{
9 crypto::types::OffchainPublicKey,
10 crypto_random::Randomizable,
11 internal::{
12 NodeId,
13 protocol::HoprPseudonym,
14 routing::{DestinationRouting, PathId, RoutingOptions},
15 },
16 },
17};
18use hopr_statistics::WeightedCollection;
19
20use crate::{ProberConfig, priority::immediate_probe_priority};
21
22pub struct FullNetworkDiscovery<U> {
23 me: OffchainPublicKey,
24 cfg: ProberConfig,
25 graph: U,
26}
27
28impl<U> FullNetworkDiscovery<U> {
29 pub fn new(me: OffchainPublicKey, cfg: ProberConfig, graph: U) -> Self {
30 Self { me, cfg, graph }
31 }
32}
33
34fn strip_loopback_endpoints(mut path: Vec<OffchainPublicKey>, me: &OffchainPublicKey) -> Vec<OffchainPublicKey> {
40 if path.last() == Some(me) {
41 path.pop();
42 }
43 if path.first() == Some(me) {
44 path.remove(0);
45 }
46 path
47}
48
49fn loopback_routing(me: NodeId, path: Vec<OffchainPublicKey>) -> Option<DestinationRouting> {
51 let path: Vec<NodeId> = path.into_iter().map(NodeId::from).collect();
52 hopr_api::network::BoundedVec::try_from(path)
53 .ok()
54 .map(|path| DestinationRouting::Forward {
55 destination: Box::new(me),
56 pseudonym: Some(HoprPseudonym::random()),
57 forward_options: RoutingOptions::IntermediatePath(path),
58 return_options: None,
59 })
60}
61
62fn loopback_path_stream<U>(cfg: ProberConfig, graph: U) -> impl futures::Stream<Item = (Vec<OffchainPublicKey>, PathId)>
67where
68 U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
69{
70 futures_time::stream::interval(futures_time::time::Duration::from(cfg.interval))
72 .flat_map(|_| futures::stream::iter([2usize, 3, 4]))
73 .filter_map(move |edge_count| futures::future::ready(std::num::NonZeroUsize::new(edge_count)))
74 .flat_map(move |edge_count| {
75 let paths = graph.simple_loopback_to_self(edge_count.get(), Some(100));
76
77 let count = paths.len();
78 tracing::debug!(edge_count = edge_count.get(), count, "loopback path candidates");
79 let weighted: Vec<_> = paths
80 .into_iter()
81 .map(|(path, path_id)| ((path, path_id), cfg.base_priority))
82 .collect();
83
84 futures::stream::iter(WeightedCollection::new(weighted).into_shuffled())
85 })
86}
87
88impl<U> CoverTrafficGeneration for FullNetworkDiscovery<U>
89where
90 U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
91{
92 fn build(&self) -> BoxStream<'static, DestinationRouting> {
93 cfg_if::cfg_if! {
94 if #[cfg(feature = "noise")] {
95 let me = self.me;
96 let me_node: NodeId = me.into();
97
98 loopback_path_stream(self.cfg, self.graph.clone())
99 .filter_map(move |(path, _)| {
100 let intermediates = strip_loopback_endpoints(path, &me);
101 futures::future::ready(loopback_routing(me_node, intermediates))
102 })
103 .boxed()
104 } else {
105 Box::pin(futures::stream::empty())
106 }
107 }
108 }
109}
110
111impl<U> ProbingTrafficGeneration for FullNetworkDiscovery<U>
112where
113 U: NetworkGraphView<NodeId = OffchainPublicKey, Observed = hopr_network_graph::Observations>
114 + NetworkGraphTraverse<NodeId = OffchainPublicKey>
115 + Clone
116 + Send
117 + Sync
118 + 'static,
119{
120 fn build(&self) -> BoxStream<'static, ProbeRouting> {
121 let cfg = self.cfg;
122 let me = self.me;
123
124 let immediates = immediate_probe_stream(me, cfg, self.graph.clone());
125
126 let me_node: NodeId = me.into();
127 let intermediates = loopback_path_stream(cfg, self.graph.clone()).filter_map(move |(path, path_id)| {
128 let intermediates = strip_loopback_endpoints(path, &me);
129 let routing = loopback_routing(me_node, intermediates).map(|r| ProbeRouting::Looping((r, path_id)));
130 futures::future::ready(routing)
131 });
132
133 futures::stream::select(immediates, intermediates).boxed()
134 }
135}
136
137struct ShuffleCache {
139 probes: Vec<ProbeRouting>,
140 created_at: std::time::Instant,
141}
142
143fn immediate_probe_stream<U>(
155 me: OffchainPublicKey,
156 cfg: ProberConfig,
157 graph: U,
158) -> impl futures::Stream<Item = ProbeRouting>
159where
160 U: NetworkGraphView<NodeId = OffchainPublicKey, Observed = hopr_network_graph::Observations>
161 + Clone
162 + Send
163 + Sync
164 + 'static,
165{
166 let cache: Option<ShuffleCache> = None;
167
168 futures::stream::unfold(
169 (
170 cache,
171 futures_time::stream::interval(futures_time::time::Duration::from(cfg.interval)),
172 ),
173 move |(mut cache, mut ticker)| {
174 let graph = graph.clone();
175
176 async move {
177 use futures::StreamExt as _;
178 ticker.next().await?;
179
180 let needs_refresh = cache
182 .as_ref()
183 .is_none_or(|c| c.probes.is_empty() || c.created_at.elapsed() >= cfg.shuffle_ttl);
184
185 if needs_refresh {
186 let now = std::time::SystemTime::now()
187 .duration_since(std::time::UNIX_EPOCH)
188 .unwrap_or_default();
189
190 let weighted: Vec<_> = graph
191 .nodes()
192 .filter(|peer| futures::future::ready(peer != &me))
193 .filter_map(|peer| {
194 let obs = graph.edge(&me, &peer);
195 if cfg.probe_connected_only {
196 let connected = obs
197 .as_ref()
198 .and_then(|o| o.immediate_qos())
199 .map(|imm| imm.is_connected())
200 .unwrap_or(false);
201 if !connected {
202 return futures::future::ready(None);
203 }
204 }
205 let priority = match obs {
206 Some(obs) => immediate_probe_priority(obs.score(), obs.last_update(), now, &cfg),
207 None => immediate_probe_priority(0.0, std::time::Duration::ZERO, now, &cfg),
208 };
209 futures::future::ready(Some((peer, priority)))
210 })
211 .collect()
212 .await;
213
214 let peer_count = weighted.len();
215 let zero_hop = RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"));
216 let probes: Vec<_> = WeightedCollection::new(weighted)
217 .into_shuffled()
218 .into_iter()
219 .map(|peer| {
220 ProbeRouting::Neighbor(DestinationRouting::Forward {
221 destination: Box::new(peer.into()),
222 pseudonym: Some(HoprPseudonym::random()),
223 forward_options: zero_hop.clone(),
224 return_options: Some(zero_hop.clone()),
225 })
226 })
227 .collect();
228
229 tracing::debug!(peer_count, probes = probes.len(), "computed new neighbor probe shuffle");
230 cache = Some(ShuffleCache {
231 probes,
232 created_at: std::time::Instant::now(),
233 });
234 }
235
236 let batch = cache.as_ref().map(|c| c.probes.clone()).unwrap_or_default();
237 tracing::debug!(probes = batch.len(), "emitting neighbor probe batch");
238
239 Some((futures::stream::iter(batch), (cache, ticker)))
240 }
241 },
242 )
243 .flatten()
244}
245
246#[cfg(test)]
247mod tests {
248 use std::{collections::HashSet, sync::Arc};
249
250 use futures::{StreamExt, pin_mut};
251 use hopr_api::{
252 OffchainKeypair,
253 ct::{ProbeRouting, ProbingTrafficGeneration},
254 graph::{NetworkGraphUpdate, NetworkGraphWrite},
255 types::{crypto::keypairs::Keypair, internal::NodeId},
256 };
257 use hopr_network_graph::ChannelGraph;
258 use tokio::time::timeout;
259
260 use super::*;
261
/// Base timeout unit for stream polls in these tests; individual tests scale it as needed.
const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
263
264 fn fast_cfg() -> ProberConfig {
265 ProberConfig {
266 interval: std::time::Duration::from_millis(1),
267 shuffle_ttl: std::time::Duration::ZERO,
268 probe_connected_only: false,
269 ..Default::default()
270 }
271 }
272
273 fn random_key() -> OffchainPublicKey {
274 *OffchainKeypair::random().public()
275 }
276
277 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
278 struct Node {
279 pub id: OffchainPublicKey,
280 }
281
282 impl From<Node> for OffchainPublicKey {
283 fn from(node: Node) -> Self {
284 node.id
285 }
286 }
287
288 lazy_static::lazy_static! {
289 static ref RANDOM_PEERS: HashSet<Node> = (1..10).map(|_| {
290 Node {
291 id: OffchainPublicKey::from_privkey(&hopr_api::types::crypto_random::random_bytes::<32>()).unwrap(),
292 }
293 }).collect::<HashSet<_>>();
294 }
295
296 #[tokio::test]
297 async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
298 let me = random_key();
299 let prober = FullNetworkDiscovery::new(me, Default::default(), Arc::new(ChannelGraph::new(me)));
300 let stream = ProbingTrafficGeneration::build(&prober);
301 pin_mut!(stream);
302
303 assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
304 Ok(())
305 }
306
307 #[tokio::test]
308 async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
309 let me = random_key();
310 let graph = Arc::new(ChannelGraph::new(me));
311 for node in RANDOM_PEERS.iter() {
312 graph.record_node(node.clone());
313 }
314
315 let peer_count = RANDOM_PEERS.len();
316 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
317 let stream = ProbingTrafficGeneration::build(&prober);
318 pin_mut!(stream);
319
320 let extract_peer = |routing: ProbeRouting| -> OffchainPublicKey {
321 match routing {
322 ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => {
323 if let NodeId::Offchain(peer_key) = destination.as_ref() {
324 *peer_key
325 } else {
326 panic!("expected offchain destination");
327 }
328 }
329 _ => panic!("expected Neighbor Forward routing"),
330 }
331 };
332
333 let both_rounds: Vec<OffchainPublicKey> = timeout(
335 TINY_TIMEOUT * 40,
336 stream.take(peer_count * 2).map(extract_peer).collect::<Vec<_>>(),
337 )
338 .await?;
339
340 let round_1 = &both_rounds[..peer_count];
341 let round_2 = &both_rounds[peer_count..];
342
343 let set_1: HashSet<_> = round_1.iter().collect();
345 let set_2: HashSet<_> = round_2.iter().collect();
346 assert_eq!(set_1, set_2, "both rounds should cover the same peers");
347
348 assert_ne!(round_1, round_2, "two rounds should differ in order (probabilistic)");
350 Ok(())
351 }
352
353 #[tokio::test]
354 async fn peers_should_be_generated_in_multiple_rounds() -> anyhow::Result<()> {
355 let me = random_key();
356 let graph = Arc::new(ChannelGraph::new(me));
357 graph.record_node(RANDOM_PEERS.iter().next().unwrap().clone());
358
359 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
360 let stream = ProbingTrafficGeneration::build(&prober);
361 pin_mut!(stream);
362
363 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
364 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
365 Ok(())
366 }
367
368 #[cfg(not(feature = "noise"))]
369 #[tokio::test]
370 async fn cover_traffic_should_produce_empty_stream() -> anyhow::Result<()> {
371 let me = random_key();
372 let prober = FullNetworkDiscovery::new(me, fast_cfg(), Arc::new(ChannelGraph::new(me)));
373 let stream = CoverTrafficGeneration::build(&prober);
374 pin_mut!(stream);
375
376 assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_none());
377 Ok(())
378 }
379
380 #[tokio::test]
381 async fn only_neighbor_probes_emitted_when_no_looping_paths_exist() -> anyhow::Result<()> {
382 let me = random_key();
383 let graph = Arc::new(ChannelGraph::new(me));
384
385 let a = random_key();
386 let b = random_key();
387 graph.record_node(a);
388 graph.record_node(b);
389 graph.add_edge(&me, &a)?;
390 graph.add_edge(&a, &b)?;
391
392 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
393 let stream = ProbingTrafficGeneration::build(&prober);
394 pin_mut!(stream);
395
396 let items: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 50, stream.take(10).collect::<Vec<_>>()).await?;
397
398 assert!(!items.is_empty(), "should produce neighbor probes");
399 assert!(
400 items.iter().all(|r| matches!(r, ProbeRouting::Neighbor(_))),
401 "all probes should be Neighbor when no looping paths exist"
402 );
403 Ok(())
404 }
405
406 #[tokio::test]
407 async fn neighbor_probes_should_cover_all_known_nodes_across_rounds() -> anyhow::Result<()> {
408 let me = random_key();
409 let graph = Arc::new(ChannelGraph::new(me));
410
411 let peer_a = random_key();
412 let peer_b = random_key();
413 graph.record_node(peer_a);
414 graph.record_node(peer_b);
415
416 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
417 let stream = ProbingTrafficGeneration::build(&prober);
418 pin_mut!(stream);
419
420 let destinations: Vec<NodeId> = timeout(
421 TINY_TIMEOUT * 50,
422 neighbor_destinations(stream).take(4).collect::<Vec<_>>(),
423 )
424 .await?;
425
426 let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
427 let expected: HashSet<NodeId> = [peer_a, peer_b].into_iter().map(NodeId::from).collect();
428
429 assert_eq!(unique, expected, "probes should cover all known graph peers");
430 assert_eq!(destinations.len(), 4, "should have probes across multiple rounds");
431 Ok(())
432 }
433
434 #[tokio::test]
435 async fn single_tick_should_emit_all_peers_in_burst() -> anyhow::Result<()> {
436 let me = random_key();
437 let graph = Arc::new(ChannelGraph::new(me));
438
439 let peer_count = RANDOM_PEERS.len();
440 for node in RANDOM_PEERS.iter() {
441 graph.record_node(node.clone());
442 }
443
444 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
445 let stream = ProbingTrafficGeneration::build(&prober);
446 pin_mut!(stream);
447
448 let burst: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 20, stream.take(peer_count).collect::<Vec<_>>()).await?;
450
451 assert_eq!(
452 burst.len(),
453 peer_count,
454 "a single tick should emit all {peer_count} peers"
455 );
456 assert!(
457 burst.iter().all(|r| matches!(r, ProbeRouting::Neighbor(_))),
458 "all burst items should be Neighbor probes"
459 );
460
461 Ok(())
462 }
463
464 fn neighbor_destinations(stream: impl futures::Stream<Item = ProbeRouting>) -> impl futures::Stream<Item = NodeId> {
466 stream.filter_map(|r| {
467 futures::future::ready(match r {
468 ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => Some(*destination),
469 _ => None,
470 })
471 })
472 }
473
474 fn mark_edge_ready(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
476 use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
477 graph.upsert_edge(src, dst, |obs| {
478 obs.record(EdgeWeightType::Connected(true));
479 obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
480 obs.record(EdgeWeightType::Capacity(Some(1000)));
481 });
482 }
483
484 #[tokio::test]
485 async fn loopback_probes_should_be_emitted_for_two_edge_path() -> anyhow::Result<()> {
486 let me = random_key();
489 let a = random_key();
490 let b = random_key();
491 let graph = Arc::new(ChannelGraph::new(me));
492 graph.add_node(a);
493 graph.add_node(b);
494
495 graph.add_edge(&me, &a)?;
497 graph.add_edge(&a, &b)?;
498 mark_edge_ready(&graph, &me, &a);
499 mark_edge_ready(&graph, &a, &b);
500
501 graph.add_edge(&b, &me)?;
503 mark_edge_ready(&graph, &b, &me);
504
505 graph.add_edge(&me, &b)?;
507 mark_edge_ready(&graph, &me, &b);
508
509 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
510 let stream = ProbingTrafficGeneration::build(&prober);
511 pin_mut!(stream);
512
513 let items: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 100, stream.take(20).collect::<Vec<_>>()).await?;
515
516 let looping_count = items.iter().filter(|r| matches!(r, ProbeRouting::Looping(_))).count();
517 let neighbor_count = items.iter().filter(|r| matches!(r, ProbeRouting::Neighbor(_))).count();
518
519 assert!(neighbor_count > 0, "should have neighbor probes");
520 assert!(
521 looping_count > 0,
522 "should have loopback probes (was {looping_count} out of {} total)",
523 items.len()
524 );
525
526 for item in &items {
528 if let ProbeRouting::Looping((
529 DestinationRouting::Forward {
530 destination,
531 forward_options,
532 ..
533 },
534 _,
535 )) = item
536 {
537 assert_eq!(
538 destination.as_ref(),
539 &NodeId::Offchain(me),
540 "loopback destination should be me"
541 );
542 assert!(
543 matches!(forward_options, RoutingOptions::IntermediatePath(_)),
544 "loopback should use IntermediatePath routing"
545 );
546 }
547 }
548
549 Ok(())
550 }
551
552 #[tokio::test]
553 async fn probe_connected_only_should_skip_unconnected_peers() -> anyhow::Result<()> {
554 let me = random_key();
555 let graph = Arc::new(ChannelGraph::new(me));
556
557 let connected_peer = random_key();
558 let unconnected_peer = random_key();
559 graph.record_node(connected_peer);
560 graph.record_node(unconnected_peer);
561
562 mark_edge_ready(&graph, &me, &connected_peer);
564
565 let cfg = ProberConfig {
566 probe_connected_only: true,
567 ..fast_cfg()
568 };
569 let prober = FullNetworkDiscovery::new(me, cfg, graph);
570 let stream = ProbingTrafficGeneration::build(&prober);
571 pin_mut!(stream);
572
573 let destinations: Vec<NodeId> = timeout(
574 TINY_TIMEOUT * 50,
575 neighbor_destinations(stream).take(3).collect::<Vec<_>>(),
576 )
577 .await?;
578
579 let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
580 assert_eq!(unique.len(), 1, "only one peer should be probed");
581 assert!(
582 unique.contains(&NodeId::from(connected_peer)),
583 "only the connected peer should be probed"
584 );
585 Ok(())
586 }
587
588 #[tokio::test]
589 async fn probe_connected_only_disabled_should_probe_all_peers() -> anyhow::Result<()> {
590 let me = random_key();
591 let graph = Arc::new(ChannelGraph::new(me));
592
593 let connected_peer = random_key();
594 let unconnected_peer = random_key();
595 graph.record_node(connected_peer);
596 graph.record_node(unconnected_peer);
597
598 mark_edge_ready(&graph, &me, &connected_peer);
599
600 let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
602 let stream = ProbingTrafficGeneration::build(&prober);
603 pin_mut!(stream);
604
605 let destinations: Vec<NodeId> = timeout(
606 TINY_TIMEOUT * 50,
607 neighbor_destinations(stream).take(4).collect::<Vec<_>>(),
608 )
609 .await?;
610
611 let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
612 let expected: HashSet<NodeId> = [connected_peer, unconnected_peer]
613 .into_iter()
614 .map(NodeId::from)
615 .collect();
616 assert_eq!(
617 unique, expected,
618 "both peers should be probed when probe_connected_only is false"
619 );
620 Ok(())
621 }
622
/// `loopback_routing` must refuse a path that still contains the local node
/// at both ends and accept the same path once those endpoints are removed.
///
/// Nothing is awaited here, so this is a plain synchronous test.
#[test]
fn loopback_routing_should_reject_full_path_with_me() -> anyhow::Result<()> {
    let me = random_key();
    let a = random_key();
    let b = random_key();
    let me_node = NodeId::Offchain(me);

    let full_path = vec![me, a, b, me];
    assert!(
        loopback_routing(me_node, full_path).is_none(),
        "full path [me, a, b, me] should exceed BoundedVec<3> and return None"
    );

    let stripped_path = vec![a, b];
    assert!(
        loopback_routing(me_node, stripped_path).is_some(),
        "stripped path [a, b] should fit BoundedVec<3> and return Some"
    );

    Ok(())
}
648}