hopr_transport_ticket_aggregation/
lib.rs

1use std::{pin::Pin, task::Poll};
2
3use futures::{
4    channel::{
5        mpsc,
6        mpsc::{Receiver, Sender, UnboundedSender, channel},
7    },
8    future::{Either, poll_fn},
9    pin_mut,
10    stream::{Stream, StreamExt},
11};
12use hopr_async_runtime::prelude::{sleep, spawn};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::{
15    errors::DbError,
16    tickets::{AggregationPrerequisites, HoprDbTicketOperations},
17};
18use hopr_internal_types::prelude::*;
19use hopr_transport_identity::PeerId;
20use libp2p::request_response::{OutboundRequestId, ResponseChannel};
21use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
22use thiserror::Error;
23use tracing::{error, warn};
24
25/// Errors generated by the crate.
26#[derive(Error, Debug)]
27pub enum TicketAggregationError {
28    #[error("tx queue is full, retry later")]
29    Retry,
30
31    #[error("underlying transport error while sending packet: {0}")]
32    TransportError(String),
33
34    #[error("db error {0}")]
35    DatabaseError(#[from] hopr_db_api::errors::DbError),
36}
37
38/// Result used by the crate, based on the [`TicketAggregationError`] error type.
39pub type Result<T> = core::result::Result<T, TicketAggregationError>;
40
41#[cfg(all(feature = "prometheus", not(test)))]
42use hopr_metrics::metrics::SimpleCounter;
43
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Increased by the number of tickets every time a non-empty batch is prepared for aggregation.
    static ref METRIC_AGGREGATED_TICKETS: SimpleCounter =
        SimpleCounter::new("hopr_aggregated_tickets_count", "Number of aggregated tickets").unwrap();

    // Increased by one every time a non-empty batch is prepared for aggregation.
    static ref METRIC_AGGREGATION_COUNT: SimpleCounter =
        SimpleCounter::new("hopr_aggregations_count", "Number of performed ticket aggregations").unwrap();
}
57
// Default sizes of the ticket aggregation queues. `TicketAggregationInteraction::new` gives
// each of its two internal channels a capacity equal to the sum of both values.
pub const TICKET_AGGREGATION_TX_QUEUE_SIZE: usize = 2_048;
pub const TICKET_AGGREGATION_RX_QUEUE_SIZE: usize = 2_048;
61
62/// The input to the processor background pipeline
63#[allow(clippy::type_complexity)] // TODO: The type needs to be significantly refactored to easily move around
64#[allow(clippy::large_enum_variant)] // TODO: refactor the large types used in the enum
65#[derive(Debug)]
66pub enum TicketAggregationToProcess<T, U> {
67    ToReceive(PeerId, std::result::Result<Ticket, String>, U),
68    ToProcess(PeerId, Vec<TransferableWinningTicket>, T),
69    ToSend(Hash, AggregationPrerequisites, TicketAggregationFinalizer),
70}
71
72/// Emitted by the processor background pipeline once processed
73#[allow(clippy::large_enum_variant)] // TODO: refactor the large types used in the enum
74#[derive(Debug)]
75pub enum TicketAggregationProcessed<T, U> {
76    Receive(PeerId, AcknowledgedTicket, U),
77    Reply(PeerId, std::result::Result<Ticket, String>, T),
78    Send(PeerId, Vec<TransferableWinningTicket>, TicketAggregationFinalizer),
79}
80
81#[async_trait::async_trait]
82pub trait TicketAggregatorTrait {
83    /// Pushes a new collection of tickets into the processing.
84    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()>;
85}
86
87#[derive(Debug)]
88pub struct AwaitingAggregator<T, U, Db>
89where
90    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
91    T: Send,
92    U: Send,
93{
94    db: Db,
95    writer: TicketAggregationActions<T, U>,
96    agg_timeout: std::time::Duration,
97}
98
99impl<T, U, Db> Clone for AwaitingAggregator<T, U, Db>
100where
101    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
102    T: Send,
103    U: Send,
104{
105    fn clone(&self) -> Self {
106        Self {
107            db: self.db.clone(),
108            writer: self.writer.clone(),
109            agg_timeout: self.agg_timeout,
110        }
111    }
112}
113
114impl<T, U, Db> AwaitingAggregator<T, U, Db>
115where
116    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
117    T: Send,
118    U: Send,
119{
120    pub fn new(db: Db, writer: TicketAggregationActions<T, U>, agg_timeout: std::time::Duration) -> Self {
121        Self {
122            db,
123            writer,
124            agg_timeout,
125        }
126    }
127}
128
129#[async_trait::async_trait]
130impl<T, U, Db> TicketAggregatorTrait for AwaitingAggregator<T, U, Db>
131where
132    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
133    T: Send,
134    U: Send,
135{
136    #[tracing::instrument(level = "debug", skip(self))]
137    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()> {
138        let awaiter = self.writer.clone().aggregate_tickets(channel, prerequisites)?;
139
140        if let Err(e) = awaiter.consume_and_wait(self.agg_timeout).await {
141            warn!(%channel, error = %e, "Error during ticket aggregation, performing a rollback");
142            self.db.rollback_aggregation_in_channel(*channel).await?;
143        }
144
145        Ok(())
146    }
147}
148
149#[derive(Debug)]
150pub struct TicketAggregationAwaiter {
151    rx: mpsc::UnboundedReceiver<()>,
152}
153
154impl From<mpsc::UnboundedReceiver<()>> for TicketAggregationAwaiter {
155    fn from(value: mpsc::UnboundedReceiver<()>) -> Self {
156        Self { rx: value }
157    }
158}
159
160impl TicketAggregationAwaiter {
161    pub async fn consume_and_wait(mut self, until_timeout: std::time::Duration) -> Result<()> {
162        let timeout = sleep(until_timeout);
163        let resolve = self.rx.next();
164
165        pin_mut!(resolve, timeout);
166        match futures::future::select(resolve, timeout).await {
167            Either::Left((result, _)) => result.ok_or(TicketAggregationError::TransportError("Canceled".to_owned())),
168            Either::Right(_) => Err(TicketAggregationError::TransportError(
169                "Timed out on sending a packet".to_owned(),
170            )),
171        }
172    }
173}
174
175#[derive(Debug, Clone)]
176pub struct TicketAggregationFinalizer {
177    tx: Option<UnboundedSender<()>>,
178}
179
180impl TicketAggregationFinalizer {
181    pub fn new(tx: UnboundedSender<()>) -> Self {
182        Self { tx: Some(tx) }
183    }
184
185    pub fn finalize(mut self) {
186        if let Some(sender) = self.tx.take() {
187            if sender.unbounded_send(()).is_err() {
188                error!("Failed to notify the awaiter about the successful ticket aggregation")
189            }
190        } else {
191            error!("Sender for packet send signalization is already spent")
192        }
193    }
194}
195
196/// External API for feeding Ticket Aggregation actions into the Ticket Aggregation
197/// processor processing the elements independently in the background.
198#[derive(Debug)]
199pub struct TicketAggregationActions<T, U> {
200    pub queue: Sender<TicketAggregationToProcess<T, U>>,
201}
202
203pub type BasicTicketAggregationActions<T> = TicketAggregationActions<ResponseChannel<T>, OutboundRequestId>;
204
205impl<T, U> Clone for TicketAggregationActions<T, U> {
206    /// Generic type requires a handwritten clone function
207    fn clone(&self) -> Self {
208        Self {
209            queue: self.queue.clone(),
210        }
211    }
212}
213
214impl<T, U> TicketAggregationActions<T, U> {
215    /// Pushes the aggregated ticket received from the transport layer into processing.
216    pub fn receive_ticket(
217        &mut self,
218        source: PeerId,
219        ticket: std::result::Result<Ticket, String>,
220        request: U,
221    ) -> Result<()> {
222        self.process(TicketAggregationToProcess::ToReceive(source, ticket, request))
223    }
224
225    /// Process the received aggregation request
226    pub fn receive_aggregation_request(
227        &mut self,
228        source: PeerId,
229        tickets: Vec<TransferableWinningTicket>,
230        request: T,
231    ) -> Result<()> {
232        self.process(TicketAggregationToProcess::ToProcess(source, tickets, request))
233    }
234
235    /// Pushes a new collection of tickets into the processing.
236    pub fn aggregate_tickets(
237        &mut self,
238        channel: &Hash,
239        prerequisites: AggregationPrerequisites,
240    ) -> Result<TicketAggregationAwaiter> {
241        let (tx, rx) = mpsc::unbounded::<()>();
242
243        self.process(TicketAggregationToProcess::ToSend(
244            *channel,
245            prerequisites,
246            TicketAggregationFinalizer::new(tx),
247        ))?;
248
249        Ok(rx.into())
250    }
251
252    fn process(&mut self, event: TicketAggregationToProcess<T, U>) -> Result<()> {
253        self.queue.try_send(event).map_err(|e| {
254            if e.is_full() {
255                TicketAggregationError::Retry
256            } else if e.is_disconnected() {
257                TicketAggregationError::TransportError("queue is closed".to_string())
258            } else {
259                TicketAggregationError::TransportError(format!("Unknown error: {e}"))
260            }
261        })
262    }
263}
264
265type AckEventQueue<T, U> = (
266    Sender<TicketAggregationToProcess<T, U>>,
267    Receiver<TicketAggregationProcessed<T, U>>,
268);
269
270/// Sets up processing of ticket aggregation interactions and returns relevant read and write mechanism.
271pub struct TicketAggregationInteraction<T, U>
272where
273    T: Send,
274    U: Send,
275{
276    ack_event_queue: AckEventQueue<T, U>,
277}
278
279impl<T: 'static, U: 'static> TicketAggregationInteraction<T, U>
280where
281    T: Send,
282    U: Send,
283{
284    /// Creates a new instance given the DB to process the ticket aggregation requests.
285    pub fn new<Db>(db: Db, chain_key: &ChainKeypair) -> Self
286    where
287        Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
288    {
289        let (processing_in_tx, processing_in_rx) = channel::<TicketAggregationToProcess<T, U>>(
290            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
291        );
292        let (processing_out_tx, processing_out_rx) = channel::<TicketAggregationProcessed<T, U>>(
293            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
294        );
295
296        let chain_key = chain_key.clone();
297
298        let mut processing_stream = processing_in_rx.then_concurrent(move |event| {
299            let chain_key = chain_key.clone();
300            let db = db.clone();
301            let mut processed_tx = processing_out_tx.clone();
302
303            async move {
304                let processed = match event {
305                    TicketAggregationToProcess::ToProcess(destination, acked_tickets, response) => {
306                        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
307                        let pubkey: std::result::Result<OffchainPublicKey, hopr_primitive_types::errors::GeneralError> =
308                            hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&destination))
309                                .await;
310
311                        match pubkey {
312                            Ok(opk) => {
313                                let count = acked_tickets.len();
314                                match db.aggregate_tickets(opk, acked_tickets, &chain_key).await {
315                                    Ok(ticket) => Some(TicketAggregationProcessed::Reply(
316                                        destination,
317                                        Ok(ticket.leak()),
318                                        response,
319                                    )),
320                                    Err(DbError::TicketAggregationError(e)) => {
321                                        // forward error to counterparty
322                                        Some(TicketAggregationProcessed::Reply(destination, Err(e), response))
323                                    }
324                                    Err(e) => {
325                                        error!(error = %e, %destination, count, "Dropping tickets aggregation request due to an error");
326                                        None
327                                    }
328                                }
329                            },
330                            Err(e) => {
331                                error!(
332                                    %destination, error = %e,
333                                    "Failed to aggregate tickets due to destination deserialization error from an offchain public key"
334                                );
335                                None
336                            }
337                        }
338                    }
339                    TicketAggregationToProcess::ToReceive(destination, aggregated_ticket, request) => {
340                        match aggregated_ticket {
341                            Ok(ticket) => match db.process_received_aggregated_ticket(ticket.clone(), &chain_key).await
342                            {
343                                Ok(acked_ticket) => {
344                                    Some(TicketAggregationProcessed::Receive(destination, acked_ticket, request))
345                                }
346                                Err(e) => {
347                                    error!(error = %e, counterparty = %destination, "Error while handling aggregated ticket");
348                                    None
349                                }
350                            },
351                            Err(e) => {
352                                warn!(error = %e, counterparty = %destination, "Counterparty refused to aggregate tickets");
353                                None
354                            }
355                        }
356                    }
357                    TicketAggregationToProcess::ToSend(channel, prerequsites, finalizer) => {
358                        match db.prepare_aggregation_in_channel(&channel, prerequsites).await {
359                            Ok(Some((source, tickets, _dest))) if !tickets.is_empty() => {
360                                #[cfg(all(feature = "prometheus", not(test)))]
361                                {
362                                    METRIC_AGGREGATED_TICKETS.increment_by(tickets.len() as u64);
363                                    METRIC_AGGREGATION_COUNT.increment();
364                                }
365
366                                Some(TicketAggregationProcessed::Send(source.into(), tickets, finalizer))
367                            }
368                            Err(e) => {
369                                error!(error = %e, "An error occured when preparing the channel aggregation");
370                                None
371                            }
372                            _ => {
373                                finalizer.finalize();
374                                None
375                            }
376                        }
377                    }
378                };
379
380                if let Some(event) = processed {
381                    match poll_fn(|cx| Pin::new(&mut processed_tx).poll_ready(cx)).await {
382                        Ok(_) => match processed_tx.start_send(event) {
383                            Ok(_) => {}
384                            Err(e) => error!(error = %e, "Failed to pass a processed ack message"),
385                        },
386                        Err(e) => {
387                            warn!(error = %e, "The receiver for processed ack no longer exists");
388                        }
389                    };
390                }
391            }
392        });
393
394        // NOTE: This spawned task does not need to be explicitly canceled, since it will
395        // be automatically dropped when the event sender object is dropped.
396        spawn(async move {
397            // poll the stream until it's done
398            while processing_stream.next().await.is_some() {}
399        });
400
401        Self {
402            ack_event_queue: (processing_in_tx, processing_out_rx),
403        }
404    }
405
406    pub fn writer(&self) -> TicketAggregationActions<T, U> {
407        TicketAggregationActions {
408            queue: self.ack_event_queue.0.clone(),
409        }
410    }
411}
412
413impl<T, U> Stream for TicketAggregationInteraction<T, U>
414where
415    T: Send,
416    U: Send,
417{
418    type Item = TicketAggregationProcessed<T, U>;
419
420    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
421        Pin::new(self).ack_event_queue.1.poll_next_unpin(cx)
422    }
423}
424
425#[cfg(test)]
426mod tests {
427    use std::{ops::Mul, time::Duration};
428
429    use futures::{pin_mut, stream::StreamExt};
430    use hex_literal::hex;
431    use hopr_crypto_types::{
432        keypairs::{ChainKeypair, Keypair, OffchainKeypair},
433        types::{Hash, Response},
434    };
435    use hopr_db_sql::{
436        HoprDbGeneralModelOperations,
437        accounts::HoprDbAccountOperations,
438        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
439        channels::HoprDbChannelOperations,
440        db::HoprDb,
441        info::HoprDbInfoOperations,
442    };
443    use hopr_internal_types::prelude::*;
444    use hopr_primitive_types::prelude::*;
445    use lazy_static::lazy_static;
446    use tokio::time::timeout;
447
448    use super::TicketAggregationProcessed;
449
450    lazy_static! {
451        static ref PEERS: Vec<OffchainKeypair> = [
452            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
453            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
454        ]
455        .iter()
456        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
457        .collect();
458        static ref PEERS_CHAIN: Vec<ChainKeypair> = [
459            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
460            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
461        ]
462        .iter()
463        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
464        .collect();
465    }
466
467    fn mock_acknowledged_ticket(
468        signer: &ChainKeypair,
469        destination: &ChainKeypair,
470        index: u64,
471    ) -> anyhow::Result<AcknowledgedTicket> {
472        let price_per_packet: U256 = 10000000000000000u128.into();
473        let ticket_win_prob = 1.0f64;
474
475        let channel_id = generate_channel_id(&signer.into(), &destination.into());
476
477        let channel_epoch = 1u64;
478        let domain_separator = Hash::default();
479
480        let response = Response::try_from(
481            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
482        )?;
483
484        Ok(TicketBuilder::default()
485            .addresses(signer, destination)
486            .amount(price_per_packet.div_f64(ticket_win_prob)?)
487            .index(index)
488            .index_offset(1)
489            .win_prob(ticket_win_prob.try_into()?)
490            .channel_epoch(1)
491            .challenge(response.to_challenge()?)
492            .build_signed(signer, &domain_separator)?
493            .into_acknowledged(response))
494    }
495
496    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
497        let db_clone = db.clone();
498
499        let peers = PEERS.clone();
500        let peers_chain = PEERS_CHAIN.clone();
501
502        db.begin_transaction()
503            .await?
504            .perform(move |tx| {
505                Box::pin(async move {
506                    db_clone
507                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
508                        .await?;
509                    for (offchain, chain) in peers.iter().zip(peers_chain.iter()) {
510                        db_clone
511                            .insert_account(
512                                Some(tx),
513                                AccountEntry {
514                                    public_key: *offchain.public(),
515                                    chain_addr: chain.public().to_address(),
516                                    entry_type: AccountType::NotAnnounced,
517                                    published_at: 1,
518                                },
519                            )
520                            .await?
521                    }
522
523                    Ok::<(), hopr_db_sql::errors::DbSqlError>(())
524                })
525            })
526            .await?;
527
528        Ok(())
529    }
530
531    #[tokio::test]
532    async fn test_ticket_aggregation() -> anyhow::Result<()> {
533        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
534        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
535        init_db(db_alice.clone()).await?;
536        init_db(db_bob.clone()).await?;
537
538        const NUM_TICKETS: u64 = 30;
539
540        let mut tickets = vec![];
541        let mut agg_balance = HoprBalance::zero();
542        // Generate acknowledged tickets
543        for i in 1..=NUM_TICKETS {
544            let mut ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
545
546            // Mark the first ticket as redeemed, so it does not enter the aggregation
547            if i == 1 {
548                ack_ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
549            } else {
550                agg_balance += ack_ticket.verified_ticket().amount;
551            }
552
553            tickets.push(ack_ticket)
554        }
555
556        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
557        let bob_addr: Address = (&PEERS_CHAIN[1]).into();
558
559        let alice_packet_key = PEERS[0].public().into();
560        let bob_packet_key = PEERS[1].public().into();
561
562        let channel_alice_bob = ChannelEntry::new(
563            alice_addr,
564            bob_addr,
565            agg_balance.mul(10),
566            1_u32.into(),
567            ChannelStatus::Open,
568            1u32.into(),
569        );
570
571        db_alice.upsert_channel(None, channel_alice_bob).await?;
572        db_bob.upsert_channel(None, channel_alice_bob).await?;
573
574        for ticket in tickets.into_iter() {
575            db_bob.upsert_ticket(None, ticket).await?;
576        }
577
578        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
579        db_bob.start_ticket_processing(bob_notify_tx.into())?;
580
581        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
582        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);
583
584        let awaiter = bob
585            .writer()
586            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;
587
588        let mut finalizer = None;
589        match timeout(Duration::from_secs(5), bob.next()).await {
590            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
591                let _ = finalizer.insert(request_finalizer);
592                assert_eq!(
593                    NUM_TICKETS - 1,
594                    acked_tickets.len() as u64,
595                    "invalid number of tickets to aggregate"
596                );
597                alice
598                    .writer()
599                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
600            }
601            _ => panic!("unexpected action happened while sending agg request by Bob"),
602        };
603
604        match timeout(Duration::from_secs(5), alice.next()).await {
605            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
606                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
607            }
608            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
609        };
610
611        match timeout(Duration::from_secs(5), bob.next()).await {
612            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
613                finalizer.take().expect("finalizer should be present").finalize()
614            }
615            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
616        }
617
618        pin_mut!(bob_notify_rx);
619        bob_notify_rx
620            .next()
621            .await
622            .expect("bob should have received the ticket notification");
623
624        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;
625
626        assert_eq!(
627            stored_acked_tickets.len(),
628            2,
629            "there should be 1 aggregated ticket and 1 ticket being redeemed"
630        );
631
632        assert_eq!(
633            AcknowledgedTicketStatus::BeingRedeemed,
634            stored_acked_tickets[0].status,
635            "first ticket must be being redeemed"
636        );
637        assert!(
638            stored_acked_tickets[1].verified_ticket().is_aggregated(),
639            "last ticket must be the aggregated one"
640        );
641        assert_eq!(
642            AcknowledgedTicketStatus::Untouched,
643            stored_acked_tickets[1].status,
644            "second ticket must be untouched"
645        );
646        assert_eq!(
647            agg_balance,
648            stored_acked_tickets[1].verified_ticket().amount,
649            "aggregated balance invalid"
650        );
651
652        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
653    }
654
655    #[tokio::test]
656    async fn test_ticket_aggregation_skip_lower_indices() -> anyhow::Result<()> {
657        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
658        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
659        init_db(db_alice.clone()).await?;
660        init_db(db_bob.clone()).await?;
661
662        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
663        db_bob.start_ticket_processing(bob_notify_tx.into())?;
664
665        const NUM_TICKETS: u64 = 30;
666        const CHANNEL_TICKET_IDX: u64 = 20;
667
668        let mut tickets = vec![];
669        let mut agg_balance = HoprBalance::zero();
670        // Generate acknowledged tickets
671        for i in 1..=NUM_TICKETS {
672            let ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
673            if i >= CHANNEL_TICKET_IDX {
674                agg_balance += ack_ticket.verified_ticket().amount;
675            }
676            tickets.push(ack_ticket)
677        }
678
679        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
680        let bob_addr: Address = (&PEERS_CHAIN[1]).into();
681
682        let alice_packet_key = PEERS[0].public().into();
683        let bob_packet_key = PEERS[1].public().into();
684
685        let channel_alice_bob = ChannelEntry::new(
686            alice_addr,
687            bob_addr,
688            agg_balance.mul(10),
689            CHANNEL_TICKET_IDX.into(),
690            ChannelStatus::Open,
691            1u32.into(),
692        );
693
694        db_alice.upsert_channel(None, channel_alice_bob).await?;
695        db_bob.upsert_channel(None, channel_alice_bob).await?;
696
697        for ticket in tickets.into_iter() {
698            db_bob.upsert_ticket(None, ticket).await?;
699        }
700
701        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
702        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);
703
704        let awaiter = bob
705            .writer()
706            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;
707
708        let mut finalizer = None;
709        match timeout(Duration::from_secs(5), bob.next()).await {
710            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
711                let _ = finalizer.insert(request_finalizer);
712                assert_eq!(
713                    NUM_TICKETS - CHANNEL_TICKET_IDX + 1,
714                    acked_tickets.len() as u64,
715                    "invalid number of tickets to aggregate"
716                );
717                alice
718                    .writer()
719                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
720            }
721            _ => panic!("unexpected action happened while sending agg request by Bob"),
722        };
723
724        match timeout(Duration::from_secs(5), alice.next()).await {
725            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
726                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
727            }
728            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
729        };
730
731        match timeout(Duration::from_secs(5), bob.next()).await {
732            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
733                finalizer.take().expect("finalizer should be present").finalize()
734            }
735            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
736        }
737
738        pin_mut!(bob_notify_rx);
739        bob_notify_rx
740            .next()
741            .await
742            .expect("bob should have received the ticket notification");
743
744        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;
745
746        assert_eq!(
747            stored_acked_tickets.len(),
748            20,
749            "there should be 1 aggregated ticket and 19 old tickets"
750        );
751
752        assert!(
753            stored_acked_tickets[19].verified_ticket().is_aggregated(),
754            "last ticket must be the aggregated one"
755        );
756        for (i, stored_acked_ticket) in stored_acked_tickets.iter().enumerate().take(19) {
757            assert_eq!(
758                AcknowledgedTicketStatus::Untouched,
759                stored_acked_ticket.status,
760                "ticket #{i} must be untouched"
761            );
762        }
763        assert_eq!(
764            agg_balance,
765            stored_acked_tickets[19].verified_ticket().amount,
766            "aggregated balance invalid"
767        );
768
769        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
770    }
771}