Skip to main content

hopr_metrics/
metrics.rs

1use std::sync::{
2    OnceLock, RwLock,
3    atomic::{AtomicU64, Ordering},
4};
5
6use opentelemetry::{
7    KeyValue,
8    metrics::{Counter, Histogram, MeterProvider as _, UpDownCounter},
9};
10use opentelemetry_sdk::metrics::SdkMeterProvider;
11
/// Error returned by the fallible metric operations of this module.
///
/// It carries a single human-readable message, which is also exactly what its
/// [`std::fmt::Display`] implementation prints.
#[derive(Debug)]
pub struct MetricError(String);

impl std::fmt::Display for MetricError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "{}", self.0)
    }
}

impl std::error::Error for MetricError {}

/// Shorthand for a `Result` whose error side is a [`MetricError`].
pub type MetricResult<T> = Result<T, MetricError>;
26
27// ---------------------------------------------------------------------------
28// Global metric state
29// ---------------------------------------------------------------------------
30
31struct GlobalMetricState {
32    exporter: opentelemetry_prometheus_text_exporter::PrometheusExporter,
33    provider: SdkMeterProvider,
34    prefix_providers: RwLock<Vec<PrefixProvider>>,
35}
36
37static GLOBAL_STATE: OnceLock<GlobalMetricState> = OnceLock::new();
38
39#[derive(Clone)]
40struct PrefixProvider {
41    prefix: String,
42    provider: SdkMeterProvider,
43}
44
45fn global_state() -> &'static GlobalMetricState {
46    GLOBAL_STATE.get_or_init(|| {
47        let exporter = opentelemetry_prometheus_text_exporter::PrometheusExporter::builder()
48            .without_counter_suffixes()
49            .without_units()
50            .without_target_info()
51            .without_scope_info()
52            .build();
53        let provider = SdkMeterProvider::builder().with_reader(exporter.clone()).build();
54        GlobalMetricState {
55            exporter,
56            provider,
57            prefix_providers: RwLock::new(Vec::new()),
58        }
59    })
60}
61
62/// Initializes the global metric state with the given exporter and provider.
63///
64/// This must be called **before** any metric is created if you want all
65/// instruments to feed into a custom provider (e.g. one that also has an OTLP reader).
66///
67/// Returns `false` if the state was already initialized.
68pub fn init_with_provider(
69    exporter: opentelemetry_prometheus_text_exporter::PrometheusExporter,
70    provider: SdkMeterProvider,
71) -> bool {
72    GLOBAL_STATE
73        .set(GlobalMetricState {
74            exporter,
75            provider,
76            prefix_providers: RwLock::new(Vec::new()),
77        })
78        .is_ok()
79}
80
81/// Registers a meter provider for metric names with the given `prefix`.
82///
83/// The longest matching prefix wins when multiple prefixes match a metric name.
84pub fn register_prefix_provider(prefix: &str, provider: SdkMeterProvider) -> bool {
85    if prefix.is_empty() {
86        return false;
87    }
88
89    let Some(state) = GLOBAL_STATE.get() else {
90        return false;
91    };
92
93    let mut prefix_providers = state
94        .prefix_providers
95        .write()
96        .expect("prefix provider lock must not be poisoned");
97
98    if let Some(existing) = prefix_providers.iter_mut().find(|entry| entry.prefix == prefix) {
99        existing.provider = provider;
100        return true;
101    }
102
103    prefix_providers.push(PrefixProvider {
104        prefix: prefix.to_string(),
105        provider,
106    });
107    prefix_providers.sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
108    true
109}
110
111fn meter_for_metric(metric_name: &str) -> opentelemetry::metrics::Meter {
112    let state = global_state();
113
114    if let Some(provider) = state
115        .prefix_providers
116        .read()
117        .expect("prefix provider lock must not be poisoned")
118        .iter()
119        .find(|entry| metric_name.starts_with(&entry.prefix))
120        .map(|entry| entry.provider.clone())
121    {
122        return provider.meter("hopr");
123    }
124
125    state.provider.meter("hopr")
126}
127
128/// Gathers all metrics in Prometheus text exposition format.
129pub fn gather_all_metrics() -> MetricResult<String> {
130    let state = global_state();
131    let mut buf = Vec::new();
132    state
133        .exporter
134        .export(&mut buf)
135        .map_err(|e| MetricError(e.to_string()))?;
136    String::from_utf8(buf).map_err(|e| MetricError(e.to_string()))
137}
138
139// ---------------------------------------------------------------------------
140// Helpers
141// ---------------------------------------------------------------------------
142
143fn labels_to_attributes(keys: &[String], values: &[&str]) -> Vec<KeyValue> {
144    debug_assert_eq!(
145        keys.len(),
146        values.len(),
147        "label key count ({}) must match value count ({})",
148        keys.len(),
149        values.len()
150    );
151    keys.iter()
152        .zip(values.iter())
153        .map(|(k, v)| KeyValue::new(k.clone(), v.to_string()))
154        .collect()
155}
156
/// An `f64` that can be read and updated atomically.
///
/// The value is stored as its IEEE-754 bit pattern inside an [`AtomicU64`]
/// (`f64::to_bits` / `f64::from_bits`). Every operation uses
/// `Ordering::Relaxed`, so it is atomic on this value alone and establishes no
/// ordering with other memory accesses.
struct AtomicF64(AtomicU64);

impl AtomicF64 {
    /// Creates a new atomic initialized to `val`.
    fn new(val: f64) -> Self {
        Self(AtomicU64::new(val.to_bits()))
    }

    /// Returns the current value.
    fn load(&self) -> f64 {
        f64::from_bits(self.0.load(Ordering::Relaxed))
    }

    /// Atomically swaps the value and returns the previous one.
    fn swap(&self, new: f64) -> f64 {
        f64::from_bits(self.0.swap(new.to_bits(), Ordering::Relaxed))
    }

    /// Atomically adds `delta` and returns the **previous** value.
    ///
    /// This matches the convention of the std `fetch_*` methods
    /// (e.g. `AtomicU64::fetch_add`), which return the value before the update.
    fn fetch_add(&self, delta: f64) -> f64 {
        let result = self.0.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
            Some((f64::from_bits(bits) + delta).to_bits())
        });
        // The closure always returns `Some`, so the update always succeeds. Both
        // variants carry the bits that were stored before it.
        match result {
            Ok(previous) | Err(previous) => f64::from_bits(previous),
        }
    }
}
189
190// ---------------------------------------------------------------------------
191// SimpleCounter
192// ---------------------------------------------------------------------------
193
194/// Represents a simple monotonic unsigned integer counter.
195pub struct SimpleCounter {
196    name: String,
197    ctr: Counter<u64>,
198    shadow: AtomicU64,
199}
200
201impl SimpleCounter {
202    /// Creates a new integer counter with given name and description.
203    pub fn new(name: &str, description: &str) -> MetricResult<Self> {
204        let ctr = meter_for_metric(name)
205            .u64_counter(name.to_string())
206            .with_description(description.to_string())
207            .build();
208        Ok(Self {
209            name: name.to_string(),
210            ctr,
211            shadow: AtomicU64::new(0),
212        })
213    }
214
215    /// Retrieves the value of the counter.
216    pub fn get(&self) -> u64 {
217        self.shadow.load(Ordering::Relaxed)
218    }
219
220    /// Increments the counter by the given number.
221    pub fn increment_by(&self, by: u64) {
222        self.ctr.add(by, &[]);
223        self.shadow.fetch_add(by, Ordering::Relaxed);
224    }
225
226    /// Increments the counter by 1.
227    pub fn increment(&self) {
228        self.increment_by(1);
229    }
230
231    /// Returns the name of the counter given at construction.
232    pub fn name(&self) -> String {
233        self.name.clone()
234    }
235}
236
237// ---------------------------------------------------------------------------
238// MultiCounter
239// ---------------------------------------------------------------------------
240
241/// Represents a vector of named monotonic unsigned integer counters.
242pub struct MultiCounter {
243    name: String,
244    labels: Vec<String>,
245    ctr: Counter<u64>,
246}
247
248impl MultiCounter {
249    /// Creates a new vector of integer counters with given name, description and counter labels.
250    pub fn new(name: &str, description: &str, labels: &[&str]) -> MetricResult<Self> {
251        if labels.is_empty() {
252            return Err(MetricError("at least a single label must be specified".into()));
253        }
254        let ctr = meter_for_metric(name)
255            .u64_counter(name.to_string())
256            .with_description(description.to_string())
257            .build();
258        Ok(Self {
259            name: name.to_string(),
260            labels: labels.iter().map(|s| s.to_string()).collect(),
261            ctr,
262        })
263    }
264
265    /// Increments counter with given labels by the given number.
266    pub fn increment_by(&self, label_values: &[&str], by: u64) {
267        let attrs = labels_to_attributes(&self.labels, label_values);
268        self.ctr.add(by, &attrs);
269    }
270
271    /// Increments counter with given labels by 1.
272    pub fn increment(&self, label_values: &[&str]) {
273        self.increment_by(label_values, 1);
274    }
275
276    /// Returns the name of the counter vector given at construction.
277    pub fn name(&self) -> String {
278        self.name.clone()
279    }
280
281    /// Returns the labels of the counters given at construction.
282    pub fn labels(&self) -> Vec<&str> {
283        self.labels.iter().map(String::as_str).collect()
284    }
285}
286
287// ---------------------------------------------------------------------------
288// SimpleGauge
289// ---------------------------------------------------------------------------
290
291/// Represents a simple gauge with floating point values.
292pub struct SimpleGauge {
293    name: String,
294    gauge: UpDownCounter<f64>,
295    shadow: AtomicF64,
296}
297
298impl SimpleGauge {
299    /// Creates a new gauge with given name and description.
300    pub fn new(name: &str, description: &str) -> MetricResult<Self> {
301        let gauge = meter_for_metric(name)
302            .f64_up_down_counter(name.to_string())
303            .with_description(description.to_string())
304            .build();
305        Ok(Self {
306            name: name.to_string(),
307            gauge,
308            shadow: AtomicF64::new(0.0),
309        })
310    }
311
312    /// Increments the gauge by the given value.
313    pub fn increment(&self, by: f64) {
314        self.gauge.add(by, &[]);
315        self.shadow.fetch_add(by);
316    }
317
318    /// Decrements the gauge by the given value.
319    pub fn decrement(&self, by: f64) {
320        self.gauge.add(-by, &[]);
321        self.shadow.fetch_add(-by);
322    }
323
324    /// Sets the gauge to the given value.
325    pub fn set(&self, value: f64) {
326        let previous = self.shadow.swap(value);
327        self.gauge.add(value - previous, &[]);
328    }
329
330    /// Retrieves the value of the gauge.
331    pub fn get(&self) -> f64 {
332        self.shadow.load()
333    }
334
335    /// Returns the name of the gauge given at construction.
336    pub fn name(&self) -> String {
337        self.name.clone()
338    }
339}
340
341// ---------------------------------------------------------------------------
342// MultiGauge
343// ---------------------------------------------------------------------------
344
345/// Represents a vector of gauges with floating point values.
346pub struct MultiGauge {
347    name: String,
348    labels: Vec<String>,
349    gauge: UpDownCounter<f64>,
350    shadow: std::sync::RwLock<std::collections::HashMap<Vec<String>, AtomicF64>>,
351}
352
353impl MultiGauge {
354    /// Creates a new vector of gauges with given name, description and counter labels.
355    pub fn new(name: &str, description: &str, labels: &[&str]) -> MetricResult<Self> {
356        if labels.is_empty() {
357            return Err(MetricError("at least a single label must be specified".into()));
358        }
359        let gauge = meter_for_metric(name)
360            .f64_up_down_counter(name.to_string())
361            .with_description(description.to_string())
362            .build();
363        Ok(Self {
364            name: name.to_string(),
365            labels: labels.iter().map(|s| s.to_string()).collect(),
366            gauge,
367            shadow: std::sync::RwLock::new(std::collections::HashMap::new()),
368        })
369    }
370
371    fn shadow_entry(&self, label_values: &[&str]) -> Vec<String> {
372        label_values.iter().map(|s| s.to_string()).collect()
373    }
374
375    fn ensure_shadow(&self, key: &[String]) {
376        {
377            let read = self.shadow.read().unwrap();
378            if read.contains_key(key) {
379                return;
380            }
381        }
382        let mut write = self.shadow.write().unwrap();
383        write.entry(key.to_vec()).or_insert_with(|| AtomicF64::new(0.0));
384    }
385
386    /// Increments gauge with given labels by the given number.
387    pub fn increment(&self, label_values: &[&str], by: f64) {
388        let attrs = labels_to_attributes(&self.labels, label_values);
389        self.gauge.add(by, &attrs);
390        let key = self.shadow_entry(label_values);
391        self.ensure_shadow(&key);
392        let read = self.shadow.read().unwrap();
393        if let Some(v) = read.get(&key) {
394            v.fetch_add(by);
395        }
396    }
397
398    /// Decrements gauge with given labels by the given number.
399    pub fn decrement(&self, label_values: &[&str], by: f64) {
400        let attrs = labels_to_attributes(&self.labels, label_values);
401        self.gauge.add(-by, &attrs);
402        let key = self.shadow_entry(label_values);
403        self.ensure_shadow(&key);
404        let read = self.shadow.read().unwrap();
405        if let Some(v) = read.get(&key) {
406            v.fetch_add(-by);
407        }
408    }
409
410    /// Sets gauge with given labels to the given value.
411    pub fn set(&self, label_values: &[&str], value: f64) {
412        let key = self.shadow_entry(label_values);
413        self.ensure_shadow(&key);
414        let attrs = labels_to_attributes(&self.labels, label_values);
415        let read = self.shadow.read().unwrap();
416        if let Some(v) = read.get(&key) {
417            let previous = v.swap(value);
418            self.gauge.add(value - previous, &attrs);
419        }
420    }
421
422    /// Retrieves the value of the specified gauge.
423    pub fn get(&self, label_values: &[&str]) -> Option<f64> {
424        let key = self.shadow_entry(label_values);
425        let read = self.shadow.read().unwrap();
426        read.get(&key).map(|v| v.load())
427    }
428
429    /// Returns the name of the gauge vector given at construction.
430    pub fn name(&self) -> String {
431        self.name.clone()
432    }
433
434    /// Returns the labels of the counters given at construction.
435    pub fn labels(&self) -> Vec<&str> {
436        self.labels.iter().map(String::as_str).collect()
437    }
438}
439
440// ---------------------------------------------------------------------------
441// Timer
442// ---------------------------------------------------------------------------
443
/// Macro to start a timer measurement on a histogram.
///
/// * `histogram_start_measure!(h)` expands to `h.start_measure()` (the
///   `SimpleHistogram` form, returning a `SimpleTimer`).
/// * `histogram_start_measure!(h, labels)` expands to `h.start_measure(labels)`
///   (the `MultiHistogram` form, returning a `MetricResult<SimpleTimer>`).
///
/// `h` may be any expression, e.g. a local, a static, or a field access such as
/// `self.histogram`; it is not limited to a bare identifier.
#[macro_export]
macro_rules! histogram_start_measure {
    // SimpleHistogram case
    ($v:expr) => {
        $v.start_measure()
    };
    // MultiHistogram case
    ($v:expr, $l:expr) => {
        $v.start_measure($l)
    };
}
456
457/// Represents a timer handle.
458pub struct SimpleTimer {
459    start: std::time::Instant,
460    labels: Option<Vec<KeyValue>>,
461}
462
463// ---------------------------------------------------------------------------
464// SimpleHistogram
465// ---------------------------------------------------------------------------
466
467/// Represents a histogram with floating point values.
468pub struct SimpleHistogram {
469    name: String,
470    hh: Histogram<f64>,
471}
472
473impl SimpleHistogram {
474    /// Creates a new histogram with the given name, description and buckets.
475    /// If no buckets are specified, they will be defined automatically.
476    /// The +Inf bucket is always added automatically.
477    pub fn new(name: &str, description: &str, buckets: Vec<f64>) -> MetricResult<Self> {
478        let hh = meter_for_metric(name)
479            .f64_histogram(name.to_string())
480            .with_description(description.to_string())
481            .with_boundaries(buckets)
482            .build();
483        Ok(Self {
484            name: name.to_string(),
485            hh,
486        })
487    }
488
489    /// Records a value observation to the histogram.
490    pub fn observe(&self, value: f64) {
491        self.hh.record(value, &[]);
492    }
493
494    /// Starts a timer.
495    pub fn start_measure(&self) -> SimpleTimer {
496        SimpleTimer {
497            start: std::time::Instant::now(),
498            labels: None,
499        }
500    }
501
502    /// Stops the given timer and records the elapsed duration in seconds to the histogram.
503    pub fn record_measure(&self, timer: SimpleTimer) {
504        self.hh.record(timer.start.elapsed().as_secs_f64(), &[]);
505    }
506
507    /// Stops the given timer and discards the measured duration in seconds and returns it.
508    pub fn cancel_measure(&self, timer: SimpleTimer) -> f64 {
509        timer.start.elapsed().as_secs_f64()
510    }
511
512    /// Returns the name of the histogram given at construction.
513    pub fn name(&self) -> String {
514        self.name.clone()
515    }
516}
517
518// ---------------------------------------------------------------------------
519// MultiHistogram
520// ---------------------------------------------------------------------------
521
522/// Represents a vector of histograms with floating point values.
523pub struct MultiHistogram {
524    name: String,
525    labels: Vec<String>,
526    hh: Histogram<f64>,
527}
528
529impl MultiHistogram {
530    /// Creates a new histogram with the given name, description, buckets and labels.
531    /// If no buckets are specified, they will be defined automatically.
532    /// The +Inf bucket is always added automatically.
533    pub fn new(name: &str, description: &str, buckets: Vec<f64>, labels: &[&str]) -> MetricResult<Self> {
534        if labels.is_empty() {
535            return Err(MetricError("at least a single label must be specified".into()));
536        }
537        let hh = meter_for_metric(name)
538            .f64_histogram(name.to_string())
539            .with_description(description.to_string())
540            .with_boundaries(buckets)
541            .build();
542        Ok(Self {
543            name: name.to_string(),
544            labels: labels.iter().map(|s| s.to_string()).collect(),
545            hh,
546        })
547    }
548
549    /// Starts a timer for a histogram with the given labels.
550    pub fn start_measure(&self, label_values: &[&str]) -> MetricResult<SimpleTimer> {
551        Ok(SimpleTimer {
552            start: std::time::Instant::now(),
553            labels: Some(labels_to_attributes(&self.labels, label_values)),
554        })
555    }
556
557    /// Records a value observation to the histogram with the given labels.
558    pub fn observe(&self, label_values: &[&str], value: f64) {
559        let attrs = labels_to_attributes(&self.labels, label_values);
560        self.hh.record(value, &attrs);
561    }
562
563    /// Stops the given timer and records the elapsed duration in seconds to the multi-histogram.
564    pub fn record_measure(&self, timer: SimpleTimer) {
565        let elapsed = timer.start.elapsed().as_secs_f64();
566        let attrs = timer.labels.as_deref().unwrap_or(&[]);
567        self.hh.record(elapsed, attrs);
568    }
569
570    /// Stops the given timer and discards the measured duration in seconds and returns it.
571    pub fn cancel_measure(&self, timer: SimpleTimer) -> f64 {
572        timer.start.elapsed().as_secs_f64()
573    }
574
575    /// Returns the name of the histogram given at construction.
576    pub fn name(&self) -> String {
577        self.name.clone()
578    }
579
580    /// Returns the labels of the counters given at construction.
581    pub fn labels(&self) -> Vec<&str> {
582        self.labels.iter().map(String::as_str).collect()
583    }
584}
585
#[cfg(test)]
mod tests {
    use anyhow::Context;

    use super::*;

    /// Bucket boundaries shared by the histogram tests.
    const TEST_BUCKETS: [f64; 5] = [1.0, 2.0, 3.0, 4.0, 5.0];

    /// Renders every registered metric in Prometheus text format.
    fn exported() -> anyhow::Result<String> {
        gather_all_metrics().context("gather_all_metrics")
    }

    /// Asserts that `needle` occurs in `text`, failing with `message` otherwise.
    fn assert_contains(text: &str, needle: &str, message: &str) {
        assert!(text.contains(needle), "{message}");
    }

    #[test]
    fn simple_counter() -> anyhow::Result<()> {
        let ctr = SimpleCounter::new("otel_my_ctr", "test counter")?;
        assert_eq!("otel_my_ctr", ctr.name());

        ctr.increment();
        assert_eq!(1, ctr.get());
        ctr.increment_by(9);
        assert_eq!(10, ctr.get());

        let text = exported()?;
        assert_contains(&text, "otel_my_ctr", "Prometheus text must contain counter name");
        Ok(())
    }

    #[test]
    fn multi_counter() -> anyhow::Result<()> {
        let ctr = MultiCounter::new("otel_my_mctr", "test multicounter", &["version"])?;
        assert_eq!("otel_my_mctr", ctr.name());
        assert!(ctr.labels().contains(&"version"));

        for (version, by) in [("1.90.1", 10), ("1.89.20", 1), ("1.90.1", 15)] {
            ctr.increment_by(&[version], by);
        }

        let text = exported()?;
        assert_contains(&text, "otel_my_mctr", "Prometheus text must contain multi counter name");
        assert_contains(&text, "version=\"1.90.1\"", "Prometheus text must contain label value");
        Ok(())
    }

    #[test]
    fn simple_gauge() -> anyhow::Result<()> {
        let gauge = SimpleGauge::new("otel_my_gauge", "test gauge")?;
        assert_eq!("otel_my_gauge", gauge.name());

        gauge.increment(10.0);
        assert_eq!(10.0, gauge.get());
        gauge.decrement(5.1);
        assert!((gauge.get() - 4.9).abs() < f64::EPSILON);
        gauge.set(100.0);
        assert_eq!(100.0, gauge.get());

        let text = exported()?;
        assert_contains(&text, "otel_my_gauge", "Prometheus text must contain gauge name");
        Ok(())
    }

    #[test]
    fn multi_gauge() -> anyhow::Result<()> {
        let gauge = MultiGauge::new("otel_my_mgauge", "test multigauge", &["version"])?;
        assert_eq!("otel_my_mgauge", gauge.name());
        assert!(gauge.labels().contains(&"version"));

        gauge.increment(&["1.90.1"], 10.0);
        gauge.increment(&["1.89.20"], 5.0);
        gauge.increment(&["1.90.1"], 15.0);
        gauge.decrement(&["1.89.20"], 2.0);

        let newest = gauge.get(&["1.90.1"]).context("should be present")?;
        assert_eq!(25.0, newest);
        let older = gauge.get(&["1.89.20"]).context("should be present")?;
        assert_eq!(3.0, older);

        let text = exported()?;
        assert_contains(&text, "otel_my_mgauge", "Prometheus text must contain multi gauge name");
        Ok(())
    }

    #[test]
    fn simple_histogram() -> anyhow::Result<()> {
        let histogram = SimpleHistogram::new("otel_my_histogram", "test histogram", TEST_BUCKETS.to_vec())?;
        assert_eq!("otel_my_histogram", histogram.name());

        for sample in [2.0, 2.0, 1.0, 5.0] {
            histogram.observe(sample);
        }

        let timer = histogram_start_measure!(histogram);
        histogram.cancel_measure(timer);

        let text = exported()?;
        assert_contains(&text, "otel_my_histogram", "Prometheus text must contain histogram name");
        Ok(())
    }

    #[test]
    fn multi_histogram() -> anyhow::Result<()> {
        let histogram = MultiHistogram::new(
            "otel_my_mhistogram",
            "test histogram",
            TEST_BUCKETS.to_vec(),
            &["version"],
        )?;
        assert_eq!("otel_my_mhistogram", histogram.name());
        assert!(histogram.labels().contains(&"version"));

        let observations = [
            ("1.90.0", 2.0),
            ("1.90.0", 2.0),
            ("1.90.0", 1.0),
            ("1.90.0", 5.0),
            ("1.89.20", 10.0),
        ];
        for (version, sample) in observations {
            histogram.observe(&[version], sample);
        }

        let timer = histogram_start_measure!(histogram, &["1.90.0"])?;
        histogram.cancel_measure(timer);

        let text = exported()?;
        assert_contains(
            &text,
            "otel_my_mhistogram",
            "Prometheus text must contain multi histogram name",
        );
        Ok(())
    }
}