1use std::{fmt::Formatter, hash::Hash, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 extract::{Json, Path, State},
5 http::status::StatusCode,
6 response::IntoResponse,
7};
8use base64::Engine;
9use hopr_lib::{
10 Address, HopRouting, HoprSessionClientConfig, SESSION_MTU, SURB_SIZE, ServiceId, SessionCapabilities, SessionId,
11 SessionManagerError, SessionTarget, SurbBalancerConfig, TransportSessionError,
12 errors::{HoprLibError, HoprTransportError},
13};
14use hopr_utils_session::{ListenerId, build_binding_host, create_tcp_client_binding, create_udp_client_binding};
15use serde::{Deserialize, Serialize};
16#[allow(unused_imports)]
18use serde_json::json;
19use serde_with::{DisplayFromStr, serde_as};
20
21use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState};
22
23#[serde_as]
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
25#[schema(
26 example = json!({"Plain": "example.com:80"}),
27 example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), example = json!({"Service": 0})
29)]
30pub enum SessionTargetSpec {
32 Plain(String),
33 Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
34 #[schema(value_type = u32)]
35 Service(ServiceId),
36}
37
38impl std::fmt::Display for SessionTargetSpec {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 match self {
41 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
42 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
43 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
44 }
45 }
46}
47
48impl FromStr for SessionTargetSpec {
49 type Err = HoprLibError;
50
51 fn from_str(s: &str) -> Result<Self, Self::Err> {
52 Ok(if let Some(stripped) = s.strip_prefix("$$") {
53 Self::Sealed(
54 base64::prelude::BASE64_URL_SAFE
55 .decode(stripped)
56 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
57 )
58 } else if let Some(stripped) = s.strip_prefix("#") {
59 Self::Service(
60 stripped
61 .parse()
62 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
63 )
64 } else {
65 Self::Plain(s.to_owned())
66 })
67 }
68}
69
70impl From<SessionTargetSpec> for hopr_utils_session::SessionTargetSpec {
71 fn from(spec: SessionTargetSpec) -> Self {
72 match spec {
73 SessionTargetSpec::Plain(t) => Self::Plain(t),
74 SessionTargetSpec::Sealed(t) => Self::Sealed(t),
75 SessionTargetSpec::Service(t) => Self::Service(t),
76 }
77 }
78}
79
80#[repr(u8)]
81#[derive(
82 Debug,
83 Clone,
84 strum::EnumIter,
85 strum::Display,
86 strum::EnumString,
87 Serialize,
88 Deserialize,
89 PartialEq,
90 utoipa::ToSchema,
91)]
92#[schema(example = "Segmentation")]
93pub enum SessionCapability {
95 Segmentation,
97 Retransmission,
99 RetransmissionAckOnly,
101 NoDelay,
103 NoRateControl,
105}
106
107impl From<SessionCapability> for hopr_lib::SessionCapabilities {
108 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
109 match cap {
110 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
111 SessionCapability::Retransmission => {
112 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
113 }
114 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
115 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
116 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
117 }
118 }
119}
120
121#[serde_as]
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
123#[schema(example = json!({ "Hops": 1 }))]
124pub enum RoutingOptions {
125 Hops(usize),
126}
127
128impl TryFrom<RoutingOptions> for hopr_lib::HopRouting {
129 type Error = hopr_lib::GeneralError;
130
131 fn try_from(value: RoutingOptions) -> Result<Self, Self::Error> {
133 match value {
134 RoutingOptions::Hops(hops) => HopRouting::try_from(hops),
135 }
136 }
137}
138
139impl From<hopr_lib::HopRouting> for RoutingOptions {
140 fn from(opts: hopr_lib::HopRouting) -> Self {
141 RoutingOptions::Hops(opts.hop_count())
142 }
143}
144
145#[serde_as]
146#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
147#[schema(example = json!({
148 "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
149 "forwardPath": { "Hops": 1 },
150 "returnPath": { "Hops": 1 },
151 "target": {"Plain": "localhost:8080"},
152 "listenHost": "127.0.0.1:10000",
153 "capabilities": ["Retransmission", "Segmentation"],
154 "responseBuffer": "2 MB",
155 "maxSurbUpstream": "2000 kb/s",
156 "sessionPool": 0,
157 "maxClientSessions": 2
158 }))]
159#[serde(rename_all = "camelCase")]
160pub(crate) struct SessionClientRequest {
162 #[serde_as(as = "DisplayFromStr")]
164 #[schema(value_type = String)]
165 pub destination: Address,
166 pub forward_path: RoutingOptions,
168 pub return_path: RoutingOptions,
170 pub target: SessionTargetSpec,
172 pub listen_host: Option<String>,
177 #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
178 pub capabilities: Option<Vec<SessionCapability>>,
182 #[serde_as(as = "Option<DisplayFromStr>")]
192 #[schema(value_type = Option<String>)]
193 pub response_buffer: Option<bytesize::ByteSize>,
194 #[serde(default)]
202 #[serde(with = "human_bandwidth::option")]
203 #[schema(value_type = Option<String>)]
204 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
205 pub session_pool: Option<usize>,
212 pub max_client_sessions: Option<usize>,
222}
223
224impl SessionClientRequest {
225 pub(crate) async fn into_protocol_session_config(
227 self,
228 target_protocol: IpProtocol,
229 ) -> Result<(hopr_lib::Address, SessionTarget, HoprSessionClientConfig), ApiErrorStatus> {
230 let target_spec: hopr_utils_session::SessionTargetSpec = self.target.clone().into();
231 Ok((
232 self.destination,
233 target_spec.into_target(target_protocol.into())?,
234 HoprSessionClientConfig {
235 forward_path: self.forward_path.try_into()?,
236 return_path: self.return_path.try_into()?,
237 capabilities: self
238 .capabilities
239 .map(|vs| {
240 let mut caps = SessionCapabilities::empty();
241 caps.extend(vs.into_iter().map(SessionCapabilities::from));
242 caps
243 })
244 .unwrap_or_else(|| match target_protocol {
245 IpProtocol::TCP => {
246 hopr_lib::SessionCapability::RetransmissionAck
247 | hopr_lib::SessionCapability::RetransmissionNack
248 | hopr_lib::SessionCapability::Segmentation
249 }
250 _ => SessionCapability::Segmentation.into(),
252 }),
253 surb_management: SessionConfig {
254 response_buffer: self.response_buffer,
255 max_surb_upstream: self.max_surb_upstream,
256 }
257 .into(),
258 ..Default::default()
259 },
260 ))
261 }
262}
263
264#[serde_as]
265#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
266#[schema(example = json!({
267 "target": "example.com:80",
268 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
269 "forwardPath": { "Hops": 1 },
270 "returnPath": { "Hops": 1 },
271 "protocol": "tcp",
272 "ip": "127.0.0.1",
273 "port": 5542,
274 "hoprMtu": 1002,
275 "surbLen": 398,
276 "activeClients": [],
277 "maxClientSessions": 2,
278 "maxSurbUpstream": "2000 kb/s",
279 "responseBuffer": "2 MB",
280 "sessionPool": 0
281 }))]
282#[serde(rename_all = "camelCase")]
283pub(crate) struct SessionClientResponse {
285 #[schema(example = "example.com:80")]
286 pub target: String,
288 #[serde_as(as = "DisplayFromStr")]
290 #[schema(value_type = String)]
291 pub destination: Address,
292 pub forward_path: RoutingOptions,
294 pub return_path: RoutingOptions,
296 #[serde_as(as = "DisplayFromStr")]
298 #[schema(example = "tcp")]
299 pub protocol: IpProtocol,
300 #[schema(example = "127.0.0.1")]
302 pub ip: String,
303 #[schema(example = 5542)]
304 pub port: u16,
306 pub hopr_mtu: usize,
308 pub surb_len: usize,
312 pub active_clients: Vec<String>,
317 pub max_client_sessions: usize,
322 #[serde(default)]
325 #[serde(with = "human_bandwidth::option")]
326 #[schema(value_type = Option<String>)]
327 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
328 #[serde_as(as = "Option<DisplayFromStr>")]
331 #[schema(value_type = Option<String>)]
332 pub response_buffer: Option<bytesize::ByteSize>,
333 pub session_pool: Option<usize>,
335}
336
337#[utoipa::path(
354 post,
355 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
356 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
357 params(
358 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
359 ),
360 request_body(
361 content = SessionClientRequest,
362 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
363 content_type = "application/json"),
364 responses(
365 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
366 (status = 400, description = "Invalid IP protocol.", body = ApiError),
367 (status = 401, description = "Invalid authorization token.", body = ApiError),
368 (status = 409, description = "Listening address and port already in use.", body = ApiError),
369 (status = 422, description = "Unknown failure", body = ApiError),
370 ),
371 security(
372 ("api_token" = []),
373 ("bearer_token" = [])
374 ),
375 tag = "Session"
376 )]
377pub(crate) async fn create_client(
378 State(state): State<Arc<InternalState>>,
379 Path(protocol): Path<IpProtocol>,
380 Json(args): Json<SessionClientRequest>,
381) -> Result<impl IntoResponse, impl IntoResponse> {
382 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
383
384 let listener_id = ListenerId(protocol.into(), bind_host);
385 if bind_host.port() > 0 && state.open_listeners.0.contains_key(&listener_id) {
386 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
387 }
388
389 let port_range = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE).ok();
390 tracing::debug!(%protocol, %bind_host, ?port_range, "binding session listening socket");
391
392 let (bound_host, udp_session_id, max_clients) = match protocol {
393 IpProtocol::TCP => {
394 let session_pool = args.session_pool;
395 let max_client_sessions = args.max_client_sessions;
396 let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
397 let (destination, _target, config) = args
398 .clone()
399 .into_protocol_session_config(IpProtocol::TCP)
400 .await
401 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
402
403 create_tcp_client_binding(
404 bind_host,
405 port_range,
406 state.hopr.clone(),
407 state.open_listeners.clone(),
408 destination,
409 target_spec,
410 config,
411 session_pool,
412 max_client_sessions,
413 )
414 .await
415 .map_err(|e| match e {
416 hopr_utils_session::BindError::ListenHostAlreadyUsed => {
417 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
418 }
419 hopr_utils_session::BindError::UnknownFailure(_) => (
420 StatusCode::UNPROCESSABLE_ENTITY,
421 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
422 ),
423 })?
424 }
425 IpProtocol::UDP => {
426 let target_spec: hopr_utils_session::SessionTargetSpec = args.target.clone().into();
427 let (destination, _target, config) = args
428 .clone()
429 .into_protocol_session_config(IpProtocol::UDP)
430 .await
431 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
432
433 create_udp_client_binding(
434 bind_host,
435 port_range,
436 state.hopr.clone(),
437 state.open_listeners.clone(),
438 destination,
439 target_spec,
440 config,
441 )
442 .await
443 .map_err(|e| match e {
444 hopr_utils_session::BindError::ListenHostAlreadyUsed => {
445 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
446 }
447 hopr_utils_session::BindError::UnknownFailure(_) => (
448 StatusCode::UNPROCESSABLE_ENTITY,
449 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
450 ),
451 })?
452 }
453 };
454
455 Ok::<_, (StatusCode, ApiErrorStatus)>(
456 (
457 StatusCode::OK,
458 Json(SessionClientResponse {
459 protocol,
460 ip: bound_host.ip().to_string(),
461 port: bound_host.port(),
462 target: args.target.to_string(),
463 destination: args.destination,
464 forward_path: args.forward_path.clone(),
465 return_path: args.return_path.clone(),
466 hopr_mtu: SESSION_MTU,
467 surb_len: SURB_SIZE,
468 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
469 max_client_sessions: max_clients,
470 max_surb_upstream: args.max_surb_upstream,
471 response_buffer: args.response_buffer,
472 session_pool: args.session_pool,
473 }),
474 )
475 .into_response(),
476 )
477}
478
479#[utoipa::path(
481 get,
482 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
483 description = "Lists existing Session listeners for the given IP protocol.",
484 params(
485 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
486 ),
487 responses(
488 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
489 {
490 "target": "example.com:80",
491 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
492 "forwardPath": { "Hops": 1 },
493 "returnPath": { "Hops": 1 },
494 "protocol": "tcp",
495 "ip": "127.0.0.1",
496 "port": 5542,
497 "surbLen": 400,
498 "hoprMtu": 1020,
499 "activeClients": [],
500 "maxClientSessions": 2,
501 "maxSurbUpstream": "2000 kb/s",
502 "responseBuffer": "2 MB",
503 "sessionPool": 0
504 }
505 ])),
506 (status = 400, description = "Invalid IP protocol.", body = ApiError),
507 (status = 401, description = "Invalid authorization token.", body = ApiError),
508 (status = 422, description = "Unknown failure", body = ApiError)
509 ),
510 security(
511 ("api_token" = []),
512 ("bearer_token" = [])
513 ),
514 tag = "Session",
515)]
516pub(crate) async fn list_clients(
517 State(state): State<Arc<InternalState>>,
518 Path(protocol): Path<IpProtocol>,
519) -> Result<impl IntoResponse, impl IntoResponse> {
520 let response = state
521 .open_listeners
522 .0
523 .iter()
524 .filter(|v| v.key().0 == protocol.into())
525 .map(|v| {
526 let ListenerId(_, addr) = *v.key();
527 let entry = v.value();
528 let forward_path = hopr_lib::HopRouting::try_from(entry.forward_path.count_hops())
529 .expect("stored routing options always have bounded hop count");
530 let return_path = hopr_lib::HopRouting::try_from(entry.return_path.count_hops())
531 .expect("stored routing options always have bounded hop count");
532 SessionClientResponse {
533 protocol,
534 ip: addr.ip().to_string(),
535 port: addr.port(),
536 target: entry.target.to_string(),
537 forward_path: forward_path.into(),
538 return_path: return_path.into(),
539 destination: entry.destination,
540 hopr_mtu: SESSION_MTU,
541 surb_len: SURB_SIZE,
542 active_clients: entry.get_clients().iter().map(|e| e.key().to_string()).collect(),
543 max_client_sessions: entry.max_client_sessions,
544 max_surb_upstream: entry.max_surb_upstream,
545 response_buffer: entry.response_buffer,
546 session_pool: entry.session_pool,
547 }
548 })
549 .collect::<Vec<_>>();
550
551 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
552}
553
554#[serde_as]
555#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
556#[schema(example = json!({
557 "responseBuffer": "2 MB",
558 "maxSurbUpstream": "2 Mbps"
559}))]
560#[serde(rename_all = "camelCase")]
561pub(crate) struct SessionConfig {
562 #[serde(default)]
572 #[serde_as(as = "Option<DisplayFromStr>")]
573 #[schema(value_type = String)]
574 pub response_buffer: Option<bytesize::ByteSize>,
575 #[serde(default)]
583 #[serde(with = "human_bandwidth::option")]
584 #[schema(value_type = String)]
585 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
586}
587
588impl From<SessionConfig> for Option<SurbBalancerConfig> {
589 fn from(value: SessionConfig) -> Self {
591 match value.response_buffer {
592 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
594 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
595 max_surbs_per_sec: value
596 .max_surb_upstream
597 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
598 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
599 ..Default::default()
600 }),
601 Some(_) => None,
603 None => Some(SurbBalancerConfig::default()),
605 }
606 }
607}
608
609impl From<SurbBalancerConfig> for SessionConfig {
610 fn from(value: SurbBalancerConfig) -> Self {
612 Self {
613 response_buffer: Some(bytesize::ByteSize::b(
614 value.target_surb_buffer_size * SESSION_MTU as u64,
615 )),
616 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
617 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
618 )),
619 }
620 }
621}
622
623#[utoipa::path(
624 post,
625 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
626 description = "Updates configuration of an existing active session.",
627 params(
628 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
629 ),
630 request_body(
631 content = SessionConfig,
632 description = "Allows updating of several parameters of an existing active session.",
633 content_type = "application/json"),
634 responses(
635 (status = 204, description = "Successfully updated the configuration"),
636 (status = 400, description = "Invalid configuration.", body = ApiError),
637 (status = 401, description = "Invalid authorization token.", body = ApiError),
638 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
639 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
640 (status = 422, description = "Unknown failure", body = ApiError),
641 ),
642 security(
643 ("api_token" = []),
644 ("bearer_token" = [])
645 ),
646 tag = "Session"
647)]
648pub(crate) async fn adjust_session(
649 State(state): State<Arc<InternalState>>,
650 Path(session_id): Path<String>,
651 Json(args): Json<SessionConfig>,
652) -> Result<impl IntoResponse, impl IntoResponse> {
653 let session_id =
654 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
655
656 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
657 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
658 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
659 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
660 SessionManagerError::NonExistingSession,
661 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
662 Err(e) => Err((
663 StatusCode::NOT_ACCEPTABLE,
664 ApiErrorStatus::UnknownFailure(e.to_string()),
665 )),
666 }
667 } else {
668 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
669 }
670}
671
672#[utoipa::path(
673 get,
674 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
675 description = "Gets configuration of an existing active session.",
676 params(
677 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
678 ),
679 responses(
680 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
681 (status = 400, description = "Invalid session ID.", body = ApiError),
682 (status = 401, description = "Invalid authorization token.", body = ApiError),
683 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
684 (status = 422, description = "Unknown failure", body = ApiError),
685 ),
686 security(
687 ("api_token" = []),
688 ("bearer_token" = [])
689 ),
690 tag = "Session"
691)]
692pub(crate) async fn session_config(
693 State(state): State<Arc<InternalState>>,
694 Path(session_id): Path<String>,
695) -> Result<impl IntoResponse, impl IntoResponse> {
696 let session_id =
697 SessionId::from_str(&session_id).map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
698
699 match state.hopr.get_session_surb_balancer_config(&session_id).await {
700 Ok(Some(cfg)) => {
701 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
702 }
703 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
704 Err(e) => Err((
705 StatusCode::UNPROCESSABLE_ENTITY,
706 ApiErrorStatus::UnknownFailure(e.to_string()),
707 )),
708 }
709}
710
711#[derive(
712 Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
713)]
714#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
715#[serde(rename_all = "lowercase")]
716#[schema(example = "tcp")]
717pub enum IpProtocol {
719 #[allow(clippy::upper_case_acronyms)]
720 TCP,
721 #[allow(clippy::upper_case_acronyms)]
722 UDP,
723}
724
725impl From<IpProtocol> for hopr_lib::IpProtocol {
726 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
727 match protocol {
728 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
729 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
730 }
731 }
732}
733
734#[serde_as]
735#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
736pub struct SessionCloseClientQuery {
737 #[serde_as(as = "DisplayFromStr")]
738 #[schema(value_type = String, example = "tcp")]
739 pub protocol: IpProtocol,
741
742 #[schema(example = "127.0.0.1:8545")]
744 pub ip: String,
745
746 #[schema(value_type = u16, example = 10101)]
748 pub port: u16,
749}
750
751#[utoipa::path(
757 delete,
758 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
759 description = "Closes an existing Session listener.",
760 params(SessionCloseClientQuery),
761 responses(
762 (status = 204, description = "Listener closed successfully"),
763 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
764 (status = 401, description = "Invalid authorization token.", body = ApiError),
765 (status = 404, description = "Listener not found.", body = ApiError),
766 (status = 422, description = "Unknown failure", body = ApiError)
767 ),
768 security(
769 ("api_token" = []),
770 ("bearer_token" = [])
771 ),
772 tag = "Session",
773)]
774pub(crate) async fn close_client(
775 State(state): State<Arc<InternalState>>,
776 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
777) -> Result<impl IntoResponse, impl IntoResponse> {
778 let listening_ip: IpAddr = ip
779 .parse()
780 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
781
782 {
783 let open_listeners = &state.open_listeners.0;
784
785 let mut to_remove = Vec::new();
786 let protocol: hopr_lib::IpProtocol = protocol.into();
787
788 open_listeners
790 .iter()
791 .filter(|v| {
792 let ListenerId(proto, addr) = v.key();
793 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
794 })
795 .for_each(|v| to_remove.push(*v.key()));
796
797 if to_remove.is_empty() {
798 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
799 }
800
801 for bound_addr in to_remove {
802 let (_, entry) = open_listeners
803 .remove(&bound_addr)
804 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
805
806 entry.abort_handle.abort();
807 }
808 }
809
810 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
811}