hopr_parallelize/lib.rs

//! Parallelization utilities for different types of workloads.
//!
//! ## Blocking and non-blocking code
//! Using an `async` executor requires considering the different types of operations happening during
//! code execution. Primarily, tasks in async code can be divided into *blocking* and *non-blocking*,
//! where code execution is considered blocking if it does not allow the executor to swap out the current task
//! (i.e. to jump to a different code execution block). A good rule of thumb for efficient asynchronous code
//! is to not have more than 10 to 100 microseconds between each `.await`.
//!
//! ## What if blocking is needed
//! Sometimes it is necessary to block a thread, e.g. when performing a CPU-intensive task or waiting for a
//! synchronous IO operation. Because such blocking operations would prevent the async executor from jumping to a
//! different task, effectively blocking it, one of three possible strategies must be used to offload the
//! blocking task from the executor's thread:
//! 1. use the executor's native `spawn_blocking` to move the blocking task onto a dedicated pool of blocking-task threads running alongside the executor threads
//!    - this solution allows offloading tasks onto typically hundreds of threads
//!    - because such a pool typically has far more threads than CPU cores, this approach is ideal for synchronous blocking IO
//! 2. use a dedicated parallelization mechanism with its own thread pool (see the example below)
//!    - the solution most typically used for CPU-heavy tasks
//!    - allows executing the task over a smaller thread pool, fully utilizing each CPU
//! 3. use a `thread`
//!    - typically used when a blocking operation keeps running forever
//!
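//! ## Example
//! A minimal sketch of strategy 2, offloading a CPU-bound computation onto the `rayon` thread pool
//! via this crate's `cpu::spawn_blocking` (assuming the `rayon` feature is enabled and an async
//! executor drives the returned future):
//! ```ignore
//! async fn checksum(data: Vec<u8>) -> u64 {
//!     // The summation runs on a rayon worker thread; the executor thread stays free to make progress.
//!     hopr_parallelize::cpu::spawn_blocking(move || data.iter().map(|b| u64::from(*b)).sum()).await
//! }
//! ```
//!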
//! More information about parallelization, execution and executors can be found in an excellent blog post [here](https://ryhl.io/blog/async-what-is-blocking/).

/// Module for real thread pool based parallelization of CPU-heavy blocking workloads.
pub mod cpu {
    pub use rayon;

    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
    ///
    /// The current thread pool uses a LIFO (Last In First Out) scheduling policy for the thread's queue, but
    /// FIFO (First In First Out) for stealing tasks from other threads.
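    ///
    /// A minimal usage sketch (inside an async context, assuming the `rayon` feature is enabled):
    /// ```ignore
    /// // Hypothetical CPU-bound workload kept off the executor thread.
    /// let sum = hopr_parallelize::cpu::spawn_blocking(|| (0u64..1_000_000).sum::<u64>()).await;
    /// assert_eq!(sum, 499_999_500_000);
    /// ```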
    #[cfg(feature = "rayon")]
    pub async fn spawn_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
        let (tx, rx) = futures::channel::oneshot::channel();
        rayon::spawn(|| {
            // Run the task on the rayon pool, capturing any panic so it can be propagated to the awaiting side.
            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
                .unwrap_or_else(|_| unreachable!())
        });
        rx.await
            .expect("spawned blocking process should be awaitable")
            // Re-throw a panic caught inside the blocking task on the awaiting task.
            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
    }

    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
    ///
    /// Executed tasks are loaded using a FIFO (First In First Out) scheduling policy.
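    ///
    /// A minimal usage sketch (inside an async context, assuming the `rayon` feature is enabled):
    /// ```ignore
    /// use futures::future::join_all;
    ///
    /// // Submit several hypothetical CPU-bound computations; each runs on the rayon pool
    /// // without blocking the async executor thread.
    /// let results = join_all((0..4u64).map(|i| {
    ///     hopr_parallelize::cpu::spawn_fifo_blocking(move || i.wrapping_mul(0x9E37_79B9_7F4A_7C15))
    /// }))
    /// .await;
    /// assert_eq!(results.len(), 4);
    /// ```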
    #[cfg(feature = "rayon")]
    pub async fn spawn_fifo_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
        let (tx, rx) = futures::channel::oneshot::channel();
        rayon::spawn_fifo(|| {
            // Same panic-capturing scheme as in `spawn_blocking`, with the task scheduled in FIFO order.
            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
                .unwrap_or_else(|_| unreachable!())
        });
        rx.await
            .expect("spawned fifo blocking process should be awaitable")
            // Re-throw any panic caught inside the blocking task on the awaiting side.
            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
    }
}