hopr_transport_probe/
probe.rs

1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_async_runtime::AbortableList;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::prelude::*;
9use hopr_network_types::prelude::*;
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::traits::AsUnixTimestamp;
12use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
13use libp2p_identity::PeerId;
14
15use crate::{
16    HoprProbeProcess,
17    config::ProbeConfig,
18    content::{Message, NeighborProbe},
19    errors::ProbeError,
20    ping::PingQueryReplier,
21    traits::TrafficGeneration,
22    types::{NeighborTelemetry, Telemetry},
23};
24
25#[inline(always)]
26fn to_nonce(message: &Message) -> String {
27    match message {
28        Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
29        Message::Probe(NeighborProbe::Pong(pong)) => hex::encode(pong),
30        _ => "<telemetry>".to_string(),
31    }
32}
33
34#[inline(always)]
35fn to_pseudonym(path: &DestinationRouting) -> Option<HoprPseudonym> {
36    match path {
37        DestinationRouting::Forward { pseudonym, .. } => *pseudonym,
38        DestinationRouting::Return(matcher) => match matcher {
39            hopr_network_types::types::SurbMatcher::Exact(sender_id) => Some(sender_id.pseudonym()),
40            hopr_network_types::types::SurbMatcher::Pseudonym(pseudonym) => Some(*pseudonym),
41        },
42    }
43}
44
45#[derive(Clone, Debug)]
46struct Sender<T> {
47    downstream: T,
48}
49
50impl<T> Sender<T>
51where
52    T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
53{
54    #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=?to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
55    async fn send_message(self, path: DestinationRouting, message: Message) -> crate::errors::Result<()> {
56        let push_to_network = self.downstream;
57        pin_mut!(push_to_network);
58        if push_to_network
59            .as_mut()
60            .send((path, ApplicationDataOut::with_no_packet_info(message.try_into()?)))
61            .await
62            .is_ok()
63        {
64            Ok(())
65        } else {
66            Err(ProbeError::SendError("transport error".to_string()))
67        }
68    }
69}
70
71type CacheKey = (HoprPseudonym, NeighborProbe);
72type CacheValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
73
74/// Probe functionality builder.
75///
76/// The builder holds information about this node's own addresses and the configuration for the probing process. It is
77/// then used to construct the probing process itself.
78pub struct Probe {
79    /// Probe configuration.
80    cfg: ProbeConfig,
81}
82
83impl Probe {
84    pub fn new(cfg: ProbeConfig) -> Self {
85        Self { cfg }
86    }
87
88    /// The main function that assembles and starts the probing process.
89    pub async fn continuously_scan<T, U, V, Up, Tr>(
90        self,
91        api: (T, U),      // lower (tx, rx) channels for sending and receiving messages
92        manual_events: V, // explicit requests from the API
93        move_up: Up,      // forward up non-probing messages from the network
94        traffic_generator: Tr,
95    ) -> AbortableList<HoprProbeProcess>
96    where
97        T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
98        T::Error: Send,
99        U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
100        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
101        Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
102        Tr: TrafficGeneration + Send + Sync + 'static,
103    {
104        let max_parallel_probes = self.cfg.max_parallel_probes;
105        let interval_between_rounds = self.cfg.interval;
106
107        let (probing_routes, reports) = traffic_generator.build();
108
109        // Currently active probes
110        let store_eviction = reports.clone();
111        let timeout = self.cfg.timeout;
112        let active_probes: moka::future::Cache<CacheKey, CacheValue> = moka::future::Cache::builder()
113            .time_to_live(timeout)
114            .max_capacity(100_000)
115            .async_eviction_listener(
116                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
117                      v: (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>),
118                      cause|
119                      -> moka::notification::ListenerFuture {
120                    if matches!(cause, moka::notification::RemovalCause::Expired) {
121                        // If the eviction cause is expiration => record as a failed probe
122                        let store = store_eviction.clone();
123                        let (peer, _start, notifier) = v;
124
125                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
126                        if let Some(replier) = notifier {
127                            if let NodeId::Offchain(opk) = peer.as_ref() {
128                                replier.notify(Err(ProbeError::ProbeNeighborTimeout(opk.into())));
129                            } else {
130                                tracing::warn!(
131                                    reason = "non-offchain peer",
132                                    "cannot notify timeout for non-offchain peer"
133                                );
134                            }
135                        };
136
137                        if let NodeId::Offchain(opk) = peer.as_ref() {
138                            let peer: PeerId = opk.into();
139                            futures::FutureExt::boxed(async move {
140                                pin_mut!(store);
141                                if let Err(error) = store.send(Err(ProbeError::ProbeNeighborTimeout(peer))).await {
142                                    tracing::error!(%peer, %error, "failed to record probe timeout");
143                                }
144                            })
145                        } else {
146                            futures::FutureExt::boxed(futures::future::ready(()))
147                        }
148                    } else {
149                        // If the eviction cause is not expiration, nothing needs to be done
150                        futures::FutureExt::boxed(futures::future::ready(()))
151                    }
152                },
153            )
154            .build();
155
156        let active_probes_rx = active_probes.clone();
157        let push_to_network = api.0.clone();
158
159        let mut processes = AbortableList::default();
160
161        // -- Emit probes --
162        let direct_neighbors =
163            probing_routes
164                .map(|peer| (peer, None))
165                .merge(manual_events.filter_map(|(peer, notifier)| async move {
166                    if let Ok(peer) = OffchainPublicKey::from_peerid(&peer) {
167                        let routing = DestinationRouting::Forward {
168                            destination: Box::new(peer.into()),
169                            pseudonym: Some(HoprPseudonym::random()),
170                            forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
171                            return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
172                        };
173                        Some((routing, Some(notifier)))
174                    } else {
175                        None
176                    }
177                }));
178
179        processes.insert(
180            HoprProbeProcess::Emit,
181            hopr_async_runtime::spawn_as_abortable!(async move {
182                hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await; // delay to allow network to stabilize
183
184                direct_neighbors
185                    .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
186                        let active_probes = active_probes.clone();
187                        let push_to_network = Sender {
188                            downstream: push_to_network.clone(),
189                        };
190
191                        async move {
192                            match peer {
193                                DestinationRouting::Forward {
194                                    destination,
195                                    pseudonym,
196                                    forward_options,
197                                    return_options,
198                                } => {
199                                    let nonce = NeighborProbe::random_nonce();
200
201                                    let message = Message::Probe(nonce);
202
203                                    let routing = DestinationRouting::Forward {
204                                        destination: destination.clone(),
205                                        pseudonym,
206                                        forward_options,
207                                        return_options,
208                                    };
209
210                                    if let Err(error) = push_to_network.send_message(routing, message).await {
211                                        tracing::error!(?destination, %error, "failed to send out a probe");
212                                    } else {
213                                        active_probes
214                                            .insert(
215                                                (
216                                                    pseudonym
217                                                        .expect("the pseudonym must be present in Forward routing"),
218                                                    nonce,
219                                                ),
220                                                (destination, current_time().as_unix_timestamp(), notifier),
221                                            )
222                                            .await;
223                                    }
224                                }
225                                DestinationRouting::Return(_surb_matcher) => tracing::error!(
226                                    error = "logical error",
227                                    "resolved transport routing is not forward"
228                                ),
229                            }
230                        }
231                    })
232                    .inspect(|_| {
233                        tracing::warn!(
234                            task = "transport (probe - generate outgoing)",
235                            "long-running background task finished"
236                        )
237                    })
238                    .await;
239            }),
240        );
241
242        // -- Process probes --
243        processes.insert(
244            HoprProbeProcess::Process,
245            hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
246                let active_probes = active_probes_rx.clone();
247                let push_to_network = Sender { downstream: api.0.clone() };
248                let move_up = move_up.clone();
249                let store = reports.clone();
250
251                async move {
252                    // TODO(v3.1): compare not only against ping tag, but also against telemetry that will be occurring on random tags
253                    if in_data.data.application_tag == ReservedTag::Ping.into() {
254                        let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
255
256                        match message {
257                            Ok(message) => {
258                                match message {
259                                    Message::Telemetry(path_telemetry) => {
260                                        pin_mut!(store);
261                                        if let Err(error) = store.send(Ok(Telemetry::Loopback(path_telemetry))).await {
262                                            tracing::error!(%pseudonym, %error, "failed to record probe success");
263                                        }
264                                    },
265                                    Message::Probe(NeighborProbe::Ping(ping)) => {
266                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
267                                        tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
268
269                                        let message = Message::Probe(NeighborProbe::Pong(ping));
270                                        if let Err(error) = push_to_network.send_message(DestinationRouting::Return(pseudonym.into()), message).await {
271                                            tracing::error!(%pseudonym, %error, "failed to send back a pong");
272                                        }
273                                    },
274                                    Message::Probe(NeighborProbe::Pong(pong)) => {
275                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
276                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
277                                            let latency = current_time()
278                                                .as_unix_timestamp()
279                                                .saturating_sub(start);
280
281                                            if let NodeId::Offchain(opk) = peer.as_ref() {
282                                                tracing::info!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
283                                                pin_mut!(store);
284                                                if let Err(error) = store.send(Ok(Telemetry::Neighbor(NeighborTelemetry {
285                                                    peer: opk.into(),
286                                                    rtt: latency,
287                                                }))).await {
288                                                    tracing::error!(%pseudonym, %error, "failed to record probe success");
289                                                }
290                                            } else {
291                                                tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
292                                            }
293
294                                            if let Some(replier) = replier {
295                                                replier.notify(Ok(latency))
296                                            };
297                                        } else {
298                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
299                                        };
300                                    },
301                                }
302                            },
303                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
304                        }
305                    } else {
306                        // If the message is not a probing message, forward it up
307                        pin_mut!(move_up);
308                        if move_up.send((pseudonym, in_data)).await.is_err() {
309                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
310                        }
311                    }
312                }
313            }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
314        );
315
316        processes
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use std::{collections::VecDeque, sync::RwLock, time::Duration};
323
324    use async_trait::async_trait;
325    use futures::future::BoxFuture;
326    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
327    use hopr_protocol_app::prelude::{ApplicationData, Tag};
328
329    use super::*;
330    use crate::{
331        neighbors::ImmediateNeighborProber,
332        traits::{PeerDiscoveryFetch, ProbeStatusUpdate},
333    };
334
335    lazy_static::lazy_static!(
336        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
337        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
338        static ref NEIGHBOURS: Vec<PeerId> = vec![
339            OffchainKeypair::random().public().into(),
340            OffchainKeypair::random().public().into(),
341            OffchainKeypair::random().public().into(),
342            OffchainKeypair::random().public().into(),
343        ];
344    );
345
346    #[derive(Debug, Clone)]
347    pub struct PeerStore {
348        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
349        #[allow(clippy::type_complexity)]
350        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
351    }
352
353    #[async_trait]
354    impl ProbeStatusUpdate for PeerStore {
355        async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
356            let mut on_finished = self.on_finished.write().unwrap();
357            on_finished.push((
358                *peer,
359                match result {
360                    Ok(duration) => Ok(*duration),
361                    Err(_e) => Err(ProbeError::ProbeNeighborTimeout(peer.clone())),
362                },
363            ));
364        }
365    }
366
367    #[async_trait]
368    impl PeerDiscoveryFetch for PeerStore {
369        async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
370            let mut get_peers = self.get_peers.write().unwrap();
371            get_peers.pop_front().unwrap_or_default()
372        }
373    }
374
375    struct TestInterface {
376        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
377        from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
378        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
379        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
380    }
381
382    async fn test_with_probing<F, St, Fut>(cfg: ProbeConfig, store: St, test: F) -> anyhow::Result<()>
383    where
384        Fut: std::future::Future<Output = anyhow::Result<()>>,
385        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
386        St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
387    {
388        let probe = Probe::new(cfg);
389
390        let (from_probing_up_tx, from_probing_up_rx) =
391            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
392
393        let (from_probing_to_network_tx, from_probing_to_network_rx) =
394            futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
395
396        let (from_network_to_probing_tx, from_network_to_probing_rx) =
397            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
398
399        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
400
401        let interface = TestInterface {
402            from_probing_up_rx,
403            from_probing_to_network_rx,
404            from_network_to_probing_tx,
405            manual_probe_tx,
406        };
407
408        let jhs = probe
409            .continuously_scan(
410                (from_probing_to_network_tx, from_network_to_probing_rx),
411                manual_probe_rx,
412                from_probing_up_tx,
413                ImmediateNeighborProber::new(cfg, store),
414            )
415            .await;
416
417        let result = test(interface).await;
418
419        jhs.abort_all();
420
421        result
422    }
423
424    const NO_PROBE_PASSES: f64 = 0.0;
425    const ALL_PROBES_PASS: f64 = 1.0;
426
427    /// Channel that can drop any probes and concurrently replies to a probe correctly
428    fn concurrent_channel(
429        delay: Option<std::time::Duration>,
430        pass_rate: f64,
431        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
432    ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
433        debug_assert!(
434            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
435            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
436        );
437
438        move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
439            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
440
441            Box::pin(async move {
442                if let DestinationRouting::Forward { pseudonym, .. } = path {
443                    let message: Message = data_out.data.try_into().expect("failed to convert data into message");
444                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
445                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
446
447                        if let Some(delay) = delay {
448                            // Simulate a delay if specified
449                            tokio::time::sleep(delay).await;
450                        }
451
452                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
453                        {
454                            from_network_to_probing_tx
455                                .send((
456                                    pseudonym.expect("the pseudonym is always known from cache"),
457                                    ApplicationDataIn {
458                                        data: pong_message
459                                            .try_into()
460                                            .expect("failed to convert pong message into data"),
461                                        packet_info: Default::default(),
462                                    },
463                                ))
464                                .await
465                                .expect("failed to send pong message");
466                        }
467                    }
468                };
469            })
470        }
471    }
472
473    #[tokio::test]
474    // #[tracing_test::traced_test]
475    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
476        let cfg = ProbeConfig {
477            timeout: std::time::Duration::from_millis(5),
478            interval: std::time::Duration::from_secs(0),
479            ..Default::default()
480        };
481
482        let store = PeerStore {
483            get_peers: Arc::new(RwLock::new(VecDeque::new())),
484            on_finished: Arc::new(RwLock::new(Vec::new())),
485        };
486
487        test_with_probing(cfg, store, move |iface: TestInterface| async move {
488            let mut manual_probe_tx = iface.manual_probe_tx;
489            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
490            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
491
492            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
493            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
494
495            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
496                from_probing_to_network_rx
497                    .for_each_concurrent(
498                        cfg.max_parallel_probes + 1,
499                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
500                    )
501                    .await;
502            });
503
504            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
505                .await?
506                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
507
508            Ok(())
509        })
510        .await
511    }
512
513    #[tokio::test]
514    // #[tracing_test::traced_test]
515    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
516        let cfg = ProbeConfig {
517            timeout: std::time::Duration::from_millis(5),
518            interval: std::time::Duration::from_secs(0),
519            ..Default::default()
520        };
521
522        let store = PeerStore {
523            get_peers: Arc::new(RwLock::new(VecDeque::new())),
524            on_finished: Arc::new(RwLock::new(Vec::new())),
525        };
526
527        test_with_probing(cfg, store, move |iface: TestInterface| async move {
528            let mut manual_probe_tx = iface.manual_probe_tx;
529            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
530            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
531
532            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
533            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
534
535            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
536                from_probing_to_network_rx
537                    .for_each_concurrent(
538                        cfg.max_parallel_probes + 1,
539                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
540                    )
541                    .await;
542            });
543
544            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
545
546            Ok(())
547        })
548        .await
549    }
550
551    #[tokio::test]
552    // #[tracing_test::traced_test]
553    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
554        let cfg = ProbeConfig {
555            timeout: std::time::Duration::from_millis(20),
556            max_parallel_probes: NEIGHBOURS.len(),
557            interval: std::time::Duration::from_secs(0),
558            ..Default::default()
559        };
560
561        let store = PeerStore {
562            get_peers: Arc::new(RwLock::new({
563                let mut neighbors = VecDeque::new();
564                neighbors.push_back(NEIGHBOURS.clone());
565                neighbors
566            })),
567            on_finished: Arc::new(RwLock::new(Vec::new())),
568        };
569
570        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
571            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
572            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
573
574            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
575                from_probing_to_network_rx
576                    .for_each_concurrent(
577                        cfg.max_parallel_probes + 1,
578                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
579                    )
580                    .await;
581            });
582
583            // wait for the probes to start and finish
584            tokio::time::sleep(cfg.timeout).await;
585
586            Ok(())
587        })
588        .await?;
589
590        assert_eq!(
591            store
592                .on_finished
593                .read()
594                .expect("should be lockable")
595                .iter()
596                .filter(|(_peer, result)| result.is_ok())
597                .count(),
598            NEIGHBOURS.len()
599        );
600
601        Ok(())
602    }
603
604    #[tokio::test]
605    // #[tracing_test::traced_test]
606    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
607        let cfg = ProbeConfig {
608            timeout: std::time::Duration::from_millis(10),
609            max_parallel_probes: NEIGHBOURS.len(),
610            interval: std::time::Duration::from_secs(0),
611            ..Default::default()
612        };
613
614        let store = PeerStore {
615            get_peers: Arc::new(RwLock::new({
616                let mut neighbors = VecDeque::new();
617                neighbors.push_back(NEIGHBOURS.clone());
618                neighbors
619            })),
620            on_finished: Arc::new(RwLock::new(Vec::new())),
621        };
622
623        let timeout = cfg.timeout * 2;
624
625        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
626            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
627            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
628
629            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
630                from_probing_to_network_rx
631                    .for_each_concurrent(
632                        cfg.max_parallel_probes + 1,
633                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
634                    )
635                    .await;
636            });
637
638            // wait for the probes to start and finish
639            tokio::time::sleep(timeout * 2).await;
640
641            Ok(())
642        })
643        .await?;
644
645        assert_eq!(
646            store
647                .on_finished
648                .read()
649                .expect("should be lockable")
650                .iter()
651                .filter(|(_peer, result)| result.is_err())
652                .count(),
653            NEIGHBOURS.len()
654        );
655
656        Ok(())
657    }
658
659    #[tokio::test]
660    // #[tracing_test::traced_test]
661    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
662        let cfg = ProbeConfig {
663            timeout: std::time::Duration::from_millis(20),
664            interval: std::time::Duration::from_secs(0),
665            ..Default::default()
666        };
667
668        let store = PeerStore {
669            get_peers: Arc::new(RwLock::new({
670                let mut neighbors = VecDeque::new();
671                neighbors.push_back(NEIGHBOURS.clone());
672                neighbors
673            })),
674            on_finished: Arc::new(RwLock::new(Vec::new())),
675        };
676
677        test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
678            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
679            let mut from_probing_up_rx = iface.from_probing_up_rx;
680
681            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
682
683            from_network_to_probing_tx
684                .send((
685                    HoprPseudonym::random(),
686                    ApplicationDataIn {
687                        data: expected_data.clone(),
688                        packet_info: Default::default(),
689                    },
690                ))
691                .await?;
692
693            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
694                .await?
695                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
696                .1;
697
698            assert_eq!(actual.data, expected_data);
699
700            Ok(())
701        })
702        .await
703    }
704}