1use std::{
29 collections::HashMap,
30 fmt::{Debug, Display, Formatter},
31 sync::Arc,
32};
33
34use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
35use async_trait::async_trait;
36use hopr_async_runtime::prelude::{JoinHandle, spawn};
37use hopr_crypto_types::prelude::Hash;
38use hopr_db_sql::{
39 api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
40 channels::HoprDbChannelOperations,
41};
42use hopr_internal_types::prelude::*;
43#[cfg(all(feature = "prometheus", not(test)))]
44use hopr_metrics::metrics::SimpleCounter;
45use hopr_transport_ticket_aggregation::TicketAggregatorTrait;
46use serde::{Deserialize, Serialize};
47use serde_with::serde_as;
48use tracing::{debug, error, info, warn};
49use validator::Validate;
50
51use crate::{Strategy, strategy::SingularStrategy};
52
// Counter incremented each time a spawned aggregation task finishes without an error.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_AGGREGATIONS: SimpleCounter =
        SimpleCounter::new("hopr_strategy_aggregating_aggregation_count", "Count of initiated automatic aggregations").unwrap();
}
58
59use hopr_platform::time::native::current_time;
60
/// Upper bound accepted for `AggregatingStrategyConfig::aggregation_threshold`.
///
/// Mirrors `hopr_db_sql::tickets::MAX_TICKETS_TO_AGGREGATE_BATCH`, narrowed to `u32` so it can be used
/// in the `validate(range(..))` attribute of the configuration.
const MAX_AGGREGATABLE_TICKET_COUNT: u32 = hopr_db_sql::tickets::MAX_TICKETS_TO_AGGREGATE_BATCH as u32;
62
/// Default value of `AggregatingStrategyConfig::aggregation_threshold`.
///
/// Referenced both by the `serde` default and by the `SmartDefault` attribute of that field.
#[inline]
fn default_aggregation_threshold() -> Option<u32> {
    const DEFAULT_THRESHOLD: u32 = 250;
    Some(DEFAULT_THRESHOLD)
}
67
/// Helper returning `true`; used as the default of `AggregatingStrategyConfig::aggregate_on_channel_close`.
#[inline]
fn just_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
72
/// Default value of `AggregatingStrategyConfig::unrealized_balance_ratio`.
#[inline]
fn default_unrealized_balance_ratio() -> Option<f32> {
    const DEFAULT_RATIO: f32 = 0.9;
    Some(DEFAULT_RATIO)
}
77
/// Configuration of the `AggregatingStrategy`.
///
/// The two optional criteria are converted into `AggregationPrerequisites` (see the `From` implementation)
/// and passed to the ticket aggregator on every tick.
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
pub struct AggregatingStrategyConfig {
    /// Ticket-count criterion, forwarded as `AggregationPrerequisites::min_ticket_count`.
    ///
    /// The `validate` attribute constrains the value to the range `2..=MAX_AGGREGATABLE_TICKET_COUNT`.
    /// `default_aggregation_threshold()` supplies the value both for `Default` and when the field is
    /// omitted during deserialization.
    #[validate(range(min = 2, max = MAX_AGGREGATABLE_TICKET_COUNT))]
    #[serde(default = "default_aggregation_threshold")]
    #[default(default_aggregation_threshold())]
    pub aggregation_threshold: Option<u32>,

    /// Ratio criterion, forwarded as `AggregationPrerequisites::min_unaggregated_ratio`.
    ///
    /// The `validate` attribute constrains the value to the range `0.0..=1.0`.
    /// `Default` uses `default_unrealized_balance_ratio()`.
    // NOTE(review): unlike `aggregation_threshold`, this field has no `#[serde(default = ...)]`, so an omitted
    // field presumably deserializes to `None` instead of the `Default` value - confirm this is intended.
    #[validate(range(min = 0_f32, max = 1.0_f32))]
    #[default(default_unrealized_balance_ratio())]
    pub unrealized_balance_ratio: Option<f32>,

    /// When `true`, the strategy also starts an aggregation in an incoming channel whose status
    /// changes from `Open` to `PendingToClose` (see `on_own_channel_changed`).
    ///
    /// `Default` sets this to `true`.
    // NOTE(review): there is no `#[serde(default = ...)]` on this field either, so it presumably must always be
    // present in serialized configurations - confirm this is intended.
    #[default(just_true())]
    pub aggregate_on_channel_close: bool,
}
114
115impl From<AggregatingStrategyConfig> for AggregationPrerequisites {
116 fn from(value: AggregatingStrategyConfig) -> Self {
117 AggregationPrerequisites {
118 min_ticket_count: value.aggregation_threshold.map(|x| x as usize),
119 min_unaggregated_ratio: value.unrealized_balance_ratio.map(|x| x as f64),
120 }
121 }
122}
123
/// Strategy that starts ticket aggregations in incoming channels.
///
/// Aggregations are attempted on every tick for all incoming channels whose closure time has not passed,
/// and optionally when an incoming channel transitions from `Open` to `PendingToClose`.
/// Each aggregation runs in a spawned background task that is tracked per channel in `agg_tasks`.
pub struct AggregatingStrategy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Database handle; `on_tick` uses it to list the incoming channels.
    db: Db,
    /// Component that performs the actual ticket aggregation for a channel.
    ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
    /// Strategy configuration.
    cfg: AggregatingStrategyConfig,
    /// Aggregation tasks keyed by channel ID.
    ///
    /// The `bool` is set to `true` by the task itself once it is done; the `JoinHandle` is awaited
    /// when the finished entry is removed from the map.
    #[allow(clippy::type_complexity)]
    agg_tasks: Arc<RwLock<HashMap<Hash, (bool, JoinHandle<()>)>>>,
}
139
140impl<Db> Debug for AggregatingStrategy<Db>
141where
142 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
143{
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 write!(f, "{:?}", Strategy::Aggregating(self.cfg))
146 }
147}
148
149impl<Db> Display for AggregatingStrategy<Db>
150where
151 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
152{
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "{}", Strategy::Aggregating(self.cfg))
155 }
156}
157
158impl<Db> AggregatingStrategy<Db>
159where
160 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
161{
162 pub fn new(
163 cfg: AggregatingStrategyConfig,
164 db: Db,
165 ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
166 ) -> Self {
167 Self {
168 db,
169 cfg,
170 ticket_aggregator,
171 agg_tasks: Arc::new(RwLock::new(HashMap::new())),
172 }
173 }
174}
175
176impl<Db> AggregatingStrategy<Db>
177where
178 Db: HoprDbChannelOperations + HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
179{
180 async fn try_start_aggregation(
181 &self,
182 channel_id: Hash,
183 criteria: AggregationPrerequisites,
184 ) -> crate::errors::Result<()> {
185 if !self.is_strategy_aggregating_in_channel(channel_id).await {
186 debug!("checking aggregation in {channel_id} with criteria {criteria:?}...");
187
188 let agg_tasks_clone = self.agg_tasks.clone();
189 let aggregator_clone = self.ticket_aggregator.clone();
190 let (can_remove_tx, can_remove_rx) = futures::channel::oneshot::channel();
191 let task = spawn(async move {
192 match aggregator_clone.aggregate_tickets(&channel_id, criteria).await {
193 Ok(_) => {
194 debug!("tried ticket aggregation in channel {channel_id} without any issues");
195
196 #[cfg(all(feature = "prometheus", not(test)))]
197 METRIC_COUNT_AGGREGATIONS.increment();
198 }
199 Err(e) => {
200 error!("cannot complete aggregation in channel {channel_id}: {e}");
201 }
202 }
203
204 let _ = can_remove_rx.await;
206 if let Some((done, _)) = agg_tasks_clone.write_arc().await.get_mut(&channel_id) {
207 *done = true;
208 }
209 });
210
211 self.agg_tasks.write_arc().await.insert(channel_id, (false, task));
212 let _ = can_remove_tx.send(()); } else {
214 warn!(channel = %channel_id, "this strategy already aggregates in channel");
215 }
216
217 Ok(())
218 }
219
220 async fn is_strategy_aggregating_in_channel(&self, channel_id: Hash) -> bool {
221 let tasks_read_locked = self.agg_tasks.upgradable_read_arc().await;
222 let existing = tasks_read_locked.get(&channel_id).map(|(done, _)| *done);
223 if let Some(done) = existing {
224 if done {
226 let mut tasks_write_locked = RwLockUpgradableReadGuardArc::upgrade(tasks_read_locked).await;
227
228 if let Some((_, task)) = tasks_write_locked.remove(&channel_id) {
229 let _ = task.await;
231 false
232 } else {
233 false
235 }
236 } else {
237 true
239 }
240 } else {
241 false
243 }
244 }
245}
246
247#[async_trait]
248impl<Db> SingularStrategy for AggregatingStrategy<Db>
249where
250 Db: HoprDbChannelOperations + HoprDbTicketOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
251{
252 async fn on_tick(&self) -> crate::errors::Result<()> {
253 let incoming = self
254 .db
255 .get_incoming_channels(None)
256 .await
257 .map_err(hopr_db_sql::api::errors::DbError::from)?
258 .into_iter()
259 .filter(|c| !c.closure_time_passed(current_time()))
260 .map(|c| c.get_id());
261
262 for channel_id in incoming {
263 if let Err(e) = self.try_start_aggregation(channel_id, self.cfg.into()).await {
264 debug!("skipped aggregation in channel {channel_id}: {e}");
265 }
266 }
267
268 Ok(())
269 }
270
271 async fn on_own_channel_changed(
272 &self,
273 channel: &ChannelEntry,
274 direction: ChannelDirection,
275 change: ChannelChange,
276 ) -> crate::errors::Result<()> {
277 if !self.cfg.aggregate_on_channel_close || direction != ChannelDirection::Incoming {
278 return Ok(());
279 }
280
281 if let ChannelChange::Status { left: old, right: new } = change {
282 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
283 debug!("ignoring channel {channel} state change that's not in PendingToClose");
284 return Ok(());
285 }
286
287 info!(%channel, "going to aggregate tickets in channel because it transitioned to PendingToClose");
288
289 let on_close_agg_prerequisites = AggregationPrerequisites {
291 min_ticket_count: Some(2),
292 min_unaggregated_ratio: None,
293 };
294
295 Ok(self
296 .try_start_aggregation(channel.get_id(), on_close_agg_prerequisites)
297 .await?)
298 } else {
299 Ok(())
300 }
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use std::{pin::pin, sync::Arc, time::Duration};
307
308 use anyhow::Context;
309 use futures::{FutureExt, StreamExt, pin_mut};
310 use hex_literal::hex;
311 use hopr_crypto_types::prelude::*;
312 use hopr_db_sql::{
313 HoprDbGeneralModelOperations, TargetDb,
314 accounts::HoprDbAccountOperations,
315 api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
316 channels::HoprDbChannelOperations,
317 db::HoprDb,
318 errors::DbSqlError,
319 info::HoprDbInfoOperations,
320 };
321 use hopr_internal_types::prelude::*;
322 use hopr_primitive_types::prelude::*;
323 use hopr_transport_ticket_aggregation::{
324 AwaitingAggregator, TicketAggregationInteraction, TicketAggregationProcessed,
325 };
326 use lazy_static::lazy_static;
327 use tokio::time::timeout;
328 use tracing::{debug, error};
329
330 use crate::{
331 aggregating::{MAX_AGGREGATABLE_TICKET_COUNT, default_aggregation_threshold},
332 strategy::SingularStrategy,
333 };
334
/// The default aggregation threshold must stay strictly below the maximum allowed by validation.
#[test]
fn default_ticket_aggregation_count_is_lower_than_maximum_allowed_ticket_count() -> anyhow::Result<()> {
    let default_threshold = default_aggregation_threshold().unwrap();
    assert!(default_threshold < MAX_AGGREGATABLE_TICKET_COUNT);

    Ok(())
}
341
// Fixed test identities. Index 0 is used as the ticket issuer ("Alice") and index 1 as the
// ticket recipient ("Bob") throughout the tests below; `PEERS[i]` and `PEERS_CHAIN[i]` are linked
// to the same account in `init_db`.
lazy_static! {
    // Off-chain keypairs of the two test peers
    static ref PEERS: Vec<OffchainKeypair> = [
        hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
        hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
    ]
    .iter()
    .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
    .collect();
    // On-chain keypairs of the two test peers
    static ref PEERS_CHAIN: Vec<ChainKeypair> = [
        hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
        hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
    ]
    .iter()
    .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
    .collect();
}
358
359 fn mock_acknowledged_ticket(
360 signer: &ChainKeypair,
361 destination: &ChainKeypair,
362 index: u64,
363 index_offset: u32,
364 ) -> anyhow::Result<AcknowledgedTicket> {
365 let price_per_packet: U256 = 20_u32.into();
366 let ticket_win_prob = 1.0f64;
367
368 let channel_id = generate_channel_id(&signer.into(), &destination.into());
369
370 let channel_epoch = 1u64;
371 let domain_separator = Hash::default();
372
373 let response = Response::try_from(
374 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
375 )?;
376
377 Ok(TicketBuilder::default()
378 .addresses(signer, destination)
379 .amount(price_per_packet.div_f64(ticket_win_prob)?)
380 .index(index)
381 .index_offset(index_offset)
382 .win_prob(ticket_win_prob.try_into()?)
383 .channel_epoch(1)
384 .challenge(response.to_challenge().into())
385 .build_signed(signer, &domain_separator)?
386 .into_acknowledged(response))
387 }
388
/// Inserts `amount` mock acknowledged tickets (indices `0..amount`, index offset 1), issued by
/// `PEERS_CHAIN[0]` for `PEERS_CHAIN[1]`, into the tickets database within a single transaction.
///
/// Returns the inserted tickets together with an `Open` channel entry between the two peers whose
/// balance equals the total value of the inserted tickets. The channel itself is NOT stored in
/// the database by this function.
async fn populate_db_with_ack_tickets(
    db: HoprDb,
    amount: usize,
) -> anyhow::Result<(Vec<AcknowledgedTicket>, ChannelEntry)> {
    // A second handle is needed because it is moved into the transaction closure
    let db_clone = db.clone();
    let (acked_tickets, total_value) = db
        .begin_transaction_in_db(TargetDb::Tickets)
        .await?
        .perform(|tx| {
            Box::pin(async move {
                let mut acked_tickets = Vec::new();
                let mut total_value = HoprBalance::zero();

                for i in 0..amount {
                    let acked_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i as u64, 1)
                        .expect("should be able to create an ack ticket");
                    debug!("inserting {acked_ticket}");

                    db_clone.upsert_ticket(Some(tx), acked_ticket.clone()).await?;

                    // Keep track of the summed ticket value; it becomes the channel balance below
                    total_value += acked_ticket.verified_ticket().amount;
                    acked_tickets.push(acked_ticket);
                }

                Ok::<_, DbSqlError>((acked_tickets, total_value))
            })
        })
        .await?;

    // Channel from the ticket issuer to the recipient, funded exactly with the value of all tickets
    let channel = ChannelEntry::new(
        (&PEERS_CHAIN[0]).into(),
        (&PEERS_CHAIN[1]).into(),
        total_value,
        0_u32.into(),
        ChannelStatus::Open,
        1u32.into(),
    );

    Ok((acked_tickets, channel))
}
429
430 async fn init_db(db: HoprDb) -> anyhow::Result<()> {
431 let db_clone = db.clone();
432 db.begin_transaction()
433 .await?
434 .perform(|tx| {
435 Box::pin(async move {
436 db_clone
437 .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
438 .await?;
439 for i in 0..PEERS_CHAIN.len() {
440 debug!(
441 "linking {} with {}",
442 PEERS[i].public(),
443 PEERS_CHAIN[i].public().to_address()
444 );
445 db_clone
446 .insert_account(
447 Some(tx),
448 AccountEntry {
449 public_key: *PEERS[i].public(),
450 chain_addr: PEERS_CHAIN[i].public().to_address(),
451 entry_type: AccountType::NotAnnounced,
452 published_at: 1,
453 },
454 )
455 .await?;
456 }
457 Ok::<_, DbSqlError>(())
458 })
459 })
460 .await?;
461
462 Ok(())
463 }
464
/// Spawns a background task that simulates a single ticket aggregation round-trip between
/// Bob (the requester) and Alice (the ticket issuer) without any network transport.
///
/// Returns Bob's `AwaitingAggregator` (with a 5 second timeout) to be handed to the strategy, and a
/// oneshot receiver that resolves once the whole round-trip has completed. The spawned task panics
/// if the interactions yield an unexpected event.
fn spawn_aggregation_interaction(
    db_alice: HoprDb,
    db_bob: HoprDb,
    key_alice: &ChainKeypair,
    key_bob: &ChainKeypair,
) -> anyhow::Result<(
    AwaitingAggregator<(), (), HoprDb>,
    futures::channel::oneshot::Receiver<()>,
)> {
    let mut alice = TicketAggregationInteraction::<(), ()>::new(db_alice, key_alice);
    let mut bob = TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), key_bob);

    let (tx, awaiter) = futures::channel::oneshot::channel::<()>();
    let bob_aggregator = bob.writer();

    tokio::task::spawn(async move {
        let mut finalizer = None;

        // Step 1: Bob emits the aggregation request, which is handed over to Alice
        match bob.next().await {
            Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer)) => {
                // Keep the finalizer so the request can be finalized at the very end
                let _ = finalizer.insert(request_finalizer);
                match alice.writer().receive_aggregation_request(
                    PEERS[1].public().into(),
                    acked_tickets.into_iter().collect(),
                    (),
                ) {
                    Ok(_) => {}
                    Err(e) => error!(error = %e, "Failed to received aggregation ticket"),
                }
            }
            _ => panic!("unexpected action happened"),
        };

        // Step 2: Alice replies with the aggregated ticket, which is handed back to Bob
        match alice.next().await {
            Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ())) => {
                match bob
                    .writer()
                    .receive_ticket(PEERS[0].public().into(), aggregated_ticket, ())
                {
                    Ok(_) => {}
                    Err(e) => error!(error = %e, "Failed to receive a ticket"),
                }
            }

            _ => panic!("unexpected action happened"),
        };

        // Step 3: Bob processes the received aggregated ticket
        match bob.next().await {
            Some(TicketAggregationProcessed::Receive(_destination, _ticket, ())) => (),
            _ => panic!("unexpected action happened"),
        };

        // Finalize the original request and notify the test that the round-trip is complete
        finalizer.expect("should have a value present").finalize();
        let _ = tx.send(());
    });

    Ok((
        AwaitingAggregator::new(db_bob, bob_aggregator, Duration::from_secs(5)),
        awaiter,
    ))
}
528
/// With 5 tickets in the channel and a ticket-count threshold of 5, `on_tick` must trigger
/// an aggregation that leaves a single aggregated ticket in Bob's database.
#[tokio::test]
async fn test_strategy_aggregation_on_tick() -> anyhow::Result<()> {
    // db_0 is Alice (ticket issuer), db_1 is Bob (ticket recipient)
    let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
    let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;

    init_db(db_alice.clone()).await?;
    init_db(db_bob.clone()).await?;

    // Bob's DB notifies about tickets through this channel
    let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
    db_bob.start_ticket_processing(bob_notify_tx.into())?;

    let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;

    db_alice.upsert_channel(None, channel).await?;
    db_bob.upsert_channel(None, channel).await?;

    let (bob_aggregator, awaiter) =
        spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;

    // Only the ticket-count criterion is active
    let cfg = super::AggregatingStrategyConfig {
        aggregation_threshold: Some(5),
        unrealized_balance_ratio: None,
        aggregate_on_channel_close: false,
    };

    let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));

    aggregation_strategy.on_tick().await?;

    // Wait until the simulated aggregation has finished or 5 seconds have elapsed,
    // whichever comes first; the outcome of the wait itself is not checked
    let f1 = pin!(awaiter);
    let f2 = pin!(tokio::time::sleep(Duration::from_secs(5)).fuse());
    let _ = futures::future::select(f1, f2).await;

    pin_mut!(bob_notify_rx);
    let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
    assert_eq!(notified_ticket, tickets[0]);

    Ok(())
}
575
/// With only the ratio criterion active (0.75) and a channel whose balance equals the total value
/// of its 4 unaggregated tickets, `on_tick` must trigger an aggregation that leaves a single
/// aggregated ticket in Bob's database.
#[tokio::test]
async fn test_strategy_aggregation_on_tick_when_unrealized_balance_exceeded() -> anyhow::Result<()> {
    // db_0 is Alice (ticket issuer), db_1 is Bob (ticket recipient)
    let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
    let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;

    init_db(db_alice.clone()).await?;
    init_db(db_bob.clone()).await?;

    // Bob's DB notifies about tickets through this channel
    let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
    db_bob.start_ticket_processing(bob_notify_tx.into())?;

    // The returned channel balance equals the summed value of the 4 tickets
    let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 4).await?;

    db_alice.upsert_channel(None, channel).await?;
    db_bob.upsert_channel(None, channel).await?;

    let (bob_aggregator, awaiter) =
        spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;

    // Only the ratio criterion is active
    let cfg = super::AggregatingStrategyConfig {
        aggregation_threshold: None,
        unrealized_balance_ratio: Some(0.75),
        aggregate_on_channel_close: false,
    };

    let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));

    aggregation_strategy.on_tick().await?;

    // Wait until the simulated aggregation has finished or 5 seconds have elapsed,
    // whichever comes first; the outcome of the wait itself is not checked
    let f1 = pin!(awaiter);
    let f2 = pin!(tokio::time::sleep(Duration::from_secs(5)));
    let _ = futures::future::select(f1, f2).await;

    pin_mut!(bob_notify_rx);
    let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
    assert_eq!(notified_ticket, tickets[0]);

    Ok(())
}
622
/// With only the ratio criterion active (0.75), `on_tick` must NOT aggregate when the first ticket
/// is replaced by one with index offset 2 and the channel balance is set to 100.
#[tokio::test]
async fn test_strategy_aggregation_on_tick_should_not_agg_when_unrealized_balance_exceeded_via_aggregated_tickets()
-> anyhow::Result<()> {
    // db_0 is Alice (ticket issuer), db_1 is Bob (ticket recipient)
    let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
    let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;

    init_db(db_alice.clone()).await?;
    init_db(db_bob.clone()).await?;

    // No ticket notifications are needed in this test
    db_bob.start_ticket_processing(None)?;

    const NUM_TICKETS: usize = 4;
    let (mut acked_tickets, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;

    let (bob_aggregator, awaiter) =
        spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;

    // Replace the first ticket with one that has index offset 2
    // NOTE(review): presumably an index offset > 1 marks the ticket as already aggregated - confirm
    acked_tickets[0] = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], 0, 2)?;

    debug!("upserting {}", acked_tickets[0]);
    db_bob.upsert_ticket(None, acked_tickets[0].clone()).await?;

    // The replacement must not change the number of stored tickets
    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");

    // Each mock ticket is worth 20, so the 4 tickets sum up to 80 out of the channel balance of 100
    channel.balance = HoprBalance::from(100_u32);

    db_alice.upsert_channel(None, channel).await?;
    db_bob.upsert_channel(None, channel).await?;

    // Only the ratio criterion is active
    let cfg = super::AggregatingStrategyConfig {
        aggregation_threshold: None,
        unrealized_balance_ratio: Some(0.75),
        aggregate_on_channel_close: false,
    };

    let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));

    aggregation_strategy.on_tick().await?;

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
    // The aggregation round-trip is never awaited in this test
    std::mem::drop(awaiter);

    Ok(())
}
673
/// With `aggregate_on_channel_close` enabled, a status change of an incoming channel from `Open`
/// to `PendingToClose` must trigger an aggregation even though the ticket-count threshold (100)
/// is far above the 5 tickets present.
#[tokio::test]
async fn test_strategy_aggregation_on_channel_close() -> anyhow::Result<()> {
    // db_0 is Alice (ticket issuer), db_1 is Bob (ticket recipient)
    let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
    let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;

    init_db(db_alice.clone()).await?;
    init_db(db_bob.clone()).await?;

    // Bob's DB notifies about tickets through this channel
    let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
    db_bob.start_ticket_processing(bob_notify_tx.into())?;

    let (_, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;

    // The regular ticket-count criterion cannot be met with only 5 tickets
    let cfg = super::AggregatingStrategyConfig {
        aggregation_threshold: Some(100),
        unrealized_balance_ratio: None,
        aggregate_on_channel_close: true,
    };

    channel.status = ChannelStatus::PendingToClose(std::time::SystemTime::now());

    db_alice.upsert_channel(None, channel).await?;
    db_bob.upsert_channel(None, channel).await?;

    let (bob_aggregator, awaiter) =
        spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;

    // NOTE(review): the sibling tests construct the strategy with `db_bob`; `on_own_channel_changed` does not
    // access the strategy's DB, so `db_alice` appears to make no difference here - confirm it is intentional.
    let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_alice.clone(), Arc::new(bob_aggregator));

    aggregation_strategy
        .on_own_channel_changed(
            &channel,
            ChannelDirection::Incoming,
            ChannelChange::Status {
                left: ChannelStatus::Open,
                right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
            },
        )
        .await?;

    // The simulated aggregation must complete within 5 seconds
    timeout(Duration::from_secs(5), awaiter).await.context("Timeout")??;

    pin_mut!(bob_notify_rx);
    let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
    assert_eq!(notified_ticket, tickets[0]);

    Ok(())
}
728
/// With `aggregate_on_channel_close` enabled but only a single ticket in the channel, the
/// `Open` -> `PendingToClose` status change must NOT result in an aggregation, because the
/// on-close aggregation requires at least 2 tickets.
#[tokio::test]
async fn test_strategy_aggregation_on_tick_should_not_agg_on_channel_close_if_only_single_ticket()
-> anyhow::Result<()> {
    // db_0 is Alice (ticket issuer), db_1 is Bob (ticket recipient)
    let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
    let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;

    init_db(db_alice.clone()).await?;
    init_db(db_bob.clone()).await?;

    // No ticket notifications are needed in this test
    db_bob.start_ticket_processing(None)?;

    const NUM_TICKETS: usize = 1;
    let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;

    let (bob_aggregator, awaiter) =
        spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), NUM_TICKETS, "should have a single ticket");

    db_alice.upsert_channel(None, channel).await?;
    db_bob.upsert_channel(None, channel).await?;

    let cfg = super::AggregatingStrategyConfig {
        aggregation_threshold: Some(100),
        unrealized_balance_ratio: Some(0.75),
        aggregate_on_channel_close: true,
    };

    let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));

    aggregation_strategy
        .on_own_channel_changed(
            &channel,
            ChannelDirection::Incoming,
            ChannelChange::Status {
                left: ChannelStatus::Open,
                right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
            },
        )
        .await?;

    // The aggregation round-trip must never complete, so waiting for it has to time out
    timeout(Duration::from_millis(500), awaiter)
        .await
        .expect_err("should timeout");

    let tickets = db_bob.get_tickets((&channel).into()).await?;
    assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
    Ok(())
}
781}