1use std::sync::Arc;
5
6use futures::{
7 AsyncRead, AsyncReadExt, AsyncWrite, FutureExt, SinkExt as _, StreamExt,
8 channel::mpsc::{Receiver, Sender, channel},
9};
10use hopr_api::network::NetworkStreamControl;
11use hopr_utils::network_types::timeout::{SinkTimeoutError, TimeoutSinkExt};
12use libp2p::PeerId;
13use tokio_util::{
14 codec::{Decoder, Encoder, FramedRead, FramedWrite},
15 compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
16};
17
// Counter of egress packets dropped because sending to a peer's stream queue timed out.
//
// Only compiled with the `telemetry` feature and outside of test builds; every use site
// must therefore be guarded by the same `cfg`. The counter is created lazily and the
// `unwrap` panics if `SimpleCounter::new` reports an error.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PER_PEER_SEND_TIMEOUT: hopr_types::telemetry::SimpleCounter =
        hopr_types::telemetry::SimpleCounter::new(
            "hopr_egress_per_peer_send_timed_out",
            "Number of packets dropped due to per-peer egress send timeout",
        )
        .unwrap();
}
27
/// Default time limit for opening an outgoing stream to a peer.
///
/// Used when `HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS` does not supply a usable value.
const GLOBAL_STREAM_OPEN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);

/// Default time limit for handing a single outgoing message over to a peer's stream queue.
///
/// Used when `HOPR_TRANSPORT_PER_PEER_SEND_TIMEOUT_MS` does not supply a usable value.
const DEFAULT_PER_PEER_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);

/// Default number of outgoing messages that are processed concurrently by the egress task.
///
/// Used when `HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS` does not supply a usable value.
const MAX_CONCURRENT_PACKETS: usize = 30;

/// Default backpressure boundary (in bytes) configured on each per-peer frame writer.
///
/// Used when `HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES` does not supply a usable value.
const DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES: usize = 4096;
50
51fn build_peer_stream_io<S, C>(
52 peer: PeerId,
53 stream: S,
54 cache: moka::future::Cache<PeerId, Sender<<C as Decoder>::Item>>,
55 codec: C,
56 ingress_from_peers: Sender<(PeerId, <C as Decoder>::Item)>,
57 frame_writer_backpressure_bytes: usize,
58) -> Sender<<C as Decoder>::Item>
59where
60 S: AsyncRead + AsyncWrite + Send + 'static,
61 C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
62 <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
63 <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
64 <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
65{
66 let (stream_rx, stream_tx) = stream.split();
67 let (send, recv) = channel::<<C as Decoder>::Item>(1000);
68 let cache_internal = cache.clone();
69
70 let mut frame_writer = FramedWrite::new(stream_tx.compat_write(), codec.clone());
71
72 frame_writer.set_backpressure_boundary(frame_writer_backpressure_bytes);
78
79 hopr_utils::runtime::prelude::spawn(
81 recv.inspect(move |_| tracing::trace!(%peer, "writing message to peer stream"))
82 .map(Ok)
83 .forward(frame_writer)
84 .inspect(move |res| {
85 tracing::debug!(%peer, ?res, component = "stream", "writing stream with peer finished");
86 }),
87 );
88
89 hopr_utils::runtime::prelude::spawn(
91 FramedRead::new(stream_rx.compat(), codec)
92 .filter_map(move |v| {
93 futures::future::ready(match v {
94 Ok(v) => {
95 tracing::trace!(%peer, "read message from peer stream");
96 Some((peer, v))
97 }
98 Err(error) => {
99 tracing::error!(%error, "Error decoding object from the underlying stream");
100 None
101 }
102 })
103 })
104 .map(Ok)
105 .forward(ingress_from_peers)
106 .inspect(move |res| match res {
107 Ok(_) => tracing::debug!(%peer, component = "stream", "incoming stream done reading"),
108 Err(error) => {
109 tracing::error!(%peer, %error, component = "stream", "incoming stream failed on reading")
110 }
111 })
112 .then(move |_| {
113 let peer = peer;
115 async move {
116 cache_internal.invalidate(&peer).await;
117 }
118 }),
119 );
120
121 tracing::trace!(%peer, "created new io for peer");
122 send
123}
124
125pub async fn process_stream_protocol<C, V>(
126 codec: C,
127 control: V,
128) -> super::errors::Result<(
129 Sender<(PeerId, <C as Decoder>::Item)>, Receiver<(PeerId, <C as Decoder>::Item)>, )>
132where
133 C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
134 <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
135 <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
136 <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
137 V: NetworkStreamControl + Clone + Send + Sync + 'static,
138{
139 let (tx_out, rx_out) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
140 let (tx_in, rx_in) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
141
142 let cache_out = moka::future::Cache::builder()
143 .max_capacity(2000)
144 .eviction_listener(|key: Arc<PeerId>, _, cause| {
145 tracing::trace!(peer = %key.as_ref(), ?cause, "evicting stream for peer");
146 })
147 .build();
148
149 let incoming = control
150 .clone()
151 .accept()
152 .map_err(|e| super::errors::ProtocolError::Logic(format!("failed to listen on protocol: {e}")))?;
153
154 let max_concurrent_packets = std::env::var("HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS")
155 .ok()
156 .and_then(|v| v.parse().ok())
157 .filter(|&n: &usize| n > 0)
158 .unwrap_or(MAX_CONCURRENT_PACKETS);
159
160 let global_stream_open_timeout = std::env::var("HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS")
161 .ok()
162 .and_then(|v| v.parse().ok())
163 .map(std::time::Duration::from_millis)
164 .unwrap_or(GLOBAL_STREAM_OPEN_TIMEOUT);
165
166 let frame_writer_backpressure_bytes = std::env::var("HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES")
167 .ok()
168 .and_then(|v| v.parse().ok())
169 .filter(|&n: &usize| n > 0)
170 .unwrap_or(DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES);
171
172 let per_peer_send_timeout = std::env::var("HOPR_TRANSPORT_PER_PEER_SEND_TIMEOUT_MS")
173 .ok()
174 .and_then(|v| v.parse().ok())
175 .map(std::time::Duration::from_millis)
176 .unwrap_or(DEFAULT_PER_PEER_SEND_TIMEOUT);
177
178 let open_ctx = Arc::new((control, codec, tx_in));
183
184 let cache_ingress = cache_out.clone();
185 let open_ctx_ingress = open_ctx.clone();
186
187 let _ingress_process = hopr_utils::runtime::prelude::spawn(
189 incoming
190 .for_each(move |(peer, stream)| {
191 let cache = cache_ingress.clone();
192 let open_ctx = open_ctx_ingress.clone();
193
194 tracing::debug!(%peer, "received incoming peer-to-peer stream");
195 let (_control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
196 let send = build_peer_stream_io(
197 peer,
198 stream,
199 cache.clone(),
200 codec.clone(),
201 tx_in.clone(),
202 frame_writer_backpressure_bytes,
203 );
204
205 async move {
206 cache.insert(peer, send).await;
207 }
208 })
209 .inspect(|_| {
210 tracing::info!(
211 task = "ingress stream processing",
212 "long-running background task finished"
213 )
214 }),
215 );
216
217 let _egress_process = hopr_utils::runtime::prelude::spawn(
219 rx_out
220 .inspect(|(peer, _)| tracing::trace!(%peer, "proceeding to deliver message to peer"))
221 .for_each_concurrent(max_concurrent_packets, move |(peer, msg)| {
222 let cache = cache_out.clone();
223 let open_ctx = open_ctx.clone();
224
225 async move {
226 tracing::trace!(%peer, "trying to deliver message to peer");
227
228 let cache_clone = cache.clone();
229 let cached: Result<Sender<<C as Decoder>::Item>, Arc<anyhow::Error>> = cache
230 .try_get_with(peer, async move {
231 tracing::trace!(%peer, "peer is not in cache, opening new stream");
232
233 let (control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
237
238 use futures_time::future::FutureExt as TimeExt;
243 let stream = control
244 .clone()
245 .open(peer)
246 .timeout(futures_time::time::Duration::from(global_stream_open_timeout))
247 .await
248 .map_err(|_| anyhow::anyhow!("timeout trying to open stream to {peer}"))?
249 .map_err(|e| anyhow::anyhow!("could not open outgoing peer-to-peer stream: {e}"))?;
250
251 tracing::debug!(%peer, "opening outgoing peer-to-peer stream");
252
253 Ok(build_peer_stream_io(
254 peer,
255 stream,
256 cache_clone,
257 codec.clone(),
258 tx_in.clone(),
259 frame_writer_backpressure_bytes,
260 ))
261 })
262 .await;
263
264 match cached {
265 Ok(cached) => match cached.with_timeout(per_peer_send_timeout).send(msg).await {
266 Ok(()) => tracing::trace!(%peer, "message sent to peer"),
267 Err(SinkTimeoutError::Timeout) => {
268 #[cfg(all(feature = "telemetry", not(test)))]
269 METRIC_PER_PEER_SEND_TIMEOUT.increment();
270 tracing::warn!(%peer, "per-peer egress send timed out, dropping packet");
271 cache.invalidate(&peer).await;
272 }
273 Err(SinkTimeoutError::Inner(error)) => {
274 tracing::error!(%peer, %error, "error sending message to peer");
275 cache.invalidate(&peer).await;
276 }
277 },
278 Err(error) => {
279 tracing::debug!(%peer, %error, "failed to open a stream to peer");
280 }
281 }
282 }
283 })
284 .inspect(|_| {
285 tracing::info!(
286 task = "egress stream processing",
287 "long-running background task finished"
288 )
289 }),
290 );
291
292 Ok((tx_out, rx_in))
293}
294
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::SinkExt;

    use super::*;

    /// Test double for a bidirectional byte stream: whatever is written through the
    /// `write` end of the pipe can be read back from its `read` end.
    struct AsyncBinaryStreamChannel {
        read: async_channel_io::ChannelReader,
        write: async_channel_io::ChannelWriter,
    }

    impl AsyncBinaryStreamChannel {
        /// Creates the stream on top of a fresh `async_channel_io` pipe.
        pub fn new() -> Self {
            let (write, read) = async_channel_io::pipe();
            Self { read, write }
        }
    }

    // Reading is delegated to the reader end of the pipe.
    impl AsyncRead for AsyncBinaryStreamChannel {
        fn poll_read(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &mut [u8],
        ) -> std::task::Poll<std::io::Result<usize>> {
            let this = self.get_mut();
            std::pin::Pin::new(&mut this.read).poll_read(cx, buf)
        }
    }

    // Writing, flushing and closing are delegated to the writer end of the pipe.
    impl AsyncWrite for AsyncBinaryStreamChannel {
        fn poll_write(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> std::task::Poll<std::io::Result<usize>> {
            let this = self.get_mut();
            std::pin::Pin::new(&mut this.write).poll_write(cx, buf)
        }

        fn poll_flush(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            let this = self.get_mut();
            std::pin::Pin::new(&mut this.write).poll_flush(cx)
        }

        fn poll_close(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::io::Result<()>> {
            let this = self.get_mut();
            std::pin::Pin::new(&mut this.write).poll_close(cx)
        }
    }

    /// A frame written through the write half of a split stream must come out
    /// unchanged from the read half.
    #[tokio::test]
    async fn split_codec_should_always_produce_correct_data() -> anyhow::Result<()> {
        let payload = [0u8, 1u8, 2u8, 3u8, 4u8, 5u8];
        let codec = tokio_util::codec::BytesCodec::new();

        let (read_half, write_half) = AsyncBinaryStreamChannel::new().split();
        let mut writer = FramedWrite::new(write_half.compat_write(), codec);
        let reader = FramedRead::new(read_half.compat(), codec);

        writer
            .send(tokio_util::bytes::BytesMut::from(payload.as_ref()))
            .await
            .map_err(|_| anyhow::anyhow!("should not fail on send"))?;

        futures::pin_mut!(reader);
        let received = reader.next().await.context("Value must be present")??;

        assert_eq!(received, tokio_util::bytes::BytesMut::from(payload.as_ref()));

        Ok(())
    }
}