1use std::{
2 cmp::Reverse,
3 collections::BinaryHeap,
4 pin::Pin,
5 task::{Context, Poll},
6 time::{Duration, Instant},
7};
8
9use futures::{FutureExt, Sink};
10use futures_timer::Delay;
11use tracing::trace;
12
13#[cfg(all(feature = "telemetry", not(test)))]
14use crate::channel::{METRIC_MIXER_AVERAGE_DELAY, METRIC_QUEUE_SIZE};
15use crate::{config::MixerConfig, data::DelayedData};
16
17pub struct MixerSink<S, T> {
27 inner: S,
28 heap: BinaryHeap<Reverse<DelayedData<T>>>,
29 timer: Delay,
30 cfg: MixerConfig,
31}
32
33impl<S, T> MixerSink<S, T> {
34 pub fn new(inner: S, cfg: MixerConfig) -> Self {
35 let mut heap = BinaryHeap::new();
36 heap.reserve(cfg.capacity);
37 Self {
38 inner,
39 heap,
40 timer: Delay::new(Duration::ZERO),
41 cfg,
42 }
43 }
44}
45
46impl<S: Clone, T> Clone for MixerSink<S, T> {
47 fn clone(&self) -> Self {
48 Self::new(self.inner.clone(), self.cfg)
49 }
50}
51
52impl<S, T> Sink<T> for MixerSink<S, T>
53where
54 S: Sink<T> + Unpin,
55 T: Unpin,
56{
57 type Error = S::Error;
58
59 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60 Poll::Ready(Ok(()))
61 }
62
63 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
64 let this = Pin::into_inner(self);
65 let random_delay = this.cfg.random_delay();
66
67 trace!(delay_ms = random_delay.as_millis(), "mixer: delaying item");
68
69 this.heap
70 .push(Reverse(DelayedData::from((Instant::now() + random_delay, item))));
71
72 #[cfg(all(feature = "telemetry", not(test)))]
73 {
74 METRIC_QUEUE_SIZE.increment(1.0f64);
75
76 let weight = 1.0f64 / this.cfg.metric_delay_window as f64;
77 METRIC_MIXER_AVERAGE_DELAY.set(
78 (weight * random_delay.as_millis() as f64) + ((1.0f64 - weight) * METRIC_MIXER_AVERAGE_DELAY.get()),
79 );
80 }
81
82 Ok(())
83 }
84
85 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 let this = Pin::into_inner(self);
87
88 loop {
89 let now = Instant::now();
90
91 while this.heap.peek().is_some_and(|x| x.0.release_at <= now) {
92 let item = this.heap.pop().unwrap().0.item;
93
94 match Pin::new(&mut this.inner).poll_ready(cx) {
95 Poll::Ready(Ok(())) => {}
96 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
97 Poll::Pending => {
98 this.heap.push(Reverse(DelayedData::from((now, item))));
99 break;
100 }
101 }
102
103 if let Err(e) = Pin::new(&mut this.inner).start_send(item) {
104 return Poll::Ready(Err(e));
105 }
106
107 #[cfg(all(feature = "telemetry", not(test)))]
108 METRIC_QUEUE_SIZE.decrement(1.0f64);
109 }
110
111 futures::ready!(Pin::new(&mut this.inner).poll_flush(cx))?;
112
113 if this.heap.is_empty() {
114 return Poll::Ready(Ok(()));
115 }
116
117 let next_release = this.heap.peek().unwrap().0.release_at;
118 let sleep_for = next_release.saturating_duration_since(now);
119
120 if sleep_for.is_zero() {
121 return Poll::Pending;
127 }
128
129 this.timer.reset(sleep_for);
130 futures::ready!(this.timer.poll_unpin(cx));
132 }
133 }
134
135 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
136 futures::ready!(self.as_mut().poll_flush(cx))?;
137 Pin::new(&mut Pin::into_inner(self).inner).poll_close(cx)
138 }
139}
140
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::{SinkExt, StreamExt, channel::mpsc};
    use tokio::time::timeout;

    use super::*;

    /// Extra time granted on top of the configured maximum delay before a test gives up waiting.
    const LEEWAY: Duration = Duration::from_millis(200);

    /// Builds the configuration shared by all tests; only the delay bounds (in milliseconds) vary.
    fn mixer_cfg(min_delay_ms: u64, delay_range_ms: u64) -> MixerConfig {
        MixerConfig {
            min_delay: Duration::from_millis(min_delay_ms),
            delay_range: Duration::from_millis(delay_range_ms),
            capacity: 16,
            metric_delay_window: 100,
        }
    }

    /// A single item sent through the mixer reaches the receiver within the delay plus leeway.
    #[tokio::test]
    async fn items_are_forwarded_after_delay() {
        let (tx, mut rx) = mpsc::channel::<u32>(100);
        let mut sink = MixerSink::new(tx, mixer_cfg(50, 10));

        sink.send(42u32).await.unwrap();

        let received = timeout(Duration::from_millis(60) + LEEWAY, rx.next())
            .await
            .expect("should receive within leeway")
            .unwrap();

        assert_eq!(received, 42);
    }

    /// With a zero delay, a flush forwards every queued item.
    #[tokio::test]
    async fn all_items_are_forwarded_with_zero_delay() {
        let (tx, mut rx) = mpsc::channel::<u32>(100);
        let mut sink = MixerSink::new(tx, mixer_cfg(0, 0));

        for value in 0u32..5 {
            sink.start_send_unpin(value).unwrap();
        }
        sink.flush().await.unwrap();

        // The sender is still alive inside `sink`, so draining stops on the timeout.
        let mut forwarded = 0usize;
        while let Ok(Some(_)) = timeout(Duration::from_millis(10), rx.next()).await {
            forwarded += 1;
        }

        assert_eq!(forwarded, 5, "all items should be forwarded");
    }

    /// Cloning does not copy queued items: only the original delivers the item it holds.
    #[tokio::test]
    async fn clone_starts_with_empty_heap() {
        let (tx, mut rx) = mpsc::channel::<u32>(100);
        let mut original = MixerSink::new(tx, mixer_cfg(50, 10));
        original.start_send_unpin(1u32).unwrap();

        // Flushing the clone completes without delivering anything.
        let mut cloned = original.clone();
        cloned.flush().await.unwrap();

        original.flush().await.unwrap();

        let delivered = timeout(Duration::from_millis(60) + LEEWAY, rx.next())
            .await
            .expect("original item should arrive")
            .unwrap();
        assert_eq!(delivered, 1);

        // No second copy of the item may show up.
        let second = timeout(Duration::from_millis(10), rx.next()).await;
        assert!(second.is_err());
    }

    /// When the inner sink never becomes ready but flushes instantly, flushing the mixer must
    /// yield `Pending` rather than complete.
    #[tokio::test]
    async fn poll_flush_returns_pending_not_spin_when_inner_full() {
        use std::convert::Infallible;

        use futures::Sink;

        /// Sink that is never ready to accept an item, while flush and close succeed immediately.
        struct AlwaysFullNoOpFlushSink;

        impl Sink<u32> for AlwaysFullNoOpFlushSink {
            type Error = Infallible;

            fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
                Poll::Pending
            }

            fn start_send(self: Pin<&mut Self>, _: u32) -> Result<(), Infallible> {
                unreachable!("start_send must not be called when poll_ready returns Pending")
            }

            fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
                Poll::Ready(Ok(()))
            }

            fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Infallible>> {
                Poll::Ready(Ok(()))
            }
        }

        let mut sink = MixerSink::new(AlwaysFullNoOpFlushSink, mixer_cfg(0, 0));
        sink.start_send_unpin(42u32).unwrap();

        let outcome = timeout(Duration::from_millis(50), sink.flush()).await;
        assert!(
            outcome.is_err(),
            "flush should have returned Pending (inner is full) and triggered the timeout, but it completed — inner \
             should not have become ready"
        );
    }
}