hopr_async_runtime/
lib.rs

1//! Executor API for HOPR which exposes the necessary async functions depending on the enabled
2//! runtime.
3
4use std::hash::Hash;
5
6pub use futures::future::AbortHandle;
7
// Tokio is the only runtime backend wired up in this file: the prelude below is compiled
// whenever the `runtime-tokio` feature is enabled.
/// Runtime primitives re-exported under a single, runtime-agnostic module path.
#[cfg(feature = "runtime-tokio")]
pub mod prelude {
    // Abort support comes from `futures` rather than from the runtime.
    pub use futures::future::{AbortHandle, abortable};
    // Task spawning and join handles.
    pub use tokio::task::{JoinError, JoinHandle, spawn, spawn_blocking, spawn_local};
    // Timers; `timeout` is exposed under the name `timeout_fut`.
    pub use tokio::time::{sleep, timeout as timeout_fut};
}
18
/// Spawns a future as an abortable task and evaluates to its abort handle.
///
/// The arguments are forwarded to `prelude::abortable`, the wrapped future is handed to
/// `prelude::spawn`, and the join handle returned by the spawn call is discarded.
#[macro_export]
macro_rules! spawn_as_abortable {
    ($($expr:expr),*) => {{
        let (abortable_fut, handle) = $crate::prelude::abortable($($expr),*);
        // The join handle is intentionally unused: the task is controlled through `handle` only.
        let _join_handle = $crate::prelude::spawn(abortable_fut);
        handle
    }}
}
27
28// If no runtime is enabled, fail compilation
29#[cfg(not(feature = "runtime-tokio"))]
30compile_error!("No runtime enabled");
31
32/// Abstraction over tasks that can be aborted (such as join or abort handles).
33#[auto_impl::auto_impl(&, Box, Arc)]
34pub trait Abortable {
35    /// Notifies the task that it should abort.
36    ///
37    /// Must be idempotent and not panic if it was already called before, due to implementation-specific
38    /// semantics of [`Abortable::was_aborted`].
39    fn abort_task(&self);
40
41    /// Returns `true` if [`abort_task`](Abortable::abort_task) was already called or the task has finished.
42    ///
43    /// It is implementation-specific whether `true` actually means that the task has been finished.
44    /// The [`Abortable::abort_task`] therefore can be also called if `true` is returned without a consequence.
45    fn was_aborted(&self) -> bool;
46}
47
48impl Abortable for AbortHandle {
49    fn abort_task(&self) {
50        self.abort();
51    }
52
53    fn was_aborted(&self) -> bool {
54        self.is_aborted()
55    }
56}
57
/// [`Abortable`] for join handles of tasks that produce no value.
#[cfg(feature = "runtime-tokio")]
impl Abortable for tokio::task::JoinHandle<()> {
    fn abort_task(&self) {
        Self::abort(self);
    }

    /// Backed by the handle's `is_finished`, i.e. it reports task completion rather than
    /// whether an abort was requested.
    fn was_aborted(&self) -> bool {
        Self::is_finished(self)
    }
}
68
69/// List of [`Abortable`] tasks with each task identified by a unique key of type `T`.
70///
71/// Abortable objects, such as join or abort handles, do not by design abort when dropped.
72/// Sometimes this behavior is not desirable, and spawned run-away tasks may still continue to live
73/// e.g.: after an error is raised.
74///
75/// This object allows safely managing abortable tasks and will terminate all the tasks in reverse insertion order once
76/// dropped.
77///
78/// Additionally, this object also implements [`Abortable`] allowing it to be arbitrarily nested.
79pub struct AbortableList<T>(indexmap::IndexMap<T, Box<dyn Abortable + Send + Sync>>);
80
81impl<T> Default for AbortableList<T> {
82    fn default() -> Self {
83        Self(indexmap::IndexMap::new())
84    }
85}
86
87impl<T: Hash + Eq> AbortableList<T> {
88    /// Appends a new [`abortable task`](Abortable) to the end of this list.
89    pub fn insert<A: Abortable + Send + Sync + 'static>(&mut self, process: T, task: A) {
90        self.0.insert(process, Box::new(task));
91    }
92
93    /// Looks up a task by its key, removes it and aborts it.
94    ///
95    /// Returns `true` if the task was aborted and removed.
96    /// Otherwise, returns `false` (including a situation when the task was present but already aborted).
97    pub fn abort_one(&mut self, process: &T) -> bool {
98        if let Some(item) = self.0.shift_remove(process).filter(|t| !t.was_aborted()) {
99            item.abort_task();
100            true
101        } else {
102            false
103        }
104    }
105
106    /// Extends this list by appending `other`.
107    ///
108    /// The tasks from `other` are moved to this list without aborting them.
109    /// Afterward, `other` will be empty.
110    pub fn extend_from(&mut self, mut other: AbortableList<T>) {
111        self.0.extend(other.0.drain(..));
112    }
113
114    /// Extends this list by appending `other` while mapping its keys to the ones in this list.
115    ///
116    /// The tasks from `other` are moved to this list without aborting them.
117    /// Afterward, `other` will be empty.
118    pub fn flat_map_extend_from<U>(&mut self, mut other: AbortableList<U>, key_map: impl Fn(U) -> T) {
119        self.0.extend(other.0.drain(..).map(|(k, v)| (key_map(k), v)));
120    }
121}
122impl<T> AbortableList<T> {
123    /// Checks if the list is empty.
124    pub fn is_empty(&self) -> bool {
125        self.0.is_empty()
126    }
127
128    /// Returns the number of abortable tasks in the list.
129    pub fn size(&self) -> usize {
130        self.0.len()
131    }
132
133    /// Returns an iterator over the task names in the insertion order.
134    pub fn iter_names(&self) -> impl Iterator<Item = &T> {
135        self.0.keys()
136    }
137
138    /// Aborts all tasks in this list in the reverse insertion order.
139    ///
140    /// Skips tasks which were [already aborted](Abortable::was_aborted).
141    pub fn abort_all(&self) {
142        for (_, task) in self.0.iter().rev().filter(|(_, task)| !task.was_aborted()) {
143            task.abort_task();
144        }
145    }
146}
147
148impl<T> Abortable for AbortableList<T> {
149    fn abort_task(&self) {
150        self.abort_all();
151    }
152
153    fn was_aborted(&self) -> bool {
154        self.0.iter().all(|(_, task)| task.was_aborted())
155    }
156}
157
158impl<T> Drop for AbortableList<T> {
159    fn drop(&mut self) {
160        self.abort_all();
161        self.0.clear();
162    }
163}