hopr_strategy/
auto_redeeming.rs

1//! ## Auto Redeeming Strategy
2//! This strategy listens for newly added acknowledged tickets and automatically issues a redeem transaction on that
3//! ticket.
4//!
5//! For details on default parameters, see [AutoRedeemingStrategyConfig].
6use std::{
7    fmt::{Debug, Display, Formatter},
8    str::FromStr,
9};
10
11use async_trait::async_trait;
12use futures::{SinkExt, StreamExt, pin_mut};
13use hopr_lib::{
14    AcknowledgedTicketStatus, ChannelChange, ChannelDirection, ChannelEntry, ChannelStatus, ChannelStatusDiscriminants,
15    HoprBalance, Utc, VerifiedTicket,
16    exports::api::{
17        chain::{ChainReadChannelOperations, ChannelSelector},
18        db::TicketSelector,
19    },
20};
21use serde::{Deserialize, Serialize};
22use serde_with::{DisplayFromStr, serde_as};
23use tracing::{debug, error, info};
24use validator::Validate;
25
26use crate::{
27    Strategy,
28    errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
29    strategy::SingularStrategy,
30};
31
// Counter described as "Count of initiated automatic redemptions".
// It is compiled only when the `prometheus` feature is enabled and never in test builds.
// NOTE(review): no increment of this counter is visible in this part of the file — confirm
// where (or whether) it is bumped before relying on it.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_AUTO_REDEEMS:  hopr_metrics::SimpleCounter =
         hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
}
37
/// Serde default provider for boolean config fields that default to `true`.
fn just_true() -> bool {
    true
}
41
/// Serde default provider for boolean config fields that default to `false`.
fn just_false() -> bool {
    false
}
45
46fn min_redeem_hopr() -> HoprBalance {
47    HoprBalance::from_str("1 wxHOPR").unwrap()
48}
49
/// Configuration object for the `AutoRedeemingStrategy`.
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct AutoRedeemingStrategyConfig {
    /// If set to true, will redeem all tickets in the channel (which are over the
    /// `minimum_redeem_ticket_value` threshold) once it transitions to `PendingToClose`.
    ///
    /// Default is `true`.
    #[serde(default = "just_true")]
    #[default = true]
    pub redeem_all_on_close: bool,

    /// The strategy will only redeem an acknowledged winning ticket if it has at least this value of HOPR.
    /// If 0 is given, the strategy will redeem tickets regardless of their value.
    ///
    /// Default is `1 wxHOPR`.
    #[serde(default = "min_redeem_hopr")]
    #[serde_as(as = "DisplayFromStr")]
    #[default(min_redeem_hopr())]
    pub minimum_redeem_ticket_value: HoprBalance,

    /// If set, the strategy will redeem each incoming winning ticket.
    /// Otherwise, it will try to redeem tickets in all channels periodically.
    ///
    /// Set this to `true` when winning tickets are not happening too often (e.g., when winning probability
    /// is below 1%).
    /// Set this to `false` when winning tickets are happening very often (e.g., when winning probability
    /// is above 1%).
    ///
    /// Default is `false` (both the `Default` impl and the serde default yield `false`).
    #[serde(default = "just_false")]
    #[default = false]
    pub redeem_on_winning: bool,
}
84
/// The `AutoRedeemingStrategy` automatically sends an acknowledged ticket
/// for redemption once encountered.
/// The strategy does not await the result of the redemption.
pub struct AutoRedeemingStrategy<A, R> {
    /// Chain access used to look up channels (streaming channels, fetching a channel by id)
    /// and to obtain this node's own address.
    hopr_chain_actions: A,
    /// Sink into which ticket selectors describing the tickets to redeem are pushed.
    redeem_sink: R,
    /// Strategy configuration (thresholds and trigger modes).
    cfg: AutoRedeemingStrategyConfig,
}
93
94impl<A, R> Debug for AutoRedeemingStrategy<A, R> {
95    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96        write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
97    }
98}
99
100impl<A, R> Display for AutoRedeemingStrategy<A, R> {
101    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102        write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
103    }
104}
105
106impl<A, R> AutoRedeemingStrategy<A, R>
107where
108    A: ChainReadChannelOperations + Clone + Send + Sync + 'static,
109    R: futures::Sink<TicketSelector> + Clone,
110    StrategyError: From<R::Error>,
111{
112    pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self {
113        Self {
114            cfg,
115            hopr_chain_actions,
116            redeem_sink,
117        }
118    }
119
120    async fn enqueue_redeem_request(&self, selector: TicketSelector) -> crate::errors::Result<()> {
121        let sink = self.redeem_sink.clone();
122        pin_mut!(sink);
123        Ok(sink
124            .send(selector.with_state(AcknowledgedTicketStatus::Untouched))
125            .await?)
126    }
127}
128
129#[async_trait]
130impl<A, R> SingularStrategy for AutoRedeemingStrategy<A, R>
131where
132    A: ChainReadChannelOperations + Clone + Send + Sync + 'static,
133    R: futures::Sink<TicketSelector> + Sync + Send + Clone,
134    StrategyError: From<R::Error>,
135{
136    async fn on_tick(&self) -> crate::errors::Result<()> {
137        if !self.cfg.redeem_on_winning {
138            debug!("trying to redeem all tickets in all channels");
139
140            self.hopr_chain_actions
141                .stream_channels(
142                    ChannelSelector::default()
143                        .with_destination(*self.hopr_chain_actions.me())
144                        .with_allowed_states(&[
145                            ChannelStatusDiscriminants::Open,
146                            ChannelStatusDiscriminants::PendingToClose,
147                        ])
148                        .with_closure_time_range(Utc::now()..),
149                )
150                .await
151                .map_err(|e| StrategyError::Other(e.into()))?
152                .map(|channel| {
153                    Ok(TicketSelector::from(&channel)
154                        .with_amount(self.cfg.minimum_redeem_ticket_value..)
155                        .with_index_range(channel.ticket_index..)
156                        .with_state(AcknowledgedTicketStatus::Untouched))
157                })
158                .forward(self.redeem_sink.clone())
159                .await?;
160
161            Ok(())
162        } else {
163            Err(CriteriaNotSatisfied)
164        }
165    }
166
167    async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
168        if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
169            if let Some(channel) = self
170                .hopr_chain_actions
171                .channel_by_id(ack.channel_id())
172                .await
173                .map_err(|e| StrategyError::Other(e.into()))?
174            {
175                info!(%ack, "redeeming");
176
177                if ack.verified_ticket().index < channel.ticket_index {
178                    error!(%ack, "acknowledged ticket is older than channel ticket index");
179                    return Err(CriteriaNotSatisfied);
180                }
181
182                let selector = TicketSelector::from(channel)
183                    .with_amount(self.cfg.minimum_redeem_ticket_value..)
184                    .with_index_range(channel.ticket_index..=ack.verified_ticket().index)
185                    .with_state(AcknowledgedTicketStatus::Untouched);
186
187                self.enqueue_redeem_request(selector).await
188            } else {
189                Err(CriteriaNotSatisfied)
190            }
191        } else {
192            Err(CriteriaNotSatisfied)
193        }
194    }
195
196    async fn on_own_channel_changed(
197        &self,
198        channel: &ChannelEntry,
199        direction: ChannelDirection,
200        change: ChannelChange,
201    ) -> crate::errors::Result<()> {
202        if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
203            return Ok(());
204        }
205
206        if let ChannelChange::Status { left: old, right: new } = change {
207            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
208                debug!(?channel, "ignoring channel state change that's not in PendingToClose");
209                return Ok(());
210            }
211            info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
212
213            let selector = TicketSelector::from(channel)
214                .with_amount(self.cfg.minimum_redeem_ticket_value..)
215                .with_index_range(channel.ticket_index..)
216                .with_state(AcknowledgedTicketStatus::Untouched);
217
218            self.enqueue_redeem_request(selector).await
219        } else {
220            Err(CriteriaNotSatisfied)
221        }
222    }
223}
224
225#[cfg(test)]
226mod tests {
227    use std::{
228        ops::Add,
229        sync::Arc,
230        time::{Duration, SystemTime},
231    };
232
233    use hex_literal::hex;
234    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::*};
235    use hopr_crypto_random::Randomizable;
236    use hopr_lib::{
237        Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
238        UnitaryFloatOps, WinningProbability, XDaiBalance,
239    };
240
241    use super::*;
242
    // Shared, lazily-initialized fixtures for the tests below.
    lazy_static::lazy_static! {
        // Deterministic chain keypairs built from fixed secrets.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); // 0.01 HOPR

        // Open channel between ALICE and BOB funded with 10 packet prices.
        // NOTE(review): presumably the arguments are (source, destination, balance, ticket index,
        // status, epoch) — confirm against `ChannelEntry::new`.
        static ref CHANNEL_1: ChannelEntry = ChannelEntry::new(
            ALICE.public().to_address(),
            BOB.public().to_address(),
            *PRICE_PER_PACKET * 10,
            0,
            ChannelStatus::Open,
            4
        );

        // Open channel between CHARLIE and BOB funded with 11 packet prices
        // (same presumed argument order as for `CHANNEL_1`).
        static ref CHANNEL_2: ChannelEntry = ChannelEntry::new(
            CHARLIE.public().to_address(),
            BOB.public().to_address(),
            *PRICE_PER_PACKET * 11,
            1,
            ChannelStatus::Open,
            4
        );

        // Static test chain client pre-populated with the three accounts and both channels.
        static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
            .with_generated_accounts(&[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()], false, XDaiBalance::new_base(1), HoprBalance::new_base(1000))
            .with_channels([CHANNEL_1.clone(), CHANNEL_2.clone()])
            .build_static_client();
    }
272
273    fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
274        let hk1 = HalfKey::random();
275        let hk2 = HalfKey::random();
276
277        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
278
279        Ok(TicketBuilder::default()
280            .counterparty(&*BOB)
281            .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
282            .index(index)
283            .win_prob(WinningProbability::ALWAYS)
284            .channel_epoch(4)
285            .challenge(challenge)
286            .build_signed(&ALICE, &Hash::default())?
287            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
288            .into_redeemable(&*BOB, &Hash::default())?)
289    }
290
291    #[tokio::test]
292    async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
293        let ack_ticket = generate_random_ack_ticket(0, 5)?;
294        let (tx, rx) = futures::channel::mpsc::channel(10);
295
296        let mut connector = create_trustful_hopr_blokli_connector(
297            &*BOB,
298            Default::default(),
299            CHAIN_CLIENT.clone(),
300            [1u8; Address::SIZE].into(),
301        )
302        .await?;
303        connector.connect(Duration::from_secs(3)).await?;
304
305        let cfg = AutoRedeemingStrategyConfig {
306            minimum_redeem_ticket_value: 0.into(),
307            redeem_on_winning: true,
308            ..Default::default()
309        };
310
311        {
312            let ars = AutoRedeemingStrategy::new(
313                cfg,
314                Arc::new(connector),
315                tx.sink_map_err(|e| StrategyError::Other(e.into())),
316            );
317
318            ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
319            assert!(ars.on_tick().await.is_err());
320        }
321
322        let redeem_requests = rx.collect::<Vec<_>>().await;
323        assert_eq!(
324            redeem_requests,
325            vec![
326                TicketSelector::from(CHANNEL_1.clone())
327                    .with_amount(HoprBalance::zero()..)
328                    .with_index_range(
329                        ack_ticket.ticket.verified_ticket().index..=ack_ticket.ticket.verified_ticket().index,
330                    )
331                    .with_state(AcknowledgedTicketStatus::Untouched)
332            ]
333        );
334
335        Ok(())
336    }
337
338    #[test_log::test(tokio::test)]
339    async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
340        let (tx, rx) = futures::channel::mpsc::channel(10);
341
342        let mut connector = create_trustful_hopr_blokli_connector(
343            &*BOB,
344            Default::default(),
345            CHAIN_CLIENT.clone(),
346            [1u8; Address::SIZE].into(),
347        )
348        .await?;
349        connector.connect(Duration::from_secs(3)).await?;
350
351        let cfg = AutoRedeemingStrategyConfig {
352            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
353            redeem_on_winning: false,
354            ..Default::default()
355        };
356
357        {
358            let ars = AutoRedeemingStrategy::new(
359                cfg,
360                Arc::new(connector),
361                tx.sink_map_err(|e| StrategyError::Other(e.into())),
362            );
363            ars.on_tick().await?;
364        }
365
366        let redeem_requests = rx.collect::<Vec<_>>().await;
367        assert_eq!(
368            redeem_requests,
369            vec![
370                TicketSelector::from(CHANNEL_1.clone())
371                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
372                    .with_index_range(CHANNEL_1.ticket_index..)
373                    .with_state(AcknowledgedTicketStatus::Untouched),
374                TicketSelector::from(CHANNEL_2.clone())
375                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
376                    .with_index_range(CHANNEL_2.ticket_index..)
377                    .with_state(AcknowledgedTicketStatus::Untouched)
378            ]
379        );
380
381        Ok(())
382    }
383
384    #[tokio::test]
385    async fn test_auto_redeeming_strategy_redeem_minimum_ticket_amount() -> anyhow::Result<()> {
386        let ack_ticket_below = generate_random_ack_ticket(1, 4)?;
387        let ack_ticket_at = generate_random_ack_ticket(1, 5)?;
388
389        let (tx, rx) = futures::channel::mpsc::channel(10);
390        let mut connector = create_trustful_hopr_blokli_connector(
391            &*BOB,
392            Default::default(),
393            CHAIN_CLIENT.clone(),
394            [1u8; Address::SIZE].into(),
395        )
396        .await?;
397        connector.connect(Duration::from_secs(3)).await?;
398
399        let cfg = AutoRedeemingStrategyConfig {
400            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
401            redeem_on_winning: true,
402            ..Default::default()
403        };
404
405        {
406            let ars = AutoRedeemingStrategy::new(
407                cfg,
408                Arc::new(connector),
409                tx.sink_map_err(|e| StrategyError::Other(e.into())),
410            );
411            ars.on_acknowledged_winning_ticket(&ack_ticket_below.ticket)
412                .await
413                .expect_err("ticket below threshold should not satisfy");
414            ars.on_acknowledged_winning_ticket(&ack_ticket_at.ticket).await?;
415        }
416
417        let redeem_requests = rx.collect::<Vec<_>>().await;
418        assert_eq!(
419            redeem_requests,
420            vec![
421                TicketSelector::from(CHANNEL_1.clone())
422                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
423                    .with_index_range(CHANNEL_1.ticket_index..=ack_ticket_at.ticket.verified_ticket().index)
424                    .with_state(AcknowledgedTicketStatus::Untouched)
425            ]
426        );
427
428        Ok(())
429    }
430
431    #[tokio::test]
432    async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
433        let mut channel = CHANNEL_1.clone();
434        channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
435
436        let (tx, rx) = futures::channel::mpsc::channel(10);
437        let mut connector = create_trustful_hopr_blokli_connector(
438            &*BOB,
439            Default::default(),
440            CHAIN_CLIENT.clone(),
441            [1u8; Address::SIZE].into(),
442        )
443        .await?;
444        connector.connect(Duration::from_secs(3)).await?;
445
446        let cfg = AutoRedeemingStrategyConfig {
447            redeem_all_on_close: true,
448            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
449            ..Default::default()
450        };
451
452        {
453            let ars = AutoRedeemingStrategy::new(
454                cfg,
455                Arc::new(connector),
456                tx.sink_map_err(|e| StrategyError::Other(e.into())),
457            );
458            ars.on_own_channel_changed(
459                &channel,
460                ChannelDirection::Incoming,
461                ChannelChange::Status {
462                    left: ChannelStatus::Open,
463                    right: channel.status,
464                },
465            )
466            .await?;
467        }
468
469        let redeem_requests = rx.collect::<Vec<_>>().await;
470        assert_eq!(
471            redeem_requests,
472            vec![
473                TicketSelector::from(CHANNEL_1.clone())
474                    .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
475                    .with_index_range(CHANNEL_1.ticket_index..)
476                    .with_state(AcknowledgedTicketStatus::Untouched)
477            ]
478        );
479
480        Ok(())
481    }
482
483    #[tokio::test]
484    async fn test_auto_redeeming_strategy_redeem_multiple_tickets_in_channel() -> anyhow::Result<()> {
485        let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
486
487        let (tx, rx) = futures::channel::mpsc::channel(10);
488        let mut connector = create_trustful_hopr_blokli_connector(
489            &*BOB,
490            Default::default(),
491            CHAIN_CLIENT.clone(),
492            [1u8; Address::SIZE].into(),
493        )
494        .await?;
495        connector.connect(Duration::from_secs(3)).await?;
496
497        {
498            let cfg = AutoRedeemingStrategyConfig {
499                minimum_redeem_ticket_value: 0.into(),
500                redeem_on_winning: true,
501                ..Default::default()
502            };
503
504            let ars = AutoRedeemingStrategy::new(
505                cfg,
506                Arc::new(connector),
507                tx.sink_map_err(|e| StrategyError::Other(e.into())),
508            );
509            ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;
510            assert!(ars.on_tick().await.is_err());
511        }
512
513        let redeem_requests = rx.collect::<Vec<_>>().await;
514        assert_eq!(
515            redeem_requests,
516            vec![
517                TicketSelector::from(CHANNEL_1.clone())
518                    .with_amount(HoprBalance::zero()..)
519                    .with_index_range(CHANNEL_1.ticket_index..=ack_ticket_1.ticket.verified_ticket().index)
520                    .with_state(AcknowledgedTicketStatus::Untouched),
521            ]
522        );
523
524        Ok(())
525    }
526}