Skip to main content

hopr_transport_session/
telemetry.rs

1//! Session stats tracking and snapshotting.
2//!
3//! This module provides functionality to track various stats of a HOPR session,
4//! including data throughput, packet counts, frame events, and session lifecycle.
5//! It allows creating immutable snapshots of the stats state for monitoring and reporting.
6
7use std::{
8    sync::{
9        OnceLock,
10        atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
11    },
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use hopr_protocol_session::{FrameInspector, SessionMessageDiscriminants};
16
17use crate::{
18    Capability, HoprSessionConfig, SessionId, balancer::AtomicSurbFlowEstimator, types::SESSION_SOCKET_CAPACITY,
19};
20
21/// The lifecycle state of a session from the perspective of metrics.
22#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::FromRepr, serde::Serialize)]
23#[repr(u8)]
24pub enum SessionLifecycleState {
25    /// Session is active and running.
26    Active = 0,
27    /// Session is in the process of closing (e.g. sending/receiving close frames).
28    Closing = 1,
29    /// Session has been fully closed.
30    Closed = 2,
31}
32
33/// The acknowledgement mode configured for the session.
34#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
35#[repr(u8)]
36pub enum SessionAckMode {
37    /// No acknowledgements.
38    None,
39    /// Partial acknowledgements (some frames/segments).
40    Partial,
41    /// Full acknowledgements (all frames/segments).
42    Full,
43    /// Both (if applicable, though typically maps to Full in some contexts).
44    Both,
45}
46
47/// Snapshot of session lifetime metrics.
48#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
49pub struct SessionLifetimeSnapshot {
50    /// Time when the session was created.
51    pub created_at: SystemTime,
52    /// Time of the last read or write activity.
53    pub last_activity_at: SystemTime,
54    /// Total duration the session has been alive.
55    pub uptime: Duration,
56    /// Duration since the last activity.
57    pub idle: Duration,
58    /// Current lifecycle state of the session.
59    pub state: SessionLifecycleState,
60    /// Errors during pipeline processing.
61    pub pipeline_errors: u64,
62}
63
64/// Snapshot of frame buffer metrics.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
66pub struct FrameBufferSnapshot {
67    /// Maximum Transmission Unit for frames.
68    pub frame_mtu: usize,
69    /// Configured timeout for frame reassembly/acknowledgement.
70    pub frame_timeout: Duration,
71    /// Configured capacity of the frame buffer.
72    pub frame_capacity: usize,
73    /// Number of frames currently being assembled (incomplete).
74    pub frames_being_assembled: usize,
75    /// Total number of frames successfully completed/assembled.
76    pub frames_completed: u64,
77    /// Total number of frames emitted to the application.
78    pub frames_emitted: u64,
79    /// Total number of frames discarded (e.g. due to timeout or errors).
80    pub frames_discarded: u64,
81}
82
83/// Snapshot of acknowledgement metrics.
84#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
85pub struct AckSnapshot {
86    /// Configured acknowledgement mode.
87    pub mode: SessionAckMode,
88    /// Total incoming segments received.
89    pub incoming_segments: u64,
90    /// Total retransmission requests received.
91    pub incoming_retransmission_requests: u64,
92    /// Total frame acknowledgements received.
93    pub incoming_acknowledged_frames: u64,
94    /// Total outgoing segments sent.
95    pub outgoing_segments: u64,
96    /// Total retransmission requests sent.
97    pub outgoing_retransmission_requests: u64,
98    /// Total frame acknowledgements sent.
99    pub outgoing_acknowledged_frames: u64,
100}
101
102/// Snapshot of SURB (Single Use Reply Block) metrics.
103#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)]
104pub struct SurbSnapshot {
105    /// Total SURBs produced/minted.
106    pub produced_total: u64,
107    /// Total SURBs consumed/used.
108    pub consumed_total: u64,
109    /// Estimated number of SURBs currently available.
110    pub buffer_estimate: u64,
111    /// Target number of SURBs to maintain in buffer (if configured).
112    pub target_buffer: Option<u64>,
113    /// Rate of SURB consumption/production per second.
114    pub rate_per_sec: f64,
115    /// Whether a SURB refill request is currently in flight.
116    pub refill_in_flight: bool,
117}
118
119/// Snapshot of transport-level data metrics.
120#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
121pub struct TransportSnapshot {
122    /// Total bytes received.
123    pub bytes_in: u64,
124    /// Total bytes sent.
125    pub bytes_out: u64,
126    /// Total packets received.
127    pub packets_in: u64,
128    /// Total packets sent.
129    pub packets_out: u64,
130}
131
132/// Complete snapshot of all session metrics at a point in time.
133#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)]
134pub struct SessionStatsSnapshot {
135    /// The ID of the session.
136    pub session_id: SessionId,
137    /// Time when this snapshot was taken.
138    pub snapshot_at: SystemTime,
139    /// Lifetime-related metrics.
140    pub lifetime: SessionLifetimeSnapshot,
141    /// Frame buffer-related metrics.
142    pub frame_buffer: FrameBufferSnapshot,
143    /// Acknowledgement-related metrics.
144    pub ack: AckSnapshot,
145    /// SURB management related metrics.
146    pub surb: SurbSnapshot,
147    /// Transport level metrics (bytes/packets).
148    pub transport: TransportSnapshot,
149}
150
151/// Internal metrics tracker for a session.
152///
153/// This struct uses atomic counters to allow lock-free updates from multiple threads/tasks.
154#[derive(Debug)]
155pub struct SessionTelemetry {
156    session_id: SessionId,
157    created_at_us: AtomicU64,
158    last_activity_us: AtomicU64,
159    state: AtomicU8,
160    ack_mode: SessionAckMode,
161    frame_mtu: usize,
162    frame_timeout: Duration,
163    frame_capacity: usize,
164    frames_being_reassembled: AtomicUsize,
165    frames_incomplete: AtomicU64,
166    frames_completed: AtomicU64,
167    frames_emitted: AtomicU64,
168    frames_discarded: AtomicU64,
169    incoming_segments: AtomicU64,
170    outgoing_segments: AtomicU64,
171    incoming_retransmission_requests: AtomicU64,
172    incoming_acknowledged_frames: AtomicU64,
173    outgoing_retransmission_requests: AtomicU64,
174    outgoing_acknowledged_frames: AtomicU64,
175    session_pipeline_errors: AtomicU64,
176    bytes_in: AtomicU64,
177    bytes_out: AtomicU64,
178    packets_in: AtomicU64,
179    packets_out: AtomicU64,
180    surb_refill_in_flight: AtomicBool,
181    /// Previous (buffer_estimate, timestamp_us) for rate calculation, protected by mutex
182    /// to ensure atomic read/update of the pair.
183    last_rate_snapshot: parking_lot::Mutex<(u64, u64)>,
184    inspector: OnceLock<FrameInspector>,
185    surb_estimator: OnceLock<AtomicSurbFlowEstimator>,
186    surb_target_buffer: OnceLock<u64>,
187}
188
189impl SessionTelemetry {
190    pub fn new(session_id: SessionId, cfg: HoprSessionConfig) -> Self {
191        let now = now_us();
192        let ack_mode = if cfg
193            .capabilities
194            .contains(Capability::RetransmissionAck | Capability::RetransmissionNack)
195        {
196            SessionAckMode::Both
197        } else if cfg.capabilities.contains(Capability::RetransmissionAck) {
198            SessionAckMode::Full
199        } else if cfg.capabilities.contains(Capability::RetransmissionNack) {
200            SessionAckMode::Partial
201        } else {
202            SessionAckMode::None
203        };
204
205        Self {
206            session_id,
207            created_at_us: AtomicU64::new(now),
208            last_activity_us: AtomicU64::new(now),
209            state: AtomicU8::new(SessionLifecycleState::Active as u8),
210            ack_mode,
211            frame_mtu: cfg.frame_mtu,
212            frame_timeout: cfg.frame_timeout,
213            frame_capacity: SESSION_SOCKET_CAPACITY,
214            frames_being_reassembled: AtomicUsize::new(0),
215            frames_incomplete: AtomicU64::new(0),
216            frames_completed: AtomicU64::new(0),
217            frames_emitted: AtomicU64::new(0),
218            frames_discarded: AtomicU64::new(0),
219            incoming_segments: AtomicU64::new(0),
220            incoming_retransmission_requests: AtomicU64::new(0),
221            incoming_acknowledged_frames: AtomicU64::new(0),
222            outgoing_segments: AtomicU64::new(0),
223            outgoing_retransmission_requests: AtomicU64::new(0),
224            outgoing_acknowledged_frames: AtomicU64::new(0),
225            session_pipeline_errors: AtomicU64::new(0),
226            bytes_in: AtomicU64::new(0),
227            bytes_out: AtomicU64::new(0),
228            packets_in: AtomicU64::new(0),
229            packets_out: AtomicU64::new(0),
230            surb_refill_in_flight: AtomicBool::new(false),
231            last_rate_snapshot: parking_lot::Mutex::new((0, now)),
232            inspector: OnceLock::new(),
233            surb_estimator: OnceLock::new(),
234            surb_target_buffer: OnceLock::new(),
235        }
236    }
237
238    /// Returns a reference to the session ID.
239    pub fn session_id(&self) -> &SessionId {
240        &self.session_id
241    }
242
243    /// Updates the session lifecycle state.
244    ///
245    /// This method atomically stores the new state, which affects the metrics snapshot
246    /// and indicates the session's current phase (Active, Closing, or Closed).
247    pub fn set_state(&self, state: SessionLifecycleState) {
248        self.state.store(state as u8, Ordering::Relaxed);
249    }
250
251    /// Records activity on the session by updating the last activity timestamp.
252    ///
253    /// This is called on read/write operations to track session idleness.
254    pub fn touch_activity(&self) {
255        self.last_activity_us.store(now_us(), Ordering::Relaxed);
256    }
257
258    /// Records an incoming read operation with the specified number of bytes.
259    ///
260    /// Increments byte and packet counters, and updates activity timestamp.
261    /// Zero-byte reads are ignored.
262    pub fn record_read(&self, bytes: usize) {
263        if bytes == 0 {
264            return;
265        }
266        self.touch_activity();
267        self.bytes_in.fetch_add(bytes as u64, Ordering::Relaxed);
268        self.packets_in.fetch_add(1, Ordering::Relaxed);
269    }
270
271    /// Records an outgoing write operation with the specified number of bytes.
272    ///
273    /// Increments byte and packet counters, and updates activity timestamp.
274    /// Zero-byte writes are ignored.
275    pub fn record_write(&self, bytes: usize) {
276        if bytes == 0 {
277            return;
278        }
279        self.touch_activity();
280        self.bytes_out.fetch_add(bytes as u64, Ordering::Relaxed);
281        self.packets_out.fetch_add(1, Ordering::Relaxed);
282    }
283
284    /// Sets whether a SURB (Single Use Reply Block) refill request is currently in flight.
285    pub fn set_refill_in_flight(&self, active: bool) {
286        self.surb_refill_in_flight.store(active, Ordering::Relaxed);
287    }
288
289    /// Sets the frame inspector for tracking incomplete frames.
290    ///
291    /// The inspector is initialized only once via `OnceLock`.
292    pub fn set_inspector(&self, inspector: FrameInspector) {
293        let _ = self.inspector.set(inspector);
294    }
295
296    /// Sets the SURB flow estimator for tracking produced/consumed SURBs.
297    ///
298    /// The estimator and target buffer are initialized only once via `OnceLock`.
299    pub fn set_surb_estimator(&self, estimator: AtomicSurbFlowEstimator, target_buffer: u64) {
300        let _ = self.surb_estimator.set(estimator);
301        let _ = self.surb_target_buffer.set(target_buffer);
302    }
303
304    /// Updates the count of incomplete frames from the frame inspector.
305    fn record_incomplete_frames(&self) {
306        if let Some(inspector) = self.inspector.get() {
307            self.frames_being_reassembled.store(inspector.len(), Ordering::Relaxed);
308        }
309    }
310
311    /// Creates a snapshot of all current metrics.
312    ///
313    /// This method atomically reads all metric counters and creates an immutable snapshot
314    /// that includes lifetime, frame buffer, acknowledgement, SURB, and transport metrics.
315    /// SURB metrics are loaded automatically from the stored estimator if one was set via
316    /// [`set_surb_estimator`].
317    pub fn snapshot(&self) -> SessionStatsSnapshot {
318        self.record_incomplete_frames();
319
320        let snapshot_at_us = now_us();
321        let created_at_us = self.created_at_us.load(Ordering::Relaxed);
322        let last_activity_us = self.last_activity_us.load(Ordering::Relaxed);
323        let state = SessionLifecycleState::from_repr(self.state.load(Ordering::Relaxed))
324            .unwrap_or(SessionLifecycleState::Active);
325        let uptime_us = snapshot_at_us.saturating_sub(created_at_us);
326        let idle_us = snapshot_at_us.saturating_sub(last_activity_us);
327
328        let (produced, consumed) = self
329            .surb_estimator
330            .get()
331            .map(|e| (e.produced.load(Ordering::Relaxed), e.consumed.load(Ordering::Relaxed)))
332            .unwrap_or((0, 0));
333
334        let target = self.surb_target_buffer.get().copied();
335
336        let buffer_estimate = produced.saturating_sub(consumed);
337        let rate_per_sec = self.compute_rate_per_sec(produced, consumed, snapshot_at_us);
338
339        SessionStatsSnapshot {
340            session_id: self.session_id,
341            snapshot_at: UNIX_EPOCH + Duration::from_micros(snapshot_at_us),
342            lifetime: SessionLifetimeSnapshot {
343                created_at: UNIX_EPOCH + Duration::from_micros(created_at_us),
344                last_activity_at: UNIX_EPOCH + Duration::from_micros(last_activity_us),
345                uptime: Duration::from_micros(uptime_us),
346                idle: Duration::from_micros(idle_us),
347                state,
348                pipeline_errors: self.session_pipeline_errors.load(Ordering::Relaxed),
349            },
350            frame_buffer: FrameBufferSnapshot {
351                frame_mtu: self.frame_mtu,
352                frame_timeout: self.frame_timeout,
353                frame_capacity: self.frame_capacity,
354                frames_being_assembled: self.frames_being_reassembled.load(Ordering::Relaxed),
355                frames_completed: self.frames_completed.load(Ordering::Relaxed),
356                frames_emitted: self.frames_emitted.load(Ordering::Relaxed),
357                frames_discarded: self.frames_discarded.load(Ordering::Relaxed),
358            },
359            ack: AckSnapshot {
360                mode: self.ack_mode,
361                incoming_segments: self.incoming_segments.load(Ordering::Relaxed),
362                outgoing_segments: self.outgoing_segments.load(Ordering::Relaxed),
363                incoming_retransmission_requests: self.incoming_retransmission_requests.load(Ordering::Relaxed),
364                incoming_acknowledged_frames: self.incoming_acknowledged_frames.load(Ordering::Relaxed),
365                outgoing_retransmission_requests: self.outgoing_retransmission_requests.load(Ordering::Relaxed),
366                outgoing_acknowledged_frames: self.outgoing_acknowledged_frames.load(Ordering::Relaxed),
367            },
368            surb: SurbSnapshot {
369                produced_total: produced,
370                consumed_total: consumed,
371                buffer_estimate,
372                target_buffer: target,
373                rate_per_sec,
374                refill_in_flight: self.surb_refill_in_flight.load(Ordering::Relaxed),
375            },
376            transport: TransportSnapshot {
377                bytes_in: self.bytes_in.load(Ordering::Relaxed),
378                bytes_out: self.bytes_out.load(Ordering::Relaxed),
379                packets_in: self.packets_in.load(Ordering::Relaxed),
380                packets_out: self.packets_out.load(Ordering::Relaxed),
381            },
382        }
383    }
384
385    /// Computes the SURB buffer change rate in items per second.
386    ///
387    /// This uses a sliding window approach, tracking the delta since the last computation
388    /// and the elapsed time to calculate the current rate.
389    ///
390    /// Returns:
391    /// - Positive value: buffer is growing (production > consumption)
392    /// - Negative value: buffer is depleting (consumption > production)
393    /// - Zero: no change or no time has elapsed
394    fn compute_rate_per_sec(&self, produced: u64, consumed: u64, now_us: u64) -> f64 {
395        let total = produced.saturating_sub(consumed);
396
397        // Atomically read and update the previous snapshot
398        let (last_total, last_us) = {
399            let mut snapshot = self.last_rate_snapshot.lock();
400            let prev = *snapshot;
401            *snapshot = (total, now_us);
402            prev
403        };
404
405        let elapsed_us = now_us.saturating_sub(last_us);
406        if elapsed_us == 0 {
407            return 0.0;
408        }
409
410        // Use signed arithmetic to capture buffer depletion as negative rates
411        let delta = total as i64 - last_total as i64;
412        (delta as f64) / (elapsed_us as f64 / 1_000_000.0)
413    }
414}
415
/// Returns the current wall-clock time as whole microseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`, because the `duration_since` error is replaced
/// by the default (zero) `Duration`. A microsecond count that does not fit into `u64`
/// saturates at `u64::MAX` instead of being silently truncated.
fn now_us() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
    u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX)
}
423
424impl hopr_protocol_session::SessionTelemetryTracker for SessionTelemetry {
425    fn frame_emitted(&self) {
426        self.frames_emitted.fetch_add(1, Ordering::Relaxed);
427    }
428
429    fn frame_completed(&self) {
430        self.frames_completed.fetch_add(1, Ordering::Relaxed);
431    }
432
433    fn frame_discarded(&self) {
434        self.frames_discarded.fetch_add(1, Ordering::Relaxed);
435    }
436
437    fn incomplete_frame(&self) {
438        self.frames_incomplete.fetch_add(1, Ordering::Relaxed);
439    }
440
441    fn incoming_message(&self, msg: SessionMessageDiscriminants) {
442        match msg {
443            SessionMessageDiscriminants::Segment => {
444                self.incoming_segments.fetch_add(1, Ordering::Relaxed);
445            }
446            SessionMessageDiscriminants::Request => {
447                self.incoming_retransmission_requests.fetch_add(1, Ordering::Relaxed);
448            }
449            SessionMessageDiscriminants::Acknowledge => {
450                self.incoming_acknowledged_frames.fetch_add(1, Ordering::Relaxed);
451            }
452        }
453    }
454
455    fn outgoing_message(&self, msg: SessionMessageDiscriminants) {
456        match msg {
457            SessionMessageDiscriminants::Segment => {
458                self.outgoing_segments.fetch_add(1, Ordering::Relaxed);
459            }
460            SessionMessageDiscriminants::Request => {
461                self.outgoing_retransmission_requests.fetch_add(1, Ordering::Relaxed);
462            }
463            SessionMessageDiscriminants::Acknowledge => {
464                self.outgoing_acknowledged_frames.fetch_add(1, Ordering::Relaxed);
465            }
466        }
467    }
468
469    fn error(&self) {
470        self.session_pipeline_errors.fetch_add(1, Ordering::Relaxed);
471    }
472}
473
#[cfg(test)]
mod tests {
    use hopr_crypto_random::Randomizable;
    use hopr_internal_types::prelude::HoprPseudonym;
    use hopr_protocol_session::SessionTelemetryTracker;

    use super::*;
    use crate::SessionId;

    /// Builds a tracker with the default session configuration and a random pseudonym.
    fn telemetry_with_tag(tag: u64) -> SessionTelemetry {
        let session_id = SessionId::new(tag, HoprPseudonym::random());
        SessionTelemetry::new(session_id, HoprSessionConfig::default())
    }

    /// Non-empty reads and writes are counted; the zero-byte read must leave no trace.
    #[test]
    fn metrics_snapshot_tracks_bytes_and_packets() {
        let telemetry = telemetry_with_tag(1);

        telemetry.record_read(10);
        telemetry.record_read(0);
        telemetry.record_write(20);

        let transport = telemetry.snapshot().transport;

        assert_eq!(transport.bytes_in, 10);
        assert_eq!(transport.bytes_out, 20);
        assert_eq!(transport.packets_in, 1);
        assert_eq!(transport.packets_out, 1);
    }

    /// Each frame callback shows up in its own snapshot counter.
    #[test]
    fn metrics_snapshot_tracks_frame_events() {
        let telemetry = telemetry_with_tag(2);

        telemetry.frame_completed();
        telemetry.frame_emitted();
        telemetry.frame_discarded();

        let frame_buffer = telemetry.snapshot().frame_buffer;

        assert_eq!(frame_buffer.frames_completed, 1);
        assert_eq!(frame_buffer.frames_emitted, 1);
        assert_eq!(frame_buffer.frames_discarded, 1);
    }
}
515}