// hopr_parallelize/lib.rs
1//! Parallelization utilities for CPU-heavy blocking workloads.
2//!
3//! This crate provides async-friendly wrappers around Rayon's thread pool for offloading
4//! CPU-intensive operations (EC multiplication, ECDSA signing, MAC verification) from
5//! async executor threads.
6//!
7//! For background on async executors and blocking, see
8//! [Async: What is blocking?](https://ryhl.io/blog/async-what-is-blocking/).
9//!
10//! See the [`cpu`] module for the primary API.
11
12/// Module for thread pool-based parallelization of CPU-heavy blocking workloads.
13///
14/// ## Zombie Task Prevention
15///
16/// The Rayon thread pool is sized to CPU cores for crypto operations. Callers wrap
17/// tasks with timeouts (e.g., 150ms for packet decoding). When a timeout fires, the
18/// async receiver is dropped, but Rayon has no native cancellation—the closure
19/// continues as a "zombie" task whose result is discarded.
20///
21/// Under sustained load, zombie accumulation can starve the pool: timed-out tasks
22/// continue occupying threads, causing subsequent tasks to also time out. To break
23/// this cycle, each spawned closure checks `tx.is_canceled()` before executing.
24/// If the receiver was dropped while queued, the closure returns immediately.
25///
26/// ## Queue Depth Limiting
27///
28/// To prevent unbounded queue growth, the module tracks outstanding tasks (queued +
29/// running). Use [`spawn_blocking`] or [`spawn_fifo_blocking`] which return
30/// [`SpawnError::QueueFull`] when the configured limit is reached.
31///
32/// Set `HOPR_CPU_TASK_QUEUE_LIMIT` environment variable to enable limiting.
33///
34/// ## Observability
35///
36/// Prometheus metrics (behind the `prometheus` feature) track:
37/// - **submitted**: total tasks entering the queue
38/// - **completed**: tasks that delivered results to a live receiver
39/// - **cancelled**: tasks skipped via cooperative cancellation
40/// - **orphaned**: tasks that ran but whose receiver was dropped during execution
41/// - **rejected**: tasks rejected due to queue being full
42/// - **queue_wait**: histogram of queue wait time
43/// - **execution_time**: histogram of task execution duration
44/// - **outstanding_tasks**: current queued + running tasks
45/// - **queue_limit**: configured maximum (for comparison)
46#[cfg(feature = "rayon")]
47pub mod cpu {
48    use std::sync::atomic::{AtomicUsize, Ordering};
49
50    use futures::channel::oneshot;
51    pub use rayon;
52
    /// Histogram buckets for timing metrics (seconds).
    ///
    /// Spans 1 ms to 1 s; includes a 0.15 s bucket matching the 150 ms
    /// packet-decoding timeout mentioned in the module docs.
    /// Only compiled for production Prometheus builds (not under test).
    #[cfg(all(feature = "prometheus", not(test)))]
    const TIMING_BUCKETS: &[f64] = &[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.5, 1.0];
56
    /// Internal metrics facade.
    ///
    /// Re-exports either the real Prometheus-backed implementation or a zero-cost
    /// no-op implementation, selected at compile time. The two sub-modules expose
    /// an identical set of free functions so call sites never need `cfg` guards.
    mod metrics {
        // Under test (or without the `prometheus` feature) the no-op variant is
        // used, so unit tests don't require a metrics registry.
        #[cfg(any(not(feature = "prometheus"), test))]
        pub use noop::*;
        #[cfg(all(feature = "prometheus", not(test)))]
        pub use real::*;

        /// Prometheus-backed implementation (production builds only).
        #[cfg(all(feature = "prometheus", not(test)))]
        mod real {
            use lazy_static::lazy_static;

            // All metrics are lazily registered on first use via `lazy_static`.
            lazy_static! {
                static ref TASKS_SUBMITTED: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
                    "hopr_rayon_tasks_submitted_total",
                    "Total number of tasks submitted to the Rayon thread pool",
                )
                .unwrap();
                static ref TASKS_COMPLETED: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
                    "hopr_rayon_tasks_completed_total",
                    "Total number of Rayon tasks that completed and delivered results",
                )
                .unwrap();
                static ref TASKS_CANCELLED: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
                    "hopr_rayon_tasks_cancelled_total",
                    "Total number of Rayon tasks skipped because receiver was already dropped",
                )
                .unwrap();
                static ref TASKS_ORPHANED: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
                    "hopr_rayon_tasks_orphaned_total",
                    "Total number of Rayon tasks whose results were discarded after completion",
                )
                .unwrap();
                static ref TASKS_REJECTED: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
                    "hopr_rayon_tasks_rejected_total",
                    "Total number of tasks rejected due to queue being full",
                )
                .unwrap();
                static ref QUEUE_WAIT: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
                    "hopr_rayon_queue_wait_seconds",
                    "Time tasks spend waiting in the Rayon queue before execution starts",
                    super::super::TIMING_BUCKETS.to_vec(),
                )
                .unwrap();
                // Labelled by "operation" so per-workload latency can be compared.
                static ref EXECUTION_TIME: hopr_metrics::MultiHistogram = hopr_metrics::MultiHistogram::new(
                    "hopr_rayon_execution_seconds",
                    "Time tasks spend executing in the Rayon thread pool",
                    super::super::TIMING_BUCKETS.to_vec(),
                    &["operation"],
                )
                .unwrap();
                static ref OUTSTANDING_TASKS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
                    "hopr_rayon_outstanding_tasks",
                    "Current number of tasks queued or running in the Rayon pool",
                )
                .unwrap();
                static ref QUEUE_LIMIT: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
                    "hopr_rayon_queue_limit",
                    "Configured maximum outstanding tasks for the Rayon thread pool",
                )
                .unwrap();
            }

            #[inline]
            pub fn submitted() {
                TASKS_SUBMITTED.increment();
            }

            #[inline]
            pub fn completed() {
                TASKS_COMPLETED.increment();
            }

            #[inline]
            pub fn cancelled() {
                TASKS_CANCELLED.increment();
            }

            #[inline]
            pub fn orphaned() {
                TASKS_ORPHANED.increment();
            }

            #[inline]
            pub fn rejected() {
                TASKS_REJECTED.increment();
            }

            #[inline]
            pub fn observe_queue_wait(seconds: f64) {
                QUEUE_WAIT.observe(seconds);
            }

            #[inline]
            pub fn observe_execution(operation: &str, seconds: f64) {
                EXECUTION_TIME.observe(&[operation], seconds);
            }

            #[inline]
            pub fn outstanding_inc() {
                OUTSTANDING_TASKS.increment(1.0);
            }

            #[inline]
            pub fn outstanding_dec() {
                OUTSTANDING_TASKS.decrement(1.0);
            }

            #[inline]
            pub fn set_queue_limit(limit: usize) {
                QUEUE_LIMIT.set(limit as f64);
            }
        }

        /// No-op implementation mirroring `real`'s API one-for-one.
        /// Every function is empty and `#[inline]`, so calls compile away.
        #[cfg(any(not(feature = "prometheus"), test))]
        mod noop {
            #[inline]
            pub fn submitted() {}
            #[inline]
            pub fn completed() {}
            #[inline]
            pub fn cancelled() {}
            #[inline]
            pub fn orphaned() {}
            #[inline]
            pub fn rejected() {}
            #[inline]
            pub fn observe_queue_wait(_: f64) {}
            #[inline]
            pub fn observe_execution(_: &str, _: f64) {}
            #[inline]
            pub fn outstanding_inc() {}
            #[inline]
            pub fn outstanding_dec() {}
            #[inline]
            pub fn set_queue_limit(_: usize) {}
        }
    }
193
    /// Current number of outstanding tasks (queued + running).
    ///
    /// Incremented by `SlotGuard::try_acquire_slot` and decremented when the
    /// guard is dropped (on completion, cancellation, rejection, or panic).
    static OUTSTANDING: AtomicUsize = AtomicUsize::new(0);
196
197    lazy_static::lazy_static! {
198        /// Queue limit from environment. `None` means no limit.
199        static ref QUEUE_LIMIT: Option<usize> = {
200            let limit = std::env::var("HOPR_CPU_TASK_QUEUE_LIMIT")
201                .ok()
202                .and_then(|v| v.parse::<usize>().ok())
203                .filter(|&v| v > 0);
204
205            if let Some(l) = limit {
206                metrics::set_queue_limit(l);
207            }
208
209            limit
210        };
211    }
212
    /// Error type for spawn operations.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
    pub enum SpawnError {
        /// The queue is full and cannot accept more tasks.
        ///
        /// `current` is the outstanding count observed *before* the rejected
        /// task was counted, so `current >= limit` whenever this is returned.
        #[error("rayon queue full: {current}/{limit} tasks outstanding")]
        QueueFull {
            /// Current outstanding task count when rejection occurred.
            current: usize,
            /// Configured queue limit.
            limit: usize,
        },
    }
225
    /// Returns the current outstanding task count (queued + running).
    ///
    /// A relaxed, point-in-time snapshot intended for observability and tests;
    /// it provides no synchronization with task start/finish.
    #[inline]
    pub fn outstanding_tasks() -> usize {
        OUTSTANDING.load(Ordering::Relaxed)
    }
231
    /// Returns the configured queue limit, or `None` if unlimited.
    ///
    /// The value is read once from `HOPR_CPU_TASK_QUEUE_LIMIT` on first access
    /// and cached for the process lifetime.
    #[inline]
    pub fn queue_limit() -> Option<usize> {
        *QUEUE_LIMIT
    }
237
238    /// Guard that acquires a slot on construction and calls releases slot on drop,
239    /// even if the task panics or returns early.
240    struct SlotGuard;
241
242    impl SlotGuard {
243        /// Attempts to acquire a slot for a new task.
244        ///
245        /// Returns `Ok(())` if no limit or slot acquired, `Err(QueueFull)` if at limit.
246        pub fn try_acquire_slot() -> Result<Self, SpawnError> {
247            let prev = OUTSTANDING.fetch_add(1, Ordering::AcqRel);
248            metrics::outstanding_inc();
249            let guard = Self;
250
251            if let Some(limit) = *QUEUE_LIMIT {
252                let new = prev + 1;
253                if new > limit {
254                    metrics::rejected();
255                    return Err(SpawnError::QueueFull { current: prev, limit });
256                }
257            }
258            Ok(guard)
259        }
260    }
261
262    impl Drop for SlotGuard {
263        #[inline]
264        fn drop(&mut self) {
265            let prev = OUTSTANDING.fetch_sub(1, Ordering::AcqRel);
266            debug_assert!(prev > 0, "outstanding task count underflow");
267            metrics::outstanding_dec();
268        }
269    }
270
271    /// Initialize the Rayon thread pool with the given number of threads.
272    ///
273    /// Also initializes the queue limit metric.
274    pub fn init_thread_pool(num_threads: usize) -> Result<(), rayon::ThreadPoolBuildError> {
275        let builder = rayon::ThreadPoolBuilder::new().num_threads(num_threads);
276
277        let builder = builder.spawn_handler(|thread| {
278            let mut thread_builder = std::thread::Builder::new();
279            if let Some(name) = thread.name() {
280                thread_builder = thread_builder.name(name.to_owned());
281            }
282            if let Some(stack_size) = thread.stack_size() {
283                thread_builder = thread_builder.stack_size(stack_size);
284            }
285            thread_builder.spawn(|| {
286                #[cfg(target_os = "macos")]
287                unsafe {
288                    // MacOS: Set the QOS class to "user initiated" to allow running on performance cores
289                    libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INITIATED, 0);
290                }
291                thread.run()
292            })?;
293            Ok(())
294        });
295
296        let result = builder.build_global();
297        let _ = *QUEUE_LIMIT; // Initialize limit metric
298        result
299    }
300
301    /// Builds a cancellable task closure and its receiver.
302    ///
303    /// The closure wraps `f` with cooperative cancellation, panic catching,
304    /// timing metrics, and slot tracking via guard.
305    ///
306    /// Note: Cooperative cancellation only prevents "queued zombies" - tasks whose
307    /// receiver was dropped before execution started. If the timeout fires *after*
308    /// execution begins, the task will still run to completion (counted as "orphaned").
309    fn cancellable_task<R: Send + 'static>(
310        f: impl FnOnce() -> R + Send + 'static,
311        operation: &'static str,
312    ) -> Result<
313        (
314            impl FnOnce() + Send + 'static,
315            oneshot::Receiver<std::thread::Result<R>>,
316        ),
317        SpawnError,
318    > {
319        let guard = SlotGuard::try_acquire_slot()?;
320
321        let (tx, rx) = oneshot::channel();
322        let submitted_at = std::time::Instant::now();
323
324        metrics::submitted();
325
326        let task = move || {
327            // ensures guard is moved inside the closure, and
328            // that the slot is released even on panic
329            let _g = guard;
330
331            if tx.is_canceled() {
332                tracing::debug!(
333                    queue_wait_ms = submitted_at.elapsed().as_millis() as u64,
334                    "skipping cancelled task (receiver dropped before execution)"
335                );
336                metrics::cancelled();
337                return;
338            }
339
340            let wait_duration = submitted_at.elapsed();
341            metrics::observe_queue_wait(wait_duration.as_secs_f64());
342
343            let execution_start = std::time::Instant::now();
344            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
345            metrics::observe_execution(operation, execution_start.elapsed().as_secs_f64());
346
347            match tx.send(result) {
348                Ok(()) => metrics::completed(),
349                Err(_) => {
350                    tracing::debug!(
351                        queue_wait_ms = wait_duration.as_millis() as u64,
352                        "receiver dropped during execution, result discarded"
353                    );
354                    metrics::orphaned();
355                }
356            }
357        };
358
359        Ok((task, rx))
360    }
361
362    /// Spawn a blocking function on the Rayon thread pool (LIFO scheduling).
363    ///
364    /// Uses Rayon's default LIFO scheduling for the thread's local queue.
365    ///
366    /// Includes cooperative cancellation: if the receiver is dropped before the
367    /// task starts (e.g., timeout), the task is skipped without executing.
368    ///
369    /// # Errors
370    ///
371    /// Returns [`SpawnError::QueueFull`] if the outstanding task count exceeds the limit.
372    pub async fn spawn_blocking<R: Send + 'static>(
373        f: impl FnOnce() -> R + Send + 'static,
374        operation: &'static str,
375    ) -> Result<R, SpawnError> {
376        let (task, rx) = cancellable_task(f, operation)?;
377        rayon::spawn(task);
378        Ok(rx
379            .await
380            .expect("rayon task channel closed unexpectedly")
381            .unwrap_or_else(|panic| std::panic::resume_unwind(panic)))
382    }
383
384    /// Spawn a blocking function on the Rayon thread pool (FIFO scheduling).
385    ///
386    /// Uses FIFO scheduling which prevents starvation of older tasks. This is the
387    /// preferred variant for packet decoding and similar ordered workloads.
388    ///
389    /// Includes cooperative cancellation: if the receiver is dropped before the
390    /// task starts (e.g., timeout), the task is skipped without executing.
391    ///
392    /// # Errors
393    ///
394    /// Returns [`SpawnError::QueueFull`] if the outstanding task count exceeds the limit.
395    pub async fn spawn_fifo_blocking<R: Send + 'static>(
396        f: impl FnOnce() -> R + Send + 'static,
397        operation: &'static str,
398    ) -> Result<R, SpawnError> {
399        let (task, rx) = cancellable_task(f, operation)?;
400        rayon::spawn_fifo(task);
401        Ok(rx
402            .await
403            .expect("rayon task channel closed unexpectedly")
404            .unwrap_or_else(|panic| std::panic::resume_unwind(panic)))
405    }
406}
407
/// Integration tests for the `cpu` module.
///
/// NOTE(review): these tests are timing-sensitive (sleeps, wall-clock bounds)
/// and share global state (the process-wide Rayon pool and the `OUTSTANDING`
/// counter), hence the `#[serial]` attribute on every test.
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };

    use futures::FutureExt;
    use serial_test::serial;

    use super::cpu;

    // Smoke test: a LIFO-spawned task returns its computed value.
    #[tokio::test]
    #[serial]
    async fn spawn_blocking_returns_result() {
        let result = cpu::spawn_blocking(|| 42, "test").await.unwrap();
        assert_eq!(result, 42);
    }

    // Smoke test: a FIFO-spawned task returns its computed value.
    #[tokio::test]
    #[serial]
    async fn spawn_fifo_blocking_returns_result() {
        let result = cpu::spawn_fifo_blocking(|| "hello", "test").await.unwrap();
        assert_eq!(result, "hello");
    }

    // A panic inside the Rayon task must resurface on the awaiting task
    // (via catch_unwind in the worker + resume_unwind on await).
    // Only meaningful when panics unwind, hence the cfg gate.
    #[cfg(panic = "unwind")]
    #[tokio::test]
    #[serial]
    async fn spawn_blocking_propagates_panic() {
        let result = std::panic::AssertUnwindSafe(async {
            cpu::spawn_blocking(
                || {
                    panic!("test panic");
                },
                "test",
            )
            .await
            .unwrap()
        })
        .catch_unwind()
        .await;
        assert!(result.is_err(), "should propagate panic from Rayon task");
    }

    // Queue 100 sleeping tasks and immediately drop their futures
    // (`now_or_never` polls once, then drops). Cooperative cancellation
    // should skip most of them, so a follow-up task completes quickly.
    #[tokio::test]
    #[serial]
    async fn cancelled_tasks_are_skipped_via_cooperative_cancellation() {
        let initial_outstanding = cpu::outstanding_tasks();
        let executed_count = Arc::new(AtomicU32::new(0));

        for _ in 0..100 {
            let count = executed_count.clone();
            let fut = cpu::spawn_fifo_blocking(
                move || {
                    count.fetch_add(1, Ordering::SeqCst);
                    std::thread::sleep(Duration::from_millis(50));
                },
                "test",
            );
            // Poll once, then drop the future — this drops the receiver.
            let _ = fut.now_or_never();
        }

        let start = std::time::Instant::now();
        let result = cpu::spawn_fifo_blocking(|| 42, "test").await.unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, 42);
        assert!(
            elapsed < Duration::from_secs(2),
            "Task took {elapsed:?} - cancelled tasks may not be getting skipped"
        );

        // Wait (bounded) for the outstanding counter to drain back to baseline
        // so later serial tests start from a clean slate.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            if cpu::outstanding_tasks() == initial_outstanding {
                break;
            }
        }

        let executed = executed_count.load(Ordering::SeqCst);
        assert!(
            executed < 50,
            "Expected most tasks to be skipped by cancellation, but {executed}/100 executed"
        );
    }

    // After a burst of cancelled tasks, fresh tasks must still complete
    // promptly — i.e. the pool is not starved by zombies.
    #[tokio::test]
    #[serial]
    async fn pool_recovers_after_cancelled_burst() {
        let initial_outstanding = cpu::outstanding_tasks();

        for _ in 0..50 {
            let fut = cpu::spawn_fifo_blocking(
                || {
                    std::thread::sleep(Duration::from_millis(100));
                },
                "test",
            );
            let _ = fut.now_or_never();
        }

        // Give any tasks that did start time to finish.
        tokio::time::sleep(Duration::from_millis(300)).await;

        for i in 0..10 {
            let start = std::time::Instant::now();
            let result = cpu::spawn_fifo_blocking(move || i * 2, "test").await.unwrap();
            let elapsed = start.elapsed();

            assert_eq!(result, i * 2);
            assert!(
                elapsed < Duration::from_millis(500),
                "Recovery task {i} took {elapsed:?} - pool may still be starved"
            );
        }

        // Drain back to baseline before the next serial test.
        for _ in 0..50 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            if cpu::outstanding_tasks() == initial_outstanding {
                break;
            }
        }
    }

    // The outstanding counter should rise while a task is blocked on a
    // barrier and return to its initial value after completion.
    #[tokio::test]
    #[serial]
    async fn outstanding_tasks_tracking() {
        let initial = cpu::outstanding_tasks();

        let barrier = Arc::new(std::sync::Barrier::new(2));
        let barrier_clone = barrier.clone();

        let handle = tokio::spawn(async move {
            cpu::spawn_fifo_blocking(
                move || {
                    barrier_clone.wait();
                    42
                },
                "test",
            )
            .await
        });

        // Let the task get queued/started before sampling the counter.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let during = cpu::outstanding_tasks();
        assert!(
            during > initial,
            "Outstanding should increase: initial={initial}, during={during}"
        );

        // Release the blocked task.
        barrier.wait();

        let result = handle.await.unwrap();
        assert_eq!(result.unwrap(), 42);

        tokio::time::sleep(Duration::from_millis(50)).await;

        let after = cpu::outstanding_tasks();
        assert_eq!(after, initial, "Outstanding should return to initial after completion");
    }

    // Cancelled (never-executed) tasks must still release their slots:
    // the guard is dropped inside the worker closure's early-return path.
    #[tokio::test]
    #[serial]
    async fn outstanding_decrements_on_cancellation() {
        let initial = cpu::outstanding_tasks();

        for _ in 0..10 {
            let fut = cpu::spawn_fifo_blocking(
                || {
                    std::thread::sleep(Duration::from_millis(100));
                },
                "test",
            );
            let _ = fut.now_or_never();
        }

        tokio::time::sleep(Duration::from_millis(500)).await;

        let after = cpu::outstanding_tasks();
        assert_eq!(
            after, initial,
            "Outstanding should return to initial after cancelled tasks drain"
        );
    }
}