1use std::sync::Arc;
5
6use futures::{
7 AsyncRead, AsyncReadExt, AsyncWrite, FutureExt, SinkExt as _, StreamExt,
8 channel::mpsc::{Receiver, Sender, channel},
9};
10use hopr_api::network::NetworkStreamControl;
11use libp2p::PeerId;
12use tokio_util::{
13 codec::{Decoder, Encoder, FramedRead, FramedWrite},
14 compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
15};
16
/// Fallback time limit for opening an outgoing stream to a peer (2 seconds).
///
/// Used when `HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS` does not supply a value.
const GLOBAL_STREAM_OPEN_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2_000);

/// Fallback limit on the number of outgoing messages processed concurrently.
///
/// Used when `HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS` does not supply a value.
const MAX_CONCURRENT_PACKETS: usize = 30;

/// Fallback backpressure boundary, in bytes, of the per-peer frame writer (4 KiB).
///
/// Used when `HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES` does not supply a value.
const DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES: usize = 4 * 1024;
35
36fn build_peer_stream_io<S, C>(
37 peer: PeerId,
38 stream: S,
39 cache: moka::future::Cache<PeerId, Sender<<C as Decoder>::Item>>,
40 codec: C,
41 ingress_from_peers: Sender<(PeerId, <C as Decoder>::Item)>,
42 frame_writer_backpressure_bytes: usize,
43) -> Sender<<C as Decoder>::Item>
44where
45 S: AsyncRead + AsyncWrite + Send + 'static,
46 C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
47 <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
48 <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
49 <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
50{
51 let (stream_rx, stream_tx) = stream.split();
52 let (send, recv) = channel::<<C as Decoder>::Item>(1000);
53 let cache_internal = cache.clone();
54
55 let mut frame_writer = FramedWrite::new(stream_tx.compat_write(), codec.clone());
56
57 frame_writer.set_backpressure_boundary(frame_writer_backpressure_bytes);
63
64 hopr_async_runtime::prelude::spawn(
66 recv.inspect(move |_| tracing::trace!(%peer, "writing message to peer stream"))
67 .map(Ok)
68 .forward(frame_writer)
69 .inspect(move |res| {
70 tracing::debug!(%peer, ?res, component = "stream", "writing stream with peer finished");
71 }),
72 );
73
74 hopr_async_runtime::prelude::spawn(
76 FramedRead::new(stream_rx.compat(), codec)
77 .filter_map(move |v| {
78 futures::future::ready(match v {
79 Ok(v) => {
80 tracing::trace!(%peer, "read message from peer stream");
81 Some((peer, v))
82 }
83 Err(error) => {
84 tracing::error!(%error, "Error decoding object from the underlying stream");
85 None
86 }
87 })
88 })
89 .map(Ok)
90 .forward(ingress_from_peers)
91 .inspect(move |res| match res {
92 Ok(_) => tracing::debug!(%peer, component = "stream", "incoming stream done reading"),
93 Err(error) => {
94 tracing::error!(%peer, %error, component = "stream", "incoming stream failed on reading")
95 }
96 })
97 .then(move |_| {
98 let peer = peer;
100 async move {
101 cache_internal.invalidate(&peer).await;
102 }
103 }),
104 );
105
106 tracing::trace!(%peer, "created new io for peer");
107 send
108}
109
110pub async fn process_stream_protocol<C, V>(
111 codec: C,
112 control: V,
113) -> crate::errors::Result<(
114 Sender<(PeerId, <C as Decoder>::Item)>, Receiver<(PeerId, <C as Decoder>::Item)>, )>
117where
118 C: Encoder<<C as Decoder>::Item> + Decoder + Send + Sync + Clone + 'static,
119 <C as Encoder<<C as Decoder>::Item>>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
120 <C as Decoder>::Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static,
121 <C as Decoder>::Item: AsRef<[u8]> + Clone + Send + 'static,
122 V: NetworkStreamControl + Clone + Send + Sync + 'static,
123{
124 let (tx_out, rx_out) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
125 let (tx_in, rx_in) = channel::<(PeerId, <C as Decoder>::Item)>(100_000);
126
127 let cache_out = moka::future::Cache::builder()
128 .max_capacity(2000)
129 .eviction_listener(|key: Arc<PeerId>, _, cause| {
130 tracing::trace!(peer = %key.as_ref(), ?cause, "evicting stream for peer");
131 })
132 .build();
133
134 let incoming = control
135 .clone()
136 .accept()
137 .map_err(|e| crate::errors::ProtocolError::Logic(format!("failed to listen on protocol: {e}")))?;
138
139 let max_concurrent_packets = std::env::var("HOPR_TRANSPORT_MAX_CONCURRENT_PACKETS")
140 .ok()
141 .and_then(|v| v.parse().ok())
142 .filter(|&n: &usize| n > 0)
143 .unwrap_or(MAX_CONCURRENT_PACKETS);
144
145 let global_stream_open_timeout = std::env::var("HOPR_TRANSPORT_STREAM_OPEN_TIMEOUT_MS")
146 .ok()
147 .and_then(|v| v.parse().ok())
148 .map(std::time::Duration::from_millis)
149 .unwrap_or(GLOBAL_STREAM_OPEN_TIMEOUT);
150
151 let frame_writer_backpressure_bytes = std::env::var("HOPR_TRANSPORT_FRAME_WRITER_BACKPRESSURE_BYTES")
152 .ok()
153 .and_then(|v| v.parse().ok())
154 .filter(|&n: &usize| n > 0)
155 .unwrap_or(DEFAULT_FRAME_WRITER_BACKPRESSURE_BYTES);
156
157 let open_ctx = Arc::new((control, codec, tx_in));
162
163 let cache_ingress = cache_out.clone();
164 let open_ctx_ingress = open_ctx.clone();
165
166 let _ingress_process = hopr_async_runtime::prelude::spawn(
168 incoming
169 .for_each(move |(peer, stream)| {
170 let cache = cache_ingress.clone();
171 let open_ctx = open_ctx_ingress.clone();
172
173 tracing::debug!(%peer, "received incoming peer-to-peer stream");
174 let (_control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
175 let send = build_peer_stream_io(
176 peer,
177 stream,
178 cache.clone(),
179 codec.clone(),
180 tx_in.clone(),
181 frame_writer_backpressure_bytes,
182 );
183
184 async move {
185 cache.insert(peer, send).await;
186 }
187 })
188 .inspect(|_| {
189 tracing::info!(
190 task = "ingress stream processing",
191 "long-running background task finished"
192 )
193 }),
194 );
195
196 let _egress_process = hopr_async_runtime::prelude::spawn(
198 rx_out
199 .inspect(|(peer, _)| tracing::trace!(%peer, "proceeding to deliver message to peer"))
200 .for_each_concurrent(max_concurrent_packets, move |(peer, msg)| {
201 let cache = cache_out.clone();
202 let open_ctx = open_ctx.clone();
203
204 async move {
205 tracing::trace!(%peer, "trying to deliver message to peer");
206
207 let cache_clone = cache.clone();
208 let cached: Result<Sender<<C as Decoder>::Item>, Arc<anyhow::Error>> = cache
209 .try_get_with(peer, async move {
210 tracing::trace!(%peer, "peer is not in cache, opening new stream");
211
212 let (control, codec, tx_in) = (&open_ctx.0, &open_ctx.1, &open_ctx.2);
216
217 use futures_time::future::FutureExt as TimeExt;
222 let stream = control
223 .clone()
224 .open(peer)
225 .timeout(futures_time::time::Duration::from(global_stream_open_timeout))
226 .await
227 .map_err(|_| anyhow::anyhow!("timeout trying to open stream to {peer}"))?
228 .map_err(|e| anyhow::anyhow!("could not open outgoing peer-to-peer stream: {e}"))?;
229
230 tracing::debug!(%peer, "opening outgoing peer-to-peer stream");
231
232 Ok(build_peer_stream_io(
233 peer,
234 stream,
235 cache_clone,
236 codec.clone(),
237 tx_in.clone(),
238 frame_writer_backpressure_bytes,
239 ))
240 })
241 .await;
242
243 match cached {
244 Ok(mut cached) => {
245 if let Err(error) = cached.send(msg).await {
246 tracing::error!(%peer, %error, "error sending message to peer");
247 cache.invalidate(&peer).await;
248 } else {
249 tracing::trace!(%peer, "message sent to peer");
250 }
251 }
252 Err(error) => {
253 tracing::debug!(%peer, %error, "failed to open a stream to peer");
254 }
255 }
256 }
257 })
258 .inspect(|_| {
259 tracing::info!(
260 task = "egress stream processing",
261 "long-running background task finished"
262 )
263 }),
264 );
265
266 Ok((tx_out, rx_in))
267}
268
#[cfg(test)]
mod tests {
    use std::{pin::Pin, task::Poll};

    use anyhow::Context;
    use futures::SinkExt;

    use super::*;

    /// Test stream holding both ends of one `async_channel_io::pipe()`.
    ///
    /// The test below relies on bytes written to it being readable back from it.
    struct AsyncBinaryStreamChannel {
        read: async_channel_io::ChannelReader,
        write: async_channel_io::ChannelWriter,
    }

    impl AsyncBinaryStreamChannel {
        /// Creates the stream over a fresh pipe.
        pub fn new() -> Self {
            let (write, read) = async_channel_io::pipe();
            Self { read, write }
        }
    }

    impl AsyncRead for AsyncBinaryStreamChannel {
        /// Delegates to the reading end of the pipe.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &mut [u8],
        ) -> Poll<std::io::Result<usize>> {
            let this = self.get_mut();
            Pin::new(&mut this.read).poll_read(cx, buf)
        }
    }

    impl AsyncWrite for AsyncBinaryStreamChannel {
        /// Delegates to the writing end of the pipe.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            let this = self.get_mut();
            Pin::new(&mut this.write).poll_write(cx, buf)
        }

        /// Delegates to the writing end of the pipe.
        fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
            let this = self.get_mut();
            Pin::new(&mut this.write).poll_flush(cx)
        }

        /// Delegates to the writing end of the pipe.
        fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
            let this = self.get_mut();
            Pin::new(&mut this.write).poll_close(cx)
        }
    }

    /// A frame sent through the write half of a split stream must come out of its read half unchanged.
    #[tokio::test]
    async fn split_codec_should_always_produce_correct_data() -> anyhow::Result<()> {
        let expected = [0u8, 1u8, 2u8, 3u8, 4u8, 5u8];
        let value = tokio_util::bytes::BytesMut::from(expected.as_ref());
        let codec = tokio_util::codec::BytesCodec::new();

        let (read_half, write_half) = AsyncBinaryStreamChannel::new().split();
        let mut writer = FramedWrite::new(write_half.compat_write(), codec);
        let reader = FramedRead::new(read_half.compat(), codec);

        writer
            .send(value)
            .await
            .map_err(|_| anyhow::anyhow!("should not fail on send"))?;

        futures::pin_mut!(reader);
        let received = reader.next().await.context("Value must be present")??;

        assert_eq!(received, tokio_util::bytes::BytesMut::from(expected.as_ref()));

        Ok(())
    }
}