1use std::{
2 cmp::Reverse,
3 collections::BinaryHeap,
4 future::poll_fn,
5 sync::{
6 Arc,
7 atomic::{AtomicBool, AtomicUsize, Ordering},
8 },
9 task::Poll,
10 time::Duration,
11};
12
13use futures::{FutureExt, Stream, StreamExt};
14use futures_timer::Delay;
15use parking_lot::Mutex;
16use tracing::trace;
17
18use crate::{config::MixerConfig, data::DelayedData};
19
20#[cfg(all(feature = "telemetry", not(test)))]
21lazy_static::lazy_static! {
22 pub static ref METRIC_QUEUE_SIZE: hopr_types::telemetry::SimpleGauge =
23 hopr_types::telemetry::SimpleGauge::new("hopr_mixer_queue_size", "Current mixer queue size").unwrap();
24 pub static ref METRIC_MIXER_AVERAGE_DELAY: hopr_types::telemetry::SimpleGauge = hopr_types::telemetry::SimpleGauge::new(
25 "hopr_mixer_average_packet_delay",
26 "Average mixer packet delay averaged over a packet window"
27 )
28 .unwrap();
29}
30
31struct Channel<T> {
50 buffer: BinaryHeap<Reverse<DelayedData<T>>>,
52 waker: Option<std::task::Waker>,
53 cfg: MixerConfig,
54}
55
56struct TrackedChannel<T> {
58 channel: Arc<Mutex<Channel<T>>>,
59 sender_count: Arc<AtomicUsize>,
60 receiver_active: Arc<AtomicBool>,
61}
62
63impl<T> Clone for TrackedChannel<T> {
64 fn clone(&self) -> Self {
65 Self {
66 channel: self.channel.clone(),
67 sender_count: self.sender_count.clone(),
68 receiver_active: self.receiver_active.clone(),
69 }
70 }
71}
72
73#[derive(Clone, Debug, thiserror::Error)]
75pub enum SenderError {
76 #[error("Channel is closed")]
78 Closed,
79}
80
81pub struct Sender<T> {
83 channel: TrackedChannel<T>,
84}
85
86impl<T> Clone for Sender<T> {
87 fn clone(&self) -> Self {
88 let channel = self.channel.clone();
89 channel.sender_count.fetch_add(1, Ordering::Relaxed);
90
91 Sender { channel }
92 }
93}
94
95impl<T> Drop for Sender<T> {
96 fn drop(&mut self) {
97 if self.channel.sender_count.fetch_sub(1, Ordering::Relaxed) == 1
98 && !self.channel.receiver_active.load(Ordering::Relaxed)
99 {
100 self.channel.channel.lock().waker = None;
101 }
102 }
103}
104
105impl<T> Sender<T> {
106 pub fn send(&self, item: T) -> Result<(), SenderError> {
108 self.push_item(item)
109 }
110
111 #[tracing::instrument(level = "trace", skip(self, item))]
113 fn push_item(&self, item: T) -> Result<(), SenderError> {
114 if !self.channel.receiver_active.load(Ordering::Relaxed) {
115 return Err(SenderError::Closed);
116 }
117
118 let mut channel = self.channel.channel.lock();
119
120 let random_delay = channel.cfg.random_delay();
121
122 trace!(delay_in_ms = random_delay.as_millis(), "generated mixer delay",);
123
124 let delayed_data: DelayedData<T> = (std::time::Instant::now() + random_delay, item).into();
125 channel.buffer.push(Reverse(delayed_data));
126
127 if let Some(waker) = channel.waker.as_ref() {
128 waker.wake_by_ref();
129 }
130
131 #[cfg(all(feature = "telemetry", not(test)))]
132 {
133 METRIC_QUEUE_SIZE.increment(1.0f64);
134
135 let weight = 1.0f64 / channel.cfg.metric_delay_window as f64;
136 METRIC_MIXER_AVERAGE_DELAY.set(
137 (weight * random_delay.as_millis() as f64) + ((1.0f64 - weight) * METRIC_MIXER_AVERAGE_DELAY.get()),
138 );
139 }
140
141 Ok(())
142 }
143}
144
145impl<T> futures::sink::Sink<T> for Sender<T> {
146 type Error = SenderError;
147
148 fn poll_ready(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
149 let is_active = self.channel.receiver_active.load(Ordering::Relaxed);
150 if is_active {
151 Poll::Ready(Ok(()))
152 } else {
153 Poll::Ready(Err(SenderError::Closed))
154 }
155 }
156
157 fn start_send(self: std::pin::Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
158 self.push_item(item)
159 }
160
161 fn poll_flush(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
162 Poll::Ready(Ok(()))
163 }
164
165 fn poll_close(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166 Poll::Ready(Ok(()))
168 }
169}
170
171#[derive(Debug, thiserror::Error)]
173pub enum ReceiverError {
174 #[error("Channel is closed")]
176 Closed,
177}
178
179pub struct Receiver<T> {
188 channel: TrackedChannel<T>,
189 timer: Delay,
190}
191
192impl<T> Stream for Receiver<T> {
193 type Item = T;
194
195 #[tracing::instrument(level = "trace", skip(self, cx))]
196 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
197 let now = std::time::Instant::now();
198 let no_senders = self.channel.sender_count.load(Ordering::Relaxed) == 0;
199
200 let sleep_for = {
208 let mut channel = self.channel.channel.lock();
209
210 if channel.buffer.peek().map(|x| x.0.release_at < now).unwrap_or(false) {
211 let data = channel
212 .buffer
213 .pop()
214 .expect("value must be present during the same locked access")
215 .0
216 .item;
217
218 trace!(from = "direct", "yield item");
219
220 #[cfg(all(feature = "telemetry", not(test)))]
221 METRIC_QUEUE_SIZE.decrement(1.0f64);
222
223 return Poll::Ready(Some(data));
224 }
225
226 if channel.buffer.is_empty() && no_senders {
228 drop(channel);
230 self.channel.receiver_active.store(false, Ordering::Relaxed);
231 return Poll::Ready(None);
232 }
233
234 match channel.waker.as_mut() {
235 Some(existing) => existing.clone_from(cx.waker()),
236 None => channel.waker = Some(cx.waker().clone()),
237 }
238
239 match channel.buffer.peek() {
240 Some(next) => next.0.release_at.duration_since(now),
241 None => {
242 trace!(from = "direct", "pending (empty buffer)");
245 return Poll::Pending;
246 }
247 }
248 };
249
250 let this = self.get_mut();
252 trace!("resetting the timer");
253 this.timer.reset(sleep_for);
254 futures::ready!(this.timer.poll_unpin(cx));
255
256 let mut channel = this.channel.channel.lock();
262 match channel.buffer.pop() {
263 Some(entry) => {
264 trace!(from = "timer", "yield item");
265
266 #[cfg(all(feature = "telemetry", not(test)))]
267 METRIC_QUEUE_SIZE.decrement(1.0f64);
268
269 Poll::Ready(Some(entry.0.item))
270 }
271 None => {
272 trace!(from = "timer", "buffer drained before we re-acquired the lock");
273 Poll::Pending
274 }
275 }
276 }
277}
278
279impl<T> Receiver<T> {
280 pub async fn recv(&mut self) -> Option<T> {
282 poll_fn(|cx| self.poll_next_unpin(cx)).await
283 }
284}
285
286pub fn channel<T>(cfg: crate::config::MixerConfig) -> (Sender<T>, Receiver<T>) {
288 #[cfg(all(feature = "telemetry", not(test)))]
289 {
290 lazy_static::initialize(&METRIC_QUEUE_SIZE);
292 lazy_static::initialize(&METRIC_MIXER_AVERAGE_DELAY);
293 }
294
295 let mut buffer = BinaryHeap::new();
296 buffer.reserve(cfg.capacity);
297
298 let channel = TrackedChannel {
299 channel: Arc::new(Mutex::new(Channel::<T> {
300 buffer,
301 waker: None,
302 cfg,
303 })),
304 sender_count: Arc::new(AtomicUsize::new(1)),
305 receiver_active: Arc::new(AtomicBool::new(true)),
306 };
307 (
308 Sender {
309 channel: channel.clone(),
310 },
311 Receiver {
312 channel,
313 timer: Delay::new(Duration::from_secs(0)),
314 },
315 )
316}
317
318#[cfg(test)]
319mod tests {
320 use futures::{SinkExt, StreamExt};
321 use tokio::time::timeout;
322
323 use super::*;
324
325 const PROCESSING_LEEWAY: Duration = Duration::from_millis(250);
326 const MAXIMUM_SINGLE_DELAY_DURATION: Duration = Duration::from_millis(
327 crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS + crate::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS,
328 );
329
330 #[tokio::test]
331 async fn mixer_channel_should_pass_an_element() -> anyhow::Result<()> {
332 let (tx, mut rx) = channel(MixerConfig::default());
333 tx.send(1)?;
334 assert_eq!(rx.recv().await, Some(1));
335
336 Ok(())
337 }
338
339 #[tokio::test]
340 async fn mixer_channel_should_introduce_random_delay() -> anyhow::Result<()> {
341 let start = std::time::SystemTime::now();
342
343 let (tx, mut rx) = channel(MixerConfig::default());
344 tx.send(1)?;
345 assert_eq!(rx.recv().await, Some(1));
346
347 let elapsed = start.elapsed()?;
348
349 assert!(elapsed < MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY);
350 assert!(elapsed > Duration::from_millis(crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS));
351 Ok(())
352 }
353
354 #[tokio::test]
355 async fn mixer_channel_should_batch_on_sending_emulating_concurrency() -> anyhow::Result<()> {
357 const ITERATIONS: usize = 10;
358
359 let (tx, mut rx) = channel(MixerConfig::default());
360
361 let start = std::time::SystemTime::now();
362
363 for i in 0..ITERATIONS {
364 tx.send(i)?;
365 }
366 for _ in 0..ITERATIONS {
367 let data = timeout(MAXIMUM_SINGLE_DELAY_DURATION, rx.next()).await?;
368 assert!(data.is_some());
369 }
370
371 let elapsed = start.elapsed()?;
372
373 assert!(elapsed < MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY);
374 assert!(elapsed > Duration::from_millis(crate::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS));
375 Ok(())
376 }
377
378 #[tokio::test]
379 async fn mixer_channel_should_work_concurrently_and_properly_closed_channels() -> anyhow::Result<()> {
381 const ITERATIONS: usize = 1000;
382
383 let (tx, mut rx) = channel(MixerConfig::default());
384
385 let recv_task = tokio::task::spawn(async move {
386 while let Some(_item) = timeout(2 * MAXIMUM_SINGLE_DELAY_DURATION, rx.next())
387 .await
388 .expect("receiver should not fail")
389 {}
390 });
391
392 let send_task =
393 tokio::task::spawn(async move { futures::stream::iter(0..ITERATIONS).map(Ok).forward(tx).await });
394
395 let (_recv, send) = futures::try_join!(
396 timeout(MAXIMUM_SINGLE_DELAY_DURATION, recv_task),
397 timeout(MAXIMUM_SINGLE_DELAY_DURATION, send_task)
398 )?;
399
400 send??;
401
402 Ok(())
403 }
404
405 #[tokio::test]
406 async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_sync_send() -> anyhow::Result<()> {
408 const ITERATIONS: usize = 20; let (tx, rx) = channel(MixerConfig::default());
411
412 let input = (0..ITERATIONS).collect::<Vec<_>>();
413
414 for i in input.iter() {
415 tx.send(*i)?;
416 }
417
418 let mixed_output = timeout(
419 2 * MAXIMUM_SINGLE_DELAY_DURATION,
420 rx.take(ITERATIONS).collect::<Vec<_>>(),
421 )
422 .await?;
423
424 tracing::info!(?input, ?mixed_output, "asserted data");
425 assert_ne!(input, mixed_output);
426 Ok(())
427 }
428
429 #[tokio::test]
430 async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_async_send() -> anyhow::Result<()>
432 {
433 const ITERATIONS: usize = 20; let (mut tx, rx) = channel(MixerConfig::default());
436
437 let input = (0..ITERATIONS).collect::<Vec<_>>();
438
439 for i in input.iter() {
440 SinkExt::send(&mut tx, *i).await?;
441 }
442
443 let mixed_output = timeout(
444 2 * MAXIMUM_SINGLE_DELAY_DURATION,
445 rx.take(ITERATIONS).collect::<Vec<_>>(),
446 )
447 .await?;
448
449 tracing::info!(?input, ?mixed_output, "asserted data");
450 assert_ne!(input, mixed_output);
451 Ok(())
452 }
453
454 #[tokio::test]
455 async fn mixer_channel_should_produce_mixed_output_from_the_supplied_input_using_async_feed() -> anyhow::Result<()>
457 {
458 const ITERATIONS: usize = 20; let (mut tx, rx) = channel(MixerConfig::default());
461
462 let input = (0..ITERATIONS).collect::<Vec<_>>();
463
464 for i in input.iter() {
465 SinkExt::feed(&mut tx, *i).await?;
466 }
467 SinkExt::flush(&mut tx).await?;
468
469 let mixed_output = timeout(
470 2 * MAXIMUM_SINGLE_DELAY_DURATION,
471 rx.take(ITERATIONS).collect::<Vec<_>>(),
472 )
473 .await?;
474
475 tracing::info!(?input, ?mixed_output, "asserted data");
476 assert_ne!(input, mixed_output);
477 Ok(())
478 }
479
480 #[tokio::test]
481 async fn mixer_channel_should_not_mix_the_order_if_the_min_delay_and_delay_range_is_0() -> anyhow::Result<()> {
483 const ITERATIONS: usize = 40; let (tx, rx) = channel(MixerConfig {
486 min_delay: Duration::from_millis(0),
487 delay_range: Duration::from_millis(0),
488 ..MixerConfig::default()
489 });
490
491 let input = (0..ITERATIONS).collect::<Vec<_>>();
492
493 for i in input.iter() {
494 tx.send(*i)?;
495 tokio::time::sleep(std::time::Duration::from_micros(10)).await; }
497
498 let mixed_output = timeout(
499 2 * MAXIMUM_SINGLE_DELAY_DURATION,
500 rx.take(ITERATIONS).collect::<Vec<_>>(),
501 )
502 .await?;
503
504 tracing::info!(?input, ?mixed_output, "asserted data");
505 assert_eq!(input, mixed_output);
506
507 Ok(())
508 }
509
510 #[tokio::test]
511 async fn sender_should_return_closed_when_receiver_inactive() -> anyhow::Result<()> {
512 let (tx, _rx) = channel::<i32>(MixerConfig::default());
513
514 tx.channel.receiver_active.store(false, Ordering::Relaxed);
517
518 let result = tx.send(42);
519 assert!(
520 matches!(result, Err(SenderError::Closed)),
521 "send with inactive receiver should return Closed, got: {result:?}"
522 );
523 Ok(())
524 }
525
526 #[tokio::test]
530 async fn sender_can_push_while_receiver_is_parked_on_timer() -> anyhow::Result<()> {
531 let cfg = MixerConfig {
534 min_delay: Duration::from_millis(500),
535 delay_range: Duration::from_millis(1),
536 ..MixerConfig::default()
537 };
538 let (tx, mut rx) = channel::<u32>(cfg);
539
540 tx.send(0)?;
542
543 let rx_task = tokio::task::spawn(async move {
547 let first = rx.next().await;
549 let second = rx.next().await;
550 (first, second)
551 });
552
553 tokio::time::sleep(Duration::from_millis(50)).await;
557
558 let send_started = std::time::Instant::now();
561 tx.send(1)?;
562 let send_latency = send_started.elapsed();
563 assert!(
564 send_latency < Duration::from_millis(100),
565 "sender was blocked by receiver's timer poll: send took {send_latency:?}"
566 );
567
568 let (first, second) = timeout(Duration::from_millis(1500), rx_task)
570 .await?
571 .expect("receiver task should finish");
572 assert!(first.is_some(), "first item must be received");
573 assert!(second.is_some(), "second item must be received");
574 Ok(())
575 }
576
577 #[tokio::test]
581 async fn receiver_drains_buffered_items_after_last_sender_drops() -> anyhow::Result<()> {
582 let cfg = MixerConfig {
583 min_delay: Duration::from_millis(0),
584 delay_range: Duration::from_millis(1),
585 ..MixerConfig::default()
586 };
587 let (tx, mut rx) = channel::<u32>(cfg);
588
589 const ITERATIONS: usize = 16;
590 for i in 0..ITERATIONS as u32 {
591 tx.send(i)?;
592 }
593
594 drop(tx);
596
597 let mut received = 0usize;
599 while let Some(_item) = timeout(Duration::from_millis(500), rx.next()).await? {
600 received += 1;
601 }
602 assert_eq!(
603 received,
604 ITERATIONS,
605 "receiver dropped {} pending items on sender shutdown",
606 ITERATIONS - received
607 );
608 Ok(())
609 }
610
611 #[tokio::test]
621 async fn sender_clones_share_heap() -> anyhow::Result<()> {
622 let cfg = MixerConfig {
623 min_delay: Duration::from_millis(50),
624 delay_range: Duration::from_millis(1),
625 ..MixerConfig::default()
626 };
627 let (tx_a, mut rx) = channel::<u32>(cfg);
628 let tx_b = tx_a.clone();
629
630 tx_a.send(1)?;
631 tx_b.send(2)?;
632 drop(tx_a);
633 drop(tx_b);
634
635 let mut got = vec![
637 timeout(MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY, rx.next())
638 .await?
639 .expect("first item"),
640 timeout(MAXIMUM_SINGLE_DELAY_DURATION + PROCESSING_LEEWAY, rx.next())
641 .await?
642 .expect("second item"),
643 ];
644 got.sort();
645 assert_eq!(got, vec![1, 2]);
646 assert!(rx.next().await.is_none(), "expected channel closed with no more items");
647 Ok(())
648 }
649}