hoprd_api/
session.rs

1use std::{collections::VecDeque, fmt::Formatter, future::Future, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    Error,
5    extract::{
6        Json, Path, State,
7        ws::{Message, WebSocket, WebSocketUpgrade},
8    },
9    http::status::StatusCode,
10    response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt, future::AbortHandle};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17    Address, Hopr, HoprSession, SESSION_PAYLOAD_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18    SessionTarget, SurbBalancerConfig, errors::HoprLibError, transfer_session,
19};
20use hopr_network_types::{
21    prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
22    udp::ForeignDataMode,
23    utils::AsyncReadStreamer,
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tokio::net::TcpListener;
28use tracing::{debug, error, info, trace};
29
30use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
31
32/// Size of the buffer for forwarding data to/from a TCP stream.
33pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
34
35/// Size of the buffer for forwarding data to/from a UDP stream.
36pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
37
38/// Size of the queue (back-pressure) for data incoming from a UDP stream.
39pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
40
41#[cfg(all(feature = "prometheus", not(test)))]
42lazy_static::lazy_static! {
43    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
44        "hopr_session_hoprd_clients",
45        "Number of clients connected at this Entry node",
46        &["type"]
47    ).unwrap();
48}
49#[serde_as]
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
51#[schema(
52    example = json!({"Plain": "example.com:80"}),
53    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
54    example = json!({"Service": 0})
55)]
56/// Session target specification.
57pub enum SessionTargetSpec {
58    Plain(String),
59    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
60    #[schema(value_type = u32)]
61    Service(ServiceId),
62}
63
64impl std::fmt::Display for SessionTargetSpec {
65    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66        match self {
67            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
68            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
69            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
70        }
71    }
72}
73
74impl FromStr for SessionTargetSpec {
75    type Err = HoprLibError;
76
77    fn from_str(s: &str) -> Result<Self, Self::Err> {
78        Ok(if let Some(stripped) = s.strip_prefix("$$") {
79            Self::Sealed(
80                base64::prelude::BASE64_URL_SAFE
81                    .decode(stripped)
82                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
83            )
84        } else if let Some(stripped) = s.strip_prefix("#") {
85            Self::Service(
86                stripped
87                    .parse()
88                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
89            )
90        } else {
91            Self::Plain(s.to_owned())
92        })
93    }
94}
95
96impl SessionTargetSpec {
97    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
98        Ok(match (protocol, self) {
99            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
100                IpOrHost::from_str(&plain)
101                    .map(SealedHost::from)
102                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
103            ),
104            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
105                IpOrHost::from_str(&plain)
106                    .map(SealedHost::from)
107                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
108            ),
109            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
110                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
111            }
112            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
113                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
114            }
115            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
116        })
117    }
118}
119
120/// Entry stored in the session registry table.
121#[derive(Debug)]
122pub struct StoredSessionEntry {
123    /// Destination address of the Session counterparty.
124    pub destination: Address,
125    /// Target of the Session.
126    pub target: SessionTargetSpec,
127    /// Forward path used for the Session.
128    pub forward_path: RoutingOptions,
129    /// Return path used for the Session.
130    pub return_path: RoutingOptions,
131    /// The join handle for the Session processing.
132    pub abort_handle: AbortHandle,
133}
134
135#[repr(u8)]
136#[derive(
137    Debug,
138    Clone,
139    strum::EnumIter,
140    strum::Display,
141    strum::EnumString,
142    Serialize,
143    Deserialize,
144    PartialEq,
145    utoipa::ToSchema,
146)]
147#[schema(example = "Segmentation")]
148/// Session capabilities that can be negotiated with the target peer.
149pub enum SessionCapability {
150    /// Frame segmentation
151    Segmentation,
152    /// Frame retransmission (ACK and NACK-based)
153    Retransmission,
154    /// Frame retransmission (only ACK-based)
155    RetransmissionAckOnly,
156    /// Disable packet buffering
157    NoDelay,
158}
159
160impl From<SessionCapability> for hopr_lib::SessionCapabilities {
161    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
162        match cap {
163            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
164            SessionCapability::Retransmission => {
165                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
166            }
167            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
168            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
169        }
170    }
171}
172
173#[serde_as]
174#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
175#[serde(rename_all = "camelCase")]
176pub(crate) struct SessionWebsocketClientQueryRequest {
177    #[serde_as(as = "DisplayFromStr")]
178    #[schema(required = true, value_type = String)]
179    pub destination: Address,
180    #[schema(required = true)]
181    pub hops: u8,
182    #[cfg(feature = "explicit-path")]
183    #[schema(required = false, value_type = String)]
184    pub path: Option<Vec<Address>>,
185    #[schema(required = true)]
186    #[serde_as(as = "Vec<DisplayFromStr>")]
187    pub capabilities: Vec<SessionCapability>,
188    #[schema(required = true)]
189    #[serde_as(as = "DisplayFromStr")]
190    pub target: SessionTargetSpec,
191    #[schema(required = false)]
192    #[serde(default = "default_protocol")]
193    pub protocol: IpProtocol,
194}
195
196#[inline]
197fn default_protocol() -> IpProtocol {
198    IpProtocol::TCP
199}
200
201impl SessionWebsocketClientQueryRequest {
202    pub(crate) async fn into_protocol_session_config(
203        self,
204    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
205        #[cfg(not(feature = "explicit-path"))]
206        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
207
208        #[cfg(feature = "explicit-path")]
209        let path_options = if let Some(path) = self.path {
210            // Explicit `path` will override `hops`
211            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
212        } else {
213            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
214        };
215
216        let mut capabilities = SessionCapabilities::empty();
217        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
218
219        Ok((
220            self.destination,
221            self.target.into_target(self.protocol)?,
222            SessionClientConfig {
223                forward_path_options: path_options.clone(),
224                return_path_options: path_options.clone(), // TODO: allow using separate return options
225                capabilities,
226                ..Default::default()
227            },
228        ))
229    }
230}
231
232#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
233#[schema(value_type = String, format = Binary)]
234#[allow(dead_code)] // not dead code, just for codegen
235struct WssData(Vec<u8>);
236
237/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
238/// sessions.
239///
240/// Once configured, the session represents and automatically managed connection to a target peer through a network
241/// routing configuration. The session can be used to send and receive binary data over the network.
242///
243/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
244///
245/// Connect to the endpoint by using a WS client. No preview is available. Example:
246/// `ws://127.0.0.1:3001/api/v4/session/websocket`
247#[allow(dead_code)] // not dead code, just for documentation
248#[utoipa::path(
249        get,
250        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
251        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
252        request_body(
253            content = SessionWebsocketClientQueryRequest,
254            content_type = "application/json",
255            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
256        ),
257        responses(
258            (status = 200, description = "Successfully created a new client websocket session."),
259            (status = 401, description = "Invalid authorization token.", body = ApiError),
260            (status = 422, description = "Unknown failure", body = ApiError),
261            (status = 429, description = "Too many open websocket connections.", body = ApiError),
262        ),
263        security(
264            ("api_token" = []),
265            ("bearer_token" = [])
266        ),
267        tag = "Session",
268    )]
269
270pub(crate) async fn websocket(
271    ws: WebSocketUpgrade,
272    Query(query): Query<SessionWebsocketClientQueryRequest>,
273    State(state): State<Arc<InternalState>>,
274) -> Result<impl IntoResponse, impl IntoResponse> {
275    let (dst, target, data) = query
276        .into_protocol_session_config()
277        .await
278        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
279
280    let hopr = state.hopr.clone();
281    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
282        error!(error = %e, "Failed to establish session");
283        (
284            StatusCode::UNPROCESSABLE_ENTITY,
285            ApiErrorStatus::UnknownFailure(e.to_string()),
286        )
287    })?;
288
289    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
290}
291
292enum WebSocketInput {
293    Network(Result<Box<[u8]>, std::io::Error>),
294    WsInput(Result<Message, Error>),
295}
296
297/// The maximum number of bytes read from a Session that WS can transfer within a single message.
298const WS_MAX_SESSION_READ_SIZE: usize = 4096;
299
300#[tracing::instrument(level = "debug", skip(socket, session))]
301async fn websocket_connection(socket: WebSocket, session: HoprSession) {
302    let session_id = *session.id();
303
304    let (rx, mut tx) = session.split();
305    let (mut sender, receiver) = socket.split();
306
307    let mut queue = (
308        receiver.map(WebSocketInput::WsInput),
309        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
310    )
311        .merge();
312
313    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
314
315    while let Some(v) = queue.next().await {
316        match v {
317            WebSocketInput::Network(bytes) => match bytes {
318                Ok(bytes) => {
319                    let len = bytes.len();
320                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
321                        error!(
322                            error = %e,
323                            "Failed to emit read data onto the websocket, closing connection"
324                        );
325                        break;
326                    };
327                    bytes_from_session += len;
328                }
329                Err(e) => {
330                    error!(
331                        error = %e,
332                        "Failed to push data from network to socket, closing connection"
333                    );
334                    break;
335                }
336            },
337            WebSocketInput::WsInput(ws_in) => match ws_in {
338                Ok(Message::Binary(data)) => {
339                    let len = data.len();
340                    if let Err(e) = tx.write(data.as_ref()).await {
341                        error!(error = %e, "Failed to write data to the session, closing connection");
342                        break;
343                    }
344                    bytes_to_session += len;
345                }
346                Ok(Message::Text(_)) => {
347                    error!("Received string instead of binary data, closing connection");
348                    break;
349                }
350                Ok(Message::Close(_)) => {
351                    debug!("Received close frame, closing connection");
352                    break;
353                }
354                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
355                Err(e) => {
356                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
357                    break;
358                }
359            },
360        }
361    }
362
363    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
364}
365
366#[serde_as]
367#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
368#[schema(example = json!({ "Hops": 1 }))]
369/// Routing options for the Session.
370pub enum RoutingOptions {
371    #[cfg(feature = "explicit-path")]
372    #[schema(value_type = Vec<String>)]
373    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
374    Hops(usize),
375}
376
377impl RoutingOptions {
378    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
379        Ok(match self {
380            #[cfg(feature = "explicit-path")]
381            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
382            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
383        })
384    }
385}
386
387#[serde_as]
388#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
389#[schema(example = json!({
390        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
391        "forwardPath": { "Hops": 1 },
392        "returnPath": { "Hops": 1 },
393        "target": {"Plain": "localhost:8080"},
394        "listenHost": "127.0.0.1:10000",
395        "capabilities": ["Retransmission", "Segmentation"],
396        "responseBuffer": "2 MB",
397        "sessionPool": 0,
398    }))]
399#[serde(rename_all = "camelCase")]
400/// Request body for creating a new client session.
401pub(crate) struct SessionClientRequest {
402    /// Address of the Exit node.
403    #[serde_as(as = "DisplayFromStr")]
404    #[schema(value_type = String)]
405    pub destination: Address,
406    /// The forward path for the Session.
407    pub forward_path: RoutingOptions,
408    /// The return path for the Session.
409    pub return_path: RoutingOptions,
410    /// Target for the Session.
411    pub target: SessionTargetSpec,
412    /// Listen host (`ip:port`) for the Session socket at the Entry node.
413    ///
414    /// Supports also partial specification (only `ip` or only `:port`) with the
415    /// respective part replaced by the node's configured default.
416    pub listen_host: Option<String>,
417    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
418    /// Capabilities for the Session protocol.
419    ///
420    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
421    pub capabilities: Option<Vec<SessionCapability>>,
422    /// The amount of response data the Session counterparty can deliver back to us,
423    /// without us sending any SURBs to them.
424    ///
425    /// In other words, this size is recalculated to a number of SURBs delivered
426    /// to the counterparty upfront and then maintained.
427    /// The maintenance is dynamic, based on the number of responses we receive.
428    ///
429    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
430    /// at least the size of 2 Session packet payloads.
431    #[serde_as(as = "Option<DisplayFromStr>")]
432    #[schema(value_type = String)]
433    pub response_buffer: Option<bytesize::ByteSize>,
434    /// How many Sessions to pool for clients.
435    ///
436    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
437    /// It has no effect on UDP sessions in the current implementation.
438    ///
439    /// Currently, the maximum value is 5.
440    pub session_pool: Option<usize>,
441}
442
443impl SessionClientRequest {
444    pub(crate) async fn into_protocol_session_config(
445        self,
446        target_protocol: IpProtocol,
447    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
448        Ok((
449            self.destination,
450            self.target.into_target(target_protocol)?,
451            SessionClientConfig {
452                forward_path_options: self.forward_path.resolve().await?,
453                return_path_options: self.return_path.resolve().await?,
454                capabilities: self
455                    .capabilities
456                    .map(|vs| {
457                        let mut caps = SessionCapabilities::empty();
458                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
459                        caps
460                    })
461                    .unwrap_or_else(|| match target_protocol {
462                        IpProtocol::TCP => {
463                            hopr_lib::SessionCapability::RetransmissionAck
464                                | hopr_lib::SessionCapability::RetransmissionNack
465                                | hopr_lib::SessionCapability::Segmentation
466                        }
467                        // Only Segmentation capability for UDP per default
468                        _ => SessionCapability::Segmentation.into(),
469                    }),
470                surb_management: match self.response_buffer {
471                    // Buffer worth at least 2 reply packets
472                    Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_PAYLOAD_SIZE as u64 => {
473                        Some(SurbBalancerConfig {
474                            target_surb_buffer_size: buffer_size.as_u64() / SESSION_PAYLOAD_SIZE as u64,
475                            ..Default::default()
476                        })
477                    }
478                    // No SURBs are set up and maintained, useful for high-send low-reply sessions
479                    Some(_) => None,
480                    // Use defaults otherwise
481                    None => Some(SurbBalancerConfig::default()),
482                },
483                ..Default::default()
484            },
485        ))
486    }
487}
488
489#[serde_as]
490#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
491#[schema(example = json!({
492        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
493        "target": "example.com:80",
494        "forwardPath": { "Hops": 1 },
495        "returnPath": { "Hops": 1 },
496        "protocol": "tcp",
497        "ip": "127.0.0.1",
498        "port": 5542,
499        "mtu": 987
500    }))]
501#[serde(rename_all = "camelCase")]
502/// Response body for creating a new client session.
503pub(crate) struct SessionClientResponse {
504    #[schema(example = "example.com:80")]
505    /// Target of the Session.
506    pub target: String,
507    /// Destination node (exit node) of the Session.
508    #[serde_as(as = "DisplayFromStr")]
509    #[schema(value_type = String)]
510    pub destination: Address,
511    /// Forward routing path.
512    pub forward_path: RoutingOptions,
513    /// Return routing path.
514    pub return_path: RoutingOptions,
515    /// IP protocol used by Session's listening socket.
516    #[serde_as(as = "DisplayFromStr")]
517    #[schema(example = "tcp")]
518    pub protocol: IpProtocol,
519    /// Listening IP address of the Session's socket.
520    #[schema(example = "127.0.0.1")]
521    pub ip: String,
522    #[schema(example = 5542)]
523    /// Listening port of the Session's socket.
524    pub port: u16,
525    /// MTU used by the Session.
526    pub mtu: usize,
527}
528
529/// This function first tries to parse `requested` as the `ip:port` host pair.
530/// If that does not work, it tries to parse `requested` as a single IP address
531/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
532/// part from the given `default`.
533fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
534    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
535        Some(Err(requested)) => {
536            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
537            debug!(requested, %default, "using partially default listen host");
538            std::net::SocketAddr::new(
539                requested.parse().unwrap_or(default.ip()),
540                requested
541                    .strip_prefix(":")
542                    .and_then(|p| u16::from_str(p).ok())
543                    .unwrap_or(default.port()),
544            )
545        }
546        Some(Ok(requested)) => {
547            debug!(%requested, "using requested listen host");
548            requested
549        }
550        None => {
551            debug!(%default, "using default listen host");
552            default
553        }
554    }
555}
556
557struct SessionPool {
558    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
559    ah: Option<AbortHandle>,
560}
561
562impl SessionPool {
563    pub const MAX_SESSION_POOL_SIZE: usize = 5;
564
565    async fn new(
566        size: usize,
567        dst: Address,
568        target: SessionTarget,
569        cfg: SessionClientConfig,
570        hopr: Arc<Hopr>,
571    ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
572        let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
573        let hopr_clone = hopr.clone();
574        let pool_clone = pool.clone();
575        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
576            .map(Ok)
577            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
578                let pool = pool_clone.clone();
579                let hopr = hopr_clone.clone();
580                let target = target.clone();
581                let cfg = cfg.clone();
582                async move {
583                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
584                        Ok(s) => {
585                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
586                            pool.lock()
587                                .map_err(|_| {
588                                    (
589                                        StatusCode::INTERNAL_SERVER_ERROR,
590                                        ApiErrorStatus::UnknownFailure("lock failed".into()),
591                                    )
592                                })?
593                                .push_back(s);
594                            Ok(())
595                        }
596                        Err(error) => {
597                            error!(%error, num_session = i, "failed to establish session for pool");
598                            Err((
599                                StatusCode::INTERNAL_SERVER_ERROR,
600                                ApiErrorStatus::UnknownFailure(format!(
601                                    "failed to establish session #{i} in pool to {dst}: {error}"
602                                )),
603                            ))
604                        }
605                    }
606                }
607            })
608            .await?;
609
610        // Spawn a task that periodically sends keep alive messages to the Session in the pool.
611        if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
612            let pool_clone_1 = pool.clone();
613            let pool_clone_2 = pool.clone();
614            let pool_clone_3 = pool.clone();
615            Ok(Self {
616                pool: Some(pool),
617                ah: Some(hopr_async_runtime::spawn_as_abortable(
618                    futures_time::stream::interval(futures_time::time::Duration::from(
619                        std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
620                    ))
621                    .take_while(move |_| {
622                        // Continue the infinite interval stream until there are sessions in the pool
623                        futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
624                    })
625                    .flat_map(move |_| {
626                        // Get all SessionIds of the remaining Sessions in the pool
627                        let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
628                        futures::stream::iter(ids.into_iter().flatten())
629                    })
630                    .for_each(move |id| {
631                        let hopr = hopr.clone();
632                        let pool = pool_clone_3.clone();
633                        async move {
634                            // Make sure the Session is still alive, otherwise remove it from the pool
635                            if let Err(error) = hopr.keep_alive_session(&id).await {
636                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
637                                if let Ok(mut pool) = pool.lock() {
638                                    pool.retain(|s| *s.id() != id);
639                                }
640                            }
641                        }
642                    })
643                ))
644            })
645        } else {
646            Ok(Self { pool: None, ah: None })
647        }
648    }
649
650    fn pop(&mut self) -> Option<HoprSession> {
651        self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
652    }
653}
654
655impl Drop for SessionPool {
656    fn drop(&mut self) {
657        if let Some(ah) = self.ah.take() {
658            ah.abort();
659        }
660    }
661}
662
663async fn create_tcp_client_binding(
664    bind_host: std::net::SocketAddr,
665    state: Arc<InternalState>,
666    args: SessionClientRequest,
667) -> Result<std::net::SocketAddr, (StatusCode, ApiErrorStatus)> {
668    let target_spec = args.target.clone();
669    let (dst, target, data) = args
670        .clone()
671        .into_protocol_session_config(IpProtocol::TCP)
672        .await
673        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
674
675    // Bind the TCP socket first
676    let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
677        if e.kind() == std::io::ErrorKind::AddrInUse {
678            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
679        } else {
680            (
681                StatusCode::UNPROCESSABLE_ENTITY,
682                ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
683            )
684        }
685    })?;
686    info!(%bound_host, "TCP session listener bound");
687
688    // For each new TCP connection coming to the listener,
689    // open a Session with the same parameters
690    let hopr = state.hopr.clone();
691
692    // Create a session pool if requested
693    let mut session_pool = SessionPool::new(
694        args.session_pool.unwrap_or(0),
695        dst,
696        target.clone(),
697        data.clone(),
698        hopr.clone(),
699    )
700    .await?;
701
702    state.open_listeners.write_arc().await.insert(
703        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
704        StoredSessionEntry {
705            destination: dst,
706            target: target_spec.clone(),
707            forward_path: args.forward_path.clone(),
708            return_path: args.return_path.clone(),
709            abort_handle: hopr_async_runtime::spawn_as_abortable(
710                tokio_stream::wrappers::TcpListenerStream::new(tcp_listener)
711                    .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
712                    .for_each_concurrent(None, move |accepted_client| {
713                        let data = data.clone();
714                        let target = target.clone();
715                        let hopr = hopr.clone();
716
717                        // Try to pop from the pool only if a client was accepted
718                        let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
719                        async move {
720                            match accepted_client {
721                                Ok((sock_addr, stream)) => {
722                                    debug!(socket = ?sock_addr, "incoming TCP connection");
723                                    let session = match maybe_pooled_session {
724                                        Some(s) => {
725                                            debug!(session_id = %s.id(), "using pooled session");
726                                            s
727                                        }
728                                        None => {
729                                            debug!("no more active sessions in pool, creating a new one");
730                                            match hopr.connect_to(dst, target, data).await {
731                                                Ok(s) => s,
732                                                Err(error) => {
733                                                    error!(%error, "failed to establish session");
734                                                    return;
735                                                }
736                                            }
737                                        }
738                                    };
739
740                                    debug!(
741                                        socket = ?sock_addr,
742                                        session_id = tracing::field::debug(*session.id()),
743                                        "new session for incoming TCP connection",
744                                    );
745
746                                    #[cfg(all(feature = "prometheus", not(test)))]
747                                    METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
748
749                                    bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await;
750
751                                    #[cfg(all(feature = "prometheus", not(test)))]
752                                    METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
753                                }
754                                Err(e) => error!(error = %e, "failed to accept connection"),
755                            }
756                        }
757                    }),
758            ),
759        },
760    );
761    Ok(bound_host)
762}
763
764async fn create_udp_client_binding(
765    bind_host: std::net::SocketAddr,
766    state: Arc<InternalState>,
767    args: SessionClientRequest,
768) -> Result<std::net::SocketAddr, (StatusCode, ApiErrorStatus)> {
769    let target_spec = args.target.clone();
770    let (dst, target, data) = args
771        .clone()
772        .into_protocol_session_config(IpProtocol::UDP)
773        .await
774        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
775
776    // Bind the UDP socket first
777    let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
778        if e.kind() == std::io::ErrorKind::AddrInUse {
779            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
780        } else {
781            (
782                StatusCode::UNPROCESSABLE_ENTITY,
783                ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
784            )
785        }
786    })?;
787
788    info!(%bound_host, "UDP session listener bound");
789
790    let hopr = state.hopr.clone();
791
792    // Create a single session for the UDP socket
793    let session = hopr.connect_to(dst, target, data).await.map_err(|e| {
794        (
795            StatusCode::UNPROCESSABLE_ENTITY,
796            ApiErrorStatus::UnknownFailure(e.to_string()),
797        )
798    })?;
799
800    let open_listeners_clone = state.open_listeners.clone();
801    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
802
803    state.open_listeners.write_arc().await.insert(
804        listener_id,
805        StoredSessionEntry {
806            destination: dst,
807            target: target_spec.clone(),
808            forward_path: args.forward_path.clone(),
809            return_path: args.return_path.clone(),
810            abort_handle: hopr_async_runtime::spawn_as_abortable(async move {
811                #[cfg(all(feature = "prometheus", not(test)))]
812                METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
813
814                bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE).await;
815
816                #[cfg(all(feature = "prometheus", not(test)))]
817                METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
818
819                // Once the Session closes, remove it from the list
820                open_listeners_clone.write_arc().await.remove(&listener_id);
821            }),
822        },
823    );
824    Ok(bound_host)
825}
826
827/// Creates a new client session returning the given session listening host and port over TCP or UDP.
828/// If no listening port is given in the request, the socket will be bound to a random free
829/// port and returned in the response.
830/// Different capabilities can be configured for the session, such as data segmentation or
831/// retransmission.
832///
833/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
834/// communication over the selected IP protocol and HOPR network routing with the given destination.
835/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
836///
837/// Various services require different types of socket communications:
838/// - services running over UDP usually do not require data retransmission, as it is already expected
839/// that UDP does not provide these and is therefore handled at the application layer.
840/// - On the contrary, services running over TCP *almost always* expect data segmentation and
841/// retransmission capabilities, so these should be configured while creating a session that passes
842/// TCP data.
843#[utoipa::path(
844        post,
845        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
846        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
847        params(
848            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
849        ),
850        request_body(
851            content = SessionClientRequest,
852            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
853            content_type = "application/json"),
854        responses(
855            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
856            (status = 400, description = "Invalid IP protocol.", body = ApiError),
857            (status = 401, description = "Invalid authorization token.", body = ApiError),
858            (status = 409, description = "Listening address and port already in use.", body = ApiError),
859            (status = 422, description = "Unknown failure", body = ApiError),
860        ),
861        security(
862            ("api_token" = []),
863            ("bearer_token" = [])
864        ),
865        tag = "Session"
866    )]
867pub(crate) async fn create_client(
868    State(state): State<Arc<InternalState>>,
869    Path(protocol): Path<IpProtocol>,
870    Json(args): Json<SessionClientRequest>,
871) -> Result<impl IntoResponse, impl IntoResponse> {
872    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
873
874    if bind_host.port() > 0
875        && state
876            .open_listeners
877            .read_arc()
878            .await
879            .contains_key(&ListenerId(protocol.into(), bind_host))
880    {
881        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
882    }
883
884    debug!("binding {protocol} session listening socket to {bind_host}");
885    let bound_host = match protocol {
886        IpProtocol::TCP => create_tcp_client_binding(bind_host, state, args.clone()).await?,
887        IpProtocol::UDP => create_udp_client_binding(bind_host, state, args.clone()).await?,
888    };
889
890    Ok::<_, (StatusCode, ApiErrorStatus)>(
891        (
892            StatusCode::OK,
893            Json(SessionClientResponse {
894                protocol,
895                ip: bound_host.ip().to_string(),
896                port: bound_host.port(),
897                target: args.target.to_string(),
898                destination: args.destination,
899                forward_path: args.forward_path.clone(),
900                return_path: args.return_path.clone(),
901                mtu: SESSION_PAYLOAD_SIZE,
902            }),
903        )
904            .into_response(),
905    )
906}
907
908/// Lists existing Session listeners for the given IP protocol.
909#[utoipa::path(
910    get,
911    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
912    description = "Lists existing Session listeners for the given IP protocol.",
913    params(
914        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
915    ),
916    responses(
917        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
918            {
919                "target": "example.com:80",
920                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
921                "forwardPath": { "Hops": 1 },
922                "returnPath": { "Hops": 1 },
923                "protocol": "tcp",
924                "ip": "127.0.0.1",
925                "port": 5542,
926                "mtu": 987
927            }
928        ])),
929        (status = 400, description = "Invalid IP protocol.", body = ApiError),
930        (status = 401, description = "Invalid authorization token.", body = ApiError),
931        (status = 422, description = "Unknown failure", body = ApiError)
932    ),
933    security(
934        ("api_token" = []),
935        ("bearer_token" = [])
936    ),
937    tag = "Session",
938)]
939pub(crate) async fn list_clients(
940    State(state): State<Arc<InternalState>>,
941    Path(protocol): Path<IpProtocol>,
942) -> Result<impl IntoResponse, impl IntoResponse> {
943    let response = state
944        .open_listeners
945        .read_arc()
946        .await
947        .iter()
948        .filter(|(id, _)| id.0 == protocol.into())
949        .map(|(id, entry)| SessionClientResponse {
950            protocol,
951            ip: id.1.ip().to_string(),
952            port: id.1.port(),
953            target: entry.target.to_string(),
954            forward_path: entry.forward_path.clone(),
955            return_path: entry.return_path.clone(),
956            destination: entry.destination,
957            mtu: SESSION_PAYLOAD_SIZE,
958        })
959        .collect::<Vec<_>>();
960
961    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
962}
963
964#[derive(
965    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
966)]
967#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
968#[serde(rename_all = "lowercase")]
969#[schema(example = "tcp")]
970/// IP transport protocol
971pub enum IpProtocol {
972    #[allow(clippy::upper_case_acronyms)]
973    TCP,
974    #[allow(clippy::upper_case_acronyms)]
975    UDP,
976}
977
978impl From<IpProtocol> for hopr_lib::IpProtocol {
979    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
980        match protocol {
981            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
982            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
983        }
984    }
985}
986
987#[serde_as]
988#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
989pub struct SessionCloseClientQuery {
990    #[serde_as(as = "DisplayFromStr")]
991    #[schema(value_type = String, example = "tcp")]
992    /// IP transport protocol
993    pub protocol: IpProtocol,
994
995    /// Listening IP address of the Session.
996    #[schema(example = "127.0.0.1:8545")]
997    pub ip: String,
998
999    /// Session port used for the listener.
1000    #[schema(value_type = u16, example = 10101)]
1001    pub port: u16,
1002}
1003
1004/// Closes an existing Session listener.
1005/// The listener must've been previously created and bound for the given IP protocol.
1006/// Once a listener is closed, no more socket connections can be made to it.
1007/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
1008/// will be closed.
1009#[utoipa::path(
1010    delete,
1011    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1012    description = "Closes an existing Session listener.",
1013    params(SessionCloseClientQuery),
1014    responses(
1015            (status = 204, description = "Listener closed successfully"),
1016            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1017            (status = 401, description = "Invalid authorization token.", body = ApiError),
1018            (status = 404, description = "Listener not found.", body = ApiError),
1019            (status = 422, description = "Unknown failure", body = ApiError)
1020    ),
1021    security(
1022            ("api_token" = []),
1023            ("bearer_token" = [])
1024    ),
1025    tag = "Session",
1026)]
1027pub(crate) async fn close_client(
1028    State(state): State<Arc<InternalState>>,
1029    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1030) -> Result<impl IntoResponse, impl IntoResponse> {
1031    let listening_ip: IpAddr = ip
1032        .parse()
1033        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1034
1035    {
1036        let mut open_listeners = state.open_listeners.write_arc().await;
1037
1038        let mut to_remove = Vec::new();
1039
1040        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1041        open_listeners
1042            .iter()
1043            .filter(|(ListenerId(proto, addr), _)| {
1044                let protocol: hopr_lib::IpProtocol = protocol.into();
1045                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1046            })
1047            .for_each(|(id, _)| to_remove.push(*id));
1048
1049        if to_remove.is_empty() {
1050            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1051        }
1052
1053        for bound_addr in to_remove {
1054            let entry = open_listeners
1055                .remove(&bound_addr)
1056                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1057
1058            entry.abort_handle.abort();
1059        }
1060    }
1061
1062    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1063}
1064
1065async fn try_restricted_bind<F, S, Fut>(
1066    addrs: Vec<std::net::SocketAddr>,
1067    range_str: &str,
1068    binder: F,
1069) -> std::io::Result<S>
1070where
1071    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1072    Fut: Future<Output = std::io::Result<S>>,
1073{
1074    if addrs.is_empty() {
1075        return Err(std::io::Error::other("no valid socket addresses found"));
1076    }
1077
1078    let range = range_str
1079        .split_once(":")
1080        .and_then(
1081            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1082                Ok((a, b)) if a <= b => Some(a..=b),
1083                _ => None,
1084            },
1085        )
1086        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1087
1088    for port in range {
1089        let addrs = addrs
1090            .iter()
1091            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1092            .collect::<Vec<_>>();
1093        match binder(addrs).await {
1094            Ok(listener) => return Ok(listener),
1095            Err(error) => debug!(%error, "listen address not usable"),
1096        }
1097    }
1098
1099    Err(std::io::Error::new(
1100        std::io::ErrorKind::AddrNotAvailable,
1101        format!("no valid socket addresses found within range: {range_str}"),
1102    ))
1103}
1104
1105async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1106    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1107
1108    // If automatic port allocation is requested and there's a restriction on the port range
1109    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1110    if addrs.iter().all(|a| a.port() == 0) {
1111        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1112            let tcp_listener =
1113                try_restricted_bind(
1114                    addrs,
1115                    &range_str,
1116                    |a| async move { TcpListener::bind(a.as_slice()).await },
1117                )
1118                .await?;
1119            return Ok((tcp_listener.local_addr()?, tcp_listener));
1120        }
1121    }
1122
1123    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1124    Ok((tcp_listener.local_addr()?, tcp_listener))
1125}
1126
1127async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1128    address: A,
1129) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1130    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1131
1132    let builder = ConnectedUdpStream::builder()
1133        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1134        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
1135        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1136        .with_receiver_parallelism(UdpStreamParallelism::Auto);
1137
1138    // If automatic port allocation is requested and there's a restriction on the port range
1139    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1140    if addrs.iter().all(|a| a.port() == 0) {
1141        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1142            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1143                futures::future::ready(builder.clone().build(addrs.as_slice()))
1144            })
1145            .await?;
1146
1147            return Ok((*udp_listener.bound_address(), udp_listener));
1148        }
1149    }
1150
1151    let udp_socket = builder.build(address)?;
1152    Ok((*udp_socket.bound_address(), udp_socket))
1153}
1154
1155async fn bind_session_to_stream<T>(mut session: HoprSession, mut stream: T, max_buf: usize)
1156where
1157    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1158{
1159    let session_id = *session.id();
1160    match transfer_session(&mut session, &mut stream, max_buf).await {
1161        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1162            session_id = ?session_id,
1163            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1164        ),
1165        Err(error) => error!(
1166            session_id = ?session_id,
1167            %error,
1168            "error during data transfer"
1169        ),
1170    }
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175    use anyhow::Context;
1176    use futures::channel::mpsc::UnboundedSender;
1177    use hopr_crypto_types::crypto_traits::Randomizable;
1178    use hopr_lib::{ApplicationData, HoprPseudonym, SendMsg};
1179    use hopr_network_types::prelude::DestinationRouting;
1180    use hopr_transport_session::errors::TransportSessionError;
1181    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1182
1183    use super::*;
1184
1185    pub struct SendMsgResender {
1186        tx: UnboundedSender<Box<[u8]>>,
1187    }
1188
1189    impl SendMsgResender {
1190        pub fn new(tx: UnboundedSender<Box<[u8]>>) -> Self {
1191            Self { tx }
1192        }
1193    }
1194
1195    #[hopr_lib::async_trait]
1196    impl SendMsg for SendMsgResender {
1197        // Mimics the echo server by feeding the data back in instead of sending it over the wire
1198        async fn send_message(
1199            &self,
1200            data: ApplicationData,
1201            _: DestinationRouting,
1202        ) -> std::result::Result<(), TransportSessionError> {
1203            self.tx
1204                .clone()
1205                .unbounded_send(data.plain_text)
1206                .map_err(|_| TransportSessionError::Closed)?;
1207
1208            Ok(())
1209        }
1210    }
1211
1212    #[tokio::test]
1213    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
1214    -> anyhow::Result<()> {
1215        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
1216
1217        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1218        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1219        let session = hopr_lib::HoprSession::new(
1220            session_id,
1221            hopr_lib::DestinationRouting::forward_only(
1222                peer,
1223                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1224            ),
1225            SessionCapabilities::empty(),
1226            Arc::new(SendMsgResender::new(tx)),
1227            Box::pin(rx),
1228            None,
1229        );
1230
1231        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;
1232
1233        tokio::task::spawn(async move {
1234            match tcp_listener.accept().await {
1235                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await,
1236                Err(e) => error!("failed to accept connection: {e}"),
1237            }
1238        });
1239
1240        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
1241            .await
1242            .context("connect failed")?;
1243
1244        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1245
1246        for d in data.clone().into_iter() {
1247            tcp_stream.write_all(d).await.context("write failed")?;
1248        }
1249
1250        for d in data.iter() {
1251            let mut buf = vec![0; d.len()];
1252            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
1253        }
1254
1255        Ok(())
1256    }
1257
1258    #[tokio::test]
1259    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
1260    -> anyhow::Result<()> {
1261        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
1262
1263        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1264        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1265        let session = hopr_lib::HoprSession::new(
1266            session_id,
1267            hopr_lib::DestinationRouting::forward_only(
1268                peer,
1269                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1270            ),
1271            SessionCapabilities::empty(),
1272            Arc::new(SendMsgResender::new(tx)),
1273            Box::pin(rx),
1274            None,
1275        );
1276
1277        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;
1278
1279        tokio::task::spawn(bind_session_to_stream(
1280            session,
1281            udp_listener,
1282            ApplicationData::PAYLOAD_SIZE,
1283        ));
1284
1285        let mut udp_stream = ConnectedUdpStream::builder()
1286            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
1287            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1288            .with_counterparty(listen_addr)
1289            .build(("127.0.0.1", 0))
1290            .context("bind failed")?;
1291
1292        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1293
1294        for d in data.clone().into_iter() {
1295            udp_stream.write_all(d).await.context("write failed")?;
1296        }
1297
1298        for d in data.iter() {
1299            let mut buf = vec![0; d.len()];
1300            udp_stream.read_exact(&mut buf).await.context("read failed")?;
1301        }
1302
1303        Ok(())
1304    }
1305
1306    #[test]
1307    fn test_build_binding_address() {
1308        let default = "10.0.0.1:10000".parse().unwrap();
1309
1310        let result = build_binding_host(Some("127.0.0.1:10000"), default);
1311        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1312
1313        let result = build_binding_host(None, default);
1314        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1315
1316        let result = build_binding_host(Some("127.0.0.1"), default);
1317        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1318
1319        let result = build_binding_host(Some(":1234"), default);
1320        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
1321
1322        let result = build_binding_host(Some(":"), default);
1323        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1324
1325        let result = build_binding_host(Some(""), default);
1326        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1327    }
1328}