1use std::{
7 fmt::{Debug, Display, Formatter},
8 str::FromStr,
9};
10
11use async_trait::async_trait;
12use futures::{SinkExt, StreamExt, pin_mut};
13use hopr_lib::{
14 AcknowledgedTicketStatus, ChannelChange, ChannelDirection, ChannelEntry, ChannelStatus, ChannelStatusDiscriminants,
15 HoprBalance, Utc, VerifiedTicket,
16 exports::api::{
17 chain::{ChainReadChannelOperations, ChannelSelector},
18 db::TicketSelector,
19 },
20};
21use serde::{Deserialize, Serialize};
22use serde_with::{DisplayFromStr, serde_as};
23use tracing::{debug, error, info};
24use validator::Validate;
25
26use crate::{
27 Strategy,
28 errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
29 strategy::SingularStrategy,
30};
31
32#[cfg(all(feature = "prometheus", not(test)))]
33lazy_static::lazy_static! {
34 static ref METRIC_COUNT_AUTO_REDEEMS: hopr_metrics::SimpleCounter =
35 hopr_metrics::SimpleCounter::new("hopr_strategy_auto_redeem_redeem_count", "Count of initiated automatic redemptions").unwrap();
36}
37
38fn just_true() -> bool {
39 true
40}
41
42fn just_false() -> bool {
43 false
44}
45
46fn min_redeem_hopr() -> HoprBalance {
47 HoprBalance::from_str("1 wxHOPR").unwrap()
48}
49
50#[serde_as]
52#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
53pub struct AutoRedeemingStrategyConfig {
54 #[serde(default = "just_true")]
59 #[default = true]
60 pub redeem_all_on_close: bool,
61
62 #[serde(default = "min_redeem_hopr")]
67 #[serde_as(as = "DisplayFromStr")]
68 #[default(min_redeem_hopr())]
69 pub minimum_redeem_ticket_value: HoprBalance,
70
71 #[serde(default = "just_false")]
81 #[default = false]
82 pub redeem_on_winning: bool,
83}
84
85pub struct AutoRedeemingStrategy<A, R> {
89 hopr_chain_actions: A,
90 redeem_sink: R,
91 cfg: AutoRedeemingStrategyConfig,
92}
93
94impl<A, R> Debug for AutoRedeemingStrategy<A, R> {
95 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
96 write!(f, "{:?}", Strategy::AutoRedeeming(self.cfg))
97 }
98}
99
100impl<A, R> Display for AutoRedeemingStrategy<A, R> {
101 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
102 write!(f, "{}", Strategy::AutoRedeeming(self.cfg))
103 }
104}
105
106impl<A, R> AutoRedeemingStrategy<A, R>
107where
108 A: ChainReadChannelOperations + Clone + Send + Sync + 'static,
109 R: futures::Sink<TicketSelector> + Clone,
110 StrategyError: From<R::Error>,
111{
112 pub fn new(cfg: AutoRedeemingStrategyConfig, hopr_chain_actions: A, redeem_sink: R) -> Self {
113 Self {
114 cfg,
115 hopr_chain_actions,
116 redeem_sink,
117 }
118 }
119
120 async fn enqueue_redeem_request(&self, selector: TicketSelector) -> crate::errors::Result<()> {
121 let sink = self.redeem_sink.clone();
122 pin_mut!(sink);
123 Ok(sink
124 .send(selector.with_state(AcknowledgedTicketStatus::Untouched))
125 .await?)
126 }
127}
128
129#[async_trait]
130impl<A, R> SingularStrategy for AutoRedeemingStrategy<A, R>
131where
132 A: ChainReadChannelOperations + Clone + Send + Sync + 'static,
133 R: futures::Sink<TicketSelector> + Sync + Send + Clone,
134 StrategyError: From<R::Error>,
135{
136 async fn on_tick(&self) -> crate::errors::Result<()> {
137 if !self.cfg.redeem_on_winning {
138 debug!("trying to redeem all tickets in all channels");
139
140 self.hopr_chain_actions
141 .stream_channels(
142 ChannelSelector::default()
143 .with_destination(*self.hopr_chain_actions.me())
144 .with_allowed_states(&[
145 ChannelStatusDiscriminants::Open,
146 ChannelStatusDiscriminants::PendingToClose,
147 ])
148 .with_closure_time_range(Utc::now()..),
149 )
150 .await
151 .map_err(|e| StrategyError::Other(e.into()))?
152 .map(|channel| {
153 Ok(TicketSelector::from(&channel)
154 .with_amount(self.cfg.minimum_redeem_ticket_value..)
155 .with_index_range(channel.ticket_index..)
156 .with_state(AcknowledgedTicketStatus::Untouched))
157 })
158 .forward(self.redeem_sink.clone())
159 .await?;
160
161 Ok(())
162 } else {
163 Err(CriteriaNotSatisfied)
164 }
165 }
166
167 async fn on_acknowledged_winning_ticket(&self, ack: &VerifiedTicket) -> crate::errors::Result<()> {
168 if self.cfg.redeem_on_winning && ack.verified_ticket().amount.ge(&self.cfg.minimum_redeem_ticket_value) {
169 if let Some(channel) = self
170 .hopr_chain_actions
171 .channel_by_id(ack.channel_id())
172 .await
173 .map_err(|e| StrategyError::Other(e.into()))?
174 {
175 info!(%ack, "redeeming");
176
177 if ack.verified_ticket().index < channel.ticket_index {
178 error!(%ack, "acknowledged ticket is older than channel ticket index");
179 return Err(CriteriaNotSatisfied);
180 }
181
182 let selector = TicketSelector::from(channel)
183 .with_amount(self.cfg.minimum_redeem_ticket_value..)
184 .with_index_range(channel.ticket_index..=ack.verified_ticket().index)
185 .with_state(AcknowledgedTicketStatus::Untouched);
186
187 self.enqueue_redeem_request(selector).await
188 } else {
189 Err(CriteriaNotSatisfied)
190 }
191 } else {
192 Err(CriteriaNotSatisfied)
193 }
194 }
195
196 async fn on_own_channel_changed(
197 &self,
198 channel: &ChannelEntry,
199 direction: ChannelDirection,
200 change: ChannelChange,
201 ) -> crate::errors::Result<()> {
202 if direction != ChannelDirection::Incoming || !self.cfg.redeem_all_on_close {
203 return Ok(());
204 }
205
206 if let ChannelChange::Status { left: old, right: new } = change {
207 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
208 debug!(?channel, "ignoring channel state change that's not in PendingToClose");
209 return Ok(());
210 }
211 info!(%channel, "channel transitioned to PendingToClose, checking if it has tickets to redeem");
212
213 let selector = TicketSelector::from(channel)
214 .with_amount(self.cfg.minimum_redeem_ticket_value..)
215 .with_index_range(channel.ticket_index..)
216 .with_state(AcknowledgedTicketStatus::Untouched);
217
218 self.enqueue_redeem_request(selector).await
219 } else {
220 Err(CriteriaNotSatisfied)
221 }
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use std::{
228 ops::Add,
229 sync::Arc,
230 time::{Duration, SystemTime},
231 };
232
233 use hex_literal::hex;
234 use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::*};
235 use hopr_crypto_random::Randomizable;
236 use hopr_lib::{
237 Address, BytesRepresentable, ChainKeypair, HalfKey, Hash, Keypair, RedeemableTicket, Response, TicketBuilder,
238 UnitaryFloatOps, WinningProbability, XDaiBalance,
239 };
240
241 use super::*;
242
243 lazy_static::lazy_static! {
244 static ref ALICE: ChainKeypair = ChainKeypair::from_secret(&hex!("492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775")).expect("lazy static keypair should be valid");
245 static ref BOB: ChainKeypair = ChainKeypair::from_secret(&hex!("48680484c6fc31bc881a0083e6e32b6dc789f9eaba0f8b981429fd346c697f8c")).expect("lazy static keypair should be valid");
246 static ref CHARLIE: ChainKeypair = ChainKeypair::from_secret(&hex!("d39a926980d6fa96a9eba8f8058b2beb774bc11866a386e9ddf9dc1152557c26")).expect("lazy static keypair should be constructible");
247 static ref PRICE_PER_PACKET: HoprBalance = 10000000000000000_u128.into(); static ref CHANNEL_1: ChannelEntry = ChannelEntry::new(
250 ALICE.public().to_address(),
251 BOB.public().to_address(),
252 *PRICE_PER_PACKET * 10,
253 0,
254 ChannelStatus::Open,
255 4
256 );
257
258 static ref CHANNEL_2: ChannelEntry = ChannelEntry::new(
259 CHARLIE.public().to_address(),
260 BOB.public().to_address(),
261 *PRICE_PER_PACKET * 11,
262 1,
263 ChannelStatus::Open,
264 4
265 );
266
267 static ref CHAIN_CLIENT: BlokliTestClient<StaticState> = BlokliTestStateBuilder::default()
268 .with_generated_accounts(&[ALICE.public().as_ref(), BOB.public().as_ref(), CHARLIE.public().as_ref()], false, XDaiBalance::new_base(1), HoprBalance::new_base(1000))
269 .with_channels([CHANNEL_1.clone(), CHANNEL_2.clone()])
270 .build_static_client();
271 }
272
273 fn generate_random_ack_ticket(index: u64, worth_packets: u32) -> anyhow::Result<RedeemableTicket> {
274 let hk1 = HalfKey::random();
275 let hk2 = HalfKey::random();
276
277 let challenge = Response::from_half_keys(&hk1, &hk2)?.to_challenge()?;
278
279 Ok(TicketBuilder::default()
280 .counterparty(&*BOB)
281 .amount(PRICE_PER_PACKET.div_f64(1.0f64)?.amount() * worth_packets)
282 .index(index)
283 .win_prob(WinningProbability::ALWAYS)
284 .channel_epoch(4)
285 .challenge(challenge)
286 .build_signed(&ALICE, &Hash::default())?
287 .into_acknowledged(Response::from_half_keys(&hk1, &hk2)?)
288 .into_redeemable(&*BOB, &Hash::default())?)
289 }
290
291 #[tokio::test]
292 async fn test_auto_redeeming_strategy_redeem() -> anyhow::Result<()> {
293 let ack_ticket = generate_random_ack_ticket(0, 5)?;
294 let (tx, rx) = futures::channel::mpsc::channel(10);
295
296 let mut connector = create_trustful_hopr_blokli_connector(
297 &*BOB,
298 Default::default(),
299 CHAIN_CLIENT.clone(),
300 [1u8; Address::SIZE].into(),
301 )
302 .await?;
303 connector.connect(Duration::from_secs(3)).await?;
304
305 let cfg = AutoRedeemingStrategyConfig {
306 minimum_redeem_ticket_value: 0.into(),
307 redeem_on_winning: true,
308 ..Default::default()
309 };
310
311 {
312 let ars = AutoRedeemingStrategy::new(
313 cfg,
314 Arc::new(connector),
315 tx.sink_map_err(|e| StrategyError::Other(e.into())),
316 );
317
318 ars.on_acknowledged_winning_ticket(&ack_ticket.ticket).await?;
319 assert!(ars.on_tick().await.is_err());
320 }
321
322 let redeem_requests = rx.collect::<Vec<_>>().await;
323 assert_eq!(
324 redeem_requests,
325 vec![
326 TicketSelector::from(CHANNEL_1.clone())
327 .with_amount(HoprBalance::zero()..)
328 .with_index_range(
329 ack_ticket.ticket.verified_ticket().index..=ack_ticket.ticket.verified_ticket().index,
330 )
331 .with_state(AcknowledgedTicketStatus::Untouched)
332 ]
333 );
334
335 Ok(())
336 }
337
338 #[test_log::test(tokio::test)]
339 async fn test_auto_redeeming_strategy_redeem_on_tick() -> anyhow::Result<()> {
340 let (tx, rx) = futures::channel::mpsc::channel(10);
341
342 let mut connector = create_trustful_hopr_blokli_connector(
343 &*BOB,
344 Default::default(),
345 CHAIN_CLIENT.clone(),
346 [1u8; Address::SIZE].into(),
347 )
348 .await?;
349 connector.connect(Duration::from_secs(3)).await?;
350
351 let cfg = AutoRedeemingStrategyConfig {
352 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
353 redeem_on_winning: false,
354 ..Default::default()
355 };
356
357 {
358 let ars = AutoRedeemingStrategy::new(
359 cfg,
360 Arc::new(connector),
361 tx.sink_map_err(|e| StrategyError::Other(e.into())),
362 );
363 ars.on_tick().await?;
364 }
365
366 let redeem_requests = rx.collect::<Vec<_>>().await;
367 assert_eq!(
368 redeem_requests,
369 vec![
370 TicketSelector::from(CHANNEL_1.clone())
371 .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
372 .with_index_range(CHANNEL_1.ticket_index..)
373 .with_state(AcknowledgedTicketStatus::Untouched),
374 TicketSelector::from(CHANNEL_2.clone())
375 .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
376 .with_index_range(CHANNEL_2.ticket_index..)
377 .with_state(AcknowledgedTicketStatus::Untouched)
378 ]
379 );
380
381 Ok(())
382 }
383
384 #[tokio::test]
385 async fn test_auto_redeeming_strategy_redeem_minimum_ticket_amount() -> anyhow::Result<()> {
386 let ack_ticket_below = generate_random_ack_ticket(1, 4)?;
387 let ack_ticket_at = generate_random_ack_ticket(1, 5)?;
388
389 let (tx, rx) = futures::channel::mpsc::channel(10);
390 let mut connector = create_trustful_hopr_blokli_connector(
391 &*BOB,
392 Default::default(),
393 CHAIN_CLIENT.clone(),
394 [1u8; Address::SIZE].into(),
395 )
396 .await?;
397 connector.connect(Duration::from_secs(3)).await?;
398
399 let cfg = AutoRedeemingStrategyConfig {
400 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
401 redeem_on_winning: true,
402 ..Default::default()
403 };
404
405 {
406 let ars = AutoRedeemingStrategy::new(
407 cfg,
408 Arc::new(connector),
409 tx.sink_map_err(|e| StrategyError::Other(e.into())),
410 );
411 ars.on_acknowledged_winning_ticket(&ack_ticket_below.ticket)
412 .await
413 .expect_err("ticket below threshold should not satisfy");
414 ars.on_acknowledged_winning_ticket(&ack_ticket_at.ticket).await?;
415 }
416
417 let redeem_requests = rx.collect::<Vec<_>>().await;
418 assert_eq!(
419 redeem_requests,
420 vec![
421 TicketSelector::from(CHANNEL_1.clone())
422 .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
423 .with_index_range(CHANNEL_1.ticket_index..=ack_ticket_at.ticket.verified_ticket().index)
424 .with_state(AcknowledgedTicketStatus::Untouched)
425 ]
426 );
427
428 Ok(())
429 }
430
431 #[tokio::test]
432 async fn test_auto_redeeming_strategy_should_redeem_singular_ticket_on_close() -> anyhow::Result<()> {
433 let mut channel = CHANNEL_1.clone();
434 channel.status = ChannelStatus::PendingToClose(SystemTime::now().add(Duration::from_secs(100)));
435
436 let (tx, rx) = futures::channel::mpsc::channel(10);
437 let mut connector = create_trustful_hopr_blokli_connector(
438 &*BOB,
439 Default::default(),
440 CHAIN_CLIENT.clone(),
441 [1u8; Address::SIZE].into(),
442 )
443 .await?;
444 connector.connect(Duration::from_secs(3)).await?;
445
446 let cfg = AutoRedeemingStrategyConfig {
447 redeem_all_on_close: true,
448 minimum_redeem_ticket_value: HoprBalance::from(*PRICE_PER_PACKET * 5),
449 ..Default::default()
450 };
451
452 {
453 let ars = AutoRedeemingStrategy::new(
454 cfg,
455 Arc::new(connector),
456 tx.sink_map_err(|e| StrategyError::Other(e.into())),
457 );
458 ars.on_own_channel_changed(
459 &channel,
460 ChannelDirection::Incoming,
461 ChannelChange::Status {
462 left: ChannelStatus::Open,
463 right: channel.status,
464 },
465 )
466 .await?;
467 }
468
469 let redeem_requests = rx.collect::<Vec<_>>().await;
470 assert_eq!(
471 redeem_requests,
472 vec![
473 TicketSelector::from(CHANNEL_1.clone())
474 .with_amount(HoprBalance::from(*PRICE_PER_PACKET * 5)..)
475 .with_index_range(CHANNEL_1.ticket_index..)
476 .with_state(AcknowledgedTicketStatus::Untouched)
477 ]
478 );
479
480 Ok(())
481 }
482
483 #[tokio::test]
484 async fn test_auto_redeeming_strategy_redeem_multiple_tickets_in_channel() -> anyhow::Result<()> {
485 let ack_ticket_1 = generate_random_ack_ticket(0, 5)?;
486
487 let (tx, rx) = futures::channel::mpsc::channel(10);
488 let mut connector = create_trustful_hopr_blokli_connector(
489 &*BOB,
490 Default::default(),
491 CHAIN_CLIENT.clone(),
492 [1u8; Address::SIZE].into(),
493 )
494 .await?;
495 connector.connect(Duration::from_secs(3)).await?;
496
497 {
498 let cfg = AutoRedeemingStrategyConfig {
499 minimum_redeem_ticket_value: 0.into(),
500 redeem_on_winning: true,
501 ..Default::default()
502 };
503
504 let ars = AutoRedeemingStrategy::new(
505 cfg,
506 Arc::new(connector),
507 tx.sink_map_err(|e| StrategyError::Other(e.into())),
508 );
509 ars.on_acknowledged_winning_ticket(&ack_ticket_1.ticket).await?;
510 assert!(ars.on_tick().await.is_err());
511 }
512
513 let redeem_requests = rx.collect::<Vec<_>>().await;
514 assert_eq!(
515 redeem_requests,
516 vec![
517 TicketSelector::from(CHANNEL_1.clone())
518 .with_amount(HoprBalance::zero()..)
519 .with_index_range(CHANNEL_1.ticket_index..=ack_ticket_1.ticket.verified_ticket().index)
520 .with_state(AcknowledgedTicketStatus::Untouched),
521 ]
522 );
523
524 Ok(())
525 }
526}