hopr_lib/utils/
session.rs

1//! Session-related utilities for HOPR
2//!
3//! This module provides utility functions and structures for managing sessions,
4//! including session lifecycle management, session data handling, and common
5//! session operations.
6
7use std::{
8    collections::{HashMap, VecDeque},
9    fmt::Formatter,
10    future::Future,
11    hash::Hash,
12    net::SocketAddr,
13    num::NonZeroUsize,
14    str::FromStr,
15    sync::Arc,
16};
17
18use async_lock::RwLock;
19use base64::Engine;
20use bytesize::ByteSize;
21use dashmap::DashMap;
22use futures::{
23    FutureExt, StreamExt, TryStreamExt,
24    future::{AbortHandle, AbortRegistration},
25};
26use hopr_network_types::{
27    prelude::{ConnectedUdpStream, IpOrHost, IpProtocol, SealedHost, UdpStreamParallelism},
28    udp::ForeignDataMode,
29};
30use hopr_transport::RoutingOptions;
31use human_bandwidth::re::bandwidth::Bandwidth;
32use serde::{Deserialize, Serialize};
33use serde_with::serde_as;
34use tokio::net::TcpListener;
35use tracing::{debug, error, info};
36
37use crate::{
38    Address, Hopr, HoprSession, HoprSessionId, SURB_SIZE, ServiceId, SessionClientConfig, SessionTarget,
39    errors::HoprLibError, transfer_session,
40};
41
42/// Size of the buffer for forwarding data to/from a TCP stream.
43pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
44
45/// Size of the buffer for forwarding data to/from a UDP stream.
46pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
47
48/// Size of the queue (back-pressure) for data incoming from a UDP stream.
49pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
50
51#[cfg(all(feature = "prometheus", not(test)))]
52lazy_static::lazy_static! {
53    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
54        "hopr_session_hoprd_clients",
55        "Number of clients connected at this Entry node",
56        &["type"]
57    ).unwrap();
58}
59
60#[serde_as]
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
62/// Session target specification.
63pub enum SessionTargetSpec {
64    Plain(String),
65    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
66    Service(ServiceId),
67}
68
69impl std::fmt::Display for SessionTargetSpec {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        match self {
72            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
73            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
74            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
75        }
76    }
77}
78
79impl FromStr for SessionTargetSpec {
80    type Err = HoprLibError;
81
82    fn from_str(s: &str) -> Result<Self, Self::Err> {
83        Ok(if let Some(stripped) = s.strip_prefix("$$") {
84            Self::Sealed(
85                base64::prelude::BASE64_URL_SAFE
86                    .decode(stripped)
87                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
88            )
89        } else if let Some(stripped) = s.strip_prefix("#") {
90            Self::Service(
91                stripped
92                    .parse()
93                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
94            )
95        } else {
96            Self::Plain(s.to_owned())
97        })
98    }
99}
100
101impl SessionTargetSpec {
102    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
103        Ok(match (protocol, self) {
104            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
105                IpOrHost::from_str(&plain)
106                    .map(SealedHost::from)
107                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
108            ),
109            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
110                IpOrHost::from_str(&plain)
111                    .map(SealedHost::from)
112                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
113            ),
114            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
115                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
116            }
117            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
118                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
119            }
120            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
121        })
122    }
123}
124
125/// Entry stored in the session registry table.
126#[derive(Debug)]
127pub struct StoredSessionEntry {
128    /// Destination address of the Session counterparty.
129    pub destination: Address,
130    /// Target of the Session.
131    pub target: SessionTargetSpec,
132    /// Forward path used for the Session.
133    pub forward_path: RoutingOptions,
134    /// Return path used for the Session.
135    pub return_path: RoutingOptions,
136    /// The maximum number of client sessions that the listener can spawn.
137    pub max_client_sessions: usize,
138    /// The maximum number of SURB packets that can be sent upstream.
139    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
140    /// The amount of response data the Session counterparty can deliver back to us, without us
141    /// having to request it.
142    pub response_buffer: Option<bytesize::ByteSize>,
143    /// How many Sessions to pool for clients.
144    pub session_pool: Option<usize>,
145    /// The abort handle for the Session processing.
146    pub abort_handle: AbortHandle,
147
148    clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
149}
150
151impl StoredSessionEntry {
152    pub fn get_clients(&self) -> &Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>> {
153        &self.clients
154    }
155}
156
157/// This function first tries to parse `requested` as the `ip:port` host pair.
158/// If that does not work, it tries to parse `requested` as a single IP address
159/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
160/// part from the given `default`.
161pub fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
162    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
163        Some(Err(requested)) => {
164            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
165            debug!(requested, %default, "using partially default listen host");
166            std::net::SocketAddr::new(
167                requested.parse().unwrap_or(default.ip()),
168                requested
169                    .strip_prefix(":")
170                    .and_then(|p| u16::from_str(p).ok())
171                    .unwrap_or(default.port()),
172            )
173        }
174        Some(Ok(requested)) => {
175            debug!(%requested, "using requested listen host");
176            requested
177        }
178        None => {
179            debug!(%default, "using default listen host");
180            default
181        }
182    }
183}
184
185#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
186pub struct ListenerId(pub IpProtocol, pub std::net::SocketAddr);
187
188pub type ListenerJoinHandles = Arc<RwLock<HashMap<ListenerId, StoredSessionEntry>>>;
189
190pub struct SessionPool {
191    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
192    ah: Option<AbortHandle>,
193}
194
195impl SessionPool {
196    pub const MAX_SESSION_POOL_SIZE: usize = 5;
197
198    pub async fn new(
199        size: usize,
200        dst: Address,
201        target: SessionTarget,
202        cfg: SessionClientConfig,
203        hopr: Arc<Hopr>,
204    ) -> Result<Self, String> {
205        let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
206        let hopr_clone = hopr.clone();
207        let pool_clone = pool.clone();
208        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
209            .map(Ok)
210            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
211                let pool = pool_clone.clone();
212                let hopr = hopr_clone.clone();
213                let target = target.clone();
214                let cfg = cfg.clone();
215                async move {
216                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
217                        Ok(s) => {
218                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
219                            pool.lock().map_err(|_| "lock failed".to_string())?.push_back(s);
220                            Ok(())
221                        }
222                        Err(error) => {
223                            error!(%error, num_session = i, "failed to establish session for pool");
224                            Err(format!("failed to establish session #{i} in pool to {dst}: {error}"))
225                        }
226                    }
227                }
228            })
229            .await?;
230
231        // Spawn a task that periodically sends keep alive messages to the Session in the pool.
232        if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
233            let pool_clone_1 = pool.clone();
234            let pool_clone_2 = pool.clone();
235            let pool_clone_3 = pool.clone();
236            Ok(Self {
237                pool: Some(pool),
238                ah: Some(hopr_async_runtime::spawn_as_abortable!(
239                    futures_time::stream::interval(futures_time::time::Duration::from(
240                        std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
241                    ))
242                    .take_while(move |_| {
243                        // Continue the infinite interval stream until there are sessions in the pool
244                        futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
245                    })
246                    .flat_map(move |_| {
247                        // Get all SessionIds of the remaining Sessions in the pool
248                        let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
249                        futures::stream::iter(ids.into_iter().flatten())
250                    })
251                    .for_each(move |id| {
252                        let hopr = hopr.clone();
253                        let pool = pool_clone_3.clone();
254                        async move {
255                            // Make sure the Session is still alive, otherwise remove it from the pool
256                            if let Err(error) = hopr.keep_alive_session(&id).await {
257                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
258                                if let Ok(mut pool) = pool.lock() {
259                                    pool.retain(|s| *s.id() != id);
260                                }
261                            }
262                        }
263                    })
264                ))
265            })
266        } else {
267            Ok(Self { pool: None, ah: None })
268        }
269    }
270
271    pub fn pop(&mut self) -> Option<HoprSession> {
272        self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
273    }
274}
275
276impl Drop for SessionPool {
277    fn drop(&mut self) {
278        if let Some(ah) = self.ah.take() {
279            ah.abort();
280        }
281    }
282}
283
284#[allow(clippy::too_many_arguments)]
285pub async fn create_tcp_client_binding(
286    bind_host: std::net::SocketAddr,
287    port_range: Option<String>,
288    hopr: Arc<Hopr>,
289    open_listeners: ListenerJoinHandles,
290    destination: Address,
291    target_spec: SessionTargetSpec,
292    config: SessionClientConfig,
293    use_session_pool: Option<usize>,
294    max_client_sessions: Option<usize>,
295) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), BindError> {
296    // Bind the TCP socket first
297    let (bound_host, tcp_listener) = tcp_listen_on(bind_host, port_range).await.map_err(|e| {
298        if e.kind() == std::io::ErrorKind::AddrInUse {
299            BindError::ListenHostAlreadyUsed
300        } else {
301            BindError::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}"))
302        }
303    })?;
304    info!(%bound_host, "TCP session listener bound");
305
306    // For each new TCP connection coming to the listener,
307    // open a Session with the same parameters
308    let target = target_spec
309        .clone()
310        .into_target(IpProtocol::TCP)
311        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
312
313    // Create a session pool if requested
314    let session_pool_size = use_session_pool.unwrap_or(0);
315    let mut session_pool = SessionPool::new(
316        session_pool_size,
317        destination,
318        target.clone(),
319        config.clone(),
320        hopr.clone(),
321    )
322    .await
323    .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
324
325    let active_sessions = Arc::new(DashMap::new());
326    let mut max_clients = max_client_sessions.unwrap_or(5).max(1);
327
328    if max_clients < session_pool_size {
329        max_clients = session_pool_size;
330    }
331
332    let config_clone = config.clone();
333    // Create an abort handler for the listener
334    let (abort_handle, abort_reg) = AbortHandle::new_pair();
335    let active_sessions_clone = active_sessions.clone();
336    hopr_async_runtime::prelude::spawn(async move {
337        let active_sessions_clone_2 = active_sessions_clone.clone();
338
339        futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
340            .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
341            .for_each(move |accepted_client| {
342                let data = config_clone.clone();
343                let target = target.clone();
344                let hopr = hopr.clone();
345                let active_sessions = active_sessions_clone_2.clone();
346
347                // Try to pop from the pool only if a client was accepted
348                let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
349                async move {
350                    match accepted_client {
351                        Ok((sock_addr, mut stream)) => {
352                            debug!(?sock_addr, "incoming TCP connection");
353
354                            // Check that we are still within the quota,
355                            // otherwise shutdown the new client immediately
356                            if active_sessions.len() >= max_clients {
357                                error!(?bind_host, "no more client slots available at listener");
358                                use tokio::io::AsyncWriteExt;
359                                if let Err(error) = stream.shutdown().await {
360                                    error!(%error, ?sock_addr, "failed to shutdown TCP connection");
361                                }
362                                return;
363                            }
364
365                            // See if we still have some session pooled
366                            let session = match maybe_pooled_session {
367                                Some(s) => {
368                                    debug!(session_id = %s.id(), "using pooled session");
369                                    s
370                                }
371                                None => {
372                                    debug!("no more active sessions in the pool, creating a new one");
373                                    match hopr.connect_to(destination, target, data).await {
374                                        Ok(s) => s,
375                                        Err(error) => {
376                                            error!(%error, "failed to establish session");
377                                            return;
378                                        }
379                                    }
380                                }
381                            };
382
383                            let session_id = *session.id();
384                            debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
385
386                            let (abort_handle, abort_reg) = AbortHandle::new_pair();
387                            active_sessions.insert(session_id, (sock_addr, abort_handle));
388
389                            #[cfg(all(feature = "prometheus", not(test)))]
390                            METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
391
392                            hopr_async_runtime::prelude::spawn(
393                                // The stream either terminates naturally (by the client closing the TCP connection)
394                                // or is terminated via the abort handle.
395                                bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
396                                    move |_| async move {
397                                        // Regardless how the session ended, remove the abort handle
398                                        // from the map
399                                        active_sessions.remove(&session_id);
400
401                                        debug!(%session_id, "tcp session has ended");
402
403                                        #[cfg(all(feature = "prometheus", not(test)))]
404                                        METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
405                                    },
406                                ),
407                            );
408                        }
409                        Err(error) => error!(%error, "failed to accept connection"),
410                    }
411                }
412            })
413            .await;
414
415        // Once the listener is done, abort all active sessions created by the listener
416        active_sessions_clone.iter().for_each(|entry| {
417            let (sock_addr, handle) = entry.value();
418            debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
419            handle.abort()
420        });
421    });
422
423    open_listeners.write_arc().await.insert(
424        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
425        StoredSessionEntry {
426            destination,
427            target: target_spec,
428            forward_path: config.forward_path_options,
429            return_path: config.return_path_options,
430            clients: active_sessions,
431            max_client_sessions: max_clients,
432            max_surb_upstream: config
433                .surb_management
434                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
435            response_buffer: config
436                .surb_management
437                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
438            session_pool: Some(session_pool_size),
439            abort_handle,
440        },
441    );
442    Ok((bound_host, None, max_clients))
443}
444
445#[derive(Debug, thiserror::Error)]
446pub enum BindError {
447    #[error("conflict detected: listen host already in use")]
448    ListenHostAlreadyUsed,
449
450    #[error("unknown failure: {0}")]
451    UnknownFailure(String),
452}
453
454pub async fn create_udp_client_binding(
455    bind_host: std::net::SocketAddr,
456    port_range: Option<String>,
457    hopr: Arc<Hopr>,
458    open_listeners: ListenerJoinHandles,
459    destination: Address,
460    target_spec: SessionTargetSpec,
461    config: SessionClientConfig,
462) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), BindError> {
463    // Bind the UDP socket first
464    let (bound_host, udp_socket) = udp_bind_to(bind_host, port_range).await.map_err(|e| {
465        if e.kind() == std::io::ErrorKind::AddrInUse {
466            BindError::ListenHostAlreadyUsed
467        } else {
468            BindError::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}"))
469        }
470    })?;
471
472    info!(%bound_host, "UDP session listener bound");
473
474    let target = target_spec
475        .clone()
476        .into_target(IpProtocol::UDP)
477        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
478
479    // Create a single session for the UDP socket
480    let session = hopr
481        .connect_to(destination, target, config.clone())
482        .await
483        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
484
485    let open_listeners_clone = open_listeners.clone();
486    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
487
488    // Create an abort handle so that the Session can be terminated by aborting
489    // the UDP stream first. Because under the hood, the bind_session_to_stream uses
490    // `transfer_session` which in turn uses `copy_duplex_abortable`, aborting the
491    // `udp_socket` will:
492    //
493    // 1. Initiate graceful shutdown of `udp_socket`
494    // 2. Once done, initiate a graceful shutdown of `session`
495    // 3. Finally, return from the `bind_session_to_stream` which will terminate the spawned task
496    //
497    // This is needed because the `udp_socket` cannot terminate by itself.
498    let (abort_handle, abort_reg) = AbortHandle::new_pair();
499    let clients = Arc::new(DashMap::new());
500    let max_clients: usize = 1; // Maximum number of clients for this session. Currently always 1.
501
502    // TODO: add multiple client support to UDP sessions (#7370)
503    let session_id = *session.id();
504    clients.insert(session_id, (bind_host, abort_handle.clone()));
505    hopr_async_runtime::prelude::spawn(async move {
506        #[cfg(all(feature = "prometheus", not(test)))]
507        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
508
509        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
510
511        #[cfg(all(feature = "prometheus", not(test)))]
512        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
513
514        // Once the Session closes, remove it from the list
515        open_listeners_clone.write_arc().await.remove(&listener_id);
516    });
517
518    open_listeners.write_arc().await.insert(
519        listener_id,
520        StoredSessionEntry {
521            destination,
522            target: target_spec,
523            forward_path: config.forward_path_options.clone(),
524            return_path: config.return_path_options.clone(),
525            max_client_sessions: max_clients,
526            max_surb_upstream: config
527                .surb_management
528                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
529            response_buffer: config
530                .surb_management
531                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
532            session_pool: None,
533            abort_handle,
534            clients,
535        },
536    );
537    Ok((bound_host, Some(session_id), max_clients))
538}
539
540async fn try_restricted_bind<F, S, Fut>(
541    addrs: Vec<std::net::SocketAddr>,
542    range_str: &str,
543    binder: F,
544) -> std::io::Result<S>
545where
546    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
547    Fut: Future<Output = std::io::Result<S>>,
548{
549    if addrs.is_empty() {
550        return Err(std::io::Error::other("no valid socket addresses found"));
551    }
552
553    let range = range_str
554        .split_once(":")
555        .and_then(
556            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
557                Ok((a, b)) if a <= b => Some(a..=b),
558                _ => None,
559            },
560        )
561        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
562
563    for port in range {
564        let addrs = addrs
565            .iter()
566            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
567            .collect::<Vec<_>>();
568        match binder(addrs).await {
569            Ok(listener) => return Ok(listener),
570            Err(error) => debug!(%error, "listen address not usable"),
571        }
572    }
573
574    Err(std::io::Error::new(
575        std::io::ErrorKind::AddrNotAvailable,
576        format!("no valid socket addresses found within range: {range_str}"),
577    ))
578}
579
580/// Listen on a specified address with a port from an optional port range for TCP connections.
581async fn tcp_listen_on<A: std::net::ToSocketAddrs>(
582    address: A,
583    port_range: Option<String>,
584) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
585    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
586
587    // If automatic port allocation is requested and there's a restriction on the port range
588    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
589    if addrs.iter().all(|a| a.port() == 0) {
590        if let Some(range_str) = port_range {
591            let tcp_listener =
592                try_restricted_bind(
593                    addrs,
594                    &range_str,
595                    |a| async move { TcpListener::bind(a.as_slice()).await },
596                )
597                .await?;
598            return Ok((tcp_listener.local_addr()?, tcp_listener));
599        }
600    }
601
602    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
603    Ok((tcp_listener.local_addr()?, tcp_listener))
604}
605
606pub async fn udp_bind_to<A: std::net::ToSocketAddrs>(
607    address: A,
608    port_range: Option<String>,
609) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
610    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
611
612    let builder = ConnectedUdpStream::builder()
613        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
614        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
615        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
616        .with_receiver_parallelism(
617            std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
618                .ok()
619                .and_then(|s| s.parse::<NonZeroUsize>().ok())
620                .map(UdpStreamParallelism::Specific)
621                .unwrap_or(UdpStreamParallelism::Auto),
622        );
623
624    // If automatic port allocation is requested and there's a restriction on the port range
625    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
626    if addrs.iter().all(|a| a.port() == 0) {
627        if let Some(range_str) = port_range {
628            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
629                futures::future::ready(builder.clone().build(addrs.as_slice()))
630            })
631            .await?;
632
633            return Ok((*udp_listener.bound_address(), udp_listener));
634        }
635    }
636
637    let udp_socket = builder.build(address)?;
638    Ok((*udp_socket.bound_address(), udp_socket))
639}
640
641async fn bind_session_to_stream<T>(
642    mut session: HoprSession,
643    mut stream: T,
644    max_buf: usize,
645    abort_reg: Option<AbortRegistration>,
646) where
647    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
648{
649    let session_id = *session.id();
650    match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
651        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
652            session_id = ?session_id,
653            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
654        ),
655        Err(error) => error!(
656            session_id = ?session_id,
657            %error,
658            "error during data transfer"
659        ),
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use anyhow::Context;
666    use futures::{
667        FutureExt, StreamExt,
668        channel::mpsc::{UnboundedReceiver, UnboundedSender},
669    };
670    use futures_time::future::FutureExt as TimeFutureExt;
671    use hopr_crypto_types::crypto_traits::Randomizable;
672    use tokio::io::{AsyncReadExt, AsyncWriteExt};
673
674    use super::*;
675    use crate::{
676        Address, ApplicationData, ApplicationDataIn, ApplicationDataOut, DestinationRouting, HoprPseudonym,
677        HoprSession, HoprSessionId, RoutingOptions,
678    };
679
680    fn loopback_transport() -> (
681        UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
682        UnboundedReceiver<ApplicationDataIn>,
683    ) {
684        let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
685        let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
686        tokio::task::spawn(
687            input_rx
688                .map(|(_, data)| {
689                    Ok(ApplicationDataIn {
690                        data: data.data,
691                        packet_info: Default::default(),
692                    })
693                })
694                .forward(output_tx)
695                .map(|e| tracing::debug!(?e, "loopback transport completed")),
696        );
697
698        (input_tx, output_rx)
699    }
700
701    #[tokio::test]
702    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
703    -> anyhow::Result<()> {
704        let session_id = HoprSessionId::new(4567u64, HoprPseudonym::random());
705        let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
706        let session = HoprSession::new(
707            session_id,
708            DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
709            Default::default(),
710            loopback_transport(),
711            None,
712        )?;
713
714        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0), None)
715            .await
716            .context("listen_on failed")?;
717
718        tokio::task::spawn(async move {
719            match tcp_listener.accept().await {
720                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
721                Err(e) => error!("failed to accept connection: {e}"),
722            }
723        });
724
725        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
726            .await
727            .context("connect failed")?;
728
729        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
730
731        for d in data.clone().into_iter() {
732            tcp_stream.write_all(d).await.context("write failed")?;
733        }
734
735        for d in data.iter() {
736            let mut buf = vec![0; d.len()];
737            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
738        }
739
740        Ok(())
741    }
742
743    #[test_log::test(tokio::test)]
744    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
745    -> anyhow::Result<()> {
746        let session_id = HoprSessionId::new(4567u64, HoprPseudonym::random());
747        let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
748        let session = HoprSession::new(
749            session_id,
750            DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
751            Default::default(),
752            loopback_transport(),
753            None,
754        )?;
755
756        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0), None)
757            .await
758            .context("udp_bind_to failed")?;
759
760        let (abort_handle, abort_registration) = AbortHandle::new_pair();
761        let jh = tokio::task::spawn(bind_session_to_stream(
762            session,
763            udp_listener,
764            ApplicationData::PAYLOAD_SIZE,
765            Some(abort_registration),
766        ));
767
768        let mut udp_stream = ConnectedUdpStream::builder()
769            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
770            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
771            .with_counterparty(listen_addr)
772            .build(("127.0.0.1", 0))
773            .context("bind failed")?;
774
775        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
776
777        for d in data.clone().into_iter() {
778            udp_stream.write_all(d).await.context("write failed")?;
779            // ConnectedUdpStream performs flush with each write
780        }
781
782        for d in data.iter() {
783            let mut buf = vec![0; d.len()];
784            udp_stream.read_exact(&mut buf).await.context("read failed")?;
785        }
786
787        // Once aborted, the bind_session_to_stream task must terminate too
788        abort_handle.abort();
789        jh.timeout(futures_time::time::Duration::from_millis(200)).await??;
790
791        Ok(())
792    }
793
794    #[test]
795    fn build_binding_address() {
796        let default = "10.0.0.1:10000".parse().unwrap();
797
798        let result = build_binding_host(Some("127.0.0.1:10000"), default);
799        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
800
801        let result = build_binding_host(None, default);
802        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
803
804        let result = build_binding_host(Some("127.0.0.1"), default);
805        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
806
807        let result = build_binding_host(Some(":1234"), default);
808        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
809
810        let result = build_binding_host(Some(":"), default);
811        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
812
813        let result = build_binding_host(Some(""), default);
814        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
815    }
816}