1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 Error,
5 extract::{
6 Json, Path, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::status::StatusCode,
10 response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17 Address, HoprSession, NodeId, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18 SessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19 errors::{HoprLibError, HoprTransportError},
20 utils::futures::AsyncReadStreamer,
21};
22use hopr_utils_session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding};
23use serde::{Deserialize, Serialize};
24use serde_with::{DisplayFromStr, serde_as};
25use tracing::{debug, error, info, trace};
26
27use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
28
/// Buffer size constant for TCP session traffic.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the size
/// (in bytes) of the copy buffer used by TCP listeners — confirm against callers.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Buffer size constant for UDP session traffic.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the size
/// (in bytes) of the datagram buffer used by UDP listeners — confirm against callers.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Queue size constant for UDP session traffic.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the capacity
/// of the queue between a UDP socket and a session — confirm against callers.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
37
38#[allow(unused_imports)]
40use serde_json::json;
41
/// Session target as accepted by and returned from the API.
///
/// With serde the value is an externally tagged enum (see the schema examples below).
/// The `Display` and `FromStr` implementations of this type use a compact textual form
/// instead: a `$$` prefix marks a sealed target, a `#` prefix marks a service, and
/// anything else is taken as a plain target.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), example = json!({"Service": 0})
)]
pub enum SessionTargetSpec {
    /// Target given as plain text (e.g. `example.com:80`, as in the schema example).
    Plain(String),
    /// Opaque target bytes; (de)serialized as Base64 via `serde_with`.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target identified by a service identifier (documented as `u32` in the schema).
    #[schema(value_type = u32)]
    Service(ServiceId),
}
56
57impl std::fmt::Display for SessionTargetSpec {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 match self {
60 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
61 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
62 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
63 }
64 }
65}
66
67impl FromStr for SessionTargetSpec {
68 type Err = HoprLibError;
69
70 fn from_str(s: &str) -> Result<Self, Self::Err> {
71 Ok(if let Some(stripped) = s.strip_prefix("$$") {
72 Self::Sealed(
73 base64::prelude::BASE64_URL_SAFE
74 .decode(stripped)
75 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
76 )
77 } else if let Some(stripped) = s.strip_prefix("#") {
78 Self::Service(
79 stripped
80 .parse()
81 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
82 )
83 } else {
84 Self::Plain(s.to_owned())
85 })
86 }
87}
88
89impl From<SessionTargetSpec> for hopr_utils_session::SessionTargetSpec {
90 fn from(spec: SessionTargetSpec) -> Self {
91 match spec {
92 SessionTargetSpec::Plain(t) => Self::Plain(t),
93 SessionTargetSpec::Sealed(t) => Self::Sealed(t),
94 SessionTargetSpec::Service(t) => Self::Service(t),
95 }
96 }
97}
98
/// Session capabilities selectable through the API.
///
/// Each variant is translated into `hopr_lib::SessionCapabilities` by the `From`
/// implementation in this file; one API capability may map to several library flags.
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
pub enum SessionCapability {
    /// Maps to the library's `Segmentation` capability.
    Segmentation,
    /// Maps to both `RetransmissionNack` and `RetransmissionAck` in the library.
    Retransmission,
    /// Maps to the library's `RetransmissionAck` capability only.
    RetransmissionAckOnly,
    /// Maps to the library's `NoDelay` capability.
    NoDelay,
    /// Maps to the library's `NoRateControl` capability.
    NoRateControl,
}
125
126impl From<SessionCapability> for hopr_lib::SessionCapabilities {
127 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
128 match cap {
129 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
130 SessionCapability::Retransmission => {
131 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
132 }
133 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
134 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
135 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
136 }
137 }
138}
139
/// Query parameters of the websocket session endpoint.
///
/// Converted into a destination, a session target and a client configuration by
/// `into_protocol_session_config`.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the session counterparty, parsed from its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops; used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when present it is used instead of `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Capabilities requested for the session, each parsed from its string form.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Session target in the compact textual form understood by `SessionTargetSpec::from_str`.
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol used towards the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
162
/// Serde default for `SessionWebsocketClientQueryRequest::protocol`: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
167
168impl SessionWebsocketClientQueryRequest {
169 pub(crate) async fn into_protocol_session_config(
170 self,
171 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
172 #[cfg(not(feature = "explicit-path"))]
173 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
174
175 #[cfg(feature = "explicit-path")]
176 let path_options = if let Some(path) = self.path {
177 let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
179 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
180 } else {
181 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
182 };
183
184 let mut capabilities = SessionCapabilities::empty();
185 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
186
187 let target_spec: hopr_utils_session::SessionTargetSpec = self.target.into();
188
189 Ok((
190 self.destination,
191 target_spec.into_target(self.protocol.into())?,
192 SessionClientConfig {
193 forward_path_options: path_options.clone(),
194 return_path_options: path_options.clone(), capabilities,
196 ..Default::default()
197 },
198 ))
199 }
200}
201
/// Binary payload newtype described to OpenAPI as a binary string.
///
/// NOTE(review): not referenced by any code in view (hence `allow(dead_code)`);
/// presumably it exists only for the generated API documentation — confirm.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)] struct WssData(Vec<u8>);
206
207#[allow(dead_code)] #[utoipa::path(
219 get,
220 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
221 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
222 request_body(
223 content = SessionWebsocketClientQueryRequest,
224 content_type = "application/json",
225 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
226 ),
227 responses(
228 (status = 200, description = "Successfully created a new client websocket session."),
229 (status = 401, description = "Invalid authorization token.", body = ApiError),
230 (status = 422, description = "Unknown failure", body = ApiError),
231 (status = 429, description = "Too many open websocket connections.", body = ApiError),
232 ),
233 security(
234 ("api_token" = []),
235 ("bearer_token" = [])
236 ),
237 tag = "Session",
238 )]
239
240pub(crate) async fn websocket(
241 ws: WebSocketUpgrade,
242 Query(query): Query<SessionWebsocketClientQueryRequest>,
243 State(state): State<Arc<InternalState>>,
244) -> Result<impl IntoResponse, impl IntoResponse> {
245 let (dst, target, data) = query
246 .into_protocol_session_config()
247 .await
248 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
249
250 let hopr = state.hopr.clone();
251 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
252 error!(error = %e, "Failed to establish session");
253 (
254 StatusCode::UNPROCESSABLE_ENTITY,
255 ApiErrorStatus::UnknownFailure(e.to_string()),
256 )
257 })?;
258
259 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
260}
261
/// Event of the merged input stream processed by `websocket_connection`.
enum WebSocketInput {
    /// Chunk of bytes read from the HOPR session, or the read error.
    Network(Result<Box<[u8]>, std::io::Error>),
    /// Message received from the websocket, or the receive error.
    WsInput(Result<Message, Error>),
}
266
/// Const parameter passed to `AsyncReadStreamer` when reading from the session in
/// `websocket_connection`.
///
/// NOTE(review): presumably the maximum number of bytes per read chunk — confirm against
/// `AsyncReadStreamer`.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
269
270#[tracing::instrument(level = "debug", skip(socket, session))]
271async fn websocket_connection(socket: WebSocket, session: HoprSession) {
272 let session_id = *session.id();
273
274 let (rx, mut tx) = session.split();
275 let (mut sender, receiver) = socket.split();
276
277 let mut queue = (
278 receiver.map(WebSocketInput::WsInput),
279 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
280 )
281 .merge();
282
283 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
284
285 while let Some(v) = queue.next().await {
286 match v {
287 WebSocketInput::Network(bytes) => match bytes {
288 Ok(bytes) => {
289 let len = bytes.len();
290 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
291 error!(
292 error = %e,
293 "Failed to emit read data onto the websocket, closing connection"
294 );
295 break;
296 };
297 bytes_from_session += len;
298 }
299 Err(e) => {
300 error!(
301 error = %e,
302 "Failed to push data from network to socket, closing connection"
303 );
304 break;
305 }
306 },
307 WebSocketInput::WsInput(ws_in) => match ws_in {
308 Ok(Message::Binary(data)) => {
309 let len = data.len();
310 if let Err(e) = tx.write(data.as_ref()).await {
311 error!(error = %e, "Failed to write data to the session, closing connection");
312 break;
313 }
314 bytes_to_session += len;
315 }
316 Ok(Message::Text(_)) => {
317 error!("Received string instead of binary data, closing connection");
318 break;
319 }
320 Ok(Message::Close(_)) => {
321 debug!("Received close frame, closing connection");
322 break;
323 }
324 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
325 Err(e) => {
326 error!(error = %e, "Failed to get a valid websocket message, closing connection");
327 break;
328 }
329 },
330 }
331 }
332
333 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
334}
335
/// Routing options of one direction of a session, as exposed by the API.
///
/// Converted to and from `hopr_lib::RoutingOptions` by `resolve` and the `From`
/// implementation in this file.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
pub enum RoutingOptions {
    /// Explicit list of intermediate nodes, each (de)serialized via its string form.
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
    /// Number of hops.
    Hops(usize),
}
346
347impl RoutingOptions {
348 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
349 Ok(match self {
350 #[cfg(feature = "explicit-path")]
351 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
352 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
353 })
354 }
355}
356
357impl From<hopr_lib::RoutingOptions> for RoutingOptions {
358 fn from(opts: hopr_lib::RoutingOptions) -> Self {
359 match opts {
360 #[cfg(feature = "explicit-path")]
361 hopr_lib::RoutingOptions::IntermediatePath(path) => {
362 RoutingOptions::IntermediatePath(path.into_iter().collect())
363 }
364 hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
365 }
366 }
367}
368
/// Request body for creating a new client session listener.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientRequest {
    /// Address of the session counterparty, parsed from its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// Target of the session.
    pub target: SessionTargetSpec,
    /// Host to bind the listener to; when absent, the binding host is derived from the
    /// node's default listen host (see `create_client`).
    pub listen_host: Option<String>,
    /// Requested capabilities. When absent, TCP sessions default to segmentation plus
    /// ACK and NACK retransmission, and other protocols default to segmentation only.
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    pub capabilities: Option<Vec<SessionCapability>>,
    /// Response buffer size in human-readable form (e.g. `"2 MB"`); converted into the
    /// SURB balancer configuration together with `max_surb_upstream`.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit in human-readable form (e.g. `"2000 kb/s"`); converted
    /// into the SURB balancer configuration together with `response_buffer`.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Session pool setting; forwarded to the TCP listener binding only.
    pub session_pool: Option<usize>,
    /// Client session limit; forwarded to the TCP listener binding only.
    pub max_client_sessions: Option<usize>,
}
447
448impl SessionClientRequest {
449 pub(crate) async fn into_protocol_session_config(
450 self,
451 target_protocol: IpProtocol,
452 ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
453 let target_spec: hopr_utils_session::SessionTargetSpec = self.target.clone().into();
454 Ok((
455 self.destination,
456 target_spec.into_target(target_protocol.into())?,
457 SessionClientConfig {
458 forward_path_options: self.forward_path.resolve().await?,
459 return_path_options: self.return_path.resolve().await?,
460 capabilities: self
461 .capabilities
462 .map(|vs| {
463 let mut caps = SessionCapabilities::empty();
464 caps.extend(vs.into_iter().map(SessionCapabilities::from));
465 caps
466 })
467 .unwrap_or_else(|| match target_protocol {
468 IpProtocol::TCP => {
469 hopr_lib::SessionCapability::RetransmissionAck
470 | hopr_lib::SessionCapability::RetransmissionNack
471 | hopr_lib::SessionCapability::Segmentation
472 }
473 _ => SessionCapability::Segmentation.into(),
475 }),
476 surb_management: SessionConfig {
477 response_buffer: self.response_buffer,
478 max_surb_upstream: self.max_surb_upstream,
479 }
480 .into(),
481 ..Default::default()
482 },
483 ))
484 }
485}
486
/// Description of a client session listener, returned when one is created or listed.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "hoprMtu": 1002,
        "surbLen": 398,
        "activeClients": [],
        "maxClientSessions": 2,
        "maxSurbUpstream": "2000 kb/s",
        "responseBuffer": "2 MB",
        "sessionPool": 0
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientResponse {
    /// Textual form of the session target.
    #[schema(example = "example.com:80")]
    pub target: String,
    /// Address of the session counterparty, (de)serialized via its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// IP protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    /// Port the listener is bound to.
    #[schema(example = 5542)]
    pub port: u16,
    /// Value of the library constant `SESSION_MTU`.
    pub hopr_mtu: usize,
    /// Value of the library constant `SURB_SIZE`.
    pub surb_len: usize,
    /// String forms of the identifiers of sessions currently attached to the listener.
    pub active_clients: Vec<String>,
    /// Client session limit in effect for the listener.
    pub max_client_sessions: usize,
    /// Upstream bandwidth limit in human-readable form, if any.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Response buffer size in human-readable form, if any.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Session pool setting of the listener, if any.
    pub session_pool: Option<usize>,
}
559
560#[utoipa::path(
577 post,
578 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
579 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
580 params(
581 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
582 ),
583 request_body(
584 content = SessionClientRequest,
585 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
586 content_type = "application/json"),
587 responses(
588 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
589 (status = 400, description = "Invalid IP protocol.", body = ApiError),
590 (status = 401, description = "Invalid authorization token.", body = ApiError),
591 (status = 409, description = "Listening address and port already in use.", body = ApiError),
592 (status = 422, description = "Unknown failure", body = ApiError),
593 ),
594 security(
595 ("api_token" = []),
596 ("bearer_token" = [])
597 ),
598 tag = "Session"
599 )]
600pub(crate) async fn create_client(
601 State(state): State<Arc<InternalState>>,
602 Path(protocol): Path<IpProtocol>,
603 Json(args): Json<SessionClientRequest>,
604) -> Result<impl IntoResponse, impl IntoResponse> {
605 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
606
607 let listener_id = ListenerId(protocol.into(), bind_host);
608 if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
609 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
610 }
611
612 let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
613 debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
614
615 let (bound_host, udp_session_id, max_clients) = match protocol {
616 IpProtocol::TCP => {
617 let session_pool = args.session_pool;
618 let max_client_sessions = args.max_client_sessions;
619 let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
620 let (destination, _target, config) = args
621 .clone()
622 .into_protocol_session_config(IpProtocol::TCP)
623 .await
624 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
625
626 create_tcp_client_binding(
627 bind_host,
628 port_range,
629 state.hopr.clone(),
630 state.open_listeners.clone(),
631 destination,
632 target_spec,
633 config,
634 session_pool,
635 max_client_sessions,
636 )
637 .await
638 .map_err(|e| match e {
639 hopr_utils_session::BindError::ListenHostAlreadyUsed => {
640 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
641 }
642 hopr_utils_session::BindError::UnknownFailure(_) => (
643 StatusCode::UNPROCESSABLE_ENTITY,
644 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
645 ),
646 })?
647 }
648 IpProtocol::UDP => {
649 let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
650 let (destination, _target, config) = args
651 .clone()
652 .into_protocol_session_config(IpProtocol::UDP)
653 .await
654 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
655
656 create_udp_client_binding(
657 bind_host,
658 port_range,
659 state.hopr.clone(),
660 state.open_listeners.clone(),
661 destination,
662 target_spec,
663 config,
664 )
665 .await
666 .map_err(|e| match e {
667 hopr_utils_session::BindError::ListenHostAlreadyUsed => {
668 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
669 }
670 hopr_utils_session::BindError::UnknownFailure(_) => (
671 StatusCode::UNPROCESSABLE_ENTITY,
672 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
673 ),
674 })?
675 }
676 };
677
678 Ok::<_, (StatusCode, ApiErrorStatus)>(
679 (
680 StatusCode::OK,
681 Json(SessionClientResponse {
682 protocol,
683 ip: bound_host.ip().to_string(),
684 port: bound_host.port(),
685 target: args.target.to_string(),
686 destination: args.destination,
687 forward_path: args.forward_path.clone(),
688 return_path: args.return_path.clone(),
689 hopr_mtu: SESSION_MTU,
690 surb_len: SURB_SIZE,
691 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
692 max_client_sessions: max_clients,
693 max_surb_upstream: args.max_surb_upstream,
694 response_buffer: args.response_buffer,
695 session_pool: args.session_pool,
696 }),
697 )
698 .into_response(),
699 )
700}
701
702#[utoipa::path(
704 get,
705 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
706 description = "Lists existing Session listeners for the given IP protocol.",
707 params(
708 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
709 ),
710 responses(
711 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
712 {
713 "target": "example.com:80",
714 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
715 "forwardPath": { "Hops": 1 },
716 "returnPath": { "Hops": 1 },
717 "protocol": "tcp",
718 "ip": "127.0.0.1",
719 "port": 5542,
720 "surbLen": 400,
721 "hoprMtu": 1020,
722 "activeClients": [],
723 "maxClientSessions": 2,
724 "maxSurbUpstream": "2000 kb/s",
725 "responseBuffer": "2 MB",
726 "sessionPool": 0
727 }
728 ])),
729 (status = 400, description = "Invalid IP protocol.", body = ApiError),
730 (status = 401, description = "Invalid authorization token.", body = ApiError),
731 (status = 422, description = "Unknown failure", body = ApiError)
732 ),
733 security(
734 ("api_token" = []),
735 ("bearer_token" = [])
736 ),
737 tag = "Session",
738)]
739pub(crate) async fn list_clients(
740 State(state): State<Arc<InternalState>>,
741 Path(protocol): Path<IpProtocol>,
742) -> Result<impl IntoResponse, impl IntoResponse> {
743 let response = state
744 .open_listeners
745 .0
746 .iter()
747 .filter(|v| v.key().0 == protocol.into())
748 .map(|v| {
749 let ListenerId(_, addr) = *v.key();
750 let entry = v.value();
751 SessionClientResponse {
752 protocol,
753 ip: addr.ip().to_string(),
754 port: addr.port(),
755 target: entry.target.to_string(),
756 forward_path: entry.forward_path.clone().into(),
757 return_path: entry.return_path.clone().into(),
758 destination: entry.destination,
759 hopr_mtu: SESSION_MTU,
760 surb_len: SURB_SIZE,
761 active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
762 max_client_sessions: entry.max_client_sessions,
763 max_surb_upstream: entry.max_surb_upstream,
764 response_buffer: entry.response_buffer,
765 session_pool: entry.session_pool,
766 }
767 })
768 .collect::<Vec<_>>();
769
770 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
771}
772
/// Adjustable configuration of a session, convertible to and from `SurbBalancerConfig`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
    "responseBuffer": "2 MB",
    "maxSurbUpstream": "2 Mbps"
}))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionConfig {
    /// Response buffer size in human-readable form.
    ///
    /// When converted into a SURB balancer configuration: a value of at least twice
    /// `SESSION_MTU` sets the target buffer size, a smaller value yields no balancer
    /// configuration at all, and an absent value selects the balancer defaults.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = String)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit in human-readable form.
    ///
    /// Converted into a SURB rate by dividing the bits per second by `8 * SURB_SIZE`;
    /// when absent, the balancer's default rate is used.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = String)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
}
806
807impl From<SessionConfig> for Option<SurbBalancerConfig> {
808 fn from(value: SessionConfig) -> Self {
809 match value.response_buffer {
810 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
812 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
813 max_surbs_per_sec: value
814 .max_surb_upstream
815 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
816 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
817 ..Default::default()
818 }),
819 Some(_) => None,
821 None => Some(SurbBalancerConfig::default()),
823 }
824 }
825}
826
827impl From<SurbBalancerConfig> for SessionConfig {
828 fn from(value: SurbBalancerConfig) -> Self {
829 Self {
830 response_buffer: Some(bytesize::ByteSize::b(
831 value.target_surb_buffer_size * SESSION_MTU as u64,
832 )),
833 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
834 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
835 )),
836 }
837 }
838}
839
840#[utoipa::path(
841 post,
842 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
843 description = "Updates configuration of an existing active session.",
844 params(
845 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
846 ),
847 request_body(
848 content = SessionConfig,
849 description = "Allows updating of several parameters of an existing active session.",
850 content_type = "application/json"),
851 responses(
852 (status = 204, description = "Successfully updated the configuration"),
853 (status = 400, description = "Invalid configuration.", body = ApiError),
854 (status = 401, description = "Invalid authorization token.", body = ApiError),
855 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
856 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
857 (status = 422, description = "Unknown failure", body = ApiError),
858 ),
859 security(
860 ("api_token" = []),
861 ("bearer_token" = [])
862 ),
863 tag = "Session"
864)]
865pub(crate) async fn adjust_session(
866 State(state): State<Arc<InternalState>>,
867 Path(session_id): Path<String>,
868 Json(args): Json<SessionConfig>,
869) -> Result<impl IntoResponse, impl IntoResponse> {
870 let session_id =
871 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
872
873 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
874 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
875 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
876 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
877 SessionManagerError::NonExistingSession,
878 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
879 Err(e) => Err((
880 StatusCode::NOT_ACCEPTABLE,
881 ApiErrorStatus::UnknownFailure(e.to_string()),
882 )),
883 }
884 } else {
885 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
886 }
887}
888
889#[utoipa::path(
890 get,
891 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
892 description = "Gets configuration of an existing active session.",
893 params(
894 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
895 ),
896 responses(
897 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
898 (status = 400, description = "Invalid session ID.", body = ApiError),
899 (status = 401, description = "Invalid authorization token.", body = ApiError),
900 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
901 (status = 422, description = "Unknown failure", body = ApiError),
902 ),
903 security(
904 ("api_token" = []),
905 ("bearer_token" = [])
906 ),
907 tag = "Session"
908)]
909pub(crate) async fn session_config(
910 State(state): State<Arc<InternalState>>,
911 Path(session_id): Path<String>,
912) -> Result<impl IntoResponse, impl IntoResponse> {
913 let session_id =
914 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
915
916 match state.hopr.get_session_surb_balancer_config(&session_id).await {
917 Ok(Some(cfg)) => {
918 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
919 }
920 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
921 Err(e) => Err((
922 StatusCode::UNPROCESSABLE_ENTITY,
923 ApiErrorStatus::UnknownFailure(e.to_string()),
924 )),
925 }
926}
927
/// IP transport protocol of a session listener.
///
/// Serialized in lowercase (`tcp`, `udp`); parsing from a string is ASCII
/// case-insensitive.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
pub enum IpProtocol {
    /// TCP; maps to `hopr_lib::IpProtocol::TCP`.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// UDP; maps to `hopr_lib::IpProtocol::UDP`.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
941
942impl From<IpProtocol> for hopr_lib::IpProtocol {
943 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
944 match protocol {
945 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
946 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
947 }
948 }
949}
950
951#[serde_as]
952#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
953pub struct SessionCloseClientQuery {
954 #[serde_as(as = "DisplayFromStr")]
955 #[schema(value_type = String, example = "tcp")]
956 pub protocol: IpProtocol,
958
959 #[schema(example = "127.0.0.1:8545")]
961 pub ip: String,
962
963 #[schema(value_type = u16, example = 10101)]
965 pub port: u16,
966}
967
968#[utoipa::path(
974 delete,
975 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
976 description = "Closes an existing Session listener.",
977 params(SessionCloseClientQuery),
978 responses(
979 (status = 204, description = "Listener closed successfully"),
980 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
981 (status = 401, description = "Invalid authorization token.", body = ApiError),
982 (status = 404, description = "Listener not found.", body = ApiError),
983 (status = 422, description = "Unknown failure", body = ApiError)
984 ),
985 security(
986 ("api_token" = []),
987 ("bearer_token" = [])
988 ),
989 tag = "Session",
990)]
991pub(crate) async fn close_client(
992 State(state): State<Arc<InternalState>>,
993 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
994) -> Result<impl IntoResponse, impl IntoResponse> {
995 let listening_ip: IpAddr = ip
996 .parse()
997 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
998
999 {
1000 let open_listeners = &state.open_listeners.0;
1001
1002 let mut to_remove = Vec::new();
1003 let protocol: hopr_lib::IpProtocol = protocol.into();
1004
1005 open_listeners
1007 .iter()
1008 .filter(|v| {
1009 let ListenerId(proto, addr) = v.key();
1010 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1011 })
1012 .for_each(|v| to_remove.push(*v.key()));
1013
1014 if to_remove.is_empty() {
1015 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1016 }
1017
1018 for bound_addr in to_remove {
1019 let (_, entry) = open_listeners
1020 .remove(&bound_addr)
1021 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1022
1023 entry.abort_handle.abort();
1024 }
1025 }
1026
1027 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1028}