hopr_parallelize/
lib.rs

1//! Parallelization utilities for different types of workloads.
2//!
3//! ## Blocking and non-blocking code
4//! Using `async` executor carries the need to consider different types of operations happening during the
5//! code execution. Primarily, the task in async code can be divided into *blocking* and *non-blocking*,
6//! where a code execution is considered blocking, if it does not allow the executor to swap the current task
7//! (i.e. to jump to a different code execution block). A good rule of thumb for efficient asynchronous code
8//! is to not have more than 10 to 100 microseconds between each .await.
9//!
10//! ## What if blocking is needed
11//! Sometimes it is necessary to block a thread, e.g. when performing a CPU intensive task or waiting for a
12//! synchronous IO operation. Because these blocking operations would prevent the async executor to jump to a
13//! different task, effectively blocking it, one of the 3 possible strategies must be used to offload the
14//! blocking task from the executor's thread:
15//! 1. use executor native `spawn_blocking` to spawn the blocking task to a dedicated pool of blocking tasks running
16//!    alongside the executor threads
17//!    - this solution allows to offload tasks onto typically hundreds of threads
18//!    - because there are typically too many threads, such a scenario is ideal for synchronous blocking IO
19//! 2. use a dedicated parallelization mechanism with its own thread pool
20//!    - solution most typically used for the CPU heavy tasks
21//!    - allows execution of the task over a smaller thread pool fully optimizing each CPU
22//! 3. use a `thread`
23//!    - used typically when a blocking operation keeps running forever
24//!
25//! More information about parallization, execution and executors can be found in an excellent blog post [here](https://ryhl.io/blog/async-what-is-blocking/).
26
27/// Module for real thread pool based parallelization of CPU heavy blocking workloads.
28pub mod cpu {
29    pub use rayon;
30
31    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
32    ///
33    /// The current thread pool uses a LIFO (Last In First Out) scheduling policy for the thread's queue, but
34    /// FIFO (First In First Out) for stealing tasks from other threads.
35    #[cfg(feature = "rayon")]
36    pub async fn spawn_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
37        let (tx, rx) = futures::channel::oneshot::channel();
38        rayon::spawn(|| {
39            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
40                .unwrap_or_else(|_| unreachable!())
41        });
42        rx.await
43            .expect("spawned blocking process should be awaitable")
44            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
45    }
46
47    /// Spawn an awaitable non-blocking execution of the given blocking function on a `rayon` CPU thread pool.
48    ///
49    /// Executed tasks are loaded using a FIFO (First In First Out) scheduling policy.
50    #[cfg(feature = "rayon")]
51    pub async fn spawn_fifo_blocking<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
52        let (tx, rx) = futures::channel::oneshot::channel();
53        rayon::spawn_fifo(|| {
54            tx.send(std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)))
55                .unwrap_or_else(|_| unreachable!())
56        });
57        rx.await
58            .expect("spawned fifo blocking process should be awaitable")
59            .unwrap_or_else(|caught_panic| std::panic::resume_unwind(caught_panic))
60    }
61}