hopr_transport_probe/
probe.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_api::ct::{
6    NetworkGraphView, Telemetry, TrafficGeneration, traits::NetworkGraphUpdate, types::TrafficGenerationError,
7};
8use hopr_async_runtime::AbortableList;
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::prelude::*;
12use hopr_network_types::prelude::*;
13use hopr_platform::time::native::current_time;
14use hopr_primitive_types::traits::AsUnixTimestamp;
15use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
16use libp2p_identity::PeerId;
17
18use crate::{
19    HoprProbeProcess,
20    config::ProbeConfig,
21    content::Message,
22    errors::ProbeError,
23    ping::PingQueryReplier,
24    types::{NeighborProbe, NeighborTelemetry, PathTelemetry},
25};
26
27type CacheKey = (HoprPseudonym, NeighborProbe);
28type CacheValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
29
30/// Probe functionality builder.
31///
32/// The builder holds information about this node's own addresses and the configuration for the probing process. It is
33/// then used to construct the probing process itself.
34pub struct Probe {
35    /// Probe configuration.
36    cfg: ProbeConfig,
37}
38
39impl Probe {
40    pub fn new(cfg: ProbeConfig) -> Self {
41        Self { cfg }
42    }
43
44    /// The main function that assembles and starts the probing process.
45    pub async fn continuously_scan<T, U, V, Up, Tr, G>(
46        self,
47        api: (T, U),      // lower (tx, rx) channels for sending and receiving messages
48        manual_events: V, // explicit requests from the API
49        move_up: Up,      // forward up non-probing messages from the network
50        traffic_generator: Tr,
51        network_graph: G,
52    ) -> AbortableList<HoprProbeProcess>
53    where
54        T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
55        T::Error: Send,
56        U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
57        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
58        Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
59        Tr: TrafficGeneration + Send + Sync + 'static,
60        G: NetworkGraphView + NetworkGraphUpdate + Clone + Send + Sync + 'static,
61    {
62        let max_parallel_probes = self.cfg.max_parallel_probes;
63
64        let probing_routes = traffic_generator.build(network_graph.clone());
65
66        // Currently active probes
67        let network_graph_internal = network_graph.clone();
68        let timeout = self.cfg.timeout;
69        let active_probes: moka::future::Cache<CacheKey, CacheValue> = moka::future::Cache::builder()
70            .time_to_live(timeout)
71            .max_capacity(100_000)
72            .async_eviction_listener(
73                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
74                      v: (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>),
75                      cause|
76                      -> moka::notification::ListenerFuture {
77                    if matches!(cause, moka::notification::RemovalCause::Expired) {
78                        // If the eviction cause is expiration => record as a failed probe
79                        let store = network_graph_internal.clone();
80                        let (peer, _start, notifier) = v;
81
82                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
83                        if let Some(replier) = notifier {
84                            if let NodeId::Offchain(opk) = peer.as_ref() {
85                                replier.notify(Err(ProbeError::TrafficError(
86                                    TrafficGenerationError::ProbeNeighborTimeout(opk.into()),
87                                )));
88                            } else {
89                                tracing::warn!(
90                                    reason = "non-offchain peer",
91                                    "cannot notify timeout for non-offchain peer"
92                                );
93                            }
94                        };
95
96                        if let NodeId::Offchain(opk) = peer.as_ref() {
97                            let peer: PeerId = opk.into();
98                            futures::FutureExt::boxed(async move {
99                                store
100                                    .record::<NeighborTelemetry, PathTelemetry>(Err(
101                                        TrafficGenerationError::ProbeNeighborTimeout(peer),
102                                    ))
103                                    .await
104                            })
105                        } else {
106                            futures::FutureExt::boxed(futures::future::ready(()))
107                        }
108                    } else {
109                        // If the eviction cause is not expiration, nothing needs to be done
110                        futures::FutureExt::boxed(futures::future::ready(()))
111                    }
112                },
113            )
114            .build();
115
116        let active_probes_rx = active_probes.clone();
117        let push_to_network = api.0.clone();
118
119        let mut processes = AbortableList::default();
120
121        // -- Emit probes --
122        let direct_neighbors =
123            probing_routes
124                .map(|peer| (peer, None))
125                .merge(manual_events.filter_map(|(peer, notifier)| async move {
126                    if let Ok(peer) = OffchainPublicKey::from_peerid(&peer) {
127                        let routing = DestinationRouting::Forward {
128                            destination: Box::new(peer.into()),
129                            pseudonym: Some(HoprPseudonym::random()),
130                            forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
131                            return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
132                        };
133                        Some((routing, Some(notifier)))
134                    } else {
135                        None
136                    }
137                }));
138
139        processes.insert(
140            HoprProbeProcess::Emit,
141            hopr_async_runtime::spawn_as_abortable!(async move {
142                direct_neighbors
143                    .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
144                        let active_probes = active_probes.clone();
145                        let push_to_network = push_to_network.clone();
146
147                        async move {
148                            match peer {
149                                DestinationRouting::Forward {
150                                    destination,
151                                    pseudonym,
152                                    forward_options,
153                                    return_options,
154                                } => {
155                                    let nonce = NeighborProbe::random_nonce();
156
157                                    let message = Message::Probe(nonce);
158
159                                    if let Ok(data) = message.try_into() {
160                                        let routing = DestinationRouting::Forward {
161                                            destination: destination.clone(),
162                                            pseudonym,
163                                            forward_options,
164                                            return_options,
165                                        };
166                                        let data = ApplicationDataOut::with_no_packet_info(data);
167                                        pin_mut!(push_to_network);
168
169                                        if let Err(_error) = push_to_network.send((routing, data)).await {
170                                            tracing::error!("failed to send out a ping");
171                                        } else {
172                                            active_probes
173                                                .insert(
174                                                    (
175                                                        pseudonym
176                                                            .expect("the pseudonym must be present in Forward routing"),
177                                                        nonce,
178                                                    ),
179                                                    (destination, current_time().as_unix_timestamp(), notifier),
180                                                )
181                                                .await;
182                                        }
183                                    } else {
184                                        tracing::error!("failed to convert ping message into data");
185                                    }
186                                }
187                                DestinationRouting::Return(_surb_matcher) => tracing::error!(
188                                    error = "logical error",
189                                    "resolved transport routing is not forward"
190                                ),
191                            }
192                        }
193                    })
194                    .inspect(|_| {
195                        tracing::warn!(
196                            task = "transport (probe - generate outgoing)",
197                            "long-running background task finished"
198                        )
199                    })
200                    .await;
201            }),
202        );
203
204        // -- Process probes --
205        processes.insert(
206            HoprProbeProcess::Process,
207            hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
208                let active_probes = active_probes_rx.clone();
209                let push_to_network = api.0.clone();
210                let move_up = move_up.clone();
211                let store = network_graph.clone();
212
213                async move {
214                    // TODO(v3.1): compare not only against ping tag, but also against telemetry that will be occurring on random tags
215                    if in_data.data.application_tag == ReservedTag::Ping.into() {
216                        let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
217
218                        match message {
219                            Ok(message) => {
220                                match message {
221                                    Message::Telemetry(path_telemetry) => {
222                                        store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Loopback(path_telemetry))).await
223                                    },
224                                    Message::Probe(NeighborProbe::Ping(ping)) => {
225                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
226                                        tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
227
228                                        let message = Message::Probe(NeighborProbe::Pong(ping));
229                                        if let Ok(data) = message.try_into() {
230                                            let routing = DestinationRouting::Return(pseudonym.into());
231                                            let data = ApplicationDataOut::with_no_packet_info(data);
232                                            pin_mut!(push_to_network);
233
234                                            if let Err(_error) = push_to_network.send((routing, data)).await {
235                                                tracing::error!(%pseudonym, "failed to send back a pong");
236                                            }
237                                        } else {
238                                            tracing::error!(%pseudonym, "failed to convert pong message into data");
239                                        }
240                                    },
241                                    Message::Probe(NeighborProbe::Pong(pong)) => {
242                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
243                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
244                                            let latency = current_time()
245                                                .as_unix_timestamp()
246                                                .saturating_sub(start);
247
248                                            if let NodeId::Offchain(opk) = peer.as_ref() {
249                                                tracing::info!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
250                                                store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Neighbor(NeighborTelemetry {
251                                                    peer: opk.into(),
252                                                    rtt: latency,
253                                                }))).await
254                                            } else {
255                                                tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
256                                            }
257
258                                            if let Some(replier) = replier {
259                                                replier.notify(Ok(latency))
260                                            };
261                                        } else {
262                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
263                                        };
264                                    },
265                                }
266                            },
267                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
268                        }
269                    } else {
270                        // If the message is not a probing message, forward it up
271                        pin_mut!(move_up);
272                        if move_up.send((pseudonym, in_data)).await.is_err() {
273                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
274                        }
275                    }
276                }
277            }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
278        );
279
280        processes
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::{collections::VecDeque, sync::RwLock, time::Duration};
287
288    use async_trait::async_trait;
289    use futures::future::BoxFuture;
290    use hopr_api::ct::types::TrafficGenerationError;
291    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
292    use hopr_ct_telemetry::{ImmediateNeighborProber, ProberConfig};
293    use hopr_protocol_app::prelude::{ApplicationData, Tag};
294
295    use super::*;
296
297    lazy_static::lazy_static!(
298        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
299        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
300        static ref NEIGHBOURS: Vec<PeerId> = vec![
301            OffchainKeypair::random().public().into(),
302            OffchainKeypair::random().public().into(),
303            OffchainKeypair::random().public().into(),
304            OffchainKeypair::random().public().into(),
305        ];
306    );
307
308    #[derive(Debug, Clone)]
309    pub struct PeerStore {
310        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
311        #[allow(clippy::type_complexity)]
312        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
313    }
314
315    #[async_trait]
316    impl NetworkGraphUpdate for PeerStore {
317        async fn record<N, P>(&self, telemetry: std::result::Result<Telemetry<N, P>, TrafficGenerationError<P>>)
318        where
319            N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
320            P: hopr_api::ct::MeasurablePath + Send + Clone,
321        {
322            let mut on_finished = self.on_finished.write().unwrap();
323
324            match telemetry {
325                Ok(Telemetry::Neighbor(neighbor_telemetry)) => {
326                    let peer: PeerId = neighbor_telemetry.peer().clone();
327                    let duration = neighbor_telemetry.rtt();
328                    on_finished.push((peer, Ok(duration)));
329                }
330                Err(TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
331                    on_finished.push((
332                        peer.clone(),
333                        Err(ProbeError::TrafficError(TrafficGenerationError::ProbeNeighborTimeout(
334                            peer,
335                        ))),
336                    ));
337                }
338                _ => panic!("unexpected telemetry type, unimplemented"),
339            }
340        }
341    }
342
343    #[async_trait]
344    impl NetworkGraphView for PeerStore {
345        /// Returns a stream of all known nodes in the network graph.
346        fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
347            let mut get_peers = self.get_peers.write().unwrap();
348            Box::pin(futures::stream::iter(get_peers.pop_front().unwrap_or_default()))
349        }
350
351        /// Returns a list of all routes to the given destination of the specified length.
352        async fn find_routes(&self, destination: &PeerId, length: usize) -> Vec<DestinationRouting> {
353            tracing::debug!(%destination, %length, "finding routes in test peer store");
354            vec![]
355        }
356    }
357
358    struct TestInterface {
359        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
360        from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
361        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
362        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
363    }
364
365    async fn test_with_probing<F, St, Fut>(cfg: ProbeConfig, store: St, test: F) -> anyhow::Result<()>
366    where
367        Fut: std::future::Future<Output = anyhow::Result<()>>,
368        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
369        St: NetworkGraphUpdate + NetworkGraphView + Clone + Send + Sync + 'static,
370    {
371        let probe = Probe::new(cfg.clone());
372
373        let (from_probing_up_tx, from_probing_up_rx) =
374            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
375
376        let (from_probing_to_network_tx, from_probing_to_network_rx) =
377            futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
378
379        let (from_network_to_probing_tx, from_network_to_probing_rx) =
380            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
381
382        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
383
384        let interface = TestInterface {
385            from_probing_up_rx,
386            from_probing_to_network_rx,
387            from_network_to_probing_tx,
388            manual_probe_tx,
389        };
390
391        let jhs = probe
392            .continuously_scan(
393                (from_probing_to_network_tx, from_network_to_probing_rx),
394                manual_probe_rx,
395                from_probing_up_tx,
396                ImmediateNeighborProber::new(ProberConfig {
397                    interval: cfg.interval,
398                    recheck_threshold: cfg.recheck_threshold,
399                }),
400                store,
401            )
402            .await;
403
404        let result = test(interface).await;
405
406        jhs.abort_all();
407
408        result
409    }
410
411    const NO_PROBE_PASSES: f64 = 0.0;
412    const ALL_PROBES_PASS: f64 = 1.0;
413
414    /// Channel that can drop any probes and concurrently replies to a probe correctly
415    fn concurrent_channel(
416        delay: Option<std::time::Duration>,
417        pass_rate: f64,
418        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
419    ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
420        debug_assert!(
421            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
422            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
423        );
424
425        move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
426            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
427
428            Box::pin(async move {
429                if let DestinationRouting::Forward { pseudonym, .. } = path {
430                    let message: Message = data_out.data.try_into().expect("failed to convert data into message");
431                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
432                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
433
434                        if let Some(delay) = delay {
435                            // Simulate a delay if specified
436                            tokio::time::sleep(delay).await;
437                        }
438
439                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
440                        {
441                            from_network_to_probing_tx
442                                .send((
443                                    pseudonym.expect("the pseudonym is always known from cache"),
444                                    ApplicationDataIn {
445                                        data: pong_message
446                                            .try_into()
447                                            .expect("failed to convert pong message into data"),
448                                        packet_info: Default::default(),
449                                    },
450                                ))
451                                .await
452                                .expect("failed to send pong message");
453                        }
454                    }
455                };
456            })
457        }
458    }
459
460    #[tokio::test]
461    // #[tracing_test::traced_test]
462    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
463        let cfg = ProbeConfig {
464            timeout: std::time::Duration::from_millis(5),
465            interval: std::time::Duration::from_secs(0),
466            ..Default::default()
467        };
468
469        let store = PeerStore {
470            get_peers: Arc::new(RwLock::new(VecDeque::new())),
471            on_finished: Arc::new(RwLock::new(Vec::new())),
472        };
473
474        test_with_probing(cfg, store, move |iface: TestInterface| async move {
475            let mut manual_probe_tx = iface.manual_probe_tx;
476            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
477            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
478
479            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
480            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
481
482            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
483                from_probing_to_network_rx
484                    .for_each_concurrent(
485                        cfg.max_parallel_probes + 1,
486                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
487                    )
488                    .await;
489            });
490
491            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
492                .await?
493                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
494
495            Ok(())
496        })
497        .await
498    }
499
500    #[tokio::test]
501    // #[tracing_test::traced_test]
502    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
503        let cfg = ProbeConfig {
504            timeout: std::time::Duration::from_millis(5),
505            interval: std::time::Duration::from_secs(0),
506            ..Default::default()
507        };
508
509        let store = PeerStore {
510            get_peers: Arc::new(RwLock::new(VecDeque::new())),
511            on_finished: Arc::new(RwLock::new(Vec::new())),
512        };
513
514        test_with_probing(cfg, store, move |iface: TestInterface| async move {
515            let mut manual_probe_tx = iface.manual_probe_tx;
516            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
517            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
518
519            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
520            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
521
522            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
523                from_probing_to_network_rx
524                    .for_each_concurrent(
525                        cfg.max_parallel_probes + 1,
526                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
527                    )
528                    .await;
529            });
530
531            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
532
533            Ok(())
534        })
535        .await
536    }
537
538    #[tokio::test]
539    // #[tracing_test::traced_test]
540    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
541        let cfg = ProbeConfig {
542            timeout: std::time::Duration::from_millis(20),
543            max_parallel_probes: NEIGHBOURS.len(),
544            interval: std::time::Duration::from_secs(0),
545            ..Default::default()
546        };
547
548        let store = PeerStore {
549            get_peers: Arc::new(RwLock::new({
550                let mut neighbors = VecDeque::new();
551                neighbors.push_back(NEIGHBOURS.clone());
552                neighbors
553            })),
554            on_finished: Arc::new(RwLock::new(Vec::new())),
555        };
556
557        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
558            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
559            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
560
561            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
562                from_probing_to_network_rx
563                    .for_each_concurrent(
564                        cfg.max_parallel_probes + 1,
565                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
566                    )
567                    .await;
568            });
569
570            // wait for the probes to start and finish
571            tokio::time::sleep(cfg.timeout).await;
572
573            Ok(())
574        })
575        .await?;
576
577        assert_eq!(
578            store
579                .on_finished
580                .read()
581                .expect("should be lockable")
582                .iter()
583                .filter(|(_peer, result)| result.is_ok())
584                .count(),
585            NEIGHBOURS.len()
586        );
587
588        Ok(())
589    }
590
591    #[tokio::test]
592    // #[tracing_test::traced_test]
593    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
594        let cfg = ProbeConfig {
595            timeout: std::time::Duration::from_millis(10),
596            max_parallel_probes: NEIGHBOURS.len(),
597            interval: std::time::Duration::from_secs(0),
598            ..Default::default()
599        };
600
601        let store = PeerStore {
602            get_peers: Arc::new(RwLock::new({
603                let mut neighbors = VecDeque::new();
604                neighbors.push_back(NEIGHBOURS.clone());
605                neighbors
606            })),
607            on_finished: Arc::new(RwLock::new(Vec::new())),
608        };
609
610        let timeout = cfg.timeout * 2;
611
612        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
613            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
614            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
615
616            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
617                from_probing_to_network_rx
618                    .for_each_concurrent(
619                        cfg.max_parallel_probes + 1,
620                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
621                    )
622                    .await;
623            });
624
625            // wait for the probes to start and finish
626            tokio::time::sleep(timeout * 2).await;
627
628            Ok(())
629        })
630        .await?;
631
632        assert_eq!(
633            store
634                .on_finished
635                .read()
636                .expect("should be lockable")
637                .iter()
638                .filter(|(_peer, result)| result.is_err())
639                .count(),
640            NEIGHBOURS.len()
641        );
642
643        Ok(())
644    }
645
646    #[tokio::test]
647    // #[tracing_test::traced_test]
648    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
649        let cfg = ProbeConfig {
650            timeout: std::time::Duration::from_millis(20),
651            interval: std::time::Duration::from_secs(0),
652            ..Default::default()
653        };
654
655        let store = PeerStore {
656            get_peers: Arc::new(RwLock::new({
657                let mut neighbors = VecDeque::new();
658                neighbors.push_back(NEIGHBOURS.clone());
659                neighbors
660            })),
661            on_finished: Arc::new(RwLock::new(Vec::new())),
662        };
663
664        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
665            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
666            let mut from_probing_up_rx = iface.from_probing_up_rx;
667
668            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
669
670            from_network_to_probing_tx
671                .send((
672                    HoprPseudonym::random(),
673                    ApplicationDataIn {
674                        data: expected_data.clone(),
675                        packet_info: Default::default(),
676                    },
677                ))
678                .await?;
679
680            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
681                .await?
682                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
683                .1;
684
685            assert_eq!(actual.data, expected_data);
686
687            Ok(())
688        })
689        .await
690    }
691}