1use async_lock::Mutex;
2use async_trait::async_trait;
3use hopr_internal_types::prelude::*;
4use std::time::Duration;
5
6use crate::config::MessageInboxConfiguration;
7
8pub type TimestampFn = fn() -> Duration;
11
12#[async_trait]
15pub trait InboxBackend<T: Copy + Default + std::marker::Send, M: Clone + std::marker::Send> {
16 fn new_with_capacity(cap: usize, ts: TimestampFn) -> Self;
18
19 async fn push(&mut self, tag: Option<T>, payload: M);
21
22 async fn count(&self, tag: Option<T>) -> usize;
26
27 async fn pop(&mut self, tag: Option<T>) -> Option<(M, Duration)>;
31
32 async fn pop_all(&mut self, tag: Option<T>) -> Vec<(M, Duration)>;
34
35 async fn peek(&mut self, tag: Option<T>) -> Option<(M, Duration)>;
39
40 async fn peek_all(&mut self, tag: Option<T>, timestamp: Option<Duration>) -> Vec<(M, Duration)>;
45
46 async fn purge(&mut self, older_than_ts: Duration);
48}
49
50pub struct MessageInbox<B>
54where
55 B: InboxBackend<Tag, ApplicationData>,
56{
57 cfg: MessageInboxConfiguration,
58 backend: Mutex<B>,
59 time: TimestampFn,
60}
61
62impl<B> MessageInbox<B>
63where
64 B: InboxBackend<Tag, ApplicationData>,
65{
66 pub fn new(cfg: MessageInboxConfiguration) -> Self {
70 Self::new_with_time(cfg, || {
71 std::time::SystemTime::now()
72 .duration_since(std::time::UNIX_EPOCH)
73 .expect("Time went backwards")
74 })
75 }
76
77 pub fn new_with_time(cfg: MessageInboxConfiguration, time: TimestampFn) -> Self {
79 Self {
80 backend: Mutex::new(B::new_with_capacity(cfg.capacity as usize, time)),
81 time,
82 cfg,
83 }
84 }
85
86 fn is_excluded_tag(&self, tag: &Option<Tag>) -> bool {
94 match tag {
95 None => false,
96 Some(t) => self.cfg.excluded_tags.iter().any(|e| e.eq(t)),
97 }
98 }
99
100 #[tracing::instrument(level = "debug", skip(self))]
103 pub async fn push(&self, payload: ApplicationData) -> bool {
104 if self.is_excluded_tag(&payload.application_tag) {
105 return false;
106 }
107
108 let mut db = self.backend.lock().await;
110 db.push(payload.application_tag, payload).await;
111 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
112
113 true
114 }
115
116 #[tracing::instrument(level = "debug", skip(self))]
119 pub async fn size(&self, tag: Option<Tag>) -> usize {
120 if self.is_excluded_tag(&tag) {
121 return 0;
122 }
123
124 let mut db = self.backend.lock().await;
125 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
126 db.count(tag).await
127 }
128
129 #[tracing::instrument(level = "debug", skip(self))]
135 pub async fn pop(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
136 if self.is_excluded_tag(&tag) {
137 return None;
138 }
139
140 let mut db = self.backend.lock().await;
141 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
142
143 db.pop(tag).await
144 }
145
146 #[tracing::instrument(level = "debug", skip(self))]
152 pub async fn peek(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
153 if self.is_excluded_tag(&tag) {
154 return None;
155 }
156
157 let mut db = self.backend.lock().await;
158 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
159
160 db.peek(tag).await
161 }
162
163 #[tracing::instrument(level = "debug", skip(self))]
166 pub async fn peek_all(&self, tag: Option<Tag>, timestamp: Option<Duration>) -> Vec<(ApplicationData, Duration)> {
167 if self.is_excluded_tag(&tag) {
168 return Vec::new();
169 }
170
171 let mut db = self.backend.lock().await;
172 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
173 db.peek_all(tag, timestamp).await
174 }
175
176 #[tracing::instrument(level = "debug", skip(self))]
179 pub async fn pop_all(&self, tag: Option<Tag>) -> Vec<(ApplicationData, Duration)> {
180 if self.is_excluded_tag(&tag) {
181 return Vec::new();
182 }
183
184 let mut db = self.backend.lock().await;
185 db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
186 db.pop_all(tag).await
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use crate::inbox::{MessageInbox, MessageInboxConfiguration};
193 use crate::ring::RingBufferInboxBackend;
194 use anyhow::Context;
195 use hopr_internal_types::prelude::*;
196 use std::time::Duration;
197
198 #[async_std::test]
199 async fn test_basic_flow() -> anyhow::Result<()> {
200 let cfg = MessageInboxConfiguration {
201 capacity: 4,
202 max_age: std::time::Duration::from_secs(2),
203 excluded_tags: vec![2],
204 };
205
206 let mi = MessageInbox::<RingBufferInboxBackend<Tag, ApplicationData>>::new(cfg);
207
208 assert!(
209 mi.push(ApplicationData {
210 application_tag: None,
211 plain_text: (*b"test msg 0").into()
212 })
213 .await
214 );
215 assert!(
216 mi.push(ApplicationData {
217 application_tag: Some(1),
218 plain_text: (*b"test msg 1").into()
219 })
220 .await
221 );
222 assert!(
223 mi.push(ApplicationData {
224 application_tag: Some(1),
225 plain_text: (*b"test msg 2").into()
226 })
227 .await
228 );
229 assert!(
230 !mi.push(ApplicationData {
231 application_tag: Some(2),
232 plain_text: (*b"test msg").into()
233 })
234 .await
235 );
236 assert_eq!(3, mi.size(None).await);
237 assert_eq!(2, mi.size(Some(1)).await);
238 assert_eq!(0, mi.size(Some(2)).await);
239
240 let ad = mi.pop(None).await.context("message should be present")?;
241 assert_eq!(b"test msg 0", ad.0.plain_text.as_ref());
242
243 let ad = mi.pop(Some(1)).await.context("message should be present")?;
244 assert_eq!(b"test msg 1", ad.0.plain_text.as_ref());
245 assert_eq!(1, mi.size(Some(1)).await);
246
247 assert_eq!(1, mi.size(None).await);
248
249 async_std::task::sleep(Duration::from_millis(2500)).await;
250
251 assert_eq!(0, mi.size(None).await);
252
253 Ok(())
254 }
255}