hopr_transport_ticket_aggregation/
lib.rs

1use std::{pin::Pin, task::Poll};
2
3use futures::{
4    channel::{
5        mpsc,
6        mpsc::{Receiver, Sender, UnboundedSender, channel},
7    },
8    future::{Either, poll_fn},
9    pin_mut,
10    stream::{Stream, StreamExt},
11};
12use hopr_async_runtime::prelude::{sleep, spawn};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::{
15    errors::DbError,
16    tickets::{AggregationPrerequisites, HoprDbTicketOperations},
17};
18use hopr_internal_types::prelude::*;
19use hopr_transport_identity::PeerId;
20use libp2p::request_response::{OutboundRequestId, ResponseChannel};
21use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
22use thiserror::Error;
23use tracing::{error, warn};
24
25/// Errors generated by the crate.
26#[derive(Error, Debug)]
27pub enum TicketAggregationError {
28    #[error("tx queue is full, retry later")]
29    Retry,
30
31    #[error("underlying transport error while sending packet: {0}")]
32    TransportError(String),
33
34    #[error("db error {0}")]
35    DatabaseError(#[from] hopr_db_api::errors::DbError),
36}
37
38/// Result used by the crate, based on the [ProtocolError] error type.
39pub type Result<T> = core::result::Result<T, TicketAggregationError>;
40
41#[cfg(all(feature = "prometheus", not(test)))]
42use hopr_metrics::metrics::SimpleCounter;
43
// Prometheus counters of this crate. They are compiled only when the `prometheus`
// feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented by the number of tickets contained in every prepared outgoing aggregation.
    static ref METRIC_AGGREGATED_TICKETS: SimpleCounter = SimpleCounter::new(
        "hopr_aggregated_tickets_count",
        "Number of aggregated tickets"
    )
    .unwrap();
    // Incremented once for every prepared outgoing aggregation.
    static ref METRIC_AGGREGATION_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_aggregations_count",
        "Number of performed ticket aggregations"
    )
    .unwrap();
}
57
// Default sizes of the ticket aggregation queues.
//
// The sum of both values is used as the capacity of each of the two bounded
// channels (input and output) of the ticket aggregation processing pipeline.

/// Default size of the TX part of the ticket aggregation queue.
pub const TICKET_AGGREGATION_TX_QUEUE_SIZE: usize = 2048;
/// Default size of the RX part of the ticket aggregation queue.
pub const TICKET_AGGREGATION_RX_QUEUE_SIZE: usize = 2048;
61
/// The input to the processor background pipeline
///
/// `T` is an opaque handle attached to an incoming aggregation request; it is passed through
/// unchanged and returned together with the reply. `U` is an opaque handle attached to a
/// received aggregated ticket and is likewise passed through unchanged.
#[allow(clippy::type_complexity)] // TODO: The type needs to be significantly refactored to easily move around
#[allow(clippy::large_enum_variant)] // TODO: refactor the large types used in the enum
#[derive(Debug)]
pub enum TicketAggregationToProcess<T, U> {
    /// An aggregated ticket (or the error message sent instead of it) received from the given peer.
    ToReceive(PeerId, std::result::Result<Ticket, String>, U),
    /// An aggregation request received from the given peer, containing the tickets to aggregate.
    ToProcess(PeerId, Vec<TransferableWinningTicket>, T),
    /// A request to start an aggregation in the channel with the given ID; the finalizer
    /// is used to notify the party awaiting the aggregation.
    ToSend(Hash, AggregationPrerequisites, TicketAggregationFinalizer),
}
71
/// Emitted by the processor background pipeline once processed
#[allow(clippy::large_enum_variant)] // TODO: refactor the large types used in the enum
#[derive(Debug)]
pub enum TicketAggregationProcessed<T, U> {
    /// An aggregated ticket received from the given peer was successfully processed by the database.
    Receive(PeerId, AcknowledgedTicket, U),
    /// The outcome of an aggregation request (the aggregated ticket or an error message)
    /// that is to be sent back to the given peer.
    Reply(PeerId, std::result::Result<Ticket, String>, T),
    /// Tickets prepared for aggregation that are to be sent to the given peer,
    /// along with the finalizer of the pending aggregation.
    Send(PeerId, Vec<TransferableWinningTicket>, TicketAggregationFinalizer),
}
80
/// Abstraction over a component that can trigger ticket aggregation in a channel.
#[async_trait::async_trait]
pub trait TicketAggregatorTrait {
    /// Pushes a new collection of tickets into the processing.
    ///
    /// `channel` is the ID of the channel to aggregate tickets in and `prerequisites`
    /// are passed on to the aggregation preparation.
    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()>;
}
86
/// A [`TicketAggregatorTrait`] implementation that enqueues the aggregation and then waits
/// for it to complete, rolling the aggregation back in the database when the wait fails.
#[derive(Debug)]
pub struct AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    // Database used to roll back an aggregation that failed or timed out.
    db: Db,
    // Handle used to enqueue aggregation requests into the processing pipeline.
    writer: TicketAggregationActions<T, U>,
    // Maximum time to wait for a single aggregation to complete.
    agg_timeout: std::time::Duration,
}
98
99impl<T, U, Db> Clone for AwaitingAggregator<T, U, Db>
100where
101    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
102    T: Send,
103    U: Send,
104{
105    fn clone(&self) -> Self {
106        Self {
107            db: self.db.clone(),
108            writer: self.writer.clone(),
109            agg_timeout: self.agg_timeout,
110        }
111    }
112}
113
114impl<T, U, Db> AwaitingAggregator<T, U, Db>
115where
116    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
117    T: Send,
118    U: Send,
119{
120    pub fn new(db: Db, writer: TicketAggregationActions<T, U>, agg_timeout: std::time::Duration) -> Self {
121        Self {
122            db,
123            writer,
124            agg_timeout,
125        }
126    }
127}
128
129#[async_trait::async_trait]
130impl<T, U, Db> TicketAggregatorTrait for AwaitingAggregator<T, U, Db>
131where
132    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
133    T: Send,
134    U: Send,
135{
136    #[tracing::instrument(level = "debug", skip(self))]
137    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()> {
138        let awaiter = self.writer.clone().aggregate_tickets(channel, prerequisites)?;
139
140        if let Err(e) = awaiter.consume_and_wait(self.agg_timeout).await {
141            warn!(%channel, error = %e, "Error during ticket aggregation, performing a rollback");
142            self.db.rollback_aggregation_in_channel(*channel).await?;
143        }
144
145        Ok(())
146    }
147}
148
/// Receiving side of the completion signal of a single ticket aggregation.
#[derive(Debug)]
pub struct TicketAggregationAwaiter {
    // Channel on which the completion signal is delivered.
    rx: mpsc::UnboundedReceiver<()>,
}
153
154impl From<mpsc::UnboundedReceiver<()>> for TicketAggregationAwaiter {
155    fn from(value: mpsc::UnboundedReceiver<()>) -> Self {
156        Self { rx: value }
157    }
158}
159
160impl TicketAggregationAwaiter {
161    pub async fn consume_and_wait(mut self, until_timeout: std::time::Duration) -> Result<()> {
162        let timeout = sleep(until_timeout);
163        let resolve = self.rx.next();
164
165        pin_mut!(resolve, timeout);
166        match futures::future::select(resolve, timeout).await {
167            Either::Left((result, _)) => result.ok_or(TicketAggregationError::TransportError("Canceled".to_owned())),
168            Either::Right(_) => Err(TicketAggregationError::TransportError(
169                "Timed out on sending a packet".to_owned(),
170            )),
171        }
172    }
173}
174
/// Sending side of the completion signal of a single ticket aggregation.
#[derive(Debug, Clone)]
pub struct TicketAggregationFinalizer {
    // Sender of the completion signal; taken out when the finalizer is finalized.
    tx: Option<UnboundedSender<()>>,
}
179
180impl TicketAggregationFinalizer {
181    pub fn new(tx: UnboundedSender<()>) -> Self {
182        Self { tx: Some(tx) }
183    }
184
185    pub fn finalize(mut self) {
186        if let Some(sender) = self.tx.take() {
187            if sender.unbounded_send(()).is_err() {
188                error!("Failed to notify the awaiter about the successful ticket aggregation")
189            }
190        } else {
191            error!("Sender for packet send signalization is already spent")
192        }
193    }
194}
195
/// External API for feeding Ticket Aggregation actions into the Ticket Aggregation
/// processor processing the elements independently in the background.
#[derive(Debug)]
pub struct TicketAggregationActions<T, U> {
    /// Bounded queue through which the events are handed over to the processor.
    pub queue: Sender<TicketAggregationToProcess<T, U>>,
}
202
/// [`TicketAggregationActions`] specialized for the libp2p request-response types:
/// a [`ResponseChannel`] accompanies incoming requests and an [`OutboundRequestId`]
/// accompanies received aggregated tickets.
pub type BasicTicketAggregationActions<T> = TicketAggregationActions<ResponseChannel<T>, OutboundRequestId>;
204
205impl<T, U> Clone for TicketAggregationActions<T, U> {
206    /// Generic type requires a handwritten clone function
207    fn clone(&self) -> Self {
208        Self {
209            queue: self.queue.clone(),
210        }
211    }
212}
213
214impl<T, U> TicketAggregationActions<T, U> {
215    /// Pushes the aggregated ticket received from the transport layer into processing.
216    pub fn receive_ticket(
217        &mut self,
218        source: PeerId,
219        ticket: std::result::Result<Ticket, String>,
220        request: U,
221    ) -> Result<()> {
222        self.process(TicketAggregationToProcess::ToReceive(source, ticket, request))
223    }
224
225    /// Process the received aggregation request
226    pub fn receive_aggregation_request(
227        &mut self,
228        source: PeerId,
229        tickets: Vec<TransferableWinningTicket>,
230        request: T,
231    ) -> Result<()> {
232        self.process(TicketAggregationToProcess::ToProcess(source, tickets, request))
233    }
234
235    /// Pushes a new collection of tickets into the processing.
236    pub fn aggregate_tickets(
237        &mut self,
238        channel: &Hash,
239        prerequisites: AggregationPrerequisites,
240    ) -> Result<TicketAggregationAwaiter> {
241        let (tx, rx) = mpsc::unbounded::<()>();
242
243        self.process(TicketAggregationToProcess::ToSend(
244            *channel,
245            prerequisites,
246            TicketAggregationFinalizer::new(tx),
247        ))?;
248
249        Ok(rx.into())
250    }
251
252    fn process(&mut self, event: TicketAggregationToProcess<T, U>) -> Result<()> {
253        self.queue.try_send(event).map_err(|e| {
254            if e.is_full() {
255                TicketAggregationError::Retry
256            } else if e.is_disconnected() {
257                TicketAggregationError::TransportError("queue is closed".to_string())
258            } else {
259                TicketAggregationError::TransportError(format!("Unknown error: {e}"))
260            }
261        })
262    }
263}
264
// Pair of the pipeline endpoints: the sender of events to be processed
// and the receiver of the processed events.
type AckEventQueue<T, U> = (
    Sender<TicketAggregationToProcess<T, U>>,
    Receiver<TicketAggregationProcessed<T, U>>,
);
269
/// Sets up processing of ticket aggregation interactions and returns relevant read and write mechanism.
///
/// Events are written via the handle returned by `writer()`; processed events are read
/// by polling this type as a [`Stream`].
pub struct TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    // (input sender, output receiver) of the background processing pipeline.
    ack_event_queue: AckEventQueue<T, U>,
}
278
279impl<T: 'static, U: 'static> TicketAggregationInteraction<T, U>
280where
281    T: Send,
282    U: Send,
283{
284    /// Creates a new instance given the DB to process the ticket aggregation requests.
285    pub fn new<Db>(db: Db, chain_key: &ChainKeypair) -> Self
286    where
287        Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
288    {
289        let (processing_in_tx, processing_in_rx) = channel::<TicketAggregationToProcess<T, U>>(
290            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
291        );
292        let (processing_out_tx, processing_out_rx) = channel::<TicketAggregationProcessed<T, U>>(
293            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
294        );
295
296        let chain_key = chain_key.clone();
297
298        let mut processing_stream = processing_in_rx.then_concurrent(move |event| {
299            let chain_key = chain_key.clone();
300            let db = db.clone();
301            let mut processed_tx = processing_out_tx.clone();
302
303            async move {
304                let processed = match event {
305                    TicketAggregationToProcess::ToProcess(destination, acked_tickets, response) => {
306                        let opk: std::result::Result<OffchainPublicKey, hopr_primitive_types::errors::GeneralError> =
307                            destination.try_into();
308                        match opk {
309                            Ok(opk) => {
310                                let count = acked_tickets.len();
311                                match db.aggregate_tickets(opk, acked_tickets, &chain_key).await {
312                                    Ok(ticket) => Some(TicketAggregationProcessed::Reply(
313                                        destination,
314                                        Ok(ticket.leak()),
315                                        response,
316                                    )),
317                                    Err(DbError::TicketAggregationError(e)) => {
318                                        // forward error to counterparty
319                                        Some(TicketAggregationProcessed::Reply(destination, Err(e), response))
320                                    }
321                                    Err(e) => {
322                                        error!(error = %e, %destination, count, "Dropping tickets aggregation request due to an error");
323                                        None
324                                    }
325                                }
326                            },
327                            Err(e) => {
328                                error!(
329                                    %destination, error = %e,
330                                    "Failed to aggregate tickets due to destination deserialization error from an offchain public key"
331                                );
332                                None
333                            }
334                        }
335                    }
336                    TicketAggregationToProcess::ToReceive(destination, aggregated_ticket, request) => {
337                        match aggregated_ticket {
338                            Ok(ticket) => match db.process_received_aggregated_ticket(ticket.clone(), &chain_key).await
339                            {
340                                Ok(acked_ticket) => {
341                                    Some(TicketAggregationProcessed::Receive(destination, acked_ticket, request))
342                                }
343                                Err(e) => {
344                                    error!(error = %e, counterparty = %destination, "Error while handling aggregated ticket");
345                                    None
346                                }
347                            },
348                            Err(e) => {
349                                warn!(error = %e, counterparty = %destination, "Counterparty refused to aggregate tickets");
350                                None
351                            }
352                        }
353                    }
354                    TicketAggregationToProcess::ToSend(channel, prerequsites, finalizer) => {
355                        match db.prepare_aggregation_in_channel(&channel, prerequsites).await {
356                            Ok(Some((source, tickets, _dest))) if !tickets.is_empty() => {
357                                #[cfg(all(feature = "prometheus", not(test)))]
358                                {
359                                    METRIC_AGGREGATED_TICKETS.increment_by(tickets.len() as u64);
360                                    METRIC_AGGREGATION_COUNT.increment();
361                                }
362
363                                Some(TicketAggregationProcessed::Send(source.into(), tickets, finalizer))
364                            }
365                            Err(e) => {
366                                error!(error = %e, "An error occured when preparing the channel aggregation");
367                                None
368                            }
369                            _ => {
370                                finalizer.finalize();
371                                None
372                            }
373                        }
374                    }
375                };
376
377                if let Some(event) = processed {
378                    match poll_fn(|cx| Pin::new(&mut processed_tx).poll_ready(cx)).await {
379                        Ok(_) => match processed_tx.start_send(event) {
380                            Ok(_) => {}
381                            Err(e) => error!(error = %e, "Failed to pass a processed ack message"),
382                        },
383                        Err(e) => {
384                            warn!(error = %e, "The receiver for processed ack no longer exists");
385                        }
386                    };
387                }
388            }
389        });
390
391        // NOTE: This spawned task does not need to be explicitly canceled, since it will
392        // be automatically dropped when the event sender object is dropped.
393        spawn(async move {
394            // poll the stream until it's done
395            while processing_stream.next().await.is_some() {}
396        });
397
398        Self {
399            ack_event_queue: (processing_in_tx, processing_out_rx),
400        }
401    }
402
403    pub fn writer(&self) -> TicketAggregationActions<T, U> {
404        TicketAggregationActions {
405            queue: self.ack_event_queue.0.clone(),
406        }
407    }
408}
409
410impl<T, U> Stream for TicketAggregationInteraction<T, U>
411where
412    T: Send,
413    U: Send,
414{
415    type Item = TicketAggregationProcessed<T, U>;
416
417    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
418        Pin::new(self).ack_event_queue.1.poll_next_unpin(cx)
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use std::{ops::Mul, time::Duration};
425
426    use futures::{pin_mut, stream::StreamExt};
427    use hex_literal::hex;
428    use hopr_crypto_types::{
429        keypairs::{ChainKeypair, Keypair, OffchainKeypair},
430        types::{Hash, Response},
431    };
432    use hopr_db_sql::{
433        HoprDbGeneralModelOperations,
434        accounts::HoprDbAccountOperations,
435        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
436        channels::HoprDbChannelOperations,
437        db::HoprDb,
438        info::HoprDbInfoOperations,
439    };
440    use hopr_internal_types::prelude::*;
441    use hopr_primitive_types::prelude::*;
442    use lazy_static::lazy_static;
443    use tokio::time::timeout;
444
445    use super::TicketAggregationProcessed;
446
    // Fixed key material of the two test peers; index 0 is used as Alice and index 1 as Bob.
    lazy_static! {
        // Offchain (packet) keypairs of the peers.
        static ref PEERS: Vec<OffchainKeypair> = [
            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
        ]
        .iter()
        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
        // Chain keypairs of the peers, in the same order as `PEERS`.
        static ref PEERS_CHAIN: Vec<ChainKeypair> = [
            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
        ]
        .iter()
        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
    }
463
464    fn mock_acknowledged_ticket(
465        signer: &ChainKeypair,
466        destination: &ChainKeypair,
467        index: u64,
468    ) -> anyhow::Result<AcknowledgedTicket> {
469        let price_per_packet: U256 = 10000000000000000u128.into();
470        let ticket_win_prob = 1.0f64;
471
472        let channel_id = generate_channel_id(&signer.into(), &destination.into());
473
474        let channel_epoch = 1u64;
475        let domain_separator = Hash::default();
476
477        let response = Response::try_from(
478            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
479        )?;
480
481        Ok(TicketBuilder::default()
482            .addresses(signer, destination)
483            .amount(price_per_packet.div_f64(ticket_win_prob)?)
484            .index(index)
485            .index_offset(1)
486            .win_prob(ticket_win_prob.try_into()?)
487            .channel_epoch(1)
488            .challenge(response.to_challenge().into())
489            .build_signed(signer, &domain_separator)?
490            .into_acknowledged(response))
491    }
492
    /// Initializes the given database within a single transaction: sets the channel domain
    /// separator to the default hash and inserts an account entry for every test peer.
    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
        // Separate handle that is moved into the transaction closure.
        let db_clone = db.clone();

        let peers = PEERS.clone();
        let peers_chain = PEERS_CHAIN.clone();

        db.begin_transaction()
            .await?
            .perform(move |tx| {
                Box::pin(async move {
                    db_clone
                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
                        .await?;
                    // Offchain and chain keypairs are paired by their position.
                    for (offchain, chain) in peers.iter().zip(peers_chain.iter()) {
                        db_clone
                            .insert_account(
                                Some(tx),
                                AccountEntry {
                                    public_key: *offchain.public(),
                                    chain_addr: chain.public().to_address(),
                                    entry_type: AccountType::NotAnnounced,
                                    published_at: 1,
                                },
                            )
                            .await?
                    }

                    Ok::<(), hopr_db_sql::errors::DbSqlError>(())
                })
            })
            .await?;

        Ok(())
    }
527
528    #[tokio::test]
529    async fn test_ticket_aggregation() -> anyhow::Result<()> {
530        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
531        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
532        init_db(db_alice.clone()).await?;
533        init_db(db_bob.clone()).await?;
534
535        const NUM_TICKETS: u64 = 30;
536
537        let mut tickets = vec![];
538        let mut agg_balance = HoprBalance::zero();
539        // Generate acknowledged tickets
540        for i in 1..=NUM_TICKETS {
541            let mut ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
542
543            // Mark the first ticket as redeemed, so it does not enter the aggregation
544            if i == 1 {
545                ack_ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
546            } else {
547                agg_balance += ack_ticket.verified_ticket().amount;
548            }
549
550            tickets.push(ack_ticket)
551        }
552
553        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
554        let bob_addr: Address = (&PEERS_CHAIN[1]).into();
555
556        let alice_packet_key = PEERS[0].public().into();
557        let bob_packet_key = PEERS[1].public().into();
558
559        let channel_alice_bob = ChannelEntry::new(
560            alice_addr,
561            bob_addr,
562            agg_balance.mul(10),
563            1_u32.into(),
564            ChannelStatus::Open,
565            1u32.into(),
566        );
567
568        db_alice.upsert_channel(None, channel_alice_bob).await?;
569        db_bob.upsert_channel(None, channel_alice_bob).await?;
570
571        for ticket in tickets.into_iter() {
572            db_bob.upsert_ticket(None, ticket).await?;
573        }
574
575        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
576        db_bob.start_ticket_processing(bob_notify_tx.into())?;
577
578        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
579        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);
580
581        let awaiter = bob
582            .writer()
583            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;
584
585        let mut finalizer = None;
586        match timeout(Duration::from_secs(5), bob.next()).await {
587            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
588                let _ = finalizer.insert(request_finalizer);
589                assert_eq!(
590                    NUM_TICKETS - 1,
591                    acked_tickets.len() as u64,
592                    "invalid number of tickets to aggregate"
593                );
594                alice
595                    .writer()
596                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
597            }
598            _ => panic!("unexpected action happened while sending agg request by Bob"),
599        };
600
601        match timeout(Duration::from_secs(5), alice.next()).await {
602            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
603                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
604            }
605            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
606        };
607
608        match timeout(Duration::from_secs(5), bob.next()).await {
609            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
610                finalizer.take().expect("finalizer should be present").finalize()
611            }
612            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
613        }
614
615        pin_mut!(bob_notify_rx);
616        bob_notify_rx
617            .next()
618            .await
619            .expect("bob should have received the ticket notification");
620
621        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;
622
623        assert_eq!(
624            stored_acked_tickets.len(),
625            2,
626            "there should be 1 aggregated ticket and 1 ticket being redeemed"
627        );
628
629        assert_eq!(
630            AcknowledgedTicketStatus::BeingRedeemed,
631            stored_acked_tickets[0].status,
632            "first ticket must be being redeemed"
633        );
634        assert!(
635            stored_acked_tickets[1].verified_ticket().is_aggregated(),
636            "last ticket must be the aggregated one"
637        );
638        assert_eq!(
639            AcknowledgedTicketStatus::Untouched,
640            stored_acked_tickets[1].status,
641            "second ticket must be untouched"
642        );
643        assert_eq!(
644            agg_balance,
645            stored_acked_tickets[1].verified_ticket().amount,
646            "aggregated balance invalid"
647        );
648
649        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
650    }
651
652    #[tokio::test]
653    async fn test_ticket_aggregation_skip_lower_indices() -> anyhow::Result<()> {
654        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
655        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
656        init_db(db_alice.clone()).await?;
657        init_db(db_bob.clone()).await?;
658
659        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
660        db_bob.start_ticket_processing(bob_notify_tx.into())?;
661
662        const NUM_TICKETS: u64 = 30;
663        const CHANNEL_TICKET_IDX: u64 = 20;
664
665        let mut tickets = vec![];
666        let mut agg_balance = HoprBalance::zero();
667        // Generate acknowledged tickets
668        for i in 1..=NUM_TICKETS {
669            let ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
670            if i >= CHANNEL_TICKET_IDX {
671                agg_balance += ack_ticket.verified_ticket().amount;
672            }
673            tickets.push(ack_ticket)
674        }
675
676        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
677        let bob_addr: Address = (&PEERS_CHAIN[1]).into();
678
679        let alice_packet_key = PEERS[0].public().into();
680        let bob_packet_key = PEERS[1].public().into();
681
682        let channel_alice_bob = ChannelEntry::new(
683            alice_addr,
684            bob_addr,
685            agg_balance.mul(10),
686            CHANNEL_TICKET_IDX.into(),
687            ChannelStatus::Open,
688            1u32.into(),
689        );
690
691        db_alice.upsert_channel(None, channel_alice_bob).await?;
692        db_bob.upsert_channel(None, channel_alice_bob).await?;
693
694        for ticket in tickets.into_iter() {
695            db_bob.upsert_ticket(None, ticket).await?;
696        }
697
698        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
699        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);
700
701        let awaiter = bob
702            .writer()
703            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;
704
705        let mut finalizer = None;
706        match timeout(Duration::from_secs(5), bob.next()).await {
707            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
708                let _ = finalizer.insert(request_finalizer);
709                assert_eq!(
710                    NUM_TICKETS - CHANNEL_TICKET_IDX + 1,
711                    acked_tickets.len() as u64,
712                    "invalid number of tickets to aggregate"
713                );
714                alice
715                    .writer()
716                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
717            }
718            _ => panic!("unexpected action happened while sending agg request by Bob"),
719        };
720
721        match timeout(Duration::from_secs(5), alice.next()).await {
722            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
723                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
724            }
725            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
726        };
727
728        match timeout(Duration::from_secs(5), bob.next()).await {
729            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
730                finalizer.take().expect("finalizer should be present").finalize()
731            }
732            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
733        }
734
735        pin_mut!(bob_notify_rx);
736        bob_notify_rx
737            .next()
738            .await
739            .expect("bob should have received the ticket notification");
740
741        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;
742
743        assert_eq!(
744            stored_acked_tickets.len(),
745            20,
746            "there should be 1 aggregated ticket and 19 old tickets"
747        );
748
749        assert!(
750            stored_acked_tickets[19].verified_ticket().is_aggregated(),
751            "last ticket must be the aggregated one"
752        );
753        for (i, stored_acked_ticket) in stored_acked_tickets.iter().enumerate().take(19) {
754            assert_eq!(
755                AcknowledgedTicketStatus::Untouched,
756                stored_acked_ticket.status,
757                "ticket #{i} must be untouched"
758            );
759        }
760        assert_eq!(
761            agg_balance,
762            stored_acked_tickets[19].verified_ticket().amount,
763            "aggregated balance invalid"
764        );
765
766        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
767    }
768}