hoprd_api/
session.rs

1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    Error,
5    extract::{
6        Json, Path, State,
7        ws::{Message, WebSocket, WebSocketUpgrade},
8    },
9    http::status::StatusCode,
10    response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17    Address, HoprSession, HoprSessionId, HoprTransportError, NodeId, SESSION_MTU, SURB_SIZE, ServiceId,
18    SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
19    TransportSessionError,
20    errors::HoprLibError,
21    utils::{
22        futures::AsyncReadStreamer,
23        session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
24    },
25};
26use serde::{Deserialize, Serialize};
27use serde_with::{DisplayFromStr, serde_as};
28use tracing::{debug, error, info, trace};
29
30use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
31
/// Size of the buffer for forwarding data to/from a TCP stream.
// NOTE(review): presumably expressed in bytes — confirm at the use site.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Size of the buffer for forwarding data to/from a UDP stream.
// NOTE(review): presumably expressed in bytes — confirm at the use site.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Size of the queue (back-pressure) for data incoming from a UDP stream.
// NOTE(review): the unit (items vs. bytes) is not visible from here — confirm at the use site.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
40
41// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
42#[allow(unused_imports)]
43use serde_json::json;
44
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
    example = json!({"Service": 0})
)]
/// Session target specification.
///
/// Besides the JSON (serde) representation, the specification has a compact string form
/// (`Display` / `FromStr`): a `$$` prefix marks a sealed target, a `#` prefix marks a service
/// and anything else is taken as a plain target.
pub enum SessionTargetSpec {
    /// Plain-text target, e.g. `example.com:80`.
    Plain(String),
    /// Sealed (opaque binary) target; Base64-encoded in the JSON representation.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target given by a service identifier.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
59
60impl std::fmt::Display for SessionTargetSpec {
61    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62        match self {
63            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
64            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
65            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
66        }
67    }
68}
69
70impl FromStr for SessionTargetSpec {
71    type Err = HoprLibError;
72
73    fn from_str(s: &str) -> Result<Self, Self::Err> {
74        Ok(if let Some(stripped) = s.strip_prefix("$$") {
75            Self::Sealed(
76                base64::prelude::BASE64_URL_SAFE
77                    .decode(stripped)
78                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
79            )
80        } else if let Some(stripped) = s.strip_prefix("#") {
81            Self::Service(
82                stripped
83                    .parse()
84                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
85            )
86        } else {
87            Self::Plain(s.to_owned())
88        })
89    }
90}
91
92impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
93    fn from(spec: SessionTargetSpec) -> Self {
94        match spec {
95            SessionTargetSpec::Plain(t) => Self::Plain(t),
96            SessionTargetSpec::Sealed(t) => Self::Sealed(t),
97            SessionTargetSpec::Service(t) => Self::Service(t),
98        }
99    }
100}
101
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
/// Session capabilities that can be negotiated with the target peer.
///
/// Each variant is translated into one or more `hopr_lib` session capability flags.
pub enum SessionCapability {
    /// Frame segmentation
    Segmentation,
    /// Frame retransmission (ACK and NACK-based)
    ///
    /// Enables both the ACK-based and the NACK-based retransmission flags.
    Retransmission,
    /// Frame retransmission (only ACK-based)
    RetransmissionAckOnly,
    /// Disable packet buffering
    NoDelay,
    /// Disable SURB-based egress rate control at the Exit.
    NoRateControl,
}
128
129impl From<SessionCapability> for hopr_lib::SessionCapabilities {
130    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
131        match cap {
132            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
133            SessionCapability::Retransmission => {
134                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
135            }
136            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
137            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
138            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
139        }
140    }
141}
142
/// Query parameters accepted by the websocket Session endpoint.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the node the Session is opened to (given in its string form).
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops, used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when given, it overrides `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Capabilities for the Session protocol, each given by its name.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Target of the Session in its string form: `$$<url-safe base64>` for a sealed target,
    /// `#<service id>` for a service, anything else is taken as a plain target.
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol of the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
165
/// Serde default for the `protocol` field of [`SessionWebsocketClientQueryRequest`]: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
170
171impl SessionWebsocketClientQueryRequest {
172    pub(crate) async fn into_protocol_session_config(
173        self,
174    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
175        #[cfg(not(feature = "explicit-path"))]
176        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
177
178        #[cfg(feature = "explicit-path")]
179        let path_options = if let Some(path) = self.path {
180            // Explicit `path` will override `hops`
181            let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
182            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
183        } else {
184            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
185        };
186
187        let mut capabilities = SessionCapabilities::empty();
188        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
189
190        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
191
192        Ok((
193            self.destination,
194            target_spec.into_target(self.protocol.into())?,
195            SessionClientConfig {
196                forward_path_options: path_options.clone(),
197                return_path_options: path_options.clone(), // TODO: allow using separate return options
198                capabilities,
199                ..Default::default()
200            },
201        ))
202    }
203}
204
// Binary payload placeholder; exposed to the schema generator as a binary string.
// It is not referenced by the visible handlers (hence the `dead_code` allowance).
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)] // not dead code, just for codegen
struct WssData(Vec<u8>);
209
210/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
211/// sessions.
212///
213/// Once configured, the session represents and automatically managed connection to a target peer through a network
214/// routing configuration. The session can be used to send and receive binary data over the network.
215///
216/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
217///
218/// Connect to the endpoint by using a WS client. No preview is available. Example:
219/// `ws://127.0.0.1:3001/api/v4/session/websocket`
220#[allow(dead_code)] // not dead code, just for documentation
221#[utoipa::path(
222        get,
223        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
224        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
225        request_body(
226            content = SessionWebsocketClientQueryRequest,
227            content_type = "application/json",
228            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
229        ),
230        responses(
231            (status = 200, description = "Successfully created a new client websocket session."),
232            (status = 401, description = "Invalid authorization token.", body = ApiError),
233            (status = 422, description = "Unknown failure", body = ApiError),
234            (status = 429, description = "Too many open websocket connections.", body = ApiError),
235        ),
236        security(
237            ("api_token" = []),
238            ("bearer_token" = [])
239        ),
240        tag = "Session",
241    )]
242
243pub(crate) async fn websocket(
244    ws: WebSocketUpgrade,
245    Query(query): Query<SessionWebsocketClientQueryRequest>,
246    State(state): State<Arc<InternalState>>,
247) -> Result<impl IntoResponse, impl IntoResponse> {
248    let (dst, target, data) = query
249        .into_protocol_session_config()
250        .await
251        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
252
253    let hopr = state.hopr.clone();
254    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
255        error!(error = %e, "Failed to establish session");
256        (
257            StatusCode::UNPROCESSABLE_ENTITY,
258            ApiErrorStatus::UnknownFailure(e.to_string()),
259        )
260    })?;
261
262    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
263}
264
/// Event consumed by the websocket forwarding loop; both data directions are merged
/// into a single stream of these.
enum WebSocketInput {
    /// Result of reading a chunk of data from the HOPR session.
    Network(Result<Box<[u8]>, std::io::Error>),
    /// Result of receiving a message from the websocket client.
    WsInput(Result<Message, Error>),
}
269
/// The maximum number of bytes read from a Session that WS can transfer within a single message.
///
/// Used as the chunk-size parameter of the `AsyncReadStreamer` wrapping the Session's read half.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
272
273#[tracing::instrument(level = "debug", skip(socket, session))]
274async fn websocket_connection(socket: WebSocket, session: HoprSession) {
275    let session_id = *session.id();
276
277    let (rx, mut tx) = session.split();
278    let (mut sender, receiver) = socket.split();
279
280    let mut queue = (
281        receiver.map(WebSocketInput::WsInput),
282        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
283    )
284        .merge();
285
286    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
287
288    while let Some(v) = queue.next().await {
289        match v {
290            WebSocketInput::Network(bytes) => match bytes {
291                Ok(bytes) => {
292                    let len = bytes.len();
293                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
294                        error!(
295                            error = %e,
296                            "Failed to emit read data onto the websocket, closing connection"
297                        );
298                        break;
299                    };
300                    bytes_from_session += len;
301                }
302                Err(e) => {
303                    error!(
304                        error = %e,
305                        "Failed to push data from network to socket, closing connection"
306                    );
307                    break;
308                }
309            },
310            WebSocketInput::WsInput(ws_in) => match ws_in {
311                Ok(Message::Binary(data)) => {
312                    let len = data.len();
313                    if let Err(e) = tx.write(data.as_ref()).await {
314                        error!(error = %e, "Failed to write data to the session, closing connection");
315                        break;
316                    }
317                    bytes_to_session += len;
318                }
319                Ok(Message::Text(_)) => {
320                    error!("Received string instead of binary data, closing connection");
321                    break;
322                }
323                Ok(Message::Close(_)) => {
324                    debug!("Received close frame, closing connection");
325                    break;
326                }
327                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
328                Err(e) => {
329                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
330                    break;
331                }
332            },
333        }
334    }
335
336    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
337}
338
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
/// Routing options for the Session.
pub enum RoutingOptions {
    /// Explicit list of intermediate nodes, each given in its string form.
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
    /// Number of hops.
    Hops(usize),
}
349
350impl RoutingOptions {
351    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
352        Ok(match self {
353            #[cfg(feature = "explicit-path")]
354            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
355            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
356        })
357    }
358}
359
360impl From<hopr_lib::RoutingOptions> for RoutingOptions {
361    fn from(opts: hopr_lib::RoutingOptions) -> Self {
362        match opts {
363            #[cfg(feature = "explicit-path")]
364            hopr_lib::RoutingOptions::IntermediatePath(path) => {
365                RoutingOptions::IntermediatePath(path.into_iter().collect())
366            }
367            hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
368        }
369    }
370}
371
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
/// Request body for creating a new client session.
pub(crate) struct SessionClientRequest {
    /// Address of the Exit node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// The forward path for the Session.
    pub forward_path: RoutingOptions,
    /// The return path for the Session.
    pub return_path: RoutingOptions,
    /// Target for the Session.
    pub target: SessionTargetSpec,
    /// Listen host (`ip:port`) for the Session socket at the Entry node.
    ///
    /// Supports also partial specification (only `ip` or only `:port`) with the
    /// respective part replaced by the node's configured default.
    pub listen_host: Option<String>,
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    /// Capabilities for the Session protocol.
    ///
    /// Defaults to `Segmentation` and `Retransmission` for TCP and only `Segmentation` for UDP.
    pub capabilities: Option<Vec<SessionCapability>>,
    /// The amount of response data the Session counterparty can deliver back to us,
    /// without us sending any SURBs to them.
    ///
    /// In other words, this size is recalculated to a number of SURBs delivered
    /// to the counterparty upfront and then maintained.
    /// The maintenance is dynamic, based on the number of responses we receive.
    ///
    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
    /// at least the size of 2 Session packet payloads.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// The maximum throughput at which artificial SURBs might be generated and sent
    /// to the recipient of the Session.
    ///
    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
    /// this should roughly match the maximum retrieval throughput.
    ///
    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// How many Sessions to pool for clients.
    ///
    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
    /// It has no effect on UDP sessions in the current implementation.
    ///
    /// Currently, the maximum value is 5.
    pub session_pool: Option<usize>,
    /// The maximum number of client sessions that the listener can spawn.
    ///
    /// This currently applies only to the TCP sessions, as UDP sessions cannot
    /// handle multiple clients (and spawn therefore always only a single session).
    ///
    /// If this value is smaller than the value specified in `session_pool`, it will
    /// be set to that value.
    ///
    /// The default value is 5.
    pub max_client_sessions: Option<usize>,
}
450
451impl SessionClientRequest {
452    pub(crate) async fn into_protocol_session_config(
453        self,
454        target_protocol: IpProtocol,
455    ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
456        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
457        Ok((
458            self.destination,
459            target_spec.into_target(target_protocol.into())?,
460            SessionClientConfig {
461                forward_path_options: self.forward_path.resolve().await?,
462                return_path_options: self.return_path.resolve().await?,
463                capabilities: self
464                    .capabilities
465                    .map(|vs| {
466                        let mut caps = SessionCapabilities::empty();
467                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
468                        caps
469                    })
470                    .unwrap_or_else(|| match target_protocol {
471                        IpProtocol::TCP => {
472                            hopr_lib::SessionCapability::RetransmissionAck
473                                | hopr_lib::SessionCapability::RetransmissionNack
474                                | hopr_lib::SessionCapability::Segmentation
475                        }
476                        // Only Segmentation capability for UDP per default
477                        _ => SessionCapability::Segmentation.into(),
478                    }),
479                surb_management: SessionConfig {
480                    response_buffer: self.response_buffer,
481                    max_surb_upstream: self.max_surb_upstream,
482                }
483                .into(),
484                ..Default::default()
485            },
486        ))
487    }
488}
489
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "hoprMtu": 1002,
        "surbLen": 398,
        "activeClients": [],
        "maxClientSessions": 2,
        "maxSurbUpstream": "2000 kb/s",
        "responseBuffer": "2 MB",
        "sessionPool": 0
    }))]
#[serde(rename_all = "camelCase")]
/// Description of a client session listener.
///
/// Returned when a new client session is created and, per listener, when the existing
/// session listeners are listed.
pub(crate) struct SessionClientResponse {
    #[schema(example = "example.com:80")]
    /// Target of the Session (in its string form).
    pub target: String,
    /// Destination node (exit node) of the Session.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Forward routing path.
    pub forward_path: RoutingOptions,
    /// Return routing path.
    pub return_path: RoutingOptions,
    /// IP protocol used by Session's listening socket.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// Listening IP address of the Session's socket.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    #[schema(example = 5542)]
    /// Listening port of the Session's socket.
    pub port: u16,
    /// MTU used by the underlying HOPR transport.
    pub hopr_mtu: usize,
    /// Size of a Single Use Reply Block used by the protocol.
    ///
    /// This is useful for SURB balancing calculations.
    pub surb_len: usize,
    /// Lists Session IDs of all active clients.
    ///
    /// Can contain multiple entries on TCP sessions, but currently
    /// always only a single entry on UDP sessions.
    pub active_clients: Vec<String>,
    /// The maximum number of client sessions that the listener can spawn.
    ///
    /// This currently applies only to the TCP sessions, as UDP sessions cannot
    /// have multiple clients (defaults to 1 for UDP).
    pub max_client_sessions: usize,
    /// The maximum throughput at which artificial SURBs might be generated and sent
    /// to the recipient of the Session.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// The amount of response data the Session counterparty can deliver back to us, without us
    /// sending any SURBs to them.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// How many Sessions to pool for clients.
    pub session_pool: Option<usize>,
}
562
563/// Creates a new client session returning the given session listening host and port over TCP or UDP.
564/// If no listening port is given in the request, the socket will be bound to a random free
565/// port and returned in the response.
566/// Different capabilities can be configured for the session, such as data segmentation or
567/// retransmission.
568///
569/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
570/// communication over the selected IP protocol and HOPR network routing with the given destination.
571/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
572///
573/// Various services require different types of socket communications:
574/// - services running over UDP usually do not require data retransmission, as it is already expected
575/// that UDP does not provide these and is therefore handled at the application layer.
576/// - On the contrary, services running over TCP *almost always* expect data segmentation and
577/// retransmission capabilities, so these should be configured while creating a session that passes
578/// TCP data.
579#[utoipa::path(
580        post,
581        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
582        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
583        params(
584            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
585        ),
586        request_body(
587            content = SessionClientRequest,
588            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
589            content_type = "application/json"),
590        responses(
591            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
592            (status = 400, description = "Invalid IP protocol.", body = ApiError),
593            (status = 401, description = "Invalid authorization token.", body = ApiError),
594            (status = 409, description = "Listening address and port already in use.", body = ApiError),
595            (status = 422, description = "Unknown failure", body = ApiError),
596        ),
597        security(
598            ("api_token" = []),
599            ("bearer_token" = [])
600        ),
601        tag = "Session"
602    )]
603pub(crate) async fn create_client(
604    State(state): State<Arc<InternalState>>,
605    Path(protocol): Path<IpProtocol>,
606    Json(args): Json<SessionClientRequest>,
607) -> Result<impl IntoResponse, impl IntoResponse> {
608    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
609
610    let listener_id = ListenerId(protocol.into(), bind_host);
611    if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
612        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
613    }
614
615    let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
616    debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
617
618    let (bound_host, udp_session_id, max_clients) = match protocol {
619        IpProtocol::TCP => {
620            let session_pool = args.session_pool;
621            let max_client_sessions = args.max_client_sessions;
622            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
623            let (destination, _target, config) = args
624                .clone()
625                .into_protocol_session_config(IpProtocol::TCP)
626                .await
627                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
628
629            create_tcp_client_binding(
630                bind_host,
631                port_range,
632                state.hopr.clone(),
633                state.open_listeners.clone(),
634                destination,
635                target_spec,
636                config,
637                session_pool,
638                max_client_sessions,
639            )
640            .await
641            .map_err(|e| match e {
642                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
643                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
644                }
645                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
646                    StatusCode::UNPROCESSABLE_ENTITY,
647                    ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
648                ),
649            })?
650        }
651        IpProtocol::UDP => {
652            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
653            let (destination, _target, config) = args
654                .clone()
655                .into_protocol_session_config(IpProtocol::UDP)
656                .await
657                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
658
659            create_udp_client_binding(
660                bind_host,
661                port_range,
662                state.hopr.clone(),
663                state.open_listeners.clone(),
664                destination,
665                target_spec,
666                config,
667            )
668            .await
669            .map_err(|e| match e {
670                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
671                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
672                }
673                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
674                    StatusCode::UNPROCESSABLE_ENTITY,
675                    ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
676                ),
677            })?
678        }
679    };
680
681    Ok::<_, (StatusCode, ApiErrorStatus)>(
682        (
683            StatusCode::OK,
684            Json(SessionClientResponse {
685                protocol,
686                ip: bound_host.ip().to_string(),
687                port: bound_host.port(),
688                target: args.target.to_string(),
689                destination: args.destination,
690                forward_path: args.forward_path.clone(),
691                return_path: args.return_path.clone(),
692                hopr_mtu: SESSION_MTU,
693                surb_len: SURB_SIZE,
694                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
695                max_client_sessions: max_clients,
696                max_surb_upstream: args.max_surb_upstream,
697                response_buffer: args.response_buffer,
698                session_pool: args.session_pool,
699            }),
700        )
701            .into_response(),
702    )
703}
704
705/// Lists existing Session listeners for the given IP protocol.
706#[utoipa::path(
707    get,
708    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
709    description = "Lists existing Session listeners for the given IP protocol.",
710    params(
711        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
712    ),
713    responses(
714        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
715            {
716                "target": "example.com:80",
717                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
718                "forwardPath": { "Hops": 1 },
719                "returnPath": { "Hops": 1 },
720                "protocol": "tcp",
721                "ip": "127.0.0.1",
722                "port": 5542,
723                "surbLen": 400,
724                "hoprMtu": 1020,
725                "activeClients": [],
726                "maxClientSessions": 2,
727                "maxSurbUpstream": "2000 kb/s",
728                "responseBuffer": "2 MB",
729                "sessionPool": 0
730            }
731        ])),
732        (status = 400, description = "Invalid IP protocol.", body = ApiError),
733        (status = 401, description = "Invalid authorization token.", body = ApiError),
734        (status = 422, description = "Unknown failure", body = ApiError)
735    ),
736    security(
737        ("api_token" = []),
738        ("bearer_token" = [])
739    ),
740    tag = "Session",
741)]
742pub(crate) async fn list_clients(
743    State(state): State<Arc<InternalState>>,
744    Path(protocol): Path<IpProtocol>,
745) -> Result<impl IntoResponse, impl IntoResponse> {
746    let response = state
747        .open_listeners
748        .read_arc()
749        .await
750        .iter()
751        .filter(|(id, _)| id.0 == protocol.into())
752        .map(|(id, entry)| SessionClientResponse {
753            protocol,
754            ip: id.1.ip().to_string(),
755            port: id.1.port(),
756            target: entry.target.to_string(),
757            forward_path: entry.forward_path.clone().into(),
758            return_path: entry.return_path.clone().into(),
759            destination: entry.destination,
760            hopr_mtu: SESSION_MTU,
761            surb_len: SURB_SIZE,
762            active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
763            max_client_sessions: entry.max_client_sessions,
764            max_surb_upstream: entry.max_surb_upstream,
765            response_buffer: entry.response_buffer,
766            session_pool: entry.session_pool,
767        })
768        .collect::<Vec<_>>();
769
770    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
771}
772
773#[serde_as]
774#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
775#[schema(example = json!({
776    "responseBuffer": "2 MB",
777    "maxSurbUpstream": "2 Mbps"
778}))]
779#[serde(rename_all = "camelCase")]
780pub(crate) struct SessionConfig {
781    /// The amount of response data the Session counterparty can deliver back to us,
782    /// without us sending any SURBs to them.
783    ///
784    /// In other words, this size is recalculated to a number of SURBs delivered
785    /// to the counterparty upfront and then maintained.
786    /// The maintenance is dynamic, based on the number of responses we receive.
787    ///
788    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
789    /// at least the size of 2 Session packet payloads.
790    #[serde(default)]
791    #[serde_as(as = "Option<DisplayFromStr>")]
792    #[schema(value_type = String)]
793    pub response_buffer: Option<bytesize::ByteSize>,
794    /// The maximum throughput at which artificial SURBs might be generated and sent
795    /// to the recipient of the Session.
796    ///
797    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
798    /// this should roughly match the maximum retrieval throughput.
799    ///
800    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
801    #[serde(default)]
802    #[serde(with = "human_bandwidth::option")]
803    #[schema(value_type = String)]
804    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
805}
806
807impl From<SessionConfig> for Option<SurbBalancerConfig> {
808    fn from(value: SessionConfig) -> Self {
809        match value.response_buffer {
810            // Buffer worth at least 2 reply packets
811            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
812                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
813                max_surbs_per_sec: value
814                    .max_surb_upstream
815                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
816                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
817                ..Default::default()
818            }),
819            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
820            Some(_) => None,
821            // Use defaults otherwise
822            None => Some(SurbBalancerConfig::default()),
823        }
824    }
825}
826
827impl From<SurbBalancerConfig> for SessionConfig {
828    fn from(value: SurbBalancerConfig) -> Self {
829        Self {
830            response_buffer: Some(bytesize::ByteSize::b(
831                value.target_surb_buffer_size * SESSION_MTU as u64,
832            )),
833            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
834                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
835            )),
836        }
837    }
838}
839
840#[utoipa::path(
841    post,
842    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
843    description = "Updates configuration of an existing active session.",
844    params(
845        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
846    ),
847    request_body(
848            content = SessionConfig,
849            description = "Allows updating of several parameters of an existing active session.",
850            content_type = "application/json"),
851    responses(
852            (status = 204, description = "Successfully updated the configuration"),
853            (status = 400, description = "Invalid configuration.", body = ApiError),
854            (status = 401, description = "Invalid authorization token.", body = ApiError),
855            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
856            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
857            (status = 422, description = "Unknown failure", body = ApiError),
858    ),
859    security(
860            ("api_token" = []),
861            ("bearer_token" = [])
862    ),
863    tag = "Session"
864)]
865pub(crate) async fn adjust_session(
866    State(state): State<Arc<InternalState>>,
867    Path(session_id): Path<String>,
868    Json(args): Json<SessionConfig>,
869) -> Result<impl IntoResponse, impl IntoResponse> {
870    let session_id = HoprSessionId::from_str(&session_id)
871        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
872
873    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
874        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
875            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
876            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
877                SessionManagerError::NonExistingSession,
878            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
879            Err(e) => Err((
880                StatusCode::NOT_ACCEPTABLE,
881                ApiErrorStatus::UnknownFailure(e.to_string()),
882            )),
883        }
884    } else {
885        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
886    }
887}
888
889#[utoipa::path(
890    get,
891    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
892    description = "Gets configuration of an existing active session.",
893    params(
894        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
895    ),
896    responses(
897            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
898            (status = 400, description = "Invalid session ID.", body = ApiError),
899            (status = 401, description = "Invalid authorization token.", body = ApiError),
900            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
901            (status = 422, description = "Unknown failure", body = ApiError),
902    ),
903    security(
904            ("api_token" = []),
905            ("bearer_token" = [])
906    ),
907    tag = "Session"
908)]
909pub(crate) async fn session_config(
910    State(state): State<Arc<InternalState>>,
911    Path(session_id): Path<String>,
912) -> Result<impl IntoResponse, impl IntoResponse> {
913    let session_id = HoprSessionId::from_str(&session_id)
914        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
915
916    match state.hopr.get_session_surb_balancer_config(&session_id).await {
917        Ok(Some(cfg)) => {
918            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
919        }
920        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
921        Err(e) => Err((
922            StatusCode::UNPROCESSABLE_ENTITY,
923            ApiErrorStatus::UnknownFailure(e.to_string()),
924        )),
925    }
926}
927
928#[derive(
929    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
930)]
931#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
932#[serde(rename_all = "lowercase")]
933#[schema(example = "tcp")]
934/// IP transport protocol
935pub enum IpProtocol {
936    #[allow(clippy::upper_case_acronyms)]
937    TCP,
938    #[allow(clippy::upper_case_acronyms)]
939    UDP,
940}
941
942impl From<IpProtocol> for hopr_lib::IpProtocol {
943    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
944        match protocol {
945            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
946            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
947        }
948    }
949}
950
951#[serde_as]
952#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
953pub struct SessionCloseClientQuery {
954    #[serde_as(as = "DisplayFromStr")]
955    #[schema(value_type = String, example = "tcp")]
956    /// IP transport protocol
957    pub protocol: IpProtocol,
958
959    /// Listening IP address of the Session.
960    #[schema(example = "127.0.0.1:8545")]
961    pub ip: String,
962
963    /// Session port used for the listener.
964    #[schema(value_type = u16, example = 10101)]
965    pub port: u16,
966}
967
968/// Closes an existing Session listener.
969/// The listener must've been previously created and bound for the given IP protocol.
970/// Once a listener is closed, no more socket connections can be made to it.
971/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
972/// will be closed.
973#[utoipa::path(
974    delete,
975    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
976    description = "Closes an existing Session listener.",
977    params(SessionCloseClientQuery),
978    responses(
979            (status = 204, description = "Listener closed successfully"),
980            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
981            (status = 401, description = "Invalid authorization token.", body = ApiError),
982            (status = 404, description = "Listener not found.", body = ApiError),
983            (status = 422, description = "Unknown failure", body = ApiError)
984    ),
985    security(
986            ("api_token" = []),
987            ("bearer_token" = [])
988    ),
989    tag = "Session",
990)]
991pub(crate) async fn close_client(
992    State(state): State<Arc<InternalState>>,
993    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
994) -> Result<impl IntoResponse, impl IntoResponse> {
995    let listening_ip: IpAddr = ip
996        .parse()
997        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
998
999    {
1000        let mut open_listeners = state.open_listeners.write_arc().await;
1001
1002        let mut to_remove = Vec::new();
1003        let protocol: hopr_lib::IpProtocol = protocol.into();
1004
1005        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1006        open_listeners
1007            .iter()
1008            .filter(|(ListenerId(proto, addr), _)| {
1009                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1010            })
1011            .for_each(|(id, _)| to_remove.push(*id));
1012
1013        if to_remove.is_empty() {
1014            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1015        }
1016
1017        for bound_addr in to_remove {
1018            let entry = open_listeners
1019                .remove(&bound_addr)
1020                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1021
1022            entry.abort_handle.abort();
1023        }
1024    }
1025
1026    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1027}