Skip to main content

hopr_transport/protocol/
stream.rs

//! Infrastructure for converting a collection of individual peer-to-peer
//! `libp2p::swarm::Stream`s, managed by `libp2p_stream` and split per `PeerId`, into a
//! single pair of channels carrying `(PeerId, message)` tuples.
3
4use std::sync::Arc;
5
6use futures::{
7    AsyncRead, AsyncReadExt, AsyncWrite, FutureExt, SinkExt as _, StreamExt,
8    channel::mpsc::{Receiver, Sender, channel},
9};
10use hopr_api::network::NetworkStreamControl;
11use hopr_utils::network_types::timeout::{SinkTimeoutError, TimeoutSinkExt};
12use libp2p::PeerId;
13use tokio_util::{
14    codec::{Decoder, Encoder, FramedRead, FramedWrite},
15    compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
16};
17
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Counter incremented each time a message is dropped because sending it into the
    /// per-peer egress buffer timed out.
    static ref METRIC_PER_PEER_SEND_TIMEOUT: hopr_types::telemetry::SimpleCounter = {
        use hopr_types::telemetry::SimpleCounter;

        SimpleCounter::new(
            "hopr_egress_per_peer_send_timed_out",
            "Number of packets dropped due to per-peer egress send timeout",
        )
        .unwrap()
    };
}
27
// TODO: see if these constants should be configurable instead

/// Default timeout for opening a new outgoing stream to a peer, i.e. for the `open`
/// call made on the stream control (`NetworkStreamControl`) in `process_stream_protocol`.
///
/// Override with `HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS` (value in milliseconds).
const GLOBAL_STREAM_OPEN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);

/// Timeout for sending a single message into the per-peer mpsc buffer.
/// If the buffer stays full for longer than this, the message is dropped.
///
/// Override with `HOPR_TRANSPORT_PER_PEER_SEND_TIMEOUT_MS` (value in milliseconds).
const DEFAULT_PER_PEER_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Default limit on how many outgoing messages the egress task processes concurrently
/// (the limit passed to `for_each_concurrent`).
///
/// Override with `HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS` (must be greater than zero).
const MAX_CONCURRENT_PACKETS: usize = 30;

/// Default pending-write-buffer byte threshold on the framed writer before a flush is
/// forced. **This value is in bytes, not in messages** — that is how
/// `tokio_util::codec::FramedWrite::set_backpressure_boundary` is defined.
///
/// The previous value of `1` byte forced a flush syscall on every message (the buffer
/// exceeds one byte the moment a frame is encoded), making a relay forwarding N packets
/// issue N individual writes rather than coalescing adjacent small frames. `4096` bytes
/// is a conservative default that lets roughly ~4 HOPR packets (~1 KiB each) batch into
/// a single write while still flushing quickly under single-packet traffic.
///
/// Override with `HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES` (must be greater than
/// zero).
const DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES: usize = 4096;
50
51fn build_peer_stream_io<S, C>(
52    peer: PeerId,
53    stream: S,
54    cache: moka::future::Cache<PeerId, Sender<<C as Decoder>::Item>>,
55    codec: C,
56    ingress_from_peers: Sender<(PeerId, <C as Decoder>::Item)>,
57    frame_writer_backpressure_bytes: usize,
58) -> Sender<<C as Decoder>::Item>
59where
60    S: AsyncRead + AsyncWrite + Send + 'static,
61    C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
62    <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
63    <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
64    <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
65{
66    let (stream_rx, stream_tx) = stream.split();
67    let (send, recv) = channel::<<C as Decoder>::Item>(1000);
68    let cache_internal = cache.clone();
69
70    let mut frame_writer = FramedWrite::new(stream_tx.compat_write(), codec.clone());
71
72    // `set_backpressure_boundary` is a *byte* threshold on `FramedWrite`'s internal
73    // pending-write buffer: once the encoded frames exceed this many bytes, the next
74    // `poll_ready` call will issue a flush. A small value (e.g. `1`) flushes on every
75    // message for the lowest latency at the cost of one syscall per frame; a larger
76    // value lets adjacent small frames coalesce into a single write on busy relays.
77    frame_writer.set_backpressure_boundary(frame_writer_backpressure_bytes);
78
79    // Send all outgoing data to the peer
80    hopr_utils::runtime::prelude::spawn(
81        recv.inspect(move |_| tracing::trace!(%peer, "writing message to peer stream"))
82            .map(Ok)
83            .forward(frame_writer)
84            .inspect(move |res| {
85                tracing::debug!(%peer, ?res, component = "stream", "writing stream with peer finished");
86            }),
87    );
88
89    // Read all incoming data from that peer and pass it to the general ingress stream
90    hopr_utils::runtime::prelude::spawn(
91        FramedRead::new(stream_rx.compat(), codec)
92            .filter_map(move |v| {
93                futures::future::ready(match v {
94                    Ok(v) => {
95                        tracing::trace!(%peer, "read message from peer stream");
96                        Some((peer, v))
97                    }
98                    Err(error) => {
99                        tracing::error!(%error, "Error decoding object from the underlying stream");
100                        None
101                    }
102                })
103            })
104            .map(Ok)
105            .forward(ingress_from_peers)
106            .inspect(move |res| match res {
107                Ok(_) => tracing::debug!(%peer, component = "stream", "incoming stream done reading"),
108                Err(error) => {
109                    tracing::error!(%peer, %error, component = "stream", "incoming stream failed on reading")
110                }
111            })
112            .then(move |_| {
113                // Make sure we invalidate the peer entry from the cache once the stream ends
114                let peer = peer;
115                async move {
116                    cache_internal.invalidate(&peer).await;
117                }
118            }),
119    );
120
121    tracing::trace!(%peer, "created new io for peer");
122    send
123}
124
125pub async fn process_stream_protocol<C, V>(
126    codec: C,
127    control: V,
128) -> super::errors::Result<(
129    Sender<(PeerId, <C as Decoder>::Item)>, // impl Sink<(PeerId, <C as Decoder>::Item)>,
130    Receiver<(PeerId, <C as Decoder>::Item)>, // impl Stream<Item = (PeerId, <C as Decoder>::Item)>,
131)>
132where
133    C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
134    <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
135    <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
136    <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
137    V: NetworkStreamControl + Clone + Send + Sync + 'static,
138{
139    let (tx_out, rx_out) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
140    let (tx_in, rx_in) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
141
142    let cache_out = moka::future::Cache::builder()
143        .max_capacity(2000)
144        .eviction_listener(|key: Arc<PeerId>, _, cause| {
145            tracing::trace!(peer = %key.as_ref(), ?cause, "evicting stream for peer");
146        })
147        .build();
148
149    let incoming = control
150        .clone()
151        .accept()
152        .map_err(|e| super::errors::ProtocolError::Logic(format!("failed to listen on protocol: {e}")))?;
153
154    let max_concurrent_packets = std::env::var("HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS")
155        .ok()
156        .and_then(|v| v.parse().ok())
157        .filter(|&n: &usize| n > 0)
158        .unwrap_or(MAX_CONCURRENT_PACKETS);
159
160    let global_stream_open_timeout = std::env::var("HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS")
161        .ok()
162        .and_then(|v| v.parse().ok())
163        .map(std::time::Duration::from_millis)
164        .unwrap_or(GLOBAL_STREAM_OPEN_TIMEOUT);
165
166    let frame_writer_backpressure_bytes = std::env::var("HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES")
167        .ok()
168        .and_then(|v| v.parse().ok())
169        .filter(|&n: &usize| n > 0)
170        .unwrap_or(DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES);
171
172    let per_peer_send_timeout = std::env::var("HOPR_TRANSPORT_PER_PEER_SEND_TIMEOUT_MS")
173        .ok()
174        .and_then(|v| v.parse().ok())
175        .map(std::time::Duration::from_millis)
176        .unwrap_or(DEFAULT_PER_PEER_SEND_TIMEOUT);
177
178    // Pack the handles only needed to open a NEW peer stream into a single Arc. This
179    // lets us pay one Arc bump per packet at the closure level instead of three (control,
180    // codec, tx_in), and defers the per-field `.clone()` to the cache-miss path that
181    // actually needs them.
182    let open_ctx = Arc::new((control, codec, tx_in));
183
184    let cache_ingress = cache_out.clone();
185    let open_ctx_ingress = open_ctx.clone();
186
187    // terminated when the incoming is dropped
188    let _ingress_process = hopr_utils::runtime::prelude::spawn(
189        incoming
190            .for_each(move |(peer, stream)| {
191                let cache = cache_ingress.clone();
192                let open_ctx = open_ctx_ingress.clone();
193
194                tracing::debug!(%peer, "received incoming peer-to-peer stream");
195                let (_control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
196                let send = build_peer_stream_io(
197                    peer,
198                    stream,
199                    cache.clone(),
200                    codec.clone(),
201                    tx_in.clone(),
202                    frame_writer_backpressure_bytes,
203                );
204
205                async move {
206                    cache.insert(peer, send).await;
207                }
208            })
209            .inspect(|_| {
210                tracing::info!(
211                    task = "ingress stream processing",
212                    "long-running background task finished"
213                )
214            }),
215    );
216
217    // terminated when the rx_in is dropped
218    let _egress_process = hopr_utils::runtime::prelude::spawn(
219        rx_out
220            .inspect(|(peer, _)| tracing::trace!(%peer, "proceeding to deliver message to peer"))
221            .for_each_concurrent(max_concurrent_packets, move |(peer, msg)| {
222                let cache = cache_out.clone();
223                let open_ctx = open_ctx.clone();
224
225                async move {
226                    tracing::trace!(%peer, "trying to deliver message to peer");
227
228                    let cache_clone = cache.clone();
229                    let cached: Result<Sender<<C as Decoder>::Item>, Arc<anyhow::Error>> = cache
230                        .try_get_with(peer, async move {
231                            tracing::trace!(%peer, "peer is not in cache, opening new stream");
232
233                            // Only the cache-miss path needs the stream-open handles.
234                            // Clone them out of the shared `open_ctx` once here rather
235                            // than on every outgoing message.
236                            let (control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
237
238                            // There must be a timeout on the `open` operation; otherwise
239                            //  a single impossible ` open ` operation will block the peer ID entry in the
240                            // cache forever, having a direct detrimental effect on the packet
241                            // processing pipeline.
242                            use futures_time::future::FutureExt as TimeExt;
243                            let stream = control
244                                .clone()
245                                .open(peer)
246                                .timeout(futures_time::time::Duration::from(global_stream_open_timeout))
247                                .await
248                                .map_err(|_| anyhow::anyhow!("timeout trying to open stream to {peer}"))?
249                                .map_err(|e| anyhow::anyhow!("could not open outgoing peer-to-peer stream: {e}"))?;
250
251                            tracing::debug!(%peer, "opening outgoing peer-to-peer stream");
252
253                            Ok(build_peer_stream_io(
254                                peer,
255                                stream,
256                                cache_clone,
257                                codec.clone(),
258                                tx_in.clone(),
259                                frame_writer_backpressure_bytes,
260                            ))
261                        })
262                        .await;
263
264                    match cached {
265                        Ok(cached) => match cached.with_timeout(per_peer_send_timeout).send(msg).await {
266                            Ok(()) => tracing::trace!(%peer, "message sent to peer"),
267                            Err(SinkTimeoutError::Timeout) => {
268                                #[cfg(all(feature = "telemetry", not(test)))]
269                                METRIC_PER_PEER_SEND_TIMEOUT.increment();
270                                tracing::warn!(%peer, "per-peer egress send timed out, dropping packet");
271                                cache.invalidate(&peer).await;
272                            }
273                            Err(SinkTimeoutError::Inner(error)) => {
274                                tracing::error!(%peer, %error, "error sending message to peer");
275                                cache.invalidate(&peer).await;
276                            }
277                        },
278                        Err(error) => {
279                            tracing::debug!(%peer, %error, "failed to open a stream to peer");
280                        }
281                    }
282                }
283            })
284            .inspect(|_| {
285                tracing::info!(
286                    task = "egress stream processing",
287                    "long-running background task finished"
288                )
289            }),
290    );
291
292    Ok((tx_out, rx_in))
293}
294
#[cfg(test)]
mod tests {
    use std::{
        pin::Pin,
        task::{Context as TaskContext, Poll},
    };

    use anyhow::Context;
    use futures::SinkExt;

    use super::*;

    /// In-memory duplex byte stream backed by a single `async_channel_io` pipe: whatever
    /// is written through the `AsyncWrite` side becomes readable on the `AsyncRead` side.
    struct AsyncBinaryStreamChannel {
        read: async_channel_io::ChannelReader,
        write: async_channel_io::ChannelWriter,
    }

    impl AsyncBinaryStreamChannel {
        /// Creates a new loopback stream from a fresh pipe.
        pub fn new() -> Self {
            let (write, read) = async_channel_io::pipe();
            Self { read, write }
        }
    }

    // Reads are delegated to the pipe's reader half.
    impl AsyncRead for AsyncBinaryStreamChannel {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut TaskContext<'_>,
            buf: &mut [u8],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
        }
    }

    // Writes, flushes and closes are delegated to the pipe's writer half.
    impl AsyncWrite for AsyncBinaryStreamChannel {
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut TaskContext<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut self.get_mut().write).poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
            Pin::new(&mut self.get_mut().write).poll_close(cx)
        }
    }

    /// A frame written through the write half of a split stream must come out unchanged
    /// from the read half of the same stream.
    #[tokio::test]
    async fn split_codec_should_always_produce_correct_data() -> anyhow::Result<()> {
        const PAYLOAD: [u8; 6] = [0, 1, 2, 3, 4, 5];

        let codec = tokio_util::codec::BytesCodec::new();
        let (read_half, write_half) = AsyncBinaryStreamChannel::new().split();

        let mut writer = FramedWrite::new(write_half.compat_write(), codec);
        let reader = FramedRead::new(read_half.compat(), codec);

        writer
            .send(tokio_util::bytes::BytesMut::from(&PAYLOAD[..]))
            .await
            .map_err(|_| anyhow::anyhow!("should not fail on send"))?;

        futures::pin_mut!(reader);

        let received = reader.next().await.context("Value must be present")??;
        assert_eq!(received, tokio_util::bytes::BytesMut::from(&PAYLOAD[..]));

        Ok(())
    }
}