1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 Error,
5 extract::{
6 Json, Path, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::status::StatusCode,
10 response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17 Address, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities,
18 SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
19 errors::HoprLibError,
20 utils::{
21 futures::AsyncReadStreamer,
22 session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding},
23 },
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tracing::{debug, error, info, trace};
28
29use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
30
/// Buffer size constant for TCP session traffic (4096).
// NOTE(review): the consumer of this constant is not visible in this section;
// presumably it is a byte count for forwarding data to/from a TCP stream — confirm.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Buffer size constant for UDP session traffic (16384).
// NOTE(review): presumably a byte count for forwarding data to/from a UDP socket — confirm.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Queue size constant for UDP session traffic (8192).
// NOTE(review): presumably the capacity of a queue of incoming UDP data — confirm the unit
// (items vs. bytes) against the code that uses it.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
39
40#[allow(unused_imports)]
42use serde_json::json;
43
/// Session target specification as exposed by the REST API.
///
/// Its string form (see the `Display` and `FromStr` implementations) is the plain target as-is,
/// `$$` followed by URL-safe Base64 for a sealed target, or `#` followed by the service id.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}),
    example = json!({"Service": 0})
)]
pub enum SessionTargetSpec {
    /// Target given as a plain string, e.g. `example.com:80`.
    Plain(String),
    /// Target given as opaque bytes; (de)serialized as Base64 through serde.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Target given as a numeric service identifier.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
58
59impl std::fmt::Display for SessionTargetSpec {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 match self {
62 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
63 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
64 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
65 }
66 }
67}
68
69impl FromStr for SessionTargetSpec {
70 type Err = HoprLibError;
71
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 Ok(if let Some(stripped) = s.strip_prefix("$$") {
74 Self::Sealed(
75 base64::prelude::BASE64_URL_SAFE
76 .decode(stripped)
77 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
78 )
79 } else if let Some(stripped) = s.strip_prefix("#") {
80 Self::Service(
81 stripped
82 .parse()
83 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
84 )
85 } else {
86 Self::Plain(s.to_owned())
87 })
88 }
89}
90
91impl From<SessionTargetSpec> for hopr_lib::utils::session::SessionTargetSpec {
92 fn from(spec: SessionTargetSpec) -> Self {
93 match spec {
94 SessionTargetSpec::Plain(t) => Self::Plain(t),
95 SessionTargetSpec::Sealed(t) => Self::Sealed(t),
96 SessionTargetSpec::Service(t) => Self::Service(t),
97 }
98 }
99}
100
/// Session capability as exposed by the REST API.
///
/// Each variant is translated into one or more `hopr_lib` session capability flags.
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
pub enum SessionCapability {
    /// Maps to the `Segmentation` capability.
    Segmentation,
    /// Maps to both the `RetransmissionNack` and `RetransmissionAck` capabilities.
    Retransmission,
    /// Maps to the `RetransmissionAck` capability only.
    RetransmissionAckOnly,
    /// Maps to the `NoDelay` capability.
    NoDelay,
    /// Maps to the `NoRateControl` capability.
    NoRateControl,
}
127
128impl From<SessionCapability> for hopr_lib::SessionCapabilities {
129 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
130 match cap {
131 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
132 SessionCapability::Retransmission => {
133 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
134 }
135 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
136 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
137 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
138 }
139 }
140}
141
/// Query parameters accepted by the websocket session endpoint.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the session destination, given in its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops; used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when present it is used instead of `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Session capabilities, each given by its string name.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Session target in its string form (`$$`-prefixed sealed, `#`-prefixed service, or plain).
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol of the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
164
/// Serde default for `SessionWebsocketClientQueryRequest::protocol`: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
169
170impl SessionWebsocketClientQueryRequest {
171 pub(crate) async fn into_protocol_session_config(
172 self,
173 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
174 #[cfg(not(feature = "explicit-path"))]
175 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
176
177 #[cfg(feature = "explicit-path")]
178 let path_options = if let Some(path) = self.path {
179 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
181 } else {
182 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
183 };
184
185 let mut capabilities = SessionCapabilities::empty();
186 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
187
188 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.into();
189
190 Ok((
191 self.destination,
192 target_spec.into_target(self.protocol.into())?,
193 SessionClientConfig {
194 forward_path_options: path_options.clone(),
195 return_path_options: path_options.clone(), capabilities,
197 ..Default::default()
198 },
199 ))
200 }
201}
202
/// Binary payload wrapper, described in the schema as a binary-formatted string.
// NOTE(review): nothing in this section uses the type at runtime (hence `dead_code`);
// presumably it exists only for OpenAPI schema generation — confirm.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)]
struct WssData(Vec<u8>);
207
208#[allow(dead_code)] #[utoipa::path(
220 get,
221 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
222 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
223 request_body(
224 content = SessionWebsocketClientQueryRequest,
225 content_type = "application/json",
226 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
227 ),
228 responses(
229 (status = 200, description = "Successfully created a new client websocket session."),
230 (status = 401, description = "Invalid authorization token.", body = ApiError),
231 (status = 422, description = "Unknown failure", body = ApiError),
232 (status = 429, description = "Too many open websocket connections.", body = ApiError),
233 ),
234 security(
235 ("api_token" = []),
236 ("bearer_token" = [])
237 ),
238 tag = "Session",
239 )]
240
241pub(crate) async fn websocket(
242 ws: WebSocketUpgrade,
243 Query(query): Query<SessionWebsocketClientQueryRequest>,
244 State(state): State<Arc<InternalState>>,
245) -> Result<impl IntoResponse, impl IntoResponse> {
246 let (dst, target, data) = query
247 .into_protocol_session_config()
248 .await
249 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
250
251 let hopr = state.hopr.clone();
252 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
253 error!(error = %e, "Failed to establish session");
254 (
255 StatusCode::UNPROCESSABLE_ENTITY,
256 ApiErrorStatus::UnknownFailure(e.to_string()),
257 )
258 })?;
259
260 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
261}
262
/// An event on the merged input stream consumed by `websocket_connection`.
enum WebSocketInput {
    /// A chunk of bytes read from the HOPR session, or the I/O error from that read.
    Network(Result<Box<[u8]>, std::io::Error>),
    /// A message received from the websocket peer, or the websocket error.
    WsInput(Result<Message, Error>),
}
267
/// Const parameter handed to `AsyncReadStreamer` when reading from the session.
// NOTE(review): presumably the maximum number of bytes per read chunk — confirm against
// `AsyncReadStreamer`.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
270
271#[tracing::instrument(level = "debug", skip(socket, session))]
272async fn websocket_connection(socket: WebSocket, session: HoprSession) {
273 let session_id = *session.id();
274
275 let (rx, mut tx) = session.split();
276 let (mut sender, receiver) = socket.split();
277
278 let mut queue = (
279 receiver.map(WebSocketInput::WsInput),
280 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
281 )
282 .merge();
283
284 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
285
286 while let Some(v) = queue.next().await {
287 match v {
288 WebSocketInput::Network(bytes) => match bytes {
289 Ok(bytes) => {
290 let len = bytes.len();
291 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
292 error!(
293 error = %e,
294 "Failed to emit read data onto the websocket, closing connection"
295 );
296 break;
297 };
298 bytes_from_session += len;
299 }
300 Err(e) => {
301 error!(
302 error = %e,
303 "Failed to push data from network to socket, closing connection"
304 );
305 break;
306 }
307 },
308 WebSocketInput::WsInput(ws_in) => match ws_in {
309 Ok(Message::Binary(data)) => {
310 let len = data.len();
311 if let Err(e) = tx.write(data.as_ref()).await {
312 error!(error = %e, "Failed to write data to the session, closing connection");
313 break;
314 }
315 bytes_to_session += len;
316 }
317 Ok(Message::Text(_)) => {
318 error!("Received string instead of binary data, closing connection");
319 break;
320 }
321 Ok(Message::Close(_)) => {
322 debug!("Received close frame, closing connection");
323 break;
324 }
325 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
326 Err(e) => {
327 error!(error = %e, "Failed to get a valid websocket message, closing connection");
328 break;
329 }
330 },
331 }
332 }
333
334 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
335}
336
/// Routing options for one direction of a session, as exposed by the REST API.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
pub enum RoutingOptions {
    /// Explicit list of intermediate addresses, each given in its string form.
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
    /// Number of hops.
    Hops(usize),
}
347
348impl RoutingOptions {
349 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
350 Ok(match self {
351 #[cfg(feature = "explicit-path")]
352 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
353 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
354 })
355 }
356}
357
358impl From<hopr_lib::RoutingOptions> for RoutingOptions {
359 fn from(opts: hopr_lib::RoutingOptions) -> Self {
360 match opts {
361 #[cfg(feature = "explicit-path")]
362 hopr_lib::RoutingOptions::IntermediatePath(path) => {
363 RoutingOptions::IntermediatePath(path.into_iter().collect())
364 }
365 hopr_lib::RoutingOptions::Hops(hops) => RoutingOptions::Hops(usize::from(hops)),
366 }
367 }
368}
369
/// Request body for creating a new client session listener.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "maxSurbUpstream": "2000 kb/s",
        "sessionPool": 0,
        "maxClientSessions": 2
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientRequest {
    /// Address of the session destination, given in its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options for the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options for the return path.
    pub return_path: RoutingOptions,
    /// Target of the session.
    pub target: SessionTargetSpec,
    /// Host to bind the listener to; combined with the server default listen host when omitted
    /// or incomplete.
    // NOTE(review): the exact merging rules live in `build_binding_host` — confirm there.
    pub listen_host: Option<String>,
    /// Session capabilities. When omitted, TCP sessions use retransmission (ACK and NACK)
    /// with segmentation, and other protocols use segmentation only.
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    pub capabilities: Option<Vec<SessionCapability>>,
    /// Response buffer size as a human-readable byte size (e.g. `2 MB`).
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit as a human-readable bandwidth (e.g. `2000 kb/s`).
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Session pool size; forwarded to the TCP listener binding only.
    pub session_pool: Option<usize>,
    /// Maximum number of client sessions; forwarded to the TCP listener binding only.
    pub max_client_sessions: Option<usize>,
}
448
449impl SessionClientRequest {
450 pub(crate) async fn into_protocol_session_config(
451 self,
452 target_protocol: IpProtocol,
453 ) -> Result<(hopr_lib::Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
454 let target_spec: hopr_lib::utils::session::SessionTargetSpec = self.target.clone().into();
455 Ok((
456 self.destination,
457 target_spec.into_target(target_protocol.into())?,
458 SessionClientConfig {
459 forward_path_options: self.forward_path.resolve().await?,
460 return_path_options: self.return_path.resolve().await?,
461 capabilities: self
462 .capabilities
463 .map(|vs| {
464 let mut caps = SessionCapabilities::empty();
465 caps.extend(vs.into_iter().map(SessionCapabilities::from));
466 caps
467 })
468 .unwrap_or_else(|| match target_protocol {
469 IpProtocol::TCP => {
470 hopr_lib::SessionCapability::RetransmissionAck
471 | hopr_lib::SessionCapability::RetransmissionNack
472 | hopr_lib::SessionCapability::Segmentation
473 }
474 _ => SessionCapability::Segmentation.into(),
476 }),
477 surb_management: SessionConfig {
478 response_buffer: self.response_buffer,
479 max_surb_upstream: self.max_surb_upstream,
480 }
481 .into(),
482 ..Default::default()
483 },
484 ))
485 }
486}
487
/// Description of a client session listener, returned when one is created or listed.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "hoprMtu": 1002,
        "surbLen": 398,
        "activeClients": [],
        "maxClientSessions": 2,
        "maxSurbUpstream": "2000 kb/s",
        "responseBuffer": "2 MB",
        "sessionPool": 0
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientResponse {
    /// Target of the session in its string form.
    #[schema(example = "example.com:80")]
    pub target: String,
    /// Address of the session destination, given in its string form.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// IP protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    /// Port the listener is bound to.
    #[schema(example = 5542)]
    pub port: u16,
    /// Session MTU reported by the node.
    pub hopr_mtu: usize,
    /// SURB size reported by the node.
    pub surb_len: usize,
    /// Identifiers of the currently active clients of the listener.
    pub active_clients: Vec<String>,
    /// Maximum number of client sessions of the listener.
    pub max_client_sessions: usize,
    /// Upstream bandwidth limit as a human-readable bandwidth, if configured.
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = Option<String>)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Response buffer size as a human-readable byte size, if configured.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = Option<String>)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Session pool size, if configured.
    pub session_pool: Option<usize>,
}
560
561#[utoipa::path(
578 post,
579 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
580 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
581 params(
582 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
583 ),
584 request_body(
585 content = SessionClientRequest,
586 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
587 content_type = "application/json"),
588 responses(
589 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
590 (status = 400, description = "Invalid IP protocol.", body = ApiError),
591 (status = 401, description = "Invalid authorization token.", body = ApiError),
592 (status = 409, description = "Listening address and port already in use.", body = ApiError),
593 (status = 422, description = "Unknown failure", body = ApiError),
594 ),
595 security(
596 ("api_token" = []),
597 ("bearer_token" = [])
598 ),
599 tag = "Session"
600 )]
601pub(crate) async fn create_client(
602 State(state): State<Arc<InternalState>>,
603 Path(protocol): Path<IpProtocol>,
604 Json(args): Json<SessionClientRequest>,
605) -> Result<impl IntoResponse, impl IntoResponse> {
606 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
607
608 let listener_id = ListenerId(protocol.into(), bind_host);
609 if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
610 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
611 }
612
613 let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
614 debug!("binding {protocol} session listening socket to {bind_host} (port range limitations: {port_range:?})");
615
616 let (bound_host, udp_session_id, max_clients) = match protocol {
617 IpProtocol::TCP => {
618 let session_pool = args.session_pool;
619 let max_client_sessions = args.max_client_sessions;
620 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
621 let (destination, _target, config) = args
622 .clone()
623 .into_protocol_session_config(IpProtocol::TCP)
624 .await
625 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
626
627 create_tcp_client_binding(
628 bind_host,
629 port_range,
630 state.hopr.clone(),
631 state.open_listeners.clone(),
632 destination,
633 target_spec,
634 config,
635 session_pool,
636 max_client_sessions,
637 )
638 .await
639 .map_err(|e| match e {
640 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
641 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
642 }
643 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
644 StatusCode::UNPROCESSABLE_ENTITY,
645 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
646 ),
647 })?
648 }
649 IpProtocol::UDP => {
650 let target_spec: hopr_lib::utils::session::SessionTargetSpec = args.target.clone().into();
651 let (destination, _target, config) = args
652 .clone()
653 .into_protocol_session_config(IpProtocol::UDP)
654 .await
655 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
656
657 create_udp_client_binding(
658 bind_host,
659 port_range,
660 state.hopr.clone(),
661 state.open_listeners.clone(),
662 destination,
663 target_spec,
664 config,
665 )
666 .await
667 .map_err(|e| match e {
668 hopr_lib::utils::session::BindError::ListenHostAlreadyUsed => {
669 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
670 }
671 hopr_lib::utils::session::BindError::UnknownFailure(_) => (
672 StatusCode::UNPROCESSABLE_ENTITY,
673 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
674 ),
675 })?
676 }
677 };
678
679 Ok::<_, (StatusCode, ApiErrorStatus)>(
680 (
681 StatusCode::OK,
682 Json(SessionClientResponse {
683 protocol,
684 ip: bound_host.ip().to_string(),
685 port: bound_host.port(),
686 target: args.target.to_string(),
687 destination: args.destination,
688 forward_path: args.forward_path.clone(),
689 return_path: args.return_path.clone(),
690 hopr_mtu: SESSION_MTU,
691 surb_len: SURB_SIZE,
692 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
693 max_client_sessions: max_clients,
694 max_surb_upstream: args.max_surb_upstream,
695 response_buffer: args.response_buffer,
696 session_pool: args.session_pool,
697 }),
698 )
699 .into_response(),
700 )
701}
702
703#[utoipa::path(
705 get,
706 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
707 description = "Lists existing Session listeners for the given IP protocol.",
708 params(
709 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
710 ),
711 responses(
712 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
713 {
714 "target": "example.com:80",
715 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
716 "forwardPath": { "Hops": 1 },
717 "returnPath": { "Hops": 1 },
718 "protocol": "tcp",
719 "ip": "127.0.0.1",
720 "port": 5542,
721 "surbLen": 400,
722 "hoprMtu": 1020,
723 "activeClients": [],
724 "maxClientSessions": 2,
725 "maxSurbUpstream": "2000 kb/s",
726 "responseBuffer": "2 MB",
727 "sessionPool": 0
728 }
729 ])),
730 (status = 400, description = "Invalid IP protocol.", body = ApiError),
731 (status = 401, description = "Invalid authorization token.", body = ApiError),
732 (status = 422, description = "Unknown failure", body = ApiError)
733 ),
734 security(
735 ("api_token" = []),
736 ("bearer_token" = [])
737 ),
738 tag = "Session",
739)]
740pub(crate) async fn list_clients(
741 State(state): State<Arc<InternalState>>,
742 Path(protocol): Path<IpProtocol>,
743) -> Result<impl IntoResponse, impl IntoResponse> {
744 let response = state
745 .open_listeners
746 .read_arc()
747 .await
748 .iter()
749 .filter(|(id, _)| id.0 == protocol.into())
750 .map(|(id, entry)| SessionClientResponse {
751 protocol,
752 ip: id.1.ip().to_string(),
753 port: id.1.port(),
754 target: entry.target.to_string(),
755 forward_path: entry.forward_path.clone().into(),
756 return_path: entry.return_path.clone().into(),
757 destination: entry.destination,
758 hopr_mtu: SESSION_MTU,
759 surb_len: SURB_SIZE,
760 active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
761 max_client_sessions: entry.max_client_sessions,
762 max_surb_upstream: entry.max_surb_upstream,
763 response_buffer: entry.response_buffer,
764 session_pool: entry.session_pool,
765 })
766 .collect::<Vec<_>>();
767
768 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
769}
770
/// Adjustable configuration of a session.
// Conversion to `Option<SurbBalancerConfig>` (see the `From` impl in this file):
// * no `response_buffer`                        -> default balancer configuration,
// * `response_buffer` below `2 * SESSION_MTU`   -> `None`,
// * otherwise the buffer is expressed in units of `SESSION_MTU`, and `max_surb_upstream`
//   (if given) is expressed in SURBs per second.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
    "responseBuffer": "2 MB",
    "maxSurbUpstream": "2 Mbps"
}))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionConfig {
    /// Response buffer size as a human-readable byte size (e.g. `2 MB`).
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = String)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Upstream bandwidth limit as a human-readable bandwidth (e.g. `2 Mbps`).
    #[serde(default)]
    #[serde(with = "human_bandwidth::option")]
    #[schema(value_type = String)]
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
}
804
805impl From<SessionConfig> for Option<SurbBalancerConfig> {
806 fn from(value: SessionConfig) -> Self {
807 match value.response_buffer {
808 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
810 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
811 max_surbs_per_sec: value
812 .max_surb_upstream
813 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
814 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
815 ..Default::default()
816 }),
817 Some(_) => None,
819 None => Some(SurbBalancerConfig::default()),
821 }
822 }
823}
824
825impl From<SurbBalancerConfig> for SessionConfig {
826 fn from(value: SurbBalancerConfig) -> Self {
827 Self {
828 response_buffer: Some(bytesize::ByteSize::b(
829 value.target_surb_buffer_size * SESSION_MTU as u64,
830 )),
831 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
832 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
833 )),
834 }
835 }
836}
837
838#[utoipa::path(
839 post,
840 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
841 description = "Updates configuration of an existing active session.",
842 params(
843 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
844 ),
845 request_body(
846 content = SessionConfig,
847 description = "Allows updating of several parameters of an existing active session.",
848 content_type = "application/json"),
849 responses(
850 (status = 204, description = "Successfully updated the configuration"),
851 (status = 400, description = "Invalid configuration.", body = ApiError),
852 (status = 401, description = "Invalid authorization token.", body = ApiError),
853 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
854 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
855 (status = 422, description = "Unknown failure", body = ApiError),
856 ),
857 security(
858 ("api_token" = []),
859 ("bearer_token" = [])
860 ),
861 tag = "Session"
862)]
863pub(crate) async fn adjust_session(
864 State(state): State<Arc<InternalState>>,
865 Path(session_id): Path<String>,
866 Json(args): Json<SessionConfig>,
867) -> Result<impl IntoResponse, impl IntoResponse> {
868 let session_id = HoprSessionId::from_str(&session_id)
869 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
870
871 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
872 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
873 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
874 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
875 SessionManagerError::NonExistingSession,
876 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
877 Err(e) => Err((
878 StatusCode::NOT_ACCEPTABLE,
879 ApiErrorStatus::UnknownFailure(e.to_string()),
880 )),
881 }
882 } else {
883 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
884 }
885}
886
887#[utoipa::path(
888 get,
889 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
890 description = "Gets configuration of an existing active session.",
891 params(
892 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
893 ),
894 responses(
895 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
896 (status = 400, description = "Invalid session ID.", body = ApiError),
897 (status = 401, description = "Invalid authorization token.", body = ApiError),
898 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
899 (status = 422, description = "Unknown failure", body = ApiError),
900 ),
901 security(
902 ("api_token" = []),
903 ("bearer_token" = [])
904 ),
905 tag = "Session"
906)]
907pub(crate) async fn session_config(
908 State(state): State<Arc<InternalState>>,
909 Path(session_id): Path<String>,
910) -> Result<impl IntoResponse, impl IntoResponse> {
911 let session_id = HoprSessionId::from_str(&session_id)
912 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
913
914 match state.hopr.get_session_surb_balancer_config(&session_id).await {
915 Ok(Some(cfg)) => {
916 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
917 }
918 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
919 Err(e) => Err((
920 StatusCode::UNPROCESSABLE_ENTITY,
921 ApiErrorStatus::UnknownFailure(e.to_string()),
922 )),
923 }
924}
925
/// IP transport protocol of a session listener.
///
/// Serialized in lowercase (`tcp`, `udp`); parsing from a string is ASCII case-insensitive.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
pub enum IpProtocol {
    /// Transmission Control Protocol.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
939
940impl From<IpProtocol> for hopr_lib::IpProtocol {
941 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
942 match protocol {
943 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
944 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
945 }
946 }
947}
948
/// Path parameters identifying the session listener(s) to close.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
pub struct SessionCloseClientQuery {
    /// IP protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String, example = "tcp")]
    pub protocol: IpProtocol,

    /// IP address the listener is bound to.
    // NOTE(review): the schema example below contains a port, but `close_client` parses this
    // field as a bare `IpAddr`, which would reject that form — the example looks inconsistent.
    #[schema(example = "127.0.0.1:8545")]
    pub ip: String,

    /// Port the listener is bound to; `0` matches every port on the given address.
    #[schema(value_type = u16, example = 10101)]
    pub port: u16,
}
965
966#[utoipa::path(
972 delete,
973 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
974 description = "Closes an existing Session listener.",
975 params(SessionCloseClientQuery),
976 responses(
977 (status = 204, description = "Listener closed successfully"),
978 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
979 (status = 401, description = "Invalid authorization token.", body = ApiError),
980 (status = 404, description = "Listener not found.", body = ApiError),
981 (status = 422, description = "Unknown failure", body = ApiError)
982 ),
983 security(
984 ("api_token" = []),
985 ("bearer_token" = [])
986 ),
987 tag = "Session",
988)]
989pub(crate) async fn close_client(
990 State(state): State<Arc<InternalState>>,
991 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
992) -> Result<impl IntoResponse, impl IntoResponse> {
993 let listening_ip: IpAddr = ip
994 .parse()
995 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
996
997 {
998 let mut open_listeners = state.open_listeners.write_arc().await;
999
1000 let mut to_remove = Vec::new();
1001 let protocol: hopr_lib::IpProtocol = protocol.into();
1002
1003 open_listeners
1005 .iter()
1006 .filter(|(ListenerId(proto, addr), _)| {
1007 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1008 })
1009 .for_each(|(id, _)| to_remove.push(*id));
1010
1011 if to_remove.is_empty() {
1012 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1013 }
1014
1015 for bound_addr in to_remove {
1016 let entry = open_listeners
1017 .remove(&bound_addr)
1018 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1019
1020 entry.abort_handle.abort();
1021 }
1022 }
1023
1024 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1025}