1#[cfg(feature = "rayon")]
47pub mod cpu {
48 use std::sync::atomic::{AtomicUsize, Ordering};
49
50 use futures::channel::oneshot;
51 pub use rayon;
52
/// Histogram bucket upper bounds, in seconds, shared by the queue-wait and
/// execution-time histograms of the Rayon pool.
///
/// Only compiled when real Prometheus metrics are in use (feature enabled, not a test build).
#[cfg(all(feature = "prometheus", not(test)))]
const TIMING_BUCKETS: &[f64] = &[
    // 1 ms .. 50 ms
    0.001, 0.005, 0.01, 0.025, 0.05,
    // 100 ms .. 1 s
    0.1, 0.15, 0.25, 0.5, 1.0,
];
56
/// Instrumentation hooks for the Rayon task pool.
///
/// Call sites always go through the functions re-exported here. With the `prometheus`
/// feature enabled (and outside test builds) they record into `hopr_metrics` collectors;
/// otherwise they resolve to empty `#[inline]` functions, so callers need no `cfg` guards.
mod metrics {
    #[cfg(any(not(feature = "prometheus"), test))]
    pub use noop::*;
    #[cfg(all(feature = "prometheus", not(test)))]
    pub use real::*;

    /// Empty stand-ins used when Prometheus metrics are compiled out.
    #[cfg(any(not(feature = "prometheus"), test))]
    mod noop {
        #[inline]
        pub fn submitted() {}

        #[inline]
        pub fn completed() {}

        #[inline]
        pub fn cancelled() {}

        #[inline]
        pub fn orphaned() {}

        #[inline]
        pub fn rejected() {}

        #[inline]
        pub fn observe_queue_wait(_seconds: f64) {}

        #[inline]
        pub fn observe_execution(_operation: &str, _seconds: f64) {}

        #[inline]
        pub fn outstanding_inc() {}

        #[inline]
        pub fn outstanding_dec() {}

        #[inline]
        pub fn set_queue_limit(_limit: usize) {}
    }

    /// `hopr_metrics`-backed implementations; each collector is constructed on first use.
    #[cfg(all(feature = "prometheus", not(test)))]
    mod real {
        use hopr_metrics::{MultiHistogram, SimpleCounter, SimpleGauge, SimpleHistogram};

        lazy_static::lazy_static! {
            static ref SUBMITTED: SimpleCounter = SimpleCounter::new(
                "hopr_rayon_tasks_submitted_total",
                "Total number of tasks submitted to the Rayon thread pool",
            )
            .unwrap();
            static ref COMPLETED: SimpleCounter = SimpleCounter::new(
                "hopr_rayon_tasks_completed_total",
                "Total number of Rayon tasks that completed and delivered results",
            )
            .unwrap();
            static ref CANCELLED: SimpleCounter = SimpleCounter::new(
                "hopr_rayon_tasks_cancelled_total",
                "Total number of Rayon tasks skipped because receiver was already dropped",
            )
            .unwrap();
            static ref ORPHANED: SimpleCounter = SimpleCounter::new(
                "hopr_rayon_tasks_orphaned_total",
                "Total number of Rayon tasks whose results were discarded after completion",
            )
            .unwrap();
            static ref REJECTED: SimpleCounter = SimpleCounter::new(
                "hopr_rayon_tasks_rejected_total",
                "Total number of tasks rejected due to queue being full",
            )
            .unwrap();
            static ref QUEUE_WAIT_SECONDS: SimpleHistogram = SimpleHistogram::new(
                "hopr_rayon_queue_wait_seconds",
                "Time tasks spend waiting in the Rayon queue before execution starts",
                super::super::TIMING_BUCKETS.to_vec(),
            )
            .unwrap();
            static ref EXECUTION_SECONDS: MultiHistogram = MultiHistogram::new(
                "hopr_rayon_execution_seconds",
                "Time tasks spend executing in the Rayon thread pool",
                super::super::TIMING_BUCKETS.to_vec(),
                &["operation"],
            )
            .unwrap();
            static ref OUTSTANDING_GAUGE: SimpleGauge = SimpleGauge::new(
                "hopr_rayon_outstanding_tasks",
                "Current number of tasks queued or running in the Rayon pool",
            )
            .unwrap();
            static ref QUEUE_LIMIT_GAUGE: SimpleGauge = SimpleGauge::new(
                "hopr_rayon_queue_limit",
                "Configured maximum outstanding tasks for the Rayon thread pool",
            )
            .unwrap();
        }

        /// Counts a task accepted for submission.
        #[inline]
        pub fn submitted() {
            SUBMITTED.increment();
        }

        /// Counts a task whose result reached its receiver.
        #[inline]
        pub fn completed() {
            COMPLETED.increment();
        }

        /// Counts a task skipped because its receiver was gone before it started.
        #[inline]
        pub fn cancelled() {
            CANCELLED.increment();
        }

        /// Counts a task that ran but whose result could not be delivered.
        #[inline]
        pub fn orphaned() {
            ORPHANED.increment();
        }

        /// Counts a submission refused because the queue limit was reached.
        #[inline]
        pub fn rejected() {
            REJECTED.increment();
        }

        /// Records how long a task waited before starting, in seconds.
        #[inline]
        pub fn observe_queue_wait(seconds: f64) {
            QUEUE_WAIT_SECONDS.observe(seconds);
        }

        /// Records how long a task ran, in seconds, labelled by `operation`.
        #[inline]
        pub fn observe_execution(operation: &str, seconds: f64) {
            EXECUTION_SECONDS.observe(&[operation], seconds);
        }

        /// Adds one to the outstanding-task gauge.
        #[inline]
        pub fn outstanding_inc() {
            OUTSTANDING_GAUGE.increment(1.0);
        }

        /// Subtracts one from the outstanding-task gauge.
        #[inline]
        pub fn outstanding_dec() {
            OUTSTANDING_GAUGE.decrement(1.0);
        }

        /// Publishes the configured outstanding-task limit.
        #[inline]
        pub fn set_queue_limit(limit: usize) {
            QUEUE_LIMIT_GAUGE.set(limit as f64);
        }
    }
}
193
194 static OUTSTANDING: AtomicUsize = AtomicUsize::new(0);
196
197 lazy_static::lazy_static! {
198 static ref QUEUE_LIMIT: Option<usize> = {
200 let limit = std::env::var("HOPR_CPU_TASK_QUEUE_LIMIT")
201 .ok()
202 .and_then(|v| v.parse::<usize>().ok())
203 .filter(|&v| v > 0);
204
205 if let Some(l) = limit {
206 metrics::set_queue_limit(l);
207 }
208
209 limit
210 };
211 }
212
213 #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
215 pub enum SpawnError {
216 #[error("rayon queue full: {current}/{limit} tasks outstanding")]
218 QueueFull {
219 current: usize,
221 limit: usize,
223 },
224 }
225
226 #[inline]
228 pub fn outstanding_tasks() -> usize {
229 OUTSTANDING.load(Ordering::Relaxed)
230 }
231
232 #[inline]
234 pub fn queue_limit() -> Option<usize> {
235 *QUEUE_LIMIT
236 }
237
238 struct SlotGuard;
241
242 impl SlotGuard {
243 pub fn try_acquire_slot() -> Result<Self, SpawnError> {
247 let prev = OUTSTANDING.fetch_add(1, Ordering::AcqRel);
248 metrics::outstanding_inc();
249 let guard = Self;
250
251 if let Some(limit) = *QUEUE_LIMIT {
252 let new = prev + 1;
253 if new > limit {
254 metrics::rejected();
255 return Err(SpawnError::QueueFull { current: prev, limit });
256 }
257 }
258 Ok(guard)
259 }
260 }
261
262 impl Drop for SlotGuard {
263 #[inline]
264 fn drop(&mut self) {
265 let prev = OUTSTANDING.fetch_sub(1, Ordering::AcqRel);
266 debug_assert!(prev > 0, "outstanding task count underflow");
267 metrics::outstanding_dec();
268 }
269 }
270
271 pub fn init_thread_pool(num_threads: usize) -> Result<(), rayon::ThreadPoolBuildError> {
275 let builder = rayon::ThreadPoolBuilder::new().num_threads(num_threads);
276
277 let builder = builder.spawn_handler(|thread| {
278 let mut thread_builder = std::thread::Builder::new();
279 if let Some(name) = thread.name() {
280 thread_builder = thread_builder.name(name.to_owned());
281 }
282 if let Some(stack_size) = thread.stack_size() {
283 thread_builder = thread_builder.stack_size(stack_size);
284 }
285 thread_builder.spawn(|| {
286 #[cfg(target_os = "macos")]
287 unsafe {
288 libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INITIATED, 0);
290 }
291 thread.run()
292 })?;
293 Ok(())
294 });
295
296 let result = builder.build_global();
297 let _ = *QUEUE_LIMIT; result
299 }
300
301 fn cancellable_task<R: Send + 'static>(
310 f: impl FnOnce() -> R + Send + 'static,
311 operation: &'static str,
312 ) -> Result<
313 (
314 impl FnOnce() + Send + 'static,
315 oneshot::Receiver<std::thread::Result<R>>,
316 ),
317 SpawnError,
318 > {
319 let guard = SlotGuard::try_acquire_slot()?;
320
321 let (tx, rx) = oneshot::channel();
322 let submitted_at = std::time::Instant::now();
323
324 metrics::submitted();
325
326 let task = move || {
327 let _g = guard;
330
331 if tx.is_canceled() {
332 tracing::debug!(
333 queue_wait_ms = submitted_at.elapsed().as_millis() as u64,
334 "skipping cancelled task (receiver dropped before execution)"
335 );
336 metrics::cancelled();
337 return;
338 }
339
340 let wait_duration = submitted_at.elapsed();
341 metrics::observe_queue_wait(wait_duration.as_secs_f64());
342
343 let execution_start = std::time::Instant::now();
344 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
345 metrics::observe_execution(operation, execution_start.elapsed().as_secs_f64());
346
347 match tx.send(result) {
348 Ok(()) => metrics::completed(),
349 Err(_) => {
350 tracing::debug!(
351 queue_wait_ms = wait_duration.as_millis() as u64,
352 "receiver dropped during execution, result discarded"
353 );
354 metrics::orphaned();
355 }
356 }
357 };
358
359 Ok((task, rx))
360 }
361
362 pub async fn spawn_blocking<R: Send + 'static>(
373 f: impl FnOnce() -> R + Send + 'static,
374 operation: &'static str,
375 ) -> Result<R, SpawnError> {
376 let (task, rx) = cancellable_task(f, operation)?;
377 rayon::spawn(task);
378 Ok(rx
379 .await
380 .expect("rayon task channel closed unexpectedly")
381 .unwrap_or_else(|panic| std::panic::resume_unwind(panic)))
382 }
383
384 pub async fn spawn_fifo_blocking<R: Send + 'static>(
396 f: impl FnOnce() -> R + Send + 'static,
397 operation: &'static str,
398 ) -> Result<R, SpawnError> {
399 let (task, rx) = cancellable_task(f, operation)?;
400 rayon::spawn_fifo(task);
401 Ok(rx
402 .await
403 .expect("rayon task channel closed unexpectedly")
404 .unwrap_or_else(|panic| std::panic::resume_unwind(panic)))
405 }
406}
407
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::Duration,
    };

    use futures::FutureExt;
    use serial_test::serial;

    use super::cpu;

    /// Maximum number of 100 ms polls performed by `wait_for_outstanding` (5 s total).
    const DRAIN_POLLS: usize = 50;

    /// Polls `cpu::outstanding_tasks()` every 100 ms, for up to 5 s, until it equals `target`.
    ///
    /// Returns the last observed count so callers can assert that the pool actually drained.
    async fn wait_for_outstanding(target: usize) -> usize {
        for _ in 0..DRAIN_POLLS {
            let current = cpu::outstanding_tasks();
            if current == target {
                return current;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        cpu::outstanding_tasks()
    }

    /// Submits `f` with `spawn_fifo_blocking`, polls the future exactly once, then drops it.
    /// This drops the result receiver right after the task is queued.
    fn submit_and_abandon(f: impl FnOnce() + Send + 'static) {
        let _ = cpu::spawn_fifo_blocking(f, "test").now_or_never();
    }

    #[tokio::test]
    #[serial]
    async fn spawn_blocking_returns_result() {
        let result = cpu::spawn_blocking(|| 42, "test").await.unwrap();
        assert_eq!(result, 42);
    }

    #[tokio::test]
    #[serial]
    async fn spawn_fifo_blocking_returns_result() {
        let result = cpu::spawn_fifo_blocking(|| "hello", "test").await.unwrap();
        assert_eq!(result, "hello");
    }

    #[cfg(panic = "unwind")]
    #[tokio::test]
    #[serial]
    async fn spawn_blocking_propagates_panic() {
        let result = std::panic::AssertUnwindSafe(async {
            cpu::spawn_blocking(
                || {
                    panic!("test panic");
                },
                "test",
            )
            .await
            .unwrap()
        })
        .catch_unwind()
        .await;
        assert!(result.is_err(), "should propagate panic from Rayon task");
    }

    #[tokio::test]
    #[serial]
    async fn cancelled_tasks_are_skipped_via_cooperative_cancellation() {
        let initial_outstanding = cpu::outstanding_tasks();
        let executed_count = Arc::new(AtomicU32::new(0));

        for _ in 0..100 {
            let count = executed_count.clone();
            submit_and_abandon(move || {
                count.fetch_add(1, Ordering::SeqCst);
                std::thread::sleep(Duration::from_millis(50));
            });
        }

        let start = std::time::Instant::now();
        let result = cpu::spawn_fifo_blocking(|| 42, "test").await.unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, 42);
        assert!(
            elapsed < Duration::from_secs(2),
            "Task took {elapsed:?} - cancelled tasks may not be getting skipped"
        );

        // Let tasks that started before their receiver was dropped finish, so the
        // executed count below is final and no slots leak into later tests.
        let remaining = wait_for_outstanding(initial_outstanding).await;
        assert_eq!(
            remaining, initial_outstanding,
            "Outstanding count did not return to {initial_outstanding} after the cancelled burst"
        );

        let executed = executed_count.load(Ordering::SeqCst);
        assert!(
            executed < 50,
            "Expected most tasks to be skipped by cancellation, but {executed}/100 executed"
        );
    }

    #[tokio::test]
    #[serial]
    async fn pool_recovers_after_cancelled_burst() {
        let initial_outstanding = cpu::outstanding_tasks();

        for _ in 0..50 {
            submit_and_abandon(|| std::thread::sleep(Duration::from_millis(100)));
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        for i in 0..10 {
            let start = std::time::Instant::now();
            let result = cpu::spawn_fifo_blocking(move || i * 2, "test").await.unwrap();
            let elapsed = start.elapsed();

            assert_eq!(result, i * 2);
            assert!(
                elapsed < Duration::from_millis(500),
                "Recovery task {i} took {elapsed:?} - pool may still be starved"
            );
        }

        let remaining = wait_for_outstanding(initial_outstanding).await;
        assert_eq!(
            remaining, initial_outstanding,
            "Pool did not drain after recovery: expected {initial_outstanding} outstanding, got {remaining}"
        );
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_tasks_tracking() {
        let initial = cpu::outstanding_tasks();

        let barrier = Arc::new(std::sync::Barrier::new(2));
        let barrier_clone = barrier.clone();

        let handle = tokio::spawn(async move {
            cpu::spawn_fifo_blocking(
                move || {
                    barrier_clone.wait();
                    42
                },
                "test",
            )
            .await
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        let during = cpu::outstanding_tasks();
        assert!(
            during > initial,
            "Outstanding should increase: initial={initial}, during={during}"
        );

        // Releases the Rayon worker blocked on the other side of the barrier.
        barrier.wait();

        let result = handle.await.unwrap();
        assert_eq!(result.unwrap(), 42);

        tokio::time::sleep(Duration::from_millis(50)).await;

        let after = cpu::outstanding_tasks();
        assert_eq!(after, initial, "Outstanding should return to initial after completion");
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_decrements_on_cancellation() {
        let initial = cpu::outstanding_tasks();

        for _ in 0..10 {
            submit_and_abandon(|| std::thread::sleep(Duration::from_millis(100)));
        }

        // Poll instead of a fixed sleep: several 100 ms tasks may have started before
        // cancellation, and on a small pool they can take longer than any fixed guess.
        let after = wait_for_outstanding(initial).await;
        assert_eq!(
            after, initial,
            "Outstanding should return to initial after cancelled tasks drain"
        );
    }
}