use std::{pin::Pin, task::Poll};

use futures::{
    channel::{
        mpsc,
        mpsc::{Receiver, Sender, UnboundedSender, channel},
    },
    future::{Either, poll_fn},
    pin_mut,
    stream::{Stream, StreamExt},
};
use hopr_async_runtime::prelude::{sleep, spawn};
use hopr_crypto_types::prelude::*;
use hopr_db_api::{
    errors::DbError,
    tickets::{AggregationPrerequisites, HoprDbTicketOperations},
};
use hopr_internal_types::prelude::*;
use hopr_transport_identity::PeerId;
use libp2p::request_response::{OutboundRequestId, ResponseChannel};
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use thiserror::Error;
use tracing::{error, warn};

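/// Errors that can occur while queuing or performing ticket aggregation.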
#[derive(Error, Debug)]
pub enum TicketAggregationError {
    #[error("tx queue is full, retry later")]
    Retry,

    #[error("underlying transport error while sending packet: {0}")]
    TransportError(String),

    #[error("db error {0}")]
    DatabaseError(#[from] hopr_db_api::errors::DbError),
}

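/// Result alias for operations that can fail with a [`TicketAggregationError`].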
pub type Result<T> = core::result::Result<T, TicketAggregationError>;

#[cfg(all(feature = "prometheus", not(test)))]
use hopr_metrics::metrics::SimpleCounter;

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_AGGREGATED_TICKETS: SimpleCounter = SimpleCounter::new(
        "hopr_aggregated_tickets_count",
        "Number of aggregated tickets"
    )
    .unwrap();
    static ref METRIC_AGGREGATION_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_aggregations_count",
        "Number of performed ticket aggregations"
    )
    .unwrap();
}

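/// Capacity of the queue of outgoing ticket aggregation events.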
pub const TICKET_AGGREGATION_TX_QUEUE_SIZE: usize = 2048;
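/// Capacity of the queue of incoming ticket aggregation events.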
pub const TICKET_AGGREGATION_RX_QUEUE_SIZE: usize = 2048;

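/// Events consumed by the ticket aggregation processor.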
#[allow(clippy::type_complexity)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TicketAggregationToProcess<T, U> {
    ToReceive(PeerId, std::result::Result<Ticket, String>, U),
    ToProcess(PeerId, Vec<TransferableWinningTicket>, T),
    ToSend(Hash, AggregationPrerequisites, TicketAggregationFinalizer),
}

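/// Events emitted by the ticket aggregation processor.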
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TicketAggregationProcessed<T, U> {
    Receive(PeerId, AcknowledgedTicket, U),
    Reply(PeerId, std::result::Result<Ticket, String>, T),
    Send(PeerId, Vec<TransferableWinningTicket>, TicketAggregationFinalizer),
}

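/// Abstraction over the ability to request aggregation of tickets in a channel.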
#[async_trait::async_trait]
pub trait TicketAggregatorTrait {
    /// Attempts to aggregate all tickets in the given channel that satisfy the prerequisites.
    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()>;
}

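/// [`TicketAggregatorTrait`] implementation that triggers an aggregation and waits for it to
/// complete within a configurable timeout, rolling the aggregation back on failure.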
#[derive(Debug)]
pub struct AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    db: Db,
    writer: TicketAggregationActions<T, U>,
    agg_timeout: std::time::Duration,
}

impl<T, U, Db> Clone for AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    fn clone(&self) -> Self {
        Self {
            db: self.db.clone(),
            writer: self.writer.clone(),
            agg_timeout: self.agg_timeout,
        }
    }
}

impl<T, U, Db> AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    pub fn new(db: Db, writer: TicketAggregationActions<T, U>, agg_timeout: std::time::Duration) -> Self {
        Self {
            db,
            writer,
            agg_timeout,
        }
    }
}

#[async_trait::async_trait]
impl<T, U, Db> TicketAggregatorTrait for AwaitingAggregator<T, U, Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
    T: Send,
    U: Send,
{
    #[tracing::instrument(level = "debug", skip(self))]
    async fn aggregate_tickets(&self, channel: &Hash, prerequisites: AggregationPrerequisites) -> Result<()> {
        let awaiter = self.writer.clone().aggregate_tickets(channel, prerequisites)?;

        if let Err(e) = awaiter.consume_and_wait(self.agg_timeout).await {
            warn!(%channel, error = %e, "Error during ticket aggregation, performing a rollback");
            self.db.rollback_aggregation_in_channel(*channel).await?;
        }

        Ok(())
    }
}

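/// Awaits the completion notification of a ticket aggregation.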
#[derive(Debug)]
pub struct TicketAggregationAwaiter {
    rx: mpsc::UnboundedReceiver<()>,
}

impl From<mpsc::UnboundedReceiver<()>> for TicketAggregationAwaiter {
    fn from(value: mpsc::UnboundedReceiver<()>) -> Self {
        Self { rx: value }
    }
}

impl TicketAggregationAwaiter {
    /// Consumes the awaiter and waits for the aggregation to complete, but at most for the given timeout.
    pub async fn consume_and_wait(mut self, until_timeout: std::time::Duration) -> Result<()> {
        let timeout = sleep(until_timeout);
        let resolve = self.rx.next();

        pin_mut!(resolve, timeout);
        match futures::future::select(resolve, timeout).await {
            Either::Left((result, _)) => result.ok_or(TicketAggregationError::TransportError("Canceled".to_owned())),
            Either::Right(_) => Err(TicketAggregationError::TransportError(
                "Timed out on sending a packet".to_owned(),
            )),
        }
    }
}

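/// One-shot notifier used to signal the [`TicketAggregationAwaiter`] that an aggregation has finished.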
#[derive(Debug, Clone)]
pub struct TicketAggregationFinalizer {
    tx: Option<UnboundedSender<()>>,
}

impl TicketAggregationFinalizer {
    pub fn new(tx: UnboundedSender<()>) -> Self {
        Self { tx: Some(tx) }
    }

    /// Notifies the awaiter that the aggregation has completed. Can be used only once.
    pub fn finalize(mut self) {
        if let Some(sender) = self.tx.take() {
            if sender.unbounded_send(()).is_err() {
                error!("Failed to notify the awaiter about the successful ticket aggregation")
            }
        } else {
            error!("Sender for packet send signalization is already spent")
        }
    }
}

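/// External handle for feeding ticket aggregation events into the processing queue.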
#[derive(Debug)]
pub struct TicketAggregationActions<T, U> {
    pub queue: Sender<TicketAggregationToProcess<T, U>>,
}

/// [`TicketAggregationActions`] specialized for the libp2p request-response transport.
pub type BasicTicketAggregationActions<T> = TicketAggregationActions<ResponseChannel<T>, OutboundRequestId>;

impl<T, U> Clone for TicketAggregationActions<T, U> {
    fn clone(&self) -> Self {
        Self {
            queue: self.queue.clone(),
        }
    }
}

impl<T, U> TicketAggregationActions<T, U> {
    /// Pushes an aggregated ticket (or the counterparty's error) received from the transport into processing.
    pub fn receive_ticket(
        &mut self,
        source: PeerId,
        ticket: std::result::Result<Ticket, String>,
        request: U,
    ) -> Result<()> {
        self.process(TicketAggregationToProcess::ToReceive(source, ticket, request))
    }

    /// Pushes a counterparty's request to aggregate the given tickets into processing.
    pub fn receive_aggregation_request(
        &mut self,
        source: PeerId,
        tickets: Vec<TransferableWinningTicket>,
        request: T,
    ) -> Result<()> {
        self.process(TicketAggregationToProcess::ToProcess(source, tickets, request))
    }

    /// Initiates aggregation of tickets in the given channel and returns an awaiter for its completion.
    pub fn aggregate_tickets(
        &mut self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> Result<TicketAggregationAwaiter> {
        let (tx, rx) = mpsc::unbounded::<()>();

        self.process(TicketAggregationToProcess::ToSend(
            *channel,
            prerequisites,
            TicketAggregationFinalizer::new(tx),
        ))?;

        Ok(rx.into())
    }

    fn process(&mut self, event: TicketAggregationToProcess<T, U>) -> Result<()> {
        self.queue.try_send(event).map_err(|e| {
            if e.is_full() {
                TicketAggregationError::Retry
            } else if e.is_disconnected() {
                TicketAggregationError::TransportError("queue is closed".to_string())
            } else {
                TicketAggregationError::TransportError(format!("Unknown error: {e}"))
            }
        })
    }
}

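/// Pair of endpoints for submitting events to and consuming processed events from the aggregation queue.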
type AckEventQueue<T, U> = (
    Sender<TicketAggregationToProcess<T, U>>,
    Receiver<TicketAggregationProcessed<T, U>>,
);

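/// Sets up the ticket aggregation processing pipeline: events submitted via
/// [`TicketAggregationActions`] are processed concurrently against the database,
/// and the results are emitted through the [`Stream`] implementation.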
pub struct TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    ack_event_queue: AckEventQueue<T, U>,
}

impl<T: 'static, U: 'static> TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    /// Creates a new instance and spawns the background task processing the aggregation events.
    pub fn new<Db>(db: Db, chain_key: &ChainKeypair) -> Self
    where
        Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug + 'static,
    {
        let (processing_in_tx, processing_in_rx) = channel::<TicketAggregationToProcess<T, U>>(
            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
        );
        let (processing_out_tx, processing_out_rx) = channel::<TicketAggregationProcessed<T, U>>(
            TICKET_AGGREGATION_RX_QUEUE_SIZE + TICKET_AGGREGATION_TX_QUEUE_SIZE,
        );

        let chain_key = chain_key.clone();

        let mut processing_stream = processing_in_rx.then_concurrent(move |event| {
            let chain_key = chain_key.clone();
            let db = db.clone();
            let mut processed_tx = processing_out_tx.clone();

            async move {
                let processed = match event {
                    TicketAggregationToProcess::ToProcess(destination, acked_tickets, response) => {
                        let opk: std::result::Result<OffchainPublicKey, hopr_primitive_types::errors::GeneralError> =
                            destination.try_into();
                        match opk {
                            Ok(opk) => {
                                let count = acked_tickets.len();
                                match db.aggregate_tickets(opk, acked_tickets, &chain_key).await {
                                    Ok(ticket) => Some(TicketAggregationProcessed::Reply(
                                        destination,
                                        Ok(ticket.leak()),
                                        response,
                                    )),
                                    Err(DbError::TicketAggregationError(e)) => {
                                        Some(TicketAggregationProcessed::Reply(destination, Err(e), response))
                                    }
                                    Err(e) => {
                                        error!(error = %e, %destination, count, "Dropping ticket aggregation request due to an error");
                                        None
                                    }
                                }
                            },
                            Err(e) => {
                                error!(
                                    %destination, error = %e,
                                    "Failed to aggregate tickets because the destination could not be converted into an offchain public key"
                                );
                                None
                            }
                        }
                    }
                    TicketAggregationToProcess::ToReceive(destination, aggregated_ticket, request) => {
                        match aggregated_ticket {
                            Ok(ticket) => match db.process_received_aggregated_ticket(ticket.clone(), &chain_key).await
                            {
                                Ok(acked_ticket) => {
                                    Some(TicketAggregationProcessed::Receive(destination, acked_ticket, request))
                                }
                                Err(e) => {
                                    error!(error = %e, counterparty = %destination, "Error while handling aggregated ticket");
                                    None
                                }
                            },
                            Err(e) => {
                                warn!(error = %e, counterparty = %destination, "Counterparty refused to aggregate tickets");
                                None
                            }
                        }
                    }
                    TicketAggregationToProcess::ToSend(channel, prerequisites, finalizer) => {
                        match db.prepare_aggregation_in_channel(&channel, prerequisites).await {
                            Ok(Some((source, tickets, _dest))) if !tickets.is_empty() => {
                                #[cfg(all(feature = "prometheus", not(test)))]
                                {
                                    METRIC_AGGREGATED_TICKETS.increment_by(tickets.len() as u64);
                                    METRIC_AGGREGATION_COUNT.increment();
                                }

                                Some(TicketAggregationProcessed::Send(source.into(), tickets, finalizer))
                            }
                            Err(e) => {
                                error!(error = %e, "An error occurred when preparing the channel aggregation");
                                None
                            }
                            _ => {
                                finalizer.finalize();
                                None
                            }
                        }
                    }
                };

                if let Some(event) = processed {
                    match poll_fn(|cx| Pin::new(&mut processed_tx).poll_ready(cx)).await {
                        Ok(_) => match processed_tx.start_send(event) {
                            Ok(_) => {}
                            Err(e) => error!(error = %e, "Failed to pass a processed ack message"),
                        },
                        Err(e) => {
                            warn!(error = %e, "The receiver for processed ack no longer exists");
                        }
                    };
                }
            }
        });

        spawn(async move {
            // Drive the processing stream to completion in the background
            while processing_stream.next().await.is_some() {}
        });

        Self {
            ack_event_queue: (processing_in_tx, processing_out_rx),
        }
    }

    /// Returns a handle for submitting events into the processing queue.
    pub fn writer(&self) -> TicketAggregationActions<T, U> {
        TicketAggregationActions {
            queue: self.ack_event_queue.0.clone(),
        }
    }
}

impl<T, U> Stream for TicketAggregationInteraction<T, U>
where
    T: Send,
    U: Send,
{
    type Item = TicketAggregationProcessed<T, U>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(self).ack_event_queue.1.poll_next_unpin(cx)
    }
}

#[cfg(test)]
mod tests {
    use std::{ops::Mul, time::Duration};

    use futures::{pin_mut, stream::StreamExt};
    use hex_literal::hex;
    use hopr_crypto_types::{
        keypairs::{ChainKeypair, Keypair, OffchainKeypair},
        types::{Hash, Response},
    };
    use hopr_db_sql::{
        HoprDbGeneralModelOperations,
        accounts::HoprDbAccountOperations,
        api::{info::DomainSeparator, tickets::HoprDbTicketOperations},
        channels::HoprDbChannelOperations,
        db::HoprDb,
        info::HoprDbInfoOperations,
    };
    use hopr_internal_types::prelude::*;
    use hopr_primitive_types::prelude::*;
    use lazy_static::lazy_static;
    use tokio::time::timeout;

    use super::TicketAggregationProcessed;

    lazy_static! {
        static ref PEERS: Vec<OffchainKeypair> = [
            hex!("b91a28ff9840e9c93e5fafd581131f0b9f33f3e61b02bf5dd83458aa0221f572"),
            hex!("82283757872f99541ce33a47b90c2ce9f64875abf08b5119a8a434b2fa83ea98")
        ]
        .iter()
        .map(|private| OffchainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
        static ref PEERS_CHAIN: Vec<ChainKeypair> = [
            hex!("51d3003d908045a4d76d0bfc0d84f6ff946b5934b7ea6a2958faf02fead4567a"),
            hex!("e1f89073a01831d0eed9fe2c67e7d65c144b9d9945320f6d325b1cccc2d124e9")
        ]
        .iter()
        .map(|private| ChainKeypair::from_secret(private).expect("lazy static keypair should be valid"))
        .collect();
    }

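    /// Creates a signed, acknowledged ticket from `signer` to `destination` with the given index.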
    fn mock_acknowledged_ticket(
        signer: &ChainKeypair,
        destination: &ChainKeypair,
        index: u64,
    ) -> anyhow::Result<AcknowledgedTicket> {
        let price_per_packet: U256 = 10000000000000000u128.into();
        let ticket_win_prob = 1.0f64;

        let channel_id = generate_channel_id(&signer.into(), &destination.into());

        let channel_epoch = 1u64;
        let domain_separator = Hash::default();

        let response = Response::try_from(
            Hash::create(&[channel_id.as_ref(), &channel_epoch.to_be_bytes(), &index.to_be_bytes()]).as_ref(),
        )?;

        Ok(TicketBuilder::default()
            .addresses(signer, destination)
            .amount(price_per_packet.div_f64(ticket_win_prob)?)
            .index(index)
            .index_offset(1)
            .win_prob(ticket_win_prob.try_into()?)
            .channel_epoch(1)
            .challenge(response.to_challenge().into())
            .build_signed(signer, &domain_separator)?
            .into_acknowledged(response))
    }

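    /// Sets the channel domain separator and inserts the test accounts into the given DB.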
    async fn init_db(db: HoprDb) -> anyhow::Result<()> {
        let db_clone = db.clone();

        let peers = PEERS.clone();
        let peers_chain = PEERS_CHAIN.clone();

        db.begin_transaction()
            .await?
            .perform(move |tx| {
                Box::pin(async move {
                    db_clone
                        .set_domain_separator(Some(tx), DomainSeparator::Channel, Hash::default())
                        .await?;
                    for (offchain, chain) in peers.iter().zip(peers_chain.iter()) {
                        db_clone
                            .insert_account(
                                Some(tx),
                                AccountEntry {
                                    public_key: *offchain.public(),
                                    chain_addr: chain.public().to_address(),
                                    entry_type: AccountType::NotAnnounced,
                                    published_at: 1,
                                },
                            )
                            .await?
                    }

                    Ok::<(), hopr_db_sql::errors::DbSqlError>(())
                })
            })
            .await?;

        Ok(())
    }

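    /// Full aggregation round-trip: Bob requests aggregation, Alice aggregates the tickets,
    /// and Bob stores the resulting aggregated ticket.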
    #[tokio::test]
    async fn test_ticket_aggregation() -> anyhow::Result<()> {
        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
        init_db(db_alice.clone()).await?;
        init_db(db_bob.clone()).await?;

        const NUM_TICKETS: u64 = 30;

        let mut tickets = vec![];
        let mut agg_balance = HoprBalance::zero();
        for i in 1..=NUM_TICKETS {
            let mut ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;

            if i == 1 {
                ack_ticket.status = AcknowledgedTicketStatus::BeingRedeemed;
            } else {
                agg_balance += ack_ticket.verified_ticket().amount;
            }

            tickets.push(ack_ticket)
        }

        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
        let bob_addr: Address = (&PEERS_CHAIN[1]).into();

        let alice_packet_key = PEERS[0].public().into();
        let bob_packet_key = PEERS[1].public().into();

        let channel_alice_bob = ChannelEntry::new(
            alice_addr,
            bob_addr,
            agg_balance.mul(10),
            1_u32.into(),
            ChannelStatus::Open,
            1u32.into(),
        );

        db_alice.upsert_channel(None, channel_alice_bob).await?;
        db_bob.upsert_channel(None, channel_alice_bob).await?;

        for ticket in tickets.into_iter() {
            db_bob.upsert_ticket(None, ticket).await?;
        }

        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
        db_bob.start_ticket_processing(bob_notify_tx.into())?;

        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);

        let awaiter = bob
            .writer()
            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;

        let mut finalizer = None;
        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
                let _ = finalizer.insert(request_finalizer);
                assert_eq!(
                    NUM_TICKETS - 1,
                    acked_tickets.len() as u64,
                    "invalid number of tickets to aggregate"
                );
                alice
                    .writer()
                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
            }
            _ => panic!("unexpected action happened while sending agg request by Bob"),
        };

        match timeout(Duration::from_secs(5), alice.next()).await {
            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
            }
            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
        };

        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
                finalizer.take().expect("finalizer should be present").finalize()
            }
            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
        }

        pin_mut!(bob_notify_rx);
        bob_notify_rx
            .next()
            .await
            .expect("bob should have received the ticket notification");

        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;

        assert_eq!(
            stored_acked_tickets.len(),
            2,
            "there should be 1 aggregated ticket and 1 ticket being redeemed"
        );

        assert_eq!(
            AcknowledgedTicketStatus::BeingRedeemed,
            stored_acked_tickets[0].status,
            "first ticket must be being redeemed"
        );
        assert!(
            stored_acked_tickets[1].verified_ticket().is_aggregated(),
            "last ticket must be the aggregated one"
        );
        assert_eq!(
            AcknowledgedTicketStatus::Untouched,
            stored_acked_tickets[1].status,
            "second ticket must be untouched"
        );
        assert_eq!(
            agg_balance,
            stored_acked_tickets[1].verified_ticket().amount,
            "aggregated balance invalid"
        );

        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
    }

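    /// Aggregation skips tickets with indices below the channel's current ticket index:
    /// those remain untouched while the remaining ones are aggregated.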
    #[tokio::test]
    async fn test_ticket_aggregation_skip_lower_indices() -> anyhow::Result<()> {
        let db_alice = HoprDb::new_in_memory(PEERS_CHAIN[0].clone()).await?;
        let db_bob = HoprDb::new_in_memory(PEERS_CHAIN[1].clone()).await?;
        init_db(db_alice.clone()).await?;
        init_db(db_bob.clone()).await?;

        let (bob_notify_tx, bob_notify_rx) = futures::channel::mpsc::unbounded();
        db_bob.start_ticket_processing(bob_notify_tx.into())?;

        const NUM_TICKETS: u64 = 30;
        const CHANNEL_TICKET_IDX: u64 = 20;

        let mut tickets = vec![];
        let mut agg_balance = HoprBalance::zero();
        for i in 1..=NUM_TICKETS {
            let ack_ticket = mock_acknowledged_ticket(&PEERS_CHAIN[0], &PEERS_CHAIN[1], i)?;
            if i >= CHANNEL_TICKET_IDX {
                agg_balance += ack_ticket.verified_ticket().amount;
            }
            tickets.push(ack_ticket)
        }

        let alice_addr: Address = (&PEERS_CHAIN[0]).into();
        let bob_addr: Address = (&PEERS_CHAIN[1]).into();

        let alice_packet_key = PEERS[0].public().into();
        let bob_packet_key = PEERS[1].public().into();

        let channel_alice_bob = ChannelEntry::new(
            alice_addr,
            bob_addr,
            agg_balance.mul(10),
            CHANNEL_TICKET_IDX.into(),
            ChannelStatus::Open,
            1u32.into(),
        );

        db_alice.upsert_channel(None, channel_alice_bob).await?;
        db_bob.upsert_channel(None, channel_alice_bob).await?;

        for ticket in tickets.into_iter() {
            db_bob.upsert_ticket(None, ticket).await?;
        }

        let mut alice = super::TicketAggregationInteraction::<(), ()>::new(db_alice.clone(), &PEERS_CHAIN[0]);
        let mut bob = super::TicketAggregationInteraction::<(), ()>::new(db_bob.clone(), &PEERS_CHAIN[1]);

        let awaiter = bob
            .writer()
            .aggregate_tickets(&channel_alice_bob.get_id(), Default::default())?;

        let mut finalizer = None;
        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Send(_, acked_tickets, request_finalizer))) => {
                let _ = finalizer.insert(request_finalizer);
                assert_eq!(
                    NUM_TICKETS - CHANNEL_TICKET_IDX + 1,
                    acked_tickets.len() as u64,
                    "invalid number of tickets to aggregate"
                );
                alice
                    .writer()
                    .receive_aggregation_request(bob_packet_key, acked_tickets.into_iter().collect(), ())?;
            }
            _ => panic!("unexpected action happened while sending agg request by Bob"),
        };

        match timeout(Duration::from_secs(5), alice.next()).await {
            Ok(Some(TicketAggregationProcessed::Reply(_, aggregated_ticket, ()))) => {
                bob.writer().receive_ticket(alice_packet_key, aggregated_ticket, ())?
            }
            _ => panic!("unexpected action happened while awaiting agg request at Alice"),
        };

        match timeout(Duration::from_secs(5), bob.next()).await {
            Ok(Some(TicketAggregationProcessed::Receive(_destination, _acked_tkt, ()))) => {
                finalizer.take().expect("finalizer should be present").finalize()
            }
            _ => panic!("unexpected action happened while awaiting agg response at Bob"),
        }

        pin_mut!(bob_notify_rx);
        bob_notify_rx
            .next()
            .await
            .expect("bob should have received the ticket notification");

        let stored_acked_tickets = db_bob.get_tickets((&channel_alice_bob).into()).await?;

        assert_eq!(
            stored_acked_tickets.len(),
            20,
            "there should be 1 aggregated ticket and 19 old tickets"
        );

        assert!(
            stored_acked_tickets[19].verified_ticket().is_aggregated(),
            "last ticket must be the aggregated one"
        );
        for (i, stored_acked_ticket) in stored_acked_tickets.iter().enumerate().take(19) {
            assert_eq!(
                AcknowledgedTicketStatus::Untouched,
                stored_acked_ticket.status,
                "ticket #{i} must be untouched"
            );
        }
        assert_eq!(
            agg_balance,
            stored_acked_tickets[19].verified_ticket().amount,
            "aggregated balance invalid"
        );

        Ok(awaiter.consume_and_wait(Duration::from_millis(2000)).await?)
    }
}
768}