//! Mixing channel of the `hopr_transport_mixer` crate (`channel.rs`).

1use std::{
2    cmp::Reverse,
3    collections::BinaryHeap,
4    future::poll_fn,
5    sync::{
6        Arc,
7        atomic::{AtomicBool, AtomicUsize, Ordering},
8    },
9    task::Poll,
10    time::Duration,
11};
12
13use futures::{FutureExt, Stream, StreamExt};
14use futures_timer::Delay;
15use parking_lot::Mutex;
16use tracing::trace;
17
18use crate::{config::MixerConfig, data::DelayedData};
19
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Gauge following the number of items currently buffered in the mixer:
    /// incremented on every push and decremented on every item yielded by the receiver.
    pub static ref METRIC_QUEUE_SIZE: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new(
            "hopr_mixer_queue_size",
            "Current mixer queue size",
        )
        .unwrap();

    /// Gauge holding a weighted running average (in milliseconds) of the random delays
    /// assigned to pushed items.
    pub static ref METRIC_MIXER_AVERAGE_DELAY: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new(
            "hopr_mixer_average_packet_delay",
            "Average mixer packet delay averaged over a packet window",
        )
        .unwrap();
}
30
31/// Mixing and delaying channel using random delay function.
32///
33/// Mixing is performed by assigning random delays to the ingress timestamp of data,
34/// then storing the values inside a binary heap with reversed ordering (max heap).
35/// This effectively creates a min heap behavior, which is required to ensure that
36/// data is released in order of their delay expiration.
37///
38/// When data arrives:
39/// 1. A random delay is assigned
40/// 2. Data is stored in the heap with its release timestamp
41/// 3. The heap maintains ordering so items with earliest release time are at the top
42///
43/// This channel is **unbounded** by nature using the `capacity` in the configuration
44/// to solely pre-allocate the buffer.
45///
46/// The timer used by the receiver to wait for the next release deadline is **not** stored
47/// behind this mutex — it lives on the [`Receiver`] itself. Keeping it out of the shared
48/// state is what lets the receiver poll the timer without blocking senders.
49struct Channel<T> {
50    /// Buffer holding the data with a timestamp ordering to ensure the min heap behavior.
51    buffer: BinaryHeap<Reverse<DelayedData<T>>>,
52    waker: Option<std::task::Waker>,
53    cfg: MixerConfig,
54}
55
56/// Channel with sender and receiver counters allowing closure tracking.
57struct TrackedChannel<T> {
58    channel: Arc<Mutex<Channel<T>>>,
59    sender_count: Arc<AtomicUsize>,
60    receiver_active: Arc<AtomicBool>,
61}
62
63impl<T> Clone for TrackedChannel<T> {
64    fn clone(&self) -> Self {
65        Self {
66            channel: self.channel.clone(),
67            sender_count: self.sender_count.clone(),
68            receiver_active: self.receiver_active.clone(),
69        }
70    }
71}
72
73/// Error returned by the [`Sender`].
74#[derive(Clone, Debug, thiserror::Error)]
75pub enum SenderError {
76    /// The channel is closed due to receiver being dropped.
77    #[error("Channel is closed")]
78    Closed,
79}
80
81/// Sender object interacting with the mixing channel.
82pub struct Sender<T> {
83    channel: TrackedChannel<T>,
84}
85
86impl<T> Clone for Sender<T> {
87    fn clone(&self) -> Self {
88        let channel = self.channel.clone();
89        channel.sender_count.fetch_add(1, Ordering::Relaxed);
90
91        Sender { channel }
92    }
93}
94
95impl<T> Drop for Sender<T> {
96    fn drop(&mut self) {
97        if self.channel.sender_count.fetch_sub(1, Ordering::Relaxed) == 1
98            && !self.channel.receiver_active.load(Ordering::Relaxed)
99        {
100            self.channel.channel.lock().waker = None;
101        }
102    }
103}
104
105impl<T> Sender<T> {
106    /// Send one item to the mixing channel.
107    pub fn send(&self, item: T) -> Result<(), SenderError> {
108        self.push_item(item)
109    }
110
111    /// Locked critical section shared between `Sink::start_send` and [`Sender::send`].
112    #[tracing::instrument(level = "trace", skip(self, item))]
113    fn push_item(&self, item: T) -> Result<(), SenderError> {
114        if !self.channel.receiver_active.load(Ordering::Relaxed) {
115            return Err(SenderError::Closed);
116        }
117
118        let mut channel = self.channel.channel.lock();
119
120        let random_delay = channel.cfg.random_delay();
121
122        trace!(delay_in_ms = random_delay.as_millis(), "generated mixer delay",);
123
124        let delayed_data: DelayedData<T> = (std::time::Instant::now() + random_delay, item).into();
125        channel.buffer.push(Reverse(delayed_data));
126
127        if let Some(waker) = channel.waker.as_ref() {
128            waker.wake_by_ref();
129        }
130
131        #[cfg(all(feature = "telemetry", not(test)))]
132        {
133            METRIC_QUEUE_SIZE.increment(1.0f64);
134
135            let weight = 1.0f64 / channel.cfg.metric_delay_window as f64;
136            METRIC_MIXER_AVERAGE_DELAY.set(
137                (weight * random_delay.as_millis() as f64) + ((1.0f64 - weight) * METRIC_MIXER_AVERAGE_DELAY.get()),
138            );
139        }
140
141        Ok(())
142    }
143}
144
145impl<T> futures::sink::Sink<T> for Sender<T> {
146    type Error = SenderError;
147
148    fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
149        let is_active = self.channel.receiver_active.load(Ordering::Relaxed);
150        if is_active {
151            Poll::Ready(Ok(()))
152        } else {
153            Poll::Ready(Err(SenderError::Closed))
154        }
155    }
156
157    fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
158        self.push_item(item)
159    }
160
161    fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
162        Poll::Ready(Ok(()))
163    }
164
165    fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166        // The channel can only be closed by the receiver. The sender can be dropped at any point.
167        Poll::Ready(Ok(()))
168    }
169}
170
171/// Error returned by the [`Receiver`].
172#[derive(Debug, thiserror::Error)]
173pub enum ReceiverError {
174    /// The channel is closed due to receiver being dropped.
175    #[error("Channel is closed")]
176    Closed,
177}
178
179/// Receiver object interacting with the mixer channel.
180///
181/// The receiver receives already mixed elements without any knowledge of
182/// the original order.
183///
184/// The release-deadline timer lives on the receiver itself (not behind the shared
185/// mutex). Senders therefore never wait on this timer's poll — a sender can push into
186/// the buffer while the receiver is parked on the timer.
187pub struct Receiver<T> {
188    channel: TrackedChannel<T>,
189    timer: Delay,
190}
191
192impl<T> Stream for Receiver<T> {
193    type Item = T;
194
195    #[tracing::instrument(level = "trace", skip(self, cx))]
196    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
197        let now = std::time::Instant::now();
198        let no_senders = self.channel.sender_count.load(Ordering::Relaxed) == 0;
199
200        // Phase 1: under lock, try to pop a due item; otherwise register the waker and
201        // capture the duration to sleep. Drop the lock before touching the timer.
202        //
203        // When all senders have dropped (`no_senders`), we must still drain any items
204        // remaining in the buffer before terminating the stream — returning `None`
205        // with pending items in flight would silently discard packets the senders
206        // enqueued before closing.
207        let sleep_for = {
208            let mut channel = self.channel.channel.lock();
209
210            if channel.buffer.peek().map(|x| x.0.release_at < now).unwrap_or(false) {
211                let data = channel
212                    .buffer
213                    .pop()
214                    .expect("value must be present during the same locked access")
215                    .0
216                    .item;
217
218                trace!(from = "direct", "yield item");
219
220                #[cfg(all(feature = "telemetry", not(test)))]
221                METRIC_QUEUE_SIZE.decrement(1.0f64);
222
223                return Poll::Ready(Some(data));
224            }
225
226            // Buffer is either empty or every item is still in the future.
227            if channel.buffer.is_empty() && no_senders {
228                // Nothing left to deliver and nobody is going to enqueue more.
229                drop(channel);
230                self.channel.receiver_active.store(false, Ordering::Relaxed);
231                return Poll::Ready(None);
232            }
233
234            match channel.waker.as_mut() {
235                Some(existing) => existing.clone_from(cx.waker()),
236                None => channel.waker = Some(cx.waker().clone()),
237            }
238
239            match channel.buffer.peek() {
240                Some(next) => next.0.release_at.duration_since(now),
241                None => {
242                    // Senders still alive (else we would have returned `None` above) —
243                    // wait for one of them to push.
244                    trace!(from = "direct", "pending (empty buffer)");
245                    return Poll::Pending;
246                }
247            }
248        };
249
250        // Phase 2: poll the timer WITHOUT holding the mutex so senders can keep pushing.
251        let this = self.get_mut();
252        trace!("resetting the timer");
253        this.timer.reset(sleep_for);
254        futures::ready!(this.timer.poll_unpin(cx));
255
256        // Phase 3: timer fired. Re-take the lock. Because there is only one receiver,
257        // the item at the top is still present; senders can only have added more.
258        // A newly-pushed item with an earlier deadline is also safe to pop — it merely
259        // yields in a different order than originally expected, which is consistent
260        // with the mixer's design of not preserving input order.
261        let mut channel = this.channel.channel.lock();
262        match channel.buffer.pop() {
263            Some(entry) => {
264                trace!(from = "timer", "yield item");
265
266                #[cfg(all(feature = "telemetry", not(test)))]
267                METRIC_QUEUE_SIZE.decrement(1.0f64);
268
269                Poll::Ready(Some(entry.0.item))
270            }
271            None => {
272                trace!(from = "timer", "buffer drained before we re-acquired the lock");
273                Poll::Pending
274            }
275        }
276    }
277}
278
279impl<T> Receiver<T> {
280    /// Receive a single delayed mixed item.
281    pub async fn recv(&mut self) -> Option<T> {
282        poll_fn(|cx| self.poll_next_unpin(cx)).await
283    }
284}
285
286/// Instantiate a mixing channel and return the sender and receiver end of the channel.
287pub fn channel<T>(cfg: crate::config::MixerConfig) -> (Sender<T>, Receiver<T>) {
288    #[cfg(all(feature = "telemetry", not(test)))]
289    {
290        // Initialize the lazy statics here
291        lazy_static::initialize(&METRIC_QUEUE_SIZE);
292        lazy_static::initialize(&METRIC_MIXER_AVERAGE_DELAY);
293    }
294
295    let mut buffer = BinaryHeap::new();
296    buffer.reserve(cfg.capacity);
297
298    let channel = TrackedChannel {
299        channel: Arc::new(Mutex::new(Channel::<T> {
300            buffer,
301            waker: None,
302            cfg,
303        })),
304        sender_count: Arc::new(AtomicUsize::new(1)),
305        receiver_active: Arc::new(AtomicBool::new(true)),
306    };
307    (
308        Sender {
309            channel: channel.clone(),
310        },
311        Receiver {
312            channel,
313            timer: Delay::new(Duration::from_secs(0)),
314        },
315    )
316}
317
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use tokio::time::timeout;

    use super::*;

    /// Extra time granted on top of the expected delay to absorb scheduling jitter.
    const PROCESSING_LEEWAY: Duration = Duration::from_millis(250);
    /// Upper bound of the delay a single item can get with the default configuration.
    const MAXIMUM_SINGLE_DELAY_DURATION: Duration = Duration::from_millis(
        crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS + crate::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS,
    );

    #[tokio::test]
    async fn mixer_channel_should_pass_an_element() -> anyhow::Result<()> {
        let (tx, mut rx) = channel(MixerConfig::default());
        tx.send(1)?;
        assert_eq!(rx.recv().await, Some(1));

        Ok(())
    }

    #[tokio::test]
    async fn mixer_channel_should_introduce_random_delay() -> anyhow::Result<()> {
        // `Instant` is monotonic; a wall clock could jump while the test is running.
        let start = std::time::Instant::now();

        let (tx, mut rx) = channel(MixerConfig::default());
        tx.send(1)?;
        assert_eq!(rx.recv().await, Some(1));

        let elapsed = start.elapsed();

        assert!(elapsed < MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY);
        assert!(elapsed > Duration::from_millis(crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS));
        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_batch_on_sending_emulating_concurrency() -> anyhow::Result<()> {
        const ITERATIONS: usize = 10;

        let (tx, mut rx) = channel(MixerConfig::default());

        // `Instant` is monotonic; a wall clock could jump while the test is running.
        let start = std::time::Instant::now();

        for i in 0..ITERATIONS {
            tx.send(i)?;
        }
        for _ in 0..ITERATIONS {
            let data = timeout(MAXIMUM_SINGLE_DELAY_DURATION, rx.next()).await?;
            assert!(data.is_some());
        }

        let elapsed = start.elapsed();

        assert!(elapsed < MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY);
        assert!(elapsed > Duration::from_millis(crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS));
        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_work_concurrently_and_properly_closed_channels() -> anyhow::Result<()> {
        const ITERATIONS: usize = 1000;

        let (tx, mut rx) = channel(MixerConfig::default());

        let recv_task = tokio::task::spawn(async move {
            while let Some(_item) = timeout(2 * MAXIMUM_SINGLE_DELAY_DURATION, rx.next())
                .await
                .expect("receiver should not fail")
            {}
        });

        let send_task =
            tokio::task::spawn(async move { futures::stream::iter(0..ITERATIONS).map(Ok).forward(tx).await });

        let (recv, send) = futures::try_join!(
            timeout(MAXIMUM_SINGLE_DELAY_DURATION, recv_task),
            timeout(MAXIMUM_SINGLE_DELAY_DURATION, send_task)
        )?;

        // A panic inside the receiving task surfaces as a join error; do not ignore it.
        recv?;
        send??;

        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_sync_send() -> anyhow::Result<()> {
        const ITERATIONS: usize = 20; // highly unlikely that this produces the same order on the input given the size

        let (tx, rx) = channel(MixerConfig::default());

        let input = (0..ITERATIONS).collect::<Vec<_>>();

        for i in input.iter() {
            tx.send(*i)?;
        }

        let mixed_output = timeout(
            2 * MAXIMUM_SINGLE_DELAY_DURATION,
            rx.take(ITERATIONS).collect::<Vec<_>>(),
        )
        .await?;

        tracing::info!(?input, ?mixed_output, "asserted data");
        assert_ne!(input, mixed_output);
        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_async_send() -> anyhow::Result<()>
    {
        const ITERATIONS: usize = 20; // highly unlikely that this produces the same order on the input given the size

        let (mut tx, rx) = channel(MixerConfig::default());

        let input = (0..ITERATIONS).collect::<Vec<_>>();

        for i in input.iter() {
            SinkExt::send(&mut tx, *i).await?;
        }

        let mixed_output = timeout(
            2 * MAXIMUM_SINGLE_DELAY_DURATION,
            rx.take(ITERATIONS).collect::<Vec<_>>(),
        )
        .await?;

        tracing::info!(?input, ?mixed_output, "asserted data");
        assert_ne!(input, mixed_output);
        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_async_feed() -> anyhow::Result<()>
    {
        const ITERATIONS: usize = 20; // highly unlikely that this produces the same order on the input given the size

        let (mut tx, rx) = channel(MixerConfig::default());

        let input = (0..ITERATIONS).collect::<Vec<_>>();

        for i in input.iter() {
            SinkExt::feed(&mut tx, *i).await?;
        }
        SinkExt::flush(&mut tx).await?;

        let mixed_output = timeout(
            2 * MAXIMUM_SINGLE_DELAY_DURATION,
            rx.take(ITERATIONS).collect::<Vec<_>>(),
        )
        .await?;

        tracing::info!(?input, ?mixed_output, "asserted data");
        assert_ne!(input, mixed_output);
        Ok(())
    }

    #[tokio::test]
    // #[tracing_test::traced_test]
    async fn mixer_channel_should_not_mix_the_order_if_the_min_delay_and_delay_range_is_0() -> anyhow::Result<()> {
        // With a zero delay every item is released in the order it was pushed, so the
        // output must equal the input.
        const ITERATIONS: usize = 40;

        let (tx, rx) = channel(MixerConfig {
            min_delay: Duration::from_millis(0),
            delay_range: Duration::from_millis(0),
            ..MixerConfig::default()
        });

        let input = (0..ITERATIONS).collect::<Vec<_>>();

        for i in input.iter() {
            tx.send(*i)?;
            tokio::time::sleep(std::time::Duration::from_micros(10)).await; // ensure we don't send too fast
        }

        let mixed_output = timeout(
            2 * MAXIMUM_SINGLE_DELAY_DURATION,
            rx.take(ITERATIONS).collect::<Vec<_>>(),
        )
        .await?;

        tracing::info!(?input, ?mixed_output, "asserted data");
        assert_eq!(input, mixed_output);

        Ok(())
    }

    #[tokio::test]
    async fn sender_should_return_closed_when_receiver_inactive() -> anyhow::Result<()> {
        let (tx, _rx) = channel::<i32>(MixerConfig::default());

        // Simulate the receiver marking itself inactive (normally happens
        // in poll_next when sender_count drops to 0).
        tx.channel.receiver_active.store(false, Ordering::Relaxed);

        let result = tx.send(42);
        assert!(
            matches!(result, Err(SenderError::Closed)),
            "send with inactive receiver should return Closed, got: {result:?}"
        );
        Ok(())
    }

    /// Regression test for #7947 item 2: the receiver must not hold the channel
    /// mutex across its timer poll, otherwise a sender trying to push a new item
    /// would block until the original timer deadline expired.
    #[tokio::test]
    async fn sender_can_push_while_receiver_is_parked_on_timer() -> anyhow::Result<()> {
        // Mixer with a long minimum delay — the receiver's first poll will park on the
        // timer for at least this duration.
        let cfg = MixerConfig {
            min_delay: Duration::from_millis(500),
            delay_range: Duration::from_millis(1),
            ..MixerConfig::default()
        };
        let (tx, mut rx) = channel::<u32>(cfg);

        // Prime: push one item so the receiver's timer branch activates.
        tx.send(0)?;

        // Drive the receiver far enough to install the timer for the first item, then
        // try to push again from another task. If the mutex were held across the timer
        // poll, this second send would block for ~500 ms.
        let rx_task = tokio::task::spawn(async move {
            // Collect both items back.
            let first = rx.next().await;
            let second = rx.next().await;
            (first, second)
        });

        // Give the receiver a moment to enter the timer branch (it will have locked,
        // seen the not-yet-due item, stored the waker, unlocked, and started polling
        // the Delay).
        tokio::time::sleep(Duration::from_millis(50)).await;

        // The crucial assertion: this `send()` must return promptly — not block for
        // ~450 ms. We measure the latency of the send itself.
        let send_started = std::time::Instant::now();
        tx.send(1)?;
        let send_latency = send_started.elapsed();
        assert!(
            send_latency < Duration::from_millis(100),
            "sender was blocked by receiver's timer poll: send took {send_latency:?}"
        );

        // Sanity: both items eventually arrive.
        let (first, second) = timeout(Duration::from_millis(1500), rx_task)
            .await?
            .expect("receiver task should finish");
        assert!(first.is_some(), "first item must be received");
        assert!(second.is_some(), "second item must be received");
        Ok(())
    }

    /// Regression test: if the last sender drops after enqueueing items, the receiver
    /// must drain them before closing. Pre-fix the receiver returned `None` as soon as
    /// `sender_count == 0`, silently discarding anything still in the buffer.
    #[tokio::test]
    async fn receiver_drains_buffered_items_after_last_sender_drops() -> anyhow::Result<()> {
        let cfg = MixerConfig {
            min_delay: Duration::from_millis(0),
            delay_range: Duration::from_millis(1),
            ..MixerConfig::default()
        };
        let (tx, mut rx) = channel::<u32>(cfg);

        const ITERATIONS: usize = 16;
        for i in 0..ITERATIONS as u32 {
            tx.send(i)?;
        }

        // Drop the only sender. sender_count goes to 0, but buffer still has ITERATIONS items.
        drop(tx);

        // Must still yield every item before terminating.
        let mut received = 0usize;
        while let Some(_item) = timeout(Duration::from_millis(500), rx.next()).await? {
            received += 1;
        }
        assert_eq!(
            received,
            ITERATIONS,
            "receiver dropped {} pending items on sender shutdown",
            ITERATIONS - received
        );
        Ok(())
    }

    /// Regression test: all `Sender` clones push into the same shared heap,
    /// so items from different clones are drained by the single `Receiver`.
    ///
    /// Pre-fix (`MixerSink::clone` with per-clone heap): each clone has its own
    /// `BinaryHeap`; a separate receiver per clone would be required to observe
    /// all items. With a single receiver the other clone's items are silently lost.
    ///
    /// Post-fix (`Sender`/`Receiver` channel): one shared heap, one receiver
    /// sees every item regardless of which clone pushed it.
    #[tokio::test]
    async fn sender_clones_share_heap() -> anyhow::Result<()> {
        let cfg = MixerConfig {
            min_delay: Duration::from_millis(50),
            delay_range: Duration::from_millis(1),
            ..MixerConfig::default()
        };
        let (tx_a, mut rx) = channel::<u32>(cfg);
        let tx_b = tx_a.clone();

        tx_a.send(1)?;
        tx_b.send(2)?;
        drop(tx_a);
        drop(tx_b);

        // Single receiver must drain both items — proves the heap is shared.
        let mut got = vec![
            timeout(MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY, rx.next())
                .await?
                .expect("first item"),
            timeout(MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY, rx.next())
                .await?
                .expect("second item"),
        ];
        got.sort();
        assert_eq!(got, vec![1, 2]);
        assert!(rx.next().await.is_none(), "expected channel closed with no more items");
        Ok(())
    }
}