hopr_transport_probe/
probe.rs

1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, channel::oneshot::channel, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, SurbMatcher, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_transport_packet::prelude::{ApplicationData, ReservedTag};
13use hopr_transport_protocol::processor::{PacketError, PacketSendFinalizer};
14use libp2p_identity::PeerId;
15
16use crate::{
17    HoprProbeProcess,
18    config::ProbeConfig,
19    content::{Message, NeighborProbe},
20    errors::ProbeError,
21    neighbors::neighbors_to_probe,
22    ping::PingQueryReplier,
23    traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
24};
25
26#[inline(always)]
27fn to_nonce(message: &Message) -> String {
28    match message {
29        Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
30        Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
31        _ => "<telemetry>".to_string(),
32    }
33}
34
35#[inline(always)]
36fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
37    match path {
38        ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
39        ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
40    }
41}
42
43#[derive(Clone, Debug)]
44struct Sender<T> {
45    downstream: T,
46}
47
48impl<T> Sender<T>
49where
50    T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)> + Clone + Send + Sync + 'static,
51{
52    #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
53    async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
54        let message: ApplicationData = message
55            .try_into()
56            .map_err(|e: anyhow::Error| ProbeError::SendError(e.to_string()))?;
57        let (packet_sent_tx, packet_sent_rx) = channel::<std::result::Result<(), PacketError>>();
58
59        let push_to_network = self.downstream;
60        futures::pin_mut!(push_to_network);
61        if push_to_network
62            .as_mut()
63            .send((message, path, packet_sent_tx.into()))
64            .await
65            .is_ok()
66        {
67            packet_sent_rx
68                .await
69                .map_err(|error| ProbeError::SendError(error.to_string()))?
70                .map_err(|error| ProbeError::SendError(error.to_string()))
71        } else {
72            Err(ProbeError::SendError("transport error".to_string()))
73        }
74    }
75}
76
77/// Probe functionality builder.
78///
79/// The builder holds information about this node's own addresses and the configuration for the probing process. It is
80/// then used to construct the probing process itself.
81pub struct Probe {
82    /// Own addresses for self reference and surb creation.
83    me: (OffchainPublicKey, Address),
84    /// Probe configuration.
85    cfg: ProbeConfig,
86}
87
88impl Probe {
89    pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
90        Self { me, cfg }
91    }
92
93    /// The main function that assembles and starts the probing process.
94    pub async fn continuously_scan<T, U, V, W, C, Up>(
95        self,
96        api: (T, U),      // lower (tx, rx) channels for sending and receiving messages
97        manual_events: V, // explicit requests from the API
98        store: W,         // peer store
99        db: C,            // database for SURB & peer resolution
100        move_up: Up,      // forward up non-probing messages from the network
101    ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
102    where
103        T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
104            + Clone
105            + Send
106            + Sync
107            + 'static,
108        T::Error: Send,
109        U: futures::Stream<Item = (HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
110        W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
111        C: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
112        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
113        Up: futures::Sink<(HoprPseudonym, ApplicationData)> + Clone + Send + Sync + 'static,
114    {
115        let max_parallel_probes = self.cfg.max_parallel_probes;
116
117        // For each probe target a cached version of transport routing is stored
118        let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
119            .time_to_live(std::time::Duration::from_secs(600))
120            .max_capacity(100_000)
121            .build();
122
123        // Currently active probes
124        let store_eviction = store.clone();
125        let timeout = self.cfg.timeout;
126        let active_probes: moka::future::Cache<
127            (HoprPseudonym, NeighborProbe),
128            (PeerId, std::time::Duration, Option<PingQueryReplier>),
129        > = moka::future::Cache::builder()
130            .time_to_live(timeout)
131            .max_capacity(100_000)
132            .async_eviction_listener(
133                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
134                      v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
135                      cause|
136                      -> moka::notification::ListenerFuture {
137                    if matches!(cause, moka::notification::RemovalCause::Expired) {
138                        // If the eviction cause is expiration => record as a failed probe
139                        let store = store_eviction.clone();
140                        let (peer, _start, notifier) = v;
141
142                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
143                        if let Some(replier) = notifier {
144                            replier.notify(Err(ProbeError::Timeout(timeout.as_millis() as u64)));
145                        };
146                        async move {
147                            store
148                                .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_millis() as u64)))
149                                .await;
150                        }
151                        .boxed()
152                    } else {
153                        // If the eviction cause is not expiration, nothing needs to be done
154                        futures::future::ready(()).boxed()
155                    }
156                },
157            )
158            .build();
159
160        let active_probes_rx = active_probes.clone();
161        let db_rx = db.clone();
162        let push_to_network = api.0.clone();
163
164        let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();
165
166        // -- Emit probes --
167        let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
168            .map(|peer| (peer, None))
169            .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));
170
171        processes.insert(
172            HoprProbeProcess::Emit,
173            hopr_async_runtime::spawn_as_abortable(direct_neighbors
174                .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
175                    let db = db.clone();
176                    let cache_peer_routing = cache_peer_routing.clone();
177                    let active_probes = active_probes.clone();
178                    let push_to_network = Sender { downstream: push_to_network.clone() };
179                    let me = self.me;
180
181                    async move {
182                        let result = cache_peer_routing
183                            .try_get_with(peer, async move {
184                                let cp_ofk = OffchainPublicKey::try_from(peer)
185                                    .context(format!("failed to convert {peer} to offchain public key"))?;
186                                let cp_address = db
187                                    .resolve_chain_key(&cp_ofk)
188                                    .await?
189                                    .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;
190
191                                Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
192                                    pseudonym: HoprPseudonym::random(),
193                                    forward_path: ValidatedPath::direct(cp_ofk, cp_address),
194                                    return_paths: vec![ValidatedPath::direct(me.0, me.1)],
195                                })
196                            })
197                            .await;
198
199                        match result {
200                            Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
201                                let nonce = NeighborProbe::random_nonce();
202
203                                let message = Message::Probe(nonce);
204                                let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
205                                if push_to_network.send_message(path, message).await.is_ok() {
206                                    active_probes
207                                        .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
208                                        .await;
209                                }
210                            },
211                            Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
212                            Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
213                        };
214                    }
215                })
216            )
217        );
218
219        // -- Process probes --
220        processes.insert(
221            HoprProbeProcess::Process,
222            hopr_async_runtime::spawn_as_abortable(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, data)| {
223                let active_probes = active_probes_rx.clone();
224                let push_to_network = Sender { downstream: api.0.clone() };
225                let db = db_rx.clone();
226                let store = store.clone();
227                let move_up = move_up.clone();
228
229                async move {
230                    // TODO(v3.1): compare not only against ping tag, but also against telemetry that will be occuring on random tags
231                    if data.application_tag == ReservedTag::Ping.into() {
232                        let message: anyhow::Result<Message> = data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
233
234                        match message {
235                            Ok(message) => {
236                                match message {
237                                    Message::Telemetry(_path_telemetry) => {
238                                        tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
239                                    },
240                                    Message::Probe(NeighborProbe::Ping(ping)) => {
241                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
242                                        match db.find_surb(SurbMatcher::Pseudonym(pseudonym)).await.map(|(sender_id, surb)| ResolvedTransportRouting::Return(sender_id, surb)) {
243                                            Ok(path) => {
244                                                let message = Message::Probe(NeighborProbe::Pong(ping));
245                                                let _ = push_to_network.send_message(path, message).await;
246                                            },
247                                            Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send pong"),
248                                        }
249                                    },
250                                    Message::Probe(NeighborProbe::Pong(pong)) => {
251                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
252                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
253                                            let latency = current_time()
254                                                .as_unix_timestamp()
255                                                .saturating_sub(start);
256                                            store.on_finished(&peer, &Ok(latency)).await;
257
258                                            if let Some(replier) = replier {
259                                                replier.notify(Ok(latency))
260                                            };
261                                        } else {
262                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
263                                        };
264                                    },
265                                }
266                            },
267                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
268                    }
269                    } else {
270                        // If the message is not a probing message, forward it up
271                        pin_mut!(move_up);
272                        if move_up.send((pseudonym, data.clone())).await.is_err() {
273                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
274                        }
275                    }
276                }
277            }))
278        );
279
280        processes
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::{collections::VecDeque, sync::RwLock, time::Duration};
287
288    use async_trait::async_trait;
289    use futures::future::BoxFuture;
290    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
291    use hopr_transport_packet::prelude::Tag;
292
293    use super::*;
294
295    lazy_static::lazy_static!(
296        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
297        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
298        static ref NEIGHBOURS: Vec<PeerId> = vec![
299            OffchainKeypair::random().public().into(),
300            OffchainKeypair::random().public().into(),
301            OffchainKeypair::random().public().into(),
302            OffchainKeypair::random().public().into(),
303        ];
304    );
305
306    #[derive(Debug, Clone)]
307    pub struct PeerStore {
308        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
309        #[allow(clippy::type_complexity)]
310        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
311    }
312
313    #[async_trait]
314    impl ProbeStatusUpdate for PeerStore {
315        async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
316            let mut on_finished = self.on_finished.write().unwrap();
317            on_finished.push((
318                *peer,
319                match result {
320                    Ok(duration) => Ok(*duration),
321                    Err(_e) => Err(ProbeError::Timeout(1000)),
322                },
323            ));
324        }
325    }
326
327    #[async_trait]
328    impl PeerDiscoveryFetch for PeerStore {
329        async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
330            let mut get_peers = self.get_peers.write().unwrap();
331            get_peers.pop_front().unwrap_or_default()
332        }
333    }
334
335    #[derive(Debug, Clone)]
336    pub struct Cache {}
337
338    #[async_trait]
339    impl DbOperations for Cache {
340        async fn find_surb(
341            &self,
342            _matcher: SurbMatcher,
343        ) -> hopr_db_api::errors::Result<(hopr_db_api::protocol::HoprSenderId, hopr_db_api::protocol::HoprSurb)>
344        {
345            // Mock implementation for testing purposes
346            Ok((
347                hopr_db_api::protocol::HoprSenderId::random(),
348                random_memory_violating_surb(),
349            ))
350        }
351
352        async fn resolve_chain_key(
353            &self,
354            _offchain_key: &OffchainPublicKey,
355        ) -> hopr_db_api::errors::Result<Option<Address>> {
356            Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
357        }
358    }
359
360    /// !!! This generates a completely random data blob that pretends to be a SURB.
361    ///
362    /// !!! It must never by accessed or invoked, only passed around.
363    fn random_memory_violating_surb() -> hopr_db_api::protocol::HoprSurb {
364        const SURB_SIZE: usize = std::mem::size_of::<hopr_db_api::protocol::HoprSurb>();
365
366        unsafe {
367            std::mem::transmute::<[u8; SURB_SIZE], hopr_db_api::protocol::HoprSurb>(hopr_crypto_random::random_bytes())
368        }
369    }
370
371    struct TestInterface {
372        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationData)>,
373        from_probing_to_network_rx:
374            futures::channel::mpsc::Receiver<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>,
375        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
376        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
377    }
378
379    async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
380    where
381        Fut: std::future::Future<Output = anyhow::Result<()>>,
382        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
383        Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
384        St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
385    {
386        let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);
387
388        let (from_probing_up_tx, from_probing_up_rx) =
389            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
390
391        let (from_probing_to_network_tx, from_probing_to_network_rx) =
392            futures::channel::mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(100);
393
394        let (from_network_to_probing_tx, from_network_to_probing_rx) =
395            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
396
397        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
398
399        let interface = TestInterface {
400            from_probing_up_rx,
401            from_probing_to_network_rx,
402            from_network_to_probing_tx,
403            manual_probe_tx,
404        };
405
406        let jhs = probe
407            .continuously_scan(
408                (from_probing_to_network_tx, from_network_to_probing_rx),
409                manual_probe_rx,
410                store,
411                db,
412                from_probing_up_tx,
413            )
414            .await;
415
416        let result = test(interface).await;
417
418        jhs.into_iter().for_each(|(_name, handle)| handle.abort());
419
420        result
421    }
422
423    const NO_PROBE_PASSES: f64 = 0.0;
424    const ALL_PROBES_PASS: f64 = 1.0;
425
426    /// Channel that can drop any probes and concurrently replies to a probe correctly
427    fn concurrent_channel(
428        delay: Option<std::time::Duration>,
429        pass_rate: f64,
430        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
431    ) -> impl Fn((ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)) -> BoxFuture<'static, ()> {
432        debug_assert!(
433            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
434            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
435        );
436
437        move |(data, path, finalizer): (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)| -> BoxFuture<'static, ()> {
438            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
439
440            Box::pin(async move {
441                if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
442                    finalizer.finalize(Ok(()));
443
444                    let message: Message = data.try_into().expect("failed to convert data into message");
445                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
446                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
447
448                        if let Some(delay) = delay {
449                            // Simulate a delay if specified
450                            tokio::time::sleep(delay).await;
451                        }
452
453                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
454                            from_network_to_probing_tx
455                                .send((pseudonym, pong_message.try_into().expect("failed to convert pong message into data")))
456                                .await.expect("failed to send pong message");
457                        }
458                    }
459                };
460            })
461        }
462    }
463
464    #[tokio::test]
465    // #[tracing_test::traced_test]
466    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
467        let cfg = ProbeConfig {
468            timeout: std::time::Duration::from_millis(5),
469            ..Default::default()
470        };
471
472        let store = PeerStore {
473            get_peers: Arc::new(RwLock::new(VecDeque::new())),
474            on_finished: Arc::new(RwLock::new(Vec::new())),
475        };
476
477        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
478            let mut manual_probe_tx = iface.manual_probe_tx;
479            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
480            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
481
482            let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
483            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
484
485            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
486                from_probing_to_network_rx
487                    .for_each_concurrent(
488                        cfg.max_parallel_probes + 1,
489                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
490                    )
491                    .await;
492            });
493
494            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
495                .await?
496                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
497
498            Ok(())
499        })
500        .await
501    }
502
503    #[tokio::test]
504    // #[tracing_test::traced_test]
505    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
506        let cfg = ProbeConfig {
507            timeout: std::time::Duration::from_millis(5),
508            ..Default::default()
509        };
510
511        let store = PeerStore {
512            get_peers: Arc::new(RwLock::new(VecDeque::new())),
513            on_finished: Arc::new(RwLock::new(Vec::new())),
514        };
515
516        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
517            let mut manual_probe_tx = iface.manual_probe_tx;
518            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
519            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
520
521            let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
522            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
523
524            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
525                from_probing_to_network_rx
526                    .for_each_concurrent(
527                        cfg.max_parallel_probes + 1,
528                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
529                    )
530                    .await;
531            });
532
533            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
534
535            Ok(())
536        })
537        .await
538    }
539
540    #[tokio::test]
541    // #[tracing_test::traced_test]
542    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
543        let cfg = ProbeConfig {
544            timeout: std::time::Duration::from_millis(20),
545            max_parallel_probes: NEIGHBOURS.len(),
546            ..Default::default()
547        };
548
549        let store = PeerStore {
550            get_peers: Arc::new(RwLock::new({
551                let mut neighbors = VecDeque::new();
552                neighbors.push_back(NEIGHBOURS.clone());
553                neighbors
554            })),
555            on_finished: Arc::new(RwLock::new(Vec::new())),
556        };
557
558        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
559            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
560            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
561
562            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
563                from_probing_to_network_rx
564                    .for_each_concurrent(
565                        cfg.max_parallel_probes + 1,
566                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
567                    )
568                    .await;
569            });
570
571            // wait for the probes to start and finish
572            tokio::time::sleep(cfg.timeout).await;
573
574            Ok(())
575        })
576        .await?;
577
578        assert_eq!(
579            store
580                .on_finished
581                .read()
582                .expect("should be lockable")
583                .iter()
584                .filter(|(_peer, result)| result.is_ok())
585                .count(),
586            NEIGHBOURS.len()
587        );
588
589        Ok(())
590    }
591
592    #[tokio::test]
593    // #[tracing_test::traced_test]
594    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
595        let cfg = ProbeConfig {
596            timeout: std::time::Duration::from_millis(10),
597            max_parallel_probes: NEIGHBOURS.len(),
598            ..Default::default()
599        };
600
601        let store = PeerStore {
602            get_peers: Arc::new(RwLock::new({
603                let mut neighbors = VecDeque::new();
604                neighbors.push_back(NEIGHBOURS.clone());
605                neighbors
606            })),
607            on_finished: Arc::new(RwLock::new(Vec::new())),
608        };
609
610        let timeout = cfg.timeout * 2;
611
612        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
613            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
614            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
615
616            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
617                from_probing_to_network_rx
618                    .for_each_concurrent(
619                        cfg.max_parallel_probes + 1,
620                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
621                    )
622                    .await;
623            });
624
625            // wait for the probes to start and finish
626            tokio::time::sleep(timeout * 2).await;
627
628            Ok(())
629        })
630        .await?;
631
632        assert_eq!(
633            store
634                .on_finished
635                .read()
636                .expect("should be lockable")
637                .iter()
638                .filter(|(_peer, result)| result.is_err())
639                .count(),
640            NEIGHBOURS.len()
641        );
642
643        Ok(())
644    }
645
646    #[tokio::test]
647    // #[tracing_test::traced_test]
648    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
649        let cfg = ProbeConfig {
650            timeout: std::time::Duration::from_millis(20),
651            ..Default::default()
652        };
653
654        let store = PeerStore {
655            get_peers: Arc::new(RwLock::new({
656                let mut neighbors = VecDeque::new();
657                neighbors.push_back(NEIGHBOURS.clone());
658                neighbors
659            })),
660            on_finished: Arc::new(RwLock::new(Vec::new())),
661        };
662
663        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
664            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
665            let mut from_probing_up_rx = iface.from_probing_up_rx;
666
667            let expected_data = ApplicationData {
668                application_tag: Tag::MAX.into(),
669                plain_text: b"Hello, this is a test message!".to_vec().into_boxed_slice(),
670            };
671
672            from_network_to_probing_tx
673                .send((HoprPseudonym::random(), expected_data.clone()))
674                .await?;
675
676            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
677                .await?
678                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
679                .1;
680
681            assert_eq!(actual, expected_data);
682
683            Ok(())
684        })
685        .await
686    }
687}