hopr_strategy/
aggregating.rs

1//! ## Aggregating Strategy
2//! This strategy automates ticket aggregation on different channel/ticket events.
3//! Note that the aggregating strategy can be combined with the Auto Redeem Strategy above.
4//!
5//! Ticket aggregation is an interactive process and requires cooperation of the ticket issuer, the aggregation
6//! will fail if the aggregation takes more than `aggregation_timeout` (in seconds). This does not affect runtime of the
7//! strategy, since the ticket aggregation and awaiting it is performed on a separate thread.
8//!
9//! This strategy listens for two distinct channel events and triggers the interactive aggregation based on different criteria:
10//!
11//! ### 1) New winning acknowledged ticket event
12//!
13//! This strategy listens to newly added acknowledged winning tickets and once the amount of tickets in a certain channel reaches
14//! an `aggregation_threshold`, the strategy will initiate ticket aggregation in that channel.
//! The strategy can independently also check if the unrealized balance (current balance _minus_ total unredeemed unaggregated tickets value) in a certain channel
//! has not gone over `unrealized_balance_ratio` percent of the current balance in that channel. If that happens, the strategy will also initiate
//! ticket aggregation.
18//!
19//! ### 2) Channel transition from `Open` to `PendingToClose` event
20//!
21//! If the `aggregate_on_channel_close` flag is set, the aggregation will be triggered once a channel transitions from `Open` to `PendingToClose` state.
22//! This behavior does not have any additional criteria, unlike in the previous event, but there must be at least 2 tickets in the channel.
23//!
24//!
25//! For details on default parameters see [AggregatingStrategyConfig].
26use async_lock::RwLock;
27use async_trait::async_trait;
28use serde::{Deserialize, Serialize};
29use serde_with::serde_as;
30use std::collections::HashMap;
31use std::fmt::{Debug, Display, Formatter};
32use std::sync::Arc;
33use tracing::{debug, error, info, warn};
34use validator::Validate;
35
36use hopr_async_runtime::prelude::{spawn, JoinHandle};
37use hopr_crypto_types::prelude::Hash;
38use hopr_db_sql::api::tickets::{AggregationPrerequisites, HoprDbTicketOperations};
39use hopr_db_sql::channels::HoprDbChannelOperations;
40use hopr_internal_types::prelude::*;
41use hopr_transport_protocol::ticket_aggregation::processor::TicketAggregatorTrait;
42
43use crate::{strategy::SingularStrategy, Strategy};
44
45#[cfg(all(feature = "prometheus", not(test)))]
46use hopr_metrics::metrics::SimpleCounter;
47
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented by the spawned aggregation task each time `aggregate_tickets`
    // returns `Ok` for a channel. Only compiled with the `prometheus` feature
    // and never in test builds.
    static ref METRIC_COUNT_AGGREGATIONS: SimpleCounter =
        SimpleCounter::new("hopr_strategy_aggregating_aggregation_count", "Count of initiated automatic aggregations").unwrap();
}
53
54use hopr_platform::time::native::current_time;
55
/// Upper bound accepted for `aggregation_threshold`, taken from the batch limit
/// exported by the ticket database (`MAX_TICKETS_TO_AGGREGATE_BATCH`).
// NOTE(review): the `as u32` cast assumes the DB constant fits into `u32` — confirm.
const MAX_AGGREGATABLE_TICKET_COUNT: u32 = hopr_db_sql::tickets::MAX_TICKETS_TO_AGGREGATE_BATCH as u32;
57
/// Default value of `AggregatingStrategyConfig::aggregation_threshold`: 250 tickets.
#[inline]
fn default_aggregation_threshold() -> Option<u32> {
    const DEFAULT_TICKET_COUNT: u32 = 250;
    Some(DEFAULT_TICKET_COUNT)
}
62
/// Default provider for boolean configuration flags that are enabled unless stated otherwise.
#[inline]
const fn just_true() -> bool {
    true
}
67
/// Default value of `AggregatingStrategyConfig::unrealized_balance_ratio`: 0.9.
#[inline]
fn default_unrealized_balance_ratio() -> Option<f32> {
    const DEFAULT_RATIO: f32 = 0.9;
    Some(DEFAULT_RATIO)
}
72
73/// Configuration object for the `AggregatingStrategy`
74#[serde_as]
75#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Validate, Serialize, Deserialize)]
76pub struct AggregatingStrategyConfig {
77    /// Number of acknowledged winning tickets in a channel that triggers the ticket aggregation
78    /// in that channel when exceeded.
79    ///
80    /// This condition is independent of `unrealized_balance_ratio`.
81    ///
82    /// Default is 250.
83    #[validate(range(min = 2, max = MAX_AGGREGATABLE_TICKET_COUNT))]
84    #[serde(default = "default_aggregation_threshold")]
85    #[default(default_aggregation_threshold())]
86    pub aggregation_threshold: Option<u32>,
87
88    /// Percentage of unrealized balance in unaggregated tickets in a channel
89    /// that triggers the ticket aggregation when exceeded.
90    ///
91    /// The unrealized balance in this case is the proportion of the channel balance allocated in unredeemed unaggregated tickets.
92    /// This condition is independent of `aggregation_threshold`.
93    ///
94    /// Default is 0.9
95    #[validate(range(min = 0_f32, max = 1.0_f32))]
96    #[default(default_unrealized_balance_ratio())]
97    pub unrealized_balance_ratio: Option<f32>,
98
99    /// If set, the strategy will automatically aggregate tickets in channel that has transitioned
100    /// to the `PendingToClose` state.
101    ///
102    /// This happens regardless if `aggregation_threshold` or `unrealized_balance_ratio` thresholds are met on that channel.
103    /// If the aggregation on-close fails, the tickets are automatically sent for redeeming instead.
104    ///
105    /// Default is true.
106    #[default(just_true())]
107    pub aggregate_on_channel_close: bool,
108}
109
110impl From<AggregatingStrategyConfig> for AggregationPrerequisites {
111    fn from(value: AggregatingStrategyConfig) -> Self {
112        AggregationPrerequisites {
113            min_ticket_count: value.aggregation_threshold.map(|x| x as usize),
114            min_unaggregated_ratio: value.unrealized_balance_ratio.map(|x| x as f64),
115        }
116    }
117}
118
/// Represents a strategy that starts aggregating tickets in a certain
/// channel, once the prerequisites derived from the configuration
/// (ticket count and/or unrealized balance ratio) are handed to the ticket
/// aggregator, or once a channel transitions to `PendingToClose`.
///
/// At most one aggregation task per channel is tracked at any time.
// NOTE(review): earlier documentation mentioned an optional redeem of the aggregated
// ticket; no redeeming is performed by the code in this module — confirm where it lives.
pub struct AggregatingStrategy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Database handle; used on each tick to list the incoming channels.
    db: Db,
    /// Component that performs the interactive ticket aggregation.
    ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
    /// Strategy configuration.
    cfg: AggregatingStrategyConfig,
    /// Aggregation tasks spawned by this strategy, keyed by channel ID.
    /// The `bool` is set to `true` by the task itself once it has finished,
    /// the `JoinHandle` is awaited when the finished entry gets removed.
    #[allow(clippy::type_complexity)]
    agg_tasks: Arc<RwLock<HashMap<Hash, (bool, JoinHandle<()>)>>>,
}
134
135impl<Db> Debug for AggregatingStrategy<Db>
136where
137    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
138{
139    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
140        write!(f, "{:?}", Strategy::Aggregating(self.cfg))
141    }
142}
143
144impl<Db> Display for AggregatingStrategy<Db>
145where
146    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
147{
148    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
149        write!(f, "{}", Strategy::Aggregating(self.cfg))
150    }
151}
152
153impl<Db> AggregatingStrategy<Db>
154where
155    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
156{
157    pub fn new(
158        cfg: AggregatingStrategyConfig,
159        db: Db,
160        ticket_aggregator: Arc<dyn TicketAggregatorTrait + Send + Sync + 'static>,
161    ) -> Self {
162        Self {
163            db,
164            cfg,
165            ticket_aggregator,
166            agg_tasks: Arc::new(RwLock::new(HashMap::new())),
167        }
168    }
169}
170
impl<Db> AggregatingStrategy<Db>
where
    Db: HoprDbChannelOperations + HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
{
    /// Spawns a background aggregation task for `channel_id` with the given `criteria`,
    /// unless this strategy already tracks an unfinished task for that channel
    /// (in which case only a warning is logged).
    ///
    /// The outcome of the aggregation itself is only logged by the spawned task;
    /// this function currently always returns `Ok(())`.
    async fn try_start_aggregation(
        &self,
        channel_id: Hash,
        criteria: AggregationPrerequisites,
    ) -> crate::errors::Result<()> {
        if !self.is_strategy_aggregating_in_channel(channel_id).await {
            debug!("checking aggregation in {channel_id} with criteria {criteria:?}...");

            let agg_tasks_clone = self.agg_tasks.clone();
            let aggregator_clone = self.ticket_aggregator.clone();
            // One-shot signal telling the task that its entry has been inserted into `agg_tasks`
            let (can_remove_tx, can_remove_rx) = futures::channel::oneshot::channel();
            let task = spawn(async move {
                match aggregator_clone.aggregate_tickets(&channel_id, criteria).await {
                    Ok(_) => {
                        debug!("tried ticket aggregation in channel {channel_id} without any issues");

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_COUNT_AGGREGATIONS.increment();
                    }
                    Err(e) => {
                        error!("cannot complete aggregation in channel {channel_id}: {e}");
                    }
                }

                // Wait until we're added to the aggregation tasks table
                // (a dropped sender is ignored, the task then just proceeds)
                let _ = can_remove_rx.await;
                // Flag the entry as finished; the entry itself is removed lazily by
                // `is_strategy_aggregating_in_channel` on the next check for this channel
                if let Some((done, _)) = agg_tasks_clone.write().await.get_mut(&channel_id) {
                    *done = true;
                }
            });

            // Register the task first, only then let it mark itself as done
            self.agg_tasks.write().await.insert(channel_id, (false, task));
            let _ = can_remove_tx.send(()); // Allow the task to mark itself as done
        } else {
            warn!(channel = %channel_id, "this strategy already aggregates in channel");
        }

        Ok(())
    }

    /// Returns `true` if an aggregation task for `channel_id` is tracked and has not
    /// yet flagged itself as done.
    ///
    /// As a side effect, an entry flagged as done is removed from the table and its
    /// join handle is awaited before `false` is returned.
    async fn is_strategy_aggregating_in_channel(&self, channel_id: Hash) -> bool {
        // Only the `done` flag is copied out, so the read lock is released right away
        let existing = self.agg_tasks.read().await.get(&channel_id).map(|(done, _)| *done);
        if let Some(done) = existing {
            // Task exists, check if it has been completed
            if done {
                if let Some((_, task)) = self.agg_tasks.write().await.remove(&channel_id) {
                    // Task has been completed, remove it and await its join handle
                    let _ = task.await;
                    false
                } else {
                    // Should not happen, but means there's no more aggregation task for the channel
                    false
                }
            } else {
                // There's still a running aggregation task for this channel
                true
            }
        } else {
            // No aggregation task found for this channel
            false
        }
    }
}
238
239#[async_trait]
240impl<Db> SingularStrategy for AggregatingStrategy<Db>
241where
242    Db: HoprDbChannelOperations + HoprDbTicketOperations + Clone + Send + Sync + std::fmt::Debug + 'static,
243{
244    async fn on_tick(&self) -> crate::errors::Result<()> {
245        let incoming = self
246            .db
247            .get_incoming_channels(None)
248            .await
249            .map_err(hopr_db_sql::api::errors::DbError::from)?
250            .into_iter()
251            .filter(|c| !c.closure_time_passed(current_time()))
252            .map(|c| c.get_id());
253
254        for channel_id in incoming {
255            if let Err(e) = self.try_start_aggregation(channel_id, self.cfg.into()).await {
256                debug!("skipped aggregation in channel {channel_id}: {e}");
257            }
258        }
259
260        Ok(())
261    }
262
263    async fn on_own_channel_changed(
264        &self,
265        channel: &ChannelEntry,
266        direction: ChannelDirection,
267        change: ChannelChange,
268    ) -> crate::errors::Result<()> {
269        if !self.cfg.aggregate_on_channel_close || direction != ChannelDirection::Incoming {
270            return Ok(());
271        }
272
273        if let ChannelChange::Status { left: old, right: new } = change {
274            if old != ChannelStatus::Open || !matches!(new, ChannelStatus::PendingToClose(_)) {
275                debug!("ignoring channel {channel} state change that's not in PendingToClose");
276                return Ok(());
277            }
278
279            info!(%channel, "going to aggregate tickets in channel because it transitioned to PendingToClose");
280
281            // On closing there must be at least 2 tickets to justify aggregation
282            let on_close_agg_prerequisites = AggregationPrerequisites {
283                min_ticket_count: Some(2),
284                min_unaggregated_ratio: None,
285            };
286
287            Ok(self
288                .try_start_aggregation(channel.get_id(), on_close_agg_prerequisites)
289                .await?)
290        } else {
291            Ok(())
292        }
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use crate::aggregating::{default_aggregation_threshold, MAX_AGGREGATABLE_TICKET_COUNT};
299    use crate::strategy::SingularStrategy;
300    use anyhow::Context;
301    use async_std::prelude::FutureExt as AsyncStdFutureExt;
302    use futures::{pin_mut, FutureExt, StreamExt};
303    use hex_literal::hex;
304    use hopr_crypto_types::prelude::*;
305    use hopr_db_sql::accounts::HoprDbAccountOperations;
306    use hopr_db_sql::api::{info::DomainSeparator, tickets::HoprDbTicketOperations};
307    use hopr_db_sql::channels::HoprDbChannelOperations;
308    use hopr_db_sql::db::HoprDb;
309    use hopr_db_sql::errors::DbSqlError;
310    use hopr_db_sql::info::HoprDbInfoOperations;
311    use hopr_db_sql::{HoprDbGeneralModelOperations, TargetDb};
312    use hopr_internal_types::prelude::*;
313    use hopr_primitive_types::prelude::*;
314    use hopr_transport_protocol::ticket_aggregation::processor::{
315        AwaitingAggregator, TicketAggregationInteraction, TicketAggregationProcessed,
316    };
317    use lazy_static::lazy_static;
318    use std::ops::Add;
319    use std::pin::pin;
320    use std::sync::Arc;
321    use std::time::Duration;
322    use tracing::{debug, error};
323
324    #[test]
325    fn default_ticket_aggregation_count_is_lower_than_maximum_allowed_ticket_count() -> anyhow::Result<()> {
326        assert!(default_aggregation_threshold().unwrap() < MAX_AGGREGATABLE_TICKET_COUNT);
327
328        Ok(())
329    }
330
    lazy_static! {
        // Fixed off-chain (packet) keypairs of the two test peers: index 0 is Alice, index 1 is Bob.
        static ref PEERS: Vec<OffchainKeypair> = vec![
            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
        ]
        .iter()
        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
        // Fixed on-chain keypairs of the same two peers, in the same order as `PEERS`.
        static ref PEERS_CHAIN: Vec<ChainKeypair> = vec![
            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9"),
        ]
        .iter()
        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
    }
347
348    fn mock_acknowledged_ticket(
349        signer: &ChainKeypair,
350        destination: &ChainKeypair,
351        index: u64,
352        index_offset: u32,
353    ) -> anyhow::Result<AcknowledgedTicket> {
354        let price_per_packet: U256 = 20_u32.into();
355        let ticket_win_prob = 1.0f64;
356
357        let channel_id = generate_channel_id(&signer.into(), &destination.into());
358
359        let channel_epoch = 1u64;
360        let domain_separator = Hash::default();
361
362        let response = Response::try_from(
363            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
364        )?;
365
366        Ok(TicketBuilder::default()
367            .addresses(signer, destination)
368            .amount(price_per_packet.div_f64(ticket_win_prob)?)
369            .index(index)
370            .index_offset(index_offset)
371            .win_prob(ticket_win_prob)
372            .channel_epoch(1)
373            .challenge(response.to_challenge().into())
374            .build_signed(signer, &domain_separator)?
375            .into_acknowledged(response))
376    }
377
378    async fn populate_db_with_ack_tickets(
379        db: HoprDb,
380        amount: usize,
381    ) -> anyhow::Result<(Vec<AcknowledgedTicket>, ChannelEntry)> {
382        let db_clone = db.clone();
383        let (acked_tickets, total_value) = db
384            .begin_transaction_in_db(TargetDb::Tickets)
385            .await?
386            .perform(|tx| {
387                Box::pin(async move {
388                    let mut acked_tickets = Vec::new();
389                    let mut total_value = Balance::zero(BalanceType::HOPR);
390
391                    for i in 0..amount {
392                        let acked_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i as u64, 1)
393                            .expect("should be able to create an ack ticket");
394                        debug!("inserting {acked_ticket}");
395
396                        db_clone.upsert_ticket(Some(tx), acked_ticket.clone()).await?;
397
398                        total_value = total_value.add(&acked_ticket.verified_ticket().amount);
399                        acked_tickets.push(acked_ticket);
400                    }
401
402                    Ok::<_, DbSqlError>((acked_tickets, total_value))
403                })
404            })
405            .await?;
406
407        let channel = ChannelEntry::new(
408            (&PEERS_CHAIN[0]).into(),
409            (&PEERS_CHAIN[1]).into(),
410            total_value,
411            0_u32.into(),
412            ChannelStatus::Open,
413            1u32.into(),
414        );
415
416        Ok((acked_tickets, channel))
417    }
418
419    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
420        let db_clone = db.clone();
421        db.begin_transaction()
422            .await?
423            .perform(|tx| {
424                Box::pin(async move {
425                    db_clone
426                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
427                        .await?;
428                    for i in 0..PEERS_CHAIN.len() {
429                        debug!(
430                            "linking {} with {}",
431                            PEERS[i].public(),
432                            PEERS_CHAIN[i].public().to_address()
433                        );
434                        db_clone
435                            .insert_account(
436                                Some(tx),
437                                AccountEntry::new(
438                                    *PEERS[i].public(),
439                                    PEERS_CHAIN[i].public().to_address(),
440                                    AccountType::NotAnnounced,
441                                ),
442                            )
443                            .await?;
444                    }
445                    Ok::<_, DbSqlError>(())
446                })
447            })
448            .await?;
449
450        Ok(())
451    }
452
453    fn spawn_aggregation_interaction(
454        db_alice: HoprDb,
455        db_bob: HoprDb,
456        key_alice: &ChainKeypair,
457        key_bob: &ChainKeypair,
458    ) -> anyhow::Result<(
459        AwaitingAggregator<(), (), HoprDb>,
460        futures::channel::oneshot::Receiver<()>,
461    )> {
462        let mut alice = TicketAggregationInteraction::<(), ()>::new(db_alice, key_alice);
463        let mut bob = TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), key_bob);
464
465        let (tx, awaiter) = futures::channel::oneshot::channel::<()>();
466        let bob_aggregator = bob.writer();
467
468        async_std::task::spawn(async move {
469            let mut finalizer = None;
470
471            match bob.next().await {
472                Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer)) => {
473                    let _ = finalizer.insert(request_finalizer);
474                    match alice.writer().receive_aggregation_request(
475                        PEERS[1].public().into(),
476                        acked_tickets.into_iter().map(TransferableWinningTicket::from).collect(),
477                        (),
478                    ) {
479                        Ok(_) => {}
480                        Err(e) => error!(error = %e, "Failed to received aggregation ticket"),
481                    }
482                }
483                //  alice.ack_event_queue.0.start_send(super::TicketAggregationToProcess::ToProcess(destination, acked_tickets)),
484                _ => panic!("unexpected action happened"),
485            };
486
487            match alice.next().await {
488                Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ())) => {
489                    match bob
490                        .writer()
491                        .receive_ticket(PEERS[0].public().into(), aggregated_ticket, ())
492                    {
493                        Ok(_) => {}
494                        Err(e) => error!(error = %e, "Failed to receive a ticket"),
495                    }
496                }
497
498                _ => panic!("unexpected action happened"),
499            };
500
501            match bob.next().await {
502                Some(TicketAggregationProcessed::Receive(_destination, _ticket, ())) => (),
503                _ => panic!("unexpected action happened"),
504            };
505
506            finalizer.expect("should have a value present").finalize();
507            let _ = tx.send(());
508        });
509
510        Ok((
511            AwaitingAggregator::new(db_bob, bob_aggregator, Duration::from_secs(5)),
512            awaiter,
513        ))
514    }
515
516    #[async_std::test]
517    async fn test_strategy_aggregation_on_tick() -> anyhow::Result<()> {
518        // db_0: Alice (channel source)
519        // db_1: Bob (channel destination)
520        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
521        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
522
523        init_db(db_alice.clone()).await?;
524        init_db(db_bob.clone()).await?;
525
526        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
527        db_bob.start_ticket_processing(bob_notify_tx.into())?;
528
529        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
530
531        db_alice.upsert_channel(None, channel).await?;
532        db_bob.upsert_channel(None, channel).await?;
533
534        let (bob_aggregator, awaiter) =
535            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
536
537        let cfg = super::AggregatingStrategyConfig {
538            aggregation_threshold: Some(5),
539            unrealized_balance_ratio: None,
540            aggregate_on_channel_close: false,
541        };
542
543        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
544
545        //let threshold_ticket = acked_tickets.last().unwrap();
546        aggregation_strategy.on_tick().await?;
547
548        // Wait until aggregation has finished
549        let f1 = pin!(awaiter);
550        let f2 = pin!(async_std::task::sleep(Duration::from_secs(5)).fuse());
551        let _ = futures::future::select(f1, f2).await;
552
553        pin_mut!(bob_notify_rx);
554        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
555
556        let tickets = db_bob.get_tickets((&channel).into()).await?;
557        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
558        assert_eq!(notified_ticket, tickets[0]);
559
560        Ok(())
561    }
562
563    #[async_std::test]
564    async fn test_strategy_aggregation_on_tick_when_unrealized_balance_exceeded() -> anyhow::Result<()> {
565        // db_0: Alice (channel source)
566        // db_1: Bob (channel destination)
567        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
568        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
569
570        init_db(db_alice.clone()).await?;
571        init_db(db_bob.clone()).await?;
572
573        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
574        db_bob.start_ticket_processing(bob_notify_tx.into())?;
575
576        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), 4).await?;
577
578        db_alice.upsert_channel(None, channel).await?;
579        db_bob.upsert_channel(None, channel).await?;
580
581        let (bob_aggregator, awaiter) =
582            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
583
584        let cfg = super::AggregatingStrategyConfig {
585            aggregation_threshold: None,
586            unrealized_balance_ratio: Some(0.75),
587            aggregate_on_channel_close: false,
588        };
589
590        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
591
592        //let threshold_ticket = acked_tickets.last().unwrap();
593        aggregation_strategy.on_tick().await?;
594
595        // Wait until aggregation has finished
596        let f1 = pin!(awaiter);
597        let f2 = pin!(async_std::task::sleep(Duration::from_secs(5)));
598        let _ = futures::future::select(f1, f2).await;
599
600        pin_mut!(bob_notify_rx);
601        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
602
603        let tickets = db_bob.get_tickets((&channel).into()).await?;
604        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
605        assert_eq!(notified_ticket, tickets[0]);
606
607        Ok(())
608    }
609
610    #[async_std::test]
611    async fn test_strategy_aggregation_on_tick_should_not_agg_when_unrealized_balance_exceeded_via_aggregated_tickets(
612    ) -> anyhow::Result<()> {
613        // db_0: Alice (channel source)
614        // db_1: Bob (channel destination)
615        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
616        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
617
618        init_db(db_alice.clone()).await?;
619        init_db(db_bob.clone()).await?;
620
621        db_bob.start_ticket_processing(None)?;
622
623        const NUM_TICKETS: usize = 4;
624        let (mut acked_tickets, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
625
626        let (bob_aggregator, awaiter) =
627            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
628
629        // Make this ticket aggregated
630        acked_tickets[0] = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], 0, 2)?;
631
632        debug!("upserting {}", acked_tickets[0]);
633        db_bob.upsert_ticket(None, acked_tickets[0].clone()).await?;
634
635        let tickets = db_bob.get_tickets((&channel).into()).await?;
636        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
637
638        channel.balance = Balance::new(100_u32, BalanceType::HOPR);
639
640        db_alice.upsert_channel(None, channel).await?;
641        db_bob.upsert_channel(None, channel).await?;
642
643        let cfg = super::AggregatingStrategyConfig {
644            aggregation_threshold: None,
645            unrealized_balance_ratio: Some(0.75),
646            aggregate_on_channel_close: false,
647        };
648
649        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
650
651        //let threshold_ticket = acked_tickets.last().unwrap();
652        aggregation_strategy.on_tick().await?;
653
654        let tickets = db_bob.get_tickets((&channel).into()).await?;
655        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
656        std::mem::drop(awaiter);
657
658        Ok(())
659    }
660
661    #[async_std::test]
662    async fn test_strategy_aggregation_on_channel_close() -> anyhow::Result<()> {
663        // db_0: Alice (channel source)
664        // db_1: Bob (channel destination)
665        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
666        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
667
668        init_db(db_alice.clone()).await?;
669        init_db(db_bob.clone()).await?;
670
671        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
672        db_bob.start_ticket_processing(bob_notify_tx.into())?;
673
674        let (_, mut channel) = populate_db_with_ack_tickets(db_bob.clone(), 5).await?;
675
676        let cfg = super::AggregatingStrategyConfig {
677            aggregation_threshold: Some(100),
678            unrealized_balance_ratio: None,
679            aggregate_on_channel_close: true,
680        };
681
682        channel.status = ChannelStatus::PendingToClose(std::time::SystemTime::now());
683
684        db_alice.upsert_channel(None, channel).await?;
685        db_bob.upsert_channel(None, channel).await?;
686
687        let (bob_aggregator, awaiter) =
688            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
689
690        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_alice.clone(), Arc::new(bob_aggregator));
691
692        aggregation_strategy
693            .on_own_channel_changed(
694                &channel,
695                ChannelDirection::Incoming,
696                ChannelChange::Status {
697                    left: ChannelStatus::Open,
698                    right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
699                },
700            )
701            .await?;
702
703        // Wait until aggregation has finished
704        awaiter.timeout(Duration::from_secs(5)).await.context("Timeout")??;
705
706        pin_mut!(bob_notify_rx);
707        let notified_ticket = bob_notify_rx.next().await.expect("should have a ticket");
708
709        let tickets = db_bob.get_tickets((&channel).into()).await?;
710        assert_eq!(tickets.len(), 1, "there should be a single aggregated ticket");
711        assert_eq!(notified_ticket, tickets[0]);
712
713        Ok(())
714    }
715
716    #[async_std::test]
717    async fn test_strategy_aggregation_on_tick_should_not_agg_on_channel_close_if_only_single_ticket(
718    ) -> anyhow::Result<()> {
719        // db_0: Alice (channel source)
720        // db_1: Bob (channel destination)
721        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
722        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
723
724        init_db(db_alice.clone()).await?;
725        init_db(db_bob.clone()).await?;
726
727        db_bob.start_ticket_processing(None)?;
728
729        const NUM_TICKETS: usize = 1;
730        let (_, channel) = populate_db_with_ack_tickets(db_bob.clone(), NUM_TICKETS).await?;
731
732        let (bob_aggregator, awaiter) =
733            spawn_aggregation_interaction(db_alice.clone(), db_bob.clone(), &PEERS_CHAIN[0], &PEERS_CHAIN[1])?;
734
735        let tickets = db_bob.get_tickets((&channel).into()).await?;
736        assert_eq!(tickets.len(), NUM_TICKETS, "should have a single ticket");
737
738        db_alice.upsert_channel(None, channel).await?;
739        db_bob.upsert_channel(None, channel).await?;
740
741        let cfg = super::AggregatingStrategyConfig {
742            aggregation_threshold: Some(100),
743            unrealized_balance_ratio: Some(0.75),
744            aggregate_on_channel_close: true,
745        };
746
747        let aggregation_strategy = super::AggregatingStrategy::new(cfg, db_bob.clone(), Arc::new(bob_aggregator));
748
749        aggregation_strategy
750            .on_own_channel_changed(
751                &channel,
752                ChannelDirection::Incoming,
753                ChannelChange::Status {
754                    left: ChannelStatus::Open,
755                    right: ChannelStatus::PendingToClose(std::time::SystemTime::now()),
756                },
757            )
758            .await?;
759
760        awaiter
761            .timeout(Duration::from_millis(500))
762            .await
763            .expect_err("should timeout");
764
765        let tickets = db_bob.get_tickets((&channel).into()).await?;
766        assert_eq!(tickets.len(), NUM_TICKETS, "nothing should be aggregated");
767        Ok(())
768    }
769}