Skip to main content

hopr_strategy/
strategy.rs

1//! ## Multi Strategy
2//!
3//! Runs multiple sub-strategies concurrently. Each sub-strategy manages its own
4//! event subscription and internal timers via the `Strategy::run` method.
5//!
6//! `MultiStrategy` is a pure combinator: it accepts any `Box<dyn Strategy + Send>` —
7//! including strategies defined outside this crate — and runs them all concurrently.
8//! Sub-strategies are fully isolated: a failure in one is logged and does not affect
9//! the others.
10use std::fmt::{Debug, Display, Formatter};
11
12use async_trait::async_trait;
13use tracing::warn;
14
15use crate::errors::Result;
16
17/// A strategy that runs until cancelled or a fatal error occurs.
18///
19/// Each implementation subscribes to the node's event stream and/or creates internal
20/// timers in [`run`](Strategy::run). The trait is trivially object-safe: `run` takes only
21/// `&mut self`, so strategies can be held as `Box<dyn Strategy + Send>`.
22///
23/// Any type implementing this trait can be composed into a [`MultiStrategy`] without
24/// any changes to this crate.
25#[async_trait]
26pub trait Strategy: Display + Send {
27    /// Run the strategy. Returns only on cancellation or fatal error.
28    async fn run(&mut self) -> Result<()>;
29}
30
31/// Runs a group of sub-strategies concurrently, each in its own async task.
32///
33/// `MultiStrategy` is strategy-kind-agnostic: it only knows about
34/// `Box<dyn Strategy + Send>`. Any type implementing [`Strategy`] — including
35/// ones defined outside this crate — can be composed here.
36pub struct MultiStrategy {
37    strategies: Vec<Box<dyn Strategy + Send>>,
38}
39
40impl MultiStrategy {
41    /// Creates a new `MultiStrategy` from pre-built strategy objects.
42    ///
43    /// Strategies are passed in already constructed; `MultiStrategy` does not know or
44    /// care about the concrete types. Pass an empty `strategies` vec to get a passive
45    /// strategy that blocks forever.
46    pub fn new(strategies: Vec<Box<dyn Strategy + Send>>) -> Self {
47        Self { strategies }
48    }
49}
50
51impl Debug for MultiStrategy {
52    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
53        write!(f, "MultiStrategy({} sub-strategies)", self.strategies.len())
54    }
55}
56
57impl Display for MultiStrategy {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        let names: Vec<String> = self.strategies.iter().map(|s| s.to_string()).collect();
60        if names.is_empty() {
61            write!(f, "multi_strategy(passive)")
62        } else {
63            write!(f, "multi_strategy({})", names.join(", "))
64        }
65    }
66}
67
68#[async_trait]
69impl Strategy for MultiStrategy {
70    async fn run(&mut self) -> Result<()> {
71        use futures::StreamExt as _;
72        use hopr_async_runtime::prelude::{AbortHandle, abortable, spawn};
73
74        let strategies = std::mem::take(&mut self.strategies);
75
76        if strategies.is_empty() {
77            // Passive strategy: block forever until cancelled.
78            futures::future::pending::<()>().await;
79            return Ok(());
80        }
81
82        // Spawn each sub-strategy as an abortable task.
83        // Keeping all AbortHandles in a RAII guard ensures every sub-task is cancelled
84        // when MultiStrategy is dropped (graceful shutdown).
85        let mut join_handles = Vec::new();
86        let mut abort_handles: Vec<AbortHandle> = Vec::new();
87        for mut s in strategies {
88            let (proc, abort_handle) = abortable(async move { s.run().await });
89            join_handles.push(spawn(proc));
90            abort_handles.push(abort_handle);
91        }
92
93        struct AbortGuard(Vec<AbortHandle>);
94        impl Drop for AbortGuard {
95            fn drop(&mut self) {
96                for h in &self.0 {
97                    h.abort();
98                }
99            }
100        }
101        let _guard = AbortGuard(abort_handles);
102
103        // Process completions as they arrive. Sub-strategies are fully isolated:
104        // a failure in one is logged but does not affect the others.
105        let mut pending: futures::stream::FuturesUnordered<_> = join_handles.into_iter().collect();
106
107        while let Some(join_result) = pending.next().await {
108            let strategy_result = match join_result {
109                Err(e) => Err(crate::errors::StrategyError::Other(e.into())),
110                Ok(Ok(result)) => result,
111                Ok(Err(_aborted)) => continue, // aborted by the guard — expected during shutdown
112            };
113
114            if let Err(e) = strategy_result {
115                warn!(%e, "sub-strategy failed");
116            }
117        }
118
119        Ok(())
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use std::fmt::{Display, Formatter};
126
127    use super::*;
128    use crate::errors::StrategyError;
129
130    struct OkStrategy;
131    impl Display for OkStrategy {
132        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133            write!(f, "ok")
134        }
135    }
136    #[async_trait]
137    impl Strategy for OkStrategy {
138        async fn run(&mut self) -> Result<()> {
139            Ok(())
140        }
141    }
142
143    struct FailStrategy;
144    impl Display for FailStrategy {
145        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146            write!(f, "fail")
147        }
148    }
149    #[async_trait]
150    impl Strategy for FailStrategy {
151        async fn run(&mut self) -> Result<()> {
152            Err(StrategyError::Other(anyhow::anyhow!("error")))
153        }
154    }
155
156    /// An externally-defined strategy — simulates a plugin or application-defined strategy.
157    struct ExternalStrategy {
158        ran: bool,
159    }
160    impl Display for ExternalStrategy {
161        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
162            write!(f, "external")
163        }
164    }
165    #[async_trait]
166    impl Strategy for ExternalStrategy {
167        async fn run(&mut self) -> Result<()> {
168            self.ran = true;
169            Ok(())
170        }
171    }
172
173    #[tokio::test]
174    async fn test_multi_strategy_sub_failure_does_not_propagate() -> anyhow::Result<()> {
175        // A failing sub-strategy is isolated: the MultiStrategy still returns Ok.
176        let mut ms = MultiStrategy::new(vec![Box::new(FailStrategy), Box::new(OkStrategy)]);
177        ms.run().await?;
178        Ok(())
179    }
180
181    #[tokio::test]
182    async fn test_multi_strategy_accepts_external_strategy() -> anyhow::Result<()> {
183        // Demonstrates that any impl Strategy can be composed without modifying hopr-strategy.
184        let mut ms = MultiStrategy::new(vec![Box::new(OkStrategy), Box::new(ExternalStrategy { ran: false })]);
185        ms.run().await?;
186        Ok(())
187    }
188
189    #[tokio::test]
190    async fn test_multi_strategy_empty_is_passive() {
191        // An empty MultiStrategy blocks forever — verify it does not complete immediately.
192        let mut ms = MultiStrategy::new(vec![]);
193        let result =
194            futures_time::future::FutureExt::timeout(ms.run(), futures_time::time::Duration::from_millis(50)).await;
195        assert!(result.is_err(), "empty MultiStrategy should block (timeout expected)");
196    }
197
198    #[test]
199    fn test_multi_strategy_display() {
200        let ms = MultiStrategy::new(vec![Box::new(OkStrategy), Box::new(FailStrategy)]);
201        assert_eq!(ms.to_string(), "multi_strategy(ok, fail)");
202    }
203
204    #[test]
205    fn test_multi_strategy_display_passive() {
206        let ms = MultiStrategy::new(vec![]);
207        assert_eq!(ms.to_string(), "multi_strategy(passive)");
208    }
209}