hopr_protocol_session/processing/
mod.rs1mod reassembly;
13mod segmenter;
14mod sequencer;
15pub(crate) mod types;
17
18pub(crate) use reassembly::ReassemblerExt;
19pub(crate) use segmenter::SegmenterExt;
20pub(crate) use sequencer::SequencerExt;
21
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt, pin_mut};
    use futures_time::future::FutureExt;
    use rand::prelude::*;

    use super::*;
    use crate::{
        errors::SessionError,
        protocol::{Frame, OrderedFrame},
        utils::segment,
    };

    /// Fixed RNG seed, so that the order in which the shuffled segments are fed
    /// into the pipeline is the same on every test run.
    const RNG_SEED: [u8; 32] = hex_literal::hex!("d8a471f1c20490a3442b96fdde9d1807428096e1601b0cef0eea7e6d44a24c01");

    /// Builds one non-terminating [`Frame`] per given frame ID, each carrying the
    /// 100 bytes produced by `random_bytes::<100>()` as its payload.
    fn random_frames(frame_ids: impl IntoIterator<Item = u32>) -> Vec<Frame> {
        frame_ids
            .into_iter()
            .map(|frame_id| Frame {
                frame_id,
                data: hopr_types::crypto_random::random_bytes::<100>().into(),
                is_terminating: false,
            })
            .collect()
    }

    /// Segments ten frames, feeds all segments in shuffled order through the
    /// reassembler + sequencer pipeline and expects the original frames back,
    /// in their original order.
    #[test_log::test(tokio::test)]
    async fn framed_reconstructor_should_reconstruct_frames() -> anyhow::Result<()> {
        let expected = random_frames(1u32..=10);

        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();

        // NOTE(review): the two arguments of `reassembler`/`sequencer` are presumably
        // a timeout and a capacity - confirm against their definitions.
        let reassm_rx = reassm_rx
            .reassembler(Duration::from_secs(5), 1024)
            // Reassembler errors are dropped here; only complete frames reach the sequencer.
            .filter_map(|maybe_frame| match maybe_frame {
                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
                Err(_) => futures::future::ready(None),
            })
            .sequencer(Duration::from_secs(5), 1024)
            // Unwrap the `OrderedFrame` newtype again, so the output can be compared to `expected`.
            .and_then(|frame| futures::future::ok(frame.0));

        // NOTE(review): `22` is presumably the maximum segment size - confirm against `segment`.
        let mut segments = expected
            .iter()
            .cloned()
            .flat_map(|f| segment(f.data, 22, f.frame_id).unwrap())
            .collect::<Vec<_>>();

        // Deliver the segments out of order, but reproducibly.
        let mut rng = StdRng::from_seed(RNG_SEED);
        segments.shuffle(&mut rng);

        hopr_utils::runtime::prelude::spawn(futures::stream::iter(segments).map(Ok).forward(reassm_tx)).await??;

        let actual = reassm_rx
            .try_collect::<Vec<_>>()
            .timeout(futures_time::time::Duration::from_secs(5))
            .await??;

        assert_eq!(actual, expected);

        Ok(())
    }

    /// Same setup as the reconstruction test, but one segment of one frame is
    /// withheld. The pipeline must still yield one item per frame: the affected
    /// frame as `SessionError::FrameDiscarded`, all others intact and in order.
    #[test_log::test(tokio::test)]
    async fn frame_reconstructor_should_discard_missing_segment() -> anyhow::Result<()> {
        /// ID of the frame that loses one of its segments and thus can never be completed.
        const MISSING_FRAME_ID: u32 = 4;

        let expected = random_frames(1u32..=10);

        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();

        // NOTE(review): the two arguments of `reassembler`/`sequencer` are presumably
        // a timeout and a capacity - confirm against their definitions.
        let reassm_rx = reassm_rx
            .reassembler(Duration::from_secs(5), 1024)
            // All reassembler errors are dropped here, so any `Err` observed at the
            // end of the pipeline was produced by the sequencer stage.
            .filter_map(|maybe_frame| match maybe_frame {
                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
                Err(_) => futures::future::ready(None),
            })
            .sequencer(Duration::from_secs(5), 1024)
            .and_then(|frame| futures::future::ok(frame.0));

        // Withhold exactly one segment (index 1) of the frame under test.
        let mut segments = expected
            .iter()
            .cloned()
            .flat_map(|f| segment(f.data, 22, f.frame_id).unwrap())
            .filter(|s| s.frame_id != MISSING_FRAME_ID || s.seq_idx != 1)
            .collect::<Vec<_>>();

        // Deliver the segments out of order, but reproducibly.
        let mut rng = StdRng::from_seed(RNG_SEED);
        segments.shuffle(&mut rng);

        hopr_utils::runtime::prelude::spawn(futures::stream::iter(segments).map(Ok).forward(reassm_tx)).await??;

        // Plain `collect` (not `try_collect`): the error item is part of the expected output.
        let actual = reassm_rx
            .collect::<Vec<_>>()
            .timeout(futures_time::time::Duration::from_secs(5))
            .await?;

        // Checked up front, because `zip` below would silently stop at the shorter side.
        assert_eq!(actual.len(), expected.len());

        for (received, wanted) in actual.iter().zip(&expected) {
            if wanted.frame_id == MISSING_FRAME_ID {
                assert!(
                    matches!(received, Err(SessionError::FrameDiscarded(MISSING_FRAME_ID))),
                    "frame {} must be reported as discarded, got {:?}",
                    MISSING_FRAME_ID,
                    received
                );
            } else {
                assert!(
                    matches!(received, Ok(frame) if frame == wanted),
                    "frame {} must be delivered intact, got {:?}",
                    wanted.frame_id,
                    received
                );
            }
        }

        Ok(())
    }

    /// Writes `DATA_SIZE` bytes into the segmenter, whose output is wired directly
    /// into the reassembler + sequencer pipeline, and checks that the very same
    /// bytes can be read back and that the expected number of frames was seen.
    #[test_log::test(tokio::test)]
    async fn test_segmenter_reconstructor_should_work_together() -> anyhow::Result<()> {
        const DATA_SIZE: usize = 9001;
        const FRAME_SIZE: usize = 1500;

        const MTU: usize = 1000;

        let (reassm_tx, reassm_rx) = futures::channel::mpsc::unbounded();

        // Receiving half: segments -> frames -> ordered frames.
        let data_out = reassm_rx
            .reassembler(Duration::from_secs(1), 1024)
            .filter_map(|maybe_frame| match maybe_frame {
                Ok(frame) => futures::future::ready(Some(OrderedFrame(frame))),
                Err(_) => futures::future::ready(None),
            })
            .sequencer(Duration::from_secs(1), 1024)
            .and_then(|frame| futures::future::ok(frame.0));

        // Sending half: bytes -> segments, pushed into the channel read by `data_out`.
        let mut data_in = reassm_tx
            .sink_map_err(|_| SessionError::InvalidSegment)
            .segmenter::<MTU>(FRAME_SIZE);

        let data_written = hopr_types::crypto_random::random_bytes::<DATA_SIZE>();

        // The reader runs as a separate task, concurrently with the writes below.
        let data_read = tokio::task::spawn(async move {
            let mut frame_count = 0;
            let mut out = Vec::new();
            let data_out = data_out
                // Counts every item leaving the sequencer (both `Ok` and `Err`).
                .inspect(|frame| {
                    tracing::debug!("{:?}", frame);
                    frame_count += 1;
                })
                .map_err(std::io::Error::other)
                .into_async_read();

            pin_mut!(data_out);
            data_out.read_to_end(&mut out).await?;
            Ok::<_, std::io::Error>((out, frame_count))
        })
        .timeout(futures_time::time::Duration::from_secs(5));

        data_in.write_all(&data_written).await?;
        data_in.flush().await?;
        data_in.close().await?;

        // Three layers to unwrap: timeout, task join, and the reader's own I/O result.
        let (data_read, frame_count) = data_read.await???;

        assert_eq!(&data_written, data_read.as_slice());
        // Ceiling division: a trailing partial frame counts as a whole frame.
        // Equals the former `DATA_SIZE / FRAME_SIZE + 1` for the current constants,
        // but does not over-count if `DATA_SIZE` becomes a multiple of `FRAME_SIZE`.
        assert_eq!(DATA_SIZE.div_ceil(FRAME_SIZE), frame_count);

        Ok(())
    }
}