hoprd_inbox/
inbox.rs

1use async_lock::Mutex;
2use async_trait::async_trait;
3use hopr_internal_types::prelude::*;
4use std::time::Duration;
5
6use crate::config::MessageInboxConfiguration;
7
8/// Represents a simple timestamping function.
9/// This is useful if used in WASM or environment which might have different means of measuring time.
10pub type TimestampFn = fn() -> Duration;
11
12/// Represents a generic backend trait for the message inbox.
13/// Messages `M` can be tagged or untagged via the type `T`
14#[async_trait]
15pub trait InboxBackend<T: Copy + Default + std::marker::Send, M: Clone + std::marker::Send> {
16    /// Create new storage with the given capacity and the timestamping function
17    fn new_with_capacity(cap: usize, ts: TimestampFn) -> Self;
18
19    /// Push a new entry with an optional `tag`.
20    async fn push(&mut self, tag: Option<T>, payload: M);
21
22    /// Count number of entries with the given `tag`.
23    ///
24    /// If no `tag` is given, returns the total count of all tagged and untagged entries.
25    async fn count(&self, tag: Option<T>) -> usize;
26
27    /// Pops oldest entry with the given `tag` or oldest entry in general, if no `tag` was given.
28    ///
29    /// Returns `None` if queue with the given `tag` is empty, or the entire store is empty (if no `tag` was given).
30    async fn pop(&mut self, tag: Option<T>) -> Option<(M, Duration)>;
31
32    /// Pops all entries of the given `tag`, or all entries (tagged and untagged) and returns them.
33    async fn pop_all(&mut self, tag: Option<T>) -> Vec<(M, Duration)>;
34
35    /// Peeks the oldest entry with the given `tag` or oldest entry in general, if no `tag` was given.
36    ///
37    /// Returns `None` if queue with the given `tag` is empty, or the entire store is empty (if no `tag` was given).
38    async fn peek(&mut self, tag: Option<T>) -> Option<(M, Duration)>;
39
40    /// Peeks all entries of the given `tag`, or all entries (tagged and untagged) and returns them.
41    ///
42    /// If the optional parameter `timestamp` is provided, only entries more recent than this are returned.
43    /// NOTE: the timestamp comparison precision should be at most up to milliseconds.
44    async fn peek_all(&mut self, tag: Option<T>, timestamp: Option<Duration>) -> Vec<(M, Duration)>;
45
46    /// Purges all entries strictly older than the given timestamp.
47    async fn purge(&mut self, older_than_ts: Duration);
48}
49
50/// Represents a thread-safe message inbox of messages of type `M`
51/// This type is thin frontend over `InboxBackend` using 16-bit unsigned integer tags.
52/// Each operation also performs `purge` of the backend.
53pub struct MessageInbox<B>
54where
55    B: InboxBackend<Tag, ApplicationData>,
56{
57    cfg: MessageInboxConfiguration,
58    backend: Mutex<B>,
59    time: TimestampFn,
60}
61
62impl<B> MessageInbox<B>
63where
64    B: InboxBackend<Tag, ApplicationData>,
65{
66    /// Creates new instance given the configuration.
67    ///
68    /// Uses `std::time::SystemTime` as timestamping function.
69    pub fn new(cfg: MessageInboxConfiguration) -> Self {
70        Self::new_with_time(cfg, || {
71            std::time::SystemTime::now()
72                .duration_since(std::time::UNIX_EPOCH)
73                .expect("Time went backwards")
74        })
75    }
76
77    /// Creates new instance given the configuration and the timestamping function.
78    pub fn new_with_time(cfg: MessageInboxConfiguration, time: TimestampFn) -> Self {
79        Self {
80            backend: Mutex::new(B::new_with_capacity(cfg.capacity as usize, time)),
81            time,
82            cfg,
83        }
84    }
85
86    /// Checks whether the tag is in the exclusion list.
87    ///
88    /// NOTE: If the `tag` is `None`, it will never be considered as excluded.
89    /// This has the following implication:
90    /// Since [DEFAULT_APPLICATION_TAG](hopr_internal_types::protocol::DEFAULT_APPLICATION_TAG) is also considered as excluded per default,
91    /// all the messages without a tag (= with implicit [DEFAULT_APPLICATION_TAG](hopr_internal_types::protocol::DEFAULT_APPLICATION_TAG))
92    /// will be allowed into the inbox, whereas the messages which explicitly specify that tag, will not make it into the inbox.
93    fn is_excluded_tag(&self, tag: &Option<Tag>) -> bool {
94        match tag {
95            None => false,
96            Some(t) => self.cfg.excluded_tags.iter().any(|e| e.eq(t)),
97        }
98    }
99
100    /// Push message into the inbox. Returns `true` if the message has been enqueued, `false` if it
101    /// has been excluded based on the configured excluded tags.
102    #[tracing::instrument(level = "debug", skip(self))]
103    pub async fn push(&self, payload: ApplicationData) -> bool {
104        if self.is_excluded_tag(&payload.application_tag) {
105            return false;
106        }
107
108        // Push only if there is no tag, or if the tag is not excluded
109        let mut db = self.backend.lock().await;
110        db.push(payload.application_tag, payload).await;
111        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
112
113        true
114    }
115
116    /// Number of messages in the inbox with the given `tag`, or the total number of all messages
117    /// if no `tag` is given.
118    #[tracing::instrument(level = "debug", skip(self))]
119    pub async fn size(&self, tag: Option<Tag>) -> usize {
120        if self.is_excluded_tag(&tag) {
121            return 0;
122        }
123
124        let mut db = self.backend.lock().await;
125        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
126        db.count(tag).await
127    }
128
129    /// Pop the oldest message with the given tag, or the oldest message regardless the tag
130    /// if it is not given.
131    ///
132    /// Returns `None` if there's no message with such `tag` (if given) in the inbox
133    /// or if the whole inbox is empty (if no `tag` is given).
134    #[tracing::instrument(level = "debug", skip(self))]
135    pub async fn pop(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
136        if self.is_excluded_tag(&tag) {
137            return None;
138        }
139
140        let mut db = self.backend.lock().await;
141        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
142
143        db.pop(tag).await
144    }
145
146    /// Peek the oldest message with the given tag, or the oldest message regardless the tag
147    /// if it is not given.
148    ///
149    /// Returns `None` if there's no message with such `tag` (if given) in the inbox
150    /// or if the whole inbox is empty (if no `tag` is given).
151    #[tracing::instrument(level = "debug", skip(self))]
152    pub async fn peek(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
153        if self.is_excluded_tag(&tag) {
154            return None;
155        }
156
157        let mut db = self.backend.lock().await;
158        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
159
160        db.peek(tag).await
161    }
162
163    /// Peeks all the messages with the given `tag` (ordered oldest to latest) or
164    /// all the messages from the entire inbox (ordered oldest to latest) if no `tag` is given.
165    #[tracing::instrument(level = "debug", skip(self))]
166    pub async fn peek_all(&self, tag: Option<Tag>, timestamp: Option<Duration>) -> Vec<(ApplicationData, Duration)> {
167        if self.is_excluded_tag(&tag) {
168            return Vec::new();
169        }
170
171        let mut db = self.backend.lock().await;
172        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
173        db.peek_all(tag, timestamp).await
174    }
175
176    /// Pops all the messages with the given `tag` (ordered oldest to latest) or
177    /// all the messages from the entire inbox (ordered oldest to latest) if no `tag` is given.
178    #[tracing::instrument(level = "debug", skip(self))]
179    pub async fn pop_all(&self, tag: Option<Tag>) -> Vec<(ApplicationData, Duration)> {
180        if self.is_excluded_tag(&tag) {
181            return Vec::new();
182        }
183
184        let mut db = self.backend.lock().await;
185        db.purge((self.time)().saturating_sub(self.cfg.max_age)).await;
186        db.pop_all(tag).await
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use crate::inbox::{MessageInbox, MessageInboxConfiguration};
193    use crate::ring::RingBufferInboxBackend;
194    use anyhow::Context;
195    use hopr_internal_types::prelude::*;
196    use std::time::Duration;
197
198    #[async_std::test]
199    async fn test_basic_flow() -> anyhow::Result<()> {
200        let cfg = MessageInboxConfiguration {
201            capacity: 4,
202            max_age: std::time::Duration::from_secs(2),
203            excluded_tags: vec![2],
204        };
205
206        let mi = MessageInbox::<RingBufferInboxBackend<Tag, ApplicationData>>::new(cfg);
207
208        assert!(
209            mi.push(ApplicationData {
210                application_tag: None,
211                plain_text: (*b"test msg 0").into()
212            })
213            .await
214        );
215        assert!(
216            mi.push(ApplicationData {
217                application_tag: Some(1),
218                plain_text: (*b"test msg 1").into()
219            })
220            .await
221        );
222        assert!(
223            mi.push(ApplicationData {
224                application_tag: Some(1),
225                plain_text: (*b"test msg 2").into()
226            })
227            .await
228        );
229        assert!(
230            !mi.push(ApplicationData {
231                application_tag: Some(2),
232                plain_text: (*b"test msg").into()
233            })
234            .await
235        );
236        assert_eq!(3, mi.size(None).await);
237        assert_eq!(2, mi.size(Some(1)).await);
238        assert_eq!(0, mi.size(Some(2)).await);
239
240        let ad = mi.pop(None).await.context("message should be present")?;
241        assert_eq!(b"test msg 0", ad.0.plain_text.as_ref());
242
243        let ad = mi.pop(Some(1)).await.context("message should be present")?;
244        assert_eq!(b"test msg 1", ad.0.plain_text.as_ref());
245        assert_eq!(1, mi.size(Some(1)).await);
246
247        assert_eq!(1, mi.size(None).await);
248
249        async_std::task::sleep(Duration::from_millis(2500)).await;
250
251        assert_eq!(0, mi.size(None).await);
252
253        Ok(())
254    }
255}