1use std::{
2 sync::{Arc, OnceLock, Weak},
3 time::{Duration, SystemTime, UNIX_EPOCH},
4};
5
6use super::{
7 AckSnapshot, FrameBufferSnapshot, SessionAckMode, SessionId, SessionLifecycleState, SessionLifetimeSnapshot,
8 SessionStatsSnapshot, SessionTelemetry, SurbSnapshot, TransportSnapshot,
9};
10
11#[derive(Clone, Copy, Debug, PartialEq, Eq)]
12pub enum SessionMetricKind {
13 U64Gauge,
14 U64Counter,
15 F64Gauge,
16}
17
18impl SessionMetricKind {
19 fn from_name(metric_name: &str) -> Self {
20 if metric_name.ends_with("_total") {
21 SessionMetricKind::U64Counter
22 } else {
23 SessionMetricKind::U64Gauge
24 }
25 }
26}
27
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub struct SessionMetricDefinition {
30 pub name: String,
31 pub kind: SessionMetricKind,
32}
33
34#[derive(Clone, Copy, Debug, PartialEq)]
35pub enum SessionMetricValue {
36 U64(u64),
37 F64(f64),
38}
39
40#[derive(Clone, Debug, PartialEq)]
41pub struct SessionMetricSample {
42 pub name: String,
43 pub value: SessionMetricValue,
44}
45
46const SESSION_TELEMETRY_REGISTRY_TTL: Duration = Duration::from_mins(30);
50
51pub fn serialize_system_time_millis<S>(timestamp: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
52where
53 S: serde::Serializer,
54{
55 let timestamp_millis = timestamp.duration_since(UNIX_EPOCH).unwrap_or_default().as_millis() as u64;
56 serializer.serialize_u64(timestamp_millis)
57}
58
59pub fn serialize_duration_millis<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
60where
61 S: serde::Serializer,
62{
63 serializer.serialize_u64(duration.as_millis() as u64)
64}
65
66fn flatten_snapshot_value(value: &serde_json::Value, path: &mut Vec<String>, output: &mut Vec<SessionMetricSample>) {
67 match value {
68 serde_json::Value::Object(fields) => {
69 for (field_name, field_value) in fields {
70 if path.is_empty() && field_name == "session_id" {
71 continue;
72 }
73 path.push(field_name.clone());
74 flatten_snapshot_value(field_value, path, output);
75 path.pop();
76 }
77 }
78 serde_json::Value::Number(number) => {
79 let metric_name = format!("hopr_session_{}", path.join("_"));
80 if let Some(value) = number.as_u64() {
81 output.push(SessionMetricSample {
82 name: metric_name,
83 value: SessionMetricValue::U64(value),
84 });
85 } else if let Some(value) = number.as_i64() {
86 output.push(SessionMetricSample {
87 name: metric_name,
88 value: SessionMetricValue::U64(value.max(0) as u64),
89 });
90 } else if let Some(value) = number.as_f64() {
91 output.push(SessionMetricSample {
92 name: metric_name,
93 value: SessionMetricValue::F64(value),
94 });
95 }
96 }
97 serde_json::Value::Bool(value) => {
98 let metric_name = format!("hopr_session_{}", path.join("_"));
99 output.push(SessionMetricSample {
100 name: metric_name,
101 value: SessionMetricValue::U64(u64::from(*value)),
102 });
103 }
104 serde_json::Value::Null | serde_json::Value::Array(_) | serde_json::Value::String(_) => {}
105 }
106}
107
108fn collect_snapshot_metrics(snapshot: &SessionStatsSnapshot) -> Vec<SessionMetricSample> {
109 let mut output = Vec::new();
110 let mut path = Vec::new();
111
112 if let Ok(serialized) = serde_json::to_value(snapshot) {
113 flatten_snapshot_value(&serialized, &mut path, &mut output);
114 }
115
116 output
117}
118
119#[derive(serde::Serialize)]
120struct SessionMetricSchemaSnapshot {
121 #[serde(rename = "snapshot_at_ms")]
122 #[serde(serialize_with = "serialize_system_time_millis")]
123 snapshot_at: SystemTime,
124 lifetime: SessionLifetimeSnapshot,
125 #[serde(rename = "frame")]
126 frame_buffer: FrameBufferSnapshot,
127 ack: AckSnapshot,
128 surb: SurbSnapshot,
129 transport: TransportSnapshot,
130}
131
132fn metric_schema_snapshot() -> SessionMetricSchemaSnapshot {
133 SessionMetricSchemaSnapshot {
134 snapshot_at: UNIX_EPOCH,
135 lifetime: SessionLifetimeSnapshot {
136 created_at: UNIX_EPOCH,
137 last_activity_at: UNIX_EPOCH,
138 uptime: Duration::from_millis(0),
139 idle: Duration::from_millis(0),
140 state: SessionLifecycleState::Active,
141 pipeline_errors: 0,
142 },
143 frame_buffer: FrameBufferSnapshot {
144 frame_mtu: 0,
145 frame_timeout: Duration::from_millis(0),
146 frame_capacity: 0,
147 frames_being_assembled: 0,
148 frames_completed: 0,
149 frames_emitted: 0,
150 frames_discarded: 0,
151 },
152 ack: AckSnapshot {
153 mode: SessionAckMode::None,
154 incoming_segments: 0,
155 incoming_retransmission_requests: 0,
156 incoming_acknowledged_frames: 0,
157 outgoing_segments: 0,
158 outgoing_retransmission_requests: 0,
159 outgoing_acknowledged_frames: 0,
160 },
161 surb: SurbSnapshot {
162 produced_total: 0,
163 consumed_total: 0,
164 buffer_estimate: 0,
165 target_buffer: Some(0),
166 rate_per_sec: 0.0,
167 refill_in_flight: false,
168 },
169 transport: TransportSnapshot {
170 bytes_in: 0,
171 bytes_out: 0,
172 packets_in: 0,
173 packets_out: 0,
174 },
175 }
176}
177
178pub fn session_snapshot_metric_definitions() -> Vec<SessionMetricDefinition> {
179 let mut definitions = Vec::new();
180 let mut path = Vec::new();
181
182 if let Ok(serialized) = serde_json::to_value(metric_schema_snapshot()) {
183 let mut samples = Vec::new();
184 flatten_snapshot_value(&serialized, &mut path, &mut samples);
185 for sample in samples {
186 let kind = match sample.value {
187 SessionMetricValue::U64(_) => SessionMetricKind::from_name(&sample.name),
188 SessionMetricValue::F64(_) => SessionMetricKind::F64Gauge,
189 };
190 definitions.push(SessionMetricDefinition {
191 name: sample.name,
192 kind,
193 });
194 }
195 }
196
197 definitions
198}
199
200pub fn session_snapshot_metric_value(snapshot: &SessionStatsSnapshot, metric_name: &str) -> Option<SessionMetricValue> {
201 collect_snapshot_metrics(snapshot)
202 .into_iter()
203 .find(|sample| sample.name == metric_name)
204 .map(|sample| sample.value)
205}
206
207pub fn session_snapshot_metric_samples(snapshot: &SessionStatsSnapshot) -> Vec<SessionMetricSample> {
208 collect_snapshot_metrics(snapshot)
209}
210
211fn session_telemetry_registry() -> &'static moka::sync::Cache<SessionId, Weak<SessionTelemetry>> {
212 static REGISTRY: OnceLock<moka::sync::Cache<SessionId, Weak<SessionTelemetry>>> = OnceLock::new();
213 REGISTRY.get_or_init(|| {
214 moka::sync::Cache::builder()
215 .time_to_live(SESSION_TELEMETRY_REGISTRY_TTL)
216 .build()
217 })
218}
219
220pub(crate) fn register_session_telemetry(session_telemetry: &Arc<SessionTelemetry>) {
221 session_telemetry_registry().insert(*session_telemetry.session_id(), Arc::downgrade(session_telemetry));
222}
223
224pub(crate) fn unregister_session_telemetry(session_id: &SessionId) {
225 session_telemetry_registry().remove(session_id);
226}
227
228pub fn session_telemetry_snapshots() -> Vec<SessionStatsSnapshot> {
229 let registry = session_telemetry_registry();
230 registry.run_pending_tasks();
231
232 registry
233 .iter()
234 .filter_map(|(session_id, session_telemetry)| session_telemetry.upgrade().is_none().then_some(session_id))
235 .collect::<Vec<_>>()
236 .into_iter()
237 .for_each(|session_id| {
238 registry.remove(session_id.as_ref());
239 });
240
241 let mut snapshots: Vec<_> = registry
242 .iter()
243 .filter_map(|(_, session_telemetry)| {
244 session_telemetry
245 .upgrade()
246 .map(|session_telemetry| session_telemetry.snapshot())
247 })
248 .collect();
249
250 snapshots.sort_by(|left, right| left.session_id.as_str().cmp(right.session_id.as_str()));
251 snapshots
252}