hoprd_api/
session.rs

1use std::fmt::Formatter;
2use std::future::Future;
3use std::str::FromStr;
4
5use axum::extract::Path;
6use axum::Error;
7use axum::{
8    extract::{
9        ws::{Message, WebSocket, WebSocketUpgrade},
10        Json, State,
11    },
12    http::status::StatusCode,
13    response::IntoResponse,
14};
15use axum_extra::extract::Query;
16use base64::Engine;
17use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt};
18use futures_concurrency::stream::Merge;
19use libp2p_identity::PeerId;
20use serde::{Deserialize, Serialize};
21use serde_with::{serde_as, DisplayFromStr};
22use std::net::IpAddr;
23use std::sync::Arc;
24use tokio::net::TcpListener;
25use tracing::{debug, error, info, trace};
26
27use hopr_lib::errors::HoprLibError;
28use hopr_lib::transfer_session;
29use hopr_lib::{HoprSession, ServiceId, SessionClientConfig, SessionTarget};
30use hopr_network_types::prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism};
31use hopr_network_types::udp::ForeignDataMode;
32use hopr_network_types::utils::AsyncReadStreamer;
33
34use crate::types::PeerOrAddress;
35use crate::{ApiError, ApiErrorStatus, InternalState, ListenerId, BASE_PATH};
36
37/// Size of the buffer for forwarding data to/from a TCP stream.
38pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
39
40/// Size of the buffer for forwarding data to/from a UDP stream.
41pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
42
43/// Size of the queue (back-pressure) for data incoming from a UDP stream.
44pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
45
46#[cfg(all(feature = "prometheus", not(test)))]
47lazy_static::lazy_static! {
48    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
49        "hopr_session_hoprd_clients",
50        "Number of clients connected at this Entry node",
51        &["type"]
52    ).unwrap();
53}
54
55#[serde_as]
56#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
57pub enum SessionTargetSpec {
58    Plain(String),
59    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
60    Service(ServiceId),
61}
62
63impl std::fmt::Display for SessionTargetSpec {
64    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65        match self {
66            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
67            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
68            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
69        }
70    }
71}
72
73impl std::str::FromStr for SessionTargetSpec {
74    type Err = HoprLibError;
75
76    fn from_str(s: &str) -> Result<Self, Self::Err> {
77        Ok(if let Some(stripped) = s.strip_prefix("$$") {
78            Self::Sealed(
79                base64::prelude::BASE64_URL_SAFE
80                    .decode(stripped)
81                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
82            )
83        } else if let Some(stripped) = s.strip_prefix("#") {
84            Self::Service(
85                stripped
86                    .parse()
87                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
88            )
89        } else {
90            Self::Plain(s.to_owned())
91        })
92    }
93}
94
95impl SessionTargetSpec {
96    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
97        Ok(match (protocol, self) {
98            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
99                IpOrHost::from_str(&plain)
100                    .map(SealedHost::from)
101                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
102            ),
103            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
104                IpOrHost::from_str(&plain)
105                    .map(SealedHost::from)
106                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
107            ),
108            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
109                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
110            }
111            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
112                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
113            }
114            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
115        })
116    }
117}
118
119/// Entry stored in the session registry table.
120#[derive(Debug)]
121pub struct StoredSessionEntry {
122    /// Target of the Session.
123    pub target: SessionTargetSpec,
124    /// Routing used for the Session.
125    pub path: RoutingOptions,
126    /// The join handle for the Session processing.
127    pub jh: hopr_async_runtime::prelude::JoinHandle<()>,
128}
129
130#[repr(u8)]
131#[derive(
132    Debug, Clone, strum::EnumIter, strum::Display, strum::EnumString, Serialize, Deserialize, utoipa::ToSchema,
133)]
134pub enum SessionCapability {
135    /// Frame segmentation
136    Segmentation,
137    /// Frame retransmission (ACK and NACK-based)
138    Retransmission,
139    /// Frame retransmission (only ACK-based)
140    RetransmissionAckOnly,
141    /// Disable packet buffering
142    NoDelay,
143}
144
145impl From<SessionCapability> for hopr_lib::SessionCapability {
146    fn from(cap: SessionCapability) -> hopr_lib::SessionCapability {
147        match cap {
148            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation,
149            SessionCapability::Retransmission => hopr_lib::SessionCapability::Retransmission,
150            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAckOnly,
151            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay,
152        }
153    }
154}
155
156#[serde_as]
157#[derive(Debug, Clone, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
158#[into_params(parameter_in = Query)]
159#[serde(rename_all = "camelCase")]
160pub(crate) struct SessionWebsocketClientQueryRequest {
161    #[serde_as(as = "DisplayFromStr")]
162    #[schema(required = true, value_type = String)]
163    pub destination: String, //PeerId,  // issue in utoipa on overriding the type
164    #[schema(required = true)]
165    pub hops: u8,
166    #[cfg(feature = "explicit-path")]
167    #[schema(required = false)]
168    pub path: Option<String>,
169    #[schema(required = true)]
170    #[serde_as(as = "Vec<DisplayFromStr>")]
171    pub capabilities: Vec<SessionCapability>,
172    #[schema(required = true)]
173    #[serde_as(as = "DisplayFromStr")]
174    pub target: SessionTargetSpec,
175    #[schema(required = false)]
176    #[serde(default = "default_protocol")]
177    pub protocol: IpProtocol,
178}
179
180#[inline]
181fn default_protocol() -> IpProtocol {
182    IpProtocol::TCP
183}
184
185impl SessionWebsocketClientQueryRequest {
186    pub(crate) fn into_protocol_session_config(self) -> Result<SessionClientConfig, HoprLibError> {
187        #[cfg(not(feature = "explicit-path"))]
188        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
189
190        #[cfg(feature = "explicit-path")]
191        let path_options = if let Some(path) = self.path {
192            // Explicit `path` will override `hops`
193            hopr_lib::RoutingOptions::IntermediatePath(
194                path.split(',')
195                    .map(PeerId::from_str)
196                    .collect::<Result<Vec<PeerId>, _>>()
197                    .map_err(|e| HoprLibError::GeneralError(format!("invalid peer id on path: {e}")))?
198                    .try_into()?,
199            )
200        } else {
201            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
202        };
203
204        Ok(SessionClientConfig {
205            peer: PeerId::from_str(self.destination.as_str())
206                .map_err(|_e| HoprLibError::GeneralError(format!("invalid destination: {}", self.destination)))?,
207            path_options,
208            target: self.target.into_target(self.protocol)?,
209            capabilities: self.capabilities.into_iter().map(SessionCapability::into).collect(),
210        })
211    }
212}
213
214#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
215#[schema(value_type = String, format = Binary)]
216#[allow(dead_code)] // not dead code, just for codegen
217struct WssData(Vec<u8>);
218
219/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.
220///
221/// Once configured, the session represents and automatically managed connection to a target peer through a network routing
222/// configuration. The session can be used to send and receive binary data over the network.
223///
224/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
225///
226/// Connect to the endpoint by using a WS client. No preview available. Example: `ws://127.0.0.1:3001/api/v3/session/websocket
227#[allow(dead_code)] // not dead code, just for documentation
228#[utoipa::path(
229        get,
230        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
231        params(SessionWebsocketClientQueryRequest),
232        responses(
233            (status = 200, description = "Successfully created a new client websocket session."),
234            (status = 401, description = "Invalid authorization token.", body = ApiError),
235            (status = 422, description = "Unknown failure", body = ApiError),
236            (status = 429, description = "Too many open websocket connections.", body = ApiError),
237        ),
238        security(
239            ("api_token" = []),
240            ("bearer_token" = [])
241        ),
242        tag = "Session",
243    )]
244
245pub(crate) async fn websocket(
246    ws: WebSocketUpgrade,
247    Query(query): Query<SessionWebsocketClientQueryRequest>,
248    State(state): State<Arc<InternalState>>,
249) -> Result<impl IntoResponse, impl IntoResponse> {
250    let data = query.into_protocol_session_config().map_err(|e| {
251        (
252            StatusCode::UNPROCESSABLE_ENTITY,
253            ApiErrorStatus::UnknownFailure(e.to_string()),
254        )
255    })?;
256
257    let hopr = state.hopr.clone();
258    let session: HoprSession = hopr.connect_to(data).await.map_err(|e| {
259        error!(error = %e, "Failed to establish session");
260        (
261            StatusCode::UNPROCESSABLE_ENTITY,
262            ApiErrorStatus::UnknownFailure(e.to_string()),
263        )
264    })?;
265
266    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
267}
268
269enum WebSocketInput {
270    Network(Result<Box<[u8]>, std::io::Error>),
271    WsInput(Result<Message, Error>),
272}
273
274/// The maximum number of bytes read from a Session that WS can transfer within a single message.
275const WS_MAX_SESSION_READ_SIZE: usize = 4096;
276
277#[tracing::instrument(level = "debug", skip(socket, session))]
278async fn websocket_connection(socket: WebSocket, session: HoprSession) {
279    let session_id = *session.id();
280
281    let (rx, mut tx) = session.split();
282    let (mut sender, receiver) = socket.split();
283
284    let mut queue = (
285        receiver.map(WebSocketInput::WsInput),
286        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
287    )
288        .merge();
289
290    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
291
292    while let Some(v) = queue.next().await {
293        match v {
294            WebSocketInput::Network(bytes) => match bytes {
295                Ok(bytes) => {
296                    let len = bytes.len();
297                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
298                        error!(
299                            error = %e,
300                            "Failed to emit read data onto the websocket, closing connection"
301                        );
302                        break;
303                    };
304                    bytes_from_session += len;
305                }
306                Err(e) => {
307                    error!(
308                        error = %e,
309                        "Failed to push data from network to socket, closing connection"
310                    );
311                    break;
312                }
313            },
314            WebSocketInput::WsInput(ws_in) => match ws_in {
315                Ok(Message::Binary(data)) => {
316                    let len = data.len();
317                    if let Err(e) = tx.write(data.as_ref()).await {
318                        error!(error = %e, "Failed to write data to the session, closing connection");
319                        break;
320                    }
321                    bytes_to_session += len;
322                }
323                Ok(Message::Text(_)) => {
324                    error!("Received string instead of binary data, closing connection");
325                    break;
326                }
327                Ok(Message::Close(_)) => {
328                    debug!("Received close frame, closing connection");
329                    break;
330                }
331                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
332                Err(e) => {
333                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
334                    break;
335                }
336            },
337        }
338    }
339
340    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
341}
342
343#[serde_as]
344#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
345pub enum RoutingOptions {
346    #[cfg(feature = "explicit-path")]
347    #[schema(value_type = Vec<String>)]
348    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<PeerId>),
349    Hops(usize),
350}
351
352impl TryFrom<RoutingOptions> for hopr_lib::RoutingOptions {
353    type Error = HoprLibError;
354
355    fn try_from(value: RoutingOptions) -> Result<Self, Self::Error> {
356        match value {
357            #[cfg(feature = "explicit-path")]
358            RoutingOptions::IntermediatePath(path) => {
359                Ok(hopr_lib::RoutingOptions::IntermediatePath(path.into_iter().collect()))
360            }
361            RoutingOptions::Hops(hops) => Ok(hopr_lib::RoutingOptions::Hops(hops.try_into()?)),
362        }
363    }
364}
365
366#[serde_as]
367#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
368#[schema(example = json!({
369        "destination": "12D3KooWR4uwjKCDCAY1xsEFB4esuWLF9Q5ijYvCjz5PNkTbnu33",
370        "path": {
371            "Hops": 1
372        },
373        "target": {"Plain": "localhost:8080"},
374        "listenHost": "127.0.0.1:10000",
375        "capabilities": ["Retransmission", "Segmentation"]
376    }))]
377#[serde(rename_all = "camelCase")]
378pub(crate) struct SessionClientRequest {
379    /// Peer ID of the Exit node.
380    #[serde_as(as = "DisplayFromStr")]
381    #[schema(value_type = String)]
382    pub destination: PeerOrAddress,
383    pub path: RoutingOptions,
384    pub target: SessionTargetSpec,
385    /// Listen host (`ip:port`) for the Session socket at the Entry node.
386    ///
387    /// Supports also partial specification (only `ip` or only `:port`) with the
388    /// respective part replaced by the node's configured default.
389    pub listen_host: Option<String>,
390    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
391    /// Capabilities for the Session protocol.
392    ///
393    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
394    pub capabilities: Option<Vec<SessionCapability>>,
395}
396
397impl SessionClientRequest {
398    pub(crate) fn into_protocol_session_config(
399        self,
400        target_protocol: IpProtocol,
401    ) -> Result<SessionClientConfig, HoprLibError> {
402        let peer = match self.destination {
403            PeerOrAddress::PeerId(peer_id) => peer_id,
404            PeerOrAddress::Address(address) => {
405                return Err(HoprLibError::GeneralError(format!("invalid destination: {address}")))
406            }
407        };
408
409        Ok(SessionClientConfig {
410            peer,
411            path_options: self.path.try_into()?,
412            target: self.target.into_target(target_protocol)?,
413            capabilities: self
414                .capabilities
415                .map(|vs| {
416                    vs.into_iter()
417                        .map(|v| {
418                            let cap: hopr_lib::SessionCapability = v.into();
419                            cap
420                        })
421                        .collect::<Vec<_>>()
422                })
423                .unwrap_or_else(|| match target_protocol {
424                    IpProtocol::TCP => {
425                        vec![
426                            hopr_lib::SessionCapability::Retransmission,
427                            hopr_lib::SessionCapability::Segmentation,
428                        ]
429                    }
430                    _ => vec![], // no default capabilities for UDP, etc.
431                }),
432        })
433    }
434}
435
436#[serde_as]
437#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
438#[schema(example = json!({
439        "target": "example.com:80",
440        "protocol": "tcp",
441        "ip": "127.0.0.1",
442        "port": 5542,
443        "path": { "Hops": 1 }
444    }))]
445#[serde(rename_all = "camelCase")]
446pub(crate) struct SessionClientResponse {
447    pub target: String,
448    #[serde_as(as = "DisplayFromStr")]
449    #[schema(value_type = String)]
450    pub protocol: IpProtocol,
451    pub ip: String,
452    pub path: RoutingOptions,
453    pub port: u16,
454}
455
456/// This function first tries to parse `requested` as the `ip:port` host pair.
457/// If that does not work, it tries to parse `requested` as a single IP address
458/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
459/// part from the given `default`.
460fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
461    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
462        Some(Err(requested)) => {
463            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
464            debug!(requested, %default, "using partially default listen host");
465            std::net::SocketAddr::new(
466                requested.parse().unwrap_or(default.ip()),
467                requested
468                    .strip_prefix(":")
469                    .and_then(|p| u16::from_str(p).ok())
470                    .unwrap_or(default.port()),
471            )
472        }
473        Some(Ok(requested)) => {
474            debug!(%requested, "using requested listen host");
475            requested
476        }
477        None => {
478            debug!(%default, "using default listen host");
479            default
480        }
481    }
482}
483
484/// Creates a new client session returning the given session listening host and port over TCP or UDP.
485/// If no listening port is given in the request, the socket will be bound to a random free
486/// port and returned in the response.
487/// Different capabilities can be configured for the session, such as data segmentation or
488/// retransmission.
489///
490/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
491/// communication over the selected IP protocol and HOPR network routing with the given destination.
492/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
493///
494/// Various services require different types of socket communications:
495/// - services running over UDP usually do not require data retransmission, as it is already expected
496/// that UDP does not provide these and is therefore handled at the application layer.
497/// - On the contrary, services running over TCP *almost always* expect data segmentation and
498/// retransmission capabilities, so these should be configured while creating a session that passes
499/// TCP data.
500#[utoipa::path(
501        post,
502        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
503        params(
504            ("protocol" = String, Path, description = "IP transport protocol")
505        ),
506        request_body(
507            content = SessionClientRequest,
508            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
509            content_type = "application/json"),
510        responses(
511            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
512            (status = 400, description = "Invalid IP protocol.", body = ApiError),
513            (status = 401, description = "Invalid authorization token.", body = ApiError),
514            (status = 409, description = "Listening address and port already in use.", body = ApiError),
515            (status = 422, description = "Unknown failure", body = ApiError),
516        ),
517        security(
518            ("api_token" = []),
519            ("bearer_token" = [])
520        ),
521        tag = "Session"
522    )]
523pub(crate) async fn create_client(
524    State(state): State<Arc<InternalState>>,
525    Path(protocol): Path<IpProtocol>,
526    Json(args): Json<SessionClientRequest>,
527) -> Result<impl IntoResponse, impl IntoResponse> {
528    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
529
530    if bind_host.port() > 0
531        && state
532            .open_listeners
533            .read()
534            .await
535            .contains_key(&ListenerId(protocol.into(), bind_host))
536    {
537        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
538    }
539
540    let target = args.target.clone();
541    let path = args.path.clone();
542    let data = args.into_protocol_session_config(protocol).map_err(|e| {
543        (
544            StatusCode::UNPROCESSABLE_ENTITY,
545            ApiErrorStatus::UnknownFailure(e.to_string()),
546        )
547    })?;
548
549    // TODO: consider pooling the sessions on a listener, so that the negotiation is amortized
550
551    debug!("binding {protocol} session listening socket to {bind_host}");
552    let bound_host = match protocol {
553        IpProtocol::TCP => {
554            // Bind the TCP socket first
555            let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
556                if e.kind() == std::io::ErrorKind::AddrInUse {
557                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
558                } else {
559                    (
560                        StatusCode::UNPROCESSABLE_ENTITY,
561                        ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
562                    )
563                }
564            })?;
565            info!(%bound_host, "TCP session listener bound");
566
567            // For each new TCP connection coming to the listener,
568            // open a Session with the same parameters
569            let hopr = state.hopr.clone();
570            let jh = hopr_async_runtime::prelude::spawn(
571                tokio_stream::wrappers::TcpListenerStream::new(tcp_listener)
572                    .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
573                    .for_each_concurrent(None, move |accepted_client| {
574                        let data = data.clone();
575                        let hopr = hopr.clone();
576                        async move {
577                            match accepted_client {
578                                Ok((sock_addr, stream)) => {
579                                    debug!(socket = ?sock_addr, "incoming TCP connection");
580                                    let session = match hopr.connect_to(data).await {
581                                        Ok(s) => s,
582                                        Err(e) => {
583                                            error!(error = %e, "failed to establish session");
584                                            return;
585                                        }
586                                    };
587
588                                    debug!(
589                                        socket = ?sock_addr,
590                                        session_id = tracing::field::debug(*session.id()),
591                                        "new session for incoming TCP connection",
592                                    );
593
594                                    #[cfg(all(feature = "prometheus", not(test)))]
595                                    METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
596
597                                    bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await;
598
599                                    #[cfg(all(feature = "prometheus", not(test)))]
600                                    METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
601                                }
602                                Err(e) => error!(error = %e, "failed to accept connection"),
603                            }
604                        }
605                    }),
606            );
607
608            state.open_listeners.write().await.insert(
609                ListenerId(protocol.into(), bound_host),
610                StoredSessionEntry {
611                    target: target.clone(),
612                    path: path.clone(),
613                    jh,
614                },
615            );
616            bound_host
617        }
618        IpProtocol::UDP => {
619            // Bind the UDP socket first
620            let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
621                if e.kind() == std::io::ErrorKind::AddrInUse {
622                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
623                } else {
624                    (
625                        StatusCode::UNPROCESSABLE_ENTITY,
626                        ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
627                    )
628                }
629            })?;
630
631            info!(%bound_host, "UDP session listener bound");
632
633            let hopr = state.hopr.clone();
634
635            // Create a single session for the UDP socket
636            let session = hopr.connect_to(data).await.map_err(|e| {
637                (
638                    StatusCode::UNPROCESSABLE_ENTITY,
639                    ApiErrorStatus::UnknownFailure(e.to_string()),
640                )
641            })?;
642
643            let open_listeners_clone = state.open_listeners.clone();
644            let listener_id = ListenerId(protocol.into(), bound_host);
645
646            state.open_listeners.write().await.insert(
647                listener_id,
648                StoredSessionEntry {
649                    target: target.clone(),
650                    path: path.clone(),
651                    jh: hopr_async_runtime::prelude::spawn(async move {
652                        #[cfg(all(feature = "prometheus", not(test)))]
653                        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
654
655                        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE).await;
656
657                        #[cfg(all(feature = "prometheus", not(test)))]
658                        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
659
660                        // Once the Session closes, remove it from the list
661                        open_listeners_clone.write().await.remove(&listener_id);
662                    }),
663                },
664            );
665            bound_host
666        }
667    };
668
669    Ok::<_, (StatusCode, ApiErrorStatus)>(
670        (
671            StatusCode::OK,
672            Json(SessionClientResponse {
673                protocol,
674                path,
675                target: target.to_string(),
676                ip: bound_host.ip().to_string(),
677                port: bound_host.port(),
678            }),
679        )
680            .into_response(),
681    )
682}
683
684/// Lists existing Session listeners for the given IP protocol.
685#[utoipa::path(
686    get,
687    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
688    params(
689            ("protocol" = String, Path, description = "IP transport protocol")
690    ),
691    responses(
692            (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>),
693            (status = 400, description = "Invalid IP protocol.", body = ApiError),
694            (status = 401, description = "Invalid authorization token.", body = ApiError),
695            (status = 422, description = "Unknown failure", body = ApiError)
696    ),
697    security(
698            ("api_token" = []),
699            ("bearer_token" = [])
700    ),
701    tag = "Session",
702)]
703pub(crate) async fn list_clients(
704    State(state): State<Arc<InternalState>>,
705    Path(protocol): Path<IpProtocol>,
706) -> Result<impl IntoResponse, impl IntoResponse> {
707    let response = state
708        .open_listeners
709        .read()
710        .await
711        .iter()
712        .filter(|(id, _)| id.0 == protocol.into())
713        .map(|(id, entry)| SessionClientResponse {
714            protocol,
715            target: entry.target.to_string(),
716            ip: id.1.ip().to_string(),
717            port: id.1.port(),
718            path: entry.path.clone(),
719        })
720        .collect::<Vec<_>>();
721
722    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
723}
724
725#[derive(
726    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
727)]
728#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
729#[serde(rename_all = "lowercase")]
730pub enum IpProtocol {
731    #[allow(clippy::upper_case_acronyms)]
732    TCP,
733    #[allow(clippy::upper_case_acronyms)]
734    UDP,
735}
736
737impl From<IpProtocol> for hopr_lib::IpProtocol {
738    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
739        match protocol {
740            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
741            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
742        }
743    }
744}
745
746#[serde_as]
747#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
748pub struct SessionCloseClientQuery {
749    #[serde_as(as = "DisplayFromStr")]
750    #[schema(value_type = String)]
751    pub protocol: IpProtocol,
752    pub ip: String,
753    pub port: u16,
754}
755
756/// Closes an existing Session listener.
757/// The listener must've been previously created and bound for the given IP protocol.
758/// Once a listener is closed, no more socket connections can be made to it.
759/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
760/// will be closed.
761#[utoipa::path(
762    delete,
763    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
764    params(SessionCloseClientQuery),
765    responses(
766            (status = 204, description = "Listener closed successfully"),
767            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
768            (status = 401, description = "Invalid authorization token.", body = ApiError),
769            (status = 404, description = "Listener not found.", body = ApiError),
770            (status = 422, description = "Unknown failure", body = ApiError)
771    ),
772    security(
773            ("api_token" = []),
774            ("bearer_token" = [])
775    ),
776    tag = "Session",
777)]
778pub(crate) async fn close_client(
779    State(state): State<Arc<InternalState>>,
780    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
781) -> Result<impl IntoResponse, impl IntoResponse> {
782    let listening_ip: IpAddr = ip
783        .parse()
784        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
785
786    {
787        let mut open_listeners = state.open_listeners.write().await;
788
789        let mut to_remove = Vec::new();
790
791        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
792        open_listeners
793            .iter()
794            .filter(|(ListenerId(proto, addr), _)| {
795                let protocol: hopr_lib::IpProtocol = protocol.into();
796                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
797            })
798            .for_each(|(id, _)| to_remove.push(*id));
799
800        if to_remove.is_empty() {
801            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
802        }
803
804        for bound_addr in to_remove {
805            let entry = open_listeners
806                .remove(&bound_addr)
807                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
808
809            hopr_async_runtime::prelude::cancel_join_handle(entry.jh).await;
810        }
811    }
812
813    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
814}
815
816async fn try_restricted_bind<F, S, Fut>(
817    addrs: Vec<std::net::SocketAddr>,
818    range_str: &str,
819    binder: F,
820) -> std::io::Result<S>
821where
822    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
823    Fut: Future<Output = std::io::Result<S>>,
824{
825    if addrs.is_empty() {
826        return Err(std::io::Error::other("no valid socket addresses found"));
827    }
828
829    let range = range_str
830        .split_once(":")
831        .and_then(
832            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
833                Ok((a, b)) if a <= b => Some(a..=b),
834                _ => None,
835            },
836        )
837        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
838
839    for port in range {
840        let addrs = addrs
841            .iter()
842            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
843            .collect::<Vec<_>>();
844        match binder(addrs).await {
845            Ok(listener) => return Ok(listener),
846            Err(error) => debug!(%error, "listen address not usable"),
847        }
848    }
849
850    Err(std::io::Error::new(
851        std::io::ErrorKind::AddrNotAvailable,
852        format!("no valid socket addresses found within range: {range_str}"),
853    ))
854}
855
856async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
857    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
858
859    // If automatic port allocation is requested and there's a restriction on the port range
860    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
861    if addrs.iter().all(|a| a.port() == 0) {
862        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
863            let tcp_listener =
864                try_restricted_bind(
865                    addrs,
866                    &range_str,
867                    |a| async move { TcpListener::bind(a.as_slice()).await },
868                )
869                .await?;
870            return Ok((tcp_listener.local_addr()?, tcp_listener));
871        }
872    }
873
874    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
875    Ok((tcp_listener.local_addr()?, tcp_listener))
876}
877
878async fn udp_bind_to<A: std::net::ToSocketAddrs>(
879    address: A,
880) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
881    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
882
883    let builder = ConnectedUdpStream::builder()
884        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
885        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
886        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
887        .with_receiver_parallelism(UdpStreamParallelism::Auto);
888
889    // If automatic port allocation is requested and there's a restriction on the port range
890    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
891    if addrs.iter().all(|a| a.port() == 0) {
892        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
893            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
894                futures::future::ready(builder.clone().build(addrs.as_slice()))
895            })
896            .await?;
897
898            return Ok((*udp_listener.bound_address(), udp_listener));
899        }
900    }
901
902    let udp_socket = builder.build(address)?;
903    Ok((*udp_socket.bound_address(), udp_socket))
904}
905
906async fn bind_session_to_stream<T>(mut session: HoprSession, mut stream: T, max_buf: usize)
907where
908    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
909{
910    let session_id = *session.id();
911    match transfer_session(&mut session, &mut stream, max_buf).await {
912        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
913            session_id = ?session_id,
914            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
915        ),
916        Err(error) => error!(
917            session_id = ?session_id,
918            %error,
919            "error during data transfer"
920        ),
921    }
922}
923
924#[cfg(test)]
925mod tests {
926    use super::*;
927    use anyhow::Context;
928    use futures::channel::mpsc::UnboundedSender;
929    use hopr_lib::{ApplicationData, Keypair, PeerId, SendMsg};
930    use hopr_transport_session::errors::TransportSessionError;
931    use std::collections::HashSet;
932    use tokio::io::{AsyncReadExt, AsyncWriteExt};
933
934    pub struct SendMsgResender {
935        tx: UnboundedSender<Box<[u8]>>,
936    }
937
938    impl SendMsgResender {
939        pub fn new(tx: UnboundedSender<Box<[u8]>>) -> Self {
940            Self { tx }
941        }
942    }
943
944    #[hopr_lib::async_trait]
945    impl SendMsg for SendMsgResender {
946        // Mimics the echo server by feeding the data back in instead of sending it over the wire
947        async fn send_message(
948            &self,
949            data: ApplicationData,
950            _destination: PeerId,
951            _options: hopr_lib::RoutingOptions,
952        ) -> std::result::Result<(), TransportSessionError> {
953            let (_peer, data) = hopr_transport_session::types::unwrap_offchain_key(data.plain_text)?;
954
955            self.tx
956                .clone()
957                .unbounded_send(data)
958                .map_err(|_| TransportSessionError::Closed)?;
959
960            Ok(())
961        }
962    }
963
964    #[tokio::test]
965    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received(
966    ) -> anyhow::Result<()> {
967        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
968
969        let peer: hopr_lib::PeerId = hopr_lib::HoprOffchainKeypair::random().public().into();
970        let session = hopr_lib::HoprSession::new(
971            hopr_lib::HoprSessionId::new(4567, peer),
972            peer,
973            hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
974            HashSet::default(),
975            Arc::new(SendMsgResender::new(tx)),
976            rx,
977            None,
978        );
979
980        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;
981
982        tokio::task::spawn(async move {
983            match tcp_listener.accept().await {
984                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await,
985                Err(e) => error!("failed to accept connection: {e}"),
986            }
987        });
988
989        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
990            .await
991            .context("connect failed")?;
992
993        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
994
995        for d in data.clone().into_iter() {
996            tcp_stream.write_all(d).await.context("write failed")?;
997        }
998
999        for d in data.iter() {
1000            let mut buf = vec![0; d.len()];
1001            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
1002        }
1003
1004        Ok(())
1005    }
1006
1007    #[tokio::test]
1008    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received(
1009    ) -> anyhow::Result<()> {
1010        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
1011
1012        let peer: hopr_lib::PeerId = hopr_lib::HoprOffchainKeypair::random().public().into();
1013        let session = hopr_lib::HoprSession::new(
1014            hopr_lib::HoprSessionId::new(4567, peer),
1015            peer,
1016            hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1017            HashSet::default(),
1018            Arc::new(SendMsgResender::new(tx)),
1019            rx,
1020            None,
1021        );
1022
1023        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;
1024
1025        tokio::task::spawn(bind_session_to_stream(
1026            session,
1027            udp_listener,
1028            hopr_lib::SESSION_USABLE_MTU_SIZE,
1029        ));
1030
1031        let mut udp_stream = ConnectedUdpStream::builder()
1032            .with_buffer_size(hopr_lib::SESSION_USABLE_MTU_SIZE)
1033            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1034            .with_counterparty(listen_addr)
1035            .build(("127.0.0.1", 0))
1036            .context("bind failed")?;
1037
1038        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1039
1040        for d in data.clone().into_iter() {
1041            udp_stream.write_all(d).await.context("write failed")?;
1042        }
1043
1044        for d in data.iter() {
1045            let mut buf = vec![0; d.len()];
1046            udp_stream.read_exact(&mut buf).await.context("read failed")?;
1047        }
1048
1049        Ok(())
1050    }
1051
1052    #[test]
1053    fn test_build_binding_address() {
1054        let default = "10.0.0.1:10000".parse().unwrap();
1055
1056        let result = build_binding_host(Some("127.0.0.1:10000"), default);
1057        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1058
1059        let result = build_binding_host(None, default);
1060        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1061
1062        let result = build_binding_host(Some("127.0.0.1"), default);
1063        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1064
1065        let result = build_binding_host(Some(":1234"), default);
1066        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
1067
1068        let result = build_binding_host(Some(":"), default);
1069        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1070
1071        let result = build_binding_host(Some(""), default);
1072        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1073    }
1074}