Skip to main content

hopr_transport_mixer/
sink.rs

1use std::{
2    cmp::Reverse,
3    collections::BinaryHeap,
4    pin::Pin,
5    task::{Context, Poll},
6    time::{Duration, Instant},
7};
8
9use futures::{FutureExt, Sink};
10use futures_timer::Delay;
11use tracing::trace;
12
13#[cfg(all(feature = "telemetry", not(test)))]
14use crate::channel::{METRIC_MIXER_AVERAGE_DELAY, METRIC_QUEUE_SIZE};
15use crate::{config::MixerConfig, data::DelayedData};
16
17/// A [`Sink`] adapter that applies random delays to items before forwarding them to an inner sink.
18///
19/// Items pushed via `start_send` are held in an internal heap until their randomly-assigned
20/// release time, then forwarded to the wrapped sink. `poll_flush` drains due items and parks
21/// on a timer for the next pending item, so the owning task wakes up automatically when items
22/// become ready — no separate forwarding task is required.
23///
24/// Cloning creates a fresh, empty sink that shares only the inner sink clone and configuration;
25/// each clone maintains an independent delay heap.
26pub struct MixerSink<S, T> {
27    inner: S,
28    heap: BinaryHeap<Reverse<DelayedData<T>>>,
29    timer: Delay,
30    cfg: MixerConfig,
31}
32
33impl<S, T> MixerSink<S, T> {
34    pub fn new(inner: S, cfg: MixerConfig) -> Self {
35        let mut heap = BinaryHeap::new();
36        heap.reserve(cfg.capacity);
37        Self {
38            inner,
39            heap,
40            timer: Delay::new(Duration::ZERO),
41            cfg,
42        }
43    }
44}
45
46impl<S: Clone, T> Clone for MixerSink<S, T> {
47    fn clone(&self) -> Self {
48        Self::new(self.inner.clone(), self.cfg)
49    }
50}
51
52impl<S, T> Sink<T> for MixerSink<S, T>
53where
54    S: Sink<T> + Unpin,
55    T: Unpin,
56{
57    type Error = S::Error;
58
59    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60        Poll::Ready(Ok(()))
61    }
62
63    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
64        let this = Pin::into_inner(self);
65        let random_delay = this.cfg.random_delay();
66
67        trace!(delay_ms = random_delay.as_millis(), "mixer: delaying item");
68
69        this.heap
70            .push(Reverse(DelayedData::from((Instant::now() + random_delay, item))));
71
72        #[cfg(all(feature = "telemetry", not(test)))]
73        {
74            METRIC_QUEUE_SIZE.increment(1.0f64);
75
76            let weight = 1.0f64 / this.cfg.metric_delay_window as f64;
77            METRIC_MIXER_AVERAGE_DELAY.set(
78                (weight * random_delay.as_millis() as f64) + ((1.0f64 - weight) * METRIC_MIXER_AVERAGE_DELAY.get()),
79            );
80        }
81
82        Ok(())
83    }
84
85    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        let this = Pin::into_inner(self);
87
88        loop {
89            let now = Instant::now();
90
91            while this.heap.peek().is_some_and(|x| x.0.release_at <= now) {
92                let item = this.heap.pop().unwrap().0.item;
93
94                match Pin::new(&mut this.inner).poll_ready(cx) {
95                    Poll::Ready(Ok(())) => {}
96                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
97                    Poll::Pending => {
98                        this.heap.push(Reverse(DelayedData::from((now, item))));
99                        break;
100                    }
101                }
102
103                if let Err(e) = Pin::new(&mut this.inner).start_send(item) {
104                    return Poll::Ready(Err(e));
105                }
106
107                #[cfg(all(feature = "telemetry", not(test)))]
108                METRIC_QUEUE_SIZE.decrement(1.0f64);
109            }
110
111            futures::ready!(Pin::new(&mut this.inner).poll_flush(cx))?;
112
113            if this.heap.is_empty() {
114                return Poll::Ready(Ok(()));
115            }
116
117            let next_release = this.heap.peek().unwrap().0.release_at;
118            let sleep_for = next_release.saturating_duration_since(now);
119
120            if sleep_for.is_zero() {
121                // Reachable only because inner.poll_ready returned Pending earlier in
122                // this poll (we pushed the item back with release_at = now). The inner
123                // sink already registered its waker via that poll_ready call, and any
124                // items it had buffered were flushed by the poll_flush call above —
125                // delegate to the lower sink's wake-up signal and yield.
126                return Poll::Pending;
127            }
128
129            this.timer.reset(sleep_for);
130            // Park until the timer fires; cx.waker() is registered inside poll_unpin.
131            futures::ready!(this.timer.poll_unpin(cx));
132        }
133    }
134
135    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136        futures::ready!(self.as_mut().poll_flush(cx))?;
137        Pin::new(&mut Pin::into_inner(self).inner).poll_close(cx)
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use std::time::Duration;
144
145    use futures::{SinkExt, StreamExt, channel::mpsc};
146    use tokio::time::timeout;
147
148    use super::*;
149
150    const LEEWAY: Duration = Duration::from_millis(200);
151
152    #[tokio::test]
153    async fn items_are_forwarded_after_delay() {
154        let (tx, mut rx) = mpsc::channel::<u32>(100);
155        let cfg = MixerConfig {
156            min_delay: Duration::from_millis(50),
157            delay_range: Duration::from_millis(10),
158            capacity: 16,
159            metric_delay_window: 100,
160        };
161
162        let mut sink = MixerSink::new(tx, cfg);
163
164        sink.send(42u32).await.unwrap();
165
166        let item = timeout(Duration::from_millis(60) + LEEWAY, rx.next())
167            .await
168            .expect("should receive within leeway")
169            .unwrap();
170
171        assert_eq!(item, 42);
172    }
173
174    #[tokio::test]
175    async fn all_items_are_forwarded_with_zero_delay() {
176        let (tx, mut rx) = mpsc::channel::<u32>(100);
177        let cfg = MixerConfig {
178            min_delay: Duration::from_millis(0),
179            delay_range: Duration::from_millis(0),
180            capacity: 16,
181            metric_delay_window: 100,
182        };
183
184        let mut sink = MixerSink::new(tx, cfg);
185
186        for i in 0u32..5 {
187            sink.start_send_unpin(i).unwrap();
188        }
189        sink.flush().await.unwrap();
190
191        let mut received = Vec::new();
192        while let Ok(Some(item)) = timeout(Duration::from_millis(10), rx.next()).await {
193            received.push(item);
194        }
195
196        assert_eq!(received.len(), 5, "all items should be forwarded");
197    }
198
199    #[tokio::test]
200    async fn clone_starts_with_empty_heap() {
201        let (tx, mut rx) = mpsc::channel::<u32>(100);
202        let cfg = MixerConfig {
203            min_delay: Duration::from_millis(50),
204            delay_range: Duration::from_millis(10),
205            capacity: 16,
206            metric_delay_window: 100,
207        };
208
209        let mut sink = MixerSink::new(tx, cfg);
210        sink.start_send_unpin(1u32).unwrap();
211
212        let mut cloned = sink.clone();
213        cloned.flush().await.unwrap();
214
215        sink.flush().await.unwrap();
216
217        let item = timeout(Duration::from_millis(60) + LEEWAY, rx.next())
218            .await
219            .expect("original item should arrive")
220            .unwrap();
221        assert_eq!(item, 1);
222
223        assert!(timeout(Duration::from_millis(10), rx.next()).await.is_err());
224    }
225
226    /// Regression test for the `poll_flush` busy-spin bug.
227    ///
228    /// When `inner.poll_ready` returns `Poll::Pending` (inner sink is full) and
229    /// `inner.poll_flush` returns `Poll::Ready(Ok(()))` (channels are no-op flush),
230    /// the outer loop must return `Poll::Pending` instead of spinning.
231    ///
232    /// If this test hangs, the busy-spin bug has been re-introduced: the executor
233    /// thread is pegged and the timeout future can never fire.
234    #[tokio::test]
235    async fn poll_flush_returns_pending_not_spin_when_inner_full() {
236        use std::convert::Infallible;
237
238        use futures::Sink;
239
240        struct AlwaysFullNoOpFlushSink;
241
242        impl Sink<u32> for AlwaysFullNoOpFlushSink {
243            type Error = Infallible;
244
245            fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
246                Poll::Pending // always full — simulates a saturated mpsc channel
247            }
248
249            fn start_send(self: Pin<&mut Self>, _: u32) -> Result<(), Infallible> {
250                unreachable!("start_send must not be called when poll_ready returns Pending")
251            }
252
253            fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
254                Poll::Ready(Ok(())) // no-op flush — identical to futures::channel::mpsc
255            }
256
257            fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
258                Poll::Ready(Ok(()))
259            }
260        }
261
262        let cfg = MixerConfig {
263            min_delay: Duration::from_millis(0),
264            delay_range: Duration::from_millis(0),
265            capacity: 16,
266            metric_delay_window: 100,
267        };
268        let mut sink = MixerSink::new(AlwaysFullNoOpFlushSink, cfg);
269        sink.start_send_unpin(42u32).unwrap();
270
271        // flush() must return Poll::Pending (inner not ready), allowing the timeout to fire.
272        // If the spin bug is present, poll_flush loops forever on the single-threaded
273        // executor and the timeout can never be polled — the test hangs.
274        let result = timeout(Duration::from_millis(50), sink.flush()).await;
275        assert!(
276            result.is_err(),
277            "flush should have returned Pending (inner is full) and triggered the timeout, but it completed — inner \
278             should not have become ready"
279        );
280    }
281}