1use std::{
2 collections::HashMap,
3 sync::Arc,
4 time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use hopr_protocol_session::SessionMessageDiscriminants;
8
9pub use crate::balancer::{AtomicSurbFlowEstimator, BalancerStateValues};
10use crate::{Capability, HoprSessionConfig, SessionId, types::SESSION_SOCKET_CAPACITY};
11
12lazy_static::lazy_static! {
13 static ref METRIC_SESSION_SNAPSHOT_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
14 "hopr_session_snapshot_at_ms",
15 "Session telemetry sample time in unix milliseconds",
16 &["session_id"]
17 ).unwrap();
18 static ref METRIC_SESSION_LIFETIME_CREATED_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
19 "hopr_session_lifetime_created_at_ms",
20 "Session creation time in unix milliseconds",
21 &["session_id"]
22 ).unwrap();
23 static ref METRIC_SESSION_LIFETIME_LAST_ACTIVITY_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
24 "hopr_session_lifetime_last_activity_at_ms",
25 "Last session activity time in unix milliseconds",
26 &["session_id"]
27 ).unwrap();
28 static ref METRIC_SESSION_LIFETIME_UPTIME_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
29 "hopr_session_lifetime_uptime_ms",
30 "Session uptime in milliseconds",
31 &["session_id"]
32 ).unwrap();
33 static ref METRIC_SESSION_LIFETIME_IDLE_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
34 "hopr_session_lifetime_idle_ms",
35 "Session idle time in milliseconds",
36 &["session_id"]
37 ).unwrap();
38 static ref METRIC_SESSION_LIFETIME_STATE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
39 "hopr_session_lifetime_state",
40 "Session lifecycle state encoded as Active=0, Closing=1, Closed=2",
41 &["session_id"]
42 ).unwrap();
43 static ref METRIC_SESSION_LIFETIME_PIPELINE_ERRORS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
44 "hopr_session_lifetime_pipeline_errors_total",
45 "Session pipeline processing errors",
46 &["session_id"]
47 ).unwrap();
48 static ref METRIC_SESSION_FRAME_MTU_BYTES: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
49 "hopr_session_frame_mtu_bytes",
50 "Configured frame MTU in bytes",
51 &["session_id"]
52 ).unwrap();
53 static ref METRIC_SESSION_FRAME_TIMEOUT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
54 "hopr_session_frame_timeout_ms",
55 "Configured frame timeout in milliseconds",
56 &["session_id"]
57 ).unwrap();
58 static ref METRIC_SESSION_FRAME_FRAME_CAPACITY: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
59 "hopr_session_frame_frame_capacity",
60 "Configured frame buffer capacity",
61 &["session_id"]
62 ).unwrap();
63 static ref METRIC_SESSION_FRAME_BEING_ASSEMBLED: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
64 "hopr_session_frame_being_assembled",
65 "Number of frames currently being assembled",
66 &["session_id"]
67 ).unwrap();
68 static ref METRIC_SESSION_FRAME_COMPLETED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
69 "hopr_session_frame_completed_total",
70 "Number of frames successfully completed",
71 &["session_id"]
72 ).unwrap();
73 static ref METRIC_SESSION_FRAME_EMITTED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
74 "hopr_session_frame_emitted_total",
75 "Number of frames emitted from the sequencer",
76 &["session_id"]
77 ).unwrap();
78 static ref METRIC_SESSION_FRAME_DISCARDED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
79 "hopr_session_frame_discarded_total",
80 "Number of frames discarded by the session protocol",
81 &["session_id"]
82 ).unwrap();
83 static ref METRIC_SESSION_ACK_MODE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
84 "hopr_session_ack_mode",
85 "Configured ack mode encoded as None=0, Partial=1, Full=2, Both=3",
86 &["session_id"]
87 ).unwrap();
88 static ref METRIC_SESSION_ACK_INCOMING_SEGMENTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
89 "hopr_session_ack_incoming_segments_total",
90 "Incoming session segments",
91 &["session_id"]
92 ).unwrap();
93 static ref METRIC_SESSION_ACK_INCOMING_RETRANSMISSION_REQUESTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
94 "hopr_session_ack_incoming_retransmission_requests_total",
95 "Incoming session retransmission requests",
96 &["session_id"]
97 ).unwrap();
98 static ref METRIC_SESSION_ACK_INCOMING_ACKNOWLEDGED_FRAMES_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
99 "hopr_session_ack_incoming_acknowledged_frames_total",
100 "Incoming session acknowledgements",
101 &["session_id"]
102 ).unwrap();
103 static ref METRIC_SESSION_ACK_OUTGOING_SEGMENTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
104 "hopr_session_ack_outgoing_segments_total",
105 "Outgoing session segments",
106 &["session_id"]
107 ).unwrap();
108 static ref METRIC_SESSION_ACK_OUTGOING_RETRANSMISSION_REQUESTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
109 "hopr_session_ack_outgoing_retransmission_requests_total",
110 "Outgoing session retransmission requests",
111 &["session_id"]
112 ).unwrap();
113 static ref METRIC_SESSION_ACK_OUTGOING_ACKNOWLEDGED_FRAMES_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
114 "hopr_session_ack_outgoing_acknowledged_frames_total",
115 "Outgoing session acknowledgements",
116 &["session_id"]
117 ).unwrap();
118 static ref METRIC_SESSION_SURB_PRODUCED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
119 "hopr_session_surb_produced_total",
120 "Produced SURBs per session",
121 &["session_id"]
122 ).unwrap();
123 static ref METRIC_SESSION_SURB_CONSUMED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
124 "hopr_session_surb_consumed_total",
125 "Consumed SURBs per session",
126 &["session_id"]
127 ).unwrap();
128 static ref METRIC_SESSION_SURB_BUFFER_ESTIMATE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
129 "hopr_session_surb_buffer_estimate",
130 "Estimated SURB buffer size",
131 &["session_id"]
132 ).unwrap();
133 static ref METRIC_SESSION_SURB_TARGET_BUFFER: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
134 "hopr_session_surb_target_buffer",
135 "Configured SURB target buffer size",
136 &["session_id"]
137 ).unwrap();
138 static ref METRIC_SESSION_SURB_RATE_PER_SEC: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
139 "hopr_session_surb_rate_per_sec",
140 "Estimated SURB buffer rate change per second",
141 &["session_id"]
142 ).unwrap();
143 static ref METRIC_SESSION_SURB_REFILL_IN_FLIGHT: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
144 "hopr_session_surb_refill_in_flight",
145 "Whether SURB refill is currently configured for a session (1 or 0)",
146 &["session_id"]
147 ).unwrap();
148 static ref METRIC_SESSION_TRANSPORT_BYTES_IN_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
149 "hopr_session_transport_bytes_in_total",
150 "Session ingress bytes",
151 &["session_id"]
152 ).unwrap();
153 static ref METRIC_SESSION_TRANSPORT_BYTES_OUT_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
154 "hopr_session_transport_bytes_out_total",
155 "Session egress bytes",
156 &["session_id"]
157 ).unwrap();
158 static ref METRIC_SESSION_TRANSPORT_PACKETS_IN_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
159 "hopr_session_transport_packets_in_total",
160 "Session ingress packets",
161 &["session_id"]
162 ).unwrap();
163 static ref METRIC_SESSION_TRANSPORT_PACKETS_OUT_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
164 "hopr_session_transport_packets_out_total",
165 "Session egress packets",
166 &["session_id"]
167 ).unwrap();
168 static ref SESSION_RUNTIME: parking_lot::Mutex<HashMap<SessionId, SessionRuntimeState>> = parking_lot::Mutex::new(HashMap::new());
169}
170
171#[derive(Clone, Copy, Debug, PartialEq, Eq, serde_repr::Serialize_repr)]
172#[repr(u8)]
173pub enum SessionLifecycleState {
174 Active = 0,
175 Closing = 1,
176 Closed = 2,
177}
178
179#[derive(Clone, Copy, Debug, PartialEq, Eq, serde_repr::Serialize_repr)]
180#[repr(u8)]
181pub enum SessionAckMode {
182 None,
183 Partial,
184 Full,
185 Both,
186}
187
188#[derive(Debug)]
189struct SessionSurbRuntimeState {
190 state: Arc<BalancerStateValues>,
191 estimator: AtomicSurbFlowEstimator,
192 last_snapshot_total: u64,
193 last_snapshot_us: u64,
194}
195
196#[derive(Debug)]
197struct SessionRuntimeState {
198 created_at_us: u64,
199 last_activity_us: u64,
200 frames_being_assembled: u64,
201 surb: Option<SessionSurbRuntimeState>,
202}
203
204impl SessionRuntimeState {
205 fn new(now_us: u64) -> Self {
206 Self {
207 created_at_us: now_us,
208 last_activity_us: now_us,
209 frames_being_assembled: 0,
210 surb: None,
211 }
212 }
213}
214
215fn session_ack_mode(capabilities: CapabilitySet) -> SessionAckMode {
216 if capabilities.contains(Capability::RetransmissionAck | Capability::RetransmissionNack) {
217 SessionAckMode::Both
218 } else if capabilities.contains(Capability::RetransmissionAck) {
219 SessionAckMode::Full
220 } else if capabilities.contains(Capability::RetransmissionNack) {
221 SessionAckMode::Partial
222 } else {
223 SessionAckMode::None
224 }
225}
226
227type CapabilitySet = flagset::FlagSet<Capability>;
228
229pub fn initialize_session_metrics(session_id: SessionId, cfg: HoprSessionConfig) {
230 let now = now_us();
231 let ack_mode = session_ack_mode(cfg.capabilities);
232
233 METRIC_SESSION_LIFETIME_CREATED_AT_MS.set(&[session_id.as_str()], now as f64 / 1_000.0);
234 METRIC_SESSION_LIFETIME_STATE.set(&[session_id.as_str()], SessionLifecycleState::Active as u8 as f64);
235 METRIC_SESSION_ACK_MODE.set(&[session_id.as_str()], ack_mode as u8 as f64);
236 METRIC_SESSION_FRAME_MTU_BYTES.set(&[session_id.as_str()], cfg.frame_mtu as f64);
237 METRIC_SESSION_FRAME_TIMEOUT_MS.set(&[session_id.as_str()], cfg.frame_timeout.as_millis() as f64);
238 METRIC_SESSION_FRAME_FRAME_CAPACITY.set(&[session_id.as_str()], SESSION_SOCKET_CAPACITY as f64);
239 METRIC_SESSION_FRAME_BEING_ASSEMBLED.set(&[session_id.as_str()], 0.0);
240 METRIC_SESSION_SURB_BUFFER_ESTIMATE.set(&[session_id.as_str()], 0.0);
241 METRIC_SESSION_SURB_TARGET_BUFFER.set(&[session_id.as_str()], 0.0);
242 METRIC_SESSION_SURB_RATE_PER_SEC.set(&[session_id.as_str()], 0.0);
243 METRIC_SESSION_SURB_REFILL_IN_FLIGHT.set(&[session_id.as_str()], 0.0);
244
245 {
246 let mut state = SESSION_RUNTIME.lock();
247 state.insert(session_id, SessionRuntimeState::new(now));
248 }
249
250 refresh_lifetime_metrics(&session_id, now, now, now);
251}
252
253pub fn remove_session_metrics_state(session_id: &SessionId) {
254 METRIC_SESSION_FRAME_BEING_ASSEMBLED.set(&[session_id.as_str()], 0.0);
255 SESSION_RUNTIME.lock().remove(session_id);
256}
257
258pub fn set_session_state(session_id: &SessionId, state: SessionLifecycleState) {
259 METRIC_SESSION_LIFETIME_STATE.set(&[session_id.as_str()], state as u8 as f64);
260 touch_session_activity(session_id);
261}
262
263fn update_session_activity_locked(
264 session_id: &SessionId,
265 now: u64,
266 state: &mut HashMap<SessionId, SessionRuntimeState>,
267) {
268 if let Some(runtime) = state.get_mut(session_id) {
269 runtime.last_activity_us = now;
270 refresh_lifetime_metrics(session_id, now, runtime.created_at_us, runtime.last_activity_us);
271 refresh_surb_gauges(session_id, runtime, now);
272 }
273}
274
275pub fn touch_session_activity(session_id: &SessionId) {
276 let now = now_us();
277 if let Some(mut state) = SESSION_RUNTIME.try_lock() {
278 update_session_activity_locked(session_id, now, &mut state);
279 }
280}
281
282pub fn record_session_read(session_id: &SessionId, bytes: usize) {
283 if bytes == 0 {
284 return;
285 }
286
287 touch_session_activity(session_id);
288 METRIC_SESSION_TRANSPORT_BYTES_IN_TOTAL.increment_by(&[session_id.as_str()], bytes as u64);
289 METRIC_SESSION_TRANSPORT_PACKETS_IN_TOTAL.increment_by(&[session_id.as_str()], 1);
290}
291
292pub fn record_session_write(session_id: &SessionId, bytes: usize) {
293 if bytes == 0 {
294 return;
295 }
296
297 touch_session_activity(session_id);
298 METRIC_SESSION_TRANSPORT_BYTES_OUT_TOTAL.increment_by(&[session_id.as_str()], bytes as u64);
299 METRIC_SESSION_TRANSPORT_PACKETS_OUT_TOTAL.increment_by(&[session_id.as_str()], 1);
300}
301
302pub fn set_session_balancer_data(
303 session_id: &SessionId,
304 estimator: AtomicSurbFlowEstimator,
305 state: Arc<BalancerStateValues>,
306) {
307 let now = now_us();
308 {
309 let mut all = SESSION_RUNTIME.lock();
310 let runtime = all.entry(*session_id).or_insert_with(|| SessionRuntimeState::new(now));
311 runtime.surb = Some(SessionSurbRuntimeState {
312 state,
313 estimator,
314 last_snapshot_total: 0,
315 last_snapshot_us: now,
316 });
317 }
318
319 METRIC_SESSION_SURB_REFILL_IN_FLIGHT.set(&[session_id.as_str()], 1.0);
320 touch_session_activity(session_id);
321}
322
323pub fn record_session_surb_produced(session_id: &SessionId, by: u64) {
324 METRIC_SESSION_SURB_PRODUCED_TOTAL.increment_by(&[session_id.as_str()], by);
325 touch_session_activity(session_id);
326}
327
328pub fn record_session_surb_consumed(session_id: &SessionId, by: u64) {
329 METRIC_SESSION_SURB_CONSUMED_TOTAL.increment_by(&[session_id.as_str()], by);
330 touch_session_activity(session_id);
331}
332
333fn refresh_lifetime_metrics(session_id: &SessionId, now_us: u64, created_at_us: u64, last_activity_us: u64) {
334 METRIC_SESSION_SNAPSHOT_AT_MS.set(&[session_id.as_str()], now_us as f64 / 1_000.0);
335 METRIC_SESSION_LIFETIME_LAST_ACTIVITY_AT_MS.set(&[session_id.as_str()], last_activity_us as f64 / 1_000.0);
336 METRIC_SESSION_LIFETIME_UPTIME_MS.set(
337 &[session_id.as_str()],
338 now_us.saturating_sub(created_at_us) as f64 / 1_000.0,
339 );
340 METRIC_SESSION_LIFETIME_IDLE_MS.set(
341 &[session_id.as_str()],
342 now_us.saturating_sub(last_activity_us) as f64 / 1_000.0,
343 );
344}
345
346fn refresh_surb_gauges(session_id: &SessionId, runtime: &mut SessionRuntimeState, now_us: u64) {
347 let Some(surb) = runtime.surb.as_mut() else {
348 return;
349 };
350
351 let produced = surb.estimator.produced.load(std::sync::atomic::Ordering::Relaxed);
352 let consumed = surb.estimator.consumed.load(std::sync::atomic::Ordering::Relaxed);
353 let total = produced.saturating_sub(consumed);
354
355 let elapsed_us = now_us.saturating_sub(surb.last_snapshot_us);
356 let rate_per_sec = if elapsed_us == 0 {
357 0.0
358 } else {
359 let delta = total as i64 - surb.last_snapshot_total as i64;
360 delta as f64 / (elapsed_us as f64 / 1_000_000.0)
361 };
362
363 surb.last_snapshot_total = total;
364 surb.last_snapshot_us = now_us;
365
366 METRIC_SESSION_SURB_TARGET_BUFFER.set(
367 &[session_id.as_str()],
368 surb.state
369 .target_surb_buffer_size
370 .load(std::sync::atomic::Ordering::Relaxed) as f64,
371 );
372 METRIC_SESSION_SURB_BUFFER_ESTIMATE.set(&[session_id.as_str()], total as f64);
373 METRIC_SESSION_SURB_RATE_PER_SEC.set(&[session_id.as_str()], rate_per_sec);
374}
375
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn now_us() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_micros() as u64
}
382
383fn increment_frame_assembly_gauge(session_id: &SessionId) {
384 let mut state = SESSION_RUNTIME.lock();
385 let runtime = state
386 .entry(*session_id)
387 .or_insert_with(|| SessionRuntimeState::new(now_us()));
388 runtime.frames_being_assembled = runtime.frames_being_assembled.saturating_add(1);
389 METRIC_SESSION_FRAME_BEING_ASSEMBLED.increment(&[session_id.as_str()], 1.0);
390}
391
392fn decrement_frame_assembly_gauge(session_id: &SessionId) {
393 let mut state = SESSION_RUNTIME.lock();
394 let Some(runtime) = state.get_mut(session_id) else {
395 return;
396 };
397
398 if runtime.frames_being_assembled == 0 {
399 return;
400 }
401
402 runtime.frames_being_assembled -= 1;
403 METRIC_SESSION_FRAME_BEING_ASSEMBLED.decrement(&[session_id.as_str()], 1.0);
404}
405
406impl hopr_protocol_session::SessionTelemetryTracker for SessionId {
407 fn frame_emitted(&self) {
408 METRIC_SESSION_FRAME_EMITTED_TOTAL.increment(&[self.as_str()]);
409 }
410
411 fn frame_completed(&self) {
412 METRIC_SESSION_FRAME_COMPLETED_TOTAL.increment(&[self.as_str()]);
413 decrement_frame_assembly_gauge(self);
414 }
415
416 fn frame_discarded(&self) {
417 METRIC_SESSION_FRAME_DISCARDED_TOTAL.increment(&[self.as_str()]);
418 decrement_frame_assembly_gauge(self);
419 }
420
421 fn incomplete_frame(&self) {
422 increment_frame_assembly_gauge(self);
423 }
424
425 fn incoming_message(&self, msg: SessionMessageDiscriminants) {
426 match msg {
427 SessionMessageDiscriminants::Segment => {
428 METRIC_SESSION_ACK_INCOMING_SEGMENTS_TOTAL.increment(&[self.as_str()])
429 }
430 SessionMessageDiscriminants::Request => {
431 METRIC_SESSION_ACK_INCOMING_RETRANSMISSION_REQUESTS_TOTAL.increment(&[self.as_str()])
432 }
433 SessionMessageDiscriminants::Acknowledge => {
434 METRIC_SESSION_ACK_INCOMING_ACKNOWLEDGED_FRAMES_TOTAL.increment(&[self.as_str()])
435 }
436 }
437 }
438
439 fn outgoing_message(&self, msg: SessionMessageDiscriminants) {
440 match msg {
441 SessionMessageDiscriminants::Segment => {
442 METRIC_SESSION_ACK_OUTGOING_SEGMENTS_TOTAL.increment(&[self.as_str()])
443 }
444 SessionMessageDiscriminants::Request => {
445 METRIC_SESSION_ACK_OUTGOING_RETRANSMISSION_REQUESTS_TOTAL.increment(&[self.as_str()])
446 }
447 SessionMessageDiscriminants::Acknowledge => {
448 METRIC_SESSION_ACK_OUTGOING_ACKNOWLEDGED_FRAMES_TOTAL.increment(&[self.as_str()])
449 }
450 }
451 }
452
453 fn error(&self) {
454 METRIC_SESSION_LIFETIME_PIPELINE_ERRORS_TOTAL.increment(&[self.as_str()]);
455 }
456}
457
#[cfg(test)]
mod tests {
    use hopr_protocol_session::SessionTelemetryTracker;
    use hopr_types::{crypto_random::Randomizable, internal::prelude::HoprPseudonym};

    use super::*;

    /// Renders the text-exposition line expected for a per-session sample, e.g.
    /// `metric{session_id="<id>"} <value>`.
    fn expected_sample(metric: &str, session_id: &str, value: impl std::fmt::Display) -> String {
        format!("{metric}{{session_id=\"{session_id}\"}} {value}")
    }

    /// Transport, frame and ack-mode metrics show up in the gathered output.
    #[test]
    fn session_metrics_are_exported_through_hopr_metrics() {
        let id = SessionId::new(4_u64, HoprPseudonym::random());
        initialize_session_metrics(id, HoprSessionConfig::default());
        record_session_read(&id, 10);
        id.frame_completed();

        let text = hopr_metrics::gather_all_metrics().expect("must gather metrics");
        let session_id = id.to_string();

        let expected_lines = [
            expected_sample("hopr_session_transport_bytes_in_total", &session_id, 10),
            expected_sample("hopr_session_frame_completed_total", &session_id, 1),
            expected_sample("hopr_session_ack_mode", &session_id, SessionAckMode::None as u8),
        ];
        for line in expected_lines.iter() {
            assert!(text.contains(line));
        }
    }

    /// SURB produced/consumed counters show up in the gathered output.
    #[test]
    fn surb_metrics_are_exported_through_hopr_metrics() {
        let id = SessionId::new(5_u64, HoprPseudonym::random());
        initialize_session_metrics(id, HoprSessionConfig::default());

        let estimator = AtomicSurbFlowEstimator::default();
        let state = Arc::new(BalancerStateValues::new(Default::default()));
        set_session_balancer_data(&id, estimator, Arc::clone(&state));

        record_session_surb_produced(&id, 8);
        record_session_surb_consumed(&id, 3);

        let text = hopr_metrics::gather_all_metrics().expect("must gather metrics");
        let session_id = id.to_string();

        let expected_lines = [
            expected_sample("hopr_session_surb_produced_total", &session_id, 8),
            expected_sample("hopr_session_surb_consumed_total", &session_id, 3),
        ];
        for line in expected_lines.iter() {
            assert!(text.contains(line));
        }
    }
}