1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Context;
4use futures::{FutureExt, SinkExt, StreamExt, channel::oneshot::channel, pin_mut};
5use futures_concurrency::stream::StreamExt as _;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::protocol::HoprPseudonym;
9use hopr_network_types::types::{ResolvedTransportRouting, ValidatedPath};
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::{prelude::Address, traits::AsUnixTimestamp};
12use hopr_protocol_app::prelude::{ApplicationData, ReservedTag};
13use hopr_transport_protocol::processor::{PacketError, PacketSendFinalizer};
14use libp2p_identity::PeerId;
15
16use crate::{
17 HoprProbeProcess,
18 config::ProbeConfig,
19 content::{Message, NeighborProbe},
20 errors::ProbeError,
21 neighbors::neighbors_to_probe,
22 ping::PingQueryReplier,
23 traits::{DbOperations, PeerDiscoveryFetch, ProbeStatusUpdate},
24};
25
26#[inline(always)]
27fn to_nonce(message: &Message) -> String {
28 match message {
29 Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
30 Message::Probe(NeighborProbe::Pong(ping)) => hex::encode(ping),
31 _ => "<telemetry>".to_string(),
32 }
33}
34
35#[inline(always)]
36fn to_pseudonym(path: &ResolvedTransportRouting) -> HoprPseudonym {
37 match path {
38 ResolvedTransportRouting::Forward { pseudonym, .. } => *pseudonym,
39 ResolvedTransportRouting::Return(pseudonym, _) => pseudonym.pseudonym(),
40 }
41}
42
43#[derive(Clone, Debug)]
44struct Sender<T> {
45 downstream: T,
46}
47
48impl<T> Sender<T>
49where
50 T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)> + Clone + Send + Sync + 'static,
51{
52 #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=%to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
53 async fn send_message(self, path: ResolvedTransportRouting, message: Message) -> crate::errors::Result<()> {
54 let (packet_sent_tx, packet_sent_rx) = channel::<std::result::Result<(), PacketError>>();
55
56 let push_to_network = self.downstream;
57 pin_mut!(push_to_network);
58 if push_to_network
59 .as_mut()
60 .send((message.into(), path, packet_sent_tx.into()))
61 .await
62 .is_ok()
63 {
64 packet_sent_rx
65 .await
66 .map_err(|error| ProbeError::SendError(error.to_string()))?
67 .map_err(|error| ProbeError::SendError(error.to_string()))
68 } else {
69 Err(ProbeError::SendError("transport error".to_string()))
70 }
71 }
72}
73
74pub struct Probe {
79 me: (OffchainPublicKey, Address),
81 cfg: ProbeConfig,
83}
84
85impl Probe {
86 pub fn new(me: (OffchainPublicKey, Address), cfg: ProbeConfig) -> Self {
87 Self { me, cfg }
88 }
89
90 pub async fn continuously_scan<T, U, V, W, C, Up>(
92 self,
93 api: (T, U), manual_events: V, store: W, db: C, move_up: Up, ) -> HashMap<HoprProbeProcess, hopr_async_runtime::AbortHandle>
99 where
100 T: futures::Sink<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
101 + Clone
102 + Send
103 + Sync
104 + 'static,
105 T::Error: Send,
106 U: futures::Stream<Item = (HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
107 W: PeerDiscoveryFetch + ProbeStatusUpdate + Clone + Send + Sync + 'static,
108 C: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
109 V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
110 Up: futures::Sink<(HoprPseudonym, ApplicationData)> + Clone + Send + Sync + 'static,
111 {
112 let max_parallel_probes = self.cfg.max_parallel_probes;
113 let interval_between_rounds = self.cfg.interval;
114
115 let cache_peer_routing: moka::future::Cache<PeerId, ResolvedTransportRouting> = moka::future::Cache::builder()
117 .time_to_live(std::time::Duration::from_secs(600))
118 .max_capacity(100_000)
119 .build();
120
121 let store_eviction = store.clone();
123 let timeout = self.cfg.timeout;
124 let active_probes: moka::future::Cache<
125 (HoprPseudonym, NeighborProbe),
126 (PeerId, std::time::Duration, Option<PingQueryReplier>),
127 > = moka::future::Cache::builder()
128 .time_to_live(timeout)
129 .max_capacity(100_000)
130 .async_eviction_listener(
131 move |k: Arc<(HoprPseudonym, NeighborProbe)>,
132 v: (PeerId, std::time::Duration, Option<PingQueryReplier>),
133 cause|
134 -> moka::notification::ListenerFuture {
135 if matches!(cause, moka::notification::RemovalCause::Expired) {
136 let store = store_eviction.clone();
138 let (peer, _start, notifier) = v;
139
140 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
141 if let Some(replier) = notifier {
142 replier.notify(Err(ProbeError::Timeout(timeout.as_secs())));
143 };
144 async move {
145 store
146 .on_finished(&peer, &Err(ProbeError::Timeout(timeout.as_secs())))
147 .await;
148 }
149 .boxed()
150 } else {
151 futures::future::ready(()).boxed()
153 }
154 },
155 )
156 .build();
157
158 let active_probes_rx = active_probes.clone();
159 let db_rx = db.clone();
160 let push_to_network = api.0.clone();
161
162 let mut processes = HashMap::<HoprProbeProcess, hopr_async_runtime::AbortHandle>::new();
163
164 let direct_neighbors = neighbors_to_probe(store.clone(), self.cfg)
166 .map(|peer| (peer, None))
167 .merge(manual_events.map(|(peer, notifier)| (peer, Some(notifier))));
168
169 processes.insert(
170 HoprProbeProcess::Emit,
171 hopr_async_runtime::spawn_as_abortable!(async move {
172 hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await; direct_neighbors
175 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
176 let db = db.clone();
177 let cache_peer_routing = cache_peer_routing.clone();
178 let active_probes = active_probes.clone();
179 let push_to_network = Sender { downstream: push_to_network.clone() };
180 let me = self.me;
181
182 async move {
183 let result = cache_peer_routing
184 .try_get_with(peer, async move {
185 let cp_ofk = OffchainPublicKey::try_from(peer)
186 .context(format!("failed to convert {peer} to offchain public key"))?;
187 let cp_address = db
188 .resolve_chain_key(&cp_ofk)
189 .await?
190 .ok_or_else(|| anyhow::anyhow!("Failed to resolve chain key for peer: {peer}"))?;
191
192 Ok::<ResolvedTransportRouting, anyhow::Error>(ResolvedTransportRouting::Forward {
193 pseudonym: HoprPseudonym::random(),
194 forward_path: ValidatedPath::direct(cp_ofk, cp_address),
195 return_paths: vec![ValidatedPath::direct(me.0, me.1)],
196 })
197 })
198 .await;
199
200 match result {
201 Ok(ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths }) => {
202 let nonce = NeighborProbe::random_nonce();
203
204 let message = Message::Probe(nonce);
205 let path = ResolvedTransportRouting::Forward { pseudonym, forward_path, return_paths };
206 if push_to_network.send_message(path, message).await.is_ok() {
207 active_probes
208 .insert((pseudonym, nonce), (peer, current_time().as_unix_timestamp(), notifier))
209 .await;
210 }
211 },
212 Ok(_) => tracing::error!(%peer, error = "logical error", "resolved transport routing is not forward"),
213 Err(error) => tracing::error!(%peer, %error, "failed to resolve transport routing"),
214 };
215 }
216 }).await;
217 })
218 );
219
220 processes.insert(
222 HoprProbeProcess::Process,
223 hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, data)| {
224 let active_probes = active_probes_rx.clone();
225 let push_to_network = Sender { downstream: api.0.clone() };
226 let db = db_rx.clone();
227 let store = store.clone();
228 let move_up = move_up.clone();
229
230 async move {
231 if data.application_tag == ReservedTag::Ping.into() {
233 let message: anyhow::Result<Message> = data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
234
235 match message {
236 Ok(message) => {
237 match message {
238 Message::Telemetry(_path_telemetry) => {
239 tracing::warn!(%pseudonym, reason = "feature not implemented", "this node could not originate the telemetry");
240 },
241 Message::Probe(NeighborProbe::Ping(ping)) => {
242 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
243 match db.find_surb(pseudonym.into()).await.map(|found_surb| ResolvedTransportRouting::Return(found_surb.sender_id, found_surb.surb)) {
244 Ok(path) => {
245 tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
246 let message = Message::Probe(NeighborProbe::Pong(ping));
247 if let Err(error) = push_to_network.send_message(path, message).await {
248 tracing::error!(%pseudonym, %error, "failed to send back a pong");
249 }
250 },
251 Err(error) => tracing::error!(%pseudonym, %error, "failed to get a SURB, cannot send back a pong"),
252 }
253 },
254 Message::Probe(NeighborProbe::Pong(pong)) => {
255 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
256 if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
257 let latency = current_time()
258 .as_unix_timestamp()
259 .saturating_sub(start);
260 store.on_finished(&peer, &Ok(latency)).await;
261
262 if let Some(replier) = replier {
263 replier.notify(Ok(latency))
264 };
265 } else {
266 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
267 };
268 },
269 }
270 },
271 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
272 }
273 } else {
274 pin_mut!(move_up);
276 if move_up.send((pseudonym, data.clone())).await.is_err() {
277 tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
278 }
279 }
280 }
281 }))
282 );
283
284 processes
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use std::{collections::VecDeque, sync::RwLock, time::Duration};
291
292 use async_trait::async_trait;
293 use futures::future::BoxFuture;
294 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
295 use hopr_db_api::prelude::FoundSurb;
296 use hopr_network_types::prelude::SurbMatcher;
297 use hopr_protocol_app::prelude::Tag;
298
299 use super::*;
300
301 lazy_static::lazy_static!(
302 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
303 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
304 static ref NEIGHBOURS: Vec<PeerId> = vec![
305 OffchainKeypair::random().public().into(),
306 OffchainKeypair::random().public().into(),
307 OffchainKeypair::random().public().into(),
308 OffchainKeypair::random().public().into(),
309 ];
310 );
311
312 #[derive(Debug, Clone)]
313 pub struct PeerStore {
314 get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
315 #[allow(clippy::type_complexity)]
316 on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
317 }
318
319 #[async_trait]
320 impl ProbeStatusUpdate for PeerStore {
321 async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
322 let mut on_finished = self.on_finished.write().unwrap();
323 on_finished.push((
324 *peer,
325 match result {
326 Ok(duration) => Ok(*duration),
327 Err(_e) => Err(ProbeError::Timeout(1000)),
328 },
329 ));
330 }
331 }
332
333 #[async_trait]
334 impl PeerDiscoveryFetch for PeerStore {
335 async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
336 let mut get_peers = self.get_peers.write().unwrap();
337 get_peers.pop_front().unwrap_or_default()
338 }
339 }
340
341 #[derive(Debug, Clone)]
342 pub struct Cache {}
343
344 #[async_trait]
345 impl DbOperations for Cache {
346 async fn find_surb(&self, _matcher: SurbMatcher) -> hopr_db_api::errors::Result<FoundSurb> {
347 Ok(FoundSurb {
349 sender_id: hopr_db_api::protocol::HoprSenderId::random(),
350 surb: random_memory_violating_surb(),
351 remaining: 0,
352 })
353 }
354
355 async fn resolve_chain_key(
356 &self,
357 _offchain_key: &OffchainPublicKey,
358 ) -> hopr_db_api::errors::Result<Option<Address>> {
359 Ok(Some(ONCHAIN_KEYPAIR.public().to_address()))
360 }
361 }
362
363 fn random_memory_violating_surb() -> hopr_db_api::protocol::HoprSurb {
367 const SURB_SIZE: usize = std::mem::size_of::<hopr_db_api::protocol::HoprSurb>();
368
369 unsafe {
370 std::mem::transmute::<[u8; SURB_SIZE], hopr_db_api::protocol::HoprSurb>(hopr_crypto_random::random_bytes())
371 }
372 }
373
374 struct TestInterface {
375 from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationData)>,
376 from_probing_to_network_rx:
377 futures::channel::mpsc::Receiver<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>,
378 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
379 manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
380 }
381
382 async fn test_with_probing<F, Db, St, Fut>(cfg: ProbeConfig, store: St, db: Db, test: F) -> anyhow::Result<()>
383 where
384 Fut: std::future::Future<Output = anyhow::Result<()>>,
385 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
386 Db: DbOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
387 St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
388 {
389 let probe = Probe::new((*OFFCHAIN_KEYPAIR.public(), ONCHAIN_KEYPAIR.public().to_address()), cfg);
390
391 let (from_probing_up_tx, from_probing_up_rx) =
392 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
393
394 let (from_probing_to_network_tx, from_probing_to_network_rx) =
395 futures::channel::mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(100);
396
397 let (from_network_to_probing_tx, from_network_to_probing_rx) =
398 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationData)>(100);
399
400 let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
401
402 let interface = TestInterface {
403 from_probing_up_rx,
404 from_probing_to_network_rx,
405 from_network_to_probing_tx,
406 manual_probe_tx,
407 };
408
409 let jhs = probe
410 .continuously_scan(
411 (from_probing_to_network_tx, from_network_to_probing_rx),
412 manual_probe_rx,
413 store,
414 db,
415 from_probing_up_tx,
416 )
417 .await;
418
419 let result = test(interface).await;
420
421 jhs.into_iter().for_each(|(_name, handle)| handle.abort());
422
423 result
424 }
425
426 const NO_PROBE_PASSES: f64 = 0.0;
427 const ALL_PROBES_PASS: f64 = 1.0;
428
429 fn concurrent_channel(
431 delay: Option<std::time::Duration>,
432 pass_rate: f64,
433 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationData)>,
434 ) -> impl Fn((ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)) -> BoxFuture<'static, ()> {
435 debug_assert!(
436 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
437 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
438 );
439
440 move |(data, path, finalizer): (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)| -> BoxFuture<'static, ()> {
441 let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
442
443 Box::pin(async move {
444 if let ResolvedTransportRouting::Forward { pseudonym, .. } = path {
445 finalizer.finalize(Ok(()));
446
447 let message: Message = data.try_into().expect("failed to convert data into message");
448 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
449 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
450
451 if let Some(delay) = delay {
452 tokio::time::sleep(delay).await;
454 }
455
456 if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
457 from_network_to_probing_tx
458 .send((pseudonym, pong_message.try_into().expect("failed to convert pong message into data")))
459 .await.expect("failed to send pong message");
460 }
461 }
462 };
463 })
464 }
465 }
466
467 #[tokio::test]
468 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
470 let cfg = ProbeConfig {
471 timeout: std::time::Duration::from_millis(5),
472 interval: std::time::Duration::from_secs(0),
473 ..Default::default()
474 };
475
476 let store = PeerStore {
477 get_peers: Arc::new(RwLock::new(VecDeque::new())),
478 on_finished: Arc::new(RwLock::new(Vec::new())),
479 };
480
481 test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
482 let mut manual_probe_tx = iface.manual_probe_tx;
483 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
484 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
485
486 let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
487 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
488
489 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
490 from_probing_to_network_rx
491 .for_each_concurrent(
492 cfg.max_parallel_probes + 1,
493 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
494 )
495 .await;
496 });
497
498 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
499 .await?
500 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
501
502 Ok(())
503 })
504 .await
505 }
506
507 #[tokio::test]
508 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
510 let cfg = ProbeConfig {
511 timeout: std::time::Duration::from_millis(5),
512 interval: std::time::Duration::from_secs(0),
513 ..Default::default()
514 };
515
516 let store = PeerStore {
517 get_peers: Arc::new(RwLock::new(VecDeque::new())),
518 on_finished: Arc::new(RwLock::new(Vec::new())),
519 };
520
521 test_with_probing(cfg, store, Cache {}, move |iface: TestInterface| async move {
522 let mut manual_probe_tx = iface.manual_probe_tx;
523 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
524 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
525
526 let (tx, mut rx) = futures::channel::mpsc::unbounded::<std::result::Result<Duration, ProbeError>>();
527 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
528
529 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
530 from_probing_to_network_rx
531 .for_each_concurrent(
532 cfg.max_parallel_probes + 1,
533 concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
534 )
535 .await;
536 });
537
538 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
539
540 Ok(())
541 })
542 .await
543 }
544
545 #[tokio::test]
546 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
548 let cfg = ProbeConfig {
549 timeout: std::time::Duration::from_millis(20),
550 max_parallel_probes: NEIGHBOURS.len(),
551 interval: std::time::Duration::from_secs(0),
552 ..Default::default()
553 };
554
555 let store = PeerStore {
556 get_peers: Arc::new(RwLock::new({
557 let mut neighbors = VecDeque::new();
558 neighbors.push_back(NEIGHBOURS.clone());
559 neighbors
560 })),
561 on_finished: Arc::new(RwLock::new(Vec::new())),
562 };
563
564 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
565 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
566 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
567
568 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
569 from_probing_to_network_rx
570 .for_each_concurrent(
571 cfg.max_parallel_probes + 1,
572 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
573 )
574 .await;
575 });
576
577 tokio::time::sleep(cfg.timeout).await;
579
580 Ok(())
581 })
582 .await?;
583
584 assert_eq!(
585 store
586 .on_finished
587 .read()
588 .expect("should be lockable")
589 .iter()
590 .filter(|(_peer, result)| result.is_ok())
591 .count(),
592 NEIGHBOURS.len()
593 );
594
595 Ok(())
596 }
597
598 #[tokio::test]
599 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
601 let cfg = ProbeConfig {
602 timeout: std::time::Duration::from_millis(10),
603 max_parallel_probes: NEIGHBOURS.len(),
604 interval: std::time::Duration::from_secs(0),
605 ..Default::default()
606 };
607
608 let store = PeerStore {
609 get_peers: Arc::new(RwLock::new({
610 let mut neighbors = VecDeque::new();
611 neighbors.push_back(NEIGHBOURS.clone());
612 neighbors
613 })),
614 on_finished: Arc::new(RwLock::new(Vec::new())),
615 };
616
617 let timeout = cfg.timeout * 2;
618
619 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
620 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
621 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
622
623 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
624 from_probing_to_network_rx
625 .for_each_concurrent(
626 cfg.max_parallel_probes + 1,
627 concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
628 )
629 .await;
630 });
631
632 tokio::time::sleep(timeout * 2).await;
634
635 Ok(())
636 })
637 .await?;
638
639 assert_eq!(
640 store
641 .on_finished
642 .read()
643 .expect("should be lockable")
644 .iter()
645 .filter(|(_peer, result)| result.is_err())
646 .count(),
647 NEIGHBOURS.len()
648 );
649
650 Ok(())
651 }
652
653 #[tokio::test]
654 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
656 let cfg = ProbeConfig {
657 timeout: std::time::Duration::from_millis(20),
658 interval: std::time::Duration::from_secs(0),
659 ..Default::default()
660 };
661
662 let store = PeerStore {
663 get_peers: Arc::new(RwLock::new({
664 let mut neighbors = VecDeque::new();
665 neighbors.push_back(NEIGHBOURS.clone());
666 neighbors
667 })),
668 on_finished: Arc::new(RwLock::new(Vec::new())),
669 };
670
671 test_with_probing(cfg, store.clone(), Cache {}, move |iface: TestInterface| async move {
672 let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
673 let mut from_probing_up_rx = iface.from_probing_up_rx;
674
675 let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!");
676
677 from_network_to_probing_tx
678 .send((HoprPseudonym::random(), expected_data.clone()))
679 .await?;
680
681 let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
682 .await?
683 .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
684 .1;
685
686 assert_eq!(actual, expected_data);
687
688 Ok(())
689 })
690 .await
691 }
692}