hoprd_inbox/
inbox.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
use async_lock::Mutex;
use async_trait::async_trait;
use hopr_internal_types::prelude::*;
use std::time::Duration;

use crate::config::MessageInboxConfiguration;

/// Represents a simple timestamping function: a plain function pointer that returns a [`Duration`].
///
/// Abstracting the clock this way is useful in WASM or in any other environment which might have
/// different means of measuring time. The default clock installed by `MessageInbox::new` reports
/// the time elapsed since the Unix epoch.
pub type TimestampFn = fn() -> Duration;

/// Generic storage backend abstraction used by the message inbox.
///
/// Every entry carries a payload of type `M` and may optionally be labelled with a tag of type `T`.
#[async_trait]
pub trait InboxBackend<T, M>
where
    T: Copy + Default + Send,
    M: Clone + Send,
{
    /// Constructs a storage with capacity `cap` that uses `ts` as its timestamping function.
    fn new_with_capacity(cap: usize, ts: TimestampFn) -> Self;

    /// Stores `payload` as a new entry, labelled with `tag` if one is given.
    async fn push(&mut self, tag: Option<T>, payload: M);

    /// Returns the number of entries labelled with `tag`.
    ///
    /// With `tag == None`, the total number of entries (tagged and untagged) is returned.
    async fn count(&self, tag: Option<T>) -> usize;

    /// Removes and returns the oldest entry labelled with `tag`, or the oldest entry overall
    /// when `tag` is `None`.
    ///
    /// Yields `None` when the queue for `tag` is empty, or when the whole store is empty
    /// (if no `tag` was given).
    async fn pop(&mut self, tag: Option<T>) -> Option<(M, Duration)>;

    /// Removes and returns every entry labelled with `tag`, or every entry
    /// (tagged and untagged) when `tag` is `None`.
    async fn pop_all(&mut self, tag: Option<T>) -> Vec<(M, Duration)>;

    /// Returns the oldest entry labelled with `tag`, or the oldest entry overall
    /// when `tag` is `None`.
    ///
    /// Yields `None` when the queue for `tag` is empty, or when the whole store is empty
    /// (if no `tag` was given).
    async fn peek(&mut self, tag: Option<T>) -> Option<(M, Duration)>;

    /// Returns every entry labelled with `tag`, or every entry (tagged and untagged)
    /// when `tag` is `None`.
    ///
    /// When `timestamp` is provided, only entries more recent than it are returned.
    /// NOTE: the timestamp comparison precision should be at most up to milliseconds.
    async fn peek_all(&mut self, tag: Option<T>, timestamp: Option<Duration>) -> Vec<(M, Duration)>;

    /// Drops all entries that are strictly older than `older_than_ts`.
    async fn purge(&mut self, older_than_ts: Duration);
}

/// Thread-safe inbox holding messages of type `ApplicationData`.
///
/// It is a thin frontend over an [`InboxBackend`] that uses 16-bit unsigned integer tags;
/// the backend is kept behind an asynchronous mutex.
/// Every operation that reaches the backend also runs `purge` on it.
pub struct MessageInbox<B: InboxBackend<Tag, ApplicationData>> {
    /// Inbox configuration (capacity, maximum message age, excluded tags).
    cfg: MessageInboxConfiguration,
    /// The storage backend, guarded by a lock.
    backend: Mutex<B>,
    /// Timestamping function used to compute the purge cut-off.
    time: TimestampFn,
}

impl<B> MessageInbox<B>
where
    B: InboxBackend<Tag, ApplicationData>,
{
    /// Builds an inbox from the given configuration.
    ///
    /// Timestamps come from `std::time::SystemTime`, expressed as the time elapsed since
    /// the Unix epoch.
    ///
    /// # Panics
    ///
    /// The installed timestamping function panics whenever the system clock reports a time
    /// before the Unix epoch.
    pub fn new(cfg: MessageInboxConfiguration) -> Self {
        // A non-capturing function item, so it coerces to the `TimestampFn` pointer type.
        fn since_unix_epoch() -> Duration {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("Time went backwards")
        }

        Self::new_with_time(cfg, since_unix_epoch)
    }

    /// Builds an inbox from the given configuration and an explicit timestamping function.
    ///
    /// The same `time` function is handed to the backend and kept by the inbox itself.
    pub fn new_with_time(cfg: MessageInboxConfiguration, time: TimestampFn) -> Self {
        let storage = B::new_with_capacity(cfg.capacity as usize, time);

        Self {
            cfg,
            backend: Mutex::new(storage),
            time,
        }
    }

    /// Computes the purge cut-off: the current time minus the configured maximum age,
    /// saturating at zero.
    fn purge_cutoff(&self) -> Duration {
        let now = (self.time)();
        now.saturating_sub(self.cfg.max_age)
    }

    /// Tells whether `tag` appears in the configured list of excluded tags.
    ///
    /// NOTE: A `None` tag is never considered excluded.
    /// This has the following implication:
    /// Since [DEFAULT_APPLICATION_TAG](hopr_internal_types::protocol::DEFAULT_APPLICATION_TAG) is also considered as excluded per default,
    /// all the messages without a tag (= with implicit [DEFAULT_APPLICATION_TAG](hopr_internal_types::protocol::DEFAULT_APPLICATION_TAG))
    /// will be allowed into the inbox, whereas the messages which explicitly specify that tag, will not make it into the inbox.
    fn is_excluded_tag(&self, tag: &Option<Tag>) -> bool {
        if let Some(candidate) = tag {
            self.cfg.excluded_tags.iter().any(|excluded| excluded == candidate)
        } else {
            false
        }
    }

    /// Enqueues `payload` into the inbox.
    ///
    /// Returns `true` once the message has been stored, or `false` when its tag is on the
    /// configured exclusion list (in which case the backend is not touched at all).
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn push(&self, payload: ApplicationData) -> bool {
        // NOTE(review): only `self` is skipped above, so the span also records `payload`
        // through its `Debug` implementation — confirm that logging it is intended.
        let tag = payload.application_tag;
        if self.is_excluded_tag(&tag) {
            return false;
        }

        // Reaching this point means the message is untagged or its tag is allowed.
        let mut guard = self.backend.lock().await;
        guard.push(tag, payload).await;

        // Expired entries are dropped only after the new message has been stored.
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        true
    }

    /// Counts the messages carrying the given `tag`, or all messages in the inbox when
    /// `tag` is `None`.
    ///
    /// An excluded `tag` always yields `0`.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn size(&self, tag: Option<Tag>) -> usize {
        if self.is_excluded_tag(&tag) {
            return 0;
        }

        let mut guard = self.backend.lock().await;
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        guard.count(tag).await
    }

    /// Removes and returns the oldest message carrying the given `tag`, or the oldest
    /// message overall when `tag` is `None`.
    ///
    /// Yields `None` when the inbox holds no message with that `tag` (if given),
    /// or when the inbox is entirely empty (if no `tag` is given).
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn pop(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
        if self.is_excluded_tag(&tag) {
            return None;
        }

        let mut guard = self.backend.lock().await;
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        guard.pop(tag).await
    }

    /// Returns the oldest message carrying the given `tag`, or the oldest message overall
    /// when `tag` is `None`, leaving it in the inbox.
    ///
    /// Yields `None` when the inbox holds no message with that `tag` (if given),
    /// or when the inbox is entirely empty (if no `tag` is given).
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn peek(&self, tag: Option<Tag>) -> Option<(ApplicationData, Duration)> {
        if self.is_excluded_tag(&tag) {
            return None;
        }

        let mut guard = self.backend.lock().await;
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        guard.peek(tag).await
    }

    /// Returns all messages carrying the given `tag` (ordered oldest to latest), or all
    /// messages of the entire inbox (ordered oldest to latest) when `tag` is `None`.
    ///
    /// The optional `timestamp` is forwarded to the backend's `peek_all` unchanged.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn peek_all(&self, tag: Option<Tag>, timestamp: Option<Duration>) -> Vec<(ApplicationData, Duration)> {
        if self.is_excluded_tag(&tag) {
            return Vec::new();
        }

        let mut guard = self.backend.lock().await;
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        guard.peek_all(tag, timestamp).await
    }

    /// Removes and returns all messages carrying the given `tag` (ordered oldest to latest),
    /// or all messages of the entire inbox (ordered oldest to latest) when `tag` is `None`.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn pop_all(&self, tag: Option<Tag>) -> Vec<(ApplicationData, Duration)> {
        if self.is_excluded_tag(&tag) {
            return Vec::new();
        }

        let mut guard = self.backend.lock().await;
        let cutoff = self.purge_cutoff();
        guard.purge(cutoff).await;

        guard.pop_all(tag).await
    }
}

#[cfg(test)]
mod tests {
    use crate::inbox::{MessageInbox, MessageInboxConfiguration};
    use crate::ring::RingBufferInboxBackend;
    use anyhow::Context;
    use hopr_internal_types::prelude::*;
    use std::time::Duration;

    /// Exercises the inbox end to end: pushing (including an excluded tag), counting,
    /// popping by tag and without a tag, and expiry of messages older than `max_age`.
    #[async_std::test]
    async fn test_basic_flow() -> anyhow::Result<()> {
        let inbox = MessageInbox::<RingBufferInboxBackend<Tag, ApplicationData>>::new(MessageInboxConfiguration {
            capacity: 4,
            max_age: Duration::from_secs(2),
            excluded_tags: vec![2],
        });

        // An untagged message is always accepted.
        let untagged = ApplicationData {
            application_tag: None,
            plain_text: (*b"test msg 0").into(),
        };
        assert!(inbox.push(untagged).await);

        // Two messages with an allowed tag are accepted as well.
        let first_tagged = ApplicationData {
            application_tag: Some(1),
            plain_text: (*b"test msg 1").into(),
        };
        assert!(inbox.push(first_tagged).await);

        let second_tagged = ApplicationData {
            application_tag: Some(1),
            plain_text: (*b"test msg 2").into(),
        };
        assert!(inbox.push(second_tagged).await);

        // Tag 2 is on the exclusion list, so the push must be rejected.
        let excluded = ApplicationData {
            application_tag: Some(2),
            plain_text: (*b"test msg").into(),
        };
        assert!(!inbox.push(excluded).await);

        assert_eq!(3, inbox.size(None).await);
        assert_eq!(2, inbox.size(Some(1)).await);
        assert_eq!(0, inbox.size(Some(2)).await);

        // Popping without a tag yields the oldest message overall.
        let (oldest, _) = inbox.pop(None).await.context("message should be present")?;
        assert_eq!(b"test msg 0", oldest.plain_text.as_ref());

        // Popping with a tag yields the oldest message carrying that tag.
        let (oldest_tagged, _) = inbox.pop(Some(1)).await.context("message should be present")?;
        assert_eq!(b"test msg 1", oldest_tagged.plain_text.as_ref());
        assert_eq!(1, inbox.size(Some(1)).await);

        assert_eq!(1, inbox.size(None).await);

        // Wait past `max_age` so the remaining message is purged on the next operation.
        async_std::task::sleep(Duration::from_millis(2500)).await;

        assert_eq!(0, inbox.size(None).await);

        Ok(())
    }
}