1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, channel::oneshot::channel, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, SurbMatcher, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_transport_packet::prelude::{ApplicationData, ReservedTag};
13use hopr_transport_protocol::processor::{PacketError, PacketSendFinalizer};
14use libp2p_identity::PeerId;
15
16use crate::{
17 HoprProbeProcess,
18 config::ProbeConfig,
19 content::{Message, NeighborProbe},
20 errors::ProbeError,
21 neighbors::neighbors_to_probe,
22 ping::PingQueryReplier,
23 traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
24};
25
26#[inline(always)]
27fn to_nonce(message: &Message) -> String {
28 match message {
29 Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
30 Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
31 _ => "<telemetry>".to_string(),
32 }
33}
34
35#[inline(always)]
36fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
37 match path {
38 ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
39 ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
40 }
41}
42
/// Thin wrapper around the outgoing packet sink used to push probe messages
/// towards the network.
#[derive(Debug, Clone)]
struct Sender<T> {
    /// Sink that accepts `(data, routing, finalizer)` triples.
    downstream: T,
}
47
48impl<T> Sender<T>
49where
50 T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)> + Clone + Send + Sync + 'static,
51{
52 #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
53 async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
54 let message: ApplicationData = message
55 .try_into()
56 .map_err(|e: anyhow::Error| ProbeError::SendError(e.to_string()))?;
57 let (packet_sent_tx, packet_sent_rx) = channel::<std::result::Result<(), PacketError>>();
58
59 let push_to_network = self.downstream;
60 futures::pin_mut!(push_to_network);
61 if push_to_network
62 .as_mut()
63 .send((message, path, packet_sent_tx.into()))
64 .await
65 .is_ok()
66 {
67 packet_sent_rx
68 .await
69 .map_err(|error| ProbeError::SendError(error.to_string()))?
70 .map_err(|error| ProbeError::SendError(error.to_string()))
71 } else {
72 Err(ProbeError::SendError("transport error".to_string()))
73 }
74 }
75}
76
77pub struct Probe {
82 me: (OffchainPublicKey, Address),
84 cfg: ProbeConfig,
86}
87
88impl Probe {
89 pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
90 Self { me, cfg }
91 }
92
93 pub async fn continuously_scan<T, U, V, W, C, Up>(
95 self,
96 api: (T, U), manual_events: V, store: W, db: C, move_up: Up, ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
102 where
103 T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
104 + Clone
105 + Send
106 + Sync
107 + 'static,
108 T::Error: Send,
109 U: futures::Stream<Item = (HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
110 W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
111 C: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
112 V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
113 Up: futures::Sink<(HoprPseudonym, ApplicationData)> + Clone + Send + Sync + 'static,
114 {
115 let max_parallel_probes = self.cfg.max_parallel_probes;
116
117 let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
119 .time_to_live(std::time::Duration::from_secs(600))
120 .max_capacity(100_000)
121 .build();
122
123 let store_eviction = store.clone();
125 let timeout = self.cfg.timeout;
126 let active_probes: moka::future::Cache<
127 (HoprPseudonym, NeighborProbe),
128 (PeerId, std::time::Duration, Option<PingQueryReplier>),
129 > = moka::future::Cache::builder()
130 .time_to_live(timeout)
131 .max_capacity(100_000)
132 .async_eviction_listener(
133 move |k: Arc<(HoprPseudonym, NeighborProbe)>,
134 v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
135 cause|
136 -> moka::notification::ListenerFuture {
137 if matches!(cause, moka::notification::RemovalCause::Expired) {
138 let store = store_eviction.clone();
140 let (peer, _start, notifier) = v;
141
142 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
143 if let Some(replier) = notifier {
144 replier.notify(Err(ProbeError::Timeout(timeout.as_millis() as u64)));
145 };
146 async move {
147 store
148 .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_millis() as u64)))
149 .await;
150 }
151 .boxed()
152 } else {
153 futures::future::ready(()).boxed()
155 }
156 },
157 )
158 .build();
159
160 let active_probes_rx = active_probes.clone();
161 let db_rx = db.clone();
162 let push_to_network = api.0.clone();
163
164 let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();
165
166 let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
168 .map(|peer| (peer, None))
169 .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));
170
171 processes.insert(
172 HoprProbeProcess::Emit,
173 hopr_async_runtime::spawn_as_abortable(direct_neighbors
174 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
175 let db = db.clone();
176 let cache_peer_routing = cache_peer_routing.clone();
177 let active_probes = active_probes.clone();
178 let push_to_network = Sender { downstream: push_to_network.clone() };
179 let me = self.me;
180
181 async move {
182 let result = cache_peer_routing
183 .try_get_with(peer, async move {
184 let cp_ofk = OffchainPublicKey::try_from(peer)
185 .context(format!("failed to convert {peer} to offchain public key"))?;
186 let cp_address = db
187 .resolve_chain_key(&cp_ofk)
188 .await?
189 .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;
190
191 Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
192 pseudonym: HoprPseudonym::random(),
193 forward_path: ValidatedPath::direct(cp_ofk, cp_address),
194 return_paths: vec![ValidatedPath::direct(me.0, me.1)],
195 })
196 })
197 .await;
198
199 match result {
200 Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
201 let nonce = NeighborProbe::random_nonce();
202
203 let message = Message::Probe(nonce);
204 let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
205 if push_to_network.send_message(path, message).await.is_ok() {
206 active_probes
207 .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
208 .await;
209 }
210 },
211 Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
212 Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
213 };
214 }
215 })
216 )
217 );
218
219 processes.insert(
221 HoprProbeProcess::Process,
222 hopr_async_runtime::spawn_as_abortable(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, data)| {
223 let active_probes = active_probes_rx.clone();
224 let push_to_network = Sender { downstream: api.0.clone() };
225 let db = db_rx.clone();
226 let store = store.clone();
227 let move_up = move_up.clone();
228
229 async move {
230 if data.application_tag == ReservedTag::Ping.into() {
232 let message: anyhow::Result<Message> = data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
233
234 match message {
235 Ok(message) => {
236 match message {
237 Message::Telemetry(_path_telemetry) => {
238 tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
239 },
240 Message::Probe(NeighborProbe::Ping(ping)) => {
241 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
242 match db.find_surb(SurbMatcher::Pseudonym(pseudonym)).await.map(|(sender_id, surb)| ResolvedTransportRouting::Return(sender_id, surb)) {
243 Ok(path) => {
244 let message = Message::Probe(NeighborProbe::Pong(ping));
245 let _ = push_to_network.send_message(path, message).await;
246 },
247 Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send pong"),
248 }
249 },
250 Message::Probe(NeighborProbe::Pong(pong)) => {
251 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
252 if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
253 let latency = current_time()
254 .as_unix_timestamp()
255 .saturating_sub(start);
256 store.on_finished(&peer, &Ok(latency)).await;
257
258 if let Some(replier) = replier {
259 replier.notify(Ok(latency))
260 };
261 } else {
262 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
263 };
264 },
265 }
266 },
267 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
268 }
269 } else {
270 pin_mut!(move_up);
272 if move_up.send((pseudonym, data.clone())).await.is_err() {
273 tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
274 }
275 }
276 }
277 }))
278 );
279
280 processes
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use std::{collections::VecDeque, sync::RwLock, time::Duration};
287
288 use async_trait::async_trait;
289 use futures::future::BoxFuture;
290 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
291 use hopr_transport_packet::prelude::Tag;
292
293 use super::*;
294
295 lazy_static::lazy_static!(
296 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
297 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
298 static ref NEIGHBOURS: Vec<PeerId> = vec![
299 OffchainKeypair::random().public().into(),
300 OffchainKeypair::random().public().into(),
301 OffchainKeypair::random().public().into(),
302 OffchainKeypair::random().public().into(),
303 ];
304 );
305
306 #[derive(Debug, Clone)]
307 pub struct PeerStore {
308 get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
309 #[allow(clippy::type_complexity)]
310 on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
311 }
312
313 #[async_trait]
314 impl ProbeStatusUpdate for PeerStore {
315 async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
316 let mut on_finished = self.on_finished.write().unwrap();
317 on_finished.push((
318 *peer,
319 match result {
320 Ok(duration) => Ok(*duration),
321 Err(_e) => Err(ProbeError::Timeout(1000)),
322 },
323 ));
324 }
325 }
326
327 #[async_trait]
328 impl PeerDiscoveryFetch for PeerStore {
329 async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
330 let mut get_peers = self.get_peers.write().unwrap();
331 get_peers.pop_front().unwrap_or_default()
332 }
333 }
334
335 #[derive(Debug, Clone)]
336 pub struct Cache {}
337
338 #[async_trait]
339 impl DbOperations for Cache {
340 async fn find_surb(
341 &self,
342 _matcher: SurbMatcher,
343 ) -> hopr_db_api::errors::Result<(hopr_db_api::protocol::HoprSenderId, hopr_db_api::protocol::HoprSurb)>
344 {
345 Ok((
347 hopr_db_api::protocol::HoprSenderId::random(),
348 random_memory_violating_surb(),
349 ))
350 }
351
352 async fn resolve_chain_key(
353 &self,
354 _offchain_key: &OffchainPublicKey,
355 ) -> hopr_db_api::errors::Result<Option<Address>> {
356 Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
357 }
358 }
359
360 fn random_memory_violating_surb() -> hopr_db_api::protocol::HoprSurb {
364 const SURB_SIZE: usize = std::mem::size_of::<hopr_db_api::protocol::HoprSurb>();
365
366 unsafe {
367 std::mem::transmute::<[u8; SURB_SIZE], hopr_db_api::protocol::HoprSurb>(hopr_crypto_random::random_bytes())
368 }
369 }
370
371 struct TestInterface {
372 from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationData)>,
373 from_probing_to_network_rx:
374 futures::channel::mpsc::Receiver<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>,
375 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
376 manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
377 }
378
379 async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
380 where
381 Fut: std::future::Future<Output = anyhow::Result<()>>,
382 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
383 Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
384 St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
385 {
386 let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);
387
388 let (from_probing_up_tx, from_probing_up_rx) =
389 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
390
391 let (from_probing_to_network_tx, from_probing_to_network_rx) =
392 futures::channel::mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(100);
393
394 let (from_network_to_probing_tx, from_network_to_probing_rx) =
395 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
396
397 let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
398
399 let interface = TestInterface {
400 from_probing_up_rx,
401 from_probing_to_network_rx,
402 from_network_to_probing_tx,
403 manual_probe_tx,
404 };
405
406 let jhs = probe
407 .continuously_scan(
408 (from_probing_to_network_tx, from_network_to_probing_rx),
409 manual_probe_rx,
410 store,
411 db,
412 from_probing_up_tx,
413 )
414 .await;
415
416 let result = test(interface).await;
417
418 jhs.into_iter().for_each(|(_name, handle)| handle.abort());
419
420 result
421 }
422
423 const NO_PROBE_PASSES: f64 = 0.0;
424 const ALL_PROBES_PASS: f64 = 1.0;
425
426 fn concurrent_channel(
428 delay: Option<std::time::Duration>,
429 pass_rate: f64,
430 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
431 ) -> impl Fn((ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)) -> BoxFuture<'static, ()> {
432 debug_assert!(
433 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
434 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
435 );
436
437 move |(data, path, finalizer): (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)| -> BoxFuture<'static, ()> {
438 let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
439
440 Box::pin(async move {
441 if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
442 finalizer.finalize(Ok(()));
443
444 let message: Message = data.try_into().expect("failed to convert data into message");
445 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
446 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
447
448 if let Some(delay) = delay {
449 tokio::time::sleep(delay).await;
451 }
452
453 if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
454 from_network_to_probing_tx
455 .send((pseudonym, pong_message.try_into().expect("failed to convert pong message into data")))
456 .await.expect("failed to send pong message");
457 }
458 }
459 };
460 })
461 }
462 }
463
464 #[tokio::test]
465 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
467 let cfg = ProbeConfig {
468 timeout: std::time::Duration::from_millis(5),
469 ..Default::default()
470 };
471
472 let store = PeerStore {
473 get_peers: Arc::new(RwLock::new(VecDeque::new())),
474 on_finished: Arc::new(RwLock::new(Vec::new())),
475 };
476
477 test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
478 let mut manual_probe_tx = iface.manual_probe_tx;
479 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
480 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
481
482 let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
483 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
484
485 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
486 from_probing_to_network_rx
487 .for_each_concurrent(
488 cfg.max_parallel_probes + 1,
489 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
490 )
491 .await;
492 });
493
494 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
495 .await?
496 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
497
498 Ok(())
499 })
500 .await
501 }
502
503 #[tokio::test]
504 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
506 let cfg = ProbeConfig {
507 timeout: std::time::Duration::from_millis(5),
508 ..Default::default()
509 };
510
511 let store = PeerStore {
512 get_peers: Arc::new(RwLock::new(VecDeque::new())),
513 on_finished: Arc::new(RwLock::new(Vec::new())),
514 };
515
516 test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
517 let mut manual_probe_tx = iface.manual_probe_tx;
518 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
519 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
520
521 let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
522 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
523
524 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
525 from_probing_to_network_rx
526 .for_each_concurrent(
527 cfg.max_parallel_probes + 1,
528 concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
529 )
530 .await;
531 });
532
533 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
534
535 Ok(())
536 })
537 .await
538 }
539
540 #[tokio::test]
541 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
543 let cfg = ProbeConfig {
544 timeout: std::time::Duration::from_millis(20),
545 max_parallel_probes: NEIGHBOURS.len(),
546 ..Default::default()
547 };
548
549 let store = PeerStore {
550 get_peers: Arc::new(RwLock::new({
551 let mut neighbors = VecDeque::new();
552 neighbors.push_back(NEIGHBOURS.clone());
553 neighbors
554 })),
555 on_finished: Arc::new(RwLock::new(Vec::new())),
556 };
557
558 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
559 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
560 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
561
562 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
563 from_probing_to_network_rx
564 .for_each_concurrent(
565 cfg.max_parallel_probes + 1,
566 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
567 )
568 .await;
569 });
570
571 tokio::time::sleep(cfg.timeout).await;
573
574 Ok(())
575 })
576 .await?;
577
578 assert_eq!(
579 store
580 .on_finished
581 .read()
582 .expect("should be lockable")
583 .iter()
584 .filter(|(_peer, result)| result.is_ok())
585 .count(),
586 NEIGHBOURS.len()
587 );
588
589 Ok(())
590 }
591
592 #[tokio::test]
593 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
595 let cfg = ProbeConfig {
596 timeout: std::time::Duration::from_millis(10),
597 max_parallel_probes: NEIGHBOURS.len(),
598 ..Default::default()
599 };
600
601 let store = PeerStore {
602 get_peers: Arc::new(RwLock::new({
603 let mut neighbors = VecDeque::new();
604 neighbors.push_back(NEIGHBOURS.clone());
605 neighbors
606 })),
607 on_finished: Arc::new(RwLock::new(Vec::new())),
608 };
609
610 let timeout = cfg.timeout * 2;
611
612 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
613 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
614 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
615
616 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
617 from_probing_to_network_rx
618 .for_each_concurrent(
619 cfg.max_parallel_probes + 1,
620 concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
621 )
622 .await;
623 });
624
625 tokio::time::sleep(timeout * 2).await;
627
628 Ok(())
629 })
630 .await?;
631
632 assert_eq!(
633 store
634 .on_finished
635 .read()
636 .expect("should be lockable")
637 .iter()
638 .filter(|(_peer, result)| result.is_err())
639 .count(),
640 NEIGHBOURS.len()
641 );
642
643 Ok(())
644 }
645
646 #[tokio::test]
647 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
649 let cfg = ProbeConfig {
650 timeout: std::time::Duration::from_millis(20),
651 ..Default::default()
652 };
653
654 let store = PeerStore {
655 get_peers: Arc::new(RwLock::new({
656 let mut neighbors = VecDeque::new();
657 neighbors.push_back(NEIGHBOURS.clone());
658 neighbors
659 })),
660 on_finished: Arc::new(RwLock::new(Vec::new())),
661 };
662
663 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
664 let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
665 let mut from_probing_up_rx = iface.from_probing_up_rx;
666
667 let expected_data = ApplicationData {
668 application_tag: Tag::MAX.into(),
669 plain_text: b"Hello, this is a test message!".to_vec().into_boxed_slice(),
670 };
671
672 from_network_to_probing_tx
673 .send((HoprPseudonym::random(), expected_data.clone()))
674 .await?;
675
676 let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
677 .await?
678 .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
679 .1;
680
681 assert_eq!(actual, expected_data);
682
683 Ok(())
684 })
685 .await
686 }
687}