Skip to main content

hopr_utils/parallelize/
mod.rs

//! Parallelization utilities for CPU-heavy blocking workloads.
//!
//! This module provides async-friendly wrappers around Rayon's thread pool for offloading
//! CPU-intensive operations (EC multiplication, ECDSA signing, MAC verification) from
//! async executor threads.
//!
//! For background on async executors and blocking, see
//! [Async: What is blocking?](https://ryhl.io/blog/async-what-is-blocking/).
//!
//! See the [`cpu`] module for the primary API.

/// Module for thread pool-based parallelization of CPU-heavy blocking workloads.
///
/// ## Zombie Task Prevention
///
/// The Rayon thread pool is sized to CPU cores for crypto operations. Callers wrap
/// tasks with timeouts (e.g., 150ms for packet decoding). When a timeout fires, the
/// async receiver is dropped, but Rayon has no native cancellation—the closure
/// continues as a "zombie" task whose result is discarded.
///
/// Under sustained load, zombie accumulation can starve the pool: timed-out tasks
/// continue occupying threads, causing subsequent tasks to also time out. To break
/// this cycle, each spawned closure checks `tx.is_canceled()` before executing.
/// If the receiver was dropped while queued, the closure returns immediately.
///
/// ## Queue Depth Limiting
///
/// To prevent unbounded queue growth, the module tracks outstanding tasks (queued +
/// running). Use [`cpu::spawn_blocking`] or [`cpu::spawn_fifo_blocking`] which return
/// [`cpu::SpawnError::QueueFull`] when the configured limit is reached.
///
/// Set the `HOPR_CPU_TASK_QUEUE_LIMIT` environment variable to a positive integer to
/// enable limiting. When it is unset, `0`, or not a valid integer, the queue depth is
/// unlimited. The variable is read once, on first use.
///
/// ## Observability
///
/// Prometheus metrics (behind the `telemetry` feature) track:
/// - **submitted**: total tasks entering the queue
/// - **completed**: tasks that delivered results to a live receiver
/// - **cancelled**: tasks skipped via cooperative cancellation
/// - **orphaned**: tasks that ran but whose receiver was dropped during execution
/// - **rejected**: tasks rejected due to queue being full
/// - **queue_wait**: histogram of queue wait time
/// - **execution_time**: histogram of task execution duration
/// - **outstanding_tasks**: current queued + running tasks
/// - **queue_limit**: configured maximum (for comparison)
#[cfg(feature = "parallelize-rayon")]
pub mod cpu {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures::channel::oneshot;
    pub use rayon;

    /// Histogram buckets for timing metrics (seconds).
    #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
    const TIMING_BUCKETS: &[f64] = &[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.5, 1.0];

    /// Metric hooks. They record real Prometheus metrics only when both the
    /// `parallelize` and `telemetry` features are enabled outside of tests;
    /// otherwise every hook is a no-op with the same signature.
    mod metrics {
        #[cfg(any(not(all(feature = "parallelize", feature = "telemetry")), test))]
        pub use noop::*;
        #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
        pub use real::*;

        #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
        mod real {
            use lazy_static::lazy_static;

            lazy_static! {
                static ref TASKS_SUBMITTED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_submitted_total",
                        "Total number of tasks submitted to the Rayon thread pool",
                    )
                    .unwrap();
                static ref TASKS_COMPLETED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_completed_total",
                        "Total number of Rayon tasks that completed and delivered results",
                    )
                    .unwrap();
                static ref TASKS_CANCELLED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_cancelled_total",
                        "Total number of Rayon tasks skipped because receiver was already dropped",
                    )
                    .unwrap();
                static ref TASKS_ORPHANED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_orphaned_total",
                        "Total number of Rayon tasks whose results were discarded after completion",
                    )
                    .unwrap();
                static ref TASKS_REJECTED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_rejected_total",
                        "Total number of tasks rejected due to queue being full",
                    )
                    .unwrap();
                static ref QUEUE_WAIT: hopr_types::telemetry::SimpleHistogram =
                    hopr_types::telemetry::SimpleHistogram::new(
                        "hopr_rayon_queue_wait_seconds",
                        "Time tasks spend waiting in the Rayon queue before execution starts",
                        super::super::TIMING_BUCKETS.to_vec(),
                    )
                    .unwrap();
                static ref EXECUTION_TIME: hopr_types::telemetry::MultiHistogram =
                    hopr_types::telemetry::MultiHistogram::new(
                        "hopr_rayon_execution_seconds",
                        "Time tasks spend executing in the Rayon thread pool",
                        super::super::TIMING_BUCKETS.to_vec(),
                        &["operation"],
                    )
                    .unwrap();
                static ref OUTSTANDING_TASKS: hopr_types::telemetry::SimpleGauge =
                    hopr_types::telemetry::SimpleGauge::new(
                        "hopr_rayon_outstanding_tasks",
                        "Current number of tasks queued or running in the Rayon pool",
                    )
                    .unwrap();
                static ref QUEUE_LIMIT: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
                    "hopr_rayon_queue_limit",
                    "Configured maximum outstanding tasks for the Rayon thread pool",
                )
                .unwrap();
            }

            #[inline]
            pub fn submitted() {
                TASKS_SUBMITTED.increment();
            }

            #[inline]
            pub fn completed() {
                TASKS_COMPLETED.increment();
            }

            #[inline]
            pub fn cancelled() {
                TASKS_CANCELLED.increment();
            }

            #[inline]
            pub fn orphaned() {
                TASKS_ORPHANED.increment();
            }

            #[inline]
            pub fn rejected() {
                TASKS_REJECTED.increment();
            }

            #[inline]
            pub fn observe_queue_wait(seconds: f64) {
                QUEUE_WAIT.observe(seconds);
            }

            #[inline]
            pub fn observe_execution(operation: &str, seconds: f64) {
                EXECUTION_TIME.observe(&[operation], seconds);
            }

            #[inline]
            pub fn outstanding_inc() {
                OUTSTANDING_TASKS.increment(1.0);
            }

            #[inline]
            pub fn outstanding_dec() {
                OUTSTANDING_TASKS.decrement(1.0);
            }

            #[inline]
            pub fn set_queue_limit(limit: usize) {
                QUEUE_LIMIT.set(limit as f64);
            }
        }

        #[cfg(any(not(all(feature = "parallelize", feature = "telemetry")), test))]
        mod noop {
            #[inline]
            pub fn submitted() {}
            #[inline]
            pub fn completed() {}
            #[inline]
            pub fn cancelled() {}
            #[inline]
            pub fn orphaned() {}
            #[inline]
            pub fn rejected() {}
            #[inline]
            pub fn observe_queue_wait(_: f64) {}
            #[inline]
            pub fn observe_execution(_: &str, _: f64) {}
            #[inline]
            pub fn outstanding_inc() {}
            #[inline]
            pub fn outstanding_dec() {}
            #[inline]
            pub fn set_queue_limit(_: usize) {}
        }
    }

    /// Current number of outstanding tasks (queued + running).
    static OUTSTANDING: AtomicUsize = AtomicUsize::new(0);

    lazy_static::lazy_static! {
        /// Queue limit from the `HOPR_CPU_TASK_QUEUE_LIMIT` environment variable.
        ///
        /// `None` means no limit: the variable is unset, set to `0`, or not a valid
        /// `usize` (the last case is logged as a warning instead of being ignored
        /// silently). When a limit is configured, the `queue_limit` metric is set.
        static ref QUEUE_LIMIT: Option<usize> = {
            let limit = std::env::var("HOPR_CPU_TASK_QUEUE_LIMIT")
                .ok()
                .and_then(|raw| match raw.parse::<usize>() {
                    // Zero means "no limit", not "reject every task".
                    Ok(0) => None,
                    Ok(value) => Some(value),
                    Err(error) => {
                        tracing::warn!(
                            value = %raw,
                            %error,
                            "ignoring invalid HOPR_CPU_TASK_QUEUE_LIMIT; rayon queue depth is unlimited"
                        );
                        None
                    }
                });

            if let Some(l) = limit {
                metrics::set_queue_limit(l);
            }

            limit
        };
    }

    /// Error type for spawn operations.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
    pub enum SpawnError {
        /// The queue is full and cannot accept more tasks.
        #[error("rayon queue full: {current}/{limit} tasks outstanding")]
        QueueFull {
            /// Current outstanding task count when rejection occurred.
            current: usize,
            /// Configured queue limit.
            limit: usize,
        },
    }

    /// Returns the current outstanding task count (queued + running).
    #[inline]
    pub fn outstanding_tasks() -> usize {
        OUTSTANDING.load(Ordering::Relaxed)
    }

    /// Returns the configured queue limit, or `None` if unlimited.
    #[inline]
    pub fn queue_limit() -> Option<usize> {
        *QUEUE_LIMIT
    }

    /// RAII handle for one outstanding-task slot.
    ///
    /// The slot is acquired by [`SlotGuard::try_acquire_slot`] and released when the
    /// guard is dropped, so the count stays correct even if the owning task panics
    /// or returns early.
    #[must_use = "dropping the guard immediately releases the slot"]
    struct SlotGuard;

    impl SlotGuard {
        /// Attempts to acquire a slot for a new task.
        ///
        /// With no limit configured this always succeeds. With a limit, the counter
        /// is incremented only while it is below the limit (a compare-and-swap loop).
        /// Rejected attempts therefore never touch the counter or the gauge, and a
        /// rejected caller cannot cause a concurrent caller to be rejected spuriously.
        ///
        /// # Errors
        ///
        /// Returns [`SpawnError::QueueFull`] carrying the count observed at rejection
        /// time if `limit` tasks are already outstanding.
        fn try_acquire_slot() -> Result<Self, SpawnError> {
            if let Some(limit) = *QUEUE_LIMIT {
                OUTSTANDING
                    .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                        (current < limit).then_some(current + 1)
                    })
                    .map_err(|current| {
                        metrics::rejected();
                        SpawnError::QueueFull { current, limit }
                    })?;
            } else {
                OUTSTANDING.fetch_add(1, Ordering::AcqRel);
            }

            metrics::outstanding_inc();
            Ok(Self)
        }
    }

    impl Drop for SlotGuard {
        #[inline]
        fn drop(&mut self) {
            let prev = OUTSTANDING.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(prev > 0, "outstanding task count underflow");
            metrics::outstanding_dec();
        }
    }

    /// Initialize the global Rayon thread pool with the given number of threads.
    ///
    /// Worker threads are spawned with the name and stack size Rayon requests. On
    /// macOS each worker also requests the "user initiated" QoS class so it may run
    /// on performance cores.
    ///
    /// Also forces the queue limit to be read from the environment, which sets the
    /// `queue_limit` metric when a limit is configured. This happens even if building
    /// the pool fails.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by Rayon's `build_global`.
    pub fn init_thread_pool(num_threads: usize) -> Result<(), rayon::ThreadPoolBuildError> {
        let result = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .spawn_handler(|thread| {
                let mut thread_builder = std::thread::Builder::new();
                if let Some(name) = thread.name() {
                    thread_builder = thread_builder.name(name.to_owned());
                }
                if let Some(stack_size) = thread.stack_size() {
                    thread_builder = thread_builder.stack_size(stack_size);
                }
                thread_builder.spawn(|| {
                    // SAFETY: `pthread_set_qos_class_self_np` takes only plain integer
                    // arguments and acts on the calling thread; no memory is shared.
                    // The return code is ignored (best effort).
                    #[cfg(target_os = "macos")]
                    unsafe {
                        // MacOS: Set the QOS class to "user initiated" to allow running on performance cores
                        libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INITIATED, 0);
                    }
                    thread.run()
                })?;
                Ok(())
            })
            .build_global();

        // Force the lazily-read limit so its metric is published at startup.
        let _ = queue_limit();
        result
    }

    /// Builds a cancellable task closure and its receiver.
    ///
    /// The closure wraps `f` with cooperative cancellation, panic catching, timing
    /// metrics, and slot tracking via [`SlotGuard`]. The slot is released as soon as
    /// `f` finishes, *before* the result is delivered. A caller that awaits the result
    /// and immediately submits another task therefore never sees its finished task as
    /// still outstanding.
    ///
    /// Note: Cooperative cancellation only prevents "queued zombies" - tasks whose
    /// receiver was dropped before execution started. If the timeout fires *after*
    /// execution begins, the task will still run to completion (counted as "orphaned").
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::QueueFull`] if no slot could be acquired.
    fn cancellable_task<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<
        (
            impl FnOnce() + Send + 'static,
            oneshot::Receiver<std::thread::Result<R>>,
        ),
        SpawnError,
    > {
        let guard = SlotGuard::try_acquire_slot()?;

        let (tx, rx) = oneshot::channel();
        let submitted_at = std::time::Instant::now();

        metrics::submitted();

        let task = move || {
            // Move the guard into the task so the slot is released on every exit
            // path, including an unwind out of the metric/tracing calls below.
            let slot = guard;

            if tx.is_canceled() {
                tracing::debug!(
                    queue_wait_ms = submitted_at.elapsed().as_millis() as u64,
                    "skipping cancelled task (receiver dropped before execution)"
                );
                metrics::cancelled();
                return;
            }

            let wait_duration = submitted_at.elapsed();
            metrics::observe_queue_wait(wait_duration.as_secs_f64());

            let execution_start = std::time::Instant::now();
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
            metrics::observe_execution(operation, execution_start.elapsed().as_secs_f64());

            // The work is done: free the slot before waking the receiver.
            drop(slot);

            match tx.send(result) {
                Ok(()) => metrics::completed(),
                Err(_) => {
                    tracing::debug!(
                        queue_wait_ms = wait_duration.as_millis() as u64,
                        "receiver dropped during execution, result discarded"
                    );
                    metrics::orphaned();
                }
            }
        };

        Ok((task, rx))
    }

    /// Awaits the outcome of a task built by [`cancellable_task`].
    ///
    /// Returns the task's value, or re-raises on the awaiting task any panic that
    /// occurred inside it.
    ///
    /// # Panics
    ///
    /// Panics if the task was dropped without delivering a result. The task only
    /// skips sending when the receiver is already gone, which cannot happen while
    /// this function still owns `rx`.
    async fn await_task<R>(rx: oneshot::Receiver<std::thread::Result<R>>) -> R {
        match rx.await.expect("rayon task channel closed unexpectedly") {
            Ok(value) => value,
            Err(panic) => std::panic::resume_unwind(panic),
        }
    }

    /// Spawn a blocking function on the Rayon thread pool (LIFO scheduling).
    ///
    /// Uses Rayon's default LIFO scheduling for the thread's local queue.
    ///
    /// Includes cooperative cancellation: if the receiver is dropped before the
    /// task starts (e.g., timeout), the task is skipped without executing.
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::QueueFull`] if the configured limit of outstanding
    /// tasks has already been reached.
    ///
    /// # Panics
    ///
    /// Re-raises any panic raised by `f`.
    pub async fn spawn_blocking<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<R, SpawnError> {
        let (task, rx) = cancellable_task(f, operation)?;
        rayon::spawn(task);
        Ok(await_task(rx).await)
    }

    /// Spawn a blocking function on the Rayon thread pool (FIFO scheduling).
    ///
    /// Uses FIFO scheduling which prevents starvation of older tasks. This is the
    /// preferred variant for packet decoding and similar ordered workloads.
    ///
    /// Includes cooperative cancellation: if the receiver is dropped before the
    /// task starts (e.g., timeout), the task is skipped without executing.
    ///
    /// # Errors
    ///
    /// Returns [`SpawnError::QueueFull`] if the configured limit of outstanding
    /// tasks has already been reached.
    ///
    /// # Panics
    ///
    /// Re-raises any panic raised by `f`.
    pub async fn spawn_fifo_blocking<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<R, SpawnError> {
        let (task, rx) = cancellable_task(f, operation)?;
        rayon::spawn_fifo(task);
        Ok(await_task(rx).await)
    }
}

// The tests exercise `super::cpu`, which only exists with the `parallelize-rayon`
// feature; gating on it keeps `cargo test` compiling without that feature.
#[cfg(all(test, feature = "parallelize-rayon"))]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };

    use futures::FutureExt;
    use serial_test::serial;

    use super::cpu;

    /// Polls [`cpu::outstanding_tasks`] every 100ms (at most 50 times) until it
    /// equals `expected`, returning the last value observed.
    ///
    /// Slots are released on Rayon worker threads independently of the test task,
    /// so polling is more robust than asserting after a single fixed sleep.
    async fn wait_for_outstanding(expected: usize) -> usize {
        let mut observed = cpu::outstanding_tasks();
        for _ in 0..50 {
            if observed == expected {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            observed = cpu::outstanding_tasks();
        }
        observed
    }

    /// Submits `f` via [`cpu::spawn_fifo_blocking`] and drops the future after a
    /// single poll. The task is handed to Rayon, and its receiver is then dropped
    /// while the task is most likely still queued. This mimics a caller whose
    /// timeout fired.
    fn submit_and_abandon(f: impl FnOnce() + Send + 'static) {
        let _ = cpu::spawn_fifo_blocking(f, "test").now_or_never();
    }

    #[tokio::test]
    #[serial]
    async fn spawn_blocking_returns_result() {
        let result = cpu::spawn_blocking(|| 42, "test").await.unwrap();
        assert_eq!(result, 42);
    }

    #[tokio::test]
    #[serial]
    async fn spawn_fifo_blocking_returns_result() {
        let result = cpu::spawn_fifo_blocking(|| "hello", "test").await.unwrap();
        assert_eq!(result, "hello");
    }

    #[cfg(panic = "unwind")]
    #[tokio::test]
    #[serial]
    async fn spawn_blocking_propagates_panic() {
        let result = std::panic::AssertUnwindSafe(async {
            cpu::spawn_blocking(
                || {
                    panic!("test panic");
                },
                "test",
            )
            .await
            .unwrap()
        })
        .catch_unwind()
        .await;
        assert!(result.is_err(), "should propagate panic from Rayon task");
    }

    #[tokio::test]
    #[serial]
    async fn cancelled_tasks_are_skipped_via_cooperative_cancellation() {
        let initial_outstanding = cpu::outstanding_tasks();
        let executed_count = Arc::new(AtomicU32::new(0));

        for _ in 0..100 {
            let count = Arc::clone(&executed_count);
            submit_and_abandon(move || {
                count.fetch_add(1, Ordering::SeqCst);
                std::thread::sleep(Duration::from_millis(50));
            });
        }

        let start = std::time::Instant::now();
        let result = cpu::spawn_fifo_blocking(|| 42, "test").await.unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, 42);
        assert!(
            elapsed < Duration::from_secs(2),
            "Task took {elapsed:?} - cancelled tasks may not be getting skipped"
        );

        // Let any task that did start finish before counting executions.
        wait_for_outstanding(initial_outstanding).await;

        let executed = executed_count.load(Ordering::SeqCst);
        assert!(
            executed < 50,
            "Expected most tasks to be skipped by cancellation, but {executed}/100 executed"
        );
    }

    #[tokio::test]
    #[serial]
    async fn pool_recovers_after_cancelled_burst() {
        let initial_outstanding = cpu::outstanding_tasks();

        for _ in 0..50 {
            submit_and_abandon(|| std::thread::sleep(Duration::from_millis(100)));
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        for i in 0..10 {
            let start = std::time::Instant::now();
            let result = cpu::spawn_fifo_blocking(move || i * 2, "test").await.unwrap();
            let elapsed = start.elapsed();

            assert_eq!(result, i * 2);
            assert!(
                elapsed < Duration::from_millis(500),
                "Recovery task {i} took {elapsed:?} - pool may still be starved"
            );
        }

        // Drain leftovers so the next serial test starts from a settled count.
        wait_for_outstanding(initial_outstanding).await;
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_tasks_tracking() {
        let initial = cpu::outstanding_tasks();

        let barrier = Arc::new(std::sync::Barrier::new(2));
        let barrier_clone = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            cpu::spawn_fifo_blocking(
                move || {
                    barrier_clone.wait();
                    42
                },
                "test",
            )
            .await
        });

        // Yield so the spawned future submits its task, which then blocks on the barrier.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let during = cpu::outstanding_tasks();
        assert!(
            during > initial,
            "Outstanding should increase: initial={initial}, during={during}"
        );

        barrier.wait();

        let result = handle.await.unwrap();
        assert_eq!(result.unwrap(), 42);

        let after = wait_for_outstanding(initial).await;
        assert_eq!(after, initial, "Outstanding should return to initial after completion");
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_decrements_on_cancellation() {
        let initial = cpu::outstanding_tasks();

        for _ in 0..10 {
            submit_and_abandon(|| std::thread::sleep(Duration::from_millis(100)));
        }

        let after = wait_for_outstanding(initial).await;
        assert_eq!(
            after, initial,
            "Outstanding should return to initial after cancelled tasks drain"
        );
    }
}