Skip to main content

hopr_network_graph/petgraph/
update.rs

1use hopr_api::graph::{MeasurableEdge, MeasurableNode, NetworkGraphWrite, traits::EdgeObservableWrite};
2use petgraph::graph::{EdgeIndex, NodeIndex};
3
4use crate::{ChannelGraph, Observations, graph::InnerGraph};
5
6/// Resolves a loopback path from serialized node-index bytes into a validated chain of edge indices.
7///
8/// The `path_bytes` encode a `PathId` where each `u64` is a [`NodeIndex`].
9/// The path is expected to start and end at `me_idx` (a closed loop).
10///
11/// Walks consecutive node pairs, finding the connecting edge for each.
12/// Stops when the loop closes back to `me_idx` or when no edge exists
13/// between a pair. Returns `None` if the path bytes have wrong length,
14/// the first node is not `me_idx`, or fewer than 2 edges can be resolved.
15fn resolve_loopback_edges(inner: &InnerGraph, me_idx: NodeIndex, path_bytes: &[u8]) -> Option<Vec<EdgeIndex>> {
16    if path_bytes.len() != size_of::<hopr_api::ct::PathId>() {
17        tracing::warn!(
18            path_len = path_bytes.len(),
19            expected = size_of::<hopr_api::ct::PathId>(),
20            "invalid loopback path byte length"
21        );
22        return None;
23    }
24
25    let mut path_id = [0u64; 5];
26    for (i, chunk) in path_bytes.chunks_exact(8).enumerate() {
27        path_id[i] = u64::from_le_bytes(chunk.try_into().expect("chunk is 8 bytes"));
28    }
29
30    let me_val = me_idx.index() as u64;
31
32    // First node must be self
33    if path_id[0] != me_val {
34        tracing::warn!("loopback path does not start at self");
35        return None;
36    }
37
38    // Find the closing node: the first reoccurrence of me after position 0
39    let Some(end_pos) = path_id[1..].iter().position(|&v| v == me_val).map(|p| p + 1) else {
40        tracing::warn!("loopback path does not close back to self");
41        return None;
42    };
43
44    // Walk consecutive node pairs up to (and including) the closing node
45    let mut edges = Vec::new();
46
47    for pair in path_id[..=end_pos].windows(2) {
48        let from = NodeIndex::new(pair[0] as usize);
49        let to = NodeIndex::new(pair[1] as usize);
50        let Some(edge) = inner.graph.find_edge(from, to) else {
51            break;
52        };
53        edges.push(edge);
54    }
55
56    if edges.len() < 2 {
57        tracing::warn!(
58            edge_count = edges.len(),
59            "loopback path too short to attribute intermediate measurement"
60        );
61        return None;
62    }
63
64    Some(edges)
65}
66
67impl hopr_api::graph::NetworkGraphUpdate for ChannelGraph {
68    #[tracing::instrument(level = "debug", skip(self, update))]
69    fn record_edge<N, P>(&self, update: MeasurableEdge<N, P>)
70    where
71        N: hopr_api::graph::MeasurablePeer + Send + Clone,
72        P: hopr_api::graph::MeasurablePath + Send + Clone,
73    {
74        use hopr_api::graph::{
75            EdgeLinkObservable,
76            traits::{EdgeObservableRead, EdgeWeightType},
77        };
78
79        match update {
80            MeasurableEdge::Probe(Ok(hopr_api::graph::EdgeTransportTelemetry::Neighbor(ref telemetry))) => {
81                tracing::trace!(
82                    peer = %telemetry.peer(),
83                    latency_ms = telemetry.rtt().as_millis(),
84                    "neighbor probe successful"
85                );
86
87                // Both directions are set for immediate connections, because the graph is directional
88                // and must be directionally complete for looping traffic.
89                self.upsert_edge(&self.me, telemetry.peer(), |obs| {
90                    obs.record(EdgeWeightType::Connected(true));
91                    obs.record(EdgeWeightType::Immediate(Ok(telemetry.rtt() / 2)));
92                });
93                self.upsert_edge(telemetry.peer(), &self.me, |obs| {
94                    obs.record(EdgeWeightType::Connected(true));
95                    obs.record(EdgeWeightType::Immediate(Ok(telemetry.rtt() / 2)));
96                });
97            }
98            MeasurableEdge::Probe(Ok(hopr_api::graph::EdgeTransportTelemetry::Loopback(telemetry))) => {
99                tracing::trace!("loopback probe successful");
100
101                let mut inner = self.inner.write();
102                let Some(me_idx) = inner.indices.get_by_left(&self.me).copied() else {
103                    tracing::debug!("failed to resolve index of myself for loopback probe attribution");
104                    return;
105                };
106                let Some(edges) = resolve_loopback_edges(&inner, me_idx, telemetry.path()) else {
107                    tracing::debug!("failed to resolve loopback path for probe attribution");
108                    return;
109                };
110
111                let target_idx = edges.len() - 2;
112
113                // Attributed duration = total RTT - sum of all known edge latencies.
114                // For each edge (including the target), use intermediate QoS if available,
115                // otherwise fall back to immediate QoS. The residual is attributed to the
116                // target edge as its new intermediate measurement.
117                let total_rtt = std::time::Duration::from_millis(telemetry.timestamp() as u64);
118                let mut known_latency = std::time::Duration::ZERO;
119
120                for &edge in &edges {
121                    if let Some(weight) = inner.graph.edge_weight(edge) {
122                        let lat = weight
123                            .intermediate_qos()
124                            .and_then(|q| q.average_latency())
125                            .or_else(|| weight.immediate_qos().and_then(|q| q.average_latency()));
126                        if let Some(lat) = lat {
127                            known_latency += lat;
128                        }
129                    } else {
130                        tracing::debug!("failed to find edge for loopback probe attribution");
131                    }
132                }
133
134                let attributed_duration = total_rtt.saturating_sub(known_latency);
135
136                tracing::trace!(
137                    target_edge = edges[target_idx].index(),
138                    attributed_ms = attributed_duration.as_millis(),
139                    total_rtt_ms = total_rtt.as_millis(),
140                    path_edges = edges.len(),
141                    "loopback probe attributed to intermediate edge"
142                );
143
144                if let Some(weight) = inner.graph.edge_weight_mut(edges[target_idx]) {
145                    weight.record(EdgeWeightType::Intermediate(Ok(attributed_duration)));
146                } else {
147                    tracing::debug!("failed to find target edge for loopback probe attribution");
148                }
149            }
150            MeasurableEdge::Probe(Err(hopr_api::graph::NetworkGraphError::ProbeNeighborTimeout(ref peer))) => {
151                tracing::trace!(
152                    peer = %peer,
153                    reason = "probe timeout",
154                    "neighbor probe failed"
155                );
156
157                // Both directions are set for immediate connections, because the graph is directional
158                // and must be directionally complete for looping traffic.
159                self.upsert_edge(&self.me, peer, |obs| {
160                    obs.record(EdgeWeightType::Immediate(Err(())));
161                });
162                self.upsert_edge(peer, &self.me, |obs| {
163                    obs.record(EdgeWeightType::Immediate(Err(())));
164                });
165            }
166            MeasurableEdge::Probe(Err(hopr_api::graph::NetworkGraphError::ProbeLoopbackTimeout(telemetry))) => {
167                tracing::trace!("loopback probe failed");
168
169                let mut inner = self.inner.write();
170                let Some(me_idx) = inner.indices.get_by_left(&self.me).copied() else {
171                    tracing::debug!("failed to resolve index of myself");
172                    return;
173                };
174                let Some(edges) = resolve_loopback_edges(&inner, me_idx, telemetry.path()) else {
175                    tracing::debug!("failed to resolve loopback path for probe timeout, cannot attribute");
176                    return;
177                };
178
179                let target_idx = edges.len() - 2;
180
181                tracing::trace!(
182                    target_edge = edges[target_idx].index(),
183                    path_edges = edges.len(),
184                    "loopback probe timeout attributed to intermediate edge"
185                );
186
187                if let Some(weight) = inner.graph.edge_weight_mut(edges[target_idx]) {
188                    weight.record(EdgeWeightType::Intermediate(Err(())));
189                }
190            }
191            MeasurableEdge::Capacity(update) => {
192                self.upsert_edge(&update.src, &update.dest, |obs: &mut Observations| {
193                    obs.record(EdgeWeightType::Capacity(update.capacity));
194                });
195            }
196            MeasurableEdge::ConnectionStatus { peer, connected } => {
197                tracing::trace!(
198                    peer = %peer,
199                    connected = connected,
200                    "recording connection status update"
201                );
202
203                self.upsert_edge(&self.me, &peer, |obs| {
204                    obs.record(EdgeWeightType::Connected(connected));
205                });
206                self.upsert_edge(&peer, &self.me, |obs| {
207                    obs.record(EdgeWeightType::Connected(connected));
208                });
209            }
210        }
211    }
212
213    #[tracing::instrument(level = "debug", skip(self, update))]
214    fn record_node<N>(&self, update: N)
215    where
216        N: MeasurableNode + Clone + Send + Sync + 'static,
217    {
218        hopr_api::graph::NetworkGraphWrite::add_node(self, update.into());
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use anyhow::Context;
225    use hex_literal::hex;
226    use hopr_api::{
227        OffchainPublicKey,
228        graph::{
229            EdgeLinkObservable, EdgeTransportTelemetry, MeasurablePath, MeasurablePeer, NetworkGraphError,
230            NetworkGraphUpdate, NetworkGraphView, NetworkGraphWrite,
231            traits::{EdgeObservableRead, EdgeProtocolObservable},
232        },
233        types::crypto::prelude::{Keypair, OffchainKeypair},
234    };
235
236    use super::*;
237
238    /// Fixed test secret keys (reused from the broader codebase).
239    const SECRET_0: [u8; 32] = hex!("60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d");
240    const SECRET_1: [u8; 32] = hex!("71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a");
241    const SECRET_2: [u8; 32] = hex!("c24bd833704dd2abdae3933fcc9962c2ac404f84132224c474147382d4db2299");
242    const SECRET_3: [u8; 32] = hex!("e0bf93e9c916104da00b1850adc4608bd7e9087bbd3f805451f4556aa6b3fd6e");
243
244    /// Creates an OffchainPublicKey from a fixed secret.
245    fn pubkey_from(secret: &[u8; 32]) -> OffchainPublicKey {
246        *OffchainKeypair::from_secret(secret).expect("valid secret key").public()
247    }
248
249    #[derive(Debug, Clone)]
250    struct TestNeighbor {
251        peer: OffchainPublicKey,
252        rtt: std::time::Duration,
253    }
254
255    impl MeasurablePeer for TestNeighbor {
256        fn peer(&self) -> &OffchainPublicKey {
257            &self.peer
258        }
259
260        fn rtt(&self) -> std::time::Duration {
261            self.rtt
262        }
263    }
264
265    #[derive(Debug, Clone)]
266    struct TestPath;
267
268    impl MeasurablePath for TestPath {
269        fn id(&self) -> &[u8] {
270            &[]
271        }
272
273        fn path(&self) -> &[u8] {
274            &[]
275        }
276
277        fn timestamp(&self) -> u128 {
278            0
279        }
280    }
281
282    #[tokio::test]
283    async fn neighbor_probe_should_update_edge_observation() -> anyhow::Result<()> {
284        let me_kp = OffchainKeypair::from_secret(&SECRET_0)?;
285        let me = *me_kp.public();
286        let peer_kp = OffchainKeypair::from_secret(&SECRET_1)?;
287        let peer_key = *peer_kp.public();
288
289        let graph = ChannelGraph::new(me);
290        graph.add_node(peer_key);
291        graph.add_edge(&me, &peer_key)?;
292
293        let rtt = std::time::Duration::from_millis(100);
294        let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
295            Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer: peer_key, rtt }));
296        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
297
298        let obs = graph.edge(&me, &peer_key).context("edge observation should exist")?;
299        let immediate = obs
300            .immediate_qos()
301            .context("immediate QoS should be present after probe")?;
302        assert_eq!(immediate.average_latency().context("latency should be set")?, rtt / 2,);
303        Ok(())
304    }
305
306    #[tokio::test]
307    async fn neighbor_probe_should_create_symmetric_edges() -> anyhow::Result<()> {
308        let me = pubkey_from(&SECRET_0);
309        let peer = pubkey_from(&SECRET_1);
310
311        let graph = ChannelGraph::new(me);
312        graph.add_node(peer);
313        // No edges pre-created — upsert should create both directions
314
315        let rtt = std::time::Duration::from_millis(100);
316        let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
317            Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer, rtt }));
318        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
319
320        // me → peer
321        let obs_fwd = graph.edge(&me, &peer).context("edge me→peer should exist")?;
322        let imm_fwd = obs_fwd.immediate_qos().context("me→peer should have immediate QoS")?;
323        assert_eq!(
324            imm_fwd.average_latency().context("me→peer latency should be set")?,
325            rtt / 2
326        );
327
328        // peer → me
329        let obs_rev = graph.edge(&peer, &me).context("edge peer→me should exist")?;
330        let imm_rev = obs_rev.immediate_qos().context("peer→me should have immediate QoS")?;
331        assert_eq!(
332            imm_rev.average_latency().context("peer→me latency should be set")?,
333            rtt / 2
334        );
335
336        Ok(())
337    }
338
339    #[tokio::test]
340    async fn neighbor_probe_timeout_should_create_symmetric_edges() -> anyhow::Result<()> {
341        let me = pubkey_from(&SECRET_0);
342        let peer = pubkey_from(&SECRET_1);
343
344        let graph = ChannelGraph::new(me);
345        graph.add_node(peer);
346        // No edges pre-created
347
348        let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
349            Err(NetworkGraphError::ProbeNeighborTimeout(Box::new(peer)));
350        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
351
352        // me → peer
353        let obs_fwd = graph
354            .edge(&me, &peer)
355            .context("edge me→peer should exist after timeout")?;
356        let imm_fwd = obs_fwd.immediate_qos().context("me→peer should have immediate QoS")?;
357        assert!(
358            imm_fwd.average_latency().is_none(),
359            "failed probe should not set latency"
360        );
361        assert!(
362            imm_fwd.average_probe_rate() < 1.0,
363            "failed probe should lower success rate"
364        );
365
366        // peer → me
367        let obs_rev = graph
368            .edge(&peer, &me)
369            .context("edge peer→me should exist after timeout")?;
370        let imm_rev = obs_rev.immediate_qos().context("peer→me should have immediate QoS")?;
371        assert!(
372            imm_rev.average_latency().is_none(),
373            "failed probe should not set latency on reverse"
374        );
375        assert!(
376            imm_rev.average_probe_rate() < 1.0,
377            "failed probe should lower success rate on reverse"
378        );
379
380        Ok(())
381    }
382
383    #[tokio::test]
384    async fn probe_timeout_should_record_as_failed_probe() -> anyhow::Result<()> {
385        let me_kp = OffchainKeypair::from_secret(&SECRET_0)?;
386        let me = *me_kp.public();
387        let peer_kp = OffchainKeypair::from_secret(&SECRET_1)?;
388        let peer_key = *peer_kp.public();
389
390        let graph = ChannelGraph::new(me);
391        graph.add_node(peer_key);
392        graph.add_edge(&me, &peer_key)?;
393
394        let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
395            Err(NetworkGraphError::ProbeNeighborTimeout(Box::new(peer_key)));
396        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
397
398        let obs = graph.edge(&me, &peer_key).context("edge observation should exist")?;
399        let immediate = obs
400            .immediate_qos()
401            .context("immediate QoS should be present after failed probe")?;
402        assert!(immediate.average_latency().is_none());
403        assert!(immediate.average_probe_rate() < 1.0);
404        Ok(())
405    }
406
407    #[tokio::test]
408    async fn capacity_update_should_set_edge_capacity() -> anyhow::Result<()> {
409        let me = pubkey_from(&SECRET_0);
410        let peer = pubkey_from(&SECRET_1);
411        let graph = ChannelGraph::new(me);
412        graph.add_node(peer);
413        graph.add_edge(&me, &peer)?;
414
415        let capacity_update = hopr_api::graph::EdgeCapacityUpdate {
416            src: me,
417            dest: peer,
418            capacity: Some(1000),
419        };
420        graph.record_edge::<TestNeighbor, TestPath>(hopr_api::graph::MeasurableEdge::Capacity(Box::new(
421            capacity_update,
422        )));
423
424        let obs = graph.edge(&me, &peer).context("edge should exist")?;
425        let intermediate = obs
426            .intermediate_qos()
427            .context("intermediate QoS should be present after capacity update")?;
428        assert_eq!(intermediate.capacity(), Some(1000));
429        Ok(())
430    }
431
432    #[tokio::test]
433    async fn capacity_update_should_accept_none_value() -> anyhow::Result<()> {
434        let me = pubkey_from(&SECRET_0);
435        let peer = pubkey_from(&SECRET_1);
436        let graph = ChannelGraph::new(me);
437        graph.add_node(peer);
438        graph.add_edge(&me, &peer)?;
439
440        let capacity_update = hopr_api::graph::EdgeCapacityUpdate {
441            src: me,
442            dest: peer,
443            capacity: None,
444        };
445        graph.record_edge::<TestNeighbor, TestPath>(hopr_api::graph::MeasurableEdge::Capacity(Box::new(
446            capacity_update,
447        )));
448
449        let obs = graph.edge(&me, &peer).context("edge should exist")?;
450        let intermediate = obs.intermediate_qos().context("intermediate QoS should be present")?;
451        assert_eq!(intermediate.capacity(), None);
452        Ok(())
453    }
454
455    #[tokio::test]
456    async fn record_node_should_add_node_to_graph() {
457        let me = pubkey_from(&SECRET_0);
458        let peer = pubkey_from(&SECRET_1);
459        let graph = ChannelGraph::new(me);
460
461        assert!(!graph.contains_node(&peer));
462        graph.record_node(peer);
463        assert!(graph.contains_node(&peer));
464    }
465
466    #[tokio::test]
467    async fn probe_should_create_edge_if_absent() -> anyhow::Result<()> {
468        let me = pubkey_from(&SECRET_0);
469        let peer = pubkey_from(&SECRET_1);
470        let graph = ChannelGraph::new(me);
471        graph.add_node(peer);
472        // No explicit add_edge — record_edge should upsert
473
474        let rtt = std::time::Duration::from_millis(80);
475        let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
476            Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor { peer: peer, rtt }));
477        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
478
479        assert!(graph.has_edge(&me, &peer), "probe should create edge via upsert");
480        let obs = graph.edge(&me, &peer).context("edge should exist")?;
481        assert!(obs.immediate_qos().is_some());
482        Ok(())
483    }
484
485    #[tokio::test]
486    async fn multiple_probes_should_accumulate_in_observations() -> anyhow::Result<()> {
487        let me = pubkey_from(&SECRET_0);
488        let peer = pubkey_from(&SECRET_1);
489        let graph = ChannelGraph::new(me);
490        graph.add_node(peer);
491        graph.add_edge(&me, &peer)?;
492
493        // Send several successful probes
494        for _ in 0..5 {
495            let telemetry: Result<EdgeTransportTelemetry<TestNeighbor, TestPath>, NetworkGraphError<TestPath>> =
496                Ok(EdgeTransportTelemetry::Neighbor(TestNeighbor {
497                    peer: peer,
498                    rtt: std::time::Duration::from_millis(60),
499                }));
500            graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
501        }
502
503        let obs = graph.edge(&me, &peer).context("edge should exist")?;
504        let qos = obs.immediate_qos().context("immediate QoS should exist")?;
505        assert_eq!(
506            qos.average_latency().context("latency should be set")?,
507            std::time::Duration::from_millis(30), // rtt / 2 = 30ms
508        );
509        assert!(qos.average_probe_rate() > 0.9, "all probes succeeded");
510        Ok(())
511    }
512
513    /// A `MeasurablePath` carrying a serialized `PathId` and a timestamp for
514    /// loopback probe telemetry tests.
515    #[derive(Debug, Clone)]
516    struct LoopbackTestPath {
517        path_bytes: Vec<u8>,
518        timestamp_ms: u128,
519    }
520
521    impl LoopbackTestPath {
522        fn new(path_id: [u64; 5], timestamp_ms: u128) -> Self {
523            let path_bytes = path_id.iter().flat_map(|v| v.to_le_bytes()).collect();
524            Self {
525                path_bytes,
526                timestamp_ms,
527            }
528        }
529    }
530
531    impl MeasurablePath for LoopbackTestPath {
532        fn id(&self) -> &[u8] {
533            &[]
534        }
535
536        fn path(&self) -> &[u8] {
537            &self.path_bytes
538        }
539
540        fn timestamp(&self) -> u128 {
541            self.timestamp_ms
542        }
543    }
544
545    /// Helper to send a loopback probe with the given path and timestamp.
546    fn send_loopback(graph: &ChannelGraph, path_id: [u64; 5], timestamp_ms: u128) {
547        let telemetry: Result<
548            EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
549            NetworkGraphError<LoopbackTestPath>,
550        > = Ok(EdgeTransportTelemetry::Loopback(LoopbackTestPath::new(
551            path_id,
552            timestamp_ms,
553        )));
554        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
555    }
556
557    /// Helper to send a loopback timeout with the given path.
558    fn send_loopback_timeout(graph: &ChannelGraph, path_id: [u64; 5]) {
559        let telemetry: Result<
560            EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
561            NetworkGraphError<LoopbackTestPath>,
562        > = Err(NetworkGraphError::ProbeLoopbackTimeout(LoopbackTestPath::new(
563            path_id, 0,
564        )));
565        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
566    }
567
568    #[tokio::test]
569    async fn loopback_three_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
570        // Loopback: me(0) → a(1) → b(2) → me(0)
571        // PathId nodes: [me=0, a=1, b=2, me=0, 0]
572        // Resolved edges: me→a, a→b, b→me (3 edges)
573        // Target = edges[len-2] = edges[1] = a→b
574        let me = pubkey_from(&SECRET_0);
575        let a = pubkey_from(&SECRET_1);
576        let b = pubkey_from(&SECRET_2);
577
578        let graph = ChannelGraph::new(me);
579        graph.add_node(a);
580        graph.add_node(b);
581        graph.add_edge(&me, &a)?;
582        graph.add_edge(&a, &b)?;
583        graph.add_edge(&b, &me)?; // return edge
584
585        send_loopback(&graph, [0, 1, 2, 0, 0], 200);
586
587        let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
588        let qos = obs
589            .intermediate_qos()
590            .context("intermediate QoS should be present on a→b")?;
591        assert_eq!(
592            qos.average_latency().context("latency should be set")?,
593            std::time::Duration::from_millis(200),
594        );
595
596        // me→a should NOT have intermediate QoS from this probe
597        let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
598        assert!(obs_me_a.intermediate_qos().is_none());
599
600        Ok(())
601    }
602
603    #[tokio::test]
604    async fn loopback_four_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
605        // Loopback: me(0) → a(1) → b(2) → c(3) → me(0)
606        // PathId nodes: [me=0, a=1, b=2, c=3, me=0]
607        // Resolved edges: me→a, a→b, b→c, c→me (4 edges)
608        // Target = edges[len-2] = edges[2] = b→c
609        let me = pubkey_from(&SECRET_0);
610        let a = pubkey_from(&SECRET_1);
611        let b = pubkey_from(&SECRET_2);
612        let c = pubkey_from(&SECRET_3);
613
614        let graph = ChannelGraph::new(me);
615        graph.add_node(a);
616        graph.add_node(b);
617        graph.add_node(c);
618        graph.add_edge(&me, &a)?;
619        graph.add_edge(&a, &b)?;
620        graph.add_edge(&b, &c)?;
621        graph.add_edge(&c, &me)?; // return edge
622
623        send_loopback(&graph, [0, 1, 2, 3, 0], 300);
624
625        // Edge b→c (target) should have the intermediate QoS
626        let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
627        let qos = obs
628            .intermediate_qos()
629            .context("intermediate QoS should be present on b→c")?;
630        assert_eq!(
631            qos.average_latency().context("latency should be set")?,
632            std::time::Duration::from_millis(300),
633            "no preceding intermediate latencies, so full RTT is attributed"
634        );
635
636        // Earlier edges should NOT have intermediate QoS from this probe
637        let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
638        assert!(obs_me_a.intermediate_qos().is_none());
639        let obs_a_b = graph.edge(&a, &b).context("edge a→b should exist")?;
640        assert!(obs_a_b.intermediate_qos().is_none());
641
642        Ok(())
643    }
644
645    #[tokio::test]
646    async fn loopback_should_subtract_known_preceding_latencies() -> anyhow::Result<()> {
647        // Loopback: me(0) → a(1) → b(2) → c(3) → me(0)
648        // Resolved edges: me→a, a→b, b→c, c→me (4 edges). Target = b→c (idx 2).
649        // Preceding edges = [me→a, a→b]
650        // Pre-set me→a = 80ms, a→b = 40ms
651        // Attributed for b→c = 300 - 80 - 40 = 180ms
652        let me = pubkey_from(&SECRET_0);
653        let a = pubkey_from(&SECRET_1);
654        let b = pubkey_from(&SECRET_2);
655        let c = pubkey_from(&SECRET_3);
656
657        let graph = ChannelGraph::new(me);
658        graph.add_node(a);
659        graph.add_node(b);
660        graph.add_node(c);
661        graph.add_edge(&me, &a)?;
662        graph.add_edge(&a, &b)?;
663        graph.add_edge(&b, &c)?;
664        graph.add_edge(&c, &me)?; // return edge
665
666        // Pre-set intermediate latency on me→a and a→b
667        graph.upsert_edge(&me, &a, |obs| {
668            use hopr_api::graph::traits::EdgeObservableWrite;
669            obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
670                std::time::Duration::from_millis(80),
671            )));
672        });
673        graph.upsert_edge(&a, &b, |obs| {
674            use hopr_api::graph::traits::EdgeObservableWrite;
675            obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
676                std::time::Duration::from_millis(40),
677            )));
678        });
679
680        send_loopback(&graph, [0, 1, 2, 3, 0], 300);
681
682        let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
683        let qos = obs
684            .intermediate_qos()
685            .context("intermediate QoS should be present on b→c")?;
686        assert_eq!(
687            qos.average_latency().context("latency should be set")?,
688            std::time::Duration::from_millis(180),
689            "300ms total - 80ms (me→a) - 40ms (a→b) = 180ms attributed to b→c"
690        );
691
692        Ok(())
693    }
694
695    #[tokio::test]
696    async fn loopback_should_subtract_immediate_latency_on_first_edge() -> anyhow::Result<()> {
697        // Loopback: me(0) → a(1) → b(2) → me(0)
698        // Resolved edges: me→a, a→b, b→me (3 edges). Target = a→b (idx 1).
699        // me→a has immediate QoS = 60ms (from my neighbor probing of a), no intermediate yet.
700        // Attributed for a→b = 200 - 60 = 140ms
701        let me = pubkey_from(&SECRET_0);
702        let a = pubkey_from(&SECRET_1);
703        let b = pubkey_from(&SECRET_2);
704
705        let graph = ChannelGraph::new(me);
706        graph.add_node(a);
707        graph.add_node(b);
708        graph.add_edge(&me, &a)?;
709        graph.add_edge(&a, &b)?;
710        graph.add_edge(&b, &me)?;
711
712        // Pre-set immediate QoS on me→a (my direct measurement to neighbor a)
713        graph.upsert_edge(&me, &a, |obs| {
714            use hopr_api::graph::traits::EdgeObservableWrite;
715            obs.record(hopr_api::graph::traits::EdgeWeightType::Immediate(Ok(
716                std::time::Duration::from_millis(60),
717            )));
718        });
719
720        send_loopback(&graph, [0, 1, 2, 0, 0], 200);
721
722        let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
723        let qos = obs
724            .intermediate_qos()
725            .context("intermediate QoS should be present on a→b")?;
726        assert_eq!(
727            qos.average_latency().context("latency should be set")?,
728            std::time::Duration::from_millis(140),
729            "200ms total - 60ms (me→a immediate) = 140ms attributed to a→b"
730        );
731
732        Ok(())
733    }
734
735    #[tokio::test]
736    async fn loopback_invalid_path_length_should_be_ignored() -> anyhow::Result<()> {
737        let me = pubkey_from(&SECRET_0);
738        let a = pubkey_from(&SECRET_1);
739
740        let graph = ChannelGraph::new(me);
741        graph.add_node(a);
742        graph.add_edge(&me, &a)?;
743
744        // Send loopback with wrong-length path bytes (not 40 bytes)
745        let telemetry: Result<
746            EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
747            NetworkGraphError<LoopbackTestPath>,
748        > = Ok(EdgeTransportTelemetry::Loopback(LoopbackTestPath {
749            path_bytes: vec![0u8; 16], // wrong: 16 bytes instead of 40
750            timestamp_ms: 100,
751        }));
752        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
753
754        // Edge should have no intermediate observations
755        let obs = graph.edge(&me, &a).context("edge should exist")?;
756        assert!(
757            obs.intermediate_qos().is_none(),
758            "invalid path bytes should not produce any intermediate measurement"
759        );
760
761        Ok(())
762    }
763
764    #[tokio::test]
765    async fn loopback_single_edge_path_should_be_ignored_if_no_immediate_or_intermediate_result_exists_for_the_edge()
766    -> anyhow::Result<()> {
767        // A path with only 1 edge has no "edge before the last"
768        // me(0) → a(1)
769        // PathId nodes: [me=0, a=1, 0, 0, 0]
770        // Trailing 0 = me which is already visited → stops at 1 edge
771        let me = pubkey_from(&SECRET_0);
772        let a = pubkey_from(&SECRET_1);
773
774        let graph = ChannelGraph::new(me);
775        graph.add_node(a);
776        graph.add_edge(&me, &a)?;
777
778        send_loopback(&graph, [0, 1, 0, 0, 0], 100);
779
780        let obs = graph.edge(&me, &a).context("edge should exist")?;
781        assert!(
782            obs.intermediate_qos().is_none(),
783            "single-edge path should not produce intermediate measurement"
784        );
785
786        Ok(())
787    }
788
789    #[tokio::test]
790    async fn loopback_two_edge_path_should_attribute_when_return_edge_exists() -> anyhow::Result<()> {
791        // Loopback: me(0) → a(1) → me(0)
792        // PathId nodes: [me=0, a=1, me=0, 0, 0]
793        // Resolved edges: me→a, a→me (2 edges). Target = me→a (idx 0).
794        // me→a already has immediate QoS = 50ms (from my neighbor probing).
795        // No known latency on the non-target edge a→me, so attributed = full RTT.
796        let me = pubkey_from(&SECRET_0);
797        let a = pubkey_from(&SECRET_1);
798
799        let graph = ChannelGraph::new(me);
800        graph.add_node(a);
801        graph.add_edge(&me, &a)?;
802        graph.add_edge(&a, &me)?; // return edge
803
804        // Pre-set immediate QoS on me→a (my direct neighbor measurement)
805        graph.upsert_edge(&me, &a, |obs| {
806            use hopr_api::graph::traits::EdgeObservableWrite;
807            obs.record(hopr_api::graph::traits::EdgeWeightType::Immediate(Ok(
808                std::time::Duration::from_millis(50),
809            )));
810        });
811
812        send_loopback(&graph, [0, 1, 0, 0, 0], 100);
813
814        let obs = graph.edge(&me, &a).context("edge me→a should exist")?;
815        let qos = obs
816            .intermediate_qos()
817            .context("intermediate QoS should be present on me→a")?;
818        assert_eq!(
819            qos.average_latency().context("latency should be set")?,
820            std::time::Duration::from_millis(50),
821            "100ms total - 50ms (me→a immediate) = 50ms attributed to me→a"
822        );
823
824        // Immediate QoS should still be intact
825        let imm = obs
826            .immediate_qos()
827            .context("immediate QoS should still be present on me→a")?;
828        assert_eq!(
829            imm.average_latency().context("immediate latency should be set")?,
830            std::time::Duration::from_millis(50),
831        );
832
833        Ok(())
834    }
835
836    #[tokio::test]
837    async fn loopback_broken_chain_should_be_ignored() -> anyhow::Result<()> {
838        // Nodes exist but no edge connects a to c (only b→c exists)
839        // me(0) → a(1), b(2) → c(3)
840        // PathId nodes: [me=0, a=1, c=3, 0, 0]
841        // Edge me→a exists, but edge a→c does NOT → chain breaks, 1 edge < 2
842        let me = pubkey_from(&SECRET_0);
843        let a = pubkey_from(&SECRET_1);
844        let b = pubkey_from(&SECRET_2);
845        let c = pubkey_from(&SECRET_3);
846
847        let graph = ChannelGraph::new(me);
848        graph.add_node(a);
849        graph.add_node(b);
850        graph.add_node(c);
851        graph.add_edge(&me, &a)?;
852        graph.add_edge(&b, &c)?; // b→c, NOT a→c
853
854        send_loopback(&graph, [0, 1, 3, 0, 0], 200);
855
856        let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
857        assert!(
858            obs_me_a.intermediate_qos().is_none(),
859            "broken chain should not attribute any intermediate measurement"
860        );
861
862        Ok(())
863    }
864
865    #[tokio::test]
866    async fn loopback_wrong_start_node_should_be_ignored() -> anyhow::Result<()> {
867        // PathId starts with node 99 which is not me → early reject
868        let me = pubkey_from(&SECRET_0);
869        let a = pubkey_from(&SECRET_1);
870
871        let graph = ChannelGraph::new(me);
872        graph.add_node(a);
873        graph.add_edge(&me, &a)?;
874
875        send_loopback(&graph, [99, 1, 0, 0, 0], 200);
876
877        let obs = graph.edge(&me, &a).context("edge should exist")?;
878        assert!(
879            obs.intermediate_qos().is_none(),
880            "wrong start node should not produce any measurement"
881        );
882
883        Ok(())
884    }
885
886    #[tokio::test]
887    async fn loopback_probes_should_accumulate_on_target_edge() -> anyhow::Result<()> {
888        // Send multiple loopback probes for the same target edge
889        // Loopback: me(0) → a(1) → b(2) → me(0). Target = a→b.
890        let me = pubkey_from(&SECRET_0);
891        let a = pubkey_from(&SECRET_1);
892        let b = pubkey_from(&SECRET_2);
893
894        let graph = ChannelGraph::new(me);
895        graph.add_node(a);
896        graph.add_node(b);
897        graph.add_edge(&me, &a)?;
898        graph.add_edge(&a, &b)?;
899        graph.add_edge(&b, &me)?; // return edge
900
901        // Send 5 probes all with 100ms RTT.
902        // After each probe the target's intermediate QoS is subtracted from subsequent
903        // attributions, so the attributed value converges rather than staying at 100ms.
904        for _ in 0..5 {
905            send_loopback(&graph, [0, 1, 2, 0, 0], 100);
906        }
907
908        let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
909        let qos = obs.intermediate_qos().context("intermediate QoS should be present")?;
910        assert!(
911            qos.average_latency().is_some(),
912            "latency should be set after multiple probes"
913        );
914        assert!(
915            qos.average_probe_rate() > 0.9,
916            "all probes succeeded, rate should be high"
917        );
918
919        Ok(())
920    }
921
922    // This is handled by the moving average object, but the expectation test can stay here.
923    #[tokio::test]
924    async fn loopback_saturating_sub_should_not_underflow() -> anyhow::Result<()> {
925        // If preceding edge latencies exceed total RTT, duration should saturate at 0
926        // Loopback: me(0) → a(1) → b(2) → c(3) → me(0). Target = b→c.
927        // Preceding = [me→a, a→b] with me→a = 500ms
928        let me = pubkey_from(&SECRET_0);
929        let a = pubkey_from(&SECRET_1);
930        let b = pubkey_from(&SECRET_2);
931        let c = pubkey_from(&SECRET_3);
932
933        let graph = ChannelGraph::new(me);
934        graph.add_node(a);
935        graph.add_node(b);
936        graph.add_node(c);
937        graph.add_edge(&me, &a)?;
938        graph.add_edge(&a, &b)?;
939        graph.add_edge(&b, &c)?;
940        graph.add_edge(&c, &me)?; // return edge
941
942        // Pre-set me→a intermediate latency to 500ms
943        graph.upsert_edge(&me, &a, |obs| {
944            use hopr_api::graph::traits::EdgeObservableWrite;
945            obs.record(hopr_api::graph::traits::EdgeWeightType::Intermediate(Ok(
946                std::time::Duration::from_millis(500),
947            )));
948        });
949
950        // Total RTT = 100ms, but preceding latency is 500ms → 100 - 500 saturates to 0
951        send_loopback(&graph, [0, 1, 2, 3, 0], 100);
952
953        let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
954        let qos = obs.intermediate_qos().context("intermediate QoS should be present")?;
955        // Duration::ZERO means latency_average gets updated with 0ms
956        // which the EMA may not report as Some(0) but rather None if <= 0
957        // Let's check the probe rate instead — it should be recorded
958        assert!(
959            qos.average_probe_rate() > 0.0,
960            "probe should still be recorded even with saturated duration"
961        );
962
963        Ok(())
964    }
965
966    #[tokio::test]
967    async fn loopback_timeout_should_record_failed_intermediate_on_target_edge() -> anyhow::Result<()> {
968        // Loopback: me(0) → a(1) → b(2) → me(0)
969        // PathId nodes: [me=0, a=1, b=2, me=0, 0]
970        // Resolved edges: me→a, a→b, b→me. Target = edges[1] = a→b
971        let me = pubkey_from(&SECRET_0);
972        let a = pubkey_from(&SECRET_1);
973        let b = pubkey_from(&SECRET_2);
974
975        let graph = ChannelGraph::new(me);
976        graph.add_node(a);
977        graph.add_node(b);
978        graph.add_edge(&me, &a)?;
979        graph.add_edge(&a, &b)?;
980        graph.add_edge(&b, &me)?; // return edge
981
982        send_loopback_timeout(&graph, [0, 1, 2, 0, 0]);
983
984        let obs = graph.edge(&a, &b).context("edge a→b should exist")?;
985        let qos = obs
986            .intermediate_qos()
987            .context("intermediate QoS should be present on a→b after timeout")?;
988        assert!(qos.average_latency().is_none(), "failed probe should not set latency");
989        assert!(qos.average_probe_rate() < 1.0, "failed probe should lower success rate");
990
991        // me→a should NOT have intermediate QoS
992        let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
993        assert!(obs_me_a.intermediate_qos().is_none());
994
995        Ok(())
996    }
997
998    #[tokio::test]
999    async fn loopback_timeout_four_hop_should_attribute_to_penultimate_edge() -> anyhow::Result<()> {
1000        // Loopback: me(0) → a(1) → b(2) → c(3) → me(0)
1001        // Target = last resolved edge = b→c
1002        let me = pubkey_from(&SECRET_0);
1003        let a = pubkey_from(&SECRET_1);
1004        let b = pubkey_from(&SECRET_2);
1005        let c = pubkey_from(&SECRET_3);
1006
1007        let graph = ChannelGraph::new(me);
1008        graph.add_node(a);
1009        graph.add_node(b);
1010        graph.add_node(c);
1011        graph.add_edge(&me, &a)?;
1012        graph.add_edge(&a, &b)?;
1013        graph.add_edge(&b, &c)?;
1014        graph.add_edge(&c, &me)?;
1015
1016        send_loopback_timeout(&graph, [0, 1, 2, 3, 0]);
1017
1018        // Edge b→c (target) should have a failed intermediate record
1019        let obs = graph.edge(&b, &c).context("edge b→c should exist")?;
1020        let qos = obs
1021            .intermediate_qos()
1022            .context("intermediate QoS should be present on b→c")?;
1023        assert!(qos.average_latency().is_none());
1024        assert!(qos.average_probe_rate() < 1.0);
1025
1026        // Earlier edges should NOT have intermediate QoS
1027        let obs_me_a = graph.edge(&me, &a).context("edge me→a should exist")?;
1028        assert!(obs_me_a.intermediate_qos().is_none());
1029        let obs_a_b = graph.edge(&a, &b).context("edge a→b should exist")?;
1030        assert!(obs_a_b.intermediate_qos().is_none());
1031
1032        Ok(())
1033    }
1034
1035    #[tokio::test]
1036    async fn loopback_timeout_invalid_path_should_be_ignored() -> anyhow::Result<()> {
1037        let me = pubkey_from(&SECRET_0);
1038        let a = pubkey_from(&SECRET_1);
1039
1040        let graph = ChannelGraph::new(me);
1041        graph.add_node(a);
1042        graph.add_edge(&me, &a)?;
1043
1044        // Wrong-length path
1045        let telemetry: Result<
1046            EdgeTransportTelemetry<TestNeighbor, LoopbackTestPath>,
1047            NetworkGraphError<LoopbackTestPath>,
1048        > = Err(NetworkGraphError::ProbeLoopbackTimeout(LoopbackTestPath {
1049            path_bytes: vec![0u8; 8],
1050            timestamp_ms: 0,
1051        }));
1052        graph.record_edge(hopr_api::graph::MeasurableEdge::Probe(telemetry));
1053
1054        let obs = graph.edge(&me, &a).context("edge should exist")?;
1055        assert!(obs.intermediate_qos().is_none());
1056
1057        Ok(())
1058    }
1059
1060    #[tokio::test]
1061    async fn loopback_timeout_single_edge_should_be_ignored() -> anyhow::Result<()> {
1062        // me(0) → a(1), PathId: [0, 1, 0, 0, 0] → 1 edge < 2
1063        let me = pubkey_from(&SECRET_0);
1064        let a = pubkey_from(&SECRET_1);
1065
1066        let graph = ChannelGraph::new(me);
1067        graph.add_node(a);
1068        graph.add_edge(&me, &a)?;
1069
1070        send_loopback_timeout(&graph, [0, 1, 0, 0, 0]);
1071
1072        let obs = graph.edge(&me, &a).context("edge should exist")?;
1073        assert!(
1074            obs.intermediate_qos().is_none(),
1075            "single-edge timeout should not produce intermediate measurement"
1076        );
1077
1078        Ok(())
1079    }
1080}