1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_api::{
6 ct::{ProbeRouting, ProbingTrafficGeneration},
7 graph::{EdgeTransportTelemetry, NetworkGraphError, NetworkGraphUpdate, NetworkGraphView},
8 types::{
9 crypto::types::OffchainPublicKey, crypto_random::Randomizable, internal::prelude::*,
10 primitive::traits::AsUnixTimestamp,
11 },
12};
13use hopr_protocol_app::{
14 prelude::{ApplicationDataIn, ApplicationDataOut, OutgoingPacketInfo, ReservedTag},
15 v1::Tag,
16};
17use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
18use hopr_utils::{platform::time::native::current_time, runtime::AbortableList};
19
20use crate::{
21 HoprProbeProcess,
22 config::ProbeConfig,
23 content::Message,
24 ping::PingQueryReplier,
25 types::{NeighborProbe, NeighborTelemetry, PathTelemetry},
26};
27
28type CacheNeighborKey = (HoprPseudonym, NeighborProbe);
29type CacheNeighborValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
30
31pub enum ProbeDispatch {
33 Consumed,
35 Passthrough(HoprPseudonym, ApplicationDataIn),
37}
38
39#[derive(Clone)]
45pub struct ProbeClassifierState<G> {
46 active_neighbor_probes: moka::future::Cache<CacheNeighborKey, CacheNeighborValue>,
47 active_path_probes: moka::future::Cache<Tag, (PathTelemetry, Arc<AllocatedTag>)>,
48 network_graph: G,
49}
50
51impl<G> ProbeClassifierState<G>
52where
53 G: NetworkGraphUpdate + Clone + Send + Sync + 'static,
54{
55 pub async fn classify<T>(
60 &self,
61 mut push_to_network: T,
62 pseudonym: HoprPseudonym,
63 in_data: ApplicationDataIn,
64 ) -> ProbeDispatch
65 where
66 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Unpin + Send + 'static,
67 T::Error: Send,
68 {
69 let tag: Tag = in_data.data.application_tag;
70
71 if let Some((path_telemetry, _allocated_tag)) = self.active_path_probes.remove(&tag).await {
72 tracing::debug!(%tag, "loopback probe successfully received");
73 self.network_graph
74 .record_edge::<NeighborTelemetry, PathTelemetry>(hopr_api::graph::MeasurableEdge::Probe(Ok(
75 EdgeTransportTelemetry::Loopback(path_telemetry),
76 )));
77 } else if tag == ReservedTag::Ping.into() {
78 let message: anyhow::Result<Message> = in_data
79 .data
80 .try_into()
81 .map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
82
83 match message {
84 Ok(message) => match message {
85 Message::Telemetry(_) => {
86 tracing::warn!(%pseudonym, "received telemetry on reserved ping tag, ignoring");
87 }
88 Message::Probe(NeighborProbe::Ping(ping)) => {
89 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
90 tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
91
92 let message = Message::Probe(NeighborProbe::Pong(ping));
93 if let Ok(data) = message.try_into() {
94 let routing = DestinationRouting::Return(pseudonym.into());
95 let data = ApplicationDataOut::with_no_packet_info(data);
96 if let Err(_error) = push_to_network.send((routing, data)).await {
97 tracing::error!(%pseudonym, "failed to send back a pong");
98 }
99 } else {
100 tracing::error!(%pseudonym, "failed to convert pong message into data");
101 }
102 }
103 Message::Probe(NeighborProbe::Pong(pong)) => {
104 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
105 if let Some((peer, start, replier)) = self
106 .active_neighbor_probes
107 .remove(&(pseudonym, NeighborProbe::Ping(pong)))
108 .await
109 {
110 let latency = current_time().as_unix_timestamp().saturating_sub(start);
111
112 if let NodeId::Offchain(opk) = peer.as_ref() {
113 tracing::debug!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
114 self.network_graph.record_edge::<NeighborTelemetry, PathTelemetry>(
115 hopr_api::graph::MeasurableEdge::Probe(Ok(EdgeTransportTelemetry::Neighbor(
116 NeighborTelemetry {
117 peer: *opk,
118 rtt: latency,
119 },
120 ))),
121 )
122 } else {
123 tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
124 }
125
126 if let Some(replier) = replier {
127 replier.notify(Ok(latency));
128 }
129 } else {
130 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
131 }
132 }
133 },
134 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
135 }
136 } else {
137 return ProbeDispatch::Passthrough(pseudonym, in_data);
138 }
139
140 ProbeDispatch::Consumed
141 }
142
143 pub fn filter_stream<T, S>(
146 self,
147 push_to_network: T,
148 stream: S,
149 ) -> impl futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)>
150 where
151 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Unpin + Send + Sync + 'static,
152 T::Error: Send,
153 S: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)>,
154 {
155 use futures::StreamExt;
156 stream.filter_map(move |(pseudonym, data)| {
157 let state = self.clone();
158 let push = push_to_network.clone();
159 async move {
160 match state.classify(push, pseudonym, data).await {
161 ProbeDispatch::Consumed => None,
162 ProbeDispatch::Passthrough(ps, d) => Some((ps, d)),
163 }
164 }
165 })
166 }
167}
168
169pub struct Probe {
174 cfg: ProbeConfig,
176 tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
178}
179
180impl Probe {
181 pub fn new(cfg: ProbeConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
182 Self { cfg, tag_allocator }
183 }
184
185 pub async fn continuously_scan<T, V, Tr, G>(
191 self,
192 api_out: T, manual_events: V, probing_traffic_generator: Tr,
195 network_graph: G,
196 ) -> (AbortableList<HoprProbeProcess>, ProbeClassifierState<G>)
197 where
198 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
199 T::Error: Send,
200 V: futures::Stream<Item = (OffchainPublicKey, PingQueryReplier)> + Send + Sync + 'static,
201 Tr: ProbingTrafficGeneration + Send + Sync + 'static,
202 G: NetworkGraphView + NetworkGraphUpdate + Clone + Send + Sync + 'static,
203 {
204 let max_parallel_probes = self.cfg.max_parallel_probes;
205
206 let probing_routes = probing_traffic_generator.build();
207
208 let network_graph_internal_neighbor = network_graph.clone();
210 let network_graph_internal_path = network_graph.clone();
211 let timeout = self.cfg.timeout;
212 let active_neighbor_probes: moka::future::Cache<CacheNeighborKey, CacheNeighborValue> =
213 moka::future::Cache::builder()
214 .time_to_live(timeout)
215 .max_capacity(100_000)
216 .async_eviction_listener(
217 move |k: Arc<CacheNeighborKey>,
218 v: CacheNeighborValue,
219 cause|
220 -> moka::notification::ListenerFuture {
221 if matches!(cause, moka::notification::RemovalCause::Expired) {
222 let store = network_graph_internal_neighbor.clone();
224 let (peer, _start, notifier) = v;
225
226 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "neighbor probe failed");
227 if let Some(replier) = notifier {
228 if matches!(peer.as_ref(), NodeId::Offchain(_)) {
229 replier.notify(Err(()));
230 } else {
231 tracing::warn!(
232 reason = "non-offchain peer",
233 "cannot notify timeout for non-offchain peer"
234 );
235 }
236 };
237
238 if let NodeId::Offchain(opk) = peer.as_ref() {
239 let opk: OffchainPublicKey = *opk;
240 store
241 .record_edge::<NeighborTelemetry, PathTelemetry>(
242 hopr_api::graph::MeasurableEdge::Probe(Err(
243 NetworkGraphError::ProbeNeighborTimeout(Box::new(opk)),
244 )),
245 );
246 futures::FutureExt::boxed(futures::future::ready(()))
247
248 } else {
249 futures::FutureExt::boxed(futures::future::ready(()))
250 }
251 } else {
252 futures::FutureExt::boxed(futures::future::ready(()))
254 }
255 },
256 )
257 .build();
258
259 let active_path_probes: moka::future::Cache<Tag, (PathTelemetry, Arc<AllocatedTag>)> =
260 moka::future::Cache::builder()
261 .time_to_live(timeout)
262 .max_capacity(100_000)
263 .async_eviction_listener(
264 move |tag: Arc<Tag>,
265 (path, _allocated_tag): (PathTelemetry, Arc<AllocatedTag>),
266 cause|
267 -> moka::notification::ListenerFuture {
268 if matches!(cause, moka::notification::RemovalCause::Expired) {
269 let store = network_graph_internal_path.clone();
271
272 tracing::debug!(%tag, reason = "timeout", "loopback probe failed");
273
274 store.record_edge::<NeighborTelemetry, PathTelemetry>(
275 hopr_api::graph::MeasurableEdge::Probe(Err(NetworkGraphError::ProbeLoopbackTimeout(
276 path,
277 ))),
278 );
279 futures::FutureExt::boxed(futures::future::ready(()))
280 } else {
281 futures::FutureExt::boxed(futures::future::ready(()))
283 }
284 },
285 )
286 .build();
287
288 let push_to_network = api_out.clone();
289
290 let mut processes = AbortableList::default();
291
292 let direct_neighbors =
294 probing_routes
295 .map(|peer| (peer, None))
296 .merge(manual_events.filter_map(|(peer, notifier)| async move {
297 let routing = DestinationRouting::Forward {
298 destination: Box::new(peer.into()),
299 pseudonym: Some(HoprPseudonym::random()),
300 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
301 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
302 };
303 Some((ProbeRouting::Neighbor(routing), Some(notifier)))
304 }));
305
306 let tag_allocator = self.tag_allocator.clone();
307 let classifier_neighbor_probes = active_neighbor_probes.clone();
308 let classifier_path_probes = active_path_probes.clone();
309 processes.insert(
310 HoprProbeProcess::Emit,
311 hopr_utils::spawn_as_abortable!(async move {
312 direct_neighbors
313 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
314 let active_neighbor_probes = active_neighbor_probes.clone();
315 let active_path_probes = active_path_probes.clone();
316 let push_to_network = push_to_network.clone();
317 let tag_allocator = tag_allocator.clone();
318
319 async move {
320 match peer {
321 ProbeRouting::Neighbor(DestinationRouting::Forward {
322 destination,
323 pseudonym,
324 forward_options,
325 return_options,
326 }) => {
327 let nonce = NeighborProbe::random_nonce();
328
329 let message = Message::Probe(nonce);
330
331 if let Ok(data) = message.try_into() {
332 let routing = DestinationRouting::Forward {
333 destination: destination.clone(),
334 pseudonym,
335 forward_options,
336 return_options,
337 };
338 let data = ApplicationDataOut {
341 data,
342 packet_info: Some(OutgoingPacketInfo {
343 max_surbs_in_packet: 1,
344 ..Default::default()
345 }),
346 };
347 let mut push_to_network = push_to_network.clone();
348
349 if let Err(_error) = push_to_network.send((routing, data)).await {
350 tracing::error!("failed to send out a ping");
351 } else {
352 active_neighbor_probes
353 .insert(
354 (
355 pseudonym
356 .expect("the pseudonym must be present in Forward routing"),
357 nonce,
358 ),
359 (destination, current_time().as_unix_timestamp(), notifier),
360 )
361 .await;
362 }
363 } else {
364 tracing::error!("failed to convert ping message into data");
365 }
366 }
367 ProbeRouting::Neighbor(DestinationRouting::Return(_surb_matcher)) => tracing::error!(
368 error = "logical error",
369 "resolved transport routing is not forward"
370 ),
371 ProbeRouting::Looping((routing, path_id)) => {
372 let message = Message::Telemetry(PathTelemetry {
373 id: hopr_api::types::crypto_random::random_bytes(),
374 path: std::array::from_fn(|i| path_id[i / 8].to_le_bytes()[i % 8]),
375 timestamp: std::time::SystemTime::now()
376 .duration_since(std::time::UNIX_EPOCH)
377 .unwrap_or_default()
378 .as_millis(),
379 });
380
381 if let Some(allocated_tag) = tag_allocator.allocate() {
382 let tag_value = allocated_tag.value();
383
384 if let Ok(packet) = hopr_protocol_app::prelude::ApplicationData::new(
385 tag_value,
386 message.to_bytes().as_ref(),
387 ) {
388 let mut push_to_network = push_to_network.clone();
389
390 if let Err(_error) = push_to_network
393 .send((
394 routing,
395 ApplicationDataOut {
396 data: packet,
397 packet_info: Some(OutgoingPacketInfo {
398 max_surbs_in_packet: 0,
399 ..Default::default()
400 }),
401 },
402 ))
403 .await
404 {
405 tracing::error!("failed to send out a ping");
406 } else {
407 if let Message::Telemetry(telemetry) = message {
409 active_path_probes
410 .insert(tag_value.into(), (telemetry, Arc::new(allocated_tag)))
411 .await;
412 }
413 }
414 } else {
415 tracing::error!("failed to construct data for path telemetry")
416 }
417 } else {
418 tracing::warn!("probing telemetry tag pool exhausted, skipping loopback probe");
419 }
420 }
421 }
422 }
423 })
424 .inspect(|_| {
425 tracing::warn!(
426 task = "transport (probe - generate outgoing)",
427 "long-running background task finished"
428 )
429 })
430 .await;
431 }),
432 );
433
434 let classifier = ProbeClassifierState {
435 active_neighbor_probes: classifier_neighbor_probes,
436 active_path_probes: classifier_path_probes,
437 network_graph,
438 };
439
440 (processes, classifier)
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use std::{collections::VecDeque, sync::RwLock, time::Duration};
447
448 use async_trait::async_trait;
449 use futures::future::BoxFuture;
450 use hopr_api::{
451 graph::{
452 EdgeLinkObservable, MeasurableEdge, NetworkGraphError,
453 traits::{EdgeNetworkObservableRead, EdgeObservableRead, EdgeObservableWrite, EdgeProtocolObservable},
454 },
455 types::crypto::keypairs::{ChainKeypair, Keypair, OffchainKeypair},
456 };
457 use hopr_ct_immediate::{ImmediateNeighborProber, ProberConfig};
458 use hopr_protocol_app::prelude::{ApplicationData, ReservedTag, Tag};
459
460 use super::*;
461 use crate::errors::ProbeError;
462
463 lazy_static::lazy_static!(
464 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
465 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
466 static ref NEIGHBOURS: Vec<OffchainPublicKey> = vec![
467 *OffchainKeypair::random().public(),
468 *OffchainKeypair::random().public(),
469 *OffchainKeypair::random().public(),
470 *OffchainKeypair::random().public(),
471 ];
472 );
473
474 #[derive(Debug, Clone, Copy, Default)]
476 pub struct TestEdgeTransportObservations;
477
478 impl EdgeLinkObservable for TestEdgeTransportObservations {
479 fn record(&mut self, _latency: std::result::Result<Duration, ()>) {}
480
481 fn average_latency(&self) -> Option<Duration> {
482 None
483 }
484
485 fn average_probe_rate(&self) -> f64 {
486 1.0
487 }
488
489 fn score(&self) -> f64 {
490 1.0
491 }
492 }
493
494 impl EdgeNetworkObservableRead for TestEdgeTransportObservations {
495 fn is_connected(&self) -> bool {
496 true
497 }
498 }
499
500 impl EdgeProtocolObservable for TestEdgeTransportObservations {
501 fn capacity(&self) -> Option<u128> {
502 None
503 }
504 }
505
506 impl hopr_api::graph::EdgeImmediateProtocolObservable for TestEdgeTransportObservations {
507 fn ack_rate(&self) -> Option<f64> {
508 None
509 }
510 }
511
512 #[derive(Debug, Clone, Copy, Default)]
513 pub struct TestEdgeObservations;
514
515 impl EdgeObservableWrite for TestEdgeObservations {
516 fn record(&mut self, _measurement: hopr_api::graph::traits::EdgeWeightType) {}
517 }
518
519 impl EdgeObservableRead for TestEdgeObservations {
520 type ImmediateMeasurement = TestEdgeTransportObservations;
521 type IntermediateMeasurement = TestEdgeTransportObservations;
522
523 fn last_update(&self) -> std::time::Duration {
524 std::time::SystemTime::now()
525 .duration_since(std::time::UNIX_EPOCH)
526 .unwrap_or_default()
527 }
528
529 fn immediate_qos(&self) -> Option<&Self::ImmediateMeasurement> {
530 None
531 }
532
533 fn intermediate_qos(&self) -> Option<&Self::IntermediateMeasurement> {
534 None
535 }
536
537 fn score(&self) -> f64 {
538 1.0
539 }
540 }
541
542 #[derive(Debug, Clone)]
543 pub struct PeerStore {
544 me: OffchainPublicKey,
545 get_peers: Arc<RwLock<VecDeque<Vec<OffchainPublicKey>>>>,
546 #[allow(clippy::type_complexity)]
547 on_finished: Arc<RwLock<Vec<(OffchainPublicKey, crate::errors::Result<Duration>)>>>,
548 }
549
550 impl NetworkGraphUpdate for PeerStore {
551 fn record_edge<N, P>(&self, telemetry: MeasurableEdge<N, P>)
552 where
553 N: hopr_api::graph::MeasurablePeer + Send + Clone,
554 P: hopr_api::graph::MeasurablePath + Send + Clone,
555 {
556 let mut on_finished = self.on_finished.write().unwrap();
557
558 match telemetry {
559 hopr_api::graph::MeasurableEdge::Probe(Ok(EdgeTransportTelemetry::Neighbor(neighbor_telemetry))) => {
560 let peer: OffchainPublicKey = *neighbor_telemetry.peer();
561 let duration = neighbor_telemetry.rtt();
562 on_finished.push((peer, Ok(duration)));
563 }
564 hopr_api::graph::MeasurableEdge::Probe(Err(NetworkGraphError::ProbeNeighborTimeout(peer))) => {
565 on_finished.push((
566 *peer.as_ref(),
567 Err(ProbeError::TrafficError(NetworkGraphError::ProbeNeighborTimeout(peer))),
568 ));
569 }
570 _ => panic!("unexpected telemetry type, unimplemented"),
571 }
572 }
573
574 fn record_node<N>(&self, _node: N)
575 where
576 N: hopr_api::graph::MeasurableNode + Send + Clone,
577 {
578 unimplemented!()
579 }
580 }
581
582 #[async_trait]
583 impl NetworkGraphView for PeerStore {
584 type NodeId = OffchainPublicKey;
585 type Observed = TestEdgeObservations;
586
587 fn identity(&self) -> &OffchainPublicKey {
588 &self.me
589 }
590
591 fn node_count(&self) -> usize {
592 self.get_peers.read().unwrap().front().map_or(0, |v| v.len())
593 }
594
595 fn contains_node(&self, _key: &OffchainPublicKey) -> bool {
596 false
597 }
598
599 fn nodes(&self) -> futures::stream::BoxStream<'static, OffchainPublicKey> {
601 let mut get_peers = self.get_peers.write().unwrap();
602 Box::pin(futures::stream::iter(get_peers.pop_front().unwrap_or_default()))
603 }
604
605 fn edge(&self, _src: &OffchainPublicKey, _dest: &OffchainPublicKey) -> Option<TestEdgeObservations> {
606 Some(TestEdgeObservations)
607 }
608 }
609
610 type TestClassifier = ProbeClassifierState<PeerStore>;
611
612 struct TestInterface {
613 probe_classifier: TestClassifier,
614 from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
615 from_probing_to_network_tx: futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
616 manual_probe_tx: futures::channel::mpsc::Sender<(OffchainPublicKey, PingQueryReplier)>,
617 }
618
619 async fn test_with_probing<F, Fut>(cfg: ProbeConfig, store: PeerStore, test: F) -> anyhow::Result<()>
620 where
621 Fut: std::future::Future<Output = anyhow::Result<()>>,
622 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
623 {
624 let tag_allocators = hopr_transport_tag_allocator::create_allocators(
625 ReservedTag::range().end..u16::MAX as u64 + 1,
626 [
627 (hopr_transport_tag_allocator::Usage::Session, 2048),
628 (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 4000),
629 (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
630 ],
631 )
632 .expect("tag allocators should be created");
633 let probing_allocator = tag_allocators
634 .into_iter()
635 .find_map(|(u, alloc)| matches!(u, hopr_transport_tag_allocator::Usage::ProvingTelemetry).then_some(alloc))
636 .expect("probing allocator should exist");
637
638 let probe = Probe::new(cfg, probing_allocator);
639
640 let (from_probing_to_network_tx, from_probing_to_network_rx) =
641 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
642
643 let (manual_probe_tx, manual_probe_rx) =
644 futures::channel::mpsc::channel::<(OffchainPublicKey, PingQueryReplier)>(100);
645
646 let (jhs, probe_classifier) = probe
647 .continuously_scan(
648 from_probing_to_network_tx.clone(),
649 manual_probe_rx,
650 ImmediateNeighborProber::new(
651 ProberConfig {
652 interval: cfg.interval,
653 recheck_threshold: cfg.recheck_threshold,
654 },
655 store.clone(),
656 ),
657 store,
658 )
659 .await;
660
661 let interface = TestInterface {
662 probe_classifier,
663 from_probing_to_network_rx,
664 from_probing_to_network_tx,
665 manual_probe_tx,
666 };
667
668 let result = test(interface).await;
669
670 jhs.abort_all();
671
672 result
673 }
674
675 const NO_PROBE_PASSES: f64 = 0.0;
676 const ALL_PROBES_PASS: f64 = 1.0;
677
678 fn concurrent_classify(
681 delay: Option<std::time::Duration>,
682 pass_rate: f64,
683 classifier: TestClassifier,
684 push_to_network: futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
685 ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
686 debug_assert!(
687 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
688 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
689 );
690
691 move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
692 let classifier = classifier.clone();
693 let push_to_network = push_to_network.clone();
694
695 Box::pin(async move {
696 if let DestinationRouting::Forward { pseudonym, .. } = path {
697 let message: Message = data_out.data.try_into().expect("failed to convert data into message");
698 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
699 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
700
701 if let Some(delay) = delay {
702 tokio::time::sleep(delay).await;
703 }
704
705 if rand::random_range(NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
706 let pseudonym = pseudonym.expect("the pseudonym is always known from cache");
707 classifier
708 .classify(
709 push_to_network,
710 pseudonym,
711 ApplicationDataIn {
712 data: pong_message
713 .try_into()
714 .expect("failed to convert pong message into data"),
715 packet_info: Default::default(),
716 },
717 )
718 .await;
719 }
720 }
721 };
722 })
723 }
724 }
725
726 #[tokio::test]
727 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
729 let cfg = ProbeConfig {
730 timeout: std::time::Duration::from_millis(5),
731 interval: std::time::Duration::from_secs(0),
732 ..Default::default()
733 };
734
735 let store = PeerStore {
736 me: *OFFCHAIN_KEYPAIR.public(),
737 get_peers: Arc::new(RwLock::new(VecDeque::new())),
738 on_finished: Arc::new(RwLock::new(Vec::new())),
739 };
740
741 test_with_probing(cfg, store, move |iface: TestInterface| async move {
742 let mut manual_probe_tx = iface.manual_probe_tx;
743 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
744 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
745 let probe_classifier = iface.probe_classifier;
746
747 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
748 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
749
750 let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
751 from_probing_to_network_rx
752 .for_each_concurrent(
753 cfg.max_parallel_probes + 1,
754 concurrent_classify(None, ALL_PROBES_PASS, probe_classifier, from_probing_to_network_tx),
755 )
756 .await;
757 });
758
759 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
760 .await?
761 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))?
762 .map_err(|_| anyhow::anyhow!("Probe failed"))?;
763
764 Ok(())
765 })
766 .await
767 }
768
769 #[tokio::test]
770 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
772 let cfg = ProbeConfig {
773 timeout: std::time::Duration::from_millis(5),
774 interval: std::time::Duration::from_secs(0),
775 ..Default::default()
776 };
777
778 let store = PeerStore {
779 me: *OFFCHAIN_KEYPAIR.public(),
780 get_peers: Arc::new(RwLock::new(VecDeque::new())),
781 on_finished: Arc::new(RwLock::new(Vec::new())),
782 };
783
784 test_with_probing(cfg, store, move |iface: TestInterface| async move {
785 let mut manual_probe_tx = iface.manual_probe_tx;
786 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
787 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
788 let probe_classifier = iface.probe_classifier;
789
790 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
791 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
792
793 let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
794 from_probing_to_network_rx
795 .for_each_concurrent(
796 cfg.max_parallel_probes + 1,
797 concurrent_classify(None, NO_PROBE_PASSES, probe_classifier, from_probing_to_network_tx),
798 )
799 .await;
800 });
801
802 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
803
804 Ok(())
805 })
806 .await
807 }
808
809 #[tokio::test]
810 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
812 let cfg = ProbeConfig {
813 timeout: std::time::Duration::from_millis(20),
814 max_parallel_probes: NEIGHBOURS.len(),
815 interval: std::time::Duration::from_secs(0),
816 ..Default::default()
817 };
818
819 let store = PeerStore {
820 me: *OFFCHAIN_KEYPAIR.public(),
821 get_peers: Arc::new(RwLock::new({
822 let mut neighbors = VecDeque::new();
823 neighbors.push_back(NEIGHBOURS.clone());
824 neighbors
825 })),
826 on_finished: Arc::new(RwLock::new(Vec::new())),
827 };
828
829 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
830 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
831 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
832 let probe_classifier = iface.probe_classifier;
833
834 let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
835 from_probing_to_network_rx
836 .for_each_concurrent(
837 cfg.max_parallel_probes + 1,
838 concurrent_classify(None, ALL_PROBES_PASS, probe_classifier, from_probing_to_network_tx),
839 )
840 .await;
841 });
842
843 tokio::time::sleep(cfg.timeout * 3).await;
847
848 Ok(())
849 })
850 .await?;
851
852 assert_eq!(
853 store
854 .on_finished
855 .read()
856 .expect("should be lockable")
857 .iter()
858 .filter(|(_peer, result)| result.is_ok())
859 .count(),
860 NEIGHBOURS.len()
861 );
862
863 Ok(())
864 }
865
866 #[tokio::test]
867 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
869 let cfg = ProbeConfig {
870 timeout: std::time::Duration::from_millis(10),
871 max_parallel_probes: NEIGHBOURS.len(),
872 interval: std::time::Duration::from_secs(0),
873 ..Default::default()
874 };
875
876 let store = PeerStore {
877 me: *OFFCHAIN_KEYPAIR.public(),
878 get_peers: Arc::new(RwLock::new({
879 let mut neighbors = VecDeque::new();
880 neighbors.push_back(NEIGHBOURS.clone());
881 neighbors
882 })),
883 on_finished: Arc::new(RwLock::new(Vec::new())),
884 };
885
886 let timeout = cfg.timeout * 2;
887
888 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
889 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
890 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
891 let probe_classifier = iface.probe_classifier;
892
893 let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
894 from_probing_to_network_rx
895 .for_each_concurrent(
896 cfg.max_parallel_probes + 1,
897 concurrent_classify(
898 Some(timeout),
899 ALL_PROBES_PASS,
900 probe_classifier,
901 from_probing_to_network_tx,
902 ),
903 )
904 .await;
905 });
906
907 tokio::time::sleep(timeout * 2).await;
909
910 Ok(())
911 })
912 .await?;
913
914 assert_eq!(
915 store
916 .on_finished
917 .read()
918 .expect("should be lockable")
919 .iter()
920 .filter(|(_peer, result)| result.is_err())
921 .count(),
922 NEIGHBOURS.len()
923 );
924
925 Ok(())
926 }
927
928 #[tokio::test]
929 async fn probe_should_reply_with_pong_when_receiving_ping() -> anyhow::Result<()> {
930 use anyhow::Context;
931
932 let cfg = ProbeConfig {
933 timeout: std::time::Duration::from_millis(100),
934 interval: std::time::Duration::from_secs(10),
935 ..Default::default()
936 };
937
938 let store = PeerStore {
939 me: *OFFCHAIN_KEYPAIR.public(),
940 get_peers: Arc::new(RwLock::new(VecDeque::new())),
941 on_finished: Arc::new(RwLock::new(Vec::new())),
942 };
943
944 test_with_probing(cfg, store, move |iface: TestInterface| async move {
945 let probe_classifier = iface.probe_classifier;
946 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
947 let mut from_probing_to_network_rx = iface.from_probing_to_network_rx;
948
949 let ping = NeighborProbe::random_nonce();
951 let ping_nonce = match ping {
952 NeighborProbe::Ping(n) => n,
953 _ => unreachable!(),
954 };
955 let ping_msg = Message::Probe(ping);
956 let app_data: ApplicationData = ping_msg.try_into().context("converting ping to ApplicationData")?;
957
958 let result = probe_classifier
960 .classify(
961 from_probing_to_network_tx,
962 HoprPseudonym::random(),
963 ApplicationDataIn {
964 data: app_data,
965 packet_info: Default::default(),
966 },
967 )
968 .await;
969 anyhow::ensure!(matches!(result, ProbeDispatch::Consumed), "ping should be consumed");
970
971 let (routing, data_out) = tokio::time::timeout(Duration::from_secs(2), from_probing_to_network_rx.next())
973 .await
974 .context("timeout waiting for pong")?
975 .context("probe should send pong reply")?;
976
977 anyhow::ensure!(
979 matches!(routing, DestinationRouting::Return(_)),
980 "pong should use Return routing, got: {routing:?}"
981 );
982
983 let response_msg: Message = data_out.data.try_into().context("converting response to Message")?;
985 anyhow::ensure!(
986 matches!(response_msg, Message::Probe(NeighborProbe::Pong(n)) if n == ping_nonce),
987 "response should be Pong with matching nonce"
988 );
989
990 Ok(())
991 })
992 .await
993 }
994
995 #[tokio::test]
996 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
998 let cfg = ProbeConfig {
999 timeout: std::time::Duration::from_millis(20),
1000 interval: std::time::Duration::from_secs(0),
1001 ..Default::default()
1002 };
1003
1004 let store = PeerStore {
1005 me: *OFFCHAIN_KEYPAIR.public(),
1006 get_peers: Arc::new(RwLock::new({
1007 let mut neighbors = VecDeque::new();
1008 neighbors.push_back(NEIGHBOURS.clone());
1009 neighbors
1010 })),
1011 on_finished: Arc::new(RwLock::new(Vec::new())),
1012 };
1013
1014 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
1015 let probe_classifier = iface.probe_classifier;
1016 let from_probing_to_network_tx = iface.from_probing_to_network_tx;
1017
1018 let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
1019
1020 let result = probe_classifier
1021 .classify(
1022 from_probing_to_network_tx,
1023 HoprPseudonym::random(),
1024 ApplicationDataIn {
1025 data: expected_data.clone(),
1026 packet_info: Default::default(),
1027 },
1028 )
1029 .await;
1030
1031 match result {
1032 ProbeDispatch::Passthrough(_, actual) => assert_eq!(actual.data, expected_data),
1033 ProbeDispatch::Consumed => anyhow::bail!("expected Passthrough, got Consumed"),
1034 }
1035
1036 Ok(())
1037 })
1038 .await
1039 }
1040
1041 #[derive(Clone)]
1045 enum TestProbeStrategy {
1046 ManualNeighbor,
1047 OneShotLoopback {
1048 routing: DestinationRouting,
1049 path_id: hopr_api::types::internal::routing::PathId,
1050 },
1051 }
1052
1053 impl hopr_api::ct::ProbingTrafficGeneration for TestProbeStrategy {
1054 fn build(&self) -> futures::stream::BoxStream<'static, hopr_api::ct::ProbeRouting> {
1055 match self {
1056 Self::ManualNeighbor => Box::pin(futures::stream::pending()),
1057 Self::OneShotLoopback { routing, path_id } => {
1058 let probe = hopr_api::ct::ProbeRouting::Looping((routing.clone(), *path_id));
1059 Box::pin(futures::StreamExt::chain(
1060 futures::stream::iter(std::iter::once(probe)),
1061 futures::stream::pending(),
1062 ))
1063 }
1064 }
1065 }
1066 }
1067
1068 #[rstest::rstest]
1075 #[case::neighbor_probe_requests_one_surb(TestProbeStrategy::ManualNeighbor, 1)]
1076 #[case::loopback_probe_requests_zero_surbs(
1077 TestProbeStrategy::OneShotLoopback {
1078 routing: DestinationRouting::Forward {
1079 destination: Box::new((*OFFCHAIN_KEYPAIR.public()).into()),
1080 pseudonym: Some(HoprPseudonym::random()),
1081 forward_options: RoutingOptions::Hops(1.try_into().expect("1 is a valid u8")),
1082 return_options: None,
1083 },
1084 path_id: [1, 2, 3, 4, 5],
1085 },
1086 0,
1087 )]
1088 #[tokio::test]
1089 async fn probe_should_emit_with_expected_surb_count(
1090 #[case] strategy: TestProbeStrategy,
1091 #[case] expected_max_surbs: usize,
1092 ) -> anyhow::Result<()> {
1093 let cfg = ProbeConfig {
1094 timeout: std::time::Duration::from_secs(1),
1095 interval: std::time::Duration::from_secs(0),
1096 ..Default::default()
1097 };
1098
1099 let tag_allocators = hopr_transport_tag_allocator::create_allocators(
1102 ReservedTag::range().end..u16::MAX as u64 + 1,
1103 [
1104 (hopr_transport_tag_allocator::Usage::Session, 2048),
1105 (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 4000),
1106 (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1107 ],
1108 )
1109 .expect("tag allocators should be created");
1110 let probing_allocator = tag_allocators
1111 .into_iter()
1112 .find_map(|(u, alloc)| matches!(u, hopr_transport_tag_allocator::Usage::ProvingTelemetry).then_some(alloc))
1113 .expect("probing allocator should exist");
1114
1115 let probe = Probe::new(cfg, probing_allocator);
1116
1117 let (from_probing_to_network_tx, mut from_probing_to_network_rx) =
1118 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
1119 let (mut manual_probe_tx, manual_probe_rx) =
1120 futures::channel::mpsc::channel::<(OffchainPublicKey, PingQueryReplier)>(100);
1121
1122 let store = PeerStore {
1123 me: *OFFCHAIN_KEYPAIR.public(),
1124 get_peers: Arc::new(RwLock::new(VecDeque::new())),
1125 on_finished: Arc::new(RwLock::new(Vec::new())),
1126 };
1127
1128 let is_manual = matches!(strategy, TestProbeStrategy::ManualNeighbor);
1131 let (jhs, _probe_classifier) = probe
1132 .continuously_scan(from_probing_to_network_tx, manual_probe_rx, strategy, store)
1133 .await;
1134
1135 if is_manual {
1136 let (tx, _rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
1137 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
1138 }
1139
1140 let (_routing, data_out) =
1141 tokio::time::timeout(std::time::Duration::from_secs(1), from_probing_to_network_rx.next())
1142 .await?
1143 .ok_or_else(|| anyhow::anyhow!("no probe emitted"))?;
1144
1145 jhs.abort_all();
1146
1147 let packet_info = data_out
1148 .packet_info
1149 .ok_or_else(|| anyhow::anyhow!("probe must carry explicit OutgoingPacketInfo"))?;
1150 assert_eq!(
1151 packet_info.max_surbs_in_packet, expected_max_surbs,
1152 "probe must request exactly {expected_max_surbs} SURB(s)"
1153 );
1154
1155 Ok(())
1156 }
1157}