hoprd_api/
session.rs

1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    Error,
5    extract::{
6        Json, Path, State,
7        ws::{Message, WebSocket, WebSocketUpgrade},
8    },
9    http::status::StatusCode,
10    response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17    Address, HoprSession, NodeId, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18    SessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19    errors::{HoprLibError, HoprTransportError},
20    utils::futures::AsyncReadStreamer,
21};
22use hopr_utils_session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding};
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use tracing::{debug, error, info, trace};
26
27use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
28
/// Size of the buffer for forwarding data to/from a TCP stream.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4 * 1024;

/// Size of the buffer for forwarding data to/from a UDP stream.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16 * 1024;

/// Size of the queue (back-pressure) for data incoming from a UDP stream.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8 * 1024;
37
38// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
39#[allow(unused_imports)]
40use serde_json::json;
41
42#[serde_as]
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
44#[schema(
45    example = json!({"Plain": "example.com:80"}),
46    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
47    example = json!({"Service": 0})
48)]
49/// Session target specification.
50pub enum SessionTargetSpec {
51    Plain(String),
52    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
53    #[schema(value_type = u32)]
54    Service(ServiceId),
55}
56
57impl std::fmt::Display for SessionTargetSpec {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        match self {
60            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
61            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
62            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
63        }
64    }
65}
66
67impl FromStr for SessionTargetSpec {
68    type Err = HoprLibError;
69
70    fn from_str(s: &str) -> Result<Self, Self::Err> {
71        Ok(if let Some(stripped) = s.strip_prefix("$$") {
72            Self::Sealed(
73                base64::prelude::BASE64_URL_SAFE
74                    .decode(stripped)
75                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
76            )
77        } else if let Some(stripped) = s.strip_prefix("#") {
78            Self::Service(
79                stripped
80                    .parse()
81                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
82            )
83        } else {
84            Self::Plain(s.to_owned())
85        })
86    }
87}
88
89impl From<SessionTargetSpec> for hopr_utils_session::SessionTargetSpec {
90    fn from(spec: SessionTargetSpec) -> Self {
91        match spec {
92            SessionTargetSpec::Plain(t) => Self::Plain(t),
93            SessionTargetSpec::Sealed(t) => Self::Sealed(t),
94            SessionTargetSpec::Service(t) => Self::Service(t),
95        }
96    }
97}
98
99#[repr(u8)]
100#[derive(
101    Debug,
102    Clone,
103    strum::EnumIter,
104    strum::Display,
105    strum::EnumString,
106    Serialize,
107    Deserialize,
108    PartialEq,
109    utoipa::ToSchema,
110)]
111#[schema(example = "Segmentation")]
112/// Session capabilities that can be negotiated with the target peer.
113pub enum SessionCapability {
114    /// Frame segmentation
115    Segmentation,
116    /// Frame retransmission (ACK and NACK-based)
117    Retransmission,
118    /// Frame retransmission (only ACK-based)
119    RetransmissionAckOnly,
120    /// Disable packet buffering
121    NoDelay,
122    /// Disable SURB-based egress rate control at the Exit.
123    NoRateControl,
124}
125
126impl From<SessionCapability> for hopr_lib::SessionCapabilities {
127    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
128        match cap {
129            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
130            SessionCapability::Retransmission => {
131                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
132            }
133            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
134            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
135            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
136        }
137    }
138}
139
140#[serde_as]
141#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
142#[serde(rename_all = "camelCase")]
143pub(crate) struct SessionWebsocketClientQueryRequest {
144    #[serde_as(as = "DisplayFromStr")]
145    #[schema(required = true, value_type = String)]
146    pub destination: Address,
147    #[schema(required = true)]
148    pub hops: u8,
149    #[cfg(feature = "explicit-path")]
150    #[schema(required = false, value_type = String)]
151    pub path: Option<Vec<Address>>,
152    #[schema(required = true)]
153    #[serde_as(as = "Vec<DisplayFromStr>")]
154    pub capabilities: Vec<SessionCapability>,
155    #[schema(required = true)]
156    #[serde_as(as = "DisplayFromStr")]
157    pub target: SessionTargetSpec,
158    #[schema(required = false)]
159    #[serde(default = "default_protocol")]
160    pub protocol: IpProtocol,
161}
162
163#[inline]
164fn default_protocol() -> IpProtocol {
165    IpProtocol::TCP
166}
167
168impl SessionWebsocketClientQueryRequest {
169    pub(crate) async fn into_protocol_session_config(
170        self,
171    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
172        #[cfg(not(feature = "explicit-path"))]
173        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
174
175        #[cfg(feature = "explicit-path")]
176        let path_options = if let Some(path) = self.path {
177            // Explicit `path` will override `hops`
178            let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
179            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
180        } else {
181            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
182        };
183
184        let mut capabilities = SessionCapabilities::empty();
185        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
186
187        let target_spec: hopr_utils_session::SessionTargetSpec = self.target.into();
188
189        Ok((
190            self.destination,
191            target_spec.into_target(self.protocol.into())?,
192            SessionClientConfig {
193                forward_path_options: path_options.clone(),
194                return_path_options: path_options.clone(), // TODO: allow using separate return options
195                capabilities,
196                ..Default::default()
197            },
198        ))
199    }
200}
201
202#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
203#[schema(value_type = String, format = Binary)]
204#[allow(dead_code)] // not dead code, just for codegen
205struct WssData(Vec<u8>);
206
207/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
208/// sessions.
209///
210/// Once configured, the session represents and automatically managed connection to a target peer through a network
211/// routing configuration. The session can be used to send and receive binary data over the network.
212///
213/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
214///
215/// Connect to the endpoint by using a WS client. No preview is available. Example:
216/// `ws://127.0.0.1:3001/api/v4/session/websocket`
217#[allow(dead_code)] // not dead code, just for documentation
218#[utoipa::path(
219        get,
220        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
221        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
222        request_body(
223            content = SessionWebsocketClientQueryRequest,
224            content_type = "application/json",
225            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
226        ),
227        responses(
228            (status = 200, description = "Successfully created a new client websocket session."),
229            (status = 401, description = "Invalid authorization token.", body = ApiError),
230            (status = 422, description = "Unknown failure", body = ApiError),
231            (status = 429, description = "Too many open websocket connections.", body = ApiError),
232        ),
233        security(
234            ("api_token" = []),
235            ("bearer_token" = [])
236        ),
237        tag = "Session",
238    )]
239
240pub(crate) async fn websocket(
241    ws: WebSocketUpgrade,
242    Query(query): Query<SessionWebsocketClientQueryRequest>,
243    State(state): State<Arc<InternalState>>,
244) -> Result<impl IntoResponse, impl IntoResponse> {
245    let (dst, target, data) = query
246        .into_protocol_session_config()
247        .await
248        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
249
250    let hopr = state.hopr.clone();
251    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
252        error!(error = %e, "Failed to establish session");
253        (
254            StatusCode::UNPROCESSABLE_ENTITY,
255            ApiErrorStatus::UnknownFailure(e.to_string()),
256        )
257    })?;
258
259    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
260}
261
262enum WebSocketInput {
263    Network(Result<Box<[u8]>, std::io::Error>),
264    WsInput(Result<Message, Error>),
265}
266
/// The maximum number of bytes read from a Session that WS can transfer within a single message.
const WS_MAX_SESSION_READ_SIZE: usize = 4 * 1024;
269
270#[tracing::instrument(level = "debug", skip(socket, session))]
271async fn websocket_connection(socket: WebSocket, session: HoprSession) {
272    let session_id = *session.id();
273
274    let (rx, mut tx) = session.split();
275    let (mut sender, receiver) = socket.split();
276
277    let mut queue = (
278        receiver.map(WebSocketInput::WsInput),
279        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
280    )
281        .merge();
282
283    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
284
285    while let Some(v) = queue.next().await {
286        match v {
287            WebSocketInput::Network(bytes) => match bytes {
288                Ok(bytes) => {
289                    let len = bytes.len();
290                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
291                        error!(
292                            error = %e,
293                            "Failed to emit read data onto the websocket, closing connection"
294                        );
295                        break;
296                    };
297                    bytes_from_session += len;
298                }
299                Err(e) => {
300                    error!(
301                        error = %e,
302                        "Failed to push data from network to socket, closing connection"
303                    );
304                    break;
305                }
306            },
307            WebSocketInput::WsInput(ws_in) => match ws_in {
308                Ok(Message::Binary(data)) => {
309                    let len = data.len();
310                    if let Err(e) = tx.write(data.as_ref()).await {
311                        error!(error = %e, "Failed to write data to the session, closing connection");
312                        break;
313                    }
314                    bytes_to_session += len;
315                }
316                Ok(Message::Text(_)) => {
317                    error!("Received string instead of binary data, closing connection");
318                    break;
319                }
320                Ok(Message::Close(_)) => {
321                    debug!("Received close frame, closing connection");
322                    break;
323                }
324                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
325                Err(e) => {
326                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
327                    break;
328                }
329            },
330        }
331    }
332
333    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
334}
335
336#[serde_as]
337#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
338#[schema(example = json!({ "Hops": 1 }))]
339/// Routing options for the Session.
340pub enum RoutingOptions {
341    #[cfg(feature = "explicit-path")]
342    #[schema(value_type = Vec<String>)]
343    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
344    Hops(usize),
345}
346
347impl RoutingOptions {
348    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
349        Ok(match self {
350            #[cfg(feature = "explicit-path")]
351            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
352            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
353        })
354    }
355}
356
357impl From<hopr_lib::RoutingOptions> for RoutingOptions {
358    fn from(opts: hopr_lib::RoutingOptions) -> Self {
359        match opts {
360            #[cfg(feature = "explicit-path")]
361            hopr_lib::RoutingOptions::IntermediatePath(path) => {
362                RoutingOptions::IntermediatePath(path.into_iter().collect())
363            }
364            hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
365        }
366    }
367}
368
369#[serde_as]
370#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
371#[schema(example = json!({
372        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
373        "forwardPath": { "Hops": 1 },
374        "returnPath": { "Hops": 1 },
375        "target": {"Plain": "localhost:8080"},
376        "listenHost": "127.0.0.1:10000",
377        "capabilities": ["Retransmission", "Segmentation"],
378        "responseBuffer": "2 MB",
379        "maxSurbUpstream": "2000 kb/s",
380        "sessionPool": 0,
381        "maxClientSessions": 2
382    }))]
383#[serde(rename_all = "camelCase")]
384/// Request body for creating a new client session.
385pub(crate) struct SessionClientRequest {
386    /// Address of the Exit node.
387    #[serde_as(as = "DisplayFromStr")]
388    #[schema(value_type = String)]
389    pub destination: Address,
390    /// The forward path for the Session.
391    pub forward_path: RoutingOptions,
392    /// The return path for the Session.
393    pub return_path: RoutingOptions,
394    /// Target for the Session.
395    pub target: SessionTargetSpec,
396    /// Listen host (`ip:port`) for the Session socket at the Entry node.
397    ///
398    /// Supports also partial specification (only `ip` or only `:port`) with the
399    /// respective part replaced by the node's configured default.
400    pub listen_host: Option<String>,
401    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
402    /// Capabilities for the Session protocol.
403    ///
404    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
405    pub capabilities: Option<Vec<SessionCapability>>,
406    /// The amount of response data the Session counterparty can deliver back to us,
407    /// without us sending any SURBs to them.
408    ///
409    /// In other words, this size is recalculated to a number of SURBs delivered
410    /// to the counterparty upfront and then maintained.
411    /// The maintenance is dynamic, based on the number of responses we receive.
412    ///
413    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
414    /// at least the size of 2 Session packet payloads.
415    #[serde_as(as = "Option<DisplayFromStr>")]
416    #[schema(value_type = Option<String>)]
417    pub response_buffer: Option<bytesize::ByteSize>,
418    /// The maximum throughput at which artificial SURBs might be generated and sent
419    /// to the recipient of the Session.
420    ///
421    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
422    /// this should roughly match the maximum retrieval throughput.
423    ///
424    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
425    #[serde(default)]
426    #[serde(with = "human_bandwidth::option")]
427    #[schema(value_type = Option<String>)]
428    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
429    /// How many Sessions to pool for clients.
430    ///
431    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
432    /// It has no effect on UDP sessions in the current implementation.
433    ///
434    /// Currently, the maximum value is 5.
435    pub session_pool: Option<usize>,
436    /// The maximum number of client sessions that the listener can spawn.
437    ///
438    /// This currently applies only to the TCP sessions, as UDP sessions cannot
439    /// handle multiple clients (and spawn therefore always only a single session).
440    ///
441    /// If this value is smaller than the value specified in `session_pool`, it will
442    /// be set to that value.
443    ///
444    /// The default value is 5.
445    pub max_client_sessions: Option<usize>,
446}
447
448impl SessionClientRequest {
449    pub(crate) async fn into_protocol_session_config(
450        self,
451        target_protocol: IpProtocol,
452    ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
453        let target_spec: hopr_utils_session::SessionTargetSpec = self.target.clone().into();
454        Ok((
455            self.destination,
456            target_spec.into_target(target_protocol.into())?,
457            SessionClientConfig {
458                forward_path_options: self.forward_path.resolve().await?,
459                return_path_options: self.return_path.resolve().await?,
460                capabilities: self
461                    .capabilities
462                    .map(|vs| {
463                        let mut caps = SessionCapabilities::empty();
464                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
465                        caps
466                    })
467                    .unwrap_or_else(|| match target_protocol {
468                        IpProtocol::TCP => {
469                            hopr_lib::SessionCapability::RetransmissionAck
470                                | hopr_lib::SessionCapability::RetransmissionNack
471                                | hopr_lib::SessionCapability::Segmentation
472                        }
473                        // Only Segmentation capability for UDP per default
474                        _ => SessionCapability::Segmentation.into(),
475                    }),
476                surb_management: SessionConfig {
477                    response_buffer: self.response_buffer,
478                    max_surb_upstream: self.max_surb_upstream,
479                }
480                .into(),
481                ..Default::default()
482            },
483        ))
484    }
485}
486
487#[serde_as]
488#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
489#[schema(example = json!({
490        "target": "example.com:80",
491        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
492        "forwardPath": { "Hops": 1 },
493        "returnPath": { "Hops": 1 },
494        "protocol": "tcp",
495        "ip": "127.0.0.1",
496        "port": 5542,
497        "hoprMtu": 1002,
498        "surbLen": 398,
499        "activeClients": [],
500        "maxClientSessions": 2,
501        "maxSurbUpstream": "2000 kb/s",
502        "responseBuffer": "2 MB",
503        "sessionPool": 0
504    }))]
505#[serde(rename_all = "camelCase")]
506/// Response body for creating a new client session.
507pub(crate) struct SessionClientResponse {
508    #[schema(example = "example.com:80")]
509    /// Target of the Session.
510    pub target: String,
511    /// Destination node (exit node) of the Session.
512    #[serde_as(as = "DisplayFromStr")]
513    #[schema(value_type = String)]
514    pub destination: Address,
515    /// Forward routing path.
516    pub forward_path: RoutingOptions,
517    /// Return routing path.
518    pub return_path: RoutingOptions,
519    /// IP protocol used by Session's listening socket.
520    #[serde_as(as = "DisplayFromStr")]
521    #[schema(example = "tcp")]
522    pub protocol: IpProtocol,
523    /// Listening IP address of the Session's socket.
524    #[schema(example = "127.0.0.1")]
525    pub ip: String,
526    #[schema(example = 5542)]
527    /// Listening port of the Session's socket.
528    pub port: u16,
529    /// MTU used by the underlying HOPR transport.
530    pub hopr_mtu: usize,
531    /// Size of a Single Use Reply Block used by the protocol.
532    ///
533    /// This is useful for SURB balancing calculations.
534    pub surb_len: usize,
535    /// Lists Session IDs of all active clients.
536    ///
537    /// Can contain multiple entries on TCP sessions, but currently
538    /// always only a single entry on UDP sessions.
539    pub active_clients: Vec<String>,
540    /// The maximum number of client sessions that the listener can spawn.
541    ///
542    /// This currently applies only to the TCP sessions, as UDP sessions cannot
543    /// have multiple clients (defaults to 1 for UDP).
544    pub max_client_sessions: usize,
545    /// The maximum throughput at which artificial SURBs might be generated and sent
546    /// to the recipient of the Session.    
547    #[serde(default)]
548    #[serde(with = "human_bandwidth::option")]
549    #[schema(value_type = Option<String>)]
550    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
551    /// The amount of response data the Session counterparty can deliver back to us, without us
552    /// sending any SURBs to them.
553    #[serde_as(as = "Option<DisplayFromStr>")]
554    #[schema(value_type = Option<String>)]
555    pub response_buffer: Option<bytesize::ByteSize>,
556    /// How many Sessions to pool for clients.
557    pub session_pool: Option<usize>,
558}
559
560/// Creates a new client session returning the given session listening host and port over TCP or UDP.
561/// If no listening port is given in the request, the socket will be bound to a random free
562/// port and returned in the response.
563/// Different capabilities can be configured for the session, such as data segmentation or
564/// retransmission.
565///
566/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
567/// communication over the selected IP protocol and HOPR network routing with the given destination.
568/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
569///
570/// Various services require different types of socket communications:
571/// - services running over UDP usually do not require data retransmission, as it is already expected
572/// that UDP does not provide these and is therefore handled at the application layer.
573/// - On the contrary, services running over TCP *almost always* expect data segmentation and
574/// retransmission capabilities, so these should be configured while creating a session that passes
575/// TCP data.
576#[utoipa::path(
577        post,
578        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
579        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
580        params(
581            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
582        ),
583        request_body(
584            content = SessionClientRequest,
585            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
586            content_type = "application/json"),
587        responses(
588            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
589            (status = 400, description = "Invalid IP protocol.", body = ApiError),
590            (status = 401, description = "Invalid authorization token.", body = ApiError),
591            (status = 409, description = "Listening address and port already in use.", body = ApiError),
592            (status = 422, description = "Unknown failure", body = ApiError),
593        ),
594        security(
595            ("api_token" = []),
596            ("bearer_token" = [])
597        ),
598        tag = "Session"
599    )]
600pub(crate) async fn create_client(
601    State(state): State<Arc<InternalState>>,
602    Path(protocol): Path<IpProtocol>,
603    Json(args): Json<SessionClientRequest>,
604) -> Result<impl IntoResponse, impl IntoResponse> {
605    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
606
607    let listener_id = ListenerId(protocol.into(), bind_host);
608    if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
609        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
610    }
611
612    let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
613    debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
614
615    let (bound_host, udp_session_id, max_clients) = match protocol {
616        IpProtocol::TCP => {
617            let session_pool = args.session_pool;
618            let max_client_sessions = args.max_client_sessions;
619            let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
620            let (destination, _target, config) = args
621                .clone()
622                .into_protocol_session_config(IpProtocol::TCP)
623                .await
624                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
625
626            create_tcp_client_binding(
627                bind_host,
628                port_range,
629                state.hopr.clone(),
630                state.open_listeners.clone(),
631                destination,
632                target_spec,
633                config,
634                session_pool,
635                max_client_sessions,
636            )
637            .await
638            .map_err(|e| match e {
639                hopr_utils_session::BindError::ListenHostAlreadyUsed => {
640                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
641                }
642                hopr_utils_session::BindError::UnknownFailure(_) => (
643                    StatusCode::UNPROCESSABLE_ENTITY,
644                    ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
645                ),
646            })?
647        }
648        IpProtocol::UDP => {
649            let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
650            let (destination, _target, config) = args
651                .clone()
652                .into_protocol_session_config(IpProtocol::UDP)
653                .await
654                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
655
656            create_udp_client_binding(
657                bind_host,
658                port_range,
659                state.hopr.clone(),
660                state.open_listeners.clone(),
661                destination,
662                target_spec,
663                config,
664            )
665            .await
666            .map_err(|e| match e {
667                hopr_utils_session::BindError::ListenHostAlreadyUsed => {
668                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
669                }
670                hopr_utils_session::BindError::UnknownFailure(_) => (
671                    StatusCode::UNPROCESSABLE_ENTITY,
672                    ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
673                ),
674            })?
675        }
676    };
677
678    Ok::<_, (StatusCode, ApiErrorStatus)>(
679        (
680            StatusCode::OK,
681            Json(SessionClientResponse {
682                protocol,
683                ip: bound_host.ip().to_string(),
684                port: bound_host.port(),
685                target: args.target.to_string(),
686                destination: args.destination,
687                forward_path: args.forward_path.clone(),
688                return_path: args.return_path.clone(),
689                hopr_mtu: SESSION_MTU,
690                surb_len: SURB_SIZE,
691                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
692                max_client_sessions: max_clients,
693                max_surb_upstream: args.max_surb_upstream,
694                response_buffer: args.response_buffer,
695                session_pool: args.session_pool,
696            }),
697        )
698            .into_response(),
699    )
700}
701
702/// Lists existing Session listeners for the given IP protocol.
703#[utoipa::path(
704    get,
705    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
706    description = "Lists existing Session listeners for the given IP protocol.",
707    params(
708        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
709    ),
710    responses(
711        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
712            {
713                "target": "example.com:80",
714                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
715                "forwardPath": { "Hops": 1 },
716                "returnPath": { "Hops": 1 },
717                "protocol": "tcp",
718                "ip": "127.0.0.1",
719                "port": 5542,
720                "surbLen": 400,
721                "hoprMtu": 1020,
722                "activeClients": [],
723                "maxClientSessions": 2,
724                "maxSurbUpstream": "2000 kb/s",
725                "responseBuffer": "2 MB",
726                "sessionPool": 0
727            }
728        ])),
729        (status = 400, description = "Invalid IP protocol.", body = ApiError),
730        (status = 401, description = "Invalid authorization token.", body = ApiError),
731        (status = 422, description = "Unknown failure", body = ApiError)
732    ),
733    security(
734        ("api_token" = []),
735        ("bearer_token" = [])
736    ),
737    tag = "Session",
738)]
739pub(crate) async fn list_clients(
740    State(state): State<Arc<InternalState>>,
741    Path(protocol): Path<IpProtocol>,
742) -> Result<impl IntoResponse, impl IntoResponse> {
743    let response = state
744        .open_listeners
745        .0
746        .iter()
747        .filter(|v| v.key().0 == protocol.into())
748        .map(|v| {
749            let ListenerId(_, addr) = *v.key();
750            let entry = v.value();
751            SessionClientResponse {
752                protocol,
753                ip: addr.ip().to_string(),
754                port: addr.port(),
755                target: entry.target.to_string(),
756                forward_path: entry.forward_path.clone().into(),
757                return_path: entry.return_path.clone().into(),
758                destination: entry.destination,
759                hopr_mtu: SESSION_MTU,
760                surb_len: SURB_SIZE,
761                active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
762                max_client_sessions: entry.max_client_sessions,
763                max_surb_upstream: entry.max_surb_upstream,
764                response_buffer: entry.response_buffer,
765                session_pool: entry.session_pool,
766            }
767        })
768        .collect::<Vec<_>>();
769
770    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
771}
772
773#[serde_as]
774#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
775#[schema(example = json!({
776    "responseBuffer": "2 MB",
777    "maxSurbUpstream": "2 Mbps"
778}))]
779#[serde(rename_all = "camelCase")]
780pub(crate) struct SessionConfig {
781    /// The amount of response data the Session counterparty can deliver back to us,
782    /// without us sending any SURBs to them.
783    ///
784    /// In other words, this size is recalculated to a number of SURBs delivered
785    /// to the counterparty upfront and then maintained.
786    /// The maintenance is dynamic, based on the number of responses we receive.
787    ///
788    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
789    /// at least the size of 2 Session packet payloads.
790    #[serde(default)]
791    #[serde_as(as = "Option<DisplayFromStr>")]
792    #[schema(value_type = String)]
793    pub response_buffer: Option<bytesize::ByteSize>,
794    /// The maximum throughput at which artificial SURBs might be generated and sent
795    /// to the recipient of the Session.
796    ///
797    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
798    /// this should roughly match the maximum retrieval throughput.
799    ///
800    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
801    #[serde(default)]
802    #[serde(with = "human_bandwidth::option")]
803    #[schema(value_type = String)]
804    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
805}
806
807impl From<SessionConfig> for Option<SurbBalancerConfig> {
808    fn from(value: SessionConfig) -> Self {
809        match value.response_buffer {
810            // Buffer worth at least 2 reply packets
811            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
812                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
813                max_surbs_per_sec: value
814                    .max_surb_upstream
815                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
816                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
817                ..Default::default()
818            }),
819            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
820            Some(_) => None,
821            // Use defaults otherwise
822            None => Some(SurbBalancerConfig::default()),
823        }
824    }
825}
826
827impl From<SurbBalancerConfig> for SessionConfig {
828    fn from(value: SurbBalancerConfig) -> Self {
829        Self {
830            response_buffer: Some(bytesize::ByteSize::b(
831                value.target_surb_buffer_size * SESSION_MTU as u64,
832            )),
833            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
834                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
835            )),
836        }
837    }
838}
839
840#[utoipa::path(
841    post,
842    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
843    description = "Updates configuration of an existing active session.",
844    params(
845        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
846    ),
847    request_body(
848            content = SessionConfig,
849            description = "Allows updating of several parameters of an existing active session.",
850            content_type = "application/json"),
851    responses(
852            (status = 204, description = "Successfully updated the configuration"),
853            (status = 400, description = "Invalid configuration.", body = ApiError),
854            (status = 401, description = "Invalid authorization token.", body = ApiError),
855            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
856            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
857            (status = 422, description = "Unknown failure", body = ApiError),
858    ),
859    security(
860            ("api_token" = []),
861            ("bearer_token" = [])
862    ),
863    tag = "Session"
864)]
865pub(crate) async fn adjust_session(
866    State(state): State<Arc<InternalState>>,
867    Path(session_id): Path<String>,
868    Json(args): Json<SessionConfig>,
869) -> Result<impl IntoResponse, impl IntoResponse> {
870    let session_id =
871        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
872
873    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
874        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
875            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
876            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
877                SessionManagerError::NonExistingSession,
878            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
879            Err(e) => Err((
880                StatusCode::NOT_ACCEPTABLE,
881                ApiErrorStatus::UnknownFailure(e.to_string()),
882            )),
883        }
884    } else {
885        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
886    }
887}
888
889#[utoipa::path(
890    get,
891    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
892    description = "Gets configuration of an existing active session.",
893    params(
894        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
895    ),
896    responses(
897            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
898            (status = 400, description = "Invalid session ID.", body = ApiError),
899            (status = 401, description = "Invalid authorization token.", body = ApiError),
900            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
901            (status = 422, description = "Unknown failure", body = ApiError),
902    ),
903    security(
904            ("api_token" = []),
905            ("bearer_token" = [])
906    ),
907    tag = "Session"
908)]
909pub(crate) async fn session_config(
910    State(state): State<Arc<InternalState>>,
911    Path(session_id): Path<String>,
912) -> Result<impl IntoResponse, impl IntoResponse> {
913    let session_id =
914        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
915
916    match state.hopr.get_session_surb_balancer_config(&session_id).await {
917        Ok(Some(cfg)) => {
918            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
919        }
920        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
921        Err(e) => Err((
922            StatusCode::UNPROCESSABLE_ENTITY,
923            ApiErrorStatus::UnknownFailure(e.to_string()),
924        )),
925    }
926}
927
928#[derive(
929    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
930)]
931#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
932#[serde(rename_all = "lowercase")]
933#[schema(example = "tcp")]
934/// IP transport protocol
935pub enum IpProtocol {
936    #[allow(clippy::upper_case_acronyms)]
937    TCP,
938    #[allow(clippy::upper_case_acronyms)]
939    UDP,
940}
941
942impl From<IpProtocol> for hopr_lib::IpProtocol {
943    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
944        match protocol {
945            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
946            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
947        }
948    }
949}
950
951#[serde_as]
952#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
953pub struct SessionCloseClientQuery {
954    #[serde_as(as = "DisplayFromStr")]
955    #[schema(value_type = String, example = "tcp")]
956    /// IP transport protocol
957    pub protocol: IpProtocol,
958
959    /// Listening IP address of the Session.
960    #[schema(example = "127.0.0.1:8545")]
961    pub ip: String,
962
963    /// Session port used for the listener.
964    #[schema(value_type = u16, example = 10101)]
965    pub port: u16,
966}
967
968/// Closes an existing Session listener.
969/// The listener must've been previously created and bound for the given IP protocol.
970/// Once a listener is closed, no more socket connections can be made to it.
971/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
972/// will be closed.
973#[utoipa::path(
974    delete,
975    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
976    description = "Closes an existing Session listener.",
977    params(SessionCloseClientQuery),
978    responses(
979            (status = 204, description = "Listener closed successfully"),
980            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
981            (status = 401, description = "Invalid authorization token.", body = ApiError),
982            (status = 404, description = "Listener not found.", body = ApiError),
983            (status = 422, description = "Unknown failure", body = ApiError)
984    ),
985    security(
986            ("api_token" = []),
987            ("bearer_token" = [])
988    ),
989    tag = "Session",
990)]
991pub(crate) async fn close_client(
992    State(state): State<Arc<InternalState>>,
993    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
994) -> Result<impl IntoResponse, impl IntoResponse> {
995    let listening_ip: IpAddr = ip
996        .parse()
997        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
998
999    {
1000        let open_listeners = &state.open_listeners.0;
1001
1002        let mut to_remove = Vec::new();
1003        let protocol: hopr_lib::IpProtocol = protocol.into();
1004
1005        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1006        open_listeners
1007            .iter()
1008            .filter(|v| {
1009                let ListenerId(proto, addr) = v.key();
1010                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1011            })
1012            .for_each(|v| to_remove.push(*v.key()));
1013
1014        if to_remove.is_empty() {
1015            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1016        }
1017
1018        for bound_addr in to_remove {
1019            let (_, entry) = open_listeners
1020                .remove(&bound_addr)
1021                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1022
1023            entry.abort_handle.abort();
1024        }
1025    }
1026
1027    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1028}