hoprd_inbox/
ring.rs

1use crate::inbox::{InboxBackend, TimestampFn};
2use async_trait::async_trait;
3
4use ringbuffer::{AllocRingBuffer, RingBuffer};
5use std::collections::hash_map::Entry;
6use std::collections::HashMap;
7use std::hash::Hash;
8use std::time::Duration;
9
10/// Acts a simple wrapper of a message with added insertion timestamp.
11struct PayloadWrapper<M: std::marker::Send> {
12    payload: M,
13    ts: Duration,
14}
15
16/// Ring buffer based heap-allocated backend.
17///
18/// The capacity must be a power-of-two due to optimizations.
19/// Tags `T` must be represented by a type that's also a valid key for the `HashMap`
20pub struct RingBufferInboxBackend<T, M>
21where
22    T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
23    M: Clone + std::marker::Send + std::marker::Sync,
24{
25    buffers: HashMap<T, AllocRingBuffer<PayloadWrapper<M>>>,
26    capacity: usize,
27    ts: TimestampFn,
28}
29
30impl<T, M> RingBufferInboxBackend<T, M>
31where
32    T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
33    M: Clone + std::marker::Send + std::marker::Sync,
34{
35    /// Creates new backend with default timestamping function from std::time.
36    pub fn new(capacity: usize) -> Self {
37        Self::new_with_capacity(capacity, || {
38            hopr_platform::time::native::current_time()
39                .duration_since(std::time::UNIX_EPOCH)
40                .unwrap_or_default()
41        })
42    }
43
44    /// Counts only the untagged entries.
45    pub fn count_untagged(&self) -> usize {
46        self.buffers.get(&T::default()).map(|buf| buf.len()).unwrap_or(0)
47    }
48
49    fn tag_resolution(&mut self, tag: Option<T>) -> Option<T> {
50        let specific_tag = match tag {
51            // If no tag was given, we need to find a tag which has the oldest entry in it
52            None => self
53                .buffers
54                .iter()
55                .min_by(|(_, a), (_, b)| {
56                    // Compare timestamps of the oldest entries in buckets
57                    // Empty buckets are moved away
58                    let ts_a = a.peek().map(|w| w.ts).unwrap_or(Duration::MAX);
59                    let ts_b = b.peek().map(|w| w.ts).unwrap_or(Duration::MAX);
60                    ts_a.cmp(&ts_b)
61                })
62                .map(|(t, _)| *t)?,
63
64            // If a tag was given, just use it
65            Some(t) => t,
66        };
67
68        Some(specific_tag)
69    }
70}
71
72#[async_trait]
73impl<T, M> InboxBackend<T, M> for RingBufferInboxBackend<T, M>
74where
75    T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
76    M: Clone + std::marker::Send + std::marker::Sync,
77{
78    fn new_with_capacity(capacity: usize, ts: TimestampFn) -> Self {
79        assert!(capacity.is_power_of_two(), "capacity must be a power of two");
80        Self {
81            capacity,
82            buffers: HashMap::new(),
83            ts,
84        }
85    }
86
87    async fn push(&mut self, tag: Option<T>, payload: M) {
88        // Either use an existing ringbuffer or initialize a new one, if such tag does not exist yet.
89        match self.buffers.entry(tag.unwrap_or_default()) {
90            Entry::Occupied(mut e) => e.get_mut().push(PayloadWrapper {
91                payload,
92                ts: (self.ts)(),
93            }),
94            Entry::Vacant(e) => e.insert(AllocRingBuffer::new(self.capacity)).push(PayloadWrapper {
95                payload,
96                ts: (self.ts)(),
97            }),
98        }
99    }
100
101    async fn count(&self, tag: Option<T>) -> usize {
102        match tag {
103            // Count across all the tags
104            None => self.buffers.values().map(|buf| buf.len()).sum(),
105            // Count messages with a specific tag only
106            Some(specific_tag) => self.buffers.get(&specific_tag).map(|buf| buf.len()).unwrap_or(0),
107        }
108    }
109
110    async fn pop(&mut self, tag: Option<T>) -> Option<(M, Duration)> {
111        if let Some(specific_tag) = self.tag_resolution(tag) {
112            self.buffers
113                .get_mut(&specific_tag)
114                .and_then(|buf| buf.dequeue().map(|w| (w.payload, w.ts)))
115        } else {
116            None
117        }
118    }
119
120    async fn pop_all(&mut self, tag: Option<T>) -> Vec<(M, Duration)> {
121        match tag {
122            Some(specific_tag) => {
123                // Pop only all messages of a specific tag
124                self.buffers
125                    .get_mut(&specific_tag)
126                    .map(|buf| buf.drain().map(|w| (w.payload, w.ts)).collect::<Vec<_>>())
127                    .unwrap_or_default()
128            }
129            None => {
130                // Pop across all the tags, need to sort again based on the timestamp
131                let mut all = self
132                    .buffers
133                    .drain()
134                    .flat_map(|(_, buf)| buf.into_iter())
135                    .collect::<Vec<_>>();
136
137                // NOTE: this approach is due to the requirement of considering
138                // messages with equal payload and timestamp to be distinct
139                // If this requirement was relaxed, the drained entries could be collected into a BTreeSet.
140                all.sort_unstable_by(|a, b| a.ts.cmp(&b.ts));
141
142                all.into_iter().map(|w| (w.payload, w.ts)).collect()
143            }
144        }
145    }
146
147    async fn peek(&mut self, tag: Option<T>) -> Option<(M, Duration)> {
148        if let Some(specific_tag) = self.tag_resolution(tag) {
149            self.buffers
150                .get(&specific_tag)
151                .and_then(|buf| buf.peek().map(|w| (w.payload.clone(), w.ts)))
152        } else {
153            None
154        }
155    }
156
157    async fn peek_all(&mut self, tag: Option<T>, timestamp: Option<Duration>) -> Vec<(M, Duration)> {
158        let timestamp = timestamp.unwrap_or_default().as_millis();
159
160        match tag {
161            Some(specific_tag) => {
162                // Peek only all messages of a specific tag
163                self.buffers
164                    .get(&specific_tag)
165                    .map(|buf| {
166                        buf.iter()
167                            .filter(|&w| w.ts.as_millis() >= timestamp)
168                            .map(|w| (w.payload.clone(), w.ts))
169                            .collect::<Vec<_>>()
170                    })
171                    .unwrap_or_default()
172            }
173            None => {
174                // Peek across all the tags, need to sort again based on the timestamp
175                let mut all = self
176                    .buffers
177                    .iter()
178                    .flat_map(|(_, buf)| buf.iter().filter(|w| w.ts.as_millis() >= timestamp))
179                    .collect::<Vec<_>>();
180
181                // NOTE: this approach is due to the requirement of considering
182                // messages with equal payload and timestamp to be distinct
183                // If this requirement was relaxed, the drained entries could be collected into a BTreeSet.
184                all.sort_unstable_by(|a, b| a.ts.cmp(&b.ts));
185
186                all.into_iter().map(|w| (w.payload.clone(), w.ts)).collect()
187            }
188        }
189    }
190
191    async fn purge(&mut self, older_than: Duration) {
192        // Remove all the messages across all the tags, which do not satisfy the threshold
193        self.buffers.iter_mut().for_each(|(_, buf)| {
194            while buf.peek().map(|w| w.ts).unwrap_or(Duration::MAX) < older_than {
195                buf.dequeue();
196            }
197        });
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use crate::inbox::InboxBackend;
204    use crate::ring::RingBufferInboxBackend;
205    use std::ops::Add;
206    use std::time::Duration;
207
208    fn make_backend(capacity: usize) -> RingBufferInboxBackend<u16, i32> {
209        RingBufferInboxBackend::new_with_capacity(
210            capacity,
211            || {
212                std::time::SystemTime::now()
213                    .duration_since(std::time::UNIX_EPOCH)
214                    .expect("Time went backwards")
215                    .add(Duration::from_millis(1))
216            }, // for testing, ensure the timestamps are at least 5ms apart
217        )
218    }
219
220    #[async_std::test]
221    async fn test_push_pop_tag() {
222        let mut rb = make_backend(4);
223
224        rb.push(Some(1), 1).await;
225        rb.push(Some(1), 2).await;
226        rb.push(Some(1), 3).await;
227        rb.push(Some(1), 4).await;
228        rb.push(Some(1), 5).await;
229
230        rb.push(Some(10), 6).await;
231        rb.push(Some(11), 7).await;
232
233        rb.push(None, 0).await;
234
235        assert_eq!(4, rb.count(Some(1)).await);
236        assert_eq!(1, rb.count(Some(10)).await);
237        assert_eq!(1, rb.count(Some(11)).await);
238        assert_eq!(0, rb.count(Some(100)).await);
239        assert_eq!(0, rb.count(Some(23)).await);
240        assert_eq!(7, rb.count(None).await);
241        assert_eq!(1, rb.count_untagged());
242
243        assert_eq!(2, rb.pop(Some(1)).await.unwrap().0);
244        assert_eq!(3, rb.pop(Some(1)).await.unwrap().0);
245        assert_eq!(4, rb.pop(Some(1)).await.unwrap().0);
246        assert_eq!(5, rb.pop(Some(1)).await.unwrap().0);
247        assert_eq!(0, rb.count(Some(1)).await);
248        assert!(rb.pop(Some(1)).await.is_none());
249
250        assert!(rb.pop(Some(100)).await.is_none());
251        assert!(rb.pop(Some(23)).await.is_none());
252
253        assert_eq!(6, rb.pop(Some(10)).await.unwrap().0);
254        assert_eq!(0, rb.count(Some(10)).await);
255
256        assert_eq!(7, rb.pop(Some(11)).await.unwrap().0);
257        assert_eq!(0, rb.count(Some(11)).await);
258
259        assert_eq!(0, rb.pop(None).await.unwrap().0);
260        assert_eq!(0, rb.count_untagged());
261
262        assert_eq!(0, rb.count(None).await);
263
264        rb.push(None, 0).await;
265        rb.push(None, 0).await;
266        assert_eq!(2, rb.count_untagged());
267        assert_eq!(2, rb.count(None).await);
268    }
269
270    #[async_std::test]
271    async fn test_pop_all() {
272        let mut rb = make_backend(2);
273
274        rb.push(Some(1), 0).await;
275        rb.push(Some(1), 1).await;
276        rb.push(Some(1), 2).await;
277
278        rb.push(Some(2), 3).await;
279        rb.push(Some(2), 4).await;
280
281        rb.push(None, 5).await;
282
283        let mut popped = rb.pop_all(None).await.into_iter().map(|(d, _)| d).collect::<Vec<_>>();
284
285        // We don't really care about the order, since this could differ based on CPU timing
286        popped.sort();
287
288        assert_eq!(vec![1, 2, 3, 4, 5], popped);
289        assert_eq!(0, rb.count(None).await);
290        assert_eq!(0, rb.count_untagged());
291    }
292
293    #[async_std::test]
294    async fn test_pop_all_specific() {
295        let mut rb = make_backend(2);
296
297        rb.push(Some(1), 0).await;
298        rb.push(Some(1), 2).await;
299        rb.push(Some(1), 1).await;
300
301        rb.push(Some(2), 3).await;
302        rb.push(Some(2), 4).await;
303
304        rb.push(None, 5).await;
305
306        let mut popped = rb
307            .pop_all(Some(1))
308            .await
309            .into_iter()
310            .map(|(d, _)| d)
311            .collect::<Vec<_>>();
312
313        // We don't really care about the order, since this could differ based on CPU timing
314        popped.sort();
315
316        assert_eq!(vec![1, 2], popped);
317        assert_eq!(2, rb.count(Some(2)).await);
318        assert_eq!(3, rb.count(None).await);
319    }
320
321    #[async_std::test]
322    async fn test_pop_oldest() {
323        let mut rb = make_backend(2);
324
325        rb.push(Some(3), 10).await;
326
327        rb.push(Some(1), 1).await;
328        rb.push(Some(1), 2).await;
329
330        rb.push(Some(2), 3).await;
331        rb.push(Some(2), 4).await;
332
333        assert_eq!(5, rb.count(None).await);
334
335        assert_eq!(10, rb.pop(None).await.unwrap().0);
336
337        assert_eq!(0, rb.count(Some(3)).await);
338        assert_eq!(4, rb.count(None).await);
339
340        assert_eq!(1, rb.pop(None).await.unwrap().0);
341        assert_eq!(2, rb.pop(None).await.unwrap().0);
342
343        assert_eq!(0, rb.count(Some(1)).await);
344        assert_eq!(2, rb.count(Some(2)).await);
345
346        assert_eq!(3, rb.pop(None).await.unwrap().0);
347        assert_eq!(4, rb.pop(None).await.unwrap().0);
348
349        assert_eq!(0, rb.count(Some(2)).await);
350        assert_eq!(0, rb.count(None).await);
351    }
352
353    #[async_std::test]
354    async fn test_peek_oldest() {
355        let mut rb = make_backend(2);
356
357        rb.push(Some(3), 10).await;
358
359        rb.push(Some(1), 1).await;
360        rb.push(Some(1), 2).await;
361
362        rb.push(Some(2), 3).await;
363        rb.push(Some(2), 4).await;
364
365        assert_eq!(5, rb.count(None).await);
366
367        assert_eq!(10, rb.peek(None).await.unwrap().0);
368
369        assert_eq!(5, rb.count(None).await);
370    }
371
372    #[async_std::test]
373    async fn test_peek_oldest_specific() {
374        let mut rb = make_backend(2);
375
376        rb.push(Some(3), 10).await;
377
378        rb.push(Some(1), 1).await;
379        rb.push(Some(1), 2).await;
380
381        rb.push(Some(2), 3).await;
382        rb.push(Some(2), 4).await;
383
384        assert_eq!(5, rb.count(None).await);
385        assert_eq!(1, rb.count(Some(3)).await);
386        assert_eq!(2, rb.count(Some(1)).await);
387        assert_eq!(2, rb.count(Some(2)).await);
388
389        assert_eq!(10, rb.peek(Some(3)).await.unwrap().0);
390        assert_eq!(1, rb.peek(Some(1)).await.unwrap().0);
391        assert_eq!(3, rb.peek(Some(2)).await.unwrap().0);
392
393        assert_eq!(5, rb.count(None).await);
394        assert_eq!(1, rb.count(Some(3)).await);
395        assert_eq!(2, rb.count(Some(1)).await);
396        assert_eq!(2, rb.count(Some(2)).await);
397    }
398
399    #[async_std::test]
400    async fn test_peek_all() {
401        let mut rb = make_backend(4);
402
403        rb.push(Some(1), 0).await;
404        rb.push(Some(1), 2).await;
405        rb.push(Some(1), 1).await;
406
407        assert_eq!(
408            vec![0, 2, 1],
409            rb.peek_all(None, None)
410                .await
411                .into_iter()
412                .map(|(d, _)| d)
413                .collect::<Vec<_>>()
414        );
415        assert_eq!(
416            vec![0, 2, 1],
417            rb.peek_all(None, None)
418                .await
419                .into_iter()
420                .map(|(d, _)| d)
421                .collect::<Vec<_>>()
422        );
423        assert_eq!(3, rb.count(Some(1)).await);
424
425        rb.pop_all(None).await;
426    }
427
428    #[async_std::test]
429    async fn test_peek_all_specific() {
430        let mut rb = make_backend(4);
431
432        rb.push(Some(1), 0).await;
433        rb.push(Some(1), 2).await;
434        rb.push(Some(1), 1).await;
435
436        rb.push(Some(2), 3).await;
437        rb.push(Some(2), 4).await;
438
439        rb.push(None, 5).await;
440
441        assert_eq!(
442            vec![0, 2, 1],
443            rb.peek_all(Some(1), None)
444                .await
445                .into_iter()
446                .map(|(d, _)| d)
447                .collect::<Vec<_>>()
448        );
449        assert_eq!(3, rb.count(Some(1)).await);
450        assert_eq!(2, rb.count(Some(2)).await);
451        assert_eq!(6, rb.count(None).await);
452
453        rb.pop_all(None).await;
454    }
455
456    #[async_std::test]
457    async fn test_peek_all_with_timestamp() {
458        let mut rb = make_backend(8);
459
460        rb.push(Some(2), 0).await;
461        rb.push(Some(1), 0).await;
462        rb.push(Some(1), 1).await;
463        rb.push(Some(1), 2).await;
464        rb.push(Some(1), 3).await;
465        rb.push(Some(1), 4).await;
466        rb.push(Some(1), 5).await;
467
468        // sleep for 10ms to ensure a break between timestamps
469        async_std::task::sleep(Duration::from_millis(10)).await;
470        let close_past = std::time::SystemTime::now()
471            .duration_since(std::time::UNIX_EPOCH)
472            .unwrap();
473
474        rb.push(Some(1), 6).await;
475        rb.push(Some(1), 7).await;
476        rb.push(Some(2), 1).await;
477
478        assert_eq!(
479            vec![6, 7],
480            rb.peek_all(Some(1), Some(close_past))
481                .await
482                .into_iter()
483                .map(|(d, _)| d)
484                .collect::<Vec<_>>()
485        );
486        assert_eq!(
487            vec![1],
488            rb.peek_all(Some(2), Some(close_past))
489                .await
490                .into_iter()
491                .map(|(d, _)| d)
492                .collect::<Vec<_>>()
493        );
494        assert_eq!(
495            vec![0, 1, 2, 3, 4, 5, 6, 7],
496            rb.peek_all(Some(1), None)
497                .await
498                .into_iter()
499                .map(|(d, _)| d)
500                .collect::<Vec<_>>()
501        );
502
503        rb.pop_all(None).await;
504    }
505
506    #[async_std::test]
507    async fn test_purge() {
508        let mut rb = make_backend(8);
509
510        rb.push(None, 0).await;
511        rb.push(None, 1).await;
512        rb.push(None, 2).await;
513        rb.push(None, 3).await;
514
515        async_std::task::sleep(Duration::from_millis(100)).await;
516        let ts = std::time::SystemTime::now()
517            .duration_since(std::time::UNIX_EPOCH)
518            .unwrap();
519
520        rb.push(None, 4).await;
521        rb.push(None, 5).await;
522        rb.push(None, 6).await;
523        rb.push(None, 7).await;
524
525        assert_eq!(8, rb.count(None).await);
526
527        rb.purge(ts).await;
528
529        assert_eq!(4, rb.count(None).await);
530        assert_eq!(
531            vec![4, 5, 6, 7],
532            rb.pop_all(None).await.into_iter().map(|(d, _)| d).collect::<Vec<_>>()
533        );
534    }
535
536    #[async_std::test]
537    async fn test_duplicates() {
538        let mut rb = make_backend(4);
539
540        rb.push(None, 1).await;
541        rb.push(None, 0).await;
542        rb.push(None, 0).await;
543        rb.push(None, 0).await;
544        rb.push(None, 0).await;
545
546        rb.push(Some(1), 1).await;
547        rb.push(Some(1), 0).await;
548        rb.push(Some(1), 0).await;
549        rb.push(Some(1), 0).await;
550        rb.push(Some(1), 0).await;
551
552        assert_eq!(
553            vec![0, 0, 0, 0, 0, 0, 0, 0],
554            rb.pop_all(None).await.into_iter().map(|(d, _)| d).collect::<Vec<_>>()
555        );
556    }
557}