Skip to main content

hopr_protocol_session/utils/
mod.rs

1pub mod skip_queue;
2
3#[cfg(test)]
4pub mod test;
5
6use std::{
7    cmp::Ordering,
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use ringbuffer::{AllocRingBuffer, RingBuffer};
14
15use crate::{
16    errors::SessionError,
17    protocol::{FrameId, Segment, SeqIndicator, SeqNum},
18};
19
20#[derive(Debug)]
21pub(crate) struct RingBufferProducer<T>(futures::channel::mpsc::Sender<T>);
22
23impl<T> Clone for RingBufferProducer<T> {
24    fn clone(&self) -> Self {
25        Self(self.0.clone())
26    }
27}
28
29impl<T> RingBufferProducer<T> {
30    pub fn push(&mut self, item: T) -> bool {
31        self.0.try_send(item).is_ok()
32    }
33}
34
35#[derive(Debug)]
36pub(crate) struct RingBufferView<T>(Arc<parking_lot::FairMutex<AllocRingBuffer<T>>>);
37
38impl<T> Clone for RingBufferView<T> {
39    fn clone(&self) -> Self {
40        Self(self.0.clone())
41    }
42}
43
44impl<T: Clone> RingBufferView<T> {
45    pub fn find<F: FnMut(&T) -> bool>(&self, mut predicate: F) -> Vec<T> {
46        self.0.lock().iter().filter(|item| predicate(item)).cloned().collect()
47    }
48}
49
50pub(crate) fn searchable_ringbuffer<T: Send + Sync + 'static>(
51    capacity: usize,
52) -> (RingBufferProducer<T>, RingBufferView<T>) {
53    // The channel gets the same capacity as RB in case the task cannot keep up (due to locking)
54    let (rb_tx, rb_rx) = futures::channel::mpsc::channel(capacity);
55    let rb = Arc::new(parking_lot::FairMutex::new(AllocRingBuffer::new(capacity)));
56
57    let rb_clone = rb.clone();
58    hopr_utils::runtime::prelude::spawn(rb_rx.for_each(move |s| {
59        rb_clone.lock().enqueue(s);
60        futures::future::ready(())
61    }));
62
63    (RingBufferProducer(rb_tx), RingBufferView(rb))
64}
65
66const MAX_BACKOFF: Duration = Duration::from_secs(300);
67
68pub(crate) fn next_deadline_with_backoff(n: usize, base: f64, duration: Duration) -> Instant {
69    let backoff = duration.mul_f64(base.powi(n.min((i32::MAX / 2) as usize) as i32 + 1));
70    Instant::now() + backoff.min(MAX_BACKOFF)
71}
72
73#[derive(Debug, Copy, Clone, Eq)]
74pub(crate) struct RetriedFrameId {
75    pub frame_id: FrameId,
76    pub retry_count: usize,
77    max_retries: usize,
78}
79
80impl RetriedFrameId {
81    pub fn no_retries(frame_id: FrameId) -> Self {
82        Self {
83            frame_id,
84            retry_count: 1,
85            max_retries: 1,
86        }
87    }
88
89    pub fn with_retries(frame_id: FrameId, max_retries: usize) -> Self {
90        Self {
91            frame_id,
92            retry_count: 1,
93            max_retries,
94        }
95    }
96
97    pub fn next(self) -> Option<Self> {
98        if self.retry_count < self.max_retries {
99            Some(Self {
100                frame_id: self.frame_id,
101                retry_count: self.retry_count + 1,
102                max_retries: self.max_retries,
103            })
104        } else {
105            None
106        }
107    }
108}
109
110impl PartialEq<Self> for RetriedFrameId {
111    fn eq(&self, other: &Self) -> bool {
112        self.frame_id.eq(&other.frame_id)
113    }
114}
115
116impl PartialOrd<Self> for RetriedFrameId {
117    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
118        Some(self.cmp(other))
119    }
120}
121
122impl Ord for RetriedFrameId {
123    fn cmp(&self, other: &Self) -> Ordering {
124        self.frame_id.cmp(&other.frame_id)
125    }
126}
127
128/// Helper function to segment `data` into segments of a given ` max_segment_size ` length.
129/// All segments are tagged with the same `frame_id` and output into the given ` segments ` buffer.
130pub fn segment_into<T: AsRef<[u8]>, E: Extend<Segment>>(
131    data: T,
132    max_segment_size: usize,
133    frame_id: FrameId,
134    segments: &mut E,
135) -> crate::errors::Result<()> {
136    if frame_id == 0 {
137        return Err(SessionError::InvalidFrameId);
138    }
139
140    if max_segment_size == 0 {
141        return Err(SessionError::IncorrectMessageLength);
142    }
143
144    let data = data.as_ref();
145
146    let num_chunks = data.len().div_ceil(max_segment_size);
147    if num_chunks > SeqNum::MAX as usize {
148        return Err(SessionError::DataTooLong);
149    }
150
151    let chunks = data.chunks(max_segment_size);
152
153    let seq_len = SeqIndicator::try_from(chunks.len() as SeqNum)?;
154    segments.extend(chunks.enumerate().map(|(idx, data)| Segment {
155        frame_id,
156        seq_flags: seq_len,
157        seq_idx: idx as u8,
158        data: data.into(),
159    }));
160
161    Ok(())
162}
163
164/// Convenience wrapper for [`segment_into`] that allocates its own output buffer and returns it.
165#[allow(unused)]
166pub fn segment<T: AsRef<[u8]>>(data: T, max_segment_size: usize, frame_id: u32) -> crate::errors::Result<Vec<Segment>> {
167    let mut out = Vec::with_capacity(data.as_ref().len().div_ceil(max_segment_size));
168    segment_into(data, max_segment_size, frame_id, &mut out)?;
169    Ok(out)
170}
171
172#[cfg(test)]
173mod tests {
174    use hex_literal::hex;
175
176    use super::*;
177
178    #[test]
179    fn segment_should_split_data_correctly() -> anyhow::Result<()> {
180        let data = hex!("deadbeefcafebabe");
181
182        let segments = segment(data, 3, 1)?;
183        assert_eq!(3, segments.len());
184
185        assert_eq!(hex!("deadbe"), segments[0].data.as_ref());
186        assert_eq!(0, segments[0].seq_idx);
187        assert_eq!(3, segments[0].seq_flags.seq_len());
188        assert_eq!(1, segments[0].frame_id);
189
190        assert_eq!(hex!("efcafe"), segments[1].data.as_ref());
191        assert_eq!(1, segments[1].seq_idx);
192        assert_eq!(3, segments[1].seq_flags.seq_len());
193        assert_eq!(1, segments[1].frame_id);
194
195        assert_eq!(hex!("babe"), segments[2].data.as_ref());
196        assert_eq!(2, segments[2].seq_idx);
197        assert_eq!(3, segments[2].seq_flags.seq_len());
198        assert_eq!(1, segments[2].frame_id);
199
200        Ok(())
201    }
202}