hopr_transport_probe/
probe.rs

1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
13use libp2p_identity::PeerId;
14
15use crate::{
16    HoprProbeProcess,
17    config::ProbeConfig,
18    content::{Message, NeighborProbe},
19    errors::ProbeError,
20    neighbors::neighbors_to_probe,
21    ping::PingQueryReplier,
22    traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
23};
24
25#[inline(always)]
26fn to_nonce(message: &Message) -> String {
27    match message {
28        Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
29        Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
30        _ => "<telemetry>".to_string(),
31    }
32}
33
34#[inline(always)]
35fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
36    match path {
37        ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
38        ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
39    }
40}
41
42#[derive(Clone, Debug)]
43struct Sender<T> {
44    downstream: T,
45}
46
47impl<T> Sender<T>
48where
49    T: futures::Sink<(ResolvedTransportRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
50{
51    #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
52    async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
53        let push_to_network = self.downstream;
54        pin_mut!(push_to_network);
55        if push_to_network
56            .as_mut()
57            .send((path, ApplicationDataOut::with_no_packet_info(message.try_into()?)))
58            .await
59            .is_ok()
60        {
61            Ok(())
62        } else {
63            Err(ProbeError::SendError("transport error".to_string()))
64        }
65    }
66}
67
68/// Probe functionality builder.
69///
70/// The builder holds information about this node's own addresses and the configuration for the probing process. It is
71/// then used to construct the probing process itself.
72pub struct Probe {
73    /// Own addresses for self reference and surb creation.
74    me: (OffchainPublicKey, Address),
75    /// Probe configuration.
76    cfg: ProbeConfig,
77}
78
79impl Probe {
80    pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
81        Self { me, cfg }
82    }
83
84    /// The main function that assembles and starts the probing process.
85    pub async fn continuously_scan<T, U, V, W, C, Up>(
86        self,
87        api: (T, U),      // lower (tx, rx) channels for sending and receiving messages
88        manual_events: V, // explicit requests from the API
89        store: W,         // peer store
90        db: C,            // database for SURB & peer resolution
91        move_up: Up,      // forward up non-probing messages from the network
92    ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
93    where
94        T: futures::Sink<(ResolvedTransportRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
95        T::Error: Send,
96        U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
97        W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
98        C: DbOperations + Clone + Send + Sync + 'static,
99        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
100        Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
101    {
102        let max_parallel_probes = self.cfg.max_parallel_probes;
103        let interval_between_rounds = self.cfg.interval;
104
105        // For each probe target a cached version of transport routing is stored
106        let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
107            .time_to_live(std::time::Duration::from_secs(600))
108            .max_capacity(100_000)
109            .build();
110
111        // Currently active probes
112        let store_eviction = store.clone();
113        let timeout = self.cfg.timeout;
114        let active_probes: moka::future::Cache<
115            (HoprPseudonym, NeighborProbe),
116            (PeerId, std::time::Duration, Option<PingQueryReplier>),
117        > = moka::future::Cache::builder()
118            .time_to_live(timeout)
119            .max_capacity(100_000)
120            .async_eviction_listener(
121                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
122                      v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
123                      cause|
124                      -> moka::notification::ListenerFuture {
125                    if matches!(cause, moka::notification::RemovalCause::Expired) {
126                        // If the eviction cause is expiration => record as a failed probe
127                        let store = store_eviction.clone();
128                        let (peer, _start, notifier) = v;
129
130                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
131                        if let Some(replier) = notifier {
132                            replier.notify(Err(ProbeError::Timeout(timeout.as_secs())));
133                        };
134                        async move {
135                            store
136                                .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_secs())))
137                                .await;
138                        }
139                        .boxed()
140                    } else {
141                        // If the eviction cause is not expiration, nothing needs to be done
142                        futures::future::ready(()).boxed()
143                    }
144                },
145            )
146            .build();
147
148        let active_probes_rx = active_probes.clone();
149        let db_rx = db.clone();
150        let push_to_network = api.0.clone();
151
152        let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();
153
154        // -- Emit probes --
155        let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
156            .map(|peer| (peer, None))
157            .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));
158
159        processes.insert(
160            HoprProbeProcess::Emit,
161            hopr_async_runtime::spawn_as_abortable!(async move {
162                hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await;   // delay to allow network to stabilize
163
164                direct_neighbors
165                    .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
166                        let db = db.clone();
167                        let cache_peer_routing = cache_peer_routing.clone();
168                        let active_probes = active_probes.clone();
169                        let push_to_network = Sender { downstream: push_to_network.clone() };
170                        let me = self.me;
171
172                        async move {
173                            let result = cache_peer_routing
174                                .try_get_with(peer, async move {
175                                    // TODO: This is a CPU intensive operation, convert the probing mechanism to use OffchainPublicKey instead of PeerIDs!
176                                    // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
177                                    let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
178                                        .await
179                                        .context(format!("failed to convert {peer} to offchain public key"))?;
180                                    let cp_address = db
181                                        .resolve_chain_key(&pubkey)
182                                        .await?
183                                        .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;
184
185                                    Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
186                                        pseudonym: HoprPseudonym::random(),
187                                        forward_path: ValidatedPath::direct(pubkey, cp_address),
188                                        return_paths: vec![ValidatedPath::direct(me.0, me.1)],
189                                    })
190                                })
191                                .await;
192
193                            match result {
194                                Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
195                                    let nonce = NeighborProbe::random_nonce();
196
197                                    let message = Message::Probe(nonce);
198                                    let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
199                                    if push_to_network.send_message(path, message).await.is_ok() {
200                                        active_probes
201                                            .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
202                                            .await;
203                                    }
204                                },
205                                Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
206                                Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
207                            };
208                        }
209                    })
210                    .inspect(|_| tracing::warn!(task = "transport (probe - generate outgoing)", "long-running background task finished"))
211                    .await;
212            })
213        );
214
215        // -- Process probes --
216        processes.insert(
217            HoprProbeProcess::Process,
218            hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
219                let active_probes = active_probes_rx.clone();
220                let push_to_network = Sender { downstream: api.0.clone() };
221                let db = db_rx.clone();
222                let store = store.clone();
223                let move_up = move_up.clone();
224
225                async move {
226                    // TODO(v3.1): compare not only against ping tag, but also against telemetry that will be occurring on random tags
227                    if in_data.data.application_tag == ReservedTag::Ping.into() {
228                        let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
229
230                        match message {
231                            Ok(message) => {
232                                match message {
233                                    Message::Telemetry(_path_telemetry) => {
234                                        tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
235                                    },
236                                    Message::Probe(NeighborProbe::Ping(ping)) => {
237                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
238                                        match db.find_surb(pseudonym.into()).await.map(|found_surb| ResolvedTransportRouting::Return(found_surb.sender_id, found_surb.surb)) {
239                                            Ok(path) => {
240                                                tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
241                                            let message = Message::Probe(NeighborProbe::Pong(ping));
242                                                if let Err(error) = push_to_network.send_message(path, message).await {
243                                                    tracing::error!(%pseudonym, %error, "failed to send back a pong");
244                                                }
245                                            },
246                                            Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send back a pong"),
247                                        }
248                                    },
249                                    Message::Probe(NeighborProbe::Pong(pong)) => {
250                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
251                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
252                                            let latency = current_time()
253                                                .as_unix_timestamp()
254                                                .saturating_sub(start);
255                                            store.on_finished(&peer, &Ok(latency)).await;
256
257                                            if let Some(replier) = replier {
258                                                replier.notify(Ok(latency))
259                                            };
260                                        } else {
261                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
262                                        };
263                                    },
264                                }
265                            },
266                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
267                        }
268                    } else {
269                        // If the message is not a probing message, forward it up
270                        pin_mut!(move_up);
271                        if move_up.send((pseudonym, in_data)).await.is_err() {
272                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
273                        }
274                    }
275                }
276            }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
277        );
278
279        processes
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use std::{collections::VecDeque, convert::Infallible, sync::RwLock, time::Duration};
286
287    use async_trait::async_trait;
288    use futures::future::BoxFuture;
289    use hopr_api::db::FoundSurb;
290    use hopr_crypto_packet::{HoprSurb, prelude::HoprSenderId};
291    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
292    use hopr_network_types::prelude::SurbMatcher;
293    use hopr_protocol_app::prelude::{ApplicationData, Tag};
294
295    use super::*;
296
297    lazy_static::lazy_static!(
298        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
299        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
300        static ref NEIGHBOURS: Vec<PeerId> = vec![
301            OffchainKeypair::random().public().into(),
302            OffchainKeypair::random().public().into(),
303            OffchainKeypair::random().public().into(),
304            OffchainKeypair::random().public().into(),
305        ];
306    );
307
308    #[derive(Debug, Clone)]
309    pub struct PeerStore {
310        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
311        #[allow(clippy::type_complexity)]
312        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
313    }
314
315    #[async_trait]
316    impl ProbeStatusUpdate for PeerStore {
317        async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
318            let mut on_finished = self.on_finished.write().unwrap();
319            on_finished.push((
320                *peer,
321                match result {
322                    Ok(duration) => Ok(*duration),
323                    Err(_e) => Err(ProbeError::Timeout(1000)),
324                },
325            ));
326        }
327    }
328
329    #[async_trait]
330    impl PeerDiscoveryFetch for PeerStore {
331        async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
332            let mut get_peers = self.get_peers.write().unwrap();
333            get_peers.pop_front().unwrap_or_default()
334        }
335    }
336
337    #[derive(Debug, Clone)]
338    pub struct Cache {}
339
340    #[async_trait]
341    impl DbOperations for Cache {
342        type ChainError = Infallible;
343        type DbError = Infallible;
344
345        async fn find_surb(&self, _matcher: SurbMatcher) -> Result<FoundSurb, Self::DbError> {
346            // Mock implementation for testing purposes
347            Ok(FoundSurb {
348                sender_id: HoprSenderId::random(),
349                surb: random_memory_violating_surb(),
350                remaining: 0,
351            })
352        }
353
354        async fn resolve_chain_key(
355            &self,
356            _offchain_key: &OffchainPublicKey,
357        ) -> Result<Option<Address>, Self::ChainError> {
358            Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
359        }
360    }
361
362    /// !!! This generates a completely random data blob that pretends to be a SURB.
363    ///
364    /// !!! It must never be accessed or invoked, only passed around.
365    fn random_memory_violating_surb() -> HoprSurb {
366        const SURB_SIZE: usize = size_of::<HoprSurb>();
367
368        unsafe { std::mem::transmute::<[u8; SURB_SIZE], HoprSurb>(hopr_crypto_random::random_bytes()) }
369    }
370
371    struct TestInterface {
372        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
373        from_probing_to_network_rx: futures::channel::mpsc::Receiver<(ResolvedTransportRouting, ApplicationDataOut)>,
374        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
375        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
376    }
377
378    async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
379    where
380        Fut: std::future::Future<Output = anyhow::Result<()>>,
381        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
382        Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
383        St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
384    {
385        let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);
386
387        let (from_probing_up_tx, from_probing_up_rx) =
388            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
389
390        let (from_probing_to_network_tx, from_probing_to_network_rx) =
391            futures::channel::mpsc::channel::<(ResolvedTransportRouting, ApplicationDataOut)>(100);
392
393        let (from_network_to_probing_tx, from_network_to_probing_rx) =
394            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
395
396        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
397
398        let interface = TestInterface {
399            from_probing_up_rx,
400            from_probing_to_network_rx,
401            from_network_to_probing_tx,
402            manual_probe_tx,
403        };
404
405        let jhs = probe
406            .continuously_scan(
407                (from_probing_to_network_tx, from_network_to_probing_rx),
408                manual_probe_rx,
409                store,
410                db,
411                from_probing_up_tx,
412            )
413            .await;
414
415        let result = test(interface).await;
416
417        jhs.into_iter().for_each(|(_name, handle)| handle.abort());
418
419        result
420    }
421
422    const NO_PROBE_PASSES: f64 = 0.0;
423    const ALL_PROBES_PASS: f64 = 1.0;
424
425    /// Channel that can drop any probes and concurrently replies to a probe correctly
426    fn concurrent_channel(
427        delay: Option<std::time::Duration>,
428        pass_rate: f64,
429        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
430    ) -> impl Fn((ResolvedTransportRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
431        debug_assert!(
432            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
433            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
434        );
435
436        move |(path, data_out): (ResolvedTransportRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
437            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
438
439            Box::pin(async move {
440                if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
441                    let message: Message = data_out.data.try_into().expect("failed to convert data into message");
442                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
443                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));
444
445                        if let Some(delay) = delay {
446                            // Simulate a delay if specified
447                            tokio::time::sleep(delay).await;
448                        }
449
450                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
451                        {
452                            from_network_to_probing_tx
453                                .send((
454                                    pseudonym,
455                                    ApplicationDataIn {
456                                        data: pong_message
457                                            .try_into()
458                                            .expect("failed to convert pong message into data"),
459                                        packet_info: Default::default(),
460                                    },
461                                ))
462                                .await
463                                .expect("failed to send pong message");
464                        }
465                    }
466                };
467            })
468        }
469    }
470
471    #[tokio::test]
472    // #[tracing_test::traced_test]
473    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
474        let cfg = ProbeConfig {
475            timeout: std::time::Duration::from_millis(5),
476            interval: std::time::Duration::from_secs(0),
477            ..Default::default()
478        };
479
480        let store = PeerStore {
481            get_peers: Arc::new(RwLock::new(VecDeque::new())),
482            on_finished: Arc::new(RwLock::new(Vec::new())),
483        };
484
485        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
486            let mut manual_probe_tx = iface.manual_probe_tx;
487            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
488            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
489
490            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
491            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
492
493            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
494                from_probing_to_network_rx
495                    .for_each_concurrent(
496                        cfg.max_parallel_probes + 1,
497                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
498                    )
499                    .await;
500            });
501
502            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
503                .await?
504                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
505
506            Ok(())
507        })
508        .await
509    }
510
511    #[tokio::test]
512    // #[tracing_test::traced_test]
513    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
514        let cfg = ProbeConfig {
515            timeout: std::time::Duration::from_millis(5),
516            interval: std::time::Duration::from_secs(0),
517            ..Default::default()
518        };
519
520        let store = PeerStore {
521            get_peers: Arc::new(RwLock::new(VecDeque::new())),
522            on_finished: Arc::new(RwLock::new(Vec::new())),
523        };
524
525        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
526            let mut manual_probe_tx = iface.manual_probe_tx;
527            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
528            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
529
530            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
531            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
532
533            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
534                from_probing_to_network_rx
535                    .for_each_concurrent(
536                        cfg.max_parallel_probes + 1,
537                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
538                    )
539                    .await;
540            });
541
542            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
543
544            Ok(())
545        })
546        .await
547    }
548
549    #[tokio::test]
550    // #[tracing_test::traced_test]
551    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
552        let cfg = ProbeConfig {
553            timeout: std::time::Duration::from_millis(20),
554            max_parallel_probes: NEIGHBOURS.len(),
555            interval: std::time::Duration::from_secs(0),
556            ..Default::default()
557        };
558
559        let store = PeerStore {
560            get_peers: Arc::new(RwLock::new({
561                let mut neighbors = VecDeque::new();
562                neighbors.push_back(NEIGHBOURS.clone());
563                neighbors
564            })),
565            on_finished: Arc::new(RwLock::new(Vec::new())),
566        };
567
568        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
569            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
570            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
571
572            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
573                from_probing_to_network_rx
574                    .for_each_concurrent(
575                        cfg.max_parallel_probes + 1,
576                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
577                    )
578                    .await;
579            });
580
581            // wait for the probes to start and finish
582            tokio::time::sleep(cfg.timeout).await;
583
584            Ok(())
585        })
586        .await?;
587
588        assert_eq!(
589            store
590                .on_finished
591                .read()
592                .expect("should be lockable")
593                .iter()
594                .filter(|(_peer, result)| result.is_ok())
595                .count(),
596            NEIGHBOURS.len()
597        );
598
599        Ok(())
600    }
601
602    #[tokio::test]
603    // #[tracing_test::traced_test]
604    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
605        let cfg = ProbeConfig {
606            timeout: std::time::Duration::from_millis(10),
607            max_parallel_probes: NEIGHBOURS.len(),
608            interval: std::time::Duration::from_secs(0),
609            ..Default::default()
610        };
611
612        let store = PeerStore {
613            get_peers: Arc::new(RwLock::new({
614                let mut neighbors = VecDeque::new();
615                neighbors.push_back(NEIGHBOURS.clone());
616                neighbors
617            })),
618            on_finished: Arc::new(RwLock::new(Vec::new())),
619        };
620
621        let timeout = cfg.timeout * 2;
622
623        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
624            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
625            let from_network_to_probing_tx = iface.from_network_to_probing_tx;
626
627            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
628                from_probing_to_network_rx
629                    .for_each_concurrent(
630                        cfg.max_parallel_probes + 1,
631                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
632                    )
633                    .await;
634            });
635
636            // wait for the probes to start and finish
637            tokio::time::sleep(timeout * 2).await;
638
639            Ok(())
640        })
641        .await?;
642
643        assert_eq!(
644            store
645                .on_finished
646                .read()
647                .expect("should be lockable")
648                .iter()
649                .filter(|(_peer, result)| result.is_err())
650                .count(),
651            NEIGHBOURS.len()
652        );
653
654        Ok(())
655    }
656
657    #[tokio::test]
658    // #[tracing_test::traced_test]
659    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
660        let cfg = ProbeConfig {
661            timeout: std::time::Duration::from_millis(20),
662            interval: std::time::Duration::from_secs(0),
663            ..Default::default()
664        };
665
666        let store = PeerStore {
667            get_peers: Arc::new(RwLock::new({
668                let mut neighbors = VecDeque::new();
669                neighbors.push_back(NEIGHBOURS.clone());
670                neighbors
671            })),
672            on_finished: Arc::new(RwLock::new(Vec::new())),
673        };
674
675        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
676            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
677            let mut from_probing_up_rx = iface.from_probing_up_rx;
678
679            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
680
681            from_network_to_probing_tx
682                .send((
683                    HoprPseudonym::random(),
684                    ApplicationDataIn {
685                        data: expected_data.clone(),
686                        packet_info: Default::default(),
687                    },
688                ))
689                .await?;
690
691            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
692                .await?
693                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
694                .1;
695
696            assert_eq!(actual.data, expected_data);
697
698            Ok(())
699        })
700        .await
701    }
702}