Skip to main content

hopr_protocol_session/processing/
mod.rs

1//! This module contains the three main parts for frame processing.
2//! Each of them is represented using an adaptor
3//! extension to [`futures::Stream`] or [`futures::Sink`]
4//!
5//!
6//! 1. Segmenter
7//! 2. Reassembler
8//! 3. Sequencer
9//!
10//! Reassembler followed by a Sequencer is commonly called frame Reconstructor.
11
12mod reassembly;
13mod segmenter;
14mod sequencer;
15/// Types necessary for frame reconstruction and segmentation.
16pub(crate) mod types;
17
18pub(crate) use reassembly::ReassemblerExt;
19pub(crate) use segmenter::SegmenterExt;
20pub(crate) use sequencer::SequencerExt;
21
22#[cfg(test)]
23mod tests {
24    use std::time::Duration;
25
26    use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt, pin_mut};
27    use futures_time::future::FutureExt;
28    use rand::prelude::*;
29
30    use super::*;
31    use crate::{
32        errors::SessionError,
33        protocol::{Frame, OrderedFrame},
34        utils::segment,
35    };
36
37    const RNG_SEED: [u8; 32] = hex_literal::hex!("d8a471f1c20490a3442b96fdde9d1807428096e1601b0cef0eea7e6d44a24c01");
38
39    #[tokio::test]
40    async fn framed_reconstructor_should_reconstruct_frames() -> anyhow::Result<()> {
41        let expected = (1u32..=10)
42            .map(|frame_id| Frame {
43                frame_id,
44                data: hopr_types::crypto_random::random_bytes::<100>().into(),
45                is_terminating: false,
46            })
47            .collect::<Vec<_>>();
48
49        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();
50
51        let reassm_rx = reassm_rx
52            .reassembler(Duration::from_secs(5), 1024)
53            .filter_map(|maybe_frame| match maybe_frame {
54                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
55                Err(_) => futures::future::ready(None),
56            })
57            .sequencer(Duration::from_secs(5), 1024)
58            .and_then(|frame| futures::future::ok(frame.0));
59
60        let mut segments = expected
61            .iter()
62            .cloned()
63            .flat_map(|f| segment(f.data, 22, f.frame_id).unwrap())
64            .collect::<Vec<_>>();
65
66        let mut rng = StdRng::from_seed(RNG_SEED);
67        segments.shuffle(&mut rng);
68
69        hopr_utils::runtime::prelude::spawn(futures::stream::iter(segments).map(Ok).forward(reassm_tx)).await??;
70
71        let actual = reassm_rx
72            .try_collect::<Vec<_>>()
73            .timeout(futures_time::time::Duration::from_secs(5))
74            .await??;
75
76        assert_eq!(actual, expected);
77
78        Ok(())
79    }
80
81    #[test_log::test(tokio::test)]
82    async fn frame_reconstructor_should_discard_missing_segment() -> anyhow::Result<()> {
83        let expected = (1u32..=10)
84            .map(|frame_id| Frame {
85                frame_id,
86                data: hopr_types::crypto_random::random_bytes::<100>().into(),
87                is_terminating: false,
88            })
89            .collect::<Vec<_>>();
90
91        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();
92
93        let reassm_rx = reassm_rx
94            .reassembler(Duration::from_secs(5), 1024)
95            .filter_map(|maybe_frame| match maybe_frame {
96                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
97                Err(_) => futures::future::ready(None),
98            })
99            .sequencer(Duration::from_secs(5), 1024)
100            .and_then(|frame| futures::future::ok(frame.0));
101
102        let mut segments = expected
103            .iter()
104            .cloned()
105            .flat_map(|f| segment(f.data, 22, f.frame_id).unwrap())
106            .filter(|s| s.frame_id != 4 || s.seq_idx != 1)
107            .collect::<Vec<_>>();
108
109        let mut rng = StdRng::from_seed(RNG_SEED);
110        segments.shuffle(&mut rng);
111
112        hopr_utils::runtime::prelude::spawn(futures::stream::iter(segments).map(Ok).forward(reassm_tx)).await??;
113
114        let actual = reassm_rx
115            .collect::<Vec<_>>()
116            .timeout(futures_time::time::Duration::from_secs(5))
117            .await?;
118
119        assert_eq!(actual.len(), expected.len());
120        for i in 0..expected.len() {
121            if i != 3 {
122                assert!(matches!(&actual[i], Ok(frame) if expected[i].eq(frame)));
123            } else {
124                assert!(matches!(actual[i], Err(SessionError::FrameDiscarded(4))))
125            }
126        }
127
128        Ok(())
129    }
130
131    #[test_log::test(tokio::test)]
132    async fn test_segmenter_reconstructor_should_work_together() -> anyhow::Result<()> {
133        const DATA_SIZE: usize = 9001;
134        const FRAME_SIZE: usize = 1500;
135
136        const MTU: usize = 1000;
137
138        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();
139
140        let data_out = reassm_rx
141            .reassembler(Duration::from_secs(1), 1024)
142            .filter_map(|maybe_frame| match maybe_frame {
143                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
144                Err(_) => futures::future::ready(None),
145            })
146            .sequencer(Duration::from_secs(1), 1024)
147            .and_then(|frame| futures::future::ok(frame.0));
148
149        let mut data_in = reassm_tx
150            .sink_map_err(|_| SessionError::InvalidSegment)
151            .segmenter::<MTU>(FRAME_SIZE);
152
153        let data_written = hopr_types::crypto_random::random_bytes::<DATA_SIZE>();
154
155        let data_read = tokio::task::spawn(async move {
156            let mut frame_count = 0;
157            let mut out = Vec::new();
158            let data_out = data_out
159                .inspect(|frame| {
160                    tracing::debug!("{:?}", frame);
161                    frame_count += 1;
162                })
163                .map_err(std::io::Error::other)
164                .into_async_read();
165
166            pin_mut!(data_out);
167            data_out.read_to_end(&mut out).await?;
168            Ok::<_, std::io::Error>((out, frame_count))
169        })
170        .timeout(futures_time::time::Duration::from_secs(5));
171
172        data_in.write_all(&data_written).await?;
173        data_in.flush().await?;
174        data_in.close().await?;
175
176        let (data_read, frame_count) = data_read.await???;
177
178        assert_eq!(&data_written, data_read.as_slice());
179        assert_eq!(DATA_SIZE / FRAME_SIZE + 1, frame_count);
180
181        Ok(())
182    }
183}