Skip to main content

hopr_strategy/
auto_redeeming.rs

//! ## Auto Redeeming Strategy
//! This strategy listens for newly added acknowledged tickets and automatically issues a redeem transaction on that
//! ticket.
//!
//! For details on default parameters, see [AutoRedeemingStrategyConfig].
6use std::{
7    fmt::{Debug, Display, Formatter},
8    str::FromStr,
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use futures::{StreamExt, TryStreamExt};
14use hopr_async_runtime::{AbortableList, spawn_as_abortable};
15use hopr_lib::{
16    ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, HoprBalance, VerifiedTicket,
17    api::{
18        chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
19        tickets::TicketManagement,
20    },
21};
22use parking_lot::lock_api::RwLockUpgradableReadGuard;
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use validator::Validate;
26
27use crate::{
28    Strategy,
29    errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
30    strategy::SingularStrategy,
31};
32
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Counter of automatic redemptions initiated by this strategy.
    ///
    /// Only compiled with the `telemetry` feature and outside of tests.
    // NOTE(review): nothing visible in this file increments this counter — confirm whether a call site is missing.
    static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_strategy_auto_redeem_redeem_count",
        "Count of initiated automatic redemptions",
    )
    .unwrap();
}
38
/// Supplies `true` as a default value.
///
/// Referenced by name from `#[serde(default = "just_true")]`, which requires a function path
/// rather than a literal.
fn just_true() -> bool {
    true
}
42
/// Supplies `false` as a default value.
///
/// Referenced by name from `#[serde(default = "just_false")]`, which requires a function path
/// rather than a literal.
fn just_false() -> bool {
    false
}
46
47fn min_redeem_hopr() -> HoprBalance {
48    HoprBalance::from_str("1 wxHOPR").unwrap()
49}
50
51/// Configuration object for the `AutoRedeemingStrategy`
52#[serde_as]
53#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
54pub struct AutoRedeemingStrategyConfig {
55    /// If set to true, will redeem all tickets in the channel (which are over the
56    /// `minimum_redeem_ticket_value` threshold) once it transitions to `PendingToClose`.
57    ///
58    /// Default is `true`.
59    #[serde(default = "just_true")]
60    #[default = true]
61    pub redeem_all_on_close: bool,
62
63    /// The strategy will only redeem an acknowledged winning ticket if it has at least this value of HOPR.
64    /// If 0 is given, the strategy will redeem tickets regardless of their value.
65    ///
66    /// Default is `1 wxHOPR`.
67    #[serde(default = "min_redeem_hopr")]
68    #[serde_as(as = "DisplayFromStr")]
69    #[default(min_redeem_hopr())]
70    pub minimum_redeem_ticket_value: HoprBalance,
71
72    /// If set, the strategy will redeem each incoming winning ticket.
73    /// Otherwise, it will try to redeem tickets in all channels periodically.
74    ///
75    /// Set this to `true` when winning tickets are not happening too often (e.g., when winning probability
76    /// is below 1%).
77    /// Set this to `false` when winning tickets are happening very often (e.g., when winning probability
78    /// is above 1%).
79    ///
80    /// Default is `true`
81    #[serde(default = "just_false")]
82    #[default = false]
83    pub redeem_on_winning: bool,
84}
85
/// The `AutoRedeemingStrategy` automatically sends an acknowledged ticket
/// for redemption once encountered.
/// The strategy does not await the result of the redemption.
///
/// At most one redemption task per channel is tracked at a time (see `running_redemptions`).
pub struct AutoRedeemingStrategy<A, T> {
    /// Strategy configuration (copied into the strategy at construction).
    cfg: AutoRedeemingStrategyConfig,
    /// Chain API handle used to look up channels and handed to the ticket manager for redemptions.
    hopr_chain_actions: A,
    /// Ticket manager that produces the per-channel redemption stream.
    ticket_manager: T,
    // Tracks the spawned redemption task of each channel; intended to make sure that all ongoing
    // ticket redemptions are terminated once the strategy is dropped.
    running_redemptions: std::sync::Arc<parking_lot::RwLock<AbortableList<ChannelId>>>,
}
96
97impl<A, T> Debug for AutoRedeemingStrategy<A, T> {
98    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99        write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
100    }
101}
102
103impl<A, T> Display for AutoRedeemingStrategy<A, T> {
104    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105        write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
106    }
107}
108
109impl<A, T> AutoRedeemingStrategy<A, T>
110where
111    A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
112    T: TicketManagement + Clone + Sync + Send + 'static,
113{
114    pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, ticket_manager: T) -> Self {
115        Self {
116            cfg,
117            hopr_chain_actions,
118            ticket_manager,
119            running_redemptions: std::sync::Arc::new(parking_lot::RwLock::new(AbortableList::default())),
120        }
121    }
122
123    fn enqueue_redemption(&self, channel_id: &ChannelId) -> Result<(), StrategyError> {
124        let redemptions = self.running_redemptions.upgradable_read();
125        if !redemptions.contains(channel_id) {
126            tracing::debug!(%channel_id, "attempting to start redemption in channel");
127
128            let tmgr = self.ticket_manager.clone();
129            let client = self.hopr_chain_actions.clone();
130            let min_value = self.cfg.minimum_redeem_ticket_value;
131            let channel_id = *channel_id;
132            let redemptions_clone = self.running_redemptions.clone();
133
134            RwLockUpgradableReadGuard::upgrade(redemptions).insert(
135                channel_id,
136                spawn_as_abortable!(async move {
137                    let redeem_result = match tmgr
138                        .redeem_stream(client.clone(), channel_id, min_value.into())
139                        .map_err(StrategyError::other)
140                    {
141                        Ok(stream) => {
142                            stream
143                                .map_err(StrategyError::other)
144                                .try_for_each(|res| {
145                                    tracing::debug!(?res, %channel_id, "ticket redemption completed");
146                                    futures::future::ok(())
147                                })
148                                .await
149                        }
150                        err => {
151                            // Add small delay to avoid the write lock acquired for insertion
152                            // still being held.
153                            hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
154                            err.map(|_| ())
155                        }
156                    };
157
158                    tracing::debug!(?redeem_result, %channel_id, "redemption in channel complete");
159                    redemptions_clone.write().abort_one(&channel_id);
160                }),
161            );
162            Ok(())
163        } else {
164            tracing::debug!(%channel_id, "existing on-going redemption");
165            Err(StrategyError::InProgress)
166        }
167    }
168}
169
170#[async_trait]
171impl<A, T> SingularStrategy for AutoRedeemingStrategy<A, T>
172where
173    A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
174    T: TicketManagement + Clone + Sync + Send + 'static,
175{
176    async fn on_tick(&self) -> crate::errors::Result<()> {
177        if !self.cfg.redeem_on_winning {
178            tracing::debug!("trying to redeem all tickets in all channels");
179
180            self.hopr_chain_actions
181                .stream_channels(
182                    ChannelSelector::default()
183                        .with_destination(*self.hopr_chain_actions.me())
184                        .with_redeemable_channels(Duration::from_secs(30).into()),
185                )
186                .await
187                .map_err(StrategyError::other)?
188                .for_each(|channel| {
189                    if let Err(error) = self.enqueue_redemption(channel.get_id()) {
190                        tracing::error!(
191                            %error,
192                            channel_id = %channel.get_id(),
193                            "cannot start redemption in channel"
194                        );
195                    }
196                    futures::future::ready(())
197                })
198                .await;
199
200            Ok(())
201        } else {
202            Err(CriteriaNotSatisfied)
203        }
204    }
205
206    async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
207        if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
208            if let Some(channel) = self
209                .hopr_chain_actions
210                .channel_by_id(ack.channel_id())
211                .await
212                .map_err(StrategyError::other)?
213            {
214                tracing::info!(%ack, "redeeming");
215
216                if ack.verified_ticket().index < channel.ticket_index {
217                    tracing::error!(%ack, "acknowledged ticket is older than channel ticket index");
218                    return Err(CriteriaNotSatisfied);
219                }
220
221                // Raises an error if redemption in this channel is ongoing
222                self.enqueue_redemption(channel.get_id())?;
223
224                Ok(())
225            } else {
226                Err(CriteriaNotSatisfied)
227            }
228        } else {
229            Err(CriteriaNotSatisfied)
230        }
231    }
232
233    async fn on_own_channel_changed(
234        &self,
235        channel: &ChannelEntry,
236        direction: ChannelDirection,
237        change: ChannelChange,
238    ) -> crate::errors::Result<()> {
239        if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
240            return Ok(());
241        }
242
243        if let ChannelChange::Status { left: old, right: new } = change {
244            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
245                tracing::debug!(?channel, "ignoring channel state change that's not in PendingToClose");
246                return Ok(());
247            }
248            tracing::info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
249
250            // Raises an error if redemption in this channel is ongoing
251            self.enqueue_redemption(channel.get_id())?;
252
253            Ok(())
254        } else {
255            Err(CriteriaNotSatisfied)
256        }
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use std::{
263        ops::Add,
264        sync::Arc,
265        time::{Duration, SystemTime},
266    };
267
268    use futures::stream::BoxStream;
269    use futures_time::future::FutureExt as TimeExt;
270    use hex_literal::hex;
271    use hopr_api::{
272        tickets::{ChannelStats, RedemptionResult},
273        types::crypto_random::Randomizable,
274    };
275    use hopr_chain_connector::{HoprBlockchainSafeConnector, create_trustful_hopr_blokli_connector, testing::*};
276    use hopr_lib::{
277        Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
278        UnitaryFloatOps, WinningProbability, XDaiBalance,
279    };
280
281    use super::*;
282
    // Generates `MockTicketMgmt`, a mock implementation of `TicketManagement` whose error type
    // is `std::io::Error`. The tests below only set expectations on `redeem_stream`.
    mockall::mock! {
        pub TicketMgmt {}
         #[allow(refining_impl_trait)]
        impl TicketManagement for TicketMgmt {
            type Error = std::io::Error;
            // Returns a boxed stream so that each test can supply its own sequence of redemption results.
            fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
                &self,
                client: C,
                channel_id: ChannelId,
                min_amount: Option<HoprBalance>,
            ) -> Result<BoxStream<'static, Result<RedemptionResult, std::io::Error>>, std::io::Error>;

            fn neglect_tickets(
                &self,
                channel_id: &ChannelId,
                max_ticket_index: Option<u64>,
            ) -> Result<Vec<VerifiedTicket>, std::io::Error>;

            fn ticket_stats<'a>(&self, channel_id: Option<&'a ChannelId>) -> Result<ChannelStats, std::io::Error>;
        }
    }
304
    // Shared fixtures: three chain keypairs built from fixed secrets, two open channels (both with
    // BOB as the second party and epoch 4), and a static test chain client preloaded with the
    // accounts and channels.
    lazy_static::lazy_static! {
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); // 0.01 HOPR

        // Channel ALICE/BOB funded with 10 packet prices, ticket index 0.
        // NOTE(review): presumably `between(source, destination)`, i.e. incoming for BOB — confirm.
        static ref CHANNEL_1: ChannelEntry = ChannelEntry::builder()
            .between(&*ALICE, &*BOB)
            .balance(*PRICE_PER_PACKET * 10)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        // Channel CHARLIE/BOB funded with 11 packet prices, ticket index 1.
        static ref CHANNEL_2: ChannelEntry = ChannelEntry::builder()
            .between(&*CHARLIE, &*BOB)
            .balance(*PRICE_PER_PACKET * 11)
            .ticket_index(1)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        // Static chain state containing accounts for all three keypairs and both channels.
        static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
            .with_generated_accounts(&[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()], false, XDaiBalance::new_base(1), HoprBalance::new_base(1000))
            .with_channels([*CHANNEL_1, *CHANNEL_2])
            .build_static_client();
    }
334
335    fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
336        let hk1 = HalfKey::random();
337        let hk2 = HalfKey::random();
338
339        let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
340
341        Ok(TicketBuilder::default()
342            .counterparty(&*BOB)
343            .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
344            .index(index)
345            .win_prob(WinningProbability::ALWAYS)
346            .channel_epoch(4)
347            .challenge(challenge)
348            .build_signed(&ALICE, &Hash::default())?
349            .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
350            .into_redeemable(&BOB, &Hash::default())?)
351    }
352
    // Concrete chain-connector type handed to the strategy in the tests; also used to pin the generic
    // client parameter of the mocked `redeem_stream` in the `return_once` closures.
    type TestConnector = Arc<HoprBlockchainSafeConnector<BlokliTestClient<StaticState>>>;
354
355    async fn await_redemption_queue_empty(redeems: Arc<parking_lot::RwLock<AbortableList<ChannelId>>>) {
356        loop {
357            hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
358
359            if redeems.read().is_empty() {
360                break;
361            }
362        }
363    }
364
365    #[test_log::test(tokio::test)]
366    async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
367        let ack_ticket = generate_random_ack_ticket(0, 5)?;
368
369        let mut connector = create_trustful_hopr_blokli_connector(
370            &BOB,
371            Default::default(),
372            CHAIN_CLIENT.clone(),
373            [1u8; Address::SIZE].into(),
374        )
375        .await?;
376        connector.connect().await?;
377
378        let cfg = AutoRedeemingStrategyConfig {
379            minimum_redeem_ticket_value: 0.into(),
380            redeem_on_winning: true,
381            ..Default::default()
382        };
383
384        let mut mock_tmgr = MockTicketMgmt::new();
385        mock_tmgr
386            .expect_redeem_stream()
387            .once()
388            .with(
389                mockall::predicate::always(),
390                mockall::predicate::eq(*CHANNEL_1.get_id()),
391                mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
392            )
393            .return_once(move |_: TestConnector, _, _| {
394                Ok(futures::stream::once(futures::future::ok(RedemptionResult::Redeemed(ack_ticket.ticket))).boxed())
395            });
396
397        let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
398
399        ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
400        assert!(ars.on_tick().await.is_err());
401
402        await_redemption_queue_empty(ars.running_redemptions.clone())
403            .timeout(futures_time::time::Duration::from_secs(5))
404            .await?;
405
406        Ok(())
407    }
408
409    #[test_log::test(tokio::test)]
410    async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
411        let mut connector = create_trustful_hopr_blokli_connector(
412            &BOB,
413            Default::default(),
414            CHAIN_CLIENT.clone(),
415            [1u8; Address::SIZE].into(),
416        )
417        .await?;
418        connector.connect().await?;
419
420        let cfg = AutoRedeemingStrategyConfig {
421            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
422            redeem_on_winning: false,
423            ..Default::default()
424        };
425
426        let mut mock_tmgr = MockTicketMgmt::new();
427        mock_tmgr
428            .expect_redeem_stream()
429            .once()
430            .with(
431                mockall::predicate::always(),
432                mockall::predicate::eq(*CHANNEL_1.get_id()),
433                mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
434            )
435            .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
436
437        mock_tmgr
438            .expect_redeem_stream()
439            .once()
440            .with(
441                mockall::predicate::always(),
442                mockall::predicate::eq(*CHANNEL_2.get_id()),
443                mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
444            )
445            .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
446
447        let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
448        ars.on_tick().await?;
449
450        await_redemption_queue_empty(ars.running_redemptions.clone())
451            .timeout(futures_time::time::Duration::from_secs(5))
452            .await?;
453
454        Ok(())
455    }
456
457    #[tokio::test]
458    async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
459        let mut channel = *CHANNEL_1;
460        channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
461
462        let mut connector = create_trustful_hopr_blokli_connector(
463            &BOB,
464            Default::default(),
465            CHAIN_CLIENT.clone(),
466            [1u8; Address::SIZE].into(),
467        )
468        .await?;
469        connector.connect().await?;
470
471        let cfg = AutoRedeemingStrategyConfig {
472            redeem_all_on_close: true,
473            minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
474            ..Default::default()
475        };
476
477        let mut mock_tmgr = MockTicketMgmt::new();
478        mock_tmgr
479            .expect_redeem_stream()
480            .once()
481            .with(
482                mockall::predicate::always(),
483                mockall::predicate::eq(*CHANNEL_1.get_id()),
484                mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
485            )
486            .return_once(move |_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
487
488        let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
489        ars.on_own_channel_changed(
490            &channel,
491            ChannelDirection::Incoming,
492            ChannelChange::Status {
493                left: ChannelStatus::Open,
494                right: channel.status,
495            },
496        )
497        .await?;
498
499        await_redemption_queue_empty(ars.running_redemptions.clone())
500            .timeout(futures_time::time::Duration::from_secs(5))
501            .await?;
502
503        Ok(())
504    }
505
506    #[tokio::test]
507    async fn test_auto_redeeming_strategy_should_not_redeem_multiple_times_in_same_channel() -> anyhow::Result<()> {
508        let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
509
510        let mut connector = create_trustful_hopr_blokli_connector(
511            &BOB,
512            Default::default(),
513            CHAIN_CLIENT.clone(),
514            [1u8; Address::SIZE].into(),
515        )
516        .await?;
517        connector.connect().await?;
518
519        let cfg = AutoRedeemingStrategyConfig {
520            minimum_redeem_ticket_value: 0.into(),
521            redeem_on_winning: true,
522            ..Default::default()
523        };
524
525        let mut mock_tmgr = MockTicketMgmt::new();
526        mock_tmgr
527            .expect_redeem_stream()
528            .once()
529            .with(
530                mockall::predicate::always(),
531                mockall::predicate::eq(*CHANNEL_1.get_id()),
532                mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
533            )
534            .return_once(move |_: TestConnector, _, _| {
535                Ok(futures::stream::once(
536                    futures::future::ok(RedemptionResult::Redeemed(ack_ticket_1.ticket))
537                        .delay(futures_time::time::Duration::from_millis(500)),
538                )
539                .boxed())
540            });
541
542        let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
543        ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;
544        assert!(matches!(
545            ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await,
546            Err(StrategyError::InProgress)
547        ));
548
549        let mut channel = *CHANNEL_1;
550        channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
551
552        assert!(matches!(
553            ars.on_own_channel_changed(
554                &channel,
555                ChannelDirection::Incoming,
556                ChannelChange::Status {
557                    left: ChannelStatus::Open,
558                    right: channel.status,
559                }
560            )
561            .await,
562            Err(StrategyError::InProgress)
563        ));
564        assert!(ars.on_tick().await.is_err());
565
566        await_redemption_queue_empty(ars.running_redemptions.clone())
567            .timeout(futures_time::time::Duration::from_secs(5))
568            .await?;
569
570        Ok(())
571    }
572}