hopr_protocol_session/socket/
ack_state.rs

1//! This module defines the [`SocketState`] that turns [`SessionSocket`](super::SessionSocket) into
2//! a reliable socket, with segment/frame retransmission and frame acknowledgements.
3
4use std::{
5    sync::atomic::AtomicBool,
6    time::{Duration, Instant},
7};
8
9use futures::{FutureExt, StreamExt, channel::mpsc::UnboundedSender};
10use futures_time::stream::StreamExt as TimeStreamExt;
11use tracing::Instrument;
12
13use crate::{
14    errors::SessionError,
15    processing::types::FrameInspector,
16    protocol::{FrameAcknowledgements, FrameId, Segment, SegmentId, SegmentRequest, SeqIndicator, SessionMessage},
17    socket::{SocketState, state::SocketComponents},
18    utils::{
19        RetriedFrameId, RingBufferProducer, RingBufferView, next_deadline_with_backoff, searchable_ringbuffer,
20        skip_queue::{Skip, SkipDelaySender, skip_delay_channel},
21    },
22};
23
24/// Indicates the acknowledgement mode of a [stateful](AcknowledgementState) Session socket.
25#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
26pub enum AcknowledgementMode {
27    /// Partial frames are acknowledged, leading to receiver-driven retransmission requests.
28    /// The frame sender is never going to retransmit the entire frame.
29    Partial,
30    /// Only full frames are acknowledged, leading to the full-frame retransmission if
31    /// no acknowledgement is received by the frame sender.
32    Full,
33    /// Both partial and full acknowledgements are sent by the receiver.
34    ///
35    /// If a frame is partially acknowledged first, only receiver-driven retransmission requests follow
36    /// (as with [`AcknowledgementMode::Partial`].
37    ///
38    /// If the frame sender receives no acknowledgement (partial nor full), it retransmits
39    /// the entire frame (as in [`AcknowledgementMode::Full`]).
40    #[default]
41    Both,
42}
43
44impl AcknowledgementMode {
45    /// Indicates if `self` is [`AcknowledgementMode::Partial`] or [`AcknowledgementMode::Both`].
46    #[inline]
47    fn is_partial_ack_enabled(&self) -> bool {
48        matches!(self, Self::Partial | Self::Both)
49    }
50
51    /// Indicates if `self` is [`AcknowledgementMode::Full`] or [`AcknowledgementMode::Both`].
52    #[inline]
53    fn is_full_ack_enabled(&self) -> bool {
54        matches!(self, Self::Full | Self::Both)
55    }
56}
57
58/// Configuration object of the [`AcknowledgementState`].
59#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
60pub struct AcknowledgementStateConfig {
61    /// Mode of frame acknowledgement.
62    ///
63    /// Default is [`AcknowledgementMode::Both`]
64    pub mode: AcknowledgementMode,
65
66    /// The expected (average) latency of a packet (= single frame segment).
67    ///
68    /// Default is 20 ms
69    #[default(Duration::from_millis(20))]
70    pub expected_packet_latency: Duration,
71
72    /// Backoff base applied for segment or frame retransmissions.
73    ///
74    /// Default is 1.2
75    #[default(1.2)]
76    pub backoff_base: f64,
77
78    /// The maximum number of receiver-driven segment retransmission requests.
79    ///
80    /// Default is 3
81    #[default(3)]
82    pub max_incoming_frame_retries: usize,
83
84    /// The maximum number of sender-driven full-frame retransmissions.
85    ///
86    /// Default is 3
87    #[default(3)]
88    pub max_outgoing_frame_retries: usize,
89
90    /// Delay between acknowledgement batches.
91    ///
92    /// Default is 50 ms
93    #[default(Duration::from_millis(50))]
94    pub acknowledgement_delay: Duration,
95
96    /// Number of segments to hold back for retransmission upon other party's request.
97    /// Minimum is 1024.
98    ///
99    /// Default is 16 384.
100    #[default(16384)]
101    pub lookbehind_segments: usize,
102}
103
104impl AcknowledgementStateConfig {
105    fn normalize(self) -> AcknowledgementStateConfig {
106        Self {
107            mode: self.mode,
108            expected_packet_latency: self.expected_packet_latency.max(Duration::from_millis(1)),
109            backoff_base: self.backoff_base.max(1.0),
110            max_incoming_frame_retries: self.max_incoming_frame_retries,
111            max_outgoing_frame_retries: self.max_outgoing_frame_retries,
112            acknowledgement_delay: self.acknowledgement_delay.max(Duration::from_millis(1)),
113            lookbehind_segments: self.lookbehind_segments.max(1024),
114        }
115    }
116}
117
118#[derive(Clone)]
119struct AcknowledgementStateContext<const C: usize> {
120    rb_tx: RingBufferProducer<Segment>,
121    rb_rx: RingBufferView<Segment>,
122    incoming_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
123    outgoing_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
124    ack_tx: futures::channel::mpsc::Sender<FrameId>,
125    inspector: FrameInspector,
126    ctl_tx: UnboundedSender<SessionMessage<C>>,
127}
128
129#[cfg_attr(doc, aquamarine::aquamarine)]
130/// Represents a Session socket state is able to process acknowledgements.
131///
132/// # Retransmission driven by the Receiver
133/// ```mermaid
134/// sequenceDiagram
135///     Note over Sender,Receiver: Frame 1
136///     rect rgb(191, 223, 255)
137///     Note left of Sender: Frame 1 in buffer
138///     Sender->>Receiver: Segment 1/3 of Frame 1
139///     Sender->>Receiver: Segment 2/3 of Frame 1
140///     Sender--xReceiver: Segment 3/3 of Frame 1
141///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
142///     Receiver->>Sender: Request Segment 3 of Frame 1
143///     Sender->>Receiver: Segment 3/3 of Frame 1
144///     Receiver->>Sender: Acknowledge Frame 1
145///     Note left of Sender: Frame 1 dropped from buffer
146///     end
147///     Note over Sender,Receiver: Frame 1 delivered
148/// ```
149///
150/// # Retransmission driven by the Sender
151/// ```mermaid
152/// sequenceDiagram
153///     Note over Sender,Receiver: Frame 1
154///     rect rgb(191, 223, 255)
155///     Note left of Sender: Frame 1 in buffer
156///     Sender->>Receiver: Segment 1/3 of Frame 1
157///     Sender->>Receiver: Segment 2/3 of Frame 1
158///     Sender--xReceiver: Segment 3/3 of Frame 1
159///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
160///     Receiver--xSender: Request Segment 3 of Frame 1
161///     Note left of Sender: RTO_BASE_SENDER elapsed
162///     Sender->>Receiver: Segment 1/3 of Frame 1
163///     Sender->>Receiver: Segment 2/3 of Frame 1
164///     Sender->>Receiver: Segment 3/3 of Frame 1
165///     Receiver->>Sender: Acknowledge Frame 1
166///     Note left of Sender: Frame 1 dropped from buffer
167///     end
168///     Note over Sender,Receiver: Frame 1 delivered
169/// ```
170///
171/// # Sender-Receiver retransmission handover
172///
173/// ```mermaid
174///    sequenceDiagram
175///     Note over Sender,Receiver: Frame 1
176///     rect rgb(191, 223, 255)
177///     Note left of Sender: Frame 1 in buffer
178///     Sender->>Receiver: Segment 1/3 of Frame 1
179///     Sender--xReceiver: Segment 2/3 of Frame 1
180///     Sender--xReceiver: Segment 3/3 of Frame 1
181///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
182///     Receiver->>Sender: Request Segments 2,3 of Frame 1
183///     Note left of Sender: RTO_BASE_SENDER cancelled
184///     Sender->>Receiver: Segment 2/3 of Frame 1
185///     Sender--xReceiver: Segment 3/3 of Frame 1
186///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
187///     Receiver--xSender: Request Segments 3 of Frame 1
188///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
189///     Receiver->>Sender: Request Segments 3 of Frame 1
190///     Sender->>Receiver: Segment 3/3 of Frame 1
191///     Receiver->>Sender: Acknowledge Frame 1
192///     Note left of Sender: Frame 1 dropped from buffer
193///     end
194///     Note over Sender,Receiver: Frame 1 delivered
195/// ```
196///
197/// # Retransmission failure
198///
199/// ```mermaid
200///    sequenceDiagram
201///     Note over Sender,Receiver: Frame 1
202///     rect rgb(191, 223, 255)
203///     Note left of Sender: Frame 1 in buffer
204///     Sender->>Receiver: Segment 1/3 of Frame 1
205///     Sender->>Receiver: Segment 2/3 of Frame 1
206///     Sender--xReceiver: Segment 3/3 of Frame 1
207///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
208///     Receiver--xSender: Request Segment 3 of Frame 1
209///     Note left of Sender: RTO_BASE_SENDER elapsed
210///     Sender--xReceiver: Segment 1/3 of Frame 1
211///     Sender--xReceiver: Segment 2/3 of Frame 1
212///     Sender--xReceiver: Segment 3/3 of Frame 1
213///     Note left of Sender: FRAME_MAX_AGE elapsed<br/>Frame 1 dropped from buffer
214///     Note right of Receiver: FRAME_MAX_AGE elapsed<br/>Frame 1 dropped from buffer
215///     end
216///     Note over Sender,Receiver: Frame 1 never delivered
217/// ```
218#[derive(Clone)]
219pub struct AcknowledgementState<const C: usize> {
220    id: String,
221    cfg: AcknowledgementStateConfig,
222    context: Option<AcknowledgementStateContext<C>>,
223    started: std::sync::Arc<AtomicBool>,
224}
225
226impl<const C: usize> AcknowledgementState<C> {
227    pub fn new<I: std::fmt::Display>(session_id: I, cfg: AcknowledgementStateConfig) -> Self {
228        Self {
229            id: session_id.to_string(),
230            cfg: cfg.normalize(),
231            context: Default::default(),
232            started: std::sync::Arc::new(AtomicBool::new(false)),
233        }
234    }
235}
236
237impl<const C: usize> SocketState<C> for AcknowledgementState<C> {
238    fn session_id(&self) -> &str {
239        &self.id
240    }
241
242    #[tracing::instrument(name = "AcknowledgementState", skip(self, socket_components), fields(session_id = self.id))]
243    fn run(&mut self, socket_components: SocketComponents<C>) -> Result<(), SessionError> {
244        if self.started.load(std::sync::atomic::Ordering::Relaxed) && self.context.is_some() {
245            return Err(SessionError::InvalidState("state is already running".into()));
246        }
247
248        let (incoming_frame_retries_tx, incoming_frame_retries_rx) = skip_delay_channel();
249        let (outgoing_frame_retries_tx, outgoing_frame_retries_rx) = skip_delay_channel();
250        let (rb_tx, rb_rx) = searchable_ringbuffer(self.cfg.lookbehind_segments);
251
252        // Full frame acknowledgements get a special channel with fixed capacity
253        let (ack_tx, ack_rx) = futures::channel::mpsc::channel(2 * self.cfg.lookbehind_segments);
254
255        let context = self.context.insert(AcknowledgementStateContext {
256            rb_tx,
257            rb_rx,
258            incoming_frame_retries_tx,
259            outgoing_frame_retries_tx,
260            ack_tx,
261            ctl_tx: socket_components.ctl_tx,
262            inspector: socket_components
263                .inspector
264                .ok_or(SessionError::InvalidState("inspector is not available".into()))?,
265        });
266
267        if self.cfg.mode.is_partial_ack_enabled() {
268            // For partially received frames incomplete for too long,
269            // missing segments will be asked for retransmission
270            let mut incoming_frame_retries_tx_clone = context.incoming_frame_retries_tx.clone();
271            let ctl_tx_clone = context.ctl_tx.clone();
272            let frame_inspector_clone = context.inspector.clone();
273            let cfg = self.cfg;
274            hopr_async_runtime::prelude::spawn(incoming_frame_retries_rx
275                .filter_map(move |rf| {
276                    let frame_id = rf.frame_id;
277                    let missing_segments = frame_inspector_clone.missing_segments(&frame_id).unwrap_or_default();
278                    if !missing_segments.is_empty() {
279                        // Find out if we need to subscribe for further retries of this Frame
280                        if let Some(next) = rf.next() {
281                            // Register the next retry if still possible
282                            let retry_at = next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
283                            if let Err(error) = incoming_frame_retries_tx_clone.send_one((next, retry_at)) {
284                                tracing::error!(frame_id, %error, "failed to register next resend of incoming frame");
285                            } else {
286                                tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend request of incoming frame segments");
287                            }
288                        } else {
289                            tracing::debug!(frame_id, "last request of incoming frame segments");
290                        }
291
292                        futures::future::ready(Some((frame_id, missing_segments)))
293                    } else {
294                        tracing::debug!(frame_id, "no more missing segments in frame");
295                        futures::future::ready(None)
296                    }
297                })
298                .ready_chunks(SegmentRequest::<C>::MAX_ENTRIES)
299                .inspect(|r| tracing::trace!(req = ?r, "requesting segments resend"))
300                .map(|a| Ok(SessionMessage::<C>::Request(a.into_iter().collect())))
301                .forward(ctl_tx_clone)
302                .map(move |res| match res {
303                    Ok(_) => tracing::debug!("incoming frame resends processing done"),
304                    Err(error) => tracing::error!(%error, "error while processing incoming frame resends")
305                })
306                .instrument(tracing::debug_span!("incoming_frame_retries_sender"))
307            );
308        }
309
310        // Send out Frame Acknowledgements chunked as Control messages
311        let ctl_tx_clone = context.ctl_tx.clone();
312        let ack_delay = self.cfg.acknowledgement_delay;
313        hopr_async_runtime::prelude::spawn(
314            ack_rx
315                .buffer(futures_time::time::Duration::from(ack_delay))
316                .flat_map(|acks| futures::stream::iter(FrameAcknowledgements::<C>::new_multiple(acks)))
317                .filter(|acks| futures::future::ready(!acks.is_empty()))
318                .inspect(|acks| tracing::trace!(?acks, "acknowledgements sent"))
319                .map(|acks| Ok(SessionMessage::<C>::Acknowledge(acks)))
320                .forward(ctl_tx_clone)
321                .map(move |res| match res {
322                    Ok(_) => tracing::debug!("acknowledgement forwarding done"),
323                    Err(error) => tracing::debug!(%error, "acknowledgement forwarding failed"),
324                })
325                .instrument(tracing::debug_span!("acknowledgement_sender")),
326        );
327
328        // Resend outgoing frame Segments if they were not (partially or fully) acknowledged
329        let mut outgoing_frame_retries_tx_clone = context.outgoing_frame_retries_tx.clone();
330        let ctl_tx_clone = context.ctl_tx.clone();
331        let rb_rx_clone = context.rb_rx.clone();
332        let cfg = self.cfg;
333        hopr_async_runtime::prelude::spawn(
334            outgoing_frame_retries_rx
335                .map(move |rf: RetriedFrameId| {
336                    // Find out if the frame can be retried again in the future
337                    let frame_id = rf.frame_id;
338                    if let Some(next) = rf.next() {
339                        // Register the next retry if still possible
340                        let retry_at =
341                            next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
342                        if let Err(error) = outgoing_frame_retries_tx_clone.send_one((next, retry_at)) {
343                            tracing::error!(frame_id, %error, "failed to register next retry of frame");
344                        } else {
345                            tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend of outgoing frame");
346                        }
347                    } else {
348                        tracing::debug!(frame_id, "last outgoing retry of frame");
349                    }
350                    tracing::trace!(frame_id, "going to re-send entire frame");
351                    frame_id
352                })
353                .flat_map(move |frame_id| {
354                    // Find out all the segments of that frame to be retransmitted
355                    futures::stream::iter(
356                        rb_rx_clone
357                            .find(|s: &Segment| s.id().0 == frame_id)
358                            .into_iter()
359                            .inspect(|s| tracing::trace!(seg_id = %s.id(), "segment retransmit"))
360                            .map(|s| Ok(SessionMessage::<C>::Segment(s))),
361                    )
362                })
363                .forward(ctl_tx_clone) // Retransmit all the segments
364                .map(move |res| match res {
365                    Ok(_) => tracing::debug!("outgoing frame retries processing done"),
366                    Err(error) => tracing::error!(%error, "error while processing outgoing frame retries"),
367                })
368                .instrument(tracing::debug_span!("outgoing_frame_retries_sender")),
369        );
370
371        tracing::debug!("acknowledgement state has been started");
372        self.started.store(true, std::sync::atomic::Ordering::Relaxed);
373
374        Ok(())
375    }
376
377    #[tracing::instrument(name = "AcknowledgementState::stop", skip(self), fields(session_id = self.id))]
378    fn stop(&mut self) -> Result<(), SessionError> {
379        if let Some(mut ctx) = self.context.take() {
380            ctx.outgoing_frame_retries_tx.force_close();
381            ctx.incoming_frame_retries_tx.force_close();
382            ctx.ack_tx.close_channel();
383            ctx.ctl_tx.close_channel();
384
385            self.started.store(false, std::sync::atomic::Ordering::Relaxed);
386            tracing::debug!("state has been stopped");
387        } else {
388            tracing::warn!("cannot be stopped, because it is not running");
389        }
390
391        Ok(())
392    }
393
394    #[tracing::instrument(name = "AcknowledgementState::incoming_segment", skip(self), fields(session_id = self.id, frame_id = seg_id.0))]
395    fn incoming_segment(&mut self, seg_id: &SegmentId, _ind: SeqIndicator) -> Result<(), SessionError> {
396        tracing::trace!("segment received");
397
398        let ctx = self
399            .started
400            .load(std::sync::atomic::Ordering::Relaxed)
401            .then_some(self.context.as_mut())
402            .flatten()
403            .ok_or(SessionError::StateNotRunning)?;
404
405        // Register future requesting of segments for this frame
406        if self.cfg.mode.is_partial_ack_enabled() {
407            // Every incoming segment of this frame will move the deadline further
408            // into the future.
409            if let Err(error) = ctx.incoming_frame_retries_tx.send_one((
410                RetriedFrameId::with_retries(seg_id.0, self.cfg.max_incoming_frame_retries),
411                self.cfg.expected_packet_latency, // RTO_BASE_RECEIVER - when we expect the next segment to arrive
412            )) {
413                tracing::error!(%error, "failed to register incoming retry for frame");
414            }
415        }
416        Ok(())
417    }
418
419    #[tracing::instrument(name = "AcknowledgementState::incoming_retransmission_request", skip(self, request), fields(session_id = self.id))]
420    fn incoming_retransmission_request(&mut self, request: SegmentRequest<C>) -> Result<(), SessionError> {
421        // The state will respond to segment retransmission requests even
422        // if it has this feature disabled in the config.
423        tracing::trace!(count = request.len(), "segment retransmission requested");
424
425        let ctx = self
426            .started
427            .load(std::sync::atomic::Ordering::Relaxed)
428            .then_some(self.context.as_mut())
429            .flatten()
430            .ok_or(SessionError::StateNotRunning)?;
431
432        let (mut missing_seg_ids, mut missing_frame_ids): (Vec<_>, Vec<_>) =
433            request.into_iter().map(|s| (s, s.0)).unzip();
434
435        // Perform a single find to lock the RB only once
436        let segments = ctx.rb_rx.find(|s| {
437            // SegmentIds are guaranteed to be sorted, so we can use binary search
438            if let Ok(i) = missing_seg_ids.binary_search(&s.id()) {
439                missing_seg_ids.remove(i);
440                true
441            } else {
442                false
443            }
444        });
445
446        tracing::trace!(
447            found = segments.len(),
448            requested = missing_frame_ids.len(),
449            "found matching segments to be retransmitted"
450        );
451
452        // Partially acknowledged frames will not need to be fully resent in the future.
453        // Cancel all partially acknowledged frame resends.
454        if self.cfg.mode.is_full_ack_enabled() {
455            // Since the FrameIds are guaranteed to be sorted, we can simply dedup them.
456            missing_frame_ids.dedup();
457
458            if let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
459                missing_frame_ids
460                    .into_iter()
461                    .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
462            ) {
463                tracing::error!(%error, "failed to cancel frame resend of partially acknowledged frames");
464            }
465        }
466
467        // Resend the segments via the Control Stream
468        segments
469            .into_iter()
470            .try_for_each(|s| {
471                tracing::trace!(seg_id = %s.id(), "retransmit segment on request");
472                ctx.ctl_tx.unbounded_send(SessionMessage::Segment(s))
473            })
474            .map_err(|e| SessionError::ProcessingError(e.to_string()))
475    }
476
477    #[tracing::instrument(name = "AcknowledgementState::incoming_acknowledged_frames", skip(self), fields(session_id = self.id))]
478    fn incoming_acknowledged_frames(&mut self, ack: FrameAcknowledgements<C>) -> Result<(), SessionError> {
479        tracing::trace!(count = ack.len(), "frame acknowledgements received");
480
481        let ctx = self
482            .started
483            .load(std::sync::atomic::Ordering::Relaxed)
484            .then_some(self.context.as_mut())
485            .flatten()
486            .ok_or(SessionError::StateNotRunning)?;
487
488        // Frame acknowledged, we will not need to resend it
489        if self.cfg.mode.is_full_ack_enabled()
490            && let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
491                ack.into_iter()
492                    .inspect(|frame_id| tracing::trace!(frame_id, "frame acknowledged"))
493                    .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
494            )
495        {
496            tracing::error!(%error, "failed to cancel frame resend");
497        }
498
499        Ok(())
500    }
501
502    #[tracing::instrument(name = "AcknowledgementState::frame_complete", skip(self), fields(session_id = self.id))]
503    fn frame_complete(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
504        tracing::trace!("frame complete");
505
506        let ctx = self
507            .started
508            .load(std::sync::atomic::Ordering::Relaxed)
509            .then_some(self.context.as_mut())
510            .flatten()
511            .ok_or(SessionError::StateNotRunning)?;
512
513        // Since the frame has been completed, push its ID into the acknowledgement queue
514        if let Err(error) = ctx.ack_tx.try_send(frame_id) {
515            tracing::error!(%error, "failed to acknowledge frame");
516        }
517
518        if self.cfg.mode.is_partial_ack_enabled() {
519            // No more requesting of segment retransmissions from frames that were completed
520            if let Err(error) = ctx
521                .incoming_frame_retries_tx
522                .send_one((RetriedFrameId::no_retries(frame_id), Skip))
523            {
524                tracing::error!(%error, "failed to cancel retry of acknowledged frame");
525            }
526        }
527
528        Ok(())
529    }
530
531    #[tracing::instrument(name = "AcknowledgementState::frame_emitted", skip(self), fields(session_id = self.id))]
532    fn frame_emitted(&mut self, id: FrameId) -> Result<(), SessionError> {
533        tracing::trace!("frame emitted");
534        let _ = self
535            .started
536            .load(std::sync::atomic::Ordering::Relaxed)
537            .then_some(self.context.as_mut())
538            .flatten()
539            .ok_or(SessionError::StateNotRunning)?;
540        Ok(())
541    }
542
543    #[tracing::instrument(name = "AcknowledgementState::frame_discarded", skip(self), fields(session_id = self.id))]
544    fn frame_discarded(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
545        tracing::trace!("frame discarded");
546
547        let ctx = self
548            .started
549            .load(std::sync::atomic::Ordering::Relaxed)
550            .then_some(self.context.as_mut())
551            .flatten()
552            .ok_or(SessionError::StateNotRunning)?;
553
554        if self.cfg.mode.is_partial_ack_enabled() {
555            // No more requesting of segment retransmissions from frames that were discarded
556            if let Err(error) = ctx
557                .incoming_frame_retries_tx
558                .send_one((RetriedFrameId::no_retries(frame_id), Skip))
559            {
560                tracing::error!(%error, "failed to cancel retry of acknowledged frame");
561            }
562        }
563
564        Ok(())
565    }
566
567    #[tracing::instrument(name = "AcknowledgementState::segment_sent", skip(self, segment), fields(session_id = self.id, frame_id = segment.frame_id, seq_idx = segment.seq_idx))]
568    fn segment_sent(&mut self, segment: &Segment) -> Result<(), SessionError> {
569        tracing::trace!("segment sent");
570
571        let ctx = self
572            .started
573            .load(std::sync::atomic::Ordering::Relaxed)
574            .then_some(self.context.as_mut())
575            .flatten()
576            .ok_or(SessionError::StateNotRunning)?;
577
578        // Since segments are re-sent via Control stream, they are not later fed again
579        // into the ring buffer.
580        if !ctx.rb_tx.push(segment.clone()) {
581            tracing::error!("failed to push segment into ring buffer");
582        }
583
584        // When the last segment of a frame has been sent,
585        // add it to outgoing retries (if the full ack mode is enabled).
586        if segment.is_last() && self.cfg.mode.is_full_ack_enabled() {
587            tracing::trace!("last segment of frame sent");
588
589            if let Err(error) = ctx.outgoing_frame_retries_tx.send_one((
590                RetriedFrameId::with_retries(segment.frame_id, self.cfg.max_outgoing_frame_retries),
591                // The whole frame should be delivered and acknowledged
592                // once all its segments (seq_len) are sent,
593                // and the acknowledgement also comes back to us.
594                // Therefore, RTO_BASE_SENDER = latency * (seq_len + 1)
595                self.cfg.expected_packet_latency * (segment.seq_flags.seq_len() + 1) as u32,
596            )) {
597                tracing::error!(%error, "failed to insert outgoing retry of a frame");
598            }
599        }
600
601        Ok(())
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use anyhow::Context;
608
609    use super::*;
610    use crate::{
611        processing::types::{FrameBuilder, FrameDashMap, FrameMap},
612        protocol::SeqNum,
613        utils::segment,
614    };
615
616    const FRAME_SIZE: usize = 1500;
617
618    const MTU: usize = 1000;
619
620    #[test_log::test(tokio::test)]
621    async fn ack_state_sender_must_acknowledge_completed_frames() -> anyhow::Result<()> {
622        let cfg = AcknowledgementStateConfig {
623            expected_packet_latency: Duration::from_millis(10),
624            acknowledgement_delay: Duration::from_millis(2),
625            ..Default::default()
626        };
627
628        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
629        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
630
631        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
632        state.run(SocketComponents {
633            inspector: inspector.into(),
634            ctl_tx,
635        })?;
636
637        let acked_frame_ids = [1, 2, 3];
638
639        for &frame_id in &acked_frame_ids {
640            state.frame_complete(frame_id)?;
641        }
642
643        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
644
645        state.stop()?;
646
647        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
648            .await
649            .context("timeout receiving Control messages")?;
650
651        assert_eq!(1, ctl_msgs.len());
652
653        assert_eq!(
654            ctl_msgs[0],
655            SessionMessage::Acknowledge(acked_frame_ids.to_vec().try_into()?)
656        );
657
658        Ok(())
659    }
660
661    #[parameterized::parameterized(num_frames = { 1, 2, 3 })]
662    #[parameterized_macro(test_log::test(tokio::test))]
663    async fn ack_state_sender_must_resend_unacknowledged_frames(num_frames: usize) -> anyhow::Result<()> {
664        const NUM_RETRIES: usize = 2;
665
666        let cfg = AcknowledgementStateConfig {
667            mode: AcknowledgementMode::Full,
668            expected_packet_latency: Duration::from_millis(2),
669            max_outgoing_frame_retries: NUM_RETRIES,
670            ..Default::default()
671        };
672
673        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
674        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
675
676        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
677        state.run(SocketComponents {
678            inspector: inspector.into(),
679            ctl_tx,
680        })?;
681
682        let mut expected_frame_segments = Vec::new();
683        let num_segments_in_frame = FRAME_SIZE / MTU + 1;
684        for i in 1..=num_frames {
685            let expected_segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, i as FrameId)?;
686            for segment in &expected_segments {
687                state.segment_sent(segment)?;
688            }
689            expected_frame_segments.push(expected_segments);
690        }
691
692        let expected_frame_delivery = cfg.expected_packet_latency * (num_segments_in_frame + 1) as u32;
693        tokio::time::sleep(2 * expected_frame_delivery).await;
694        state.stop()?;
695
696        let ctl_msg = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
697            .await
698            .context("timeout receiving Control message")?;
699
700        let retransmitted_segments = ctl_msg
701            .into_iter()
702            .map(|m| m.try_as_segment().ok_or(anyhow::anyhow!("must be segment")))
703            .collect::<Result<Vec<_>, _>>()?;
704
705        assert_eq!(
706            NUM_RETRIES * num_segments_in_frame * num_frames,
707            retransmitted_segments.len()
708        );
709
710        let total_segments = expected_frame_segments.iter().map(|m| m.len()).sum::<usize>();
711        let expected_segments = expected_frame_segments
712            .into_iter()
713            .flatten()
714            .cycle()
715            .take(total_segments * NUM_RETRIES)
716            .collect::<Vec<_>>();
717        assert_eq!(expected_segments, retransmitted_segments);
718
719        Ok(())
720    }
721
722    #[test_log::test(tokio::test)]
723    async fn ack_state_sender_must_not_resend_unacknowledged_frame_when_full_resend_disabled() -> anyhow::Result<()> {
724        const NUM_RETRIES: usize = 2;
725
726        let cfg = AcknowledgementStateConfig {
727            mode: AcknowledgementMode::Partial,
728            expected_packet_latency: Duration::from_millis(2),
729            max_outgoing_frame_retries: NUM_RETRIES,
730            ..Default::default()
731        };
732
733        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
734        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
735
736        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
737        state.run(SocketComponents {
738            inspector: inspector.into(),
739            ctl_tx,
740        })?;
741
742        let expected_segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
743        for segment in &expected_segments {
744            state.segment_sent(segment)?;
745        }
746
747        let expected_frame_delivery = cfg.expected_packet_latency * (expected_segments.len() + 1) as u32;
748        tokio::time::sleep(2 * expected_frame_delivery).await;
749        state.stop()?;
750
751        // No retransmission should be sent because it is disabled
752        assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
753
754        Ok(())
755    }
756
757    #[tokio::test]
758    async fn ack_state_sender_must_not_resend_acknowledged_frame() -> anyhow::Result<()> {
759        let cfg = AcknowledgementStateConfig {
760            mode: AcknowledgementMode::Full,
761            expected_packet_latency: Duration::from_millis(2),
762            max_outgoing_frame_retries: 1,
763            ..Default::default()
764        };
765
766        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
767        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
768
769        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
770        state.run(SocketComponents {
771            inspector: inspector.into(),
772            ctl_tx,
773        })?;
774
775        let expected_segments = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
776        for segment in &expected_segments {
777            state.segment_sent(segment)?;
778        }
779
780        // Acknowledge the frame
781        state.incoming_acknowledged_frames(vec![1].try_into()?)?;
782
783        tokio::time::sleep(10 * cfg.expected_packet_latency).await;
784
785        state.stop()?;
786
787        // No retransmission should be sent because the frame was already acknowledged.
788        assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
789
790        Ok(())
791    }
792
793    #[test_log::test(tokio::test)]
794    async fn ack_state_sender_must_not_resend_entire_frame_when_already_partially_acknowledged() -> anyhow::Result<()> {
795        let cfg = AcknowledgementStateConfig {
796            mode: AcknowledgementMode::Full,
797            expected_packet_latency: Duration::from_millis(2),
798            max_outgoing_frame_retries: 1,
799            ..Default::default()
800        };
801
802        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
803        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
804
805        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
806        state.run(SocketComponents {
807            inspector: inspector.into(),
808            ctl_tx,
809        })?;
810
811        let expected_segments = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
812
813        // Load segments into the ring buffer
814        for segment in &expected_segments {
815            state.segment_sent(segment)?;
816        }
817
818        tokio::time::sleep(cfg.expected_packet_latency).await;
819
820        // Partially acknowledge the frame (report the first segment as missing)
821        state.incoming_retransmission_request(SegmentRequest::from_iter([(1, [0b10000000].into())]))?;
822
823        state.stop()?;
824
825        // Only segment 1 must be retransmitted
826        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
827        assert_eq!(1, ctl_msgs.len());
828        assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments[0].clone()));
829
830        Ok(())
831    }
832
833    #[test_log::test(tokio::test)]
834    async fn ack_state_sender_must_retransmit_segments_when_requested() -> anyhow::Result<()> {
835        let cfg = AcknowledgementStateConfig {
836            mode: AcknowledgementMode::Full,
837            expected_packet_latency: Duration::from_millis(2),
838            max_outgoing_frame_retries: 1,
839            ..Default::default()
840        };
841
842        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
843        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
844
845        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
846        state.run(SocketComponents {
847            inspector: inspector.into(),
848            ctl_tx,
849        })?;
850
851        let expected_segments_1 = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
852        // Load frame 1 segments into the ring buffer
853        for segment in &expected_segments_1 {
854            state.segment_sent(segment)?;
855        }
856
857        let expected_segments_2 = segment(hopr_crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 2)?;
858        // Load frame 2 segments into the ring buffer
859        for segment in &expected_segments_2 {
860            state.segment_sent(segment)?;
861        }
862
863        tokio::time::sleep(cfg.expected_packet_latency).await;
864
865        // Request different segments to be retransmitted
866        state.incoming_retransmission_request(SegmentRequest::from_iter([
867            (1, [0b11100000].into()),
868            (2, [0b11100000].into()),
869        ]))?;
870        tokio::time::sleep(cfg.expected_packet_latency).await;
871
872        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b11000000].into())]))?;
873        tokio::time::sleep(cfg.expected_packet_latency).await;
874
875        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b01000000].into())]))?;
876        tokio::time::sleep(cfg.expected_packet_latency).await;
877
878        state.stop()?;
879
880        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
881
882        assert_eq!(9, ctl_msgs.len());
883        // Request 1 - frame 1
884        assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments_1[0].clone()));
885        assert_eq!(ctl_msgs[1], SessionMessage::Segment(expected_segments_1[1].clone()));
886        assert_eq!(ctl_msgs[2], SessionMessage::Segment(expected_segments_1[2].clone()));
887        // Request 1 - frame 2
888        assert_eq!(ctl_msgs[3], SessionMessage::Segment(expected_segments_2[0].clone()));
889        assert_eq!(ctl_msgs[4], SessionMessage::Segment(expected_segments_2[1].clone()));
890        assert_eq!(ctl_msgs[5], SessionMessage::Segment(expected_segments_2[2].clone()));
891
892        // Request 2 - frame 2
893        assert_eq!(ctl_msgs[6], SessionMessage::Segment(expected_segments_2[0].clone()));
894        assert_eq!(ctl_msgs[7], SessionMessage::Segment(expected_segments_2[1].clone()));
895
896        // Request 3 - frame 2
897        assert_eq!(ctl_msgs[8], SessionMessage::Segment(expected_segments_2[1].clone()));
898
899        Ok(())
900    }
901
902    #[tokio::test]
903    async fn ack_state_receiver_must_request_missing_frames_when_partial_acks_are_enabled() -> anyhow::Result<()> {
904        let cfg = AcknowledgementStateConfig {
905            mode: AcknowledgementMode::Partial,
906            expected_packet_latency: Duration::from_millis(2),
907            max_incoming_frame_retries: 1,
908            ..Default::default()
909        };
910
911        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
912        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
913
914        let segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
915
916        inspector
917            .0
918            .entry(1)
919            .try_as_vacant()
920            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
921            .insert(FrameBuilder::from(segments[0].clone()));
922
923        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
924        state.run(SocketComponents {
925            inspector: inspector.into(),
926            ctl_tx,
927        })?;
928
929        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
930
931        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
932
933        state.stop()?;
934
935        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
936            .await
937            .context("timeout receiving Control messages")?;
938
939        assert_eq!(1, ctl_msgs.len());
940        assert_eq!(
941            ctl_msgs[0],
942            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01000000].into())]))
943        );
944
945        Ok(())
946    }
947
948    #[tokio::test]
949    async fn ack_state_receiver_must_not_request_missing_frames_when_partial_acks_are_disabled() -> anyhow::Result<()> {
950        let cfg = AcknowledgementStateConfig {
951            mode: AcknowledgementMode::Full,
952            expected_packet_latency: Duration::from_millis(2),
953            max_incoming_frame_retries: 1,
954            ..Default::default()
955        };
956
957        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
958        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
959
960        let segments = segment(hopr_crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
961
962        inspector
963            .0
964            .entry(1)
965            .try_as_vacant()
966            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
967            .insert(FrameBuilder::from(segments[0].clone()));
968
969        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
970        state.run(SocketComponents {
971            inspector: inspector.into(),
972            ctl_tx,
973        })?;
974
975        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
976
977        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
978
979        state.stop()?;
980
981        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
982            .await
983            .context("timeout receiving Control messages")?;
984
985        assert!(ctl_msgs.iter().all(|m| !matches!(m, SessionMessage::Request(_))));
986
987        Ok(())
988    }
989
990    #[tokio::test]
991    async fn ack_state_receiver_must_continue_requesting_missing_frames_when_frame_not_completed() -> anyhow::Result<()>
992    {
993        let cfg = AcknowledgementStateConfig {
994            mode: AcknowledgementMode::Partial,
995            expected_packet_latency: Duration::from_millis(2),
996            max_incoming_frame_retries: 3,
997            ..Default::default()
998        };
999
1000        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1001        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
1002
1003        let segments = segment(hopr_crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1004
1005        inspector
1006            .0
1007            .entry(1)
1008            .try_as_vacant()
1009            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1010            .insert(FrameBuilder::from(segments[0].clone()));
1011
1012        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1013        state.run(SocketComponents {
1014            inspector: inspector.clone().into(),
1015            ctl_tx,
1016        })?;
1017
1018        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1019
1020        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1021
1022        inspector
1023            .0
1024            .entry(1)
1025            .try_as_occupied()
1026            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1027            .get_mut()
1028            .add_segment(segments[1].clone())?;
1029
1030        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1031
1032        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1033
1034        state.stop()?;
1035
1036        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1037            .await
1038            .context("timeout receiving Control messages")?;
1039
1040        assert_eq!(2, ctl_msgs.len());
1041        assert_eq!(
1042            ctl_msgs[0],
1043            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1044        );
1045
1046        assert_eq!(
1047            ctl_msgs[1],
1048            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1049        );
1050
1051        Ok(())
1052    }
1053
1054    #[tokio::test]
1055    async fn ack_state_receiver_must_continue_requesting_missing_frames_and_acknowledge_once_complete()
1056    -> anyhow::Result<()> {
1057        let cfg = AcknowledgementStateConfig {
1058            mode: AcknowledgementMode::Partial,
1059            expected_packet_latency: Duration::from_millis(2),
1060            max_incoming_frame_retries: 3,
1061            acknowledgement_delay: Duration::from_millis(5),
1062            ..Default::default()
1063        };
1064
1065        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1066        let (ctl_tx, ctl_rx) = futures::channel::mpsc::unbounded();
1067
1068        let segments = segment(hopr_crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1069
1070        // Segment 1
1071        inspector
1072            .0
1073            .entry(1)
1074            .try_as_vacant()
1075            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1076            .insert(FrameBuilder::from(segments[0].clone()));
1077
1078        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1079        state.run(SocketComponents {
1080            inspector: inspector.clone().into(),
1081            ctl_tx,
1082        })?;
1083
1084        // Segment 2
1085        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1086
1087        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1088
1089        inspector
1090            .0
1091            .entry(1)
1092            .try_as_occupied()
1093            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1094            .get_mut()
1095            .add_segment(segments[1].clone())?;
1096
1097        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1098
1099        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1100
1101        // Segment 3
1102        inspector
1103            .0
1104            .entry(1)
1105            .try_as_occupied()
1106            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1107            .get_mut()
1108            .add_segment(segments[2].clone())?;
1109
1110        state.incoming_segment(&segments[2].id(), (segments.len() as SeqNum).try_into()?)?;
1111        state.frame_complete(1)?;
1112
1113        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
1114
1115        state.stop()?;
1116
1117        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1118            .await
1119            .context("timeout receiving Control messages")?;
1120
1121        assert_eq!(3, ctl_msgs.len());
1122        assert_eq!(
1123            ctl_msgs[0],
1124            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1125        );
1126
1127        assert_eq!(
1128            ctl_msgs[1],
1129            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1130        );
1131
1132        assert_eq!(ctl_msgs[2], SessionMessage::Acknowledge(vec![1].try_into()?));
1133
1134        Ok(())
1135    }
1136}