Skip to main content

hopr_protocol_session/socket/
ack_state.rs

1//! This module defines the [`SocketState`] that turns [`SessionSocket`](super::SessionSocket) into
2//! a reliable socket, with segment/frame retransmission and frame acknowledgements.
3
4use std::{
5    sync::atomic::AtomicBool,
6    time::{Duration, Instant},
7};
8
9use futures::{FutureExt, StreamExt, channel::mpsc::Sender};
10use futures_time::stream::StreamExt as TimeStreamExt;
11use tracing::Instrument;
12
13use crate::{
14    errors::SessionError,
15    processing::types::FrameInspector,
16    protocol::{FrameAcknowledgements, FrameId, Segment, SegmentId, SegmentRequest, SeqIndicator, SessionMessage},
17    socket::{SocketState, state::SocketComponents},
18    utils::{
19        RetriedFrameId, RingBufferProducer, RingBufferView, next_deadline_with_backoff, searchable_ringbuffer,
20        skip_queue::{Skip, SkipDelaySender, skip_delay_channel},
21    },
22};
23
/// Selects which kinds of acknowledgements a [stateful](AcknowledgementState) Session socket uses.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AcknowledgementMode {
    /// Frames are acknowledged partially, so retransmission is requested by the receiver.
    /// The frame sender never retransmits a whole frame in this mode.
    Partial,
    /// Only complete frames are acknowledged. The frame sender retransmits the whole
    /// frame when no acknowledgement arrives.
    Full,
    /// The receiver sends both partial and full acknowledgements.
    ///
    /// Once a frame has been partially acknowledged, only receiver-driven retransmission
    /// requests follow (as with [`AcknowledgementMode::Partial`]).
    ///
    /// When the frame sender sees no acknowledgement at all (neither partial nor full),
    /// it retransmits the whole frame (as in [`AcknowledgementMode::Full`]).
    #[default]
    Both,
}

impl AcknowledgementMode {
    /// Returns `true` for [`AcknowledgementMode::Partial`] and [`AcknowledgementMode::Both`].
    #[inline]
    fn is_partial_ack_enabled(&self) -> bool {
        match self {
            Self::Partial | Self::Both => true,
            Self::Full => false,
        }
    }

    /// Returns `true` for [`AcknowledgementMode::Full`] and [`AcknowledgementMode::Both`].
    #[inline]
    fn is_full_ack_enabled(&self) -> bool {
        match self {
            Self::Full | Self::Both => true,
            Self::Partial => false,
        }
    }
}
57
58/// Configuration object of the [`AcknowledgementState`].
59#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault)]
60pub struct AcknowledgementStateConfig {
61    /// Mode of frame acknowledgement.
62    ///
63    /// Default is [`AcknowledgementMode::Both`]
64    pub mode: AcknowledgementMode,
65
66    /// The expected (average) latency of a packet (= single frame segment).
67    ///
68    /// Default is 20 ms
69    #[default(Duration::from_millis(20))]
70    pub expected_packet_latency: Duration,
71
72    /// Backoff base applied for segment or frame retransmissions.
73    ///
74    /// Default is 1.2
75    #[default(1.2)]
76    pub backoff_base: f64,
77
78    /// The maximum number of receiver-driven segment retransmission requests.
79    ///
80    /// Default is 3
81    #[default(3)]
82    pub max_incoming_frame_retries: usize,
83
84    /// The maximum number of sender-driven full-frame retransmissions.
85    ///
86    /// Default is 3
87    #[default(3)]
88    pub max_outgoing_frame_retries: usize,
89
90    /// Delay between acknowledgement batches.
91    ///
92    /// Default is 50 ms
93    #[default(Duration::from_millis(50))]
94    pub acknowledgement_delay: Duration,
95
96    /// Number of segments to hold back for retransmission upon other party's request.
97    /// Minimum is 1024.
98    ///
99    /// Default is 16 384.
100    #[default(16384)]
101    pub lookbehind_segments: usize,
102}
103
104impl AcknowledgementStateConfig {
105    fn normalize(self) -> AcknowledgementStateConfig {
106        Self {
107            mode: self.mode,
108            expected_packet_latency: self.expected_packet_latency.max(Duration::from_millis(1)),
109            backoff_base: self.backoff_base.max(1.0),
110            max_incoming_frame_retries: self.max_incoming_frame_retries,
111            max_outgoing_frame_retries: self.max_outgoing_frame_retries,
112            acknowledgement_delay: self.acknowledgement_delay.max(Duration::from_millis(1)),
113            lookbehind_segments: self.lookbehind_segments.max(1024),
114        }
115    }
116}
117
118#[derive(Clone)]
119struct AcknowledgementStateContext<const C: usize> {
120    rb_tx: RingBufferProducer<Segment>,
121    rb_rx: RingBufferView<Segment>,
122    incoming_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
123    outgoing_frame_retries_tx: SkipDelaySender<RetriedFrameId>,
124    ack_tx: futures::channel::mpsc::Sender<FrameId>,
125    inspector: FrameInspector,
126    ctl_tx: Sender<SessionMessage<C>>,
127}
128
129#[cfg_attr(doc, aquamarine::aquamarine)]
/// Represents a Session socket state that is able to process acknowledgements.
131///
132/// # Retransmission driven by the Receiver
133/// ```mermaid
134/// sequenceDiagram
135///     Note over Sender,Receiver: Frame 1
136///     rect rgb(191, 223, 255)
137///     Note left of Sender: Frame 1 in buffer
138///     Sender->>Receiver: Segment 1/3 of Frame 1
139///     Sender->>Receiver: Segment 2/3 of Frame 1
140///     Sender--xReceiver: Segment 3/3 of Frame 1
141///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
142///     Receiver->>Sender: Request Segment 3 of Frame 1
143///     Sender->>Receiver: Segment 3/3 of Frame 1
144///     Receiver->>Sender: Acknowledge Frame 1
145///     Note left of Sender: Frame 1 dropped from buffer
146///     end
147///     Note over Sender,Receiver: Frame 1 delivered
148/// ```
149///
150/// # Retransmission driven by the Sender
151/// ```mermaid
152/// sequenceDiagram
153///     Note over Sender,Receiver: Frame 1
154///     rect rgb(191, 223, 255)
155///     Note left of Sender: Frame 1 in buffer
156///     Sender->>Receiver: Segment 1/3 of Frame 1
157///     Sender->>Receiver: Segment 2/3 of Frame 1
158///     Sender--xReceiver: Segment 3/3 of Frame 1
159///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
160///     Receiver--xSender: Request Segment 3 of Frame 1
161///     Note left of Sender: RTO_BASE_SENDER elapsed
162///     Sender->>Receiver: Segment 1/3 of Frame 1
163///     Sender->>Receiver: Segment 2/3 of Frame 1
164///     Sender->>Receiver: Segment 3/3 of Frame 1
165///     Receiver->>Sender: Acknowledge Frame 1
166///     Note left of Sender: Frame 1 dropped from buffer
167///     end
168///     Note over Sender,Receiver: Frame 1 delivered
169/// ```
170///
171/// # Sender-Receiver retransmission handover
172///
173/// ```mermaid
174///    sequenceDiagram
175///     Note over Sender,Receiver: Frame 1
176///     rect rgb(191, 223, 255)
177///     Note left of Sender: Frame 1 in buffer
178///     Sender->>Receiver: Segment 1/3 of Frame 1
179///     Sender--xReceiver: Segment 2/3 of Frame 1
180///     Sender--xReceiver: Segment 3/3 of Frame 1
181///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
182///     Receiver->>Sender: Request Segments 2,3 of Frame 1
183///     Note left of Sender: RTO_BASE_SENDER cancelled
184///     Sender->>Receiver: Segment 2/3 of Frame 1
185///     Sender--xReceiver: Segment 3/3 of Frame 1
186///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
187///     Receiver--xSender: Request Segments 3 of Frame 1
188///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
189///     Receiver->>Sender: Request Segments 3 of Frame 1
190///     Sender->>Receiver: Segment 3/3 of Frame 1
191///     Receiver->>Sender: Acknowledge Frame 1
192///     Note left of Sender: Frame 1 dropped from buffer
193///     end
194///     Note over Sender,Receiver: Frame 1 delivered
195/// ```
196///
197/// # Retransmission failure
198///
199/// ```mermaid
200///    sequenceDiagram
201///     Note over Sender,Receiver: Frame 1
202///     rect rgb(191, 223, 255)
203///     Note left of Sender: Frame 1 in buffer
204///     Sender->>Receiver: Segment 1/3 of Frame 1
205///     Sender->>Receiver: Segment 2/3 of Frame 1
206///     Sender--xReceiver: Segment 3/3 of Frame 1
207///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
208///     Receiver--xSender: Request Segment 3 of Frame 1
209///     Note left of Sender: RTO_BASE_SENDER elapsed
210///     Sender--xReceiver: Segment 1/3 of Frame 1
211///     Sender--xReceiver: Segment 2/3 of Frame 1
212///     Sender--xReceiver: Segment 3/3 of Frame 1
213///     Note left of Sender: FRAME_MAX_AGE elapsed<br/>Frame 1 dropped from buffer
214///     Note right of Receiver: FRAME_MAX_AGE elapsed<br/>Frame 1 dropped from buffer
215///     end
216///     Note over Sender,Receiver: Frame 1 never delivered
217/// ```
#[derive(Clone)]
pub struct AcknowledgementState<const C: usize> {
    // Session identifier, stored as a string and returned by `session_id()`.
    id: String,
    // Configuration after normalization (see `AcknowledgementStateConfig::normalize`).
    cfg: AcknowledgementStateConfig,
    // Runtime handles; set by `run()` and taken out again by `stop()`.
    context: Option<AcknowledgementStateContext<C>>,
    // Running flag; being behind an `Arc`, it is shared by all clones of this state.
    started: std::sync::Arc<AtomicBool>,
}
225
226impl<const C: usize> AcknowledgementState<C> {
227    pub fn new<I: std::fmt::Display>(session_id: I, cfg: AcknowledgementStateConfig) -> Self {
228        Self {
229            id: session_id.to_string(),
230            cfg: cfg.normalize(),
231            context: Default::default(),
232            started: std::sync::Arc::new(AtomicBool::new(false)),
233        }
234    }
235}
236
237impl<const C: usize> SocketState<C> for AcknowledgementState<C> {
238    fn session_id(&self) -> &str {
239        &self.id
240    }
241
242    #[tracing::instrument(name = "AcknowledgementState", skip(self, socket_components), fields(session_id = self.id))]
243    fn run(&mut self, socket_components: SocketComponents<C>) -> Result<(), SessionError> {
244        if self.started.load(std::sync::atomic::Ordering::Relaxed) && self.context.is_some() {
245            return Err(SessionError::InvalidState("state is already running".into()));
246        }
247
248        let (incoming_frame_retries_tx, incoming_frame_retries_rx) = skip_delay_channel();
249        let (outgoing_frame_retries_tx, outgoing_frame_retries_rx) = skip_delay_channel();
250        let (rb_tx, rb_rx) = searchable_ringbuffer(self.cfg.lookbehind_segments);
251
252        // Full frame acknowledgements get a special channel with fixed capacity
253        let (ack_tx, ack_rx) = futures::channel::mpsc::channel(2 * self.cfg.lookbehind_segments);
254
255        let context = self.context.insert(AcknowledgementStateContext {
256            rb_tx,
257            rb_rx,
258            incoming_frame_retries_tx,
259            outgoing_frame_retries_tx,
260            ack_tx,
261            ctl_tx: socket_components.ctl_tx,
262            inspector: socket_components
263                .inspector
264                .ok_or(SessionError::InvalidState("inspector is not available".into()))?,
265        });
266
267        if self.cfg.mode.is_partial_ack_enabled() {
268            // For partially received frames incomplete for too long,
269            // missing segments will be asked for retransmission
270            let mut incoming_frame_retries_tx_clone = context.incoming_frame_retries_tx.clone();
271            let ctl_tx_clone = context.ctl_tx.clone();
272            let frame_inspector_clone = context.inspector.clone();
273            let cfg = self.cfg;
274            hopr_utils::runtime::prelude::spawn(incoming_frame_retries_rx
275                .filter_map(move |rf| {
276                    let frame_id = rf.frame_id;
277                    let missing_segments = frame_inspector_clone.missing_segments(&frame_id).unwrap_or_default();
278                    if !missing_segments.is_empty() {
279                        // Find out if we need to subscribe for further retries of this Frame
280                        if let Some(next) = rf.next() {
281                            // Register the next retry if still possible
282                            let retry_at = next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
283                            if let Err(error) = incoming_frame_retries_tx_clone.send_one((next, retry_at)) {
284                                tracing::error!(frame_id, %error, "failed to register next resend of incoming frame");
285                            } else {
286                                tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend request of incoming frame segments");
287                            }
288                        } else {
289                            tracing::debug!(frame_id, "last request of incoming frame segments");
290                        }
291
292                        futures::future::ready(Some((frame_id, missing_segments)))
293                    } else {
294                        tracing::debug!(frame_id, "no more missing segments in frame");
295                        futures::future::ready(None)
296                    }
297                })
298                .ready_chunks(SegmentRequest::<C>::MAX_ENTRIES)
299                .inspect(|r| tracing::trace!(req = ?r, "requesting segments resend"))
300                .map(|a| Ok(SessionMessage::<C>::Request(a.into_iter().collect())))
301                .forward(ctl_tx_clone)
302                .map(move |res| match res {
303                    Ok(_) => tracing::debug!("incoming frame resends processing done"),
304                    Err(error) => tracing::error!(%error, "error while processing incoming frame resends")
305                })
306                .instrument(tracing::debug_span!("incoming_frame_retries_sender"))
307            );
308        }
309
310        // Send out Frame Acknowledgements chunked as Control messages
311        let ctl_tx_clone = context.ctl_tx.clone();
312        let ack_delay = self.cfg.acknowledgement_delay;
313        hopr_utils::runtime::prelude::spawn(
314            ack_rx
315                .buffer(futures_time::time::Duration::from(ack_delay))
316                .flat_map(|acks| futures::stream::iter(FrameAcknowledgements::<C>::new_multiple(acks)))
317                .filter(|acks| futures::future::ready(!acks.is_empty()))
318                .inspect(|acks| tracing::trace!(?acks, "acknowledgements sent"))
319                .map(|acks| Ok(SessionMessage::<C>::Acknowledge(acks)))
320                .forward(ctl_tx_clone)
321                .map(move |res| match res {
322                    Ok(_) => tracing::debug!("acknowledgement forwarding done"),
323                    Err(error) => tracing::debug!(%error, "acknowledgement forwarding failed"),
324                })
325                .instrument(tracing::debug_span!("acknowledgement_sender")),
326        );
327
328        // Resend outgoing frame Segments if they were not (partially or fully) acknowledged
329        let mut outgoing_frame_retries_tx_clone = context.outgoing_frame_retries_tx.clone();
330        let ctl_tx_clone = context.ctl_tx.clone();
331        let rb_rx_clone = context.rb_rx.clone();
332        let cfg = self.cfg;
333        hopr_utils::runtime::prelude::spawn(
334            outgoing_frame_retries_rx
335                .map(move |rf: RetriedFrameId| {
336                    // Find out if the frame can be retried again in the future
337                    let frame_id = rf.frame_id;
338                    if let Some(next) = rf.next() {
339                        // Register the next retry if still possible
340                        let retry_at =
341                            next_deadline_with_backoff(next.retry_count, cfg.backoff_base, cfg.expected_packet_latency);
342                        if let Err(error) = outgoing_frame_retries_tx_clone.send_one((next, retry_at)) {
343                            tracing::error!(frame_id, %error, "failed to register next retry of frame");
344                        } else {
345                            tracing::debug!(frame_id, retry_in = ?retry_at.saturating_duration_since(Instant::now()), "next resend of outgoing frame");
346                        }
347                    } else {
348                        tracing::debug!(frame_id, "last outgoing retry of frame");
349                    }
350                    tracing::trace!(frame_id, "going to re-send entire frame");
351                    frame_id
352                })
353                .flat_map(move |frame_id| {
354                    // Find out all the segments of that frame to be retransmitted
355                    futures::stream::iter(
356                        rb_rx_clone
357                            .find(|s: &Segment| s.id().0 == frame_id)
358                            .into_iter()
359                            .inspect(|s| tracing::trace!(seg_id = %s.id(), "segment retransmit"))
360                            .map(|s| Ok(SessionMessage::<C>::Segment(s))),
361                    )
362                })
363                .forward(ctl_tx_clone) // Retransmit all the segments
364                .map(move |res| match res {
365                    Ok(_) => tracing::debug!("outgoing frame retries processing done"),
366                    Err(error) => tracing::error!(%error, "error while processing outgoing frame retries"),
367                })
368                .instrument(tracing::debug_span!("outgoing_frame_retries_sender")),
369        );
370
371        tracing::debug!("acknowledgement state has been started");
372        self.started.store(true, std::sync::atomic::Ordering::Relaxed);
373
374        Ok(())
375    }
376
377    #[tracing::instrument(name = "AcknowledgementState::stop", skip(self), fields(session_id = self.id))]
378    fn stop(&mut self) -> Result<(), SessionError> {
379        if let Some(mut ctx) = self.context.take() {
380            ctx.outgoing_frame_retries_tx.force_close();
381            ctx.incoming_frame_retries_tx.force_close();
382            ctx.ack_tx.close_channel();
383            ctx.ctl_tx.close_channel();
384
385            self.started.store(false, std::sync::atomic::Ordering::Relaxed);
386            tracing::debug!("state has been stopped");
387        } else {
388            tracing::warn!("cannot be stopped, because it is not running");
389        }
390
391        Ok(())
392    }
393
394    #[tracing::instrument(name = "AcknowledgementState::incoming_segment", skip(self), fields(session_id = self.id, frame_id = seg_id.0))]
395    fn incoming_segment(&mut self, seg_id: &SegmentId, _ind: SeqIndicator) -> Result<(), SessionError> {
396        tracing::trace!("segment received");
397
398        let ctx = self
399            .started
400            .load(std::sync::atomic::Ordering::Relaxed)
401            .then_some(self.context.as_mut())
402            .flatten()
403            .ok_or(SessionError::StateNotRunning)?;
404
405        // Register future requesting of segments for this frame
406        if self.cfg.mode.is_partial_ack_enabled() {
407            // Every incoming segment of this frame will move the deadline further
408            // into the future.
409            if let Err(error) = ctx.incoming_frame_retries_tx.send_one((
410                RetriedFrameId::with_retries(seg_id.0, self.cfg.max_incoming_frame_retries),
411                self.cfg.expected_packet_latency, // RTO_BASE_RECEIVER - when we expect the next segment to arrive
412            )) {
413                tracing::error!(%error, "failed to register incoming retry for frame");
414            }
415        }
416        Ok(())
417    }
418
419    #[tracing::instrument(name = "AcknowledgementState::incoming_retransmission_request", skip(self, request), fields(session_id = self.id))]
420    fn incoming_retransmission_request(&mut self, request: SegmentRequest<C>) -> Result<(), SessionError> {
421        // The state will respond to segment retransmission requests even
422        // if it has this feature disabled in the config.
423        tracing::trace!(count = request.len(), "segment retransmission requested");
424
425        let ctx = self
426            .started
427            .load(std::sync::atomic::Ordering::Relaxed)
428            .then_some(self.context.as_mut())
429            .flatten()
430            .ok_or(SessionError::StateNotRunning)?;
431
432        let (mut missing_seg_ids, mut missing_frame_ids): (Vec<_>, Vec<_>) =
433            request.into_iter().map(|s| (s, s.0)).unzip();
434
435        // Perform a single find to lock the RB only once
436        let segments = ctx.rb_rx.find(|s| {
437            // SegmentIds are guaranteed to be sorted, so we can use binary search
438            if let Ok(i) = missing_seg_ids.binary_search(&s.id()) {
439                missing_seg_ids.remove(i);
440                true
441            } else {
442                false
443            }
444        });
445
446        tracing::trace!(
447            found = segments.len(),
448            requested = missing_frame_ids.len(),
449            "found matching segments to be retransmitted"
450        );
451
452        // Partially acknowledged frames will not need to be fully resent in the future.
453        // Cancel all partially acknowledged frame resends.
454        if self.cfg.mode.is_full_ack_enabled() {
455            // Since the FrameIds are guaranteed to be sorted, we can simply dedup them.
456            missing_frame_ids.dedup();
457
458            if let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
459                missing_frame_ids
460                    .into_iter()
461                    .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
462            ) {
463                tracing::error!(%error, "failed to cancel frame resend of partially acknowledged frames");
464            }
465        }
466
467        // Resend the segments via the Control Stream
468        segments
469            .into_iter()
470            .try_for_each(|s| {
471                tracing::trace!(seg_id = %s.id(), "retransmit segment on request");
472                ctx.ctl_tx.try_send(SessionMessage::Segment(s))
473            })
474            .map_err(|e| SessionError::ProcessingError(e.to_string()))
475    }
476
477    #[tracing::instrument(name = "AcknowledgementState::incoming_acknowledged_frames", skip(self), fields(session_id = self.id))]
478    fn incoming_acknowledged_frames(&mut self, ack: FrameAcknowledgements<C>) -> Result<(), SessionError> {
479        tracing::trace!(count = ack.len(), "frame acknowledgements received");
480
481        let ctx = self
482            .started
483            .load(std::sync::atomic::Ordering::Relaxed)
484            .then_some(self.context.as_mut())
485            .flatten()
486            .ok_or(SessionError::StateNotRunning)?;
487
488        // Frame acknowledged, we will not need to resend it
489        if self.cfg.mode.is_full_ack_enabled()
490            && let Err(error) = ctx.outgoing_frame_retries_tx.send_many(
491                ack.into_iter()
492                    .inspect(|frame_id| tracing::trace!(frame_id, "frame acknowledged"))
493                    .map(|frame_id| (RetriedFrameId::no_retries(frame_id), Skip).into()),
494            )
495        {
496            tracing::error!(%error, "failed to cancel frame resend");
497        }
498
499        Ok(())
500    }
501
502    #[tracing::instrument(name = "AcknowledgementState::frame_complete", skip(self), fields(session_id = self.id))]
503    fn frame_complete(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
504        tracing::trace!("frame complete");
505
506        let ctx = self
507            .started
508            .load(std::sync::atomic::Ordering::Relaxed)
509            .then_some(self.context.as_mut())
510            .flatten()
511            .ok_or(SessionError::StateNotRunning)?;
512
513        // Since the frame has been completed, push its ID into the acknowledgement queue
514        if let Err(error) = ctx.ack_tx.try_send(frame_id) {
515            tracing::error!(%error, "failed to acknowledge frame");
516        }
517
518        if self.cfg.mode.is_partial_ack_enabled() {
519            // No more requesting of segment retransmissions from frames that were completed
520            if let Err(error) = ctx
521                .incoming_frame_retries_tx
522                .send_one((RetriedFrameId::no_retries(frame_id), Skip))
523            {
524                tracing::error!(%error, "failed to cancel retry of acknowledged frame");
525            }
526        }
527
528        Ok(())
529    }
530
531    #[tracing::instrument(name = "AcknowledgementState::frame_emitted", skip(self), fields(session_id = self.id))]
532    fn frame_emitted(&mut self, id: FrameId) -> Result<(), SessionError> {
533        tracing::trace!("frame emitted");
534        let _ = self
535            .started
536            .load(std::sync::atomic::Ordering::Relaxed)
537            .then_some(self.context.as_mut())
538            .flatten()
539            .ok_or(SessionError::StateNotRunning)?;
540        Ok(())
541    }
542
543    #[tracing::instrument(name = "AcknowledgementState::frame_discarded", skip(self), fields(session_id = self.id))]
544    fn frame_discarded(&mut self, frame_id: FrameId) -> Result<(), SessionError> {
545        tracing::trace!("frame discarded");
546
547        let ctx = self
548            .started
549            .load(std::sync::atomic::Ordering::Relaxed)
550            .then_some(self.context.as_mut())
551            .flatten()
552            .ok_or(SessionError::StateNotRunning)?;
553
554        if self.cfg.mode.is_partial_ack_enabled() {
555            // No more requesting of segment retransmissions from frames that were discarded
556            if let Err(error) = ctx
557                .incoming_frame_retries_tx
558                .send_one((RetriedFrameId::no_retries(frame_id), Skip))
559            {
560                tracing::error!(%error, "failed to cancel retry of acknowledged frame");
561            }
562        }
563
564        Ok(())
565    }
566
567    #[tracing::instrument(name = "AcknowledgementState::segment_sent", skip(self, segment), fields(session_id = self.id, frame_id = segment.frame_id, seq_idx = segment.seq_idx))]
568    fn segment_sent(&mut self, segment: &Segment) -> Result<(), SessionError> {
569        tracing::trace!("segment sent");
570
571        let ctx = self
572            .started
573            .load(std::sync::atomic::Ordering::Relaxed)
574            .then_some(self.context.as_mut())
575            .flatten()
576            .ok_or(SessionError::StateNotRunning)?;
577
578        // Since segments are re-sent via Control stream, they are not later fed again
579        // into the ring buffer.
580        if !ctx.rb_tx.push(segment.clone()) {
581            tracing::error!("failed to push segment into ring buffer");
582        }
583
584        // When the last segment of a frame has been sent,
585        // add it to outgoing retries (if the full ack mode is enabled).
586        if segment.is_last() && self.cfg.mode.is_full_ack_enabled() {
587            tracing::trace!("last segment of frame sent");
588
589            if let Err(error) = ctx.outgoing_frame_retries_tx.send_one((
590                RetriedFrameId::with_retries(segment.frame_id, self.cfg.max_outgoing_frame_retries),
591                // The whole frame should be delivered and acknowledged
592                // once all its segments (seq_len) are sent,
593                // and the acknowledgement also comes back to us.
594                // Therefore, RTO_BASE_SENDER = latency * (seq_len + 1)
595                self.cfg.expected_packet_latency * (segment.seq_flags.seq_len() + 1) as u32,
596            )) {
597                tracing::error!(%error, "failed to insert outgoing retry of a frame");
598            }
599        }
600
601        Ok(())
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use anyhow::Context;
608
609    use super::*;
610    use crate::{
611        processing::types::{FrameBuilder, FrameDashMap, FrameMap},
612        protocol::SeqNum,
613        utils::segment,
614    };
615
616    const FRAME_SIZE: usize = 1500;
617
618    const MTU: usize = 1000;
619
620    #[test_log::test(tokio::test)]
621    async fn ack_state_sender_must_acknowledge_completed_frames() -> anyhow::Result<()> {
622        let cfg = AcknowledgementStateConfig {
623            expected_packet_latency: Duration::from_millis(10),
624            acknowledgement_delay: Duration::from_millis(2),
625            ..Default::default()
626        };
627
628        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
629        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
630
631        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
632        state.run(SocketComponents {
633            inspector: inspector.into(),
634            ctl_tx,
635        })?;
636
637        let acked_frame_ids = [1, 2, 3];
638
639        for &frame_id in &acked_frame_ids {
640            state.frame_complete(frame_id)?;
641        }
642
643        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
644
645        state.stop()?;
646
647        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
648            .await
649            .context("timeout receiving Control messages")?;
650
651        assert_eq!(1, ctl_msgs.len());
652
653        assert_eq!(
654            ctl_msgs[0],
655            SessionMessage::Acknowledge(acked_frame_ids.to_vec().try_into()?)
656        );
657
658        Ok(())
659    }
660
661    #[parameterized::parameterized(num_frames = { 1, 2, 3 })]
662    #[parameterized_macro(test_log::test(tokio::test))]
663    async fn ack_state_sender_must_resend_unacknowledged_frames(num_frames: usize) -> anyhow::Result<()> {
664        const NUM_RETRIES: usize = 2;
665
666        let cfg = AcknowledgementStateConfig {
667            mode: AcknowledgementMode::Full,
668            expected_packet_latency: Duration::from_millis(2),
669            max_outgoing_frame_retries: NUM_RETRIES,
670            ..Default::default()
671        };
672
673        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
674        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
675
676        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
677        state.run(SocketComponents {
678            inspector: inspector.into(),
679            ctl_tx,
680        })?;
681
682        let mut expected_frame_segments = Vec::new();
683        let num_segments_in_frame = FRAME_SIZE / MTU + 1;
684        for i in 1..=num_frames {
685            let expected_segments = segment(
686                hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(),
687                MTU,
688                i as FrameId,
689            )?;
690            for segment in &expected_segments {
691                state.segment_sent(segment)?;
692            }
693            expected_frame_segments.push(expected_segments);
694        }
695
696        let expected_frame_delivery = cfg.expected_packet_latency * (num_segments_in_frame + 1) as u32;
697        tokio::time::sleep(2 * expected_frame_delivery).await;
698        state.stop()?;
699
700        let ctl_msg = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
701            .await
702            .context("timeout receiving Control message")?;
703
704        let retransmitted_segments = ctl_msg
705            .into_iter()
706            .map(|m| m.try_as_segment().ok_or(anyhow::anyhow!("must be segment")))
707            .collect::<Result<Vec<_>, _>>()?;
708
709        assert_eq!(
710            NUM_RETRIES * num_segments_in_frame * num_frames,
711            retransmitted_segments.len()
712        );
713
714        let total_segments = expected_frame_segments.iter().map(|m| m.len()).sum::<usize>();
715        let expected_segments = expected_frame_segments
716            .into_iter()
717            .flatten()
718            .cycle()
719            .take(total_segments * NUM_RETRIES)
720            .collect::<Vec<_>>();
721        assert_eq!(expected_segments, retransmitted_segments);
722
723        Ok(())
724    }
725
726    #[test_log::test(tokio::test)]
727    async fn ack_state_sender_must_not_resend_unacknowledged_frame_when_full_resend_disabled() -> anyhow::Result<()> {
728        const NUM_RETRIES: usize = 2;
729
730        let cfg = AcknowledgementStateConfig {
731            mode: AcknowledgementMode::Partial,
732            expected_packet_latency: Duration::from_millis(2),
733            max_outgoing_frame_retries: NUM_RETRIES,
734            ..Default::default()
735        };
736
737        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
738        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
739
740        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
741        state.run(SocketComponents {
742            inspector: inspector.into(),
743            ctl_tx,
744        })?;
745
746        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
747        for segment in &expected_segments {
748            state.segment_sent(segment)?;
749        }
750
751        let expected_frame_delivery = cfg.expected_packet_latency * (expected_segments.len() + 1) as u32;
752        tokio::time::sleep(2 * expected_frame_delivery).await;
753        state.stop()?;
754
755        // No retransmission should be sent because it is disabled
756        assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
757
758        Ok(())
759    }
760
761    #[tokio::test]
762    async fn ack_state_sender_must_not_resend_acknowledged_frame() -> anyhow::Result<()> {
763        let cfg = AcknowledgementStateConfig {
764            mode: AcknowledgementMode::Full,
765            expected_packet_latency: Duration::from_millis(2),
766            max_outgoing_frame_retries: 1,
767            ..Default::default()
768        };
769
770        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
771        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
772
773        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
774        state.run(SocketComponents {
775            inspector: inspector.into(),
776            ctl_tx,
777        })?;
778
779        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
780        for segment in &expected_segments {
781            state.segment_sent(segment)?;
782        }
783
784        // Acknowledge the frame
785        state.incoming_acknowledged_frames(vec![1].try_into()?)?;
786
787        tokio::time::sleep(10 * cfg.expected_packet_latency).await;
788
789        state.stop()?;
790
791        // No retransmission should be sent because the frame was already acknowledged.
792        assert!(ctl_rx.collect::<Vec<_>>().await.is_empty());
793
794        Ok(())
795    }
796
797    #[test_log::test(tokio::test)]
798    async fn ack_state_sender_must_not_resend_entire_frame_when_already_partially_acknowledged() -> anyhow::Result<()> {
799        let cfg = AcknowledgementStateConfig {
800            mode: AcknowledgementMode::Full,
801            expected_packet_latency: Duration::from_millis(2),
802            max_outgoing_frame_retries: 1,
803            ..Default::default()
804        };
805
806        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
807        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
808
809        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
810        state.run(SocketComponents {
811            inspector: inspector.into(),
812            ctl_tx,
813        })?;
814
815        let expected_segments = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
816
817        // Load segments into the ring buffer
818        for segment in &expected_segments {
819            state.segment_sent(segment)?;
820        }
821
822        tokio::time::sleep(cfg.expected_packet_latency).await;
823
824        // Partially acknowledge the frame (report the first segment as missing)
825        state.incoming_retransmission_request(SegmentRequest::from_iter([(1, [0b10000000].into())]))?;
826
827        state.stop()?;
828
829        // Only segment 1 must be retransmitted
830        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
831        assert_eq!(1, ctl_msgs.len());
832        assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments[0].clone()));
833
834        Ok(())
835    }
836
837    #[test_log::test(tokio::test)]
838    async fn ack_state_sender_must_retransmit_segments_when_requested() -> anyhow::Result<()> {
839        let cfg = AcknowledgementStateConfig {
840            mode: AcknowledgementMode::Full,
841            expected_packet_latency: Duration::from_millis(2),
842            max_outgoing_frame_retries: 1,
843            ..Default::default()
844        };
845
846        let inspector = FrameInspector(FrameDashMap::with_capacity(10));
847        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
848
849        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
850        state.run(SocketComponents {
851            inspector: inspector.into(),
852            ctl_tx,
853        })?;
854
855        let expected_segments_1 = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 1)?;
856        // Load frame 1 segments into the ring buffer
857        for segment in &expected_segments_1 {
858            state.segment_sent(segment)?;
859        }
860
861        let expected_segments_2 = segment(hopr_types::crypto_random::random_bytes::<{ FRAME_SIZE * 2 }>(), MTU, 2)?;
862        // Load frame 2 segments into the ring buffer
863        for segment in &expected_segments_2 {
864            state.segment_sent(segment)?;
865        }
866
867        tokio::time::sleep(cfg.expected_packet_latency).await;
868
869        // Request different segments to be retransmitted
870        state.incoming_retransmission_request(SegmentRequest::from_iter([
871            (1, [0b11100000].into()),
872            (2, [0b11100000].into()),
873        ]))?;
874        tokio::time::sleep(cfg.expected_packet_latency).await;
875
876        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b11000000].into())]))?;
877        tokio::time::sleep(cfg.expected_packet_latency).await;
878
879        state.incoming_retransmission_request(SegmentRequest::from_iter([(2, [0b01000000].into())]))?;
880        tokio::time::sleep(cfg.expected_packet_latency).await;
881
882        state.stop()?;
883
884        let ctl_msgs = ctl_rx.collect::<Vec<_>>().await;
885
886        assert_eq!(9, ctl_msgs.len());
887        // Request 1 - frame 1
888        assert_eq!(ctl_msgs[0], SessionMessage::Segment(expected_segments_1[0].clone()));
889        assert_eq!(ctl_msgs[1], SessionMessage::Segment(expected_segments_1[1].clone()));
890        assert_eq!(ctl_msgs[2], SessionMessage::Segment(expected_segments_1[2].clone()));
891        // Request 1 - frame 2
892        assert_eq!(ctl_msgs[3], SessionMessage::Segment(expected_segments_2[0].clone()));
893        assert_eq!(ctl_msgs[4], SessionMessage::Segment(expected_segments_2[1].clone()));
894        assert_eq!(ctl_msgs[5], SessionMessage::Segment(expected_segments_2[2].clone()));
895
896        // Request 2 - frame 2
897        assert_eq!(ctl_msgs[6], SessionMessage::Segment(expected_segments_2[0].clone()));
898        assert_eq!(ctl_msgs[7], SessionMessage::Segment(expected_segments_2[1].clone()));
899
900        // Request 3 - frame 2
901        assert_eq!(ctl_msgs[8], SessionMessage::Segment(expected_segments_2[1].clone()));
902
903        Ok(())
904    }
905
906    #[tokio::test]
907    async fn ack_state_receiver_must_request_missing_frames_when_partial_acks_are_enabled() -> anyhow::Result<()> {
908        let cfg = AcknowledgementStateConfig {
909            mode: AcknowledgementMode::Partial,
910            expected_packet_latency: Duration::from_millis(2),
911            max_incoming_frame_retries: 1,
912            ..Default::default()
913        };
914
915        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
916        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
917
918        let segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
919
920        inspector
921            .0
922            .entry(1)
923            .try_as_vacant()
924            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
925            .insert(FrameBuilder::from(segments[0].clone()));
926
927        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
928        state.run(SocketComponents {
929            inspector: inspector.into(),
930            ctl_tx,
931        })?;
932
933        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
934
935        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
936
937        state.stop()?;
938
939        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
940            .await
941            .context("timeout receiving Control messages")?;
942
943        assert_eq!(1, ctl_msgs.len());
944        assert_eq!(
945            ctl_msgs[0],
946            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01000000].into())]))
947        );
948
949        Ok(())
950    }
951
952    #[tokio::test]
953    async fn ack_state_receiver_must_not_request_missing_frames_when_partial_acks_are_disabled() -> anyhow::Result<()> {
954        let cfg = AcknowledgementStateConfig {
955            mode: AcknowledgementMode::Full,
956            expected_packet_latency: Duration::from_millis(2),
957            max_incoming_frame_retries: 1,
958            ..Default::default()
959        };
960
961        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
962        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
963
964        let segments = segment(hopr_types::crypto_random::random_bytes::<FRAME_SIZE>(), MTU, 1)?;
965
966        inspector
967            .0
968            .entry(1)
969            .try_as_vacant()
970            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
971            .insert(FrameBuilder::from(segments[0].clone()));
972
973        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
974        state.run(SocketComponents {
975            inspector: inspector.into(),
976            ctl_tx,
977        })?;
978
979        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
980
981        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
982
983        state.stop()?;
984
985        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
986            .await
987            .context("timeout receiving Control messages")?;
988
989        assert!(ctl_msgs.iter().all(|m| !matches!(m, SessionMessage::Request(_))));
990
991        Ok(())
992    }
993
994    #[tokio::test]
995    async fn ack_state_receiver_must_continue_requesting_missing_frames_when_frame_not_completed() -> anyhow::Result<()>
996    {
997        let cfg = AcknowledgementStateConfig {
998            mode: AcknowledgementMode::Partial,
999            expected_packet_latency: Duration::from_millis(2),
1000            max_incoming_frame_retries: 3,
1001            ..Default::default()
1002        };
1003
1004        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1005        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
1006
1007        let segments = segment(hopr_types::crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1008
1009        inspector
1010            .0
1011            .entry(1)
1012            .try_as_vacant()
1013            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1014            .insert(FrameBuilder::from(segments[0].clone()));
1015
1016        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1017        state.run(SocketComponents {
1018            inspector: inspector.clone().into(),
1019            ctl_tx,
1020        })?;
1021
1022        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1023
1024        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1025
1026        inspector
1027            .0
1028            .entry(1)
1029            .try_as_occupied()
1030            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1031            .get_mut()
1032            .add_segment(segments[1].clone())?;
1033
1034        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1035
1036        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1037
1038        state.stop()?;
1039
1040        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1041            .await
1042            .context("timeout receiving Control messages")?;
1043
1044        assert_eq!(2, ctl_msgs.len());
1045        assert_eq!(
1046            ctl_msgs[0],
1047            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1048        );
1049
1050        assert_eq!(
1051            ctl_msgs[1],
1052            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1053        );
1054
1055        Ok(())
1056    }
1057
1058    #[tokio::test]
1059    async fn ack_state_receiver_must_continue_requesting_missing_frames_and_acknowledge_once_complete()
1060    -> anyhow::Result<()> {
1061        let cfg = AcknowledgementStateConfig {
1062            mode: AcknowledgementMode::Partial,
1063            expected_packet_latency: Duration::from_millis(2),
1064            max_incoming_frame_retries: 3,
1065            acknowledgement_delay: Duration::from_millis(5),
1066            ..Default::default()
1067        };
1068
1069        let mut inspector = FrameInspector(FrameDashMap::with_capacity(10));
1070        let (ctl_tx, ctl_rx) = futures::channel::mpsc::channel(1024);
1071
1072        let segments = segment(hopr_types::crypto_random::random_bytes::<{ 2 * FRAME_SIZE }>(), MTU, 1)?;
1073
1074        // Segment 1
1075        inspector
1076            .0
1077            .entry(1)
1078            .try_as_vacant()
1079            .ok_or(anyhow::anyhow!("frame 1 must be vacant"))?
1080            .insert(FrameBuilder::from(segments[0].clone()));
1081
1082        let mut state = AcknowledgementState::<MTU>::new("test", cfg);
1083        state.run(SocketComponents {
1084            inspector: inspector.clone().into(),
1085            ctl_tx,
1086        })?;
1087
1088        // Segment 2
1089        state.incoming_segment(&segments[0].id(), (segments.len() as SeqNum).try_into()?)?;
1090
1091        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1092
1093        inspector
1094            .0
1095            .entry(1)
1096            .try_as_occupied()
1097            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1098            .get_mut()
1099            .add_segment(segments[1].clone())?;
1100
1101        state.incoming_segment(&segments[1].id(), (segments.len() as SeqNum).try_into()?)?;
1102
1103        tokio::time::sleep(cfg.expected_packet_latency * 2).await;
1104
1105        // Segment 3
1106        inspector
1107            .0
1108            .entry(1)
1109            .try_as_occupied()
1110            .ok_or(anyhow::anyhow!("frame 1 must be occupied"))?
1111            .get_mut()
1112            .add_segment(segments[2].clone())?;
1113
1114        state.incoming_segment(&segments[2].id(), (segments.len() as SeqNum).try_into()?)?;
1115        state.frame_complete(1)?;
1116
1117        tokio::time::sleep(cfg.acknowledgement_delay * 2).await;
1118
1119        state.stop()?;
1120
1121        let ctl_msgs = tokio::time::timeout(Duration::from_millis(100), ctl_rx.collect::<Vec<_>>())
1122            .await
1123            .context("timeout receiving Control messages")?;
1124
1125        assert_eq!(3, ctl_msgs.len());
1126        assert_eq!(
1127            ctl_msgs[0],
1128            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b01100000].into())]))
1129        );
1130
1131        assert_eq!(
1132            ctl_msgs[1],
1133            SessionMessage::Request(SegmentRequest::from_iter([(1, [0b00100000].into())]))
1134        );
1135
1136        assert_eq!(ctl_msgs[2], SessionMessage::Acknowledge(vec![1].try_into()?));
1137
1138        Ok(())
1139    }
1140}