hoprd_api/
session.rs

1use std::{
2    collections::VecDeque,
3    fmt::Formatter,
4    future::Future,
5    hash::Hash,
6    net::{IpAddr, SocketAddr},
7    str::FromStr,
8    sync::Arc,
9};
10
11use axum::{
12    Error,
13    extract::{
14        Json, Path, State,
15        ws::{Message, WebSocket, WebSocketUpgrade},
16    },
17    http::status::StatusCode,
18    response::IntoResponse,
19};
20use axum_extra::extract::Query;
21use base64::Engine;
22use dashmap::DashMap;
23use futures::{
24    AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt, StreamExt, TryStreamExt,
25    future::{AbortHandle, AbortRegistration},
26};
27use futures_concurrency::stream::Merge;
28use hopr_lib::{
29    Address, Hopr, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId,
30    SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
31    TransportSessionError, errors::HoprLibError, transfer_session,
32};
33use hopr_network_types::{
34    prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
35    udp::ForeignDataMode,
36    utils::AsyncReadStreamer,
37};
38use serde::{Deserialize, Serialize};
39use serde_with::{DisplayFromStr, serde_as};
40use tokio::net::TcpListener;
41use tracing::{debug, error, info, trace};
42
43use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
44
/// Size of the buffer for forwarding data to/from a TCP stream.
///
/// NOTE(review): presumably expressed in bytes — confirm at the usage site.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Size of the buffer for forwarding data to/from a UDP stream.
///
/// NOTE(review): presumably expressed in bytes — confirm at the usage site.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Size of the queue (back-pressure) for data incoming from a UDP stream.
///
/// NOTE(review): the unit (queued items vs. bytes) is not visible here — confirm at the usage site.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
53
// Multi-gauge (labelled by `type`) counting the clients connected at this Entry node.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
// The `unwrap` makes a failed metric creation panic on first access of the lazy static.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_clients",
        "Number of clients connected at this Entry node",
        &["type"]
    ).unwrap();
}
62
63// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
64#[allow(unused_imports)]
65use serde_json::json;
66
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
    example = json!({"Service": 0})
)]
/// Session target specification.
///
/// Besides the JSON form shown in the examples, a compact textual form exists
/// (see the `Display` and `FromStr` implementations): the plain string itself for `Plain`,
/// `$$` followed by URL-safe base64 for `Sealed`, and `#` followed by the service id for `Service`.
pub enum SessionTargetSpec {
    /// Target given in plain text; `into_target` parses it as an IP address or host.
    Plain(String),
    /// Opaque (sealed) target bytes, base64-encoded in JSON; passed on without being parsed.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Identifier of a service; `into_target` maps it to an Exit-node target regardless of the protocol.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
81
82impl std::fmt::Display for SessionTargetSpec {
83    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84        match self {
85            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
86            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
87            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
88        }
89    }
90}
91
92impl FromStr for SessionTargetSpec {
93    type Err = HoprLibError;
94
95    fn from_str(s: &str) -> Result<Self, Self::Err> {
96        Ok(if let Some(stripped) = s.strip_prefix("$$") {
97            Self::Sealed(
98                base64::prelude::BASE64_URL_SAFE
99                    .decode(stripped)
100                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
101            )
102        } else if let Some(stripped) = s.strip_prefix("#") {
103            Self::Service(
104                stripped
105                    .parse()
106                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
107            )
108        } else {
109            Self::Plain(s.to_owned())
110        })
111    }
112}
113
114impl SessionTargetSpec {
115    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
116        Ok(match (protocol, self) {
117            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
118                IpOrHost::from_str(&plain)
119                    .map(SealedHost::from)
120                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
121            ),
122            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
123                IpOrHost::from_str(&plain)
124                    .map(SealedHost::from)
125                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
126            ),
127            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
128                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
129            }
130            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
131                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
132            }
133            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
134        })
135    }
136}
137
/// Entry stored in the session registry table.
#[derive(Debug)]
pub struct StoredSessionEntry {
    /// Destination address of the Session counterparty.
    pub destination: Address,
    /// Target of the Session.
    pub target: SessionTargetSpec,
    /// Forward path used for the Session.
    pub forward_path: RoutingOptions,
    /// Return path used for the Session.
    pub return_path: RoutingOptions,
    /// The abort handle for the Session processing.
    pub abort_handle: AbortHandle,

    // Shared map from Session ID to the client's socket address and an abort handle.
    // NOTE(review): presumably holds the clients currently served by this entry's listener,
    // with the handle aborting that client's processing — confirm against the code that fills it.
    clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
}
154
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
/// Session capabilities that can be negotiated with the target peer.
///
/// Each variant is translated into one or more `hopr_lib` capability flags
/// by the `From<SessionCapability>` implementation.
pub enum SessionCapability {
    /// Frame segmentation
    Segmentation,
    /// Frame retransmission (ACK and NACK-based)
    Retransmission,
    /// Frame retransmission (only ACK-based)
    RetransmissionAckOnly,
    /// Disable packet buffering
    NoDelay,
    /// Disable SURB-based egress rate control at the Exit.
    NoRateControl,
}
181
182impl From<SessionCapability> for hopr_lib::SessionCapabilities {
183    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
184        match cap {
185            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
186            SessionCapability::Retransmission => {
187                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
188            }
189            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
190            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
191            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
192        }
193    }
194}
195
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
/// Query parameters accepted by the websocket Session endpoint.
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the Session counterparty, given in its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops, used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when present, it overrides `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Capabilities requested for the Session, each given by its variant name.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Session target in its textual form (plain, `$$`-prefixed sealed or `#`-prefixed service).
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol of the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
218
/// Serde default for the `protocol` query parameter of the websocket endpoint: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
223
224impl SessionWebsocketClientQueryRequest {
225    pub(crate) async fn into_protocol_session_config(
226        self,
227    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
228        #[cfg(not(feature = "explicit-path"))]
229        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
230
231        #[cfg(feature = "explicit-path")]
232        let path_options = if let Some(path) = self.path {
233            // Explicit `path` will override `hops`
234            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
235        } else {
236            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
237        };
238
239        let mut capabilities = SessionCapabilities::empty();
240        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
241
242        Ok((
243            self.destination,
244            self.target.into_target(self.protocol)?,
245            SessionClientConfig {
246                forward_path_options: path_options.clone(),
247                return_path_options: path_options.clone(), // TODO: allow using separate return options
248                capabilities,
249                ..Default::default()
250            },
251        ))
252    }
253}
254
// Placeholder type exposed in the OpenAPI schema as a binary string.
// It is not constructed anywhere in the visible code; it exists for code generation only.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)] // not dead code, just for codegen
struct WssData(Vec<u8>);
259
260/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
261/// sessions.
262///
263/// Once configured, the session represents and automatically managed connection to a target peer through a network
264/// routing configuration. The session can be used to send and receive binary data over the network.
265///
266/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
267///
268/// Connect to the endpoint by using a WS client. No preview is available. Example:
269/// `ws://127.0.0.1:3001/api/v4/session/websocket`
270#[allow(dead_code)] // not dead code, just for documentation
271#[utoipa::path(
272        get,
273        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
274        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
275        request_body(
276            content = SessionWebsocketClientQueryRequest,
277            content_type = "application/json",
278            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
279        ),
280        responses(
281            (status = 200, description = "Successfully created a new client websocket session."),
282            (status = 401, description = "Invalid authorization token.", body = ApiError),
283            (status = 422, description = "Unknown failure", body = ApiError),
284            (status = 429, description = "Too many open websocket connections.", body = ApiError),
285        ),
286        security(
287            ("api_token" = []),
288            ("bearer_token" = [])
289        ),
290        tag = "Session",
291    )]
292
293pub(crate) async fn websocket(
294    ws: WebSocketUpgrade,
295    Query(query): Query<SessionWebsocketClientQueryRequest>,
296    State(state): State<Arc<InternalState>>,
297) -> Result<impl IntoResponse, impl IntoResponse> {
298    let (dst, target, data) = query
299        .into_protocol_session_config()
300        .await
301        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
302
303    let hopr = state.hopr.clone();
304    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
305        error!(error = %e, "Failed to establish session");
306        (
307            StatusCode::UNPROCESSABLE_ENTITY,
308            ApiErrorStatus::UnknownFailure(e.to_string()),
309        )
310    })?;
311
312    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
313}
314
/// Event processed by the websocket forwarding loop, tagging which side produced it.
enum WebSocketInput {
    /// A chunk of bytes read from the HOPR Session (or the read error).
    Network(Result<Box<[u8]>, std::io::Error>),
    /// A message received from the websocket client (or the receive error).
    WsInput(Result<Message, Error>),
}

/// The maximum number of bytes read from a Session that WS can transfer within a single message.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
322
323#[tracing::instrument(level = "debug", skip(socket, session))]
324async fn websocket_connection(socket: WebSocket, session: HoprSession) {
325    let session_id = *session.id();
326
327    let (rx, mut tx) = session.split();
328    let (mut sender, receiver) = socket.split();
329
330    let mut queue = (
331        receiver.map(WebSocketInput::WsInput),
332        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
333    )
334        .merge();
335
336    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
337
338    while let Some(v) = queue.next().await {
339        match v {
340            WebSocketInput::Network(bytes) => match bytes {
341                Ok(bytes) => {
342                    let len = bytes.len();
343                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
344                        error!(
345                            error = %e,
346                            "Failed to emit read data onto the websocket, closing connection"
347                        );
348                        break;
349                    };
350                    bytes_from_session += len;
351                }
352                Err(e) => {
353                    error!(
354                        error = %e,
355                        "Failed to push data from network to socket, closing connection"
356                    );
357                    break;
358                }
359            },
360            WebSocketInput::WsInput(ws_in) => match ws_in {
361                Ok(Message::Binary(data)) => {
362                    let len = data.len();
363                    if let Err(e) = tx.write(data.as_ref()).await {
364                        error!(error = %e, "Failed to write data to the session, closing connection");
365                        break;
366                    }
367                    bytes_to_session += len;
368                }
369                Ok(Message::Text(_)) => {
370                    error!("Received string instead of binary data, closing connection");
371                    break;
372                }
373                Ok(Message::Close(_)) => {
374                    debug!("Received close frame, closing connection");
375                    break;
376                }
377                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
378                Err(e) => {
379                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
380                    break;
381                }
382            },
383        }
384    }
385
386    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
387}
388
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
/// Routing options for the Session.
///
/// This is the API-level representation; `resolve` converts it into `hopr_lib::RoutingOptions`.
pub enum RoutingOptions {
    /// Explicit list of intermediate node addresses (only with the `explicit-path` feature).
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
    /// Number of hops; the conversion in `resolve` is fallible, so not every value is accepted.
    Hops(usize),
}
399
400impl RoutingOptions {
401    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
402        Ok(match self {
403            #[cfg(feature = "explicit-path")]
404            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
405            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
406        })
407    }
408}
409
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
/// Request body for creating a new client session.
pub(crate) struct SessionClientRequest {
    /// Address of the Exit node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// The forward path for the Session.
    pub forward_path: RoutingOptions,
    /// The return path for the Session.
    pub return_path: RoutingOptions,
    /// Target for the Session.
    pub target: SessionTargetSpec,
    /// Listen host (`ip:port`) for the Session socket at the Entry node.
    ///
    /// Supports also partial specification (only `ip` or only `:port`) with the
    /// respective part replaced by the node's configured default.
    pub listen_host: Option<String>,
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    /// Capabilities for the Session protocol.
    ///
    /// Defaults to `Segmentation` and `Retransmission` for TCP and only `Segmentation` for UDP.
    pub capabilities: Option<Vec<SessionCapability>>,
    /// The amount of response data the Session counterparty can deliver back to us,
    /// without us sending any SURBs to them.
    ///
    /// In other words, this size is recalculated to a number of SURBs delivered
    /// to the counterparty upfront and then maintained.
    /// The maintenance is dynamic, based on the number of responses we receive.
    ///
    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
    /// at least the size of 2 Session packet payloads.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = String)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// The maximum throughput at which artificial SURBs might be generated and sent
    /// to the recipient of the Session.
    ///
    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
    /// this should roughly match the maximum retrieval throughput.
    ///
    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = String)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// How many Sessions to pool for clients.
    ///
    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
    /// It has no effect on UDP sessions in the current implementation.
    ///
    /// Currently, the maximum value is 5.
    pub session_pool: Option<usize>,
    /// The maximum number of client sessions that the listener can spawn.
    ///
    /// This currently applies only to the TCP sessions, as UDP sessions cannot
    /// handle multiple clients (and spawn therefore always only a single session).
    ///
    /// If this value is smaller than the value specified in `session_pool`, it will
    /// be set to that value.
    ///
    /// The default value is 5.
    pub max_client_sessions: Option<usize>,
}
488
489impl SessionClientRequest {
490    pub(crate) async fn into_protocol_session_config(
491        self,
492        target_protocol: IpProtocol,
493    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
494        Ok((
495            self.destination,
496            self.target.into_target(target_protocol)?,
497            SessionClientConfig {
498                forward_path_options: self.forward_path.resolve().await?,
499                return_path_options: self.return_path.resolve().await?,
500                capabilities: self
501                    .capabilities
502                    .map(|vs| {
503                        let mut caps = SessionCapabilities::empty();
504                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
505                        caps
506                    })
507                    .unwrap_or_else(|| match target_protocol {
508                        IpProtocol::TCP => {
509                            hopr_lib::SessionCapability::RetransmissionAck
510                                | hopr_lib::SessionCapability::RetransmissionNack
511                                | hopr_lib::SessionCapability::Segmentation
512                        }
513                        // Only Segmentation capability for UDP per default
514                        _ => SessionCapability::Segmentation.into(),
515                    }),
516                surb_management: SessionConfig {
517                    response_buffer: self.response_buffer,
518                    max_surb_upstream: self.max_surb_upstream,
519                }
520                .into(),
521                ..Default::default()
522            },
523        ))
524    }
525}
526
527#[serde_as]
528#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
529#[schema(example = json!({
530        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
531        "target": "example.com:80",
532        "forwardPath": { "Hops": 1 },
533        "returnPath": { "Hops": 1 },
534        "protocol": "tcp",
535        "ip": "127.0.0.1",
536        "port": 5542,
537        "mtu": 1020,
538        "surb_len": 400,
539        "active_clients": []
540    }))]
541#[serde(rename_all = "camelCase")]
542/// Response body for creating a new client session.
543pub(crate) struct SessionClientResponse {
544    #[schema(example = "example.com:80")]
545    /// Target of the Session.
546    pub target: String,
547    /// Destination node (exit node) of the Session.
548    #[serde_as(as = "DisplayFromStr")]
549    #[schema(value_type = String)]
550    pub destination: Address,
551    /// Forward routing path.
552    pub forward_path: RoutingOptions,
553    /// Return routing path.
554    pub return_path: RoutingOptions,
555    /// IP protocol used by Session's listening socket.
556    #[serde_as(as = "DisplayFromStr")]
557    #[schema(example = "tcp")]
558    pub protocol: IpProtocol,
559    /// Listening IP address of the Session's socket.
560    #[schema(example = "127.0.0.1")]
561    pub ip: String,
562    #[schema(example = 5542)]
563    /// Listening port of the Session's socket.
564    pub port: u16,
565    /// MTU used by the Session.
566    pub mtu: usize,
567    /// Size of a Single Use Reply Block used by the protocol.
568    ///
569    /// This is usefult for SURB balancing calculations.
570    pub surb_len: usize,
571    /// Lists Session IDs of all active clients.
572    ///
573    /// Can contain multiple entries on TCP sessions, but currently
574    /// always only a single entry on UDP sessions.
575    pub active_clients: Vec<String>,
576}
577
578/// This function first tries to parse `requested` as the `ip:port` host pair.
579/// If that does not work, it tries to parse `requested` as a single IP address
580/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
581/// part from the given `default`.
582fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
583    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
584        Some(Err(requested)) => {
585            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
586            debug!(requested, %default, "using partially default listen host");
587            std::net::SocketAddr::new(
588                requested.parse().unwrap_or(default.ip()),
589                requested
590                    .strip_prefix(":")
591                    .and_then(|p| u16::from_str(p).ok())
592                    .unwrap_or(default.port()),
593            )
594        }
595        Some(Ok(requested)) => {
596            debug!(%requested, "using requested listen host");
597            requested
598        }
599        None => {
600            debug!(%default, "using default listen host");
601            default
602        }
603    }
604}
605
/// Pool of pre-established Sessions that can be handed out to incoming clients.
struct SessionPool {
    // Queue of pooled Sessions shared with the keep-alive task; `None` if no Session was pooled.
    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
    // Abort handle of the keep-alive task; aborted when the pool is dropped.
    ah: Option<AbortHandle>,
}
610
611impl SessionPool {
612    pub const MAX_SESSION_POOL_SIZE: usize = 5;
613
614    async fn new(
615        size: usize,
616        dst: Address,
617        target: SessionTarget,
618        cfg: SessionClientConfig,
619        hopr: Arc<Hopr>,
620    ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
621        let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
622        let hopr_clone = hopr.clone();
623        let pool_clone = pool.clone();
624        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
625            .map(Ok)
626            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
627                let pool = pool_clone.clone();
628                let hopr = hopr_clone.clone();
629                let target = target.clone();
630                let cfg = cfg.clone();
631                async move {
632                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
633                        Ok(s) => {
634                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
635                            pool.lock()
636                                .map_err(|_| {
637                                    (
638                                        StatusCode::INTERNAL_SERVER_ERROR,
639                                        ApiErrorStatus::UnknownFailure("lock failed".into()),
640                                    )
641                                })?
642                                .push_back(s);
643                            Ok(())
644                        }
645                        Err(error) => {
646                            error!(%error, num_session = i, "failed to establish session for pool");
647                            Err((
648                                StatusCode::INTERNAL_SERVER_ERROR,
649                                ApiErrorStatus::UnknownFailure(format!(
650                                    "failed to establish session #{i} in pool to {dst}: {error}"
651                                )),
652                            ))
653                        }
654                    }
655                }
656            })
657            .await?;
658
659        // Spawn a task that periodically sends keep alive messages to the Session in the pool.
660        if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
661            let pool_clone_1 = pool.clone();
662            let pool_clone_2 = pool.clone();
663            let pool_clone_3 = pool.clone();
664            Ok(Self {
665                pool: Some(pool),
666                ah: Some(hopr_async_runtime::spawn_as_abortable!(
667                    futures_time::stream::interval(futures_time::time::Duration::from(
668                        std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
669                    ))
670                    .take_while(move |_| {
671                        // Continue the infinite interval stream until there are sessions in the pool
672                        futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
673                    })
674                    .flat_map(move |_| {
675                        // Get all SessionIds of the remaining Sessions in the pool
676                        let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
677                        futures::stream::iter(ids.into_iter().flatten())
678                    })
679                    .for_each(move |id| {
680                        let hopr = hopr.clone();
681                        let pool = pool_clone_3.clone();
682                        async move {
683                            // Make sure the Session is still alive, otherwise remove it from the pool
684                            if let Err(error) = hopr.keep_alive_session(&id).await {
685                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
686                                if let Ok(mut pool) = pool.lock() {
687                                    pool.retain(|s| *s.id() != id);
688                                }
689                            }
690                        }
691                    })
692                ))
693            })
694        } else {
695            Ok(Self { pool: None, ah: None })
696        }
697    }
698
699    fn pop(&mut self) -> Option<HoprSession> {
700        self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
701    }
702}
703
704impl Drop for SessionPool {
705    fn drop(&mut self) {
706        if let Some(ah) = self.ah.take() {
707            ah.abort();
708        }
709    }
710}
711
712async fn create_tcp_client_binding(
713    bind_host: std::net::SocketAddr,
714    state: Arc<InternalState>,
715    args: SessionClientRequest,
716) -> Result<(std::net::SocketAddr, Option<HoprSessionId>), (StatusCode, ApiErrorStatus)> {
717    let target_spec = args.target.clone();
718    let (dst, target, data) = args
719        .clone()
720        .into_protocol_session_config(IpProtocol::TCP)
721        .await
722        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
723
724    // Bind the TCP socket first
725    let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
726        if e.kind() == std::io::ErrorKind::AddrInUse {
727            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
728        } else {
729            (
730                StatusCode::UNPROCESSABLE_ENTITY,
731                ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
732            )
733        }
734    })?;
735    info!(%bound_host, "TCP session listener bound");
736
737    // For each new TCP connection coming to the listener,
738    // open a Session with the same parameters
739    let hopr = state.hopr.clone();
740
741    // Create a session pool if requested
742    let session_pool_size = args.session_pool.unwrap_or(0);
743    let mut session_pool = SessionPool::new(session_pool_size, dst, target.clone(), data.clone(), hopr.clone()).await?;
744
745    let active_sessions = Arc::new(DashMap::new());
746    let mut max_clients = args.max_client_sessions.unwrap_or(5).max(1);
747
748    if max_clients < session_pool_size {
749        max_clients = session_pool_size;
750    }
751
752    // Create an abort handler for the listener
753    let (abort_handle, abort_reg) = AbortHandle::new_pair();
754    let active_sessions_clone = active_sessions.clone();
755    hopr_async_runtime::prelude::spawn(async move {
756        let active_sessions_clone_2 = active_sessions_clone.clone();
757
758        futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
759            .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
760            .for_each(move |accepted_client| {
761                let data = data.clone();
762                let target = target.clone();
763                let hopr = hopr.clone();
764                let active_sessions = active_sessions_clone_2.clone();
765
766                // Try to pop from the pool only if a client was accepted
767                let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
768                async move {
769                    match accepted_client {
770                        Ok((sock_addr, mut stream)) => {
771                            debug!(?sock_addr, "incoming TCP connection");
772
773                            // Check that we are still within the quota,
774                            // otherwise shutdown the new client immediately
775                            if active_sessions.len() >= max_clients {
776                                error!(?bind_host, "no more client slots available at listener");
777                                use tokio::io::AsyncWriteExt;
778                                if let Err(error) = stream.shutdown().await {
779                                    error!(%error, ?sock_addr, "failed to shutdown TCP connection");
780                                }
781                                return;
782                            }
783
784                            // See if we still have some session pooled
785                            let session = match maybe_pooled_session {
786                                Some(s) => {
787                                    debug!(session_id = %s.id(), "using pooled session");
788                                    s
789                                }
790                                None => {
791                                    debug!("no more active sessions in the pool, creating a new one");
792                                    match hopr.connect_to(dst, target, data).await {
793                                        Ok(s) => s,
794                                        Err(error) => {
795                                            error!(%error, "failed to establish session");
796                                            return;
797                                        }
798                                    }
799                                }
800                            };
801
802                            let session_id = *session.id();
803                            debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
804
805                            let (abort_handle, abort_reg) = AbortHandle::new_pair();
806                            active_sessions.insert(session_id, (sock_addr, abort_handle));
807
808                            #[cfg(all(feature = "prometheus", not(test)))]
809                            METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
810
811                            hopr_async_runtime::prelude::spawn(
812                                // The stream either terminates naturally (by the client closing the TCP connection)
813                                // or is terminated via the abort handle.
814                                bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
815                                    move |_| async move {
816                                        // Regardless how the session ended, remove the abort handle
817                                        // from the map
818                                        active_sessions.remove(&session_id);
819
820                                        debug!(%session_id, "tcp session has ended");
821
822                                        #[cfg(all(feature = "prometheus", not(test)))]
823                                        METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
824                                    },
825                                ),
826                            );
827                        }
828                        Err(error) => error!(%error, "failed to accept connection"),
829                    }
830                }
831            })
832            .await;
833
834        // Once the listener is done, abort all active sessions created by the listener
835        active_sessions_clone.iter().for_each(|entry| {
836            let (sock_addr, handle) = entry.value();
837            debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
838            handle.abort()
839        });
840    });
841
842    state.open_listeners.write_arc().await.insert(
843        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
844        StoredSessionEntry {
845            destination: dst,
846            target: target_spec.clone(),
847            forward_path: args.forward_path.clone(),
848            return_path: args.return_path.clone(),
849            clients: active_sessions,
850            abort_handle,
851        },
852    );
853    Ok((bound_host, None))
854}
855
856async fn create_udp_client_binding(
857    bind_host: std::net::SocketAddr,
858    state: Arc<InternalState>,
859    args: SessionClientRequest,
860) -> Result<(std::net::SocketAddr, Option<HoprSessionId>), (StatusCode, ApiErrorStatus)> {
861    let target_spec = args.target.clone();
862    let (dst, target, data) = args
863        .clone()
864        .into_protocol_session_config(IpProtocol::UDP)
865        .await
866        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
867
868    // Bind the UDP socket first
869    let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
870        if e.kind() == std::io::ErrorKind::AddrInUse {
871            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
872        } else {
873            (
874                StatusCode::UNPROCESSABLE_ENTITY,
875                ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
876            )
877        }
878    })?;
879
880    info!(%bound_host, "UDP session listener bound");
881
882    let hopr = state.hopr.clone();
883
884    // Create a single session for the UDP socket
885    let session = hopr.connect_to(dst, target, data).await.map_err(|e| {
886        (
887            StatusCode::UNPROCESSABLE_ENTITY,
888            ApiErrorStatus::UnknownFailure(e.to_string()),
889        )
890    })?;
891
892    let open_listeners_clone = state.open_listeners.clone();
893    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
894
895    // Create an abort handle so that the Session can be terminated by aborting
896    // the UDP stream first. Because under the hood, the bind_session_to_stream uses
897    // `transfer_session` which in turn uses `copy_duplex_abortable`, aborting the
898    // `udp_socket` will:
899    //
900    // 1. Initiate graceful shutdown of `udp_socket`
901    // 2. Once done, initiate a graceful shutdown of `session`
902    // 3. Finally, return from the `bind_session_to_stream` which will terminate the spawned task
903    //
904    // This is needed because the `udp_socket` cannot terminate by itself.
905    let (abort_handle, abort_reg) = AbortHandle::new_pair();
906    let clients = Arc::new(DashMap::new());
907    // TODO: add multiple client support to UDP sessions (#7370)
908    let session_id = *session.id();
909    clients.insert(session_id, (bind_host, abort_handle.clone()));
910    hopr_async_runtime::prelude::spawn(async move {
911        #[cfg(all(feature = "prometheus", not(test)))]
912        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
913
914        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
915
916        #[cfg(all(feature = "prometheus", not(test)))]
917        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
918
919        // Once the Session closes, remove it from the list
920        open_listeners_clone.write_arc().await.remove(&listener_id);
921    });
922
923    state.open_listeners.write_arc().await.insert(
924        listener_id,
925        StoredSessionEntry {
926            destination: dst,
927            target: target_spec.clone(),
928            forward_path: args.forward_path.clone(),
929            return_path: args.return_path.clone(),
930            abort_handle,
931            clients,
932        },
933    );
934    Ok((bound_host, Some(session_id)))
935}
936
937/// Creates a new client session returning the given session listening host and port over TCP or UDP.
938/// If no listening port is given in the request, the socket will be bound to a random free
939/// port and returned in the response.
940/// Different capabilities can be configured for the session, such as data segmentation or
941/// retransmission.
942///
943/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
944/// communication over the selected IP protocol and HOPR network routing with the given destination.
945/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
946///
947/// Various services require different types of socket communications:
948/// - services running over UDP usually do not require data retransmission, as it is already expected
949/// that UDP does not provide these and is therefore handled at the application layer.
950/// - On the contrary, services running over TCP *almost always* expect data segmentation and
951/// retransmission capabilities, so these should be configured while creating a session that passes
952/// TCP data.
953#[utoipa::path(
954        post,
955        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
956        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
957        params(
958            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
959        ),
960        request_body(
961            content = SessionClientRequest,
962            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
963            content_type = "application/json"),
964        responses(
965            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
966            (status = 400, description = "Invalid IP protocol.", body = ApiError),
967            (status = 401, description = "Invalid authorization token.", body = ApiError),
968            (status = 409, description = "Listening address and port already in use.", body = ApiError),
969            (status = 422, description = "Unknown failure", body = ApiError),
970        ),
971        security(
972            ("api_token" = []),
973            ("bearer_token" = [])
974        ),
975        tag = "Session"
976    )]
977pub(crate) async fn create_client(
978    State(state): State<Arc<InternalState>>,
979    Path(protocol): Path<IpProtocol>,
980    Json(args): Json<SessionClientRequest>,
981) -> Result<impl IntoResponse, impl IntoResponse> {
982    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
983
984    let listener_id = ListenerId(protocol.into(), bind_host);
985    if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
986        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
987    }
988
989    debug!("binding {protocol} session listening socket to {bind_host}");
990    let (bound_host, udp_session_id) = match protocol {
991        IpProtocol::TCP => create_tcp_client_binding(bind_host, state.clone(), args.clone()).await?,
992        IpProtocol::UDP => create_udp_client_binding(bind_host, state.clone(), args.clone()).await?,
993    };
994
995    Ok::<_, (StatusCode, ApiErrorStatus)>(
996        (
997            StatusCode::OK,
998            Json(SessionClientResponse {
999                protocol,
1000                ip: bound_host.ip().to_string(),
1001                port: bound_host.port(),
1002                target: args.target.to_string(),
1003                destination: args.destination,
1004                forward_path: args.forward_path.clone(),
1005                return_path: args.return_path.clone(),
1006                mtu: SESSION_MTU,
1007                surb_len: SURB_SIZE,
1008                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
1009            }),
1010        )
1011            .into_response(),
1012    )
1013}
1014
1015/// Lists existing Session listeners for the given IP protocol.
1016#[utoipa::path(
1017    get,
1018    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
1019    description = "Lists existing Session listeners for the given IP protocol.",
1020    params(
1021        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1022    ),
1023    responses(
1024        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
1025            {
1026                "target": "example.com:80",
1027                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
1028                "forwardPath": { "Hops": 1 },
1029                "returnPath": { "Hops": 1 },
1030                "protocol": "tcp",
1031                "ip": "127.0.0.1",
1032                "port": 5542,
1033                "surbLen": 400,
1034                "mtu": 1020,
1035                "activeClients": []
1036            }
1037        ])),
1038        (status = 400, description = "Invalid IP protocol.", body = ApiError),
1039        (status = 401, description = "Invalid authorization token.", body = ApiError),
1040        (status = 422, description = "Unknown failure", body = ApiError)
1041    ),
1042    security(
1043        ("api_token" = []),
1044        ("bearer_token" = [])
1045    ),
1046    tag = "Session",
1047)]
1048pub(crate) async fn list_clients(
1049    State(state): State<Arc<InternalState>>,
1050    Path(protocol): Path<IpProtocol>,
1051) -> Result<impl IntoResponse, impl IntoResponse> {
1052    let response = state
1053        .open_listeners
1054        .read_arc()
1055        .await
1056        .iter()
1057        .filter(|(id, _)| id.0 == protocol.into())
1058        .map(|(id, entry)| SessionClientResponse {
1059            protocol,
1060            ip: id.1.ip().to_string(),
1061            port: id.1.port(),
1062            target: entry.target.to_string(),
1063            forward_path: entry.forward_path.clone(),
1064            return_path: entry.return_path.clone(),
1065            destination: entry.destination,
1066            mtu: SESSION_MTU,
1067            surb_len: SURB_SIZE,
1068            active_clients: entry.clients.iter().map(|e| e.key().to_string()).collect(),
1069        })
1070        .collect::<Vec<_>>();
1071
1072    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
1073}
1074
1075#[serde_as]
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
1077#[schema(example = json!({
1078        "responseBuffer": "2 MB",
1079        "maxSurbUpstream": "2 Mbps"
1080    }))]
1081#[serde(rename_all = "camelCase")]
1082pub(crate) struct SessionConfig {
1083    /// The amount of response data the Session counterparty can deliver back to us,
1084    /// without us sending any SURBs to them.
1085    ///
1086    /// In other words, this size is recalculated to a number of SURBs delivered
1087    /// to the counterparty upfront and then maintained.
1088    /// The maintenance is dynamic, based on the number of responses we receive.
1089    ///
1090    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
1091    /// at least the size of 2 Session packet payloads.
1092    #[serde(default)]
1093    #[serde_as(as = "Option<DisplayFromStr>")]
1094    #[schema(value_type = String)]
1095    pub response_buffer: Option<bytesize::ByteSize>,
1096    /// The maximum throughput at which artificial SURBs might be generated and sent
1097    /// to the recipient of the Session.
1098    ///
1099    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
1100    /// this should roughly match the maximum retrieval throughput.
1101    ///
1102    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
1103    #[serde(default)]
1104    #[serde(with = "human_bandwidth::option")]
1105    #[schema(value_type = String)]
1106    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
1107}
1108
1109impl From<SessionConfig> for Option<SurbBalancerConfig> {
1110    fn from(value: SessionConfig) -> Self {
1111        match value.response_buffer {
1112            // Buffer worth at least 2 reply packets
1113            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
1114                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
1115                max_surbs_per_sec: value
1116                    .max_surb_upstream
1117                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
1118                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
1119                ..Default::default()
1120            }),
1121            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
1122            Some(_) => None,
1123            // Use defaults otherwise
1124            None => Some(SurbBalancerConfig::default()),
1125        }
1126    }
1127}
1128
1129impl From<SurbBalancerConfig> for SessionConfig {
1130    fn from(value: SurbBalancerConfig) -> Self {
1131        Self {
1132            response_buffer: Some(bytesize::ByteSize::b(
1133                value.target_surb_buffer_size * SESSION_MTU as u64,
1134            )),
1135            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
1136                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
1137            )),
1138        }
1139    }
1140}
1141
1142#[utoipa::path(
1143    post,
1144    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1145    description = "Updates configuration of an existing active session.",
1146    params(
1147        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1148    ),
1149    request_body(
1150            content = SessionConfig,
1151            description = "Allows updating of several parameters of an existing active session.",
1152            content_type = "application/json"),
1153    responses(
1154            (status = 204, description = "Successfully updated the configuration"),
1155            (status = 400, description = "Invalid configuration.", body = ApiError),
1156            (status = 401, description = "Invalid authorization token.", body = ApiError),
1157            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1158            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
1159            (status = 422, description = "Unknown failure", body = ApiError),
1160    ),
1161    security(
1162            ("api_token" = []),
1163            ("bearer_token" = [])
1164    ),
1165    tag = "Session"
1166)]
1167pub(crate) async fn adjust_session(
1168    State(state): State<Arc<InternalState>>,
1169    Path(session_id): Path<String>,
1170    Json(args): Json<SessionConfig>,
1171) -> Result<impl IntoResponse, impl IntoResponse> {
1172    let session_id = HoprSessionId::from_str(&session_id)
1173        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1174
1175    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
1176        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
1177            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
1178            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
1179                SessionManagerError::NonExistingSession,
1180            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1181            Err(e) => Err((
1182                StatusCode::NOT_ACCEPTABLE,
1183                ApiErrorStatus::UnknownFailure(e.to_string()),
1184            )),
1185        }
1186    } else {
1187        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
1188    }
1189}
1190
1191#[utoipa::path(
1192    get,
1193    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1194    description = "Gets configuration of an existing active session.",
1195    params(
1196        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1197    ),
1198    responses(
1199            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
1200            (status = 400, description = "Invalid session ID.", body = ApiError),
1201            (status = 401, description = "Invalid authorization token.", body = ApiError),
1202            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1203            (status = 422, description = "Unknown failure", body = ApiError),
1204    ),
1205    security(
1206            ("api_token" = []),
1207            ("bearer_token" = [])
1208    ),
1209    tag = "Session"
1210)]
1211pub(crate) async fn session_config(
1212    State(state): State<Arc<InternalState>>,
1213    Path(session_id): Path<String>,
1214) -> Result<impl IntoResponse, impl IntoResponse> {
1215    let session_id = HoprSessionId::from_str(&session_id)
1216        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1217
1218    match state.hopr.get_session_surb_balancer_config(&session_id).await {
1219        Ok(Some(cfg)) => {
1220            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
1221        }
1222        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1223        Err(e) => Err((
1224            StatusCode::UNPROCESSABLE_ENTITY,
1225            ApiErrorStatus::UnknownFailure(e.to_string()),
1226        )),
1227    }
1228}
1229
1230#[derive(
1231    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
1232)]
1233#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
1234#[serde(rename_all = "lowercase")]
1235#[schema(example = "tcp")]
1236/// IP transport protocol
1237pub enum IpProtocol {
1238    #[allow(clippy::upper_case_acronyms)]
1239    TCP,
1240    #[allow(clippy::upper_case_acronyms)]
1241    UDP,
1242}
1243
1244impl From<IpProtocol> for hopr_lib::IpProtocol {
1245    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
1246        match protocol {
1247            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
1248            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
1249        }
1250    }
1251}
1252
1253#[serde_as]
1254#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
1255pub struct SessionCloseClientQuery {
1256    #[serde_as(as = "DisplayFromStr")]
1257    #[schema(value_type = String, example = "tcp")]
1258    /// IP transport protocol
1259    pub protocol: IpProtocol,
1260
1261    /// Listening IP address of the Session.
1262    #[schema(example = "127.0.0.1:8545")]
1263    pub ip: String,
1264
1265    /// Session port used for the listener.
1266    #[schema(value_type = u16, example = 10101)]
1267    pub port: u16,
1268}
1269
1270/// Closes an existing Session listener.
1271/// The listener must've been previously created and bound for the given IP protocol.
1272/// Once a listener is closed, no more socket connections can be made to it.
1273/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
1274/// will be closed.
1275#[utoipa::path(
1276    delete,
1277    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1278    description = "Closes an existing Session listener.",
1279    params(SessionCloseClientQuery),
1280    responses(
1281            (status = 204, description = "Listener closed successfully"),
1282            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1283            (status = 401, description = "Invalid authorization token.", body = ApiError),
1284            (status = 404, description = "Listener not found.", body = ApiError),
1285            (status = 422, description = "Unknown failure", body = ApiError)
1286    ),
1287    security(
1288            ("api_token" = []),
1289            ("bearer_token" = [])
1290    ),
1291    tag = "Session",
1292)]
1293pub(crate) async fn close_client(
1294    State(state): State<Arc<InternalState>>,
1295    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1296) -> Result<impl IntoResponse, impl IntoResponse> {
1297    let listening_ip: IpAddr = ip
1298        .parse()
1299        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1300
1301    {
1302        let mut open_listeners = state.open_listeners.write_arc().await;
1303
1304        let mut to_remove = Vec::new();
1305
1306        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1307        open_listeners
1308            .iter()
1309            .filter(|(ListenerId(proto, addr), _)| {
1310                let protocol: hopr_lib::IpProtocol = protocol.into();
1311                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1312            })
1313            .for_each(|(id, _)| to_remove.push(*id));
1314
1315        if to_remove.is_empty() {
1316            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1317        }
1318
1319        for bound_addr in to_remove {
1320            let entry = open_listeners
1321                .remove(&bound_addr)
1322                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1323
1324            entry.abort_handle.abort();
1325        }
1326    }
1327
1328    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1329}
1330
1331async fn try_restricted_bind<F, S, Fut>(
1332    addrs: Vec<std::net::SocketAddr>,
1333    range_str: &str,
1334    binder: F,
1335) -> std::io::Result<S>
1336where
1337    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1338    Fut: Future<Output = std::io::Result<S>>,
1339{
1340    if addrs.is_empty() {
1341        return Err(std::io::Error::other("no valid socket addresses found"));
1342    }
1343
1344    let range = range_str
1345        .split_once(":")
1346        .and_then(
1347            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1348                Ok((a, b)) if a <= b => Some(a..=b),
1349                _ => None,
1350            },
1351        )
1352        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1353
1354    for port in range {
1355        let addrs = addrs
1356            .iter()
1357            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1358            .collect::<Vec<_>>();
1359        match binder(addrs).await {
1360            Ok(listener) => return Ok(listener),
1361            Err(error) => debug!(%error, "listen address not usable"),
1362        }
1363    }
1364
1365    Err(std::io::Error::new(
1366        std::io::ErrorKind::AddrNotAvailable,
1367        format!("no valid socket addresses found within range: {range_str}"),
1368    ))
1369}
1370
1371async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1372    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1373
1374    // If automatic port allocation is requested and there's a restriction on the port range
1375    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1376    if addrs.iter().all(|a| a.port() == 0) {
1377        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1378            let tcp_listener =
1379                try_restricted_bind(
1380                    addrs,
1381                    &range_str,
1382                    |a| async move { TcpListener::bind(a.as_slice()).await },
1383                )
1384                .await?;
1385            return Ok((tcp_listener.local_addr()?, tcp_listener));
1386        }
1387    }
1388
1389    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1390    Ok((tcp_listener.local_addr()?, tcp_listener))
1391}
1392
1393async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1394    address: A,
1395) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1396    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1397
1398    let builder = ConnectedUdpStream::builder()
1399        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1400        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
1401        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1402        .with_receiver_parallelism(UdpStreamParallelism::Auto);
1403
1404    // If automatic port allocation is requested and there's a restriction on the port range
1405    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1406    if addrs.iter().all(|a| a.port() == 0) {
1407        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1408            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1409                futures::future::ready(builder.clone().build(addrs.as_slice()))
1410            })
1411            .await?;
1412
1413            return Ok((*udp_listener.bound_address(), udp_listener));
1414        }
1415    }
1416
1417    let udp_socket = builder.build(address)?;
1418    Ok((*udp_socket.bound_address(), udp_socket))
1419}
1420
1421async fn bind_session_to_stream<T>(
1422    mut session: HoprSession,
1423    mut stream: T,
1424    max_buf: usize,
1425    abort_reg: Option<AbortRegistration>,
1426) where
1427    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1428{
1429    let session_id = *session.id();
1430    match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
1431        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1432            session_id = ?session_id,
1433            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1434        ),
1435        Err(error) => error!(
1436            session_id = ?session_id,
1437            %error,
1438            "error during data transfer"
1439        ),
1440    }
1441}
1442
1443#[cfg(test)]
1444mod tests {
1445    use anyhow::Context;
1446    use futures::{
1447        FutureExt, StreamExt,
1448        channel::mpsc::{UnboundedReceiver, UnboundedSender},
1449    };
1450    use futures_time::future::FutureExt as TimeFutureExt;
1451    use hopr_crypto_types::crypto_traits::Randomizable;
1452    use hopr_lib::{ApplicationData, HoprPseudonym};
1453    use hopr_network_types::prelude::DestinationRouting;
1454    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1455
1456    use super::*;
1457
1458    fn loopback_transport() -> (
1459        UnboundedSender<(DestinationRouting, ApplicationData)>,
1460        UnboundedReceiver<Box<[u8]>>,
1461    ) {
1462        let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationData)>();
1463        let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
1464        tokio::task::spawn(
1465            input_rx
1466                .map(|(_, data)| Ok(data.plain_text))
1467                .forward(output_tx)
1468                .map(|e| tracing::debug!(?e, "loopback transport completed")),
1469        );
1470
1471        (input_tx, output_rx)
1472    }
1473
1474    #[tokio::test]
1475    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
1476    -> anyhow::Result<()> {
1477        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1478        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1479        let session = hopr_lib::HoprSession::new(
1480            session_id,
1481            hopr_lib::DestinationRouting::forward_only(
1482                peer,
1483                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1484            ),
1485            None,
1486            loopback_transport(),
1487            None,
1488        )?;
1489
1490        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;
1491
1492        tokio::task::spawn(async move {
1493            match tcp_listener.accept().await {
1494                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
1495                Err(e) => error!("failed to accept connection: {e}"),
1496            }
1497        });
1498
1499        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
1500            .await
1501            .context("connect failed")?;
1502
1503        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1504
1505        for d in data.clone().into_iter() {
1506            tcp_stream.write_all(d).await.context("write failed")?;
1507        }
1508
1509        for d in data.iter() {
1510            let mut buf = vec![0; d.len()];
1511            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
1512        }
1513
1514        Ok(())
1515    }
1516
1517    #[test_log::test(tokio::test)]
1518    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
1519    -> anyhow::Result<()> {
1520        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1521        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1522        let session = hopr_lib::HoprSession::new(
1523            session_id,
1524            hopr_lib::DestinationRouting::forward_only(
1525                peer,
1526                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1527            ),
1528            None,
1529            loopback_transport(),
1530            None,
1531        )?;
1532
1533        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;
1534
1535        let (abort_handle, abort_registration) = AbortHandle::new_pair();
1536        let jh = tokio::task::spawn(bind_session_to_stream(
1537            session,
1538            udp_listener,
1539            ApplicationData::PAYLOAD_SIZE,
1540            Some(abort_registration),
1541        ));
1542
1543        let mut udp_stream = ConnectedUdpStream::builder()
1544            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
1545            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1546            .with_counterparty(listen_addr)
1547            .build(("127.0.0.1", 0))
1548            .context("bind failed")?;
1549
1550        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1551
1552        for d in data.clone().into_iter() {
1553            udp_stream.write_all(d).await.context("write failed")?;
1554            // ConnectedUdpStream performs flush with each write
1555        }
1556
1557        for d in data.iter() {
1558            let mut buf = vec![0; d.len()];
1559            udp_stream.read_exact(&mut buf).await.context("read failed")?;
1560        }
1561
1562        // Once aborted, the bind_session_to_stream task must terminate too
1563        abort_handle.abort();
1564        jh.timeout(futures_time::time::Duration::from_millis(200)).await??;
1565
1566        Ok(())
1567    }
1568
1569    #[test]
1570    fn test_build_binding_address() {
1571        let default = "10.0.0.1:10000".parse().unwrap();
1572
1573        let result = build_binding_host(Some("127.0.0.1:10000"), default);
1574        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1575
1576        let result = build_binding_host(None, default);
1577        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1578
1579        let result = build_binding_host(Some("127.0.0.1"), default);
1580        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1581
1582        let result = build_binding_host(Some(":1234"), default);
1583        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
1584
1585        let result = build_binding_host(Some(":"), default);
1586        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1587
1588        let result = build_binding_host(Some(""), default);
1589        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1590    }
1591}