Skip to main content

hopr_protocol_session/processing/
types.rs

1use std::time::Instant;
2
3use crate::{
4    errors::SessionError,
5    protocol::{Frame, FrameId, MissingSegmentsBitmap, Segment, SeqNum},
6};
7
/// A helper object that reassembles segments into frames.
///
/// A builder is created from the first received [`Segment`] of a frame (see the
/// `From<Segment>` implementation), further segments are added via
/// [`FrameBuilder::add_segment`], and the builder can be converted into a [`Frame`]
/// via `TryFrom`.
#[derive(Debug)]
pub struct FrameBuilder {
    /// One slot per segment of the frame, indexed by the segment's `seq_idx`.
    /// A `None` slot marks a segment that has not been received yet.
    segments: Vec<Option<Segment>>,
    /// ID of the frame being reassembled.
    frame_id: FrameId,
    /// Number of segments still missing; the frame is complete once this reaches 0.
    seg_remaining: SeqNum,
    /// Sum of the data lengths of all segments received so far.
    recv_bytes: usize,
    /// Time when the most recent segment was accepted into this builder.
    pub(crate) last_recv: Instant,
    /// Time when this builder was created.
    /// Only present in non-test builds with the `prometheus` feature enabled.
    #[cfg(all(not(test), feature = "prometheus"))]
    pub(crate) created: Instant,
}
19
20impl From<Segment> for FrameBuilder {
21    fn from(value: Segment) -> Self {
22        let idx = value.seq_idx;
23        let mut ret = Self {
24            segments: vec![None; value.seq_flags.seq_len() as usize],
25            frame_id: value.frame_id,
26            seg_remaining: value.seq_flags.seq_len() - 1,
27            recv_bytes: value.data.len(),
28            last_recv: Instant::now(),
29            #[cfg(all(not(test), feature = "prometheus"))]
30            created: Instant::now(),
31        };
32
33        ret.segments[idx as usize] = Some(value);
34
35        ret
36    }
37}
38
39impl FrameBuilder {
40    /// Adds a segment to this frame.
41    ///
42    /// Fails if the segment is invalid for this frame.
43    pub fn add_segment(&mut self, segment: Segment) -> Result<(), SessionError> {
44        let idx = segment.seq_idx;
45        if segment.frame_id != self.frame_id
46            || idx as usize >= self.segments.len()
47            || segment.seq_flags.seq_len() as usize != self.segments.len()
48            || self.seg_remaining == 0
49            || self.segments[idx as usize].is_some()
50        {
51            return Err(SessionError::InvalidSegment);
52        }
53
54        self.recv_bytes += segment.data.len();
55        self.seg_remaining -= 1;
56        self.segments[idx as usize] = Some(segment);
57        self.last_recv = Instant::now();
58        Ok(())
59    }
60
61    /// Retrieves the bitmap of missing segments in this frame.
62    pub fn as_missing(&self) -> MissingSegmentsBitmap {
63        let mut ret = MissingSegmentsBitmap::ZERO;
64        self.segments
65            .iter()
66            .take(SeqNum::BITS as usize)
67            .enumerate()
68            .for_each(|(i, v)| ret.set(i, v.is_none()));
69        ret
70    }
71
72    /// Retrieves the frame's ID.
73    #[inline]
74    pub fn frame_id(&self) -> FrameId {
75        self.frame_id
76    }
77
78    /// Indicates if all segments of this frame were added.
79    #[inline]
80    pub fn is_complete(&self) -> bool {
81        self.seg_remaining == 0
82    }
83}
84
85impl TryFrom<FrameBuilder> for Frame {
86    type Error = SessionError;
87
88    fn try_from(value: FrameBuilder) -> Result<Self, Self::Error> {
89        // The Frame has the terminating flag set
90        // if any of its segments had the terminating indicator set
91        let mut is_terminating = false;
92        value
93            .segments
94            .into_iter()
95            .try_fold(Vec::with_capacity(value.recv_bytes), |mut acc, segment| match segment {
96                Some(segment) => {
97                    acc.extend_from_slice(&segment.data);
98                    is_terminating = is_terminating || segment.seq_flags.is_terminating();
99                    Ok(acc)
100                }
101                None => Err(SessionError::IncompleteFrame(value.frame_id)),
102            })
103            .map(|data| Frame {
104                frame_id: value.frame_id,
105                data: data.into_boxed_slice(),
106                is_terminating,
107            })
108    }
109}
110
/// Allows inspecting incomplete frame buffer inside the Reassembler.
///
/// This is a thin wrapper around a [`FrameDashMap`]. Cloning the inspector clones
/// the wrapped [`FrameDashMap`], which holds its map behind an `Arc`, so all clones
/// observe the same underlying map.
// Must use only FrameDashMap, others cannot be reference-cloned
#[derive(Clone, Debug)]
pub struct FrameInspector(pub(crate) FrameDashMap);
115
116impl FrameInspector {
117    /// Indicates how many incomplete frames there could be per one complete/discarded frame.
118    pub const INCOMPLETE_FRAME_RATIO: usize = 2;
119
120    pub fn new(capacity: usize) -> Self {
121        Self(FrameDashMap::with_capacity(Self::INCOMPLETE_FRAME_RATIO * capacity + 1))
122    }
123
124    /// Returns a [`MissingSegmentsBitmap`] of missing segments in a frame.
125    pub fn missing_segments(&self, frame_id: &FrameId) -> Option<MissingSegmentsBitmap> {
126        self.0.0.get(frame_id).map(|f| f.as_missing())
127    }
128
129    /// Number of incomplete frames.
130    pub fn len(&self) -> usize {
131        self.0.len()
132    }
133
134    /// Checks if there are no incomplete frames.
135    pub fn is_empty(&self) -> bool {
136        self.0.is_empty()
137    }
138}
139
/// Trait describing an occupied entry in a [`FrameMap`].
///
/// An occupied entry refers to a frame ID for which the map already holds a [`FrameBuilder`].
///
/// See [`FrameMap::entry`] for details.
pub trait FrameMapOccupiedEntry {
    /// Gets mutable reference to the builder assigned to this frame.
    fn get_builder_mut(&mut self) -> &mut FrameBuilder;

    /// Gets the ID of the frame this entry belongs to.
    // NOTE(review): `allow(unused)` presumably silences the lint in builds where nothing
    // calls this accessor - confirm before removing the attribute.
    #[allow(unused)]
    fn frame_id(&self) -> &FrameId;

    /// Removes the entry and returns the associated [`FrameBuilder`].
    /// The `FrameBuilder` may or may not be [complete](FrameBuilder::is_complete).
    fn finalize(self) -> FrameBuilder;
}
154
/// Trait describing a vacant entry in a [`FrameMap`].
///
/// A vacant entry refers to a frame ID for which the map holds no [`FrameBuilder`] yet.
///
/// See [`FrameMap::entry`] for details.
pub trait FrameMapVacantEntry {
    /// Insert a new [`FrameBuilder`] into the empty entry.
    ///
    /// Consumes the entry; nothing is returned to the caller.
    fn insert_builder(self, value: FrameBuilder);
}
162
/// An entry of a [`FrameMap`] for a single frame ID, as returned by [`FrameMap::entry`].
///
/// The `strum::EnumTryAs` derive generates `try_as_*` accessors for the individual variants.
#[derive(strum::EnumTryAs)]
pub enum FrameMapEntry<O: FrameMapOccupiedEntry, V: FrameMapVacantEntry> {
    /// The map already holds a [`FrameBuilder`] for the requested frame ID.
    Occupied(O),
    /// The map holds no [`FrameBuilder`] for the requested frame ID.
    Vacant(V),
}
168
/// An abstraction of a Hash Map, suitable for reassembling frames.
///
/// The map associates a [`FrameId`] with the [`FrameBuilder`] of that frame.
pub trait FrameMap {
    /// Type of the entry returned by [`FrameMap::entry`] when the frame ID is present in the map.
    type ExistingEntry<'a>: FrameMapOccupiedEntry
    where
        Self: 'a;
    /// Type of the entry returned by [`FrameMap::entry`] when the frame ID is not present in the map.
    type VacantEntry<'a>: FrameMapVacantEntry
    where
        Self: 'a;

    /// Creates a new map with the given capacity.
    fn with_capacity(capacity: usize) -> Self;

    /// Returns the [`FrameMapEntry`] API for a vacant or existing entry.
    fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>>;

    /// Number of elements in the map.
    fn len(&self) -> usize;

    /// Indicates whether the map is empty.
    ///
    /// Defaults to check if the [`FrameMap::len`] is 0.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes all elements from the map, for which the given predicate evaluates to `false`.
    ///
    /// The predicate receives the frame ID and a mutable reference to its builder.
    fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool);
}
197
198impl FrameMapOccupiedEntry for dashmap::OccupiedEntry<'_, FrameId, FrameBuilder> {
199    fn get_builder_mut(&mut self) -> &mut FrameBuilder {
200        self.get_mut()
201    }
202
203    fn frame_id(&self) -> &FrameId {
204        self.key()
205    }
206
207    fn finalize(self) -> FrameBuilder {
208        self.remove()
209    }
210}
211
212impl FrameMapVacantEntry for dashmap::VacantEntry<'_, FrameId, FrameBuilder> {
213    fn insert_builder(self, value: FrameBuilder) {
214        self.insert(value);
215    }
216}
217
/// A [`FrameMap`] implementation using reference-counted `dashmap::DashMap` as a backend.
///
/// The map is held behind an `Arc`, so cloning this type clones the `Arc`
/// and the clone refers to the same underlying map.
#[derive(Clone, Debug)]
pub struct FrameDashMap(std::sync::Arc<dashmap::DashMap<FrameId, FrameBuilder>>);
221
222impl FrameMap for FrameDashMap {
223    type ExistingEntry<'a> = dashmap::OccupiedEntry<'a, FrameId, FrameBuilder>;
224    type VacantEntry<'a> = dashmap::VacantEntry<'a, FrameId, FrameBuilder>;
225
226    fn with_capacity(capacity: usize) -> Self {
227        Self(std::sync::Arc::new(dashmap::DashMap::with_capacity(capacity)))
228    }
229
230    fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
231        match self.0.entry(frame_id) {
232            dashmap::Entry::Occupied(e) => FrameMapEntry::Occupied(e),
233            dashmap::Entry::Vacant(v) => FrameMapEntry::Vacant(v),
234        }
235    }
236
237    fn len(&self) -> usize {
238        self.0.len()
239    }
240
241    fn is_empty(&self) -> bool {
242        self.0.is_empty()
243    }
244
245    fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
246        self.0.retain(f)
247    }
248}
249
/// A [`FrameMap`] implementation using `std::collections::HashMap` as backend.
///
/// Compiled only when the `hashbrown` feature is disabled.
#[cfg(not(feature = "hashbrown"))]
pub struct FrameHashMap(std::collections::HashMap<FrameId, FrameBuilder>);
253
254#[cfg(not(feature = "hashbrown"))]
255impl FrameMapOccupiedEntry for std::collections::hash_map::OccupiedEntry<'_, FrameId, FrameBuilder> {
256    fn get_builder_mut(&mut self) -> &mut FrameBuilder {
257        self.get_mut()
258    }
259
260    fn frame_id(&self) -> &FrameId {
261        self.key()
262    }
263
264    fn finalize(self) -> FrameBuilder {
265        self.remove()
266    }
267}
268
269#[cfg(not(feature = "hashbrown"))]
270impl FrameMapVacantEntry for std::collections::hash_map::VacantEntry<'_, FrameId, FrameBuilder> {
271    fn insert_builder(self, value: FrameBuilder) {
272        self.insert(value);
273    }
274}
275
276#[cfg(not(feature = "hashbrown"))]
277impl FrameMap for FrameHashMap {
278    type ExistingEntry<'a> = std::collections::hash_map::OccupiedEntry<'a, FrameId, FrameBuilder>;
279    type VacantEntry<'a> = std::collections::hash_map::VacantEntry<'a, FrameId, FrameBuilder>;
280
281    fn with_capacity(capacity: usize) -> Self {
282        Self(std::collections::HashMap::with_capacity(capacity))
283    }
284
285    fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
286        match self.0.entry(frame_id) {
287            std::collections::hash_map::Entry::Occupied(e) => FrameMapEntry::Occupied(e),
288            std::collections::hash_map::Entry::Vacant(v) => FrameMapEntry::Vacant(v),
289        }
290    }
291
292    fn len(&self) -> usize {
293        self.0.len()
294    }
295
296    fn is_empty(&self) -> bool {
297        self.0.is_empty()
298    }
299
300    fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
301        self.0.retain(f)
302    }
303}
304
/// A [`FrameMap`] implementation using `hashbrown::HashMap` as backend.
///
/// Compiled only when the `hashbrown` feature is enabled.
#[cfg(feature = "hashbrown")]
pub struct FrameHashMap(hashbrown::HashMap<FrameId, FrameBuilder>);
308
#[cfg(feature = "hashbrown")]
impl FrameMapOccupiedEntry for hashbrown::hash_map::OccupiedEntry<'_, FrameId, FrameBuilder> {
    /// Gives mutable access to the builder held by this `hashbrown` entry.
    fn get_builder_mut(&mut self) -> &mut FrameBuilder {
        Self::get_mut(self)
    }

    /// Returns the key of this `hashbrown` entry, i.e. the frame ID.
    fn frame_id(&self) -> &FrameId {
        Self::key(self)
    }

    /// Takes the entry out of the map and hands back its builder.
    fn finalize(self) -> FrameBuilder {
        Self::remove(self)
    }
}
323
#[cfg(feature = "hashbrown")]
impl FrameMapVacantEntry for hashbrown::hash_map::VacantEntry<'_, FrameId, FrameBuilder> {
    /// Fills this vacant `hashbrown` entry with the given builder.
    fn insert_builder(self, value: FrameBuilder) {
        // The reference to the inserted value returned by `insert` is not needed.
        let _ = self.insert(value);
    }
}
330
#[cfg(feature = "hashbrown")]
impl FrameMap for FrameHashMap {
    type ExistingEntry<'a> = hashbrown::hash_map::OccupiedEntry<'a, FrameId, FrameBuilder>;
    type VacantEntry<'a> = hashbrown::hash_map::VacantEntry<'a, FrameId, FrameBuilder>;

    /// Creates an empty `hashbrown` map with the given capacity.
    fn with_capacity(capacity: usize) -> Self {
        let inner = hashbrown::HashMap::with_capacity(capacity);
        Self(inner)
    }

    /// Translates the `hashbrown` entry for `frame_id` into a [`FrameMapEntry`].
    fn entry(&mut self, frame_id: FrameId) -> FrameMapEntry<Self::ExistingEntry<'_>, Self::VacantEntry<'_>> {
        use hashbrown::hash_map::Entry;

        match self.0.entry(frame_id) {
            Entry::Occupied(occupied) => FrameMapEntry::Occupied(occupied),
            Entry::Vacant(vacant) => FrameMapEntry::Vacant(vacant),
        }
    }

    /// Number of builders currently held by the underlying map.
    fn len(&self) -> usize {
        let Self(inner) = self;
        inner.len()
    }

    /// Delegates to the emptiness check of the underlying map.
    fn is_empty(&self) -> bool {
        let Self(inner) = self;
        inner.is_empty()
    }

    /// Keeps only those builders for which `f` returns `true`.
    fn retain(&mut self, f: impl FnMut(&FrameId, &mut FrameBuilder) -> bool) {
        let Self(inner) = self;
        inner.retain(f)
    }
}
359
// Unit tests of `FrameBuilder`: reassembly order, terminating flag propagation,
// empty segments and rejection of invalid segments.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::SeqIndicator;

    // Segments added out of order (1, 2, 0) must be concatenated in index order.
    #[test]
    fn frame_builder_should_return_ordered_segments() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b" new ").into(),
        });

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: 3.try_into()?,
            data: (*b"world").into(),
        })?;

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 3.try_into()?,
            data: (*b"hello").into(),
        })?;

        // All three segments are present, so nothing is reported as missing.
        assert!(fb.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, fb.as_missing());

        let reassembled: Frame = fb.try_into()?;
        assert_eq!(1, reassembled.frame_id);
        assert_eq!(b"hello new world", reassembled.data.as_ref());

        Ok(())
    }

    // A terminating indicator on a single segment must mark the whole frame as terminating.
    #[test]
    fn frame_builder_should_correctly_mark_terminating_flag() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b" new ").into(),
        });

        // Only this segment is created with the flag set to `true`.
        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: SeqIndicator::new_with_flags(3, true),
            data: (*b"world").into(),
        })?;

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 3.try_into()?,
            data: (*b"hello").into(),
        })?;

        assert!(fb.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, fb.as_missing());

        let reassembled: Frame = fb.try_into()?;
        assert_eq!(1, reassembled.frame_id);
        assert_eq!(b"hello new world", reassembled.data.as_ref());
        assert!(reassembled.is_terminating);

        Ok(())
    }

    // A segment with no data is accepted and contributes nothing to the frame's data.
    #[test]
    fn frame_builder_should_correctly_allow_empty_segments() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 4.try_into()?,
            data: (*b" new ").into(),
        });

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 2,
            seq_flags: 4.try_into()?,
            data: (*b"world").into(),
        })?;

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 4.try_into()?,
            data: (*b"hello").into(),
        })?;

        // The last segment carries no data, only the flag set to `true`.
        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 3,
            seq_flags: SeqIndicator::new_with_flags(4, true),
            data: Box::new([]),
        })?;

        assert!(fb.is_complete());
        assert_eq!(MissingSegmentsBitmap::ZERO, fb.as_missing());

        let reassembled: Frame = fb.try_into()?;
        assert_eq!(1, reassembled.frame_id);
        assert_eq!(b"hello new world", reassembled.data.as_ref());
        assert!(reassembled.is_terminating);

        Ok(())
    }

    // Segments that do not fit the frame must be rejected.
    #[test]
    fn frame_builder_should_not_accept_invalid_segments() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"hello world").into(),
        });

        // Segment index 3 is outside of a 2-segment frame.
        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 3,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        })
        .expect_err("should not accept invalid segment");

        // Frame ID differs from the builder's frame ID.
        fb.add_segment(Segment {
            frame_id: 2,
            seq_idx: 1,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        })
        .expect_err("should not accept segment from another frame");

        // Announced segment count (3) differs from the builder's segment count (2).
        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 1,
            seq_flags: 3.try_into()?,
            data: (*b"foo").into(),
        })
        .expect_err("should not accept invalid segment");

        Ok(())
    }

    // A single-segment frame is complete right after creation and accepts nothing more.
    #[test]
    fn frame_builder_should_not_accept_segments_when_complete() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 1.try_into()?,
            data: (*b"hello world").into(),
        });

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 1.try_into()?,
            data: (*b"foo").into(),
        })
        .expect_err("should not accept segment when complete");

        Ok(())
    }

    // Re-sending an already stored segment index of an incomplete frame must be rejected.
    #[test]
    fn frame_builder_should_not_accept_duplicate_segment() -> anyhow::Result<()> {
        let mut fb = FrameBuilder::from(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"hello world").into(),
        });

        fb.add_segment(Segment {
            frame_id: 1,
            seq_idx: 0,
            seq_flags: 2.try_into()?,
            data: (*b"foo").into(),
        })
        .expect_err("should not accept duplicate segment");

        Ok(())
    }
}