1use crate::inbox::{InboxBackend, TimestampFn};
2use async_trait::async_trait;
3
4use ringbuffer::{AllocRingBuffer, RingBuffer};
5use std::collections::hash_map::Entry;
6use std::collections::HashMap;
7use std::hash::Hash;
8use std::time::Duration;
9
/// A stored message together with the timestamp assigned to it when it was pushed.
struct PayloadWrapper<M>
where
    M: Send,
{
    /// The message itself.
    payload: M,
    /// Timestamp attached to the message at insertion time.
    ts: Duration,
}
15
16pub struct RingBufferInboxBackend<T, M>
21where
22 T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
23 M: Clone + std::marker::Send + std::marker::Sync,
24{
25 buffers: HashMap<T, AllocRingBuffer<PayloadWrapper<M>>>,
26 capacity: usize,
27 ts: TimestampFn,
28}
29
30impl<T, M> RingBufferInboxBackend<T, M>
31where
32 T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
33 M: Clone + std::marker::Send + std::marker::Sync,
34{
35 pub fn new(capacity: usize) -> Self {
37 Self::new_with_capacity(capacity, || {
38 hopr_platform::time::native::current_time()
39 .duration_since(std::time::UNIX_EPOCH)
40 .unwrap_or_default()
41 })
42 }
43
44 pub fn count_untagged(&self) -> usize {
46 self.buffers.get(&T::default()).map(|buf| buf.len()).unwrap_or(0)
47 }
48
49 fn tag_resolution(&mut self, tag: Option<T>) -> Option<T> {
50 let specific_tag = match tag {
51 None => self
53 .buffers
54 .iter()
55 .min_by(|(_, a), (_, b)| {
56 let ts_a = a.peek().map(|w| w.ts).unwrap_or(Duration::MAX);
59 let ts_b = b.peek().map(|w| w.ts).unwrap_or(Duration::MAX);
60 ts_a.cmp(&ts_b)
61 })
62 .map(|(t, _)| *t)?,
63
64 Some(t) => t,
66 };
67
68 Some(specific_tag)
69 }
70}
71
72#[async_trait]
73impl<T, M> InboxBackend<T, M> for RingBufferInboxBackend<T, M>
74where
75 T: Copy + Default + PartialEq + Eq + Hash + std::marker::Send + std::marker::Sync,
76 M: Clone + std::marker::Send + std::marker::Sync,
77{
78 fn new_with_capacity(capacity: usize, ts: TimestampFn) -> Self {
79 assert!(capacity.is_power_of_two(), "capacity must be a power of two");
80 Self {
81 capacity,
82 buffers: HashMap::new(),
83 ts,
84 }
85 }
86
87 async fn push(&mut self, tag: Option<T>, payload: M) {
88 match self.buffers.entry(tag.unwrap_or_default()) {
90 Entry::Occupied(mut e) => e.get_mut().push(PayloadWrapper {
91 payload,
92 ts: (self.ts)(),
93 }),
94 Entry::Vacant(e) => e.insert(AllocRingBuffer::new(self.capacity)).push(PayloadWrapper {
95 payload,
96 ts: (self.ts)(),
97 }),
98 }
99 }
100
101 async fn count(&self, tag: Option<T>) -> usize {
102 match tag {
103 None => self.buffers.values().map(|buf| buf.len()).sum(),
105 Some(specific_tag) => self.buffers.get(&specific_tag).map(|buf| buf.len()).unwrap_or(0),
107 }
108 }
109
110 async fn pop(&mut self, tag: Option<T>) -> Option<(M, Duration)> {
111 if let Some(specific_tag) = self.tag_resolution(tag) {
112 self.buffers
113 .get_mut(&specific_tag)
114 .and_then(|buf| buf.dequeue().map(|w| (w.payload, w.ts)))
115 } else {
116 None
117 }
118 }
119
120 async fn pop_all(&mut self, tag: Option<T>) -> Vec<(M, Duration)> {
121 match tag {
122 Some(specific_tag) => {
123 self.buffers
125 .get_mut(&specific_tag)
126 .map(|buf| buf.drain().map(|w| (w.payload, w.ts)).collect::<Vec<_>>())
127 .unwrap_or_default()
128 }
129 None => {
130 let mut all = self
132 .buffers
133 .drain()
134 .flat_map(|(_, buf)| buf.into_iter())
135 .collect::<Vec<_>>();
136
137 all.sort_unstable_by(|a, b| a.ts.cmp(&b.ts));
141
142 all.into_iter().map(|w| (w.payload, w.ts)).collect()
143 }
144 }
145 }
146
147 async fn peek(&mut self, tag: Option<T>) -> Option<(M, Duration)> {
148 if let Some(specific_tag) = self.tag_resolution(tag) {
149 self.buffers
150 .get(&specific_tag)
151 .and_then(|buf| buf.peek().map(|w| (w.payload.clone(), w.ts)))
152 } else {
153 None
154 }
155 }
156
157 async fn peek_all(&mut self, tag: Option<T>, timestamp: Option<Duration>) -> Vec<(M, Duration)> {
158 let timestamp = timestamp.unwrap_or_default().as_millis();
159
160 match tag {
161 Some(specific_tag) => {
162 self.buffers
164 .get(&specific_tag)
165 .map(|buf| {
166 buf.iter()
167 .filter(|&w| w.ts.as_millis() >= timestamp)
168 .map(|w| (w.payload.clone(), w.ts))
169 .collect::<Vec<_>>()
170 })
171 .unwrap_or_default()
172 }
173 None => {
174 let mut all = self
176 .buffers
177 .iter()
178 .flat_map(|(_, buf)| buf.iter().filter(|w| w.ts.as_millis() >= timestamp))
179 .collect::<Vec<_>>();
180
181 all.sort_unstable_by(|a, b| a.ts.cmp(&b.ts));
185
186 all.into_iter().map(|w| (w.payload.clone(), w.ts)).collect()
187 }
188 }
189 }
190
191 async fn purge(&mut self, older_than: Duration) {
192 self.buffers.iter_mut().for_each(|(_, buf)| {
194 while buf.peek().map(|w| w.ts).unwrap_or(Duration::MAX) < older_than {
195 buf.dequeue();
196 }
197 });
198 }
199}
200
#[cfg(test)]
mod tests {
    use crate::inbox::InboxBackend;
    use crate::ring::RingBufferInboxBackend;
    use std::ops::Add;
    use std::time::Duration;

    /// Backend type exercised by all tests: `u16` tags, `i32` payloads.
    type Backend = RingBufferInboxBackend<u16, i32>;

    /// Builds a backend that stamps messages with the system time plus one millisecond.
    fn make_backend(capacity: usize) -> Backend {
        RingBufferInboxBackend::new_with_capacity(capacity, || {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("Time went backwards")
                .add(Duration::from_millis(1))
        })
    }

    /// Current system time as a duration since the UNIX epoch.
    fn unix_now() -> Duration {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
    }

    /// Strips the timestamps, keeping only the payloads in their given order.
    fn payloads(entries: Vec<(i32, Duration)>) -> Vec<i32> {
        entries.into_iter().map(|(d, _)| d).collect::<Vec<_>>()
    }

    /// Pushes `items` one after another under the same `tag`.
    async fn push_all(rb: &mut Backend, tag: Option<u16>, items: &[i32]) {
        for &item in items {
            rb.push(tag, item).await;
        }
    }

    /// Pushes 10 under tag 3, then 1 and 2 under tag 1, then 3 and 4 under tag 2.
    async fn push_three_tags(rb: &mut Backend) {
        rb.push(Some(3), 10).await;
        push_all(rb, Some(1), &[1, 2]).await;
        push_all(rb, Some(2), &[3, 4]).await;
    }

    #[async_std::test]
    async fn test_push_pop_tag() {
        let mut rb = make_backend(4);

        // Five pushes into a capacity-4 buffer: the first payload is overwritten.
        push_all(&mut rb, Some(1), &[1, 2, 3, 4, 5]).await;
        rb.push(Some(10), 6).await;
        rb.push(Some(11), 7).await;
        rb.push(None, 0).await;

        assert_eq!(4, rb.count(Some(1)).await);
        assert_eq!(1, rb.count(Some(10)).await);
        assert_eq!(1, rb.count(Some(11)).await);
        assert_eq!(0, rb.count(Some(100)).await);
        assert_eq!(0, rb.count(Some(23)).await);
        assert_eq!(7, rb.count(None).await);
        assert_eq!(1, rb.count_untagged());

        for expected in 2..=5 {
            assert_eq!(expected, rb.pop(Some(1)).await.unwrap().0);
        }
        assert_eq!(0, rb.count(Some(1)).await);
        assert!(rb.pop(Some(1)).await.is_none());

        // Tags that were never pushed to yield nothing.
        assert!(rb.pop(Some(100)).await.is_none());
        assert!(rb.pop(Some(23)).await.is_none());

        assert_eq!(6, rb.pop(Some(10)).await.unwrap().0);
        assert_eq!(0, rb.count(Some(10)).await);

        assert_eq!(7, rb.pop(Some(11)).await.unwrap().0);
        assert_eq!(0, rb.count(Some(11)).await);

        assert_eq!(0, rb.pop(None).await.unwrap().0);
        assert_eq!(0, rb.count_untagged());

        assert_eq!(0, rb.count(None).await);

        push_all(&mut rb, None, &[0, 0]).await;
        assert_eq!(2, rb.count_untagged());
        assert_eq!(2, rb.count(None).await);
    }

    #[async_std::test]
    async fn test_pop_all() {
        let mut rb = make_backend(2);

        push_all(&mut rb, Some(1), &[0, 1, 2]).await;
        push_all(&mut rb, Some(2), &[3, 4]).await;
        rb.push(None, 5).await;

        let mut popped = payloads(rb.pop_all(None).await);
        popped.sort();

        assert_eq!(vec![1, 2, 3, 4, 5], popped);
        assert_eq!(0, rb.count(None).await);
        assert_eq!(0, rb.count_untagged());
    }

    #[async_std::test]
    async fn test_pop_all_specific() {
        let mut rb = make_backend(2);

        push_all(&mut rb, Some(1), &[0, 2, 1]).await;
        push_all(&mut rb, Some(2), &[3, 4]).await;
        rb.push(None, 5).await;

        let mut popped = payloads(rb.pop_all(Some(1)).await);
        popped.sort();

        assert_eq!(vec![1, 2], popped);
        assert_eq!(2, rb.count(Some(2)).await);
        assert_eq!(3, rb.count(None).await);
    }

    #[async_std::test]
    async fn test_pop_oldest() {
        let mut rb = make_backend(2);
        push_three_tags(&mut rb).await;

        assert_eq!(5, rb.count(None).await);

        assert_eq!(10, rb.pop(None).await.unwrap().0);

        assert_eq!(0, rb.count(Some(3)).await);
        assert_eq!(4, rb.count(None).await);

        assert_eq!(1, rb.pop(None).await.unwrap().0);
        assert_eq!(2, rb.pop(None).await.unwrap().0);

        assert_eq!(0, rb.count(Some(1)).await);
        assert_eq!(2, rb.count(Some(2)).await);

        assert_eq!(3, rb.pop(None).await.unwrap().0);
        assert_eq!(4, rb.pop(None).await.unwrap().0);

        assert_eq!(0, rb.count(Some(2)).await);
        assert_eq!(0, rb.count(None).await);
    }

    #[async_std::test]
    async fn test_peek_oldest() {
        let mut rb = make_backend(2);
        push_three_tags(&mut rb).await;

        assert_eq!(5, rb.count(None).await);

        assert_eq!(10, rb.peek(None).await.unwrap().0);

        // Peeking must not remove anything.
        assert_eq!(5, rb.count(None).await);
    }

    #[async_std::test]
    async fn test_peek_oldest_specific() {
        let mut rb = make_backend(2);
        push_three_tags(&mut rb).await;

        assert_eq!(5, rb.count(None).await);
        assert_eq!(1, rb.count(Some(3)).await);
        assert_eq!(2, rb.count(Some(1)).await);
        assert_eq!(2, rb.count(Some(2)).await);

        assert_eq!(10, rb.peek(Some(3)).await.unwrap().0);
        assert_eq!(1, rb.peek(Some(1)).await.unwrap().0);
        assert_eq!(3, rb.peek(Some(2)).await.unwrap().0);

        // Counts are unchanged after peeking.
        assert_eq!(5, rb.count(None).await);
        assert_eq!(1, rb.count(Some(3)).await);
        assert_eq!(2, rb.count(Some(1)).await);
        assert_eq!(2, rb.count(Some(2)).await);
    }

    #[async_std::test]
    async fn test_peek_all() {
        let mut rb = make_backend(4);

        push_all(&mut rb, Some(1), &[0, 2, 1]).await;

        // Peeking twice yields the same result.
        assert_eq!(vec![0, 2, 1], payloads(rb.peek_all(None, None).await));
        assert_eq!(vec![0, 2, 1], payloads(rb.peek_all(None, None).await));
        assert_eq!(3, rb.count(Some(1)).await);

        rb.pop_all(None).await;
    }

    #[async_std::test]
    async fn test_peek_all_specific() {
        let mut rb = make_backend(4);

        push_all(&mut rb, Some(1), &[0, 2, 1]).await;
        push_all(&mut rb, Some(2), &[3, 4]).await;
        rb.push(None, 5).await;

        assert_eq!(vec![0, 2, 1], payloads(rb.peek_all(Some(1), None).await));
        assert_eq!(3, rb.count(Some(1)).await);
        assert_eq!(2, rb.count(Some(2)).await);
        assert_eq!(6, rb.count(None).await);

        rb.pop_all(None).await;
    }

    #[async_std::test]
    async fn test_peek_all_with_timestamp() {
        let mut rb = make_backend(8);

        rb.push(Some(2), 0).await;
        push_all(&mut rb, Some(1), &[0, 1, 2, 3, 4, 5]).await;

        // Everything pushed so far is older than `close_past`.
        async_std::task::sleep(Duration::from_millis(10)).await;
        let close_past = unix_now();

        push_all(&mut rb, Some(1), &[6, 7]).await;
        rb.push(Some(2), 1).await;

        assert_eq!(vec![6, 7], payloads(rb.peek_all(Some(1), Some(close_past)).await));
        assert_eq!(vec![1], payloads(rb.peek_all(Some(2), Some(close_past)).await));
        assert_eq!(
            vec![0, 1, 2, 3, 4, 5, 6, 7],
            payloads(rb.peek_all(Some(1), None).await)
        );

        rb.pop_all(None).await;
    }

    #[async_std::test]
    async fn test_purge() {
        let mut rb = make_backend(8);

        push_all(&mut rb, None, &[0, 1, 2, 3]).await;

        // The first four messages end up older than `ts`.
        async_std::task::sleep(Duration::from_millis(100)).await;
        let ts = unix_now();

        push_all(&mut rb, None, &[4, 5, 6, 7]).await;

        assert_eq!(8, rb.count(None).await);

        rb.purge(ts).await;

        assert_eq!(4, rb.count(None).await);
        assert_eq!(vec![4, 5, 6, 7], payloads(rb.pop_all(None).await));
    }

    #[async_std::test]
    async fn test_duplicates() {
        let mut rb = make_backend(4);

        // In each capacity-4 buffer the leading 1 is overwritten by the fifth push.
        push_all(&mut rb, None, &[1, 0, 0, 0, 0]).await;
        push_all(&mut rb, Some(1), &[1, 0, 0, 0, 0]).await;

        assert_eq!(vec![0; 8], payloads(rb.pop_all(None).await));
    }
}