1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_api::ct::{
6 NetworkGraphView, Telemetry, TrafficGeneration, traits::NetworkGraphUpdate, types::TrafficGenerationError,
7};
8use hopr_async_runtime::AbortableList;
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::prelude::*;
12use hopr_network_types::prelude::*;
13use hopr_platform::time::native::current_time;
14use hopr_primitive_types::traits::AsUnixTimestamp;
15use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
16use libp2p_identity::PeerId;
17
18use crate::{
19 HoprProbeProcess,
20 config::ProbeConfig,
21 content::Message,
22 errors::ProbeError,
23 ping::PingQueryReplier,
24 types::{NeighborProbe, NeighborTelemetry, PathTelemetry},
25};
26
27type CacheKey = (HoprPseudonym, NeighborProbe);
28type CacheValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
29
30pub struct Probe {
35 cfg: ProbeConfig,
37}
38
39impl Probe {
40 pub fn new(cfg: ProbeConfig) -> Self {
41 Self { cfg }
42 }
43
44 pub async fn continuously_scan<T, U, V, Up, Tr, G>(
46 self,
47 api: (T, U), manual_events: V, move_up: Up, traffic_generator: Tr,
51 network_graph: G,
52 ) -> AbortableList<HoprProbeProcess>
53 where
54 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
55 T::Error: Send,
56 U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
57 V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
58 Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
59 Tr: TrafficGeneration + Send + Sync + 'static,
60 G: NetworkGraphView + NetworkGraphUpdate + Clone + Send + Sync + 'static,
61 {
62 let max_parallel_probes = self.cfg.max_parallel_probes;
63
64 let probing_routes = traffic_generator.build(network_graph.clone());
65
66 let network_graph_internal = network_graph.clone();
68 let timeout = self.cfg.timeout;
69 let active_probes: moka::future::Cache<CacheKey, CacheValue> = moka::future::Cache::builder()
70 .time_to_live(timeout)
71 .max_capacity(100_000)
72 .async_eviction_listener(
73 move |k: Arc<(HoprPseudonym, NeighborProbe)>,
74 v: (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>),
75 cause|
76 -> moka::notification::ListenerFuture {
77 if matches!(cause, moka::notification::RemovalCause::Expired) {
78 let store = network_graph_internal.clone();
80 let (peer, _start, notifier) = v;
81
82 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
83 if let Some(replier) = notifier {
84 if let NodeId::Offchain(opk) = peer.as_ref() {
85 replier.notify(Err(ProbeError::TrafficError(
86 TrafficGenerationError::ProbeNeighborTimeout(opk.into()),
87 )));
88 } else {
89 tracing::warn!(
90 reason = "non-offchain peer",
91 "cannot notify timeout for non-offchain peer"
92 );
93 }
94 };
95
96 if let NodeId::Offchain(opk) = peer.as_ref() {
97 let peer: PeerId = opk.into();
98 futures::FutureExt::boxed(async move {
99 store
100 .record::<NeighborTelemetry, PathTelemetry>(Err(
101 TrafficGenerationError::ProbeNeighborTimeout(peer),
102 ))
103 .await
104 })
105 } else {
106 futures::FutureExt::boxed(futures::future::ready(()))
107 }
108 } else {
109 futures::FutureExt::boxed(futures::future::ready(()))
111 }
112 },
113 )
114 .build();
115
116 let active_probes_rx = active_probes.clone();
117 let push_to_network = api.0.clone();
118
119 let mut processes = AbortableList::default();
120
121 let direct_neighbors =
123 probing_routes
124 .map(|peer| (peer, None))
125 .merge(manual_events.filter_map(|(peer, notifier)| async move {
126 if let Ok(peer) = OffchainPublicKey::from_peerid(&peer) {
127 let routing = DestinationRouting::Forward {
128 destination: Box::new(peer.into()),
129 pseudonym: Some(HoprPseudonym::random()),
130 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
131 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
132 };
133 Some((routing, Some(notifier)))
134 } else {
135 None
136 }
137 }));
138
139 processes.insert(
140 HoprProbeProcess::Emit,
141 hopr_async_runtime::spawn_as_abortable!(async move {
142 direct_neighbors
143 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
144 let active_probes = active_probes.clone();
145 let push_to_network = push_to_network.clone();
146
147 async move {
148 match peer {
149 DestinationRouting::Forward {
150 destination,
151 pseudonym,
152 forward_options,
153 return_options,
154 } => {
155 let nonce = NeighborProbe::random_nonce();
156
157 let message = Message::Probe(nonce);
158
159 if let Ok(data) = message.try_into() {
160 let routing = DestinationRouting::Forward {
161 destination: destination.clone(),
162 pseudonym,
163 forward_options,
164 return_options,
165 };
166 let data = ApplicationDataOut::with_no_packet_info(data);
167 pin_mut!(push_to_network);
168
169 if let Err(_error) = push_to_network.send((routing, data)).await {
170 tracing::error!("failed to send out a ping");
171 } else {
172 active_probes
173 .insert(
174 (
175 pseudonym
176 .expect("the pseudonym must be present in Forward routing"),
177 nonce,
178 ),
179 (destination, current_time().as_unix_timestamp(), notifier),
180 )
181 .await;
182 }
183 } else {
184 tracing::error!("failed to convert ping message into data");
185 }
186 }
187 DestinationRouting::Return(_surb_matcher) => tracing::error!(
188 error = "logical error",
189 "resolved transport routing is not forward"
190 ),
191 }
192 }
193 })
194 .inspect(|_| {
195 tracing::warn!(
196 task = "transport (probe - generate outgoing)",
197 "long-running background task finished"
198 )
199 })
200 .await;
201 }),
202 );
203
204 processes.insert(
206 HoprProbeProcess::Process,
207 hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
208 let active_probes = active_probes_rx.clone();
209 let push_to_network = api.0.clone();
210 let move_up = move_up.clone();
211 let store = network_graph.clone();
212
213 async move {
214 if in_data.data.application_tag == ReservedTag::Ping.into() {
216 let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
217
218 match message {
219 Ok(message) => {
220 match message {
221 Message::Telemetry(path_telemetry) => {
222 store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Loopback(path_telemetry))).await
223 },
224 Message::Probe(NeighborProbe::Ping(ping)) => {
225 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
226 tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
227
228 let message = Message::Probe(NeighborProbe::Pong(ping));
229 if let Ok(data) = message.try_into() {
230 let routing = DestinationRouting::Return(pseudonym.into());
231 let data = ApplicationDataOut::with_no_packet_info(data);
232 pin_mut!(push_to_network);
233
234 if let Err(_error) = push_to_network.send((routing, data)).await {
235 tracing::error!(%pseudonym, "failed to send back a pong");
236 }
237 } else {
238 tracing::error!(%pseudonym, "failed to convert pong message into data");
239 }
240 },
241 Message::Probe(NeighborProbe::Pong(pong)) => {
242 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
243 if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
244 let latency = current_time()
245 .as_unix_timestamp()
246 .saturating_sub(start);
247
248 if let NodeId::Offchain(opk) = peer.as_ref() {
249 tracing::info!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
250 store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Neighbor(NeighborTelemetry {
251 peer: opk.into(),
252 rtt: latency,
253 }))).await
254 } else {
255 tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
256 }
257
258 if let Some(replier) = replier {
259 replier.notify(Ok(latency))
260 };
261 } else {
262 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
263 };
264 },
265 }
266 },
267 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
268 }
269 } else {
270 pin_mut!(move_up);
272 if move_up.send((pseudonym, in_data)).await.is_err() {
273 tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
274 }
275 }
276 }
277 }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
278 );
279
280 processes
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use std::{collections::VecDeque, sync::RwLock, time::Duration};
287
288 use async_trait::async_trait;
289 use futures::future::BoxFuture;
290 use hopr_api::ct::types::TrafficGenerationError;
291 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
292 use hopr_ct_telemetry::{ImmediateNeighborProber, ProberConfig};
293 use hopr_protocol_app::prelude::{ApplicationData, Tag};
294
295 use super::*;
296
297 lazy_static::lazy_static!(
298 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
299 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
300 static ref NEIGHBOURS: Vec<PeerId> = vec![
301 OffchainKeypair::random().public().into(),
302 OffchainKeypair::random().public().into(),
303 OffchainKeypair::random().public().into(),
304 OffchainKeypair::random().public().into(),
305 ];
306 );
307
308 #[derive(Debug, Clone)]
309 pub struct PeerStore {
310 get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
311 #[allow(clippy::type_complexity)]
312 on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
313 }
314
315 #[async_trait]
316 impl NetworkGraphUpdate for PeerStore {
317 async fn record<N, P>(&self, telemetry: std::result::Result<Telemetry<N, P>, TrafficGenerationError<P>>)
318 where
319 N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
320 P: hopr_api::ct::MeasurablePath + Send + Clone,
321 {
322 let mut on_finished = self.on_finished.write().unwrap();
323
324 match telemetry {
325 Ok(Telemetry::Neighbor(neighbor_telemetry)) => {
326 let peer: PeerId = neighbor_telemetry.peer().clone();
327 let duration = neighbor_telemetry.rtt();
328 on_finished.push((peer, Ok(duration)));
329 }
330 Err(TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
331 on_finished.push((
332 peer.clone(),
333 Err(ProbeError::TrafficError(TrafficGenerationError::ProbeNeighborTimeout(
334 peer,
335 ))),
336 ));
337 }
338 _ => panic!("unexpected telemetry type, unimplemented"),
339 }
340 }
341 }
342
343 #[async_trait]
344 impl NetworkGraphView for PeerStore {
345 fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
347 let mut get_peers = self.get_peers.write().unwrap();
348 Box::pin(futures::stream::iter(get_peers.pop_front().unwrap_or_default()))
349 }
350
351 async fn find_routes(&self, destination: &PeerId, length: usize) -> Vec<DestinationRouting> {
353 tracing::debug!(%destination, %length, "finding routes in test peer store");
354 vec![]
355 }
356 }
357
358 struct TestInterface {
359 from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
360 from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
361 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
362 manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
363 }
364
365 async fn test_with_probing<F, St, Fut>(cfg: ProbeConfig, store: St, test: F) -> anyhow::Result<()>
366 where
367 Fut: std::future::Future<Output = anyhow::Result<()>>,
368 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
369 St: NetworkGraphUpdate + NetworkGraphView + Clone + Send + Sync + 'static,
370 {
371 let probe = Probe::new(cfg.clone());
372
373 let (from_probing_up_tx, from_probing_up_rx) =
374 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
375
376 let (from_probing_to_network_tx, from_probing_to_network_rx) =
377 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
378
379 let (from_network_to_probing_tx, from_network_to_probing_rx) =
380 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
381
382 let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
383
384 let interface = TestInterface {
385 from_probing_up_rx,
386 from_probing_to_network_rx,
387 from_network_to_probing_tx,
388 manual_probe_tx,
389 };
390
391 let jhs = probe
392 .continuously_scan(
393 (from_probing_to_network_tx, from_network_to_probing_rx),
394 manual_probe_rx,
395 from_probing_up_tx,
396 ImmediateNeighborProber::new(ProberConfig {
397 interval: cfg.interval,
398 recheck_threshold: cfg.recheck_threshold,
399 }),
400 store,
401 )
402 .await;
403
404 let result = test(interface).await;
405
406 jhs.abort_all();
407
408 result
409 }
410
411 const NO_PROBE_PASSES: f64 = 0.0;
412 const ALL_PROBES_PASS: f64 = 1.0;
413
414 fn concurrent_channel(
416 delay: Option<std::time::Duration>,
417 pass_rate: f64,
418 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
419 ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
420 debug_assert!(
421 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
422 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
423 );
424
425 move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
426 let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
427
428 Box::pin(async move {
429 if let DestinationRouting::Forward { pseudonym, .. } = path {
430 let message: Message = data_out.data.try_into().expect("failed to convert data into message");
431 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
432 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
433
434 if let Some(delay) = delay {
435 tokio::time::sleep(delay).await;
437 }
438
439 if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
440 {
441 from_network_to_probing_tx
442 .send((
443 pseudonym.expect("the pseudonym is always known from cache"),
444 ApplicationDataIn {
445 data: pong_message
446 .try_into()
447 .expect("failed to convert pong message into data"),
448 packet_info: Default::default(),
449 },
450 ))
451 .await
452 .expect("failed to send pong message");
453 }
454 }
455 };
456 })
457 }
458 }
459
460 #[tokio::test]
461 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
463 let cfg = ProbeConfig {
464 timeout: std::time::Duration::from_millis(5),
465 interval: std::time::Duration::from_secs(0),
466 ..Default::default()
467 };
468
469 let store = PeerStore {
470 get_peers: Arc::new(RwLock::new(VecDeque::new())),
471 on_finished: Arc::new(RwLock::new(Vec::new())),
472 };
473
474 test_with_probing(cfg, store, move |iface: TestInterface| async move {
475 let mut manual_probe_tx = iface.manual_probe_tx;
476 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
477 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
478
479 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
480 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
481
482 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
483 from_probing_to_network_rx
484 .for_each_concurrent(
485 cfg.max_parallel_probes + 1,
486 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
487 )
488 .await;
489 });
490
491 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
492 .await?
493 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
494
495 Ok(())
496 })
497 .await
498 }
499
500 #[tokio::test]
501 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
503 let cfg = ProbeConfig {
504 timeout: std::time::Duration::from_millis(5),
505 interval: std::time::Duration::from_secs(0),
506 ..Default::default()
507 };
508
509 let store = PeerStore {
510 get_peers: Arc::new(RwLock::new(VecDeque::new())),
511 on_finished: Arc::new(RwLock::new(Vec::new())),
512 };
513
514 test_with_probing(cfg, store, move |iface: TestInterface| async move {
515 let mut manual_probe_tx = iface.manual_probe_tx;
516 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
517 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
518
519 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
520 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
521
522 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
523 from_probing_to_network_rx
524 .for_each_concurrent(
525 cfg.max_parallel_probes + 1,
526 concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
527 )
528 .await;
529 });
530
531 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
532
533 Ok(())
534 })
535 .await
536 }
537
538 #[tokio::test]
539 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
541 let cfg = ProbeConfig {
542 timeout: std::time::Duration::from_millis(20),
543 max_parallel_probes: NEIGHBOURS.len(),
544 interval: std::time::Duration::from_secs(0),
545 ..Default::default()
546 };
547
548 let store = PeerStore {
549 get_peers: Arc::new(RwLock::new({
550 let mut neighbors = VecDeque::new();
551 neighbors.push_back(NEIGHBOURS.clone());
552 neighbors
553 })),
554 on_finished: Arc::new(RwLock::new(Vec::new())),
555 };
556
557 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
558 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
559 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
560
561 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
562 from_probing_to_network_rx
563 .for_each_concurrent(
564 cfg.max_parallel_probes + 1,
565 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
566 )
567 .await;
568 });
569
570 tokio::time::sleep(cfg.timeout).await;
572
573 Ok(())
574 })
575 .await?;
576
577 assert_eq!(
578 store
579 .on_finished
580 .read()
581 .expect("should be lockable")
582 .iter()
583 .filter(|(_peer, result)| result.is_ok())
584 .count(),
585 NEIGHBOURS.len()
586 );
587
588 Ok(())
589 }
590
591 #[tokio::test]
592 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
594 let cfg = ProbeConfig {
595 timeout: std::time::Duration::from_millis(10),
596 max_parallel_probes: NEIGHBOURS.len(),
597 interval: std::time::Duration::from_secs(0),
598 ..Default::default()
599 };
600
601 let store = PeerStore {
602 get_peers: Arc::new(RwLock::new({
603 let mut neighbors = VecDeque::new();
604 neighbors.push_back(NEIGHBOURS.clone());
605 neighbors
606 })),
607 on_finished: Arc::new(RwLock::new(Vec::new())),
608 };
609
610 let timeout = cfg.timeout * 2;
611
612 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
613 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
614 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
615
616 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
617 from_probing_to_network_rx
618 .for_each_concurrent(
619 cfg.max_parallel_probes + 1,
620 concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
621 )
622 .await;
623 });
624
625 tokio::time::sleep(timeout * 2).await;
627
628 Ok(())
629 })
630 .await?;
631
632 assert_eq!(
633 store
634 .on_finished
635 .read()
636 .expect("should be lockable")
637 .iter()
638 .filter(|(_peer, result)| result.is_err())
639 .count(),
640 NEIGHBOURS.len()
641 );
642
643 Ok(())
644 }
645
646 #[tokio::test]
647 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
649 let cfg = ProbeConfig {
650 timeout: std::time::Duration::from_millis(20),
651 interval: std::time::Duration::from_secs(0),
652 ..Default::default()
653 };
654
655 let store = PeerStore {
656 get_peers: Arc::new(RwLock::new({
657 let mut neighbors = VecDeque::new();
658 neighbors.push_back(NEIGHBOURS.clone());
659 neighbors
660 })),
661 on_finished: Arc::new(RwLock::new(Vec::new())),
662 };
663
664 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
665 let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
666 let mut from_probing_up_rx = iface.from_probing_up_rx;
667
668 let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
669
670 from_network_to_probing_tx
671 .send((
672 HoprPseudonym::random(),
673 ApplicationDataIn {
674 data: expected_data.clone(),
675 packet_info: Default::default(),
676 },
677 ))
678 .await?;
679
680 let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
681 .await?
682 .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
683 .1;
684
685 assert_eq!(actual.data, expected_data);
686
687 Ok(())
688 })
689 .await
690 }
691}