1use std::{
7 fmt::{Debug, Display, Formatter},
8 str::FromStr,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use futures::{StreamExt, TryStreamExt};
14use hopr_async_runtime::{AbortableList, spawn_as_abortable};
15use hopr_lib::{
16 ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, HoprBalance, VerifiedTicket,
17 api::{
18 chain::{ChainReadChannelOperations, ChainWriteTicketOperations, ChannelSelector},
19 tickets::TicketManagement,
20 },
21};
22use parking_lot::lock_api::RwLockUpgradableReadGuard;
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use validator::Validate;
26
27use crate::{
28 Strategy,
29 errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
30 strategy::SingularStrategy,
31};
32
// Counter registered under the name `hopr_strategy_auto_redeem_redeem_count`
// ("Count of initiated automatic redemptions").
// It only exists when the `telemetry` feature is enabled and the crate is not compiled for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
}
38
/// Serde `default` provider that always yields `true`.
///
/// Referenced by path from `#[serde(default = "just_true")]`, which requires a function
/// rather than a literal.
fn just_true() -> bool {
    true
}
42
/// Serde `default` provider that always yields `false`.
///
/// Referenced by path from `#[serde(default = "just_false")]`, which requires a function
/// rather than a literal.
fn just_false() -> bool {
    false
}
46
47fn min_redeem_hopr() -> HoprBalance {
48 HoprBalance::from_str("1 wxHOPR").unwrap()
49}
50
#[serde_as]
/// Configuration of the [`AutoRedeemingStrategy`].
///
/// Every field has a serde default, so any of them may be omitted from the serialized form.
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct AutoRedeemingStrategyConfig {
    /// When `true`, an incoming channel that transitions from `Open` to `PendingToClose`
    /// triggers a redemption in that channel; when `false`, channel changes are ignored.
    ///
    /// Defaults to `true`.
    #[serde(default = "just_true")]
    #[default = true]
    pub redeem_all_on_close: bool,

    /// Minimum value handed to the ticket manager when a redemption is started.
    /// An acknowledged winning ticket whose amount is below this value does not
    /// trigger a redemption either.
    ///
    /// (De)serialized through its `Display`/`FromStr` representation.
    /// Defaults to `1 wxHOPR`.
    #[serde(default = "min_redeem_hopr")]
    #[serde_as(as = "DisplayFromStr")]
    #[default(min_redeem_hopr())]
    pub minimum_redeem_ticket_value: HoprBalance,

    /// When `true`, a redemption is started upon each acknowledged winning ticket and the
    /// periodic tick reports `CriteriaNotSatisfied`. When `false`, winning tickets do not
    /// trigger anything and the periodic tick starts redemptions instead.
    ///
    /// Defaults to `false`.
    #[serde(default = "just_false")]
    #[default = false]
    pub redeem_on_winning: bool,
}
85
/// Strategy that starts ticket redemptions automatically, at most one running
/// redemption task per channel at a time.
///
/// `A` is the chain API handle and `T` the ticket manager; the trait bounds they must
/// satisfy are declared on the `impl` blocks.
pub struct AutoRedeemingStrategy<A, T> {
    // Strategy configuration (copied into `Strategy::AutoRedeeming` for formatting).
    cfg: AutoRedeemingStrategyConfig,
    // Chain API handle used to look up channels; a clone is handed to each redemption task.
    hopr_chain_actions: A,
    // Ticket manager producing the per-channel redemption stream.
    ticket_manager: T,
    // Redemption tasks currently running, keyed by channel ID. Shared with the spawned
    // tasks, which remove their own entry when they finish.
    running_redemptions: std::sync::Arc<parking_lot::RwLock<AbortableList<ChannelId>>>,
}
96
97impl<A, T> Debug for AutoRedeemingStrategy<A, T> {
98 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99 write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
100 }
101}
102
103impl<A, T> Display for AutoRedeemingStrategy<A, T> {
104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105 write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
106 }
107}
108
109impl<A, T> AutoRedeemingStrategy<A, T>
110where
111 A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
112 T: TicketManagement + Clone + Sync + Send + 'static,
113{
114 pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, ticket_manager: T) -> Self {
115 Self {
116 cfg,
117 hopr_chain_actions,
118 ticket_manager,
119 running_redemptions: std::sync::Arc::new(parking_lot::RwLock::new(AbortableList::default())),
120 }
121 }
122
123 fn enqueue_redemption(&self, channel_id: &ChannelId) -> Result<(), StrategyError> {
124 let redemptions = self.running_redemptions.upgradable_read();
125 if !redemptions.contains(channel_id) {
126 tracing::debug!(%channel_id, "attempting to start redemption in channel");
127
128 let tmgr = self.ticket_manager.clone();
129 let client = self.hopr_chain_actions.clone();
130 let min_value = self.cfg.minimum_redeem_ticket_value;
131 let channel_id = *channel_id;
132 let redemptions_clone = self.running_redemptions.clone();
133
134 RwLockUpgradableReadGuard::upgrade(redemptions).insert(
135 channel_id,
136 spawn_as_abortable!(async move {
137 let redeem_result = match tmgr
138 .redeem_stream(client.clone(), channel_id, min_value.into())
139 .map_err(StrategyError::other)
140 {
141 Ok(stream) => {
142 stream
143 .map_err(StrategyError::other)
144 .try_for_each(|res| {
145 tracing::debug!(?res, %channel_id, "ticket redemption completed");
146 futures::future::ok(())
147 })
148 .await
149 }
150 err => {
151 hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
154 err.map(|_| ())
155 }
156 };
157
158 tracing::debug!(?redeem_result, %channel_id, "redemption in channel complete");
159 redemptions_clone.write().abort_one(&channel_id);
160 }),
161 );
162 Ok(())
163 } else {
164 tracing::debug!(%channel_id, "existing on-going redemption");
165 Err(StrategyError::InProgress)
166 }
167 }
168}
169
170#[async_trait]
171impl<A, T> SingularStrategy for AutoRedeemingStrategy<A, T>
172where
173 A: ChainReadChannelOperations + ChainWriteTicketOperations + Clone + Send + Sync + 'static,
174 T: TicketManagement + Clone + Sync + Send + 'static,
175{
176 async fn on_tick(&self) -> crate::errors::Result<()> {
177 if !self.cfg.redeem_on_winning {
178 tracing::debug!("trying to redeem all tickets in all channels");
179
180 self.hopr_chain_actions
181 .stream_channels(
182 ChannelSelector::default()
183 .with_destination(*self.hopr_chain_actions.me())
184 .with_redeemable_channels(Duration::from_secs(30).into()),
185 )
186 .await
187 .map_err(StrategyError::other)?
188 .for_each(|channel| {
189 if let Err(error) = self.enqueue_redemption(channel.get_id()) {
190 tracing::error!(
191 %error,
192 channel_id = %channel.get_id(),
193 "cannot start redemption in channel"
194 );
195 }
196 futures::future::ready(())
197 })
198 .await;
199
200 Ok(())
201 } else {
202 Err(CriteriaNotSatisfied)
203 }
204 }
205
206 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
207 if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
208 if let Some(channel) = self
209 .hopr_chain_actions
210 .channel_by_id(ack.channel_id())
211 .await
212 .map_err(StrategyError::other)?
213 {
214 tracing::info!(%ack, "redeeming");
215
216 if ack.verified_ticket().index < channel.ticket_index {
217 tracing::error!(%ack, "acknowledged ticket is older than channel ticket index");
218 return Err(CriteriaNotSatisfied);
219 }
220
221 self.enqueue_redemption(channel.get_id())?;
223
224 Ok(())
225 } else {
226 Err(CriteriaNotSatisfied)
227 }
228 } else {
229 Err(CriteriaNotSatisfied)
230 }
231 }
232
233 async fn on_own_channel_changed(
234 &self,
235 channel: &ChannelEntry,
236 direction: ChannelDirection,
237 change: ChannelChange,
238 ) -> crate::errors::Result<()> {
239 if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
240 return Ok(());
241 }
242
243 if let ChannelChange::Status { left: old, right: new } = change {
244 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
245 tracing::debug!(?channel, "ignoring channel state change that's not in PendingToClose");
246 return Ok(());
247 }
248 tracing::info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
249
250 self.enqueue_redemption(channel.get_id())?;
252
253 Ok(())
254 } else {
255 Err(CriteriaNotSatisfied)
256 }
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use std::{
263 ops::Add,
264 sync::Arc,
265 time::{Duration, SystemTime},
266 };
267
268 use futures::stream::BoxStream;
269 use futures_time::future::FutureExt as TimeExt;
270 use hex_literal::hex;
271 use hopr_api::{
272 tickets::{ChannelStats, RedemptionResult},
273 types::crypto_random::Randomizable,
274 };
275 use hopr_chain_connector::{HoprBlockchainSafeConnector, create_trustful_hopr_blokli_connector, testing::*};
276 use hopr_lib::{
277 Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
278 UnitaryFloatOps, WinningProbability, XDaiBalance,
279 };
280
281 use super::*;
282
    // Generates `MockTicketMgmt`, a mockall-based stand-in for `TicketManagement` that lets
    // each test script the result of `redeem_stream` for a given channel and minimum amount.
    mockall::mock! {
        pub TicketMgmt {}
        // NOTE(review): presumably needed because the mocked signatures spell out concrete
        // return types where the trait uses `impl Trait` - confirm against the trait.
        #[allow(refining_impl_trait)]
        impl TicketManagement for TicketMgmt {
            type Error = std::io::Error;
            fn redeem_stream<C: ChainWriteTicketOperations + Send + Sync + 'static>(
                &self,
                client: C,
                channel_id: ChannelId,
                min_amount: Option<HoprBalance>,
            ) -> Result<BoxStream<'static, Result<RedemptionResult, std::io::Error>>, std::io::Error>;

            fn neglect_tickets(
                &self,
                channel_id: &ChannelId,
                max_ticket_index: Option<u64>,
            ) -> Result<Vec<VerifiedTicket>, std::io::Error>;

            fn ticket_stats<'a>(&self, channel_id: Option<&'a ChannelId>) -> Result<ChannelStats, std::io::Error>;
        }
    }
304
    // Shared, lazily initialized fixtures used by all tests in this module.
    lazy_static::lazy_static! {
        // Fixed chain key pairs. BOB is the node under test: both channels below lead to BOB.
        static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
        static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
        static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
        // Base unit from which channel balances and ticket amounts in the tests are derived.
        static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into();

        // Open channel ALICE -> BOB in epoch 4, ticket index 0, funded with 10 packet prices.
        static ref CHANNEL_1: ChannelEntry = ChannelEntry::builder()
            .between(&*ALICE, &*BOB)
            .balance(*PRICE_PER_PACKET * 10)
            .ticket_index(0)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        // Open channel CHARLIE -> BOB in epoch 4, ticket index 1, funded with 11 packet prices.
        static ref CHANNEL_2: ChannelEntry = ChannelEntry::builder()
            .between(&*CHARLIE, &*BOB)
            .balance(*PRICE_PER_PACKET * 11)
            .ticket_index(1)
            .status(ChannelStatus::Open)
            .epoch(4)
            .build()
            .unwrap();

        // Static test chain client that knows the three accounts and both channels above.
        static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
            .with_generated_accounts(&[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()], false, XDaiBalance::new_base(1), HoprBalance::new_base(1000))
            .with_channels([*CHANNEL_1, *CHANNEL_2])
            .build_static_client();
    }
334
335 fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
336 let hk1 = HalfKey::random();
337 let hk2 = HalfKey::random();
338
339 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
340
341 Ok(TicketBuilder::default()
342 .counterparty(&*BOB)
343 .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
344 .index(index)
345 .win_prob(WinningProbability::ALWAYS)
346 .channel_epoch(4)
347 .challenge(challenge)
348 .build_signed(&ALICE, &Hash::default())?
349 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
350 .into_redeemable(&BOB, &Hash::default())?)
351 }
352
    // Concrete chain connector type the strategy is instantiated with in these tests;
    // it is also the type of the `client` argument received by the mocked `redeem_stream`.
    type TestConnector = Arc<HoprBlockchainSafeConnector<BlokliTestClient<StaticState>>>;
354
355 async fn await_redemption_queue_empty(redeems: Arc<parking_lot::RwLock<AbortableList<ChannelId>>>) {
356 loop {
357 hopr_async_runtime::prelude::sleep(Duration::from_millis(100)).await;
358
359 if redeems.read().is_empty() {
360 break;
361 }
362 }
363 }
364
365 #[test_log::test(tokio::test)]
366 async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
367 let ack_ticket = generate_random_ack_ticket(0, 5)?;
368
369 let mut connector = create_trustful_hopr_blokli_connector(
370 &BOB,
371 Default::default(),
372 CHAIN_CLIENT.clone(),
373 [1u8; Address::SIZE].into(),
374 )
375 .await?;
376 connector.connect().await?;
377
378 let cfg = AutoRedeemingStrategyConfig {
379 minimum_redeem_ticket_value: 0.into(),
380 redeem_on_winning: true,
381 ..Default::default()
382 };
383
384 let mut mock_tmgr = MockTicketMgmt::new();
385 mock_tmgr
386 .expect_redeem_stream()
387 .once()
388 .with(
389 mockall::predicate::always(),
390 mockall::predicate::eq(*CHANNEL_1.get_id()),
391 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
392 )
393 .return_once(move |_: TestConnector, _, _| {
394 Ok(futures::stream::once(futures::future::ok(RedemptionResult::Redeemed(ack_ticket.ticket))).boxed())
395 });
396
397 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
398
399 ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
400 assert!(ars.on_tick().await.is_err());
401
402 await_redemption_queue_empty(ars.running_redemptions.clone())
403 .timeout(futures_time::time::Duration::from_secs(5))
404 .await?;
405
406 Ok(())
407 }
408
409 #[test_log::test(tokio::test)]
410 async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
411 let mut connector = create_trustful_hopr_blokli_connector(
412 &BOB,
413 Default::default(),
414 CHAIN_CLIENT.clone(),
415 [1u8; Address::SIZE].into(),
416 )
417 .await?;
418 connector.connect().await?;
419
420 let cfg = AutoRedeemingStrategyConfig {
421 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
422 redeem_on_winning: false,
423 ..Default::default()
424 };
425
426 let mut mock_tmgr = MockTicketMgmt::new();
427 mock_tmgr
428 .expect_redeem_stream()
429 .once()
430 .with(
431 mockall::predicate::always(),
432 mockall::predicate::eq(*CHANNEL_1.get_id()),
433 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
434 )
435 .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
436
437 mock_tmgr
438 .expect_redeem_stream()
439 .once()
440 .with(
441 mockall::predicate::always(),
442 mockall::predicate::eq(*CHANNEL_2.get_id()),
443 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
444 )
445 .return_once(|_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
446
447 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
448 ars.on_tick().await?;
449
450 await_redemption_queue_empty(ars.running_redemptions.clone())
451 .timeout(futures_time::time::Duration::from_secs(5))
452 .await?;
453
454 Ok(())
455 }
456
457 #[tokio::test]
458 async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
459 let mut channel = *CHANNEL_1;
460 channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
461
462 let mut connector = create_trustful_hopr_blokli_connector(
463 &BOB,
464 Default::default(),
465 CHAIN_CLIENT.clone(),
466 [1u8; Address::SIZE].into(),
467 )
468 .await?;
469 connector.connect().await?;
470
471 let cfg = AutoRedeemingStrategyConfig {
472 redeem_all_on_close: true,
473 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
474 ..Default::default()
475 };
476
477 let mut mock_tmgr = MockTicketMgmt::new();
478 mock_tmgr
479 .expect_redeem_stream()
480 .once()
481 .with(
482 mockall::predicate::always(),
483 mockall::predicate::eq(*CHANNEL_1.get_id()),
484 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
485 )
486 .return_once(move |_: TestConnector, _, _| Ok(futures::stream::empty().boxed()));
487
488 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
489 ars.on_own_channel_changed(
490 &channel,
491 ChannelDirection::Incoming,
492 ChannelChange::Status {
493 left: ChannelStatus::Open,
494 right: channel.status,
495 },
496 )
497 .await?;
498
499 await_redemption_queue_empty(ars.running_redemptions.clone())
500 .timeout(futures_time::time::Duration::from_secs(5))
501 .await?;
502
503 Ok(())
504 }
505
506 #[tokio::test]
507 async fn test_auto_redeeming_strategy_should_not_redeem_multiple_times_in_same_channel() -> anyhow::Result<()> {
508 let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
509
510 let mut connector = create_trustful_hopr_blokli_connector(
511 &BOB,
512 Default::default(),
513 CHAIN_CLIENT.clone(),
514 [1u8; Address::SIZE].into(),
515 )
516 .await?;
517 connector.connect().await?;
518
519 let cfg = AutoRedeemingStrategyConfig {
520 minimum_redeem_ticket_value: 0.into(),
521 redeem_on_winning: true,
522 ..Default::default()
523 };
524
525 let mut mock_tmgr = MockTicketMgmt::new();
526 mock_tmgr
527 .expect_redeem_stream()
528 .once()
529 .with(
530 mockall::predicate::always(),
531 mockall::predicate::eq(*CHANNEL_1.get_id()),
532 mockall::predicate::eq(Some(cfg.minimum_redeem_ticket_value)),
533 )
534 .return_once(move |_: TestConnector, _, _| {
535 Ok(futures::stream::once(
536 futures::future::ok(RedemptionResult::Redeemed(ack_ticket_1.ticket))
537 .delay(futures_time::time::Duration::from_millis(500)),
538 )
539 .boxed())
540 });
541
542 let ars = AutoRedeemingStrategy::new(cfg, Arc::new(connector), Arc::new(mock_tmgr));
543 ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;
544 assert!(matches!(
545 ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await,
546 Err(StrategyError::InProgress)
547 ));
548
549 let mut channel = *CHANNEL_1;
550 channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
551
552 assert!(matches!(
553 ars.on_own_channel_changed(
554 &channel,
555 ChannelDirection::Incoming,
556 ChannelChange::Status {
557 left: ChannelStatus::Open,
558 right: channel.status,
559 }
560 )
561 .await,
562 Err(StrategyError::InProgress)
563 ));
564 assert!(ars.on_tick().await.is_err());
565
566 await_redemption_queue_empty(ars.running_redemptions.clone())
567 .timeout(futures_time::time::Duration::from_secs(5))
568 .await?;
569
570 Ok(())
571 }
572}