1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
13use libp2p_identity::PeerId;
14
15use crate::{
16 HoprProbeProcess,
17 config::ProbeConfig,
18 content::{Message, NeighborProbe},
19 errors::ProbeError,
20 neighbors::neighbors_to_probe,
21 ping::PingQueryReplier,
22 traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
23};
24
25#[inline(always)]
26fn to_nonce(message: &Message) -> String {
27 match message {
28 Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
29 Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
30 _ => "<telemetry>".to_string(),
31 }
32}
33
34#[inline(always)]
35fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
36 match path {
37 ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
38 ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
39 }
40}
41
/// Thin wrapper around the outgoing sink used to push probe messages to the network.
///
/// The wrapper is consumed by each send, so callers build a fresh `Sender` from a clone
/// of the sink whenever they need to transmit a message.
#[derive(Clone, Debug)]
struct Sender<T> {
    /// Sink accepting `(routing, application data)` pairs destined for the network.
    downstream: T,
}
46
47impl<T> Sender<T>
48where
49 T: futures::Sink<(ResolvedTransportRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
50{
51 #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
52 async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
53 let push_to_network = self.downstream;
54 pin_mut!(push_to_network);
55 if push_to_network
56 .as_mut()
57 .send((path, ApplicationDataOut::with_no_packet_info(message.try_into()?)))
58 .await
59 .is_ok()
60 {
61 Ok(())
62 } else {
63 Err(ProbeError::SendError("transport error".to_string()))
64 }
65 }
66}
67
/// Builder of the probing processes.
///
/// Holds this node's own identity and the probing configuration; both are consumed by
/// [`Probe::continuously_scan`], which spawns the actual background tasks.
pub struct Probe {
    /// Own offchain public key and address, used to build the direct return path of each probe.
    me: (OffchainPublicKey, Address),
    /// Probing configuration (timeout, interval between rounds, maximum parallel probes, ...).
    cfg: ProbeConfig,
}
78
impl Probe {
    /// Creates a new probe builder from this node's identity `me` and the configuration `cfg`.
    pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
        Self { me, cfg }
    }

    /// Assembles and spawns the probing processes.
    ///
    /// Two abortable background tasks are spawned:
    /// - [`HoprProbeProcess::Emit`]: sends a ping to every peer produced by `neighbors_to_probe`
    ///   or requested through `manual_events`, and registers it as an active probe.
    /// - [`HoprProbeProcess::Process`]: consumes incoming data; messages tagged with
    ///   `ReservedTag::Ping` are handled as probe traffic, everything else is forwarded
    ///   to `move_up`.
    ///
    /// # Parameters
    ///
    /// - `api`: `(sink, stream)` pair; `api.0` receives outgoing `(routing, data)` items and
    ///   `api.1` yields incoming `(pseudonym, data)` items.
    /// - `manual_events`: stream of explicitly requested probes, each with a replier that is
    ///   notified about the probe outcome.
    /// - `store`: source of peers to probe and receiver of probe results via `on_finished`.
    /// - `db`: used to resolve a peer's chain key and to look up SURBs for pong replies.
    /// - `move_up`: sink for incoming data that is not probe traffic.
    ///
    /// # Returns
    ///
    /// A map from process identifier to the abort handle of the corresponding spawned task.
    pub async fn continuously_scan<T, U, V, W, C, Up>(
        self,
        api: (T, U),
        manual_events: V,
        store: W,
        db: C,
        move_up: Up,
    ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
    where
        T: futures::Sink<(ResolvedTransportRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
        T::Error: Send,
        U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
        W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
        C: DbOperations + Clone + Send + Sync + 'static,
        V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
        Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
    {
        let max_parallel_probes = self.cfg.max_parallel_probes;
        let interval_between_rounds = self.cfg.interval;

        // Per-peer cache of the resolved routing, so that the key conversion and the chain key
        // lookup are not repeated for every probe of the same peer (entries live for 600 s).
        let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
            .time_to_live(std::time::Duration::from_secs(600))
            .max_capacity(100_000)
            .build();

        // Probes that were sent and are still waiting for a pong. The key is the pseudonym and
        // the ping nonce; the value is the probed peer, the send timestamp and the optional
        // replier of a manual probe. The time-to-live equals the configured probe timeout.
        let store_eviction = store.clone();
        let timeout = self.cfg.timeout;
        let active_probes: moka::future::Cache<
            (HoprPseudonym, NeighborProbe),
            (PeerId, std::time::Duration, Option<PingQueryReplier>),
        > = moka::future::Cache::builder()
            .time_to_live(timeout)
            .max_capacity(100_000)
            .async_eviction_listener(
                move |k: Arc<(HoprPseudonym, NeighborProbe)>,
                      v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
                      cause|
                      -> moka::notification::ListenerFuture {
                    if matches!(cause, moka::notification::RemovalCause::Expired) {
                        // An expired entry means no pong arrived in time: report a timeout to
                        // the manual replier (if any) and to the store.
                        let store = store_eviction.clone();
                        let (peer, _start, notifier) = v;

                        tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
                        // NOTE(review): `as_secs()` drops the sub-second part, so a timeout
                        // below one second is reported as `Timeout(0)` — confirm this is intended.
                        if let Some(replier) = notifier {
                            replier.notify(Err(ProbeError::Timeout(timeout.as_secs())));
                        };
                        async move {
                            store
                                .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_secs())))
                                .await;
                        }
                        .boxed()
                    } else {
                        // Any other removal cause needs no follow-up here.
                        futures::future::ready(()).boxed()
                    }
                },
            )
            .build();

        // Handles moved into the incoming-processing task; the originals go to the emitting task.
        let active_probes_rx = active_probes.clone();
        let db_rx = db.clone();
        let push_to_network = api.0.clone();

        let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();

        // Automatically selected peers carry no replier; manually requested ones carry theirs.
        let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
            .map(|peer| (peer, None))
            .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));

        processes.insert(
            HoprProbeProcess::Emit,
            hopr_async_runtime::spawn_as_abortable!(async move {
                // Wait for two probing intervals before emitting the first probe.
                hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await;

                direct_neighbors
                    .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
                        let db = db.clone();
                        let cache_peer_routing = cache_peer_routing.clone();
                        let active_probes = active_probes.clone();
                        let push_to_network = Sender { downstream: push_to_network.clone() };
                        let me = self.me;

                        async move {
                            // Resolve (or reuse the cached) forward routing towards the peer.
                            let result = cache_peer_routing
                                .try_get_with(peer, async move {
                                    let pubkey = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
                                        .await
                                        .context(format!("failed to convert {peer} to offchain public key"))?;
                                    let cp_address = db
                                        .resolve_chain_key(&pubkey)
                                        .await?
                                        .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;

                                    Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
                                        pseudonym: HoprPseudonym::random(),
                                        forward_path: ValidatedPath::direct(pubkey, cp_address),
                                        return_paths: vec![ValidatedPath::direct(me.0, me.1)],
                                    })
                                })
                                .await;

                            match result {
                                Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
                                    let nonce = NeighborProbe::random_nonce();

                                    let message = Message::Probe(nonce);
                                    let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
                                    // NOTE(review): the probe is registered only after the send
                                    // completes; a pong processed before the insert would be
                                    // logged as an unknown probe — confirm this window is acceptable.
                                    if push_to_network.send_message(path, message).await.is_ok() {
                                        active_probes
                                            .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
                                            .await;
                                    }
                                },
                                Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
                                Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
                            };
                        }
                    })
                    .inspect(|_| tracing::warn!(task = "transport (probe - generate outgoing)", "long-running background task finished"))
                    .await;
            })
        );

        processes.insert(
            HoprProbeProcess::Process,
            hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
                let active_probes = active_probes_rx.clone();
                let push_to_network = Sender { downstream: api.0.clone() };
                let db = db_rx.clone();
                let store = store.clone();
                let move_up = move_up.clone();

                async move {
                    // Only data tagged with the reserved ping tag is probe traffic.
                    if in_data.data.application_tag == ReservedTag::Ping.into() {
                        let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));

                        match message {
                            Ok(message) => {
                                match message {
                                    Message::Telemetry(_path_telemetry) => {
                                        tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
                                    },
                                    Message::Probe(NeighborProbe::Ping(ping)) => {
                                        // Answer a ping with a pong carrying the same nonce, sent
                                        // over a return routing built from a SURB found for the pseudonym.
                                        tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
                                        match db.find_surb(pseudonym.into()).await.map(|found_surb| ResolvedTransportRouting::Return(found_surb.sender_id, found_surb.surb)) {
                                            Ok(path) => {
                                                tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
                                                let message = Message::Probe(NeighborProbe::Pong(ping));
                                                if let Err(error) = push_to_network.send_message(path, message).await {
                                                    tracing::error!(%pseudonym, %error, "failed to send back a pong");
                                                }
                                            },
                                            Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send back a pong"),
                                        }
                                    },
                                    Message::Probe(NeighborProbe::Pong(pong)) => {
                                        // A pong completes the active probe registered under the
                                        // same pseudonym and the ping with the same nonce.
                                        tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
                                        if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
                                            // Latency is the time elapsed since the probe was registered.
                                            let latency = current_time()
                                                .as_unix_timestamp()
                                                .saturating_sub(start);
                                            store.on_finished(&peer, &Ok(latency)).await;

                                            if let Some(replier) = replier {
                                                replier.notify(Ok(latency))
                                            };
                                        } else {
                                            tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
                                        };
                                    },
                                }
                            },
                            Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
                        }
                    } else {
                        // Not probe traffic: hand the data over to the upper layer unchanged.
                        pin_mut!(move_up);
                        if move_up.send((pseudonym, in_data)).await.is_err() {
                            tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
                        }
                    }
                }
            }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
        );

        processes
    }
}
282
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, convert::Infallible, sync::RwLock, time::Duration};

    use async_trait::async_trait;
    use futures::future::BoxFuture;
    use hopr_api::db::FoundSurb;
    use hopr_crypto_packet::{HoprSurb, prelude::HoprSenderId};
    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
    use hopr_network_types::prelude::SurbMatcher;
    use hopr_protocol_app::prelude::{ApplicationData, Tag};

    use super::*;

    lazy_static::lazy_static!(
        // Identity of the node under test.
        static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
        static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
        // Four random peers serving as probe targets.
        static ref NEIGHBOURS: Vec<PeerId> = vec![
            OffchainKeypair::random().public().into(),
            OffchainKeypair::random().public().into(),
            OffchainKeypair::random().public().into(),
            OffchainKeypair::random().public().into(),
        ];
    );

    /// In-memory peer store mock that serves queued peer batches and records probe results.
    #[derive(Debug, Clone)]
    pub struct PeerStore {
        /// Queue of peer batches; each `get_peers` call pops one batch from the front.
        get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
        /// Log of every `on_finished` call as `(peer, result)`.
        #[allow(clippy::type_complexity)]
        on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
    }

    #[async_trait]
    impl ProbeStatusUpdate for PeerStore {
        /// Records the probe outcome; any error is stored as `ProbeError::Timeout(1000)`
        /// regardless of the actual error received.
        async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
            let mut on_finished = self.on_finished.write().unwrap();
            on_finished.push((
                *peer,
                match result {
                    Ok(duration) => Ok(*duration),
                    Err(_e) => Err(ProbeError::Timeout(1000)),
                },
            ));
        }
    }

    #[async_trait]
    impl PeerDiscoveryFetch for PeerStore {
        /// Returns the next queued batch of peers, or an empty list once the queue is drained.
        async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
            let mut get_peers = self.get_peers.write().unwrap();
            get_peers.pop_front().unwrap_or_default()
        }
    }

    /// Stateless database mock that answers every query successfully.
    #[derive(Debug, Clone)]
    pub struct Cache {}

    #[async_trait]
    impl DbOperations for Cache {
        type ChainError = Infallible;
        type DbError = Infallible;

        /// Always finds a SURB: a random sender id paired with a SURB made of random bytes.
        async fn find_surb(&self, _matcher: SurbMatcher) -> Result<FoundSurb, Self::DbError> {
            Ok(FoundSurb {
                sender_id: HoprSenderId::random(),
                surb: random_memory_violating_surb(),
                remaining: 0,
            })
        }

        /// Resolves every offchain key to the address of the test's onchain keypair.
        async fn resolve_chain_key(
            &self,
            _offchain_key: &OffchainPublicKey,
        ) -> Result<Option<Address>, Self::ChainError> {
            Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
        }
    }

    /// Fabricates a `HoprSurb` by reinterpreting random bytes of the same size.
    ///
    /// NOTE(review): this is sound only if every bit pattern is a valid `HoprSurb`
    /// (no references, niches or enum discriminants inside) — confirm, or construct
    /// the value through a proper constructor instead.
    fn random_memory_violating_surb() -> HoprSurb {
        const SURB_SIZE: usize = size_of::<HoprSurb>();

        unsafe { std::mem::transmute::<[u8; SURB_SIZE], HoprSurb>(hopr_crypto_random::random_bytes()) }
    }

    /// Channel endpoints through which a test interacts with the running probing processes.
    struct TestInterface {
        /// Data the probing forwarded to the upper layer.
        from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
        /// Data the probing pushed towards the network.
        from_probing_to_network_rx: futures::channel::mpsc::Receiver<(ResolvedTransportRouting, ApplicationDataOut)>,
        /// Injects data into the probing as if it arrived from the network.
        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
        /// Requests a manual probe of a peer.
        manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
    }

    /// Test harness: starts the probing with the given configuration, store and database,
    /// runs `test` against the channel endpoints, then aborts the spawned processes and
    /// returns the result of `test`.
    async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
    where
        Fut: std::future::Future<Output = anyhow::Result<()>>,
        F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
        Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
        St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
    {
        let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);

        let (from_probing_up_tx, from_probing_up_rx) =
            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);

        let (from_probing_to_network_tx, from_probing_to_network_rx) =
            futures::channel::mpsc::channel::<(ResolvedTransportRouting, ApplicationDataOut)>(100);

        let (from_network_to_probing_tx, from_network_to_probing_rx) =
            futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);

        let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);

        let interface = TestInterface {
            from_probing_up_rx,
            from_probing_to_network_rx,
            from_network_to_probing_tx,
            manual_probe_tx,
        };

        let jhs = probe
            .continuously_scan(
                (from_probing_to_network_tx, from_network_to_probing_rx),
                manual_probe_rx,
                store,
                db,
                from_probing_up_tx,
            )
            .await;

        let result = test(interface).await;

        // Stop the background processes regardless of the test outcome.
        jhs.into_iter().for_each(|(_name, handle)| handle.abort());

        result
    }

    const NO_PROBE_PASSES: f64 = 0.0;
    const ALL_PROBES_PASS: f64 = 1.0;

    /// Builds a simulated network handler for outgoing probing traffic.
    ///
    /// For every ping sent over a forward routing, the handler optionally waits for `delay`
    /// and then, with probability `pass_rate`, injects the matching pong back into the
    /// probing through `from_network_to_probing_tx`. Anything else is silently dropped.
    fn concurrent_channel(
        delay: Option<std::time::Duration>,
        pass_rate: f64,
        from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
    ) -> impl Fn((ResolvedTransportRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
        debug_assert!(
            (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
            "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
        );

        move |(path, data_out): (ResolvedTransportRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
            let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();

            Box::pin(async move {
                if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
                    let message: Message = data_out.data.try_into().expect("failed to convert data into message");
                    if let Message::Probe(NeighborProbe::Ping(ping)) = message {
                        let pong_message = Message::Probe(NeighborProbe::Pong(ping));

                        if let Some(delay) = delay {
                            tokio::time::sleep(delay).await;
                        }

                        // A sample below `pass_rate` lets the pong through: a rate of 0.0
                        // never passes, a rate of 1.0 practically always does.
                        if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
                        {
                            from_network_to_probing_tx
                                .send((
                                    pseudonym,
                                    ApplicationDataIn {
                                        data: pong_message
                                            .try_into()
                                            .expect("failed to convert pong message into data"),
                                        packet_info: Default::default(),
                                    },
                                ))
                                .await
                                .expect("failed to send pong message");
                        }
                    }
                };
            })
        }
    }

    /// A manually requested probe that is answered must deliver a successful result
    /// to the requester within one second.
    #[tokio::test]
    async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            timeout: std::time::Duration::from_millis(5),
            interval: std::time::Duration::from_secs(0),
            ..Default::default()
        };

        let store = PeerStore {
            get_peers: Arc::new(RwLock::new(VecDeque::new())),
            on_finished: Arc::new(RwLock::new(Vec::new())),
        };

        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
            let mut manual_probe_tx = iface.manual_probe_tx;
            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
            let from_network_to_probing_tx = iface.from_network_to_probing_tx;

            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;

            // Simulated network: every ping is answered immediately.
            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
                from_probing_to_network_rx
                    .for_each_concurrent(
                        cfg.max_parallel_probes + 1,
                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
                    )
                    .await;
            });

            let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
                .await?
                .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;

            Ok(())
        })
        .await
    }

    /// A manually requested probe that is never answered must not deliver any reply
    /// to the requester within twice the configured probe timeout.
    #[tokio::test]
    async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            timeout: std::time::Duration::from_millis(5),
            interval: std::time::Duration::from_secs(0),
            ..Default::default()
        };

        let store = PeerStore {
            get_peers: Arc::new(RwLock::new(VecDeque::new())),
            on_finished: Arc::new(RwLock::new(Vec::new())),
        };

        test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
            let mut manual_probe_tx = iface.manual_probe_tx;
            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
            let from_network_to_probing_tx = iface.from_network_to_probing_tx;

            let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
            manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;

            // Simulated network: every pong is dropped.
            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
                from_probing_to_network_rx
                    .for_each_concurrent(
                        cfg.max_parallel_probes + 1,
                        concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
                    )
                    .await;
            });

            // The wait itself must time out, i.e. nothing arrives on the reply channel.
            assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());

            Ok(())
        })
        .await
    }

    /// Automatically generated probes that are all answered must be recorded in the store
    /// as one successful result per neighbor.
    #[tokio::test]
    async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            timeout: std::time::Duration::from_millis(20),
            max_parallel_probes: NEIGHBOURS.len(),
            interval: std::time::Duration::from_secs(0),
            ..Default::default()
        };

        let store = PeerStore {
            get_peers: Arc::new(RwLock::new({
                let mut neighbors = VecDeque::new();
                neighbors.push_back(NEIGHBOURS.clone());
                neighbors
            })),
            on_finished: Arc::new(RwLock::new(Vec::new())),
        };

        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
            let from_network_to_probing_tx = iface.from_network_to_probing_tx;

            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
                from_probing_to_network_rx
                    .for_each_concurrent(
                        cfg.max_parallel_probes + 1,
                        concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
                    )
                    .await;
            });

            // Give the probing one timeout period to emit the pings and process the pongs.
            tokio::time::sleep(cfg.timeout).await;

            Ok(())
        })
        .await?;

        assert_eq!(
            store
                .on_finished
                .read()
                .expect("should be lockable")
                .iter()
                .filter(|(_peer, result)| result.is_ok())
                .count(),
            NEIGHBOURS.len()
        );

        Ok(())
    }

    /// Automatically generated probes whose pongs arrive only after the probe timeout
    /// must be recorded in the store as one failed result per neighbor.
    #[tokio::test]
    async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            timeout: std::time::Duration::from_millis(10),
            max_parallel_probes: NEIGHBOURS.len(),
            interval: std::time::Duration::from_secs(0),
            ..Default::default()
        };

        let store = PeerStore {
            get_peers: Arc::new(RwLock::new({
                let mut neighbors = VecDeque::new();
                neighbors.push_back(NEIGHBOURS.clone());
                neighbors
            })),
            on_finished: Arc::new(RwLock::new(Vec::new())),
        };

        // Network delay of twice the probe timeout, so every pong arrives too late.
        let timeout = cfg.timeout * 2;

        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
            let from_probing_to_network_rx = iface.from_probing_to_network_rx;
            let from_network_to_probing_tx = iface.from_network_to_probing_tx;

            let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
                from_probing_to_network_rx
                    .for_each_concurrent(
                        cfg.max_parallel_probes + 1,
                        concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
                    )
                    .await;
            });

            // Wait long enough for the delayed pongs to be delivered and processed.
            tokio::time::sleep(timeout * 2).await;

            Ok(())
        })
        .await?;

        assert_eq!(
            store
                .on_finished
                .read()
                .expect("should be lockable")
                .iter()
                .filter(|(_peer, result)| result.is_err())
                .count(),
            NEIGHBOURS.len()
        );

        Ok(())
    }

    /// Incoming data with a tag other than the reserved ping tag must be forwarded
    /// to the upper layer unchanged.
    #[tokio::test]
    async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
        let cfg = ProbeConfig {
            timeout: std::time::Duration::from_millis(20),
            interval: std::time::Duration::from_secs(0),
            ..Default::default()
        };

        let store = PeerStore {
            get_peers: Arc::new(RwLock::new({
                let mut neighbors = VecDeque::new();
                neighbors.push_back(NEIGHBOURS.clone());
                neighbors
            })),
            on_finished: Arc::new(RwLock::new(Vec::new())),
        };

        test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
            let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
            let mut from_probing_up_rx = iface.from_probing_up_rx;

            let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;

            from_network_to_probing_tx
                .send((
                    HoprPseudonym::random(),
                    ApplicationDataIn {
                        data: expected_data.clone(),
                        packet_info: Default::default(),
                    },
                ))
                .await?;

            let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
                .await?
                .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
                .1;

            assert_eq!(actual.data, expected_data);

            Ok(())
        })
        .await
    }
}