1use std::time::Instant;
2
3use crate::{
4 errors::SessionError,
5 protocol::{Frame, FrameId, MissingSegmentsBitmap, Segment, SeqNum},
6};
7
8#[derive(Debug)]
10pub struct FrameBuilder {
11 segments: Vec<Option<Segment>>,
12 frame_id: FrameId,
13 seg_remaining: SeqNum,
14 recv_bytes: usize,
15 pub(crate) last_recv: Instant,
16 #[cfg(all(not(test), feature = "prometheus"))]
17 pub(crate) created: Instant,
18}
19
20impl From<Segment> for FrameBuilder {
21 fn from(value: Segment) -> Self {
22 let idx = value.seq_idx;
23 let mut ret = Self {
24 segments: vec![None; value.seq_flags.seq_len() as usize],
25 frame_id: value.frame_id,
26 seg_remaining: value.seq_flags.seq_len() - 1,
27 recv_bytes: value.data.len(),
28 last_recv: Instant::now(),
29 #[cfg(all(not(test), feature = "prometheus"))]
30 created: Instant::now(),
31 };
32
33 ret.segments[idx as usize] = Some(value);
34
35 ret
36 }
37}
38
39impl FrameBuilder {
40 pub fn add_segment(&mut self, segment: Segment) -> Result<(), SessionError> {
44 let idx = segment.seq_idx;
45 if segment.frame_id != self.frame_id
46 || idx as usize >= self.segments.len()
47 || segment.seq_flags.seq_len() as usize != self.segments.len()
48 || self.seg_remaining == 0
49 || self.segments[idx as usize].is_some()
50 {
51 return Err(SessionError::InvalidSegment);
52 }
53
54 self.recv_bytes += segment.data.len();
55 self.seg_remaining -= 1;
56 self.segments[idx as usize] = Some(segment);
57 self.last_recv = Instant::now();
58 Ok(())
59 }
60
61 pub fn as_missing(&self) -> MissingSegmentsBitmap {
63 let mut ret = MissingSegmentsBitmap::ZERO;
64 self.segments
65 .iter()
66 .take(SeqNum::BITS as usize)
67 .enumerate()
68 .for_each(|(i, v)| ret.set(i, v.is_none()));
69 ret
70 }
71
72 #[inline]
74 pub fn frame_id(&self) -> FrameId {
75 self.frame_id
76 }
77
78 #[inline]
80 pub fn is_complete(&self) -> bool {
81 self.seg_remaining == 0
82 }
83}
84
85impl TryFrom<FrameBuilder> for Frame {
86 type Error = SessionError;
87
88 fn try_from(value: FrameBuilder) -> Result<Self, Self::Error> {
89 let mut is_terminating = false;
92 value
93 .segments
94 .into_iter()
95 .try_fold(Vec::with_capacity(value.recv_bytes), |mut acc, segment| match segment {
96 Some(segment) => {
97 acc.extend_from_slice(&segment.data);
98 is_terminating = is_terminating || segment.seq_flags.is_terminating();
99 Ok(acc)
100 }
101 None => Err(SessionError::IncompleteFrame(value.frame_id)),
102 })
103 .map(|data| Frame {
104 frame_id: value.frame_id,
105 data: data.into_boxed_slice(),
106 is_terminating,
107 })
108 }
109}
110
111#[derive(Clone, Debug)]
114pub struct FrameInspector(pub(crate) FrameDashMap);
115
116impl FrameInspector {
117 pub const INCOMPLETE_FRAME_RATIO: usize = 2;
119
120 pub fn new(capacity: usize) -> Self {
121 Self(FrameDashMap::with_capacity(Self::INCOMPLETE_FRAME_RATIO * capacity + 1))
122 }
123
124 pub fn missing_segments(&self, frame_id: &FrameId) -> Option<MissingSegmentsBitmap> {
126 self.0.0.get(frame_id).map(|f| f.as_missing())
127 }
128
129 pub fn len(&self) -> usize {
131 self.0.len()
132 }
133
134 pub fn is_empty(&self) -> bool {
136 self.0.is_empty()
137 }
138}
139
140pub trait FrameMapOccupiedEntry {
144 fn get_builder_mut(&mut self) -> &mut FrameBuilder;
146
147 #[allow(unused)]
148 fn frame_id(&self) -> &FrameId;
149
150 fn finalize(self) -> FrameBuilder;
153}
154
155pub trait FrameMapVacantEntry {
159 fn insert_builder(self, value: FrameBuilder);
161}
162
163#[derive(strum::EnumTryAs)]
164pub enum FrameMapEntry<O: FrameMapOccupiedEntry, V: FrameMapVacantEntry> {
165 Occupied(O),
166 Vacant(V),
167}
168
169pub trait FrameMap {
171 type ExistingEntry<'a>: FrameMapOccupiedEntry
172 where
173 Self: 'a;
174 type VacantEntry<'a>: FrameMapVacantEntry
175 where
176 Self: 'a;
177
178 fn with_capacity(capacity: usize) -> Self;
180
181 fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>>;
183
184 fn len(&self) -> usize;
186
187 fn is_empty(&self) -> bool {
191 self.len() == 0
192 }
193
194 fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool);
196}
197
198impl FrameMapOccupiedEntry for dashmap::OccupiedEntry<'_, FrameId, FrameBuilder> {
199 fn get_builder_mut(&mut self) -> &mut FrameBuilder {
200 self.get_mut()
201 }
202
203 fn frame_id(&self) -> &FrameId {
204 self.key()
205 }
206
207 fn finalize(self) -> FrameBuilder {
208 self.remove()
209 }
210}
211
212impl FrameMapVacantEntry for dashmap::VacantEntry<'_, FrameId, FrameBuilder> {
213 fn insert_builder(self, value: FrameBuilder) {
214 self.insert(value);
215 }
216}
217
218#[derive(Clone, Debug)]
220pub struct FrameDashMap(std::sync::Arc<dashmap::DashMap<FrameId, FrameBuilder>>);
221
222impl FrameMap for FrameDashMap {
223 type ExistingEntry<'a> = dashmap::OccupiedEntry<'a, FrameId, FrameBuilder>;
224 type VacantEntry<'a> = dashmap::VacantEntry<'a, FrameId, FrameBuilder>;
225
226 fn with_capacity(capacity: usize) -> Self {
227 Self(std::sync::Arc::new(dashmap::DashMap::with_capacity(capacity)))
228 }
229
230 fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
231 match self.0.entry(frame_id) {
232 dashmap::Entry::Occupied(e) => FrameMapEntry::Occupied(e),
233 dashmap::Entry::Vacant(v) => FrameMapEntry::Vacant(v),
234 }
235 }
236
237 fn len(&self) -> usize {
238 self.0.len()
239 }
240
241 fn is_empty(&self) -> bool {
242 self.0.is_empty()
243 }
244
245 fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
246 self.0.retain(f)
247 }
248}
249
250#[cfg(not(feature = "hashbrown"))]
252pub struct FrameHashMap(std::collections::HashMap<FrameId, FrameBuilder>);
253
254#[cfg(not(feature = "hashbrown"))]
255impl FrameMapOccupiedEntry for std::collections::hash_map::OccupiedEntry<'_, FrameId, FrameBuilder> {
256 fn get_builder_mut(&mut self) -> &mut FrameBuilder {
257 self.get_mut()
258 }
259
260 fn frame_id(&self) -> &FrameId {
261 self.key()
262 }
263
264 fn finalize(self) -> FrameBuilder {
265 self.remove()
266 }
267}
268
269#[cfg(not(feature = "hashbrown"))]
270impl FrameMapVacantEntry for std::collections::hash_map::VacantEntry<'_, FrameId, FrameBuilder> {
271 fn insert_builder(self, value: FrameBuilder) {
272 self.insert(value);
273 }
274}
275
276#[cfg(not(feature = "hashbrown"))]
277impl FrameMap for FrameHashMap {
278 type ExistingEntry<'a> = std::collections::hash_map::OccupiedEntry<'a, FrameId, FrameBuilder>;
279 type VacantEntry<'a> = std::collections::hash_map::VacantEntry<'a, FrameId, FrameBuilder>;
280
281 fn with_capacity(capacity: usize) -> Self {
282 Self(std::collections::HashMap::with_capacity(capacity))
283 }
284
285 fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
286 match self.0.entry(frame_id) {
287 std::collections::hash_map::Entry::Occupied(e) => FrameMapEntry::Occupied(e),
288 std::collections::hash_map::Entry::Vacant(v) => FrameMapEntry::Vacant(v),
289 }
290 }
291
292 fn len(&self) -> usize {
293 self.0.len()
294 }
295
296 fn is_empty(&self) -> bool {
297 self.0.is_empty()
298 }
299
300 fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
301 self.0.retain(f)
302 }
303}
304
/// [`FrameMap`] implementation backed by `hashbrown::HashMap`.
///
/// Used when the `hashbrown` feature is enabled.
// `Debug` is derived for consistency with `FrameDashMap`, which also derives it.
#[cfg(feature = "hashbrown")]
#[derive(Debug)]
pub struct FrameHashMap(hashbrown::HashMap<FrameId, FrameBuilder>);
308
/// Lets an occupied `hashbrown` entry act as a [`FrameMapOccupiedEntry`] by forwarding to
/// the entry's own accessors.
#[cfg(feature = "hashbrown")]
impl FrameMapOccupiedEntry for hashbrown::hash_map::OccupiedEntry<'_, FrameId, FrameBuilder> {
    /// Forwards to the entry's `get_mut`.
    fn get_builder_mut(&mut self) -> &mut FrameBuilder {
        self.get_mut()
    }

    /// Forwards to the entry's `key`.
    fn frame_id(&self) -> &FrameId {
        self.key()
    }

    /// Removes the entry from the map and returns the builder it held.
    fn finalize(self) -> FrameBuilder {
        self.remove()
    }
}
323
/// Lets a vacant `hashbrown` entry act as a [`FrameMapVacantEntry`].
#[cfg(feature = "hashbrown")]
impl FrameMapVacantEntry for hashbrown::hash_map::VacantEntry<'_, FrameId, FrameBuilder> {
    /// Inserts `value` into the map; the reference returned by the entry is not needed.
    fn insert_builder(self, value: FrameBuilder) {
        let _ = self.insert(value);
    }
}
330
/// [`FrameMap`] operations of the `hashbrown`-backed [`FrameHashMap`], all forwarded to
/// the wrapped `HashMap`.
#[cfg(feature = "hashbrown")]
impl FrameMap for FrameHashMap {
    type ExistingEntry<'a> = hashbrown::hash_map::OccupiedEntry<'a, FrameId, FrameBuilder>;
    type VacantEntry<'a> = hashbrown::hash_map::VacantEntry<'a, FrameId, FrameBuilder>;

    /// Creates a new map with the given initial capacity.
    fn with_capacity(capacity: usize) -> Self {
        let inner = hashbrown::HashMap::with_capacity(capacity);
        Self(inner)
    }

    /// Translates the hash-map entry for `frame_id` into a [`FrameMapEntry`].
    fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
        use hashbrown::hash_map::Entry;

        match self.0.entry(frame_id) {
            Entry::Occupied(occupied) => FrameMapEntry::Occupied(occupied),
            Entry::Vacant(vacant) => FrameMapEntry::Vacant(vacant),
        }
    }

    /// Number of builders stored in the underlying map.
    fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` if the underlying map is empty.
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Forwards to the underlying map's `retain`.
    fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
        self.0.retain(f)
    }
}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::SeqIndicator;

    /// Segments delivered out of order are reassembled in slot order.
    #[test]
    fn frame_builder_should_return_ordered_segments() -> anyhow::Result<()> {
        let middle = Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b" new ").into(),
        };
        let mut builder = FrameBuilder::from(middle);

        let tail = Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: 3.try_into()?,
            data: (*b"world").into(),
        };
        builder.add_segment(tail)?;

        let head = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 3.try_into()?,
            data: (*b"hello").into(),
        };
        builder.add_segment(head)?;

        assert!(builder.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, builder.as_missing());

        let frame = Frame::try_from(builder)?;
        assert_eq!(1, frame.frame_id);
        assert_eq!(b"hello new world", frame.data.as_ref());

        Ok(())
    }

    /// A terminating flag on any segment marks the reassembled frame as terminating.
    #[test]
    fn frame_builder_should_correctly_mark_terminating_flag() -> anyhow::Result<()> {
        let middle = Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b" new ").into(),
        };
        let mut builder = FrameBuilder::from(middle);

        let terminating_tail = Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: SeqIndicator::new_with_flags(3, true),
            data: (*b"world").into(),
        };
        builder.add_segment(terminating_tail)?;

        let head = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 3.try_into()?,
            data: (*b"hello").into(),
        };
        builder.add_segment(head)?;

        assert!(builder.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, builder.as_missing());

        let frame = Frame::try_from(builder)?;
        assert_eq!(1, frame.frame_id);
        assert_eq!(b"hello new world", frame.data.as_ref());
        assert!(frame.is_terminating);

        Ok(())
    }

    /// A segment with an empty payload is accepted and contributes no bytes to the frame.
    #[test]
    fn frame_builder_should_correctly_allow_empty_segments() -> anyhow::Result<()> {
        let second = Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 4.try_into()?,
            data: (*b" new ").into(),
        };
        let mut builder = FrameBuilder::from(second);

        let third = Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: 4.try_into()?,
            data: (*b"world").into(),
        };
        builder.add_segment(third)?;

        let first = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 4.try_into()?,
            data: (*b"hello").into(),
        };
        builder.add_segment(first)?;

        let empty_terminator = Segment {
            frame_id: 1,
            seq_idx: 3,
            seq_flags: SeqIndicator::new_with_flags(4, true),
            data: Box::new([]),
        };
        builder.add_segment(empty_terminator)?;

        assert!(builder.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, builder.as_missing());

        let frame = Frame::try_from(builder)?;
        assert_eq!(1, frame.frame_id);
        assert_eq!(b"hello new world", frame.data.as_ref());
        assert!(frame.is_terminating);

        Ok(())
    }

    /// Segments with an out-of-range index, a foreign frame id or a mismatching
    /// sequence length are rejected.
    #[test]
    fn frame_builder_should_not_accept_invalid_segments() -> anyhow::Result<()> {
        let first = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"hello world").into(),
        };
        let mut builder = FrameBuilder::from(first);

        let index_out_of_range = Segment {
            frame_id: 1,
            seq_idx: 3,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        };
        builder
            .add_segment(index_out_of_range)
            .expect_err("should not accept invalid segment");

        let foreign_frame = Segment {
            frame_id: 2,
            seq_idx: 1,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        };
        builder
            .add_segment(foreign_frame)
            .expect_err("should not accept segment from another frame");

        let wrong_seq_len = Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b"foo").into(),
        };
        builder
            .add_segment(wrong_seq_len)
            .expect_err("should not accept invalid segment");

        Ok(())
    }

    /// A builder that is already complete rejects any further segment.
    #[test]
    fn frame_builder_should_not_accept_segments_when_complete() -> anyhow::Result<()> {
        let only = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 1.try_into()?,
            data: (*b"hello world").into(),
        };
        let mut builder = FrameBuilder::from(only);

        let extra = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 1.try_into()?,
            data: (*b"foo").into(),
        };
        builder
            .add_segment(extra)
            .expect_err("should not accept segment when complete");

        Ok(())
    }

    /// A segment whose slot is already filled is rejected.
    #[test]
    fn frame_builder_should_not_accept_duplicate_segment() -> anyhow::Result<()> {
        let first = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"hello world").into(),
        };
        let mut builder = FrameBuilder::from(first);

        let duplicate = Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        };
        builder
            .add_segment(duplicate)
            .expect_err("should not accept duplicate segment");

        Ok(())
    }
}