Skip to main content

hopr_protocol_session/socket/
telemetry.rs

1pub use crate::protocol::SessionMessageDiscriminants;
2
3/// Used to track various statistics of a [`SessionSocket`](crate::SessionSocket).
4#[auto_impl::auto_impl(&, Arc)]
5pub trait SessionTelemetryTracker {
6    /// Records a frame that has been emitted from the Sequencer.
7    fn frame_emitted(&self);
8    /// Records a frame that has successfully reassembled by the Reassembler.
9    fn frame_completed(&self);
10    /// Records a frame that has been discarded due to timeout or other errors.
11    fn frame_discarded(&self);
12    /// Records an incomplete frame that could not be reassembled.
13    fn incomplete_frame(&self);
14    /// Records an incoming Session message.
15    fn incoming_message(&self, msg: SessionMessageDiscriminants);
16    /// Records an outgoing Session message.
17    fn outgoing_message(&self, msg: SessionMessageDiscriminants);
18    /// Records an error that occurred during processing of a Session packet.
19    fn error(&self);
20}
21
/// Statistics tracker for a Session socket that records nothing.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct NoopTracker;
25
26impl SessionTelemetryTracker for NoopTracker {
27    fn frame_emitted(&self) {}
28
29    fn frame_completed(&self) {}
30
31    fn frame_discarded(&self) {}
32
33    fn incomplete_frame(&self) {}
34
35    fn incoming_message(&self, _: SessionMessageDiscriminants) {}
36
37    fn outgoing_message(&self, _: SessionMessageDiscriminants) {}
38
39    fn error(&self) {}
40}
41
42#[cfg(test)]
43pub mod tests {
44    use parking_lot::Mutex;
45    use serde::{Deserialize, Serialize};
46
47    use super::*;
48
49    /// Statistics tracker that records all events in a map and is possible to serialize (e.g.: for snapshot testing)
50    #[derive(Debug, Clone)]
51    pub struct TestTelemetryTracker(std::sync::Arc<Mutex<indexmap::IndexMap<String, usize>>>);
52
53    impl Serialize for TestTelemetryTracker {
54        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55        where
56            S: serde::Serializer,
57        {
58            self.0.lock().serialize(serializer)
59        }
60    }
61
62    impl<'de> Deserialize<'de> for TestTelemetryTracker {
63        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64        where
65            D: serde::Deserializer<'de>,
66        {
67            let map = indexmap::IndexMap::deserialize(deserializer)?;
68            Ok(Self(std::sync::Arc::new(Mutex::new(map))))
69        }
70    }
71
72    impl Default for TestTelemetryTracker {
73        fn default() -> Self {
74            let mut map = indexmap::IndexMap::new();
75            map.insert("frames_emitted".into(), 0);
76            map.insert("frames_completed".into(), 0);
77            map.insert("frames_discarded".into(), 0);
78            map.insert("incomplete_frames".into(), 0);
79            map.insert("errors".into(), 0);
80            map.insert("incoming_messages_Segment".into(), 0);
81            map.insert("incoming_messages_Request".into(), 0);
82            map.insert("incoming_messages_Acknowledge".into(), 0);
83            map.insert("outgoing_messages_Segment".into(), 0);
84            map.insert("outgoing_messages_Request".into(), 0);
85            map.insert("outgoing_messages_Acknowledge".into(), 0);
86
87            Self(std::sync::Arc::new(Mutex::new(map)))
88        }
89    }
90
91    impl TestTelemetryTracker {
92        fn increment(&self, key: &str) {
93            let mut map = self.0.lock();
94            *map.entry(key.to_string()).or_insert(0) += 1;
95        }
96    }
97
98    impl SessionTelemetryTracker for TestTelemetryTracker {
99        fn frame_emitted(&self) {
100            self.increment("frames_emitted");
101        }
102
103        fn frame_completed(&self) {
104            self.increment("frames_completed");
105        }
106
107        fn frame_discarded(&self) {
108            self.increment("frames_discarded");
109        }
110
111        fn incomplete_frame(&self) {
112            self.increment("incomplete_frames");
113        }
114
115        fn incoming_message(&self, msg: SessionMessageDiscriminants) {
116            self.increment(&format!("incoming_messages_{:?}", msg));
117        }
118
119        fn outgoing_message(&self, msg: SessionMessageDiscriminants) {
120            self.increment(&format!("outgoing_messages_{:?}", msg));
121        }
122
123        fn error(&self) {
124            self.increment("errors");
125        }
126    }
127}