1use std::fmt::Formatter;
2use std::future::Future;
3use std::str::FromStr;
4
5use axum::extract::Path;
6use axum::Error;
7use axum::{
8 extract::{
9 ws::{Message, WebSocket, WebSocketUpgrade},
10 Json, State,
11 },
12 http::status::StatusCode,
13 response::IntoResponse,
14};
15use axum_extra::extract::Query;
16use base64::Engine;
17use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt};
18use futures_concurrency::stream::Merge;
19use libp2p_identity::PeerId;
20use serde::{Deserialize, Serialize};
21use serde_with::{serde_as, DisplayFromStr};
22use std::net::IpAddr;
23use std::sync::Arc;
24use tokio::net::TcpListener;
25use tracing::{debug, error, info, trace};
26
27use hopr_lib::errors::HoprLibError;
28use hopr_lib::transfer_session;
29use hopr_lib::{HoprSession, ServiceId, SessionClientConfig, SessionTarget};
30use hopr_network_types::prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism};
31use hopr_network_types::udp::ForeignDataMode;
32use hopr_network_types::utils::AsyncReadStreamer;
33
34use crate::types::PeerOrAddress;
35use crate::{ApiError, ApiErrorStatus, InternalState, ListenerId, BASE_PATH};
36
/// Buffer size passed to `bind_session_to_stream` for sessions bound to a TCP client socket
/// (presumably in bytes — confirm against `hopr_lib::transfer_session`).
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Buffer size used for UDP: it configures the `ConnectedUdpStream` builder and is also passed to
/// `bind_session_to_stream` for sessions bound to a UDP socket.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Queue size configured on the `ConnectedUdpStream` builder.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
45
// Gauge tracking the clients currently served by session listeners, labelled by `type`
// (the handlers in this file use the label values "tcp" and "udp").
// Only compiled when the `prometheus` feature is enabled and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_clients",
        "Number of clients connected at this Entry node",
        &["type"]
    ).unwrap();
}
54
/// Specification of a session target as accepted by the API.
///
/// Besides its serde representation, the type has a textual form (see its `Display` and
/// `FromStr` implementations): a `$$` prefix marks a sealed target, a `#` prefix marks a
/// service, and anything else is a plain target.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub enum SessionTargetSpec {
    /// Target in plain text; it is parsed as an IP address or host when converted
    /// into a `SessionTarget`.
    Plain(String),
    /// Sealed target given as raw bytes (serialized as Base64 by serde).
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Service identifier; converted into `SessionTarget::ExitNode`.
    Service(ServiceId),
}
62
63impl std::fmt::Display for SessionTargetSpec {
64 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65 match self {
66 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
67 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
68 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
69 }
70 }
71}
72
73impl std::str::FromStr for SessionTargetSpec {
74 type Err = HoprLibError;
75
76 fn from_str(s: &str) -> Result<Self, Self::Err> {
77 Ok(if let Some(stripped) = s.strip_prefix("$$") {
78 Self::Sealed(
79 base64::prelude::BASE64_URL_SAFE
80 .decode(stripped)
81 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
82 )
83 } else if let Some(stripped) = s.strip_prefix("#") {
84 Self::Service(
85 stripped
86 .parse()
87 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
88 )
89 } else {
90 Self::Plain(s.to_owned())
91 })
92 }
93}
94
95impl SessionTargetSpec {
96 pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
97 Ok(match (protocol, self) {
98 (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
99 IpOrHost::from_str(&plain)
100 .map(SealedHost::from)
101 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
102 ),
103 (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
104 IpOrHost::from_str(&plain)
105 .map(SealedHost::from)
106 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
107 ),
108 (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
109 SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
110 }
111 (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
112 SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
113 }
114 (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
115 })
116 }
117}
118
/// Entry stored in the map of open session listeners, keyed by `ListenerId`.
#[derive(Debug)]
pub struct StoredSessionEntry {
    /// Target the listener's sessions are opened to, as given in the client request.
    pub target: SessionTargetSpec,
    /// Routing options from the client request.
    pub path: RoutingOptions,
    /// Handle of the task serving the listener; it is cancelled when the listener is closed.
    pub jh: hopr_async_runtime::prelude::JoinHandle<()>,
}
129
/// Session capabilities selectable through the API.
///
/// Each variant maps one-to-one onto the `hopr_lib::SessionCapability` variant of the same name;
/// see that type for the exact semantics of each capability.
#[repr(u8)]
#[derive(
    Debug, Clone, strum::EnumIter, strum::Display, strum::EnumString, Serialize, Deserialize, utoipa::ToSchema,
)]
pub enum SessionCapability {
    /// Maps to `hopr_lib::SessionCapability::Segmentation`.
    Segmentation,
    /// Maps to `hopr_lib::SessionCapability::Retransmission`.
    Retransmission,
    /// Maps to `hopr_lib::SessionCapability::RetransmissionAckOnly`.
    RetransmissionAckOnly,
    /// Maps to `hopr_lib::SessionCapability::NoDelay`.
    NoDelay,
}
144
145impl From<SessionCapability> for hopr_lib::SessionCapability {
146 fn from(cap: SessionCapability) -> hopr_lib::SessionCapability {
147 match cap {
148 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation,
149 SessionCapability::Retransmission => hopr_lib::SessionCapability::Retransmission,
150 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAckOnly,
151 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay,
152 }
153 }
154}
155
/// Query parameters of the websocket session endpoint.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
#[into_params(parameter_in = Query)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Destination of the session; it must be parseable as a `PeerId`.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: String,
    /// Number of hops; used for routing unless an explicit `path` is given.
    #[schema(required = true)]
    pub hops: u8,
    /// Optional comma-separated list of peer IDs forming an explicit intermediate path.
    /// Only available with the `explicit-path` feature; when present it is used instead of `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false)]
    pub path: Option<String>,
    /// Capabilities requested for the session.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Target of the session in its textual form (see `SessionTargetSpec`).
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// IP protocol of the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
179
/// Serde default for the `protocol` field of `SessionWebsocketClientQueryRequest`: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
184
185impl SessionWebsocketClientQueryRequest {
186 pub(crate) fn into_protocol_session_config(self) -> Result<SessionClientConfig, HoprLibError> {
187 #[cfg(not(feature = "explicit-path"))]
188 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
189
190 #[cfg(feature = "explicit-path")]
191 let path_options = if let Some(path) = self.path {
192 hopr_lib::RoutingOptions::IntermediatePath(
194 path.split(',')
195 .map(PeerId::from_str)
196 .collect::<Result<Vec<PeerId>, _>>()
197 .map_err(|e| HoprLibError::GeneralError(format!("invalid peer id on path: {e}")))?
198 .try_into()?,
199 )
200 } else {
201 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
202 };
203
204 Ok(SessionClientConfig {
205 peer: PeerId::from_str(self.destination.as_str())
206 .map_err(|_e| HoprLibError::GeneralError(format!("invalid destination: {}", self.destination)))?,
207 path_options,
208 target: self.target.into_target(self.protocol)?,
209 capabilities: self.capabilities.into_iter().map(SessionCapability::into).collect(),
210 })
211 }
212}
213
/// Raw binary payload; described to the OpenAPI schema as a binary-formatted string.
// NOTE(review): nothing visible in this file uses the type, hence `allow(dead_code)`;
// presumably it exists only for schema generation — confirm.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)]
struct WssData(Vec<u8>);
218
219#[allow(dead_code)] #[utoipa::path(
229 get,
230 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
231 params(SessionWebsocketClientQueryRequest),
232 responses(
233 (status = 200, description = "Successfully created a new client websocket session."),
234 (status = 401, description = "Invalid authorization token.", body = ApiError),
235 (status = 422, description = "Unknown failure", body = ApiError),
236 (status = 429, description = "Too many open websocket connections.", body = ApiError),
237 ),
238 security(
239 ("api_token" = []),
240 ("bearer_token" = [])
241 ),
242 tag = "Session",
243 )]
244
245pub(crate) async fn websocket(
246 ws: WebSocketUpgrade,
247 Query(query): Query<SessionWebsocketClientQueryRequest>,
248 State(state): State<Arc<InternalState>>,
249) -> Result<impl IntoResponse, impl IntoResponse> {
250 let data = query.into_protocol_session_config().map_err(|e| {
251 (
252 StatusCode::UNPROCESSABLE_ENTITY,
253 ApiErrorStatus::UnknownFailure(e.to_string()),
254 )
255 })?;
256
257 let hopr = state.hopr.clone();
258 let session: HoprSession = hopr.connect_to(data).await.map_err(|e| {
259 error!(error = %e, "Failed to establish session");
260 (
261 StatusCode::UNPROCESSABLE_ENTITY,
262 ApiErrorStatus::UnknownFailure(e.to_string()),
263 )
264 })?;
265
266 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
267}
268
/// Event processed by the websocket connection loop; wraps items of the two merged input streams.
enum WebSocketInput {
    /// Chunk of data read from the session (or the read error).
    Network(Result<Box<[u8]>, std::io::Error>),
    /// Message received from the websocket client (or the receive error).
    WsInput(Result<Message, Error>),
}
273
/// Size parameter of the `AsyncReadStreamer` that reads session data for the websocket
/// (presumably the maximum size of a single read in bytes — confirm against `AsyncReadStreamer`).
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
276
277#[tracing::instrument(level = "debug", skip(socket, session))]
278async fn websocket_connection(socket: WebSocket, session: HoprSession) {
279 let session_id = *session.id();
280
281 let (rx, mut tx) = session.split();
282 let (mut sender, receiver) = socket.split();
283
284 let mut queue = (
285 receiver.map(WebSocketInput::WsInput),
286 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
287 )
288 .merge();
289
290 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
291
292 while let Some(v) = queue.next().await {
293 match v {
294 WebSocketInput::Network(bytes) => match bytes {
295 Ok(bytes) => {
296 let len = bytes.len();
297 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
298 error!(
299 error = %e,
300 "Failed to emit read data onto the websocket, closing connection"
301 );
302 break;
303 };
304 bytes_from_session += len;
305 }
306 Err(e) => {
307 error!(
308 error = %e,
309 "Failed to push data from network to socket, closing connection"
310 );
311 break;
312 }
313 },
314 WebSocketInput::WsInput(ws_in) => match ws_in {
315 Ok(Message::Binary(data)) => {
316 let len = data.len();
317 if let Err(e) = tx.write(data.as_ref()).await {
318 error!(error = %e, "Failed to write data to the session, closing connection");
319 break;
320 }
321 bytes_to_session += len;
322 }
323 Ok(Message::Text(_)) => {
324 error!("Received string instead of binary data, closing connection");
325 break;
326 }
327 Ok(Message::Close(_)) => {
328 debug!("Received close frame, closing connection");
329 break;
330 }
331 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
332 Err(e) => {
333 error!(error = %e, "Failed to get a valid websocket message, closing connection");
334 break;
335 }
336 },
337 }
338 }
339
340 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
341}
342
/// Routing options of a session as exposed by the API; convertible into `hopr_lib::RoutingOptions`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
pub enum RoutingOptions {
    /// Explicit list of intermediate peers (only with the `explicit-path` feature).
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<PeerId>),
    /// Number of hops.
    Hops(usize),
}
351
352impl TryFrom<RoutingOptions> for hopr_lib::RoutingOptions {
353 type Error = HoprLibError;
354
355 fn try_from(value: RoutingOptions) -> Result<Self, Self::Error> {
356 match value {
357 #[cfg(feature = "explicit-path")]
358 RoutingOptions::IntermediatePath(path) => {
359 Ok(hopr_lib::RoutingOptions::IntermediatePath(path.into_iter().collect()))
360 }
361 RoutingOptions::Hops(hops) => Ok(hopr_lib::RoutingOptions::Hops(hops.try_into()?)),
362 }
363 }
364}
365
/// Request body for creating a new client session listener.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "12D3KooWR4uwjKCDCAY1xsEFB4esuWLF9Q5ijYvCjz5PNkTbnu33",
        "path": {
            "Hops": 1
        },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"]
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientRequest {
    /// Destination of the session. Only the peer ID form is accepted when the request is
    /// converted into a session configuration; an address is rejected as an invalid destination.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: PeerOrAddress,
    /// Routing options of the session.
    pub path: RoutingOptions,
    /// Target of the session.
    pub target: SessionTargetSpec,
    /// Listen host the session listener should bind to. It may be a full socket address, only an
    /// IP address, or only a `:port`; missing parts are taken from the configured default.
    pub listen_host: Option<String>,
    /// Capabilities of the session. When omitted, TCP sessions default to `Retransmission` and
    /// `Segmentation`, and other sessions to no capabilities.
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    pub capabilities: Option<Vec<SessionCapability>>,
}
396
397impl SessionClientRequest {
398 pub(crate) fn into_protocol_session_config(
399 self,
400 target_protocol: IpProtocol,
401 ) -> Result<SessionClientConfig, HoprLibError> {
402 let peer = match self.destination {
403 PeerOrAddress::PeerId(peer_id) => peer_id,
404 PeerOrAddress::Address(address) => {
405 return Err(HoprLibError::GeneralError(format!("invalid destination: {address}")))
406 }
407 };
408
409 Ok(SessionClientConfig {
410 peer,
411 path_options: self.path.try_into()?,
412 target: self.target.into_target(target_protocol)?,
413 capabilities: self
414 .capabilities
415 .map(|vs| {
416 vs.into_iter()
417 .map(|v| {
418 let cap: hopr_lib::SessionCapability = v.into();
419 cap
420 })
421 .collect::<Vec<_>>()
422 })
423 .unwrap_or_else(|| match target_protocol {
424 IpProtocol::TCP => {
425 vec![
426 hopr_lib::SessionCapability::Retransmission,
427 hopr_lib::SessionCapability::Segmentation,
428 ]
429 }
430 _ => vec![], }),
432 })
433 }
434}
435
/// Description of a session listener, returned when a listener is created or listed.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "target": "example.com:80",
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "path": { "Hops": 1 }
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientResponse {
    /// Target of the session in its textual form.
    pub target: String,
    /// IP protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    pub ip: String,
    /// Routing options of the session.
    pub path: RoutingOptions,
    /// Port the listener is bound to.
    pub port: u16,
}
455
456fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
461 match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
462 Some(Err(requested)) => {
463 debug!(requested, %default, "using partially default listen host");
465 std::net::SocketAddr::new(
466 requested.parse().unwrap_or(default.ip()),
467 requested
468 .strip_prefix(":")
469 .and_then(|p| u16::from_str(p).ok())
470 .unwrap_or(default.port()),
471 )
472 }
473 Some(Ok(requested)) => {
474 debug!(%requested, "using requested listen host");
475 requested
476 }
477 None => {
478 debug!(%default, "using default listen host");
479 default
480 }
481 }
482}
483
/// Creates a new client session listener for the given IP protocol.
///
/// The listen host is derived from the request and the configured default. A non-zero port that
/// already has a listener registered for the same protocol is rejected with `409 Conflict`.
///
/// * TCP: a listener is bound and a task is spawned that opens a new HOPR session for every
///   accepted connection and binds the connection to it.
/// * UDP: a socket is bound, a single HOPR session is opened right away and bound to the socket
///   in a spawned task; the listener entry is removed once that transfer ends.
///
/// The response describes the address the listener is actually bound to.
#[utoipa::path(
        post,
        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
        params(
            ("protocol" = String, Path, description = "IP transport protocol")
        ),
        request_body(
            content = SessionClientRequest,
            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
            content_type = "application/json"),
        responses(
            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
            (status = 400, description = "Invalid IP protocol.", body = ApiError),
            (status = 401, description = "Invalid authorization token.", body = ApiError),
            (status = 409, description = "Listening address and port already in use.", body = ApiError),
            (status = 422, description = "Unknown failure", body = ApiError),
        ),
        security(
            ("api_token" = []),
            ("bearer_token" = [])
        ),
        tag = "Session"
    )]
pub(crate) async fn create_client(
    State(state): State<Arc<InternalState>>,
    Path(protocol): Path<IpProtocol>,
    Json(args): Json<SessionClientRequest>,
) -> Result<impl IntoResponse, impl IntoResponse> {
    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);

    // Port 0 is skipped here: the actual port is only known after binding.
    if bind_host.port() > 0
        && state
            .open_listeners
            .read()
            .await
            .contains_key(&ListenerId(protocol.into(), bind_host))
    {
        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
    }

    // Keep copies of the target and path for the listener entry and the response,
    // because the conversion below consumes the request.
    let target = args.target.clone();
    let path = args.path.clone();
    let data = args.into_protocol_session_config(protocol).map_err(|e| {
        (
            StatusCode::UNPROCESSABLE_ENTITY,
            ApiErrorStatus::UnknownFailure(e.to_string()),
        )
    })?;

    debug!("binding {protocol} session listening socket to {bind_host}");
    let bound_host = match protocol {
        IpProtocol::TCP => {
            let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
                // An address already in use is reported as a conflict, anything else as a generic failure.
                if e.kind() == std::io::ErrorKind::AddrInUse {
                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
                } else {
                    (
                        StatusCode::UNPROCESSABLE_ENTITY,
                        ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
                    )
                }
            })?;
            info!(%bound_host, "TCP session listener bound");

            let hopr = state.hopr.clone();
            // Accept loop: every accepted TCP connection gets its own HOPR session,
            // and the connections are served concurrently without a limit.
            let jh = hopr_async_runtime::prelude::spawn(
                tokio_stream::wrappers::TcpListenerStream::new(tcp_listener)
                    .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
                    .for_each_concurrent(None, move |accepted_client| {
                        let data = data.clone();
                        let hopr = hopr.clone();
                        async move {
                            match accepted_client {
                                Ok((sock_addr, stream)) => {
                                    debug!(socket = ?sock_addr, "incoming TCP connection");
                                    let session = match hopr.connect_to(data).await {
                                        Ok(s) => s,
                                        Err(e) => {
                                            error!(error = %e, "failed to establish session");
                                            return;
                                        }
                                    };

                                    debug!(
                                        socket = ?sock_addr,
                                        session_id = tracing::field::debug(*session.id()),
                                        "new session for incoming TCP connection",
                                    );

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);

                                    bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await;

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
                                }
                                Err(e) => error!(error = %e, "failed to accept connection"),
                            }
                        }
                    }),
            );

            // Register the listener under the address it is actually bound to.
            state.open_listeners.write().await.insert(
                ListenerId(protocol.into(), bound_host),
                StoredSessionEntry {
                    target: target.clone(),
                    path: path.clone(),
                    jh,
                },
            );
            bound_host
        }
        IpProtocol::UDP => {
            let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
                // An address already in use is reported as a conflict, anything else as a generic failure.
                if e.kind() == std::io::ErrorKind::AddrInUse {
                    (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
                } else {
                    (
                        StatusCode::UNPROCESSABLE_ENTITY,
                        ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
                    )
                }
            })?;

            info!(%bound_host, "UDP session listener bound");

            let hopr = state.hopr.clone();

            // Unlike TCP, the session is opened right away, before any client data arrives.
            let session = hopr.connect_to(data).await.map_err(|e| {
                (
                    StatusCode::UNPROCESSABLE_ENTITY,
                    ApiErrorStatus::UnknownFailure(e.to_string()),
                )
            })?;

            let open_listeners_clone = state.open_listeners.clone();
            let listener_id = ListenerId(protocol.into(), bound_host);

            state.open_listeners.write().await.insert(
                listener_id,
                StoredSessionEntry {
                    target: target.clone(),
                    path: path.clone(),
                    jh: hopr_async_runtime::prelude::spawn(async move {
                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);

                        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE).await;

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);

                        // The transfer is over, so the listener entry is removed again.
                        open_listeners_clone.write().await.remove(&listener_id);
                    }),
                },
            );
            bound_host
        }
    };

    Ok::<_, (StatusCode, ApiErrorStatus)>(
        (
            StatusCode::OK,
            Json(SessionClientResponse {
                protocol,
                path,
                target: target.to_string(),
                ip: bound_host.ip().to_string(),
                port: bound_host.port(),
            }),
        )
            .into_response(),
    )
}
683
684#[utoipa::path(
686 get,
687 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
688 params(
689 ("protocol" = String, Path, description = "IP transport protocol")
690 ),
691 responses(
692 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>),
693 (status = 400, description = "Invalid IP protocol.", body = ApiError),
694 (status = 401, description = "Invalid authorization token.", body = ApiError),
695 (status = 422, description = "Unknown failure", body = ApiError)
696 ),
697 security(
698 ("api_token" = []),
699 ("bearer_token" = [])
700 ),
701 tag = "Session",
702)]
703pub(crate) async fn list_clients(
704 State(state): State<Arc<InternalState>>,
705 Path(protocol): Path<IpProtocol>,
706) -> Result<impl IntoResponse, impl IntoResponse> {
707 let response = state
708 .open_listeners
709 .read()
710 .await
711 .iter()
712 .filter(|(id, _)| id.0 == protocol.into())
713 .map(|(id, entry)| SessionClientResponse {
714 protocol,
715 target: entry.target.to_string(),
716 ip: id.1.ip().to_string(),
717 port: id.1.port(),
718 path: entry.path.clone(),
719 })
720 .collect::<Vec<_>>();
721
722 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
723}
724
/// IP transport protocol of a session listener.
///
/// Serialized in lowercase (`tcp`, `udp`); parsing from a string is ASCII case-insensitive.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
pub enum IpProtocol {
    /// Transmission Control Protocol.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
736
737impl From<IpProtocol> for hopr_lib::IpProtocol {
738 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
739 match protocol {
740 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
741 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
742 }
743 }
744}
745
/// Path parameters identifying the session listener(s) to close.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
pub struct SessionCloseClientQuery {
    /// IP protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    pub ip: String,
    /// Port the listener is bound to; `0` matches every port on the given IP address.
    pub port: u16,
}
755
756#[utoipa::path(
762 delete,
763 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
764 params(SessionCloseClientQuery),
765 responses(
766 (status = 204, description = "Listener closed successfully"),
767 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
768 (status = 401, description = "Invalid authorization token.", body = ApiError),
769 (status = 404, description = "Listener not found.", body = ApiError),
770 (status = 422, description = "Unknown failure", body = ApiError)
771 ),
772 security(
773 ("api_token" = []),
774 ("bearer_token" = [])
775 ),
776 tag = "Session",
777)]
778pub(crate) async fn close_client(
779 State(state): State<Arc<InternalState>>,
780 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
781) -> Result<impl IntoResponse, impl IntoResponse> {
782 let listening_ip: IpAddr = ip
783 .parse()
784 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
785
786 {
787 let mut open_listeners = state.open_listeners.write().await;
788
789 let mut to_remove = Vec::new();
790
791 open_listeners
793 .iter()
794 .filter(|(ListenerId(proto, addr), _)| {
795 let protocol: hopr_lib::IpProtocol = protocol.into();
796 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
797 })
798 .for_each(|(id, _)| to_remove.push(*id));
799
800 if to_remove.is_empty() {
801 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
802 }
803
804 for bound_addr in to_remove {
805 let entry = open_listeners
806 .remove(&bound_addr)
807 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
808
809 hopr_async_runtime::prelude::cancel_join_handle(entry.jh).await;
810 }
811 }
812
813 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
814}
815
816async fn try_restricted_bind<F, S, Fut>(
817 addrs: Vec<std::net::SocketAddr>,
818 range_str: &str,
819 binder: F,
820) -> std::io::Result<S>
821where
822 F: Fn(Vec<std::net::SocketAddr>) -> Fut,
823 Fut: Future<Output = std::io::Result<S>>,
824{
825 if addrs.is_empty() {
826 return Err(std::io::Error::other("no valid socket addresses found"));
827 }
828
829 let range = range_str
830 .split_once(":")
831 .and_then(
832 |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
833 Ok((a, b)) if a <= b => Some(a..=b),
834 _ => None,
835 },
836 )
837 .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
838
839 for port in range {
840 let addrs = addrs
841 .iter()
842 .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
843 .collect::<Vec<_>>();
844 match binder(addrs).await {
845 Ok(listener) => return Ok(listener),
846 Err(error) => debug!(%error, "listen address not usable"),
847 }
848 }
849
850 Err(std::io::Error::new(
851 std::io::ErrorKind::AddrNotAvailable,
852 format!("no valid socket addresses found within range: {range_str}"),
853 ))
854}
855
856async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
857 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
858
859 if addrs.iter().all(|a| a.port() == 0) {
862 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
863 let tcp_listener =
864 try_restricted_bind(
865 addrs,
866 &range_str,
867 |a| async move { TcpListener::bind(a.as_slice()).await },
868 )
869 .await?;
870 return Ok((tcp_listener.local_addr()?, tcp_listener));
871 }
872 }
873
874 let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
875 Ok((tcp_listener.local_addr()?, tcp_listener))
876}
877
878async fn udp_bind_to<A: std::net::ToSocketAddrs>(
879 address: A,
880) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
881 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
882
883 let builder = ConnectedUdpStream::builder()
884 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
885 .with_foreign_data_mode(ForeignDataMode::Discard) .with_queue_size(HOPR_UDP_QUEUE_SIZE)
887 .with_receiver_parallelism(UdpStreamParallelism::Auto);
888
889 if addrs.iter().all(|a| a.port() == 0) {
892 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
893 let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
894 futures::future::ready(builder.clone().build(addrs.as_slice()))
895 })
896 .await?;
897
898 return Ok((*udp_listener.bound_address(), udp_listener));
899 }
900 }
901
902 let udp_socket = builder.build(address)?;
903 Ok((*udp_socket.bound_address(), udp_socket))
904}
905
906async fn bind_session_to_stream<T>(mut session: HoprSession, mut stream: T, max_buf: usize)
907where
908 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
909{
910 let session_id = *session.id();
911 match transfer_session(&mut session, &mut stream, max_buf).await {
912 Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
913 session_id = ?session_id,
914 session_to_stream_bytes, stream_to_session_bytes, "client session ended",
915 ),
916 Err(error) => error!(
917 session_id = ?session_id,
918 %error,
919 "error during data transfer"
920 ),
921 }
922}
923
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Context;
    use futures::channel::mpsc::UnboundedSender;
    use hopr_lib::{ApplicationData, Keypair, PeerId, SendMsg};
    use hopr_transport_session::errors::TransportSessionError;
    use std::collections::HashSet;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    /// Test double for the message transport: every message sent is pushed into a channel
    /// instead, so a session wired to the receiving end of that channel reads back its own data.
    pub struct SendMsgResender {
        tx: UnboundedSender<Box<[u8]>>,
    }

    impl SendMsgResender {
        pub fn new(tx: UnboundedSender<Box<[u8]>>) -> Self {
            Self { tx }
        }
    }

    #[hopr_lib::async_trait]
    impl SendMsg for SendMsgResender {
        async fn send_message(
            &self,
            data: ApplicationData,
            _destination: PeerId,
            _options: hopr_lib::RoutingOptions,
        ) -> std::result::Result<(), TransportSessionError> {
            let (_peer, data) = hopr_transport_session::types::unwrap_offchain_key(data.plain_text)?;

            self.tx
                .clone()
                .unbounded_send(data)
                .map_err(|_| TransportSessionError::Closed)?;

            Ok(())
        }
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received(
    ) -> anyhow::Result<()> {
        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();

        let peer: hopr_lib::PeerId = hopr_lib::HoprOffchainKeypair::random().public().into();
        let session = hopr_lib::HoprSession::new(
            hopr_lib::HoprSessionId::new(4567, peer),
            peer,
            hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
            HashSet::default(),
            Arc::new(SendMsgResender::new(tx)),
            rx,
            None,
        );

        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;

        tokio::task::spawn(async move {
            match tcp_listener.accept().await {
                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await,
                Err(e) => error!("failed to accept connection: {e}"),
            }
        });

        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
            .await
            .context("connect failed")?;

        // Typed as slices, so payloads of different lengths can share one collection.
        let data: [&[u8]; 6] = [b"hello", b"world", b"this ", b"is ", b" a", b" test"];

        for d in data {
            tcp_stream.write_all(d).await.context("write failed")?;
        }

        // The session is looped back onto itself, so the TCP byte stream must echo exactly
        // what was written, in the same order.
        for d in data {
            let mut buf = vec![0; d.len()];
            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
            assert_eq!(buf.as_slice(), d, "echoed data must match the data sent");
        }

        Ok(())
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received(
    ) -> anyhow::Result<()> {
        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();

        let peer: hopr_lib::PeerId = hopr_lib::HoprOffchainKeypair::random().public().into();
        let session = hopr_lib::HoprSession::new(
            hopr_lib::HoprSessionId::new(4567, peer),
            peer,
            hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
            HashSet::default(),
            Arc::new(SendMsgResender::new(tx)),
            rx,
            None,
        );

        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;

        tokio::task::spawn(bind_session_to_stream(
            session,
            udp_listener,
            hopr_lib::SESSION_USABLE_MTU_SIZE,
        ));

        let mut udp_stream = ConnectedUdpStream::builder()
            .with_buffer_size(hopr_lib::SESSION_USABLE_MTU_SIZE)
            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
            .with_counterparty(listen_addr)
            .build(("127.0.0.1", 0))
            .context("bind failed")?;

        // Typed as slices, so payloads of different lengths can share one collection.
        let data: [&[u8]; 6] = [b"hello", b"world", b"this ", b"is ", b" a", b" test"];

        for d in data {
            udp_stream.write_all(d).await.context("write failed")?;
        }

        // NOTE(review): the echoed content is deliberately not asserted here — whether datagram
        // boundaries survive the round trip is not visible from this file; confirm before
        // tightening this check.
        for d in data {
            let mut buf = vec![0; d.len()];
            udp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        Ok(())
    }

    #[test]
    fn test_build_binding_address() {
        let default = "10.0.0.1:10000".parse().unwrap();

        let result = build_binding_host(Some("127.0.0.1:10000"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(None, default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some("127.0.0.1"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(":1234"), default);
        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(":"), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(""), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
    }
}