hopr_transport_probe/
probe.rs

1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, channel::oneshot::channel, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_protocol_app::prelude::{ApplicationData, ReservedTag};
13use hopr_transport_protocol::processor::{PacketError, PacketSendFinalizer};
14use libp2p_identity::PeerId;
15
16use crate::{
17    HoprProbeProcess,
18    config::ProbeConfig,
19    content::{Message, NeighborProbe},
20    errors::ProbeError,
21    neighbors::neighbors_to_probe,
22    ping::PingQueryReplier,
23    traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
24};
25
26#[inline(always)]
27fn to_nonce(message: &Message) -> String {
28    match message {
29        Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
30        Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
31        _ => "<telemetry>".to_string(),
32    }
33}
34
35#[inline(always)]
36fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
37    match path {
38        ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
39        ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
40    }
41}
42
43#[derive(Clone, Debug)]
44struct Sender<T> {
45    downstream: T,
46}
47
48impl<T> Sender<T>
49where
50    T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)> + Clone + Send + Sync + 'static,
51{
52    #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
53    async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
54        let (packet_sent_tx, packet_sent_rx) = channel::<std::result::Result<(), PacketError>>();
55
56        let push_to_network = self.downstream;
57        pin_mut!(push_to_network);
58        if push_to_network
59            .as_mut()
60            .send((message.into(), path, packet_sent_tx.into()))
61            .await
62            .is_ok()
63        {
64            packet_sent_rx
65                .await
66                .map_err(|error| ProbeError::SendError(error.to_string()))?
67                .map_err(|error| ProbeError::SendError(error.to_string()))
68        } else {
69            Err(ProbeError::SendError("transport error".to_string()))
70        }
71    }
72}
73
74/// Probe functionality builder.
75///
76/// The builder holds information about this node's own addresses and the configuration for the probing process. It is
77/// then used to construct the probing process itself.
78pub struct Probe {
79    /// Own addresses for self reference and surb creation.
80    me: (OffchainPublicKey, Address),
81    /// Probe configuration.
82    cfg: ProbeConfig,
83}
84
85impl Probe {
86    pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
87        Self { me, cfg }
88    }
89
90    /// The main function that assembles and starts the probing process.
91    pub async fn continuously_scan<T, U, V, W, C, Up>(
92        self,
93        api: (T, U),      // lower (tx, rx) channels for sending and receiving messages
94        manual_events: V, // explicit requests from the API
95        store: W,         // peer store
96        db: C,            // database for SURB & peer resolution
97        move_up: Up,      // forward up non-probing messages from the network
98    ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
99    where
100        T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
101            + Clone
102            + Send
103            + Sync
104            + 'static,
105        T::Error: Send,
106        U: futures::Stream<Item = (HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
107        W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
108        C: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
109        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
110        Up: futures::Sink<(HoprPseudonym, ApplicationData)> + Clone + Send + Sync + 'static,
111    {
112        let max_parallel_probes = self.cfg.max_parallel_probes;
113        let interval_between_rounds = self.cfg.interval;
114
115        // For each probe target a cached version of transport routing is stored
116        let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
117            .time_to_live(std::time::Duration::from_secs(600))
118            .max_capacity(100_000)
119            .build();
120
121        // Currently active probes
122        let store_eviction = store.clone();
123        let timeout = self.cfg.timeout;
124        let active_probes: moka::future::Cache<
125            (HoprPseudonym, NeighborProbe),
126            (PeerId, std::time::Duration, Option<PingQueryReplier>),
127        > = moka::future::Cache::builder()
128            .time_to_live(timeout)
129            .max_capacity(100_000)
130            .async_eviction_listener(
131                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
132                      v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
133                      cause|
134                      -> moka::notification::ListenerFuture {
135                    if matches!(cause, moka::notification::RemovalCause::Expired) {
136                        // If the eviction cause is expiration => record as a failed probe
137                        let store = store_eviction.clone();
138                        let (peer, _start, notifier) = v;
139
140                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
141                        if let Some(replier) = notifier {
142                            replier.notify(Err(ProbeError::Timeout(timeout.as_secs())));
143                        };
144                        async move {
145                            store
146                                .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_secs())))
147                                .await;
148                        }
149                        .boxed()
150                    } else {
151                        // If the eviction cause is not expiration, nothing needs to be done
152                        futures::future::ready(()).boxed()
153                    }
154                },
155            )
156            .build();
157
158        let active_probes_rx = active_probes.clone();
159        let db_rx = db.clone();
160        let push_to_network = api.0.clone();
161
162        let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();
163
164        // -- Emit probes --
165        let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
166            .map(|peer| (peer, None))
167            .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));
168
169        processes.insert(
170            HoprProbeProcess::Emit,
171            hopr_async_runtime::spawn_as_abortable!(async move {
172                hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await;   // delay to allow network to stabilize
173
174                direct_neighbors
175                .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
176                    let db = db.clone();
177                    let cache_peer_routing = cache_peer_routing.clone();
178                    let active_probes = active_probes.clone();
179                    let push_to_network = Sender { downstream: push_to_network.clone() };
180                    let me = self.me;
181
182                    async move {
183                        let result = cache_peer_routing
184                            .try_get_with(peer, async move {
185                                let cp_ofk = OffchainPublicKey::try_from(peer)
186                                    .context(format!("failed to convert {peer} to offchain public key"))?;
187                                let cp_address = db
188                                    .resolve_chain_key(&cp_ofk)
189                                    .await?
190                                    .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;
191
192                                Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
193                                    pseudonym: HoprPseudonym::random(),
194                                    forward_path: ValidatedPath::direct(cp_ofk, cp_address),
195                                    return_paths: vec![ValidatedPath::direct(me.0, me.1)],
196                                })
197                            })
198                            .await;
199
200                        match result {
201                            Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
202                                let nonce = NeighborProbe::random_nonce();
203
204                                let message = Message::Probe(nonce);
205                                let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
206                                if push_to_network.send_message(path, message).await.is_ok() {
207                                    active_probes
208                                        .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
209                                        .await;
210                                }
211                            },
212                            Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
213                            Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
214                        };
215                    }
216                }).await;
217            })
218        );
219
220        // -- Process probes --
221        processes.insert(
222            HoprProbeProcess::Process,
223            hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, data)| {
224                let active_probes = active_probes_rx.clone();
225                let push_to_network = Sender { downstream: api.0.clone() };
226                let db = db_rx.clone();
227                let store = store.clone();
228                let move_up = move_up.clone();
229
230                async move {
231                    // TODO(v3.1): compare not only against ping tag, but also against telemetry that will be occurring on random tags
232                    if data.application_tag == ReservedTag::Ping.into() {
233                        let message: anyhow::Result<Message> = data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
234
235                        match message {
236                            Ok(message) => {
237                                match message {
238                                    Message::Telemetry(_path_telemetry) => {
239                                        tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
240                                    },
241                                    Message::Probe(NeighborProbe::Ping(ping)) => {
242                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
243                                        match db.find_surb(pseudonym.into()).await.map(|found_surb| ResolvedTransportRouting::Return(found_surb.sender_id, found_surb.surb)) {
244                                            Ok(path) => {
245                                                tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
246                                            let message = Message::Probe(NeighborProbe::Pong(ping));
247                                                if let Err(error) = push_to_network.send_message(path, message).await {
248                                                    tracing::error!(%pseudonym, %error, "failed to send back a pong");
249                                                }
250                                            },
251                                            Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send back a pong"),
252                                        }
253                                    },
254                                    Message::Probe(NeighborProbe::Pong(pong)) => {
255                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
256                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
257                                            let latency = current_time()
258                                                .as_unix_timestamp()
259                                                .saturating_sub(start);
260                                            store.on_finished(&peer, &Ok(latency)).await;
261
262                                            if let Some(replier) = replier {
263                                                replier.notify(Ok(latency))
264                                            };
265                                        } else {
266                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
267                                        };
268                                    },
269                                }
270                            },
271                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
272                    }
273                    } else {
274                        // If the message is not a probing message, forward it up
275                        pin_mut!(move_up);
276                        if move_up.send((pseudonym, data.clone())).await.is_err() {
277                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
278                        }
279                    }
280                }
281            }))
282        );
283
284        processes
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use std::{collections::VecDeque, sync::RwLock, time::Duration};
291
292    use async_trait::async_trait;
293    use futures::future::BoxFuture;
294    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
295    use hopr_db_api::prelude::FoundSurb;
296    use hopr_network_types::prelude::SurbMatcher;
297    use hopr_protocol_app::prelude::Tag;
298
299    use super::*;
300
301    lazy_static::lazy_static!(
302        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
303        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
304        static ref NEIGHBOURS: Vec<PeerId> = vec![
305            OffchainKeypair::random().public().into(),
306            OffchainKeypair::random().public().into(),
307            OffchainKeypair::random().public().into(),
308            OffchainKeypair::random().public().into(),
309        ];
310    );
311
312    #[derive(Debug, Clone)]
313    pub struct PeerStore {
314        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
315        #[allow(clippy::type_complexity)]
316        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
317    }
318
319    #[async_trait]
320    impl ProbeStatusUpdate for PeerStore {
321        async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
322            let mut on_finished = self.on_finished.write().unwrap();
323            on_finished.push((
324                *peer,
325                match result {
326                    Ok(duration) => Ok(*duration),
327                    Err(_e) => Err(ProbeError::Timeout(1000)),
328                },
329            ));
330        }
331    }
332
333    #[async_trait]
334    impl PeerDiscoveryFetch for PeerStore {
335        async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
336            let mut get_peers = self.get_peers.write().unwrap();
337            get_peers.pop_front().unwrap_or_default()
338        }
339    }
340
341    #[derive(Debug, Clone)]
342    pub struct Cache {}
343
344    #[async_trait]
345    impl DbOperations for Cache {
346        async fn find_surb(&self, _matcher: SurbMatcher) -> hopr_db_api::errors::Result<FoundSurb> {
347            // Mock implementation for testing purposes
348            Ok(FoundSurb {
349                sender_id: hopr_db_api::protocol::HoprSenderId::random(),
350                surb: random_memory_violating_surb(),
351                remaining: 0,
352            })
353        }
354
355        async fn resolve_chain_key(
356            &self,
357            _offchain_key: &OffchainPublicKey,
358        ) -> hopr_db_api::errors::Result<Option<Address>> {
359            Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
360        }
361    }
362
363    /// !!! This generates a completely random data blob that pretends to be a SURB.
364    ///
365    /// !!! It must never be accessed or invoked, only passed around.
366    fn random_memory_violating_surb() -> hopr_db_api::protocol::HoprSurb {
367        const SURB_SIZE: usize = std::mem::size_of::<hopr_db_api::protocol::HoprSurb>();
368
369        unsafe {
370            std::mem::transmute::<[u8; SURB_SIZE], hopr_db_api::protocol::HoprSurb>(hopr_crypto_random::random_bytes())
371        }
372    }
373
374    struct TestInterface {
375        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationData)>,
376        from_probing_to_network_rx:
377            futures::channel::mpsc::Receiver<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>,
378        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
379        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
380    }
381
382    async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
383    where
384        Fut: std::future::Future<Output = anyhow::Result<()>>,
385        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
386        Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
387        St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
388    {
389        let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);
390
391        let (from_probing_up_tx, from_probing_up_rx) =
392            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
393
394        let (from_probing_to_network_tx, from_probing_to_network_rx) =
395            futures::channel::mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(100);
396
397        let (from_network_to_probing_tx, from_network_to_probing_rx) =
398            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
399
400        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
401
402        let interface = TestInterface {
403            from_probing_up_rx,
404            from_probing_to_network_rx,
405            from_network_to_probing_tx,
406            manual_probe_tx,
407        };
408
409        let jhs = probe
410            .continuously_scan(
411                (from_probing_to_network_tx, from_network_to_probing_rx),
412                manual_probe_rx,
413                store,
414                db,
415                from_probing_up_tx,
416            )
417            .await;
418
419        let result = test(interface).await;
420
421        jhs.into_iter().for_each(|(_name, handle)| handle.abort());
422
423        result
424    }
425
426    const NO_PROBE_PASSES: f64 = 0.0;
427    const ALL_PROBES_PASS: f64 = 1.0;
428
429    /// Channel that can drop any probes and concurrently replies to a probe correctly
430    fn concurrent_channel(
431        delay: Option<std::time::Duration>,
432        pass_rate: f64,
433        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
434    ) -> impl Fn((ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)) -> BoxFuture<'static, ()> {
435        debug_assert!(
436            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
437            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
438        );
439
440        move |(data, path, finalizer): (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)| -> BoxFuture<'static, ()> {
441            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
442
443            Box::pin(async move {
444                if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
445                    finalizer.finalize(Ok(()));
446
447                    let message: Message = data.try_into().expect("failed to convert data into message");
448                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
449                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
450
451                        if let Some(delay) = delay {
452                            // Simulate a delay if specified
453                            tokio::time::sleep(delay).await;
454                        }
455
456                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
457                            from_network_to_probing_tx
458                                .send((pseudonym, pong_message.try_into().expect("failed to convert pong message into data")))
459                                .await.expect("failed to send pong message");
460                        }
461                    }
462                };
463            })
464        }
465    }
466
467    #[tokio::test]
468    // #[tracing_test::traced_test]
469    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
470        let cfg = ProbeConfig {
471            timeout: std::time::Duration::from_millis(5),
472            interval: std::time::Duration::from_secs(0),
473            ..Default::default()
474        };
475
476        let store = PeerStore {
477            get_peers: Arc::new(RwLock::new(VecDeque::new())),
478            on_finished: Arc::new(RwLock::new(Vec::new())),
479        };
480
481        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
482            let mut manual_probe_tx = iface.manual_probe_tx;
483            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
484            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
485
486            let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
487            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
488
489            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
490                from_probing_to_network_rx
491                    .for_each_concurrent(
492                        cfg.max_parallel_probes + 1,
493                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
494                    )
495                    .await;
496            });
497
498            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
499                .await?
500                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
501
502            Ok(())
503        })
504        .await
505    }
506
507    #[tokio::test]
508    // #[tracing_test::traced_test]
509    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
510        let cfg = ProbeConfig {
511            timeout: std::time::Duration::from_millis(5),
512            interval: std::time::Duration::from_secs(0),
513            ..Default::default()
514        };
515
516        let store = PeerStore {
517            get_peers: Arc::new(RwLock::new(VecDeque::new())),
518            on_finished: Arc::new(RwLock::new(Vec::new())),
519        };
520
521        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
522            let mut manual_probe_tx = iface.manual_probe_tx;
523            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
524            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
525
526            let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
527            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
528
529            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
530                from_probing_to_network_rx
531                    .for_each_concurrent(
532                        cfg.max_parallel_probes + 1,
533                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
534                    )
535                    .await;
536            });
537
538            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
539
540            Ok(())
541        })
542        .await
543    }
544
545    #[tokio::test]
546    // #[tracing_test::traced_test]
547    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
548        let cfg = ProbeConfig {
549            timeout: std::time::Duration::from_millis(20),
550            max_parallel_probes: NEIGHBOURS.len(),
551            interval: std::time::Duration::from_secs(0),
552            ..Default::default()
553        };
554
555        let store = PeerStore {
556            get_peers: Arc::new(RwLock::new({
557                let mut neighbors = VecDeque::new();
558                neighbors.push_back(NEIGHBOURS.clone());
559                neighbors
560            })),
561            on_finished: Arc::new(RwLock::new(Vec::new())),
562        };
563
564        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
565            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
566            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
567
568            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
569                from_probing_to_network_rx
570                    .for_each_concurrent(
571                        cfg.max_parallel_probes + 1,
572                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
573                    )
574                    .await;
575            });
576
577            // wait for the probes to start and finish
578            tokio::time::sleep(cfg.timeout).await;
579
580            Ok(())
581        })
582        .await?;
583
584        assert_eq!(
585            store
586                .on_finished
587                .read()
588                .expect("should be lockable")
589                .iter()
590                .filter(|(_peer, result)| result.is_ok())
591                .count(),
592            NEIGHBOURS.len()
593        );
594
595        Ok(())
596    }
597
598    #[tokio::test]
599    // #[tracing_test::traced_test]
600    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
601        let cfg = ProbeConfig {
602            timeout: std::time::Duration::from_millis(10),
603            max_parallel_probes: NEIGHBOURS.len(),
604            interval: std::time::Duration::from_secs(0),
605            ..Default::default()
606        };
607
608        let store = PeerStore {
609            get_peers: Arc::new(RwLock::new({
610                let mut neighbors = VecDeque::new();
611                neighbors.push_back(NEIGHBOURS.clone());
612                neighbors
613            })),
614            on_finished: Arc::new(RwLock::new(Vec::new())),
615        };
616
617        let timeout = cfg.timeout * 2;
618
619        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
620            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
621            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
622
623            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
624                from_probing_to_network_rx
625                    .for_each_concurrent(
626                        cfg.max_parallel_probes + 1,
627                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
628                    )
629                    .await;
630            });
631
632            // wait for the probes to start and finish
633            tokio::time::sleep(timeout * 2).await;
634
635            Ok(())
636        })
637        .await?;
638
639        assert_eq!(
640            store
641                .on_finished
642                .read()
643                .expect("should be lockable")
644                .iter()
645                .filter(|(_peer, result)| result.is_err())
646                .count(),
647            NEIGHBOURS.len()
648        );
649
650        Ok(())
651    }
652
653    #[tokio::test]
654    // #[tracing_test::traced_test]
655    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
656        let cfg = ProbeConfig {
657            timeout: std::time::Duration::from_millis(20),
658            interval: std::time::Duration::from_secs(0),
659            ..Default::default()
660        };
661
662        let store = PeerStore {
663            get_peers: Arc::new(RwLock::new({
664                let mut neighbors = VecDeque::new();
665                neighbors.push_back(NEIGHBOURS.clone());
666                neighbors
667            })),
668            on_finished: Arc::new(RwLock::new(Vec::new())),
669        };
670
671        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
672            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
673            let mut from_probing_up_rx = iface.from_probing_up_rx;
674
675            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!");
676
677            from_network_to_probing_tx
678                .send((HoprPseudonym::random(), expected_data.clone()))
679                .await?;
680
681            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
682                .await?
683                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
684                .1;
685
686            assert_eq!(actual, expected_data);
687
688            Ok(())
689        })
690        .await
691    }
692}