hopr_utils_session/
lib.rs

1//! Session-related utilities for HOPR
2//!
3//! This module provides utility functions and structures for managing sessions,
4//! including session lifecycle management, session data handling, and common
5//! session operations.
6
7use std::{
8    collections::VecDeque, fmt::Formatter, future::Future, hash::Hash, net::SocketAddr, num::NonZeroUsize,
9    str::FromStr, sync::Arc,
10};
11
12use anyhow::anyhow;
13use base64::Engine;
14use bytesize::ByteSize;
15use dashmap::DashMap;
16use futures::{
17    FutureExt, StreamExt, TryStreamExt,
18    future::{AbortHandle, AbortRegistration},
19};
20use hopr_api::{chain::HoprChainApi, db::HoprNodeDbApi};
21use hopr_async_runtime::Abortable;
22use hopr_lib::{
23    Address, Hopr, HoprSession, SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget,
24    errors::HoprLibError, transfer_session,
25};
26use hopr_network_types::{
27    prelude::{ConnectedUdpStream, IpOrHost, IpProtocol, SealedHost, UdpStreamParallelism},
28    udp::ForeignDataMode,
29};
30use hopr_transport::RoutingOptions;
31use human_bandwidth::re::bandwidth::Bandwidth;
32use serde::{Deserialize, Serialize};
33use serde_with::serde_as;
34use tokio::net::TcpListener;
35use tracing::{debug, error, info};
36
/// Size of the buffer used when forwarding data to/from a TCP stream.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4 * 1024;

/// Size of the buffer used when forwarding data to/from a UDP stream.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16 * 1024;

/// Size of the queue that applies back-pressure on data incoming from a UDP stream.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8 * 1024;
45
/// Gauge with the number of clients connected at this Entry node, labelled by `type`
/// (the binding functions in this file use the labels `tcp` and `udp`).
///
/// Initialized lazily on first access via the standard library's `LazyLock`.
#[cfg(all(feature = "prometheus", not(test)))]
static METRIC_ACTIVE_CLIENTS: std::sync::LazyLock<hopr_metrics::MultiGauge> = std::sync::LazyLock::new(|| {
    hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_clients",
        "Number of clients connected at this Entry node",
        &["type"],
    )
    .unwrap()
});
54
/// Session target specification.
///
/// In its textual form, a sealed target is prefixed with `$$`, a service target with `#`,
/// and anything else is treated as a plain target.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SessionTargetSpec {
    /// Plain-text target; it is parsed as an IP address or host when converted into a `SessionTarget`.
    Plain(String),
    /// Sealed (opaque) target bytes; serialized by serde as Base64.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target given as a service identifier; converts into `SessionTarget::ExitNode`.
    Service(ServiceId),
}
63
64impl std::fmt::Display for SessionTargetSpec {
65    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66        match self {
67            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
68            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
69            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
70        }
71    }
72}
73
74impl FromStr for SessionTargetSpec {
75    type Err = HoprLibError;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        Ok(if let Some(stripped) = s.strip_prefix("$$") {
79            Self::Sealed(
80                base64::prelude::BASE64_URL_SAFE
81                    .decode(stripped)
82                    .map_err(|e| HoprLibError::Other(e.into()))?,
83            )
84        } else if let Some(stripped) = s.strip_prefix("#") {
85            Self::Service(
86                stripped
87                    .parse()
88                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
89            )
90        } else {
91            Self::Plain(s.to_owned())
92        })
93    }
94}
95
96impl SessionTargetSpec {
97    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
98        Ok(match (protocol, self) {
99            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => {
100                SessionTarget::TcpStream(IpOrHost::from_str(&plain).map(SealedHost::from)?)
101            }
102            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => {
103                SessionTarget::UdpStream(IpOrHost::from_str(&plain).map(SealedHost::from)?)
104            }
105            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
106                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
107            }
108            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
109                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
110            }
111            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
112        })
113    }
114}
115
/// Entry stored in the session registry table.
#[derive(Debug)]
pub struct StoredSessionEntry {
    /// Destination address of the Session counterparty.
    pub destination: Address,
    /// Target of the Session.
    pub target: SessionTargetSpec,
    /// Forward path used for the Session.
    pub forward_path: RoutingOptions,
    /// Return path used for the Session.
    pub return_path: RoutingOptions,
    /// The maximum number of client sessions that the listener can spawn.
    pub max_client_sessions: usize,
    /// Upper bound on the rate of SURBs sent upstream, expressed as a bandwidth.
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// The amount of response data the Session counterparty can deliver back to us, without us
    /// having to request it.
    pub response_buffer: Option<bytesize::ByteSize>,
    /// How many Sessions to pool for clients.
    pub session_pool: Option<usize>,
    /// The abort handle for the Session processing.
    pub abort_handle: AbortHandle,

    /// Clients served by the listener, keyed by Session ID. Each value holds the socket address
    /// recorded for the client and the abort handle of the client's Session processing.
    clients: Arc<DashMap<SessionId, (SocketAddr, AbortHandle)>>,
}
141
impl StoredSessionEntry {
    /// Returns the shared map of clients of this listener, keyed by Session ID.
    ///
    /// The map is behind an `Arc`, so the returned reference can be cloned to obtain
    /// another handle to the same map.
    pub fn get_clients(&self) -> &Arc<DashMap<SessionId, (SocketAddr, AbortHandle)>> {
        &self.clients
    }
}
147
148/// This function first tries to parse `requested` as the `ip:port` host pair.
149/// If that does not work, it tries to parse `requested` as a single IP address
150/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
151/// part from the given `default`.
152pub fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
153    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
154        Some(Err(requested)) => {
155            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
156            debug!(requested, %default, "using partially default listen host");
157            std::net::SocketAddr::new(
158                requested.parse().unwrap_or(default.ip()),
159                requested
160                    .strip_prefix(":")
161                    .and_then(|p| u16::from_str(p).ok())
162                    .unwrap_or(default.port()),
163            )
164        }
165        Some(Ok(requested)) => {
166            debug!(%requested, "using requested listen host");
167            requested
168        }
169        None => {
170            debug!(%default, "using default listen host");
171            default
172        }
173    }
174}
175
/// Identifies a listener by its IP protocol (field `0`) and its socket address (field `1`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ListenerId(pub IpProtocol, pub std::net::SocketAddr);
178
179impl std::fmt::Display for ListenerId {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        write!(f, "{}://{}:{}", self.0, self.1.ip(), self.1.port())
182    }
183}
184
/// Table of listeners, keyed by [`ListenerId`], each with its [`StoredSessionEntry`].
#[derive(Default)]
pub struct ListenerJoinHandles(pub DashMap<ListenerId, StoredSessionEntry>);
187
188impl Abortable for ListenerJoinHandles {
189    fn abort_task(&self) {
190        self.0.alter_all(|_, v| {
191            v.abort_handle.abort();
192            v
193        });
194    }
195
196    fn was_aborted(&self) -> bool {
197        self.0.iter().all(|v| v.abort_handle.is_aborted())
198    }
199}
200
/// Pool of Sessions that were established ahead of time, so they can be handed out
/// to clients without waiting for Session establishment.
pub struct SessionPool {
    /// Sessions that are ready to be handed out; `None` when no Session was pooled.
    pool: Option<Arc<parking_lot::Mutex<VecDeque<HoprSession>>>>,
    /// Abort handle of the background keep-alive task; `None` when no Session was pooled.
    ah: Option<AbortHandle>,
}
205
206impl SessionPool {
207    pub const MAX_SESSION_POOL_SIZE: usize = 5;
208
209    pub async fn new<Chain, Db>(
210        size: usize,
211        dst: Address,
212        target: SessionTarget,
213        cfg: SessionClientConfig,
214        hopr: Arc<Hopr<Chain, Db>>,
215    ) -> Result<Self, anyhow::Error>
216    where
217        Chain: HoprChainApi + Clone + Send + Sync + 'static,
218        Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
219    {
220        let pool = Arc::new(parking_lot::Mutex::new(VecDeque::with_capacity(size)));
221        let hopr_clone = hopr.clone();
222        let pool_clone = pool.clone();
223        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
224            .map(Ok)
225            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
226                let pool = pool_clone.clone();
227                let hopr = hopr_clone.clone();
228                let target = target.clone();
229                let cfg = cfg.clone();
230                async move {
231                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
232                        Ok(s) => {
233                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
234                            pool.lock().push_back(s);
235                            Ok(())
236                        }
237                        Err(error) => {
238                            error!(%error, num_session = i, "failed to establish session for pool");
239                            Err(anyhow!("failed to establish session #{i} in pool to {dst}: {error}"))
240                        }
241                    }
242                }
243            })
244            .await?;
245
246        // Spawn a task that periodically sends keep alive messages to the Session in the pool.
247        if !pool.lock().is_empty() {
248            let pool_clone_1 = pool.clone();
249            let pool_clone_2 = pool.clone();
250            let pool_clone_3 = pool.clone();
251            Ok(Self {
252                pool: Some(pool),
253                ah: Some(hopr_async_runtime::spawn_as_abortable!(
254                    futures_time::stream::interval(futures_time::time::Duration::from(
255                        std::time::Duration::from_secs(1).max(hopr.config().protocol.session.idle_timeout / 2)
256                    ))
257                    .take_while(move |_| {
258                        // Continue the infinite interval stream until there are sessions in the pool
259                        futures::future::ready(!pool_clone_1.lock().is_empty())
260                    })
261                    .flat_map(move |_| {
262                        // Get all SessionIds of the remaining Sessions in the pool
263                        let ids = pool_clone_2.lock().iter().map(|s| *s.id()).collect::<Vec<_>>();
264                        futures::stream::iter(ids)
265                    })
266                    .for_each(move |id| {
267                        let hopr = hopr.clone();
268                        let pool = pool_clone_3.clone();
269                        async move {
270                            // Make sure the Session is still alive, otherwise remove it from the pool
271                            if let Err(error) = hopr.keep_alive_session(&id).await {
272                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
273                                pool.lock().retain(|s| *s.id() != id);
274                            }
275                        }
276                    })
277                ))
278            })
279        } else {
280            Ok(Self { pool: None, ah: None })
281        }
282    }
283
284    pub fn pop(&mut self) -> Option<HoprSession> {
285        self.pool.as_ref().and_then(|pool| pool.lock().pop_front())
286    }
287}
288
289impl Drop for SessionPool {
290    fn drop(&mut self) {
291        if let Some(ah) = self.ah.take() {
292            ah.abort();
293        }
294    }
295}
296
297#[allow(clippy::too_many_arguments)]
298pub async fn create_tcp_client_binding<Chain, Db>(
299    bind_host: std::net::SocketAddr,
300    port_range: Option<String>,
301    hopr: Arc<Hopr<Chain, Db>>,
302    open_listeners: Arc<ListenerJoinHandles>,
303    destination: Address,
304    target_spec: SessionTargetSpec,
305    config: SessionClientConfig,
306    use_session_pool: Option<usize>,
307    max_client_sessions: Option<usize>,
308) -> Result<(std::net::SocketAddr, Option<SessionId>, usize), BindError>
309where
310    Chain: HoprChainApi + Clone + Send + Sync + 'static,
311    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
312{
313    // Bind the TCP socket first
314    let (bound_host, tcp_listener) = tcp_listen_on(bind_host, port_range).await.map_err(|e| {
315        if e.kind() == std::io::ErrorKind::AddrInUse {
316            BindError::ListenHostAlreadyUsed
317        } else {
318            BindError::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}"))
319        }
320    })?;
321    info!(%bound_host, "TCP session listener bound");
322
323    // For each new TCP connection coming to the listener,
324    // open a Session with the same parameters
325    let target = target_spec
326        .clone()
327        .into_target(IpProtocol::TCP)
328        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
329
330    // Create a session pool if requested
331    let session_pool_size = use_session_pool.unwrap_or(0);
332    let mut session_pool = SessionPool::new(
333        session_pool_size,
334        destination,
335        target.clone(),
336        config.clone(),
337        hopr.clone(),
338    )
339    .await
340    .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
341
342    let active_sessions = Arc::new(DashMap::new());
343    let mut max_clients = max_client_sessions.unwrap_or(5).max(1);
344
345    if max_clients < session_pool_size {
346        max_clients = session_pool_size;
347    }
348
349    let config_clone = config.clone();
350    // Create an abort handler for the listener
351    let (abort_handle, abort_reg) = AbortHandle::new_pair();
352    let active_sessions_clone = active_sessions.clone();
353    hopr_async_runtime::prelude::spawn(async move {
354        let active_sessions_clone_2 = active_sessions_clone.clone();
355
356        futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
357            .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
358            .for_each(move |accepted_client| {
359                let data = config_clone.clone();
360                let target = target.clone();
361                let hopr = hopr.clone();
362                let active_sessions = active_sessions_clone_2.clone();
363
364                // Try to pop from the pool only if a client was accepted
365                let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
366                async move {
367                    match accepted_client {
368                        Ok((sock_addr, mut stream)) => {
369                            debug!(?sock_addr, "incoming TCP connection");
370
371                            // Check that we are still within the quota,
372                            // otherwise shutdown the new client immediately
373                            if active_sessions.len() >= max_clients {
374                                error!(?bind_host, "no more client slots available at listener");
375                                use tokio::io::AsyncWriteExt;
376                                if let Err(error) = stream.shutdown().await {
377                                    error!(%error, ?sock_addr, "failed to shutdown TCP connection");
378                                }
379                                return;
380                            }
381
382                            // See if we still have some session pooled
383                            let session = match maybe_pooled_session {
384                                Some(s) => {
385                                    debug!(session_id = %s.id(), "using pooled session");
386                                    s
387                                }
388                                None => {
389                                    debug!("no more active sessions in the pool, creating a new one");
390                                    match hopr.connect_to(destination, target, data).await {
391                                        Ok(s) => s,
392                                        Err(error) => {
393                                            error!(%error, "failed to establish session");
394                                            return;
395                                        }
396                                    }
397                                }
398                            };
399
400                            let session_id = *session.id();
401                            debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
402
403                            let (abort_handle, abort_reg) = AbortHandle::new_pair();
404                            active_sessions.insert(session_id, (sock_addr, abort_handle));
405
406                            #[cfg(all(feature = "prometheus", not(test)))]
407                            METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
408
409                            hopr_async_runtime::prelude::spawn(
410                                // The stream either terminates naturally (by the client closing the TCP connection)
411                                // or is terminated via the abort handle.
412                                bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
413                                    move |_| async move {
414                                        // Regardless how the session ended, remove the abort handle
415                                        // from the map
416                                        active_sessions.remove(&session_id);
417
418                                        debug!(%session_id, "tcp session has ended");
419
420                                        #[cfg(all(feature = "prometheus", not(test)))]
421                                        METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
422                                    },
423                                ),
424                            );
425                        }
426                        Err(error) => error!(%error, "failed to accept connection"),
427                    }
428                }
429            })
430            .await;
431
432        // Once the listener is done, abort all active sessions created by the listener
433        active_sessions_clone.iter().for_each(|entry| {
434            let (sock_addr, handle) = entry.value();
435            debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
436            handle.abort()
437        });
438    });
439
440    open_listeners.0.insert(
441        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
442        StoredSessionEntry {
443            destination,
444            target: target_spec,
445            forward_path: config.forward_path_options,
446            return_path: config.return_path_options,
447            clients: active_sessions,
448            max_client_sessions: max_clients,
449            max_surb_upstream: config
450                .surb_management
451                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
452            response_buffer: config
453                .surb_management
454                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
455            session_pool: Some(session_pool_size),
456            abort_handle,
457        },
458    );
459    Ok((bound_host, None, max_clients))
460}
461
/// Errors returned when creating a client binding (listener) for a Session.
#[derive(Debug, thiserror::Error)]
pub enum BindError {
    /// The requested listen host is already in use (binding failed with `AddrInUse`).
    #[error("conflict detected: listen host already in use")]
    ListenHostAlreadyUsed,

    /// Any other failure; carries a human-readable description of the cause.
    #[error("unknown failure: {0}")]
    UnknownFailure(String),
}
470
471pub async fn create_udp_client_binding<Chain, Db>(
472    bind_host: std::net::SocketAddr,
473    port_range: Option<String>,
474    hopr: Arc<Hopr<Chain, Db>>,
475    open_listeners: Arc<ListenerJoinHandles>,
476    destination: Address,
477    target_spec: SessionTargetSpec,
478    config: SessionClientConfig,
479) -> Result<(std::net::SocketAddr, Option<SessionId>, usize), BindError>
480where
481    Chain: HoprChainApi + Clone + Send + Sync + 'static,
482    Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
483{
484    // Bind the UDP socket first
485    let (bound_host, udp_socket) = udp_bind_to(bind_host, port_range).await.map_err(|e| {
486        if e.kind() == std::io::ErrorKind::AddrInUse {
487            BindError::ListenHostAlreadyUsed
488        } else {
489            BindError::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}"))
490        }
491    })?;
492
493    info!(%bound_host, "UDP session listener bound");
494
495    let target = target_spec
496        .clone()
497        .into_target(IpProtocol::UDP)
498        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
499
500    // Create a single session for the UDP socket
501    let session = hopr
502        .connect_to(destination, target, config.clone())
503        .await
504        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
505
506    let open_listeners_clone = open_listeners.clone();
507    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
508
509    // Create an abort handle so that the Session can be terminated by aborting
510    // the UDP stream first. Because under the hood, the bind_session_to_stream uses
511    // `transfer_session` which in turn uses `copy_duplex_abortable`, aborting the
512    // `udp_socket` will:
513    //
514    // 1. Initiate graceful shutdown of `udp_socket`
515    // 2. Once done, initiate a graceful shutdown of `session`
516    // 3. Finally, return from the `bind_session_to_stream` which will terminate the spawned task
517    //
518    // This is needed because the `udp_socket` cannot terminate by itself.
519    let (abort_handle, abort_reg) = AbortHandle::new_pair();
520    let clients = Arc::new(DashMap::new());
521    let max_clients: usize = 1; // Maximum number of clients for this session. Currently always 1.
522
523    // TODO: add multiple client support to UDP sessions (#7370)
524    let session_id = *session.id();
525    clients.insert(session_id, (bind_host, abort_handle.clone()));
526    hopr_async_runtime::prelude::spawn(async move {
527        #[cfg(all(feature = "prometheus", not(test)))]
528        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
529
530        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
531
532        #[cfg(all(feature = "prometheus", not(test)))]
533        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
534
535        // Once the Session closes, remove it from the list
536        open_listeners_clone.0.remove(&listener_id);
537    });
538
539    open_listeners.0.insert(
540        listener_id,
541        StoredSessionEntry {
542            destination,
543            target: target_spec,
544            forward_path: config.forward_path_options.clone(),
545            return_path: config.return_path_options.clone(),
546            max_client_sessions: max_clients,
547            max_surb_upstream: config
548                .surb_management
549                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
550            response_buffer: config
551                .surb_management
552                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
553            session_pool: None,
554            abort_handle,
555            clients,
556        },
557    );
558    Ok((bound_host, Some(session_id), max_clients))
559}
560
561async fn try_restricted_bind<F, S, Fut>(
562    addrs: Vec<std::net::SocketAddr>,
563    range_str: &str,
564    binder: F,
565) -> std::io::Result<S>
566where
567    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
568    Fut: Future<Output = std::io::Result<S>>,
569{
570    if addrs.is_empty() {
571        return Err(std::io::Error::other("no valid socket addresses found"));
572    }
573
574    let range = range_str
575        .split_once(":")
576        .and_then(
577            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
578                Ok((a, b)) if a <= b => Some(a..=b),
579                _ => None,
580            },
581        )
582        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
583
584    for port in range {
585        let addrs = addrs
586            .iter()
587            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
588            .collect::<Vec<_>>();
589        match binder(addrs).await {
590            Ok(listener) => return Ok(listener),
591            Err(error) => debug!(%error, "listen address not usable"),
592        }
593    }
594
595    Err(std::io::Error::new(
596        std::io::ErrorKind::AddrNotAvailable,
597        format!("no valid socket addresses found within range: {range_str}"),
598    ))
599}
600
601/// Listen on a specified address with a port from an optional port range for TCP connections.
602async fn tcp_listen_on<A: std::net::ToSocketAddrs>(
603    address: A,
604    port_range: Option<String>,
605) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
606    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
607
608    // If automatic port allocation is requested and there's a restriction on the port range
609    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
610    if addrs.iter().all(|a| a.port() == 0)
611        && let Some(range_str) = port_range
612    {
613        let tcp_listener = try_restricted_bind(
614            addrs,
615            &range_str,
616            |a| async move { TcpListener::bind(a.as_slice()).await },
617        )
618        .await?;
619        return Ok((tcp_listener.local_addr()?, tcp_listener));
620    }
621
622    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
623    Ok((tcp_listener.local_addr()?, tcp_listener))
624}
625
626pub async fn udp_bind_to<A: std::net::ToSocketAddrs>(
627    address: A,
628    port_range: Option<String>,
629) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
630    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
631
632    let builder = ConnectedUdpStream::builder()
633        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
634        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
635        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
636        .with_receiver_parallelism(
637            std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
638                .ok()
639                .and_then(|s| s.parse::<NonZeroUsize>().ok())
640                .map(UdpStreamParallelism::Specific)
641                .unwrap_or(UdpStreamParallelism::Auto),
642        );
643
644    // If automatic port allocation is requested and there's a restriction on the port range
645    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
646    if addrs.iter().all(|a| a.port() == 0)
647        && let Some(range_str) = port_range
648    {
649        let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
650            futures::future::ready(builder.clone().build(addrs.as_slice()))
651        })
652        .await?;
653
654        return Ok((*udp_listener.bound_address(), udp_listener));
655    }
656
657    let udp_socket = builder.build(address)?;
658    Ok((*udp_socket.bound_address(), udp_socket))
659}
660
661async fn bind_session_to_stream<T>(
662    mut session: HoprSession,
663    mut stream: T,
664    max_buf: usize,
665    abort_reg: Option<AbortRegistration>,
666) where
667    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
668{
669    let session_id = *session.id();
670    match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
671        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
672            session_id = ?session_id,
673            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
674        ),
675        Err(error) => error!(
676            session_id = ?session_id,
677            %error,
678            "error during data transfer"
679        ),
680    }
681}
682
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::{
        FutureExt, StreamExt,
        channel::mpsc::{UnboundedReceiver, UnboundedSender},
    };
    use futures_time::future::FutureExt as TimeFutureExt;
    use hopr_crypto_types::crypto_traits::Randomizable;
    use hopr_lib::{
        Address, ApplicationData, ApplicationDataIn, ApplicationDataOut, DestinationRouting, HoprPseudonym,
        HoprSession, RoutingOptions, SessionId,
    };
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::*;

    /// Payload chunks written to the socket and then read back in the socket tests.
    const CHUNKS: [&[u8; 5]; 6] = [b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];

    /// Creates a transport that feeds everything sent to it back as incoming data.
    fn loopback_transport() -> (
        UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
        UnboundedReceiver<ApplicationDataIn>,
    ) {
        let (outgoing_tx, outgoing_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();

        let echo = outgoing_rx
            .map(|(_, data)| {
                Ok(ApplicationDataIn {
                    data: data.data,
                    packet_info: Default::default(),
                })
            })
            .forward(incoming_tx)
            .map(|e| tracing::debug!(?e, "loopback transport completed"));
        tokio::task::spawn(echo);

        (outgoing_tx, incoming_rx)
    }

    /// Creates a Session over the loopback transport; must be called within a Tokio runtime.
    fn loopback_session() -> anyhow::Result<HoprSession> {
        let session_id = SessionId::new(4567u64, HoprPseudonym::random());
        let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
        let routing = DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default()));

        Ok(HoprSession::new(
            session_id,
            routing,
            Default::default(),
            loopback_transport(),
            None,
        )?)
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session = loopback_session()?;

        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0), None)
            .await
            .context("listen_on failed")?;

        // Serve exactly one client: bind the accepted stream to the loopback Session
        tokio::task::spawn(async move {
            match tcp_listener.accept().await {
                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
                Err(e) => error!("failed to accept connection: {e}"),
            }
        });

        let mut client = tokio::net::TcpStream::connect(bound_addr)
            .await
            .context("connect failed")?;

        for chunk in CHUNKS {
            client.write_all(chunk).await.context("write failed")?;
        }

        // The same amount of data must come back through the loopback
        for chunk in CHUNKS {
            let mut received = vec![0; chunk.len()];
            client.read_exact(&mut received).await.context("read failed")?;
        }

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session = loopback_session()?;

        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0), None)
            .await
            .context("udp_bind_to failed")?;

        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let forwarder = tokio::task::spawn(bind_session_to_stream(
            session,
            udp_listener,
            ApplicationData::PAYLOAD_SIZE,
            Some(abort_registration),
        ));

        let mut client = ConnectedUdpStream::builder()
            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
            .with_counterparty(listen_addr)
            .build(("127.0.0.1", 0))
            .context("bind failed")?;

        for chunk in CHUNKS {
            // ConnectedUdpStream performs flush with each write
            client.write_all(chunk).await.context("write failed")?;
        }

        // The same amount of data must come back through the loopback
        for chunk in CHUNKS {
            let mut received = vec![0; chunk.len()];
            client.read_exact(&mut received).await.context("read failed")?;
        }

        // Once aborted, the bind_session_to_stream task must terminate too
        abort_handle.abort();
        forwarder
            .timeout(futures_time::time::Duration::from_millis(200))
            .await??;

        Ok(())
    }

    #[test]
    fn build_binding_address() {
        let default: std::net::SocketAddr = "10.0.0.1:10000".parse().unwrap();

        // (requested host, expected binding host)
        let cases = [
            (Some("127.0.0.1:10000"), "127.0.0.1:10000"),
            (None, "10.0.0.1:10000"),
            (Some("127.0.0.1"), "127.0.0.1:10000"),
            (Some(":1234"), "10.0.0.1:1234"),
            (Some(":"), "10.0.0.1:10000"),
            (Some(""), "10.0.0.1:10000"),
        ];

        for (requested, expected) in cases {
            let result = build_binding_host(requested, default);
            assert_eq!(result, expected.parse::<std::net::SocketAddr>().unwrap());
        }
    }
}