hopr_async_runtime/lib.rs
1//! Executor API for HOPR which exposes the necessary async functions depending on the enabled
2//! runtime.
3
4use std::hash::Hash;
5
6pub use futures::future::AbortHandle;
7
8// Both features could be enabled during testing; therefore, we only use tokio when it's
9// exclusively enabled.
/// Runtime primitives re-exported under a single path.
///
/// Only compiled when the `runtime-tokio` feature is enabled. The abort helpers come from
/// `futures`, everything else from `tokio`; `tokio::time::timeout` is exposed as `timeout_fut`.
#[cfg(feature = "runtime-tokio")]
pub mod prelude {
    // Abort primitives.
    pub use futures::future::{abortable, AbortHandle};
    // Task spawning and join handles.
    pub use tokio::task::{spawn, spawn_blocking, spawn_local, JoinError, JoinHandle};
    // Timers.
    pub use tokio::time::{sleep, timeout as timeout_fut};
}
18
/// Spawns a future as an abortable task and evaluates to its abort handle.
///
/// The argument is wrapped with `prelude::abortable`, the wrapped future is handed to
/// `prelude::spawn`, and the handle half returned by `abortable` is the value of the macro.
/// The join handle returned by `spawn` is discarded, so the abort handle is the only thing
/// the caller gets back from this macro.
#[macro_export]
macro_rules! spawn_as_abortable {
    ($($fut:expr),*) => {{
        let (wrapped, handle) = $crate::prelude::abortable($($fut),*);
        // The join handle is deliberately not returned.
        let _join_handle = $crate::prelude::spawn(wrapped);
        handle
    }}
}
27
28// If no runtime is enabled, fail compilation
29#[cfg(not(feature = "runtime-tokio"))]
30compile_error!("No runtime enabled");
31
32/// Abstraction over tasks that can be aborted (such as join or abort handles).
33#[auto_impl::auto_impl(&, Box, Arc)]
34pub trait Abortable {
35 /// Notifies the task that it should abort.
36 ///
37 /// Must be idempotent and not panic if it was already called before, due to implementation-specific
38 /// semantics of [`Abortable::was_aborted`].
39 fn abort_task(&self);
40
41 /// Returns `true` if [`abort_task`](Abortable::abort_task) was already called or the task has finished.
42 ///
43 /// It is implementation-specific whether `true` actually means that the task has been finished.
44 /// The [`Abortable::abort_task`] therefore can be also called if `true` is returned without a consequence.
45 fn was_aborted(&self) -> bool;
46}
47
48impl Abortable for AbortHandle {
49 fn abort_task(&self) {
50 self.abort();
51 }
52
53 fn was_aborted(&self) -> bool {
54 self.is_aborted()
55 }
56}
57
/// [`Abortable`] for tokio join handles of tasks without an output value.
#[cfg(feature = "runtime-tokio")]
impl Abortable for tokio::task::JoinHandle<()> {
    /// Forwards to `JoinHandle::abort`.
    fn abort_task(&self) {
        Self::abort(self);
    }

    /// Reports `JoinHandle::is_finished`, i.e. whether the task has finished rather than whether an
    /// abort was requested.
    ///
    /// NOTE(review): tokio documents that `is_finished` may still return `false` for a while after
    /// `abort` was called - confirm that no caller expects `true` immediately after `abort_task`.
    fn was_aborted(&self) -> bool {
        Self::is_finished(self)
    }
}
68
69/// List of [`Abortable`] tasks with each task identified by a unique key of type `T`.
70///
71/// Abortable objects, such as join or abort handles, do not by design abort when dropped.
72/// Sometimes this behavior is not desirable, and spawned run-away tasks may still continue to live
73/// e.g.: after an error is raised.
74///
75/// This object allows safely managing abortable tasks and will terminate all the tasks in reverse insertion order once
76/// dropped.
77///
78/// Additionally, this object also implements [`Abortable`] allowing it to be arbitrarily nested.
79pub struct AbortableList<T>(indexmap::IndexMap<T, Box<dyn Abortable + Send + Sync>>);
80
81impl<T> Default for AbortableList<T> {
82 fn default() -> Self {
83 Self(indexmap::IndexMap::new())
84 }
85}
86
87impl<T: Hash + Eq> AbortableList<T> {
88 /// Appends a new [`abortable task`](Abortable) to the end of this list.
89 pub fn insert<A: Abortable + Send + Sync + 'static>(&mut self, process: T, task: A) {
90 self.0.insert(process, Box::new(task));
91 }
92
93 /// Looks up a task by its key, removes it and aborts it.
94 ///
95 /// Returns `true` if the task was aborted and removed.
96 /// Otherwise, returns `false` (including a situation when the task was present but already aborted).
97 pub fn abort_one(&mut self, process: &T) -> bool {
98 if let Some(item) = self.0.shift_remove(process).filter(|t| !t.was_aborted()) {
99 item.abort_task();
100 true
101 } else {
102 false
103 }
104 }
105
106 /// Extends this list by appending `other`.
107 ///
108 /// The tasks from `other` are moved to this list without aborting them.
109 /// Afterward, `other` will be empty.
110 pub fn extend_from(&mut self, mut other: AbortableList<T>) {
111 self.0.extend(other.0.drain(..));
112 }
113
114 /// Extends this list by appending `other` while mapping its keys to the ones in this list.
115 ///
116 /// The tasks from `other` are moved to this list without aborting them.
117 /// Afterward, `other` will be empty.
118 pub fn flat_map_extend_from<U>(&mut self, mut other: AbortableList<U>, key_map: impl Fn(U) -> T) {
119 self.0.extend(other.0.drain(..).map(|(k, v)| (key_map(k), v)));
120 }
121}
122impl<T> AbortableList<T> {
123 /// Checks if the list is empty.
124 pub fn is_empty(&self) -> bool {
125 self.0.is_empty()
126 }
127
128 /// Returns the number of abortable tasks in the list.
129 pub fn size(&self) -> usize {
130 self.0.len()
131 }
132
133 /// Returns an iterator over the task names in the insertion order.
134 pub fn iter_names(&self) -> impl Iterator<Item = &T> {
135 self.0.keys()
136 }
137
138 /// Aborts all tasks in this list in the reverse insertion order.
139 ///
140 /// Skips tasks which were [already aborted](Abortable::was_aborted).
141 pub fn abort_all(&self) {
142 for (_, task) in self.0.iter().rev().filter(|(_, task)| !task.was_aborted()) {
143 task.abort_task();
144 }
145 }
146}
147
148impl<T> Abortable for AbortableList<T> {
149 fn abort_task(&self) {
150 self.abort_all();
151 }
152
153 fn was_aborted(&self) -> bool {
154 self.0.iter().all(|(_, task)| task.was_aborted())
155 }
156}
157
158impl<T> Drop for AbortableList<T> {
159 fn drop(&mut self) {
160 self.abort_all();
161 self.0.clear();
162 }
163}