//! Infrastructure supporting the conversion of a collection of [`PeerId`]-keyed,
//! `libp2p_stream`-managed individual peer-to-peer `libp2p::swarm::Stream`s into
//! shared ingress/egress channels.

4use std::sync::Arc;
5
6use futures::{
7    AsyncRead, AsyncReadExt, AsyncWrite, FutureExt, SinkExt as _, StreamExt,
8    channel::mpsc::{Receiver, Sender, channel},
9};
10use hopr_api::network::NetworkStreamControl;
11use libp2p::PeerId;
12use tokio_util::{
13    codec::{Decoder, Encoder, FramedRead, FramedWrite},
14    compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
15};
16
17// TODO: see if these constants should be configurable instead
18
19/// Global timeout for the `BidirectionalStreamControl::open` operation.
20const GLOBAL_STREAM_OPEN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
21const MAX_CONCURRENT_PACKETS: usize = 30;
22
23/// Default pending-write-buffer byte threshold on the framed writer before a flush is
24/// forced. **This value is in bytes, not in messages** — that is how
25/// `tokio_util::codec::FramedWrite::set_backpressure_boundary` is defined.
26///
27/// The previous value of `1` byte forced a flush syscall on every message (the buffer
28/// exceeds one byte the moment a frame is encoded), making a relay forwarding N packets
29/// issue N individual writes rather than coalescing adjacent small frames. `4096` bytes
30/// is a conservative default that lets roughly ~4 HOPR packets (~1 KiB each) batch into
31/// a single write while still flushing quickly under single-packet traffic.
32///
33/// Override with `HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES`.
34const DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES: usize = 4096;
35
36fn build_peer_stream_io<S, C>(
37    peer: PeerId,
38    stream: S,
39    cache: moka::future::Cache<PeerId, Sender<<C as Decoder>::Item>>,
40    codec: C,
41    ingress_from_peers: Sender<(PeerId, <C as Decoder>::Item)>,
42    frame_writer_backpressure_bytes: usize,
43) -> Sender<<C as Decoder>::Item>
44where
45    S: AsyncRead + AsyncWrite + Send + 'static,
46    C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
47    <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
48    <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
49    <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
50{
51    let (stream_rx, stream_tx) = stream.split();
52    let (send, recv) = channel::<<C as Decoder>::Item>(1000);
53    let cache_internal = cache.clone();
54
55    let mut frame_writer = FramedWrite::new(stream_tx.compat_write(), codec.clone());
56
57    // `set_backpressure_boundary` is a *byte* threshold on `FramedWrite`'s internal
58    // pending-write buffer: once the encoded frames exceed this many bytes, the next
59    // `poll_ready` call will issue a flush. A small value (e.g. `1`) flushes on every
60    // message for the lowest latency at the cost of one syscall per frame; a larger
61    // value lets adjacent small frames coalesce into a single write on busy relays.
62    frame_writer.set_backpressure_boundary(frame_writer_backpressure_bytes);
63
64    // Send all outgoing data to the peer
65    hopr_async_runtime::prelude::spawn(
66        recv.inspect(move |_| tracing::trace!(%peer, "writing message to peer stream"))
67            .map(Ok)
68            .forward(frame_writer)
69            .inspect(move |res| {
70                tracing::debug!(%peer, ?res, component = "stream", "writing stream with peer finished");
71            }),
72    );
73
74    // Read all incoming data from that peer and pass it to the general ingress stream
75    hopr_async_runtime::prelude::spawn(
76        FramedRead::new(stream_rx.compat(), codec)
77            .filter_map(move |v| {
78                futures::future::ready(match v {
79                    Ok(v) => {
80                        tracing::trace!(%peer, "read message from peer stream");
81                        Some((peer, v))
82                    }
83                    Err(error) => {
84                        tracing::error!(%error, "Error decoding object from the underlying stream");
85                        None
86                    }
87                })
88            })
89            .map(Ok)
90            .forward(ingress_from_peers)
91            .inspect(move |res| match res {
92                Ok(_) => tracing::debug!(%peer, component = "stream", "incoming stream done reading"),
93                Err(error) => {
94                    tracing::error!(%peer, %error, component = "stream", "incoming stream failed on reading")
95                }
96            })
97            .then(move |_| {
98                // Make sure we invalidate the peer entry from the cache once the stream ends
99                let peer = peer;
100                async move {
101                    cache_internal.invalidate(&peer).await;
102                }
103            }),
104    );
105
106    tracing::trace!(%peer, "created new io for peer");
107    send
108}
109
110pub async fn process_stream_protocol<C, V>(
111    codec: C,
112    control: V,
113) -> crate::errors::Result<(
114    Sender<(PeerId, <C as Decoder>::Item)>, // impl Sink<(PeerId, <C as Decoder>::Item)>,
115    Receiver<(PeerId, <C as Decoder>::Item)>, // impl Stream<Item = (PeerId, <C as Decoder>::Item)>,
116)>
117where
118    C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
119    <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
120    <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
121    <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
122    V: NetworkStreamControl + Clone + Send + Sync + 'static,
123{
124    let (tx_out, rx_out) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
125    let (tx_in, rx_in) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
126
127    let cache_out = moka::future::Cache::builder()
128        .max_capacity(2000)
129        .eviction_listener(|key: Arc<PeerId>, _, cause| {
130            tracing::trace!(peer = %key.as_ref(), ?cause, "evicting stream for peer");
131        })
132        .build();
133
134    let incoming = control
135        .clone()
136        .accept()
137        .map_err(|e| crate::errors::ProtocolError::Logic(format!("failed to listen on protocol: {e}")))?;
138
139    let max_concurrent_packets = std::env::var("HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS")
140        .ok()
141        .and_then(|v| v.parse().ok())
142        .filter(|&n: &usize| n > 0)
143        .unwrap_or(MAX_CONCURRENT_PACKETS);
144
145    let global_stream_open_timeout = std::env::var("HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS")
146        .ok()
147        .and_then(|v| v.parse().ok())
148        .map(std::time::Duration::from_millis)
149        .unwrap_or(GLOBAL_STREAM_OPEN_TIMEOUT);
150
151    let frame_writer_backpressure_bytes = std::env::var("HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES")
152        .ok()
153        .and_then(|v| v.parse().ok())
154        .filter(|&n: &usize| n > 0)
155        .unwrap_or(DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES);
156
157    // Pack the handles only needed to open a NEW peer stream into a single Arc. This
158    // lets us pay one Arc bump per packet at the closure level instead of three (control,
159    // codec, tx_in), and defers the per-field `.clone()` to the cache-miss path that
160    // actually needs them.
161    let open_ctx = Arc::new((control, codec, tx_in));
162
163    let cache_ingress = cache_out.clone();
164    let open_ctx_ingress = open_ctx.clone();
165
166    // terminated when the incoming is dropped
167    let _ingress_process = hopr_async_runtime::prelude::spawn(
168        incoming
169            .for_each(move |(peer, stream)| {
170                let cache = cache_ingress.clone();
171                let open_ctx = open_ctx_ingress.clone();
172
173                tracing::debug!(%peer, "received incoming peer-to-peer stream");
174                let (_control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
175                let send = build_peer_stream_io(
176                    peer,
177                    stream,
178                    cache.clone(),
179                    codec.clone(),
180                    tx_in.clone(),
181                    frame_writer_backpressure_bytes,
182                );
183
184                async move {
185                    cache.insert(peer, send).await;
186                }
187            })
188            .inspect(|_| {
189                tracing::info!(
190                    task = "ingress stream processing",
191                    "long-running background task finished"
192                )
193            }),
194    );
195
196    // terminated when the rx_in is dropped
197    let _egress_process = hopr_async_runtime::prelude::spawn(
198        rx_out
199            .inspect(|(peer, _)| tracing::trace!(%peer, "proceeding to deliver message to peer"))
200            .for_each_concurrent(max_concurrent_packets, move |(peer, msg)| {
201                let cache = cache_out.clone();
202                let open_ctx = open_ctx.clone();
203
204                async move {
205                    tracing::trace!(%peer, "trying to deliver message to peer");
206
207                    let cache_clone = cache.clone();
208                    let cached: Result<Sender<<C as Decoder>::Item>, Arc<anyhow::Error>> = cache
209                        .try_get_with(peer, async move {
210                            tracing::trace!(%peer, "peer is not in cache, opening new stream");
211
212                            // Only the cache-miss path needs the stream-open handles.
213                            // Clone them out of the shared `open_ctx` once here rather
214                            // than on every outgoing message.
215                            let (control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
216
217                            // There must be a timeout on the `open` operation; otherwise
218                            //  a single impossible ` open ` operation will block the peer ID entry in the
219                            // cache forever, having a direct detrimental effect on the packet
220                            // processing pipeline.
221                            use futures_time::future::FutureExt as TimeExt;
222                            let stream = control
223                                .clone()
224                                .open(peer)
225                                .timeout(futures_time::time::Duration::from(global_stream_open_timeout))
226                                .await
227                                .map_err(|_| anyhow::anyhow!("timeout trying to open stream to {peer}"))?
228                                .map_err(|e| anyhow::anyhow!("could not open outgoing peer-to-peer stream: {e}"))?;
229
230                            tracing::debug!(%peer, "opening outgoing peer-to-peer stream");
231
232                            Ok(build_peer_stream_io(
233                                peer,
234                                stream,
235                                cache_clone,
236                                codec.clone(),
237                                tx_in.clone(),
238                                frame_writer_backpressure_bytes,
239                            ))
240                        })
241                        .await;
242
243                    match cached {
244                        Ok(mut cached) => {
245                            if let Err(error) = cached.send(msg).await {
246                                tracing::error!(%peer, %error, "error sending message to peer");
247                                cache.invalidate(&peer).await;
248                            } else {
249                                tracing::trace!(%peer, "message sent to peer");
250                            }
251                        }
252                        Err(error) => {
253                            tracing::debug!(%peer, %error, "failed to open a stream to peer");
254                        }
255                    }
256                }
257            })
258            .inspect(|_| {
259                tracing::info!(
260                    task = "egress stream processing",
261                    "long-running background task finished"
262                )
263            }),
264    );
265
266    Ok((tx_out, rx_in))
267}
268
269#[cfg(test)]
270mod tests {
271    use anyhow::Context;
272    use futures::SinkExt;
273
274    use super::*;
275
276    struct AsyncBinaryStreamChannel {
277        read: async_channel_io::ChannelReader,
278        write: async_channel_io::ChannelWriter,
279    }
280
281    impl AsyncBinaryStreamChannel {
282        pub fn new() -> Self {
283            let (write, read) = async_channel_io::pipe();
284            Self { read, write }
285        }
286    }
287
288    impl AsyncRead for AsyncBinaryStreamChannel {
289        fn poll_read(
290            self: std::pin::Pin<&mut Self>,
291            cx: &mut std::task::Context<'_>,
292            buf: &mut [u8],
293        ) -> std::task::Poll<std::io::Result<usize>> {
294            let mut pinned = std::pin::pin!(&mut self.get_mut().read);
295            pinned.as_mut().poll_read(cx, buf)
296        }
297    }
298
299    impl AsyncWrite for AsyncBinaryStreamChannel {
300        fn poll_write(
301            self: std::pin::Pin<&mut Self>,
302            cx: &mut std::task::Context<'_>,
303            buf: &[u8],
304        ) -> std::task::Poll<std::io::Result<usize>> {
305            let mut pinned = std::pin::pin!(&mut self.get_mut().write);
306            pinned.as_mut().poll_write(cx, buf)
307        }
308
309        fn poll_flush(
310            self: std::pin::Pin<&mut Self>,
311            cx: &mut std::task::Context<'_>,
312        ) -> std::task::Poll<std::io::Result<()>> {
313            let pinned = std::pin::pin!(&mut self.get_mut().write);
314            pinned.poll_flush(cx)
315        }
316
317        fn poll_close(
318            self: std::pin::Pin<&mut Self>,
319            cx: &mut std::task::Context<'_>,
320        ) -> std::task::Poll<std::io::Result<()>> {
321            let pinned = std::pin::pin!(&mut self.get_mut().write);
322            pinned.poll_close(cx)
323        }
324    }
325
326    #[tokio::test]
327    async fn split_codec_should_always_produce_correct_data() -> anyhow::Result<()> {
328        let stream = AsyncBinaryStreamChannel::new();
329        let codec = tokio_util::codec::BytesCodec::new();
330
331        let expected = [0u8, 1u8, 2u8, 3u8, 4u8, 5u8];
332        let value = tokio_util::bytes::BytesMut::from(expected.as_ref());
333
334        let (stream_rx, stream_tx) = stream.split();
335        let (mut tx, rx) = (
336            FramedWrite::new(stream_tx.compat_write(), codec),
337            FramedRead::new(stream_rx.compat(), codec),
338        );
339        tx.send(value)
340            .await
341            .map_err(|_| anyhow::anyhow!("should not fail on send"))?;
342
343        futures::pin_mut!(rx);
344
345        assert_eq!(
346            rx.next().await.context("Value must be present")??,
347            tokio_util::bytes::BytesMut::from(expected.as_ref())
348        );
349
350        Ok(())
351    }
352}