1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 Error,
5 extract::{
6 Json, Path, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::status::StatusCode,
10 response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17 Address, HoprSession, HoprSessionId, HoprTransportError, NodeId, SESSION_MTU, SURB_SIZE, ServiceId,
18 SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
19 TransportSessionError,
20 errors::HoprLibError,
21 utils::{
22 futures::AsyncReadStreamer,
23 session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
24 },
25};
26use serde::{Deserialize, Serialize};
27use serde_with::{DisplayFromStr, serde_as};
28use tracing::{debug, error, info, trace};
29
30use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
31
32pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
34
35pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
37
38pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
40
41#[allow(unused_imports)]
43use serde_json::json;
44
45#[serde_as]
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
47#[schema(
48 example = json!({"Plain": "example.com:80"}),
49 example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), example = json!({"Service": 0})
51)]
52pub enum SessionTargetSpec {
54 Plain(String),
55 Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
56 #[schema(value_type = u32)]
57 Service(ServiceId),
58}
59
60impl std::fmt::Display for SessionTargetSpec {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 match self {
63 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
64 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
65 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
66 }
67 }
68}
69
70impl FromStr for SessionTargetSpec {
71 type Err = HoprLibError;
72
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 Ok(if let Some(stripped) = s.strip_prefix("$$") {
75 Self::Sealed(
76 base64::prelude::BASE64_URL_SAFE
77 .decode(stripped)
78 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
79 )
80 } else if let Some(stripped) = s.strip_prefix("#") {
81 Self::Service(
82 stripped
83 .parse()
84 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
85 )
86 } else {
87 Self::Plain(s.to_owned())
88 })
89 }
90}
91
92impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
93 fn from(spec: SessionTargetSpec) -> Self {
94 match spec {
95 SessionTargetSpec::Plain(t) => Self::Plain(t),
96 SessionTargetSpec::Sealed(t) => Self::Sealed(t),
97 SessionTargetSpec::Service(t) => Self::Service(t),
98 }
99 }
100}
101
102#[repr(u8)]
103#[derive(
104 Debug,
105 Clone,
106 strum::EnumIter,
107 strum::Display,
108 strum::EnumString,
109 Serialize,
110 Deserialize,
111 PartialEq,
112 utoipa::ToSchema,
113)]
114#[schema(example = "Segmentation")]
115pub enum SessionCapability {
117 Segmentation,
119 Retransmission,
121 RetransmissionAckOnly,
123 NoDelay,
125 NoRateControl,
127}
128
129impl From<SessionCapability> for hopr_lib::SessionCapabilities {
130 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
131 match cap {
132 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
133 SessionCapability::Retransmission => {
134 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
135 }
136 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
137 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
138 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
139 }
140 }
141}
142
143#[serde_as]
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
145#[serde(rename_all = "camelCase")]
146pub(crate) struct SessionWebsocketClientQueryRequest {
147 #[serde_as(as = "DisplayFromStr")]
148 #[schema(required = true, value_type = String)]
149 pub destination: Address,
150 #[schema(required = true)]
151 pub hops: u8,
152 #[cfg(feature = "explicit-path")]
153 #[schema(required = false, value_type = String)]
154 pub path: Option<Vec<Address>>,
155 #[schema(required = true)]
156 #[serde_as(as = "Vec<DisplayFromStr>")]
157 pub capabilities: Vec<SessionCapability>,
158 #[schema(required = true)]
159 #[serde_as(as = "DisplayFromStr")]
160 pub target: SessionTargetSpec,
161 #[schema(required = false)]
162 #[serde(default = "default_protocol")]
163 pub protocol: IpProtocol,
164}
165
166#[inline]
167fn default_protocol() -> IpProtocol {
168 IpProtocol::TCP
169}
170
171impl SessionWebsocketClientQueryRequest {
172 pub(crate) async fn into_protocol_session_config(
173 self,
174 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
175 #[cfg(not(feature = "explicit-path"))]
176 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
177
178 #[cfg(feature = "explicit-path")]
179 let path_options = if let Some(path) = self.path {
180 let path = path.into_iter().map(NodeId::from).collect::<Vec<_>>();
182 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
183 } else {
184 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
185 };
186
187 let mut capabilities = SessionCapabilities::empty();
188 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
189
190 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
191
192 Ok((
193 self.destination,
194 target_spec.into_target(self.protocol.into())?,
195 SessionClientConfig {
196 forward_path_options: path_options.clone(),
197 return_path_options: path_options.clone(), capabilities,
199 ..Default::default()
200 },
201 ))
202 }
203}
204
205#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
206#[schema(value_type = String, format = Binary)]
207#[allow(dead_code)] struct WssData(Vec<u8>);
209
210#[allow(dead_code)] #[utoipa::path(
222 get,
223 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
224 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
225 request_body(
226 content = SessionWebsocketClientQueryRequest,
227 content_type = "application/json",
228 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
229 ),
230 responses(
231 (status = 200, description = "Successfully created a new client websocket session."),
232 (status = 401, description = "Invalid authorization token.", body = ApiError),
233 (status = 422, description = "Unknown failure", body = ApiError),
234 (status = 429, description = "Too many open websocket connections.", body = ApiError),
235 ),
236 security(
237 ("api_token" = []),
238 ("bearer_token" = [])
239 ),
240 tag = "Session",
241 )]
242
243pub(crate) async fn websocket(
244 ws: WebSocketUpgrade,
245 Query(query): Query<SessionWebsocketClientQueryRequest>,
246 State(state): State<Arc<InternalState>>,
247) -> Result<impl IntoResponse, impl IntoResponse> {
248 let (dst, target, data) = query
249 .into_protocol_session_config()
250 .await
251 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
252
253 let hopr = state.hopr.clone();
254 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
255 error!(error = %e, "Failed to establish session");
256 (
257 StatusCode::UNPROCESSABLE_ENTITY,
258 ApiErrorStatus::UnknownFailure(e.to_string()),
259 )
260 })?;
261
262 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
263}
264
265enum WebSocketInput {
266 Network(Result<Box<[u8]>, std::io::Error>),
267 WsInput(Result<Message, Error>),
268}
269
270const WS_MAX_SESSION_READ_SIZE: usize = 4096;
272
273#[tracing::instrument(level = "debug", skip(socket, session))]
274async fn websocket_connection(socket: WebSocket, session: HoprSession) {
275 let session_id = *session.id();
276
277 let (rx, mut tx) = session.split();
278 let (mut sender, receiver) = socket.split();
279
280 let mut queue = (
281 receiver.map(WebSocketInput::WsInput),
282 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
283 )
284 .merge();
285
286 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
287
288 while let Some(v) = queue.next().await {
289 match v {
290 WebSocketInput::Network(bytes) => match bytes {
291 Ok(bytes) => {
292 let len = bytes.len();
293 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
294 error!(
295 error = %e,
296 "Failed to emit read data onto the websocket, closing connection"
297 );
298 break;
299 };
300 bytes_from_session += len;
301 }
302 Err(e) => {
303 error!(
304 error = %e,
305 "Failed to push data from network to socket, closing connection"
306 );
307 break;
308 }
309 },
310 WebSocketInput::WsInput(ws_in) => match ws_in {
311 Ok(Message::Binary(data)) => {
312 let len = data.len();
313 if let Err(e) = tx.write(data.as_ref()).await {
314 error!(error = %e, "Failed to write data to the session, closing connection");
315 break;
316 }
317 bytes_to_session += len;
318 }
319 Ok(Message::Text(_)) => {
320 error!("Received string instead of binary data, closing connection");
321 break;
322 }
323 Ok(Message::Close(_)) => {
324 debug!("Received close frame, closing connection");
325 break;
326 }
327 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
328 Err(e) => {
329 error!(error = %e, "Failed to get a valid websocket message, closing connection");
330 break;
331 }
332 },
333 }
334 }
335
336 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
337}
338
339#[serde_as]
340#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
341#[schema(example = json!({ "Hops": 1 }))]
342pub enum RoutingOptions {
344 #[cfg(feature = "explicit-path")]
345 #[schema(value_type = Vec<String>)]
346 IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<NodeId>),
347 Hops(usize),
348}
349
350impl RoutingOptions {
351 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
352 Ok(match self {
353 #[cfg(feature = "explicit-path")]
354 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
355 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
356 })
357 }
358}
359
360impl From<hopr_lib::RoutingOptions> for RoutingOptions {
361 fn from(opts: hopr_lib::RoutingOptions) -> Self {
362 match opts {
363 #[cfg(feature = "explicit-path")]
364 hopr_lib::RoutingOptions::IntermediatePath(path) => {
365 RoutingOptions::IntermediatePath(path.into_iter().collect())
366 }
367 hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
368 }
369 }
370}
371
372#[serde_as]
373#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
374#[schema(example = json!({
375 "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
376 "forwardPath": { "Hops": 1 },
377 "returnPath": { "Hops": 1 },
378 "target": {"Plain": "localhost:8080"},
379 "listenHost": "127.0.0.1:10000",
380 "capabilities": ["Retransmission", "Segmentation"],
381 "responseBuffer": "2 MB",
382 "maxSurbUpstream": "2000 kb/s",
383 "sessionPool": 0,
384 "maxClientSessions": 2
385 }))]
386#[serde(rename_all = "camelCase")]
387pub(crate) struct SessionClientRequest {
389 #[serde_as(as = "DisplayFromStr")]
391 #[schema(value_type = String)]
392 pub destination: Address,
393 pub forward_path: RoutingOptions,
395 pub return_path: RoutingOptions,
397 pub target: SessionTargetSpec,
399 pub listen_host: Option<String>,
404 #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
405 pub capabilities: Option<Vec<SessionCapability>>,
409 #[serde_as(as = "Option<DisplayFromStr>")]
419 #[schema(value_type = Option<String>)]
420 pub response_buffer: Option<bytesize::ByteSize>,
421 #[serde(default)]
429 #[serde(with = "human_bandwidth::option")]
430 #[schema(value_type = Option<String>)]
431 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
432 pub session_pool: Option<usize>,
439 pub max_client_sessions: Option<usize>,
449}
450
451impl SessionClientRequest {
452 pub(crate) async fn into_protocol_session_config(
453 self,
454 target_protocol: IpProtocol,
455 ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
456 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
457 Ok((
458 self.destination,
459 target_spec.into_target(target_protocol.into())?,
460 SessionClientConfig {
461 forward_path_options: self.forward_path.resolve().await?,
462 return_path_options: self.return_path.resolve().await?,
463 capabilities: self
464 .capabilities
465 .map(|vs| {
466 let mut caps = SessionCapabilities::empty();
467 caps.extend(vs.into_iter().map(SessionCapabilities::from));
468 caps
469 })
470 .unwrap_or_else(|| match target_protocol {
471 IpProtocol::TCP => {
472 hopr_lib::SessionCapability::RetransmissionAck
473 | hopr_lib::SessionCapability::RetransmissionNack
474 | hopr_lib::SessionCapability::Segmentation
475 }
476 _ => SessionCapability::Segmentation.into(),
478 }),
479 surb_management: SessionConfig {
480 response_buffer: self.response_buffer,
481 max_surb_upstream: self.max_surb_upstream,
482 }
483 .into(),
484 ..Default::default()
485 },
486 ))
487 }
488}
489
490#[serde_as]
491#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
492#[schema(example = json!({
493 "target": "example.com:80",
494 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
495 "forwardPath": { "Hops": 1 },
496 "returnPath": { "Hops": 1 },
497 "protocol": "tcp",
498 "ip": "127.0.0.1",
499 "port": 5542,
500 "hoprMtu": 1002,
501 "surbLen": 398,
502 "activeClients": [],
503 "maxClientSessions": 2,
504 "maxSurbUpstream": "2000 kb/s",
505 "responseBuffer": "2 MB",
506 "sessionPool": 0
507 }))]
508#[serde(rename_all = "camelCase")]
509pub(crate) struct SessionClientResponse {
511 #[schema(example = "example.com:80")]
512 pub target: String,
514 #[serde_as(as = "DisplayFromStr")]
516 #[schema(value_type = String)]
517 pub destination: Address,
518 pub forward_path: RoutingOptions,
520 pub return_path: RoutingOptions,
522 #[serde_as(as = "DisplayFromStr")]
524 #[schema(example = "tcp")]
525 pub protocol: IpProtocol,
526 #[schema(example = "127.0.0.1")]
528 pub ip: String,
529 #[schema(example = 5542)]
530 pub port: u16,
532 pub hopr_mtu: usize,
534 pub surb_len: usize,
538 pub active_clients: Vec<String>,
543 pub max_client_sessions: usize,
548 #[serde(default)]
551 #[serde(with = "human_bandwidth::option")]
552 #[schema(value_type = Option<String>)]
553 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
554 #[serde_as(as = "Option<DisplayFromStr>")]
557 #[schema(value_type = Option<String>)]
558 pub response_buffer: Option<bytesize::ByteSize>,
559 pub session_pool: Option<usize>,
561}
562
563#[utoipa::path(
580 post,
581 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
582 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
583 params(
584 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
585 ),
586 request_body(
587 content = SessionClientRequest,
588 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
589 content_type = "application/json"),
590 responses(
591 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
592 (status = 400, description = "Invalid IP protocol.", body = ApiError),
593 (status = 401, description = "Invalid authorization token.", body = ApiError),
594 (status = 409, description = "Listening address and port already in use.", body = ApiError),
595 (status = 422, description = "Unknown failure", body = ApiError),
596 ),
597 security(
598 ("api_token" = []),
599 ("bearer_token" = [])
600 ),
601 tag = "Session"
602 )]
603pub(crate) async fn create_client(
604 State(state): State<Arc<InternalState>>,
605 Path(protocol): Path<IpProtocol>,
606 Json(args): Json<SessionClientRequest>,
607) -> Result<impl IntoResponse, impl IntoResponse> {
608 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
609
610 let listener_id = ListenerId(protocol.into(), bind_host);
611 if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
612 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
613 }
614
615 let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
616 debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
617
618 let (bound_host, udp_session_id, max_clients) = match protocol {
619 IpProtocol::TCP => {
620 let session_pool = args.session_pool;
621 let max_client_sessions = args.max_client_sessions;
622 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
623 let (destination, _target, config) = args
624 .clone()
625 .into_protocol_session_config(IpProtocol::TCP)
626 .await
627 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
628
629 create_tcp_client_binding(
630 bind_host,
631 port_range,
632 state.hopr.clone(),
633 state.open_listeners.clone(),
634 destination,
635 target_spec,
636 config,
637 session_pool,
638 max_client_sessions,
639 )
640 .await
641 .map_err(|e| match e {
642 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
643 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
644 }
645 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
646 StatusCode::UNPROCESSABLE_ENTITY,
647 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
648 ),
649 })?
650 }
651 IpProtocol::UDP => {
652 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
653 let (destination, _target, config) = args
654 .clone()
655 .into_protocol_session_config(IpProtocol::UDP)
656 .await
657 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
658
659 create_udp_client_binding(
660 bind_host,
661 port_range,
662 state.hopr.clone(),
663 state.open_listeners.clone(),
664 destination,
665 target_spec,
666 config,
667 )
668 .await
669 .map_err(|e| match e {
670 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
671 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
672 }
673 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
674 StatusCode::UNPROCESSABLE_ENTITY,
675 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
676 ),
677 })?
678 }
679 };
680
681 Ok::<_, (StatusCode, ApiErrorStatus)>(
682 (
683 StatusCode::OK,
684 Json(SessionClientResponse {
685 protocol,
686 ip: bound_host.ip().to_string(),
687 port: bound_host.port(),
688 target: args.target.to_string(),
689 destination: args.destination,
690 forward_path: args.forward_path.clone(),
691 return_path: args.return_path.clone(),
692 hopr_mtu: SESSION_MTU,
693 surb_len: SURB_SIZE,
694 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
695 max_client_sessions: max_clients,
696 max_surb_upstream: args.max_surb_upstream,
697 response_buffer: args.response_buffer,
698 session_pool: args.session_pool,
699 }),
700 )
701 .into_response(),
702 )
703}
704
705#[utoipa::path(
707 get,
708 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
709 description = "Lists existing Session listeners for the given IP protocol.",
710 params(
711 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
712 ),
713 responses(
714 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
715 {
716 "target": "example.com:80",
717 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
718 "forwardPath": { "Hops": 1 },
719 "returnPath": { "Hops": 1 },
720 "protocol": "tcp",
721 "ip": "127.0.0.1",
722 "port": 5542,
723 "surbLen": 400,
724 "hoprMtu": 1020,
725 "activeClients": [],
726 "maxClientSessions": 2,
727 "maxSurbUpstream": "2000 kb/s",
728 "responseBuffer": "2 MB",
729 "sessionPool": 0
730 }
731 ])),
732 (status = 400, description = "Invalid IP protocol.", body = ApiError),
733 (status = 401, description = "Invalid authorization token.", body = ApiError),
734 (status = 422, description = "Unknown failure", body = ApiError)
735 ),
736 security(
737 ("api_token" = []),
738 ("bearer_token" = [])
739 ),
740 tag = "Session",
741)]
742pub(crate) async fn list_clients(
743 State(state): State<Arc<InternalState>>,
744 Path(protocol): Path<IpProtocol>,
745) -> Result<impl IntoResponse, impl IntoResponse> {
746 let response = state
747 .open_listeners
748 .read_arc()
749 .await
750 .iter()
751 .filter(|(id, _)| id.0 == protocol.into())
752 .map(|(id, entry)| SessionClientResponse {
753 protocol,
754 ip: id.1.ip().to_string(),
755 port: id.1.port(),
756 target: entry.target.to_string(),
757 forward_path: entry.forward_path.clone().into(),
758 return_path: entry.return_path.clone().into(),
759 destination: entry.destination,
760 hopr_mtu: SESSION_MTU,
761 surb_len: SURB_SIZE,
762 active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
763 max_client_sessions: entry.max_client_sessions,
764 max_surb_upstream: entry.max_surb_upstream,
765 response_buffer: entry.response_buffer,
766 session_pool: entry.session_pool,
767 })
768 .collect::<Vec<_>>();
769
770 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
771}
772
773#[serde_as]
774#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
775#[schema(example = json!({
776 "responseBuffer": "2 MB",
777 "maxSurbUpstream": "2 Mbps"
778}))]
779#[serde(rename_all = "camelCase")]
780pub(crate) struct SessionConfig {
781 #[serde(default)]
791 #[serde_as(as = "Option<DisplayFromStr>")]
792 #[schema(value_type = String)]
793 pub response_buffer: Option<bytesize::ByteSize>,
794 #[serde(default)]
802 #[serde(with = "human_bandwidth::option")]
803 #[schema(value_type = String)]
804 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
805}
806
807impl From<SessionConfig> for Option<SurbBalancerConfig> {
808 fn from(value: SessionConfig) -> Self {
809 match value.response_buffer {
810 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
812 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
813 max_surbs_per_sec: value
814 .max_surb_upstream
815 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
816 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
817 ..Default::default()
818 }),
819 Some(_) => None,
821 None => Some(SurbBalancerConfig::default()),
823 }
824 }
825}
826
827impl From<SurbBalancerConfig> for SessionConfig {
828 fn from(value: SurbBalancerConfig) -> Self {
829 Self {
830 response_buffer: Some(bytesize::ByteSize::b(
831 value.target_surb_buffer_size * SESSION_MTU as u64,
832 )),
833 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
834 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
835 )),
836 }
837 }
838}
839
840#[utoipa::path(
841 post,
842 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
843 description = "Updates configuration of an existing active session.",
844 params(
845 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
846 ),
847 request_body(
848 content = SessionConfig,
849 description = "Allows updating of several parameters of an existing active session.",
850 content_type = "application/json"),
851 responses(
852 (status = 204, description = "Successfully updated the configuration"),
853 (status = 400, description = "Invalid configuration.", body = ApiError),
854 (status = 401, description = "Invalid authorization token.", body = ApiError),
855 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
856 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
857 (status = 422, description = "Unknown failure", body = ApiError),
858 ),
859 security(
860 ("api_token" = []),
861 ("bearer_token" = [])
862 ),
863 tag = "Session"
864)]
865pub(crate) async fn adjust_session(
866 State(state): State<Arc<InternalState>>,
867 Path(session_id): Path<String>,
868 Json(args): Json<SessionConfig>,
869) -> Result<impl IntoResponse, impl IntoResponse> {
870 let session_id = HoprSessionId::from_str(&session_id)
871 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
872
873 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
874 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
875 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
876 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
877 SessionManagerError::NonExistingSession,
878 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
879 Err(e) => Err((
880 StatusCode::NOT_ACCEPTABLE,
881 ApiErrorStatus::UnknownFailure(e.to_string()),
882 )),
883 }
884 } else {
885 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
886 }
887}
888
889#[utoipa::path(
890 get,
891 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
892 description = "Gets configuration of an existing active session.",
893 params(
894 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
895 ),
896 responses(
897 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
898 (status = 400, description = "Invalid session ID.", body = ApiError),
899 (status = 401, description = "Invalid authorization token.", body = ApiError),
900 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
901 (status = 422, description = "Unknown failure", body = ApiError),
902 ),
903 security(
904 ("api_token" = []),
905 ("bearer_token" = [])
906 ),
907 tag = "Session"
908)]
909pub(crate) async fn session_config(
910 State(state): State<Arc<InternalState>>,
911 Path(session_id): Path<String>,
912) -> Result<impl IntoResponse, impl IntoResponse> {
913 let session_id = HoprSessionId::from_str(&session_id)
914 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
915
916 match state.hopr.get_session_surb_balancer_config(&session_id).await {
917 Ok(Some(cfg)) => {
918 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
919 }
920 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
921 Err(e) => Err((
922 StatusCode::UNPROCESSABLE_ENTITY,
923 ApiErrorStatus::UnknownFailure(e.to_string()),
924 )),
925 }
926}
927
928#[derive(
929 Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
930)]
931#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
932#[serde(rename_all = "lowercase")]
933#[schema(example = "tcp")]
934pub enum IpProtocol {
936 #[allow(clippy::upper_case_acronyms)]
937 TCP,
938 #[allow(clippy::upper_case_acronyms)]
939 UDP,
940}
941
942impl From<IpProtocol> for hopr_lib::IpProtocol {
943 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
944 match protocol {
945 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
946 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
947 }
948 }
949}
950
951#[serde_as]
952#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
953pub struct SessionCloseClientQuery {
954 #[serde_as(as = "DisplayFromStr")]
955 #[schema(value_type = String, example = "tcp")]
956 pub protocol: IpProtocol,
958
959 #[schema(example = "127.0.0.1:8545")]
961 pub ip: String,
962
963 #[schema(value_type = u16, example = 10101)]
965 pub port: u16,
966}
967
968#[utoipa::path(
974 delete,
975 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
976 description = "Closes an existing Session listener.",
977 params(SessionCloseClientQuery),
978 responses(
979 (status = 204, description = "Listener closed successfully"),
980 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
981 (status = 401, description = "Invalid authorization token.", body = ApiError),
982 (status = 404, description = "Listener not found.", body = ApiError),
983 (status = 422, description = "Unknown failure", body = ApiError)
984 ),
985 security(
986 ("api_token" = []),
987 ("bearer_token" = [])
988 ),
989 tag = "Session",
990)]
991pub(crate) async fn close_client(
992 State(state): State<Arc<InternalState>>,
993 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
994) -> Result<impl IntoResponse, impl IntoResponse> {
995 let listening_ip: IpAddr = ip
996 .parse()
997 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
998
999 {
1000 let mut open_listeners = state.open_listeners.write_arc().await;
1001
1002 let mut to_remove = Vec::new();
1003 let protocol: hopr_lib::IpProtocol = protocol.into();
1004
1005 open_listeners
1007 .iter()
1008 .filter(|(ListenerId(proto, addr), _)| {
1009 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1010 })
1011 .for_each(|(id, _)| to_remove.push(*id));
1012
1013 if to_remove.is_empty() {
1014 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1015 }
1016
1017 for bound_addr in to_remove {
1018 let entry = open_listeners
1019 .remove(&bound_addr)
1020 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1021
1022 entry.abort_handle.abort();
1023 }
1024 }
1025
1026 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1027}