hopr_transport_network/
observation.rs

1use hopr_api::network::Observable;
2use hopr_statistics::ExponentialMovingAverage;
3
4/// Observations related to a specific peer in the network.
5#[derive(Debug, Copy, Clone, Default, PartialEq)]
6pub struct Observations {
7    pub msg_sent: u64,
8    pub ack_received: u64,
9    last_update: std::time::Duration,
10    latency_average: ExponentialMovingAverage<3>,
11    probe_success_rate: ExponentialMovingAverage<5>,
12}
13
14impl Observable for Observations {
15    fn record_probe(&mut self, latency: std::result::Result<std::time::Duration, ()>) {
16        self.last_update = std::time::SystemTime::now()
17            .duration_since(std::time::UNIX_EPOCH)
18            .unwrap_or_default();
19
20        if let Ok(latency) = latency {
21            self.latency_average.update(latency.as_millis() as f64);
22            self.probe_success_rate.update(1.0);
23        } else {
24            self.probe_success_rate.update(0.0);
25        }
26    }
27
28    #[inline]
29    fn last_update(&self) -> std::time::Duration {
30        self.last_update
31    }
32
33    fn average_latency(&self) -> Option<std::time::Duration> {
34        if self.latency_average.get() <= 0.0 {
35            None
36        } else {
37            Some(std::time::Duration::from_millis(self.latency_average.get() as u64))
38        }
39    }
40
41    fn average_probe_rate(&self) -> f64 {
42        self.probe_success_rate.get()
43    }
44
45    fn score(&self) -> f64 {
46        self.probe_success_rate.get()
47    }
48}
49
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    use anyhow::Context;
    use assertables::{assert_gt, assert_in_delta, assert_lt};

    use super::*;

    /// Recording a probe must move the timestamp from its zero default to a
    /// point in time that lies before a later wall-clock reading.
    #[test]
    fn observations_should_update_the_timestamp_on_latency_update() {
        let untouched = Duration::default();
        let mut observation = Observations::default();

        assert_eq!(observation.last_update, untouched);

        observation.record_probe(Ok(Duration::from_millis(50)));

        // Let the clock advance so that the later reading is strictly greater.
        std::thread::sleep(Duration::from_millis(10));

        let after = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();

        assert_gt!(observation.last_update, untouched);
        assert_lt!(observation.last_update, after);
    }

    /// A run of identical latencies must average to that latency; a single
    /// larger sample must pull the average strictly between the two values.
    #[test]
    fn observations_should_store_an_average_latency_value_after_multiple_updates() -> anyhow::Result<()> {
        let small_latency = Duration::from_millis(10);
        let big_latency = Duration::from_millis(300);

        let mut observation = Observations::default();

        std::iter::repeat(small_latency)
            .take(10)
            .for_each(|sample| observation.record_probe(Ok(sample)));

        let steady_average = observation.average_latency().context("should contain a value")?;
        assert_eq!(steady_average, small_latency);

        observation.record_probe(Ok(big_latency));

        let lower_checked = observation.average_latency().context("should contain a value")?;
        assert_gt!(lower_checked, small_latency);

        let upper_checked = observation.average_latency().context("should contain a value")?;
        assert_lt!(upper_checked, big_latency);

        Ok(())
    }

    /// Alternating failed and successful probes must produce a score close to
    /// one half.
    #[test]
    fn observations_should_store_the_averaged_success_rate_of_the_probes() {
        let small_latency = Duration::from_millis(10);

        let mut observation = Observations::default();

        for attempt in 0..10 {
            // Even attempts fail, odd attempts succeed.
            let outcome = if attempt % 2 == 0 { Err(()) } else { Ok(small_latency) };
            observation.record_probe(outcome);
        }

        assert_in_delta!(observation.score(), 0.5, 0.05);
    }
}