1use async_lock::RwLock;
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29use serde_with::serde_as;
30use std::collections::HashMap;
31use std::fmt::{Debug, Display, Formatter};
32use std::sync::Arc;
33use tracing::{debug, error, info, warn};
34use validator::Validate;
35
36use hopr_async_runtime::prelude::{spawn, JoinHandle};
37use hopr_crypto_types::prelude::Hash;
38use hopr_db_sql::api::tickets::{AggregationPrerequisites, HoprDbTicketOperations};
39use hopr_db_sql::channels::HoprDbChannelOperations;
40use hopr_internal_types::prelude::*;
41use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;
42
43use crate::{strategy::SingularStrategy, Strategy};
44
45#[cfg(all(feature = "prometheus", not(test)))]
46use hopr_metrics::metrics::SimpleCounter;
47
48#[cfg(all(feature = "prometheus", not(test)))]
49lazy_static::lazy_static! {
50 static ref METRIC_COUNT_AGGREGATIONS: SimpleCounter =
51 SimpleCounter::new("hopr_strategy_aggregating_aggregation_count", "Count of initiated automatic aggregations").unwrap();
52}
53
54use hopr_platform::time::native::current_time;
55
56const MAX_AGGREGATABLE_TICKET_COUNT: u32 = hopr_db_sql::tickets::MAX_TICKETS_TO_AGGREGATE_BATCH as u32;
57
58#[inline]
59fn default_aggregation_threshold() -> Option<u32> {
60 Some(250)
61}
62
63#[inline]
64fn just_true() -> bool {
65 true
66}
67
68#[inline]
69fn default_unrealized_balance_ratio() -> Option<f32> {
70 Some(0.9)
71}
72
73#[serde_as]
75#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
76pub struct AggregatingStrategyConfig {
77 #[validate(range(min = 2, max = MAX_AGGREGATABLE_TICKET_COUNT))]
84 #[serde(default = "default_aggregation_threshold")]
85 #[default(default_aggregation_threshold())]
86 pub aggregation_threshold: Option<u32>,
87
88 #[validate(range(min = 0_f32, max = 1.0_f32))]
96 #[default(default_unrealized_balance_ratio())]
97 pub unrealized_balance_ratio: Option<f32>,
98
99 #[default(just_true())]
107 pub aggregate_on_channel_close: bool,
108}
109
110impl From<AggregatingStrategyConfig> for AggregationPrerequisites {
111 fn from(value: AggregatingStrategyConfig) -> Self {
112 AggregationPrerequisites {
113 min_ticket_count: value.aggregation_threshold.map(|x| x as usize),
114 min_unaggregated_ratio: value.unrealized_balance_ratio.map(|x| x as f64),
115 }
116 }
117}
118
119pub struct AggregatingStrategy<Db>
125where
126 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
127{
128 db: Db,
129 ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
130 cfg: AggregatingStrategyConfig,
131 #[allow(clippy::type_complexity)]
132 agg_tasks: Arc<RwLock<HashMap<Hash, (bool, JoinHandle<()>)>>>,
133}
134
135impl<Db> Debug for AggregatingStrategy<Db>
136where
137 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
138{
139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140 write!(f, "{:?}", Strategy::Aggregating(self.cfg))
141 }
142}
143
144impl<Db> Display for AggregatingStrategy<Db>
145where
146 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
147{
148 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149 write!(f, "{}", Strategy::Aggregating(self.cfg))
150 }
151}
152
153impl<Db> AggregatingStrategy<Db>
154where
155 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
156{
157 pub fn new(
158 cfg: AggregatingStrategyConfig,
159 db: Db,
160 ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
161 ) -> Self {
162 Self {
163 db,
164 cfg,
165 ticket_aggregator,
166 agg_tasks: Arc::new(RwLock::new(HashMap::new())),
167 }
168 }
169}
170
171impl<Db> AggregatingStrategy<Db>
172where
173 Db: HoprDbChannelOperations + HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
174{
175 async fn try_start_aggregation(
176 &self,
177 channel_id: Hash,
178 criteria: AggregationPrerequisites,
179 ) -> crate::errors::Result<()> {
180 if !self.is_strategy_aggregating_in_channel(channel_id).await {
181 debug!("checking aggregation in {channel_id} with criteria {criteria:?}...");
182
183 let agg_tasks_clone = self.agg_tasks.clone();
184 let aggregator_clone = self.ticket_aggregator.clone();
185 let (can_remove_tx, can_remove_rx) = futures::channel::oneshot::channel();
186 let task = spawn(async move {
187 match aggregator_clone.aggregate_tickets(&channel_id, criteria).await {
188 Ok(_) => {
189 debug!("tried ticket aggregation in channel {channel_id} without any issues");
190
191 #[cfg(all(feature = "prometheus", not(test)))]
192 METRIC_COUNT_AGGREGATIONS.increment();
193 }
194 Err(e) => {
195 error!("cannot complete aggregation in channel {channel_id}: {e}");
196 }
197 }
198
199 let _ = can_remove_rx.await;
201 if let Some((done, _)) = agg_tasks_clone.write().await.get_mut(&channel_id) {
202 *done = true;
203 }
204 });
205
206 self.agg_tasks.write().await.insert(channel_id, (false, task));
207 let _ = can_remove_tx.send(()); } else {
209 warn!(channel = %channel_id, "this strategy already aggregates in channel");
210 }
211
212 Ok(())
213 }
214
215 async fn is_strategy_aggregating_in_channel(&self, channel_id: Hash) -> bool {
216 let existing = self.agg_tasks.read().await.get(&channel_id).map(|(done, _)| *done);
217 if let Some(done) = existing {
218 if done {
220 if let Some((_, task)) = self.agg_tasks.write().await.remove(&channel_id) {
221 let _ = task.await;
223 false
224 } else {
225 false
227 }
228 } else {
229 true
231 }
232 } else {
233 false
235 }
236 }
237}
238
239#[async_trait]
240impl<Db> SingularStrategy for AggregatingStrategy<Db>
241where
242 Db: HoprDbChannelOperations + HoprDbTicketOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
243{
244 async fn on_tick(&self) -> crate::errors::Result<()> {
245 let incoming = self
246 .db
247 .get_incoming_channels(None)
248 .await
249 .map_err(hopr_db_sql::api::errors::DbError::from)?
250 .into_iter()
251 .filter(|c| !c.closure_time_passed(current_time()))
252 .map(|c| c.get_id());
253
254 for channel_id in incoming {
255 if let Err(e) = self.try_start_aggregation(channel_id, self.cfg.into()).await {
256 debug!("skipped aggregation in channel {channel_id}: {e}");
257 }
258 }
259
260 Ok(())
261 }
262
263 async fn on_own_channel_changed(
264 &self,
265 channel: &ChannelEntry,
266 direction: ChannelDirection,
267 change: ChannelChange,
268 ) -> crate::errors::Result<()> {
269 if !self.cfg.aggregate_on_channel_close || direction != ChannelDirection::Incoming {
270 return Ok(());
271 }
272
273 if let ChannelChange::Status { left: old, right: new } = change {
274 if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
275 debug!("ignoring channel {channel} state change that's not in PendingToClose");
276 return Ok(());
277 }
278
279 info!(%channel, "going to aggregate tickets in channel because it transitioned to PendingToClose");
280
281 let on_close_agg_prerequisites = AggregationPrerequisites {
283 min_ticket_count: Some(2),
284 min_unaggregated_ratio: None,
285 };
286
287 Ok(self
288 .try_start_aggregation(channel.get_id(), on_close_agg_prerequisites)
289 .await?)
290 } else {
291 Ok(())
292 }
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use crate::aggregating::{default_aggregation_threshold, MAX_AGGREGATABLE_TICKET_COUNT};
299 use crate::strategy::SingularStrategy;
300 use anyhow::Context;
301 use async_std::prelude::FutureExt as AsyncStdFutureExt;
302 use futures::{pin_mut, FutureExt, StreamExt};
303 use hex_literal::hex;
304 use hopr_crypto_types::prelude::*;
305 use hopr_db_sql::accounts::HoprDbAccountOperations;
306 use hopr_db_sql::api::{info::DomainSeparator, tickets::HoprDbTicketOperations};
307 use hopr_db_sql::channels::HoprDbChannelOperations;
308 use hopr_db_sql::db::HoprDb;
309 use hopr_db_sql::errors::DbSqlError;
310 use hopr_db_sql::info::HoprDbInfoOperations;
311 use hopr_db_sql::{HoprDbGeneralModelOperations, TargetDb};
312 use hopr_internal_types::prelude::*;
313 use hopr_primitive_types::prelude::*;
314 use hopr_transport_protocol::ticket_aggregation::processor::{
315 AwaitingAggregator, TicketAggregationInteraction, TicketAggregationProcessed,
316 };
317 use lazy_static::lazy_static;
318 use std::ops::Add;
319 use std::pin::pin;
320 use std::sync::Arc;
321 use std::time::Duration;
322 use tracing::{debug, error};
323
324 #[test]
325 fn default_ticket_aggregation_count_is_lower_than_maximum_allowed_ticket_count() -> anyhow::Result<()> {
326 assert!(default_aggregation_threshold().unwrap() < MAX_AGGREGATABLE_TICKET_COUNT);
327
328 Ok(())
329 }
330
331 lazy_static! {
332 static ref PEERS: Vec<OffchainKeypair> = vec![
333 hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
334 hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
335 ]
336 .iter()
337 .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
338 .collect();
339 static ref PEERS_CHAIN: Vec<ChainKeypair> = vec![
340 hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
341 hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9"),
342 ]
343 .iter()
344 .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
345 .collect();
346 }
347
348 fn mock_acknowledged_ticket(
349 signer: &ChainKeypair,
350 destination: &ChainKeypair,
351 index: u64,
352 index_offset: u32,
353 ) -> anyhow::Result<AcknowledgedTicket> {
354 let price_per_packet: U256 = 20_u32.into();
355 let ticket_win_prob = 1.0f64;
356
357 let channel_id = generate_channel_id(&signer.into(), &destination.into());
358
359 let channel_epoch = 1u64;
360 let domain_separator = Hash::default();
361
362 let response = Response::try_from(
363 Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
364 )?;
365
366 Ok(TicketBuilder::default()
367 .addresses(signer, destination)
368 .amount(price_per_packet.div_f64(ticket_win_prob)?)
369 .index(index)
370 .index_offset(index_offset)
371 .win_prob(ticket_win_prob)
372 .channel_epoch(1)
373 .challenge(response.to_challenge().into())
374 .build_signed(signer, &domain_separator)?
375 .into_acknowledged(response))
376 }
377
378 async fn populate_db_with_ack_tickets(
379 db: HoprDb,
380 amount: usize,
381 ) -> anyhow::Result<(Vec<AcknowledgedTicket>, ChannelEntry)> {
382 let db_clone = db.clone();
383 let (acked_tickets, total_value) = db
384 .begin_transaction_in_db(TargetDb::Tickets)
385 .await?
386 .perform(|tx| {
387 Box::pin(async move {
388 let mut acked_tickets = Vec::new();
389 let mut total_value = Balance::zero(BalanceType::HOPR);
390
391 for i in 0..amount {
392 let acked_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i as u64, 1)
393 .expect("should be able to create an ack ticket");
394 debug!("inserting {acked_ticket}");
395
396 db_clone.upsert_ticket(Some(tx), acked_ticket.clone()).await?;
397
398 total_value = total_value.add(&acked_ticket.verified_ticket().amount);
399 acked_tickets.push(acked_ticket);
400 }
401
402 Ok::<_, DbSqlError>((acked_tickets, total_value))
403 })
404 })
405 .await?;
406
407 let channel = ChannelEntry::new(
408 (&PEERS_CHAIN[0]).into(),
409 (&PEERS_CHAIN[1]).into(),
410 total_value,
411 0_u32.into(),
412 ChannelStatus::Open,
413 1u32.into(),
414 );
415
416 Ok((acked_tickets, channel))
417 }
418
419 async fn init_db(db: HoprDb) -> anyhow::Result<()> {
420 let db_clone = db.clone();
421 db.begin_transaction()
422 .await?
423 .perform(|tx| {
424 Box::pin(async move {
425 db_clone
426 .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
427 .await?;
428 for i in 0..PEERS_CHAIN.len() {
429 debug!(
430 "linking {} with {}",
431 PEERS[i].public(),
432 PEERS_CHAIN[i].public().to_address()
433 );
434 db_clone
435 .insert_account(
436 Some(tx),
437 AccountEntry::new(
438 *PEERS[i].public(),
439 PEERS_CHAIN[i].public().to_address(),
440 AccountType::NotAnnounced,
441 ),
442 )
443 .await?;
444 }
445 Ok::<_, DbSqlError>(())
446 })
447 })
448 .await?;
449
450 Ok(())
451 }
452
453 fn spawn_aggregation_interaction(
454 db_alice: HoprDb,
455 db_bob: HoprDb,
456 key_alice: &ChainKeypair,
457 key_bob: &ChainKeypair,
458 ) -> anyhow::Result<(
459 AwaitingAggregator<(), (), HoprDb>,
460 futures::channel::oneshot::Receiver<()>,
461 )> {
462 let mut alice = TicketAggregationInteraction::<(), ()>::new(db_alice, key_alice);
463 let mut bob = TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), key_bob);
464
465 let (tx, awaiter) = futures::channel::oneshot::channel::<()>();
466 let bob_aggregator = bob.writer();
467
468 async_std::task::spawn(async move {
469 let mut finalizer = None;
470
471 match bob.next().await {
472 Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer)) => {
473 let _ = finalizer.insert(request_finalizer);
474 match alice.writer().receive_aggregation_request(
475 PEERS[1].public().into(),
476 acked_tickets.into_iter().map(TransferableWinningTicket::from).collect(),
477 (),
478 ) {
479 Ok(_) => {}
480 Err(e) => error!(error = %e, "Failed to received aggregation ticket"),
481 }
482 }
483 _ => panic!("unexpected action happened"),
485 };
486
487 match alice.next().await {
488 Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ())) => {
489 match bob
490 .writer()
491 .receive_ticket(PEERS[0].public().into(), aggregated_ticket, ())
492 {
493 Ok(_) => {}
494 Err(e) => error!(error = %e, "Failed to receive a ticket"),
495 }
496 }
497
498 _ => panic!("unexpected action happened"),
499 };
500
501 match bob.next().await {
502 Some(TicketAggregationProcessed::Receive(_destination, _ticket, ())) => (),
503 _ => panic!("unexpected action happened"),
504 };
505
506 finalizer.expect("should have a value present").finalize();
507 let _ = tx.send(());
508 });
509
510 Ok((
511 AwaitingAggregator::new(db_bob, bob_aggregator, Duration::from_secs(5)),
512 awaiter,
513 ))
514 }
515
516 #[async_std::test]
517 async fn test_strategy_aggregation_on_tick() -> anyhow::Result<()> {
518 let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
521 let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
522
523 init_db(db_alice.clone()).await?;
524 init_db(db_bob.clone()).await?;
525
526 let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
527 db_bob.start_ticket_processing(bob_notify_tx.into())?;
528
529 let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
530
531 db_alice.upsert_channel(None, channel).await?;
532 db_bob.upsert_channel(None, channel).await?;
533
534 let (bob_aggregator, awaiter) =
535 spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
536
537 let cfg = super::AggregatingStrategyConfig {
538 aggregation_threshold: Some(5),
539 unrealized_balance_ratio: None,
540 aggregate_on_channel_close: false,
541 };
542
543 let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
544
545 aggregation_strategy.on_tick().await?;
547
548 let f1 = pin!(awaiter);
550 let f2 = pin!(async_std::task::sleep(Duration::from_secs(5)).fuse());
551 let _ = futures::future::select(f1, f2).await;
552
553 pin_mut!(bob_notify_rx);
554 let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
555
556 let tickets = db_bob.get_tickets((&channel).into()).await?;
557 assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
558 assert_eq!(notified_ticket, tickets[0]);
559
560 Ok(())
561 }
562
563 #[async_std::test]
564 async fn test_strategy_aggregation_on_tick_when_unrealized_balance_exceeded() -> anyhow::Result<()> {
565 let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
568 let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
569
570 init_db(db_alice.clone()).await?;
571 init_db(db_bob.clone()).await?;
572
573 let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
574 db_bob.start_ticket_processing(bob_notify_tx.into())?;
575
576 let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 4).await?;
577
578 db_alice.upsert_channel(None, channel).await?;
579 db_bob.upsert_channel(None, channel).await?;
580
581 let (bob_aggregator, awaiter) =
582 spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
583
584 let cfg = super::AggregatingStrategyConfig {
585 aggregation_threshold: None,
586 unrealized_balance_ratio: Some(0.75),
587 aggregate_on_channel_close: false,
588 };
589
590 let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
591
592 aggregation_strategy.on_tick().await?;
594
595 let f1 = pin!(awaiter);
597 let f2 = pin!(async_std::task::sleep(Duration::from_secs(5)));
598 let _ = futures::future::select(f1, f2).await;
599
600 pin_mut!(bob_notify_rx);
601 let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
602
603 let tickets = db_bob.get_tickets((&channel).into()).await?;
604 assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
605 assert_eq!(notified_ticket, tickets[0]);
606
607 Ok(())
608 }
609
610 #[async_std::test]
611 async fn test_strategy_aggregation_on_tick_should_not_agg_when_unrealized_balance_exceeded_via_aggregated_tickets(
612 ) -> anyhow::Result<()> {
613 let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
616 let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
617
618 init_db(db_alice.clone()).await?;
619 init_db(db_bob.clone()).await?;
620
621 db_bob.start_ticket_processing(None)?;
622
623 const NUM_TICKETS: usize = 4;
624 let (mut acked_tickets, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
625
626 let (bob_aggregator, awaiter) =
627 spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
628
629 acked_tickets[0] = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], 0, 2)?;
631
632 debug!("upserting {}", acked_tickets[0]);
633 db_bob.upsert_ticket(None, acked_tickets[0].clone()).await?;
634
635 let tickets = db_bob.get_tickets((&channel).into()).await?;
636 assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
637
638 channel.balance = Balance::new(100_u32, BalanceType::HOPR);
639
640 db_alice.upsert_channel(None, channel).await?;
641 db_bob.upsert_channel(None, channel).await?;
642
643 let cfg = super::AggregatingStrategyConfig {
644 aggregation_threshold: None,
645 unrealized_balance_ratio: Some(0.75),
646 aggregate_on_channel_close: false,
647 };
648
649 let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
650
651 aggregation_strategy.on_tick().await?;
653
654 let tickets = db_bob.get_tickets((&channel).into()).await?;
655 assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
656 std::mem::drop(awaiter);
657
658 Ok(())
659 }
660
661 #[async_std::test]
662 async fn test_strategy_aggregation_on_channel_close() -> anyhow::Result<()> {
663 let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
666 let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
667
668 init_db(db_alice.clone()).await?;
669 init_db(db_bob.clone()).await?;
670
671 let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
672 db_bob.start_ticket_processing(bob_notify_tx.into())?;
673
674 let (_, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
675
676 let cfg = super::AggregatingStrategyConfig {
677 aggregation_threshold: Some(100),
678 unrealized_balance_ratio: None,
679 aggregate_on_channel_close: true,
680 };
681
682 channel.status = ChannelStatus::PendingToClose(std::time::SystemTime::now());
683
684 db_alice.upsert_channel(None, channel).await?;
685 db_bob.upsert_channel(None, channel).await?;
686
687 let (bob_aggregator, awaiter) =
688 spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
689
690 let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_alice.clone(), Arc::new(bob_aggregator));
691
692 aggregation_strategy
693 .on_own_channel_changed(
694 &channel,
695 ChannelDirection::Incoming,
696 ChannelChange::Status {
697 left: ChannelStatus::Open,
698 right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
699 },
700 )
701 .await?;
702
703 awaiter.timeout(Duration::from_secs(5)).await.context("Timeout")??;
705
706 pin_mut!(bob_notify_rx);
707 let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
708
709 let tickets = db_bob.get_tickets((&channel).into()).await?;
710 assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
711 assert_eq!(notified_ticket, tickets[0]);
712
713 Ok(())
714 }
715
716 #[async_std::test]
717 async fn test_strategy_aggregation_on_tick_should_not_agg_on_channel_close_if_only_single_ticket(
718 ) -> anyhow::Result<()> {
719 let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
722 let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
723
724 init_db(db_alice.clone()).await?;
725 init_db(db_bob.clone()).await?;
726
727 db_bob.start_ticket_processing(None)?;
728
729 const NUM_TICKETS: usize = 1;
730 let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
731
732 let (bob_aggregator, awaiter) =
733 spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
734
735 let tickets = db_bob.get_tickets((&channel).into()).await?;
736 assert_eq!(tickets.len(), NUM_TICKETS, "should have a single ticket");
737
738 db_alice.upsert_channel(None, channel).await?;
739 db_bob.upsert_channel(None, channel).await?;
740
741 let cfg = super::AggregatingStrategyConfig {
742 aggregation_threshold: Some(100),
743 unrealized_balance_ratio: Some(0.75),
744 aggregate_on_channel_close: true,
745 };
746
747 let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
748
749 aggregation_strategy
750 .on_own_channel_changed(
751 &channel,
752 ChannelDirection::Incoming,
753 ChannelChange::Status {
754 left: ChannelStatus::Open,
755 right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
756 },
757 )
758 .await?;
759
760 awaiter
761 .timeout(Duration::from_millis(500))
762 .await
763 .expect_err("should timeout");
764
765 let tickets = db_bob.get_tickets((&channel).into()).await?;
766 assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
767 Ok(())
768 }
769}