1use std::sync::Arc;
2
3use futures::{FutureExt, SinkExt, StreamExt, pin_mut};
4use futures_concurrency::stream::StreamExt as _;
5use hopr_async_runtime::AbortableList;
6use hopr_crypto_random::Randomizable;
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_internal_types::prelude::*;
9use hopr_network_types::prelude::*;
10use hopr_platform::time::native::current_time;
11use hopr_primitive_types::traits::AsUnixTimestamp;
12use hopr_protocol_app::prelude::{ApplicationDataIn, ApplicationDataOut, ReservedTag};
13use libp2p_identity::PeerId;
14
15use crate::{
16 HoprProbeProcess,
17 config::ProbeConfig,
18 content::{Message, NeighborProbe},
19 errors::ProbeError,
20 ping::PingQueryReplier,
21 traits::TrafficGeneration,
22 types::{NeighborTelemetry, Telemetry},
23};
24
25#[inline(always)]
26fn to_nonce(message: &Message) -> String {
27 match message {
28 Message::Probe(NeighborProbe::Ping(ping)) => hex::encode(ping),
29 Message::Probe(NeighborProbe::Pong(pong)) => hex::encode(pong),
30 _ => "<telemetry>".to_string(),
31 }
32}
33
34#[inline(always)]
35fn to_pseudonym(path: &DestinationRouting) -> Option<HoprPseudonym> {
36 match path {
37 DestinationRouting::Forward { pseudonym, .. } => *pseudonym,
38 DestinationRouting::Return(matcher) => match matcher {
39 hopr_network_types::types::SurbMatcher::Exact(sender_id) => Some(sender_id.pseudonym()),
40 hopr_network_types::types::SurbMatcher::Pseudonym(pseudonym) => Some(*pseudonym),
41 },
42 }
43}
44
45#[derive(Clone, Debug)]
46struct Sender<T> {
47 downstream: T,
48}
49
50impl<T> Sender<T>
51where
52 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
53{
54 #[tracing::instrument(level = "debug", skip(self, path, message), fields(message=%message, nonce=%to_nonce(&message), pseudonym=?to_pseudonym(&path)), ret(level = tracing::Level::TRACE), err(Display))]
55 async fn send_message(self, path: DestinationRouting, message: Message) -> crate::errors::Result<()> {
56 let push_to_network = self.downstream;
57 pin_mut!(push_to_network);
58 if push_to_network
59 .as_mut()
60 .send((path, ApplicationDataOut::with_no_packet_info(message.try_into()?)))
61 .await
62 .is_ok()
63 {
64 Ok(())
65 } else {
66 Err(ProbeError::SendError("transport error".to_string()))
67 }
68 }
69}
70
71type CacheKey = (HoprPseudonym, NeighborProbe);
72type CacheValue = (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>);
73
74pub struct Probe {
79 cfg: ProbeConfig,
81}
82
83impl Probe {
84 pub fn new(cfg: ProbeConfig) -> Self {
85 Self { cfg }
86 }
87
88 pub async fn continuously_scan<T, U, V, Up, Tr>(
90 self,
91 api: (T, U), manual_events: V, move_up: Up, traffic_generator: Tr,
95 ) -> AbortableList<HoprProbeProcess>
96 where
97 T: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + 'static,
98 T::Error: Send,
99 U: futures::Stream<Item = (HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
100 V: futures::Stream<Item = (PeerId, PingQueryReplier)> + Send + Sync + 'static,
101 Up: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Clone + Send + Sync + 'static,
102 Tr: TrafficGeneration + Send + Sync + 'static,
103 {
104 let max_parallel_probes = self.cfg.max_parallel_probes;
105 let interval_between_rounds = self.cfg.interval;
106
107 let (probing_routes, reports) = traffic_generator.build();
108
109 let store_eviction = reports.clone();
111 let timeout = self.cfg.timeout;
112 let active_probes: moka::future::Cache<CacheKey, CacheValue> = moka::future::Cache::builder()
113 .time_to_live(timeout)
114 .max_capacity(100_000)
115 .async_eviction_listener(
116 move |k: Arc<(HoprPseudonym, NeighborProbe)>,
117 v: (Box<NodeId>, std::time::Duration, Option<PingQueryReplier>),
118 cause|
119 -> moka::notification::ListenerFuture {
120 if matches!(cause, moka::notification::RemovalCause::Expired) {
121 let store = store_eviction.clone();
123 let (peer, _start, notifier) = v;
124
125 tracing::debug!(%peer, pseudonym = %k.0, probe = %k.1, reason = "timeout", "probe failed");
126 if let Some(replier) = notifier {
127 if let NodeId::Offchain(opk) = peer.as_ref() {
128 replier.notify(Err(ProbeError::ProbeNeighborTimeout(opk.into())));
129 } else {
130 tracing::warn!(
131 reason = "non-offchain peer",
132 "cannot notify timeout for non-offchain peer"
133 );
134 }
135 };
136
137 if let NodeId::Offchain(opk) = peer.as_ref() {
138 let peer: PeerId = opk.into();
139 futures::FutureExt::boxed(async move {
140 pin_mut!(store);
141 if let Err(error) = store.send(Err(ProbeError::ProbeNeighborTimeout(peer))).await {
142 tracing::error!(%peer, %error, "failed to record probe timeout");
143 }
144 })
145 } else {
146 futures::FutureExt::boxed(futures::future::ready(()))
147 }
148 } else {
149 futures::FutureExt::boxed(futures::future::ready(()))
151 }
152 },
153 )
154 .build();
155
156 let active_probes_rx = active_probes.clone();
157 let push_to_network = api.0.clone();
158
159 let mut processes = AbortableList::default();
160
161 let direct_neighbors =
163 probing_routes
164 .map(|peer| (peer, None))
165 .merge(manual_events.filter_map(|(peer, notifier)| async move {
166 if let Ok(peer) = OffchainPublicKey::from_peerid(&peer) {
167 let routing = DestinationRouting::Forward {
168 destination: Box::new(peer.into()),
169 pseudonym: Some(HoprPseudonym::random()),
170 forward_options: RoutingOptions::Hops(0.try_into().expect("0 is a valid u8")),
171 return_options: Some(RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"))),
172 };
173 Some((routing, Some(notifier)))
174 } else {
175 None
176 }
177 }));
178
179 processes.insert(
180 HoprProbeProcess::Emit,
181 hopr_async_runtime::spawn_as_abortable!(async move {
182 hopr_async_runtime::prelude::sleep(2 * interval_between_rounds).await; direct_neighbors
185 .for_each_concurrent(max_parallel_probes, move |(peer, notifier)| {
186 let active_probes = active_probes.clone();
187 let push_to_network = Sender {
188 downstream: push_to_network.clone(),
189 };
190
191 async move {
192 match peer {
193 DestinationRouting::Forward {
194 destination,
195 pseudonym,
196 forward_options,
197 return_options,
198 } => {
199 let nonce = NeighborProbe::random_nonce();
200
201 let message = Message::Probe(nonce);
202
203 let routing = DestinationRouting::Forward {
204 destination: destination.clone(),
205 pseudonym,
206 forward_options,
207 return_options,
208 };
209
210 if let Err(error) = push_to_network.send_message(routing, message).await {
211 tracing::error!(?destination, %error, "failed to send out a probe");
212 } else {
213 active_probes
214 .insert(
215 (
216 pseudonym
217 .expect("the pseudonym must be present in Forward routing"),
218 nonce,
219 ),
220 (destination, current_time().as_unix_timestamp(), notifier),
221 )
222 .await;
223 }
224 }
225 DestinationRouting::Return(_surb_matcher) => tracing::error!(
226 error = "logical error",
227 "resolved transport routing is not forward"
228 ),
229 }
230 }
231 })
232 .inspect(|_| {
233 tracing::warn!(
234 task = "transport (probe - generate outgoing)",
235 "long-running background task finished"
236 )
237 })
238 .await;
239 }),
240 );
241
242 processes.insert(
244 HoprProbeProcess::Process,
245 hopr_async_runtime::spawn_as_abortable!(api.1.for_each_concurrent(max_parallel_probes, move |(pseudonym, in_data)| {
246 let active_probes = active_probes_rx.clone();
247 let push_to_network = Sender { downstream: api.0.clone() };
248 let move_up = move_up.clone();
249 let store = reports.clone();
250
251 async move {
252 if in_data.data.application_tag == ReservedTag::Ping.into() {
254 let message: anyhow::Result<Message> = in_data.data.try_into().map_err(|e| anyhow::anyhow!("failed to convert data into message: {e}"));
255
256 match message {
257 Ok(message) => {
258 match message {
259 Message::Telemetry(path_telemetry) => {
260 pin_mut!(store);
261 if let Err(error) = store.send(Ok(Telemetry::Loopback(path_telemetry))).await {
262 tracing::error!(%pseudonym, %error, "failed to record probe success");
263 }
264 },
265 Message::Probe(NeighborProbe::Ping(ping)) => {
266 tracing::debug!(%pseudonym, nonce = hex::encode(ping), "received ping");
267 tracing::trace!(%pseudonym, nonce = hex::encode(ping), "wrapping a pong in the found SURB");
268
269 let message = Message::Probe(NeighborProbe::Pong(ping));
270 if let Err(error) = push_to_network.send_message(DestinationRouting::Return(pseudonym.into()), message).await {
271 tracing::error!(%pseudonym, %error, "failed to send back a pong");
272 }
273 },
274 Message::Probe(NeighborProbe::Pong(pong)) => {
275 tracing::debug!(%pseudonym, nonce = hex::encode(pong), "received pong");
276 if let Some((peer, start, replier)) = active_probes.remove(&(pseudonym, NeighborProbe::Ping(pong))).await {
277 let latency = current_time()
278 .as_unix_timestamp()
279 .saturating_sub(start);
280
281 if let NodeId::Offchain(opk) = peer.as_ref() {
282 tracing::info!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful");
283 pin_mut!(store);
284 if let Err(error) = store.send(Ok(Telemetry::Neighbor(NeighborTelemetry {
285 peer: opk.into(),
286 rtt: latency,
287 }))).await {
288 tracing::error!(%pseudonym, %error, "failed to record probe success");
289 }
290 } else {
291 tracing::warn!(%pseudonym, nonce = hex::encode(pong), latency_ms = latency.as_millis(), "probe successful to non-offchain peer");
292 }
293
294 if let Some(replier) = replier {
295 replier.notify(Ok(latency))
296 };
297 } else {
298 tracing::warn!(%pseudonym, nonce = hex::encode(pong), possible_reasons = "[timeout, adversary]", "received pong for unknown probe");
299 };
300 },
301 }
302 },
303 Err(error) => tracing::error!(%pseudonym, %error, "cannot deserialize message"),
304 }
305 } else {
306 pin_mut!(move_up);
308 if move_up.send((pseudonym, in_data)).await.is_err() {
309 tracing::error!(%pseudonym, error = "receiver error", "failed to send message up");
310 }
311 }
312 }
313 }).inspect(|_| tracing::warn!(task = "transport (probe - processing incoming)", "long-running background task finished")))
314 );
315
316 processes
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use std::{collections::VecDeque, sync::RwLock, time::Duration};
323
324 use async_trait::async_trait;
325 use futures::future::BoxFuture;
326 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair, OffchainKeypair};
327 use hopr_protocol_app::prelude::{ApplicationData, Tag};
328
329 use super::*;
330 use crate::{
331 neighbors::ImmediateNeighborProber,
332 traits::{PeerDiscoveryFetch, ProbeStatusUpdate},
333 };
334
335 lazy_static::lazy_static!(
336 static ref OFFCHAIN_KEYPAIR: OffchainKeypair = OffchainKeypair::random();
337 static ref ONCHAIN_KEYPAIR: ChainKeypair = ChainKeypair::random();
338 static ref NEIGHBOURS: Vec<PeerId> = vec![
339 OffchainKeypair::random().public().into(),
340 OffchainKeypair::random().public().into(),
341 OffchainKeypair::random().public().into(),
342 OffchainKeypair::random().public().into(),
343 ];
344 );
345
346 #[derive(Debug, Clone)]
347 pub struct PeerStore {
348 get_peers: Arc<RwLock<VecDeque<Vec<PeerId>>>>,
349 #[allow(clippy::type_complexity)]
350 on_finished: Arc<RwLock<Vec<(PeerId, crate::errors::Result<Duration>)>>>,
351 }
352
353 #[async_trait]
354 impl ProbeStatusUpdate for PeerStore {
355 async fn on_finished(&self, peer: &PeerId, result: &crate::errors::Result<Duration>) {
356 let mut on_finished = self.on_finished.write().unwrap();
357 on_finished.push((
358 *peer,
359 match result {
360 Ok(duration) => Ok(*duration),
361 Err(_e) => Err(ProbeError::ProbeNeighborTimeout(peer.clone())),
362 },
363 ));
364 }
365 }
366
367 #[async_trait]
368 impl PeerDiscoveryFetch for PeerStore {
369 async fn get_peers(&self, _from_timestamp: std::time::SystemTime) -> Vec<PeerId> {
370 let mut get_peers = self.get_peers.write().unwrap();
371 get_peers.pop_front().unwrap_or_default()
372 }
373 }
374
375 struct TestInterface {
376 from_probing_up_rx: futures::channel::mpsc::Receiver<(HoprPseudonym, ApplicationDataIn)>,
377 from_probing_to_network_rx: futures::channel::mpsc::Receiver<(DestinationRouting, ApplicationDataOut)>,
378 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
379 manual_probe_tx: futures::channel::mpsc::Sender<(PeerId, PingQueryReplier)>,
380 }
381
382 async fn test_with_probing<F, St, Fut>(cfg: ProbeConfig, store: St, test: F) -> anyhow::Result<()>
383 where
384 Fut: std::future::Future<Output = anyhow::Result<()>>,
385 F: Fn(TestInterface) -> Fut + Send + Sync + 'static,
386 St: ProbeStatusUpdate + PeerDiscoveryFetch + Clone + Send + Sync + 'static,
387 {
388 let probe = Probe::new(cfg);
389
390 let (from_probing_up_tx, from_probing_up_rx) =
391 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
392
393 let (from_probing_to_network_tx, from_probing_to_network_rx) =
394 futures::channel::mpsc::channel::<(DestinationRouting, ApplicationDataOut)>(100);
395
396 let (from_network_to_probing_tx, from_network_to_probing_rx) =
397 futures::channel::mpsc::channel::<(HoprPseudonym, ApplicationDataIn)>(100);
398
399 let (manual_probe_tx, manual_probe_rx) = futures::channel::mpsc::channel::<(PeerId, PingQueryReplier)>(100);
400
401 let interface = TestInterface {
402 from_probing_up_rx,
403 from_probing_to_network_rx,
404 from_network_to_probing_tx,
405 manual_probe_tx,
406 };
407
408 let jhs = probe
409 .continuously_scan(
410 (from_probing_to_network_tx, from_network_to_probing_rx),
411 manual_probe_rx,
412 from_probing_up_tx,
413 ImmediateNeighborProber::new(cfg, store),
414 )
415 .await;
416
417 let result = test(interface).await;
418
419 jhs.abort_all();
420
421 result
422 }
423
424 const NO_PROBE_PASSES: f64 = 0.0;
425 const ALL_PROBES_PASS: f64 = 1.0;
426
427 fn concurrent_channel(
429 delay: Option<std::time::Duration>,
430 pass_rate: f64,
431 from_network_to_probing_tx: futures::channel::mpsc::Sender<(HoprPseudonym, ApplicationDataIn)>,
432 ) -> impl Fn((DestinationRouting, ApplicationDataOut)) -> BoxFuture<'static, ()> {
433 debug_assert!(
434 (NO_PROBE_PASSES..=ALL_PROBES_PASS).contains(&pass_rate),
435 "Pass rate must be between {NO_PROBE_PASSES} and {ALL_PROBES_PASS}"
436 );
437
438 move |(path, data_out): (DestinationRouting, ApplicationDataOut)| -> BoxFuture<'static, ()> {
439 let mut from_network_to_probing_tx = from_network_to_probing_tx.clone();
440
441 Box::pin(async move {
442 if let DestinationRouting::Forward { pseudonym, .. } = path {
443 let message: Message = data_out.data.try_into().expect("failed to convert data into message");
444 if let Message::Probe(NeighborProbe::Ping(ping)) = message {
445 let pong_message = Message::Probe(NeighborProbe::Pong(ping));
446
447 if let Some(delay) = delay {
448 tokio::time::sleep(delay).await;
450 }
451
452 if rand::Rng::gen_range(&mut rand::thread_rng(), NO_PROBE_PASSES..=ALL_PROBES_PASS) < pass_rate
453 {
454 from_network_to_probing_tx
455 .send((
456 pseudonym.expect("the pseudonym is always known from cache"),
457 ApplicationDataIn {
458 data: pong_message
459 .try_into()
460 .expect("failed to convert pong message into data"),
461 packet_info: Default::default(),
462 },
463 ))
464 .await
465 .expect("failed to send pong message");
466 }
467 }
468 };
469 })
470 }
471 }
472
473 #[tokio::test]
474 async fn probe_should_record_value_for_manual_neighbor_probe() -> anyhow::Result<()> {
476 let cfg = ProbeConfig {
477 timeout: std::time::Duration::from_millis(5),
478 interval: std::time::Duration::from_secs(0),
479 ..Default::default()
480 };
481
482 let store = PeerStore {
483 get_peers: Arc::new(RwLock::new(VecDeque::new())),
484 on_finished: Arc::new(RwLock::new(Vec::new())),
485 };
486
487 test_with_probing(cfg, store, move |iface: TestInterface| async move {
488 let mut manual_probe_tx = iface.manual_probe_tx;
489 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
490 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
491
492 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
493 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
494
495 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
496 from_probing_to_network_rx
497 .for_each_concurrent(
498 cfg.max_parallel_probes + 1,
499 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
500 )
501 .await;
502 });
503
504 let _duration = tokio::time::timeout(std::time::Duration::from_secs(1), rx.next())
505 .await?
506 .ok_or_else(|| anyhow::anyhow!("Probe did not return a result in time"))??;
507
508 Ok(())
509 })
510 .await
511 }
512
513 #[tokio::test]
514 async fn probe_should_record_failure_on_manual_fail() -> anyhow::Result<()> {
516 let cfg = ProbeConfig {
517 timeout: std::time::Duration::from_millis(5),
518 interval: std::time::Duration::from_secs(0),
519 ..Default::default()
520 };
521
522 let store = PeerStore {
523 get_peers: Arc::new(RwLock::new(VecDeque::new())),
524 on_finished: Arc::new(RwLock::new(Vec::new())),
525 };
526
527 test_with_probing(cfg, store, move |iface: TestInterface| async move {
528 let mut manual_probe_tx = iface.manual_probe_tx;
529 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
530 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
531
532 let (tx, mut rx) = futures::channel::mpsc::channel::<std::result::Result<Duration, ProbeError>>(128);
533 manual_probe_tx.send((NEIGHBOURS[0], PingQueryReplier::new(tx))).await?;
534
535 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
536 from_probing_to_network_rx
537 .for_each_concurrent(
538 cfg.max_parallel_probes + 1,
539 concurrent_channel(None, NO_PROBE_PASSES, from_network_to_probing_tx),
540 )
541 .await;
542 });
543
544 assert!(tokio::time::timeout(cfg.timeout * 2, rx.next()).await.is_err());
545
546 Ok(())
547 })
548 .await
549 }
550
551 #[tokio::test]
552 async fn probe_should_record_results_of_successful_automatically_generated_probes() -> anyhow::Result<()> {
554 let cfg = ProbeConfig {
555 timeout: std::time::Duration::from_millis(20),
556 max_parallel_probes: NEIGHBOURS.len(),
557 interval: std::time::Duration::from_secs(0),
558 ..Default::default()
559 };
560
561 let store = PeerStore {
562 get_peers: Arc::new(RwLock::new({
563 let mut neighbors = VecDeque::new();
564 neighbors.push_back(NEIGHBOURS.clone());
565 neighbors
566 })),
567 on_finished: Arc::new(RwLock::new(Vec::new())),
568 };
569
570 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
571 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
572 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
573
574 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
575 from_probing_to_network_rx
576 .for_each_concurrent(
577 cfg.max_parallel_probes + 1,
578 concurrent_channel(None, ALL_PROBES_PASS, from_network_to_probing_tx),
579 )
580 .await;
581 });
582
583 tokio::time::sleep(cfg.timeout).await;
585
586 Ok(())
587 })
588 .await?;
589
590 assert_eq!(
591 store
592 .on_finished
593 .read()
594 .expect("should be lockable")
595 .iter()
596 .filter(|(_peer, result)| result.is_ok())
597 .count(),
598 NEIGHBOURS.len()
599 );
600
601 Ok(())
602 }
603
604 #[tokio::test]
605 async fn probe_should_record_results_of_timed_out_automatically_generated_probes() -> anyhow::Result<()> {
607 let cfg = ProbeConfig {
608 timeout: std::time::Duration::from_millis(10),
609 max_parallel_probes: NEIGHBOURS.len(),
610 interval: std::time::Duration::from_secs(0),
611 ..Default::default()
612 };
613
614 let store = PeerStore {
615 get_peers: Arc::new(RwLock::new({
616 let mut neighbors = VecDeque::new();
617 neighbors.push_back(NEIGHBOURS.clone());
618 neighbors
619 })),
620 on_finished: Arc::new(RwLock::new(Vec::new())),
621 };
622
623 let timeout = cfg.timeout * 2;
624
625 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
626 let from_probing_to_network_rx = iface.from_probing_to_network_rx;
627 let from_network_to_probing_tx = iface.from_network_to_probing_tx;
628
629 let _jh: hopr_async_runtime::prelude::JoinHandle<()> = tokio::spawn(async move {
630 from_probing_to_network_rx
631 .for_each_concurrent(
632 cfg.max_parallel_probes + 1,
633 concurrent_channel(Some(timeout), ALL_PROBES_PASS, from_network_to_probing_tx),
634 )
635 .await;
636 });
637
638 tokio::time::sleep(timeout * 2).await;
640
641 Ok(())
642 })
643 .await?;
644
645 assert_eq!(
646 store
647 .on_finished
648 .read()
649 .expect("should be lockable")
650 .iter()
651 .filter(|(_peer, result)| result.is_err())
652 .count(),
653 NEIGHBOURS.len()
654 );
655
656 Ok(())
657 }
658
659 #[tokio::test]
660 async fn probe_should_pass_through_non_associated_tags() -> anyhow::Result<()> {
662 let cfg = ProbeConfig {
663 timeout: std::time::Duration::from_millis(20),
664 interval: std::time::Duration::from_secs(0),
665 ..Default::default()
666 };
667
668 let store = PeerStore {
669 get_peers: Arc::new(RwLock::new({
670 let mut neighbors = VecDeque::new();
671 neighbors.push_back(NEIGHBOURS.clone());
672 neighbors
673 })),
674 on_finished: Arc::new(RwLock::new(Vec::new())),
675 };
676
677 test_with_probing(cfg, store.clone(), move |iface: TestInterface| async move {
678 let mut from_network_to_probing_tx = iface.from_network_to_probing_tx;
679 let mut from_probing_up_rx = iface.from_probing_up_rx;
680
681 let expected_data = ApplicationData::new(Tag::MAX, b"Hello, this is a test message!")?;
682
683 from_network_to_probing_tx
684 .send((
685 HoprPseudonym::random(),
686 ApplicationDataIn {
687 data: expected_data.clone(),
688 packet_info: Default::default(),
689 },
690 ))
691 .await?;
692
693 let actual = tokio::time::timeout(cfg.timeout, from_probing_up_rx.next())
694 .await?
695 .ok_or_else(|| anyhow::anyhow!("Did not return any data in time"))?
696 .1;
697
698 assert_eq!(actual.data, expected_data);
699
700 Ok(())
701 })
702 .await
703 }
704}