hoprd_api/
session.rs

1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    Error,
5    extract::{
6        Json, Path, State,
7        ws::{Message, WebSocket, WebSocketUpgrade},
8    },
9    http::status::StatusCode,
10    response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17    Address, HoprSession, NodeId, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18    SessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19    errors::{HoprLibError, HoprTransportError},
20    utils::{
21        futures::AsyncReadStreamer,
22        session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
23    },
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tracing::{debug, error, info, trace};
28
29use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
30
31/// Size of the buffer for forwarding data to/from a TCP stream.
32pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
33
34/// Size of the buffer for forwarding data to/from a UDP stream.
35pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
36
37/// Size of the queue (back-pressure) for data incoming from a UDP stream.
38pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
39
40// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
41#[allow(unused_imports)]
42use serde_json::json;
43
44#[serde_as]
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
46#[schema(
47    example = json!({"Plain": "example.com:80"}),
48    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
49    example = json!({"Service": 0})
50)]
51/// Session target specification.
52pub enum SessionTargetSpec {
53    Plain(String),
54    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
55    #[schema(value_type = u32)]
56    Service(ServiceId),
57}
58
59impl std::fmt::Display for SessionTargetSpec {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        match self {
62            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
63            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
64            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
65        }
66    }
67}
68
69impl FromStr for SessionTargetSpec {
70    type Err = HoprLibError;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        Ok(if let Some(stripped) = s.strip_prefix("$$") {
74            Self::Sealed(
75                base64::prelude::BASE64_URL_SAFE
76                    .decode(stripped)
77                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
78            )
79        } else if let Some(stripped) = s.strip_prefix("#") {
80            Self::Service(
81                stripped
82                    .parse()
83                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
84            )
85        } else {
86            Self::Plain(s.to_owned())
87        })
88    }
89}
90
91impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
92    fn from(spec: SessionTargetSpec) -> Self {
93        match spec {
94            SessionTargetSpec::Plain(t) => Self::Plain(t),
95            SessionTargetSpec::Sealed(t) => Self::Sealed(t),
96            SessionTargetSpec::Service(t) => Self::Service(t),
97        }
98    }
99}
100
101#[repr(u8)]
102#[derive(
103    Debug,
104    Clone,
105    strum::EnumIter,
106    strum::Display,
107    strum::EnumString,
108    Serialize,
109    Deserialize,
110    PartialEq,
111    utoipa::ToSchema,
112)]
113#[schema(example = "Segmentation")]
114/// Session capabilities that can be negotiated with the target peer.
115pub enum SessionCapability {
116    /// Frame segmentation
117    Segmentation,
118    /// Frame retransmission (ACK and NACK-based)
119    Retransmission,
120    /// Frame retransmission (only ACK-based)
121    RetransmissionAckOnly,
122    /// Disable packet buffering
123    NoDelay,
124    /// Disable SURB-based egress rate control at the Exit.
125    NoRateControl,
126}
127
128impl From<SessionCapability> for hopr_lib::SessionCapabilities {
129    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
130        match cap {
131            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
132            SessionCapability::Retransmission => {
133                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
134            }
135            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
136            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
137            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
138        }
139    }
140}
141
142#[serde_as]
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
144#[serde(rename_all = "camelCase")]
145pub(crate) struct SessionWebsocketClientQueryRequest {
146    #[serde_as(as = "DisplayFromStr")]
147    #[schema(required = true, value_type = String)]
148    pub destination: Address,
149    #[schema(required = true)]
150    pub hops: u8,
151    #[cfg(feature = "explicit-path")]
152    #[schema(required = false, value_type = String)]
153    pub path: Option<Vec<Address>>,
154    #[schema(required = true)]
155    #[serde_as(as = "Vec<DisplayFromStr>")]
156    pub capabilities: Vec<SessionCapability>,
157    #[schema(required = true)]
158    #[serde_as(as = "DisplayFromStr")]
159    pub target: SessionTargetSpec,
160    #[schema(required = false)]
161    #[serde(default = "default_protocol")]
162    pub protocol: IpProtocol,
163}
164
165#[inline]
166fn default_protocol() -> IpProtocol {
167    IpProtocol::TCP
168}
169
170impl SessionWebsocketClientQueryRequest {
171    pub(crate) async fn into_protocol_session_config(
172        self,
173    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
174        #[cfg(not(feature = "explicit-path"))]
175        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
176
177        #[cfg(feature = "explicit-path")]
178        let path_options = if let Some(path) = self.path {
179            // Explicit `path` will override `hops`
180            let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
181            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
182        } else {
183            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
184        };
185
186        let mut capabilities = SessionCapabilities::empty();
187        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
188
189        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
190
191        Ok((
192            self.destination,
193            target_spec.into_target(self.protocol.into())?,
194            SessionClientConfig {
195                forward_path_options: path_options.clone(),
196                return_path_options: path_options.clone(), // TODO: allow using separate return options
197                capabilities,
198                ..Default::default()
199            },
200        ))
201    }
202}
203
204#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
205#[schema(value_type = String, format = Binary)]
206#[allow(dead_code)] // not dead code, just for codegen
207struct WssData(Vec<u8>);
208
209/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
210/// sessions.
211///
212/// Once configured, the session represents and automatically managed connection to a target peer through a network
213/// routing configuration. The session can be used to send and receive binary data over the network.
214///
215/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
216///
217/// Connect to the endpoint by using a WS client. No preview is available. Example:
218/// `ws://127.0.0.1:3001/api/v4/session/websocket`
219#[allow(dead_code)] // not dead code, just for documentation
220#[utoipa::path(
221        get,
222        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
223        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
224        request_body(
225            content = SessionWebsocketClientQueryRequest,
226            content_type = "application/json",
227            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
228        ),
229        responses(
230            (status = 200, description = "Successfully created a new client websocket session."),
231            (status = 401, description = "Invalid authorization token.", body = ApiError),
232            (status = 422, description = "Unknown failure", body = ApiError),
233            (status = 429, description = "Too many open websocket connections.", body = ApiError),
234        ),
235        security(
236            ("api_token" = []),
237            ("bearer_token" = [])
238        ),
239        tag = "Session",
240    )]
241
242pub(crate) async fn websocket(
243    ws: WebSocketUpgrade,
244    Query(query): Query<SessionWebsocketClientQueryRequest>,
245    State(state): State<Arc<InternalState>>,
246) -> Result<impl IntoResponse, impl IntoResponse> {
247    let (dst, target, data) = query
248        .into_protocol_session_config()
249        .await
250        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
251
252    let hopr = state.hopr.clone();
253    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
254        error!(error = %e, "Failed to establish session");
255        (
256            StatusCode::UNPROCESSABLE_ENTITY,
257            ApiErrorStatus::UnknownFailure(e.to_string()),
258        )
259    })?;
260
261    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
262}
263
264enum WebSocketInput {
265    Network(Result<Box<[u8]>, std::io::Error>),
266    WsInput(Result<Message, Error>),
267}
268
269/// The maximum number of bytes read from a Session that WS can transfer within a single message.
270const WS_MAX_SESSION_READ_SIZE: usize = 4096;
271
272#[tracing::instrument(level = "debug", skip(socket, session))]
273async fn websocket_connection(socket: WebSocket, session: HoprSession) {
274    let session_id = *session.id();
275
276    let (rx, mut tx) = session.split();
277    let (mut sender, receiver) = socket.split();
278
279    let mut queue = (
280        receiver.map(WebSocketInput::WsInput),
281        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
282    )
283        .merge();
284
285    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
286
287    while let Some(v) = queue.next().await {
288        match v {
289            WebSocketInput::Network(bytes) => match bytes {
290                Ok(bytes) => {
291                    let len = bytes.len();
292                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
293                        error!(
294                            error = %e,
295                            "Failed to emit read data onto the websocket, closing connection"
296                        );
297                        break;
298                    };
299                    bytes_from_session += len;
300                }
301                Err(e) => {
302                    error!(
303                        error = %e,
304                        "Failed to push data from network to socket, closing connection"
305                    );
306                    break;
307                }
308            },
309            WebSocketInput::WsInput(ws_in) => match ws_in {
310                Ok(Message::Binary(data)) => {
311                    let len = data.len();
312                    if let Err(e) = tx.write(data.as_ref()).await {
313                        error!(error = %e, "Failed to write data to the session, closing connection");
314                        break;
315                    }
316                    bytes_to_session += len;
317                }
318                Ok(Message::Text(_)) => {
319                    error!("Received string instead of binary data, closing connection");
320                    break;
321                }
322                Ok(Message::Close(_)) => {
323                    debug!("Received close frame, closing connection");
324                    break;
325                }
326                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
327                Err(e) => {
328                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
329                    break;
330                }
331            },
332        }
333    }
334
335    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
336}
337
338#[serde_as]
339#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
340#[schema(example = json!({ "Hops": 1 }))]
341/// Routing options for the Session.
342pub enum RoutingOptions {
343    #[cfg(feature = "explicit-path")]
344    #[schema(value_type = Vec<String>)]
345    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
346    Hops(usize),
347}
348
349impl RoutingOptions {
350    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
351        Ok(match self {
352            #[cfg(feature = "explicit-path")]
353            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
354            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
355        })
356    }
357}
358
359impl From<hopr_lib::RoutingOptions> for RoutingOptions {
360    fn from(opts: hopr_lib::RoutingOptions) -> Self {
361        match opts {
362            #[cfg(feature = "explicit-path")]
363            hopr_lib::RoutingOptions::IntermediatePath(path) => {
364                RoutingOptions::IntermediatePath(path.into_iter().collect())
365            }
366            hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
367        }
368    }
369}
370
371#[serde_as]
372#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
373#[schema(example = json!({
374        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
375        "forwardPath": { "Hops": 1 },
376        "returnPath": { "Hops": 1 },
377        "target": {"Plain": "localhost:8080"},
378        "listenHost": "127.0.0.1:10000",
379        "capabilities": ["Retransmission", "Segmentation"],
380        "responseBuffer": "2 MB",
381        "maxSurbUpstream": "2000 kb/s",
382        "sessionPool": 0,
383        "maxClientSessions": 2
384    }))]
385#[serde(rename_all = "camelCase")]
386/// Request body for creating a new client session.
387pub(crate) struct SessionClientRequest {
388    /// Address of the Exit node.
389    #[serde_as(as = "DisplayFromStr")]
390    #[schema(value_type = String)]
391    pub destination: Address,
392    /// The forward path for the Session.
393    pub forward_path: RoutingOptions,
394    /// The return path for the Session.
395    pub return_path: RoutingOptions,
396    /// Target for the Session.
397    pub target: SessionTargetSpec,
398    /// Listen host (`ip:port`) for the Session socket at the Entry node.
399    ///
400    /// Supports also partial specification (only `ip` or only `:port`) with the
401    /// respective part replaced by the node's configured default.
402    pub listen_host: Option<String>,
403    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
404    /// Capabilities for the Session protocol.
405    ///
406    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
407    pub capabilities: Option<Vec<SessionCapability>>,
408    /// The amount of response data the Session counterparty can deliver back to us,
409    /// without us sending any SURBs to them.
410    ///
411    /// In other words, this size is recalculated to a number of SURBs delivered
412    /// to the counterparty upfront and then maintained.
413    /// The maintenance is dynamic, based on the number of responses we receive.
414    ///
415    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
416    /// at least the size of 2 Session packet payloads.
417    #[serde_as(as = "Option<DisplayFromStr>")]
418    #[schema(value_type = Option<String>)]
419    pub response_buffer: Option<bytesize::ByteSize>,
420    /// The maximum throughput at which artificial SURBs might be generated and sent
421    /// to the recipient of the Session.
422    ///
423    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
424    /// this should roughly match the maximum retrieval throughput.
425    ///
426    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
427    #[serde(default)]
428    #[serde(with = "human_bandwidth::option")]
429    #[schema(value_type = Option<String>)]
430    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
431    /// How many Sessions to pool for clients.
432    ///
433    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
434    /// It has no effect on UDP sessions in the current implementation.
435    ///
436    /// Currently, the maximum value is 5.
437    pub session_pool: Option<usize>,
438    /// The maximum number of client sessions that the listener can spawn.
439    ///
440    /// This currently applies only to the TCP sessions, as UDP sessions cannot
441    /// handle multiple clients (and spawn therefore always only a single session).
442    ///
443    /// If this value is smaller than the value specified in `session_pool`, it will
444    /// be set to that value.
445    ///
446    /// The default value is 5.
447    pub max_client_sessions: Option<usize>,
448}
449
450impl SessionClientRequest {
451    pub(crate) async fn into_protocol_session_config(
452        self,
453        target_protocol: IpProtocol,
454    ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
455        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
456        Ok((
457            self.destination,
458            target_spec.into_target(target_protocol.into())?,
459            SessionClientConfig {
460                forward_path_options: self.forward_path.resolve().await?,
461                return_path_options: self.return_path.resolve().await?,
462                capabilities: self
463                    .capabilities
464                    .map(|vs| {
465                        let mut caps = SessionCapabilities::empty();
466                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
467                        caps
468                    })
469                    .unwrap_or_else(|| match target_protocol {
470                        IpProtocol::TCP => {
471                            hopr_lib::SessionCapability::RetransmissionAck
472                                | hopr_lib::SessionCapability::RetransmissionNack
473                                | hopr_lib::SessionCapability::Segmentation
474                        }
475                        // Only Segmentation capability for UDP per default
476                        _ => SessionCapability::Segmentation.into(),
477                    }),
478                surb_management: SessionConfig {
479                    response_buffer: self.response_buffer,
480                    max_surb_upstream: self.max_surb_upstream,
481                }
482                .into(),
483                ..Default::default()
484            },
485        ))
486    }
487}
488
489#[serde_as]
490#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
491#[schema(example = json!({
492        "target": "example.com:80",
493        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
494        "forwardPath": { "Hops": 1 },
495        "returnPath": { "Hops": 1 },
496        "protocol": "tcp",
497        "ip": "127.0.0.1",
498        "port": 5542,
499        "hoprMtu": 1002,
500        "surbLen": 398,
501        "activeClients": [],
502        "maxClientSessions": 2,
503        "maxSurbUpstream": "2000 kb/s",
504        "responseBuffer": "2 MB",
505        "sessionPool": 0
506    }))]
507#[serde(rename_all = "camelCase")]
508/// Response body for creating a new client session.
509pub(crate) struct SessionClientResponse {
510    #[schema(example = "example.com:80")]
511    /// Target of the Session.
512    pub target: String,
513    /// Destination node (exit node) of the Session.
514    #[serde_as(as = "DisplayFromStr")]
515    #[schema(value_type = String)]
516    pub destination: Address,
517    /// Forward routing path.
518    pub forward_path: RoutingOptions,
519    /// Return routing path.
520    pub return_path: RoutingOptions,
521    /// IP protocol used by Session's listening socket.
522    #[serde_as(as = "DisplayFromStr")]
523    #[schema(example = "tcp")]
524    pub protocol: IpProtocol,
525    /// Listening IP address of the Session's socket.
526    #[schema(example = "127.0.0.1")]
527    pub ip: String,
528    #[schema(example = 5542)]
529    /// Listening port of the Session's socket.
530    pub port: u16,
531    /// MTU used by the underlying HOPR transport.
532    pub hopr_mtu: usize,
533    /// Size of a Single Use Reply Block used by the protocol.
534    ///
535    /// This is useful for SURB balancing calculations.
536    pub surb_len: usize,
537    /// Lists Session IDs of all active clients.
538    ///
539    /// Can contain multiple entries on TCP sessions, but currently
540    /// always only a single entry on UDP sessions.
541    pub active_clients: Vec<String>,
542    /// The maximum number of client sessions that the listener can spawn.
543    ///
544    /// This currently applies only to the TCP sessions, as UDP sessions cannot
545    /// have multiple clients (defaults to 1 for UDP).
546    pub max_client_sessions: usize,
547    /// The maximum throughput at which artificial SURBs might be generated and sent
548    /// to the recipient of the Session.    
549    #[serde(default)]
550    #[serde(with = "human_bandwidth::option")]
551    #[schema(value_type = Option<String>)]
552    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
553    /// The amount of response data the Session counterparty can deliver back to us, without us
554    /// sending any SURBs to them.
555    #[serde_as(as = "Option<DisplayFromStr>")]
556    #[schema(value_type = Option<String>)]
557    pub response_buffer: Option<bytesize::ByteSize>,
558    /// How many Sessions to pool for clients.
559    pub session_pool: Option<usize>,
560}
561
562/// Creates a new client session returning the given session listening host and port over TCP or UDP.
563/// If no listening port is given in the request, the socket will be bound to a random free
564/// port and returned in the response.
565/// Different capabilities can be configured for the session, such as data segmentation or
566/// retransmission.
567///
568/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
569/// communication over the selected IP protocol and HOPR network routing with the given destination.
570/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
571///
572/// Various services require different types of socket communications:
573/// - services running over UDP usually do not require data retransmission, as it is already expected
574/// that UDP does not provide these and is therefore handled at the application layer.
575/// - On the contrary, services running over TCP *almost always* expect data segmentation and
576/// retransmission capabilities, so these should be configured while creating a session that passes
577/// TCP data.
578#[utoipa::path(
579        post,
580        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
581        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
582        params(
583            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
584        ),
585        request_body(
586            content = SessionClientRequest,
587            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
588            content_type = "application/json"),
589        responses(
590            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
591            (status = 400, description = "Invalid IP protocol.", body = ApiError),
592            (status = 401, description = "Invalid authorization token.", body = ApiError),
593            (status = 409, description = "Listening address and port already in use.", body = ApiError),
594            (status = 422, description = "Unknown failure", body = ApiError),
595        ),
596        security(
597            ("api_token" = []),
598            ("bearer_token" = [])
599        ),
600        tag = "Session"
601    )]
602pub(crate) async fn create_client(
603    State(state): State<Arc<InternalState>>,
604    Path(protocol): Path<IpProtocol>,
605    Json(args): Json<SessionClientRequest>,
606) -> Result<impl IntoResponse, impl IntoResponse> {
607    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
608
609    let listener_id = ListenerId(protocol.into(), bind_host);
610    if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
611        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
612    }
613
614    let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
615    debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
616
617    let (bound_host, udp_session_id, max_clients) = match protocol {
618        IpProtocol::TCP => {
619            let session_pool = args.session_pool;
620            let max_client_sessions = args.max_client_sessions;
621            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
622            let (destination, _target, config) = args
623                .clone()
624                .into_protocol_session_config(IpProtocol::TCP)
625                .await
626                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
627
628            create_tcp_client_binding(
629                bind_host,
630                port_range,
631                state.hopr.clone(),
632                state.open_listeners.clone(),
633                destination,
634                target_spec,
635                config,
636                session_pool,
637                max_client_sessions,
638            )
639            .await
640            .map_err(|e| match e {
641                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
642                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
643                }
644                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
645                    StatusCode::UNPROCESSABLE_ENTITY,
646                    ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
647                ),
648            })?
649        }
650        IpProtocol::UDP => {
651            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
652            let (destination, _target, config) = args
653                .clone()
654                .into_protocol_session_config(IpProtocol::UDP)
655                .await
656                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
657
658            create_udp_client_binding(
659                bind_host,
660                port_range,
661                state.hopr.clone(),
662                state.open_listeners.clone(),
663                destination,
664                target_spec,
665                config,
666            )
667            .await
668            .map_err(|e| match e {
669                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
670                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
671                }
672                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
673                    StatusCode::UNPROCESSABLE_ENTITY,
674                    ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
675                ),
676            })?
677        }
678    };
679
680    Ok::<_, (StatusCode, ApiErrorStatus)>(
681        (
682            StatusCode::OK,
683            Json(SessionClientResponse {
684                protocol,
685                ip: bound_host.ip().to_string(),
686                port: bound_host.port(),
687                target: args.target.to_string(),
688                destination: args.destination,
689                forward_path: args.forward_path.clone(),
690                return_path: args.return_path.clone(),
691                hopr_mtu: SESSION_MTU,
692                surb_len: SURB_SIZE,
693                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
694                max_client_sessions: max_clients,
695                max_surb_upstream: args.max_surb_upstream,
696                response_buffer: args.response_buffer,
697                session_pool: args.session_pool,
698            }),
699        )
700            .into_response(),
701    )
702}
703
704/// Lists existing Session listeners for the given IP protocol.
705#[utoipa::path(
706    get,
707    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
708    description = "Lists existing Session listeners for the given IP protocol.",
709    params(
710        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
711    ),
712    responses(
713        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
714            {
715                "target": "example.com:80",
716                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
717                "forwardPath": { "Hops": 1 },
718                "returnPath": { "Hops": 1 },
719                "protocol": "tcp",
720                "ip": "127.0.0.1",
721                "port": 5542,
722                "surbLen": 400,
723                "hoprMtu": 1020,
724                "activeClients": [],
725                "maxClientSessions": 2,
726                "maxSurbUpstream": "2000 kb/s",
727                "responseBuffer": "2 MB",
728                "sessionPool": 0
729            }
730        ])),
731        (status = 400, description = "Invalid IP protocol.", body = ApiError),
732        (status = 401, description = "Invalid authorization token.", body = ApiError),
733        (status = 422, description = "Unknown failure", body = ApiError)
734    ),
735    security(
736        ("api_token" = []),
737        ("bearer_token" = [])
738    ),
739    tag = "Session",
740)]
741pub(crate) async fn list_clients(
742    State(state): State<Arc<InternalState>>,
743    Path(protocol): Path<IpProtocol>,
744) -> Result<impl IntoResponse, impl IntoResponse> {
745    let response = state
746        .open_listeners
747        .0
748        .iter()
749        .filter(|v| v.key().0 == protocol.into())
750        .map(|v| {
751            let ListenerId(_, addr) = *v.key();
752            let entry = v.value();
753            SessionClientResponse {
754                protocol,
755                ip: addr.ip().to_string(),
756                port: addr.port(),
757                target: entry.target.to_string(),
758                forward_path: entry.forward_path.clone().into(),
759                return_path: entry.return_path.clone().into(),
760                destination: entry.destination,
761                hopr_mtu: SESSION_MTU,
762                surb_len: SURB_SIZE,
763                active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
764                max_client_sessions: entry.max_client_sessions,
765                max_surb_upstream: entry.max_surb_upstream,
766                response_buffer: entry.response_buffer,
767                session_pool: entry.session_pool,
768            }
769        })
770        .collect::<Vec<_>>();
771
772    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
773}
774
775#[serde_as]
776#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
777#[schema(example = json!({
778    "responseBuffer": "2 MB",
779    "maxSurbUpstream": "2 Mbps"
780}))]
781#[serde(rename_all = "camelCase")]
782pub(crate) struct SessionConfig {
783    /// The amount of response data the Session counterparty can deliver back to us,
784    /// without us sending any SURBs to them.
785    ///
786    /// In other words, this size is recalculated to a number of SURBs delivered
787    /// to the counterparty upfront and then maintained.
788    /// The maintenance is dynamic, based on the number of responses we receive.
789    ///
790    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
791    /// at least the size of 2 Session packet payloads.
792    #[serde(default)]
793    #[serde_as(as = "Option<DisplayFromStr>")]
794    #[schema(value_type = String)]
795    pub response_buffer: Option<bytesize::ByteSize>,
796    /// The maximum throughput at which artificial SURBs might be generated and sent
797    /// to the recipient of the Session.
798    ///
799    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
800    /// this should roughly match the maximum retrieval throughput.
801    ///
802    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
803    #[serde(default)]
804    #[serde(with = "human_bandwidth::option")]
805    #[schema(value_type = String)]
806    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
807}
808
809impl From<SessionConfig> for Option<SurbBalancerConfig> {
810    fn from(value: SessionConfig) -> Self {
811        match value.response_buffer {
812            // Buffer worth at least 2 reply packets
813            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
814                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
815                max_surbs_per_sec: value
816                    .max_surb_upstream
817                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
818                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
819                ..Default::default()
820            }),
821            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
822            Some(_) => None,
823            // Use defaults otherwise
824            None => Some(SurbBalancerConfig::default()),
825        }
826    }
827}
828
829impl From<SurbBalancerConfig> for SessionConfig {
830    fn from(value: SurbBalancerConfig) -> Self {
831        Self {
832            response_buffer: Some(bytesize::ByteSize::b(
833                value.target_surb_buffer_size * SESSION_MTU as u64,
834            )),
835            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
836                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
837            )),
838        }
839    }
840}
841
842#[utoipa::path(
843    post,
844    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
845    description = "Updates configuration of an existing active session.",
846    params(
847        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
848    ),
849    request_body(
850            content = SessionConfig,
851            description = "Allows updating of several parameters of an existing active session.",
852            content_type = "application/json"),
853    responses(
854            (status = 204, description = "Successfully updated the configuration"),
855            (status = 400, description = "Invalid configuration.", body = ApiError),
856            (status = 401, description = "Invalid authorization token.", body = ApiError),
857            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
858            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
859            (status = 422, description = "Unknown failure", body = ApiError),
860    ),
861    security(
862            ("api_token" = []),
863            ("bearer_token" = [])
864    ),
865    tag = "Session"
866)]
867pub(crate) async fn adjust_session(
868    State(state): State<Arc<InternalState>>,
869    Path(session_id): Path<String>,
870    Json(args): Json<SessionConfig>,
871) -> Result<impl IntoResponse, impl IntoResponse> {
872    let session_id =
873        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
874
875    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
876        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
877            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
878            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
879                SessionManagerError::NonExistingSession,
880            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
881            Err(e) => Err((
882                StatusCode::NOT_ACCEPTABLE,
883                ApiErrorStatus::UnknownFailure(e.to_string()),
884            )),
885        }
886    } else {
887        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
888    }
889}
890
891#[utoipa::path(
892    get,
893    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
894    description = "Gets configuration of an existing active session.",
895    params(
896        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
897    ),
898    responses(
899            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
900            (status = 400, description = "Invalid session ID.", body = ApiError),
901            (status = 401, description = "Invalid authorization token.", body = ApiError),
902            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
903            (status = 422, description = "Unknown failure", body = ApiError),
904    ),
905    security(
906            ("api_token" = []),
907            ("bearer_token" = [])
908    ),
909    tag = "Session"
910)]
911pub(crate) async fn session_config(
912    State(state): State<Arc<InternalState>>,
913    Path(session_id): Path<String>,
914) -> Result<impl IntoResponse, impl IntoResponse> {
915    let session_id =
916        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
917
918    match state.hopr.get_session_surb_balancer_config(&session_id).await {
919        Ok(Some(cfg)) => {
920            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
921        }
922        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
923        Err(e) => Err((
924            StatusCode::UNPROCESSABLE_ENTITY,
925            ApiErrorStatus::UnknownFailure(e.to_string()),
926        )),
927    }
928}
929
930#[derive(
931    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
932)]
933#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
934#[serde(rename_all = "lowercase")]
935#[schema(example = "tcp")]
936/// IP transport protocol
937pub enum IpProtocol {
938    #[allow(clippy::upper_case_acronyms)]
939    TCP,
940    #[allow(clippy::upper_case_acronyms)]
941    UDP,
942}
943
944impl From<IpProtocol> for hopr_lib::IpProtocol {
945    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
946        match protocol {
947            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
948            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
949        }
950    }
951}
952
953#[serde_as]
954#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
955pub struct SessionCloseClientQuery {
956    #[serde_as(as = "DisplayFromStr")]
957    #[schema(value_type = String, example = "tcp")]
958    /// IP transport protocol
959    pub protocol: IpProtocol,
960
961    /// Listening IP address of the Session.
962    #[schema(example = "127.0.0.1:8545")]
963    pub ip: String,
964
965    /// Session port used for the listener.
966    #[schema(value_type = u16, example = 10101)]
967    pub port: u16,
968}
969
970/// Closes an existing Session listener.
971/// The listener must've been previously created and bound for the given IP protocol.
972/// Once a listener is closed, no more socket connections can be made to it.
973/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
974/// will be closed.
975#[utoipa::path(
976    delete,
977    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
978    description = "Closes an existing Session listener.",
979    params(SessionCloseClientQuery),
980    responses(
981            (status = 204, description = "Listener closed successfully"),
982            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
983            (status = 401, description = "Invalid authorization token.", body = ApiError),
984            (status = 404, description = "Listener not found.", body = ApiError),
985            (status = 422, description = "Unknown failure", body = ApiError)
986    ),
987    security(
988            ("api_token" = []),
989            ("bearer_token" = [])
990    ),
991    tag = "Session",
992)]
993pub(crate) async fn close_client(
994    State(state): State<Arc<InternalState>>,
995    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
996) -> Result<impl IntoResponse, impl IntoResponse> {
997    let listening_ip: IpAddr = ip
998        .parse()
999        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1000
1001    {
1002        let open_listeners = &state.open_listeners.0;
1003
1004        let mut to_remove = Vec::new();
1005        let protocol: hopr_lib::IpProtocol = protocol.into();
1006
1007        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1008        open_listeners
1009            .iter()
1010            .filter(|v| {
1011                let ListenerId(proto, addr) = v.key();
1012                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1013            })
1014            .for_each(|v| to_remove.push(*v.key()));
1015
1016        if to_remove.is_empty() {
1017            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1018        }
1019
1020        for bound_addr in to_remove {
1021            let (_, entry) = open_listeners
1022                .remove(&bound_addr)
1023                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1024
1025            entry.abort_handle.abort();
1026        }
1027    }
1028
1029    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1030}