//! # `Session` protocol state machine
//!
//! The protocol always forms a middle layer between a *lower layer* transport (such as an unreliable
//! UDP-like network) and any upstream protocol.
//! The communication with the *lower layer* is done via [`SessionState`];
//! the *upper layer* uses the [`SessionSocket`] to pass data through the
//! `Session` protocol.
//!
//! ## Instantiation
//! The protocol state machine is instantiated by creating the [`SessionSocket`]
//! object and [providing it](SessionSocket::new) with an underlying transport writer and its MTU `C`.
//! The protocol can be instantiated over any transport that implements [`AsyncWrite`] + [`AsyncRead`]
//! for sending and receiving raw data packets.
//!
//! ## Passing data between the protocol and the upper layer
//! The [`SessionSocket`] implements [`AsyncRead`] + [`AsyncWrite`] and can be used to read and
//! write arbitrary data through the protocol. If the writer is [closed](AsyncWrite::poll_close),
//! the session is closed as well.
//!
//! ## Passing data between the protocol and the lower layer
//!
//! As long as the underlying transport implements [`AsyncRead`] + [`AsyncWrite`],
//! the [`SessionSocket`] automatically polls data from the underlying transport
//! and sends data to it as needed.
//!
//! ## Protocol features
//!
//! ### Data segmentation
//! Once data is written to the [`SessionSocket`], it is segmented and written
//! automatically to the underlying transport. Every write to the `SessionSocket` corresponds to
//! a single [`Frame`](crate::session::frame::Frame).
//!
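/*!

A minimal sketch of the segmentation step, assuming each segment carries at most
`PAYLOAD_CAPACITY = C - SEGMENT_OVERHEAD` bytes and a frame may consist of at most
`MAX_SEGMENTS_PER_FRAME` segments (the constants used by [`SessionState`]).
`SketchSegment` and `sketch_segment` are hypothetical stand-ins; the crate's `Segment` type
and `segment` function may use different fields and signatures.

```rust
/// Hypothetical stand-in for a segment: only the fields needed for the illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SketchSegment {
    pub frame_id: u32,
    pub seq_idx: usize,
    pub seq_len: usize,
    pub data: Vec<u8>,
}

/// Splits `data` into segments of at most `payload_capacity` bytes.
/// Returns `None` for empty data, a zero capacity, or a frame that would need
/// more than `max_segments` segments.
pub fn sketch_segment(
    data: &[u8],
    payload_capacity: usize,
    max_segments: usize,
    frame_id: u32,
) -> Option<Vec<SketchSegment>> {
    if data.is_empty() || payload_capacity == 0 {
        return None;
    }
    // Number of segments, rounded up.
    let seq_len = data.len().div_ceil(payload_capacity);
    if seq_len > max_segments {
        return None; // larger than the maximum frame size
    }
    let segments = data
        .chunks(payload_capacity)
        .enumerate()
        .map(|(seq_idx, chunk)| SketchSegment {
            frame_id,
            seq_idx,
            seq_len,
            data: chunk.to_vec(),
        })
        .collect();
    Some(segments)
}
```

Because a frame is capped at `MAX_SEGMENTS_PER_FRAME * PAYLOAD_CAPACITY` bytes, this sketch
rejects a larger write instead of splitting it across several frames.
*/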
//! ### Frame reassembly
//! The receiving side reassembles frames from their segments and sequences them.
//! Frames are never emitted to the upper layer out of order, but a frame can be
//! skipped if it is not completed within [`frame_expiration_age`](SessionConfig).
//!
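/*!

A minimal sketch of in-order delivery with skipping, assuming frame IDs increase and that frames
whose predecessors never arrived may be released once they have themselves waited longer than the
expiration age. `SketchSequencer` is hypothetical and is not the crate's `FrameReassembler`; time is
modelled as plain milliseconds to keep the sketch deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-order frame sequencer.
pub struct SketchSequencer {
    next_id: u32,
    expiration_ms: u64,
    /// Frame ID -> (time the frame was first seen, payload once the frame is complete).
    frames: BTreeMap<u32, (u64, Option<Vec<u8>>)>,
}

impl SketchSequencer {
    pub fn new(first_id: u32, expiration_ms: u64) -> Self {
        Self { next_id: first_id, expiration_ms, frames: BTreeMap::new() }
    }

    /// Records that frame `id` was seen at `now_ms`; `payload` is `Some` once it is complete.
    pub fn update(&mut self, id: u32, now_ms: u64, payload: Option<Vec<u8>>) {
        if id < self.next_id {
            return; // late data for a frame that was already emitted or skipped
        }
        let entry = self.frames.entry(id).or_insert((now_ms, None));
        if payload.is_some() {
            entry.1 = payload;
        }
    }

    /// Emits complete frames strictly in ID order. A gap or an incomplete frame blocks
    /// the output until the waiting frame is older than the expiration age.
    pub fn poll(&mut self, now_ms: u64) -> Vec<Vec<u8>> {
        let mut out = Vec::new();
        loop {
            let Some((&id, (seen_ms, payload))) = self.frames.first_key_value() else {
                break;
            };
            let complete = payload.is_some();
            let expired = now_ms.saturating_sub(*seen_ms) >= self.expiration_ms;

            if id == self.next_id && complete {
                // The next frame in sequence is complete: emit it.
                if let Some((_, (_, Some(data)))) = self.frames.pop_first() {
                    out.push(data);
                }
                self.next_id += 1;
            } else if expired && id == self.next_id {
                // The next frame stayed incomplete for too long: skip it.
                self.frames.pop_first();
                self.next_id += 1;
            } else if expired {
                // Frames before `id` never arrived; assume they are at least as old and skip them.
                self.next_id = id;
            } else {
                break; // wait for more segments
            }
        }
        out
    }
}
```
*/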
//! ### Frame acknowledgement
//!
//! The recipient can acknowledge a frame to the sender once all of its segments have been received.
//! This is done with a [`FrameAcknowledgements`] message sent back
//! to the sender.
//!
//! ### Segment retransmission
//!
//! There are two means of segment retransmission:
//!
//! #### Recipient requested retransmission
//! This is useful when the recipient has received only some segments of a frame.
//! At this point, the recipient knows which segments of the frame are missing and can send
//! a [`SegmentRequest`] back to the sender.
//! This method is more targeted, as it requests only those segments of a frame that are needed.
//! Once the sender receives the segment request, it retransmits the segments in question
//! to the recipient.
//! The recipient can repeat its retransmission requests, depending on the network reliability.
//! However, repeated requests are spaced out by an exponential backoff, determined by
//! `backoff_base` and the `rto_base_receiver` timeout in [`SessionConfig`], until
//! `frame_expiration_age` is reached.
//!
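/*!

The backoff schedule can be sketched as follows, assuming the n-th wait equals
`rto_base * backoff_base^n`, measured from the previous attempt, and ignoring `rto_jitter`.
The exact formula used by the crate's `RetryToken` may differ; `sketch_retry_offsets` is a
hypothetical helper.

```rust
use std::time::Duration;

/// Offsets (from the moment the retry token is created) at which retries would fire.
/// Retries stop once `expiration` would be exceeded.
pub fn sketch_retry_offsets(rto_base: Duration, backoff_base: f64, expiration: Duration) -> Vec<Duration> {
    assert!(backoff_base >= 1.0 && !rto_base.is_zero(), "waits must be positive and non-decreasing");
    let mut offsets = Vec::new();
    let mut elapsed = Duration::ZERO;
    let mut wait = rto_base;
    loop {
        elapsed += wait;
        if elapsed > expiration {
            break;
        }
        offsets.push(elapsed);
        // Exponential backoff: each wait is `backoff_base` times the previous one.
        wait = wait.mul_f64(backoff_base);
    }
    offsets
}
```

Under these assumptions, the defaults (`rto_base_receiver` = 1 s, `backoff_base` = 2,
`frame_expiration_age` = 30 s) give requests at 1 s, 3 s, 7 s and 15 s.
*/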
//! #### Sender initiated retransmission
//! The frame sender can also automatically retransmit entire frames (i.e. all their segments)
//! to the recipient. This happens if, within a given time period, the sender did not receive the
//! frame acknowledgement *and* the recipient did not request retransmission of any segment in
//! that frame.
//! This is useful when the recipient did not receive any segment of a frame. Once
//! the recipient receives at least one segment of a frame, recipient requested retransmission
//! is the preferred way.
//!
//! The sender can repeat frame retransmissions, depending on the network reliability.
//! However, repeated retransmissions are spaced out by an exponential backoff, determined by
//! `backoff_base` and the `rto_base_sender` timeout in [`SessionConfig`], until
//! `frame_expiration_age` is reached.
//! The sender stops retransmitting a frame once the frame has been acknowledged by the
//! recipient *or* the recipient has started requesting retransmission of its segments.
//!
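/*!

The stopping rules above can be modelled as a small table, loosely following
`retransmit_unacknowledged_frames`: an acknowledgement or a segment request removes the frame,
expired frames are dropped, and due frames are returned and rescheduled with a longer wait.
`SketchResendTable` is hypothetical; times are integer milliseconds and jitter is ignored.

```rust
use std::collections::{BTreeSet, HashMap};

/// Per-frame sender bookkeeping (hypothetical).
struct Pending {
    created_ms: u64,
    next_due_ms: u64,
    wait_ms: u64,
}

/// Hypothetical model of the sender's table of frames that may need re-sending.
pub struct SketchResendTable {
    rto_ms: u64,
    backoff: u64,
    expiration_ms: u64,
    frames: HashMap<u32, Pending>,
}

impl SketchResendTable {
    pub fn new(rto_ms: u64, backoff: u64, expiration_ms: u64) -> Self {
        Self { rto_ms, backoff, expiration_ms, frames: HashMap::new() }
    }

    /// A frame was sent: schedule its first sender initiated retransmission.
    pub fn on_sent(&mut self, frame_id: u32, now_ms: u64) {
        let pending = Pending { created_ms: now_ms, next_due_ms: now_ms + self.rto_ms, wait_ms: self.rto_ms };
        self.frames.insert(frame_id, pending);
    }

    /// An acknowledgement or a segment request arrived: stop retransmitting this frame.
    pub fn on_ack_or_request(&mut self, frame_id: u32) {
        self.frames.remove(&frame_id);
    }

    /// Returns the frames due for retransmission at `now_ms` and forgets expired ones.
    pub fn due(&mut self, now_ms: u64) -> BTreeSet<u32> {
        let mut due = BTreeSet::new();
        let (backoff, expiration_ms) = (self.backoff, self.expiration_ms);
        self.frames.retain(|id, p| {
            if now_ms.saturating_sub(p.created_ms) > expiration_ms {
                return false; // expired: no more retransmissions
            }
            if now_ms >= p.next_due_ms {
                due.insert(*id);
                p.wait_ms *= backoff; // exponential backoff
                p.next_due_ms = now_ms + p.wait_ms;
            }
            true
        });
        due
    }
}
```
*/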
//! #### Retransmission timing
//! Both retransmission methods work until `frame_expiration_age` is reached. Since the
//! recipient requested method is more targeted, at least one recipient request should be allowed
//! to happen before the sender initiated retransmission kicks in. Therefore, it is recommended to
//! set `rto_base_sender` to at least twice the `rto_base_receiver`.
//!
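/*!

A minimal check of this recommendation, together with the ordering
`0 < rto_base_receiver < rto_base_sender < frame_expiration_age` suggested in [`SessionConfig`].
`sketch_check_timing` and `TimingCheck` are hypothetical names.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum TimingCheck {
    /// The ordering holds and the sender RTO is at least twice the receiver RTO.
    Recommended,
    /// The ordering holds, but the sender RTO is less than twice the receiver RTO.
    BelowRecommendedRatio,
    /// The usual ordering `0 < receiver < sender < expiration` does not hold.
    UnusualOrdering,
}

pub fn sketch_check_timing(rto_receiver: Duration, rto_sender: Duration, expiration: Duration) -> TimingCheck {
    if rto_receiver.is_zero() || rto_receiver >= rto_sender || rto_sender >= expiration {
        TimingCheck::UnusualOrdering
    } else if rto_sender < rto_receiver * 2 {
        TimingCheck::BelowRecommendedRatio
    } else {
        TimingCheck::Recommended
    }
}
```

For example, the [`SessionConfig`] defaults (1 s and 1.5 s) satisfy the ordering but yield
`BelowRecommendedRatio`, since 1.5 s is less than twice 1 s.
*/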
//! The acknowledgement and retransmission features above can be enabled individually by setting
//! [`SessionFeature`] options in [`SessionConfig::enabled_features`] during [`SessionSocket`] construction.
//!
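/*!

A sketch of how the enabled features gate the periodic tasks of the state loop. `Feature` is a
hypothetical local mirror of [`SessionFeature`], leaving out `NoDelay`, which the state loop does
not consult; frame eviction runs regardless of the enabled features.

```rust
use std::collections::HashSet;

/// Hypothetical mirror of the `SessionFeature` variants consulted by the state loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Feature {
    RequestIncompleteFrames,
    RetransmitFrames,
    AcknowledgeFrames,
}

/// Names of the periodic state-loop tasks that would run for the given feature set.
pub fn active_tasks(features: &HashSet<Feature>) -> Vec<&'static str> {
    // Frame eviction is not gated by any feature.
    let mut tasks = vec!["evict_expired_frames"];
    if features.contains(&Feature::RequestIncompleteFrames) {
        tasks.push("request_missing_segments");
    }
    if features.contains(&Feature::RetransmitFrames) {
        tasks.push("retransmit_unacknowledged_frames");
    }
    if features.contains(&Feature::AcknowledgeFrames) {
        tasks.push("acknowledge_segments");
    }
    tasks
}
```
*/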
//! **For diagrams of individual retransmission situations, see the docs on the [`SessionSocket`] object.**
use std::{
    collections::{BTreeSet, HashSet},
    fmt::{Debug, Display},
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};

use crossbeam_queue::ArrayQueue;
use crossbeam_skiplist::SkipMap;
use dashmap::{DashMap, mapref::entry::Entry};
use futures::{
    AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, Sink, SinkExt, StreamExt, TryStreamExt,
    channel::mpsc::UnboundedSender, future::BoxFuture, pin_mut,
};
use governor::Quota;
use hopr_async_runtime::prelude::spawn;
use smart_default::SmartDefault;
use tracing::{debug, error, trace, warn};

use crate::{
    errors::NetworkTypeError,
    prelude::protocol::SessionMessageIter,
    session::{
        errors::SessionError,
        frame::{FrameId, FrameReassembler, Segment, SegmentId, segment},
        protocol::{FrameAcknowledgements, SegmentRequest, SessionMessage},
        utils::{RetryResult, RetryToken},
    },
    utils::AsyncReadStreamer,
};

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_TIME_TO_ACK: hopr_metrics::MultiHistogram =
        hopr_metrics::MultiHistogram::new(
            "hopr_session_time_to_ack",
            "Time in seconds until a complete frame gets acknowledged by the recipient",
            vec![0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0],
            &["session_id"]
    ).unwrap();
}

/// Represents individual Session protocol features that can be enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SessionFeature {
    /// Enable requesting of incomplete frames by the recipient.
    RequestIncompleteFrames,
    /// Enable frame retransmission by the sender.
    /// This requires `AcknowledgeFrames` to be enabled at the recipient.
    RetransmitFrames,
    /// Enable frame acknowledgement by the recipient.
    AcknowledgeFrames,
    /// Disables small frame buffering.
    NoDelay,
}

impl SessionFeature {
    /// Default features.
    ///
    /// These include:
    /// - [`SessionFeature::AcknowledgeFrames`]
    /// - ACK-based ([`SessionFeature::RetransmitFrames`]) and NACK-based ([`SessionFeature::RequestIncompleteFrames`])
    ///   retransmission
    /// - Frame buffering (no [`SessionFeature::NoDelay`])
    fn default_features() -> Vec<SessionFeature> {
        vec![
            SessionFeature::AcknowledgeFrames,
            SessionFeature::RequestIncompleteFrames,
            SessionFeature::RetransmitFrames,
        ]
    }
}

/// Configuration of the Session protocol.
#[derive(Debug, Clone, SmartDefault, validator::Validate)]
pub struct SessionConfig {
    /// Maximum number of buffered segments.
    ///
    /// The value should be large enough to accommodate segments for at least a
    /// `frame_expiration_age` period, considering the expected maximum bandwidth.
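    /**

As a rough sizing sketch, assuming every segment carries a full payload, the buffer needs about
`ceil(bandwidth / payload) * frame_expiration_age` entries. `sketch_required_segments` is
hypothetical and the numbers in the example are illustrative.

```rust
/// Segments needed to cover `window_secs` seconds at `bytes_per_sec`,
/// assuming each segment carries `payload_capacity` bytes (rounded up per second).
pub fn sketch_required_segments(bytes_per_sec: u64, payload_capacity: u64, window_secs: u64) -> u64 {
    assert!(payload_capacity > 0, "payload capacity must be non-zero");
    let segments_per_sec = bytes_per_sec.div_ceil(payload_capacity);
    segments_per_sec * window_secs
}
```

For instance, 1,000,000 bytes/s with 1,000-byte payloads over the default 30 s window needs
30,000 segments, which fits within the default of 50,000.
    */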
    ///
    /// Default is 50,000.
    #[default = 50_000]
    pub max_buffered_segments: usize,

    /// Size of the buffer for acknowledged frame IDs.
    ///
    /// The value should be large enough so that the buffer can accommodate frame IDs
    /// for at least a `frame_expiration_age` period, given the expected maximum bandwidth.
    ///
    /// The minimum value is 1, default is 1024.
    #[default = 1024]
    #[validate(range(min = 1))]
    pub acknowledged_frames_buffer: usize,

    /// Specifies the maximum period a frame should be kept by the sender and
    /// asked for retransmission by the recipient.
    ///
    /// Default is 30 seconds.
    #[default(Duration::from_secs(30))]
    pub frame_expiration_age: Duration,

    /// If a frame is incomplete (on the receiver), retransmission requests will be made
    /// with exponential backoff starting at this initial retry timeout (RTO).
    ///
    /// Requests will be sent until `frame_expiration_age` is reached.
    ///
    /// NOTE: this value should be offset from `rto_base_sender`, so that the receiver's
    /// retransmission requests are interleaved with the sender's retransmissions.
    ///
    /// In *most* cases, you want 0 < `rto_base_receiver` < `rto_base_sender` < `frame_expiration_age`.
    ///
    /// Default is 1 second.
    #[default(Duration::from_millis(1000))]
    pub rto_base_receiver: Duration,

    /// If a frame is unacknowledged (on the sender), entire frame retransmissions will be made
    /// with exponential backoff starting at this initial retry timeout (RTO).
    ///
    /// Frames will be retransmitted until `frame_expiration_age` is reached.
    ///
    /// NOTE: this value should be offset from `rto_base_receiver`, so that the receiver's
    /// retransmission requests are interleaved with the sender's retransmissions.
    ///
    /// In *most* cases, you want 0 < `rto_base_receiver` < `rto_base_sender` < `frame_expiration_age`.
    ///
    /// Default is 1.5 seconds.
    #[default(Duration::from_millis(1500))]
    pub rto_base_sender: Duration,

    /// Base for the exponential backoff on retries.
    ///
    /// Default is 2.
    #[default(2.0)]
    #[validate(range(min = 1.0))]
    pub backoff_base: f64,

    /// Standard deviation of a Gaussian jitter applied to `rto_base_receiver` and
    /// `rto_base_sender`. Must be between 0 and 0.25.
    ///
    /// Default is 0.05.
    #[default(0.05)]
    #[validate(range(min = 0.0, max = 0.25))]
    pub rto_jitter: f64,

    /// Set of [features](SessionFeature) that should be enabled on this Session.
    ///
    /// Default is [`SessionFeature::default_features`].
    #[default(_code = "HashSet::from_iter(SessionFeature::default_features())")]
    pub enabled_features: HashSet<SessionFeature>,
}

/// Contains the cloneable state of the session bound to a [`SessionSocket`].
///
/// It implements the entire [`SessionMessage`] state machine and
/// performs the frame reassembly and sequencing.
/// The MTU size is specified by `C`.
///
/// The `SessionState` cannot be created directly; it must always be created via [`SessionSocket`] and
/// then retrieved via [`SessionSocket::state`].
#[derive(Debug, Clone)]
pub struct SessionState<const C: usize> {
    session_id: String,
    lookbehind: Arc<SkipMap<SegmentId, Segment>>,
    to_acknowledge: Arc<ArrayQueue<FrameId>>,
    incoming_frame_retries: Arc<DashMap<FrameId, RetryToken>>,
    outgoing_frame_resends: Arc<DashMap<FrameId, RetryToken>>,
    outgoing_frame_id: Arc<AtomicU32>,
    frame_reassembler: Arc<FrameReassembler>,
    cfg: SessionConfig,
    segment_egress_send: UnboundedSender<SessionMessage<C>>,
}

/// Returns `future` boxed and fused if `condition` holds; otherwise returns a fused future
/// that never completes, so the corresponding `select` branch is effectively disabled.
fn maybe_fused_future<'a, F>(condition: bool, future: F) -> futures::future::Fuse<BoxFuture<'a, ()>>
where
    F: Future<Output = ()> + Send + Sync + 'a,
{
    if condition {
        future.boxed()
    } else {
        futures::future::pending().boxed()
    }
    .fuse()
}

impl<const C: usize> SessionState<C> {
    /// Maximum size of a frame, which is determined by the maximum number of possible segments.
    const MAX_WRITE_SIZE: usize = SessionMessage::<C>::MAX_SEGMENTS_PER_FRAME * Self::PAYLOAD_CAPACITY;
    /// How much space for payload there is in a single packet.
    const PAYLOAD_CAPACITY: usize = C - SessionMessage::<C>::SEGMENT_OVERHEAD;

    fn consume_segment(&mut self, segment: Segment) -> crate::errors::Result<()> {
        let id = segment.id();

        trace!(session_id = self.session_id, segment = %id, "received segment");

        match self.frame_reassembler.push_segment(segment) {
            Ok(_) => {
                match self.incoming_frame_retries.entry(id.0) {
                    Entry::Occupied(e) => {
                        // Receiving a frame segment restarts the retry token for this frame
                        let rt = *e.get();
                        e.replace_entry(rt.replenish(Instant::now(), self.cfg.backoff_base));
                    }
                    Entry::Vacant(v) => {
                        // Create the retry token for this frame
                        v.insert(RetryToken::new(Instant::now(), self.cfg.backoff_base));
                    }
                }
                trace!(session_id = self.session_id, segment = %id, "received segment pushed");
            }
            // The error here is intentionally not propagated
            Err(e) => warn!(session_id = self.session_id, ?id, error = %e, "segment not pushed"),
        }

        Ok(())
    }

    fn retransmit_segments(&mut self, request: SegmentRequest<C>) -> crate::errors::Result<()> {
        trace!(
            session_id = self.session_id,
            count_of_segments = request.len(),
            "received request",
        );

        let mut count = 0;
        request
            .into_iter()
            .filter_map(|segment_id| {
                // No need to retry this frame ourselves, since the other side will request on its own
                self.outgoing_frame_resends.remove(&segment_id.0);
                let ret = self
                    .lookbehind
                    .get(&segment_id)
                    .map(|e| SessionMessage::<C>::Segment(e.value().clone()));
                if ret.is_some() {
                    trace!(
                        session_id = self.session_id,
                        %segment_id,
                        "SENDING: retransmitted segment"
                    );
                    count += 1;
                } else {
                    warn!(
                        session_id = self.session_id,
                        id = ?segment_id,
                        "segment not in lookbehind buffer anymore",
                    );
                }
                ret
            })
            .try_for_each(|msg| self.segment_egress_send.unbounded_send(msg))
            .map_err(|e| SessionError::ProcessingError(e.to_string()))?;

        trace!(session_id = self.session_id, count, "retransmitted requested segments");

        Ok(())
    }

    fn acknowledged_frames(&mut self, acked: FrameAcknowledgements<C>) -> crate::errors::Result<()> {
        trace!(
            session_id = self.session_id,
            count = acked.len(),
            "received acknowledgement frames",
        );

        for frame_id in acked {
            // Frame acknowledged, we won't need to resend it
            if let Some((_, rt)) = self.outgoing_frame_resends.remove(&frame_id) {
                let to_ack = rt.time_since_creation();
                trace!(
                    session_id = self.session_id,
                    frame_id,
                    duration_in_ms = to_ack.as_millis(),
                    "frame acknowledgement duration"
                );

                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_TIME_TO_ACK.observe(&[self.session_id()], to_ack.as_secs_f64())
            }

            // The segments of an acknowledged frame are no longer needed for retransmission
            for seg in self.lookbehind.iter().filter(|s| frame_id == s.key().0) {
                seg.remove();
            }
        }

        Ok(())
    }

    /// Sends requests for missing segments in incomplete frames.
    /// Incomplete frames that are due for a retry are batched into [request](SessionMessage::Request)
    /// messages of up to [`SegmentRequest::MAX_ENTRIES`] frames each; each entry describes the
    /// segments missing from one frame.
    /// Recurring requests have an [`rto_base_receiver`](SessionConfig) timeout with backoff.
    /// Returns the number of sent request messages.
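    /**

The batching can be sketched as below, assuming each incomplete frame is described by flags
telling which of its segments arrived. `SketchIncomplete` and `sketch_requests` are hypothetical;
`max_entries` plays the role of `SegmentRequest::MAX_ENTRIES`.

```rust
/// Hypothetical view of an incomplete frame: its ID and which of its segments arrived.
pub struct SketchIncomplete {
    pub frame_id: u32,
    pub received: Vec<bool>,
}

/// Indices of the segments that have not been received yet.
fn missing_indices(frame: &SketchIncomplete) -> Vec<usize> {
    frame
        .received
        .iter()
        .enumerate()
        .filter(|(_, got)| !**got)
        .map(|(idx, _)| idx)
        .collect()
}

/// Groups incomplete frames into request messages of at most `max_entries` frames each;
/// every entry carries the frame ID and its missing segment indices.
pub fn sketch_requests(frames: &[SketchIncomplete], max_entries: usize) -> Vec<Vec<(u32, Vec<usize>)>> {
    assert!(max_entries > 0, "a request must hold at least one frame");
    frames
        .chunks(max_entries)
        .map(|chunk| chunk.iter().map(|f| (f.frame_id, missing_indices(f))).collect())
        .collect()
}
```
    */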
    async fn request_missing_segments(&mut self) -> crate::errors::Result<usize> {
        let tracked_incomplete = self.frame_reassembler.incomplete_frames();
        trace!(
            session_id = self.session_id,
            count = tracked_incomplete.len(),
            "tracking incomplete frames",
        );

        // Filter the frames which we are allowed to retry now
        let mut to_retry = Vec::with_capacity(tracked_incomplete.len());
        let now = Instant::now();
        for info in tracked_incomplete {
            match self.incoming_frame_retries.entry(info.frame_id) {
                Entry::Occupied(e) => {
                    // Check if we can retry this frame now
                    let rto_check = e.get().check(
                        now,
                        self.cfg.rto_base_receiver,
                        self.cfg.frame_expiration_age,
                        self.cfg.rto_jitter,
                    );
                    match rto_check {
                        RetryResult::RetryNow(next_rto) => {
                            // Retry this frame and plan ahead of the time of the next retry
                            trace!(
                                session_id = self.session_id,
                                frame_id = info.frame_id,
                                retransmission_number = next_rto.num_retry,
                                "performing frame retransmission",
                            );
                            e.replace_entry(next_rto);
                            to_retry.push(info);
                        }
                        RetryResult::Expired => {
                            // Frame is expired, so no more retries
                            debug!(
                                session_id = self.session_id,
                                frame_id = info.frame_id,
                                "frame is already expired and will be evicted"
                            );
                            e.remove();
                        }
                        RetryResult::Wait(d) => trace!(
                            session_id = self.session_id,
                            frame_id = info.frame_id,
                            timeout_in_ms = d.as_millis(),
                            next_retransmission_request_number = e.get().num_retry,
                            "frame needs to wait for next retransmission request",
                        ),
                    }
                }
                Entry::Vacant(v) => {
                    // Happens when no segment of this frame has been received yet
                    debug!(
                        session_id = self.session_id,
                        frame_id = info.frame_id,
                        "frame does not have a retry token"
                    );
                    v.insert(RetryToken::new(now, self.cfg.backoff_base));
                    to_retry.push(info);
                }
            }
        }

        let mut sent = 0;
        let to_retry = to_retry
            .chunks(SegmentRequest::<C>::MAX_ENTRIES)
            .map(|chunk| Ok(SessionMessage::<C>::Request(chunk.iter().cloned().collect())))
            .inspect(|r| {
                trace!(
                    session_id = self.session_id,
                    result = ?r,
                    "SENDING: retransmission request"
                );
                sent += 1;
            })
            .collect::<Vec<_>>();

        self.segment_egress_send
            .send_all(&mut futures::stream::iter(to_retry))
            .await
            .map_err(|e| SessionError::ProcessingError(e.to_string()))?;

        trace!(
            session_id = self.session_id,
            count = sent,
            "RETRANSMISSION BATCH COMPLETE: sent {sent} re-send requests",
        );
        Ok(sent)
    }

    /// Sends [acknowledgement](SessionMessage::Acknowledge) messages containing frame IDs
    /// of all frames that were successfully processed.
    /// If [`acknowledged_frames_buffer`](SessionConfig) was set to `0` during the construction,
    /// this method will do nothing and return `0`.
    /// Otherwise, it returns the number of acknowledged frames.
    /// If `acknowledged_frames_buffer` is non-zero, the buffer behaves like a ring buffer,
    /// which means if this method is not called sufficiently often, the oldest acknowledged
    /// frame IDs will be discarded.
    /// A single [message](SessionMessage::Acknowledge) can accommodate up to [`FrameAcknowledgements::MAX_ACK_FRAMES`]
    /// frame IDs, so this method sends as many messages as needed.
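    /**

A minimal sketch of the ring-buffer and batching behaviour described above, using a `VecDeque`
in place of the crate's `ArrayQueue`. `SketchAckQueue` is hypothetical, and `max_per_msg` stands
in for `FrameAcknowledgements::MAX_ACK_FRAMES`.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded queue of frame IDs waiting to be acknowledged.
pub struct SketchAckQueue {
    capacity: usize,
    ids: VecDeque<u32>,
}

impl SketchAckQueue {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        Self { capacity, ids: VecDeque::with_capacity(capacity) }
    }

    /// Queues a frame ID; when full, the oldest ID is discarded (ring-buffer behaviour).
    pub fn push(&mut self, frame_id: u32) {
        if self.ids.len() == self.capacity {
            self.ids.pop_front();
        }
        self.ids.push_back(frame_id);
    }

    /// Drains all queued IDs into batches of at most `max_per_msg` IDs,
    /// one batch per acknowledgement message.
    pub fn drain_batches(&mut self, max_per_msg: usize) -> Vec<Vec<u32>> {
        assert!(max_per_msg > 0, "a message must hold at least one ID");
        let pending: Vec<u32> = self.ids.drain(..).collect();
        pending.chunks(max_per_msg).map(|chunk| chunk.to_vec()).collect()
    }
}
```
    */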
    async fn acknowledge_segments(&mut self) -> crate::errors::Result<usize> {
        let mut len = 0;
        let mut msgs = 0;

        while !self.to_acknowledge.is_empty() {
            let mut ack_frames = FrameAcknowledgements::<C>::default();

            while !ack_frames.is_full() && !self.to_acknowledge.is_empty() {
                if let Some(ack_id) = self.to_acknowledge.pop() {
                    ack_frames.push(ack_id);
                    len += 1;
                }
            }

            trace!(
                session_id = self.session_id,
                count = ack_frames.len(),
                "SENDING: acknowledgements of frames",
            );
            self.segment_egress_send
                .feed(SessionMessage::Acknowledge(ack_frames))
                .await
                .map_err(|e| SessionError::ProcessingError(e.to_string()))?;
            msgs += 1;
        }
        self.segment_egress_send
            .flush()
            .await
            .map_err(|e| SessionError::ProcessingError(e.to_string()))?;

        trace!(
            session_id = self.session_id,
            count = len,
            messages = msgs,
            "ACK BATCH COMPLETE: sent acks in messages",
        );
        Ok(len)
    }

    /// Performs retransmission of entire unacknowledged frames as sender.
    /// If [`acknowledged_frames_buffer`](SessionConfig) was set to `0` during the construction,
    /// this method will do nothing and return `0`.
    /// Otherwise, it returns the number of retransmitted segments.
    /// Recurring retransmissions have an [`rto_base_sender`](SessionConfig) timeout with backoff.
    async fn retransmit_unacknowledged_frames(&mut self) -> crate::errors::Result<usize> {
        if self.cfg.acknowledged_frames_buffer == 0 {
            return Ok(0);
        }

        let now = Instant::now();

        // Retain only non-expired frames and collect those that are due for re-sending
        let mut frames_to_resend = BTreeSet::new();
        self.outgoing_frame_resends.retain(|frame_id, retry_log| {
            let check_res = retry_log.check(
                now,
                self.cfg.rto_base_sender,
                self.cfg.frame_expiration_age,
                self.cfg.rto_jitter,
            );
            match check_res {
                RetryResult::Wait(d) => {
                    trace!(
                        session_id = self.session_id,
                        frame_id,
                        wait_timeout_in_ms = d.as_millis(),
                        "frame will retransmit"
                    );
                    true
                }
                RetryResult::RetryNow(next_retry) => {
                    // Single segment frame scenario
                    frames_to_resend.insert(*frame_id);
                    *retry_log = next_retry;
                    debug!(session_id = self.session_id, frame_id, "frame will self-resend now");
                    true
                }
                RetryResult::Expired => {
                    debug!(session_id = self.session_id, frame_id, "frame expired");
                    false
                }
            }
        });

        trace!(
            session_id = self.session_id,
            count = frames_to_resend.len(),
            "frames will auto-resend",
        );

        // Find all segments of the frames to resend in the lookbehind buffer,
        // skip those that are not in the lookbehind buffer anymore
        let mut count = 0;
        let frames_to_resend = frames_to_resend
            .into_iter()
            .flat_map(|f| self.lookbehind.iter().filter(move |e| e.key().0 == f))
            .inspect(|e| {
                trace!(
                    session_id = self.session_id,
                    key = ?e.key(),
                    "SENDING: auto-retransmitted"
                );
                count += 1
            })
            .map(|e| Ok(SessionMessage::<C>::Segment(e.value().clone())))
            .collect::<Vec<_>>();

        self.segment_egress_send
            .send_all(&mut futures::stream::iter(frames_to_resend))
            .await
            .map_err(|e| SessionError::ProcessingError(e.to_string()))?;

        trace!(
            session_id = self.session_id,
            count, "AUTO-RETRANSMIT BATCH COMPLETE: re-sent segments",
        );

        Ok(count)
    }

    /// Segments the `data` and sends the segments as (possibly multiple) [`SessionMessage::Segment`] messages.
    /// Each segment is also inserted into the lookbehind ring buffer for possible retransmissions.
    /// Returns [`SessionError::IncorrectMessageLength`] if `data` is empty or longer than
    /// `Self::MAX_WRITE_SIZE`.
    ///
    /// The size of the lookbehind ring buffer is given by [`max_buffered_segments`](SessionConfig)
    /// during construction. It should hold all segments sent within the retransmission window,
    /// i.e. roughly the expected transport bandwidth (segments/sec) multiplied by
    /// `frame_expiration_age`, so that retransmission is still possible within that window.
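    /**

A minimal sketch of the lookbehind buffer, using a `BTreeMap` keyed by `(frame ID, segment index)`
instead of the crate's `SkipMap<SegmentId, Segment>`. Because outgoing frame IDs are allocated from
an increasing counter (`fetch_add`), the smallest key belongs to the oldest frame, so evicting from
the front drops the oldest segments first (counter wrap-around is ignored). `SketchLookbehind` and
its `u8` segment index are hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical lookbehind buffer keyed by `(frame ID, segment index)`.
pub struct SketchLookbehind {
    max_segments: usize,
    segments: BTreeMap<(u32, u8), Vec<u8>>,
}

impl SketchLookbehind {
    pub fn new(max_segments: usize) -> Self {
        Self { max_segments, segments: BTreeMap::new() }
    }

    /// Buffers a sent segment and evicts from the front (oldest frames) when over capacity.
    pub fn insert(&mut self, frame_id: u32, seq_idx: u8, data: Vec<u8>) {
        self.segments.insert((frame_id, seq_idx), data);
        while self.segments.len() > self.max_segments {
            self.segments.pop_first();
        }
    }

    /// Looks up a segment for retransmission; `None` if it was already evicted.
    pub fn get(&self, frame_id: u32, seq_idx: u8) -> Option<&Vec<u8>> {
        self.segments.get(&(frame_id, seq_idx))
    }

    /// Removes all segments of an acknowledged frame using a single range scan.
    pub fn remove_frame(&mut self, frame_id: u32) {
        let keys: Vec<(u32, u8)> = self
            .segments
            .range((frame_id, 0)..=(frame_id, u8::MAX))
            .map(|(key, _)| *key)
            .collect();
        for key in keys {
            self.segments.remove(&key);
        }
    }

    pub fn len(&self) -> usize {
        self.segments.len()
    }
}
```

Ordering keys by frame ID first is what makes the range scan in `remove_frame` possible.
    */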
    pub async fn send_frame_data(&mut self, data: &[u8]) -> crate::errors::Result<()> {
        if !(1..=Self::MAX_WRITE_SIZE).contains(&data.len()) {
            return Err(SessionError::IncorrectMessageLength.into());
        }

        let frame_id = self.outgoing_frame_id.fetch_add(1, Ordering::SeqCst);
        let segments = segment(data, Self::PAYLOAD_CAPACITY, frame_id)?;
        let count = segments.len();

        for segment in segments {
            let msg = SessionMessage::<C>::Segment(segment.clone());
            trace!(session_id = self.session_id, id = ?segment.id(), "SENDING: segment");
            self.segment_egress_send
                .feed(msg)
                .await
                .map_err(|e| SessionError::ProcessingError(e.to_string()))?;

            // This is the only place where we insert into the lookbehind buffer
            self.lookbehind.insert((&segment).into(), segment.clone());
            // Keep the buffer bounded by evicting entries from the front
            while self.lookbehind.len() > self.cfg.max_buffered_segments {
                self.lookbehind.pop_front();
            }
        }

        self.segment_egress_send
            .flush()
            .await
            .map_err(|e| SessionError::ProcessingError(e.to_string()))?;
        self.outgoing_frame_resends
            .insert(frame_id, RetryToken::new(Instant::now(), self.cfg.backoff_base));

        trace!(
            session_id = self.session_id,
            frame_id, count, "FRAME SEND COMPLETE: sent segments",
        );

        Ok(())
    }

    /// Runs the state loop, which repeatedly invokes the following methods, each paced by its own
    /// rate limiter and only if the corresponding [`SessionFeature`] is enabled:
    /// - [`SessionState::acknowledge_segments`]
    /// - [`SessionState::request_missing_segments`]
    /// - [`SessionState::retransmit_unacknowledged_frames`]
    ///
    /// Additionally, expired frames are periodically evicted from the frame reassembler.
    /// The loop ends, returning `Ok(())`, once the egress channel is closed or any of the
    /// operations fails. An error is returned only if a rate limiter cannot be constructed
    /// from the configured durations (for example, when a derived period is zero).
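    /**

The rate-limiter periods used by the loop can be reproduced with plain arithmetic.
`LoopPeriods` and `sketch_loop_periods` are hypothetical names mirroring the formulas in the
function body.

```rust
use std::time::Duration;

/// Periods of the rate limiters, mirroring the formulas used in `state_loop`.
#[derive(Debug, PartialEq, Eq)]
pub struct LoopPeriods {
    pub eviction: Duration,
    pub acknowledgement: Duration,
    pub sender_retransmission: Duration,
    pub receiver_request: Duration,
}

pub fn sketch_loop_periods(
    frame_expiration_age: Duration,
    rto_base_sender: Duration,
    rto_base_receiver: Duration,
) -> LoopPeriods {
    LoopPeriods {
        // Try to evict ten times before a frame expires.
        eviction: frame_expiration_age / 10,
        // Acknowledge four times more often than the shorter retransmission timeout.
        acknowledgement: rto_base_sender.min(rto_base_receiver) / 4,
        sender_retransmission: rto_base_sender,
        receiver_request: rto_base_receiver,
    }
}
```

With the defaults this gives 3 s, 250 ms, 1.5 s and 1 s. If any derived period is zero,
`Quota::with_period` returns `None` and the function fails before entering the loop.
    */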
660    async fn state_loop(&mut self) -> crate::errors::Result<()> {
661        // Rate limiter for reassembler evictions:
662        // tries to evict 10 times before a frame expires
663        let eviction_limiter =
664            governor::RateLimiter::direct(Quota::with_period(self.cfg.frame_expiration_age / 10).ok_or(
665                NetworkTypeError::Other("rate limiter frame_expiration_age invalid".into()),
666            )?);
667
668        // Rate limiter for acknowledgements:
669        // sends acknowledgements 4 times more often
670        // than the other side can retransmit them, or we ask for retransmissions.
671        let ack_rate_limiter = governor::RateLimiter::direct(
            Quota::with_period(self.cfg.rto_base_sender.min(self.cfg.rto_base_receiver) / 4)
                .ok_or(NetworkTypeError::Other("rate limiter ack rate invalid".into()))?,
        );

        // Rate limiter for retransmissions by the sender
        let sender_retransmit = governor::RateLimiter::direct(
            Quota::with_period(self.cfg.rto_base_sender)
                .ok_or(NetworkTypeError::Other("rate limiter rto sender invalid".into()))?,
        );

        // Rate limiter for retransmissions by the receiver
        let receiver_retransmit = governor::RateLimiter::direct(
            Quota::with_period(self.cfg.rto_base_receiver)
                .ok_or(NetworkTypeError::Other("rate limiter rto receiver invalid".into()))?,
        );

        loop {
            let mut evict_fut = eviction_limiter.until_ready().boxed().fuse();
            let mut ack_fut = maybe_fused_future(
                self.cfg.enabled_features.contains(&SessionFeature::AcknowledgeFrames),
                ack_rate_limiter.until_ready(),
            );
            let mut r_snd_fut = maybe_fused_future(
                self.cfg.enabled_features.contains(&SessionFeature::RetransmitFrames),
                sender_retransmit.until_ready(),
            );
            let mut r_rcv_fut = maybe_fused_future(
                self.cfg
                    .enabled_features
                    .contains(&SessionFeature::RequestIncompleteFrames),
                receiver_retransmit.until_ready(),
            );
            let mut is_done = maybe_fused_future(self.segment_egress_send.is_closed(), futures::future::ready(()));

            // Futures in `select_biased!` are ordered from the least frequent event to the most frequent one,
            // so that rare events (such as the writer being closed) are not starved by those that fire often.
            if let Err(e) = futures::select_biased! {
                _ = is_done => {
                    Err(NetworkTypeError::Other("session writer has been closed".into()))
                },
                _ = r_rcv_fut => {
                    self.request_missing_segments().await
                },
                _ = r_snd_fut => {
                    self.retransmit_unacknowledged_frames().await
                },
                _ = ack_fut => {
                    self.acknowledge_segments().await
                },
                _ = evict_fut => {
                    self.frame_reassembler.evict().map_err(NetworkTypeError::from)
                },
            } {
                debug!(session_id = self.session_id, "session is closing: {e}");
                break;
            }
        }

        Ok(())
    }

    /// Returns the ID of this session.
    pub fn session_id(&self) -> &str {
        &self.session_id
    }
}

// Sink for data coming from downstream
impl<const C: usize> Sink<SessionMessage<C>> for SessionState<C> {
    type Error = NetworkTypeError;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: SessionMessage<C>) -> Result<(), Self::Error> {
        match item {
            SessionMessage::Segment(s) => self.consume_segment(s),
            SessionMessage::Request(r) => self.retransmit_segments(r),
            SessionMessage::Acknowledge(f) => self.acknowledged_frames(f),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.frame_reassembler.close();
        Poll::Ready(Ok(()))
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Represents a socket for a session between two nodes, bound by the
/// underlying [network transport](AsyncWrite) and parameterized by its maximum transmission unit (MTU) `C`.
///
/// It implements [`AsyncRead`] and [`AsyncWrite`], so it can
/// be used on top of the usual transport stack.
///
/// Based on the [configuration](SessionConfig), the `SessionSocket` can support:
/// - frame segmentation and reassembly
/// - segment and frame retransmission and reliability
///
/// See the module docs for details on retransmission.
///
/// # Retransmission driven by the Receiver
/// ```mermaid
/// sequenceDiagram
///     Note over Sender,Receiver: Frame 1
///     rect rgb(191, 223, 255)
///     Note left of Sender: Frame 1 in buffer
///     Sender->>Receiver: Segment 1/3 of Frame 1
///     Sender->>Receiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver->>Sender: Request Segment 3 of Frame 1
///     Sender->>Receiver: Segment 3/3 of Frame 1
///     Receiver->>Sender: Acknowledge Frame 1
///     Note left of Sender: Frame 1 dropped from buffer
///     end
///     Note over Sender,Receiver: Frame 1 delivered
/// ```
///
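/// A minimal, std-only sketch of the receiver-side bookkeeping behind the diagram above,
/// assuming segments are numbered from 1 as in the diagram: once `RTO_BASE_RECEIVER` elapses
/// for an incomplete frame, the receiver works out which segments are still missing and
/// requests only those. The helper `missing_segments` is hypothetical and does not reflect
/// how the actual request message encodes the missing segments.
///
/// ```rust
/// // Hypothetical helper: returns the 1-based indices of the segments of a frame
/// // with `total` segments that are not contained in `received`.
/// fn missing_segments(total: u8, received: &[u8]) -> Vec<u8> {
///     (1..=total).filter(|idx| !received.contains(idx)).collect()
/// }
/// ```
///
/// For the diagram above, `missing_segments(3, &[1, 2])` yields `vec![3]`, so only
/// "Segment 3 of Frame 1" is requested.
///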
/// # Retransmission driven by the Sender
/// ```mermaid
/// sequenceDiagram
///     Note over Sender,Receiver: Frame 1
///     rect rgb(191, 223, 255)
///     Note left of Sender: Frame 1 in buffer
///     Sender->>Receiver: Segment 1/3 of Frame 1
///     Sender->>Receiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver--xSender: Request Segment 3 of Frame 1
///     Note left of Sender: RTO_BASE_SENDER elapsed
///     Sender->>Receiver: Segment 1/3 of Frame 1
///     Sender->>Receiver: Segment 2/3 of Frame 1
///     Sender->>Receiver: Segment 3/3 of Frame 1
///     Receiver->>Sender: Acknowledge Frame 1
///     Note left of Sender: Frame 1 dropped from buffer
///     end
///     Note over Sender,Receiver: Frame 1 delivered
/// ```
///
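/// A minimal, std-only sketch of the sender-side buffer from the diagram above. It assumes
/// each buffered frame only needs the time of its last (re)transmission. Frames are dropped
/// when acknowledged, and frames still unacknowledged after `RTO_BASE_SENDER` have all of
/// their segments retransmitted. `SenderBuffer` and its methods are hypothetical names, not
/// the crate's internal types, and the sketch ignores the `backoff_base` setting.
///
/// ```rust
/// use std::collections::HashMap;
/// use std::time::{Duration, Instant};
///
/// // Hypothetical sender buffer: frame ID -> time of the last (re)transmission.
/// #[derive(Default)]
/// struct SenderBuffer {
///     sent_at: HashMap<u32, Instant>,
/// }
///
/// impl SenderBuffer {
///     // Records that all segments of `frame_id` were (re)transmitted at `now`.
///     fn on_sent(&mut self, frame_id: u32, now: Instant) {
///         self.sent_at.insert(frame_id, now);
///     }
///
///     // An acknowledged frame is dropped from the buffer.
///     fn on_acknowledged(&mut self, frame_id: u32) {
///         self.sent_at.remove(&frame_id);
///     }
///
///     // Frames still unacknowledged `rto` after their last transmission, sorted by ID.
///     fn due_for_retransmission(&self, now: Instant, rto: Duration) -> Vec<u32> {
///         let mut due: Vec<u32> = self
///             .sent_at
///             .iter()
///             .filter(|(_, sent)| now.duration_since(**sent) >= rto)
///             .map(|(id, _)| *id)
///             .collect();
///         due.sort_unstable();
///         due
///     }
/// }
/// ```
///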
/// # Sender-Receiver retransmission handover
///
/// ```mermaid
/// sequenceDiagram
///     Note over Sender,Receiver: Frame 1
///     rect rgb(191, 223, 255)
///     Note left of Sender: Frame 1 in buffer
///     Sender->>Receiver: Segment 1/3 of Frame 1
///     Sender--xReceiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver->>Sender: Request Segments 2,3 of Frame 1
///     Note left of Sender: RTO_BASE_SENDER cancelled
///     Sender->>Receiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver--xSender: Request Segment 3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver->>Sender: Request Segment 3 of Frame 1
///     Sender->>Receiver: Segment 3/3 of Frame 1
///     Receiver->>Sender: Acknowledge Frame 1
///     Note left of Sender: Frame 1 dropped from buffer
///     end
///     Note over Sender,Receiver: Frame 1 delivered
/// ```
///
/// # Retransmission failure
///
/// ```mermaid
/// sequenceDiagram
///     Note over Sender,Receiver: Frame 1
///     rect rgb(191, 223, 255)
///     Note left of Sender: Frame 1 in buffer
///     Sender->>Receiver: Segment 1/3 of Frame 1
///     Sender->>Receiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note right of Receiver: RTO_BASE_RECEIVER elapsed
///     Receiver--xSender: Request Segment 3 of Frame 1
///     Note left of Sender: RTO_BASE_SENDER elapsed
///     Sender--xReceiver: Segment 1/3 of Frame 1
///     Sender--xReceiver: Segment 2/3 of Frame 1
///     Sender--xReceiver: Segment 3/3 of Frame 1
///     Note left of Sender: FRAME_EXPIRATION_AGE elapsed<br/>Frame 1 dropped from buffer
///     Note right of Receiver: FRAME_EXPIRATION_AGE elapsed<br/>Frame 1 dropped from buffer
///     end
///     Note over Sender,Receiver: Frame 1 never delivered
/// ```
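///
/// A minimal, std-only sketch of the expiration step in the failure case above. It assumes
/// that both sides record the time at which each frame entered their buffer. Once a frame is
/// older than the configured expiration age (cf. `SessionConfig::frame_expiration_age`),
/// it is dropped and never delivered. The function `evict_expired` is hypothetical.
///
/// ```rust
/// use std::collections::BTreeMap;
/// use std::time::{Duration, Instant};
///
/// // Hypothetical: drops every frame older than `expiration_age` and returns the dropped IDs.
/// fn evict_expired(frames: &mut BTreeMap<u32, Instant>, now: Instant, expiration_age: Duration) -> Vec<u32> {
///     let mut expired = Vec::new();
///     frames.retain(|id, created| {
///         let keep = now.duration_since(*created) <= expiration_age;
///         if !keep {
///             expired.push(*id);
///         }
///         keep
///     });
///     // `BTreeMap` visits keys in ascending order, so `expired` is sorted.
///     expired
/// }
/// ```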
pub struct SessionSocket<const C: usize> {
    state: SessionState<C>,
    frame_egress: Box<dyn AsyncRead + Send + Unpin>,
}

impl<const C: usize> SessionSocket<C> {
    /// Maximum number of bytes that can be written in a single `poll_write` to the Session.
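    ///
    /// Writes longer than this are accepted only partially. `poll_write` reports how many bytes
    /// it took, so callers that need the whole buffer delivered should loop, for example with
    /// `AsyncWriteExt::write_all` from `futures`. The std-only sketch below imitates that
    /// contract with a hypothetical `CappedWriter`. It is meant to show that `write_all` keeps
    /// calling `write` until everything is written.
    ///
    /// ```rust
    /// use std::io::{self, Write};
    ///
    /// // Hypothetical writer that, like `poll_write` on the socket, takes at most
    /// // `max_write_size` bytes per call and reports how many it actually took.
    /// struct CappedWriter {
    ///     max_write_size: usize,
    ///     written: Vec<u8>,
    ///     calls: usize,
    /// }
    ///
    /// impl Write for CappedWriter {
    ///     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    ///         let n = self.max_write_size.min(buf.len());
    ///         self.written.extend_from_slice(&buf[..n]);
    ///         self.calls += 1;
    ///         Ok(n)
    ///     }
    ///
    ///     fn flush(&mut self) -> io::Result<()> {
    ///         Ok(())
    ///     }
    /// }
    /// ```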
    pub const MAX_WRITE_SIZE: usize = SessionState::<C>::MAX_WRITE_SIZE;
    /// Payload capacity is MTU minus the sizes of the Session protocol headers.
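    ///
    /// Assuming each segment carries at most `PAYLOAD_CAPACITY` bytes of frame data, a frame of
    /// `n` bytes needs `ceil(n / PAYLOAD_CAPACITY)` segments. Below is a minimal sketch of that
    /// arithmetic, assuming frames are split into consecutive chunks. The helpers are
    /// hypothetical, and the capacity of 400 bytes used below is illustrative, not the value
    /// for any particular MTU.
    ///
    /// ```rust
    /// // Hypothetical: number of segments needed for a frame of `frame_len` bytes.
    /// fn segments_needed(frame_len: usize, payload_capacity: usize) -> usize {
    ///     assert!(payload_capacity > 0, "payload capacity must be non-zero");
    ///     frame_len.div_ceil(payload_capacity)
    /// }
    ///
    /// // Hypothetical: splits a frame into segment payloads of at most `payload_capacity` bytes.
    /// fn split_into_segments(frame: &[u8], payload_capacity: usize) -> Vec<&[u8]> {
    ///     frame.chunks(payload_capacity).collect()
    /// }
    /// ```
    ///
    /// With a capacity of 400 bytes, a 1000-byte frame needs 3 segments: two full ones and a
    /// final one carrying the remaining 200 bytes.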
    pub const PAYLOAD_CAPACITY: usize = SessionState::<C>::PAYLOAD_CAPACITY;

    /// Creates a new socket over the given underlying `transport` that binds the communicating parties.
    /// A human-readable session `id` must also be supplied.
    ///
    /// # Panics
    /// Panics if the MTU `C` is smaller than the minimum Session message size.
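    ///
    /// # Output buffering
    /// Unless `SessionFeature::NoDelay` is enabled, outgoing segments are written to the
    /// transport through a `futures::io::BufWriter` with a capacity of `C` bytes, so several
    /// small segments can share a single transport message. With `NoDelay`, the capacity is 0
    /// and each segment is passed to the transport directly. The sketch below uses std's
    /// synchronous `BufWriter` as a stand-in, assuming it buffers comparably.
    /// `CountingTransport` and `transport_writes` are hypothetical.
    ///
    /// ```rust
    /// use std::io::{self, BufWriter, Write};
    ///
    /// // Hypothetical stand-in for the transport: counts the writes that reach it.
    /// #[derive(Default)]
    /// struct CountingTransport {
    ///     writes: usize,
    /// }
    ///
    /// impl Write for CountingTransport {
    ///     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    ///         self.writes += 1;
    ///         Ok(buf.len())
    ///     }
    ///
    ///     fn flush(&mut self) -> io::Result<()> {
    ///         Ok(())
    ///     }
    /// }
    ///
    /// // Writes `count` messages of `size` bytes through a buffer of `capacity` bytes,
    /// // flushes, and returns how many writes reached the transport.
    /// fn transport_writes(capacity: usize, count: usize, size: usize) -> io::Result<usize> {
    ///     let mut writer = BufWriter::with_capacity(capacity, CountingTransport::default());
    ///     let message = vec![0u8; size];
    ///     for _ in 0..count {
    ///         writer.write_all(&message)?;
    ///     }
    ///     writer.flush()?;
    ///     Ok(writer.get_ref().writes)
    /// }
    /// ```
    ///
    /// With a capacity of 0, ten 64-byte messages cause ten transport writes. With a capacity of
    /// 466 bytes, they are batched into fewer writes.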
    pub fn new<T, I>(id: I, transport: T, cfg: SessionConfig) -> Self
    where
        T: AsyncWrite + AsyncRead + Send + 'static,
        I: Display + Send + 'static,
    {
        assert!(
            C >= SessionMessage::<C>::minimum_message_size(),
            "given MTU is too small"
        );

        let (reassembler, egress) = FrameReassembler::new(cfg.frame_expiration_age);

        let to_acknowledge = Arc::new(ArrayQueue::new(cfg.acknowledged_frames_buffer.max(1)));
        let incoming_frame_retries = Arc::new(DashMap::new());

        let incoming_frame_retries_clone = incoming_frame_retries.clone();
        let id_clone = id.to_string();
        let to_acknowledge_clone = to_acknowledge.clone();
        let ack_enabled = cfg.enabled_features.contains(&SessionFeature::AcknowledgeFrames);

        let frame_egress = Box::new(
            egress
                .filter_map(move |maybe_frame| {
                    match maybe_frame {
                        Ok(frame) => {
                            trace!(session_id = id_clone, frame_id = frame.frame_id, "frame completed");
                            // The frame has been completed, so remove its retry record
                            incoming_frame_retries_clone.remove(&frame.frame_id);
                            if ack_enabled {
                                // Acts as a ring buffer: if the buffer is full, the oldest acknowledgement
                                // that has not been sent yet is discarded to make room for this one.
                                to_acknowledge_clone.force_push(frame.frame_id);
                            }
                            futures::future::ready(Some(Ok(frame)))
                        }
                        Err(SessionError::FrameDiscarded(fid)) | Err(SessionError::IncompleteFrame(fid)) => {
                            // Remove the retry token because the frame has been discarded
                            incoming_frame_retries_clone.remove(&fid);
                            warn!(session_id = id_clone, frame_id = fid, "frame skipped");
                            futures::future::ready(None) // Skip discarded frames
                        }
                        Err(e) => {
                            error!(session_id = id_clone, "error on frame reassembly: {e}");
                            futures::future::ready(Some(Err(std::io::Error::other(e))))
                        }
                    }
                })
                .into_async_read(),
        );

        let (segment_egress_send, segment_egress_recv) = futures::channel::mpsc::unbounded();

        let (downstream_read, downstream_write) = transport.split();

        // Outgoing segments are buffered up to one MTU (`C` bytes), unless `NoDelay` is enabled.
        // Once `segment_egress_recv` terminates, `forward` flushes the downstream buffer.
        let downstream_write = futures::io::BufWriter::with_capacity(
            if cfg.enabled_features.contains(&SessionFeature::NoDelay) {
                0
            } else {
                C
            },
            downstream_write,
        );

        let state = SessionState {
            lookbehind: Arc::new(SkipMap::new()),
            outgoing_frame_id: Arc::new(AtomicU32::new(1)),
            frame_reassembler: Arc::new(reassembler),
            outgoing_frame_resends: Arc::new(DashMap::new()),
            session_id: id.to_string(),
            to_acknowledge,
            incoming_frame_retries,
            segment_egress_send,
            cfg,
        };

        // Segment egress to downstream
        spawn(async move {
            if let Err(e) = segment_egress_recv
                .map(|m: SessionMessage<C>| Ok(m.into_encoded()))
                .forward(downstream_write.into_sink())
                .await
            {
                error!(session_id = %id, error = %e, "FINISHED: forwarding to downstream terminated with error");
            } else {
                debug!(session_id = %id, "FINISHED: forwarding to downstream done");
            }
        });

        // Segment ingress from downstream
        spawn(
            AsyncReadStreamer::<C, _>(downstream_read)
                .map_err(|e| NetworkTypeError::SessionProtocolError(SessionError::ProcessingError(e.to_string())))
                .and_then(|m| futures::future::ok(futures::stream::iter(SessionMessageIter::from(m.into_vec()))))
                .try_flatten()
                .forward(state.clone()),
        );

        // Advance the state until the socket is closed
        let mut state_clone = state.clone();
        spawn(async move {
            let loop_done = state_clone.state_loop().await;
            debug!(
                session_id = state_clone.session_id,
                "FINISHED: state loop {loop_done:?}"
            );
        });

        Self { state, frame_egress }
    }

    /// Gets the [state](SessionState) of this socket.
    pub fn state(&self) -> &SessionState<C> {
        &self.state
    }

    /// Gets the mutable [state](SessionState) of this socket.
    pub fn state_mut(&mut self) -> &mut SessionState<C> {
        &mut self.state
    }
}

impl<const C: usize> AsyncWrite for SessionSocket<C> {
    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
        let len_to_write = Self::MAX_WRITE_SIZE.min(buf.len());
        tracing::trace!(
            session_id = self.state.session_id(),
            number_of_bytes = len_to_write,
            "polling write of bytes on socket writer inside session",
        );

        // A zero-length write always succeeds immediately
        if len_to_write == 0 {
            return Poll::Ready(Ok(0));
        }

        let mut socket_future = self.state.send_frame_data(&buf[..len_to_write]).boxed();
        match Pin::new(&mut socket_future).poll(cx) {
            Poll::Ready(Ok(())) => Poll::Ready(Ok(len_to_write)),
            Poll::Ready(Err(e)) => Poll::Ready(Err(std::io::Error::other(e))),
            Poll::Pending => Poll::Pending,
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        tracing::trace!(
            session_id = self.state.session_id(),
            "polling flush on socket writer inside session"
        );
        let inner = &mut self.state.segment_egress_send;
        pin_mut!(inner);
        inner.poll_flush(cx).map_err(std::io::Error::other)
    }

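    /// Closes the session writer.
    ///
    /// The implementation calls `close_channel` rather than `poll_close` on the segment sender,
    /// because `SessionState` (and with it the sender) is cloned into the background tasks spawned
    /// in [`SessionSocket::new`]. For `futures` unbounded channels, `close_channel` closes the
    /// channel for every sender clone. By contrast, a single sender going away leaves the receiver
    /// open as long as other clones are alive. The std-only sketch below illustrates that second
    /// behaviour with `std::sync::mpsc`, which has no `close_channel` equivalent.
    /// `receiver_states` is hypothetical.
    ///
    /// ```rust
    /// use std::sync::mpsc::{self, TryRecvError};
    ///
    /// // Returns what the receiver observes after dropping one of two sender clones,
    /// // and after dropping both.
    /// fn receiver_states() -> (TryRecvError, TryRecvError) {
    ///     let (tx, rx) = mpsc::channel::<u8>();
    ///     let tx_clone = tx.clone();
    ///
    ///     drop(tx);
    ///     // Another sender clone is still alive, so the channel is merely empty.
    ///     let after_one = rx.try_recv().unwrap_err();
    ///
    ///     drop(tx_clone);
    ///     // All senders are gone, so the receiver now reports a disconnected channel.
    ///     let after_all = rx.try_recv().unwrap_err();
    ///
    ///     (after_one, after_all)
    /// }
    /// ```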
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        tracing::trace!(
            session_id = self.state.session_id(),
            "polling close on socket writer inside session"
        );
        // We call `close_channel` instead of `poll_close`, so that the receiving end terminates as well
        self.state.segment_egress_send.close_channel();
        Poll::Ready(Ok(()))
    }
}

impl<const C: usize> AsyncRead for SessionSocket<C> {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
        tracing::trace!(
            session_id = self.state.session_id(),
            "polling read on socket reader inside session"
        );
        let inner = &mut self.frame_egress;
        pin_mut!(inner);
        inner.poll_read(cx, buf)
    }
}

#[cfg(test)]
mod tests {
    use std::iter::Extend;

    use futures::{
        future::Either,
        io::{AsyncReadExt, AsyncWriteExt},
        pin_mut,
    };
    use hex_literal::hex;
    use parameterized::parameterized;
    use rand::{Rng, SeedableRng, rngs::StdRng};
    use test_log::test;

    use super::*;
    use crate::{
        session::utils::{FaultyNetwork, FaultyNetworkConfig, NetworkStats},
        utils::DuplexIO,
    };

    const MTU: usize = 466; // MTU used by HOPR

    // Use a static RNG seed to make the tests reproducible between runs
    const RNG_SEED: [u8; 32] = hex!("d8a471f1c20490a3442b96fdde9d1807428096e1601b0cef0eea7e6d44a24c01");

    fn setup_alice_bob(
        cfg: SessionConfig,
        network_cfg: FaultyNetworkConfig,
        alice_stats: Option<NetworkStats>,
        bob_stats: Option<NetworkStats>,
    ) -> (SessionSocket<MTU>, SessionSocket<MTU>) {
        // Alice writes into Bob's network (which Bob reads from) and vice versa. Therefore, the "sent"
        // counters of each network belong to the peer writing into it, while the "received"
        // counters belong to the peer reading from it.
        let (alice_stats, bob_stats) = alice_stats
            .zip(bob_stats)
            .map(|(alice, bob)| {
                (
                    NetworkStats {
                        packets_sent: bob.packets_sent,
                        bytes_sent: bob.bytes_sent,
                        packets_received: alice.packets_received,
                        bytes_received: alice.bytes_received,
                    },
                    NetworkStats {
                        packets_sent: alice.packets_sent,
                        bytes_sent: alice.bytes_sent,
                        packets_received: bob.packets_received,
                        bytes_received: bob.bytes_received,
                    },
                )
            })
            .unzip();

        let (alice_reader, alice_writer) = FaultyNetwork::<MTU>::new(network_cfg, alice_stats).split();
        let (bob_reader, bob_writer) = FaultyNetwork::<MTU>::new(network_cfg, bob_stats).split();

        let alice_to_bob = SessionSocket::new("alice", DuplexIO(alice_reader, bob_writer), cfg.clone());
        let bob_to_alice = SessionSocket::new("bob", DuplexIO(bob_reader, alice_writer), cfg.clone());

        (alice_to_bob, bob_to_alice)
    }

    async fn send_and_recv<S>(
        num_frames: usize,
        frame_size: usize,
        alice: S,
        bob: S,
        timeout: Duration,
        alice_to_bob_only: bool,
        randomized_frame_sizes: bool,
    ) where
        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        #[derive(PartialEq, Eq)]
        enum Direction {
            Send,
            Recv,
            Both,
        }

        let frame_sizes = if randomized_frame_sizes {
            let norm_dist = rand_distr::Normal::new(frame_size as f64 * 0.75, frame_size as f64 / 4.0).unwrap();
            StdRng::from_seed(RNG_SEED)
                .sample_iter(norm_dist)
                .map(|s| (s as usize).max(10).min(2 * frame_size))
                .take(num_frames)
                .collect::<Vec<_>>()
        } else {
            std::iter::repeat_n(frame_size, num_frames).collect::<Vec<_>>()
        };

        let socket_worker = |mut socket: S, d: Direction| {
            let frame_sizes = frame_sizes.clone();
            let frame_sizes_total = frame_sizes.iter().sum();
            async move {
                let mut received = Vec::with_capacity(frame_sizes_total);
                let mut sent = Vec::with_capacity(frame_sizes_total);

                if d == Direction::Send || d == Direction::Both {
                    for frame_size in &frame_sizes {
                        let mut write = vec![0u8; *frame_size];
                        hopr_crypto_random::random_fill(&mut write);
                        let _ = socket.write(&write).await?;
                        sent.extend(write);
                    }
                }

                if d == Direction::Recv || d == Direction::Both {
                    // Either read everything or time out trying
                    while received.len() < frame_sizes_total {
                        let mut buffer = [0u8; 2048];
                        let read = socket.read(&mut buffer).await?;
                        received.extend(buffer.into_iter().take(read));
                    }
                }

                // TODO: close the socket here once this can be done safely.
                // We cannot close it immediately, because some acknowledgements or retransmissions might still be in flight.
                // socket.close().await.unwrap();

                Ok::<_, std::io::Error>((sent, received))
            }
        };

        let alice_worker = tokio::task::spawn(socket_worker(
            alice,
            if alice_to_bob_only {
                Direction::Send
            } else {
                Direction::Both
            },
        ));
        let bob_worker = tokio::task::spawn(socket_worker(
            bob,
            if alice_to_bob_only {
                Direction::Recv
            } else {
                Direction::Both
            },
        ));

        let send_recv = futures::future::join(
            async move { alice_worker.await.expect("alice should not fail") },
            async move { bob_worker.await.expect("bob should not fail") },
        );
        let timeout = tokio::time::sleep(timeout);

        pin_mut!(send_recv);
        pin_mut!(timeout);

        match futures::future::select(send_recv, timeout).await {
            Either::Left(((Ok((alice_sent, alice_recv)), Ok((bob_sent, bob_recv))), _)) => {
                assert_eq!(
                    hex::encode(alice_sent),
                    hex::encode(bob_recv),
                    "alice sent must be equal to bob received"
                );
                assert_eq!(
                    hex::encode(bob_sent),
                    hex::encode(alice_recv),
                    "bob sent must be equal to alice received",
                );
            }
            Either::Left(((Err(e), _), _)) => panic!("alice send recv error: {e}"),
            Either::Left(((_, Err(e)), _)) => panic!("bob send recv error: {e}"),
            Either::Right(_) => panic!("timeout"),
        }
    }

    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn reliable_send_recv_with_no_acks(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig {
            enabled_features: HashSet::new(),
            ..Default::default()
        };

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, Default::default(), None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(10),
            false,
            false,
        )
        .await;
    }

    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn reliable_send_recv_with_acks(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig::default();

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, Default::default(), None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(10),
            false,
            false,
        )
        .await;
    }

    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn unreliable_send_recv(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig {
            rto_base_receiver: Duration::from_millis(10),
            rto_base_sender: Duration::from_millis(500),
            frame_expiration_age: Duration::from_secs(30),
            backoff_base: 1.001,
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            fault_prob: 0.33,
            ..Default::default()
        };

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            false,
            false,
        )
        .await;
    }

    #[ignore]
    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn unreliable_send_recv_with_mixing(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig {
            rto_base_receiver: Duration::from_millis(10),
            rto_base_sender: Duration::from_millis(500),
            frame_expiration_age: Duration::from_secs(30),
            backoff_base: 1.001,
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            fault_prob: 0.20,
            mixing_factor: 2,
            ..Default::default()
        };

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            false,
            false,
        )
        .await;
    }

    #[ignore]
    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn almost_reliable_send_recv_with_mixing(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig {
            rto_base_sender: Duration::from_millis(500),
            rto_base_receiver: Duration::from_millis(10),
            frame_expiration_age: Duration::from_secs(30),
            backoff_base: 1.001,
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            fault_prob: 0.1,
            mixing_factor: 2,
            ..Default::default()
        };

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            false,
            false,
        )
        .await;
    }

    #[ignore]
    #[parameterized(num_frames = {10, 100, 1000}, frame_size = {1500, 1500, 1500})]
    #[parameterized_macro(tokio::test)]
    async fn reliable_send_recv_with_mixing(num_frames: usize, frame_size: usize) {
        let cfg = SessionConfig {
            rto_base_sender: Duration::from_millis(500),
            rto_base_receiver: Duration::from_millis(10),
            frame_expiration_age: Duration::from_secs(30),
            backoff_base: 1.001,
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            mixing_factor: 2,
            ..Default::default()
        };

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);

        send_and_recv(
            num_frames,
            frame_size,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            false,
            false,
        )
        .await;
    }

    #[test(tokio::test)]
    async fn small_frames_should_be_sent_as_single_transport_msgs_with_buffering_disabled() {
        const NUM_FRAMES: usize = 10;
        const FRAME_SIZE: usize = 64;

        let cfg = SessionConfig {
            enabled_features: HashSet::from_iter([SessionFeature::NoDelay]),
            ..Default::default()
        };

        let alice_stats = NetworkStats::default();
        let bob_stats = NetworkStats::default();

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(
            cfg,
            FaultyNetworkConfig::default(),
            alice_stats.clone().into(),
            bob_stats.clone().into(),
        );

        send_and_recv(
            NUM_FRAMES,
            FRAME_SIZE,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            true,
            false,
        )
        .await;

        assert_eq!(bob_stats.packets_received.load(Ordering::Relaxed), NUM_FRAMES);
        assert_eq!(alice_stats.packets_sent.load(Ordering::Relaxed), NUM_FRAMES);

        assert_eq!(
            alice_stats.bytes_sent.load(Ordering::Relaxed),
            NUM_FRAMES * (FRAME_SIZE + SessionMessage::<MTU>::SEGMENT_OVERHEAD)
        );
        assert_eq!(
            bob_stats.bytes_received.load(Ordering::Relaxed),
            NUM_FRAMES * (FRAME_SIZE + SessionMessage::<MTU>::SEGMENT_OVERHEAD)
        );
    }

    #[test(tokio::test)]
    async fn small_frames_should_be_sent_batched_in_transport_msgs_with_buffering_enabled() {
        const NUM_FRAMES: usize = 10;
        const FRAME_SIZE: usize = 64;

        let cfg = SessionConfig {
            enabled_features: HashSet::new(),
            ..Default::default()
        };

        let alice_stats = NetworkStats::default();
        let bob_stats = NetworkStats::default();

        let (alice_to_bob, bob_to_alice) = setup_alice_bob(
            cfg,
            FaultyNetworkConfig::default(),
            alice_stats.clone().into(),
            bob_stats.clone().into(),
        );

        send_and_recv(
            NUM_FRAMES,
            FRAME_SIZE,
            alice_to_bob,
            bob_to_alice,
            Duration::from_secs(30),
            true,
            false,
        )
        .await;

        assert!(bob_stats.packets_received.load(Ordering::Relaxed) < NUM_FRAMES);
        assert!(alice_stats.packets_sent.load(Ordering::Relaxed) < NUM_FRAMES);

        assert_eq!(
            alice_stats.bytes_sent.load(Ordering::Relaxed),
            NUM_FRAMES * (FRAME_SIZE + SessionMessage::<MTU>::SEGMENT_OVERHEAD)
        );
        assert_eq!(
            bob_stats.bytes_received.load(Ordering::Relaxed),
            NUM_FRAMES * (FRAME_SIZE + SessionMessage::<MTU>::SEGMENT_OVERHEAD)
        );
    }

    #[test(tokio::test)]
    async fn receiving_on_disconnected_network_should_timeout() -> anyhow::Result<()> {
        let cfg = SessionConfig {
            rto_base_sender: Duration::from_millis(250),
            rto_base_receiver: Duration::from_millis(300),
            frame_expiration_age: Duration::from_secs(2),
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            fault_prob: 1.0, // throws away 100% of packets
            mixing_factor: 0,
            ..Default::default()
        };

        let (mut alice_to_bob, mut bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);
        let data = b"will not be delivered!";

        let _ = alice_to_bob.write(data.as_ref()).await?;

        let mut out = vec![0u8; data.len()];
        let f1 = bob_to_alice.read_exact(&mut out);
        let f2 = tokio::time::sleep(Duration::from_secs(3));
        pin_mut!(f1);
        pin_mut!(f2);

        match futures::future::select(f1, f2).await {
            Either::Left(_) => panic!("should timeout: {:?}", out),
            Either::Right(_) => {}
        }

        Ok(())
    }

    #[test(tokio::test)]
    async fn single_frame_resend_should_be_resent_on_unreliable_network() -> anyhow::Result<()> {
        let cfg = SessionConfig {
            rto_base_sender: Duration::from_millis(250),
            rto_base_receiver: Duration::from_millis(300),
            frame_expiration_age: Duration::from_secs(10),
            ..Default::default()
        };

        let net_cfg = FaultyNetworkConfig {
            fault_prob: 0.5, // throws away 50% of packets
            mixing_factor: 0,
            ..Default::default()
        };

        let (mut alice_to_bob, mut bob_to_alice) = setup_alice_bob(cfg, net_cfg, None, None);
        let data = b"will be re-delivered!";

        let _ = alice_to_bob.write(data.as_ref()).await?;

        let mut out = vec![0u8; data.len()];
        let f1 = bob_to_alice.read_exact(&mut out);
        let f2 = tokio::time::sleep(Duration::from_secs(5));
        pin_mut!(f1);
        pin_mut!(f2);

        match futures::future::select(f1, f2).await {
            Either::Left(_) => {}
            Either::Right(_) => panic!("timeout"),
        }

        Ok(())
    }
}