Skip to main content

hopr_transport_session/
metric_bridge.rs

1use std::{
2    sync::{Arc, OnceLock, Weak},
3    time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
6use super::{
7    AckSnapshot, FrameBufferSnapshot, SessionAckMode, SessionId, SessionLifecycleState, SessionLifetimeSnapshot,
8    SessionStatsSnapshot, SessionTelemetry, SurbSnapshot, TransportSnapshot,
9};
10
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
12pub enum SessionMetricKind {
13    U64Gauge,
14    U64Counter,
15    F64Gauge,
16}
17
18impl SessionMetricKind {
19    fn from_name(metric_name: &str) -> Self {
20        if metric_name.ends_with("_total") {
21            SessionMetricKind::U64Counter
22        } else {
23            SessionMetricKind::U64Gauge
24        }
25    }
26}
27
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub struct SessionMetricDefinition {
30    pub name: String,
31    pub kind: SessionMetricKind,
32}
33
34#[derive(Clone, Copy, Debug, PartialEq)]
35pub enum SessionMetricValue {
36    U64(u64),
37    F64(f64),
38}
39
40#[derive(Clone, Debug, PartialEq)]
41pub struct SessionMetricSample {
42    pub name: String,
43    pub value: SessionMetricValue,
44}
45
46/// Duration after which session telemetry entries are automatically removed from the registry if not accessed or
47/// updated. This helps to prevent memory leaks by ensuring that stale telemetry data is cleaned up after a reasonable
48/// period of inactivity.
49const SESSION_TELEMETRY_REGISTRY_TTL: Duration = Duration::from_mins(30);
50
51pub fn serialize_system_time_millis<S>(timestamp: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
52where
53    S: serde::Serializer,
54{
55    let timestamp_millis = timestamp.duration_since(UNIX_EPOCH).unwrap_or_default().as_millis() as u64;
56    serializer.serialize_u64(timestamp_millis)
57}
58
59pub fn serialize_duration_millis<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
60where
61    S: serde::Serializer,
62{
63    serializer.serialize_u64(duration.as_millis() as u64)
64}
65
66fn flatten_snapshot_value(value: &serde_json::Value, path: &mut Vec<String>, output: &mut Vec<SessionMetricSample>) {
67    match value {
68        serde_json::Value::Object(fields) => {
69            for (field_name, field_value) in fields {
70                if path.is_empty() && field_name == "session_id" {
71                    continue;
72                }
73                path.push(field_name.clone());
74                flatten_snapshot_value(field_value, path, output);
75                path.pop();
76            }
77        }
78        serde_json::Value::Number(number) => {
79            let metric_name = format!("hopr_session_{}", path.join("_"));
80            if let Some(value) = number.as_u64() {
81                output.push(SessionMetricSample {
82                    name: metric_name,
83                    value: SessionMetricValue::U64(value),
84                });
85            } else if let Some(value) = number.as_i64() {
86                output.push(SessionMetricSample {
87                    name: metric_name,
88                    value: SessionMetricValue::U64(value.max(0) as u64),
89                });
90            } else if let Some(value) = number.as_f64() {
91                output.push(SessionMetricSample {
92                    name: metric_name,
93                    value: SessionMetricValue::F64(value),
94                });
95            }
96        }
97        serde_json::Value::Bool(value) => {
98            let metric_name = format!("hopr_session_{}", path.join("_"));
99            output.push(SessionMetricSample {
100                name: metric_name,
101                value: SessionMetricValue::U64(u64::from(*value)),
102            });
103        }
104        serde_json::Value::Null | serde_json::Value::Array(_) | serde_json::Value::String(_) => {}
105    }
106}
107
108fn collect_snapshot_metrics(snapshot: &SessionStatsSnapshot) -> Vec<SessionMetricSample> {
109    let mut output = Vec::new();
110    let mut path = Vec::new();
111
112    if let Ok(serialized) = serde_json::to_value(snapshot) {
113        flatten_snapshot_value(&serialized, &mut path, &mut output);
114    }
115
116    output
117}
118
119#[derive(serde::Serialize)]
120struct SessionMetricSchemaSnapshot {
121    #[serde(rename = "snapshot_at_ms")]
122    #[serde(serialize_with = "serialize_system_time_millis")]
123    snapshot_at: SystemTime,
124    lifetime: SessionLifetimeSnapshot,
125    #[serde(rename = "frame")]
126    frame_buffer: FrameBufferSnapshot,
127    ack: AckSnapshot,
128    surb: SurbSnapshot,
129    transport: TransportSnapshot,
130}
131
132fn metric_schema_snapshot() -> SessionMetricSchemaSnapshot {
133    SessionMetricSchemaSnapshot {
134        snapshot_at: UNIX_EPOCH,
135        lifetime: SessionLifetimeSnapshot {
136            created_at: UNIX_EPOCH,
137            last_activity_at: UNIX_EPOCH,
138            uptime: Duration::from_millis(0),
139            idle: Duration::from_millis(0),
140            state: SessionLifecycleState::Active,
141            pipeline_errors: 0,
142        },
143        frame_buffer: FrameBufferSnapshot {
144            frame_mtu: 0,
145            frame_timeout: Duration::from_millis(0),
146            frame_capacity: 0,
147            frames_being_assembled: 0,
148            frames_completed: 0,
149            frames_emitted: 0,
150            frames_discarded: 0,
151        },
152        ack: AckSnapshot {
153            mode: SessionAckMode::None,
154            incoming_segments: 0,
155            incoming_retransmission_requests: 0,
156            incoming_acknowledged_frames: 0,
157            outgoing_segments: 0,
158            outgoing_retransmission_requests: 0,
159            outgoing_acknowledged_frames: 0,
160        },
161        surb: SurbSnapshot {
162            produced_total: 0,
163            consumed_total: 0,
164            buffer_estimate: 0,
165            target_buffer: Some(0),
166            rate_per_sec: 0.0,
167            refill_in_flight: false,
168        },
169        transport: TransportSnapshot {
170            bytes_in: 0,
171            bytes_out: 0,
172            packets_in: 0,
173            packets_out: 0,
174        },
175    }
176}
177
178pub fn session_snapshot_metric_definitions() -> Vec<SessionMetricDefinition> {
179    let mut definitions = Vec::new();
180    let mut path = Vec::new();
181
182    if let Ok(serialized) = serde_json::to_value(metric_schema_snapshot()) {
183        let mut samples = Vec::new();
184        flatten_snapshot_value(&serialized, &mut path, &mut samples);
185        for sample in samples {
186            let kind = match sample.value {
187                SessionMetricValue::U64(_) => SessionMetricKind::from_name(&sample.name),
188                SessionMetricValue::F64(_) => SessionMetricKind::F64Gauge,
189            };
190            definitions.push(SessionMetricDefinition {
191                name: sample.name,
192                kind,
193            });
194        }
195    }
196
197    definitions
198}
199
200pub fn session_snapshot_metric_value(snapshot: &SessionStatsSnapshot, metric_name: &str) -> Option<SessionMetricValue> {
201    collect_snapshot_metrics(snapshot)
202        .into_iter()
203        .find(|sample| sample.name == metric_name)
204        .map(|sample| sample.value)
205}
206
207pub fn session_snapshot_metric_samples(snapshot: &SessionStatsSnapshot) -> Vec<SessionMetricSample> {
208    collect_snapshot_metrics(snapshot)
209}
210
211fn session_telemetry_registry() -> &'static moka::sync::Cache<SessionId, Weak<SessionTelemetry>> {
212    static REGISTRY: OnceLock<moka::sync::Cache<SessionId, Weak<SessionTelemetry>>> = OnceLock::new();
213    REGISTRY.get_or_init(|| {
214        moka::sync::Cache::builder()
215            .time_to_live(SESSION_TELEMETRY_REGISTRY_TTL)
216            .build()
217    })
218}
219
220pub(crate) fn register_session_telemetry(session_telemetry: &Arc<SessionTelemetry>) {
221    session_telemetry_registry().insert(*session_telemetry.session_id(), Arc::downgrade(session_telemetry));
222}
223
224pub(crate) fn unregister_session_telemetry(session_id: &SessionId) {
225    session_telemetry_registry().remove(session_id);
226}
227
228pub fn session_telemetry_snapshots() -> Vec<SessionStatsSnapshot> {
229    let registry = session_telemetry_registry();
230    registry.run_pending_tasks();
231
232    registry
233        .iter()
234        .filter_map(|(session_id, session_telemetry)| session_telemetry.upgrade().is_none().then_some(session_id))
235        .collect::<Vec<_>>()
236        .into_iter()
237        .for_each(|session_id| {
238            registry.remove(session_id.as_ref());
239        });
240
241    let mut snapshots: Vec<_> = registry
242        .iter()
243        .filter_map(|(_, session_telemetry)| {
244            session_telemetry
245                .upgrade()
246                .map(|session_telemetry| session_telemetry.snapshot())
247        })
248        .collect();
249
250    snapshots.sort_by(|left, right| left.session_id.as_str().cmp(right.session_id.as_str()));
251    snapshots
252}