use std::{pin::Pin, task::Poll};

use futures::{
    channel::{
        mpsc,
        mpsc::{Receiver, Sender, UnboundedSender, channel},
    },
    future::{Either, poll_fn},
    pin_mut,
    stream::{Stream, StreamExt},
};
use hopr_async_runtime::prelude::{sleep, spawn};
use hopr_crypto_types::prelude::*;
use hopr_db_api::{
    errors::DbError,
    tickets::{AggregationPrerequisites, HoprDbTicketOperations},
};
use hopr_internal_types::prelude::*;
use hopr_transport_identity::PeerId;
use libp2p::request_response::{OutboundRequestId, ResponseChannel};
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use thiserror::Error;
use tracing::{error, warn};

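/// Errors that can occur while enqueuing or performing ticket aggregation.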
#[derive(Error, Debug)]
pub enum TicketAggregationError {
    #[error("tx queue is full, retry later")]
    Retry,

    #[error("underlying transport error while sending packet: {0}")]
    TransportError(String),

    #[error("db error {0}")]
    DatabaseError(#[from] hopr_db_api::errors::DbError),
}

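/// Convenience result type used throughout the ticket aggregation module.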
pub type Result<T> = core::result::Result<T, TicketAggregationError>;

#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::SimpleCounter;

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_AGGREGATED_TICKETS: SimpleCounter = SimpleCounter::new(
        "hopr_aggregated_tickets_count",
        "Number of aggregated tickets"
    )
    .unwrap();
    static ref METRIC_AGGREGATION_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_aggregations_count",
        "Number of performed ticket aggregations"
    )
    .unwrap();
}

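/// Size of the queue for outgoing ticket aggregation events.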
pub const TICKET_AGGREGATION_TX_QUEUE_SIZE: usize = 2048;
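/// Size of the queue for incoming ticket aggregation events.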
pub const TICKET_AGGREGATION_RX_QUEUE_SIZE: usize = 2048;

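/// Input events for the ticket aggregation processor: an aggregated ticket received from a
/// counterparty, a counterparty's request to aggregate tickets, or a local request to start
/// an aggregation in a channel.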
#[allow(clippy::type_complexity)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TicketAggregationToProcess<T, U> {
    ToReceive(PeerId, std::result::Result<Ticket, String>, U),
    ToProcess(PeerId, Vec<TransferableWinningTicket>, T),
    ToSend(Hash, AggregationPrerequisites, TicketAggregationFinalizer),
}

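/// Output events produced by the ticket aggregation processor, mirroring the variants of
/// [`TicketAggregationToProcess`] after they have been handled.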
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TicketAggregationProcessed<T, U> {
    Receive(PeerId, AcknowledgedTicket, U),
    Reply(PeerId, std::result::Result<Ticket, String>, T),
    Send(PeerId, Vec<TransferableWinningTicket>, TicketAggregationFinalizer),
}

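/// Trait for objects that can trigger ticket aggregation in a channel, subject to the given
/// prerequisites.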
#[async_trait::async_trait]
pub trait TicketAggregatorTrait {
    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()>;
}

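/// [`TicketAggregatorTrait`] implementation that requests an aggregation and then waits for it
/// to complete (up to `agg_timeout`), rolling back the aggregation in the database on failure.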
#[derive(Debug)]
pub struct AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    db: Db,
    writer: TicketAggregationActions<T, U>,
    agg_timeout: std::time::Duration,
}

impl<T, U, Db> Clone for AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    fn clone(&self) -> Self {
        Self {
            db: self.db.clone(),
            writer: self.writer.clone(),
            agg_timeout: self.agg_timeout,
        }
    }
}

impl<T, U, Db> AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    pub fn new(db: Db, writer: TicketAggregationActions<T, U>, agg_timeout: std::time::Duration) -> Self {
        Self {
            db,
            writer,
            agg_timeout,
        }
    }
}

#[async_trait::async_trait]
impl<T, U, Db> TicketAggregatorTrait for AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    #[tracing::instrument(level = "debug", skip(self))]
    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()> {
        let awaiter = self.writer.clone().aggregate_tickets(channel, prerequisites)?;

        if let Err(e) = awaiter.consume_and_wait(self.agg_timeout).await {
            warn!(%channel, error = %e, "Error during ticket aggregation, performing a rollback");
            self.db.rollback_aggregation_in_channel(*channel).await?;
        }

        Ok(())
    }
}

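/// Awaits the completion of a previously requested ticket aggregation.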
#[derive(Debug)]
pub struct TicketAggregationAwaiter {
    rx: mpsc::UnboundedReceiver<()>,
}

impl From<mpsc::UnboundedReceiver<()>> for TicketAggregationAwaiter {
    fn from(value: mpsc::UnboundedReceiver<()>) -> Self {
        Self { rx: value }
    }
}

impl TicketAggregationAwaiter {
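    /// Consumes the awaiter and waits for the aggregation to be finalized, failing with a
    /// transport error if the notification channel is closed or the timeout elapses.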
    pub async fn consume_and_wait(mut self, until_timeout: std::time::Duration) -> Result<()> {
        let timeout = sleep(until_timeout);
        let resolve = self.rx.next();

        pin_mut!(resolve, timeout);
        match futures::future::select(resolve, timeout).await {
            Either::Left((result, _)) => result.ok_or(TicketAggregationError::TransportError("Canceled".to_owned())),
            Either::Right(_) => Err(TicketAggregationError::TransportError(
                "Timed out waiting for the ticket aggregation to complete".to_owned(),
            )),
        }
    }
}

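/// One-shot notifier used to signal that a requested ticket aggregation has completed.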
#[derive(Debug, Clone)]
pub struct TicketAggregationFinalizer {
    tx: Option<UnboundedSender<()>>,
}

impl TicketAggregationFinalizer {
    pub fn new(tx: UnboundedSender<()>) -> Self {
        Self { tx: Some(tx) }
    }

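    /// Consumes the finalizer and notifies the corresponding [`TicketAggregationAwaiter`].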
    pub fn finalize(mut self) {
        if let Some(sender) = self.tx.take() {
            if sender.unbounded_send(()).is_err() {
                error!("Failed to notify the awaiter about the successful ticket aggregation")
            }
        } else {
            error!("Sender for the ticket aggregation completion signal has already been used")
        }
    }
}

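/// Handle for submitting events to the ticket aggregation processing queue.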
#[derive(Debug)]
pub struct TicketAggregationActions<T, U> {
    pub queue: Sender<TicketAggregationToProcess<T, U>>,
}

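/// [`TicketAggregationActions`] specialized for libp2p request-response channels.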
pub type BasicTicketAggregationActions<T> = TicketAggregationActions<ResponseChannel<T>, OutboundRequestId>;

impl<T, U> Clone for TicketAggregationActions<T, U> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

impl<T, U> TicketAggregationActions<T, U> {
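    /// Enqueues an aggregated ticket (or the counterparty's error) received from `source`.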
    pub fn receive_ticket(
        &mut self,
        source: PeerId,
        ticket: std::result::Result<Ticket, String>,
        request: U,
    ) -> Result<()> {
        self.process(TicketAggregationToProcess::ToReceive(source, ticket, request))
    }

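    /// Enqueues a counterparty's request to aggregate the given winning tickets.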
    pub fn receive_aggregation_request(
        &mut self,
        source: PeerId,
        tickets: Vec<TransferableWinningTicket>,
        request: T,
    ) -> Result<()> {
        self.process(TicketAggregationToProcess::ToProcess(source, tickets, request))
    }

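    /// Requests aggregation of tickets in the given channel and returns an awaiter that resolves
    /// once the aggregation has been finalized.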
    pub fn aggregate_tickets(
        &mut self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> Result<TicketAggregationAwaiter> {
        let (tx, rx) = mpsc::unbounded::<()>();

        self.process(TicketAggregationToProcess::ToSend(
            *channel,
            prerequisites,
            TicketAggregationFinalizer::new(tx),
        ))?;

        Ok(rx.into())
    }

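    /// Pushes an event into the processing queue, mapping queue-full and disconnected errors to
    /// the corresponding [`TicketAggregationError`] variants.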
    fn process(&mut self, event: TicketAggregationToProcess<T, U>) -> Result<()> {
        self.queue.try_send(event).map_err(|e| {
            if e.is_full() {
                TicketAggregationError::Retry
            } else if e.is_disconnected() {
                TicketAggregationError::TransportError("queue is closed".to_string())
            } else {
                TicketAggregationError::TransportError(format!("Unknown error: {e}"))
            }
        })
    }
}

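/// Pair of the input queue sender and the processed-event receiver used by
/// [`TicketAggregationInteraction`].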
type AckEventQueue<T, U> = (
    Sender<TicketAggregationToProcess<T, U>>,
    Receiver<TicketAggregationProcessed<T, U>>,
);

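/// Sets up the ticket aggregation processing pipeline and exposes it as a [`Stream`] of
/// [`TicketAggregationProcessed`] events.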
pub struct TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    ack_event_queue: AckEventQueue<T, U>,
}

impl<T: 'static, U: 'static> TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
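    /// Creates the processing pipeline: events submitted via [`TicketAggregationActions`] are
    /// processed concurrently against the database and the results are forwarded to the output
    /// queue polled through the [`Stream`] implementation.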
    pub fn new<Db>(db: Db, chain_key: &ChainKeypair) -> Self
    where
        Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
    {
        let (processing_in_tx, processing_in_rx) = channel::<TicketAggregationToProcess<T, U>>(
            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
        );
        let (processing_out_tx, processing_out_rx) = channel::<TicketAggregationProcessed<T, U>>(
            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
        );

        let chain_key = chain_key.clone();

        let mut processing_stream = processing_in_rx.then_concurrent(move |event| {
            let chain_key = chain_key.clone();
            let db = db.clone();
            let mut processed_tx = processing_out_tx.clone();

            async move {
                let processed = match event {
                    TicketAggregationToProcess::ToProcess(destination, acked_tickets, response) => {
                        let pubkey: std::result::Result<OffchainPublicKey, hopr_primitive_types::errors::GeneralError> =
                            hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&destination))
                                .await;

                        match pubkey {
                            Ok(opk) => {
                                let count = acked_tickets.len();
                                match db.aggregate_tickets(opk, acked_tickets, &chain_key).await {
                                    Ok(ticket) => Some(TicketAggregationProcessed::Reply(
                                        destination,
                                        Ok(ticket.leak()),
                                        response,
                                    )),
                                    Err(DbError::TicketAggregationError(e)) => {
                                        Some(TicketAggregationProcessed::Reply(destination, Err(e), response))
                                    }
                                    Err(e) => {
                                        error!(error = %e, %destination, count, "Dropping ticket aggregation request due to an error");
                                        None
                                    }
                                }
                            },
                            Err(e) => {
                                error!(
                                    %destination, error = %e,
                                    "Failed to aggregate tickets: could not convert the destination peer ID to an offchain public key"
                                );
                                None
                            }
                        }
                    }
                    TicketAggregationToProcess::ToReceive(destination, aggregated_ticket, request) => {
                        match aggregated_ticket {
                            Ok(ticket) => match db.process_received_aggregated_ticket(ticket.clone(), &chain_key).await
                            {
                                Ok(acked_ticket) => {
                                    Some(TicketAggregationProcessed::Receive(destination, acked_ticket, request))
                                }
                                Err(e) => {
                                    error!(error = %e, counterparty = %destination, "Error while handling aggregated ticket");
                                    None
                                }
                            },
                            Err(e) => {
                                warn!(error = %e, counterparty = %destination, "Counterparty refused to aggregate tickets");
                                None
                            }
                        }
                    }
                    TicketAggregationToProcess::ToSend(channel, prerequisites, finalizer) => {
                        match db.prepare_aggregation_in_channel(&channel, prerequisites).await {
                            Ok(Some((source, tickets, _dest))) if !tickets.is_empty() => {
                                #[cfg(all(feature = "prometheus", not(test)))]
                                {
                                    METRIC_AGGREGATED_TICKETS.increment_by(tickets.len() as u64);
                                    METRIC_AGGREGATION_COUNT.increment();
                                }

                                Some(TicketAggregationProcessed::Send(source.into(), tickets, finalizer))
                            }
                            Err(e) => {
                                error!(error = %e, "An error occurred when preparing the channel aggregation");
                                None
                            }
                            _ => {
                                finalizer.finalize();
                                None
                            }
                        }
                    }
                };

                if let Some(event) = processed {
                    match poll_fn(|cx| Pin::new(&mut processed_tx).poll_ready(cx)).await {
                        Ok(_) => match processed_tx.start_send(event) {
                            Ok(_) => {}
                            Err(e) => error!(error = %e, "Failed to pass a processed ticket aggregation event"),
                        },
                        Err(e) => {
                            warn!(error = %e, "The receiver for processed ticket aggregation events no longer exists");
                        }
                    };
                }
            }
        });

        spawn(async move {
            while processing_stream.next().await.is_some() {}
        });

        Self {
            ack_event_queue: (processing_in_tx, processing_out_rx),
        }
    }

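    /// Returns a handle for submitting events to this interaction's processing queue.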
    pub fn writer(&self) -> TicketAggregationActions<T, U> {
        TicketAggregationActions {
            queue: self.ack_event_queue.0.clone(),
        }
    }
}

impl<T, U> Stream for TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    type Item = TicketAggregationProcessed<T, U>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(self).ack_event_queue.1.poll_next_unpin(cx)
    }
}

#[cfg(test)]
mod tests {
    use std::{ops::Mul, time::Duration};

    use futures::{pin_mut, stream::StreamExt};
    use hex_literal::hex;
    use hopr_crypto_types::{
        keypairs::{ChainKeypair, Keypair, OffchainKeypair},
        types::{Hash, Response},
    };
    use hopr_db_sql::{
        HoprDbGeneralModelOperations,
        accounts::HoprDbAccountOperations,
        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
        channels::HoprDbChannelOperations,
        db::HoprDb,
        info::HoprDbInfoOperations,
    };
    use hopr_internal_types::prelude::*;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use tokio::time::timeout;

    use super::TicketAggregationProcessed;

    lazy_static! {
        static ref PEERS: Vec<OffchainKeypair> = [
            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
        ]
        .iter()
        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
        static ref PEERS_CHAIN: Vec<ChainKeypair> = [
            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
        ]
        .iter()
        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
    }

    fn mock_acknowledged_ticket(
        signer: &ChainKeypair,
        destination: &ChainKeypair,
        index: u64,
    ) -> anyhow::Result<AcknowledgedTicket> {
        let price_per_packet: U256 = 10000000000000000u128.into();
        let ticket_win_prob = 1.0f64;

        let channel_id = generate_channel_id(&signer.into(), &destination.into());

        let channel_epoch = 1u64;
        let domain_separator = Hash::default();

        let response = Response::try_from(
            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
        )?;

        Ok(TicketBuilder::default()
            .addresses(signer, destination)
            .amount(price_per_packet.div_f64(ticket_win_prob)?)
            .index(index)
            .index_offset(1)
            .win_prob(ticket_win_prob.try_into()?)
            .channel_epoch(1)
            .challenge(response.to_challenge()?)
            .build_signed(signer, &domain_separator)?
            .into_acknowledged(response))
    }

    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
        let db_clone = db.clone();

        let peers = PEERS.clone();
        let peers_chain = PEERS_CHAIN.clone();

        db.begin_transaction()
            .await?
            .perform(move |tx| {
                Box::pin(async move {
                    db_clone
                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
                        .await?;
                    for (offchain, chain) in peers.iter().zip(peers_chain.iter()) {
                        db_clone
                            .insert_account(
                                Some(tx),
                                AccountEntry {
                                    public_key: *offchain.public(),
                                    chain_addr: chain.public().to_address(),
                                    entry_type: AccountType::NotAnnounced,
                                    published_at: 1,
                                },
                            )
                            .await?
                    }

                    Ok::<(), hopr_db_sql::errors::DbSqlError>(())
                })
            })
            .await?;

        Ok(())
    }

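    // Aggregates Bob's tickets over the Alice -> Bob channel and verifies that the ticket being
    // redeemed is skipped while the remaining tickets are replaced by a single aggregated one.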
    #[tokio::test]
    async fn test_ticket_aggregation() -> anyhow::Result<()> {
        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
        init_db(db_alice.clone()).await?;
        init_db(db_bob.clone()).await?;

        const NUM_TICKETS: u64 = 30;

        let mut tickets = vec![];
        let mut agg_balance = HoprBalance::zero();
        for i in 1..=NUM_TICKETS {
            let mut ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;

            if i == 1 {
                ack_ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
            } else {
                agg_balance += ack_ticket.verified_ticket().amount;
            }

            tickets.push(ack_ticket)
        }

        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
        let bob_addr: Address = (&PEERS_CHAIN[1]).into();

        let alice_packet_key = PEERS[0].public().into();
        let bob_packet_key = PEERS[1].public().into();

        let channel_alice_bob = ChannelEntry::new(
            alice_addr,
            bob_addr,
            agg_balance.mul(10),
            1_u32.into(),
            ChannelStatus::Open,
            1u32.into(),
        );

        db_alice.upsert_channel(None, channel_alice_bob).await?;
        db_bob.upsert_channel(None, channel_alice_bob).await?;

        for ticket in tickets.into_iter() {
            db_bob.upsert_ticket(None, ticket).await?;
        }

        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
        db_bob.start_ticket_processing(bob_notify_tx.into())?;

        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);

        let awaiter = bob
            .writer()
            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;

        let mut finalizer = None;
        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
                let _ = finalizer.insert(request_finalizer);
                assert_eq!(
                    NUM_TICKETS - 1,
                    acked_tickets.len() as u64,
                    "invalid number of tickets to aggregate"
                );
                alice
                    .writer()
                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
            }
            _ => panic!("unexpected action happened while sending agg request by Bob"),
        };

        match timeout(Duration::from_secs(5), alice.next()).await {
            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
            }
            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
        };

        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
                finalizer.take().expect("finalizer should be present").finalize()
            }
            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
        }

        pin_mut!(bob_notify_rx);
        bob_notify_rx
            .next()
            .await
            .expect("bob should have received the ticket notification");

        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;

        assert_eq!(
            stored_acked_tickets.len(),
            2,
            "there should be 1 aggregated ticket and 1 ticket being redeemed"
        );

        assert_eq!(
            AcknowledgedTicketStatus::BeingRedeemed,
            stored_acked_tickets[0].status,
            "first ticket must be being redeemed"
        );
        assert!(
            stored_acked_tickets[1].verified_ticket().is_aggregated(),
            "last ticket must be the aggregated one"
        );
        assert_eq!(
            AcknowledgedTicketStatus::Untouched,
            stored_acked_tickets[1].status,
            "second ticket must be untouched"
        );
        assert_eq!(
            agg_balance,
            stored_acked_tickets[1].verified_ticket().amount,
            "aggregated balance invalid"
        );

        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
    }

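    // Verifies that tickets with indices below the channel's current ticket index are not
    // aggregated and remain untouched in the database.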
    #[tokio::test]
    async fn test_ticket_aggregation_skip_lower_indices() -> anyhow::Result<()> {
        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
        init_db(db_alice.clone()).await?;
        init_db(db_bob.clone()).await?;

        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
        db_bob.start_ticket_processing(bob_notify_tx.into())?;

        const NUM_TICKETS: u64 = 30;
        const CHANNEL_TICKET_IDX: u64 = 20;

        let mut tickets = vec![];
        let mut agg_balance = HoprBalance::zero();
        for i in 1..=NUM_TICKETS {
            let ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
            if i >= CHANNEL_TICKET_IDX {
                agg_balance += ack_ticket.verified_ticket().amount;
            }
            tickets.push(ack_ticket)
        }

        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
        let bob_addr: Address = (&PEERS_CHAIN[1]).into();

        let alice_packet_key = PEERS[0].public().into();
        let bob_packet_key = PEERS[1].public().into();

        let channel_alice_bob = ChannelEntry::new(
            alice_addr,
            bob_addr,
            agg_balance.mul(10),
            CHANNEL_TICKET_IDX.into(),
            ChannelStatus::Open,
            1u32.into(),
        );

        db_alice.upsert_channel(None, channel_alice_bob).await?;
        db_bob.upsert_channel(None, channel_alice_bob).await?;

        for ticket in tickets.into_iter() {
            db_bob.upsert_ticket(None, ticket).await?;
        }

        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);

        let awaiter = bob
            .writer()
            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;

        let mut finalizer = None;
        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
                let _ = finalizer.insert(request_finalizer);
                assert_eq!(
                    NUM_TICKETS - CHANNEL_TICKET_IDX + 1,
                    acked_tickets.len() as u64,
                    "invalid number of tickets to aggregate"
                );
                alice
                    .writer()
                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
            }
            _ => panic!("unexpected action happened while sending agg request by Bob"),
        };

        match timeout(Duration::from_secs(5), alice.next()).await {
            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
            }
            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
        };

        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
                finalizer.take().expect("finalizer should be present").finalize()
            }
            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
        }

        pin_mut!(bob_notify_rx);
        bob_notify_rx
            .next()
            .await
            .expect("bob should have received the ticket notification");

        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;

        assert_eq!(
            stored_acked_tickets.len(),
            20,
            "there should be 1 aggregated ticket and 19 old tickets"
        );

        assert!(
            stored_acked_tickets[19].verified_ticket().is_aggregated(),
            "last ticket must be the aggregated one"
        );
        for (i, stored_acked_ticket) in stored_acked_tickets.iter().enumerate().take(19) {
            assert_eq!(
                AcknowledgedTicketStatus::Untouched,
                stored_acked_ticket.status,
                "ticket #{i} must be untouched"
            );
        }
        assert_eq!(
            agg_balance,
            stored_acked_tickets[19].verified_ticket().amount,
            "aggregated balance invalid"
        );

        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
    }
}