Skip to main content

hopr_transport_probe/
probe.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_api::{
6    ct::{ProbeRouting, ProbingTrafficGeneration},
7    graph::{EdgeTransportTelemetry, NetworkGraphError, NetworkGraphUpdate, NetworkGraphView},
8    types::{
9        crypto::types::OffchainPublicKey, crypto_random::Randomizable, internal::prelude::*,
10        primitive::traits::AsUnixTimestamp,
11    },
12};
13use hopr_protocol_app::{
14    prelude::{ApplicationDataIn, ApplicationDataOut, OutgoingPacketInfo, ReservedTag},
15    v1::Tag,
16};
17use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
18use hopr_utils::{platform::time::native::current_time, runtime::AbortableList};
19
20use crate::{
21    HoprProbeProcess,
22    config::ProbeConfig,
23    content::Message,
24    ping::PingQueryReplier,
25    types::{NeighborProbe, NeighborTelemetry, PathTelemetry},
26};
27
/// Key of an in-flight neighbor probe: the pseudonym the ping was sent under and the ping itself.
type CacheNeighborKey = (HoprPseudonym, NeighborProbe);
/// Value of an in-flight neighbor probe: the probed destination, the timestamp taken when the
/// probe was registered, and an optional replier to notify about the probe outcome.
type CacheNeighborValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
30
/// Result of classifying one incoming message through the probe layer.
///
/// Returned by [`ProbeClassifierState::classify`].
pub enum ProbeDispatch {
    /// The message was a probe message and has been consumed internally.
    Consumed,
    /// The message was not related to probing; the caller should route it further.
    ///
    /// Carries the sender pseudonym and the untouched incoming data.
    Passthrough(HoprPseudonym, ApplicationDataIn),
}
38
/// Shared state used to classify incoming messages as probe or non-probe.
///
/// Obtained from [`Probe::continuously_scan`]. Use [`filter_stream`](ProbeClassifierState::filter_stream)
/// to wrap an incoming stream so that probe messages are consumed internally and non-probe messages
/// are yielded to the caller.
#[derive(Clone)]
pub struct ProbeClassifierState<G> {
    /// Neighbor pings still waiting for their pong, keyed by pseudonym and ping.
    active_neighbor_probes: moka::future::Cache<CacheNeighborKey, CacheNeighborValue>,
    /// Loopback probes still in flight, keyed by the application tag they were sent with.
    /// The allocated tag is stored next to the telemetry (presumably to keep the tag reserved
    /// while the probe is in flight; verify against the tag allocator).
    active_path_probes: moka::future::Cache<Tag, (PathTelemetry, Arc<AllocatedTag>)>,
    /// Network graph into which successful probe measurements are recorded.
    network_graph: G,
}
50
51impl<G> ProbeClassifierState<G>
52where
53    G: NetworkGraphUpdate + Clone + Send + Sync + 'static,
54{
55    /// Classify one incoming `(pseudonym, data)` pair.
56    ///
57    /// The `push_to_network` sink is used to send pong replies when the message is a Ping.
58    /// Returns `Consumed` if the message was a probe, or `Passthrough` for all other messages.
59    pub async fn classify<T>(
60        &self,
61        mut push_to_network: T,
62        pseudonym: HoprPseudonym,
63        in_data: ApplicationDataIn,
64    ) -> ProbeDispatch
65    where
66        T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Unpin + Send + 'static,
67        T::Error: Send,
68    {
69        let tag: Tag = in_data.data.application_tag;
70
71        if let Some((path_telemetry, _allocated_tag)) = self.active_path_probes.remove(&tag).await {
72            tracing::debug!(%tag, "loopback probe successfully received");
73            self.network_graph
74                .record_edge::<NeighborTelemetry, PathTelemetry>(hopr_api::graph::MeasurableEdge::Probe(Ok(
75                    EdgeTransportTelemetry::Loopback(path_telemetry),
76                )));
77        } else if tag == ReservedTag::Ping.into() {
78            let message: anyhow::Result<Message> = in_data
79                .data
80                .try_into()
81                .map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
82
83            match message {
84                Ok(message) => match message {
85                    Message::Telemetry(_) => {
86                        tracing::warn!(%pseudonym, "received telemetry on reserved ping tag, ignoring");
87                    }
88                    Message::Probe(NeighborProbe::Ping(ping)) => {
89                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
90                        tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
91
92                        let message = Message::Probe(NeighborProbe::Pong(ping));
93                        if let Ok(data) = message.try_into() {
94                            let routing = DestinationRouting::Return(pseudonym.into());
95                            let data = ApplicationDataOut::with_no_packet_info(data);
96                            if let Err(_error) = push_to_network.send((routing, data)).await {
97                                tracing::error!(%pseudonym, "failed to send back a pong");
98                            }
99                        } else {
100                            tracing::error!(%pseudonym, "failed to convert pong message into data");
101                        }
102                    }
103                    Message::Probe(NeighborProbe::Pong(pong)) => {
104                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
105                        if let Some((peer, start, replier)) = self
106                            .active_neighbor_probes
107                            .remove(&(pseudonym, NeighborProbe::Ping(pong)))
108                            .await
109                        {
110                            let latency = current_time().as_unix_timestamp().saturating_sub(start);
111
112                            if let NodeId::Offchain(opk) = peer.as_ref() {
113                                tracing::debug!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
114                                self.network_graph.record_edge::<NeighborTelemetry, PathTelemetry>(
115                                    hopr_api::graph::MeasurableEdge::Probe(Ok(EdgeTransportTelemetry::Neighbor(
116                                        NeighborTelemetry {
117                                            peer: *opk,
118                                            rtt: latency,
119                                        },
120                                    ))),
121                                )
122                            } else {
123                                tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
124                            }
125
126                            if let Some(replier) = replier {
127                                replier.notify(Ok(latency));
128                            }
129                        } else {
130                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
131                        }
132                    }
133                },
134                Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
135            }
136        } else {
137            return ProbeDispatch::Passthrough(pseudonym, in_data);
138        }
139
140        ProbeDispatch::Consumed
141    }
142
143    /// Wraps `stream` as an in-place filter: probe messages are handled internally (telemetry,
144    /// pong replies via `push_to_network`), non-probe messages are yielded.
145    pub fn filter_stream<T, S>(
146        self,
147        push_to_network: T,
148        stream: S,
149    ) -> impl futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)>
150    where
151        T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Unpin + Send + Sync + 'static,
152        T::Error: Send,
153        S: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)>,
154    {
155        use futures::StreamExt;
156        stream.filter_map(move |(pseudonym, data)| {
157            let state = self.clone();
158            let push = push_to_network.clone();
159            async move {
160                match state.classify(push, pseudonym, data).await {
161                    ProbeDispatch::Consumed => None,
162                    ProbeDispatch::Passthrough(ps, d) => Some((ps, d)),
163                }
164            }
165        })
166    }
167}
168
/// Probe functionality builder.
///
/// The builder holds the configuration of the probing process and the allocator of application
/// tags used for loopback probes. It is then used to construct the probing process itself
/// via [`Probe::continuously_scan`].
pub struct Probe {
    /// Probe configuration.
    cfg: ProbeConfig,
    /// Tag allocator for probing telemetry tags.
    tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
}
179
180impl Probe {
181    pub fn new(cfg: ProbeConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
182        Self { cfg, tag_allocator }
183    }
184
185    /// The main function that assembles and starts the probing process.
186    ///
187    /// Returns the abortable list of background tasks (probe emission) and a
188    /// [`ProbeClassifierState`] for inline classification of incoming messages.
189    /// Use [`ProbeClassifierState::filter_stream`] to wrap the incoming stream.
190    pub async fn continuously_scan<T, V, Tr, G>(
191        self,
192        api_out: T,       // lower tx channel for sending outgoing probes and pong replies
193        manual_events: V, // explicit requests from the API
194        probing_traffic_generator: Tr,
195        network_graph: G,
196    ) -> (AbortableList<HoprProbeProcess>, ProbeClassifierState<G>)
197    where
198        T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
199        T::Error: Send,
200        V: futures::Stream<Item = (OffchainPublicKey, PingQueryReplier)> + Send + Sync + 'static,
201        Tr: ProbingTrafficGeneration + Send + Sync + 'static,
202        G: NetworkGraphView + NetworkGraphUpdate + Clone + Send + Sync + 'static,
203    {
204        let max_parallel_probes = self.cfg.max_parallel_probes;
205
206        let probing_routes = probing_traffic_generator.build();
207
208        // Currently active probes
209        let network_graph_internal_neighbor = network_graph.clone();
210        let network_graph_internal_path = network_graph.clone();
211        let timeout = self.cfg.timeout;
212        let active_neighbor_probes: moka::future::Cache<CacheNeighborKey, CacheNeighborValue> =
213            moka::future::Cache::builder()
214                .time_to_live(timeout)
215                .max_capacity(100_000)
216                .async_eviction_listener(
217                    move |k: Arc<CacheNeighborKey>,
218                          v: CacheNeighborValue,
219                          cause|
220                          -> moka::notification::ListenerFuture {
221                        if matches!(cause, moka::notification::RemovalCause::Expired) {
222                            // If the eviction cause is expiration => record as a failed probe
223                            let store = network_graph_internal_neighbor.clone();
224                            let (peer, _start, notifier) = v;
225
226                            tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "neighbor probe failed");
227                            if let Some(replier) = notifier {
228                                if matches!(peer.as_ref(), NodeId::Offchain(_)) {
229                                    replier.notify(Err(()));
230                                } else {
231                                    tracing::warn!(
232                                        reason = "non-offchain peer",
233                                        "cannot notify timeout for non-offchain peer"
234                                    );
235                                }
236                            };
237
238                            if let NodeId::Offchain(opk) = peer.as_ref() {
239                                let opk: OffchainPublicKey = *opk;
240                                store
241                                    .record_edge::<NeighborTelemetry, PathTelemetry>(
242                                        hopr_api::graph::MeasurableEdge::Probe(Err(
243                                            NetworkGraphError::ProbeNeighborTimeout(Box::new(opk)),
244                                        )),
245                                    );
246                                futures::FutureExt::boxed(futures::future::ready(()))
247
248                            } else {
249                                futures::FutureExt::boxed(futures::future::ready(()))
250                            }
251                        } else {
252                            // If the eviction cause is not expiration, nothing needs to be done
253                            futures::FutureExt::boxed(futures::future::ready(()))
254                        }
255                    },
256                )
257                .build();
258
259        let active_path_probes: moka::future::Cache<Tag, (PathTelemetry, Arc<AllocatedTag>)> =
260            moka::future::Cache::builder()
261                .time_to_live(timeout)
262                .max_capacity(100_000)
263                .async_eviction_listener(
264                    move |tag: Arc<Tag>,
265                          (path, _allocated_tag): (PathTelemetry, Arc<AllocatedTag>),
266                          cause|
267                          -> moka::notification::ListenerFuture {
268                        if matches!(cause, moka::notification::RemovalCause::Expired) {
269                            // If the eviction cause is expiration => record as a failed probe
270                            let store = network_graph_internal_path.clone();
271
272                            tracing::debug!(%tag, reason = "timeout", "loopback probe failed");
273
274                            store.record_edge::<NeighborTelemetry, PathTelemetry>(
275                                hopr_api::graph::MeasurableEdge::Probe(Err(NetworkGraphError::ProbeLoopbackTimeout(
276                                    path,
277                                ))),
278                            );
279                            futures::FutureExt::boxed(futures::future::ready(()))
280                        } else {
281                            // If the eviction cause is not expiration, nothing needs to be done
282                            futures::FutureExt::boxed(futures::future::ready(()))
283                        }
284                    },
285                )
286                .build();
287
288        let push_to_network = api_out.clone();
289
290        let mut processes = AbortableList::default();
291
292        // -- Emit probes --
293        let direct_neighbors =
294            probing_routes
295                .map(|peer| (peer, None))
296                .merge(manual_events.filter_map(|(peer, notifier)| async move {
297                    let routing = DestinationRouting::Forward {
298                        destination: Box::new(peer.into()),
299                        pseudonym: Some(HoprPseudonym::random()),
300                        forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
301                        return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
302                    };
303                    Some((ProbeRouting::Neighbor(routing), Some(notifier)))
304                }));
305
306        let tag_allocator = self.tag_allocator.clone();
307        let classifier_neighbor_probes = active_neighbor_probes.clone();
308        let classifier_path_probes = active_path_probes.clone();
309        processes.insert(
310            HoprProbeProcess::Emit,
311            hopr_utils::spawn_as_abortable!(async move {
312                direct_neighbors
313                    .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
314                        let active_neighbor_probes = active_neighbor_probes.clone();
315                        let active_path_probes = active_path_probes.clone();
316                        let push_to_network = push_to_network.clone();
317                        let tag_allocator = tag_allocator.clone();
318
319                        async move {
320                            match peer {
321                                ProbeRouting::Neighbor(DestinationRouting::Forward {
322                                    destination,
323                                    pseudonym,
324                                    forward_options,
325                                    return_options,
326                                }) => {
327                                    let nonce = NeighborProbe::random_nonce();
328
329                                    let message = Message::Probe(nonce);
330
331                                    if let Ok(data) = message.try_into() {
332                                        let routing = DestinationRouting::Forward {
333                                            destination: destination.clone(),
334                                            pseudonym,
335                                            forward_options,
336                                            return_options,
337                                        };
338                                        // Neighbor probes are sent to a direct neighbor and returned via a zero-hop
339                                        // return path: only 1 SURB is ever consumed. See hoprnet/hoprnet#7972.
340                                        let data = ApplicationDataOut {
341                                            data,
342                                            packet_info: Some(OutgoingPacketInfo {
343                                                max_surbs_in_packet: 1,
344                                                ..Default::default()
345                                            }),
346                                        };
347                                        let mut push_to_network = push_to_network.clone();
348
349                                        if let Err(_error) = push_to_network.send((routing, data)).await {
350                                            tracing::error!("failed to send out a ping");
351                                        } else {
352                                            active_neighbor_probes
353                                                .insert(
354                                                    (
355                                                        pseudonym
356                                                            .expect("the pseudonym must be present in Forward routing"),
357                                                        nonce,
358                                                    ),
359                                                    (destination, current_time().as_unix_timestamp(), notifier),
360                                                )
361                                                .await;
362                                        }
363                                    } else {
364                                        tracing::error!("failed to convert ping message into data");
365                                    }
366                                }
367                                ProbeRouting::Neighbor(DestinationRouting::Return(_surb_matcher)) => tracing::error!(
368                                    error = "logical error",
369                                    "resolved transport routing is not forward"
370                                ),
371                                ProbeRouting::Looping((routing, path_id)) => {
372                                    let message = Message::Telemetry(PathTelemetry {
373                                        id: hopr_api::types::crypto_random::random_bytes(),
374                                        path: std::array::from_fn(|i| path_id[i / 8].to_le_bytes()[i % 8]),
375                                        timestamp: std::time::SystemTime::now()
376                                            .duration_since(std::time::UNIX_EPOCH)
377                                            .unwrap_or_default()
378                                            .as_millis(),
379                                    });
380
381                                    if let Some(allocated_tag) = tag_allocator.allocate() {
382                                        let tag_value = allocated_tag.value();
383
384                                        if let Ok(packet) = hopr_protocol_app::prelude::ApplicationData::new(
385                                            tag_value,
386                                            message.to_bytes().as_ref(),
387                                        ) {
388                                            let mut push_to_network = push_to_network.clone();
389
390                                            // Loopback telemetry probes are self-routed and never replied to via
391                                            // SURB, so no SURBs should be bundled. See hoprnet/hoprnet#7972.
392                                            if let Err(_error) = push_to_network
393                                                .send((
394                                                    routing,
395                                                    ApplicationDataOut {
396                                                        data: packet,
397                                                        packet_info: Some(OutgoingPacketInfo {
398                                                            max_surbs_in_packet: 0,
399                                                            ..Default::default()
400                                                        }),
401                                                    },
402                                                ))
403                                                .await
404                                            {
405                                                tracing::error!("failed to send out a ping");
406                                            } else {
407                                                // the object is constructed above, so will always match
408                                                if let Message::Telemetry(telemetry) = message {
409                                                    active_path_probes
410                                                        .insert(tag_value.into(), (telemetry, Arc::new(allocated_tag)))
411                                                        .await;
412                                                }
413                                            }
414                                        } else {
415                                            tracing::error!("failed to construct data for path telemetry")
416                                        }
417                                    } else {
418                                        tracing::warn!("probing telemetry tag pool exhausted, skipping loopback probe");
419                                    }
420                                }
421                            }
422                        }
423                    })
424                    .inspect(|_| {
425                        tracing::warn!(
426                            task = "transport (probe - generate outgoing)",
427                            "long-running background task finished"
428                        )
429                    })
430                    .await;
431            }),
432        );
433
434        let classifier = ProbeClassifierState {
435            active_neighbor_probes: classifier_neighbor_probes,
436            active_path_probes: classifier_path_probes,
437            network_graph,
438        };
439
440        (processes, classifier)
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use std::{collections::VecDeque, sync::RwLock, time::Duration};
447
448    use async_trait::async_trait;
449    use futures::future::BoxFuture;
450    use hopr_api::{
451        graph::{
452            EdgeLinkObservable, MeasurableEdge, NetworkGraphError,
453            traits::{EdgeNetworkObservableRead, EdgeObservableRead, EdgeObservableWrite, EdgeProtocolObservable},
454        },
455        types::crypto::keypairs::{ChainKeypair, Keypair, OffchainKeypair},
456    };
457    use hopr_ct_immediate::{ImmediateNeighborProber, ProberConfig};
458    use hopr_protocol_app::prelude::{ApplicationData, ReservedTag, Tag};
459
460    use super::*;
461    use crate::errors::ProbeError;
462
    // Randomly generated key material shared by the tests; each static is initialized lazily
    // on first access.
    lazy_static::lazy_static!(
        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
        static ref NEIGHBOURS: Vec<OffchainPublicKey> = vec![
            *OffchainKeypair::random().public(),
            *OffchainKeypair::random().public(),
            *OffchainKeypair::random().public(),
            *OffchainKeypair::random().public(),
        ];
    );
473
    /// Test stub implementation of Observable.
    ///
    /// Carries no state: recording is a no-op and all readers return fixed values.
    #[derive(Debug, Clone, Copy, Default)]
    pub struct TestEdgeTransportObservations;

    impl EdgeLinkObservable for TestEdgeTransportObservations {
        /// Discards the measurement.
        fn record(&mut self, _latency: std::result::Result<Duration, ()>) {}

        /// Always reports no latency.
        fn average_latency(&self) -> Option<Duration> {
            None
        }

        /// Always reports the fixed value `1.0`.
        fn average_probe_rate(&self) -> f64 {
            1.0
        }

        /// Always reports the fixed value `1.0`.
        fn score(&self) -> f64 {
            1.0
        }
    }

    impl EdgeNetworkObservableRead for TestEdgeTransportObservations {
        /// Always reports the edge as connected.
        fn is_connected(&self) -> bool {
            true
        }
    }

    impl EdgeProtocolObservable for TestEdgeTransportObservations {
        /// Always reports no capacity.
        fn capacity(&self) -> Option<u128> {
            None
        }
    }

    impl hopr_api::graph::EdgeImmediateProtocolObservable for TestEdgeTransportObservations {
        /// Always reports no acknowledgement rate.
        fn ack_rate(&self) -> Option<f64> {
            None
        }
    }
511
    /// Stateless test stub for edge observations: writes are discarded and reads return fixed values.
    #[derive(Debug, Clone, Copy, Default)]
    pub struct TestEdgeObservations;

    impl EdgeObservableWrite for TestEdgeObservations {
        /// Discards the measurement.
        fn record(&mut self, _measurement: hopr_api::graph::traits::EdgeWeightType) {}
    }

    impl EdgeObservableRead for TestEdgeObservations {
        type ImmediateMeasurement = TestEdgeTransportObservations;
        type IntermediateMeasurement = TestEdgeTransportObservations;

        /// Reports the current time since the Unix epoch, or zero if the system clock is
        /// set before the epoch.
        fn last_update(&self) -> std::time::Duration {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
        }

        /// Always reports no immediate measurement.
        fn immediate_qos(&self) -> Option<&Self::ImmediateMeasurement> {
            None
        }

        /// Always reports no intermediate measurement.
        fn intermediate_qos(&self) -> Option<&Self::IntermediateMeasurement> {
            None
        }

        /// Always reports the fixed value `1.0`.
        fn score(&self) -> f64 {
            1.0
        }
    }
541
    /// Test double for the network graph: serves queued peer batches and collects probe results.
    #[derive(Debug, Clone)]
    pub struct PeerStore {
        /// Identity returned from `identity()`.
        me: OffchainPublicKey,
        /// Queue of peer batches; each call to `nodes()` pops and streams the front batch.
        get_peers: Arc<RwLock<VecDeque<Vec<OffchainPublicKey>>>>,
        /// Probe outcomes recorded through `record_edge`, in the order they arrived.
        // The nested generic type is intentional for a test fixture; no alias is introduced for it.
        #[allow(clippy::type_complexity)]
        on_finished: Arc<RwLock<Vec<(OffchainPublicKey, crate::errors::Result<Duration>)>>>,
    }
549
550    impl NetworkGraphUpdate for PeerStore {
551        fn record_edge<N, P>(&self, telemetry: MeasurableEdge<N, P>)
552        where
553            N: hopr_api::graph::MeasurablePeer + Send + Clone,
554            P: hopr_api::graph::MeasurablePath + Send + Clone,
555        {
556            let mut on_finished = self.on_finished.write().unwrap();
557
558            match telemetry {
559                hopr_api::graph::MeasurableEdge::Probe(Ok(EdgeTransportTelemetry::Neighbor(neighbor_telemetry))) => {
560                    let peer: OffchainPublicKey = *neighbor_telemetry.peer();
561                    let duration = neighbor_telemetry.rtt();
562                    on_finished.push((peer, Ok(duration)));
563                }
564                hopr_api::graph::MeasurableEdge::Probe(Err(NetworkGraphError::ProbeNeighborTimeout(peer))) => {
565                    on_finished.push((
566                        *peer.as_ref(),
567                        Err(ProbeError::TrafficError(NetworkGraphError::ProbeNeighborTimeout(peer))),
568                    ));
569                }
570                _ => panic!("unexpected telemetry type, unimplemented"),
571            }
572        }
573
574        fn record_node<N>(&self, _node: N)
575        where
576            N: hopr_api::graph::MeasurableNode + Send + Clone,
577        {
578            unimplemented!()
579        }
580    }
581
582    #[async_trait]
583    impl NetworkGraphView for PeerStore {
584        type NodeId = OffchainPublicKey;
585        type Observed = TestEdgeObservations;
586
587        fn identity(&self) -> &OffchainPublicKey {
588            &self.me
589        }
590
591        fn node_count(&self) -> usize {
592            self.get_peers.read().unwrap().front().map_or(0, |v| v.len())
593        }
594
595        fn contains_node(&self, _key: &OffchainPublicKey) -> bool {
596            false
597        }
598
599        /// Returns a stream of all known nodes in the network graph.
600        fn nodes(&self) -> futures::stream::BoxStream<'static, OffchainPublicKey> {
601            let mut get_peers = self.get_peers.write().unwrap();
602            Box::pin(futures::stream::iter(get_peers.pop_front().unwrap_or_default()))
603        }
604
605        fn edge(&self, _src: &OffchainPublicKey, _dest: &OffchainPublicKey) -> Option<TestEdgeObservations> {
606            Some(TestEdgeObservations)
607        }
608    }
609
    /// Classifier state specialized to the test network graph.
    type TestClassifier = ProbeClassifierState<PeerStore>;

    /// Handles given to a test body by `test_with_probing`.
    struct TestInterface {
        /// Classifier returned by `Probe::continuously_scan`.
        probe_classifier: TestClassifier,
        /// Receiving end of the channel the probing process sends its outgoing packets into.
        from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
        /// Sending end of the same channel.
        from_probing_to_network_tx: futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
        /// Sender for manual probe requests.
        manual_probe_tx: futures::channel::mpsc::Sender<(OffchainPublicKey, PingQueryReplier)>,
    }
618
619    async fn test_with_probing<F, Fut>(cfg: ProbeConfig, store: PeerStore, test: F) -> anyhow::Result<()>
620    where
621        Fut: std::future::Future<Output = anyhow::Result<()>>,
622        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
623    {
624        let tag_allocators = hopr_transport_tag_allocator::create_allocators(
625            ReservedTag::range().end..u16::MAX as u64 + 1,
626            [
627                (hopr_transport_tag_allocator::Usage::Session, 2048),
628                (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 4000),
629                (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
630            ],
631        )
632        .expect("tag allocators should be created");
633        let probing_allocator = tag_allocators
634            .into_iter()
635            .find_map(|(u, alloc)| matches!(u, hopr_transport_tag_allocator::Usage::ProvingTelemetry).then_some(alloc))
636            .expect("probing allocator should exist");
637
638        let probe = Probe::new(cfg, probing_allocator);
639
640        let (from_probing_to_network_tx, from_probing_to_network_rx) =
641            futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
642
643        let (manual_probe_tx, manual_probe_rx) =
644            futures::channel::mpsc::channel::<(OffchainPublicKey, PingQueryReplier)>(100);
645
646        let (jhs, probe_classifier) = probe
647            .continuously_scan(
648                from_probing_to_network_tx.clone(),
649                manual_probe_rx,
650                ImmediateNeighborProber::new(
651                    ProberConfig {
652                        interval: cfg.interval,
653                        recheck_threshold: cfg.recheck_threshold,
654                    },
655                    store.clone(),
656                ),
657                store,
658            )
659            .await;
660
661        let interface = TestInterface {
662            probe_classifier,
663            from_probing_to_network_rx,
664            from_probing_to_network_tx,
665            manual_probe_tx,
666        };
667
668        let result = test(interface).await;
669
670        jhs.abort_all();
671
672        result
673    }
674
    /// Lowest pass rate accepted by `concurrent_classify`; tests use it to drop simulated pongs.
    const NO_PROBE_PASSES: f64 = 0.0;
    /// Highest pass rate accepted by `concurrent_classify`; tests use it to answer simulated pings.
    const ALL_PROBES_PASS: f64 = 1.0;
677
678    /// Simulates the network: receives outgoing probe packets and feeds pong responses back
679    /// through the classifier (mirroring what the remote peer + packet pipeline would do).
680    fn concurrent_classify(
681        delay: Option<std::time::Duration>,
682        pass_rate: f64,
683        classifier: TestClassifier,
684        push_to_network: futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
685    ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
686        debug_assert!(
687            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
688            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
689        );
690
691        move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
692            let classifier = classifier.clone();
693            let push_to_network = push_to_network.clone();
694
695            Box::pin(async move {
696                if let DestinationRouting::Forward { pseudonym, .. } = path {
697                    let message: Message = data_out.data.try_into().expect("failed to convert data into message");
698                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
699                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
700
701                        if let Some(delay) = delay {
702                            tokio::time::sleep(delay).await;
703                        }
704
705                        if rand::random_range(NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
706                            let pseudonym = pseudonym.expect("the pseudonym is always known from cache");
707                            classifier
708                                .classify(
709                                    push_to_network,
710                                    pseudonym,
711                                    ApplicationDataIn {
712                                        data: pong_message
713                                            .try_into()
714                                            .expect("failed to convert pong message into data"),
715                                        packet_info: Default::default(),
716                                    },
717                                )
718                                .await;
719                        }
720                    }
721                };
722            })
723        }
724    }
725
726    #[tokio::test]
727    // #[tracing_test::traced_test]
728    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
729        let cfg = ProbeConfig {
730            timeout: std::time::Duration::from_millis(5),
731            interval: std::time::Duration::from_secs(0),
732            ..Default::default()
733        };
734
735        let store = PeerStore {
736            me: *OFFCHAIN_KEYPAIR.public(),
737            get_peers: Arc::new(RwLock::new(VecDeque::new())),
738            on_finished: Arc::new(RwLock::new(Vec::new())),
739        };
740
741        test_with_probing(cfg, store, move |iface: TestInterface| async move {
742            let mut manual_probe_tx = iface.manual_probe_tx;
743            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
744            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
745            let probe_classifier = iface.probe_classifier;
746
747            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
748            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
749
750            let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
751                from_probing_to_network_rx
752                    .for_each_concurrent(
753                        cfg.max_parallel_probes + 1,
754                        concurrent_classify(None, ALL_PROBES_PASS, probe_classifier, from_probing_to_network_tx),
755                    )
756                    .await;
757            });
758
759            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
760                .await?
761                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))?
762                .map_err(|_| anyhow::anyhow!("Probe failed"))?;
763
764            Ok(())
765        })
766        .await
767    }
768
769    #[tokio::test]
770    // #[tracing_test::traced_test]
771    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
772        let cfg = ProbeConfig {
773            timeout: std::time::Duration::from_millis(5),
774            interval: std::time::Duration::from_secs(0),
775            ..Default::default()
776        };
777
778        let store = PeerStore {
779            me: *OFFCHAIN_KEYPAIR.public(),
780            get_peers: Arc::new(RwLock::new(VecDeque::new())),
781            on_finished: Arc::new(RwLock::new(Vec::new())),
782        };
783
784        test_with_probing(cfg, store, move |iface: TestInterface| async move {
785            let mut manual_probe_tx = iface.manual_probe_tx;
786            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
787            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
788            let probe_classifier = iface.probe_classifier;
789
790            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
791            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
792
793            let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
794                from_probing_to_network_rx
795                    .for_each_concurrent(
796                        cfg.max_parallel_probes + 1,
797                        concurrent_classify(None, NO_PROBE_PASSES, probe_classifier, from_probing_to_network_tx),
798                    )
799                    .await;
800            });
801
802            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
803
804            Ok(())
805        })
806        .await
807    }
808
809    #[tokio::test]
810    // #[tracing_test::traced_test]
811    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
812        let cfg = ProbeConfig {
813            timeout: std::time::Duration::from_millis(20),
814            max_parallel_probes: NEIGHBOURS.len(),
815            interval: std::time::Duration::from_secs(0),
816            ..Default::default()
817        };
818
819        let store = PeerStore {
820            me: *OFFCHAIN_KEYPAIR.public(),
821            get_peers: Arc::new(RwLock::new({
822                let mut neighbors = VecDeque::new();
823                neighbors.push_back(NEIGHBOURS.clone());
824                neighbors
825            })),
826            on_finished: Arc::new(RwLock::new(Vec::new())),
827        };
828
829        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
830            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
831            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
832            let probe_classifier = iface.probe_classifier;
833
834            let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
835                from_probing_to_network_rx
836                    .for_each_concurrent(
837                        cfg.max_parallel_probes + 1,
838                        concurrent_classify(None, ALL_PROBES_PASS, probe_classifier, from_probing_to_network_tx),
839                    )
840                    .await;
841            });
842
843            // Wait for the full probe lifecycle: emit → network round trip → process.
844            // Must exceed cfg.timeout (the cache TTL) to avoid probes being evicted
845            // as timeouts before the pong response is processed.
846            tokio::time::sleep(cfg.timeout * 3).await;
847
848            Ok(())
849        })
850        .await?;
851
852        assert_eq!(
853            store
854                .on_finished
855                .read()
856                .expect("should be lockable")
857                .iter()
858                .filter(|(_peer, result)| result.is_ok())
859                .count(),
860            NEIGHBOURS.len()
861        );
862
863        Ok(())
864    }
865
866    #[tokio::test]
867    // #[tracing_test::traced_test]
868    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
869        let cfg = ProbeConfig {
870            timeout: std::time::Duration::from_millis(10),
871            max_parallel_probes: NEIGHBOURS.len(),
872            interval: std::time::Duration::from_secs(0),
873            ..Default::default()
874        };
875
876        let store = PeerStore {
877            me: *OFFCHAIN_KEYPAIR.public(),
878            get_peers: Arc::new(RwLock::new({
879                let mut neighbors = VecDeque::new();
880                neighbors.push_back(NEIGHBOURS.clone());
881                neighbors
882            })),
883            on_finished: Arc::new(RwLock::new(Vec::new())),
884        };
885
886        let timeout = cfg.timeout * 2;
887
888        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
889            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
890            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
891            let probe_classifier = iface.probe_classifier;
892
893            let _jh: hopr_utils::runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
894                from_probing_to_network_rx
895                    .for_each_concurrent(
896                        cfg.max_parallel_probes + 1,
897                        concurrent_classify(
898                            Some(timeout),
899                            ALL_PROBES_PASS,
900                            probe_classifier,
901                            from_probing_to_network_tx,
902                        ),
903                    )
904                    .await;
905            });
906
907            // wait for the probes to start and finish
908            tokio::time::sleep(timeout * 2).await;
909
910            Ok(())
911        })
912        .await?;
913
914        assert_eq!(
915            store
916                .on_finished
917                .read()
918                .expect("should be lockable")
919                .iter()
920                .filter(|(_peer, result)| result.is_err())
921                .count(),
922            NEIGHBOURS.len()
923        );
924
925        Ok(())
926    }
927
928    #[tokio::test]
929    async fn probe_should_reply_with_pong_when_receiving_ping() -> anyhow::Result<()> {
930        use anyhow::Context;
931
932        let cfg = ProbeConfig {
933            timeout: std::time::Duration::from_millis(100),
934            interval: std::time::Duration::from_secs(10),
935            ..Default::default()
936        };
937
938        let store = PeerStore {
939            me: *OFFCHAIN_KEYPAIR.public(),
940            get_peers: Arc::new(RwLock::new(VecDeque::new())),
941            on_finished: Arc::new(RwLock::new(Vec::new())),
942        };
943
944        test_with_probing(cfg, store, move |iface: TestInterface| async move {
945            let probe_classifier = iface.probe_classifier;
946            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
947            let mut from_probing_to_network_rx = iface.from_probing_to_network_rx;
948
949            // Build a Ping message with the reserved Ping tag
950            let ping = NeighborProbe::random_nonce();
951            let ping_nonce = match ping {
952                NeighborProbe::Ping(n) => n,
953                _ => unreachable!(),
954            };
955            let ping_msg = Message::Probe(ping);
956            let app_data: ApplicationData = ping_msg.try_into().context("converting ping to ApplicationData")?;
957
958            // Classify the ping directly — the classifier sends the pong reply to push_to_network
959            let result = probe_classifier
960                .classify(
961                    from_probing_to_network_tx,
962                    HoprPseudonym::random(),
963                    ApplicationDataIn {
964                        data: app_data,
965                        packet_info: Default::default(),
966                    },
967                )
968                .await;
969            anyhow::ensure!(matches!(result, ProbeDispatch::Consumed), "ping should be consumed");
970
971            // The probe should reply with a Pong on the network channel
972            let (routing, data_out) = tokio::time::timeout(Duration::from_secs(2), from_probing_to_network_rx.next())
973                .await
974                .context("timeout waiting for pong")?
975                .context("probe should send pong reply")?;
976
977            // Verify it's a Return routing (SURB-based reply)
978            anyhow::ensure!(
979                matches!(routing, DestinationRouting::Return(_)),
980                "pong should use Return routing, got: {routing:?}"
981            );
982
983            // Verify the payload is a Pong with the same nonce
984            let response_msg: Message = data_out.data.try_into().context("converting response to Message")?;
985            anyhow::ensure!(
986                matches!(response_msg, Message::Probe(NeighborProbe::Pong(n)) if n == ping_nonce),
987                "response should be Pong with matching nonce"
988            );
989
990            Ok(())
991        })
992        .await
993    }
994
995    #[tokio::test]
996    // #[tracing_test::traced_test]
997    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
998        let cfg = ProbeConfig {
999            timeout: std::time::Duration::from_millis(20),
1000            interval: std::time::Duration::from_secs(0),
1001            ..Default::default()
1002        };
1003
1004        let store = PeerStore {
1005            me: *OFFCHAIN_KEYPAIR.public(),
1006            get_peers: Arc::new(RwLock::new({
1007                let mut neighbors = VecDeque::new();
1008                neighbors.push_back(NEIGHBOURS.clone());
1009                neighbors
1010            })),
1011            on_finished: Arc::new(RwLock::new(Vec::new())),
1012        };
1013
1014        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
1015            let probe_classifier = iface.probe_classifier;
1016            let from_probing_to_network_tx = iface.from_probing_to_network_tx;
1017
1018            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
1019
1020            let result = probe_classifier
1021                .classify(
1022                    from_probing_to_network_tx,
1023                    HoprPseudonym::random(),
1024                    ApplicationDataIn {
1025                        data: expected_data.clone(),
1026                        packet_info: Default::default(),
1027                    },
1028                )
1029                .await;
1030
1031            match result {
1032                ProbeDispatch::Passthrough(_, actual) => assert_eq!(actual.data, expected_data),
1033                ProbeDispatch::Consumed => anyhow::bail!("expected Passthrough, got Consumed"),
1034            }
1035
1036            Ok(())
1037        })
1038        .await
1039    }
1040
    /// How a probe gets triggered inside the test: either via the manual ping channel
    /// (for neighbor probes) or by injecting a single [`ProbeRouting::Looping`] into the
    /// probing traffic generator (for loopback probes).
    #[derive(Clone)]
    enum TestProbeStrategy {
        /// The generator stream never yields; the test triggers the probe through the manual channel.
        ManualNeighbor,
        /// The generator stream yields exactly one looping probe and then stays pending.
        OneShotLoopback {
            /// Routing placed into the emitted [`ProbeRouting::Looping`].
            routing: DestinationRouting,
            /// Path identifier placed into the emitted [`ProbeRouting::Looping`].
            path_id: hopr_api::types::internal::routing::PathId,
        },
    }
1052
1053    impl hopr_api::ct::ProbingTrafficGeneration for TestProbeStrategy {
1054        fn build(&self) -> futures::stream::BoxStream<'static, hopr_api::ct::ProbeRouting> {
1055            match self {
1056                Self::ManualNeighbor => Box::pin(futures::stream::pending()),
1057                Self::OneShotLoopback { routing, path_id } => {
1058                    let probe = hopr_api::ct::ProbeRouting::Looping((routing.clone(), *path_id));
1059                    Box::pin(futures::StreamExt::chain(
1060                        futures::stream::iter(std::iter::once(probe)),
1061                        futures::stream::pending(),
1062                    ))
1063                }
1064            }
1065        }
1066    }
1067
1068    /// Regression test for hoprnet/hoprnet#7972: each probe type must be emitted with the
1069    /// exact SURB count it actually consumes — no more, no less — so the path planner does
1070    /// not resolve (and the SURB store does not accumulate) unused return paths.
1071    ///
1072    /// - Neighbor probe: 1 SURB (zero-hop pong return).
1073    /// - Loopback probe: 0 SURBs (self-routed, never replied to).
1074    #[rstest::rstest]
1075    #[case::neighbor_probe_requests_one_surb(TestProbeStrategy::ManualNeighbor, 1)]
1076    #[case::loopback_probe_requests_zero_surbs(
1077        TestProbeStrategy::OneShotLoopback {
1078            routing: DestinationRouting::Forward {
1079                destination: Box::new((*OFFCHAIN_KEYPAIR.public()).into()),
1080                pseudonym: Some(HoprPseudonym::random()),
1081                forward_options: RoutingOptions::Hops(1.try_into().expect("1 is a valid u8")),
1082                return_options: None,
1083            },
1084            path_id: [1, 2, 3, 4, 5],
1085        },
1086        0,
1087    )]
1088    #[tokio::test]
1089    async fn probe_should_emit_with_expected_surb_count(
1090        #[case] strategy: TestProbeStrategy,
1091        #[case] expected_max_surbs: usize,
1092    ) -> anyhow::Result<()> {
1093        let cfg = ProbeConfig {
1094            timeout: std::time::Duration::from_secs(1),
1095            interval: std::time::Duration::from_secs(0),
1096            ..Default::default()
1097        };
1098
1099        // Wire up a full probing harness with the parameterized strategy injected as the
1100        // traffic generator. Mirrors `test_with_probing` but allows arbitrary `ProbingTrafficGeneration`.
1101        let tag_allocators = hopr_transport_tag_allocator::create_allocators(
1102            ReservedTag::range().end..u16::MAX as u64 + 1,
1103            [
1104                (hopr_transport_tag_allocator::Usage::Session, 2048),
1105                (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 4000),
1106                (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1107            ],
1108        )
1109        .expect("tag allocators should be created");
1110        let probing_allocator = tag_allocators
1111            .into_iter()
1112            .find_map(|(u, alloc)| matches!(u, hopr_transport_tag_allocator::Usage::ProvingTelemetry).then_some(alloc))
1113            .expect("probing allocator should exist");
1114
1115        let probe = Probe::new(cfg, probing_allocator);
1116
1117        let (from_probing_to_network_tx, mut from_probing_to_network_rx) =
1118            futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
1119        let (mut manual_probe_tx, manual_probe_rx) =
1120            futures::channel::mpsc::channel::<(OffchainPublicKey, PingQueryReplier)>(100);
1121
1122        let store = PeerStore {
1123            me: *OFFCHAIN_KEYPAIR.public(),
1124            get_peers: Arc::new(RwLock::new(VecDeque::new())),
1125            on_finished: Arc::new(RwLock::new(Vec::new())),
1126        };
1127
1128        // Kick off the probing process before triggering — the strategy's stream drives
1129        // the loopback case, and we push to `manual_probe_tx` below for the neighbor case.
1130        let is_manual = matches!(strategy, TestProbeStrategy::ManualNeighbor);
1131        let (jhs, _probe_classifier) = probe
1132            .continuously_scan(from_probing_to_network_tx, manual_probe_rx, strategy, store)
1133            .await;
1134
1135        if is_manual {
1136            let (tx, _rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ()>>(128);
1137            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
1138        }
1139
1140        let (_routing, data_out) =
1141            tokio::time::timeout(std::time::Duration::from_secs(1), from_probing_to_network_rx.next())
1142                .await?
1143                .ok_or_else(|| anyhow::anyhow!("no probe emitted"))?;
1144
1145        jhs.abort_all();
1146
1147        let packet_info = data_out
1148            .packet_info
1149            .ok_or_else(|| anyhow::anyhow!("probe must carry explicit OutgoingPacketInfo"))?;
1150        assert_eq!(
1151            packet_info.max_surbs_in_packet, expected_max_surbs,
1152            "probe must request exactly {expected_max_surbs} SURB(s)"
1153        );
1154
1155        Ok(())
1156    }
1157}