1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 Error,
5 extract::{
6 Json, Path, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::status::StatusCode,
10 response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17 Address, HoprSession, NodeId, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18 SessionId, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19 errors::{HoprLibError, HoprTransportError},
20 utils::{
21 futures::AsyncReadStreamer,
22 session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
23 },
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tracing::{debug, error, info, trace};
28
29use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
30
/// Buffer size constant associated with TCP handling (value: 4096).
// NOTE(review): the unit is presumably bytes and the name suggests a buffer used when
// forwarding TCP session data; the constant is not referenced in this section — confirm
// at its usage sites.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Buffer size constant associated with UDP handling (value: 16384).
// NOTE(review): the unit is presumably bytes; not referenced in this section — confirm
// at its usage sites.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Queue size constant associated with UDP handling (value: 8192).
// NOTE(review): presumably the capacity of a queue of incoming UDP data; not referenced
// in this section — confirm the unit (items vs. bytes) at its usage sites.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
39
40#[allow(unused_imports)]
42use serde_json::json;
43
/// Session target specification as accepted and returned by the API.
///
/// Converts into [`hopr_lib::utils::session::SessionTargetSpec`] via `From`, variant by variant.
/// The textual form (see the `Display` and `FromStr` implementations in this file) is:
/// the plain string itself, `$$` followed by URL-safe Base64 for sealed data, or `#` followed
/// by the service identifier.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}),
    example = json!({"Service": 0})
)]
pub enum SessionTargetSpec {
    /// Target given as a plain string, e.g. `example.com:80` (as in the schema example).
    Plain(String),
    /// Target given as raw bytes, (de)serialized in JSON as Base64.
    // NOTE(review): presumably an encrypted ("sealed") target that only the recipient can
    // open — the sealing itself is not visible in this file; confirm in `hopr_lib`.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target given as a service identifier; described as `u32` in the OpenAPI schema.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
58
59impl std::fmt::Display for SessionTargetSpec {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 match self {
62 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
63 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
64 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
65 }
66 }
67}
68
69impl FromStr for SessionTargetSpec {
70 type Err = HoprLibError;
71
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 Ok(if let Some(stripped) = s.strip_prefix("$$") {
74 Self::Sealed(
75 base64::prelude::BASE64_URL_SAFE
76 .decode(stripped)
77 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
78 )
79 } else if let Some(stripped) = s.strip_prefix("#") {
80 Self::Service(
81 stripped
82 .parse()
83 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
84 )
85 } else {
86 Self::Plain(s.to_owned())
87 })
88 }
89}
90
91impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
92 fn from(spec: SessionTargetSpec) -> Self {
93 match spec {
94 SessionTargetSpec::Plain(t) => Self::Plain(t),
95 SessionTargetSpec::Sealed(t) => Self::Sealed(t),
96 SessionTargetSpec::Service(t) => Self::Service(t),
97 }
98 }
99}
100
/// Session capabilities selectable through the API.
///
/// Each variant converts into a set of [`hopr_lib::SessionCapabilities`] (see the `From`
/// implementation in this file). The variant name doubles as its string form through the
/// `strum` derives.
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
pub enum SessionCapability {
    /// Maps to the library's `Segmentation` capability.
    Segmentation,
    /// Maps to both `RetransmissionNack` and `RetransmissionAck` of the library.
    Retransmission,
    /// Maps to the library's `RetransmissionAck` capability only.
    RetransmissionAckOnly,
    /// Maps to the library's `NoDelay` capability.
    NoDelay,
    /// Maps to the library's `NoRateControl` capability.
    NoRateControl,
}
127
128impl From<SessionCapability> for hopr_lib::SessionCapabilities {
129 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
130 match cap {
131 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
132 SessionCapability::Retransmission => {
133 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
134 }
135 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
136 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
137 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
138 }
139 }
140}
141
/// Query parameters of the websocket session endpoint (field names are camelCase on the wire).
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the session counterparty, parsed from its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops; used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when present it is used instead of `hops`.
    /// Only available with the `explicit-path` feature.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Capabilities of the session, each parsed from its string form.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Session target, parsed from its string form (plain, `$$`-prefixed or `#`-prefixed).
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol used to resolve the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
164
/// Serde default for the `protocol` field of [`SessionWebsocketClientQueryRequest`]: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
169
170impl SessionWebsocketClientQueryRequest {
171 pub(crate) async fn into_protocol_session_config(
172 self,
173 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
174 #[cfg(not(feature = "explicit-path"))]
175 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
176
177 #[cfg(feature = "explicit-path")]
178 let path_options = if let Some(path) = self.path {
179 let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
181 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
182 } else {
183 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
184 };
185
186 let mut capabilities = SessionCapabilities::empty();
187 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
188
189 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
190
191 Ok((
192 self.destination,
193 target_spec.into_target(self.protocol.into())?,
194 SessionClientConfig {
195 forward_path_options: path_options.clone(),
196 return_path_options: path_options.clone(), capabilities,
198 ..Default::default()
199 },
200 ))
201 }
202}
203
/// Wrapper around a byte vector that is described in the OpenAPI schema as a string in
/// binary format.
// NOTE(review): not referenced anywhere in this section (hence `dead_code`); presumably it
// exists only to document the websocket payload in the generated schema — confirm.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)]
struct WssData(Vec<u8>);
208
209#[allow(dead_code)] #[utoipa::path(
221 get,
222 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
223 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
224 request_body(
225 content = SessionWebsocketClientQueryRequest,
226 content_type = "application/json",
227 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
228 ),
229 responses(
230 (status = 200, description = "Successfully created a new client websocket session."),
231 (status = 401, description = "Invalid authorization token.", body = ApiError),
232 (status = 422, description = "Unknown failure", body = ApiError),
233 (status = 429, description = "Too many open websocket connections.", body = ApiError),
234 ),
235 security(
236 ("api_token" = []),
237 ("bearer_token" = [])
238 ),
239 tag = "Session",
240 )]
241
242pub(crate) async fn websocket(
243 ws: WebSocketUpgrade,
244 Query(query): Query<SessionWebsocketClientQueryRequest>,
245 State(state): State<Arc<InternalState>>,
246) -> Result<impl IntoResponse, impl IntoResponse> {
247 let (dst, target, data) = query
248 .into_protocol_session_config()
249 .await
250 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
251
252 let hopr = state.hopr.clone();
253 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
254 error!(error = %e, "Failed to establish session");
255 (
256 StatusCode::UNPROCESSABLE_ENTITY,
257 ApiErrorStatus::UnknownFailure(e.to_string()),
258 )
259 })?;
260
261 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
262}
263
/// Event consumed by the websocket bridging loop; both event sources are merged into a
/// single stream of this type.
enum WebSocketInput {
    /// A chunk of bytes read from the HOPR session, or the I/O error that occurred while reading.
    Network(Result<Box<[u8]>, std::io::Error>),
    /// A message received from the websocket, or the error that occurred while receiving.
    WsInput(Result<Message, Error>),
}
268
/// Size parameter given to `AsyncReadStreamer` when reading from the HOPR session in the
/// websocket bridging loop.
// NOTE(review): presumably the maximum number of bytes per read chunk — confirm against
// `AsyncReadStreamer`.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
271
272#[tracing::instrument(level = "debug", skip(socket, session))]
273async fn websocket_connection(socket: WebSocket, session: HoprSession) {
274 let session_id = *session.id();
275
276 let (rx, mut tx) = session.split();
277 let (mut sender, receiver) = socket.split();
278
279 let mut queue = (
280 receiver.map(WebSocketInput::WsInput),
281 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
282 )
283 .merge();
284
285 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
286
287 while let Some(v) = queue.next().await {
288 match v {
289 WebSocketInput::Network(bytes) => match bytes {
290 Ok(bytes) => {
291 let len = bytes.len();
292 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
293 error!(
294 error = %e,
295 "Failed to emit read data onto the websocket, closing connection"
296 );
297 break;
298 };
299 bytes_from_session += len;
300 }
301 Err(e) => {
302 error!(
303 error = %e,
304 "Failed to push data from network to socket, closing connection"
305 );
306 break;
307 }
308 },
309 WebSocketInput::WsInput(ws_in) => match ws_in {
310 Ok(Message::Binary(data)) => {
311 let len = data.len();
312 if let Err(e) = tx.write(data.as_ref()).await {
313 error!(error = %e, "Failed to write data to the session, closing connection");
314 break;
315 }
316 bytes_to_session += len;
317 }
318 Ok(Message::Text(_)) => {
319 error!("Received string instead of binary data, closing connection");
320 break;
321 }
322 Ok(Message::Close(_)) => {
323 debug!("Received close frame, closing connection");
324 break;
325 }
326 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
327 Err(e) => {
328 error!(error = %e, "Failed to get a valid websocket message, closing connection");
329 break;
330 }
331 },
332 }
333 }
334
335 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
336}
337
/// Routing options of a session path as exposed by the API.
///
/// Converts to and from [`hopr_lib::RoutingOptions`] (see `resolve` and the `From`
/// implementation in this file).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
pub enum RoutingOptions {
    /// Explicit list of intermediate nodes, each (de)serialized via its string form.
    /// Only available with the `explicit-path` feature.
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
    /// Number of hops.
    Hops(usize),
}
348
349impl RoutingOptions {
350 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
351 Ok(match self {
352 #[cfg(feature = "explicit-path")]
353 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
354 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
355 })
356 }
357}
358
359impl From<hopr_lib::RoutingOptions> for RoutingOptions {
360 fn from(opts: hopr_lib::RoutingOptions) -> Self {
361 match opts {
362 #[cfg(feature = "explicit-path")]
363 hopr_lib::RoutingOptions::IntermediatePath(path) => {
364 RoutingOptions::IntermediatePath(path.into_iter().collect())
365 }
366 hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
367 }
368 }
369}
370
/// Request body for creating a new client session listener (field names are camelCase on
/// the wire).
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientRequest {
    /// Address of the session counterparty, (de)serialized via its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// Target of the session.
    pub target: SessionTargetSpec,
    /// Host the listener should bind to; combined with the node's default listen host by
    /// `build_binding_host` when the listener is created.
    // NOTE(review): the exact fallback rules for a missing or partial value live in
    // `build_binding_host`, which is not visible here — confirm there.
    pub listen_host: Option<String>,
    /// Capabilities of the session. When omitted, TCP sessions get `RetransmissionAck`,
    /// `RetransmissionNack` and `Segmentation`; other sessions get `Segmentation` only.
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    pub capabilities: Option<Vec<SessionCapability>>,
    /// Size of the response buffer, (de)serialized via its string form (e.g. `"2 MB"`).
    /// Feeds the SURB balancer configuration: a value below two session MTUs yields no
    /// balancer configuration, an omitted value yields the default configuration.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit for SURBs, given as a human-readable bandwidth
    /// (e.g. `"2000 kb/s"`). Converted into SURBs per second for the SURB balancer; only
    /// taken into account when `response_buffer` is large enough to enable the balancer.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Passed to the TCP listener binding only; not used for UDP listeners.
    // NOTE(review): presumably the number of sessions kept pre-established in a pool —
    // confirm in `create_tcp_client_binding`.
    pub session_pool: Option<usize>,
    /// Passed to the TCP listener binding only; not used for UDP listeners.
    // NOTE(review): presumably the maximum number of concurrent client sessions on the
    // listener — confirm in `create_tcp_client_binding`.
    pub max_client_sessions: Option<usize>,
}
449
450impl SessionClientRequest {
451 pub(crate) async fn into_protocol_session_config(
452 self,
453 target_protocol: IpProtocol,
454 ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
455 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
456 Ok((
457 self.destination,
458 target_spec.into_target(target_protocol.into())?,
459 SessionClientConfig {
460 forward_path_options: self.forward_path.resolve().await?,
461 return_path_options: self.return_path.resolve().await?,
462 capabilities: self
463 .capabilities
464 .map(|vs| {
465 let mut caps = SessionCapabilities::empty();
466 caps.extend(vs.into_iter().map(SessionCapabilities::from));
467 caps
468 })
469 .unwrap_or_else(|| match target_protocol {
470 IpProtocol::TCP => {
471 hopr_lib::SessionCapability::RetransmissionAck
472 | hopr_lib::SessionCapability::RetransmissionNack
473 | hopr_lib::SessionCapability::Segmentation
474 }
475 _ => SessionCapability::Segmentation.into(),
477 }),
478 surb_management: SessionConfig {
479 response_buffer: self.response_buffer,
480 max_surb_upstream: self.max_surb_upstream,
481 }
482 .into(),
483 ..Default::default()
484 },
485 ))
486 }
487}
488
/// Description of a session listener, returned when a listener is created and when
/// listeners are listed (field names are camelCase on the wire).
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "hoprMtu": 1002,
        "surbLen": 398,
        "activeClients": [],
        "maxClientSessions": 2,
        "maxSurbUpstream": "2000 kb/s",
        "responseBuffer": "2 MB",
        "sessionPool": 0
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientResponse {
    /// Session target in its string form.
    #[schema(example = "example.com:80")]
    pub target: String,
    /// Address of the session counterparty, (de)serialized via its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// IP protocol of the listener, (de)serialized via its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    /// Port the listener is bound to.
    #[schema(example = 5542)]
    pub port: u16,
    /// Filled with the library constant `SESSION_MTU`.
    pub hopr_mtu: usize,
    /// Filled with the library constant `SURB_SIZE`.
    pub surb_len: usize,
    /// String forms of the identifiers of the sessions currently attached to the listener.
    pub active_clients: Vec<String>,
    /// Maximum number of client sessions, as reported for the listener.
    pub max_client_sessions: usize,
    /// Upstream bandwidth limit for SURBs, as a human-readable bandwidth.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Size of the response buffer, (de)serialized via its string form.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Session pool setting of the listener, if any.
    pub session_pool: Option<usize>,
}
561
562#[utoipa::path(
579 post,
580 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
581 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
582 params(
583 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
584 ),
585 request_body(
586 content = SessionClientRequest,
587 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
588 content_type = "application/json"),
589 responses(
590 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
591 (status = 400, description = "Invalid IP protocol.", body = ApiError),
592 (status = 401, description = "Invalid authorization token.", body = ApiError),
593 (status = 409, description = "Listening address and port already in use.", body = ApiError),
594 (status = 422, description = "Unknown failure", body = ApiError),
595 ),
596 security(
597 ("api_token" = []),
598 ("bearer_token" = [])
599 ),
600 tag = "Session"
601 )]
602pub(crate) async fn create_client(
603 State(state): State<Arc<InternalState>>,
604 Path(protocol): Path<IpProtocol>,
605 Json(args): Json<SessionClientRequest>,
606) -> Result<impl IntoResponse, impl IntoResponse> {
607 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
608
609 let listener_id = ListenerId(protocol.into(), bind_host);
610 if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
611 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
612 }
613
614 let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
615 debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
616
617 let (bound_host, udp_session_id, max_clients) = match protocol {
618 IpProtocol::TCP => {
619 let session_pool = args.session_pool;
620 let max_client_sessions = args.max_client_sessions;
621 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
622 let (destination, _target, config) = args
623 .clone()
624 .into_protocol_session_config(IpProtocol::TCP)
625 .await
626 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
627
628 create_tcp_client_binding(
629 bind_host,
630 port_range,
631 state.hopr.clone(),
632 state.open_listeners.clone(),
633 destination,
634 target_spec,
635 config,
636 session_pool,
637 max_client_sessions,
638 )
639 .await
640 .map_err(|e| match e {
641 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
642 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
643 }
644 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
645 StatusCode::UNPROCESSABLE_ENTITY,
646 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
647 ),
648 })?
649 }
650 IpProtocol::UDP => {
651 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
652 let (destination, _target, config) = args
653 .clone()
654 .into_protocol_session_config(IpProtocol::UDP)
655 .await
656 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
657
658 create_udp_client_binding(
659 bind_host,
660 port_range,
661 state.hopr.clone(),
662 state.open_listeners.clone(),
663 destination,
664 target_spec,
665 config,
666 )
667 .await
668 .map_err(|e| match e {
669 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
670 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
671 }
672 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
673 StatusCode::UNPROCESSABLE_ENTITY,
674 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
675 ),
676 })?
677 }
678 };
679
680 Ok::<_, (StatusCode, ApiErrorStatus)>(
681 (
682 StatusCode::OK,
683 Json(SessionClientResponse {
684 protocol,
685 ip: bound_host.ip().to_string(),
686 port: bound_host.port(),
687 target: args.target.to_string(),
688 destination: args.destination,
689 forward_path: args.forward_path.clone(),
690 return_path: args.return_path.clone(),
691 hopr_mtu: SESSION_MTU,
692 surb_len: SURB_SIZE,
693 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
694 max_client_sessions: max_clients,
695 max_surb_upstream: args.max_surb_upstream,
696 response_buffer: args.response_buffer,
697 session_pool: args.session_pool,
698 }),
699 )
700 .into_response(),
701 )
702}
703
704#[utoipa::path(
706 get,
707 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
708 description = "Lists existing Session listeners for the given IP protocol.",
709 params(
710 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
711 ),
712 responses(
713 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
714 {
715 "target": "example.com:80",
716 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
717 "forwardPath": { "Hops": 1 },
718 "returnPath": { "Hops": 1 },
719 "protocol": "tcp",
720 "ip": "127.0.0.1",
721 "port": 5542,
722 "surbLen": 400,
723 "hoprMtu": 1020,
724 "activeClients": [],
725 "maxClientSessions": 2,
726 "maxSurbUpstream": "2000 kb/s",
727 "responseBuffer": "2 MB",
728 "sessionPool": 0
729 }
730 ])),
731 (status = 400, description = "Invalid IP protocol.", body = ApiError),
732 (status = 401, description = "Invalid authorization token.", body = ApiError),
733 (status = 422, description = "Unknown failure", body = ApiError)
734 ),
735 security(
736 ("api_token" = []),
737 ("bearer_token" = [])
738 ),
739 tag = "Session",
740)]
741pub(crate) async fn list_clients(
742 State(state): State<Arc<InternalState>>,
743 Path(protocol): Path<IpProtocol>,
744) -> Result<impl IntoResponse, impl IntoResponse> {
745 let response = state
746 .open_listeners
747 .0
748 .iter()
749 .filter(|v| v.key().0 == protocol.into())
750 .map(|v| {
751 let ListenerId(_, addr) = *v.key();
752 let entry = v.value();
753 SessionClientResponse {
754 protocol,
755 ip: addr.ip().to_string(),
756 port: addr.port(),
757 target: entry.target.to_string(),
758 forward_path: entry.forward_path.clone().into(),
759 return_path: entry.return_path.clone().into(),
760 destination: entry.destination,
761 hopr_mtu: SESSION_MTU,
762 surb_len: SURB_SIZE,
763 active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
764 max_client_sessions: entry.max_client_sessions,
765 max_surb_upstream: entry.max_surb_upstream,
766 response_buffer: entry.response_buffer,
767 session_pool: entry.session_pool,
768 }
769 })
770 .collect::<Vec<_>>();
771
772 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
773}
774
/// Adjustable SURB-related configuration of a session (field names are camelCase on the wire).
///
/// Converts to and from the library's `SurbBalancerConfig` (see the `From` implementations
/// in this file).
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
    "responseBuffer": "2 MB",
    "maxSurbUpstream": "2 Mbps"
}))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionConfig {
    /// Size of the response buffer, (de)serialized via its string form (e.g. `"2 MB"`).
    /// A value of at least two session MTUs enables the SURB balancer with a matching buffer
    /// target; a smaller value yields no balancer configuration; an omitted value yields the
    /// default balancer configuration.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = String)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit for SURBs as a human-readable bandwidth (e.g. `"2 Mbps"`).
    /// Converted into SURBs per second; only taken into account when `response_buffer` is
    /// given and large enough.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = String)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
}
808
809impl From<SessionConfig> for Option<SurbBalancerConfig> {
810 fn from(value: SessionConfig) -> Self {
811 match value.response_buffer {
812 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
814 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
815 max_surbs_per_sec: value
816 .max_surb_upstream
817 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
818 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
819 ..Default::default()
820 }),
821 Some(_) => None,
823 None => Some(SurbBalancerConfig::default()),
825 }
826 }
827}
828
829impl From<SurbBalancerConfig> for SessionConfig {
830 fn from(value: SurbBalancerConfig) -> Self {
831 Self {
832 response_buffer: Some(bytesize::ByteSize::b(
833 value.target_surb_buffer_size * SESSION_MTU as u64,
834 )),
835 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
836 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
837 )),
838 }
839 }
840}
841
842#[utoipa::path(
843 post,
844 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
845 description = "Updates configuration of an existing active session.",
846 params(
847 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
848 ),
849 request_body(
850 content = SessionConfig,
851 description = "Allows updating of several parameters of an existing active session.",
852 content_type = "application/json"),
853 responses(
854 (status = 204, description = "Successfully updated the configuration"),
855 (status = 400, description = "Invalid configuration.", body = ApiError),
856 (status = 401, description = "Invalid authorization token.", body = ApiError),
857 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
858 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
859 (status = 422, description = "Unknown failure", body = ApiError),
860 ),
861 security(
862 ("api_token" = []),
863 ("bearer_token" = [])
864 ),
865 tag = "Session"
866)]
867pub(crate) async fn adjust_session(
868 State(state): State<Arc<InternalState>>,
869 Path(session_id): Path<String>,
870 Json(args): Json<SessionConfig>,
871) -> Result<impl IntoResponse, impl IntoResponse> {
872 let session_id =
873 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
874
875 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
876 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
877 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
878 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
879 SessionManagerError::NonExistingSession,
880 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
881 Err(e) => Err((
882 StatusCode::NOT_ACCEPTABLE,
883 ApiErrorStatus::UnknownFailure(e.to_string()),
884 )),
885 }
886 } else {
887 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
888 }
889}
890
891#[utoipa::path(
892 get,
893 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
894 description = "Gets configuration of an existing active session.",
895 params(
896 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
897 ),
898 responses(
899 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
900 (status = 400, description = "Invalid session ID.", body = ApiError),
901 (status = 401, description = "Invalid authorization token.", body = ApiError),
902 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
903 (status = 422, description = "Unknown failure", body = ApiError),
904 ),
905 security(
906 ("api_token" = []),
907 ("bearer_token" = [])
908 ),
909 tag = "Session"
910)]
911pub(crate) async fn session_config(
912 State(state): State<Arc<InternalState>>,
913 Path(session_id): Path<String>,
914) -> Result<impl IntoResponse, impl IntoResponse> {
915 let session_id =
916 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
917
918 match state.hopr.get_session_surb_balancer_config(&session_id).await {
919 Ok(Some(cfg)) => {
920 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
921 }
922 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
923 Err(e) => Err((
924 StatusCode::UNPROCESSABLE_ENTITY,
925 ApiErrorStatus::UnknownFailure(e.to_string()),
926 )),
927 }
928}
929
/// IP transport protocol as exposed by the API.
///
/// Serialized in lowercase (`tcp`, `udp`); parsing from a string is ASCII case-insensitive.
/// Converts into [`hopr_lib::IpProtocol`] via `From`.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
pub enum IpProtocol {
    /// Transmission Control Protocol.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
943
944impl From<IpProtocol> for hopr_lib::IpProtocol {
945 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
946 match protocol {
947 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
948 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
949 }
950 }
951}
952
953#[serde_as]
954#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
955pub struct SessionCloseClientQuery {
956 #[serde_as(as = "DisplayFromStr")]
957 #[schema(value_type = String, example = "tcp")]
958 pub protocol: IpProtocol,
960
961 #[schema(example = "127.0.0.1:8545")]
963 pub ip: String,
964
965 #[schema(value_type = u16, example = 10101)]
967 pub port: u16,
968}
969
970#[utoipa::path(
976 delete,
977 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
978 description = "Closes an existing Session listener.",
979 params(SessionCloseClientQuery),
980 responses(
981 (status = 204, description = "Listener closed successfully"),
982 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
983 (status = 401, description = "Invalid authorization token.", body = ApiError),
984 (status = 404, description = "Listener not found.", body = ApiError),
985 (status = 422, description = "Unknown failure", body = ApiError)
986 ),
987 security(
988 ("api_token" = []),
989 ("bearer_token" = [])
990 ),
991 tag = "Session",
992)]
993pub(crate) async fn close_client(
994 State(state): State<Arc<InternalState>>,
995 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
996) -> Result<impl IntoResponse, impl IntoResponse> {
997 let listening_ip: IpAddr = ip
998 .parse()
999 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1000
1001 {
1002 let open_listeners = &state.open_listeners.0;
1003
1004 let mut to_remove = Vec::new();
1005 let protocol: hopr_lib::IpProtocol = protocol.into();
1006
1007 open_listeners
1009 .iter()
1010 .filter(|v| {
1011 let ListenerId(proto, addr) = v.key();
1012 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1013 })
1014 .for_each(|v| to_remove.push(*v.key()));
1015
1016 if to_remove.is_empty() {
1017 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1018 }
1019
1020 for bound_addr in to_remove {
1021 let (_, entry) = open_listeners
1022 .remove(&bound_addr)
1023 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1024
1025 entry.abort_handle.abort();
1026 }
1027 }
1028
1029 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1030}