1use std::sync::{
2 OnceLock, RwLock,
3 atomic::{AtomicU64, Ordering},
4};
5
6use opentelemetry::{
7 KeyValue,
8 metrics::{Counter, Histogram, MeterProvider as _, UpDownCounter},
9};
10use opentelemetry_sdk::metrics::SdkMeterProvider;
11
12#[derive(Debug)]
14pub struct MetricError(String);
15
16impl std::fmt::Display for MetricError {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 f.write_str(&self.0)
19 }
20}
21
22impl std::error::Error for MetricError {}
23
24pub type MetricResult<T> = Result<T, MetricError>;
26
27struct GlobalMetricState {
32 exporter: opentelemetry_prometheus_text_exporter::PrometheusExporter,
33 provider: SdkMeterProvider,
34 prefix_providers: RwLock<Vec<PrefixProvider>>,
35}
36
37static GLOBAL_STATE: OnceLock<GlobalMetricState> = OnceLock::new();
38
39#[derive(Clone)]
40struct PrefixProvider {
41 prefix: String,
42 provider: SdkMeterProvider,
43}
44
45fn global_state() -> &'static GlobalMetricState {
46 GLOBAL_STATE.get_or_init(|| {
47 let exporter = opentelemetry_prometheus_text_exporter::PrometheusExporter::builder()
48 .without_counter_suffixes()
49 .without_units()
50 .without_target_info()
51 .without_scope_info()
52 .build();
53 let provider = SdkMeterProvider::builder().with_reader(exporter.clone()).build();
54 GlobalMetricState {
55 exporter,
56 provider,
57 prefix_providers: RwLock::new(Vec::new()),
58 }
59 })
60}
61
62pub fn init_with_provider(
69 exporter: opentelemetry_prometheus_text_exporter::PrometheusExporter,
70 provider: SdkMeterProvider,
71) -> bool {
72 GLOBAL_STATE
73 .set(GlobalMetricState {
74 exporter,
75 provider,
76 prefix_providers: RwLock::new(Vec::new()),
77 })
78 .is_ok()
79}
80
81pub fn register_prefix_provider(prefix: &str, provider: SdkMeterProvider) -> bool {
85 if prefix.is_empty() {
86 return false;
87 }
88
89 let Some(state) = GLOBAL_STATE.get() else {
90 return false;
91 };
92
93 let mut prefix_providers = state
94 .prefix_providers
95 .write()
96 .expect("prefix provider lock must not be poisoned");
97
98 if let Some(existing) = prefix_providers.iter_mut().find(|entry| entry.prefix == prefix) {
99 existing.provider = provider;
100 return true;
101 }
102
103 prefix_providers.push(PrefixProvider {
104 prefix: prefix.to_string(),
105 provider,
106 });
107 prefix_providers.sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len()));
108 true
109}
110
111fn meter_for_metric(metric_name: &str) -> opentelemetry::metrics::Meter {
112 let state = global_state();
113
114 if let Some(provider) = state
115 .prefix_providers
116 .read()
117 .expect("prefix provider lock must not be poisoned")
118 .iter()
119 .find(|entry| metric_name.starts_with(&entry.prefix))
120 .map(|entry| entry.provider.clone())
121 {
122 return provider.meter("hopr");
123 }
124
125 state.provider.meter("hopr")
126}
127
128pub fn gather_all_metrics() -> MetricResult<String> {
130 let state = global_state();
131 let mut buf = Vec::new();
132 state
133 .exporter
134 .export(&mut buf)
135 .map_err(|e| MetricError(e.to_string()))?;
136 String::from_utf8(buf).map_err(|e| MetricError(e.to_string()))
137}
138
139fn labels_to_attributes(keys: &[String], values: &[&str]) -> Vec<KeyValue> {
144 debug_assert_eq!(
145 keys.len(),
146 values.len(),
147 "label key count ({}) must match value count ({})",
148 keys.len(),
149 values.len()
150 );
151 keys.iter()
152 .zip(values.iter())
153 .map(|(k, v)| KeyValue::new(k.clone(), v.to_string()))
154 .collect()
155}
156
157struct AtomicF64(AtomicU64);
159
160impl AtomicF64 {
161 fn new(val: f64) -> Self {
162 Self(AtomicU64::new(val.to_bits()))
163 }
164
165 fn load(&self) -> f64 {
166 f64::from_bits(self.0.load(Ordering::Relaxed))
167 }
168
169 fn swap(&self, new: f64) -> f64 {
171 f64::from_bits(self.0.swap(new.to_bits(), Ordering::Relaxed))
172 }
173
174 fn fetch_add(&self, delta: f64) -> f64 {
175 loop {
176 let current = self.0.load(Ordering::Relaxed);
177 let current_f64 = f64::from_bits(current);
178 let new_f64 = current_f64 + delta;
179 if self
180 .0
181 .compare_exchange_weak(current, new_f64.to_bits(), Ordering::Relaxed, Ordering::Relaxed)
182 .is_ok()
183 {
184 return new_f64;
185 }
186 }
187 }
188}
189
190pub struct SimpleCounter {
196 name: String,
197 ctr: Counter<u64>,
198 shadow: AtomicU64,
199}
200
201impl SimpleCounter {
202 pub fn new(name: &str, description: &str) -> MetricResult<Self> {
204 let ctr = meter_for_metric(name)
205 .u64_counter(name.to_string())
206 .with_description(description.to_string())
207 .build();
208 Ok(Self {
209 name: name.to_string(),
210 ctr,
211 shadow: AtomicU64::new(0),
212 })
213 }
214
215 pub fn get(&self) -> u64 {
217 self.shadow.load(Ordering::Relaxed)
218 }
219
220 pub fn increment_by(&self, by: u64) {
222 self.ctr.add(by, &[]);
223 self.shadow.fetch_add(by, Ordering::Relaxed);
224 }
225
226 pub fn increment(&self) {
228 self.increment_by(1);
229 }
230
231 pub fn name(&self) -> String {
233 self.name.clone()
234 }
235}
236
237pub struct MultiCounter {
243 name: String,
244 labels: Vec<String>,
245 ctr: Counter<u64>,
246}
247
248impl MultiCounter {
249 pub fn new(name: &str, description: &str, labels: &[&str]) -> MetricResult<Self> {
251 if labels.is_empty() {
252 return Err(MetricError("at least a single label must be specified".into()));
253 }
254 let ctr = meter_for_metric(name)
255 .u64_counter(name.to_string())
256 .with_description(description.to_string())
257 .build();
258 Ok(Self {
259 name: name.to_string(),
260 labels: labels.iter().map(|s| s.to_string()).collect(),
261 ctr,
262 })
263 }
264
265 pub fn increment_by(&self, label_values: &[&str], by: u64) {
267 let attrs = labels_to_attributes(&self.labels, label_values);
268 self.ctr.add(by, &attrs);
269 }
270
271 pub fn increment(&self, label_values: &[&str]) {
273 self.increment_by(label_values, 1);
274 }
275
276 pub fn name(&self) -> String {
278 self.name.clone()
279 }
280
281 pub fn labels(&self) -> Vec<&str> {
283 self.labels.iter().map(String::as_str).collect()
284 }
285}
286
287pub struct SimpleGauge {
293 name: String,
294 gauge: UpDownCounter<f64>,
295 shadow: AtomicF64,
296}
297
298impl SimpleGauge {
299 pub fn new(name: &str, description: &str) -> MetricResult<Self> {
301 let gauge = meter_for_metric(name)
302 .f64_up_down_counter(name.to_string())
303 .with_description(description.to_string())
304 .build();
305 Ok(Self {
306 name: name.to_string(),
307 gauge,
308 shadow: AtomicF64::new(0.0),
309 })
310 }
311
312 pub fn increment(&self, by: f64) {
314 self.gauge.add(by, &[]);
315 self.shadow.fetch_add(by);
316 }
317
318 pub fn decrement(&self, by: f64) {
320 self.gauge.add(-by, &[]);
321 self.shadow.fetch_add(-by);
322 }
323
324 pub fn set(&self, value: f64) {
326 let previous = self.shadow.swap(value);
327 self.gauge.add(value - previous, &[]);
328 }
329
330 pub fn get(&self) -> f64 {
332 self.shadow.load()
333 }
334
335 pub fn name(&self) -> String {
337 self.name.clone()
338 }
339}
340
341pub struct MultiGauge {
347 name: String,
348 labels: Vec<String>,
349 gauge: UpDownCounter<f64>,
350 shadow: std::sync::RwLock<std::collections::HashMap<Vec<String>, AtomicF64>>,
351}
352
353impl MultiGauge {
354 pub fn new(name: &str, description: &str, labels: &[&str]) -> MetricResult<Self> {
356 if labels.is_empty() {
357 return Err(MetricError("at least a single label must be specified".into()));
358 }
359 let gauge = meter_for_metric(name)
360 .f64_up_down_counter(name.to_string())
361 .with_description(description.to_string())
362 .build();
363 Ok(Self {
364 name: name.to_string(),
365 labels: labels.iter().map(|s| s.to_string()).collect(),
366 gauge,
367 shadow: std::sync::RwLock::new(std::collections::HashMap::new()),
368 })
369 }
370
371 fn shadow_entry(&self, label_values: &[&str]) -> Vec<String> {
372 label_values.iter().map(|s| s.to_string()).collect()
373 }
374
375 fn ensure_shadow(&self, key: &[String]) {
376 {
377 let read = self.shadow.read().unwrap();
378 if read.contains_key(key) {
379 return;
380 }
381 }
382 let mut write = self.shadow.write().unwrap();
383 write.entry(key.to_vec()).or_insert_with(|| AtomicF64::new(0.0));
384 }
385
386 pub fn increment(&self, label_values: &[&str], by: f64) {
388 let attrs = labels_to_attributes(&self.labels, label_values);
389 self.gauge.add(by, &attrs);
390 let key = self.shadow_entry(label_values);
391 self.ensure_shadow(&key);
392 let read = self.shadow.read().unwrap();
393 if let Some(v) = read.get(&key) {
394 v.fetch_add(by);
395 }
396 }
397
398 pub fn decrement(&self, label_values: &[&str], by: f64) {
400 let attrs = labels_to_attributes(&self.labels, label_values);
401 self.gauge.add(-by, &attrs);
402 let key = self.shadow_entry(label_values);
403 self.ensure_shadow(&key);
404 let read = self.shadow.read().unwrap();
405 if let Some(v) = read.get(&key) {
406 v.fetch_add(-by);
407 }
408 }
409
410 pub fn set(&self, label_values: &[&str], value: f64) {
412 let key = self.shadow_entry(label_values);
413 self.ensure_shadow(&key);
414 let attrs = labels_to_attributes(&self.labels, label_values);
415 let read = self.shadow.read().unwrap();
416 if let Some(v) = read.get(&key) {
417 let previous = v.swap(value);
418 self.gauge.add(value - previous, &attrs);
419 }
420 }
421
422 pub fn get(&self, label_values: &[&str]) -> Option<f64> {
424 let key = self.shadow_entry(label_values);
425 let read = self.shadow.read().unwrap();
426 read.get(&key).map(|v| v.load())
427 }
428
429 pub fn name(&self) -> String {
431 self.name.clone()
432 }
433
434 pub fn labels(&self) -> Vec<&str> {
436 self.labels.iter().map(String::as_str).collect()
437 }
438}
439
440#[macro_export]
446macro_rules! histogram_start_measure {
447 ($v:ident) => {
449 $v.start_measure()
450 };
451 ($v:ident, $l:expr) => {
453 $v.start_measure($l)
454 };
455}
456
457pub struct SimpleTimer {
459 start: std::time::Instant,
460 labels: Option<Vec<KeyValue>>,
461}
462
463pub struct SimpleHistogram {
469 name: String,
470 hh: Histogram<f64>,
471}
472
473impl SimpleHistogram {
474 pub fn new(name: &str, description: &str, buckets: Vec<f64>) -> MetricResult<Self> {
478 let hh = meter_for_metric(name)
479 .f64_histogram(name.to_string())
480 .with_description(description.to_string())
481 .with_boundaries(buckets)
482 .build();
483 Ok(Self {
484 name: name.to_string(),
485 hh,
486 })
487 }
488
489 pub fn observe(&self, value: f64) {
491 self.hh.record(value, &[]);
492 }
493
494 pub fn start_measure(&self) -> SimpleTimer {
496 SimpleTimer {
497 start: std::time::Instant::now(),
498 labels: None,
499 }
500 }
501
502 pub fn record_measure(&self, timer: SimpleTimer) {
504 self.hh.record(timer.start.elapsed().as_secs_f64(), &[]);
505 }
506
507 pub fn cancel_measure(&self, timer: SimpleTimer) -> f64 {
509 timer.start.elapsed().as_secs_f64()
510 }
511
512 pub fn name(&self) -> String {
514 self.name.clone()
515 }
516}
517
518pub struct MultiHistogram {
524 name: String,
525 labels: Vec<String>,
526 hh: Histogram<f64>,
527}
528
529impl MultiHistogram {
530 pub fn new(name: &str, description: &str, buckets: Vec<f64>, labels: &[&str]) -> MetricResult<Self> {
534 if labels.is_empty() {
535 return Err(MetricError("at least a single label must be specified".into()));
536 }
537 let hh = meter_for_metric(name)
538 .f64_histogram(name.to_string())
539 .with_description(description.to_string())
540 .with_boundaries(buckets)
541 .build();
542 Ok(Self {
543 name: name.to_string(),
544 labels: labels.iter().map(|s| s.to_string()).collect(),
545 hh,
546 })
547 }
548
549 pub fn start_measure(&self, label_values: &[&str]) -> MetricResult<SimpleTimer> {
551 Ok(SimpleTimer {
552 start: std::time::Instant::now(),
553 labels: Some(labels_to_attributes(&self.labels, label_values)),
554 })
555 }
556
557 pub fn observe(&self, label_values: &[&str], value: f64) {
559 let attrs = labels_to_attributes(&self.labels, label_values);
560 self.hh.record(value, &attrs);
561 }
562
563 pub fn record_measure(&self, timer: SimpleTimer) {
565 let elapsed = timer.start.elapsed().as_secs_f64();
566 let attrs = timer.labels.as_deref().unwrap_or(&[]);
567 self.hh.record(elapsed, attrs);
568 }
569
570 pub fn cancel_measure(&self, timer: SimpleTimer) -> f64 {
572 timer.start.elapsed().as_secs_f64()
573 }
574
575 pub fn name(&self) -> String {
577 self.name.clone()
578 }
579
580 pub fn labels(&self) -> Vec<&str> {
582 self.labels.iter().map(String::as_str).collect()
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 use anyhow::Context;
589
590 use super::*;
591
592 #[test]
593 fn simple_counter() -> anyhow::Result<()> {
594 let counter = SimpleCounter::new("otel_my_ctr", "test counter")?;
595
596 assert_eq!("otel_my_ctr", counter.name());
597
598 counter.increment();
599 assert_eq!(1, counter.get());
600
601 counter.increment_by(9);
602 assert_eq!(10, counter.get());
603
604 let metrics = gather_all_metrics().context("gather_all_metrics")?;
605 assert!(
606 metrics.contains("otel_my_ctr"),
607 "Prometheus text must contain counter name"
608 );
609
610 Ok(())
611 }
612
613 #[test]
614 fn multi_counter() -> anyhow::Result<()> {
615 let counter = MultiCounter::new("otel_my_mctr", "test multicounter", &["version"])?;
616
617 assert_eq!("otel_my_mctr", counter.name());
618 assert!(counter.labels().contains(&"version"));
619
620 counter.increment_by(&["1.90.1"], 10);
621 counter.increment_by(&["1.89.20"], 1);
622 counter.increment_by(&["1.90.1"], 15);
623
624 let metrics = gather_all_metrics().context("gather_all_metrics")?;
625 assert!(
626 metrics.contains("otel_my_mctr"),
627 "Prometheus text must contain multi counter name"
628 );
629 assert!(
630 metrics.contains("version=\"1.90.1\""),
631 "Prometheus text must contain label value"
632 );
633
634 Ok(())
635 }
636
637 #[test]
638 fn simple_gauge() -> anyhow::Result<()> {
639 let gauge = SimpleGauge::new("otel_my_gauge", "test gauge")?;
640
641 assert_eq!("otel_my_gauge", gauge.name());
642
643 gauge.increment(10.0);
644 assert_eq!(10.0, gauge.get());
645
646 gauge.decrement(5.1);
647 assert!((gauge.get() - 4.9).abs() < f64::EPSILON);
648
649 gauge.set(100.0);
650 assert_eq!(100.0, gauge.get());
651
652 let metrics = gather_all_metrics().context("gather_all_metrics")?;
653 assert!(
654 metrics.contains("otel_my_gauge"),
655 "Prometheus text must contain gauge name"
656 );
657
658 Ok(())
659 }
660
661 #[test]
662 fn multi_gauge() -> anyhow::Result<()> {
663 let gauge = MultiGauge::new("otel_my_mgauge", "test multigauge", &["version"])?;
664
665 assert_eq!("otel_my_mgauge", gauge.name());
666 assert!(gauge.labels().contains(&"version"));
667
668 gauge.increment(&["1.90.1"], 10.0);
669 gauge.increment(&["1.89.20"], 5.0);
670 gauge.increment(&["1.90.1"], 15.0);
671 gauge.decrement(&["1.89.20"], 2.0);
672
673 assert_eq!(25.0, gauge.get(&["1.90.1"]).context("should be present")?);
674 assert_eq!(3.0, gauge.get(&["1.89.20"]).context("should be present")?);
675
676 let metrics = gather_all_metrics().context("gather_all_metrics")?;
677 assert!(
678 metrics.contains("otel_my_mgauge"),
679 "Prometheus text must contain multi gauge name"
680 );
681
682 Ok(())
683 }
684
685 #[test]
686 fn simple_histogram() -> anyhow::Result<()> {
687 let histogram = SimpleHistogram::new("otel_my_histogram", "test histogram", vec![1.0, 2.0, 3.0, 4.0, 5.0])?;
688
689 assert_eq!("otel_my_histogram", histogram.name());
690
691 histogram.observe(2.0);
692 histogram.observe(2.0);
693 histogram.observe(1.0);
694 histogram.observe(5.0);
695
696 let timer = histogram_start_measure!(histogram);
697 histogram.cancel_measure(timer);
698
699 let metrics = gather_all_metrics().context("gather_all_metrics")?;
700 assert!(
701 metrics.contains("otel_my_histogram"),
702 "Prometheus text must contain histogram name"
703 );
704
705 Ok(())
706 }
707
708 #[test]
709 fn multi_histogram() -> anyhow::Result<()> {
710 let histogram = MultiHistogram::new(
711 "otel_my_mhistogram",
712 "test histogram",
713 vec![1.0, 2.0, 3.0, 4.0, 5.0],
714 &["version"],
715 )?;
716
717 assert_eq!("otel_my_mhistogram", histogram.name());
718 assert!(histogram.labels().contains(&"version"));
719
720 histogram.observe(&["1.90.0"], 2.0);
721 histogram.observe(&["1.90.0"], 2.0);
722 histogram.observe(&["1.90.0"], 1.0);
723 histogram.observe(&["1.90.0"], 5.0);
724 histogram.observe(&["1.89.20"], 10.0);
725
726 let timer = histogram_start_measure!(histogram, &["1.90.0"])?;
727 histogram.cancel_measure(timer);
728
729 let metrics = gather_all_metrics().context("gather_all_metrics")?;
730 assert!(
731 metrics.contains("otel_my_mhistogram"),
732 "Prometheus text must contain multi histogram name"
733 );
734
735 Ok(())
736 }
737}