Skip to main content

hopr_network_graph/
weight.rs

1use hopr_api::graph::{
2    EdgeImmediateProtocolObservable, EdgeLinkObservable,
3    traits::{
4        EdgeNetworkObservableRead, EdgeObservableRead, EdgeObservableWrite, EdgeProtocolObservable,
5        EdgeTransportMeasurement, EdgeWeightType,
6    },
7};
8use hopr_statistics::ExponentialMovingAverage;
9
10/// A representation of a individual neighbor link measurement
11#[derive(Debug, Copy, Clone, Default, PartialEq)]
12pub struct TransportLinkMeasurement {
13    latency_average: ExponentialMovingAverage<3>,
14    probe_success_rate: ExponentialMovingAverage<5>,
15}
16
17impl EdgeLinkObservable for TransportLinkMeasurement {
18    fn record(&mut self, measurement: EdgeTransportMeasurement) {
19        if let Ok(latency) = measurement {
20            self.latency_average.update(latency.as_millis() as f64);
21            self.probe_success_rate.update(1.0);
22        } else {
23            self.probe_success_rate.update(0.0);
24        }
25    }
26
27    fn average_latency(&self) -> Option<std::time::Duration> {
28        if self.latency_average.get() <= 0.0 {
29            None
30        } else {
31            Some(std::time::Duration::from_millis(self.latency_average.get() as u64))
32        }
33    }
34
35    fn average_probe_rate(&self) -> f64 {
36        self.probe_success_rate.get()
37    }
38
39    fn score(&self) -> f64 {
40        self.average_probe_rate() * latency_score(self.average_latency())
41    }
42}
43
44/// Aid in calculation of the overall transport link score.
45///
46/// The smaller the latency over the channel, the more useful the link might
47/// be for routing complext traffic.
48fn latency_score(latency: Option<std::time::Duration>) -> f64 {
49    if let Some(latency) = latency {
50        match latency.as_millis() {
51            0..=75 => 1.0,
52            76..=125 => 0.7,
53            126..=200 => 0.3,
54            _ => 0.15,
55        }
56    } else {
57        0.05
58    }
59}
60
61/// Observations related to a specific peer in the network.
62#[derive(Debug, Copy, Clone, Default, PartialEq)]
63pub struct Observations {
64    last_update: std::time::Duration,
65    immediate_probe: Option<TransportImmediates>,
66    intermediate_probe: Option<TransportIntermediates>,
67}
68
69impl EdgeObservableWrite for Observations {
70    #[tracing::instrument(level = "trace", skip(self), name = "record_observation")]
71    fn record(&mut self, measurement: EdgeWeightType) {
72        self.last_update = std::time::SystemTime::now()
73            .duration_since(std::time::UNIX_EPOCH)
74            .unwrap_or_default();
75
76        match measurement {
77            EdgeWeightType::Immediate(result) => self.immediate_probe.get_or_insert_default().record(result),
78            EdgeWeightType::Intermediate(result) => self.intermediate_probe.get_or_insert_default().record(result),
79            EdgeWeightType::Capacity(capacity) => self.intermediate_probe.get_or_insert_default().capacity = capacity,
80            EdgeWeightType::Connected(is_connected) => {
81                self.immediate_probe.get_or_insert_default().is_connected = is_connected
82            }
83            EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks } => {
84                let imm = self.immediate_probe.get_or_insert_default();
85                imm.messages_sent += num_packets;
86                imm.acks_received += num_acks;
87            }
88        }
89    }
90}
91
92#[derive(Debug, Copy, Clone, Default, PartialEq)]
93pub struct TransportImmediates {
94    link: TransportLinkMeasurement,
95    is_connected: bool,
96    messages_sent: u64,
97    acks_received: u64,
98}
99
100impl EdgeNetworkObservableRead for TransportImmediates {
101    fn is_connected(&self) -> bool {
102        self.is_connected
103    }
104}
105
106impl EdgeImmediateProtocolObservable for TransportImmediates {
107    fn ack_rate(&self) -> Option<f64> {
108        if self.messages_sent == 0 {
109            None
110        } else {
111            Some(self.acks_received as f64 / self.messages_sent as f64)
112        }
113    }
114}
115
116impl EdgeLinkObservable for TransportImmediates {
117    fn record(&mut self, measurement: EdgeTransportMeasurement) {
118        self.link.record(measurement)
119    }
120
121    fn average_latency(&self) -> Option<std::time::Duration> {
122        self.link.average_latency()
123    }
124
125    fn average_probe_rate(&self) -> f64 {
126        self.link.average_probe_rate()
127    }
128
129    fn score(&self) -> f64 {
130        self.link.score()
131    }
132}
133
134#[derive(Debug, Copy, Clone, Default, PartialEq)]
135pub struct TransportIntermediates {
136    link: TransportLinkMeasurement,
137    capacity: Option<u128>,
138}
139
140impl EdgeProtocolObservable for TransportIntermediates {
141    fn capacity(&self) -> Option<u128> {
142        self.capacity
143    }
144}
145
146impl EdgeLinkObservable for TransportIntermediates {
147    fn record(&mut self, measurement: EdgeTransportMeasurement) {
148        self.link.record(measurement);
149    }
150
151    fn average_latency(&self) -> Option<std::time::Duration> {
152        self.link.average_latency()
153    }
154
155    fn average_probe_rate(&self) -> f64 {
156        self.link.average_probe_rate()
157    }
158
159    fn score(&self) -> f64 {
160        self.link.score()
161    }
162}
163
164impl EdgeObservableRead for Observations {
165    type ImmediateMeasurement = TransportImmediates;
166    type IntermediateMeasurement = TransportIntermediates;
167
168    #[inline]
169    fn last_update(&self) -> std::time::Duration {
170        self.last_update
171    }
172
173    fn immediate_qos(&self) -> Option<&Self::ImmediateMeasurement> {
174        self.immediate_probe.as_ref()
175    }
176
177    fn intermediate_qos(&self) -> Option<&Self::IntermediateMeasurement> {
178        self.intermediate_probe.as_ref()
179    }
180
181    /// The score combines immediate and intermediate observations:
182    /// - When both are present, average their scores (immediate neighbor probes prevent an empty intermediate from
183    ///   masking real measurements).
184    /// - When only intermediate is present, use it directly.
185    /// - When only immediate is present, use it directly.
186    fn score(&self) -> f64 {
187        match (&self.immediate_probe, &self.intermediate_probe) {
188            (Some(imm), Some(inter)) => (imm.score() + inter.score()) / 2.0,
189            (None, Some(inter)) => inter.score(),
190            (Some(imm), None) => imm.score(),
191            (None, None) => 0.0,
192        }
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use anyhow::Context;
199    use assertables::{assert_gt, assert_in_delta, assert_lt};
200
201    use super::*;
202
203    #[test]
204    fn observations_should_update_the_timestamp_on_latency_update() {
205        let mut observation = Observations::default();
206
207        assert_eq!(observation.last_update, std::time::Duration::default());
208
209        observation.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
210
211        std::thread::sleep(std::time::Duration::from_millis(10));
212
213        let after = std::time::SystemTime::now()
214            .duration_since(std::time::UNIX_EPOCH)
215            .unwrap_or_default();
216
217        assert_gt!(observation.last_update, std::time::Duration::default());
218        assert_lt!(observation.last_update, after);
219    }
220
221    #[test]
222    fn observations_should_store_an_average_latency_value_after_multiple_updates() -> anyhow::Result<()> {
223        let big_latency = std::time::Duration::from_millis(300);
224        let small_latency = std::time::Duration::from_millis(10);
225
226        let mut observation = Observations::default();
227
228        for _ in 0..10 {
229            observation.record(EdgeWeightType::Immediate(Ok(small_latency)));
230        }
231
232        assert_eq!(
233            observation
234                .immediate_qos()
235                .ok_or_else(|| anyhow::anyhow!("should contain a value"))?
236                .average_latency()
237                .context("should contain a value")?,
238            small_latency
239        );
240
241        observation.record(EdgeWeightType::Immediate(Ok(big_latency)));
242
243        assert_gt!(
244            observation
245                .immediate_qos()
246                .ok_or_else(|| anyhow::anyhow!("should contain a value"))?
247                .average_latency()
248                .context("should contain a value")?,
249            small_latency
250        );
251        assert_lt!(
252            observation
253                .immediate_qos()
254                .ok_or_else(|| anyhow::anyhow!("should contain a value"))?
255                .average_latency()
256                .context("should contain a value")?,
257            big_latency
258        );
259
260        Ok(())
261    }
262
263    #[test]
264    fn ack_rate_should_be_none_when_no_messages_sent() -> anyhow::Result<()> {
265        let mut observation = Observations::default();
266        observation.record(EdgeWeightType::Connected(true));
267
268        let imm = observation.immediate_qos().context("should have immediate QoS")?;
269        assert_eq!(imm.ack_rate(), None);
270        Ok(())
271    }
272
273    #[test]
274    fn ack_rate_should_be_one_when_all_messages_acked() -> anyhow::Result<()> {
275        let mut observation = Observations::default();
276        observation.record(EdgeWeightType::ImmediateProtocolConformance {
277            num_packets: 10,
278            num_acks: 10,
279        });
280
281        let imm = observation.immediate_qos().context("should have immediate QoS")?;
282        assert_eq!(imm.ack_rate(), Some(1.0));
283        Ok(())
284    }
285
286    #[test]
287    fn ack_rate_should_reflect_partial_acknowledgment() -> anyhow::Result<()> {
288        let mut observation = Observations::default();
289        observation.record(EdgeWeightType::ImmediateProtocolConformance {
290            num_packets: 10,
291            num_acks: 7,
292        });
293
294        let imm = observation.immediate_qos().context("should have immediate QoS")?;
295        let rate = imm.ack_rate().context("should have ack rate")?;
296        assert_in_delta!(rate, 0.7, 0.001);
297        Ok(())
298    }
299
300    #[test]
301    fn ack_rate_should_accumulate_across_multiple_records() -> anyhow::Result<()> {
302        let mut observation = Observations::default();
303        observation.record(EdgeWeightType::ImmediateProtocolConformance {
304            num_packets: 5,
305            num_acks: 5,
306        });
307        observation.record(EdgeWeightType::ImmediateProtocolConformance {
308            num_packets: 5,
309            num_acks: 0,
310        });
311
312        let imm = observation.immediate_qos().context("should have immediate QoS")?;
313        let rate = imm.ack_rate().context("should have ack rate")?;
314        assert_in_delta!(rate, 0.5, 0.001);
315        Ok(())
316    }
317
318    #[test]
319    fn observations_should_store_the_averaged_success_rate_of_the_probes() {
320        let small_latency = std::time::Duration::from_millis(10);
321
322        let mut observation = Observations::default();
323
324        for i in 0..10 {
325            if i % 2 == 0 {
326                observation.record(EdgeWeightType::Immediate(Err(())));
327            } else {
328                observation.record(EdgeWeightType::Immediate(Ok(small_latency)));
329            }
330        }
331
332        assert_in_delta!(observation.score(), 0.5, 0.05);
333    }
334
335    #[test]
336    fn score_should_average_immediate_and_intermediate_when_both_present() {
337        let mut observation = Observations::default();
338
339        // Record a successful immediate probe (simulates neighbor probe success)
340        observation.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
341
342        // Record on-chain capacity only (simulates channel existing but no loopback probes)
343        observation.record(EdgeWeightType::Capacity(Some(100)));
344
345        let imm_score = observation.immediate_qos().unwrap().score();
346        let inter_score = observation.intermediate_qos().unwrap().score();
347
348        assert_gt!(imm_score, 0.0, "immediate score should be positive");
349        assert_eq!(
350            inter_score, 0.0,
351            "intermediate score should be zero (no loopback probes)"
352        );
353
354        // The combined score should be the average, not zero
355        let combined = observation.score();
356        assert_gt!(combined, 0.0, "combined score must not be masked by empty intermediate");
357        assert_in_delta!(combined, imm_score / 2.0, 0.001);
358    }
359
360    #[test]
361    fn score_should_use_intermediate_only_when_no_immediate() {
362        let mut observation = Observations::default();
363        // Record a successful intermediate probe (no immediate probe recorded)
364        observation.record(EdgeWeightType::Intermediate(Ok(std::time::Duration::from_millis(80))));
365        observation.record(EdgeWeightType::Capacity(Some(500)));
366
367        assert!(observation.immediate_qos().is_none());
368        let inter_score = observation.intermediate_qos().unwrap().score();
369        assert_gt!(inter_score, 0.0, "intermediate score should be positive");
370        assert_in_delta!(observation.score(), inter_score, 0.001);
371    }
372
373    #[test]
374    fn score_should_use_immediate_only_when_no_intermediate() {
375        let mut observation = Observations::default();
376        observation.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
377
378        let imm_score = observation.immediate_qos().unwrap().score();
379        assert!(observation.intermediate_qos().is_none());
380        assert_in_delta!(observation.score(), imm_score, 0.001);
381    }
382}