hopr_strategy/
lib.rs

1//! This crate contains all the Strategies for HOPRd.
2//! Strategies are vital for (partial) automation of ticket and HOPR channel operations
3//! during node runtime.
4//!
5//! - [passive strategy](crate::strategy::MultiStrategy)
6//! - [auto funding strategy](crate::auto_funding)
7//! - [auto redeeming strategy](crate::auto_redeeming)
8//! - [multiple strategy chains](crate::strategy)
9//!
10//! HOPRd can be configured to use any of the above strategies.
11//!
12//! ## Configuring strategies in HOPRd
13//!
14//! There are two ways of configuring strategies in HOPRd: via CLI and via a YAML config file.
15//!
//! Configuration through the CLI allows only a basic single-strategy setup via the `defaultStrategy`
//! parameter. It can be set to any of the above strategies; however, the strategy parameters are not
//! configurable via the CLI and always keep their default values.
//! In addition, if the `disableTicketAutoRedeem` CLI argument is `false`, the default Auto Redeem strategy is added
//! to the strategy configured via the `defaultStrategy` argument (they execute together as a Multi strategy).
21//!
//! For more complex strategy configurations, the YAML configuration method is recommended, using the `strategy` YAML
//! section. In this case, the top-most strategy is always assumed to be a Multi strategy:
24//!
25//! ```yaml
26//! strategy:
27//!   on_fail_continue: true
28//!   allow_recursive: true
29//!   execution_interval: 60
30//!   strategies:
31//!     - !AutoFunding
32//!       funding_amount: 20
33//! ```
34
35use std::{ops::Sub, str::FromStr, time::Duration};
36
37use futures::{StreamExt, pin_mut};
38use futures_concurrency::stream::Merge;
39use hopr_lib::{
40    Address, ChannelChange, ChannelStatus, HoprBalance, RedeemableTicket, VerifiedTicket,
41    exports::api::chain::ChainEvent,
42};
43use serde::{Deserialize, Serialize};
44use strum::{Display, EnumString, VariantNames};
45
46use crate::{
47    Strategy::AutoRedeeming,
48    auto_funding::AutoFundingStrategyConfig,
49    auto_redeeming::AutoRedeemingStrategyConfig,
50    channel_finalizer::ClosureFinalizerStrategyConfig,
51    strategy::{MultiStrategyConfig, SingularStrategy},
52};
53
54pub mod auto_funding;
55pub mod auto_redeeming;
56pub mod channel_finalizer;
57pub mod errors;
58pub mod strategy;
59
60/// Lists all possible strategies with their respective configurations.
61#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Display, EnumString, VariantNames)]
62#[strum(serialize_all = "snake_case")]
63pub enum Strategy {
64    AutoRedeeming(AutoRedeemingStrategyConfig),
65    AutoFunding(AutoFundingStrategyConfig),
66    ClosureFinalizer(ClosureFinalizerStrategyConfig),
67    Multi(MultiStrategyConfig),
68    Passive,
69}
70
71/// Default HOPR node strategies (in order).
72///
73/// ## Auto-redeem Strategy
74/// - redeem single tickets on channel close if worth at least 1 wxHOPR
75pub fn hopr_default_strategies() -> MultiStrategyConfig {
76    MultiStrategyConfig {
77        on_fail_continue: true,
78        allow_recursive: false,
79        execution_interval: Duration::from_secs(60),
80        strategies: vec![
81            // AutoFunding(AutoFundingStrategyConfig {
82            // min_stake_threshold: Balance::new_from_str("1000000000000000000", BalanceType::HOPR),
83            // funding_amount: Balance::new_from_str("10000000000000000000", BalanceType::HOPR),
84            // }),
85            AutoRedeeming(AutoRedeemingStrategyConfig {
86                redeem_all_on_close: true,
87                minimum_redeem_ticket_value: HoprBalance::from_str("1 wxHOPR").unwrap(),
88                redeem_on_winning: true,
89            }),
90        ],
91    }
92}
93
94enum StrategyEvent {
95    Tick,
96    ChainEvent(ChainEvent),
97    Ticket(VerifiedTicket),
98}
99
100/// Streams [`ChainEvents`](ChainEvent), [`AcknowledgedTickets`](AcknowledgedTicket) and `tick` at regular time
101/// intervals as events into the given `strategy`.
102pub fn stream_events_to_strategy_with_tick<C, T, S>(
103    strategy: std::sync::Arc<S>,
104    chain_events: C,
105    ticket_events: T,
106    tick: std::time::Duration,
107    me: Address,
108) -> hopr_async_runtime::AbortHandle
109where
110    C: futures::stream::Stream<Item = ChainEvent> + Send + 'static,
111    T: futures::stream::Stream<Item = RedeemableTicket> + Send + 'static,
112    S: SingularStrategy + Send + Sync + 'static,
113{
114    let tick_stream = futures_time::stream::interval(tick.into()).map(|_| StrategyEvent::Tick);
115    let chain_stream = chain_events.map(StrategyEvent::ChainEvent).fuse();
116    let ticket_stream = ticket_events.map(|t| StrategyEvent::Ticket(t.ticket)).fuse();
117
118    let (stream, abort_handle) = futures::stream::abortable((tick_stream, chain_stream, ticket_stream).merge());
119    hopr_async_runtime::prelude::spawn(async move {
120        pin_mut!(stream);
121        while let Some(event) = stream.next().await {
122            match event {
123                StrategyEvent::Tick => {
124                    if let Err(error) = strategy.on_tick().await {
125                        tracing::error!(%error, "error while notifying tick to strategy");
126                    }
127                }
128                StrategyEvent::ChainEvent(chain_event) => {
129                    // TODO: rework strategies so that they can react directly to `ChainEvent`s and avoid the following
130                    // conversion to `ChannelChange`
131                    match chain_event {
132                        ChainEvent::ChannelOpened(channel) => {
133                            if let Some(dir) = channel.direction(&me) {
134                                let _ = strategy
135                                    .on_own_channel_changed(
136                                        &channel,
137                                        dir,
138                                        ChannelChange::Status {
139                                            left: ChannelStatus::Closed,
140                                            right: ChannelStatus::Open,
141                                        },
142                                    )
143                                    .await;
144                            }
145                        }
146                        ChainEvent::ChannelClosureInitiated(channel) => {
147                            if let Some(dir) = channel.direction(&me) {
148                                let _ = strategy
149                                    .on_own_channel_changed(
150                                        &channel,
151                                        dir,
152                                        ChannelChange::Status {
153                                            left: ChannelStatus::Open,
154                                            right: channel.status,
155                                        },
156                                    )
157                                    .await;
158                            }
159                        }
160                        ChainEvent::ChannelClosed(channel) => {
161                            if let Some(dir) = channel.direction(&me) {
162                                let _ = strategy
163                                    .on_own_channel_changed(
164                                        &channel,
165                                        dir,
166                                        ChannelChange::Status {
167                                            left: ChannelStatus::PendingToClose(
168                                                std::time::SystemTime::now().sub(Duration::from_secs(30)),
169                                            ),
170                                            right: ChannelStatus::Closed,
171                                        },
172                                    )
173                                    .await;
174                            }
175                        }
176                        ChainEvent::ChannelBalanceIncreased(channel, diff) => {
177                            if let Some(dir) = channel.direction(&me) {
178                                let _ = strategy
179                                    .on_own_channel_changed(
180                                        &channel,
181                                        dir,
182                                        ChannelChange::Balance {
183                                            left: channel.balance - diff,
184                                            right: channel.balance,
185                                        },
186                                    )
187                                    .await;
188                            }
189                        }
190                        ChainEvent::ChannelBalanceDecreased(channel, diff) => {
191                            if let Some(dir) = channel.direction(&me) {
192                                let _ = strategy
193                                    .on_own_channel_changed(
194                                        &channel,
195                                        dir,
196                                        ChannelChange::Balance {
197                                            left: channel.balance + diff,
198                                            right: channel.balance,
199                                        },
200                                    )
201                                    .await;
202                            }
203                        }
204                        ChainEvent::TicketRedeemed(channel, _) => {
205                            if let Some(dir) = channel.direction(&me) {
206                                let _ = strategy
207                                    .on_own_channel_changed(
208                                        &channel,
209                                        dir,
210                                        ChannelChange::TicketIndex {
211                                            left: channel.ticket_index.as_u64() - 1,
212                                            right: channel.ticket_index.as_u64(),
213                                        },
214                                    )
215                                    .await;
216                            }
217                        }
218                        _ => {}
219                    }
220                }
221                StrategyEvent::Ticket(ack_ticket) => {
222                    if let Err(error) = strategy.on_acknowledged_winning_ticket(&ack_ticket).await {
223                        tracing::error!(%error, "error while notifying new winning ticket to strategy");
224                    }
225                }
226            }
227        }
228    });
229
230    abort_handle
231}
232
233impl Default for Strategy {
234    fn default() -> Self {
235        Self::Multi(hopr_default_strategies())
236    }
237}
238
239/// An alias for the strategy configuration type.
240pub type StrategyConfig = MultiStrategyConfig;