hoprd_api/
session.rs

1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    Error,
5    extract::{
6        Json, Path, State,
7        ws::{Message, WebSocket, WebSocketUpgrade},
8    },
9    http::status::StatusCode,
10    response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17    Address, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities,
18    SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19    errors::HoprLibError,
20    utils::{
21        futures::AsyncReadStreamer,
22        session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
23    },
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tracing::{debug, error, info, trace};
28
29use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
30
31/// Size of the buffer for forwarding data to/from a TCP stream.
32pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
33
34/// Size of the buffer for forwarding data to/from a UDP stream.
35pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
36
37/// Size of the queue (back-pressure) for data incoming from a UDP stream.
38pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
39
40// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
41#[allow(unused_imports)]
42use serde_json::json;
43
44#[serde_as]
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
46#[schema(
47    example = json!({"Plain": "example.com:80"}),
48    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
49    example = json!({"Service": 0})
50)]
51/// Session target specification.
52pub enum SessionTargetSpec {
53    Plain(String),
54    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
55    #[schema(value_type = u32)]
56    Service(ServiceId),
57}
58
59impl std::fmt::Display for SessionTargetSpec {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        match self {
62            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
63            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
64            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
65        }
66    }
67}
68
69impl FromStr for SessionTargetSpec {
70    type Err = HoprLibError;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        Ok(if let Some(stripped) = s.strip_prefix("$$") {
74            Self::Sealed(
75                base64::prelude::BASE64_URL_SAFE
76                    .decode(stripped)
77                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
78            )
79        } else if let Some(stripped) = s.strip_prefix("#") {
80            Self::Service(
81                stripped
82                    .parse()
83                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
84            )
85        } else {
86            Self::Plain(s.to_owned())
87        })
88    }
89}
90
91impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
92    fn from(spec: SessionTargetSpec) -> Self {
93        match spec {
94            SessionTargetSpec::Plain(t) => Self::Plain(t),
95            SessionTargetSpec::Sealed(t) => Self::Sealed(t),
96            SessionTargetSpec::Service(t) => Self::Service(t),
97        }
98    }
99}
100
101#[repr(u8)]
102#[derive(
103    Debug,
104    Clone,
105    strum::EnumIter,
106    strum::Display,
107    strum::EnumString,
108    Serialize,
109    Deserialize,
110    PartialEq,
111    utoipa::ToSchema,
112)]
113#[schema(example = "Segmentation")]
114/// Session capabilities that can be negotiated with the target peer.
115pub enum SessionCapability {
116    /// Frame segmentation
117    Segmentation,
118    /// Frame retransmission (ACK and NACK-based)
119    Retransmission,
120    /// Frame retransmission (only ACK-based)
121    RetransmissionAckOnly,
122    /// Disable packet buffering
123    NoDelay,
124    /// Disable SURB-based egress rate control at the Exit.
125    NoRateControl,
126}
127
128impl From<SessionCapability> for hopr_lib::SessionCapabilities {
129    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
130        match cap {
131            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
132            SessionCapability::Retransmission => {
133                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
134            }
135            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
136            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
137            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
138        }
139    }
140}
141
142#[serde_as]
143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
144#[serde(rename_all = "camelCase")]
145pub(crate) struct SessionWebsocketClientQueryRequest {
146    #[serde_as(as = "DisplayFromStr")]
147    #[schema(required = true, value_type = String)]
148    pub destination: Address,
149    #[schema(required = true)]
150    pub hops: u8,
151    #[cfg(feature = "explicit-path")]
152    #[schema(required = false, value_type = String)]
153    pub path: Option<Vec<Address>>,
154    #[schema(required = true)]
155    #[serde_as(as = "Vec<DisplayFromStr>")]
156    pub capabilities: Vec<SessionCapability>,
157    #[schema(required = true)]
158    #[serde_as(as = "DisplayFromStr")]
159    pub target: SessionTargetSpec,
160    #[schema(required = false)]
161    #[serde(default = "default_protocol")]
162    pub protocol: IpProtocol,
163}
164
165#[inline]
166fn default_protocol() -> IpProtocol {
167    IpProtocol::TCP
168}
169
170impl SessionWebsocketClientQueryRequest {
171    pub(crate) async fn into_protocol_session_config(
172        self,
173    ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
174        #[cfg(not(feature = "explicit-path"))]
175        let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
176
177        #[cfg(feature = "explicit-path")]
178        let path_options = if let Some(path) = self.path {
179            // Explicit `path` will override `hops`
180            hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
181        } else {
182            hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
183        };
184
185        let mut capabilities = SessionCapabilities::empty();
186        capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
187
188        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
189
190        Ok((
191            self.destination,
192            target_spec.into_target(self.protocol.into())?,
193            SessionClientConfig {
194                forward_path_options: path_options.clone(),
195                return_path_options: path_options.clone(), // TODO: allow using separate return options
196                capabilities,
197                ..Default::default()
198            },
199        ))
200    }
201}
202
203#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
204#[schema(value_type = String, format = Binary)]
205#[allow(dead_code)] // not dead code, just for codegen
206struct WssData(Vec<u8>);
207
208/// Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR
209/// sessions.
210///
211/// Once configured, the session represents and automatically managed connection to a target peer through a network
212/// routing configuration. The session can be used to send and receive binary data over the network.
213///
214/// Authentication (if enabled) is done by cookie `X-Auth-Token`.
215///
216/// Connect to the endpoint by using a WS client. No preview is available. Example:
217/// `ws://127.0.0.1:3001/api/v4/session/websocket`
218#[allow(dead_code)] // not dead code, just for documentation
219#[utoipa::path(
220        get,
221        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
222        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
223        request_body(
224            content = SessionWebsocketClientQueryRequest,
225            content_type = "application/json",
226            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
227        ),
228        responses(
229            (status = 200, description = "Successfully created a new client websocket session."),
230            (status = 401, description = "Invalid authorization token.", body = ApiError),
231            (status = 422, description = "Unknown failure", body = ApiError),
232            (status = 429, description = "Too many open websocket connections.", body = ApiError),
233        ),
234        security(
235            ("api_token" = []),
236            ("bearer_token" = [])
237        ),
238        tag = "Session",
239    )]
240
241pub(crate) async fn websocket(
242    ws: WebSocketUpgrade,
243    Query(query): Query<SessionWebsocketClientQueryRequest>,
244    State(state): State<Arc<InternalState>>,
245) -> Result<impl IntoResponse, impl IntoResponse> {
246    let (dst, target, data) = query
247        .into_protocol_session_config()
248        .await
249        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
250
251    let hopr = state.hopr.clone();
252    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
253        error!(error = %e, "Failed to establish session");
254        (
255            StatusCode::UNPROCESSABLE_ENTITY,
256            ApiErrorStatus::UnknownFailure(e.to_string()),
257        )
258    })?;
259
260    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
261}
262
263enum WebSocketInput {
264    Network(Result<Box<[u8]>, std::io::Error>),
265    WsInput(Result<Message, Error>),
266}
267
268/// The maximum number of bytes read from a Session that WS can transfer within a single message.
269const WS_MAX_SESSION_READ_SIZE: usize = 4096;
270
271#[tracing::instrument(level = "debug", skip(socket, session))]
272async fn websocket_connection(socket: WebSocket, session: HoprSession) {
273    let session_id = *session.id();
274
275    let (rx, mut tx) = session.split();
276    let (mut sender, receiver) = socket.split();
277
278    let mut queue = (
279        receiver.map(WebSocketInput::WsInput),
280        AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
281    )
282        .merge();
283
284    let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
285
286    while let Some(v) = queue.next().await {
287        match v {
288            WebSocketInput::Network(bytes) => match bytes {
289                Ok(bytes) => {
290                    let len = bytes.len();
291                    if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
292                        error!(
293                            error = %e,
294                            "Failed to emit read data onto the websocket, closing connection"
295                        );
296                        break;
297                    };
298                    bytes_from_session += len;
299                }
300                Err(e) => {
301                    error!(
302                        error = %e,
303                        "Failed to push data from network to socket, closing connection"
304                    );
305                    break;
306                }
307            },
308            WebSocketInput::WsInput(ws_in) => match ws_in {
309                Ok(Message::Binary(data)) => {
310                    let len = data.len();
311                    if let Err(e) = tx.write(data.as_ref()).await {
312                        error!(error = %e, "Failed to write data to the session, closing connection");
313                        break;
314                    }
315                    bytes_to_session += len;
316                }
317                Ok(Message::Text(_)) => {
318                    error!("Received string instead of binary data, closing connection");
319                    break;
320                }
321                Ok(Message::Close(_)) => {
322                    debug!("Received close frame, closing connection");
323                    break;
324                }
325                Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
326                Err(e) => {
327                    error!(error = %e, "Failed to get a valid websocket message, closing connection");
328                    break;
329                }
330            },
331        }
332    }
333
334    info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
335}
336
337#[serde_as]
338#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
339#[schema(example = json!({ "Hops": 1 }))]
340/// Routing options for the Session.
341pub enum RoutingOptions {
342    #[cfg(feature = "explicit-path")]
343    #[schema(value_type = Vec<String>)]
344    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
345    Hops(usize),
346}
347
348impl RoutingOptions {
349    pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
350        Ok(match self {
351            #[cfg(feature = "explicit-path")]
352            RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
353            RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
354        })
355    }
356}
357
358impl From<hopr_lib::RoutingOptions> for RoutingOptions {
359    fn from(opts: hopr_lib::RoutingOptions) -> Self {
360        match opts {
361            #[cfg(feature = "explicit-path")]
362            hopr_lib::RoutingOptions::IntermediatePath(path) => {
363                RoutingOptions::IntermediatePath(path.into_iter().collect())
364            }
365            hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
366        }
367    }
368}
369
370#[serde_as]
371#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
372#[schema(example = json!({
373        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
374        "forwardPath": { "Hops": 1 },
375        "returnPath": { "Hops": 1 },
376        "target": {"Plain": "localhost:8080"},
377        "listenHost": "127.0.0.1:10000",
378        "capabilities": ["Retransmission", "Segmentation"],
379        "responseBuffer": "2 MB",
380        "maxSurbUpstream": "2000 kb/s",
381        "sessionPool": 0,
382        "maxClientSessions": 2
383    }))]
384#[serde(rename_all = "camelCase")]
385/// Request body for creating a new client session.
386pub(crate) struct SessionClientRequest {
387    /// Address of the Exit node.
388    #[serde_as(as = "DisplayFromStr")]
389    #[schema(value_type = String)]
390    pub destination: Address,
391    /// The forward path for the Session.
392    pub forward_path: RoutingOptions,
393    /// The return path for the Session.
394    pub return_path: RoutingOptions,
395    /// Target for the Session.
396    pub target: SessionTargetSpec,
397    /// Listen host (`ip:port`) for the Session socket at the Entry node.
398    ///
399    /// Supports also partial specification (only `ip` or only `:port`) with the
400    /// respective part replaced by the node's configured default.
401    pub listen_host: Option<String>,
402    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
403    /// Capabilities for the Session protocol.
404    ///
405    /// Defaults to `Segmentation` and `Retransmission` for TCP and nothing for UDP.
406    pub capabilities: Option<Vec<SessionCapability>>,
407    /// The amount of response data the Session counterparty can deliver back to us,
408    /// without us sending any SURBs to them.
409    ///
410    /// In other words, this size is recalculated to a number of SURBs delivered
411    /// to the counterparty upfront and then maintained.
412    /// The maintenance is dynamic, based on the number of responses we receive.
413    ///
414    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
415    /// at least the size of 2 Session packet payloads.
416    #[serde_as(as = "Option<DisplayFromStr>")]
417    #[schema(value_type = Option<String>)]
418    pub response_buffer: Option<bytesize::ByteSize>,
419    /// The maximum throughput at which artificial SURBs might be generated and sent
420    /// to the recipient of the Session.
421    ///
422    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
423    /// this should roughly match the maximum retrieval throughput.
424    ///
425    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
426    #[serde(default)]
427    #[serde(with = "human_bandwidth::option")]
428    #[schema(value_type = Option<String>)]
429    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
430    /// How many Sessions to pool for clients.
431    ///
432    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
433    /// It has no effect on UDP sessions in the current implementation.
434    ///
435    /// Currently, the maximum value is 5.
436    pub session_pool: Option<usize>,
437    /// The maximum number of client sessions that the listener can spawn.
438    ///
439    /// This currently applies only to the TCP sessions, as UDP sessions cannot
440    /// handle multiple clients (and spawn therefore always only a single session).
441    ///
442    /// If this value is smaller than the value specified in `session_pool`, it will
443    /// be set to that value.
444    ///
445    /// The default value is 5.
446    pub max_client_sessions: Option<usize>,
447}
448
449impl SessionClientRequest {
450    pub(crate) async fn into_protocol_session_config(
451        self,
452        target_protocol: IpProtocol,
453    ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
454        let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
455        Ok((
456            self.destination,
457            target_spec.into_target(target_protocol.into())?,
458            SessionClientConfig {
459                forward_path_options: self.forward_path.resolve().await?,
460                return_path_options: self.return_path.resolve().await?,
461                capabilities: self
462                    .capabilities
463                    .map(|vs| {
464                        let mut caps = SessionCapabilities::empty();
465                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
466                        caps
467                    })
468                    .unwrap_or_else(|| match target_protocol {
469                        IpProtocol::TCP => {
470                            hopr_lib::SessionCapability::RetransmissionAck
471                                | hopr_lib::SessionCapability::RetransmissionNack
472                                | hopr_lib::SessionCapability::Segmentation
473                        }
474                        // Only Segmentation capability for UDP per default
475                        _ => SessionCapability::Segmentation.into(),
476                    }),
477                surb_management: SessionConfig {
478                    response_buffer: self.response_buffer,
479                    max_surb_upstream: self.max_surb_upstream,
480                }
481                .into(),
482                ..Default::default()
483            },
484        ))
485    }
486}
487
488#[serde_as]
489#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
490#[schema(example = json!({
491        "target": "example.com:80",
492        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
493        "forwardPath": { "Hops": 1 },
494        "returnPath": { "Hops": 1 },
495        "protocol": "tcp",
496        "ip": "127.0.0.1",
497        "port": 5542,
498        "hoprMtu": 1002,
499        "surbLen": 398,
500        "activeClients": [],
501        "maxClientSessions": 2,
502        "maxSurbUpstream": "2000 kb/s",
503        "responseBuffer": "2 MB",
504        "sessionPool": 0
505    }))]
506#[serde(rename_all = "camelCase")]
507/// Response body for creating a new client session.
508pub(crate) struct SessionClientResponse {
509    #[schema(example = "example.com:80")]
510    /// Target of the Session.
511    pub target: String,
512    /// Destination node (exit node) of the Session.
513    #[serde_as(as = "DisplayFromStr")]
514    #[schema(value_type = String)]
515    pub destination: Address,
516    /// Forward routing path.
517    pub forward_path: RoutingOptions,
518    /// Return routing path.
519    pub return_path: RoutingOptions,
520    /// IP protocol used by Session's listening socket.
521    #[serde_as(as = "DisplayFromStr")]
522    #[schema(example = "tcp")]
523    pub protocol: IpProtocol,
524    /// Listening IP address of the Session's socket.
525    #[schema(example = "127.0.0.1")]
526    pub ip: String,
527    #[schema(example = 5542)]
528    /// Listening port of the Session's socket.
529    pub port: u16,
530    /// MTU used by the underlying HOPR transport.
531    pub hopr_mtu: usize,
532    /// Size of a Single Use Reply Block used by the protocol.
533    ///
534    /// This is useful for SURB balancing calculations.
535    pub surb_len: usize,
536    /// Lists Session IDs of all active clients.
537    ///
538    /// Can contain multiple entries on TCP sessions, but currently
539    /// always only a single entry on UDP sessions.
540    pub active_clients: Vec<String>,
541    /// The maximum number of client sessions that the listener can spawn.
542    ///
543    /// This currently applies only to the TCP sessions, as UDP sessions cannot
544    /// have multiple clients (defaults to 1 for UDP).
545    pub max_client_sessions: usize,
546    /// The maximum throughput at which artificial SURBs might be generated and sent
547    /// to the recipient of the Session.    
548    #[serde(default)]
549    #[serde(with = "human_bandwidth::option")]
550    #[schema(value_type = Option<String>)]
551    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
552    /// The amount of response data the Session counterparty can deliver back to us, without us
553    /// sending any SURBs to them.
554    #[serde_as(as = "Option<DisplayFromStr>")]
555    #[schema(value_type = Option<String>)]
556    pub response_buffer: Option<bytesize::ByteSize>,
557    /// How many Sessions to pool for clients.
558    pub session_pool: Option<usize>,
559}
560
561/// Creates a new client session returning the given session listening host and port over TCP or UDP.
562/// If no listening port is given in the request, the socket will be bound to a random free
563/// port and returned in the response.
564/// Different capabilities can be configured for the session, such as data segmentation or
565/// retransmission.
566///
567/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
568/// communication over the selected IP protocol and HOPR network routing with the given destination.
569/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
570///
571/// Various services require different types of socket communications:
572/// - services running over UDP usually do not require data retransmission, as it is already expected
573/// that UDP does not provide these and is therefore handled at the application layer.
574/// - On the contrary, services running over TCP *almost always* expect data segmentation and
575/// retransmission capabilities, so these should be configured while creating a session that passes
576/// TCP data.
577#[utoipa::path(
578        post,
579        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
580        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
581        params(
582            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
583        ),
584        request_body(
585            content = SessionClientRequest,
586            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
587            content_type = "application/json"),
588        responses(
589            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
590            (status = 400, description = "Invalid IP protocol.", body = ApiError),
591            (status = 401, description = "Invalid authorization token.", body = ApiError),
592            (status = 409, description = "Listening address and port already in use.", body = ApiError),
593            (status = 422, description = "Unknown failure", body = ApiError),
594        ),
595        security(
596            ("api_token" = []),
597            ("bearer_token" = [])
598        ),
599        tag = "Session"
600    )]
601pub(crate) async fn create_client(
602    State(state): State<Arc<InternalState>>,
603    Path(protocol): Path<IpProtocol>,
604    Json(args): Json<SessionClientRequest>,
605) -> Result<impl IntoResponse, impl IntoResponse> {
606    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
607
608    let listener_id = ListenerId(protocol.into(), bind_host);
609    if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
610        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
611    }
612
613    let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
614    debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
615
616    let (bound_host, udp_session_id, max_clients) = match protocol {
617        IpProtocol::TCP => {
618            let session_pool = args.session_pool;
619            let max_client_sessions = args.max_client_sessions;
620            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
621            let (destination, _target, config) = args
622                .clone()
623                .into_protocol_session_config(IpProtocol::TCP)
624                .await
625                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
626
627            create_tcp_client_binding(
628                bind_host,
629                port_range,
630                state.hopr.clone(),
631                state.open_listeners.clone(),
632                destination,
633                target_spec,
634                config,
635                session_pool,
636                max_client_sessions,
637            )
638            .await
639            .map_err(|e| match e {
640                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
641                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
642                }
643                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
644                    StatusCode::UNPROCESSABLE_ENTITY,
645                    ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
646                ),
647            })?
648        }
649        IpProtocol::UDP => {
650            let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
651            let (destination, _target, config) = args
652                .clone()
653                .into_protocol_session_config(IpProtocol::UDP)
654                .await
655                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
656
657            create_udp_client_binding(
658                bind_host,
659                port_range,
660                state.hopr.clone(),
661                state.open_listeners.clone(),
662                destination,
663                target_spec,
664                config,
665            )
666            .await
667            .map_err(|e| match e {
668                hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
669                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
670                }
671                hopr_lib::utils::session::BindError::UnknownFailure(_) => (
672                    StatusCode::UNPROCESSABLE_ENTITY,
673                    ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
674                ),
675            })?
676        }
677    };
678
679    Ok::<_, (StatusCode, ApiErrorStatus)>(
680        (
681            StatusCode::OK,
682            Json(SessionClientResponse {
683                protocol,
684                ip: bound_host.ip().to_string(),
685                port: bound_host.port(),
686                target: args.target.to_string(),
687                destination: args.destination,
688                forward_path: args.forward_path.clone(),
689                return_path: args.return_path.clone(),
690                hopr_mtu: SESSION_MTU,
691                surb_len: SURB_SIZE,
692                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
693                max_client_sessions: max_clients,
694                max_surb_upstream: args.max_surb_upstream,
695                response_buffer: args.response_buffer,
696                session_pool: args.session_pool,
697            }),
698        )
699            .into_response(),
700    )
701}
702
703/// Lists existing Session listeners for the given IP protocol.
704#[utoipa::path(
705    get,
706    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
707    description = "Lists existing Session listeners for the given IP protocol.",
708    params(
709        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
710    ),
711    responses(
712        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
713            {
714                "target": "example.com:80",
715                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
716                "forwardPath": { "Hops": 1 },
717                "returnPath": { "Hops": 1 },
718                "protocol": "tcp",
719                "ip": "127.0.0.1",
720                "port": 5542,
721                "surbLen": 400,
722                "hoprMtu": 1020,
723                "activeClients": [],
724                "maxClientSessions": 2,
725                "maxSurbUpstream": "2000 kb/s",
726                "responseBuffer": "2 MB",
727                "sessionPool": 0
728            }
729        ])),
730        (status = 400, description = "Invalid IP protocol.", body = ApiError),
731        (status = 401, description = "Invalid authorization token.", body = ApiError),
732        (status = 422, description = "Unknown failure", body = ApiError)
733    ),
734    security(
735        ("api_token" = []),
736        ("bearer_token" = [])
737    ),
738    tag = "Session",
739)]
740pub(crate) async fn list_clients(
741    State(state): State<Arc<InternalState>>,
742    Path(protocol): Path<IpProtocol>,
743) -> Result<impl IntoResponse, impl IntoResponse> {
744    let response = state
745        .open_listeners
746        .read_arc()
747        .await
748        .iter()
749        .filter(|(id, _)| id.0 == protocol.into())
750        .map(|(id, entry)| SessionClientResponse {
751            protocol,
752            ip: id.1.ip().to_string(),
753            port: id.1.port(),
754            target: entry.target.to_string(),
755            forward_path: entry.forward_path.clone().into(),
756            return_path: entry.return_path.clone().into(),
757            destination: entry.destination,
758            hopr_mtu: SESSION_MTU,
759            surb_len: SURB_SIZE,
760            active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
761            max_client_sessions: entry.max_client_sessions,
762            max_surb_upstream: entry.max_surb_upstream,
763            response_buffer: entry.response_buffer,
764            session_pool: entry.session_pool,
765        })
766        .collect::<Vec<_>>();
767
768    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
769}
770
771#[serde_as]
772#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
773#[schema(example = json!({
774    "responseBuffer": "2 MB",
775    "maxSurbUpstream": "2 Mbps"
776}))]
777#[serde(rename_all = "camelCase")]
778pub(crate) struct SessionConfig {
779    /// The amount of response data the Session counterparty can deliver back to us,
780    /// without us sending any SURBs to them.
781    ///
782    /// In other words, this size is recalculated to a number of SURBs delivered
783    /// to the counterparty upfront and then maintained.
784    /// The maintenance is dynamic, based on the number of responses we receive.
785    ///
786    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
787    /// at least the size of 2 Session packet payloads.
788    #[serde(default)]
789    #[serde_as(as = "Option<DisplayFromStr>")]
790    #[schema(value_type = String)]
791    pub response_buffer: Option<bytesize::ByteSize>,
792    /// The maximum throughput at which artificial SURBs might be generated and sent
793    /// to the recipient of the Session.
794    ///
795    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
796    /// this should roughly match the maximum retrieval throughput.
797    ///
798    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
799    #[serde(default)]
800    #[serde(with = "human_bandwidth::option")]
801    #[schema(value_type = String)]
802    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
803}
804
805impl From<SessionConfig> for Option<SurbBalancerConfig> {
806    fn from(value: SessionConfig) -> Self {
807        match value.response_buffer {
808            // Buffer worth at least 2 reply packets
809            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
810                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
811                max_surbs_per_sec: value
812                    .max_surb_upstream
813                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
814                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
815                ..Default::default()
816            }),
817            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
818            Some(_) => None,
819            // Use defaults otherwise
820            None => Some(SurbBalancerConfig::default()),
821        }
822    }
823}
824
825impl From<SurbBalancerConfig> for SessionConfig {
826    fn from(value: SurbBalancerConfig) -> Self {
827        Self {
828            response_buffer: Some(bytesize::ByteSize::b(
829                value.target_surb_buffer_size * SESSION_MTU as u64,
830            )),
831            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
832                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
833            )),
834        }
835    }
836}
837
838#[utoipa::path(
839    post,
840    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
841    description = "Updates configuration of an existing active session.",
842    params(
843        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
844    ),
845    request_body(
846            content = SessionConfig,
847            description = "Allows updating of several parameters of an existing active session.",
848            content_type = "application/json"),
849    responses(
850            (status = 204, description = "Successfully updated the configuration"),
851            (status = 400, description = "Invalid configuration.", body = ApiError),
852            (status = 401, description = "Invalid authorization token.", body = ApiError),
853            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
854            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
855            (status = 422, description = "Unknown failure", body = ApiError),
856    ),
857    security(
858            ("api_token" = []),
859            ("bearer_token" = [])
860    ),
861    tag = "Session"
862)]
863pub(crate) async fn adjust_session(
864    State(state): State<Arc<InternalState>>,
865    Path(session_id): Path<String>,
866    Json(args): Json<SessionConfig>,
867) -> Result<impl IntoResponse, impl IntoResponse> {
868    let session_id = HoprSessionId::from_str(&session_id)
869        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
870
871    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
872        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
873            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
874            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
875                SessionManagerError::NonExistingSession,
876            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
877            Err(e) => Err((
878                StatusCode::NOT_ACCEPTABLE,
879                ApiErrorStatus::UnknownFailure(e.to_string()),
880            )),
881        }
882    } else {
883        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
884    }
885}
886
887#[utoipa::path(
888    get,
889    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
890    description = "Gets configuration of an existing active session.",
891    params(
892        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
893    ),
894    responses(
895            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
896            (status = 400, description = "Invalid session ID.", body = ApiError),
897            (status = 401, description = "Invalid authorization token.", body = ApiError),
898            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
899            (status = 422, description = "Unknown failure", body = ApiError),
900    ),
901    security(
902            ("api_token" = []),
903            ("bearer_token" = [])
904    ),
905    tag = "Session"
906)]
907pub(crate) async fn session_config(
908    State(state): State<Arc<InternalState>>,
909    Path(session_id): Path<String>,
910) -> Result<impl IntoResponse, impl IntoResponse> {
911    let session_id = HoprSessionId::from_str(&session_id)
912        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
913
914    match state.hopr.get_session_surb_balancer_config(&session_id).await {
915        Ok(Some(cfg)) => {
916            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
917        }
918        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
919        Err(e) => Err((
920            StatusCode::UNPROCESSABLE_ENTITY,
921            ApiErrorStatus::UnknownFailure(e.to_string()),
922        )),
923    }
924}
925
926#[derive(
927    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
928)]
929#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
930#[serde(rename_all = "lowercase")]
931#[schema(example = "tcp")]
932/// IP transport protocol
933pub enum IpProtocol {
934    #[allow(clippy::upper_case_acronyms)]
935    TCP,
936    #[allow(clippy::upper_case_acronyms)]
937    UDP,
938}
939
940impl From<IpProtocol> for hopr_lib::IpProtocol {
941    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
942        match protocol {
943            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
944            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
945        }
946    }
947}
948
949#[serde_as]
950#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
951pub struct SessionCloseClientQuery {
952    #[serde_as(as = "DisplayFromStr")]
953    #[schema(value_type = String, example = "tcp")]
954    /// IP transport protocol
955    pub protocol: IpProtocol,
956
957    /// Listening IP address of the Session.
958    #[schema(example = "127.0.0.1:8545")]
959    pub ip: String,
960
961    /// Session port used for the listener.
962    #[schema(value_type = u16, example = 10101)]
963    pub port: u16,
964}
965
966/// Closes an existing Session listener.
967/// The listener must've been previously created and bound for the given IP protocol.
968/// Once a listener is closed, no more socket connections can be made to it.
969/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
970/// will be closed.
971#[utoipa::path(
972    delete,
973    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
974    description = "Closes an existing Session listener.",
975    params(SessionCloseClientQuery),
976    responses(
977            (status = 204, description = "Listener closed successfully"),
978            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
979            (status = 401, description = "Invalid authorization token.", body = ApiError),
980            (status = 404, description = "Listener not found.", body = ApiError),
981            (status = 422, description = "Unknown failure", body = ApiError)
982    ),
983    security(
984            ("api_token" = []),
985            ("bearer_token" = [])
986    ),
987    tag = "Session",
988)]
989pub(crate) async fn close_client(
990    State(state): State<Arc<InternalState>>,
991    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
992) -> Result<impl IntoResponse, impl IntoResponse> {
993    let listening_ip: IpAddr = ip
994        .parse()
995        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
996
997    {
998        let mut open_listeners = state.open_listeners.write_arc().await;
999
1000        let mut to_remove = Vec::new();
1001        let protocol: hopr_lib::IpProtocol = protocol.into();
1002
1003        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
1004        open_listeners
1005            .iter()
1006            .filter(|(ListenerId(proto, addr), _)| {
1007                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1008            })
1009            .for_each(|(id, _)| to_remove.push(*id));
1010
1011        if to_remove.is_empty() {
1012            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1013        }
1014
1015        for bound_addr in to_remove {
1016            let entry = open_listeners
1017                .remove(&bound_addr)
1018                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1019
1020            entry.abort_handle.abort();
1021        }
1022    }
1023
1024    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1025}