/// CPU-bound task offloading onto the global Rayon thread pool.
///
/// Work is submitted from async code with [`cpu::spawn_blocking`] / [`cpu::spawn_fifo_blocking`]
/// and its result is delivered back over a oneshot channel. Every submitted task holds one slot
/// of a process-wide outstanding-task counter until it finishes or is skipped. When the
/// `HOPR_CPU_TASK_QUEUE_LIMIT` environment variable is set to a positive integer, submissions are
/// rejected with [`cpu::SpawnError::QueueFull`] once that many tasks are outstanding.
#[cfg(feature = "parallelize-rayon")]
pub mod cpu {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures::channel::oneshot;
    pub use rayon;

    /// Histogram bucket boundaries (seconds) shared by the queue-wait and execution-time metrics.
    #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
    const TIMING_BUCKETS: &[f64] = &[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.5, 1.0];

    /// Telemetry hooks for the pool.
    ///
    /// Backed by `hopr_types::telemetry` metrics when both the `parallelize` and `telemetry`
    /// features are enabled (and not under `cfg(test)`); otherwise every hook is a no-op with the
    /// same signature, so call sites need no feature gating.
    mod metrics {
        #[cfg(any(not(all(feature = "parallelize", feature = "telemetry")), test))]
        pub use noop::*;
        #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
        pub use real::*;

        #[cfg(all(feature = "parallelize", feature = "telemetry", not(test)))]
        mod real {
            use lazy_static::lazy_static;

            lazy_static! {
                static ref TASKS_SUBMITTED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_submitted_total",
                        "Total number of tasks submitted to the Rayon thread pool",
                    )
                    .unwrap();
                static ref TASKS_COMPLETED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_completed_total",
                        "Total number of Rayon tasks that completed and delivered results",
                    )
                    .unwrap();
                static ref TASKS_CANCELLED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_cancelled_total",
                        "Total number of Rayon tasks skipped because receiver was already dropped",
                    )
                    .unwrap();
                static ref TASKS_ORPHANED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_orphaned_total",
                        "Total number of Rayon tasks whose results were discarded after completion",
                    )
                    .unwrap();
                static ref TASKS_REJECTED: hopr_types::telemetry::SimpleCounter =
                    hopr_types::telemetry::SimpleCounter::new(
                        "hopr_rayon_tasks_rejected_total",
                        "Total number of tasks rejected due to queue being full",
                    )
                    .unwrap();
                static ref QUEUE_WAIT: hopr_types::telemetry::SimpleHistogram =
                    hopr_types::telemetry::SimpleHistogram::new(
                        "hopr_rayon_queue_wait_seconds",
                        "Time tasks spend waiting in the Rayon queue before execution starts",
                        super::super::TIMING_BUCKETS.to_vec(),
                    )
                    .unwrap();
                static ref EXECUTION_TIME: hopr_types::telemetry::MultiHistogram =
                    hopr_types::telemetry::MultiHistogram::new(
                        "hopr_rayon_execution_seconds",
                        "Time tasks spend executing in the Rayon thread pool",
                        super::super::TIMING_BUCKETS.to_vec(),
                        &["operation"],
                    )
                    .unwrap();
                static ref OUTSTANDING_TASKS: hopr_types::telemetry::SimpleGauge =
                    hopr_types::telemetry::SimpleGauge::new(
                        "hopr_rayon_outstanding_tasks",
                        "Current number of tasks queued or running in the Rayon pool",
                    )
                    .unwrap();
                static ref QUEUE_LIMIT: hopr_types::telemetry::SimpleGauge =
                    hopr_types::telemetry::SimpleGauge::new(
                        "hopr_rayon_queue_limit",
                        "Configured maximum outstanding tasks for the Rayon thread pool",
                    )
                    .unwrap();
            }

            #[inline]
            pub fn submitted() {
                TASKS_SUBMITTED.increment();
            }

            #[inline]
            pub fn completed() {
                TASKS_COMPLETED.increment();
            }

            #[inline]
            pub fn cancelled() {
                TASKS_CANCELLED.increment();
            }

            #[inline]
            pub fn orphaned() {
                TASKS_ORPHANED.increment();
            }

            #[inline]
            pub fn rejected() {
                TASKS_REJECTED.increment();
            }

            #[inline]
            pub fn observe_queue_wait(seconds: f64) {
                QUEUE_WAIT.observe(seconds);
            }

            #[inline]
            pub fn observe_execution(operation: &str, seconds: f64) {
                EXECUTION_TIME.observe(&[operation], seconds);
            }

            #[inline]
            pub fn outstanding_inc() {
                OUTSTANDING_TASKS.increment(1.0);
            }

            #[inline]
            pub fn outstanding_dec() {
                OUTSTANDING_TASKS.decrement(1.0);
            }

            #[inline]
            pub fn set_queue_limit(limit: usize) {
                QUEUE_LIMIT.set(limit as f64);
            }
        }

        #[cfg(any(not(all(feature = "parallelize", feature = "telemetry")), test))]
        mod noop {
            #[inline]
            pub fn submitted() {}
            #[inline]
            pub fn completed() {}
            #[inline]
            pub fn cancelled() {}
            #[inline]
            pub fn orphaned() {}
            #[inline]
            pub fn rejected() {}
            #[inline]
            pub fn observe_queue_wait(_: f64) {}
            #[inline]
            pub fn observe_execution(_: &str, _: f64) {}
            #[inline]
            pub fn outstanding_inc() {}
            #[inline]
            pub fn outstanding_dec() {}
            #[inline]
            pub fn set_queue_limit(_: usize) {}
        }
    }

    /// Number of tasks currently holding a slot: submitted and neither finished nor skipped yet.
    static OUTSTANDING: AtomicUsize = AtomicUsize::new(0);

    lazy_static::lazy_static! {
        /// Maximum number of outstanding tasks, read once from `HOPR_CPU_TASK_QUEUE_LIMIT`.
        ///
        /// `None` (unbounded) when the variable is unset, `0`, unreadable, or not a valid `usize`.
        /// The last two cases are logged so a typo does not silently disable back-pressure.
        static ref QUEUE_LIMIT: Option<usize> = {
            let limit = match std::env::var("HOPR_CPU_TASK_QUEUE_LIMIT") {
                Ok(raw) => match raw.parse::<usize>() {
                    Ok(0) => None,
                    Ok(limit) => Some(limit),
                    Err(error) => {
                        tracing::warn!(
                            value = %raw,
                            %error,
                            "ignoring invalid HOPR_CPU_TASK_QUEUE_LIMIT, the Rayon task queue is unbounded"
                        );
                        None
                    }
                },
                Err(std::env::VarError::NotPresent) => None,
                Err(error) => {
                    tracing::warn!(
                        %error,
                        "ignoring unreadable HOPR_CPU_TASK_QUEUE_LIMIT, the Rayon task queue is unbounded"
                    );
                    None
                }
            };

            if let Some(limit) = limit {
                metrics::set_queue_limit(limit);
            }

            limit
        };
    }

    /// Error returned when a task cannot be submitted to the Rayon pool.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
    pub enum SpawnError {
        /// The configured queue limit (see [`queue_limit`]) has been reached.
        #[error("rayon queue full: {current}/{limit} tasks outstanding")]
        QueueFull {
            /// Number of tasks outstanding when the submission was attempted.
            current: usize,
            /// The configured maximum number of outstanding tasks.
            limit: usize,
        },
    }

    /// Returns the number of tasks currently submitted but not yet finished or skipped.
    #[inline]
    pub fn outstanding_tasks() -> usize {
        OUTSTANDING.load(Ordering::Relaxed)
    }

    /// Returns the configured maximum number of outstanding tasks, or `None` when unbounded.
    #[inline]
    pub fn queue_limit() -> Option<usize> {
        *QUEUE_LIMIT
    }

    /// Token for one slot of [`OUTSTANDING`]; dropping it releases the slot.
    #[must_use = "dropping the guard immediately releases the slot"]
    struct SlotGuard;

    impl SlotGuard {
        /// Reserves one outstanding-task slot.
        ///
        /// When a limit is configured, the limit check and the increment happen in a single
        /// atomic update, so a rejected attempt never touches the counter. Incrementing first and
        /// rolling back would let concurrent rejected attempts briefly inflate the count, which
        /// causes other, legitimate submissions to be rejected.
        ///
        /// # Errors
        ///
        /// [`SpawnError::QueueFull`] when the configured limit has already been reached.
        fn try_acquire_slot() -> Result<Self, SpawnError> {
            match *QUEUE_LIMIT {
                Some(limit) => {
                    let reserved =
                        OUTSTANDING.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                            if current < limit {
                                Some(current + 1)
                            } else {
                                None
                            }
                        });
                    if let Err(current) = reserved {
                        metrics::rejected();
                        return Err(SpawnError::QueueFull { current, limit });
                    }
                }
                None => {
                    OUTSTANDING.fetch_add(1, Ordering::AcqRel);
                }
            }

            metrics::outstanding_inc();
            Ok(Self)
        }
    }

    impl Drop for SlotGuard {
        #[inline]
        fn drop(&mut self) {
            let prev = OUTSTANDING.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(prev > 0, "outstanding task count underflow");
            metrics::outstanding_dec();
        }
    }

    /// Builds the global Rayon thread pool with `num_threads` worker threads.
    ///
    /// Worker threads keep the name and stack size Rayon requests. On macOS each worker also
    /// raises its own QoS class to `QOS_CLASS_USER_INITIATED`. The queue limit is resolved here
    /// too, so its gauge is published (and an invalid `HOPR_CPU_TASK_QUEUE_LIMIT` is reported)
    /// at start-up rather than on the first submission.
    ///
    /// # Errors
    ///
    /// Returns the error from [`rayon::ThreadPoolBuilder::build_global`], e.g. when the global
    /// pool has already been initialized.
    pub fn init_thread_pool(num_threads: usize) -> Result<(), rayon::ThreadPoolBuildError> {
        let result = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .spawn_handler(|thread| {
                let mut thread_builder = std::thread::Builder::new();
                if let Some(name) = thread.name() {
                    thread_builder = thread_builder.name(name.to_owned());
                }
                if let Some(stack_size) = thread.stack_size() {
                    thread_builder = thread_builder.stack_size(stack_size);
                }
                thread_builder.spawn(|| {
                    // SAFETY: FFI call without pointer arguments that only adjusts the QoS class
                    // of the calling thread. Its return value is deliberately ignored: failing to
                    // raise the QoS class is not fatal for the worker.
                    #[cfg(target_os = "macos")]
                    unsafe {
                        libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INITIATED, 0);
                    }
                    thread.run()
                })?;
                Ok(())
            })
            .build_global();

        // Resolve the limit regardless of whether the pool build succeeded.
        lazy_static::initialize(&QUEUE_LIMIT);
        result
    }

    /// Wraps `f` into a Rayon job that reports its outcome over a oneshot channel.
    ///
    /// A slot is reserved up-front and released *before* the result is sent. A caller that has
    /// observed the result therefore also observes the decremented [`outstanding_tasks`] count,
    /// and an immediately following submission cannot be rejected with
    /// [`SpawnError::QueueFull`] by a task that has in fact already finished.
    ///
    /// The job skips `f` entirely when the receiver was dropped before the job started. A panic
    /// in `f` is caught and forwarded through the channel instead of unwinding the worker.
    ///
    /// # Errors
    ///
    /// [`SpawnError::QueueFull`] when no slot is available.
    fn cancellable_task<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<
        (
            impl FnOnce() + Send + 'static,
            oneshot::Receiver<std::thread::Result<R>>,
        ),
        SpawnError,
    > {
        let slot = SlotGuard::try_acquire_slot()?;
        let (tx, rx) = oneshot::channel();
        let submitted_at = std::time::Instant::now();

        metrics::submitted();

        let task = move || {
            if tx.is_canceled() {
                tracing::debug!(
                    queue_wait_ms = submitted_at.elapsed().as_millis() as u64,
                    "skipping cancelled task (receiver dropped before execution)"
                );
                metrics::cancelled();
                // `slot` is released when this closure finishes.
                return;
            }

            let queue_wait = submitted_at.elapsed();
            metrics::observe_queue_wait(queue_wait.as_secs_f64());

            let execution_start = std::time::Instant::now();
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
            metrics::observe_execution(operation, execution_start.elapsed().as_secs_f64());

            // Free the slot before publishing the result (see the function docs).
            drop(slot);

            match tx.send(result) {
                Ok(()) => metrics::completed(),
                Err(discarded) => {
                    tracing::debug!(
                        queue_wait_ms = queue_wait.as_millis() as u64,
                        panicked = discarded.is_err(),
                        "receiver dropped during execution, result discarded"
                    );
                    metrics::orphaned();
                }
            }
        };

        Ok((task, rx))
    }

    /// Awaits the outcome of a job built by [`cancellable_task`].
    ///
    /// # Panics
    ///
    /// Resumes the task's panic on the awaiting side if `f` panicked. Also panics if the sender
    /// was dropped without sending, which should not happen while this receiver is alive.
    async fn join_task<R>(rx: oneshot::Receiver<std::thread::Result<R>>) -> R {
        match rx.await.expect("rayon task channel closed unexpectedly") {
            Ok(value) => value,
            Err(payload) => std::panic::resume_unwind(payload),
        }
    }

    /// Runs `f` on the global Rayon pool (submitted via [`rayon::spawn`]) and awaits its result.
    ///
    /// `operation` labels the execution-time metric. Nothing is submitted until the returned
    /// future is first polled. If the future is dropped before a worker starts the task, `f` is
    /// never executed.
    ///
    /// # Errors
    ///
    /// [`SpawnError::QueueFull`] when a queue limit is configured and already reached.
    ///
    /// # Panics
    ///
    /// Resumes the panic if `f` panics.
    pub async fn spawn_blocking<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<R, SpawnError> {
        let (task, rx) = cancellable_task(f, operation)?;
        rayon::spawn(task);
        Ok(join_task(rx).await)
    }

    /// Like [`spawn_blocking`], but submits the task via [`rayon::spawn_fifo`].
    ///
    /// See the Rayon documentation of `spawn_fifo` for the exact ordering guarantees.
    ///
    /// # Errors
    ///
    /// [`SpawnError::QueueFull`] when a queue limit is configured and already reached.
    ///
    /// # Panics
    ///
    /// Resumes the panic if `f` panics.
    pub async fn spawn_fifo_blocking<R: Send + 'static>(
        f: impl FnOnce() -> R + Send + 'static,
        operation: &'static str,
    ) -> Result<R, SpawnError> {
        let (task, rx) = cancellable_task(f, operation)?;
        rayon::spawn_fifo(task);
        Ok(join_task(rx).await)
    }
}
415
// `cpu` only exists with the `parallelize-rayon` feature, so the tests must be gated on it too;
// otherwise `cargo test` without the feature fails to compile `use super::cpu`.
#[cfg(all(test, feature = "parallelize-rayon"))]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicU32, Ordering},
        },
        time::{Duration, Instant},
    };

    use futures::FutureExt;
    use serial_test::serial;

    use super::cpu;

    /// Upper bound on how long a test waits for background Rayon work to settle.
    ///
    /// Generous so a slow CI machine does not fail spuriously; passing runs return as soon as
    /// the awaited condition holds.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(5);

    /// Delay between successive checks in [`wait_until`].
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Re-evaluates `condition` every [`POLL_INTERVAL`] until it returns `true` or
    /// [`SETTLE_TIMEOUT`] elapses. Between checks it sleeps asynchronously, so spawned Tokio tasks
    /// can make progress. Returns whether the condition was met in time.
    async fn wait_until(mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + SETTLE_TIMEOUT;
        loop {
            if condition() {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    #[tokio::test]
    #[serial]
    async fn spawn_blocking_returns_result() {
        let result = cpu::spawn_blocking(|| 42, "test").await.unwrap();
        assert_eq!(result, 42);
    }

    #[tokio::test]
    #[serial]
    async fn spawn_fifo_blocking_returns_result() {
        let result = cpu::spawn_fifo_blocking(|| "hello", "test").await.unwrap();
        assert_eq!(result, "hello");
    }

    #[cfg(panic = "unwind")]
    #[tokio::test]
    #[serial]
    async fn spawn_blocking_propagates_panic() {
        let result = std::panic::AssertUnwindSafe(async {
            cpu::spawn_blocking(
                || {
                    panic!("test panic");
                },
                "test",
            )
            .await
            .unwrap()
        })
        .catch_unwind()
        .await;
        assert!(result.is_err(), "should propagate panic from Rayon task");
    }

    #[tokio::test]
    #[serial]
    async fn cancelled_tasks_are_skipped_via_cooperative_cancellation() {
        let initial_outstanding = cpu::outstanding_tasks();
        let executed_count = Arc::new(AtomicU32::new(0));

        for _ in 0..100 {
            let count = executed_count.clone();
            let fut = cpu::spawn_fifo_blocking(
                move || {
                    count.fetch_add(1, Ordering::SeqCst);
                    std::thread::sleep(Duration::from_millis(50));
                },
                "test",
            );
            // Poll once so the task is submitted, then drop the future (and with it the receiver).
            let _ = fut.now_or_never();
        }

        let start = Instant::now();
        let result = cpu::spawn_fifo_blocking(|| 42, "test").await.unwrap();
        let elapsed = start.elapsed();

        assert_eq!(result, 42);
        assert!(
            elapsed < Duration::from_secs(2),
            "Task took {elapsed:?} - cancelled tasks may not be getting skipped"
        );

        // Let tasks that did start finish, so they neither skew the count below nor leak into
        // later tests.
        wait_until(|| cpu::outstanding_tasks() == initial_outstanding).await;

        let executed = executed_count.load(Ordering::SeqCst);
        assert!(
            executed < 50,
            "Expected most tasks to be skipped by cancellation, but {executed}/100 executed"
        );
    }

    #[tokio::test]
    #[serial]
    async fn pool_recovers_after_cancelled_burst() {
        let initial_outstanding = cpu::outstanding_tasks();

        for _ in 0..50 {
            let fut = cpu::spawn_fifo_blocking(
                || {
                    std::thread::sleep(Duration::from_millis(100));
                },
                "test",
            );
            let _ = fut.now_or_never();
        }

        tokio::time::sleep(Duration::from_millis(300)).await;

        for i in 0..10 {
            let start = Instant::now();
            let result = cpu::spawn_fifo_blocking(move || i * 2, "test").await.unwrap();
            let elapsed = start.elapsed();

            assert_eq!(result, i * 2);
            assert!(
                elapsed < Duration::from_millis(500),
                "Recovery task {i} took {elapsed:?} - pool may still be starved"
            );
        }

        // Best-effort drain so leftover work does not leak into later tests.
        wait_until(|| cpu::outstanding_tasks() == initial_outstanding).await;
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_tasks_tracking() {
        let initial = cpu::outstanding_tasks();

        let barrier = Arc::new(std::sync::Barrier::new(2));
        let worker_barrier = barrier.clone();

        let handle = tokio::spawn(async move {
            cpu::spawn_fifo_blocking(
                move || {
                    worker_barrier.wait();
                    42
                },
                "test",
            )
            .await
        });

        // The Rayon task blocks on the barrier, so once the count rises it stays raised.
        wait_until(|| cpu::outstanding_tasks() > initial).await;
        let during = cpu::outstanding_tasks();
        assert!(
            during > initial,
            "Outstanding should increase: initial={initial}, during={during}"
        );

        // Rendezvous with the Rayon task off the async runtime thread.
        tokio::task::spawn_blocking(move || barrier.wait())
            .await
            .expect("barrier thread panicked");

        let result = handle.await.unwrap();
        assert_eq!(result.unwrap(), 42);

        wait_until(|| cpu::outstanding_tasks() == initial).await;
        let after = cpu::outstanding_tasks();
        assert_eq!(after, initial, "Outstanding should return to initial after completion");
    }

    #[tokio::test]
    #[serial]
    async fn outstanding_decrements_on_cancellation() {
        let initial = cpu::outstanding_tasks();

        for _ in 0..10 {
            let fut = cpu::spawn_fifo_blocking(
                || {
                    std::thread::sleep(Duration::from_millis(100));
                },
                "test",
            );
            let _ = fut.now_or_never();
        }

        wait_until(|| cpu::outstanding_tasks() == initial).await;

        let after = cpu::outstanding_tasks();
        assert_eq!(
            after, initial,
            "Outstanding should return to initial after cancelled tasks drain"
        );
    }
}