1use hopr_api::graph::{MeasurableEdge, MeasurableNode, NetworkGraphWrite, traits::EdgeObservableWrite};
2use petgraph::graph::{EdgeIndex, NodeIndex};
3
4use crate::{ChannelGraph, Observations, graph::InnerGraph};
5
6fn resolve_loopback_edges(inner: &InnerGraph, me_idx: NodeIndex, path_bytes: &[u8]) -> Option<Vec<EdgeIndex>> {
16 if path_bytes.len() != size_of::<hopr_api::ct::PathId>() {
17 tracing::warn!(
18 path_len = path_bytes.len(),
19 expected = size_of::<hopr_api::ct::PathId>(),
20 "invalid loopback path byte length"
21 );
22 return None;
23 }
24
25 let mut path_id = [0u64; 5];
26 for (i, chunk) in path_bytes.chunks_exact(8).enumerate() {
27 path_id[i] = u64::from_le_bytes(chunk.try_into().expect("chunk is 8 bytes"));
28 }
29
30 let me_val = me_idx.index() as u64;
31
32 if path_id[0] != me_val {
34 tracing::warn!("loopback path does not start at self");
35 return None;
36 }
37
38 let Some(end_pos) = path_id[1..].iter().position(|&v| v == me_val).map(|p| p + 1) else {
40 tracing::warn!("loopback path does not close back to self");
41 return None;
42 };
43
44 let mut edges = Vec::new();
46
47 for pair in path_id[..=end_pos].windows(2) {
48 let from = NodeIndex::new(pair[0] as usize);
49 let to = NodeIndex::new(pair[1] as usize);
50 let Some(edge) = inner.graph.find_edge(from, to) else {
51 break;
52 };
53 edges.push(edge);
54 }
55
56 if edges.len() < 2 {
57 tracing::warn!(
58 edge_count = edges.len(),
59 "loopback path too short to attribute intermediate measurement"
60 );
61 return None;
62 }
63
64 Some(edges)
65}
66
67impl hopr_api::graph::NetworkGraphUpdate for ChannelGraph {
68 #[tracing::instrument(level = "debug", skip(self, update))]
69 fn record_edge<N, P>(&self, update: MeasurableEdge<N, P>)
70 where
71 N: hopr_api::graph::MeasurablePeer + Send + Clone,
72 P: hopr_api::graph::MeasurablePath + Send + Clone,
73 {
74 use hopr_api::graph::{
75 EdgeLinkObservable,
76 traits::{EdgeObservableRead, EdgeWeightType},
77 };
78
79 match update {
80 MeasurableEdge::Probe(Ok(hopr_api::graph::EdgeTransportTelemetry::Neighbor(ref telemetry))) => {
81 tracing::trace!(
82 peer = %telemetry.peer(),
83 latency_ms = telemetry.rtt().as_millis(),
84 "neighbor probe successful"
85 );
86
87 self.upsert_edge(&self.me, telemetry.peer(), |obs| {
90 obs.record(EdgeWeightType::Connected(true));
91 obs.record(EdgeWeightType::Immediate(Ok(telemetry.rtt() / 2)));
92 });
93 self.upsert_edge(telemetry.peer(), &self.me, |obs| {
94 obs.record(EdgeWeightType::Connected(true));
95 obs.record(EdgeWeightType::Immediate(Ok(telemetry.rtt() / 2)));
96 });
97 }
98 MeasurableEdge::Probe(Ok(hopr_api::graph::EdgeTransportTelemetry::Loopback(telemetry))) => {
99 tracing::trace!("loopback probe successful");
100
101 let mut inner = self.inner.write();
102 let Some(me_idx) = inner.indices.get_by_left(&self.me).copied() else {
103 tracing::debug!("failed to resolve index of myself for loopback probe attribution");
104 return;
105 };
106 let Some(edges) = resolve_loopback_edges(&inner, me_idx, telemetry.path()) else {
107 tracing::debug!("failed to resolve loopback path for probe attribution");
108 return;
109 };
110
111 let target_idx = edges.len() - 2;
112
113 let total_rtt = std::time::Duration::from_millis(telemetry.timestamp() as u64);
118 let mut known_latency = std::time::Duration::ZERO;
119
120 for &edge in &edges {
121 if let Some(weight) = inner.graph.edge_weight(edge) {
122 let lat = weight
123 .intermediate_qos()
124 .and_then(|q| q.average_latency())
125 .or_else(|| weight.immediate_qos().and_then(|q| q.average_latency()));
126 if let Some(lat) = lat {
127 known_latency += lat;
128 }
129 } else {
130 tracing::debug!("failed to find edge for loopback probe attribution");
131 }
132 }
133
134 let attributed_duration = total_rtt.saturating_sub(known_latency);
135
136 tracing::trace!(
137 target_edge = edges[target_idx].index(),
138 attributed_ms = attributed_duration.as_millis(),
139 total_rtt_ms = total_rtt.as_millis(),
140 path_edges = edges.len(),
141 "loopback probe attributed to intermediate edge"
142 );
143
144 if let Some(weight) = inner.graph.edge_weight_mut(edges[target_idx]) {
145 weight.record(EdgeWeightType::Intermediate(Ok(attributed_duration)));
146 } else {
147 tracing::debug!("failed to find target edge for loopback probe attribution");
148 }
149 }
150 MeasurableEdge::Probe(Err(hopr_api::graph::NetworkGraphError::ProbeNeighborTimeout(ref peer))) => {
151 tracing::trace!(
152 peer = %peer,
153 reason = "probe timeout",
154 "neighbor probe failed"
155 );
156
157 self.upsert_edge(&self.me, peer, |obs| {
160 obs.record(EdgeWeightType::Immediate(Err(())));
161 });
162 self.upsert_edge(peer, &self.me, |obs| {
163 obs.record(EdgeWeightType::Immediate(Err(())));
164 });
165 }
166 MeasurableEdge::Probe(Err(hopr_api::graph::NetworkGraphError::ProbeLoopbackTimeout(telemetry))) => {
167 tracing::trace!("loopback probe failed");
168
169 let mut inner = self.inner.write();
170 let Some(me_idx) = inner.indices.get_by_left(&self.me).copied() else {
171 tracing::debug!("failed to resolve index of myself");
172 return;
173 };
174 let Some(edges) = resolve_loopback_edges(&inner, me_idx, telemetry.path()) else {
175 tracing::debug!("failed to resolve loopback path for probe timeout, cannot attribute");
176 return;
177 };
178
179 let target_idx = edges.len() - 2;
180
181 tracing::trace!(
182 target_edge = edges[target_idx].index(),
183 path_edges = edges.len(),
184 "loopback probe timeout attributed to intermediate edge"
185 );
186
187 if let Some(weight) = inner.graph.edge_weight_mut(edges[target_idx]) {
188 weight.record(EdgeWeightType::Intermediate(Err(())));
189 }
190 }
191 MeasurableEdge::Capacity(update) => {
192 self.upsert_edge(&update.src, &update.dest, |obs: &mut Observations| {
193 obs.record(EdgeWeightType::Capacity(update.capacity));
194 });
195 }
196 MeasurableEdge::ConnectionStatus { peer, connected } => {
197 tracing::trace!(
198 peer = %peer,
199 connected = connected,
200 "recording connection status update"
201 );
202
203 self.upsert_edge(&self.me, &peer, |obs| {
204 obs.record(EdgeWeightType::Connected(connected));
205 });
206 self.upsert_edge(&peer, &self.me, |obs| {
207 obs.record(EdgeWeightType::Connected(connected));
208 });
209 }
210 }
211 }
212
213 #[tracing::instrument(level = "debug", skip(self, update))]
214 fn record_node<N>(&self, update: N)
215 where
216 N: MeasurableNode + Clone + Send + Sync + 'static,
217 {
218 hopr_api::graph::NetworkGraphWrite::add_node(self, update.into());
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use anyhow::Context;
225 use hex_literal::hex;
226 use hopr_api::{
227 OffchainPublicKey,
228 graph::{
229 EdgeLinkObservable, EdgeTransportTelemetry, MeasurablePath, MeasurablePeer, NetworkGraphError,
230 NetworkGraphUpdate, NetworkGraphView, NetworkGraphWrite,
231 traits::{EdgeObservableRead, EdgeProtocolObservable},
232 },
233 types::crypto::prelude::{Keypair, OffchainKeypair},
234 };
235
236 use super::*;
237
238 const SECRET_0: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
240 const SECRET_1: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
241 const SECRET_2: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
242 const SECRET_3: [u8; 32] = hex!("e0bf93e9c916104da00b1850adc4608bd7e9087bbd3f805451f4556aa6b3fd6e");
243
244 fn pubkey_from(secret: &[u8; 32]) -> OffchainPublicKey {
246 *OffchainKeypair::from_secret(secret).expect("valid secret key").public()
247 }
248
249 #[derive(Debug, Clone)]
250 struct TestNeighbor {
251 peer: OffchainPublicKey,
252 rtt: std::time::Duration,
253 }
254
255 impl MeasurablePeer for TestNeighbor {
256 fn peer(&self) -> &OffchainPublicKey {
257 &self.peer
258 }
259
260 fn rtt(&self) -> std::time::Duration {
261 self.rtt
262 }
263 }
264
265 #[derive(Debug, Clone)]
266 struct TestPath;
267
268 impl MeasurablePath for TestPath {
269 fn id(&self) -> &[u8] {
270 &[]
271 }
272
273 fn path(&self) -> &[u8] {
274 &[]
275 }
276
277 fn timestamp(&self) -> u128 {
278 0
279 }
280 }
281
282 #[tokio::test]
283 async fn neighbor_probe_should_update_edge_observation() -> anyhow::Result<()> {
284 let me_kp = OffchainKeypair::from_secret(&SECRET_0)?;
285 let me = *me_kp.public();
286 let peer_kp = OffchainKeypair::from_secret(&SECRET_1)?;
287 let peer_key = *peer_kp.public();
288
289 let graph = ChannelGraph::new(me);
290 graph.add_node(peer_key);
291 graph.add_edge(&me, &peer_key)?;
292
293 let rtt = std::time::Duration::from_millis(100);
294 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
295 Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer: peer_key, rtt }));
296 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
297
298 let obs = graph.edge(&me, &peer_key).context("edge observation should exist")?;
299 let immediate = obs
300 .immediate_qos()
301 .context("immediate QoS should be present after probe")?;
302 assert_eq!(immediate.average_latency().context("latency should be set")?, rtt / 2,);
303 Ok(())
304 }
305
306 #[tokio::test]
307 async fn neighbor_probe_should_create_symmetric_edges() -> anyhow::Result<()> {
308 let me = pubkey_from(&SECRET_0);
309 let peer = pubkey_from(&SECRET_1);
310
311 let graph = ChannelGraph::new(me);
312 graph.add_node(peer);
313 let rtt = std::time::Duration::from_millis(100);
316 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
317 Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer, rtt }));
318 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
319
320 let obs_fwd = graph.edge(&me, &peer).context("edge me→peer should exist")?;
322 let imm_fwd = obs_fwd.immediate_qos().context("me→peer should have immediate QoS")?;
323 assert_eq!(
324 imm_fwd.average_latency().context("me→peer latency should be set")?,
325 rtt / 2
326 );
327
328 let obs_rev = graph.edge(&peer, &me).context("edge peer→me should exist")?;
330 let imm_rev = obs_rev.immediate_qos().context("peer→me should have immediate QoS")?;
331 assert_eq!(
332 imm_rev.average_latency().context("peer→me latency should be set")?,
333 rtt / 2
334 );
335
336 Ok(())
337 }
338
339 #[tokio::test]
340 async fn neighbor_probe_timeout_should_create_symmetric_edges() -> anyhow::Result<()> {
341 let me = pubkey_from(&SECRET_0);
342 let peer = pubkey_from(&SECRET_1);
343
344 let graph = ChannelGraph::new(me);
345 graph.add_node(peer);
346 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
349 Err(NetworkGraphError::ProbeNeighborTimeout(Box::new(peer)));
350 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
351
352 let obs_fwd = graph
354 .edge(&me, &peer)
355 .context("edge me→peer should exist after timeout")?;
356 let imm_fwd = obs_fwd.immediate_qos().context("me→peer should have immediate QoS")?;
357 assert!(
358 imm_fwd.average_latency().is_none(),
359 "failed probe should not set latency"
360 );
361 assert!(
362 imm_fwd.average_probe_rate() < 1.0,
363 "failed probe should lower success rate"
364 );
365
366 let obs_rev = graph
368 .edge(&peer, &me)
369 .context("edge peer→me should exist after timeout")?;
370 let imm_rev = obs_rev.immediate_qos().context("peer→me should have immediate QoS")?;
371 assert!(
372 imm_rev.average_latency().is_none(),
373 "failed probe should not set latency on reverse"
374 );
375 assert!(
376 imm_rev.average_probe_rate() < 1.0,
377 "failed probe should lower success rate on reverse"
378 );
379
380 Ok(())
381 }
382
383 #[tokio::test]
384 async fn probe_timeout_should_record_as_failed_probe() -> anyhow::Result<()> {
385 let me_kp = OffchainKeypair::from_secret(&SECRET_0)?;
386 let me = *me_kp.public();
387 let peer_kp = OffchainKeypair::from_secret(&SECRET_1)?;
388 let peer_key = *peer_kp.public();
389
390 let graph = ChannelGraph::new(me);
391 graph.add_node(peer_key);
392 graph.add_edge(&me, &peer_key)?;
393
394 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
395 Err(NetworkGraphError::ProbeNeighborTimeout(Box::new(peer_key)));
396 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
397
398 let obs = graph.edge(&me, &peer_key).context("edge observation should exist")?;
399 let immediate = obs
400 .immediate_qos()
401 .context("immediate QoS should be present after failed probe")?;
402 assert!(immediate.average_latency().is_none());
403 assert!(immediate.average_probe_rate() < 1.0);
404 Ok(())
405 }
406
407 #[tokio::test]
408 async fn capacity_update_should_set_edge_capacity() -> anyhow::Result<()> {
409 let me = pubkey_from(&SECRET_0);
410 let peer = pubkey_from(&SECRET_1);
411 let graph = ChannelGraph::new(me);
412 graph.add_node(peer);
413 graph.add_edge(&me, &peer)?;
414
415 let capacity_update = hopr_api::graph::EdgeCapacityUpdate {
416 src: me,
417 dest: peer,
418 capacity: Some(1000),
419 };
420 graph.record_edge::<TestNeighbor, TestPath>(hopr_api::graph::MeasurableEdge::Capacity(Box::new(
421 capacity_update,
422 )));
423
424 let obs = graph.edge(&me, &peer).context("edge should exist")?;
425 let intermediate = obs
426 .intermediate_qos()
427 .context("intermediate QoS should be present after capacity update")?;
428 assert_eq!(intermediate.capacity(), Some(1000));
429 Ok(())
430 }
431
432 #[tokio::test]
433 async fn capacity_update_should_accept_none_value() -> anyhow::Result<()> {
434 let me = pubkey_from(&SECRET_0);
435 let peer = pubkey_from(&SECRET_1);
436 let graph = ChannelGraph::new(me);
437 graph.add_node(peer);
438 graph.add_edge(&me, &peer)?;
439
440 let capacity_update = hopr_api::graph::EdgeCapacityUpdate {
441 src: me,
442 dest: peer,
443 capacity: None,
444 };
445 graph.record_edge::<TestNeighbor, TestPath>(hopr_api::graph::MeasurableEdge::Capacity(Box::new(
446 capacity_update,
447 )));
448
449 let obs = graph.edge(&me, &peer).context("edge should exist")?;
450 let intermediate = obs.intermediate_qos().context("intermediate QoS should be present")?;
451 assert_eq!(intermediate.capacity(), None);
452 Ok(())
453 }
454
455 #[tokio::test]
456 async fn record_node_should_add_node_to_graph() {
457 let me = pubkey_from(&SECRET_0);
458 let peer = pubkey_from(&SECRET_1);
459 let graph = ChannelGraph::new(me);
460
461 assert!(!graph.contains_node(&peer));
462 graph.record_node(peer);
463 assert!(graph.contains_node(&peer));
464 }
465
466 #[tokio::test]
467 async fn probe_should_create_edge_if_absent() -> anyhow::Result<()> {
468 let me = pubkey_from(&SECRET_0);
469 let peer = pubkey_from(&SECRET_1);
470 let graph = ChannelGraph::new(me);
471 graph.add_node(peer);
472 let rtt = std::time::Duration::from_millis(80);
475 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
476 Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer: peer, rtt }));
477 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
478
479 assert!(graph.has_edge(&me, &peer), "probe should create edge via upsert");
480 let obs = graph.edge(&me, &peer).context("edge should exist")?;
481 assert!(obs.immediate_qos().is_some());
482 Ok(())
483 }
484
485 #[tokio::test]
486 async fn multiple_probes_should_accumulate_in_observations() -> anyhow::Result<()> {
487 let me = pubkey_from(&SECRET_0);
488 let peer = pubkey_from(&SECRET_1);
489 let graph = ChannelGraph::new(me);
490 graph.add_node(peer);
491 graph.add_edge(&me, &peer)?;
492
493 for _ in 0..5 {
495 let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
496 Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor {
497 peer: peer,
498 rtt: std::time::Duration::from_millis(60),
499 }));
500 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
501 }
502
503 let obs = graph.edge(&me, &peer).context("edge should exist")?;
504 let qos = obs.immediate_qos().context("immediate QoS should exist")?;
505 assert_eq!(
506 qos.average_latency().context("latency should be set")?,
507 std::time::Duration::from_millis(30), );
509 assert!(qos.average_probe_rate() > 0.9, "all probes succeeded");
510 Ok(())
511 }
512
513 #[derive(Debug, Clone)]
516 struct LoopbackTestPath {
517 path_bytes: Vec<u8>,
518 timestamp_ms: u128,
519 }
520
521 impl LoopbackTestPath {
522 fn new(path_id: [u64; 5], timestamp_ms: u128) -> Self {
523 let path_bytes = path_id.iter().flat_map(|v| v.to_le_bytes()).collect();
524 Self {
525 path_bytes,
526 timestamp_ms,
527 }
528 }
529 }
530
531 impl MeasurablePath for LoopbackTestPath {
532 fn id(&self) -> &[u8] {
533 &[]
534 }
535
536 fn path(&self) -> &[u8] {
537 &self.path_bytes
538 }
539
540 fn timestamp(&self) -> u128 {
541 self.timestamp_ms
542 }
543 }
544
545 fn send_loopback(graph: &ChannelGraph, path_id: [u64; 5], timestamp_ms: u128) {
547 let telemetry: Result<
548 EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
549 NetworkGraphError<LoopbackTestPath>,
550 > = Ok(EdgeTransportTelemetry::Loopback(LoopbackTestPath::new(
551 path_id,
552 timestamp_ms,
553 )));
554 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
555 }
556
557 fn send_loopback_timeout(graph: &ChannelGraph, path_id: [u64; 5]) {
559 let telemetry: Result<
560 EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
561 NetworkGraphError<LoopbackTestPath>,
562 > = Err(NetworkGraphError::ProbeLoopbackTimeout(LoopbackTestPath::new(
563 path_id, 0,
564 )));
565 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
566 }
567
568 #[tokio::test]
569 async fn loopback_three_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
570 let me = pubkey_from(&SECRET_0);
575 let a = pubkey_from(&SECRET_1);
576 let b = pubkey_from(&SECRET_2);
577
578 let graph = ChannelGraph::new(me);
579 graph.add_node(a);
580 graph.add_node(b);
581 graph.add_edge(&me, &a)?;
582 graph.add_edge(&a, &b)?;
583 graph.add_edge(&b, &me)?; send_loopback(&graph, [0, 1, 2, 0, 0], 200);
586
587 let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
588 let qos = obs
589 .intermediate_qos()
590 .context("intermediate QoS should be present on a→b")?;
591 assert_eq!(
592 qos.average_latency().context("latency should be set")?,
593 std::time::Duration::from_millis(200),
594 );
595
596 let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
598 assert!(obs_me_a.intermediate_qos().is_none());
599
600 Ok(())
601 }
602
603 #[tokio::test]
604 async fn loopback_four_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
605 let me = pubkey_from(&SECRET_0);
610 let a = pubkey_from(&SECRET_1);
611 let b = pubkey_from(&SECRET_2);
612 let c = pubkey_from(&SECRET_3);
613
614 let graph = ChannelGraph::new(me);
615 graph.add_node(a);
616 graph.add_node(b);
617 graph.add_node(c);
618 graph.add_edge(&me, &a)?;
619 graph.add_edge(&a, &b)?;
620 graph.add_edge(&b, &c)?;
621 graph.add_edge(&c, &me)?; send_loopback(&graph, [0, 1, 2, 3, 0], 300);
624
625 let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
627 let qos = obs
628 .intermediate_qos()
629 .context("intermediate QoS should be present on b→c")?;
630 assert_eq!(
631 qos.average_latency().context("latency should be set")?,
632 std::time::Duration::from_millis(300),
633 "no preceding intermediate latencies, so full RTT is attributed"
634 );
635
636 let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
638 assert!(obs_me_a.intermediate_qos().is_none());
639 let obs_a_b = graph.edge(&a, &b).context("edge a→b should exist")?;
640 assert!(obs_a_b.intermediate_qos().is_none());
641
642 Ok(())
643 }
644
645 #[tokio::test]
646 async fn loopback_should_subtract_known_preceding_latencies() -> anyhow::Result<()> {
647 let me = pubkey_from(&SECRET_0);
653 let a = pubkey_from(&SECRET_1);
654 let b = pubkey_from(&SECRET_2);
655 let c = pubkey_from(&SECRET_3);
656
657 let graph = ChannelGraph::new(me);
658 graph.add_node(a);
659 graph.add_node(b);
660 graph.add_node(c);
661 graph.add_edge(&me, &a)?;
662 graph.add_edge(&a, &b)?;
663 graph.add_edge(&b, &c)?;
664 graph.add_edge(&c, &me)?; graph.upsert_edge(&me, &a, |obs| {
668 use hopr_api::graph::traits::EdgeObservableWrite;
669 obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
670 std::time::Duration::from_millis(80),
671 )));
672 });
673 graph.upsert_edge(&a, &b, |obs| {
674 use hopr_api::graph::traits::EdgeObservableWrite;
675 obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
676 std::time::Duration::from_millis(40),
677 )));
678 });
679
680 send_loopback(&graph, [0, 1, 2, 3, 0], 300);
681
682 let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
683 let qos = obs
684 .intermediate_qos()
685 .context("intermediate QoS should be present on b→c")?;
686 assert_eq!(
687 qos.average_latency().context("latency should be set")?,
688 std::time::Duration::from_millis(180),
689 "300ms total - 80ms (me→a) - 40ms (a→b) = 180ms attributed to b→c"
690 );
691
692 Ok(())
693 }
694
695 #[tokio::test]
696 async fn loopback_should_subtract_immediate_latency_on_first_edge() -> anyhow::Result<()> {
697 let me = pubkey_from(&SECRET_0);
702 let a = pubkey_from(&SECRET_1);
703 let b = pubkey_from(&SECRET_2);
704
705 let graph = ChannelGraph::new(me);
706 graph.add_node(a);
707 graph.add_node(b);
708 graph.add_edge(&me, &a)?;
709 graph.add_edge(&a, &b)?;
710 graph.add_edge(&b, &me)?;
711
712 graph.upsert_edge(&me, &a, |obs| {
714 use hopr_api::graph::traits::EdgeObservableWrite;
715 obs.record(hopr_api::graph::traits::EdgeWeightType::Immediate(Ok(
716 std::time::Duration::from_millis(60),
717 )));
718 });
719
720 send_loopback(&graph, [0, 1, 2, 0, 0], 200);
721
722 let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
723 let qos = obs
724 .intermediate_qos()
725 .context("intermediate QoS should be present on a→b")?;
726 assert_eq!(
727 qos.average_latency().context("latency should be set")?,
728 std::time::Duration::from_millis(140),
729 "200ms total - 60ms (me→a immediate) = 140ms attributed to a→b"
730 );
731
732 Ok(())
733 }
734
735 #[tokio::test]
736 async fn loopback_invalid_path_length_should_be_ignored() -> anyhow::Result<()> {
737 let me = pubkey_from(&SECRET_0);
738 let a = pubkey_from(&SECRET_1);
739
740 let graph = ChannelGraph::new(me);
741 graph.add_node(a);
742 graph.add_edge(&me, &a)?;
743
744 let telemetry: Result<
746 EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
747 NetworkGraphError<LoopbackTestPath>,
748 > = Ok(EdgeTransportTelemetry::Loopback(LoopbackTestPath {
749 path_bytes: vec![0u8; 16], timestamp_ms: 100,
751 }));
752 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
753
754 let obs = graph.edge(&me, &a).context("edge should exist")?;
756 assert!(
757 obs.intermediate_qos().is_none(),
758 "invalid path bytes should not produce any intermediate measurement"
759 );
760
761 Ok(())
762 }
763
764 #[tokio::test]
765 async fn loopback_single_edge_path_should_be_ignored_if_no_immediate_or_intermediate_result_exists_for_the_edge()
766 -> anyhow::Result<()> {
767 let me = pubkey_from(&SECRET_0);
772 let a = pubkey_from(&SECRET_1);
773
774 let graph = ChannelGraph::new(me);
775 graph.add_node(a);
776 graph.add_edge(&me, &a)?;
777
778 send_loopback(&graph, [0, 1, 0, 0, 0], 100);
779
780 let obs = graph.edge(&me, &a).context("edge should exist")?;
781 assert!(
782 obs.intermediate_qos().is_none(),
783 "single-edge path should not produce intermediate measurement"
784 );
785
786 Ok(())
787 }
788
789 #[tokio::test]
790 async fn loopback_two_edge_path_should_attribute_when_return_edge_exists() -> anyhow::Result<()> {
791 let me = pubkey_from(&SECRET_0);
797 let a = pubkey_from(&SECRET_1);
798
799 let graph = ChannelGraph::new(me);
800 graph.add_node(a);
801 graph.add_edge(&me, &a)?;
802 graph.add_edge(&a, &me)?; graph.upsert_edge(&me, &a, |obs| {
806 use hopr_api::graph::traits::EdgeObservableWrite;
807 obs.record(hopr_api::graph::traits::EdgeWeightType::Immediate(Ok(
808 std::time::Duration::from_millis(50),
809 )));
810 });
811
812 send_loopback(&graph, [0, 1, 0, 0, 0], 100);
813
814 let obs = graph.edge(&me, &a).context("edge me→a should exist")?;
815 let qos = obs
816 .intermediate_qos()
817 .context("intermediate QoS should be present on me→a")?;
818 assert_eq!(
819 qos.average_latency().context("latency should be set")?,
820 std::time::Duration::from_millis(50),
821 "100ms total - 50ms (me→a immediate) = 50ms attributed to me→a"
822 );
823
824 let imm = obs
826 .immediate_qos()
827 .context("immediate QoS should still be present on me→a")?;
828 assert_eq!(
829 imm.average_latency().context("immediate latency should be set")?,
830 std::time::Duration::from_millis(50),
831 );
832
833 Ok(())
834 }
835
836 #[tokio::test]
837 async fn loopback_broken_chain_should_be_ignored() -> anyhow::Result<()> {
838 let me = pubkey_from(&SECRET_0);
843 let a = pubkey_from(&SECRET_1);
844 let b = pubkey_from(&SECRET_2);
845 let c = pubkey_from(&SECRET_3);
846
847 let graph = ChannelGraph::new(me);
848 graph.add_node(a);
849 graph.add_node(b);
850 graph.add_node(c);
851 graph.add_edge(&me, &a)?;
852 graph.add_edge(&b, &c)?; send_loopback(&graph, [0, 1, 3, 0, 0], 200);
855
856 let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
857 assert!(
858 obs_me_a.intermediate_qos().is_none(),
859 "broken chain should not attribute any intermediate measurement"
860 );
861
862 Ok(())
863 }
864
865 #[tokio::test]
866 async fn loopback_wrong_start_node_should_be_ignored() -> anyhow::Result<()> {
867 let me = pubkey_from(&SECRET_0);
869 let a = pubkey_from(&SECRET_1);
870
871 let graph = ChannelGraph::new(me);
872 graph.add_node(a);
873 graph.add_edge(&me, &a)?;
874
875 send_loopback(&graph, [99, 1, 0, 0, 0], 200);
876
877 let obs = graph.edge(&me, &a).context("edge should exist")?;
878 assert!(
879 obs.intermediate_qos().is_none(),
880 "wrong start node should not produce any measurement"
881 );
882
883 Ok(())
884 }
885
886 #[tokio::test]
887 async fn loopback_probes_should_accumulate_on_target_edge() -> anyhow::Result<()> {
888 let me = pubkey_from(&SECRET_0);
891 let a = pubkey_from(&SECRET_1);
892 let b = pubkey_from(&SECRET_2);
893
894 let graph = ChannelGraph::new(me);
895 graph.add_node(a);
896 graph.add_node(b);
897 graph.add_edge(&me, &a)?;
898 graph.add_edge(&a, &b)?;
899 graph.add_edge(&b, &me)?; for _ in 0..5 {
905 send_loopback(&graph, [0, 1, 2, 0, 0], 100);
906 }
907
908 let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
909 let qos = obs.intermediate_qos().context("intermediate QoS should be present")?;
910 assert!(
911 qos.average_latency().is_some(),
912 "latency should be set after multiple probes"
913 );
914 assert!(
915 qos.average_probe_rate() > 0.9,
916 "all probes succeeded, rate should be high"
917 );
918
919 Ok(())
920 }
921
922 #[tokio::test]
924 async fn loopback_saturating_sub_should_not_underflow() -> anyhow::Result<()> {
925 let me = pubkey_from(&SECRET_0);
929 let a = pubkey_from(&SECRET_1);
930 let b = pubkey_from(&SECRET_2);
931 let c = pubkey_from(&SECRET_3);
932
933 let graph = ChannelGraph::new(me);
934 graph.add_node(a);
935 graph.add_node(b);
936 graph.add_node(c);
937 graph.add_edge(&me, &a)?;
938 graph.add_edge(&a, &b)?;
939 graph.add_edge(&b, &c)?;
940 graph.add_edge(&c, &me)?; graph.upsert_edge(&me, &a, |obs| {
944 use hopr_api::graph::traits::EdgeObservableWrite;
945 obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
946 std::time::Duration::from_millis(500),
947 )));
948 });
949
950 send_loopback(&graph, [0, 1, 2, 3, 0], 100);
952
953 let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
954 let qos = obs.intermediate_qos().context("intermediate QoS should be present")?;
955 assert!(
959 qos.average_probe_rate() > 0.0,
960 "probe should still be recorded even with saturated duration"
961 );
962
963 Ok(())
964 }
965
966 #[tokio::test]
967 async fn loopback_timeout_should_record_failed_intermediate_on_target_edge() -> anyhow::Result<()> {
968 let me = pubkey_from(&SECRET_0);
972 let a = pubkey_from(&SECRET_1);
973 let b = pubkey_from(&SECRET_2);
974
975 let graph = ChannelGraph::new(me);
976 graph.add_node(a);
977 graph.add_node(b);
978 graph.add_edge(&me, &a)?;
979 graph.add_edge(&a, &b)?;
980 graph.add_edge(&b, &me)?; send_loopback_timeout(&graph, [0, 1, 2, 0, 0]);
983
984 let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
985 let qos = obs
986 .intermediate_qos()
987 .context("intermediate QoS should be present on a→b after timeout")?;
988 assert!(qos.average_latency().is_none(), "failed probe should not set latency");
989 assert!(qos.average_probe_rate() < 1.0, "failed probe should lower success rate");
990
991 let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
993 assert!(obs_me_a.intermediate_qos().is_none());
994
995 Ok(())
996 }
997
998 #[tokio::test]
999 async fn loopback_timeout_four_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
1000 let me = pubkey_from(&SECRET_0);
1003 let a = pubkey_from(&SECRET_1);
1004 let b = pubkey_from(&SECRET_2);
1005 let c = pubkey_from(&SECRET_3);
1006
1007 let graph = ChannelGraph::new(me);
1008 graph.add_node(a);
1009 graph.add_node(b);
1010 graph.add_node(c);
1011 graph.add_edge(&me, &a)?;
1012 graph.add_edge(&a, &b)?;
1013 graph.add_edge(&b, &c)?;
1014 graph.add_edge(&c, &me)?;
1015
1016 send_loopback_timeout(&graph, [0, 1, 2, 3, 0]);
1017
1018 let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
1020 let qos = obs
1021 .intermediate_qos()
1022 .context("intermediate QoS should be present on b→c")?;
1023 assert!(qos.average_latency().is_none());
1024 assert!(qos.average_probe_rate() < 1.0);
1025
1026 let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
1028 assert!(obs_me_a.intermediate_qos().is_none());
1029 let obs_a_b = graph.edge(&a, &b).context("edge a→b should exist")?;
1030 assert!(obs_a_b.intermediate_qos().is_none());
1031
1032 Ok(())
1033 }
1034
1035 #[tokio::test]
1036 async fn loopback_timeout_invalid_path_should_be_ignored() -> anyhow::Result<()> {
1037 let me = pubkey_from(&SECRET_0);
1038 let a = pubkey_from(&SECRET_1);
1039
1040 let graph = ChannelGraph::new(me);
1041 graph.add_node(a);
1042 graph.add_edge(&me, &a)?;
1043
1044 let telemetry: Result<
1046 EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
1047 NetworkGraphError<LoopbackTestPath>,
1048 > = Err(NetworkGraphError::ProbeLoopbackTimeout(LoopbackTestPath {
1049 path_bytes: vec![0u8; 8],
1050 timestamp_ms: 0,
1051 }));
1052 graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
1053
1054 let obs = graph.edge(&me, &a).context("edge should exist")?;
1055 assert!(obs.intermediate_qos().is_none());
1056
1057 Ok(())
1058 }
1059
1060 #[tokio::test]
1061 async fn loopback_timeout_single_edge_should_be_ignored() -> anyhow::Result<()> {
1062 let me = pubkey_from(&SECRET_0);
1064 let a = pubkey_from(&SECRET_1);
1065
1066 let graph = ChannelGraph::new(me);
1067 graph.add_node(a);
1068 graph.add_edge(&me, &a)?;
1069
1070 send_loopback_timeout(&graph, [0, 1, 0, 0, 0]);
1071
1072 let obs = graph.edge(&me, &a).context("edge should exist")?;
1073 assert!(
1074 obs.intermediate_qos().is_none(),
1075 "single-edge timeout should not produce intermediate measurement"
1076 );
1077
1078 Ok(())
1079 }
1080}