Skip to main content

hopr_transport_session/
telemetry.rs

1use std::{
2    collections::HashMap,
3    sync::Arc,
4    time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use hopr_protocol_session::SessionMessageDiscriminants;
8
9pub use crate::balancer::{AtomicSurbFlowEstimator, BalancerStateValues};
10use crate::{Capability, HoprSessionConfig, SessionId, types::SESSION_SOCKET_CAPACITY};
11
// Lazily-initialised, process-wide telemetry state for sessions.
//
// Every metric below carries a single `session_id` label. Construction failures
// panic on first access because of the `unwrap()` on each constructor.
lazy_static::lazy_static! {
    // --- Snapshot / lifetime ------------------------------------------------
    static ref METRIC_SESSION_SNAPSHOT_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_snapshot_at_ms",
        "Session telemetry sample time in unix milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_CREATED_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_lifetime_created_at_ms",
        "Session creation time in unix milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_LAST_ACTIVITY_AT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_lifetime_last_activity_at_ms",
        "Last session activity time in unix milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_UPTIME_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_lifetime_uptime_ms",
        "Session uptime in milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_IDLE_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_lifetime_idle_ms",
        "Session idle time in milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_STATE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_lifetime_state",
        "Session lifecycle state encoded as Active=0, Closing=1, Closed=2",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_LIFETIME_PIPELINE_ERRORS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_lifetime_pipeline_errors_total",
        "Session pipeline processing errors",
        &["session_id"]
    ).unwrap();

    // --- Frames --------------------------------------------------------------
    static ref METRIC_SESSION_FRAME_MTU_BYTES: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_frame_mtu_bytes",
        "Configured frame MTU in bytes",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_TIMEOUT_MS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_frame_timeout_ms",
        "Configured frame timeout in milliseconds",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_FRAME_CAPACITY: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_frame_frame_capacity",
        "Configured frame buffer capacity",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_BEING_ASSEMBLED: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_frame_being_assembled",
        "Number of frames currently being assembled",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_COMPLETED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_frame_completed_total",
        "Number of frames successfully completed",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_EMITTED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_frame_emitted_total",
        "Number of frames emitted from the sequencer",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_FRAME_DISCARDED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_frame_discarded_total",
        "Number of frames discarded by the session protocol",
        &["session_id"]
    ).unwrap();

    // --- Acknowledgement / retransmission -----------------------------------
    static ref METRIC_SESSION_ACK_MODE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_ack_mode",
        "Configured ack mode encoded as None=0, Partial=1, Full=2, Both=3",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_INCOMING_SEGMENTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_incoming_segments_total",
        "Incoming session segments",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_INCOMING_RETRANSMISSION_REQUESTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_incoming_retransmission_requests_total",
        "Incoming session retransmission requests",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_INCOMING_ACKNOWLEDGED_FRAMES_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_incoming_acknowledged_frames_total",
        "Incoming session acknowledgements",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_OUTGOING_SEGMENTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_outgoing_segments_total",
        "Outgoing session segments",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_OUTGOING_RETRANSMISSION_REQUESTS_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_outgoing_retransmission_requests_total",
        "Outgoing session retransmission requests",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_ACK_OUTGOING_ACKNOWLEDGED_FRAMES_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_ack_outgoing_acknowledged_frames_total",
        "Outgoing session acknowledgements",
        &["session_id"]
    ).unwrap();

    // --- SURB balancing ------------------------------------------------------
    static ref METRIC_SESSION_SURB_PRODUCED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_surb_produced_total",
        "Produced SURBs per session",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_SURB_CONSUMED_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_surb_consumed_total",
        "Consumed SURBs per session",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_SURB_BUFFER_ESTIMATE: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_surb_buffer_estimate",
        "Estimated SURB buffer size",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_SURB_TARGET_BUFFER: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_surb_target_buffer",
        "Configured SURB target buffer size",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_SURB_RATE_PER_SEC: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_surb_rate_per_sec",
        "Estimated SURB buffer rate change per second",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_SURB_REFILL_IN_FLIGHT: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_surb_refill_in_flight",
        "Whether SURB refill is currently configured for a session (1 or 0)",
        &["session_id"]
    ).unwrap();

    // --- Transport throughput -----------------------------------------------
    static ref METRIC_SESSION_TRANSPORT_BYTES_IN_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_transport_bytes_in_total",
        "Session ingress bytes",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_TRANSPORT_BYTES_OUT_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_transport_bytes_out_total",
        "Session egress bytes",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_TRANSPORT_PACKETS_IN_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_transport_packets_in_total",
        "Session ingress packets",
        &["session_id"]
    ).unwrap();
    static ref METRIC_SESSION_TRANSPORT_PACKETS_OUT_TOTAL: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_transport_packets_out_total",
        "Session egress packets",
        &["session_id"]
    ).unwrap();

    // Per-session bookkeeping (timestamps, frames in assembly, optional SURB
    // balancer handles) shared by the functions in this file behind one mutex.
    static ref SESSION_RUNTIME: parking_lot::Mutex<HashMap<SessionId, SessionRuntimeState>> = parking_lot::Mutex::new(HashMap::new());
}
170
/// Lifecycle phase of a session.
///
/// The numeric value of each variant is what gets exported through the
/// `hopr_session_lifetime_state` gauge (the value is cast with `as u8`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde_repr::Serialize_repr)]
#[repr(u8)]
pub enum SessionLifecycleState {
    /// Exported as `0`.
    Active = 0,
    /// Exported as `1`.
    Closing = 1,
    /// Exported as `2`.
    Closed = 2,
}
178
179#[derive(Clone, Copy, Debug, PartialEq, Eq, serde_repr::Serialize_repr)]
180#[repr(u8)]
181pub enum SessionAckMode {
182    None,
183    Partial,
184    Full,
185    Both,
186}
187
/// SURB balancer handles and the previous sample needed to derive a rate of change.
#[derive(Debug)]
struct SessionSurbRuntimeState {
    /// Shared balancer values; `target_surb_buffer_size` is read from here when gauges are refreshed.
    state: Arc<BalancerStateValues>,
    /// Source of the `produced` / `consumed` counters used for the buffer estimate.
    estimator: AtomicSurbFlowEstimator,
    /// Buffer estimate (`produced - consumed`, saturating) recorded at the previous gauge refresh.
    last_snapshot_total: u64,
    /// Time of the previous gauge refresh, in microseconds since the unix epoch.
    last_snapshot_us: u64,
}
195
/// Per-session bookkeeping kept in `SESSION_RUNTIME`.
#[derive(Debug)]
struct SessionRuntimeState {
    /// Creation time in microseconds since the unix epoch.
    created_at_us: u64,
    /// Time of the most recently recorded activity, in microseconds since the unix epoch.
    last_activity_us: u64,
    /// Number of frames currently counted as being assembled (mirrors the exported gauge).
    frames_being_assembled: u64,
    /// SURB balancer data; `None` until `set_session_balancer_data` is called for the session.
    surb: Option<SessionSurbRuntimeState>,
}
203
204impl SessionRuntimeState {
205    fn new(now_us: u64) -> Self {
206        Self {
207            created_at_us: now_us,
208            last_activity_us: now_us,
209            frames_being_assembled: 0,
210            surb: None,
211        }
212    }
213}
214
215fn session_ack_mode(capabilities: CapabilitySet) -> SessionAckMode {
216    if capabilities.contains(Capability::RetransmissionAck | Capability::RetransmissionNack) {
217        SessionAckMode::Both
218    } else if capabilities.contains(Capability::RetransmissionAck) {
219        SessionAckMode::Full
220    } else if capabilities.contains(Capability::RetransmissionNack) {
221        SessionAckMode::Partial
222    } else {
223        SessionAckMode::None
224    }
225}
226
227type CapabilitySet = flagset::FlagSet<Capability>;
228
229pub fn initialize_session_metrics(session_id: SessionId, cfg: HoprSessionConfig) {
230    let now = now_us();
231    let ack_mode = session_ack_mode(cfg.capabilities);
232
233    METRIC_SESSION_LIFETIME_CREATED_AT_MS.set(&[session_id.as_str()], now as f64 / 1_000.0);
234    METRIC_SESSION_LIFETIME_STATE.set(&[session_id.as_str()], SessionLifecycleState::Active as u8 as f64);
235    METRIC_SESSION_ACK_MODE.set(&[session_id.as_str()], ack_mode as u8 as f64);
236    METRIC_SESSION_FRAME_MTU_BYTES.set(&[session_id.as_str()], cfg.frame_mtu as f64);
237    METRIC_SESSION_FRAME_TIMEOUT_MS.set(&[session_id.as_str()], cfg.frame_timeout.as_millis() as f64);
238    METRIC_SESSION_FRAME_FRAME_CAPACITY.set(&[session_id.as_str()], SESSION_SOCKET_CAPACITY as f64);
239    METRIC_SESSION_FRAME_BEING_ASSEMBLED.set(&[session_id.as_str()], 0.0);
240    METRIC_SESSION_SURB_BUFFER_ESTIMATE.set(&[session_id.as_str()], 0.0);
241    METRIC_SESSION_SURB_TARGET_BUFFER.set(&[session_id.as_str()], 0.0);
242    METRIC_SESSION_SURB_RATE_PER_SEC.set(&[session_id.as_str()], 0.0);
243    METRIC_SESSION_SURB_REFILL_IN_FLIGHT.set(&[session_id.as_str()], 0.0);
244
245    {
246        let mut state = SESSION_RUNTIME.lock();
247        state.insert(session_id, SessionRuntimeState::new(now));
248    }
249
250    refresh_lifetime_metrics(&session_id, now, now, now);
251}
252
253pub fn remove_session_metrics_state(session_id: &SessionId) {
254    METRIC_SESSION_FRAME_BEING_ASSEMBLED.set(&[session_id.as_str()], 0.0);
255    SESSION_RUNTIME.lock().remove(session_id);
256}
257
258pub fn set_session_state(session_id: &SessionId, state: SessionLifecycleState) {
259    METRIC_SESSION_LIFETIME_STATE.set(&[session_id.as_str()], state as u8 as f64);
260    touch_session_activity(session_id);
261}
262
263fn update_session_activity_locked(
264    session_id: &SessionId,
265    now: u64,
266    state: &mut HashMap<SessionId, SessionRuntimeState>,
267) {
268    if let Some(runtime) = state.get_mut(session_id) {
269        runtime.last_activity_us = now;
270        refresh_lifetime_metrics(session_id, now, runtime.created_at_us, runtime.last_activity_us);
271        refresh_surb_gauges(session_id, runtime, now);
272    }
273}
274
275pub fn touch_session_activity(session_id: &SessionId) {
276    let now = now_us();
277    if let Some(mut state) = SESSION_RUNTIME.try_lock() {
278        update_session_activity_locked(session_id, now, &mut state);
279    }
280}
281
282pub fn record_session_read(session_id: &SessionId, bytes: usize) {
283    if bytes == 0 {
284        return;
285    }
286
287    touch_session_activity(session_id);
288    METRIC_SESSION_TRANSPORT_BYTES_IN_TOTAL.increment_by(&[session_id.as_str()], bytes as u64);
289    METRIC_SESSION_TRANSPORT_PACKETS_IN_TOTAL.increment_by(&[session_id.as_str()], 1);
290}
291
292pub fn record_session_write(session_id: &SessionId, bytes: usize) {
293    if bytes == 0 {
294        return;
295    }
296
297    touch_session_activity(session_id);
298    METRIC_SESSION_TRANSPORT_BYTES_OUT_TOTAL.increment_by(&[session_id.as_str()], bytes as u64);
299    METRIC_SESSION_TRANSPORT_PACKETS_OUT_TOTAL.increment_by(&[session_id.as_str()], 1);
300}
301
302pub fn set_session_balancer_data(
303    session_id: &SessionId,
304    estimator: AtomicSurbFlowEstimator,
305    state: Arc<BalancerStateValues>,
306) {
307    let now = now_us();
308    {
309        let mut all = SESSION_RUNTIME.lock();
310        let runtime = all.entry(*session_id).or_insert_with(|| SessionRuntimeState::new(now));
311        runtime.surb = Some(SessionSurbRuntimeState {
312            state,
313            estimator,
314            last_snapshot_total: 0,
315            last_snapshot_us: now,
316        });
317    }
318
319    METRIC_SESSION_SURB_REFILL_IN_FLIGHT.set(&[session_id.as_str()], 1.0);
320    touch_session_activity(session_id);
321}
322
323pub fn record_session_surb_produced(session_id: &SessionId, by: u64) {
324    METRIC_SESSION_SURB_PRODUCED_TOTAL.increment_by(&[session_id.as_str()], by);
325    touch_session_activity(session_id);
326}
327
328pub fn record_session_surb_consumed(session_id: &SessionId, by: u64) {
329    METRIC_SESSION_SURB_CONSUMED_TOTAL.increment_by(&[session_id.as_str()], by);
330    touch_session_activity(session_id);
331}
332
333fn refresh_lifetime_metrics(session_id: &SessionId, now_us: u64, created_at_us: u64, last_activity_us: u64) {
334    METRIC_SESSION_SNAPSHOT_AT_MS.set(&[session_id.as_str()], now_us as f64 / 1_000.0);
335    METRIC_SESSION_LIFETIME_LAST_ACTIVITY_AT_MS.set(&[session_id.as_str()], last_activity_us as f64 / 1_000.0);
336    METRIC_SESSION_LIFETIME_UPTIME_MS.set(
337        &[session_id.as_str()],
338        now_us.saturating_sub(created_at_us) as f64 / 1_000.0,
339    );
340    METRIC_SESSION_LIFETIME_IDLE_MS.set(
341        &[session_id.as_str()],
342        now_us.saturating_sub(last_activity_us) as f64 / 1_000.0,
343    );
344}
345
346fn refresh_surb_gauges(session_id: &SessionId, runtime: &mut SessionRuntimeState, now_us: u64) {
347    let Some(surb) = runtime.surb.as_mut() else {
348        return;
349    };
350
351    let produced = surb.estimator.produced.load(std::sync::atomic::Ordering::Relaxed);
352    let consumed = surb.estimator.consumed.load(std::sync::atomic::Ordering::Relaxed);
353    let total = produced.saturating_sub(consumed);
354
355    let elapsed_us = now_us.saturating_sub(surb.last_snapshot_us);
356    let rate_per_sec = if elapsed_us == 0 {
357        0.0
358    } else {
359        let delta = total as i64 - surb.last_snapshot_total as i64;
360        delta as f64 / (elapsed_us as f64 / 1_000_000.0)
361    };
362
363    surb.last_snapshot_total = total;
364    surb.last_snapshot_us = now_us;
365
366    METRIC_SESSION_SURB_TARGET_BUFFER.set(
367        &[session_id.as_str()],
368        surb.state
369            .target_surb_buffer_size
370            .load(std::sync::atomic::Ordering::Relaxed) as f64,
371    );
372    METRIC_SESSION_SURB_BUFFER_ESTIMATE.set(&[session_id.as_str()], total as f64);
373    METRIC_SESSION_SURB_RATE_PER_SEC.set(&[session_id.as_str()], rate_per_sec);
374}
375
/// Returns the current wall-clock time in microseconds since the unix epoch.
///
/// A system clock set before the epoch yields `0`. A value that does not fit
/// into `u64` saturates at `u64::MAX` instead of being silently truncated.
fn now_us() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);

    // `as_micros` returns `u128`; convert explicitly rather than with a lossy `as` cast.
    u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX)
}
382
383fn increment_frame_assembly_gauge(session_id: &SessionId) {
384    let mut state = SESSION_RUNTIME.lock();
385    let runtime = state
386        .entry(*session_id)
387        .or_insert_with(|| SessionRuntimeState::new(now_us()));
388    runtime.frames_being_assembled = runtime.frames_being_assembled.saturating_add(1);
389    METRIC_SESSION_FRAME_BEING_ASSEMBLED.increment(&[session_id.as_str()], 1.0);
390}
391
392fn decrement_frame_assembly_gauge(session_id: &SessionId) {
393    let mut state = SESSION_RUNTIME.lock();
394    let Some(runtime) = state.get_mut(session_id) else {
395        return;
396    };
397
398    if runtime.frames_being_assembled == 0 {
399        return;
400    }
401
402    runtime.frames_being_assembled -= 1;
403    METRIC_SESSION_FRAME_BEING_ASSEMBLED.decrement(&[session_id.as_str()], 1.0);
404}
405
406impl hopr_protocol_session::SessionTelemetryTracker for SessionId {
407    fn frame_emitted(&self) {
408        METRIC_SESSION_FRAME_EMITTED_TOTAL.increment(&[self.as_str()]);
409    }
410
411    fn frame_completed(&self) {
412        METRIC_SESSION_FRAME_COMPLETED_TOTAL.increment(&[self.as_str()]);
413        decrement_frame_assembly_gauge(self);
414    }
415
416    fn frame_discarded(&self) {
417        METRIC_SESSION_FRAME_DISCARDED_TOTAL.increment(&[self.as_str()]);
418        decrement_frame_assembly_gauge(self);
419    }
420
421    fn incomplete_frame(&self) {
422        increment_frame_assembly_gauge(self);
423    }
424
425    fn incoming_message(&self, msg: SessionMessageDiscriminants) {
426        match msg {
427            SessionMessageDiscriminants::Segment => {
428                METRIC_SESSION_ACK_INCOMING_SEGMENTS_TOTAL.increment(&[self.as_str()])
429            }
430            SessionMessageDiscriminants::Request => {
431                METRIC_SESSION_ACK_INCOMING_RETRANSMISSION_REQUESTS_TOTAL.increment(&[self.as_str()])
432            }
433            SessionMessageDiscriminants::Acknowledge => {
434                METRIC_SESSION_ACK_INCOMING_ACKNOWLEDGED_FRAMES_TOTAL.increment(&[self.as_str()])
435            }
436        }
437    }
438
439    fn outgoing_message(&self, msg: SessionMessageDiscriminants) {
440        match msg {
441            SessionMessageDiscriminants::Segment => {
442                METRIC_SESSION_ACK_OUTGOING_SEGMENTS_TOTAL.increment(&[self.as_str()])
443            }
444            SessionMessageDiscriminants::Request => {
445                METRIC_SESSION_ACK_OUTGOING_RETRANSMISSION_REQUESTS_TOTAL.increment(&[self.as_str()])
446            }
447            SessionMessageDiscriminants::Acknowledge => {
448                METRIC_SESSION_ACK_OUTGOING_ACKNOWLEDGED_FRAMES_TOTAL.increment(&[self.as_str()])
449            }
450        }
451    }
452
453    fn error(&self) {
454        METRIC_SESSION_LIFETIME_PIPELINE_ERRORS_TOTAL.increment(&[self.as_str()]);
455    }
456}
457
#[cfg(test)]
mod tests {
    use hopr_protocol_session::SessionTelemetryTracker;
    use hopr_types::{crypto_random::Randomizable, internal::prelude::HoprPseudonym};

    use super::*;

    /// Read bytes, completed frames and the ack mode of a freshly initialised
    /// session must all show up in the gathered metrics text.
    #[test]
    fn session_metrics_are_exported_through_hopr_metrics() {
        let id = SessionId::new(4_u64, HoprPseudonym::random());
        let session_id = id.to_string();

        initialize_session_metrics(id, HoprSessionConfig::default());
        record_session_read(&id, 10);
        id.frame_completed();

        let ingress_metric = format!("hopr_session_transport_bytes_in_total{{session_id=\"{session_id}\"}} 10");
        let frame_metric = format!("hopr_session_frame_completed_total{{session_id=\"{session_id}\"}} 1");
        let mode_metric = format!(
            "hopr_session_ack_mode{{session_id=\"{session_id}\"}} {}",
            SessionAckMode::None as u8
        );

        let text = hopr_metrics::gather_all_metrics().expect("must gather metrics");
        for expected in [&ingress_metric, &frame_metric, &mode_metric] {
            assert!(text.contains(expected.as_str()));
        }
    }

    /// Produced and consumed SURB counts must show up in the gathered metrics
    /// text once balancer data has been attached to the session.
    #[test]
    fn surb_metrics_are_exported_through_hopr_metrics() {
        let id = SessionId::new(5_u64, HoprPseudonym::random());
        let session_id = id.to_string();

        initialize_session_metrics(id, HoprSessionConfig::default());

        let balancer_state = Arc::new(BalancerStateValues::new(Default::default()));
        set_session_balancer_data(&id, AtomicSurbFlowEstimator::default(), Arc::clone(&balancer_state));
        record_session_surb_produced(&id, 8);
        record_session_surb_consumed(&id, 3);

        let produced_metric = format!("hopr_session_surb_produced_total{{session_id=\"{session_id}\"}} 8");
        let consumed_metric = format!("hopr_session_surb_consumed_total{{session_id=\"{session_id}\"}} 3");

        let text = hopr_metrics::gather_all_metrics().expect("must gather metrics");
        for expected in [&produced_metric, &consumed_metric] {
            assert!(text.contains(expected.as_str()));
        }
    }
}
505}