hopr_transport_network/
observation.rs1use hopr_api::network::Observable;
2use hopr_statistics::ExponentialMovingAverage;
3
4#[derive(Debug, Copy, Clone, Default, PartialEq)]
6pub struct Observations {
7 pub msg_sent: u64,
8 pub ack_received: u64,
9 last_update: std::time::Duration,
10 latency_average: ExponentialMovingAverage<3>,
11 probe_success_rate: ExponentialMovingAverage<5>,
12}
13
14impl Observable for Observations {
15 fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>) {
16 self.last_update = std::time::SystemTime::now()
17 .duration_since(std::time::UNIX_EPOCH)
18 .unwrap_or_default();
19
20 if let Ok(latency) = latency {
21 self.latency_average.update(latency.as_millis() as f64);
22 self.probe_success_rate.update(1.0);
23 } else {
24 self.probe_success_rate.update(0.0);
25 }
26 }
27
28 #[inline]
29 fn last_update(&self) -> std::time::Duration {
30 self.last_update
31 }
32
33 fn average_latency(&self) -> Option<std::time::Duration> {
34 if self.latency_average.get() <= 0.0 {
35 None
36 } else {
37 Some(std::time::Duration::from_millis(self.latency_average.get() as u64))
38 }
39 }
40
41 fn average_probe_rate(&self) -> f64 {
42 self.probe_success_rate.get()
43 }
44
45 fn score(&self) -> f64 {
46 self.probe_success_rate.get()
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use anyhow::Context;
53 use assertables::{assert_gt, assert_in_delta, assert_lt};
54
55 use super::*;
56
57 #[test]
58 fn observations_should_update_the_timestamp_on_latency_update() {
59 let mut observation = Observations::default();
60
61 assert_eq!(observation.last_update, std::time::Duration::default());
62
63 observation.record_probe(Ok(std::time::Duration::from_millis(50)));
64
65 std::thread::sleep(std::time::Duration::from_millis(10));
66
67 let after = std::time::SystemTime::now()
68 .duration_since(std::time::UNIX_EPOCH)
69 .unwrap_or_default();
70
71 assert_gt!(observation.last_update, std::time::Duration::default());
72 assert_lt!(observation.last_update, after);
73 }
74
75 #[test]
76 fn observations_should_store_an_average_latency_value_after_multiple_updates() -> anyhow::Result<()> {
77 let big_latency = std::time::Duration::from_millis(300);
78 let small_latency = std::time::Duration::from_millis(10);
79
80 let mut observation = Observations::default();
81
82 for _ in 0..10 {
83 observation.record_probe(Ok(small_latency));
84 }
85
86 assert_eq!(
87 observation.average_latency().context("should contain a value")?,
88 small_latency
89 );
90
91 observation.record_probe(Ok(big_latency));
92
93 assert_gt!(
94 observation.average_latency().context("should contain a value")?,
95 small_latency
96 );
97 assert_lt!(
98 observation.average_latency().context("should contain a value")?,
99 big_latency
100 );
101
102 Ok(())
103 }
104
105 #[test]
106 fn observations_should_store_the_averaged_success_rate_of_the_probes() {
107 let small_latency = std::time::Duration::from_millis(10);
108
109 let mut observation = Observations::default();
110
111 for i in 0..10 {
112 if i % 2 == 0 {
113 observation.record_probe(Err(()));
114 } else {
115 observation.record_probe(Ok(small_latency));
116 }
117 }
118
119 assert_in_delta!(observation.score(), 0.5, 0.05);
120 }
121}