1use std::{
8 sync::{
9 OnceLock,
10 atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
11 },
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use hopr_protocol_session::{FrameInspector, SessionMessageDiscriminants};
16
17use crate::{
18 Capability, HoprSessionConfig, SessionId, balancer::AtomicSurbFlowEstimator, types::SESSION_SOCKET_CAPACITY,
19};
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::FromRepr, serde::Serialize)]
23#[repr(u8)]
24pub enum SessionLifecycleState {
25 Active = 0,
27 Closing = 1,
29 Closed = 2,
31}
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
35#[repr(u8)]
36pub enum SessionAckMode {
37 None,
39 Partial,
41 Full,
43 Both,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
49pub struct SessionLifetimeSnapshot {
50 pub created_at: SystemTime,
52 pub last_activity_at: SystemTime,
54 pub uptime: Duration,
56 pub idle: Duration,
58 pub state: SessionLifecycleState,
60 pub pipeline_errors: u64,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
66pub struct FrameBufferSnapshot {
67 pub frame_mtu: usize,
69 pub frame_timeout: Duration,
71 pub frame_capacity: usize,
73 pub frames_being_assembled: usize,
75 pub frames_completed: u64,
77 pub frames_emitted: u64,
79 pub frames_discarded: u64,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
85pub struct AckSnapshot {
86 pub mode: SessionAckMode,
88 pub incoming_segments: u64,
90 pub incoming_retransmission_requests: u64,
92 pub incoming_acknowledged_frames: u64,
94 pub outgoing_segments: u64,
96 pub outgoing_retransmission_requests: u64,
98 pub outgoing_acknowledged_frames: u64,
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)]
104pub struct SurbSnapshot {
105 pub produced_total: u64,
107 pub consumed_total: u64,
109 pub buffer_estimate: u64,
111 pub target_buffer: Option<u64>,
113 pub rate_per_sec: f64,
115 pub refill_in_flight: bool,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
121pub struct TransportSnapshot {
122 pub bytes_in: u64,
124 pub bytes_out: u64,
126 pub packets_in: u64,
128 pub packets_out: u64,
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, serde::Serialize)]
134pub struct SessionStatsSnapshot {
135 pub session_id: SessionId,
137 pub snapshot_at: SystemTime,
139 pub lifetime: SessionLifetimeSnapshot,
141 pub frame_buffer: FrameBufferSnapshot,
143 pub ack: AckSnapshot,
145 pub surb: SurbSnapshot,
147 pub transport: TransportSnapshot,
149}
150
151#[derive(Debug)]
155pub struct SessionTelemetry {
156 session_id: SessionId,
157 created_at_us: AtomicU64,
158 last_activity_us: AtomicU64,
159 state: AtomicU8,
160 ack_mode: SessionAckMode,
161 frame_mtu: usize,
162 frame_timeout: Duration,
163 frame_capacity: usize,
164 frames_being_reassembled: AtomicUsize,
165 frames_incomplete: AtomicU64,
166 frames_completed: AtomicU64,
167 frames_emitted: AtomicU64,
168 frames_discarded: AtomicU64,
169 incoming_segments: AtomicU64,
170 outgoing_segments: AtomicU64,
171 incoming_retransmission_requests: AtomicU64,
172 incoming_acknowledged_frames: AtomicU64,
173 outgoing_retransmission_requests: AtomicU64,
174 outgoing_acknowledged_frames: AtomicU64,
175 session_pipeline_errors: AtomicU64,
176 bytes_in: AtomicU64,
177 bytes_out: AtomicU64,
178 packets_in: AtomicU64,
179 packets_out: AtomicU64,
180 surb_refill_in_flight: AtomicBool,
181 last_rate_snapshot: parking_lot::Mutex<(u64, u64)>,
184 inspector: OnceLock<FrameInspector>,
185 surb_estimator: OnceLock<AtomicSurbFlowEstimator>,
186 surb_target_buffer: OnceLock<u64>,
187}
188
189impl SessionTelemetry {
190 pub fn new(session_id: SessionId, cfg: HoprSessionConfig) -> Self {
191 let now = now_us();
192 let ack_mode = if cfg
193 .capabilities
194 .contains(Capability::RetransmissionAck | Capability::RetransmissionNack)
195 {
196 SessionAckMode::Both
197 } else if cfg.capabilities.contains(Capability::RetransmissionAck) {
198 SessionAckMode::Full
199 } else if cfg.capabilities.contains(Capability::RetransmissionNack) {
200 SessionAckMode::Partial
201 } else {
202 SessionAckMode::None
203 };
204
205 Self {
206 session_id,
207 created_at_us: AtomicU64::new(now),
208 last_activity_us: AtomicU64::new(now),
209 state: AtomicU8::new(SessionLifecycleState::Active as u8),
210 ack_mode,
211 frame_mtu: cfg.frame_mtu,
212 frame_timeout: cfg.frame_timeout,
213 frame_capacity: SESSION_SOCKET_CAPACITY,
214 frames_being_reassembled: AtomicUsize::new(0),
215 frames_incomplete: AtomicU64::new(0),
216 frames_completed: AtomicU64::new(0),
217 frames_emitted: AtomicU64::new(0),
218 frames_discarded: AtomicU64::new(0),
219 incoming_segments: AtomicU64::new(0),
220 incoming_retransmission_requests: AtomicU64::new(0),
221 incoming_acknowledged_frames: AtomicU64::new(0),
222 outgoing_segments: AtomicU64::new(0),
223 outgoing_retransmission_requests: AtomicU64::new(0),
224 outgoing_acknowledged_frames: AtomicU64::new(0),
225 session_pipeline_errors: AtomicU64::new(0),
226 bytes_in: AtomicU64::new(0),
227 bytes_out: AtomicU64::new(0),
228 packets_in: AtomicU64::new(0),
229 packets_out: AtomicU64::new(0),
230 surb_refill_in_flight: AtomicBool::new(false),
231 last_rate_snapshot: parking_lot::Mutex::new((0, now)),
232 inspector: OnceLock::new(),
233 surb_estimator: OnceLock::new(),
234 surb_target_buffer: OnceLock::new(),
235 }
236 }
237
238 pub fn session_id(&self) -> &SessionId {
240 &self.session_id
241 }
242
243 pub fn set_state(&self, state: SessionLifecycleState) {
248 self.state.store(state as u8, Ordering::Relaxed);
249 }
250
251 pub fn touch_activity(&self) {
255 self.last_activity_us.store(now_us(), Ordering::Relaxed);
256 }
257
258 pub fn record_read(&self, bytes: usize) {
263 if bytes == 0 {
264 return;
265 }
266 self.touch_activity();
267 self.bytes_in.fetch_add(bytes as u64, Ordering::Relaxed);
268 self.packets_in.fetch_add(1, Ordering::Relaxed);
269 }
270
271 pub fn record_write(&self, bytes: usize) {
276 if bytes == 0 {
277 return;
278 }
279 self.touch_activity();
280 self.bytes_out.fetch_add(bytes as u64, Ordering::Relaxed);
281 self.packets_out.fetch_add(1, Ordering::Relaxed);
282 }
283
284 pub fn set_refill_in_flight(&self, active: bool) {
286 self.surb_refill_in_flight.store(active, Ordering::Relaxed);
287 }
288
289 pub fn set_inspector(&self, inspector: FrameInspector) {
293 let _ = self.inspector.set(inspector);
294 }
295
296 pub fn set_surb_estimator(&self, estimator: AtomicSurbFlowEstimator, target_buffer: u64) {
300 let _ = self.surb_estimator.set(estimator);
301 let _ = self.surb_target_buffer.set(target_buffer);
302 }
303
304 fn record_incomplete_frames(&self) {
306 if let Some(inspector) = self.inspector.get() {
307 self.frames_being_reassembled.store(inspector.len(), Ordering::Relaxed);
308 }
309 }
310
311 pub fn snapshot(&self) -> SessionStatsSnapshot {
318 self.record_incomplete_frames();
319
320 let snapshot_at_us = now_us();
321 let created_at_us = self.created_at_us.load(Ordering::Relaxed);
322 let last_activity_us = self.last_activity_us.load(Ordering::Relaxed);
323 let state = SessionLifecycleState::from_repr(self.state.load(Ordering::Relaxed))
324 .unwrap_or(SessionLifecycleState::Active);
325 let uptime_us = snapshot_at_us.saturating_sub(created_at_us);
326 let idle_us = snapshot_at_us.saturating_sub(last_activity_us);
327
328 let (produced, consumed) = self
329 .surb_estimator
330 .get()
331 .map(|e| (e.produced.load(Ordering::Relaxed), e.consumed.load(Ordering::Relaxed)))
332 .unwrap_or((0, 0));
333
334 let target = self.surb_target_buffer.get().copied();
335
336 let buffer_estimate = produced.saturating_sub(consumed);
337 let rate_per_sec = self.compute_rate_per_sec(produced, consumed, snapshot_at_us);
338
339 SessionStatsSnapshot {
340 session_id: self.session_id,
341 snapshot_at: UNIX_EPOCH + Duration::from_micros(snapshot_at_us),
342 lifetime: SessionLifetimeSnapshot {
343 created_at: UNIX_EPOCH + Duration::from_micros(created_at_us),
344 last_activity_at: UNIX_EPOCH + Duration::from_micros(last_activity_us),
345 uptime: Duration::from_micros(uptime_us),
346 idle: Duration::from_micros(idle_us),
347 state,
348 pipeline_errors: self.session_pipeline_errors.load(Ordering::Relaxed),
349 },
350 frame_buffer: FrameBufferSnapshot {
351 frame_mtu: self.frame_mtu,
352 frame_timeout: self.frame_timeout,
353 frame_capacity: self.frame_capacity,
354 frames_being_assembled: self.frames_being_reassembled.load(Ordering::Relaxed),
355 frames_completed: self.frames_completed.load(Ordering::Relaxed),
356 frames_emitted: self.frames_emitted.load(Ordering::Relaxed),
357 frames_discarded: self.frames_discarded.load(Ordering::Relaxed),
358 },
359 ack: AckSnapshot {
360 mode: self.ack_mode,
361 incoming_segments: self.incoming_segments.load(Ordering::Relaxed),
362 outgoing_segments: self.outgoing_segments.load(Ordering::Relaxed),
363 incoming_retransmission_requests: self.incoming_retransmission_requests.load(Ordering::Relaxed),
364 incoming_acknowledged_frames: self.incoming_acknowledged_frames.load(Ordering::Relaxed),
365 outgoing_retransmission_requests: self.outgoing_retransmission_requests.load(Ordering::Relaxed),
366 outgoing_acknowledged_frames: self.outgoing_acknowledged_frames.load(Ordering::Relaxed),
367 },
368 surb: SurbSnapshot {
369 produced_total: produced,
370 consumed_total: consumed,
371 buffer_estimate,
372 target_buffer: target,
373 rate_per_sec,
374 refill_in_flight: self.surb_refill_in_flight.load(Ordering::Relaxed),
375 },
376 transport: TransportSnapshot {
377 bytes_in: self.bytes_in.load(Ordering::Relaxed),
378 bytes_out: self.bytes_out.load(Ordering::Relaxed),
379 packets_in: self.packets_in.load(Ordering::Relaxed),
380 packets_out: self.packets_out.load(Ordering::Relaxed),
381 },
382 }
383 }
384
385 fn compute_rate_per_sec(&self, produced: u64, consumed: u64, now_us: u64) -> f64 {
395 let total = produced.saturating_sub(consumed);
396
397 let (last_total, last_us) = {
399 let mut snapshot = self.last_rate_snapshot.lock();
400 let prev = *snapshot;
401 *snapshot = (total, now_us);
402 prev
403 };
404
405 let elapsed_us = now_us.saturating_sub(last_us);
406 if elapsed_us == 0 {
407 return 0.0;
408 }
409
410 let delta = total as i64 - last_total as i64;
412 (delta as f64) / (elapsed_us as f64 / 1_000_000.0)
413 }
414}
415
416fn now_us() -> u64 {
418 SystemTime::now()
419 .duration_since(UNIX_EPOCH)
420 .unwrap_or_default()
421 .as_micros() as u64
422}
423
424impl hopr_protocol_session::SessionTelemetryTracker for SessionTelemetry {
425 fn frame_emitted(&self) {
426 self.frames_emitted.fetch_add(1, Ordering::Relaxed);
427 }
428
429 fn frame_completed(&self) {
430 self.frames_completed.fetch_add(1, Ordering::Relaxed);
431 }
432
433 fn frame_discarded(&self) {
434 self.frames_discarded.fetch_add(1, Ordering::Relaxed);
435 }
436
437 fn incomplete_frame(&self) {
438 self.frames_incomplete.fetch_add(1, Ordering::Relaxed);
439 }
440
441 fn incoming_message(&self, msg: SessionMessageDiscriminants) {
442 match msg {
443 SessionMessageDiscriminants::Segment => {
444 self.incoming_segments.fetch_add(1, Ordering::Relaxed);
445 }
446 SessionMessageDiscriminants::Request => {
447 self.incoming_retransmission_requests.fetch_add(1, Ordering::Relaxed);
448 }
449 SessionMessageDiscriminants::Acknowledge => {
450 self.incoming_acknowledged_frames.fetch_add(1, Ordering::Relaxed);
451 }
452 }
453 }
454
455 fn outgoing_message(&self, msg: SessionMessageDiscriminants) {
456 match msg {
457 SessionMessageDiscriminants::Segment => {
458 self.outgoing_segments.fetch_add(1, Ordering::Relaxed);
459 }
460 SessionMessageDiscriminants::Request => {
461 self.outgoing_retransmission_requests.fetch_add(1, Ordering::Relaxed);
462 }
463 SessionMessageDiscriminants::Acknowledge => {
464 self.outgoing_acknowledged_frames.fetch_add(1, Ordering::Relaxed);
465 }
466 }
467 }
468
469 fn error(&self) {
470 self.session_pipeline_errors.fetch_add(1, Ordering::Relaxed);
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use hopr_crypto_random::Randomizable;
477 use hopr_internal_types::prelude::HoprPseudonym;
478 use hopr_protocol_session::SessionTelemetryTracker;
479
480 use super::*;
481 use crate::SessionId;
482
483 #[test]
484 fn metrics_snapshot_tracks_bytes_and_packets() {
485 let id = SessionId::new(1_u64, HoprPseudonym::random());
486 let metrics = SessionTelemetry::new(id, HoprSessionConfig::default());
487
488 metrics.record_read(10);
489 metrics.record_read(0);
490 metrics.record_write(20);
491
492 let snapshot = metrics.snapshot();
493
494 assert_eq!(snapshot.transport.bytes_in, 10);
495 assert_eq!(snapshot.transport.bytes_out, 20);
496 assert_eq!(snapshot.transport.packets_in, 1);
497 assert_eq!(snapshot.transport.packets_out, 1);
498 }
499
500 #[test]
501 fn metrics_snapshot_tracks_frame_events() {
502 let id = SessionId::new(2_u64, HoprPseudonym::random());
503 let metrics = SessionTelemetry::new(id, HoprSessionConfig::default());
504
505 metrics.frame_completed();
506 metrics.frame_emitted();
507 metrics.frame_discarded();
508
509 let snapshot = metrics.snapshot();
510
511 assert_eq!(snapshot.frame_buffer.frames_completed, 1);
512 assert_eq!(snapshot.frame_buffer.frames_emitted, 1);
513 assert_eq!(snapshot.frame_buffer.frames_discarded, 1);
514 }
515}