Skip to main content

hopr_async_runtime/
lib.rs

//! Executor API for HOPR, exposing the necessary async functions depending on the enabled
//! runtime.
3
4use std::hash::Hash;
5
6pub use futures::future::AbortHandle;
7
8// Both features could be enabled during testing; therefore, we only use tokio when it's
9// exclusively enabled.
10#[cfg(feature = "runtime-tokio")]
11pub mod prelude {
12    pub use futures::future::{AbortHandle, abortable};
13    pub use tokio::{
14        task::{JoinError, JoinHandle, spawn, spawn_blocking, spawn_local},
15        time::{sleep, timeout as timeout_fut},
16    };
17}
18
/// Spawns a future on the enabled runtime and evaluates to a handle that can abort it.
///
/// The future is first wrapped with `$crate::prelude::abortable`, the wrapped future is then
/// passed to `$crate::prelude::spawn`, and the abort handle produced by `abortable` is the
/// value of the macro invocation.
///
/// Exactly one expression is accepted (an optional trailing comma is allowed), because
/// `abortable` takes a single future. Passing none or several expressions is rejected by the
/// macro matcher itself instead of surfacing as an arity error inside the expansion.
#[macro_export]
macro_rules! spawn_as_abortable {
    ($fut:expr $(,)?) => {{
        let (task, abort_handle) = $crate::prelude::abortable($fut);
        // The join handle is dropped on purpose: dropping it does not abort the task, which
        // from now on is controlled solely through the returned abort handle.
        let _join_handle = $crate::prelude::spawn(task);
        abort_handle
    }};
}
27
28// If no runtime is enabled, fail compilation
29#[cfg(not(feature = "runtime-tokio"))]
30compile_error!("No runtime enabled");
31
32/// Abstraction over tasks that can be aborted (such as join or abort handles).
33#[auto_impl::auto_impl(&, Box, Arc)]
34pub trait Abortable {
35    /// Notifies the task that it should abort.
36    ///
37    /// Must be idempotent and not panic if it was already called before, due to implementation-specific
38    /// semantics of [`Abortable::was_aborted`].
39    fn abort_task(&self);
40
41    /// Returns `true` if [`abort_task`](Abortable::abort_task) was already called or the task has finished.
42    ///
43    /// It is implementation-specific whether `true` actually means that the task has been finished.
44    /// The [`Abortable::abort_task`] therefore can be also called if `true` is returned without a consequence.
45    fn was_aborted(&self) -> bool;
46}
47
48impl Abortable for AbortHandle {
49    fn abort_task(&self) {
50        self.abort();
51    }
52
53    fn was_aborted(&self) -> bool {
54        self.is_aborted()
55    }
56}
57
/// [`Abortable`] for Tokio join handles of tasks that produce no value.
#[cfg(feature = "runtime-tokio")]
impl Abortable for tokio::task::JoinHandle<()> {
    /// Forwards to the join handle's `abort`.
    fn abort_task(&self) {
        tokio::task::JoinHandle::abort(self);
    }

    /// Reports the join handle's `is_finished` state, i.e. `true` means the task is no
    /// longer running, whatever the reason.
    fn was_aborted(&self) -> bool {
        tokio::task::JoinHandle::is_finished(self)
    }
}
68
69/// List of [`Abortable`] tasks with each task identified by a unique key of type `T`.
70///
71/// Abortable objects, such as join or abort handles, do not by design abort when dropped.
72/// Sometimes this behavior is not desirable, and spawned run-away tasks may still continue to live
73/// e.g.: after an error is raised.
74///
75/// This object allows safely managing abortable tasks and will terminate all the tasks in reverse insertion order once
76/// dropped.
77///
78/// Additionally, this object also implements [`Abortable`] allowing it to be arbitrarily nested.
79pub struct AbortableList<T>(indexmap::IndexMap<T, Box<dyn Abortable + Send + Sync>>);
80
81impl<T> Default for AbortableList<T> {
82    fn default() -> Self {
83        Self(indexmap::IndexMap::new())
84    }
85}
86
87impl<T: std::fmt::Debug> std::fmt::Debug for AbortableList<T> {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_list().entries(self.0.keys()).finish()
90    }
91}
92
93impl<T: Hash + Eq> AbortableList<T> {
94    /// Appends a new [`abortable task`](Abortable) to the end of this list.
95    pub fn insert<A: Abortable + Send + Sync + 'static>(&mut self, process: T, task: A) {
96        self.0.insert(process, Box::new(task));
97    }
98
99    /// Looks up a task by its key, removes it and aborts it.
100    ///
101    /// Returns `true` if the task was aborted and removed.
102    /// Otherwise, returns `false` (including a situation when the task was present but already aborted).
103    pub fn abort_one(&mut self, process: &T) -> bool {
104        if let Some(item) = self.0.shift_remove(process).filter(|t| !t.was_aborted()) {
105            item.abort_task();
106            true
107        } else {
108            false
109        }
110    }
111
112    /// Extends this list by appending `other`.
113    ///
114    /// The tasks from `other` are moved to this list without aborting them.
115    /// Afterward, `other` will be empty.
116    pub fn extend_from(&mut self, mut other: AbortableList<T>) {
117        self.0.extend(other.0.drain(..));
118    }
119
120    /// Extends this list by appending `other` while mapping its keys to the ones in this list.
121    ///
122    /// The tasks from `other` are moved to this list without aborting them.
123    /// Afterward, `other` will be empty.
124    pub fn flat_map_extend_from<U>(&mut self, mut other: AbortableList<U>, key_map: impl Fn(U) -> T) {
125        self.0.extend(other.0.drain(..).map(|(k, v)| (key_map(k), v)));
126    }
127}
128impl<T> AbortableList<T> {
129    /// Checks if the list is empty.
130    pub fn is_empty(&self) -> bool {
131        self.0.is_empty()
132    }
133
134    /// Returns the number of abortable tasks in the list.
135    pub fn size(&self) -> usize {
136        self.0.len()
137    }
138
139    /// Returns an iterator over the task names in the insertion order.
140    pub fn iter_names(&self) -> impl Iterator<Item = &T> {
141        self.0.keys()
142    }
143
144    /// Aborts all tasks in this list in the reverse insertion order.
145    ///
146    /// Skips tasks which were [already aborted](Abortable::was_aborted).
147    pub fn abort_all(&self) {
148        for (_, task) in self.0.iter().rev().filter(|(_, task)| !task.was_aborted()) {
149            task.abort_task();
150        }
151    }
152}
153
154impl<T> Abortable for AbortableList<T> {
155    fn abort_task(&self) {
156        self.abort_all();
157    }
158
159    fn was_aborted(&self) -> bool {
160        self.0.iter().all(|(_, task)| task.was_aborted())
161    }
162}
163
164impl<T> Drop for AbortableList<T> {
165    fn drop(&mut self) {
166        self.abort_all();
167        self.0.clear();
168    }
169}