1use hopr_api::graph::{
2 EdgeImmediateProtocolObservable, EdgeLinkObservable,
3 traits::{
4 EdgeNetworkObservableRead, EdgeObservableRead, EdgeObservableWrite, EdgeProtocolObservable,
5 EdgeTransportMeasurement, EdgeWeightType,
6 },
7};
8use hopr_statistics::ExponentialMovingAverage;
9
10#[derive(Debug, Copy, Clone, Default, PartialEq)]
12pub struct TransportLinkMeasurement {
13 latency_average: ExponentialMovingAverage<3>,
14 probe_success_rate: ExponentialMovingAverage<5>,
15}
16
17impl EdgeLinkObservable for TransportLinkMeasurement {
18 fn record(&mut self, measurement: EdgeTransportMeasurement) {
19 if let Ok(latency) = measurement {
20 self.latency_average.update(latency.as_millis() as f64);
21 self.probe_success_rate.update(1.0);
22 } else {
23 self.probe_success_rate.update(0.0);
24 }
25 }
26
27 fn average_latency(&self) -> Option<std::time::Duration> {
28 if self.latency_average.get() <= 0.0 {
29 None
30 } else {
31 Some(std::time::Duration::from_millis(self.latency_average.get() as u64))
32 }
33 }
34
35 fn average_probe_rate(&self) -> f64 {
36 self.probe_success_rate.get()
37 }
38
39 fn score(&self) -> f64 {
40 self.average_probe_rate() * latency_score(self.average_latency())
41 }
42}
43
/// Maps an averaged latency onto a fixed score tier.
///
/// The latency is bucketed by whole milliseconds:
///
/// | latency      | score  |
/// |--------------|--------|
/// | 0..=75 ms    | `1.0`  |
/// | 76..=125 ms  | `0.7`  |
/// | 126..=200 ms | `0.3`  |
/// | above 200 ms | `0.15` |
/// | unknown      | `0.05` |
fn latency_score(latency: Option<std::time::Duration>) -> f64 {
    match latency.map(|elapsed| elapsed.as_millis()) {
        // No latency available at all ranks below even the slowest measured tier.
        None => 0.05,
        Some(0..=75) => 1.0,
        Some(76..=125) => 0.7,
        Some(126..=200) => 0.3,
        Some(_) => 0.15,
    }
}
60
61#[derive(Debug, Copy, Clone, Default, PartialEq)]
63pub struct Observations {
64 last_update: std::time::Duration,
65 immediate_probe: Option<TransportImmediates>,
66 intermediate_probe: Option<TransportIntermediates>,
67}
68
69impl EdgeObservableWrite for Observations {
70 #[tracing::instrument(level = "trace", skip(self), name = "record_observation")]
71 fn record(&mut self, measurement: EdgeWeightType) {
72 self.last_update = std::time::SystemTime::now()
73 .duration_since(std::time::UNIX_EPOCH)
74 .unwrap_or_default();
75
76 match measurement {
77 EdgeWeightType::Immediate(result) => self.immediate_probe.get_or_insert_default().record(result),
78 EdgeWeightType::Intermediate(result) => self.intermediate_probe.get_or_insert_default().record(result),
79 EdgeWeightType::Capacity(capacity) => self.intermediate_probe.get_or_insert_default().capacity = capacity,
80 EdgeWeightType::Connected(is_connected) => {
81 self.immediate_probe.get_or_insert_default().is_connected = is_connected
82 }
83 EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks } => {
84 let imm = self.immediate_probe.get_or_insert_default();
85 imm.messages_sent += num_packets;
86 imm.acks_received += num_acks;
87 }
88 }
89 }
90}
91
92#[derive(Debug, Copy, Clone, Default, PartialEq)]
93pub struct TransportImmediates {
94 link: TransportLinkMeasurement,
95 is_connected: bool,
96 messages_sent: u64,
97 acks_received: u64,
98}
99
100impl EdgeNetworkObservableRead for TransportImmediates {
101 fn is_connected(&self) -> bool {
102 self.is_connected
103 }
104}
105
106impl EdgeImmediateProtocolObservable for TransportImmediates {
107 fn ack_rate(&self) -> Option<f64> {
108 if self.messages_sent == 0 {
109 None
110 } else {
111 Some(self.acks_received as f64 / self.messages_sent as f64)
112 }
113 }
114}
115
116impl EdgeLinkObservable for TransportImmediates {
117 fn record(&mut self, measurement: EdgeTransportMeasurement) {
118 self.link.record(measurement)
119 }
120
121 fn average_latency(&self) -> Option<std::time::Duration> {
122 self.link.average_latency()
123 }
124
125 fn average_probe_rate(&self) -> f64 {
126 self.link.average_probe_rate()
127 }
128
129 fn score(&self) -> f64 {
130 self.link.score()
131 }
132}
133
134#[derive(Debug, Copy, Clone, Default, PartialEq)]
135pub struct TransportIntermediates {
136 link: TransportLinkMeasurement,
137 capacity: Option<u128>,
138}
139
140impl EdgeProtocolObservable for TransportIntermediates {
141 fn capacity(&self) -> Option<u128> {
142 self.capacity
143 }
144}
145
146impl EdgeLinkObservable for TransportIntermediates {
147 fn record(&mut self, measurement: EdgeTransportMeasurement) {
148 self.link.record(measurement);
149 }
150
151 fn average_latency(&self) -> Option<std::time::Duration> {
152 self.link.average_latency()
153 }
154
155 fn average_probe_rate(&self) -> f64 {
156 self.link.average_probe_rate()
157 }
158
159 fn score(&self) -> f64 {
160 self.link.score()
161 }
162}
163
164impl EdgeObservableRead for Observations {
165 type ImmediateMeasurement = TransportImmediates;
166 type IntermediateMeasurement = TransportIntermediates;
167
168 #[inline]
169 fn last_update(&self) -> std::time::Duration {
170 self.last_update
171 }
172
173 fn immediate_qos(&self) -> Option<&Self::ImmediateMeasurement> {
174 self.immediate_probe.as_ref()
175 }
176
177 fn intermediate_qos(&self) -> Option<&Self::IntermediateMeasurement> {
178 self.intermediate_probe.as_ref()
179 }
180
181 fn score(&self) -> f64 {
187 match (&self.immediate_probe, &self.intermediate_probe) {
188 (Some(imm), Some(inter)) => (imm.score() + inter.score()) / 2.0,
189 (None, Some(inter)) => inter.score(),
190 (Some(imm), None) => imm.score(),
191 (None, None) => 0.0,
192 }
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    use anyhow::Context;
    use assertables::{assert_gt, assert_in_delta, assert_lt};

    use super::*;

    /// Current wall-clock time as the duration since the UNIX epoch (zero on clock error).
    fn unix_now() -> Duration {
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default()
    }

    /// Averaged latency of the immediate measurements, failing if either layer is missing.
    fn immediate_latency(observation: &Observations) -> anyhow::Result<Duration> {
        observation
            .immediate_qos()
            .context("should contain a value")?
            .average_latency()
            .context("should contain a value")
    }

    /// Immediate measurements of the observation, failing if none were recorded.
    fn immediate(observation: &Observations) -> anyhow::Result<&TransportImmediates> {
        observation.immediate_qos().context("should have immediate QoS")
    }

    #[test]
    fn observations_should_update_the_timestamp_on_latency_update() {
        let mut observation = Observations::default();
        assert_eq!(observation.last_update, Duration::ZERO);

        observation.record(EdgeWeightType::Immediate(Ok(Duration::from_millis(50))));

        // Make sure the reference timestamp taken below is strictly later than the update.
        std::thread::sleep(Duration::from_millis(10));
        let after = unix_now();

        assert_gt!(observation.last_update, Duration::ZERO);
        assert_lt!(observation.last_update, after);
    }

    #[test]
    fn observations_should_store_an_average_latency_value_after_multiple_updates() -> anyhow::Result<()> {
        let small_latency = Duration::from_millis(10);
        let big_latency = Duration::from_millis(300);

        let mut observation = Observations::default();
        (0..10).for_each(|_| observation.record(EdgeWeightType::Immediate(Ok(small_latency))));

        // A constant input must be reproduced by the average.
        assert_eq!(immediate_latency(&observation)?, small_latency);

        observation.record(EdgeWeightType::Immediate(Ok(big_latency)));

        // A single outlier pulls the average up, but not all the way to the outlier.
        assert_gt!(immediate_latency(&observation)?, small_latency);
        assert_lt!(immediate_latency(&observation)?, big_latency);

        Ok(())
    }

    #[test]
    fn ack_rate_should_be_none_when_no_messages_sent() -> anyhow::Result<()> {
        let mut observation = Observations::default();
        observation.record(EdgeWeightType::Connected(true));

        assert_eq!(immediate(&observation)?.ack_rate(), None);
        Ok(())
    }

    #[test]
    fn ack_rate_should_be_one_when_all_messages_acked() -> anyhow::Result<()> {
        let mut observation = Observations::default();
        observation.record(EdgeWeightType::ImmediateProtocolConformance {
            num_packets: 10,
            num_acks: 10,
        });

        assert_eq!(immediate(&observation)?.ack_rate(), Some(1.0));
        Ok(())
    }

    #[test]
    fn ack_rate_should_reflect_partial_acknowledgment() -> anyhow::Result<()> {
        let mut observation = Observations::default();
        observation.record(EdgeWeightType::ImmediateProtocolConformance {
            num_packets: 10,
            num_acks: 7,
        });

        let rate = immediate(&observation)?.ack_rate().context("should have ack rate")?;
        assert_in_delta!(rate, 0.7, 0.001);
        Ok(())
    }

    #[test]
    fn ack_rate_should_accumulate_across_multiple_records() -> anyhow::Result<()> {
        let mut observation = Observations::default();

        // Five fully acknowledged packets followed by five unacknowledged ones.
        observation.record(EdgeWeightType::ImmediateProtocolConformance {
            num_packets: 5,
            num_acks: 5,
        });
        observation.record(EdgeWeightType::ImmediateProtocolConformance {
            num_packets: 5,
            num_acks: 0,
        });

        let rate = immediate(&observation)?.ack_rate().context("should have ack rate")?;
        assert_in_delta!(rate, 0.5, 0.001);
        Ok(())
    }

    #[test]
    fn observations_should_store_the_averaged_success_rate_of_the_probes() {
        let small_latency = Duration::from_millis(10);
        let mut observation = Observations::default();

        // Alternate failures (even attempts) and fast successes (odd attempts).
        for attempt in 0..10 {
            let outcome = if attempt % 2 == 0 { Err(()) } else { Ok(small_latency) };
            observation.record(EdgeWeightType::Immediate(outcome));
        }

        assert_in_delta!(observation.score(), 0.5, 0.05);
    }

    #[test]
    fn score_should_average_immediate_and_intermediate_when_both_present() {
        let mut observation = Observations::default();

        // One immediate probe, plus a capacity update that creates the intermediate entry
        // without recording any probe on it.
        observation.record(EdgeWeightType::Immediate(Ok(Duration::from_millis(50))));
        observation.record(EdgeWeightType::Capacity(Some(100)));

        let imm_score = observation.immediate_qos().unwrap().score();
        let inter_score = observation.intermediate_qos().unwrap().score();

        assert_gt!(imm_score, 0.0, "immediate score should be positive");
        assert_eq!(
            inter_score, 0.0,
            "intermediate score should be zero (no loopback probes)"
        );

        let combined = observation.score();
        assert_gt!(combined, 0.0, "combined score must not be masked by empty intermediate");
        assert_in_delta!(combined, imm_score / 2.0, 0.001);
    }

    #[test]
    fn score_should_use_intermediate_only_when_no_immediate() {
        let mut observation = Observations::default();
        observation.record(EdgeWeightType::Intermediate(Ok(Duration::from_millis(80))));
        observation.record(EdgeWeightType::Capacity(Some(500)));

        assert!(observation.immediate_qos().is_none());

        let inter_score = observation.intermediate_qos().unwrap().score();
        assert_gt!(inter_score, 0.0, "intermediate score should be positive");
        assert_in_delta!(observation.score(), inter_score, 0.001);
    }

    #[test]
    fn score_should_use_immediate_only_when_no_intermediate() {
        let mut observation = Observations::default();
        observation.record(EdgeWeightType::Immediate(Ok(Duration::from_millis(50))));

        let imm_score = observation.immediate_qos().unwrap().score();

        assert!(observation.intermediate_qos().is_none());
        assert_in_delta!(observation.score(), imm_score, 0.001);
    }
}