hopr_protocol_session/utils/
mod.rs1pub mod skip_queue;
2
3#[cfg(test)]
4pub mod test;
5
6use std::{
7 cmp::Ordering,
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use ringbuffer::{AllocRingBuffer, RingBuffer};
14
15use crate::{
16 errors::SessionError,
17 protocol::{FrameId, Segment, SeqIndicator, SeqNum},
18};
19
20#[derive(Debug)]
21pub(crate) struct RingBufferProducer<T>(futures::channel::mpsc::Sender<T>);
22
23impl<T> Clone for RingBufferProducer<T> {
24 fn clone(&self) -> Self {
25 Self(self.0.clone())
26 }
27}
28
29impl<T> RingBufferProducer<T> {
30 pub fn push(&mut self, item: T) -> bool {
31 self.0.try_send(item).is_ok()
32 }
33}
34
35#[derive(Debug)]
36pub(crate) struct RingBufferView<T>(Arc<parking_lot::FairMutex<AllocRingBuffer<T>>>);
37
38impl<T> Clone for RingBufferView<T> {
39 fn clone(&self) -> Self {
40 Self(self.0.clone())
41 }
42}
43
44impl<T: Clone> RingBufferView<T> {
45 pub fn find<F: FnMut(&T) -> bool>(&self, mut predicate: F) -> Vec<T> {
46 self.0.lock().iter().filter(|item| predicate(item)).cloned().collect()
47 }
48}
49
50pub(crate) fn searchable_ringbuffer<T: Send + Sync + 'static>(
51 capacity: usize,
52) -> (RingBufferProducer<T>, RingBufferView<T>) {
53 let (rb_tx, rb_rx) = futures::channel::mpsc::channel(capacity);
55 let rb = Arc::new(parking_lot::FairMutex::new(AllocRingBuffer::new(capacity)));
56
57 let rb_clone = rb.clone();
58 hopr_utils::runtime::prelude::spawn(rb_rx.for_each(move |s| {
59 rb_clone.lock().enqueue(s);
60 futures::future::ready(())
61 }));
62
63 (RingBufferProducer(rb_tx), RingBufferView(rb))
64}
65
66const MAX_BACKOFF: Duration = Duration::from_secs(300);
67
68pub(crate) fn next_deadline_with_backoff(n: usize, base: f64, duration: Duration) -> Instant {
69 let backoff = duration.mul_f64(base.powi(n.min((i32::MAX / 2) as usize) as i32 + 1));
70 Instant::now() + backoff.min(MAX_BACKOFF)
71}
72
73#[derive(Debug, Copy, Clone, Eq)]
74pub(crate) struct RetriedFrameId {
75 pub frame_id: FrameId,
76 pub retry_count: usize,
77 max_retries: usize,
78}
79
80impl RetriedFrameId {
81 pub fn no_retries(frame_id: FrameId) -> Self {
82 Self {
83 frame_id,
84 retry_count: 1,
85 max_retries: 1,
86 }
87 }
88
89 pub fn with_retries(frame_id: FrameId, max_retries: usize) -> Self {
90 Self {
91 frame_id,
92 retry_count: 1,
93 max_retries,
94 }
95 }
96
97 pub fn next(self) -> Option<Self> {
98 if self.retry_count < self.max_retries {
99 Some(Self {
100 frame_id: self.frame_id,
101 retry_count: self.retry_count + 1,
102 max_retries: self.max_retries,
103 })
104 } else {
105 None
106 }
107 }
108}
109
110impl PartialEq<Self> for RetriedFrameId {
111 fn eq(&self, other: &Self) -> bool {
112 self.frame_id.eq(&other.frame_id)
113 }
114}
115
116impl PartialOrd<Self> for RetriedFrameId {
117 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
118 Some(self.cmp(other))
119 }
120}
121
122impl Ord for RetriedFrameId {
123 fn cmp(&self, other: &Self) -> Ordering {
124 self.frame_id.cmp(&other.frame_id)
125 }
126}
127
128pub fn segment_into<T: AsRef<[u8]>, E: Extend<Segment>>(
131 data: T,
132 max_segment_size: usize,
133 frame_id: FrameId,
134 segments: &mut E,
135) -> crate::errors::Result<()> {
136 if frame_id == 0 {
137 return Err(SessionError::InvalidFrameId);
138 }
139
140 if max_segment_size == 0 {
141 return Err(SessionError::IncorrectMessageLength);
142 }
143
144 let data = data.as_ref();
145
146 let num_chunks = data.len().div_ceil(max_segment_size);
147 if num_chunks > SeqNum::MAX as usize {
148 return Err(SessionError::DataTooLong);
149 }
150
151 let chunks = data.chunks(max_segment_size);
152
153 let seq_len = SeqIndicator::try_from(chunks.len() as SeqNum)?;
154 segments.extend(chunks.enumerate().map(|(idx, data)| Segment {
155 frame_id,
156 seq_flags: seq_len,
157 seq_idx: idx as u8,
158 data: data.into(),
159 }));
160
161 Ok(())
162}
163
164#[allow(unused)]
166pub fn segment<T: AsRef<[u8]>>(data: T, max_segment_size: usize, frame_id: u32) -> crate::errors::Result<Vec<Segment>> {
167 let mut out = Vec::with_capacity(data.as_ref().len().div_ceil(max_segment_size));
168 segment_into(data, max_segment_size, frame_id, &mut out)?;
169 Ok(out)
170}
171
172#[cfg(test)]
173mod tests {
174 use hex_literal::hex;
175
176 use super::*;
177
178 #[test]
179 fn segment_should_split_data_correctly() -> anyhow::Result<()> {
180 let data = hex!("deadbeefcafebabe");
181
182 let segments = segment(data, 3, 1)?;
183 assert_eq!(3, segments.len());
184
185 assert_eq!(hex!("deadbe"), segments[0].data.as_ref());
186 assert_eq!(0, segments[0].seq_idx);
187 assert_eq!(3, segments[0].seq_flags.seq_len());
188 assert_eq!(1, segments[0].frame_id);
189
190 assert_eq!(hex!("efcafe"), segments[1].data.as_ref());
191 assert_eq!(1, segments[1].seq_idx);
192 assert_eq!(3, segments[1].seq_flags.seq_len());
193 assert_eq!(1, segments[1].frame_id);
194
195 assert_eq!(hex!("babe"), segments[2].data.as_ref());
196 assert_eq!(2, segments[2].seq_idx);
197 assert_eq!(3, segments[2].seq_flags.seq_len());
198 assert_eq!(1, segments[2].frame_id);
199
200 Ok(())
201 }
202}