hopr_transport_protocol/
counters.rs1use std::sync::{
2 Arc,
3 atomic::{AtomicU64, Ordering},
4};
5
6use dashmap::DashMap;
7use hopr_api::types::crypto::types::OffchainPublicKey;
8
/// Pair of event counters kept for a single peer.
///
/// Both fields are atomics updated with `Ordering::Relaxed`, so they can be
/// modified through a shared reference.
#[derive(Debug, Default)]
pub struct PeerProtocolCounters {
    /// Messages sent since the last [`take`](Self::take).
    messages_sent: AtomicU64,
    /// Acknowledgements received since the last [`take`](Self::take).
    acks_received: AtomicU64,
}

impl PeerProtocolCounters {
    /// Adds one to the sent-message counter.
    #[inline]
    pub fn record_message_sent(&self) {
        Self::bump(&self.messages_sent, 1);
    }

    /// Adds one to the received-acknowledgement counter.
    #[inline]
    pub fn record_ack_received(&self) {
        self.record_acks_received(1);
    }

    /// Adds `count` to the received-acknowledgement counter in a single step.
    #[inline]
    pub fn record_acks_received(&self, count: u64) {
        Self::bump(&self.acks_received, count);
    }

    /// Returns `(messages_sent, acks_received)` and resets both counters to zero.
    ///
    /// Each counter is swapped on its own, so the two values are not read as
    /// one combined atomic snapshot.
    pub fn take(&self) -> (u64, u64) {
        let sent = self.messages_sent.swap(0, Ordering::Relaxed);
        let acked = self.acks_received.swap(0, Ordering::Relaxed);
        (sent, acked)
    }

    /// Relaxed `fetch_add` shared by the `record_*` methods.
    #[inline]
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, Ordering::Relaxed);
    }
}
49
50#[derive(Debug, Default, Clone)]
55pub struct PeerProtocolCounterRegistry {
56 inner: Arc<DashMap<OffchainPublicKey, Arc<PeerProtocolCounters>>>,
57}
58
59impl PeerProtocolCounterRegistry {
60 pub fn get_or_create(&self, peer: &OffchainPublicKey) -> Arc<PeerProtocolCounters> {
62 self.inner
63 .entry(*peer)
64 .or_insert_with(|| Arc::new(PeerProtocolCounters::default()))
65 .value()
66 .clone()
67 }
68
69 pub fn drain(&self) -> Vec<(OffchainPublicKey, u64, u64)> {
71 self.inner
72 .iter()
73 .filter_map(|entry| {
74 let (sent, received) = entry.value().take();
75 if sent > 0 || received > 0 {
76 Some((*entry.key(), sent, received))
77 } else {
78 None
79 }
80 })
81 .collect()
82 }
83}
84
#[cfg(test)]
mod tests {
    //! Unit tests for the per-peer counters and their registry.

    use anyhow::Context;
    use hopr_api::types::crypto::prelude::{Keypair, OffchainKeypair};

    use super::*;

    #[test]
    fn counters_should_start_at_zero() {
        let counters = PeerProtocolCounters::default();
        let (sent, received) = counters.take();

        assert_eq!(sent, 0);
        assert_eq!(received, 0);
    }

    #[test]
    fn counters_should_record_messages_sent() {
        let counters = PeerProtocolCounters::default();

        counters.record_message_sent();
        counters.record_message_sent();
        counters.record_message_sent();

        let (sent, received) = counters.take();
        assert_eq!(sent, 3);
        assert_eq!(received, 0);
    }

    #[test]
    fn counters_should_record_acks_received() {
        let counters = PeerProtocolCounters::default();

        counters.record_ack_received();
        counters.record_ack_received();

        let (sent, received) = counters.take();
        assert_eq!(sent, 0);
        assert_eq!(received, 2);
    }

    // Covers the batch recorder, including mixing it with the single-ack
    // recorder and a zero-sized batch.
    #[test]
    fn counters_should_record_acks_received_in_batches() {
        let counters = PeerProtocolCounters::default();

        counters.record_acks_received(5);
        counters.record_ack_received();
        counters.record_acks_received(0);

        let (sent, received) = counters.take();
        assert_eq!(sent, 0);
        assert_eq!(received, 6);
    }

    #[test]
    fn take_should_reset_counters_to_zero() {
        let counters = PeerProtocolCounters::default();

        counters.record_message_sent();
        counters.record_ack_received();

        let (sent1, received1) = counters.take();
        assert_eq!(sent1, 1);
        assert_eq!(received1, 1);

        let (sent2, received2) = counters.take();
        assert_eq!(sent2, 0);
        assert_eq!(received2, 0);
    }

    // Nothing in this test is fallible, so it does not return a `Result`.
    #[test]
    fn registry_should_create_and_retrieve_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let peer = *OffchainKeypair::random().public();

        let counters = registry.get_or_create(&peer);
        counters.record_message_sent();

        let same_counters = registry.get_or_create(&peer);
        same_counters.record_message_sent();

        let (sent, _) = counters.take();
        assert_eq!(sent, 2, "both calls should share the same counter instance");
    }

    // A cloned registry is another handle to the same map, so activity
    // recorded through the clone must be visible when draining the original.
    #[test]
    fn registry_clones_should_share_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let cloned = registry.clone();
        let peer = *OffchainKeypair::random().public();

        cloned.get_or_create(&peer).record_message_sent();

        let drained = registry.drain();
        assert_eq!(drained.len(), 1);
        assert!(drained[0].0 == peer);
        assert_eq!(drained[0].1, 1);
        assert_eq!(drained[0].2, 0);
    }

    #[test]
    fn drain_should_return_only_nonzero_entries() -> anyhow::Result<()> {
        let registry = PeerProtocolCounterRegistry::default();
        let peer_a = *OffchainKeypair::random().public();
        let peer_b = *OffchainKeypair::random().public();
        let peer_c = *OffchainKeypair::random().public();

        registry.get_or_create(&peer_a).record_message_sent();
        // peer_b is registered but never records anything.
        registry.get_or_create(&peer_b);
        registry.get_or_create(&peer_c).record_ack_received();

        let drained = registry.drain();
        assert_eq!(drained.len(), 2, "only peers with non-zero counters should be drained");

        let a_entry = drained
            .iter()
            .find(|(p, ..)| *p == peer_a)
            .context("peer_a should be in drain results")?;
        assert_eq!(a_entry.1, 1);
        assert_eq!(a_entry.2, 0);

        let c_entry = drained
            .iter()
            .find(|(p, ..)| *p == peer_c)
            .context("peer_c should be in drain results")?;
        assert_eq!(c_entry.1, 0);
        assert_eq!(c_entry.2, 1);

        Ok(())
    }

    #[test]
    fn drain_should_reset_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let peer = *OffchainKeypair::random().public();

        registry.get_or_create(&peer).record_message_sent();

        let first_drain = registry.drain();
        assert_eq!(first_drain.len(), 1);

        let second_drain = registry.drain();
        assert!(second_drain.is_empty(), "counters should be zero after drain");
    }
}