hopr_async_runtime/
lib.rs1use std::hash::Hash;
5
6pub use futures::future::AbortHandle;
7
/// Re-exports of the runtime primitives meant to be used together with this crate.
///
/// Only compiled when the `runtime-tokio` feature is enabled.
#[cfg(feature = "runtime-tokio")]
pub mod prelude {
    // Abort primitives come from `futures`, not from Tokio.
    pub use futures::future::{abortable, AbortHandle};
    // Task spawning and the handle/error types produced by it.
    pub use tokio::task::{spawn, spawn_blocking, spawn_local, JoinError, JoinHandle};
    // Timers; Tokio's `timeout` is exposed under the name `timeout_fut`.
    pub use tokio::time::{sleep, timeout as timeout_fut};
}
18
/// Spawns a future in abortable form and evaluates to its abort handle.
///
/// The arguments are forwarded verbatim to `prelude::abortable`; the abortable
/// future it returns is handed to `prelude::spawn`. The join handle produced by
/// `spawn` is not returned to the caller.
#[macro_export]
macro_rules! spawn_as_abortable {
    ($($expr:expr),*) => {{
        let (task, handle) = $crate::prelude::abortable($($expr),*);
        // Only the abort handle is exposed; the join handle is deliberately unused.
        let _detached = $crate::prelude::spawn(task);
        handle
    }}
}
27
28#[cfg(not(feature = "runtime-tokio"))]
30compile_error!("No runtime enabled");
31
32#[auto_impl::auto_impl(&, Box, Arc)]
34pub trait Abortable {
35 fn abort_task(&self);
40
41 fn was_aborted(&self) -> bool;
46}
47
48impl Abortable for AbortHandle {
49 fn abort_task(&self) {
50 self.abort();
51 }
52
53 fn was_aborted(&self) -> bool {
54 self.is_aborted()
55 }
56}
57
/// Lets a Tokio join handle of a unit-returning task be used as an [`Abortable`].
#[cfg(feature = "runtime-tokio")]
impl Abortable for tokio::task::JoinHandle<()> {
    /// Forwards to the handle's own `abort`.
    fn abort_task(&self) {
        Self::abort(self);
    }

    /// Forwards to the handle's `is_finished`.
    ///
    /// Note that this reports completion in general, not only completion caused
    /// by an abort.
    fn was_aborted(&self) -> bool {
        Self::is_finished(self)
    }
}
68
69pub struct AbortableList<T>(indexmap::IndexMap<T, Box<dyn Abortable + Send + Sync>>);
80
81impl<T> Default for AbortableList<T> {
82 fn default() -> Self {
83 Self(indexmap::IndexMap::new())
84 }
85}
86
87impl<T: std::fmt::Debug> std::fmt::Debug for AbortableList<T> {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_list().entries(self.0.keys()).finish()
90 }
91}
92
93impl<T: Hash + Eq> AbortableList<T> {
94 pub fn insert<A: Abortable + Send + Sync + 'static>(&mut self, process: T, task: A) {
96 self.0.insert(process, Box::new(task));
97 }
98
99 pub fn abort_one(&mut self, process: &T) -> bool {
104 if let Some(item) = self.0.shift_remove(process).filter(|t| !t.was_aborted()) {
105 item.abort_task();
106 true
107 } else {
108 false
109 }
110 }
111
112 pub fn extend_from(&mut self, mut other: AbortableList<T>) {
117 self.0.extend(other.0.drain(..));
118 }
119
120 pub fn flat_map_extend_from<U>(&mut self, mut other: AbortableList<U>, key_map: impl Fn(U) -> T) {
125 self.0.extend(other.0.drain(..).map(|(k, v)| (key_map(k), v)));
126 }
127}
128impl<T> AbortableList<T> {
129 pub fn is_empty(&self) -> bool {
131 self.0.is_empty()
132 }
133
134 pub fn size(&self) -> usize {
136 self.0.len()
137 }
138
139 pub fn iter_names(&self) -> impl Iterator<Item = &T> {
141 self.0.keys()
142 }
143
144 pub fn abort_all(&self) {
148 for (_, task) in self.0.iter().rev().filter(|(_, task)| !task.was_aborted()) {
149 task.abort_task();
150 }
151 }
152}
153
154impl<T> Abortable for AbortableList<T> {
155 fn abort_task(&self) {
156 self.abort_all();
157 }
158
159 fn was_aborted(&self) -> bool {
160 self.0.iter().all(|(_, task)| task.was_aborted())
161 }
162}
163
164impl<T> Drop for AbortableList<T> {
165 fn drop(&mut self) {
166 self.abort_all();
167 self.0.clear();
168 }
169}