1use std::{
24 fmt::{Debug, Display, Formatter},
25 sync::Arc,
26};
27
28use async_trait::async_trait;
29use dashmap::DashSet;
30use futures::StreamExt;
31use hopr_lib::{
32 ChannelChange, ChannelDirection, ChannelEntry, ChannelId, ChannelStatus, ChannelStatusDiscriminants, HoprBalance,
33 api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations, ChannelSelector},
34};
35use serde::{Deserialize, Serialize};
36use serde_with::{DisplayFromStr, serde_as};
37use tracing::{debug, info, warn};
38use validator::{Validate, ValidationError};
39
40use crate::{
41 Strategy,
42 errors::{StrategyError, StrategyError::CriteriaNotSatisfied},
43 strategy::SingularStrategy,
44};
45
// Prometheus counters used by the auto-funding strategy.
// They are compiled in only when the `prometheus` feature is enabled and the
// crate is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented each time a funding request has been accepted by the chain API
    // (i.e. `fund_channel` returned `Ok`), before its confirmation is awaited.
    static ref METRIC_COUNT_AUTO_FUNDINGS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_funding_funding_count", "Count of initiated automatic fundings").unwrap();
    // Incremented when a funding request could not be enqueued or when its
    // confirmation resolved to an error.
    static ref METRIC_COUNT_AUTO_FUNDING_FAILURES: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new("hopr_strategy_auto_funding_failure_count", "Count of failed automatic funding attempts").unwrap();
}
53
54fn validate_funding_amount(amount: &HoprBalance) -> std::result::Result<(), ValidationError> {
56 if amount.is_zero() {
57 return Err(ValidationError::new("funding_amount must be greater than zero"));
58 }
59 Ok(())
60}
61
#[serde_as]
/// Configuration of the [`AutoFundingStrategy`].
///
/// Both balances are (de)serialized through their string representation
/// (`DisplayFromStr`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct AutoFundingStrategyConfig {
    /// Stake threshold: a channel whose balance is less than or equal to this
    /// value is a candidate for funding.
    ///
    /// Defaults to `HoprBalance::new_base(1)`.
    #[serde_as(as = "DisplayFromStr")]
    #[default(HoprBalance::new_base(1))]
    pub min_stake_threshold: HoprBalance,

    /// Amount passed to the chain API each time a channel is funded.
    ///
    /// Validation (`validate_funding_amount`) rejects a zero amount.
    /// Defaults to `HoprBalance::new_base(10)`.
    #[serde_as(as = "DisplayFromStr")]
    #[default(HoprBalance::new_base(10))]
    #[validate(custom(function = "validate_funding_amount"))]
    pub funding_amount: HoprBalance,
}
81
/// Strategy that funds the node's own open outgoing channels once their
/// balance is at or below the configured threshold.
///
/// `A` is the chain API implementation used to list and fund channels.
pub struct AutoFundingStrategy<A> {
    /// Chain API handle; reference-counted so it can be moved into spawned funding tasks.
    hopr_chain_actions: Arc<A>,
    /// Threshold and funding amount used by this strategy.
    cfg: AutoFundingStrategyConfig,
    /// Ids of channels for which a funding request has been issued and not yet
    /// cleared; used to avoid issuing duplicate requests for the same channel.
    in_flight: Arc<DashSet<ChannelId>>,
}
96
97impl<A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync + 'static> AutoFundingStrategy<A> {
98 pub fn new(cfg: AutoFundingStrategyConfig, hopr_chain_actions: A) -> Self {
99 if cfg.funding_amount.le(&cfg.min_stake_threshold) {
100 warn!(
101 funding_amount = %cfg.funding_amount,
102 min_stake_threshold = %cfg.min_stake_threshold,
103 "funding_amount is not greater than min_stake_threshold; \
104 successful funding may re-trigger the threshold check"
105 );
106 }
107 Self {
108 cfg,
109 hopr_chain_actions: Arc::new(hopr_chain_actions),
110 in_flight: Arc::new(DashSet::new()),
111 }
112 }
113
114 async fn try_fund_channel(&self, channel: &ChannelEntry) -> crate::errors::Result<()> {
119 let channel_id = *channel.get_id();
120
121 if !self.in_flight.insert(channel_id) {
123 debug!(%channel, "skipping channel with in-flight funding");
124 return Ok(());
125 }
126
127 info!(
128 %channel,
129 balance = %channel.balance,
130 threshold = %self.cfg.min_stake_threshold,
131 "stake on channel at or below threshold"
132 );
133
134 let chain_actions = Arc::clone(&self.hopr_chain_actions);
135 let funding_amount = self.cfg.funding_amount;
136 let in_flight = Arc::clone(&self.in_flight);
137
138 hopr_async_runtime::prelude::spawn(async move {
139 match chain_actions.fund_channel(&channel_id, funding_amount).await {
140 Ok(confirmation) => {
141 #[cfg(all(feature = "prometheus", not(test)))]
142 METRIC_COUNT_AUTO_FUNDINGS.increment();
143
144 info!(%channel_id, %funding_amount, "issued re-staking of channel");
145
146 if let Err(e) = confirmation.await {
147 warn!(%channel_id, error = %e, "funding transaction failed");
148 in_flight.remove(&channel_id);
149
150 #[cfg(all(feature = "prometheus", not(test)))]
151 METRIC_COUNT_AUTO_FUNDING_FAILURES.increment();
152 }
153 }
156 Err(e) => {
157 warn!(%channel_id, error = %e, "failed to enqueue funding transaction");
158 in_flight.remove(&channel_id);
159
160 #[cfg(all(feature = "prometheus", not(test)))]
161 METRIC_COUNT_AUTO_FUNDING_FAILURES.increment();
162 }
163 }
164 });
165
166 Ok(())
167 }
168}
169
170impl<A> Debug for AutoFundingStrategy<A> {
171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172 write!(f, "{:?}", Strategy::AutoFunding(self.cfg))
173 }
174}
175
176impl<A> Display for AutoFundingStrategy<A> {
177 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178 write!(f, "{}", Strategy::AutoFunding(self.cfg))
179 }
180}
181
182#[async_trait]
183impl<A: ChainReadChannelOperations + ChainWriteChannelOperations + Send + Sync + 'static> SingularStrategy
184 for AutoFundingStrategy<A>
185{
186 async fn on_tick(&self) -> crate::errors::Result<()> {
195 let mut channels = self
196 .hopr_chain_actions
197 .stream_channels(
198 ChannelSelector::default()
199 .with_source(*self.hopr_chain_actions.me())
200 .with_allowed_states(&[ChannelStatusDiscriminants::Open]),
201 )
202 .await
203 .map_err(|e| StrategyError::Other(e.into()))?;
204
205 while let Some(channel) = channels.next().await {
206 if channel.balance.le(&self.cfg.min_stake_threshold) {
207 if let Err(e) = self.try_fund_channel(&channel).await {
208 warn!(%channel, error = %e, "on_tick: failed to fund channel");
209 }
210 } else {
211 self.in_flight.remove(channel.get_id());
213 }
214 }
215
216 debug!("auto-funding on_tick scan complete");
217 Ok(())
218 }
219
220 async fn on_own_channel_changed(
221 &self,
222 channel: &ChannelEntry,
223 direction: ChannelDirection,
224 change: ChannelChange,
225 ) -> crate::errors::Result<()> {
226 if direction != ChannelDirection::Outgoing {
228 return Ok(());
229 }
230
231 if let ChannelChange::Balance { left: old, right: new } = change {
232 if new > old && self.in_flight.remove(channel.get_id()).is_some() {
234 debug!(%channel, "cleared in-flight funding state after balance increase");
235 }
236
237 if new.le(&self.cfg.min_stake_threshold) && channel.status == ChannelStatus::Open {
238 self.try_fund_channel(channel).await?;
239 }
240 Ok(())
241 } else {
242 Err(CriteriaNotSatisfied)
243 }
244 }
245}
246
// Tests for the auto-funding strategy: config validation unit tests plus
// async tests that run the strategy against a simulated chain state.
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::StreamExt;
    use futures_time::future::FutureExt;
    use hex_literal::hex;
    use hopr_chain_connector::{create_trustful_hopr_blokli_connector, testing::BlokliTestStateBuilder};
    use hopr_lib::{
        Address, BytesRepresentable, ChainKeypair, Keypair, XDaiBalance,
        api::chain::{ChainEvent, ChainEvents},
    };

    use super::*;
    use crate::{
        auto_funding::{AutoFundingStrategy, AutoFundingStrategyConfig},
        strategy::SingularStrategy,
    };

    // Fixed test identities. BOB is the node under test (the connector is
    // created with BOB's keypair); the others are plain addresses.
    lazy_static::lazy_static! {
        static ref BOB_KP: ChainKeypair = ChainKeypair::from_secret(&hex!(
            "492057cf93e99b31d2a85bc5e98a9c3aa0021feec52c227cc8170e8f7d047775"
        ))
        .expect("lazy static keypair should be valid");

        static ref ALICE: Address = hex!("18f8ae833c85c51fbeba29cef9fbfb53b3bad950").into();
        static ref BOB: Address = BOB_KP.public().to_address();
        static ref CHRIS: Address = hex!("b6021e0860dd9d96c9ff0a73e2e5ba3a466ba234").into();
        static ref DAVE: Address = hex!("68499f50ff68d523385dc60686069935d17d762a").into();
    }

    // Feeds balance-change notifications for three channels into the strategy
    // and expects a funding of exactly `fund_amount` on c2 only.
    #[test_log::test(tokio::test)]
    async fn test_auto_funding_strategy() -> anyhow::Result<()> {
        let stake_limit = HoprBalance::from(7_u32);
        let fund_amount = HoprBalance::from(5_u32);

        // c1: balance 10 is above the threshold of 7.
        let c1 = ChannelEntry::new(*ALICE, *BOB, 10_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);

        // c2: balance 5 is below the threshold and the channel is open.
        let c2 = ChannelEntry::new(*BOB, *CHRIS, 5_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);

        // c3: balance 5 is below the threshold, but the channel is pending to close.
        let c3 = ChannelEntry::new(
            *CHRIS,
            *DAVE,
            5_u32.into(),
            0_u32.into(),
            ChannelStatus::PendingToClose(
                chrono::DateTime::<chrono::Utc>::from_str("2025-11-10T00:00:00+00:00")?.into(),
            ),
            0,
        );

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([c1, c2, c3])
            .build_dynamic_client([1; Address::SIZE].into());

        // Handle to the simulated state, refreshed at the end for the snapshot assertion.
        let snapshot = blokli_sim.snapshot();

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let events = chain_connector.subscribe()?;

        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: stake_limit,
            funding_amount: fund_amount,
        };

        let afs = AutoFundingStrategy::new(cfg, chain_connector);
        afs.on_own_channel_changed(
            &c1,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::zero(),
                right: c1.balance,
            },
        )
        .await?;

        afs.on_own_channel_changed(
            &c2,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::zero(),
                right: c2.balance,
            },
        )
        .await?;

        afs.on_own_channel_changed(
            &c3,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::zero(),
                right: c3.balance,
            },
        )
        .await?;

        // Wait (at most 2 seconds) for the balance-increase event of c2 with the configured amount.
        events
            .filter(|event| futures::future::ready(matches!(event, ChainEvent::ChannelBalanceIncreased(c, amount) if c.get_id() == c2.get_id() && amount == &fund_amount)))
            .next()
            .timeout(futures_time::time::Duration::from_secs(2))
            .await?;

        // The snapshot is keyed by this test's name; renaming the test invalidates it.
        insta::assert_yaml_snapshot!(*snapshot.refresh());

        Ok(())
    }

    // A zero `funding_amount` must be rejected by validation.
    #[test]
    fn test_config_validation_rejects_zero_funding_amount() {
        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: HoprBalance::new_base(1),
            funding_amount: HoprBalance::zero(),
        };
        assert!(
            cfg.validate().is_err(),
            "config with zero funding_amount should fail validation"
        );
    }

    // A non-zero `funding_amount` passes validation.
    #[test]
    fn test_config_validation_accepts_valid_config() {
        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: HoprBalance::new_base(1),
            funding_amount: HoprBalance::new_base(10),
        };
        assert!(
            cfg.validate().is_ok(),
            "config with valid funding_amount should pass validation"
        );
    }

    // The `Default` configuration must itself be valid.
    #[test]
    fn test_default_config_passes_validation() {
        let cfg = AutoFundingStrategyConfig::default();
        assert!(cfg.validate().is_ok(), "default config should pass validation");
    }

    // `on_tick` must fund the channel that is below the threshold (c1).
    #[test_log::test(tokio::test)]
    async fn test_on_tick_funds_underfunded_channels() -> anyhow::Result<()> {
        let stake_limit = HoprBalance::from(7_u32);
        let fund_amount = HoprBalance::from(5_u32);

        // c1: balance 3 is below the threshold of 7.
        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);

        // c2: balance 10 is above the threshold.
        let c2 = ChannelEntry::new(*BOB, *DAVE, 10_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([c1, c2])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let events = chain_connector.subscribe()?;

        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: stake_limit,
            funding_amount: fund_amount,
        };

        let afs = AutoFundingStrategy::new(cfg, chain_connector);

        afs.on_tick().await?;

        // Wait (at most 2 seconds) for the balance-increase event of c1 with the configured amount.
        events
            .filter(|event| {
                futures::future::ready(
                    matches!(event, ChainEvent::ChannelBalanceIncreased(c, amount) if c.get_id() == c1.get_id() && amount == &fund_amount),
                )
            })
            .next()
            .timeout(futures_time::time::Duration::from_secs(2))
            .await?;

        Ok(())
    }

    // Two identical low-balance notifications for the same channel must leave
    // exactly one entry in the in-flight set.
    #[test_log::test(tokio::test)]
    async fn test_in_flight_prevents_duplicate_funding() -> anyhow::Result<()> {
        let stake_limit = HoprBalance::from(7_u32);
        let fund_amount = HoprBalance::from(5_u32);

        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([c1])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let _events = chain_connector.subscribe()?;

        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: stake_limit,
            funding_amount: fund_amount,
        };

        let afs = AutoFundingStrategy::new(cfg, chain_connector);

        // First notification: balance dropped from 10 to 3 (below threshold).
        afs.on_own_channel_changed(
            &c1,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::from(10_u32),
                right: c1.balance,
            },
        )
        .await?;

        assert!(
            afs.in_flight.contains(c1.get_id()),
            "channel should be in the in-flight set after funding"
        );

        // Second, identical notification for the same channel.
        afs.on_own_channel_changed(
            &c1,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::from(10_u32),
                right: c1.balance,
            },
        )
        .await?;

        assert_eq!(
            afs.in_flight.len(),
            1,
            "in-flight set should still have exactly one entry"
        );
        assert!(
            afs.in_flight.contains(c1.get_id()),
            "channel should still be in the in-flight set"
        );

        Ok(())
    }

    // A balance-increase notification must remove the channel from the in-flight set.
    #[test_log::test(tokio::test)]
    async fn test_balance_increase_clears_in_flight() -> anyhow::Result<()> {
        let stake_limit = HoprBalance::from(7_u32);
        let fund_amount = HoprBalance::from(5_u32);

        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0_u32);

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([c1])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let _events = chain_connector.subscribe()?;

        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: stake_limit,
            funding_amount: fund_amount,
        };

        let afs = AutoFundingStrategy::new(cfg, chain_connector);

        // Trigger funding: balance dropped from 10 to 3.
        afs.on_own_channel_changed(
            &c1,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::from(10_u32),
                right: c1.balance,
            },
        )
        .await?;

        assert!(afs.in_flight.contains(c1.get_id()));

        // Same channel parties as c1, with the balance after funding (3 + 5 = 8, above the threshold).
        let funded_channel = ChannelEntry::new(
            *BOB,
            *CHRIS,
            (3_u32 + 5_u32).into(),
            0_u32.into(),
            ChannelStatus::Open,
            0,
        );

        // Notify the strategy about the increase from 3 to 8.
        afs.on_own_channel_changed(
            &funded_channel,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::from(3_u32),
                right: HoprBalance::from(8_u32),
            },
        )
        .await?;

        assert!(
            !afs.in_flight.contains(c1.get_id()),
            "channel should be cleared from in-flight after balance increase"
        );

        Ok(())
    }

    // `on_tick` must not add a second in-flight entry for a channel that is
    // already being funded.
    #[test_log::test(tokio::test)]
    async fn test_on_tick_skips_in_flight_channels() -> anyhow::Result<()> {
        let stake_limit = HoprBalance::from(7_u32);
        let fund_amount = HoprBalance::from(5_u32);

        let c1 = ChannelEntry::new(*BOB, *CHRIS, 3_u32.into(), 0_u32.into(), ChannelStatus::Open, 0);

        let blokli_sim = BlokliTestStateBuilder::default()
            .with_generated_accounts(
                &[&*ALICE, &*BOB, &*CHRIS, &*DAVE],
                false,
                XDaiBalance::new_base(1),
                HoprBalance::new_base(1000),
            )
            .with_channels([c1])
            .build_dynamic_client([1; Address::SIZE].into());

        let mut chain_connector =
            create_trustful_hopr_blokli_connector(&BOB_KP, Default::default(), blokli_sim, [1; Address::SIZE].into())
                .await?;
        chain_connector.connect().await?;
        let _events = chain_connector.subscribe()?;

        let cfg = AutoFundingStrategyConfig {
            min_stake_threshold: stake_limit,
            funding_amount: fund_amount,
        };

        let afs = AutoFundingStrategy::new(cfg, chain_connector);

        // Trigger funding through the event path first.
        afs.on_own_channel_changed(
            &c1,
            ChannelDirection::Outgoing,
            ChannelChange::Balance {
                left: HoprBalance::from(10_u32),
                right: c1.balance,
            },
        )
        .await?;

        assert!(
            afs.in_flight.contains(c1.get_id()),
            "channel should be in the in-flight set after funding"
        );

        // Then run the periodic scan over the same channel.
        afs.on_tick().await?;

        assert_eq!(
            afs.in_flight.len(),
            1,
            "in-flight set should still have exactly one entry"
        );
        assert!(
            afs.in_flight.contains(c1.get_id()),
            "channel should still be in the in-flight set"
        );

        Ok(())
    }
}