Skip to main content

hoprd_api/
session.rs

1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4    extract::{Json, Path, State},
5    http::status::StatusCode,
6    response::IntoResponse,
7};
8use base64::Engine;
9use hopr_lib::{
10    Address, HopRouting, HoprSessionClientConfig, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionId,
11    SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
12    errors::{HoprLibError, HoprTransportError},
13};
14use hopr_utils_session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding};
15use serde::{Deserialize, Serialize};
16// Imported for some IDEs to not treat the `json!` macro inside the `schema` macro as an error
17#[allow(unused_imports)]
18use serde_json::json;
19use serde_with::{DisplayFromStr, serde_as};
20
21use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
22
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), // base64 for "HelloWorld"
    example = json!({"Service": 0})
)]
/// Session target specification.
///
/// Besides the JSON form, a specification has a compact string form (see the
/// `Display` and `FromStr` implementations): `Plain` targets are written verbatim,
/// `Sealed` targets are written as `$$` followed by URL-safe base64, and `Service`
/// targets are written as `#` followed by the service ID.
pub enum SessionTargetSpec {
    /// Plain-text target (e.g. `example.com:80`).
    Plain(String),
    /// Binary target, carried as base64 in JSON.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target given by a service ID.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
37
38impl std::fmt::Display for SessionTargetSpec {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        match self {
41            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
42            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
43            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
44        }
45    }
46}
47
48impl FromStr for SessionTargetSpec {
49    type Err = HoprLibError;
50
51    fn from_str(s: &str) -> Result<Self, Self::Err> {
52        Ok(if let Some(stripped) = s.strip_prefix("$$") {
53            Self::Sealed(
54                base64::prelude::BASE64_URL_SAFE
55                    .decode(stripped)
56                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
57            )
58        } else if let Some(stripped) = s.strip_prefix("#") {
59            Self::Service(
60                stripped
61                    .parse()
62                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
63            )
64        } else {
65            Self::Plain(s.to_owned())
66        })
67    }
68}
69
70impl From<SessionTargetSpec> for hopr_utils_session::SessionTargetSpec {
71    fn from(spec: SessionTargetSpec) -> Self {
72        match spec {
73            SessionTargetSpec::Plain(t) => Self::Plain(t),
74            SessionTargetSpec::Sealed(t) => Self::Sealed(t),
75            SessionTargetSpec::Service(t) => Self::Service(t),
76        }
77    }
78}
79
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
/// Session capabilities that can be negotiated with the target peer.
///
/// Each value converts into a set of `hopr_lib::SessionCapabilities`
/// (see the `From<SessionCapability>` implementation).
pub enum SessionCapability {
    /// Frame segmentation
    Segmentation,
    /// Frame retransmission (ACK and NACK-based)
    Retransmission,
    /// Frame retransmission (only ACK-based)
    RetransmissionAckOnly,
    /// Disable packet buffering
    NoDelay,
    /// Disable SURB-based egress rate control at the Exit.
    NoRateControl,
}
106
107impl From<SessionCapability> for hopr_lib::SessionCapabilities {
108    fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
109        match cap {
110            SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
111            SessionCapability::Retransmission => {
112                hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
113            }
114            SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
115            SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
116            SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
117        }
118    }
119}
120
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
/// Routing options for one direction of a Session path.
pub enum RoutingOptions {
    /// Route over the given number of hops.
    ///
    /// The value is validated when converted into `hopr_lib::HopRouting`.
    Hops(usize),
}
127
128impl TryFrom<RoutingOptions> for hopr_lib::HopRouting {
129    type Error = hopr_lib::GeneralError;
130
131    /// Converts API routing options into protocol-level hop routing.
132    fn try_from(value: RoutingOptions) -> Result<Self, Self::Error> {
133        match value {
134            RoutingOptions::Hops(hops) => HopRouting::try_from(hops),
135        }
136    }
137}
138
139impl From<hopr_lib::HopRouting> for RoutingOptions {
140    fn from(opts: hopr_lib::HopRouting) -> Self {
141        RoutingOptions::Hops(opts.hop_count())
142    }
143}
144
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
/// Request body for creating a new client session.
pub(crate) struct SessionClientRequest {
    /// Address of the Exit node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// The forward path for the Session.
    pub forward_path: RoutingOptions,
    /// The return path for the Session.
    pub return_path: RoutingOptions,
    /// Target for the Session.
    pub target: SessionTargetSpec,
    /// Listen host (`ip:port`) for the Session socket at the Entry node.
    ///
    /// Supports also partial specification (only `ip` or only `:port`) with the
    /// respective part replaced by the node's configured default.
    pub listen_host: Option<String>,
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    /// Capabilities for the Session protocol.
    ///
    /// Defaults to `Segmentation` and `Retransmission` for TCP and only `Segmentation` for UDP.
    pub capabilities: Option<Vec<SessionCapability>>,
    /// The amount of response data the Session counterparty can deliver back to us,
    /// without us sending any SURBs to them.
    ///
    /// In other words, this size is recalculated to a number of SURBs delivered
    /// to the counterparty upfront and then maintained.
    /// The maintenance is dynamic, based on the number of responses we receive.
    ///
    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
    /// at least the size of 2 Session packet payloads.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// The maximum throughput at which artificial SURBs might be generated and sent
    /// to the recipient of the Session.
    ///
    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
    /// this should roughly match the maximum retrieval throughput.
    ///
    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// How many Sessions to pool for clients.
    ///
    /// If no sessions are pooled, they will be opened ad-hoc when a client connects.
    /// It has no effect on UDP sessions in the current implementation.
    ///
    /// Currently, the maximum value is 5.
    pub session_pool: Option<usize>,
    /// The maximum number of client sessions that the listener can spawn.
    ///
    /// This currently applies only to the TCP sessions, as UDP sessions cannot
    /// handle multiple clients (and spawn therefore always only a single session).
    ///
    /// If this value is smaller than the value specified in `session_pool`, it will
    /// be set to that value.
    ///
    /// The default value is 5.
    pub max_client_sessions: Option<usize>,
}
223
224impl SessionClientRequest {
225    /// Converts the API client session request into protocol-level session configuration.
226    pub(crate) async fn into_protocol_session_config(
227        self,
228        target_protocol: IpProtocol,
229    ) -> Result<(hopr_lib::Address, SessionTarget, HoprSessionClientConfig), ApiErrorStatus> {
230        let target_spec: hopr_utils_session::SessionTargetSpec = self.target.clone().into();
231        Ok((
232            self.destination,
233            target_spec.into_target(target_protocol.into())?,
234            HoprSessionClientConfig {
235                forward_path: self.forward_path.try_into()?,
236                return_path: self.return_path.try_into()?,
237                capabilities: self
238                    .capabilities
239                    .map(|vs| {
240                        let mut caps = SessionCapabilities::empty();
241                        caps.extend(vs.into_iter().map(SessionCapabilities::from));
242                        caps
243                    })
244                    .unwrap_or_else(|| match target_protocol {
245                        IpProtocol::TCP => {
246                            hopr_lib::SessionCapability::RetransmissionAck
247                                | hopr_lib::SessionCapability::RetransmissionNack
248                                | hopr_lib::SessionCapability::Segmentation
249                        }
250                        // Only Segmentation capability for UDP per default
251                        _ => SessionCapability::Segmentation.into(),
252                    }),
253                surb_management: SessionConfig {
254                    response_buffer: self.response_buffer,
255                    max_surb_upstream: self.max_surb_upstream,
256                }
257                .into(),
258                ..Default::default()
259            },
260        ))
261    }
262}
263
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "hoprMtu": 1002,
        "surbLen": 398,
        "activeClients": [],
        "maxClientSessions": 2,
        "maxSurbUpstream": "2000 kb/s",
        "responseBuffer": "2 MB",
        "sessionPool": 0
    }))]
#[serde(rename_all = "camelCase")]
/// Response body for creating a new client session.
pub(crate) struct SessionClientResponse {
    #[schema(example = "example.com:80")]
    /// Target of the Session, in its string form.
    pub target: String,
    /// Destination node (exit node) of the Session.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Forward routing path.
    pub forward_path: RoutingOptions,
    /// Return routing path.
    pub return_path: RoutingOptions,
    /// IP protocol used by Session's listening socket.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// Listening IP address of the Session's socket.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    #[schema(example = 5542)]
    /// Listening port of the Session's socket.
    pub port: u16,
    /// MTU used by the underlying HOPR transport.
    pub hopr_mtu: usize,
    /// Size of a Single Use Reply Block used by the protocol.
    ///
    /// This is useful for SURB balancing calculations.
    pub surb_len: usize,
    /// Lists Session IDs of all active clients.
    ///
    /// Can contain multiple entries on TCP sessions, but currently
    /// always only a single entry on UDP sessions.
    pub active_clients: Vec<String>,
    /// The maximum number of client sessions that the listener can spawn.
    ///
    /// This currently applies only to the TCP sessions, as UDP sessions cannot
    /// have multiple clients (defaults to 1 for UDP).
    pub max_client_sessions: usize,
    /// The maximum throughput at which artificial SURBs might be generated and sent
    /// to the recipient of the Session.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// The amount of response data the Session counterparty can deliver back to us, without us
    /// sending any SURBs to them.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// How many Sessions to pool for clients.
    pub session_pool: Option<usize>,
}
336
337/// Creates a new client session returning the given session listening host and port over TCP or UDP.
338/// If no listening port is given in the request, the socket will be bound to a random free
339/// port and returned in the response.
340/// Different capabilities can be configured for the session, such as data segmentation or
341/// retransmission.
342///
343/// Once the host and port are bound, it is possible to use the socket for bidirectional read/write
344/// communication over the selected IP protocol and HOPR network routing with the given destination.
345/// The destination HOPR node forwards all the data to the given target over the selected IP protocol.
346///
347/// Various services require different types of socket communications:
348/// - services running over UDP usually do not require data retransmission, as it is already expected
349/// that UDP does not provide these and is therefore handled at the application layer.
350/// - On the contrary, services running over TCP *almost always* expect data segmentation and
351/// retransmission capabilities, so these should be configured while creating a session that passes
352/// TCP data.
353#[utoipa::path(
354        post,
355        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
356        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
357        params(
358            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
359        ),
360        request_body(
361            content = SessionClientRequest,
362            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
363            content_type = "application/json"),
364        responses(
365            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
366            (status = 400, description = "Invalid IP protocol.", body = ApiError),
367            (status = 401, description = "Invalid authorization token.", body = ApiError),
368            (status = 409, description = "Listening address and port already in use.", body = ApiError),
369            (status = 422, description = "Unknown failure", body = ApiError),
370        ),
371        security(
372            ("api_token" = []),
373            ("bearer_token" = [])
374        ),
375        tag = "Session"
376    )]
377pub(crate) async fn create_client(
378    State(state): State<Arc<InternalState>>,
379    Path(protocol): Path<IpProtocol>,
380    Json(args): Json<SessionClientRequest>,
381) -> Result<impl IntoResponse, impl IntoResponse> {
382    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
383
384    let listener_id = ListenerId(protocol.into(), bind_host);
385    if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
386        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
387    }
388
389    let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
390    tracing::debug!(%protocol, %bind_host, ?port_range, "binding session listening socket");
391
392    let (bound_host, udp_session_id, max_clients) = match protocol {
393        IpProtocol::TCP => {
394            let session_pool = args.session_pool;
395            let max_client_sessions = args.max_client_sessions;
396            let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
397            let (destination, _target, config) = args
398                .clone()
399                .into_protocol_session_config(IpProtocol::TCP)
400                .await
401                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
402
403            create_tcp_client_binding(
404                bind_host,
405                port_range,
406                state.hopr.clone(),
407                state.open_listeners.clone(),
408                destination,
409                target_spec,
410                config,
411                session_pool,
412                max_client_sessions,
413            )
414            .await
415            .map_err(|e| match e {
416                hopr_utils_session::BindError::ListenHostAlreadyUsed => {
417                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
418                }
419                hopr_utils_session::BindError::UnknownFailure(_) => (
420                    StatusCode::UNPROCESSABLE_ENTITY,
421                    ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
422                ),
423            })?
424        }
425        IpProtocol::UDP => {
426            let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
427            let (destination, _target, config) = args
428                .clone()
429                .into_protocol_session_config(IpProtocol::UDP)
430                .await
431                .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
432
433            create_udp_client_binding(
434                bind_host,
435                port_range,
436                state.hopr.clone(),
437                state.open_listeners.clone(),
438                destination,
439                target_spec,
440                config,
441            )
442            .await
443            .map_err(|e| match e {
444                hopr_utils_session::BindError::ListenHostAlreadyUsed => {
445                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
446                }
447                hopr_utils_session::BindError::UnknownFailure(_) => (
448                    StatusCode::UNPROCESSABLE_ENTITY,
449                    ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
450                ),
451            })?
452        }
453    };
454
455    Ok::<_, (StatusCode, ApiErrorStatus)>(
456        (
457            StatusCode::OK,
458            Json(SessionClientResponse {
459                protocol,
460                ip: bound_host.ip().to_string(),
461                port: bound_host.port(),
462                target: args.target.to_string(),
463                destination: args.destination,
464                forward_path: args.forward_path.clone(),
465                return_path: args.return_path.clone(),
466                hopr_mtu: SESSION_MTU,
467                surb_len: SURB_SIZE,
468                active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
469                max_client_sessions: max_clients,
470                max_surb_upstream: args.max_surb_upstream,
471                response_buffer: args.response_buffer,
472                session_pool: args.session_pool,
473            }),
474        )
475            .into_response(),
476    )
477}
478
479/// Lists existing Session listeners for the given IP protocol.
480#[utoipa::path(
481    get,
482    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
483    description = "Lists existing Session listeners for the given IP protocol.",
484    params(
485        ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
486    ),
487    responses(
488        (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
489            {
490                "target": "example.com:80",
491                "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
492                "forwardPath": { "Hops": 1 },
493                "returnPath": { "Hops": 1 },
494                "protocol": "tcp",
495                "ip": "127.0.0.1",
496                "port": 5542,
497                "surbLen": 400,
498                "hoprMtu": 1020,
499                "activeClients": [],
500                "maxClientSessions": 2,
501                "maxSurbUpstream": "2000 kb/s",
502                "responseBuffer": "2 MB",
503                "sessionPool": 0
504            }
505        ])),
506        (status = 400, description = "Invalid IP protocol.", body = ApiError),
507        (status = 401, description = "Invalid authorization token.", body = ApiError),
508        (status = 422, description = "Unknown failure", body = ApiError)
509    ),
510    security(
511        ("api_token" = []),
512        ("bearer_token" = [])
513    ),
514    tag = "Session",
515)]
516pub(crate) async fn list_clients(
517    State(state): State<Arc<InternalState>>,
518    Path(protocol): Path<IpProtocol>,
519) -> Result<impl IntoResponse, impl IntoResponse> {
520    let response = state
521        .open_listeners
522        .0
523        .iter()
524        .filter(|v| v.key().0 == protocol.into())
525        .map(|v| {
526            let ListenerId(_, addr) = *v.key();
527            let entry = v.value();
528            let forward_path = hopr_lib::HopRouting::try_from(entry.forward_path.count_hops())
529                .expect("stored routing options always have bounded hop count");
530            let return_path = hopr_lib::HopRouting::try_from(entry.return_path.count_hops())
531                .expect("stored routing options always have bounded hop count");
532            SessionClientResponse {
533                protocol,
534                ip: addr.ip().to_string(),
535                port: addr.port(),
536                target: entry.target.to_string(),
537                forward_path: forward_path.into(),
538                return_path: return_path.into(),
539                destination: entry.destination,
540                hopr_mtu: SESSION_MTU,
541                surb_len: SURB_SIZE,
542                active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
543                max_client_sessions: entry.max_client_sessions,
544                max_surb_upstream: entry.max_surb_upstream,
545                response_buffer: entry.response_buffer,
546                session_pool: entry.session_pool,
547            }
548        })
549        .collect::<Vec<_>>();
550
551    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
552}
553
554#[serde_as]
555#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
556#[schema(example = json!({
557    "responseBuffer": "2 MB",
558    "maxSurbUpstream": "2 Mbps"
559}))]
560#[serde(rename_all = "camelCase")]
561pub(crate) struct SessionConfig {
562    /// The amount of response data the Session counterparty can deliver back to us,
563    /// without us sending any SURBs to them.
564    ///
565    /// In other words, this size is recalculated to a number of SURBs delivered
566    /// to the counterparty upfront and then maintained.
567    /// The maintenance is dynamic, based on the number of responses we receive.
568    ///
569    /// All syntaxes like "2 MB", "128 kiB", "3MiB" are supported. The value must be
570    /// at least the size of 2 Session packet payloads.
571    #[serde(default)]
572    #[serde_as(as = "Option<DisplayFromStr>")]
573    #[schema(value_type = String)]
574    pub response_buffer: Option<bytesize::ByteSize>,
575    /// The maximum throughput at which artificial SURBs might be generated and sent
576    /// to the recipient of the Session.
577    ///
578    /// On Sessions that rarely send data but receive a lot (= Exit node has high SURB consumption),
579    /// this should roughly match the maximum retrieval throughput.
580    ///
581    /// All syntaxes like "2 MBps", "1.2Mbps", "300 kb/s", "1.23 Mb/s" are supported.
582    #[serde(default)]
583    #[serde(with = "human_bandwidth::option")]
584    #[schema(value_type = String)]
585    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
586}
587
588impl From<SessionConfig> for Option<SurbBalancerConfig> {
589    /// Converts the API session config into protocol-level SURB balancer config.
590    fn from(value: SessionConfig) -> Self {
591        match value.response_buffer {
592            // Buffer worth at least 2 reply packets
593            Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
594                target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
595                max_surbs_per_sec: value
596                    .max_surb_upstream
597                    .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
598                    .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
599                ..Default::default()
600            }),
601            // No additional SURBs are set up and maintained, useful for high-send low-reply sessions
602            Some(_) => None,
603            // Use defaults otherwise
604            None => Some(SurbBalancerConfig::default()),
605        }
606    }
607}
608
609impl From<SurbBalancerConfig> for SessionConfig {
610    /// Converts protocol-level SURB balancer config into the API session config format.
611    fn from(value: SurbBalancerConfig) -> Self {
612        Self {
613            response_buffer: Some(bytesize::ByteSize::b(
614                value.target_surb_buffer_size * SESSION_MTU as u64,
615            )),
616            max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
617                value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
618            )),
619        }
620    }
621}
622
623#[utoipa::path(
624    post,
625    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
626    description = "Updates configuration of an existing active session.",
627    params(
628        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
629    ),
630    request_body(
631            content = SessionConfig,
632            description = "Allows updating of several parameters of an existing active session.",
633            content_type = "application/json"),
634    responses(
635            (status = 204, description = "Successfully updated the configuration"),
636            (status = 400, description = "Invalid configuration.", body = ApiError),
637            (status = 401, description = "Invalid authorization token.", body = ApiError),
638            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
639            (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
640            (status = 422, description = "Unknown failure", body = ApiError),
641    ),
642    security(
643            ("api_token" = []),
644            ("bearer_token" = [])
645    ),
646    tag = "Session"
647)]
648pub(crate) async fn adjust_session(
649    State(state): State<Arc<InternalState>>,
650    Path(session_id): Path<String>,
651    Json(args): Json<SessionConfig>,
652) -> Result<impl IntoResponse, impl IntoResponse> {
653    let session_id =
654        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
655
656    if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
657        match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
658            Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
659            Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
660                SessionManagerError::NonExistingSession,
661            )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
662            Err(e) => Err((
663                StatusCode::NOT_ACCEPTABLE,
664                ApiErrorStatus::UnknownFailure(e.to_string()),
665            )),
666        }
667    } else {
668        Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
669    }
670}
671
672#[utoipa::path(
673    get,
674    path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
675    description = "Gets configuration of an existing active session.",
676    params(
677        ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
678    ),
679    responses(
680            (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
681            (status = 400, description = "Invalid session ID.", body = ApiError),
682            (status = 401, description = "Invalid authorization token.", body = ApiError),
683            (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
684            (status = 422, description = "Unknown failure", body = ApiError),
685    ),
686    security(
687            ("api_token" = []),
688            ("bearer_token" = [])
689    ),
690    tag = "Session"
691)]
692pub(crate) async fn session_config(
693    State(state): State<Arc<InternalState>>,
694    Path(session_id): Path<String>,
695) -> Result<impl IntoResponse, impl IntoResponse> {
696    let session_id =
697        SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
698
699    match state.hopr.get_session_surb_balancer_config(&session_id).await {
700        Ok(Some(cfg)) => {
701            Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
702        }
703        Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
704        Err(e) => Err((
705            StatusCode::UNPROCESSABLE_ENTITY,
706            ApiErrorStatus::UnknownFailure(e.to_string()),
707        )),
708    }
709}
710
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
/// IP transport protocol
///
/// Rendered in lowercase (`tcp`, `udp`) by both `serde` and `strum`;
/// parsing via `FromStr` is ASCII case-insensitive.
pub enum IpProtocol {
    /// Transmission Control Protocol.
    // NOTE(review): the upper-case names presumably mirror `hopr_lib::IpProtocol` — confirm.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
724
725impl From<IpProtocol> for hopr_lib::IpProtocol {
726    fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
727        match protocol {
728            IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
729            IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
730        }
731    }
732}
733
734#[serde_as]
735#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
736pub struct SessionCloseClientQuery {
737    #[serde_as(as = "DisplayFromStr")]
738    #[schema(value_type = String, example = "tcp")]
739    /// IP transport protocol
740    pub protocol: IpProtocol,
741
742    /// Listening IP address of the Session.
743    #[schema(example = "127.0.0.1:8545")]
744    pub ip: String,
745
746    /// Session port used for the listener.
747    #[schema(value_type = u16, example = 10101)]
748    pub port: u16,
749}
750
751/// Closes an existing Session listener.
752/// The listener must've been previously created and bound for the given IP protocol.
753/// Once a listener is closed, no more socket connections can be made to it.
754/// If the passed port number is 0, listeners on all ports of the given listening IP and protocol
755/// will be closed.
756#[utoipa::path(
757    delete,
758    path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
759    description = "Closes an existing Session listener.",
760    params(SessionCloseClientQuery),
761    responses(
762            (status = 204, description = "Listener closed successfully"),
763            (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
764            (status = 401, description = "Invalid authorization token.", body = ApiError),
765            (status = 404, description = "Listener not found.", body = ApiError),
766            (status = 422, description = "Unknown failure", body = ApiError)
767    ),
768    security(
769            ("api_token" = []),
770            ("bearer_token" = [])
771    ),
772    tag = "Session",
773)]
774pub(crate) async fn close_client(
775    State(state): State<Arc<InternalState>>,
776    Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
777) -> Result<impl IntoResponse, impl IntoResponse> {
778    let listening_ip: IpAddr = ip
779        .parse()
780        .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
781
782    {
783        let open_listeners = &state.open_listeners.0;
784
785        let mut to_remove = Vec::new();
786        let protocol: hopr_lib::IpProtocol = protocol.into();
787
788        // Find all listeners with protocol, listening IP and optionally port number (if > 0)
789        open_listeners
790            .iter()
791            .filter(|v| {
792                let ListenerId(proto, addr) = v.key();
793                protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
794            })
795            .for_each(|v| to_remove.push(*v.key()));
796
797        if to_remove.is_empty() {
798            return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
799        }
800
801        for bound_addr in to_remove {
802            let (_, entry) = open_listeners
803                .remove(&bound_addr)
804                .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
805
806            entry.abort_handle.abort();
807        }
808    }
809
810    Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
811}