// hopr_transport_protocol/counters.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicU64, Ordering},
4};
5
6use dashmap::DashMap;
7use hopr_api::types::crypto::types::OffchainPublicKey;
8
/// Pair of atomic tallies used for per-peer protocol conformance tracking.
///
/// One tally counts messages sent to a single peer, the other counts the
/// acknowledgments received from it. Every update is one relaxed atomic
/// operation on one tally, so no locks are taken.
#[derive(Debug, Default)]
pub struct PeerProtocolCounters {
    messages_sent: AtomicU64,
    acks_received: AtomicU64,
}

impl PeerProtocolCounters {
    /// Increment the sent-message tally for this peer by one.
    #[inline]
    pub fn record_message_sent(&self) {
        self.messages_sent.fetch_add(1, Ordering::Relaxed);
    }

    /// Increment the acknowledgment tally for this peer by one.
    #[inline]
    pub fn record_ack_received(&self) {
        // A single acknowledgment is simply a batch of size one.
        self.record_acks_received(1);
    }

    /// Increment the acknowledgment tally for this peer by `count` in one step.
    #[inline]
    pub fn record_acks_received(&self, count: u64) {
        self.acks_received.fetch_add(count, Ordering::Relaxed);
    }

    /// Reset both tallies to 0 and return what they held as `(messages_sent, acks_received)`.
    ///
    /// The two tallies are swapped one after the other, each with its own
    /// atomic operation, so the returned pair is not a single atomic snapshot.
    pub fn take(&self) -> (u64, u64) {
        let sent = self.messages_sent.swap(0, Ordering::Relaxed);
        let acked = self.acks_received.swap(0, Ordering::Relaxed);
        (sent, acked)
    }
}
49
50/// Thread-safe registry of per-peer protocol conformance counters.
51///
52/// Keyed by [`OffchainPublicKey`] — no PeerId conversion needed since the
53/// protocol pipeline already operates on offchain keys.
54#[derive(Debug, Default, Clone)]
55pub struct PeerProtocolCounterRegistry {
56    inner: Arc<DashMap<OffchainPublicKey, Arc<PeerProtocolCounters>>>,
57}
58
59impl PeerProtocolCounterRegistry {
60    /// Get or create counters for the given peer.
61    pub fn get_or_create(&self, peer: &OffchainPublicKey) -> Arc<PeerProtocolCounters> {
62        self.inner
63            .entry(*peer)
64            .or_insert_with(|| Arc::new(PeerProtocolCounters::default()))
65            .value()
66            .clone()
67    }
68
69    /// Swap all counters to 0, returning `(peer, msgs_sent, acks_received)` for non-zero entries.
70    pub fn drain(&self) -> Vec<(OffchainPublicKey, u64, u64)> {
71        self.inner
72            .iter()
73            .filter_map(|entry| {
74                let (sent, received) = entry.value().take();
75                if sent > 0 || received > 0 {
76                    Some((*entry.key(), sent, received))
77                } else {
78                    None
79                }
80            })
81            .collect()
82    }
83}
84
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use hopr_api::types::crypto::prelude::{Keypair, OffchainKeypair};

    use super::*;

    /// Freshly constructed counters report zero for both tallies.
    #[test]
    fn counters_should_start_at_zero() {
        let counters = PeerProtocolCounters::default();
        let (sent, received) = counters.take();

        assert_eq!(sent, 0);
        assert_eq!(received, 0);
    }

    /// Recording sent messages only affects the sent tally.
    #[test]
    fn counters_should_record_messages_sent() {
        let counters = PeerProtocolCounters::default();

        counters.record_message_sent();
        counters.record_message_sent();
        counters.record_message_sent();

        let (sent, received) = counters.take();
        assert_eq!(sent, 3);
        assert_eq!(received, 0);
    }

    /// Recording single acknowledgments only affects the acknowledgment tally.
    #[test]
    fn counters_should_record_acks_received() {
        let counters = PeerProtocolCounters::default();

        counters.record_ack_received();
        counters.record_ack_received();

        let (sent, received) = counters.take();
        assert_eq!(sent, 0);
        assert_eq!(received, 2);
    }

    /// Batched acknowledgments add `count` at once and combine with single ones.
    #[test]
    fn counters_should_record_acks_received_in_batches() {
        let counters = PeerProtocolCounters::default();

        counters.record_acks_received(5);
        counters.record_ack_received();
        counters.record_acks_received(0); // an empty batch must not change anything

        let (sent, received) = counters.take();
        assert_eq!(sent, 0);
        assert_eq!(received, 6);
    }

    /// `take` returns the accumulated values once and leaves zeros behind.
    #[test]
    fn take_should_reset_counters_to_zero() {
        let counters = PeerProtocolCounters::default();

        counters.record_message_sent();
        counters.record_ack_received();

        let (sent1, received1) = counters.take();
        assert_eq!(sent1, 1);
        assert_eq!(received1, 1);

        let (sent2, received2) = counters.take();
        assert_eq!(sent2, 0);
        assert_eq!(received2, 0);
    }

    /// Two lookups for the same peer yield handles to the same counters.
    #[test]
    fn registry_should_create_and_retrieve_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let peer = *OffchainKeypair::random().public();

        let counters = registry.get_or_create(&peer);
        counters.record_message_sent();

        let same_counters = registry.get_or_create(&peer);
        same_counters.record_message_sent();

        let (sent, _) = counters.take();
        assert_eq!(sent, 2, "both calls should share the same counter instance");
    }

    /// A cloned registry operates on the same underlying map as the original.
    #[test]
    fn cloned_registry_should_share_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let handle = registry.clone();
        let peer = *OffchainKeypair::random().public();

        handle.get_or_create(&peer).record_message_sent();

        let drained = registry.drain();
        assert_eq!(drained.len(), 1, "activity recorded via a clone should be visible");

        let (drained_peer, sent, received) = drained[0];
        assert!(drained_peer == peer, "drained entry should belong to the recorded peer");
        assert_eq!(sent, 1);
        assert_eq!(received, 0);
    }

    /// `drain` skips peers whose counters are both zero.
    #[test]
    fn drain_should_return_only_nonzero_entries() -> anyhow::Result<()> {
        let registry = PeerProtocolCounterRegistry::default();
        let peer_a = *OffchainKeypair::random().public();
        let peer_b = *OffchainKeypair::random().public();
        let peer_c = *OffchainKeypair::random().public();

        registry.get_or_create(&peer_a).record_message_sent();
        registry.get_or_create(&peer_b); // no activity
        registry.get_or_create(&peer_c).record_ack_received();

        let drained = registry.drain();
        assert_eq!(drained.len(), 2, "only peers with non-zero counters should be drained");

        let a_entry = drained
            .iter()
            .find(|(p, ..)| *p == peer_a)
            .context("peer_a should be in drain results")?;
        assert_eq!(a_entry.1, 1);
        assert_eq!(a_entry.2, 0);

        let c_entry = drained
            .iter()
            .find(|(p, ..)| *p == peer_c)
            .context("peer_c should be in drain results")?;
        assert_eq!(c_entry.1, 0);
        assert_eq!(c_entry.2, 1);

        Ok(())
    }

    /// A second `drain` with no activity in between reports nothing.
    #[test]
    fn drain_should_reset_counters() {
        let registry = PeerProtocolCounterRegistry::default();
        let peer = *OffchainKeypair::random().public();

        registry.get_or_create(&peer).record_message_sent();

        let first_drain = registry.drain();
        assert_eq!(first_drain.len(), 1);

        let second_drain = registry.drain();
        assert!(second_drain.is_empty(), "counters should be zero after drain");
    }
}