hopr_strategy/
aggregating.rs

1//! ## Aggregating Strategy
2//! This strategy automates ticket aggregation on different channel/ticket events.
3//! Note that the aggregating strategy can be combined with the Auto Redeem Strategy above.
4//!
5//! Ticket aggregation is an interactive process and requires cooperation of the ticket issuer, the aggregation
6//! will fail if the aggregation takes more than `aggregation_timeout` (in seconds). This does not affect runtime of the
//! strategy, since the ticket aggregation and awaiting it are performed in a separately spawned task.
8//!
9//! This strategy listens for two distinct channel events and triggers the interactive aggregation based on different
10//! criteria:
11//!
12//! ### 1) New winning acknowledged ticket event
13//!
14//! This strategy listens to newly added acknowledged winning tickets and once the amount of tickets in a certain
15//! channel reaches an `aggregation_threshold`, the strategy will initiate ticket aggregation in that channel.
16//! The strategy can independently also check if the unrealized balance (current balance _minus_ total unredeemed
//! unaggregated tickets value) in a certain channel has not gone over `unrealized_balance_ratio` percent of the
18//! current balance in that channel. If that happens, the strategy will also initiate ticket aggregation.
19//!
20//! ### 2) Channel transition from `Open` to `PendingToClose` event
21//!
22//! If the `aggregate_on_channel_close` flag is set, the aggregation will be triggered once a channel transitions from
23//! `Open` to `PendingToClose` state. This behavior does not have any additional criteria, unlike in the previous event,
24//! but there must be at least 2 tickets in the channel.
25//!
26//!
27//! For details on default parameters see [AggregatingStrategyConfig].
28use std::{
29    collections::HashMap,
30    fmt::{Debug, Display, Formatter},
31    sync::Arc,
32};
33
34use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
35use async_trait::async_trait;
36use hopr_async_runtime::prelude::{JoinHandle, spawn};
37use hopr_crypto_types::prelude::Hash;
38use hopr_db_sql::{
39    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
40    channels::HoprDbChannelOperations,
41};
42use hopr_internal_types::prelude::*;
43#[cfg(all(feature = "prometheus", not(test)))]
44use hopr_metrics::metrics::SimpleCounter;
45use hopr_transport_ticket_aggregation::TicketAggregatorTrait;
46use serde::{Deserialize, Serialize};
47use serde_with::serde_as;
48use tracing::{debug, error, info, warn};
49use validator::Validate;
50
51use crate::{Strategy, strategy::SingularStrategy};
52
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter bumped by the aggregation task each time an aggregation attempt finishes without an error.
    static ref METRIC_COUNT_AGGREGATIONS: SimpleCounter = SimpleCounter::new(
        "hopr_strategy_aggregating_aggregation_count",
        "Count of initiated automatic aggregations"
    )
    .unwrap();
}
58
59use hopr_platform::time::native::current_time;
60
61const MAX_AGGREGATABLE_TICKET_COUNT: u32 = hopr_db_sql::tickets::MAX_TICKETS_TO_AGGREGATE_BATCH as u32;
62
/// Default value of `AggregatingStrategyConfig::aggregation_threshold` (250 tickets).
#[inline]
fn default_aggregation_threshold() -> Option<u32> {
    const DEFAULT_TICKET_COUNT: u32 = 250;
    Some(DEFAULT_TICKET_COUNT)
}
67
/// Default provider for boolean configuration flags that are enabled unless stated otherwise.
#[inline]
fn just_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
72
/// Default value of `AggregatingStrategyConfig::unrealized_balance_ratio` (0.9).
#[inline]
fn default_unrealized_balance_ratio() -> Option<f32> {
    const DEFAULT_RATIO: f32 = 0.9;
    Some(DEFAULT_RATIO)
}
77
78/// Configuration object for the `AggregatingStrategy`
79#[serde_as]
80#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
81pub struct AggregatingStrategyConfig {
82    /// Number of acknowledged winning tickets in a channel that triggers the ticket aggregation
83    /// in that channel when exceeded.
84    ///
85    /// This condition is independent of `unrealized_balance_ratio`.
86    ///
87    /// Default is 250.
88    #[validate(range(min = 2, max = MAX_AGGREGATABLE_TICKET_COUNT))]
89    #[serde(default = "default_aggregation_threshold")]
90    #[default(default_aggregation_threshold())]
91    pub aggregation_threshold: Option<u32>,
92
93    /// Percentage of unrealized balance in unaggregated tickets in a channel
94    /// that triggers the ticket aggregation when exceeded.
95    ///
96    /// The unrealized balance in this case is the proportion of the channel balance allocated in unredeemed
97    /// unaggregated tickets. This condition is independent of `aggregation_threshold`.
98    ///
99    /// Default is 0.9
100    #[validate(range(min = 0_f32, max = 1.0_f32))]
101    #[default(default_unrealized_balance_ratio())]
102    pub unrealized_balance_ratio: Option<f32>,
103
104    /// If set, the strategy will automatically aggregate tickets in channel that has transitioned
105    /// to the `PendingToClose` state.
106    ///
107    /// This happens regardless if `aggregation_threshold` or `unrealized_balance_ratio` thresholds are met on that
108    /// channel. If the aggregation on-close fails, the tickets are automatically sent for redeeming instead.
109    ///
110    /// Default is true.
111    #[default(just_true())]
112    pub aggregate_on_channel_close: bool,
113}
114
115impl From<AggregatingStrategyConfig> for AggregationPrerequisites {
116    fn from(value: AggregatingStrategyConfig) -> Self {
117        AggregationPrerequisites {
118            min_ticket_count: value.aggregation_threshold.map(|x| x as usize),
119            min_unaggregated_ratio: value.unrealized_balance_ratio.map(|x| x as f64),
120        }
121    }
122}
123
124/// Represents a strategy that starts aggregating tickets in a certain
125/// channel, once the number of acknowledged tickets in that channel goes
126/// above the given threshold.
127/// Optionally, the strategy can also redeem the aggregated ticket, if the aggregation
128/// was successful.
129pub struct AggregatingStrategy<Db>
130where
131    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
132{
133    db: Db,
134    ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
135    cfg: AggregatingStrategyConfig,
136    #[allow(clippy::type_complexity)]
137    agg_tasks: Arc<RwLock<HashMap<Hash, (bool, JoinHandle<()>)>>>,
138}
139
140impl<Db> Debug for AggregatingStrategy<Db>
141where
142    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
143{
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(f, "{:?}", Strategy::Aggregating(self.cfg))
146    }
147}
148
149impl<Db> Display for AggregatingStrategy<Db>
150where
151    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
152{
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "{}", Strategy::Aggregating(self.cfg))
155    }
156}
157
158impl<Db> AggregatingStrategy<Db>
159where
160    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
161{
162    pub fn new(
163        cfg: AggregatingStrategyConfig,
164        db: Db,
165        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
166    ) -> Self {
167        Self {
168            db,
169            cfg,
170            ticket_aggregator,
171            agg_tasks: Arc::new(RwLock::new(HashMap::new())),
172        }
173    }
174}
175
176impl<Db> AggregatingStrategy<Db>
177where
178    Db: HoprDbChannelOperations + HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
179{
180    async fn try_start_aggregation(
181        &self,
182        channel_id: Hash,
183        criteria: AggregationPrerequisites,
184    ) -> crate::errors::Result<()> {
185        if !self.is_strategy_aggregating_in_channel(channel_id).await {
186            debug!("checking aggregation in {channel_id} with criteria {criteria:?}...");
187
188            let agg_tasks_clone = self.agg_tasks.clone();
189            let aggregator_clone = self.ticket_aggregator.clone();
190            let (can_remove_tx, can_remove_rx) = futures::channel::oneshot::channel();
191            let task = spawn(async move {
192                match aggregator_clone.aggregate_tickets(&channel_id, criteria).await {
193                    Ok(_) => {
194                        debug!("tried ticket aggregation in channel {channel_id} without any issues");
195
196                        #[cfg(all(feature = "prometheus", not(test)))]
197                        METRIC_COUNT_AGGREGATIONS.increment();
198                    }
199                    Err(e) => {
200                        error!("cannot complete aggregation in channel {channel_id}: {e}");
201                    }
202                }
203
204                // Wait until we're added to the aggregation tasks table
205                let _ = can_remove_rx.await;
206                if let Some((done, _)) = agg_tasks_clone.write_arc().await.get_mut(&channel_id) {
207                    *done = true;
208                }
209            });
210
211            self.agg_tasks.write_arc().await.insert(channel_id, (false, task));
212            let _ = can_remove_tx.send(()); // Allow the task to mark itself as done
213        } else {
214            warn!(channel = %channel_id, "this strategy already aggregates in channel");
215        }
216
217        Ok(())
218    }
219
220    async fn is_strategy_aggregating_in_channel(&self, channel_id: Hash) -> bool {
221        let tasks_read_locked = self.agg_tasks.upgradable_read_arc().await;
222        let existing = tasks_read_locked.get(&channel_id).map(|(done, _)| *done);
223        if let Some(done) = existing {
224            // Task exists, check if it has been completed
225            if done {
226                let mut tasks_write_locked = RwLockUpgradableReadGuardArc::upgrade(tasks_read_locked).await;
227
228                if let Some((_, task)) = tasks_write_locked.remove(&channel_id) {
229                    // Task has been completed, remove it and await its join handle
230                    let _ = task.await;
231                    false
232                } else {
233                    // Should not happen, but means there's no more aggregation task for the channel
234                    false
235                }
236            } else {
237                // There's still a running aggregation task for this channel
238                true
239            }
240        } else {
241            // No aggregation task found for this channel
242            false
243        }
244    }
245}
246
247#[async_trait]
248impl<Db> SingularStrategy for AggregatingStrategy<Db>
249where
250    Db: HoprDbChannelOperations + HoprDbTicketOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
251{
252    async fn on_tick(&self) -> crate::errors::Result<()> {
253        let incoming = self
254            .db
255            .get_incoming_channels(None)
256            .await
257            .map_err(hopr_db_sql::api::errors::DbError::from)?
258            .into_iter()
259            .filter(|c| !c.closure_time_passed(current_time()))
260            .map(|c| c.get_id());
261
262        for channel_id in incoming {
263            if let Err(e) = self.try_start_aggregation(channel_id, self.cfg.into()).await {
264                debug!("skipped aggregation in channel {channel_id}: {e}");
265            }
266        }
267
268        Ok(())
269    }
270
271    async fn on_own_channel_changed(
272        &self,
273        channel: &ChannelEntry,
274        direction: ChannelDirection,
275        change: ChannelChange,
276    ) -> crate::errors::Result<()> {
277        if !self.cfg.aggregate_on_channel_close || direction != ChannelDirection::Incoming {
278            return Ok(());
279        }
280
281        if let ChannelChange::Status { left: old, right: new } = change {
282            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
283                debug!("ignoring channel {channel} state change that's not in PendingToClose");
284                return Ok(());
285            }
286
287            info!(%channel, "going to aggregate tickets in channel because it transitioned to PendingToClose");
288
289            // On closing there must be at least 2 tickets to justify aggregation
290            let on_close_agg_prerequisites = AggregationPrerequisites {
291                min_ticket_count: Some(2),
292                min_unaggregated_ratio: None,
293            };
294
295            Ok(self
296                .try_start_aggregation(channel.get_id(), on_close_agg_prerequisites)
297                .await?)
298        } else {
299            Ok(())
300        }
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use std::{pin::pin, sync::Arc, time::Duration};
307
308    use anyhow::Context;
309    use futures::{FutureExt, StreamExt, pin_mut};
310    use hex_literal::hex;
311    use hopr_crypto_types::prelude::*;
312    use hopr_db_sql::{
313        HoprDbGeneralModelOperations, TargetDb,
314        accounts::HoprDbAccountOperations,
315        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
316        channels::HoprDbChannelOperations,
317        db::HoprDb,
318        errors::DbSqlError,
319        info::HoprDbInfoOperations,
320    };
321    use hopr_internal_types::prelude::*;
322    use hopr_primitive_types::prelude::*;
323    use hopr_transport_ticket_aggregation::{
324        AwaitingAggregator, TicketAggregationInteraction, TicketAggregationProcessed,
325    };
326    use lazy_static::lazy_static;
327    use tokio::time::timeout;
328    use tracing::{debug, error};
329
330    use crate::{
331        aggregating::{MAX_AGGREGATABLE_TICKET_COUNT, default_aggregation_threshold},
332        strategy::SingularStrategy,
333    };
334
335    #[test]
336    fn default_ticket_aggregation_count_is_lower_than_maximum_allowed_ticket_count() -> anyhow::Result<()> {
337        assert!(default_aggregation_threshold().unwrap() < MAX_AGGREGATABLE_TICKET_COUNT);
338
339        Ok(())
340    }
341
342    lazy_static! {
343        static ref PEERS: Vec<OffchainKeypair> = [
344            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
345            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
346        ]
347        .iter()
348        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
349        .collect();
350        static ref PEERS_CHAIN: Vec<ChainKeypair> = [
351            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
352            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
353        ]
354        .iter()
355        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
356        .collect();
357    }
358
359    fn mock_acknowledged_ticket(
360        signer: &ChainKeypair,
361        destination: &ChainKeypair,
362        index: u64,
363        index_offset: u32,
364    ) -> anyhow::Result<AcknowledgedTicket> {
365        let price_per_packet: U256 = 20_u32.into();
366        let ticket_win_prob = 1.0f64;
367
368        let channel_id = generate_channel_id(&signer.into(), &destination.into());
369
370        let channel_epoch = 1u64;
371        let domain_separator = Hash::default();
372
373        let response = Response::try_from(
374            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
375        )?;
376
377        Ok(TicketBuilder::default()
378            .addresses(signer, destination)
379            .amount(price_per_packet.div_f64(ticket_win_prob)?)
380            .index(index)
381            .index_offset(index_offset)
382            .win_prob(ticket_win_prob.try_into()?)
383            .channel_epoch(1)
384            .challenge(response.to_challenge().into())
385            .build_signed(signer, &domain_separator)?
386            .into_acknowledged(response))
387    }
388
389    async fn populate_db_with_ack_tickets(
390        db: HoprDb,
391        amount: usize,
392    ) -> anyhow::Result<(Vec<AcknowledgedTicket>, ChannelEntry)> {
393        let db_clone = db.clone();
394        let (acked_tickets, total_value) = db
395            .begin_transaction_in_db(TargetDb::Tickets)
396            .await?
397            .perform(|tx| {
398                Box::pin(async move {
399                    let mut acked_tickets = Vec::new();
400                    let mut total_value = HoprBalance::zero();
401
402                    for i in 0..amount {
403                        let acked_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i as u64, 1)
404                            .expect("should be able to create an ack ticket");
405                        debug!("inserting {acked_ticket}");
406
407                        db_clone.upsert_ticket(Some(tx), acked_ticket.clone()).await?;
408
409                        total_value += acked_ticket.verified_ticket().amount;
410                        acked_tickets.push(acked_ticket);
411                    }
412
413                    Ok::<_, DbSqlError>((acked_tickets, total_value))
414                })
415            })
416            .await?;
417
418        let channel = ChannelEntry::new(
419            (&PEERS_CHAIN[0]).into(),
420            (&PEERS_CHAIN[1]).into(),
421            total_value,
422            0_u32.into(),
423            ChannelStatus::Open,
424            1u32.into(),
425        );
426
427        Ok((acked_tickets, channel))
428    }
429
430    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
431        let db_clone = db.clone();
432        db.begin_transaction()
433            .await?
434            .perform(|tx| {
435                Box::pin(async move {
436                    db_clone
437                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
438                        .await?;
439                    for i in 0..PEERS_CHAIN.len() {
440                        debug!(
441                            "linking {} with {}",
442                            PEERS[i].public(),
443                            PEERS_CHAIN[i].public().to_address()
444                        );
445                        db_clone
446                            .insert_account(
447                                Some(tx),
448                                AccountEntry {
449                                    public_key: *PEERS[i].public(),
450                                    chain_addr: PEERS_CHAIN[i].public().to_address(),
451                                    entry_type: AccountType::NotAnnounced,
452                                    published_at: 1,
453                                },
454                            )
455                            .await?;
456                    }
457                    Ok::<_, DbSqlError>(())
458                })
459            })
460            .await?;
461
462        Ok(())
463    }
464
465    fn spawn_aggregation_interaction(
466        db_alice: HoprDb,
467        db_bob: HoprDb,
468        key_alice: &ChainKeypair,
469        key_bob: &ChainKeypair,
470    ) -> anyhow::Result<(
471        AwaitingAggregator<(), (), HoprDb>,
472        futures::channel::oneshot::Receiver<()>,
473    )> {
474        let mut alice = TicketAggregationInteraction::<(), ()>::new(db_alice, key_alice);
475        let mut bob = TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), key_bob);
476
477        let (tx, awaiter) = futures::channel::oneshot::channel::<()>();
478        let bob_aggregator = bob.writer();
479
480        tokio::task::spawn(async move {
481            let mut finalizer = None;
482
483            match bob.next().await {
484                Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer)) => {
485                    let _ = finalizer.insert(request_finalizer);
486                    match alice.writer().receive_aggregation_request(
487                        PEERS[1].public().into(),
488                        acked_tickets.into_iter().collect(),
489                        (),
490                    ) {
491                        Ok(_) => {}
492                        Err(e) => error!(error = %e, "Failed to received aggregation ticket"),
493                    }
494                }
495                //  alice.ack_event_queue.0.start_send(super::TicketAggregationToProcess::ToProcess(destination,
496                // acked_tickets)),
497                _ => panic!("unexpected action happened"),
498            };
499
500            match alice.next().await {
501                Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ())) => {
502                    match bob
503                        .writer()
504                        .receive_ticket(PEERS[0].public().into(), aggregated_ticket, ())
505                    {
506                        Ok(_) => {}
507                        Err(e) => error!(error = %e, "Failed to receive a ticket"),
508                    }
509                }
510
511                _ => panic!("unexpected action happened"),
512            };
513
514            match bob.next().await {
515                Some(TicketAggregationProcessed::Receive(_destination, _ticket, ())) => (),
516                _ => panic!("unexpected action happened"),
517            };
518
519            finalizer.expect("should have a value present").finalize();
520            let _ = tx.send(());
521        });
522
523        Ok((
524            AwaitingAggregator::new(db_bob, bob_aggregator, Duration::from_secs(5)),
525            awaiter,
526        ))
527    }
528
529    #[tokio::test]
530    async fn test_strategy_aggregation_on_tick() -> anyhow::Result<()> {
531        // db_0: Alice (channel source)
532        // db_1: Bob (channel destination)
533        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
534        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
535
536        init_db(db_alice.clone()).await?;
537        init_db(db_bob.clone()).await?;
538
539        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
540        db_bob.start_ticket_processing(bob_notify_tx.into())?;
541
542        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
543
544        db_alice.upsert_channel(None, channel).await?;
545        db_bob.upsert_channel(None, channel).await?;
546
547        let (bob_aggregator, awaiter) =
548            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
549
550        let cfg = super::AggregatingStrategyConfig {
551            aggregation_threshold: Some(5),
552            unrealized_balance_ratio: None,
553            aggregate_on_channel_close: false,
554        };
555
556        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
557
558        // let threshold_ticket = acked_tickets.last().unwrap();
559        aggregation_strategy.on_tick().await?;
560
561        // Wait until aggregation has finished
562        let f1 = pin!(awaiter);
563        let f2 = pin!(tokio::time::sleep(Duration::from_secs(5)).fuse());
564        let _ = futures::future::select(f1, f2).await;
565
566        pin_mut!(bob_notify_rx);
567        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
568
569        let tickets = db_bob.get_tickets((&channel).into()).await?;
570        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
571        assert_eq!(notified_ticket, tickets[0]);
572
573        Ok(())
574    }
575
576    #[tokio::test]
577    async fn test_strategy_aggregation_on_tick_when_unrealized_balance_exceeded() -> anyhow::Result<()> {
578        // db_0: Alice (channel source)
579        // db_1: Bob (channel destination)
580        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
581        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
582
583        init_db(db_alice.clone()).await?;
584        init_db(db_bob.clone()).await?;
585
586        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
587        db_bob.start_ticket_processing(bob_notify_tx.into())?;
588
589        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 4).await?;
590
591        db_alice.upsert_channel(None, channel).await?;
592        db_bob.upsert_channel(None, channel).await?;
593
594        let (bob_aggregator, awaiter) =
595            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
596
597        let cfg = super::AggregatingStrategyConfig {
598            aggregation_threshold: None,
599            unrealized_balance_ratio: Some(0.75),
600            aggregate_on_channel_close: false,
601        };
602
603        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
604
605        // let threshold_ticket = acked_tickets.last().unwrap();
606        aggregation_strategy.on_tick().await?;
607
608        // Wait until aggregation has finished
609        let f1 = pin!(awaiter);
610        let f2 = pin!(tokio::time::sleep(Duration::from_secs(5)));
611        let _ = futures::future::select(f1, f2).await;
612
613        pin_mut!(bob_notify_rx);
614        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
615
616        let tickets = db_bob.get_tickets((&channel).into()).await?;
617        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
618        assert_eq!(notified_ticket, tickets[0]);
619
620        Ok(())
621    }
622
623    #[tokio::test]
624    async fn test_strategy_aggregation_on_tick_should_not_agg_when_unrealized_balance_exceeded_via_aggregated_tickets()
625    -> anyhow::Result<()> {
626        // db_0: Alice (channel source)
627        // db_1: Bob (channel destination)
628        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
629        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
630
631        init_db(db_alice.clone()).await?;
632        init_db(db_bob.clone()).await?;
633
634        db_bob.start_ticket_processing(None)?;
635
636        const NUM_TICKETS: usize = 4;
637        let (mut acked_tickets, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
638
639        let (bob_aggregator, awaiter) =
640            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
641
642        // Make this ticket aggregated
643        acked_tickets[0] = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], 0, 2)?;
644
645        debug!("upserting {}", acked_tickets[0]);
646        db_bob.upsert_ticket(None, acked_tickets[0].clone()).await?;
647
648        let tickets = db_bob.get_tickets((&channel).into()).await?;
649        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
650
651        channel.balance = HoprBalance::from(100_u32);
652
653        db_alice.upsert_channel(None, channel).await?;
654        db_bob.upsert_channel(None, channel).await?;
655
656        let cfg = super::AggregatingStrategyConfig {
657            aggregation_threshold: None,
658            unrealized_balance_ratio: Some(0.75),
659            aggregate_on_channel_close: false,
660        };
661
662        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
663
664        // let threshold_ticket = acked_tickets.last().unwrap();
665        aggregation_strategy.on_tick().await?;
666
667        let tickets = db_bob.get_tickets((&channel).into()).await?;
668        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
669        std::mem::drop(awaiter);
670
671        Ok(())
672    }
673
674    #[tokio::test]
675    async fn test_strategy_aggregation_on_channel_close() -> anyhow::Result<()> {
676        // db_0: Alice (channel source)
677        // db_1: Bob (channel destination)
678        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
679        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
680
681        init_db(db_alice.clone()).await?;
682        init_db(db_bob.clone()).await?;
683
684        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
685        db_bob.start_ticket_processing(bob_notify_tx.into())?;
686
687        let (_, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
688
689        let cfg = super::AggregatingStrategyConfig {
690            aggregation_threshold: Some(100),
691            unrealized_balance_ratio: None,
692            aggregate_on_channel_close: true,
693        };
694
695        channel.status = ChannelStatus::PendingToClose(std::time::SystemTime::now());
696
697        db_alice.upsert_channel(None, channel).await?;
698        db_bob.upsert_channel(None, channel).await?;
699
700        let (bob_aggregator, awaiter) =
701            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
702
703        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_alice.clone(), Arc::new(bob_aggregator));
704
705        aggregation_strategy
706            .on_own_channel_changed(
707                &channel,
708                ChannelDirection::Incoming,
709                ChannelChange::Status {
710                    left: ChannelStatus::Open,
711                    right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
712                },
713            )
714            .await?;
715
716        // Wait until aggregation has finished
717        timeout(Duration::from_secs(5), awaiter).await.context("Timeout")??;
718
719        pin_mut!(bob_notify_rx);
720        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
721
722        let tickets = db_bob.get_tickets((&channel).into()).await?;
723        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
724        assert_eq!(notified_ticket, tickets[0]);
725
726        Ok(())
727    }
728
729    #[tokio::test]
730    async fn test_strategy_aggregation_on_tick_should_not_agg_on_channel_close_if_only_single_ticket()
731    -> anyhow::Result<()> {
732        // db_0: Alice (channel source)
733        // db_1: Bob (channel destination)
734        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
735        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
736
737        init_db(db_alice.clone()).await?;
738        init_db(db_bob.clone()).await?;
739
740        db_bob.start_ticket_processing(None)?;
741
742        const NUM_TICKETS: usize = 1;
743        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
744
745        let (bob_aggregator, awaiter) =
746            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
747
748        let tickets = db_bob.get_tickets((&channel).into()).await?;
749        assert_eq!(tickets.len(), NUM_TICKETS, "should have a single ticket");
750
751        db_alice.upsert_channel(None, channel).await?;
752        db_bob.upsert_channel(None, channel).await?;
753
754        let cfg = super::AggregatingStrategyConfig {
755            aggregation_threshold: Some(100),
756            unrealized_balance_ratio: Some(0.75),
757            aggregate_on_channel_close: true,
758        };
759
760        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
761
762        aggregation_strategy
763            .on_own_channel_changed(
764                &channel,
765                ChannelDirection::Incoming,
766                ChannelChange::Status {
767                    left: ChannelStatus::Open,
768                    right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
769                },
770            )
771            .await?;
772
773        timeout(Duration::from_millis(500), awaiter)
774            .await
775            .expect_err("should timeout");
776
777        let tickets = db_bob.get_tickets((&channel).into()).await?;
778        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
779        Ok(())
780    }
781}