hoprd_api/
session.rs

1use std::{
2    collections::VecDeque,
3    fmt::Formatter,
4    future::Future,
5    hash::Hash,
6    net::{IpAddr, SocketAddr},
7    num::NonZeroUsize,
8    str::FromStr,
9    sync::Arc,
10};
11
12use axum::{
13    Error,
14    extract::{
15        Json, Path, State,
16        ws::{Message, WebSocket, WebSocketUpgrade},
17    },
18    http::status::StatusCode,
19    response::IntoResponse,
20};
21use axum_extra::extract::Query;
22use base64::Engine;
23use dashmap::DashMap;
24use futures::{
25    AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt, StreamExt, TryStreamExt,
26    future::{AbortHandle, AbortRegistration},
27};
28use futures_concurrency::stream::Merge;
29use hopr_lib::{
30    Address, Hopr, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId,
31    SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
32    TransportSessionError, errors::HoprLibError, transfer_session,
33};
34use hopr_network_types::{
35    prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
36    udp::ForeignDataMode,
37    utils::AsyncReadStreamer,
38};
39use serde::{Deserialize, Serialize};
40use serde_with::{DisplayFromStr, serde_as};
41use tokio::net::TcpListener;
42use tracing::{debug, error, info, trace};
43
44use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
45
46/// Size of the buffer for forwarding data to/from a TCP stream.
47pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
48
49/// Size of the buffer for forwarding data to/from a UDP stream.
50pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
51
52/// Size of the queue (back-pressure) for data incoming from a UDP stream.
53pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
54
55#[cfg(all(feature = "prometheus", not(test)))]
56lazy_static::lazy_static! {
57    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
58        "hopr_session_hoprd_clients",
59        "Number of clients connected at this Entry node",
60        &["type"]
61    ).unwrap();
62}
63
64// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
65#[allow(unused_imports)]
66use serde_json::json;
67
68#[serde_as]
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
70#[schema(
71    example = json!({"Plain": "example.com:80"}),
72    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
73    example = json!({"Service": 0})
74)]
75/// Session target specification.
76pub enum SessionTargetSpec {
77    Plain(String),
78    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
79    #[schema(value_type = u32)]
80    Service(ServiceId),
81}
82
83impl std::fmt::Display for SessionTargetSpec {
84    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85        match self {
86            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
87            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
88            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
89        }
90    }
91}
92
93impl FromStr for SessionTargetSpec {
94    type Err = HoprLibError;
95
96    fn from_str(s: &str) -> Result<Self, Self::Err> {
97        Ok(if let Some(stripped) = s.strip_prefix("$$") {
98            Self::Sealed(
99                base64::prelude::BASE64_URL_SAFE
100                    .decode(stripped)
101                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
102            )
103        } else if let Some(stripped) = s.strip_prefix("#") {
104            Self::Service(
105                stripped
106                    .parse()
107                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
108            )
109        } else {
110            Self::Plain(s.to_owned())
111        })
112    }
113}
114
115impl SessionTargetSpec {
116    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
117        Ok(match (protocol, self) {
118            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
119                IpOrHost::from_str(&plain)
120                    .map(SealedHost::from)
121                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
122            ),
123            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
124                IpOrHost::from_str(&plain)
125                    .map(SealedHost::from)
126                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
127            ),
128            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
129                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
130            }
131            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
132                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
133            }
134            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
135        })
136    }
137}
138
139/// Entry stored in the session registry table.
140#[derive(Debug)]
141pub struct StoredSessionEntry {
142    /// Destination address of the Session counterparty.
143    pub destination: Address,
144    /// Target of the Session.
145    pub target: SessionTargetSpec,
146    /// Forward path used for the Session.
147    pub forward_path: RoutingOptions,
148    /// Return path used for the Session.
149    pub return_path: RoutingOptions,
150    /// The maximum number of client sessions that the listener can spawn.
151    pub max_client_sessions: usize,
152    /// The maximum number of SURB packets that can be sent upstream.
153    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
154    /// The amount of response data the Session counterparty can deliver back to us, without us
155    /// having to request it.
156    pub response_buffer: Option<bytesize::ByteSize>,
157    /// How many Sessions to pool for clients.
158    pub session_pool: Option<usize>,
159    /// The abort handle for the Session processing.
160    pub abort_handle: AbortHandle,
161
162    clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
163}
164
165#[repr(u8)]
166#[derive(
167    Debug,
168    Clone,
169    strum::EnumIter,
170    strum::Display,
171    strum::EnumString,
172    Serialize,
173    Deserialize,
174    PartialEq,
175    utoipa::ToSchema,
176)]
177#[schema(example = "Segmentation")]
178/// Session capabilities that can be negotiated with the target peer.
179pub enum SessionCapability {
180    /// Frame segmentation
181    Segmentation,
182    /// Frame retransmission (ACK and NACK-based)
183    Retransmission,
184    /// Frame retransmission (only ACK-based)
185    RetransmissionAckOnly,
186    /// Disable packet buffering
187    NoDelay,
188    /// Disable SURB-based egress rate control at the Exit.
189    NoRateControl,
190}
191
192impl From<SessionCapability> for hopr_lib::SessionCapabilities {
193    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
194        match cap {
195            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
196            SessionCapability::Retransmission => {
197                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
198            }
199            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
200            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
201            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
202        }
203    }
204}
205
206#[serde_as]
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
208#[serde(rename_all = "camelCase")]
209pub(crate) struct SessionWebsocketClientQueryRequest {
210    #[serde_as(as = "DisplayFromStr")]
211    #[schema(required = true, value_type = String)]
212    pub destination: Address,
213    #[schema(required = true)]
214    pub hops: u8,
215    #[cfg(feature = "explicit-path")]
216    #[schema(required = false, value_type = String)]
217    pub path: Option<Vec<Address>>,
218    #[schema(required = true)]
219    #[serde_as(as = "Vec<DisplayFromStr>")]
220    pub capabilities: Vec<SessionCapability>,
221    #[schema(required = true)]
222    #[serde_as(as = "DisplayFromStr")]
223    pub target: SessionTargetSpec,
224    #[schema(required = false)]
225    #[serde(default = "default_protocol")]
226    pub protocol: IpProtocol,
227}
228
229#[inline]
230fn default_protocol() -> IpProtocol {
231    IpProtocol::TCP
232}
233
234impl SessionWebsocketClientQueryRequest {
235    pub(crate) async fn into_protocol_session_config(
236        self,
237    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
238        #[cfg(not(feature = "explicit-path"))]
239        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
240
241        #[cfg(feature = "explicit-path")]
242        let path_options = if let Some(path) = self.path {
243            // Explicit `path` will override `hops`
244            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
245        } else {
246            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
247        };
248
249        let mut capabilities = SessionCapabilities::empty();
250        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
251
252        Ok((
253            self.destination,
254            self.target.into_target(self.protocol)?,
255            SessionClientConfig {
256                forward_path_options: path_options.clone(),
257                return_path_options: path_options.clone(), // TODO: allow using separate return options
258                capabilities,
259                ..Default::default()
260            },
261        ))
262    }
263}
264
265#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
266#[schema(value_type = String, format = Binary)]
267#[allow(dead_code)] // not dead code, just for codegen
268struct WssData(Vec<u8>);
269
270/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
271/// sessions.
272///
273/// Once configured, the session represents and automatically managed connection to a target peer through a network
274/// routing configuration. The session can be used to send and receive binary data over the network.
275///
276/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
277///
278/// Connect to the endpoint by using a WS client. No preview is available. Example:
279/// `ws://127.0.0.1:3001/api/v4/session/websocket`
280#[allow(dead_code)] // not dead code, just for documentation
281#[utoipa::path(
282        get,
283        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
284        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
285        request_body(
286            content = SessionWebsocketClientQueryRequest,
287            content_type = "application/json",
288            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
289        ),
290        responses(
291            (status = 200, description = "Successfully created a new client websocket session."),
292            (status = 401, description = "Invalid authorization token.", body = ApiError),
293            (status = 422, description = "Unknown failure", body = ApiError),
294            (status = 429, description = "Too many open websocket connections.", body = ApiError),
295        ),
296        security(
297            ("api_token" = []),
298            ("bearer_token" = [])
299        ),
300        tag = "Session",
301    )]
302
303pub(crate) async fn websocket(
304    ws: WebSocketUpgrade,
305    Query(query): Query<SessionWebsocketClientQueryRequest>,
306    State(state): State<Arc<InternalState>>,
307) -> Result<impl IntoResponse, impl IntoResponse> {
308    let (dst, target, data) = query
309        .into_protocol_session_config()
310        .await
311        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
312
313    let hopr = state.hopr.clone();
314    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
315        error!(error = %e, "Failed to establish session");
316        (
317            StatusCode::UNPROCESSABLE_ENTITY,
318            ApiErrorStatus::UnknownFailure(e.to_string()),
319        )
320    })?;
321
322    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
323}
324
325enum WebSocketInput {
326    Network(Result<Box<[u8]>, std::io::Error>),
327    WsInput(Result<Message, Error>),
328}
329
330/// The maximum number of bytes read from a Session that WS can transfer within a single message.
331const WS_MAX_SESSION_READ_SIZE: usize = 4096;
332
333#[tracing::instrument(level = "debug", skip(socket, session))]
334async fn websocket_connection(socket: WebSocket, session: HoprSession) {
335    let session_id = *session.id();
336
337    let (rx, mut tx) = session.split();
338    let (mut sender, receiver) = socket.split();
339
340    let mut queue = (
341        receiver.map(WebSocketInput::WsInput),
342        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
343    )
344        .merge();
345
346    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
347
348    while let Some(v) = queue.next().await {
349        match v {
350            WebSocketInput::Network(bytes) => match bytes {
351                Ok(bytes) => {
352                    let len = bytes.len();
353                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
354                        error!(
355                            error = %e,
356                            "Failed to emit read data onto the websocket, closing connection"
357                        );
358                        break;
359                    };
360                    bytes_from_session += len;
361                }
362                Err(e) => {
363                    error!(
364                        error = %e,
365                        "Failed to push data from network to socket, closing connection"
366                    );
367                    break;
368                }
369            },
370            WebSocketInput::WsInput(ws_in) => match ws_in {
371                Ok(Message::Binary(data)) => {
372                    let len = data.len();
373                    if let Err(e) = tx.write(data.as_ref()).await {
374                        error!(error = %e, "Failed to write data to the session, closing connection");
375                        break;
376                    }
377                    bytes_to_session += len;
378                }
379                Ok(Message::Text(_)) => {
380                    error!("Received string instead of binary data, closing connection");
381                    break;
382                }
383                Ok(Message::Close(_)) => {
384                    debug!("Received close frame, closing connection");
385                    break;
386                }
387                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
388                Err(e) => {
389                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
390                    break;
391                }
392            },
393        }
394    }
395
396    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
397}
398
399#[serde_as]
400#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
401#[schema(example = json!({ "Hops": 1 }))]
402/// Routing options for the Session.
403pub enum RoutingOptions {
404    #[cfg(feature = "explicit-path")]
405    #[schema(value_type = Vec<String>)]
406    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
407    Hops(usize),
408}
409
410impl RoutingOptions {
411    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
412        Ok(match self {
413            #[cfg(feature = "explicit-path")]
414            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
415            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
416        })
417    }
418}
419
420#[serde_as]
421#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
422#[schema(example = json!({
423        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
424        "forwardPath": { "Hops": 1 },
425        "returnPath": { "Hops": 1 },
426        "target": {"Plain": "localhost:8080"},
427        "listenHost": "127.0.0.1:10000",
428        "capabilities": ["Retransmission", "Segmentation"],
429        "responseBuffer": "2 MB",
430        "maxSurbUpstream": "2000 kb/s",
431        "sessionPool": 0,
432        "maxClientSessions": 2
433    }))]
434#[serde(rename_all = "camelCase")]
435/// Request body for creating a new client session.
436pub(crate) struct SessionClientRequest {
437    /// Address of the Exit node.
438    #[serde_as(as = "DisplayFromStr")]
439    #[schema(value_type = String)]
440    pub destination: Address,
441    /// The forward path for the Session.
442    pub forward_path: RoutingOptions,
443    /// The return path for the Session.
444    pub return_path: RoutingOptions,
445    /// Target for the Session.
446    pub target: SessionTargetSpec,
447    /// Listen host (`ip:port`) for the Session socket at the Entry node.
448    ///
449    /// Supports also partial specification (only `ip` or only `:port`) with the
450    /// respective part replaced by the node's configured default.
451    pub listen_host: Option<String>,
452    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
453    /// Capabilities for the Session protocol.
454    ///
455    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
456    pub capabilities: Option<Vec<SessionCapability>>,
457    /// The amount of response data the Session counterparty can deliver back to us,
458    /// without us sending any SURBs to them.
459    ///
460    /// In other words, this size is recalculated to a number of SURBs delivered
461    /// to the counterparty upfront and then maintained.
462    /// The maintenance is dynamic, based on the number of responses we receive.
463    ///
464    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
465    /// at least the size of 2 Session packet payloads.
466    #[serde_as(as = "Option<DisplayFromStr>")]
467    #[schema(value_type = Option<String>)]
468    pub response_buffer: Option<bytesize::ByteSize>,
469    /// The maximum throughput at which artificial SURBs might be generated and sent
470    /// to the recipient of the Session.
471    ///
472    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
473    /// this should roughly match the maximum retrieval throughput.
474    ///
475    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
476    #[serde(default)]
477    #[serde(with = "human_bandwidth::option")]
478    #[schema(value_type = Option<String>)]
479    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
480    /// How many Sessions to pool for clients.
481    ///
482    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
483    /// It has no effect on UDP sessions in the current implementation.
484    ///
485    /// Currently, the maximum value is 5.
486    pub session_pool: Option<usize>,
487    /// The maximum number of client sessions that the listener can spawn.
488    ///
489    /// This currently applies only to the TCP sessions, as UDP sessions cannot
490    /// handle multiple clients (and spawn therefore always only a single session).
491    ///
492    /// If this value is smaller than the value specified in `session_pool`, it will
493    /// be set to that value.
494    ///
495    /// The default value is 5.
496    pub max_client_sessions: Option<usize>,
497}
498
499impl SessionClientRequest {
500    pub(crate) async fn into_protocol_session_config(
501        self,
502        target_protocol: IpProtocol,
503    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
504        Ok((
505            self.destination,
506            self.target.into_target(target_protocol)?,
507            SessionClientConfig {
508                forward_path_options: self.forward_path.resolve().await?,
509                return_path_options: self.return_path.resolve().await?,
510                capabilities: self
511                    .capabilities
512                    .map(|vs| {
513                        let mut caps = SessionCapabilities::empty();
514                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
515                        caps
516                    })
517                    .unwrap_or_else(|| match target_protocol {
518                        IpProtocol::TCP => {
519                            hopr_lib::SessionCapability::RetransmissionAck
520                                | hopr_lib::SessionCapability::RetransmissionNack
521                                | hopr_lib::SessionCapability::Segmentation
522                        }
523                        // Only Segmentation capability for UDP per default
524                        _ => SessionCapability::Segmentation.into(),
525                    }),
526                surb_management: SessionConfig {
527                    response_buffer: self.response_buffer,
528                    max_surb_upstream: self.max_surb_upstream,
529                }
530                .into(),
531                ..Default::default()
532            },
533        ))
534    }
535}
536
537#[serde_as]
538#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
539#[schema(example = json!({
540        "target": "example.com:80",
541        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
542        "forwardPath": { "Hops": 1 },
543        "returnPath": { "Hops": 1 },
544        "protocol": "tcp",
545        "ip": "127.0.0.1",
546        "port": 5542,
547        "hoprMtu": 1002,
548        "surbLen": 398,
549        "activeClients": [],
550        "maxClientSessions": 2,
551        "maxSurbUpstream": "2000 kb/s",
552        "responseBuffer": "2 MB",
553        "sessionPool": 0
554    }))]
555#[serde(rename_all = "camelCase")]
556/// Response body for creating a new client session.
557pub(crate) struct SessionClientResponse {
558    #[schema(example = "example.com:80")]
559    /// Target of the Session.
560    pub target: String,
561    /// Destination node (exit node) of the Session.
562    #[serde_as(as = "DisplayFromStr")]
563    #[schema(value_type = String)]
564    pub destination: Address,
565    /// Forward routing path.
566    pub forward_path: RoutingOptions,
567    /// Return routing path.
568    pub return_path: RoutingOptions,
569    /// IP protocol used by Session's listening socket.
570    #[serde_as(as = "DisplayFromStr")]
571    #[schema(example = "tcp")]
572    pub protocol: IpProtocol,
573    /// Listening IP address of the Session's socket.
574    #[schema(example = "127.0.0.1")]
575    pub ip: String,
576    #[schema(example = 5542)]
577    /// Listening port of the Session's socket.
578    pub port: u16,
579    /// MTU used by the underlying HOPR transport.
580    pub hopr_mtu: usize,
581    /// Size of a Single Use Reply Block used by the protocol.
582    ///
583    /// This is useful for SURB balancing calculations.
584    pub surb_len: usize,
585    /// Lists Session IDs of all active clients.
586    ///
587    /// Can contain multiple entries on TCP sessions, but currently
588    /// always only a single entry on UDP sessions.
589    pub active_clients: Vec<String>,
590    /// The maximum number of client sessions that the listener can spawn.
591    ///
592    /// This currently applies only to the TCP sessions, as UDP sessions cannot
593    /// have multiple clients (defaults to 1 for UDP).
594    pub max_client_sessions: usize,
595    /// The maximum throughput at which artificial SURBs might be generated and sent
596    /// to the recipient of the Session.    
597    #[serde(default)]
598    #[serde(with = "human_bandwidth::option")]
599    #[schema(value_type = Option<String>)]
600    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
601    /// The amount of response data the Session counterparty can deliver back to us, without us
602    /// sending any SURBs to them.
603    #[serde_as(as = "Option<DisplayFromStr>")]
604    #[schema(value_type = Option<String>)]
605    pub response_buffer: Option<bytesize::ByteSize>,
606    /// How many Sessions to pool for clients.
607    pub session_pool: Option<usize>,
608}
609
610/// This function first tries to parse `requested` as the `ip:port` host pair.
611/// If that does not work, it tries to parse `requested` as a single IP address
612/// and as a `:` prefixed port number. Whichever of those fails, is replaced by the corresponding
613/// part from the given `default`.
614fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
615    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
616        Some(Err(requested)) => {
617            // If the requested host is not parseable as a whole as `SocketAddr`, try only its parts
618            debug!(requested, %default, "using partially default listen host");
619            std::net::SocketAddr::new(
620                requested.parse().unwrap_or(default.ip()),
621                requested
622                    .strip_prefix(":")
623                    .and_then(|p| u16::from_str(p).ok())
624                    .unwrap_or(default.port()),
625            )
626        }
627        Some(Ok(requested)) => {
628            debug!(%requested, "using requested listen host");
629            requested
630        }
631        None => {
632            debug!(%default, "using default listen host");
633            default
634        }
635    }
636}
637
638struct SessionPool {
639    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
640    ah: Option<AbortHandle>,
641}
642
643impl SessionPool {
644    pub const MAX_SESSION_POOL_SIZE: usize = 5;
645
646    async fn new(
647        size: usize,
648        dst: Address,
649        target: SessionTarget,
650        cfg: SessionClientConfig,
651        hopr: Arc<Hopr>,
652    ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
653        let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
654        let hopr_clone = hopr.clone();
655        let pool_clone = pool.clone();
656        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
657            .map(Ok)
658            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
659                let pool = pool_clone.clone();
660                let hopr = hopr_clone.clone();
661                let target = target.clone();
662                let cfg = cfg.clone();
663                async move {
664                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
665                        Ok(s) => {
666                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
667                            pool.lock()
668                                .map_err(|_| {
669                                    (
670                                        StatusCode::INTERNAL_SERVER_ERROR,
671                                        ApiErrorStatus::UnknownFailure("lock failed".into()),
672                                    )
673                                })?
674                                .push_back(s);
675                            Ok(())
676                        }
677                        Err(error) => {
678                            error!(%error, num_session = i, "failed to establish session for pool");
679                            Err((
680                                StatusCode::INTERNAL_SERVER_ERROR,
681                                ApiErrorStatus::UnknownFailure(format!(
682                                    "failed to establish session #{i} in pool to {dst}: {error}"
683                                )),
684                            ))
685                        }
686                    }
687                }
688            })
689            .await?;
690
691        // Spawn a task that periodically sends keep alive messages to the Session in the pool.
692        if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
693            let pool_clone_1 = pool.clone();
694            let pool_clone_2 = pool.clone();
695            let pool_clone_3 = pool.clone();
696            Ok(Self {
697                pool: Some(pool),
698                ah: Some(hopr_async_runtime::spawn_as_abortable!(
699                    futures_time::stream::interval(futures_time::time::Duration::from(
700                        std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
701                    ))
702                    .take_while(move |_| {
703                        // Continue the infinite interval stream until there are sessions in the pool
704                        futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
705                    })
706                    .flat_map(move |_| {
707                        // Get all SessionIds of the remaining Sessions in the pool
708                        let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
709                        futures::stream::iter(ids.into_iter().flatten())
710                    })
711                    .for_each(move |id| {
712                        let hopr = hopr.clone();
713                        let pool = pool_clone_3.clone();
714                        async move {
715                            // Make sure the Session is still alive, otherwise remove it from the pool
716                            if let Err(error) = hopr.keep_alive_session(&id).await {
717                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
718                                if let Ok(mut pool) = pool.lock() {
719                                    pool.retain(|s| *s.id() != id);
720                                }
721                            }
722                        }
723                    })
724                ))
725            })
726        } else {
727            Ok(Self { pool: None, ah: None })
728        }
729    }
730
731    fn pop(&mut self) -> Option<HoprSession> {
732        self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
733    }
734}
735
736impl Drop for SessionPool {
737    fn drop(&mut self) {
738        if let Some(ah) = self.ah.take() {
739            ah.abort();
740        }
741    }
742}
743
744async fn create_tcp_client_binding(
745    bind_host: std::net::SocketAddr,
746    state: Arc<InternalState>,
747    args: SessionClientRequest,
748) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), (StatusCode, ApiErrorStatus)> {
749    let target_spec = args.target.clone();
750    let (dst, target, data) = args
751        .clone()
752        .into_protocol_session_config(IpProtocol::TCP)
753        .await
754        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
755
756    // Bind the TCP socket first
757    let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
758        if e.kind() == std::io::ErrorKind::AddrInUse {
759            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
760        } else {
761            (
762                StatusCode::UNPROCESSABLE_ENTITY,
763                ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
764            )
765        }
766    })?;
767    info!(%bound_host, "TCP session listener bound");
768
769    // For each new TCP connection coming to the listener,
770    // open a Session with the same parameters
771    let hopr = state.hopr.clone();
772
773    // Create a session pool if requested
774    let session_pool_size = args.session_pool.unwrap_or(0);
775    let mut session_pool = SessionPool::new(session_pool_size, dst, target.clone(), data.clone(), hopr.clone()).await?;
776
777    let active_sessions = Arc::new(DashMap::new());
778    let mut max_clients = args.max_client_sessions.unwrap_or(5).max(1);
779
780    if max_clients < session_pool_size {
781        max_clients = session_pool_size;
782    }
783
784    // Create an abort handler for the listener
785    let (abort_handle, abort_reg) = AbortHandle::new_pair();
786    let active_sessions_clone = active_sessions.clone();
787    hopr_async_runtime::prelude::spawn(async move {
788        let active_sessions_clone_2 = active_sessions_clone.clone();
789
790        futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
791            .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
792            .for_each(move |accepted_client| {
793                let data = data.clone();
794                let target = target.clone();
795                let hopr = hopr.clone();
796                let active_sessions = active_sessions_clone_2.clone();
797
798                // Try to pop from the pool only if a client was accepted
799                let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
800                async move {
801                    match accepted_client {
802                        Ok((sock_addr, mut stream)) => {
803                            debug!(?sock_addr, "incoming TCP connection");
804
805                            // Check that we are still within the quota,
806                            // otherwise shutdown the new client immediately
807                            if active_sessions.len() >= max_clients {
808                                error!(?bind_host, "no more client slots available at listener");
809                                use tokio::io::AsyncWriteExt;
810                                if let Err(error) = stream.shutdown().await {
811                                    error!(%error, ?sock_addr, "failed to shutdown TCP connection");
812                                }
813                                return;
814                            }
815
816                            // See if we still have some session pooled
817                            let session = match maybe_pooled_session {
818                                Some(s) => {
819                                    debug!(session_id = %s.id(), "using pooled session");
820                                    s
821                                }
822                                None => {
823                                    debug!("no more active sessions in the pool, creating a new one");
824                                    match hopr.connect_to(dst, target, data).await {
825                                        Ok(s) => s,
826                                        Err(error) => {
827                                            error!(%error, "failed to establish session");
828                                            return;
829                                        }
830                                    }
831                                }
832                            };
833
834                            let session_id = *session.id();
835                            debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
836
837                            let (abort_handle, abort_reg) = AbortHandle::new_pair();
838                            active_sessions.insert(session_id, (sock_addr, abort_handle));
839
840                            #[cfg(all(feature = "prometheus", not(test)))]
841                            METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
842
843                            hopr_async_runtime::prelude::spawn(
844                                // The stream either terminates naturally (by the client closing the TCP connection)
845                                // or is terminated via the abort handle.
846                                bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
847                                    move |_| async move {
848                                        // Regardless how the session ended, remove the abort handle
849                                        // from the map
850                                        active_sessions.remove(&session_id);
851
852                                        debug!(%session_id, "tcp session has ended");
853
854                                        #[cfg(all(feature = "prometheus", not(test)))]
855                                        METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
856                                    },
857                                ),
858                            );
859                        }
860                        Err(error) => error!(%error, "failed to accept connection"),
861                    }
862                }
863            })
864            .await;
865
866        // Once the listener is done, abort all active sessions created by the listener
867        active_sessions_clone.iter().for_each(|entry| {
868            let (sock_addr, handle) = entry.value();
869            debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
870            handle.abort()
871        });
872    });
873
874    state.open_listeners.write_arc().await.insert(
875        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
876        StoredSessionEntry {
877            destination: dst,
878            target: target_spec.clone(),
879            forward_path: args.forward_path.clone(),
880            return_path: args.return_path.clone(),
881            clients: active_sessions,
882            max_client_sessions: max_clients,
883            max_surb_upstream: args.max_surb_upstream,
884            response_buffer: args.response_buffer,
885            session_pool: Some(session_pool_size),
886            abort_handle,
887        },
888    );
889    Ok((bound_host, None, max_clients))
890}
891
892async fn create_udp_client_binding(
893    bind_host: std::net::SocketAddr,
894    state: Arc<InternalState>,
895    args: SessionClientRequest,
896) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), (StatusCode, ApiErrorStatus)> {
897    let target_spec = args.target.clone();
898    let (dst, target, data) = args
899        .clone()
900        .into_protocol_session_config(IpProtocol::UDP)
901        .await
902        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
903
904    // Bind the UDP socket first
905    let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
906        if e.kind() == std::io::ErrorKind::AddrInUse {
907            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
908        } else {
909            (
910                StatusCode::UNPROCESSABLE_ENTITY,
911                ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
912            )
913        }
914    })?;
915
916    info!(%bound_host, "UDP session listener bound");
917
918    let hopr = state.hopr.clone();
919
920    // Create a single session for the UDP socket
921    let session = hopr.connect_to(dst, target, data.clone()).await.map_err(|e| {
922        (
923            StatusCode::UNPROCESSABLE_ENTITY,
924            ApiErrorStatus::UnknownFailure(e.to_string()),
925        )
926    })?;
927
928    let open_listeners_clone = state.open_listeners.clone();
929    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
930
931    // Create an abort handle so that the Session can be terminated by aborting
932    // the UDP stream first. Because under the hood, the bind_session_to_stream uses
933    // `transfer_session` which in turn uses `copy_duplex_abortable`, aborting the
934    // `udp_socket` will:
935    //
936    // 1. Initiate graceful shutdown of `udp_socket`
937    // 2. Once done, initiate a graceful shutdown of `session`
938    // 3. Finally, return from the `bind_session_to_stream` which will terminate the spawned task
939    //
940    // This is needed because the `udp_socket` cannot terminate by itself.
941    let (abort_handle, abort_reg) = AbortHandle::new_pair();
942    let clients = Arc::new(DashMap::new());
943    let max_clients: usize = 1; // Maximum number of clients for this session. Currently always 1.
944
945    // TODO: add multiple client support to UDP sessions (#7370)
946    let session_id = *session.id();
947    clients.insert(session_id, (bind_host, abort_handle.clone()));
948    hopr_async_runtime::prelude::spawn(async move {
949        #[cfg(all(feature = "prometheus", not(test)))]
950        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
951
952        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
953
954        #[cfg(all(feature = "prometheus", not(test)))]
955        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
956
957        // Once the Session closes, remove it from the list
958        open_listeners_clone.write_arc().await.remove(&listener_id);
959    });
960
961    state.open_listeners.write_arc().await.insert(
962        listener_id,
963        StoredSessionEntry {
964            destination: dst,
965            target: target_spec.clone(),
966            forward_path: args.forward_path.clone(),
967            return_path: args.return_path.clone(),
968            max_client_sessions: max_clients,
969            max_surb_upstream: args.max_surb_upstream,
970            response_buffer: args.response_buffer,
971            session_pool: None,
972            abort_handle,
973            clients,
974        },
975    );
976    Ok((bound_host, Some(session_id), max_clients))
977}
978
979/// Creates a new client session returning the given session listening host and port over TCP or UDP.
980/// If no listening port is given in the request, the socket will be bound to a random free
981/// port and returned in the response.
982/// Different capabilities can be configured for the session, such as data segmentation or
983/// retransmission.
984///
985/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
986/// communication over the selected IP protocol and HOPR network routing with the given destination.
987/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
988///
989/// Various services require different types of socket communications:
990/// - services running over UDP usually do not require data retransmission, as it is already expected
991/// that UDP does not provide these and is therefore handled at the application layer.
992/// - On the contrary, services running over TCP *almost always* expect data segmentation and
993/// retransmission capabilities, so these should be configured while creating a session that passes
994/// TCP data.
995#[utoipa::path(
996        post,
997        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
998        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
999        params(
1000            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1001        ),
1002        request_body(
1003            content = SessionClientRequest,
1004            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
1005            content_type = "application/json"),
1006        responses(
1007            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
1008            (status = 400, description = "Invalid IP protocol.", body = ApiError),
1009            (status = 401, description = "Invalid authorization token.", body = ApiError),
1010            (status = 409, description = "Listening address and port already in use.", body = ApiError),
1011            (status = 422, description = "Unknown failure", body = ApiError),
1012        ),
1013        security(
1014            ("api_token" = []),
1015            ("bearer_token" = [])
1016        ),
1017        tag = "Session"
1018    )]
1019pub(crate) async fn create_client(
1020    State(state): State<Arc<InternalState>>,
1021    Path(protocol): Path<IpProtocol>,
1022    Json(args): Json<SessionClientRequest>,
1023) -> Result<impl IntoResponse, impl IntoResponse> {
1024    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
1025
1026    let listener_id = ListenerId(protocol.into(), bind_host);
1027    if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
1028        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
1029    }
1030
1031    debug!("binding {protocol} session listening socket to {bind_host}");
1032    let (bound_host, udp_session_id, max_clients) = match protocol {
1033        IpProtocol::TCP => create_tcp_client_binding(bind_host, state.clone(), args.clone()).await?,
1034        IpProtocol::UDP => create_udp_client_binding(bind_host, state.clone(), args.clone()).await?,
1035    };
1036
1037    Ok::<_, (StatusCode, ApiErrorStatus)>(
1038        (
1039            StatusCode::OK,
1040            Json(SessionClientResponse {
1041                protocol,
1042                ip: bound_host.ip().to_string(),
1043                port: bound_host.port(),
1044                target: args.target.to_string(),
1045                destination: args.destination,
1046                forward_path: args.forward_path.clone(),
1047                return_path: args.return_path.clone(),
1048                hopr_mtu: SESSION_MTU,
1049                surb_len: SURB_SIZE,
1050                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
1051                max_client_sessions: max_clients,
1052                max_surb_upstream: args.max_surb_upstream,
1053                response_buffer: args.response_buffer,
1054                session_pool: args.session_pool,
1055            }),
1056        )
1057            .into_response(),
1058    )
1059}
1060
1061/// Lists existing Session listeners for the given IP protocol.
1062#[utoipa::path(
1063    get,
1064    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
1065    description = "Lists existing Session listeners for the given IP protocol.",
1066    params(
1067        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1068    ),
1069    responses(
1070        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
1071            {
1072                "target": "example.com:80",
1073                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
1074                "forwardPath": { "Hops": 1 },
1075                "returnPath": { "Hops": 1 },
1076                "protocol": "tcp",
1077                "ip": "127.0.0.1",
1078                "port": 5542,
1079                "surbLen": 400,
1080                "hoprMtu": 1020,
1081                "activeClients": [],
1082                "maxClientSessions": 2,
1083                "maxSurbUpstream": "2000 kb/s",
1084                "responseBuffer": "2 MB",
1085                "sessionPool": 0
1086            }
1087        ])),
1088        (status = 400, description = "Invalid IP protocol.", body = ApiError),
1089        (status = 401, description = "Invalid authorization token.", body = ApiError),
1090        (status = 422, description = "Unknown failure", body = ApiError)
1091    ),
1092    security(
1093        ("api_token" = []),
1094        ("bearer_token" = [])
1095    ),
1096    tag = "Session",
1097)]
1098pub(crate) async fn list_clients(
1099    State(state): State<Arc<InternalState>>,
1100    Path(protocol): Path<IpProtocol>,
1101) -> Result<impl IntoResponse, impl IntoResponse> {
1102    let response = state
1103        .open_listeners
1104        .read_arc()
1105        .await
1106        .iter()
1107        .filter(|(id, _)| id.0 == protocol.into())
1108        .map(|(id, entry)| SessionClientResponse {
1109            protocol,
1110            ip: id.1.ip().to_string(),
1111            port: id.1.port(),
1112            target: entry.target.to_string(),
1113            forward_path: entry.forward_path.clone(),
1114            return_path: entry.return_path.clone(),
1115            destination: entry.destination,
1116            hopr_mtu: SESSION_MTU,
1117            surb_len: SURB_SIZE,
1118            active_clients: entry.clients.iter().map(|e| e.key().to_string()).collect(),
1119            max_client_sessions: entry.max_client_sessions,
1120            max_surb_upstream: entry.max_surb_upstream,
1121            response_buffer: entry.response_buffer,
1122            session_pool: entry.session_pool,
1123        })
1124        .collect::<Vec<_>>();
1125
1126    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
1127}
1128
1129#[serde_as]
1130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
1131#[schema(example = json!({
1132        "responseBuffer": "2 MB",
1133        "maxSurbUpstream": "2 Mbps"
1134    }))]
1135#[serde(rename_all = "camelCase")]
1136pub(crate) struct SessionConfig {
1137    /// The amount of response data the Session counterparty can deliver back to us,
1138    /// without us sending any SURBs to them.
1139    ///
1140    /// In other words, this size is recalculated to a number of SURBs delivered
1141    /// to the counterparty upfront and then maintained.
1142    /// The maintenance is dynamic, based on the number of responses we receive.
1143    ///
1144    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
1145    /// at least the size of 2 Session packet payloads.
1146    #[serde(default)]
1147    #[serde_as(as = "Option<DisplayFromStr>")]
1148    #[schema(value_type = String)]
1149    pub response_buffer: Option<bytesize::ByteSize>,
1150    /// The maximum throughput at which artificial SURBs might be generated and sent
1151    /// to the recipient of the Session.
1152    ///
1153    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
1154    /// this should roughly match the maximum retrieval throughput.
1155    ///
1156    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
1157    #[serde(default)]
1158    #[serde(with = "human_bandwidth::option")]
1159    #[schema(value_type = String)]
1160    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
1161}
1162
1163impl From<SessionConfig> for Option<SurbBalancerConfig> {
1164    fn from(value: SessionConfig) -> Self {
1165        match value.response_buffer {
1166            // Buffer worth at least 2 reply packets
1167            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
1168                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
1169                max_surbs_per_sec: value
1170                    .max_surb_upstream
1171                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
1172                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
1173                ..Default::default()
1174            }),
1175            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
1176            Some(_) => None,
1177            // Use defaults otherwise
1178            None => Some(SurbBalancerConfig::default()),
1179        }
1180    }
1181}
1182
1183impl From<SurbBalancerConfig> for SessionConfig {
1184    fn from(value: SurbBalancerConfig) -> Self {
1185        Self {
1186            response_buffer: Some(bytesize::ByteSize::b(
1187                value.target_surb_buffer_size * SESSION_MTU as u64,
1188            )),
1189            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
1190                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
1191            )),
1192        }
1193    }
1194}
1195
1196#[utoipa::path(
1197    post,
1198    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1199    description = "Updates configuration of an existing active session.",
1200    params(
1201        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1202    ),
1203    request_body(
1204            content = SessionConfig,
1205            description = "Allows updating of several parameters of an existing active session.",
1206            content_type = "application/json"),
1207    responses(
1208            (status = 204, description = "Successfully updated the configuration"),
1209            (status = 400, description = "Invalid configuration.", body = ApiError),
1210            (status = 401, description = "Invalid authorization token.", body = ApiError),
1211            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1212            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
1213            (status = 422, description = "Unknown failure", body = ApiError),
1214    ),
1215    security(
1216            ("api_token" = []),
1217            ("bearer_token" = [])
1218    ),
1219    tag = "Session"
1220)]
1221pub(crate) async fn adjust_session(
1222    State(state): State<Arc<InternalState>>,
1223    Path(session_id): Path<String>,
1224    Json(args): Json<SessionConfig>,
1225) -> Result<impl IntoResponse, impl IntoResponse> {
1226    let session_id = HoprSessionId::from_str(&session_id)
1227        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1228
1229    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
1230        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
1231            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
1232            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
1233                SessionManagerError::NonExistingSession,
1234            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1235            Err(e) => Err((
1236                StatusCode::NOT_ACCEPTABLE,
1237                ApiErrorStatus::UnknownFailure(e.to_string()),
1238            )),
1239        }
1240    } else {
1241        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
1242    }
1243}
1244
1245#[utoipa::path(
1246    get,
1247    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1248    description = "Gets configuration of an existing active session.",
1249    params(
1250        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1251    ),
1252    responses(
1253            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
1254            (status = 400, description = "Invalid session ID.", body = ApiError),
1255            (status = 401, description = "Invalid authorization token.", body = ApiError),
1256            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1257            (status = 422, description = "Unknown failure", body = ApiError),
1258    ),
1259    security(
1260            ("api_token" = []),
1261            ("bearer_token" = [])
1262    ),
1263    tag = "Session"
1264)]
1265pub(crate) async fn session_config(
1266    State(state): State<Arc<InternalState>>,
1267    Path(session_id): Path<String>,
1268) -> Result<impl IntoResponse, impl IntoResponse> {
1269    let session_id = HoprSessionId::from_str(&session_id)
1270        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1271
1272    match state.hopr.get_session_surb_balancer_config(&session_id).await {
1273        Ok(Some(cfg)) => {
1274            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
1275        }
1276        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1277        Err(e) => Err((
1278            StatusCode::UNPROCESSABLE_ENTITY,
1279            ApiErrorStatus::UnknownFailure(e.to_string()),
1280        )),
1281    }
1282}
1283
1284#[derive(
1285    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
1286)]
1287#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
1288#[serde(rename_all = "lowercase")]
1289#[schema(example = "tcp")]
1290/// IP transport protocol
1291pub enum IpProtocol {
1292    #[allow(clippy::upper_case_acronyms)]
1293    TCP,
1294    #[allow(clippy::upper_case_acronyms)]
1295    UDP,
1296}
1297
1298impl From<IpProtocol> for hopr_lib::IpProtocol {
1299    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
1300        match protocol {
1301            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
1302            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
1303        }
1304    }
1305}
1306
1307#[serde_as]
1308#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
1309pub struct SessionCloseClientQuery {
1310    #[serde_as(as = "DisplayFromStr")]
1311    #[schema(value_type = String, example = "tcp")]
1312    /// IP transport protocol
1313    pub protocol: IpProtocol,
1314
1315    /// Listening IP address of the Session.
1316    #[schema(example = "127.0.0.1:8545")]
1317    pub ip: String,
1318
1319    /// Session port used for the listener.
1320    #[schema(value_type = u16, example = 10101)]
1321    pub port: u16,
1322}
1323
1324/// Closes an existing Session listener.
1325/// The listener must've been previously created and bound for the given IP protocol.
1326/// Once a listener is closed, no more socket connections can be made to it.
1327/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
1328/// will be closed.
1329#[utoipa::path(
1330    delete,
1331    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1332    description = "Closes an existing Session listener.",
1333    params(SessionCloseClientQuery),
1334    responses(
1335            (status = 204, description = "Listener closed successfully"),
1336            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1337            (status = 401, description = "Invalid authorization token.", body = ApiError),
1338            (status = 404, description = "Listener not found.", body = ApiError),
1339            (status = 422, description = "Unknown failure", body = ApiError)
1340    ),
1341    security(
1342            ("api_token" = []),
1343            ("bearer_token" = [])
1344    ),
1345    tag = "Session",
1346)]
1347pub(crate) async fn close_client(
1348    State(state): State<Arc<InternalState>>,
1349    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1350) -> Result<impl IntoResponse, impl IntoResponse> {
1351    let listening_ip: IpAddr = ip
1352        .parse()
1353        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1354
1355    {
1356        let mut open_listeners = state.open_listeners.write_arc().await;
1357
1358        let mut to_remove = Vec::new();
1359
1360        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1361        open_listeners
1362            .iter()
1363            .filter(|(ListenerId(proto, addr), _)| {
1364                let protocol: hopr_lib::IpProtocol = protocol.into();
1365                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1366            })
1367            .for_each(|(id, _)| to_remove.push(*id));
1368
1369        if to_remove.is_empty() {
1370            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1371        }
1372
1373        for bound_addr in to_remove {
1374            let entry = open_listeners
1375                .remove(&bound_addr)
1376                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1377
1378            entry.abort_handle.abort();
1379        }
1380    }
1381
1382    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1383}
1384
1385async fn try_restricted_bind<F, S, Fut>(
1386    addrs: Vec<std::net::SocketAddr>,
1387    range_str: &str,
1388    binder: F,
1389) -> std::io::Result<S>
1390where
1391    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1392    Fut: Future<Output = std::io::Result<S>>,
1393{
1394    if addrs.is_empty() {
1395        return Err(std::io::Error::other("no valid socket addresses found"));
1396    }
1397
1398    let range = range_str
1399        .split_once(":")
1400        .and_then(
1401            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1402                Ok((a, b)) if a <= b => Some(a..=b),
1403                _ => None,
1404            },
1405        )
1406        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1407
1408    for port in range {
1409        let addrs = addrs
1410            .iter()
1411            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1412            .collect::<Vec<_>>();
1413        match binder(addrs).await {
1414            Ok(listener) => return Ok(listener),
1415            Err(error) => debug!(%error, "listen address not usable"),
1416        }
1417    }
1418
1419    Err(std::io::Error::new(
1420        std::io::ErrorKind::AddrNotAvailable,
1421        format!("no valid socket addresses found within range: {range_str}"),
1422    ))
1423}
1424
1425async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1426    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1427
1428    // If automatic port allocation is requested and there's a restriction on the port range
1429    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1430    if addrs.iter().all(|a| a.port() == 0) {
1431        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1432            let tcp_listener =
1433                try_restricted_bind(
1434                    addrs,
1435                    &range_str,
1436                    |a| async move { TcpListener::bind(a.as_slice()).await },
1437                )
1438                .await?;
1439            return Ok((tcp_listener.local_addr()?, tcp_listener));
1440        }
1441    }
1442
1443    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1444    Ok((tcp_listener.local_addr()?, tcp_listener))
1445}
1446
1447async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1448    address: A,
1449) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1450    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1451
1452    let builder = ConnectedUdpStream::builder()
1453        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1454        .with_foreign_data_mode(ForeignDataMode::Discard) // discard data from UDP clients other than the first one served
1455        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1456        .with_receiver_parallelism(
1457            std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
1458                .ok()
1459                .and_then(|s| s.parse::<NonZeroUsize>().ok())
1460                .map(UdpStreamParallelism::Specific)
1461                .unwrap_or(UdpStreamParallelism::Auto),
1462        );
1463
1464    // If automatic port allocation is requested and there's a restriction on the port range
1465    // (via HOPRD_SESSION_PORT_RANGE), try to find an address within that range.
1466    if addrs.iter().all(|a| a.port() == 0) {
1467        if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1468            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1469                futures::future::ready(builder.clone().build(addrs.as_slice()))
1470            })
1471            .await?;
1472
1473            return Ok((*udp_listener.bound_address(), udp_listener));
1474        }
1475    }
1476
1477    let udp_socket = builder.build(address)?;
1478    Ok((*udp_socket.bound_address(), udp_socket))
1479}
1480
1481async fn bind_session_to_stream<T>(
1482    mut session: HoprSession,
1483    mut stream: T,
1484    max_buf: usize,
1485    abort_reg: Option<AbortRegistration>,
1486) where
1487    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1488{
1489    let session_id = *session.id();
1490    match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
1491        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1492            session_id = ?session_id,
1493            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1494        ),
1495        Err(error) => error!(
1496            session_id = ?session_id,
1497            %error,
1498            "error during data transfer"
1499        ),
1500    }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505    use anyhow::Context;
1506    use futures::{
1507        FutureExt, StreamExt,
1508        channel::mpsc::{UnboundedReceiver, UnboundedSender},
1509    };
1510    use futures_time::future::FutureExt as TimeFutureExt;
1511    use hopr_crypto_types::crypto_traits::Randomizable;
1512    use hopr_lib::{ApplicationData, ApplicationDataIn, ApplicationDataOut, HoprPseudonym};
1513    use hopr_network_types::prelude::DestinationRouting;
1514    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1515
1516    use super::*;
1517
1518    fn loopback_transport() -> (
1519        UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1520        UnboundedReceiver<ApplicationDataIn>,
1521    ) {
1522        let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
1523        let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1524        tokio::task::spawn(
1525            input_rx
1526                .map(|(_, data)| {
1527                    Ok(ApplicationDataIn {
1528                        data: data.data,
1529                        packet_info: Default::default(),
1530                    })
1531                })
1532                .forward(output_tx)
1533                .map(|e| tracing::debug!(?e, "loopback transport completed")),
1534        );
1535
1536        (input_tx, output_rx)
1537    }
1538
1539    #[tokio::test]
1540    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
1541    -> anyhow::Result<()> {
1542        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1543        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1544        let session = hopr_lib::HoprSession::new(
1545            session_id,
1546            hopr_lib::DestinationRouting::forward_only(
1547                peer,
1548                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1549            ),
1550            Default::default(),
1551            loopback_transport(),
1552            None,
1553        )?;
1554
1555        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;
1556
1557        tokio::task::spawn(async move {
1558            match tcp_listener.accept().await {
1559                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
1560                Err(e) => error!("failed to accept connection: {e}"),
1561            }
1562        });
1563
1564        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
1565            .await
1566            .context("connect failed")?;
1567
1568        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1569
1570        for d in data.clone().into_iter() {
1571            tcp_stream.write_all(d).await.context("write failed")?;
1572        }
1573
1574        for d in data.iter() {
1575            let mut buf = vec![0; d.len()];
1576            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
1577        }
1578
1579        Ok(())
1580    }
1581
1582    #[test_log::test(tokio::test)]
1583    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
1584    -> anyhow::Result<()> {
1585        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1586        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1587        let session = hopr_lib::HoprSession::new(
1588            session_id,
1589            hopr_lib::DestinationRouting::forward_only(
1590                peer,
1591                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1592            ),
1593            Default::default(),
1594            loopback_transport(),
1595            None,
1596        )?;
1597
1598        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;
1599
1600        let (abort_handle, abort_registration) = AbortHandle::new_pair();
1601        let jh = tokio::task::spawn(bind_session_to_stream(
1602            session,
1603            udp_listener,
1604            ApplicationData::PAYLOAD_SIZE,
1605            Some(abort_registration),
1606        ));
1607
1608        let mut udp_stream = ConnectedUdpStream::builder()
1609            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
1610            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1611            .with_counterparty(listen_addr)
1612            .build(("127.0.0.1", 0))
1613            .context("bind failed")?;
1614
1615        let data = vec![b"hello", b"world", b"this ", b"is   ", b"    a", b" test"];
1616
1617        for d in data.clone().into_iter() {
1618            udp_stream.write_all(d).await.context("write failed")?;
1619            // ConnectedUdpStream performs flush with each write
1620        }
1621
1622        for d in data.iter() {
1623            let mut buf = vec![0; d.len()];
1624            udp_stream.read_exact(&mut buf).await.context("read failed")?;
1625        }
1626
1627        // Once aborted, the bind_session_to_stream task must terminate too
1628        abort_handle.abort();
1629        jh.timeout(futures_time::time::Duration::from_millis(200)).await??;
1630
1631        Ok(())
1632    }
1633
1634    #[test]
1635    fn test_build_binding_address() {
1636        let default = "10.0.0.1:10000".parse().unwrap();
1637
1638        let result = build_binding_host(Some("127.0.0.1:10000"), default);
1639        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1640
1641        let result = build_binding_host(None, default);
1642        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1643
1644        let result = build_binding_host(Some("127.0.0.1"), default);
1645        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1646
1647        let result = build_binding_host(Some(":1234"), default);
1648        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
1649
1650        let result = build_binding_host(Some(":"), default);
1651        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1652
1653        let result = build_binding_host(Some(""), default);
1654        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1655    }
1656}