1use std::{
5 sync::atomic::AtomicBool,
6 time::{Duration, Instant},
7};
8
9use futures::{FutureExt, StreamExt, channel::mpsc::UnboundedSender};
10use futures_time::stream::StreamExt as TimeStreamExt;
11use tracing::Instrument;
12
13use crate::{
14 errors::SessionError,
15 processing::types::FrameInspector,
16 protocol::{FrameAcknowledgements, FrameId, Segment, SegmentId, SegmentRequest, SeqIndicator, SessionMessage},
17 socket::{SocketState, state::SocketComponents},
18 utils::{
19 RetriedFrameId, RingBufferProducer, RingBufferView, next_deadline_with_backoff, searchable_ringbuffer,
20 skip_queue::{Skip, SkipDelaySender, skip_delay_channel},
21 },
22};
23
24#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
26pub enum AcknowledgementMode {
27 Partial,
30 Full,
33 #[default]
41 Both,
42}
43
44impl AcknowledgementMode {
45 #[inline]
47 fn is_partial_ack_enabled(&self) -> bool {
48 matches!(self, Self::Partial | Self::Both)
49 }
50
51 #[inline]
53 fn is_full_ack_enabled(&self) -> bool {
54 matches!(self, Self::Full | Self::Both)
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
60pub struct AcknowledgementStateConfig {
61 pub mode: AcknowledgementMode,
65
66 #[default(Duration::from_millis(20))]
70 pub expected_packet_latency: Duration,
71
72 #[default(1.2)]
76 pub backoff_base: f64,
77
78 #[default(3)]
82 pub max_incoming_frame_retries: usize,
83
84 #[default(3)]
88 pub max_outgoing_frame_retries: usize,
89
90 #[default(Duration::from_millis(50))]
94 pub acknowledgement_delay: Duration,
95
96 #[default(16384)]
101 pub lookbehind_segments: usize,
102}
103
104impl AcknowledgementStateConfig {
105 fn normalize(self) -> AcknowledgementStateConfig {
106 Self {
107 mode: self.mode,
108 expected_packet_latency: self.expected_packet_latency.max(Duration::from_millis(1)),
109 backoff_base: self.backoff_base.max(1.0),
110 max_incoming_frame_retries: self.max_incoming_frame_retries,
111 max_outgoing_frame_retries: self.max_outgoing_frame_retries,
112 acknowledgement_delay: self.acknowledgement_delay.max(Duration::from_millis(1)),
113 lookbehind_segments: self.lookbehind_segments.max(1024),
114 }
115 }
116}
117
118#[derive(Clone)]
119struct AcknowledgementStateContext<const C: usize> {
120 rb_tx: RingBufferProducer<Segment>,
121 rb_rx: RingBufferView<Segment>,
122 incoming_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
123 outgoing_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
124 ack_tx: futures::channel::mpsc::Sender<FrameId>,
125 inspector: FrameInspector,
126 ctl_tx: UnboundedSender<SessionMessage<C>>,
127}
128
129#[cfg_attr(doc, aquamarine::aquamarine)]
130#[derive(Clone)]
219pub struct AcknowledgementState<const C: usize> {
220 id: String,
221 cfg: AcknowledgementStateConfig,
222 context: Option<AcknowledgementStateContext<C>>,
223 started: std::sync::Arc<AtomicBool>,
224}
225
226impl<const C: usize> AcknowledgementState<C> {
227 pub fn new<I: std::fmt::Display>(session_id: I, cfg: AcknowledgementStateConfig) -> Self {
228 Self {
229 id: session_id.to_string(),
230 cfg: cfg.normalize(),
231 context: Default::default(),
232 started: std::sync::Arc::new(AtomicBool::new(false)),
233 }
234 }
235}
236
237impl<const C: usize> SocketState<C> for AcknowledgementState<C> {
238 fn session_id(&self) -> &str {
239 &self.id
240 }
241
242 #[tracing::instrument(name = "AcknowledgementState", skip(self, socket_components), fields(session_id = self.id))]
243 fn run(&mut self, socket_components: SocketComponents<C>) -> Result<(), SessionError> {
244 if self.started.load(std::sync::atomic::Ordering::Relaxed) && self.context.is_some() {
245 return Err(SessionError::InvalidState("state is already running".into()));
246 }
247
248 let (incoming_frame_retries_tx, incoming_frame_retries_rx) = skip_delay_channel();
249 let (outgoing_frame_retries_tx, outgoing_frame_retries_rx) = skip_delay_channel();
250 let (rb_tx, rb_rx) = searchable_ringbuffer(self.cfg.lookbehind_segments);
251
252 let (ack_tx, ack_rx) = futures::channel::mpsc::channel(2 * self.cfg.lookbehind_segments);
254
255 let context = self.context.insert(AcknowledgementStateContext {
256 rb_tx,
257 rb_rx,
258 incoming_frame_retries_tx,
259 outgoing_frame_retries_tx,
260 ack_tx,
261 ctl_tx: socket_components.ctl_tx,
262 inspector: socket_components
263 .inspector
264 .ok_or(SessionError::InvalidState("inspector is not available".into()))?,
265 });
266
267 if self.cfg.mode.is_partial_ack_enabled() {
268 let mut incoming_frame_retries_tx_clone = context.incoming_frame_retries_tx.clone();
271 let ctl_tx_clone = context.ctl_tx.clone();
272 let frame_inspector_clone = context.inspector.clone();
273 let cfg = self.cfg;
274 hopr_async_runtime::prelude::spawn(incoming_frame_retries_rx
275 .filter_map(move |rf| {
276 let frame_id = rf.frame_id;
277 let missing_segments = frame_inspector_clone.missing_segments(&frame_id).unwrap_or_default();
278 if !missing_segments.is_empty() {
279 if let Some(next) = rf.next() {
281 let retry_at = next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
283 if let Err(error) = incoming_frame_retries_tx_clone.send_one((next, retry_at)) {
284 tracing::error!(frame_id, %error, "failed to register next resend of incoming frame");
285 } else {
286 tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend request of incoming frame segments");
287 }
288 } else {
289 tracing::debug!(frame_id, "last request of incoming frame segments");
290 }
291
292 futures::future::ready(Some((frame_id, missing_segments)))
293 } else {
294 tracing::debug!(frame_id, "no more missing segments in frame");
295 futures::future::ready(None)
296 }
297 })
298 .ready_chunks(SegmentRequest::<C>::MAX_ENTRIES)
299 .inspect(|r| tracing::trace!(req = ?r, "requesting segments resend"))
300 .map(|a| Ok(SessionMessage::<C>::Request(a.into_iter().collect())))
301 .forward(ctl_tx_clone)
302 .map(move |res| match res {
303 Ok(_) => tracing::debug!("incoming frame resends processing done"),
304 Err(error) => tracing::error!(%error, "error while processing incoming frame resends")
305 })
306 .instrument(tracing::debug_span!("incoming_frame_retries_sender"))
307 );
308 }
309
310 let ctl_tx_clone = context.ctl_tx.clone();
312 let ack_delay = self.cfg.acknowledgement_delay;
313 hopr_async_runtime::prelude::spawn(
314 ack_rx
315 .buffer(futures_time::time::Duration::from(ack_delay))
316 .flat_map(|acks| futures::stream::iter(FrameAcknowledgements::<C>::new_multiple(acks)))
317 .filter(|acks| futures::future::ready(!acks.is_empty()))
318 .inspect(|acks| tracing::trace!(?acks, "acknowledgements sent"))
319 .map(|acks| Ok(SessionMessage::<C>::Acknowledge(acks)))
320 .forward(ctl_tx_clone)
321 .map(move |res| match res {
322 Ok(_) => tracing::debug!("acknowledgement forwarding done"),
323 Err(error) => tracing::debug!(%error, "acknowledgement forwarding failed"),
324 })
325 .instrument(tracing::debug_span!("acknowledgement_sender")),
326 );
327
328 let mut outgoing_frame_retries_tx_clone = context.outgoing_frame_retries_tx.clone();
330 let ctl_tx_clone = context.ctl_tx.clone();
331 let rb_rx_clone = context.rb_rx.clone();
332 let cfg = self.cfg;
333 hopr_async_runtime::prelude::spawn(
334 outgoing_frame_retries_rx
335 .map(move |rf: RetriedFrameId| {
336 let frame_id = rf.frame_id;
338 if let Some(next) = rf.next() {
339 let retry_at =
341 next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
342 if let Err(error) = outgoing_frame_retries_tx_clone.send_one((next, retry_at)) {
343 tracing::error!(frame_id, %error, "failed to register next retry of frame");
344 } else {
345 tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend of outgoing frame");
346 }
347 } else {
348 tracing::debug!(frame_id, "last outgoing retry of frame");
349 }
350 tracing::trace!(frame_id, "going to re-send entire frame");
351 frame_id
352 })
353 .flat_map(move |frame_id| {
354 futures::stream::iter(
356 rb_rx_clone
357 .find(|s: &Segment| s.id().0 == frame_id)
358 .into_iter()
359 .inspect(|s| tracing::trace!(seg_id = %s.id(), "segment retransmit"))
360 .map(|s| Ok(SessionMessage::<C>::Segment(s))),
361 )
362 })
363 .forward(ctl_tx_clone) .map(move |res| match res {
365 Ok(_) => tracing::debug!("outgoing frame retries processing done"),
366 Err(error) => tracing::error!(%error, "error while processing outgoing frame retries"),
367 })
368 .instrument(tracing::debug_span!("outgoing_frame_retries_sender")),
369 );
370
371 tracing::debug!("acknowledgement state has been started");
372 self.started.store(true, std::sync::atomic::Ordering::Relaxed);
373
374 Ok(())
375 }
376
377 #[tracing::instrument(name = "AcknowledgementState::stop", skip(self), fields(session_id = self.id))]
378 fn stop(&mut self) -> Result<(), SessionError> {
379 if let Some(mut ctx) = self.context.take() {
380 ctx.outgoing_frame_retries_tx.force_close();
381 ctx.incoming_frame_retries_tx.force_close();
382 ctx.ack_tx.close_channel();
383 ctx.ctl_tx.close_channel();
384
385 self.started.store(false, std::sync::atomic::Ordering::Relaxed);
386 tracing::debug!("state has been stopped");
387 } else {
388 tracing::warn!("cannot be stopped, because it is not running");
389 }
390
391 Ok(())
392 }
393
394 #[tracing::instrument(name = "AcknowledgementState::incoming_segment", skip(self), fields(session_id = self.id, frame_id = seg_id.0))]
395 fn incoming_segment(&mut self, seg_id: &SegmentId, _ind: SeqIndicator) -> Result<(), SessionError> {
396 tracing::trace!("segment received");
397
398 let ctx = self
399 .started
400 .load(std::sync::atomic::Ordering::Relaxed)
401 .then_some(self.context.as_mut())
402 .flatten()
403 .ok_or(SessionError::StateNotRunning)?;
404
405 if self.cfg.mode.is_partial_ack_enabled() {
407 if let Err(error) = ctx.incoming_frame_retries_tx.send_one((
410 RetriedFrameId::with_retries(seg_id.0, self.cfg.max_incoming_frame_retries),
411 self.cfg.expected_packet_latency, )) {
413 tracing::error!(%error, "failed to register incoming retry for frame");
414 }
415 }
416 Ok(())
417 }
418
419 #[tracing::instrument(name = "AcknowledgementState::incoming_retransmission_request", skip(self, request), fields(session_id = self.id))]
420 fn incoming_retransmission_request(&mut self, request: SegmentRequest<C>) -> Result<(), SessionError> {
421 tracing::trace!(count = request.len(), "segment retransmission requested");
424
425 let ctx = self
426 .started
427 .load(std::sync::atomic::Ordering::Relaxed)
428 .then_some(self.context.as_mut())
429 .flatten()
430 .ok_or(SessionError::StateNotRunning)?;
431
432 let (mut missing_seg_ids, mut missing_frame_ids): (Vec<_>, Vec<_>) =
433 request.into_iter().map(|s| (s, s.0)).unzip();
434
435 let segments = ctx.rb_rx.find(|s| {
437 if let Ok(i) = missing_seg_ids.binary_search(&s.id()) {
439 missing_seg_ids.remove(i);
440 true
441 } else {
442 false
443 }
444 });
445
446 tracing::trace!(
447 found = segments.len(),
448 requested = missing_frame_ids.len(),
449 "found matching segments to be retransmitted"
450 );
451
452 if self.cfg.mode.is_full_ack_enabled() {
455 missing_frame_ids.dedup();
457
458 if let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
459 missing_frame_ids
460 .into_iter()
461 .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
462 ) {
463 tracing::error!(%error, "failed to cancel frame resend of partially acknowledged frames");
464 }
465 }
466
467 segments
469 .into_iter()
470 .try_for_each(|s| {
471 tracing::trace!(seg_id = %s.id(), "retransmit segment on request");
472 ctx.ctl_tx.unbounded_send(SessionMessage::Segment(s))
473 })
474 .map_err(|e| SessionError::ProcessingError(e.to_string()))
475 }
476
477 #[tracing::instrument(name = "AcknowledgementState::incoming_acknowledged_frames", skip(self), fields(session_id = self.id))]
478 fn incoming_acknowledged_frames(&mut self, ack: FrameAcknowledgements<C>) -> Result<(), SessionError> {
479 tracing::trace!(count = ack.len(), "frame acknowledgements received");
480
481 let ctx = self
482 .started
483 .load(std::sync::atomic::Ordering::Relaxed)
484 .then_some(self.context.as_mut())
485 .flatten()
486 .ok_or(SessionError::StateNotRunning)?;
487
488 if self.cfg.mode.is_full_ack_enabled()
490 && let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
491 ack.into_iter()
492 .inspect(|frame_id| tracing::trace!(frame_id, "frame acknowledged"))
493 .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
494 )
495 {
496 tracing::error!(%error, "failed to cancel frame resend");
497 }
498
499 Ok(())
500 }
501
502 #[tracing::instrument(name = "AcknowledgementState::frame_complete", skip(self), fields(session_id = self.id))]
503 fn frame_complete(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
504 tracing::trace!("frame complete");
505
506 let ctx = self
507 .started
508 .load(std::sync::atomic::Ordering::Relaxed)
509 .then_some(self.context.as_mut())
510 .flatten()
511 .ok_or(SessionError::StateNotRunning)?;
512
513 if let Err(error) = ctx.ack_tx.try_send(frame_id) {
515 tracing::error!(%error, "failed to acknowledge frame");
516 }
517
518 if self.cfg.mode.is_partial_ack_enabled() {
519 if let Err(error) = ctx
521 .incoming_frame_retries_tx
522 .send_one((RetriedFrameId::no_retries(frame_id), Skip))
523 {
524 tracing::error!(%error, "failed to cancel retry of acknowledged frame");
525 }
526 }
527
528 Ok(())
529 }
530
531 #[tracing::instrument(name = "AcknowledgementState::frame_emitted", skip(self), fields(session_id = self.id))]
532 fn frame_emitted(&mut self, id: FrameId) -> Result<(), SessionError> {
533 tracing::trace!("frame emitted");
534 let _ = self
535 .started
536 .load(std::sync::atomic::Ordering::Relaxed)
537 .then_some(self.context.as_mut())
538 .flatten()
539 .ok_or(SessionError::StateNotRunning)?;
540 Ok(())
541 }
542
543 #[tracing::instrument(name = "AcknowledgementState::frame_discarded", skip(self), fields(session_id = self.id))]
544 fn frame_discarded(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
545 tracing::trace!("frame discarded");
546
547 let ctx = self
548 .started
549 .load(std::sync::atomic::Ordering::Relaxed)
550 .then_some(self.context.as_mut())
551 .flatten()
552 .ok_or(SessionError::StateNotRunning)?;
553
554 if self.cfg.mode.is_partial_ack_enabled() {
555 if let Err(error) = ctx
557 .incoming_frame_retries_tx
558 .send_one((RetriedFrameId::no_retries(frame_id), Skip))
559 {
560 tracing::error!(%error, "failed to cancel retry of acknowledged frame");
561 }
562 }
563
564 Ok(())
565 }
566
567 #[tracing::instrument(name = "AcknowledgementState::segment_sent", skip(self, segment), fields(session_id = self.id, frame_id = segment.frame_id, seq_idx = segment.seq_idx))]
568 fn segment_sent(&mut self, segment: &Segment) -> Result<(), SessionError> {
569 tracing::trace!("segment sent");
570
571 let ctx = self
572 .started
573 .load(std::sync::atomic::Ordering::Relaxed)
574 .then_some(self.context.as_mut())
575 .flatten()
576 .ok_or(SessionError::StateNotRunning)?;
577
578 if !ctx.rb_tx.push(segment.clone()) {
581 tracing::error!("failed to push segment into ring buffer");
582 }
583
584 if segment.is_last() && self.cfg.mode.is_full_ack_enabled() {
587 tracing::trace!("last segment of frame sent");
588
589 if let Err(error) = ctx.outgoing_frame_retries_tx.send_one((
590 RetriedFrameId::with_retries(segment.frame_id, self.cfg.max_outgoing_frame_retries),
591 self.cfg.expected_packet_latency * (segment.seq_flags.seq_len() + 1) as u32,
596 )) {
597 tracing::error!(%error, "failed to insert outgoing retry of a frame");
598 }
599 }
600
601 Ok(())
602 }
603}
604
605#[cfg(test)]
606mod tests {
607 use anyhow::Context;
608
609 use super::*;
610 use crate::{
611 processing::types::{FrameBuilder, FrameDashMap, FrameMap},
612 protocol::SeqNum,
613 utils::segment,
614 };
615
616 const FRAME_SIZE: usize = 1500;
617
618 const MTU: usize = 1000;
619
620 #[test_log::test(tokio::test)]
621 async fn ack_state_sender_must_acknowledge_completed_frames() -> anyhow::Result<()> {
622 let cfg = AcknowledgementStateConfig {
623 expected_packet_latency: Duration::from_millis(10),
624 acknowledgement_delay: Duration::from_millis(2),
625 ..Default::default()
626 };
627
628 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
629 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
630
631 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
632 state.run(SocketComponents {
633 inspector: inspector.into(),
634 ctl_tx,
635 })?;
636
637 let acked_frame_ids = [1, 2, 3];
638
639 for &frame_id in &acked_frame_ids {
640 state.frame_complete(frame_id)?;
641 }
642
643 tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
644
645 state.stop()?;
646
647 let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
648 .await
649 .context("timeout receiving Control messages")?;
650
651 assert_eq!(1, ctl_msgs.len());
652
653 assert_eq!(
654 ctl_msgs[0],
655 SessionMessage::Acknowledge(acked_frame_ids.to_vec().try_into()?)
656 );
657
658 Ok(())
659 }
660
661 #[parameterized::parameterized(num_frames = { 1, 2, 3 })]
662 #[parameterized_macro(test_log::test(tokio::test))]
663 async fn ack_state_sender_must_resend_unacknowledged_frames(num_frames: usize) -> anyhow::Result<()> {
664 const NUM_RETRIES: usize = 2;
665
666 let cfg = AcknowledgementStateConfig {
667 mode: AcknowledgementMode::Full,
668 expected_packet_latency: Duration::from_millis(2),
669 max_outgoing_frame_retries: NUM_RETRIES,
670 ..Default::default()
671 };
672
673 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
674 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
675
676 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
677 state.run(SocketComponents {
678 inspector: inspector.into(),
679 ctl_tx,
680 })?;
681
682 let mut expected_frame_segments = Vec::new();
683 let num_segments_in_frame = FRAME_SIZE / MTU + 1;
684 for i in 1..=num_frames {
685 let expected_segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, i as FrameId)?;
686 for segment in &expected_segments {
687 state.segment_sent(segment)?;
688 }
689 expected_frame_segments.push(expected_segments);
690 }
691
692 let expected_frame_delivery = cfg.expected_packet_latency * (num_segments_in_frame + 1) as u32;
693 tokio::time::sleep(2 * expected_frame_delivery).await;
694 state.stop()?;
695
696 let ctl_msg = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
697 .await
698 .context("timeout receiving Control message")?;
699
700 let retransmitted_segments = ctl_msg
701 .into_iter()
702 .map(|m| m.try_as_segment().ok_or(anyhow::anyhow!("must be segment")))
703 .collect::<Result<Vec<_>, _>>()?;
704
705 assert_eq!(
706 NUM_RETRIES * num_segments_in_frame * num_frames,
707 retransmitted_segments.len()
708 );
709
710 let total_segments = expected_frame_segments.iter().map(|m| m.len()).sum::<usize>();
711 let expected_segments = expected_frame_segments
712 .into_iter()
713 .flatten()
714 .cycle()
715 .take(total_segments * NUM_RETRIES)
716 .collect::<Vec<_>>();
717 assert_eq!(expected_segments, retransmitted_segments);
718
719 Ok(())
720 }
721
722 #[test_log::test(tokio::test)]
723 async fn ack_state_sender_must_not_resend_unacknowledged_frame_when_full_resend_disabled() -> anyhow::Result<()> {
724 const NUM_RETRIES: usize = 2;
725
726 let cfg = AcknowledgementStateConfig {
727 mode: AcknowledgementMode::Partial,
728 expected_packet_latency: Duration::from_millis(2),
729 max_outgoing_frame_retries: NUM_RETRIES,
730 ..Default::default()
731 };
732
733 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
734 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
735
736 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
737 state.run(SocketComponents {
738 inspector: inspector.into(),
739 ctl_tx,
740 })?;
741
742 let expected_segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
743 for segment in &expected_segments {
744 state.segment_sent(segment)?;
745 }
746
747 let expected_frame_delivery = cfg.expected_packet_latency * (expected_segments.len() + 1) as u32;
748 tokio::time::sleep(2 * expected_frame_delivery).await;
749 state.stop()?;
750
751 assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
753
754 Ok(())
755 }
756
757 #[tokio::test]
758 async fn ack_state_sender_must_not_resend_acknowledged_frame() -> anyhow::Result<()> {
759 let cfg = AcknowledgementStateConfig {
760 mode: AcknowledgementMode::Full,
761 expected_packet_latency: Duration::from_millis(2),
762 max_outgoing_frame_retries: 1,
763 ..Default::default()
764 };
765
766 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
767 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
768
769 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
770 state.run(SocketComponents {
771 inspector: inspector.into(),
772 ctl_tx,
773 })?;
774
775 let expected_segments = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
776 for segment in &expected_segments {
777 state.segment_sent(segment)?;
778 }
779
780 state.incoming_acknowledged_frames(vec![1].try_into()?)?;
782
783 tokio::time::sleep(10 * cfg.expected_packet_latency).await;
784
785 state.stop()?;
786
787 assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
789
790 Ok(())
791 }
792
793 #[test_log::test(tokio::test)]
794 async fn ack_state_sender_must_not_resend_entire_frame_when_already_partially_acknowledged() -> anyhow::Result<()> {
795 let cfg = AcknowledgementStateConfig {
796 mode: AcknowledgementMode::Full,
797 expected_packet_latency: Duration::from_millis(2),
798 max_outgoing_frame_retries: 1,
799 ..Default::default()
800 };
801
802 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
803 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
804
805 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
806 state.run(SocketComponents {
807 inspector: inspector.into(),
808 ctl_tx,
809 })?;
810
811 let expected_segments = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
812
813 for segment in &expected_segments {
815 state.segment_sent(segment)?;
816 }
817
818 tokio::time::sleep(cfg.expected_packet_latency).await;
819
820 state.incoming_retransmission_request(SegmentRequest::from_iter([(1, [0b10000000].into())]))?;
822
823 state.stop()?;
824
825 let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
827 assert_eq!(1, ctl_msgs.len());
828 assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments[0].clone()));
829
830 Ok(())
831 }
832
833 #[test_log::test(tokio::test)]
834 async fn ack_state_sender_must_retransmit_segments_when_requested() -> anyhow::Result<()> {
835 let cfg = AcknowledgementStateConfig {
836 mode: AcknowledgementMode::Full,
837 expected_packet_latency: Duration::from_millis(2),
838 max_outgoing_frame_retries: 1,
839 ..Default::default()
840 };
841
842 let inspector = FrameInspector(FrameDashMap::with_capacity(10));
843 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
844
845 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
846 state.run(SocketComponents {
847 inspector: inspector.into(),
848 ctl_tx,
849 })?;
850
851 let expected_segments_1 = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
852 for segment in &expected_segments_1 {
854 state.segment_sent(segment)?;
855 }
856
857 let expected_segments_2 = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 2)?;
858 for segment in &expected_segments_2 {
860 state.segment_sent(segment)?;
861 }
862
863 tokio::time::sleep(cfg.expected_packet_latency).await;
864
865 state.incoming_retransmission_request(SegmentRequest::from_iter([
867 (1, [0b11100000].into()),
868 (2, [0b11100000].into()),
869 ]))?;
870 tokio::time::sleep(cfg.expected_packet_latency).await;
871
872 state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b11000000].into())]))?;
873 tokio::time::sleep(cfg.expected_packet_latency).await;
874
875 state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b01000000].into())]))?;
876 tokio::time::sleep(cfg.expected_packet_latency).await;
877
878 state.stop()?;
879
880 let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
881
882 assert_eq!(9, ctl_msgs.len());
883 assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments_1[0].clone()));
885 assert_eq!(ctl_msgs[1], SessionMessage::Segment(expected_segments_1[1].clone()));
886 assert_eq!(ctl_msgs[2], SessionMessage::Segment(expected_segments_1[2].clone()));
887 assert_eq!(ctl_msgs[3], SessionMessage::Segment(expected_segments_2[0].clone()));
889 assert_eq!(ctl_msgs[4], SessionMessage::Segment(expected_segments_2[1].clone()));
890 assert_eq!(ctl_msgs[5], SessionMessage::Segment(expected_segments_2[2].clone()));
891
892 assert_eq!(ctl_msgs[6], SessionMessage::Segment(expected_segments_2[0].clone()));
894 assert_eq!(ctl_msgs[7], SessionMessage::Segment(expected_segments_2[1].clone()));
895
896 assert_eq!(ctl_msgs[8], SessionMessage::Segment(expected_segments_2[1].clone()));
898
899 Ok(())
900 }
901
902 #[tokio::test]
903 async fn ack_state_receiver_must_request_missing_frames_when_partial_acks_are_enabled() -> anyhow::Result<()> {
904 let cfg = AcknowledgementStateConfig {
905 mode: AcknowledgementMode::Partial,
906 expected_packet_latency: Duration::from_millis(2),
907 max_incoming_frame_retries: 1,
908 ..Default::default()
909 };
910
911 let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
912 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
913
914 let segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
915
916 inspector
917 .0
918 .entry(1)
919 .try_as_vacant()
920 .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
921 .insert(FrameBuilder::from(segments[0].clone()));
922
923 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
924 state.run(SocketComponents {
925 inspector: inspector.into(),
926 ctl_tx,
927 })?;
928
929 state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
930
931 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
932
933 state.stop()?;
934
935 let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
936 .await
937 .context("timeout receiving Control messages")?;
938
939 assert_eq!(1, ctl_msgs.len());
940 assert_eq!(
941 ctl_msgs[0],
942 SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01000000].into())]))
943 );
944
945 Ok(())
946 }
947
948 #[tokio::test]
949 async fn ack_state_receiver_must_not_request_missing_frames_when_partial_acks_are_disabled() -> anyhow::Result<()> {
950 let cfg = AcknowledgementStateConfig {
951 mode: AcknowledgementMode::Full,
952 expected_packet_latency: Duration::from_millis(2),
953 max_incoming_frame_retries: 1,
954 ..Default::default()
955 };
956
957 let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
958 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
959
960 let segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
961
962 inspector
963 .0
964 .entry(1)
965 .try_as_vacant()
966 .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
967 .insert(FrameBuilder::from(segments[0].clone()));
968
969 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
970 state.run(SocketComponents {
971 inspector: inspector.into(),
972 ctl_tx,
973 })?;
974
975 state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
976
977 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
978
979 state.stop()?;
980
981 let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
982 .await
983 .context("timeout receiving Control messages")?;
984
985 assert!(ctl_msgs.iter().all(|m| !matches!(m, SessionMessage::Request(_))));
986
987 Ok(())
988 }
989
990 #[tokio::test]
991 async fn ack_state_receiver_must_continue_requesting_missing_frames_when_frame_not_completed() -> anyhow::Result<()>
992 {
993 let cfg = AcknowledgementStateConfig {
994 mode: AcknowledgementMode::Partial,
995 expected_packet_latency: Duration::from_millis(2),
996 max_incoming_frame_retries: 3,
997 ..Default::default()
998 };
999
1000 let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1001 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
1002
1003 let segments = segment(hopr_crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1004
1005 inspector
1006 .0
1007 .entry(1)
1008 .try_as_vacant()
1009 .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1010 .insert(FrameBuilder::from(segments[0].clone()));
1011
1012 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1013 state.run(SocketComponents {
1014 inspector: inspector.clone().into(),
1015 ctl_tx,
1016 })?;
1017
1018 state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1019
1020 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1021
1022 inspector
1023 .0
1024 .entry(1)
1025 .try_as_occupied()
1026 .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1027 .get_mut()
1028 .add_segment(segments[1].clone())?;
1029
1030 state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1031
1032 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1033
1034 state.stop()?;
1035
1036 let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1037 .await
1038 .context("timeout receiving Control messages")?;
1039
1040 assert_eq!(2, ctl_msgs.len());
1041 assert_eq!(
1042 ctl_msgs[0],
1043 SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1044 );
1045
1046 assert_eq!(
1047 ctl_msgs[1],
1048 SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1049 );
1050
1051 Ok(())
1052 }
1053
1054 #[tokio::test]
1055 async fn ack_state_receiver_must_continue_requesting_missing_frames_and_acknowledge_once_complete()
1056 -> anyhow::Result<()> {
1057 let cfg = AcknowledgementStateConfig {
1058 mode: AcknowledgementMode::Partial,
1059 expected_packet_latency: Duration::from_millis(2),
1060 max_incoming_frame_retries: 3,
1061 acknowledgement_delay: Duration::from_millis(5),
1062 ..Default::default()
1063 };
1064
1065 let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1066 let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
1067
1068 let segments = segment(hopr_crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1069
1070 inspector
1072 .0
1073 .entry(1)
1074 .try_as_vacant()
1075 .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1076 .insert(FrameBuilder::from(segments[0].clone()));
1077
1078 let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1079 state.run(SocketComponents {
1080 inspector: inspector.clone().into(),
1081 ctl_tx,
1082 })?;
1083
1084 state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1086
1087 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1088
1089 inspector
1090 .0
1091 .entry(1)
1092 .try_as_occupied()
1093 .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1094 .get_mut()
1095 .add_segment(segments[1].clone())?;
1096
1097 state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1098
1099 tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1100
1101 inspector
1103 .0
1104 .entry(1)
1105 .try_as_occupied()
1106 .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1107 .get_mut()
1108 .add_segment(segments[2].clone())?;
1109
1110 state.incoming_segment(&segments[2].id(), (segments.len() as SeqNum).try_into()?)?;
1111 state.frame_complete(1)?;
1112
1113 tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
1114
1115 state.stop()?;
1116
1117 let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1118 .await
1119 .context("timeout receiving Control messages")?;
1120
1121 assert_eq!(3, ctl_msgs.len());
1122 assert_eq!(
1123 ctl_msgs[0],
1124 SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1125 );
1126
1127 assert_eq!(
1128 ctl_msgs[1],
1129 SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1130 );
1131
1132 assert_eq!(ctl_msgs[2], SessionMessage::Acknowledge(vec![1].try_into()?));
1133
1134 Ok(())
1135 }
1136}