1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_api::ct::{
6 NetworkGraphView, Telemetry, TrafficGeneration, traits::NetworkGraphUpdate, types::TrafficGenerationError,
7};
8use hopr_async_runtime::AbortableList;
9use hopr_crypto_random::Randomizable;
10use hopr_crypto_types::types::OffchainPublicKey;
11use hopr_internal_types::prelude::*;
12use hopr_network_types::prelude::*;
13use hopr_platform::time::native::current_time;
14use hopr_primitive_types::traits::AsUnixTimestamp;
15use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
16use libp2p_identity::PeerId;
17
18use crate::{
19 HoprProbeProcess,
20 config::ProbeConfig,
21 content::Message,
22 errors::ProbeError,
23 ping::PingQueryReplier,
24 types::{NeighborProbe, NeighborTelemetry, PathTelemetry},
25};
26
27type CacheKey = (HoprPseudonym, NeighborProbe);
28type CacheValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
29
30pub struct Probe {
35 cfg: ProbeConfig,
37}
38
39impl Probe {
40 pub fn new(cfg: ProbeConfig) -> Self {
41 Self { cfg }
42 }
43
44 pub async fn continuously_scan<T, U, V, Up, Tr, G>(
46 self,
47 api: (T, U), manual_events: V, move_up: Up, traffic_generator: Tr,
51 network_graph: G,
52 ) -> AbortableList<HoprProbeProcess>
53 where
54 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
55 T::Error: Send,
56 U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
57 V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
58 Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
59 Tr: TrafficGeneration + Send + Sync + 'static,
60 G: NetworkGraphView + NetworkGraphUpdate + Clone + Send + Sync + 'static,
61 {
62 let max_parallel_probes = self.cfg.max_parallel_probes;
63
64 let probing_routes = traffic_generator.build(network_graph.clone());
65
66 let network_graph_internal = network_graph.clone();
68 let timeout = self.cfg.timeout;
69 let active_probes: moka::future::Cache<CacheKey, CacheValue> = moka::future::Cache::builder()
70 .time_to_live(timeout)
71 .max_capacity(100_000)
72 .async_eviction_listener(
73 move |k: Arc<(HoprPseudonym, NeighborProbe)>,
74 v: (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>),
75 cause|
76 -> moka::notification::ListenerFuture {
77 if matches!(cause, moka::notification::RemovalCause::Expired) {
78 let store = network_graph_internal.clone();
80 let (peer, _start, notifier) = v;
81
82 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
83 if let Some(replier) = notifier {
84 if let NodeId::Offchain(opk) = peer.as_ref() {
85 replier.notify(Err(ProbeError::TrafficError(
86 TrafficGenerationError::ProbeNeighborTimeout(opk.into()),
87 )));
88 } else {
89 tracing::warn!(
90 reason = "non-offchain peer",
91 "cannot notify timeout for non-offchain peer"
92 );
93 }
94 };
95
96 if let NodeId::Offchain(opk) = peer.as_ref() {
97 let peer: PeerId = opk.into();
98 futures::FutureExt::boxed(async move {
99 store
100 .record::<NeighborTelemetry, PathTelemetry>(Err(
101 TrafficGenerationError::ProbeNeighborTimeout(peer),
102 ))
103 .await
104 })
105 } else {
106 futures::FutureExt::boxed(futures::future::ready(()))
107 }
108 } else {
109 futures::FutureExt::boxed(futures::future::ready(()))
111 }
112 },
113 )
114 .build();
115
116 let active_probes_rx = active_probes.clone();
117 let push_to_network = api.0.clone();
118
119 let mut processes = AbortableList::default();
120
121 let direct_neighbors =
123 probing_routes
124 .map(|peer| (peer, None))
125 .merge(manual_events.filter_map(|(peer, notifier)| async move {
126 if let Ok(peer) = OffchainPublicKey::from_peerid(&peer) {
127 let routing = DestinationRouting::Forward {
128 destination: Box::new(peer.into()),
129 pseudonym: Some(HoprPseudonym::random()),
130 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
131 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
132 };
133 Some((routing, Some(notifier)))
134 } else {
135 None
136 }
137 }));
138
139 processes.insert(
140 HoprProbeProcess::Emit,
141 hopr_async_runtime::spawn_as_abortable!(async move {
142 direct_neighbors
143 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
144 let active_probes = active_probes.clone();
145 let push_to_network = push_to_network.clone();
146
147 async move {
148 match peer {
149 DestinationRouting::Forward {
150 destination,
151 pseudonym,
152 forward_options,
153 return_options,
154 } => {
155 let nonce = NeighborProbe::random_nonce();
156
157 let message = Message::Probe(nonce);
158
159 if let Ok(data) = message.try_into() {
160 let routing = DestinationRouting::Forward {
161 destination: destination.clone(),
162 pseudonym,
163 forward_options,
164 return_options,
165 };
166 let data = ApplicationDataOut::with_no_packet_info(data);
167 pin_mut!(push_to_network);
168
169 if let Err(_error) = push_to_network.send((routing, data)).await {
170 tracing::error!("failed to send out a ping");
171 } else {
172 active_probes
173 .insert(
174 (
175 pseudonym
176 .expect("the pseudonym must be present in Forward routing"),
177 nonce,
178 ),
179 (destination, current_time().as_unix_timestamp(), notifier),
180 )
181 .await;
182 }
183 } else {
184 tracing::error!("failed to convert ping message into data");
185 }
186 }
187 DestinationRouting::Return(_surb_matcher) => tracing::error!(
188 error = "logical error",
189 "resolved transport routing is not forward"
190 ),
191 }
192 }
193 })
194 .inspect(|_| {
195 tracing::warn!(
196 task = "transport (probe - generate outgoing)",
197 "long-running background task finished"
198 )
199 })
200 .await;
201 }),
202 );
203
204 processes.insert(
206 HoprProbeProcess::Process,
207 hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
208 let active_probes = active_probes_rx.clone();
209 let push_to_network = api.0.clone();
210 let move_up = move_up.clone();
211 let store = network_graph.clone();
212
213 async move {
214 if in_data.data.application_tag == ReservedTag::Ping.into() {
216 let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
217
218 match message {
219 Ok(message) => {
220 match message {
221 Message::Telemetry(path_telemetry) => {
222 store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Loopback(path_telemetry))).await
223 },
224 Message::Probe(NeighborProbe::Ping(ping)) => {
225 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
226 tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
227
228 let message = Message::Probe(NeighborProbe::Pong(ping));
229 if let Ok(data) = message.try_into() {
230 let routing = DestinationRouting::Return(pseudonym.into());
231 let data = ApplicationDataOut::with_no_packet_info(data);
232 pin_mut!(push_to_network);
233
234 if let Err(_error) = push_to_network.send((routing, data)).await {
235 tracing::error!(%pseudonym, "failed to send back a pong");
236 }
237 } else {
238 tracing::error!(%pseudonym, "failed to convert pong message into data");
239 }
240 },
241 Message::Probe(NeighborProbe::Pong(pong)) => {
242 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
243 if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
244 let latency = current_time()
245 .as_unix_timestamp()
246 .saturating_sub(start);
247
248 if let NodeId::Offchain(opk) = peer.as_ref() {
249 tracing::debug!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
250 store.record::<NeighborTelemetry, PathTelemetry>(Ok(Telemetry::Neighbor(NeighborTelemetry {
251 peer: opk.into(),
252 rtt: latency,
253 }))).await
254 } else {
255 tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
256 }
257
258 if let Some(replier) = replier {
259 replier.notify(Ok(latency))
260 };
261 } else {
262 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
263 };
264 },
265 }
266 },
267 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
268 }
269 } else {
270 pin_mut!(move_up);
272 if move_up.send((pseudonym, in_data)).await.is_err() {
273 tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
274 }
275 }
276 }
277 }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
278 );
279
280 processes
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use std::{collections::VecDeque, sync::RwLock, time::Duration};
287
288 use async_trait::async_trait;
289 use futures::future::BoxFuture;
290 use hopr_api::ct::types::TrafficGenerationError;
291 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
292 use hopr_ct_telemetry::{ImmediateNeighborProber, ProberConfig};
293 use hopr_protocol_app::prelude::{ApplicationData, Tag};
294
295 use super::*;
296
297 lazy_static::lazy_static!(
298 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
299 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
300 static ref NEIGHBOURS: Vec<PeerId> = vec![
301 OffchainKeypair::random().public().into(),
302 OffchainKeypair::random().public().into(),
303 OffchainKeypair::random().public().into(),
304 OffchainKeypair::random().public().into(),
305 ];
306 );
307
308 #[derive(Debug, Clone)]
309 pub struct PeerStore {
310 get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
311 #[allow(clippy::type_complexity)]
312 on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
313 }
314
315 #[async_trait]
316 impl NetworkGraphUpdate for PeerStore {
317 async fn record<N, P>(&self, telemetry: std::result::Result<Telemetry<N, P>, TrafficGenerationError<P>>)
318 where
319 N: hopr_api::ct::MeasurableNeighbor + Send + Clone,
320 P: hopr_api::ct::MeasurablePath + Send + Clone,
321 {
322 let mut on_finished = self.on_finished.write().unwrap();
323
324 match telemetry {
325 Ok(Telemetry::Neighbor(neighbor_telemetry)) => {
326 let peer: PeerId = *neighbor_telemetry.peer();
327 let duration = neighbor_telemetry.rtt();
328 on_finished.push((peer, Ok(duration)));
329 }
330 Err(TrafficGenerationError::ProbeNeighborTimeout(peer)) => {
331 on_finished.push((
332 peer,
333 Err(ProbeError::TrafficError(TrafficGenerationError::ProbeNeighborTimeout(
334 peer,
335 ))),
336 ));
337 }
338 _ => panic!("unexpected telemetry type, unimplemented"),
339 }
340 }
341 }
342
343 #[async_trait]
344 impl NetworkGraphView for PeerStore {
345 fn nodes(&self) -> futures::stream::BoxStream<'static, PeerId> {
347 let mut get_peers = self.get_peers.write().unwrap();
348 Box::pin(futures::stream::iter(get_peers.pop_front().unwrap_or_default()))
349 }
350
351 async fn find_routes(&self, destination: &PeerId, length: usize) -> Vec<DestinationRouting> {
353 tracing::debug!(%destination, %length, "finding routes in test peer store");
354 vec![]
355 }
356 }
357
358 struct TestInterface {
359 from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
360 from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
361 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
362 manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
363 }
364
365 async fn test_with_probing<F, St, Fut>(cfg: ProbeConfig, store: St, test: F) -> anyhow::Result<()>
366 where
367 Fut: std::future::Future<Output = anyhow::Result<()>>,
368 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
369 St: NetworkGraphUpdate + NetworkGraphView + Clone + Send + Sync + 'static,
370 {
371 let probe = Probe::new(cfg);
372
373 let (from_probing_up_tx, from_probing_up_rx) =
374 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
375
376 let (from_probing_to_network_tx, from_probing_to_network_rx) =
377 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
378
379 let (from_network_to_probing_tx, from_network_to_probing_rx) =
380 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
381
382 let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
383
384 let interface = TestInterface {
385 from_probing_up_rx,
386 from_probing_to_network_rx,
387 from_network_to_probing_tx,
388 manual_probe_tx,
389 };
390
391 let jhs = probe
392 .continuously_scan(
393 (from_probing_to_network_tx, from_network_to_probing_rx),
394 manual_probe_rx,
395 from_probing_up_tx,
396 ImmediateNeighborProber::new(ProberConfig {
397 interval: cfg.interval,
398 recheck_threshold: cfg.recheck_threshold,
399 }),
400 store,
401 )
402 .await;
403
404 let result = test(interface).await;
405
406 jhs.abort_all();
407
408 result
409 }
410
411 const NO_PROBE_PASSES: f64 = 0.0;
412 const ALL_PROBES_PASS: f64 = 1.0;
413
414 fn concurrent_channel(
416 delay: Option<std::time::Duration>,
417 pass_rate: f64,
418 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
419 ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
420 debug_assert!(
421 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
422 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
423 );
424
425 move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
426 let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
427
428 Box::pin(async move {
429 if let DestinationRouting::Forward { pseudonym, .. } = path {
430 let message: Message = data_out.data.try_into().expect("failed to convert data into message");
431 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
432 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
433
434 if let Some(delay) = delay {
435 tokio::time::sleep(delay).await;
437 }
438
439 if rand::random_range(NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate {
440 from_network_to_probing_tx
441 .send((
442 pseudonym.expect("the pseudonym is always known from cache"),
443 ApplicationDataIn {
444 data: pong_message
445 .try_into()
446 .expect("failed to convert pong message into data"),
447 packet_info: Default::default(),
448 },
449 ))
450 .await
451 .expect("failed to send pong message");
452 }
453 }
454 };
455 })
456 }
457 }
458
459 #[tokio::test]
460 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
462 let cfg = ProbeConfig {
463 timeout: std::time::Duration::from_millis(5),
464 interval: std::time::Duration::from_secs(0),
465 ..Default::default()
466 };
467
468 let store = PeerStore {
469 get_peers: Arc::new(RwLock::new(VecDeque::new())),
470 on_finished: Arc::new(RwLock::new(Vec::new())),
471 };
472
473 test_with_probing(cfg, store, move |iface: TestInterface| async move {
474 let mut manual_probe_tx = iface.manual_probe_tx;
475 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
476 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
477
478 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
479 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
480
481 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
482 from_probing_to_network_rx
483 .for_each_concurrent(
484 cfg.max_parallel_probes + 1,
485 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
486 )
487 .await;
488 });
489
490 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
491 .await?
492 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
493
494 Ok(())
495 })
496 .await
497 }
498
499 #[tokio::test]
500 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
502 let cfg = ProbeConfig {
503 timeout: std::time::Duration::from_millis(5),
504 interval: std::time::Duration::from_secs(0),
505 ..Default::default()
506 };
507
508 let store = PeerStore {
509 get_peers: Arc::new(RwLock::new(VecDeque::new())),
510 on_finished: Arc::new(RwLock::new(Vec::new())),
511 };
512
513 test_with_probing(cfg, store, move |iface: TestInterface| async move {
514 let mut manual_probe_tx = iface.manual_probe_tx;
515 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
516 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
517
518 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
519 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
520
521 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
522 from_probing_to_network_rx
523 .for_each_concurrent(
524 cfg.max_parallel_probes + 1,
525 concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
526 )
527 .await;
528 });
529
530 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
531
532 Ok(())
533 })
534 .await
535 }
536
537 #[tokio::test]
538 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
540 let cfg = ProbeConfig {
541 timeout: std::time::Duration::from_millis(20),
542 max_parallel_probes: NEIGHBOURS.len(),
543 interval: std::time::Duration::from_secs(0),
544 ..Default::default()
545 };
546
547 let store = PeerStore {
548 get_peers: Arc::new(RwLock::new({
549 let mut neighbors = VecDeque::new();
550 neighbors.push_back(NEIGHBOURS.clone());
551 neighbors
552 })),
553 on_finished: Arc::new(RwLock::new(Vec::new())),
554 };
555
556 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
557 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
558 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
559
560 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
561 from_probing_to_network_rx
562 .for_each_concurrent(
563 cfg.max_parallel_probes + 1,
564 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
565 )
566 .await;
567 });
568
569 tokio::time::sleep(cfg.timeout).await;
571
572 Ok(())
573 })
574 .await?;
575
576 assert_eq!(
577 store
578 .on_finished
579 .read()
580 .expect("should be lockable")
581 .iter()
582 .filter(|(_peer, result)| result.is_ok())
583 .count(),
584 NEIGHBOURS.len()
585 );
586
587 Ok(())
588 }
589
590 #[tokio::test]
591 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
593 let cfg = ProbeConfig {
594 timeout: std::time::Duration::from_millis(10),
595 max_parallel_probes: NEIGHBOURS.len(),
596 interval: std::time::Duration::from_secs(0),
597 ..Default::default()
598 };
599
600 let store = PeerStore {
601 get_peers: Arc::new(RwLock::new({
602 let mut neighbors = VecDeque::new();
603 neighbors.push_back(NEIGHBOURS.clone());
604 neighbors
605 })),
606 on_finished: Arc::new(RwLock::new(Vec::new())),
607 };
608
609 let timeout = cfg.timeout * 2;
610
611 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
612 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
613 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
614
615 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
616 from_probing_to_network_rx
617 .for_each_concurrent(
618 cfg.max_parallel_probes + 1,
619 concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
620 )
621 .await;
622 });
623
624 tokio::time::sleep(timeout * 2).await;
626
627 Ok(())
628 })
629 .await?;
630
631 assert_eq!(
632 store
633 .on_finished
634 .read()
635 .expect("should be lockable")
636 .iter()
637 .filter(|(_peer, result)| result.is_err())
638 .count(),
639 NEIGHBOURS.len()
640 );
641
642 Ok(())
643 }
644
645 #[tokio::test]
646 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
648 let cfg = ProbeConfig {
649 timeout: std::time::Duration::from_millis(20),
650 interval: std::time::Duration::from_secs(0),
651 ..Default::default()
652 };
653
654 let store = PeerStore {
655 get_peers: Arc::new(RwLock::new({
656 let mut neighbors = VecDeque::new();
657 neighbors.push_back(NEIGHBOURS.clone());
658 neighbors
659 })),
660 on_finished: Arc::new(RwLock::new(Vec::new())),
661 };
662
663 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
664 let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
665 let mut from_probing_up_rx = iface.from_probing_up_rx;
666
667 let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
668
669 from_network_to_probing_tx
670 .send((
671 HoprPseudonym::random(),
672 ApplicationDataIn {
673 data: expected_data.clone(),
674 packet_info: Default::default(),
675 },
676 ))
677 .await?;
678
679 let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
680 .await?
681 .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
682 .1;
683
684 assert_eq!(actual.data, expected_data);
685
686 Ok(())
687 })
688 .await
689 }
690}