hopr_network_types/
utils.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    hash::{Hash, Hasher},
4    net::SocketAddr,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::io::{AsyncRead, AsyncWrite};
10
/// Joins [`futures::AsyncRead`] and [`futures::AsyncWrite`] into a single object.
///
/// Field `0` is the write half and field `1` is the read half: the [`AsyncWrite`]
/// implementation forwards to field `0`, the [`AsyncRead`] implementation to field `1`.
#[pin_project::pin_project]
pub struct DuplexIO<W, R>(#[pin] pub W, #[pin] pub R);
14
15impl<R, W> From<(W, R)> for DuplexIO<W, R>
16where
17    R: AsyncRead,
18    W: AsyncWrite,
19{
20    fn from(value: (W, R)) -> Self {
21        Self(value.0, value.1)
22    }
23}
24
25impl<R, W> AsyncRead for DuplexIO<W, R>
26where
27    R: AsyncRead,
28    W: AsyncWrite,
29{
30    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
31        self.project().1.poll_read(cx, buf)
32    }
33}
34
35impl<R, W> AsyncWrite for DuplexIO<W, R>
36where
37    R: AsyncRead,
38    W: AsyncWrite,
39{
40    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
41        self.project().0.poll_write(cx, buf)
42    }
43
44    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
45        self.project().0.poll_flush(cx)
46    }
47
48    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
49        self.project().0.poll_close(cx)
50    }
51}
52
// Upper bound on the length of a `SocketAddr` formatted via `Display`.
//
// The longest form is a scoped IPv6 socket address:
// "[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff%4294967295]:65535"
// = '[' + 39 (IPv6) + '%' + 10 (scope id) + ']' + ':' + 5 (port) = 58 bytes.
// A smaller capacity would silently truncate the cached string of such addresses.
const SOCKET_ADDRESS_MAX_LEN: usize = 58;
55
/// Caches the string representation of a [`SocketAddr`] for fast conversion to `&str`.
///
/// Field `0` is the address itself; field `1` is its textual form kept inline in a
/// fixed-capacity string of `SOCKET_ADDRESS_MAX_LEN` bytes.
#[derive(Copy, Clone)]
pub(crate) struct SocketAddrStr(SocketAddr, arrayvec::ArrayString<SOCKET_ADDRESS_MAX_LEN>);
59
60impl SocketAddrStr {
61    #[allow(dead_code)]
62    pub fn as_str(&self) -> &str {
63        self.1.as_str()
64    }
65}
66
67impl AsRef<SocketAddr> for SocketAddrStr {
68    fn as_ref(&self) -> &SocketAddr {
69        &self.0
70    }
71}
72
73impl From<SocketAddr> for SocketAddrStr {
74    fn from(value: SocketAddr) -> Self {
75        let mut cached = value.to_string();
76        cached.truncate(SOCKET_ADDRESS_MAX_LEN);
77        Self(value, cached.parse().expect("cannot fail due to truncation"))
78    }
79}
80
81impl PartialEq for SocketAddrStr {
82    fn eq(&self, other: &Self) -> bool {
83        self.0 == other.0
84    }
85}
86
87impl Eq for SocketAddrStr {}
88
89impl Debug for SocketAddrStr {
90    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91        write!(f, "{}", self.1)
92    }
93}
94
95impl Display for SocketAddrStr {
96    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
97        write!(f, "{}", self.1)
98    }
99}
100
101impl PartialEq<SocketAddrStr> for SocketAddr {
102    fn eq(&self, other: &SocketAddrStr) -> bool {
103        self.eq(&other.0)
104    }
105}
106
107impl Hash for SocketAddrStr {
108    fn hash<H: Hasher>(&self, state: &mut H) {
109        self.0.hash(state);
110    }
111}
112
113#[cfg(feature = "runtime-tokio")]
114pub use tokio_utils::{copy_duplex, copy_duplex_abortable};
115
116#[cfg(feature = "runtime-tokio")]
117mod tokio_utils {
118    use futures::{
119        FutureExt,
120        future::{AbortHandle, Abortable},
121    };
122    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
123
124    use super::*;
125
    /// Progress of the copy in a single direction.
    #[derive(Debug)]
    enum TransferState {
        /// Data is still being copied through the contained buffer.
        Running(CopyBuffer),
        /// Copying has stopped and the writer is being shut down.
        /// Holds the number of bytes transferred.
        ShuttingDown(u64),
        /// The writer has been shut down.
        /// Holds the number of bytes transferred.
        Done(u64),
    }
132
133    fn transfer_one_direction<A, B>(
134        cx: &mut Context<'_>,
135        state: &mut TransferState,
136        r: &mut A,
137        w: &mut B,
138    ) -> Poll<std::io::Result<u64>>
139    where
140        A: AsyncRead + AsyncWrite + Unpin + ?Sized,
141        B: AsyncRead + AsyncWrite + Unpin + ?Sized,
142    {
143        let mut r = Pin::new(r);
144        let mut w = Pin::new(w);
145        loop {
146            match state {
147                TransferState::Running(buf) => {
148                    let count = std::task::ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?;
149                    tracing::trace!(processed = count, "direction copy complete");
150                    *state = TransferState::ShuttingDown(count);
151                }
152                TransferState::ShuttingDown(count) => {
153                    std::task::ready!(w.as_mut().poll_shutdown(cx))?;
154                    tracing::trace!(processed = *count, "direction shutdown complete");
155                    *state = TransferState::Done(*count);
156                }
157                TransferState::Done(count) => return Poll::Ready(Ok(*count)),
158            }
159        }
160    }
161
162    /// This is a proper re-implementation of Tokio's
163    /// [`copy_bidirectional_with_sizes`](tokio::io::copy_bidirectional_with_sizes), which does not leave the stream
164    /// in half-open-state when one side closes read or write side.
165    ///
166    /// Instead, if either side encounters and empty
167    /// read (EOF indication), the write-side is closed as well and vice versa.
168    pub async fn copy_duplex<A, B>(
169        a: &mut A,
170        b: &mut B,
171        (a_to_b_buffer_size, b_to_a_buffer_size): (usize, usize),
172    ) -> std::io::Result<(u64, u64)>
173    where
174        A: AsyncRead + AsyncWrite + Unpin + ?Sized,
175        B: AsyncRead + AsyncWrite + Unpin + ?Sized,
176    {
177        let (_, ar_a) = AbortHandle::new_pair();
178        let (_, ar_b) = AbortHandle::new_pair();
179
180        copy_duplex_abortable(a, b, (a_to_b_buffer_size, b_to_a_buffer_size), (ar_a, ar_b)).await
181    }
182
    /// Variant of [`copy_duplex`] with an option to abort either side early using the given
    /// [`AbortRegistrations`](futures::future::AbortRegistration).
    ///
    /// Once a side is aborted, its proper shutdown is initiated, and once done, the other side's
    /// shutdown is also initiated.
    /// The difference between the two abort handles is only in the order - which side gets shutdown
    /// first after the abort is called.
    ///
    /// The buffer sizes are given as `(a_to_b, b_to_a)` and the abort registrations as `(a, b)`.
    ///
    /// Returns the number of bytes transferred from `a` to `b` and from `b` to `a`.
    /// An I/O error in either direction is returned immediately.
    pub async fn copy_duplex_abortable<A, B>(
        a: &mut A,
        b: &mut B,
        (a_to_b_buffer_size, b_to_a_buffer_size): (usize, usize),
        (a_abort, b_abort): (futures::future::AbortRegistration, futures::future::AbortRegistration),
    ) -> std::io::Result<(u64, u64)>
    where
        A: AsyncRead + AsyncWrite + Unpin + ?Sized,
        B: AsyncRead + AsyncWrite + Unpin + ?Sized,
    {
        // Both directions start in the copying state with their own buffer
        let mut a_to_b = TransferState::Running(CopyBuffer::new(a_to_b_buffer_size));
        let mut b_to_a = TransferState::Running(CopyBuffer::new(b_to_a_buffer_size));

        // Abort futures are fused: once aborted, each poll returns Err(Aborted).
        // The wrapped futures never complete, so the only possible outcome is the abort.
        let (mut abort_a, mut abort_b) = (
            Abortable::new(futures::future::pending::<()>(), a_abort),
            Abortable::new(futures::future::pending::<()>(), b_abort),
        );

        std::future::poll_fn(|cx| {
            // Drive both directions; an error in either one ends the whole copy
            let mut a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
            let mut b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;

            // Initiate A's shutdown if A is aborted while still running.
            // The copy buffer is replaced together with the state, so only the
            // amount written so far is carried over.
            if let (Poll::Ready(Err(_)), TransferState::Running(buf)) = (abort_a.poll_unpin(cx), &a_to_b) {
                tracing::trace!("A-side has been aborted.");
                a_to_b = TransferState::ShuttingDown(buf.amt);
                // We need an artificial wake-up here, as if an empty read was received
                cx.waker().wake_by_ref();
            }

            // Initiate B's shutdown if B is aborted while still running
            if let (Poll::Ready(Err(_)), TransferState::Running(buf)) = (abort_b.poll_unpin(cx), &b_to_a) {
                tracing::trace!("B-side has been aborted.");
                b_to_a = TransferState::ShuttingDown(buf.amt);
                // We need an artificial wake-up here, as if an empty read was received
                cx.waker().wake_by_ref();
            }

            // Once B-side is done, initiate shutdown of A-side
            if let TransferState::Done(_) = b_to_a {
                if let TransferState::Running(buf) = &a_to_b {
                    tracing::trace!("B-side has completed, terminating A-side.");
                    a_to_b = TransferState::ShuttingDown(buf.amt);
                    // Poll again right away so the shutdown makes progress within this poll
                    a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
                }
            }

            // Once A-side is done, initiate shutdown of B-side
            if let TransferState::Done(_) = a_to_b {
                if let TransferState::Running(buf) = &b_to_a {
                    tracing::trace!("A-side has completed, terminate B-side.");
                    b_to_a = TransferState::ShuttingDown(buf.amt);
                    // Poll again right away so the shutdown makes progress within this poll
                    b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;
                }
            }

            // Not a problem if ready! returns early: both directions have already been
            // polled above, and the states are kept outside of this closure.
            let a_to_b_bytes_transferred = std::task::ready!(a_to_b_result);
            let b_to_a_bytes_transferred = std::task::ready!(b_to_a_result);

            tracing::trace!(
                a_to_b = a_to_b_bytes_transferred,
                b_to_a = b_to_a_bytes_transferred,
                "copy completed"
            );
            Poll::Ready(Ok((a_to_b_bytes_transferred, b_to_a_bytes_transferred)))
        })
        .await
    }
260
    /// Intermediate buffer and bookkeeping for copying data in one direction.
    #[derive(Debug)]
    struct CopyBuffer {
        /// Set once a read added no new bytes to the buffer (treated as end of input).
        read_done: bool,
        /// Set after data has been written and cleared after a successful flush.
        need_flush: bool,
        /// Offset of the next byte in `buf` to be written out.
        pos: usize,
        /// Number of bytes in `buf` that hold data read from the reader.
        cap: usize,
        /// Total number of bytes written to the writer so far.
        amt: u64,
        /// Backing storage; its length is the buffer size given to `new`.
        buf: Box<[u8]>,
    }
270
    impl CopyBuffer {
        /// Creates an empty buffer with `buf_size` bytes of zero-initialized storage.
        fn new(buf_size: usize) -> Self {
            Self {
                read_done: false,
                need_flush: false,
                pos: 0,
                cap: 0,
                amt: 0,
                buf: vec![0; buf_size].into_boxed_slice(),
            }
        }

        /// Reads more data from `reader` into the unused tail of the buffer.
        ///
        /// On a successful read, `cap` is advanced to the new fill level and `read_done`
        /// is set when the read did not add any bytes.
        fn poll_fill_buf<R>(&mut self, cx: &mut Context<'_>, reader: Pin<&mut R>) -> Poll<std::io::Result<()>>
        where
            R: AsyncRead + ?Sized,
        {
            let me = &mut *self;
            let mut buf = ReadBuf::new(&mut me.buf);
            // Keep the bytes that are already buffered; new data is appended after them
            buf.set_filled(me.cap);

            let res = reader.poll_read(cx, &mut buf);
            if let Poll::Ready(Ok(())) = res {
                let filled_len = buf.filled().len();
                // No growth means the reader produced nothing
                me.read_done = me.cap == filled_len;
                me.cap = filled_len;
            }
            res
        }

        /// Writes the pending part of the buffer (`pos..cap`) to `writer`.
        ///
        /// If the writer is not ready, the time is used to read more data into the
        /// buffer, and `Poll::Pending` is returned. Otherwise, the writer's result
        /// is passed through unchanged.
        fn poll_write_buf<R, W>(
            &mut self,
            cx: &mut Context<'_>,
            mut reader: Pin<&mut R>,
            mut writer: Pin<&mut W>,
        ) -> Poll<std::io::Result<usize>>
        where
            R: AsyncRead + ?Sized,
            W: AsyncWrite + ?Sized,
        {
            let this = &mut *self;
            match writer.as_mut().poll_write(cx, &this.buf[this.pos..this.cap]) {
                Poll::Pending => {
                    // Top up the buffer towards full if we can read a bit more
                    // data - this should improve the chances of a large write
                    if !this.read_done && this.cap < this.buf.len() {
                        std::task::ready!(this.poll_fill_buf(cx, reader.as_mut()))?;
                    }
                    Poll::Pending
                }
                res @ Poll::Ready(_) => res,
            }
        }

        /// Copies data from `reader` to `writer` until the reader signals end of input.
        ///
        /// Returns the total number of bytes written once all buffered data has been
        /// written and flushed. A read error is returned wrapped in an error of kind
        /// `BrokenPipe`; a write that accepts zero bytes yields a `WriteZero` error.
        pub(super) fn poll_copy<R, W>(
            &mut self,
            cx: &mut Context<'_>,
            mut reader: Pin<&mut R>,
            mut writer: Pin<&mut W>,
        ) -> Poll<std::io::Result<u64>>
        where
            R: AsyncRead + ?Sized,
            W: AsyncWrite + ?Sized,
        {
            loop {
                // If our buffer is empty, then we need to read some data to
                // continue.
                if self.pos == self.cap && !self.read_done {
                    self.pos = 0;
                    self.cap = 0;

                    match self.poll_fill_buf(cx, reader.as_mut()) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, err)));
                        }
                        Poll::Pending => {
                            // Try flushing when the reader has no progress to avoid deadlock
                            // when the reader depends on a buffered writer.
                            if self.need_flush {
                                std::task::ready!(writer.as_mut().poll_flush(cx))?;
                                self.need_flush = false;
                            }

                            return Poll::Pending;
                        }
                    }
                }

                // If our buffer has some data, let's write it out
                while self.pos < self.cap {
                    let i = std::task::ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?;
                    if i == 0 {
                        return Poll::Ready(Err(std::io::Error::new(
                            std::io::ErrorKind::WriteZero,
                            "write zero byte",
                        )));
                    }
                    self.pos += i;
                    self.amt += i as u64;
                    self.need_flush = true;
                }

                // If pos larger than cap, this loop will never stop.
                // In particular, a user's wrong poll_write implementation returning
                // incorrect written length may lead to thread blocking.
                debug_assert!(self.pos <= self.cap, "writer returned length larger than input slice");

                // If we've written all the data, and we've seen EOF, flush out the
                // data and finish the transfer.
                if self.pos == self.cap && self.read_done {
                    std::task::ready!(writer.as_mut().poll_flush(cx))?;
                    return Poll::Ready(Ok(self.amt));
                }
            }
        }
    }
387}
388
/// Converts an [`AsyncRead`] into a `futures::Stream` by reading at most `S` bytes
/// in each call to `Stream::poll_next`.
///
/// Each successful non-empty read is yielded as a boxed slice holding exactly the
/// bytes read; a read of zero bytes ends the stream.
#[pin_project::pin_project]
pub struct AsyncReadStreamer<const S: usize, R>(#[pin] pub R);
393
394impl<const S: usize, R: AsyncRead> futures::Stream for AsyncReadStreamer<S, R> {
395    type Item = std::io::Result<Box<[u8]>>;
396
397    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
398        let mut buffer = vec![0u8; S];
399        let mut this = self.project();
400
401        match futures::ready!(this.0.as_mut().poll_read(cx, &mut buffer)) {
402            Ok(0) => Poll::Ready(None),
403            Ok(size) => {
404                buffer.truncate(size);
405                Poll::Ready(Some(Ok(buffer.into_boxed_slice())))
406            }
407            Err(err) => Poll::Ready(Some(Err(err))),
408        }
409    }
410}
411
/// Wraps a [`futures::Sink`] that accepts `Box<[u8]>` with an [`AsyncWrite`] interface,
/// with each write to the underlying `Sink` being at most `C` bytes.
///
/// Every call to `poll_write` that finds the sink ready sends one item to the sink
/// and reports the length of that item as the number of bytes written.
#[pin_project::pin_project]
pub struct AsyncWriteSink<const C: usize, S>(#[pin] pub S);
416
417impl<const C: usize, S> AsyncWrite for AsyncWriteSink<C, S>
418where
419    S: futures::Sink<Box<[u8]>>,
420    S::Error: Into<std::io::Error>,
421{
422    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
423        let mut this = self.project();
424
425        futures::ready!(this.0.as_mut().poll_ready(cx).map_err(Into::into))?;
426        let len = buf.len().min(C);
427
428        match this.0.as_mut().start_send(Box::from(&buf[..len])) {
429            Ok(()) => Poll::Ready(Ok(len)),
430            Err(e) => Poll::Ready(Err(e.into())),
431        }
432    }
433
434    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
435        self.project().0.poll_flush(cx).map_err(Into::into)
436    }
437
438    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
439        self.project().0.poll_close(cx).map_err(Into::into)
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use futures::{SinkExt, StreamExt, TryStreamExt};
446    #[cfg(feature = "runtime-tokio")]
447    use tokio::io::AsyncWriteExt;
448
449    use super::*;
450
451    #[cfg(feature = "runtime-tokio")]
452    #[tokio::test]
453    async fn test_copy_duplex() -> anyhow::Result<()> {
454        const DATA_LEN: usize = 2000;
455
456        let alice_tx = hopr_crypto_random::random_bytes::<DATA_LEN>();
457        let mut alice_rx = [0u8; DATA_LEN];
458
459        let bob_tx = hopr_crypto_random::random_bytes::<DATA_LEN>();
460        let mut bob_rx = [0u8; DATA_LEN];
461
462        let alice = DuplexIO(futures::io::Cursor::new(alice_rx.as_mut()), alice_tx.as_ref());
463        let bob = DuplexIO(futures::io::Cursor::new(bob_rx.as_mut()), bob_tx.as_ref());
464
465        let (a_to_b, b_to_a) = copy_duplex(
466            &mut tokio_util::compat::FuturesAsyncReadCompatExt::compat(alice),
467            &mut tokio_util::compat::FuturesAsyncReadCompatExt::compat(bob),
468            (128, 128),
469        )
470        .await?;
471
472        assert_eq!(DATA_LEN, a_to_b as usize);
473        assert_eq!(DATA_LEN, b_to_a as usize);
474
475        assert_eq!(alice_tx, bob_rx);
476        assert_eq!(bob_tx, alice_rx);
477
478        Ok(())
479    }
480
481    #[cfg(feature = "runtime-tokio")]
482    #[test_log::test(tokio::test(flavor = "multi_thread"))]
483    async fn test_copy_duplex_with_abort_from_client() -> anyhow::Result<()> {
484        let (mut client_tx, mut client_rx) = tokio::io::duplex(10); // Create a mock duplex stream
485        let (mut server_rx, mut server_tx) = tokio::io::duplex(10); // Create a mock duplex stream
486
487        // Simulate 'a' finishing while there's still data for 'b'
488        client_tx.write_all(b"hello").await?;
489        server_tx.write_all(b"data").await?;
490
491        let (handle_a, reg_a) = futures::future::AbortHandle::new_pair();
492        let (_, reg_b) = futures::future::AbortHandle::new_pair();
493
494        let result = tokio::task::spawn(async move {
495            crate::utils::copy_duplex_abortable(&mut client_rx, &mut server_rx, (2, 2), (reg_a, reg_b)).await
496        });
497
498        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
499
500        // The abort must make the task terminate, although none of the streams were shutdown
501        handle_a.abort();
502
503        let (a, b) = tokio::time::timeout(std::time::Duration::from_millis(100), result).await???;
504        assert_eq!(a, 5);
505        assert_eq!(b, 4);
506
507        Ok(())
508    }
509
510    #[cfg(feature = "runtime-tokio")]
511    #[tokio::test(flavor = "multi_thread")]
512    async fn test_copy_duplex_with_abort_from_server() -> anyhow::Result<()> {
513        let (mut client_tx, mut client_rx) = tokio::io::duplex(10); // Create a mock duplex stream
514        let (mut server_rx, mut server_tx) = tokio::io::duplex(10); // Create a mock duplex stream
515
516        // Simulate 'a' finishing while there's still data for 'b'
517        client_tx.write_all(b"hello").await?;
518        server_tx.write_all(b"data").await?;
519
520        let (_, reg_a) = futures::future::AbortHandle::new_pair();
521        let (handle_b, reg_b) = futures::future::AbortHandle::new_pair();
522
523        let result = tokio::task::spawn(async move {
524            crate::utils::copy_duplex_abortable(&mut client_rx, &mut server_rx, (2, 2), (reg_a, reg_b)).await
525        });
526
527        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
528
529        // The abort must make the task terminate, although none of the streams were shutdown
530        handle_b.abort();
531
532        let (a, b) = tokio::time::timeout(std::time::Duration::from_millis(100), result).await???;
533        assert_eq!(a, 5);
534        assert_eq!(b, 4);
535
536        Ok(())
537    }
538
539    #[cfg(feature = "runtime-tokio")]
540    #[tokio::test]
541    async fn test_copy_duplex_small() -> anyhow::Result<()> {
542        const DATA_LEN: usize = 100;
543
544        let alice_tx = hopr_crypto_random::random_bytes::<DATA_LEN>();
545        let mut alice_rx = [0u8; DATA_LEN];
546
547        let bob_tx = hopr_crypto_random::random_bytes::<DATA_LEN>();
548        let mut bob_rx = [0u8; DATA_LEN];
549
550        let alice = DuplexIO(futures::io::Cursor::new(alice_rx.as_mut()), alice_tx.as_ref());
551        let bob = DuplexIO(futures::io::Cursor::new(bob_rx.as_mut()), bob_tx.as_ref());
552
553        let (a_to_b, b_to_a) = copy_duplex(
554            &mut tokio_util::compat::FuturesAsyncReadCompatExt::compat(alice),
555            &mut tokio_util::compat::FuturesAsyncReadCompatExt::compat(bob),
556            (128, 128),
557        )
558        .await?;
559
560        assert_eq!(DATA_LEN, a_to_b as usize);
561        assert_eq!(DATA_LEN, b_to_a as usize);
562
563        assert_eq!(alice_tx, bob_rx);
564        assert_eq!(bob_tx, alice_rx);
565
566        Ok(())
567    }
568
569    #[cfg(feature = "runtime-tokio")]
570    #[tokio::test]
571    async fn test_client_to_server() -> anyhow::Result<()> {
572        let (mut client_tx, mut client_rx) = tokio::io::duplex(8); // Create a mock duplex stream
573        let (mut server_rx, mut server_tx) = tokio::io::duplex(32); // Create a mock duplex stream
574
575        // Simulate 'a' finishing while there's still data for 'b'
576        client_tx.write_all(b"hello").await?;
577        client_tx.shutdown().await?;
578
579        server_tx.write_all(b"data").await?;
580        server_tx.shutdown().await?;
581
582        let result = crate::utils::copy_duplex(&mut client_rx, &mut server_rx, (2, 2)).await?;
583
584        let (client_to_server_count, server_to_client_count) = result;
585        assert_eq!(client_to_server_count, 5); // 'hello' was transferred
586        assert_eq!(server_to_client_count, 4); // response only partially transferred or not at all
587
588        Ok(())
589    }
590
591    #[cfg(feature = "runtime-tokio")]
592    #[tokio::test]
593    async fn test_server_to_client() -> anyhow::Result<()> {
594        let (mut client_tx, mut client_rx) = tokio::io::duplex(32); // Create a mock duplex stream
595        let (mut server_rx, mut server_tx) = tokio::io::duplex(8); // Create a mock duplex stream
596
597        // Simulate 'a' finishing while there's still data for 'b'
598        server_tx.write_all(b"hello").await?;
599        server_tx.shutdown().await?;
600
601        client_tx.write_all(b"some longer data to transfer").await?;
602
603        let result = crate::utils::copy_duplex(&mut client_rx, &mut server_rx, (2, 2)).await?;
604
605        let (client_to_server_count, server_to_client_count) = result;
606        assert_eq!(server_to_client_count, 5); // 'hello' was transferred
607        assert!(client_to_server_count <= 8); // response only partially transferred or not at all
608
609        Ok(())
610    }
611
612    #[cfg(feature = "runtime-tokio")]
613    #[tokio::test]
614    async fn test_async_read_streamer_complete_chunk() {
615        let data = b"Hello, World!!";
616        let mut streamer = AsyncReadStreamer::<14, _>(&data[..]);
617        let mut results = Vec::new();
618
619        while let Some(res) = streamer.try_next().await.unwrap() {
620            results.push(res);
621        }
622
623        assert_eq!(results, vec![Box::from(*data)]);
624    }
625
626    #[tokio::test]
627    async fn test_async_read_streamer_complete_more_chunks() {
628        let data = b"Hello, World and do it twice";
629        let mut streamer = AsyncReadStreamer::<14, _>(&data[..]);
630        let mut results = Vec::new();
631
632        while let Some(res) = streamer.try_next().await.unwrap() {
633            results.push(res);
634        }
635
636        let (data1, data2) = data.split_at(14);
637        assert_eq!(results, vec![Box::from(data1), Box::from(data2)]);
638    }
639
640    #[tokio::test]
641    async fn test_async_read_streamer_complete_more_chunks_with_incomplete() -> anyhow::Result<()> {
642        let data = b"Hello, World and do it twice, ...";
643        let streamer = AsyncReadStreamer::<14, _>(&data[..]);
644
645        let results = streamer.try_collect::<Vec<_>>().await?;
646
647        let (data1, rest) = data.split_at(14);
648        let (data2, data3) = rest.split_at(14);
649        assert_eq!(results, vec![Box::from(data1), Box::from(data2), Box::from(data3)]);
650
651        Ok(())
652    }
653
654    #[tokio::test]
655    async fn test_async_read_streamer_incomplete_chunk() -> anyhow::Result<()> {
656        let data = b"Hello, World!!";
657        let reader = &data[0..8]; // An incomplete chunk
658        let mut streamer = AsyncReadStreamer::<14, _>(reader);
659
660        assert_eq!(Some(Box::from(reader)), streamer.try_next().await?);
661
662        Ok(())
663    }
664
665    #[tokio::test]
666    async fn test_async_write_sink_should_perform_write_in_chunks() -> anyhow::Result<()> {
667        let data = b"Hello, World!!";
668        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
669
670        use futures::AsyncWriteExt;
671
672        let mut writer = AsyncWriteSink::<7, _>(tx.sink_map_err(|e| std::io::Error::other(e)));
673
674        AsyncWriteExt::write_all(&mut writer, data).await?;
675        AsyncWriteExt::flush(&mut writer).await?;
676        AsyncWriteExt::close(&mut writer).await?;
677
678        let rx_data = rx.collect::<Vec<_>>().await;
679        assert_eq!(2, rx_data.len());
680        assert_eq!(rx_data[0], (&data[0..7]).into());
681        assert_eq!(rx_data[1], (&data[7..]).into());
682
683        Ok(())
684    }
685}