hopr_parallelize/lib.rs
//! Parallelization utilities for different types of workloads.
//!
//! ## Blocking and non-blocking code
//! Using an `async` executor requires considering the different types of operations that happen during
//! code execution. Primarily, tasks in async code can be divided into *blocking* and *non-blocking*,
//! where code is considered blocking if it does not allow the executor to swap the current task
//! (i.e. to jump to a different code execution block). A good rule of thumb for efficient asynchronous code
//! is to not have more than 10 to 100 microseconds between each `.await`.
//!
//! ## What if blocking is needed
//! Sometimes it is necessary to block a thread, e.g. when performing a CPU intensive task or waiting for a
//! synchronous IO operation. Because these blocking operations would prevent the async executor from jumping to a
//! different task, effectively blocking it, one of the 3 possible strategies must be used to offload the
//! blocking task from the executor's thread:
//! 1. use the executor's native `spawn_blocking` to spawn the blocking task onto a dedicated pool of blocking threads running alongside the executor threads
//!    - this solution allows offloading tasks onto typically hundreds of threads
//!    - because this pool typically has far more threads than CPU cores, it is ideal for synchronous blocking IO
//! 2. use a dedicated parallelization mechanism with its own thread pool
//!    - the solution most typically used for CPU heavy tasks
//!    - allows execution of the task over a smaller thread pool, fully utilizing each CPU
//! 3. use a dedicated `thread`
//!    - typically used when a blocking operation keeps running forever
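//!
//! As a rough illustration of strategy 3, the sketch below moves a blocking loop onto its own thread and
//! talks to it over channels. It uses only the standard library; the helper name
//! `double_on_dedicated_thread` and the doubling step are hypothetical stand-ins for real blocking work
//! and are not part of this crate.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//!
//! // Hypothetical helper: runs every job on one dedicated thread and collects the results.
//! fn double_on_dedicated_thread(jobs: Vec<u64>) -> Vec<u64> {
//!     let (job_tx, job_rx) = mpsc::channel::<u64>();
//!     let (res_tx, res_rx) = mpsc::channel::<u64>();
//!
//!     // The dedicated thread blocks on the job channel until that channel is closed.
//!     let worker = thread::spawn(move || {
//!         for job in job_rx {
//!             // Stand-in for real blocking work.
//!             if res_tx.send(job * 2).is_err() {
//!                 break;
//!             }
//!         }
//!     });
//!
//!     for job in jobs {
//!         job_tx.send(job).expect("worker should be alive");
//!     }
//!     // Closing the job channel ends the worker loop.
//!     drop(job_tx);
//!
//!     // Iteration ends once the worker exits and drops its result sender.
//!     let results: Vec<u64> = res_rx.iter().collect();
//!     worker.join().expect("worker should not panic");
//!     results
//! }
//! ```
//!
//! For example, `double_on_dedicated_thread(vec![1, 2, 21])` returns `vec![2, 4, 42]`, since the single
//! worker processes the jobs in the order they were sent.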
//!
//! More information about parallelization, execution and executors can be found in an excellent blog post [here](https://ryhl.io/blog/async-what-is-blocking/).

/// Module for real thread pool based parallelization of CPU heavy blocking workloads.
pub mod cpu {
    pub use rayon;

    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
    ///
    /// The current thread pool uses a LIFO (Last In First Out) scheduling policy for the thread's queue, but
    /// FIFO (First In First Out) for stealing tasks from other threads.
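    ///
    /// A panic inside the closure is caught on the pool thread and resumed on the awaiting task. The
    /// same pattern can be sketched with the standard library alone, assuming a plain thread in place
    /// of the `rayon` pool and a blocking `recv` in place of `.await`. The helper name
    /// `run_and_propagate` is hypothetical and not part of this crate.
    ///
    /// ```rust
    /// use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
    /// use std::sync::mpsc;
    /// use std::thread;
    ///
    /// // Hypothetical std-only analogue: runs `f` on another thread, waits for the
    /// // outcome and re-raises any panic on the calling thread.
    /// fn run_and_propagate<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
    ///     let (tx, rx) = mpsc::channel();
    ///     thread::spawn(move || {
    ///         // Capture a panic as a value instead of unwinding the worker thread.
    ///         let outcome = catch_unwind(AssertUnwindSafe(f));
    ///         // A send error only means the caller stopped waiting, so it is ignored here.
    ///         let _ = tx.send(outcome);
    ///     });
    ///     match rx.recv().expect("worker thread should send an outcome") {
    ///         Ok(value) => value,
    ///         // Continue the original panic on the calling thread.
    ///         Err(payload) => resume_unwind(payload),
    ///     }
    /// }
    /// ```
    ///
    /// Calling `run_and_propagate(|| 2 + 2)` returns `4`, while a closure that panics makes the call
    /// itself panic with the same payload.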
    #[cfg(feature = "rayon")]
    pub async fn spawn_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
        let (tx, rx) = futures::channel::oneshot::channel();
        rayon::spawn(|| {
            // Catch a panic in `f` so that it can be re-raised on the awaiting task.
            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
                .unwrap_or_else(|_| unreachable!())
        });
        rx.await
            .expect("spawned blocking process should be awaitable")
            // Resume a panic caught on the pool thread in the caller's context.
            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
    }

    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
    ///
    /// Tasks are scheduled using a FIFO (First In First Out) policy.
    #[cfg(feature = "rayon")]
    pub async fn spawn_fifo_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
        let (tx, rx) = futures::channel::oneshot::channel();
        rayon::spawn_fifo(|| {
            // Catch a panic in `f` so that it can be re-raised on the awaiting task.
            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
                .unwrap_or_else(|_| unreachable!())
        });
        rx.await
            .expect("spawned fifo blocking process should be awaitable")
            // Resume a panic caught on the pool thread in the caller's context.
            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
    }
}