1use std::{
5 sync::atomic::AtomicBool,
6 time::{Duration, Instant},
7};
8
9use futures::{FutureExt, StreamExt, channel::mpsc::Sender};
10use futures_time::stream::StreamExt as TimeStreamExt;
11use tracing::Instrument;
12
13use crate::{
14 errors::SessionError,
15 processing::types::FrameInspector,
16 protocol::{FrameAcknowledgements, FrameId, Segment, SegmentId, SegmentRequest, SeqIndicator, SessionMessage},
17 socket::{SocketState, state::SocketComponents},
18 utils::{
19 RetriedFrameId, RingBufferProducer, RingBufferView, next_deadline_with_backoff, searchable_ringbuffer,
20 skip_queue::{Skip, SkipDelaySender, skip_delay_channel},
21 },
22};
23
/// Selects which acknowledgement mechanisms are active for a session.
///
/// The mode is consulted by the acknowledgement state to decide whether
/// it requests missing segments of incoming frames (partial acknowledgements)
/// and/or schedules re-sends of entire outgoing frames (full acknowledgements).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AcknowledgementMode {
    /// Only the partial acknowledgement mechanism is enabled.
    Partial,
    /// Only the full acknowledgement mechanism is enabled.
    Full,
    /// Both mechanisms are enabled. This is the default.
    #[default]
    Both,
}

impl AcknowledgementMode {
    /// Returns `true` for [`AcknowledgementMode::Partial`] and [`AcknowledgementMode::Both`].
    #[inline]
    fn is_partial_ack_enabled(&self) -> bool {
        match self {
            Self::Partial | Self::Both => true,
            Self::Full => false,
        }
    }

    /// Returns `true` for [`AcknowledgementMode::Full`] and [`AcknowledgementMode::Both`].
    #[inline]
    fn is_full_ack_enabled(&self) -> bool {
        match self {
            Self::Full | Self::Both => true,
            Self::Partial => false,
        }
    }
}
57
58#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
60pub struct AcknowledgementStateConfig {
61 pub mode: AcknowledgementMode,
65
66 #[default(Duration::from_millis(20))]
70 pub expected_packet_latency: Duration,
71
72 #[default(1.2)]
76 pub backoff_base: f64,
77
78 #[default(3)]
82 pub max_incoming_frame_retries: usize,
83
84 #[default(3)]
88 pub max_outgoing_frame_retries: usize,
89
90 #[default(Duration::from_millis(50))]
94 pub acknowledgement_delay: Duration,
95
96 #[default(16384)]
101 pub lookbehind_segments: usize,
102}
103
104impl AcknowledgementStateConfig {
105 fn normalize(self) -> AcknowledgementStateConfig {
106 Self {
107 mode: self.mode,
108 expected_packet_latency: self.expected_packet_latency.max(Duration::from_millis(1)),
109 backoff_base: self.backoff_base.max(1.0),
110 max_incoming_frame_retries: self.max_incoming_frame_retries,
111 max_outgoing_frame_retries: self.max_outgoing_frame_retries,
112 acknowledgement_delay: self.acknowledgement_delay.max(Duration::from_millis(1)),
113 lookbehind_segments: self.lookbehind_segments.max(1024),
114 }
115 }
116}
117
118#[derive(Clone)]
119struct AcknowledgementStateContext<const C: usize> {
120 rb_tx: RingBufferProducer<Segment>,
121 rb_rx: RingBufferView<Segment>,
122 incoming_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
123 outgoing_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
124 ack_tx: futures::channel::mpsc::Sender<FrameId>,
125 inspector: FrameInspector,
126 ctl_tx: Sender<SessionMessage<C>>,
127}
128
129#[cfg_attr(doc, aquamarine::aquamarine)]
130#[derive(Clone)]
219pub struct AcknowledgementState<const C: usize> {
220 id: String,
221 cfg: AcknowledgementStateConfig,
222 context: Option<AcknowledgementStateContext<C>>,
223 started: std::sync::Arc<AtomicBool>,
224}
225
226impl<const C: usize> AcknowledgementState<C> {
227 pub fn new<I: std::fmt::Display>(session_id: I, cfg: AcknowledgementStateConfig) -> Self {
228 Self {
229 id: session_id.to_string(),
230 cfg: cfg.normalize(),
231 context: Default::default(),
232 started: std::sync::Arc::new(AtomicBool::new(false)),
233 }
234 }
235}
236
237impl<const C: usize> SocketState<C> for AcknowledgementState<C> {
238 fn session_id(&self) -> &str {
239 &self.id
240 }
241
242 #[tracing::instrument(name = "AcknowledgementState", skip(self, socket_components), fields(session_id = self.id))]
243 fn run(&mut self, socket_components: SocketComponents<C>) -> Result<(), SessionError> {
244 if self.started.load(std::sync::atomic::Ordering::Relaxed) && self.context.is_some() {
245 return Err(SessionError::InvalidState("state is already running".into()));
246 }
247
248 let (incoming_frame_retries_tx, incoming_frame_retries_rx) = skip_delay_channel();
249 let (outgoing_frame_retries_tx, outgoing_frame_retries_rx) = skip_delay_channel();
250 let (rb_tx, rb_rx) = searchable_ringbuffer(self.cfg.lookbehind_segments);
251
252 let (ack_tx, ack_rx) = futures::channel::mpsc::channel(2 * self.cfg.lookbehind_segments);
254
255 let context = self.context.insert(AcknowledgementStateContext {
256 rb_tx,
257 rb_rx,
258 incoming_frame_retries_tx,
259 outgoing_frame_retries_tx,
260 ack_tx,
261 ctl_tx: socket_components.ctl_tx,
262 inspector: socket_components
263 .inspector
264 .ok_or(SessionError::InvalidState("inspector is not available".into()))?,
265 });
266
267 if self.cfg.mode.is_partial_ack_enabled() {
268 let mut incoming_frame_retries_tx_clone = context.incoming_frame_retries_tx.clone();
271 let ctl_tx_clone = context.ctl_tx.clone();
272 let frame_inspector_clone = context.inspector.clone();
273 let cfg = self.cfg;
274 hopr_utils::runtime::prelude::spawn(incoming_frame_retries_rx
275 .filter_map(move |rf| {
276 let frame_id = rf.frame_id;
277 let missing_segments = frame_inspector_clone.missing_segments(&frame_id).unwrap_or_default();
278 if !missing_segments.is_empty() {
279 if let Some(next) = rf.next() {
281 let retry_at = next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
283 if let Err(error) = incoming_frame_retries_tx_clone.send_one((next, retry_at)) {
284 tracing::error!(frame_id, %error, "failed to register next resend of incoming frame");
285 } else {
286 tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend request of incoming frame segments");
287 }
288 } else {
289 tracing::debug!(frame_id, "last request of incoming frame segments");
290 }
291
292 futures::future::ready(Some((frame_id, missing_segments)))
293 } else {
294 tracing::debug!(frame_id, "no more missing segments in frame");
295 futures::future::ready(None)
296 }
297 })
298 .ready_chunks(SegmentRequest::<C>::MAX_ENTRIES)
299 .inspect(|r| tracing::trace!(req = ?r, "requesting segments resend"))
300 .map(|a| Ok(SessionMessage::<C>::Request(a.into_iter().collect())))
301 .forward(ctl_tx_clone)
302 .map(move |res| match res {
303 Ok(_) => tracing::debug!("incoming frame resends processing done"),
304 Err(error) => tracing::error!(%error, "error while processing incoming frame resends")
305 })
306 .instrument(tracing::debug_span!("incoming_frame_retries_sender"))
307 );
308 }
309
310 let ctl_tx_clone = context.ctl_tx.clone();
312 let ack_delay = self.cfg.acknowledgement_delay;
313 hopr_utils::runtime::prelude::spawn(
314 ack_rx
315 .buffer(futures_time::time::Duration::from(ack_delay))
316 .flat_map(|acks| futures::stream::iter(FrameAcknowledgements::<C>::new_multiple(acks)))
317 .filter(|acks| futures::future::ready(!acks.is_empty()))
318 .inspect(|acks| tracing::trace!(?acks, "acknowledgements sent"))
319 .map(|acks| Ok(SessionMessage::<C>::Acknowledge(acks)))
320 .forward(ctl_tx_clone)
321 .map(move |res| match res {
322 Ok(_) => tracing::debug!("acknowledgement forwarding done"),
323 Err(error) => tracing::debug!(%error, "acknowledgement forwarding failed"),
324 })
325 .instrument(tracing::debug_span!("acknowledgement_sender")),
326 );
327
328 let mut outgoing_frame_retries_tx_clone = context.outgoing_frame_retries_tx.clone();
330 let ctl_tx_clone = context.ctl_tx.clone();
331 let rb_rx_clone = context.rb_rx.clone();
332 let cfg = self.cfg;
333 hopr_utils::runtime::prelude::spawn(
334 outgoing_frame_retries_rx
335 .map(move |rf: RetriedFrameId| {
336 let frame_id = rf.frame_id;
338 if let Some(next) = rf.next() {
339 let retry_at =
341 next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
342 if let Err(error) = outgoing_frame_retries_tx_clone.send_one((next, retry_at)) {
343 tracing::error!(frame_id, %error, "failed to register next retry of frame");
344 } else {
345 tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend of outgoing frame");
346 }
347 } else {
348 tracing::debug!(frame_id, "last outgoing retry of frame");
349 }
350 tracing::trace!(frame_id, "going to re-send entire frame");
351 frame_id
352 })
353 .flat_map(move |frame_id| {
354 futures::stream::iter(
356 rb_rx_clone
357 .find(|s: &Segment| s.id().0 == frame_id)
358 .into_iter()
359 .inspect(|s| tracing::trace!(seg_id = %s.id(), "segment retransmit"))
360 .map(|s| Ok(SessionMessage::<C>::Segment(s))),
361 )
362 })
363 .forward(ctl_tx_clone) .map(move |res| match res {
365 Ok(_) => tracing::debug!("outgoing frame retries processing done"),
366 Err(error) => tracing::error!(%error, "error while processing outgoing frame retries"),
367 })
368 .instrument(tracing::debug_span!("outgoing_frame_retries_sender")),
369 );
370
371 tracing::debug!("acknowledgement state has been started");
372 self.started.store(true, std::sync::atomic::Ordering::Relaxed);
373
374 Ok(())
375 }
376
377 #[tracing::instrument(name = "AcknowledgementState::stop", skip(self), fields(session_id = self.id))]
378 fn stop(&mut self) -> Result<(), SessionError> {
379 if let Some(mut ctx) = self.context.take() {
380 ctx.outgoing_frame_retries_tx.force_close();
381 ctx.incoming_frame_retries_tx.force_close();
382 ctx.ack_tx.close_channel();
383 ctx.ctl_tx.close_channel();
384
385 self.started.store(false, std::sync::atomic::Ordering::Relaxed);
386 tracing::debug!("state has been stopped");
387 } else {
388 tracing::warn!("cannot be stopped, because it is not running");
389 }
390
391 Ok(())
392 }
393
394 #[tracing::instrument(name = "AcknowledgementState::incoming_segment", skip(self), fields(session_id = self.id, frame_id = seg_id.0))]
395 fn incoming_segment(&mut self, seg_id: &SegmentId, _ind: SeqIndicator) -> Result<(), SessionError> {
396 tracing::trace!("segment received");
397
398 let ctx = self
399 .started
400 .load(std::sync::atomic::Ordering::Relaxed)
401 .then_some(self.context.as_mut())
402 .flatten()
403 .ok_or(SessionError::StateNotRunning)?;
404
405 if self.cfg.mode.is_partial_ack_enabled() {
407 if let Err(error) = ctx.incoming_frame_retries_tx.send_one((
410 RetriedFrameId::with_retries(seg_id.0, self.cfg.max_incoming_frame_retries),
411 self.cfg.expected_packet_latency, )) {
413 tracing::error!(%error, "failed to register incoming retry for frame");
414 }
415 }
416 Ok(())
417 }
418
419 #[tracing::instrument(name = "AcknowledgementState::incoming_retransmission_request", skip(self, request), fields(session_id = self.id))]
420 fn incoming_retransmission_request(&mut self, request: SegmentRequest<C>) -> Result<(), SessionError> {
421 tracing::trace!(count = request.len(), "segment retransmission requested");
424
425 let ctx = self
426 .started
427 .load(std::sync::atomic::Ordering::Relaxed)
428 .then_some(self.context.as_mut())
429 .flatten()
430 .ok_or(SessionError::StateNotRunning)?;
431
432 let (mut missing_seg_ids, mut missing_frame_ids): (Vec<_>, Vec<_>) =
433 request.into_iter().map(|s| (s, s.0)).unzip();
434
435 let segments = ctx.rb_rx.find(|s| {
437 if let Ok(i) = missing_seg_ids.binary_search(&s.id()) {
439 missing_seg_ids.remove(i);
440 true
441 } else {
442 false
443 }
444 });
445
446 tracing::trace!(
447 found = segments.len(),
448 requested = missing_frame_ids.len(),
449 "found matching segments to be retransmitted"
450 );
451
452 if self.cfg.mode.is_full_ack_enabled() {
455 missing_frame_ids.dedup();
457
458 if let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
459 missing_frame_ids
460 .into_iter()
461 .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
462 ) {
463 tracing::error!(%error, "failed to cancel frame resend of partially acknowledged frames");
464 }
465 }
466
467 segments
469 .into_iter()
470 .try_for_each(|s| {
471 tracing::trace!(seg_id = %s.id(), "retransmit segment on request");
472 ctx.ctl_tx.try_send(SessionMessage::Segment(s))
473 })
474 .map_err(|e| SessionError::ProcessingError(e.to_string()))
475 }
476
477 #[tracing::instrument(name = "AcknowledgementState::incoming_acknowledged_frames", skip(self), fields(session_id = self.id))]
478 fn incoming_acknowledged_frames(&mut self, ack: FrameAcknowledgements<C>) -> Result<(), SessionError> {
479 tracing::trace!(count = ack.len(), "frame acknowledgements received");
480
481 let ctx = self
482 .started
483 .load(std::sync::atomic::Ordering::Relaxed)
484 .then_some(self.context.as_mut())
485 .flatten()
486 .ok_or(SessionError::StateNotRunning)?;
487
488 if self.cfg.mode.is_full_ack_enabled()
490 && let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
491 ack.into_iter()
492 .inspect(|frame_id| tracing::trace!(frame_id, "frame acknowledged"))
493 .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
494 )
495 {
496 tracing::error!(%error, "failed to cancel frame resend");
497 }
498
499 Ok(())
500 }
501
502 #[tracing::instrument(name = "AcknowledgementState::frame_complete", skip(self), fields(session_id = self.id))]
503 fn frame_complete(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
504 tracing::trace!("frame complete");
505
506 let ctx = self
507 .started
508 .load(std::sync::atomic::Ordering::Relaxed)
509 .then_some(self.context.as_mut())
510 .flatten()
511 .ok_or(SessionError::StateNotRunning)?;
512
513 if let Err(error) = ctx.ack_tx.try_send(frame_id) {
515 tracing::error!(%error, "failed to acknowledge frame");
516 }
517
518 if self.cfg.mode.is_partial_ack_enabled() {
519 if let Err(error) = ctx
521 .incoming_frame_retries_tx
522 .send_one((RetriedFrameId::no_retries(frame_id), Skip))
523 {
524 tracing::error!(%error, "failed to cancel retry of acknowledged frame");
525 }
526 }
527
528 Ok(())
529 }
530
531 #[tracing::instrument(name = "AcknowledgementState::frame_emitted", skip(self), fields(session_id = self.id))]
532 fn frame_emitted(&mut self, id: FrameId) -> Result<(), SessionError> {
533 tracing::trace!("frame emitted");
534 let _ = self
535 .started
536 .load(std::sync::atomic::Ordering::Relaxed)
537 .then_some(self.context.as_mut())
538 .flatten()
539 .ok_or(SessionError::StateNotRunning)?;
540 Ok(())
541 }
542
543 #[tracing::instrument(name = "AcknowledgementState::frame_discarded", skip(self), fields(session_id = self.id))]
544 fn frame_discarded(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
545 tracing::trace!("frame discarded");
546
547 let ctx = self
548 .started
549 .load(std::sync::atomic::Ordering::Relaxed)
550 .then_some(self.context.as_mut())
551 .flatten()
552 .ok_or(SessionError::StateNotRunning)?;
553
554 if self.cfg.mode.is_partial_ack_enabled() {
555 if let Err(error) = ctx
557 .incoming_frame_retries_tx
558 .send_one((RetriedFrameId::no_retries(frame_id), Skip))
559 {
560 tracing::error!(%error, "failed to cancel retry of acknowledged frame");
561 }
562 }
563
564 Ok(())
565 }
566
567 #[tracing::instrument(name = "AcknowledgementState::segment_sent", skip(self, segment), fields(session_id = self.id, frame_id = segment.frame_id, seq_idx = segment.seq_idx))]
568 fn segment_sent(&mut self, segment: &Segment) -> Result<(), SessionError> {
569 tracing::trace!("segment sent");
570
571 let ctx = self
572 .started
573 .load(std::sync::atomic::Ordering::Relaxed)
574 .then_some(self.context.as_mut())
575 .flatten()
576 .ok_or(SessionError::StateNotRunning)?;
577
578 if !ctx.rb_tx.push(segment.clone()) {
581 tracing::error!("failed to push segment into ring buffer");
582 }
583
584 if segment.is_last() && self.cfg.mode.is_full_ack_enabled() {
587 tracing::trace!("last segment of frame sent");
588
589 if let Err(error) = ctx.outgoing_frame_retries_tx.send_one((
590 RetriedFrameId::with_retries(segment.frame_id, self.cfg.max_outgoing_frame_retries),
591 self.cfg.expected_packet_latency * (segment.seq_flags.seq_len() + 1) as u32,
596 )) {
597 tracing::error!(%error, "failed to insert outgoing retry of a frame");
598 }
599 }
600
601 Ok(())
602 }
603}
604
#[cfg(test)]
mod tests {
    use anyhow::Context;

    use super::*;
    use crate::{
        processing::types::{FrameBuilder, FrameDashMap, FrameMap},
        protocol::SeqNum,
        utils::segment,
    };

    /// Size of the payload of a single test frame.
    const FRAME_SIZE: usize = 1500;

    /// Const parameter of the tested state, also passed to `segment` when splitting frames.
    const MTU: usize = 1000;

    /// Receiving end of the control channel handed over to the tested state.
    type CtlReceiver = futures::channel::mpsc::Receiver<SessionMessage<MTU>>;

    /// Creates a frame inspector that knows no frames.
    fn empty_inspector() -> FrameInspector {
        FrameInspector(FrameDashMap::with_capacity(10))
    }

    /// Creates a frame inspector which has seen only the given segment of frame 1.
    fn inspector_holding(first: &Segment) -> anyhow::Result<FrameInspector> {
        let mut inspector = empty_inspector();
        inspector
            .0
            .entry(1)
            .try_as_vacant()
            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
            .insert(FrameBuilder::from(first.clone()));
        Ok(inspector)
    }

    /// Adds another segment to the already known frame 1.
    fn add_to_frame_1(inspector: &mut FrameInspector, next: Segment) -> anyhow::Result<()> {
        inspector
            .0
            .entry(1)
            .try_as_occupied()
            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
            .get_mut()
            .add_segment(next)?;
        Ok(())
    }

    /// Creates and starts the tested state, returning it with the control channel receiver.
    fn start_state(
        cfg: AcknowledgementStateConfig,
        inspector: FrameInspector,
    ) -> anyhow::Result<(AcknowledgementState<MTU>, CtlReceiver)> {
        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);

        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
        state.run(SocketComponents {
            inspector: inspector.into(),
            ctl_tx,
        })?;

        Ok((state, ctl_rx))
    }

    /// Collects all control messages until the channel closes, failing after 100 ms.
    async fn collect_ctl_messages(ctl_rx: CtlReceiver) -> anyhow::Result<Vec<SessionMessage<MTU>>> {
        tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
            .await
            .context("timeout receiving Control messages")
    }

    /// Completed frames must be acknowledged in a single batched message.
    #[test_log::test(tokio::test)]
    async fn ack_state_sender_must_acknowledge_completed_frames() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            expected_packet_latency: Duration::from_millis(10),
            acknowledgement_delay: Duration::from_millis(2),
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let acked_frame_ids = [1, 2, 3];
        for frame_id in acked_frame_ids {
            state.frame_complete(frame_id)?;
        }

        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;

        state.stop()?;

        let ctl_msgs = collect_ctl_messages(ctl_rx).await?;

        assert_eq!(1, ctl_msgs.len());
        assert_eq!(
            ctl_msgs[0],
            SessionMessage::Acknowledge(acked_frame_ids.to_vec().try_into()?)
        );

        Ok(())
    }

    /// Frames that were never acknowledged must be re-sent entirely, once per retry.
    #[parameterized::parameterized(num_frames = { 1, 2, 3 })]
    #[parameterized_macro(test_log::test(tokio::test))]
    async fn ack_state_sender_must_resend_unacknowledged_frames(num_frames: usize) -> anyhow::Result<()> {
        const NUM_RETRIES: usize = 2;

        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Full,
            expected_packet_latency: Duration::from_millis(2),
            max_outgoing_frame_retries: NUM_RETRIES,
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let num_segments_in_frame = FRAME_SIZE / MTU + 1;
        let mut expected_frame_segments = Vec::new();
        for frame_no in 1..=num_frames {
            let frame_segments = segment(
                hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(),
                MTU,
                frame_no as FrameId,
            )?;
            for sent in &frame_segments {
                state.segment_sent(sent)?;
            }
            expected_frame_segments.push(frame_segments);
        }

        let expected_frame_delivery = cfg.expected_packet_latency * (num_segments_in_frame + 1) as u32;
        tokio::time::sleep(2 * expected_frame_delivery).await;
        state.stop()?;

        let ctl_msg = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
            .await
            .context("timeout receiving Control message")?;

        let retransmitted_segments = ctl_msg
            .into_iter()
            .map(|m| m.try_as_segment().ok_or(anyhow::anyhow!("must be segment")))
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            NUM_RETRIES * num_segments_in_frame * num_frames,
            retransmitted_segments.len()
        );

        // All frames are expected to be re-sent in order, once per retry round
        let total_segments: usize = expected_frame_segments.iter().map(|frame| frame.len()).sum();
        let expected_segments = expected_frame_segments
            .into_iter()
            .flatten()
            .cycle()
            .take(total_segments * NUM_RETRIES)
            .collect::<Vec<_>>();
        assert_eq!(expected_segments, retransmitted_segments);

        Ok(())
    }

    /// Without full acknowledgements, unacknowledged frames must not be re-sent.
    #[test_log::test(tokio::test)]
    async fn ack_state_sender_must_not_resend_unacknowledged_frame_when_full_resend_disabled() -> anyhow::Result<()> {
        const NUM_RETRIES: usize = 2;

        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Partial,
            expected_packet_latency: Duration::from_millis(2),
            max_outgoing_frame_retries: NUM_RETRIES,
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
        for sent in &expected_segments {
            state.segment_sent(sent)?;
        }

        let expected_frame_delivery = cfg.expected_packet_latency * (expected_segments.len() + 1) as u32;
        tokio::time::sleep(2 * expected_frame_delivery).await;
        state.stop()?;

        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
        assert!(ctl_msgs.is_empty());

        Ok(())
    }

    /// A frame acknowledged by the counterparty must not be re-sent.
    #[tokio::test]
    async fn ack_state_sender_must_not_resend_acknowledged_frame() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Full,
            expected_packet_latency: Duration::from_millis(2),
            max_outgoing_frame_retries: 1,
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
        for sent in &expected_segments {
            state.segment_sent(sent)?;
        }

        state.incoming_acknowledged_frames(vec![1].try_into()?)?;

        tokio::time::sleep(10 * cfg.expected_packet_latency).await;

        state.stop()?;

        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
        assert!(ctl_msgs.is_empty());

        Ok(())
    }

    /// A retransmission request for a frame must replace the re-send of the entire frame.
    #[test_log::test(tokio::test)]
    async fn ack_state_sender_must_not_resend_entire_frame_when_already_partially_acknowledged() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Full,
            expected_packet_latency: Duration::from_millis(2),
            max_outgoing_frame_retries: 1,
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
        for sent in &expected_segments {
            state.segment_sent(sent)?;
        }

        tokio::time::sleep(cfg.expected_packet_latency).await;

        state.incoming_retransmission_request(SegmentRequest::from_iter([(1, [0b10000000].into())]))?;

        state.stop()?;

        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
        assert_eq!(1, ctl_msgs.len());
        assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments[0].clone()));

        Ok(())
    }

    /// Exactly the requested segments must be retransmitted, in the order of the requests.
    #[test_log::test(tokio::test)]
    async fn ack_state_sender_must_retransmit_segments_when_requested() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Full,
            expected_packet_latency: Duration::from_millis(2),
            max_outgoing_frame_retries: 1,
            ..Default::default()
        };

        let (mut state, ctl_rx) = start_state(cfg, empty_inspector())?;

        let expected_segments_1 = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
        for sent in &expected_segments_1 {
            state.segment_sent(sent)?;
        }

        let expected_segments_2 = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 2)?;
        for sent in &expected_segments_2 {
            state.segment_sent(sent)?;
        }

        tokio::time::sleep(cfg.expected_packet_latency).await;

        state.incoming_retransmission_request(SegmentRequest::from_iter([
            (1, [0b11100000].into()),
            (2, [0b11100000].into()),
        ]))?;
        tokio::time::sleep(cfg.expected_packet_latency).await;

        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b11000000].into())]))?;
        tokio::time::sleep(cfg.expected_packet_latency).await;

        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b01000000].into())]))?;
        tokio::time::sleep(cfg.expected_packet_latency).await;

        state.stop()?;

        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;

        assert_eq!(9, ctl_msgs.len());

        let expected_order = [
            // The first request: all three segments of both frames
            &expected_segments_1[0],
            &expected_segments_1[1],
            &expected_segments_1[2],
            &expected_segments_2[0],
            &expected_segments_2[1],
            &expected_segments_2[2],
            // The second request: the first two segments of frame 2
            &expected_segments_2[0],
            &expected_segments_2[1],
            // The third request: the second segment of frame 2
            &expected_segments_2[1],
        ];
        for (received, expected) in ctl_msgs.iter().zip(expected_order) {
            assert_eq!(*received, SessionMessage::Segment(expected.clone()));
        }

        Ok(())
    }

    /// With partial acknowledgements, missing segments of an incomplete frame must be requested.
    #[tokio::test]
    async fn ack_state_receiver_must_request_missing_frames_when_partial_acks_are_enabled() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Partial,
            expected_packet_latency: Duration::from_millis(2),
            max_incoming_frame_retries: 1,
            ..Default::default()
        };

        let segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
        let inspector = inspector_holding(&segments[0])?;

        let (mut state, ctl_rx) = start_state(cfg, inspector)?;

        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        state.stop()?;

        let ctl_msgs = collect_ctl_messages(ctl_rx).await?;

        assert_eq!(1, ctl_msgs.len());
        assert_eq!(
            ctl_msgs[0],
            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01000000].into())]))
        );

        Ok(())
    }

    /// Without partial acknowledgements, no segment requests must be sent.
    #[tokio::test]
    async fn ack_state_receiver_must_not_request_missing_frames_when_partial_acks_are_disabled() -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Full,
            expected_packet_latency: Duration::from_millis(2),
            max_incoming_frame_retries: 1,
            ..Default::default()
        };

        let segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
        let inspector = inspector_holding(&segments[0])?;

        let (mut state, ctl_rx) = start_state(cfg, inspector)?;

        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        state.stop()?;

        let ctl_msgs = collect_ctl_messages(ctl_rx).await?;

        assert!(ctl_msgs.iter().all(|m| !matches!(m, SessionMessage::Request(_))));

        Ok(())
    }

    /// Requests must keep reflecting the currently missing segments while the frame is incomplete.
    #[tokio::test]
    async fn ack_state_receiver_must_continue_requesting_missing_frames_when_frame_not_completed() -> anyhow::Result<()>
    {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Partial,
            expected_packet_latency: Duration::from_millis(2),
            max_incoming_frame_retries: 3,
            ..Default::default()
        };

        let segments = segment(hopr_types::crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
        let mut inspector = inspector_holding(&segments[0])?;

        let (mut state, ctl_rx) = start_state(cfg, inspector.clone())?;

        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        add_to_frame_1(&mut inspector, segments[1].clone())?;

        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        state.stop()?;

        let ctl_msgs = collect_ctl_messages(ctl_rx).await?;

        assert_eq!(2, ctl_msgs.len());
        assert_eq!(
            ctl_msgs[0],
            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
        );
        assert_eq!(
            ctl_msgs[1],
            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
        );

        Ok(())
    }

    /// Once the frame becomes complete, requesting must stop and the frame must be acknowledged.
    #[tokio::test]
    async fn ack_state_receiver_must_continue_requesting_missing_frames_and_acknowledge_once_complete()
    -> anyhow::Result<()> {
        let cfg = AcknowledgementStateConfig {
            mode: AcknowledgementMode::Partial,
            expected_packet_latency: Duration::from_millis(2),
            max_incoming_frame_retries: 3,
            acknowledgement_delay: Duration::from_millis(5),
            ..Default::default()
        };

        let segments = segment(hopr_types::crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
        let mut inspector = inspector_holding(&segments[0])?;

        let (mut state, ctl_rx) = start_state(cfg, inspector.clone())?;

        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        add_to_frame_1(&mut inspector, segments[1].clone())?;

        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;

        tokio::time::sleep(cfg.expected_packet_latency * 2).await;

        add_to_frame_1(&mut inspector, segments[2].clone())?;

        state.incoming_segment(&segments[2].id(), (segments.len() as SeqNum).try_into()?)?;
        state.frame_complete(1)?;

        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;

        state.stop()?;

        let ctl_msgs = collect_ctl_messages(ctl_rx).await?;

        assert_eq!(3, ctl_msgs.len());
        assert_eq!(
            ctl_msgs[0],
            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
        );
        assert_eq!(
            ctl_msgs[1],
            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
        );
        assert_eq!(ctl_msgs[2], SessionMessage::Acknowledge(vec![1].try_into()?));

        Ok(())
    }
}