1use std::{collections::VecDeque, fmt::Formatter, future::Future, net::IpAddr, str::FromStr, sync::Arc};
2
3use axum::{
4 Error,
5 extract::{
6 Json, Path, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 http::status::StatusCode,
10 response::IntoResponse,
11};
12use axum_extra::extract::Query;
13use base64::Engine;
14use futures::{AsyncReadExt, AsyncWriteExt, SinkExt, StreamExt, TryStreamExt, future::AbortHandle};
15use futures_concurrency::stream::Merge;
16use hopr_lib::{
17 Address, Hopr, HoprSession, SESSION_PAYLOAD_SIZE, ServiceId, SessionCapabilities, SessionClientConfig,
18 SessionTarget, SurbBalancerConfig, errors::HoprLibError, transfer_session,
19};
20use hopr_network_types::{
21 prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
22 udp::ForeignDataMode,
23 utils::AsyncReadStreamer,
24};
25use serde::{Deserialize, Serialize};
26use serde_with::{DisplayFromStr, serde_as};
27use tokio::net::TcpListener;
28use tracing::{debug, error, info, trace};
29
30use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
31
/// Buffer size (in bytes) handed to the session <-> TCP stream transfer.
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

/// Buffer size (in bytes) configured on the UDP listener stream and handed to the
/// session <-> UDP stream transfer.
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

/// Queue size configured on the UDP listener stream (see `udp_bind_to`).
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
40
// Gauge of clients currently served by this node, labelled by transport type.
// In this file it is incremented/decremented with the labels "tcp" and "udp"
// around each session <-> stream transfer.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_clients",
        "Number of clients connected at this Entry node",
        &["type"]
    ).unwrap();
}
/// API-level description of a session target.
///
/// The textual form (see the `Display`/`FromStr` implementations) is:
/// `$$<base64>` for sealed targets, `#<id>` for services, and the raw string otherwise.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[schema(
    example = json!({"Plain": "example.com:80"}),
    example = json!({"Sealed": "SGVsbG9Xb3JsZA"}),
    example = json!({"Service": 0})
)]
pub enum SessionTargetSpec {
    /// Plain-text target; parsed as an IP or host when converted via `into_target`.
    Plain(String),
    /// Opaque sealed target bytes (Base64 in the serialized form), passed through unparsed.
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    /// Identifier of a service; converted to `SessionTarget::ExitNode`.
    #[schema(value_type = u32)]
    Service(ServiceId),
}
63
64impl std::fmt::Display for SessionTargetSpec {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 match self {
67 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
68 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
69 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
70 }
71 }
72}
73
74impl FromStr for SessionTargetSpec {
75 type Err = HoprLibError;
76
77 fn from_str(s: &str) -> Result<Self, Self::Err> {
78 Ok(if let Some(stripped) = s.strip_prefix("$$") {
79 Self::Sealed(
80 base64::prelude::BASE64_URL_SAFE
81 .decode(stripped)
82 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
83 )
84 } else if let Some(stripped) = s.strip_prefix("#") {
85 Self::Service(
86 stripped
87 .parse()
88 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
89 )
90 } else {
91 Self::Plain(s.to_owned())
92 })
93 }
94}
95
96impl SessionTargetSpec {
97 pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
98 Ok(match (protocol, self) {
99 (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
100 IpOrHost::from_str(&plain)
101 .map(SealedHost::from)
102 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
103 ),
104 (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
105 IpOrHost::from_str(&plain)
106 .map(SealedHost::from)
107 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
108 ),
109 (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
110 SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
111 }
112 (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
113 SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
114 }
115 (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
116 })
117 }
118}
119
/// Entry stored in the open-listeners map for each bound session listener.
#[derive(Debug)]
pub struct StoredSessionEntry {
    /// Destination node address of the session(s) opened through this listener.
    pub destination: Address,
    /// Target of the session as specified in the API request.
    pub target: SessionTargetSpec,
    /// Forward path options as specified in the API request.
    pub forward_path: RoutingOptions,
    /// Return path options as specified in the API request.
    pub return_path: RoutingOptions,
    /// Handle used to abort the spawned listener task when the listener is closed.
    pub abort_handle: AbortHandle,
}
134
/// Session capabilities as exposed by the API.
///
/// Each variant maps to one or more `hopr_lib::SessionCapability` flags via the
/// `From<SessionCapability>` implementation below.
#[repr(u8)]
#[derive(
    Debug,
    Clone,
    strum::EnumIter,
    strum::Display,
    strum::EnumString,
    Serialize,
    Deserialize,
    PartialEq,
    utoipa::ToSchema,
)]
#[schema(example = "Segmentation")]
pub enum SessionCapability {
    /// Maps to the `Segmentation` flag.
    Segmentation,
    /// Maps to both the `RetransmissionNack` and `RetransmissionAck` flags.
    Retransmission,
    /// Maps to the `RetransmissionAck` flag only.
    RetransmissionAckOnly,
    /// Maps to the `NoDelay` flag.
    NoDelay,
}
159
160impl From<SessionCapability> for hopr_lib::SessionCapabilities {
161 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
162 match cap {
163 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
164 SessionCapability::Retransmission => {
165 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
166 }
167 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
168 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
169 }
170 }
171}
172
/// Query parameters of the websocket session endpoint.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionWebsocketClientQueryRequest {
    /// Address of the destination node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(required = true, value_type = String)]
    pub destination: Address,
    /// Number of hops; used for both the forward and the return path.
    #[schema(required = true)]
    pub hops: u8,
    /// Explicit intermediate path; when present it takes precedence over `hops`.
    #[cfg(feature = "explicit-path")]
    #[schema(required = false, value_type = String)]
    pub path: Option<Vec<Address>>,
    /// Capabilities requested for the session.
    #[schema(required = true)]
    #[serde_as(as = "Vec<DisplayFromStr>")]
    pub capabilities: Vec<SessionCapability>,
    /// Session target in its textual form (see `SessionTargetSpec::from_str`).
    #[schema(required = true)]
    #[serde_as(as = "DisplayFromStr")]
    pub target: SessionTargetSpec,
    /// Transport protocol towards the target; defaults to TCP when omitted.
    #[schema(required = false)]
    #[serde(default = "default_protocol")]
    pub protocol: IpProtocol,
}
195
/// Serde default for `SessionWebsocketClientQueryRequest::protocol`: TCP.
#[inline]
fn default_protocol() -> IpProtocol {
    IpProtocol::TCP
}
200
201impl SessionWebsocketClientQueryRequest {
202 pub(crate) async fn into_protocol_session_config(
203 self,
204 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
205 #[cfg(not(feature = "explicit-path"))]
206 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
207
208 #[cfg(feature = "explicit-path")]
209 let path_options = if let Some(path) = self.path {
210 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
212 } else {
213 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
214 };
215
216 let mut capabilities = SessionCapabilities::empty();
217 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
218
219 Ok((
220 self.destination,
221 self.target.into_target(self.protocol)?,
222 SessionClientConfig {
223 forward_path_options: path_options.clone(),
224 return_path_options: path_options.clone(), capabilities,
226 ..Default::default()
227 },
228 ))
229 }
230}
231
/// Binary payload wrapper, declared for the OpenAPI schema as a binary string.
// NOTE(review): not referenced elsewhere in the visible code, hence `allow(dead_code)`;
// presumably it exists only for schema generation — confirm.
#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
#[schema(value_type = String, format = Binary)]
#[allow(dead_code)]
struct WssData(Vec<u8>);
236
#[allow(dead_code)]
/// Websocket endpoint that tunnels binary websocket frames through a HOPR session.
///
/// The session is established *before* the websocket upgrade, so a session setup
/// failure is reported as a regular HTTP 422 response. After the upgrade the
/// connection is served by `websocket_connection`.
#[utoipa::path(
        get,
        path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
        description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
        request_body(
            content = SessionWebsocketClientQueryRequest,
            content_type = "application/json",
            description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
        ),
        responses(
            (status = 200, description = "Successfully created a new client websocket session."),
            (status = 401, description = "Invalid authorization token.", body = ApiError),
            (status = 422, description = "Unknown failure", body = ApiError),
            (status = 429, description = "Too many open websocket connections.", body = ApiError),
        ),
        security(
            ("api_token" = []),
            ("bearer_token" = [])
        ),
        tag = "Session",
    )]

pub(crate) async fn websocket(
    ws: WebSocketUpgrade,
    Query(query): Query<SessionWebsocketClientQueryRequest>,
    State(state): State<Arc<InternalState>>,
) -> Result<impl IntoResponse, impl IntoResponse> {
    // Invalid query parameters (hops, path, target) are reported as 422.
    let (dst, target, data) = query
        .into_protocol_session_config()
        .await
        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;

    let hopr = state.hopr.clone();
    let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
        error!(error = %e, "Failed to establish session");
        (
            StatusCode::UNPROCESSABLE_ENTITY,
            ApiErrorStatus::UnknownFailure(e.to_string()),
        )
    })?;

    // The explicit turbofish pins the error type of the `impl IntoResponse` return.
    Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
}
291
/// Event of the merged input stream processed by `websocket_connection`.
enum WebSocketInput {
    /// Data (or a read error) coming from the HOPR session.
    Network(Result<Box<[u8]>, std::io::Error>),
    /// Message (or an error) coming from the websocket client.
    WsInput(Result<Message, Error>),
}
296
/// Const parameter of the `AsyncReadStreamer` that reads from the session in
/// `websocket_connection`.
// NOTE(review): presumably the maximum size in bytes of a single read — confirm.
const WS_MAX_SESSION_READ_SIZE: usize = 4096;
299
300#[tracing::instrument(level = "debug", skip(socket, session))]
301async fn websocket_connection(socket: WebSocket, session: HoprSession) {
302 let session_id = *session.id();
303
304 let (rx, mut tx) = session.split();
305 let (mut sender, receiver) = socket.split();
306
307 let mut queue = (
308 receiver.map(WebSocketInput::WsInput),
309 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
310 )
311 .merge();
312
313 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
314
315 while let Some(v) = queue.next().await {
316 match v {
317 WebSocketInput::Network(bytes) => match bytes {
318 Ok(bytes) => {
319 let len = bytes.len();
320 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
321 error!(
322 error = %e,
323 "Failed to emit read data onto the websocket, closing connection"
324 );
325 break;
326 };
327 bytes_from_session += len;
328 }
329 Err(e) => {
330 error!(
331 error = %e,
332 "Failed to push data from network to socket, closing connection"
333 );
334 break;
335 }
336 },
337 WebSocketInput::WsInput(ws_in) => match ws_in {
338 Ok(Message::Binary(data)) => {
339 let len = data.len();
340 if let Err(e) = tx.write(data.as_ref()).await {
341 error!(error = %e, "Failed to write data to the session, closing connection");
342 break;
343 }
344 bytes_to_session += len;
345 }
346 Ok(Message::Text(_)) => {
347 error!("Received string instead of binary data, closing connection");
348 break;
349 }
350 Ok(Message::Close(_)) => {
351 debug!("Received close frame, closing connection");
352 break;
353 }
354 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
355 Err(e) => {
356 error!(error = %e, "Failed to get a valid websocket message, closing connection");
357 break;
358 }
359 },
360 }
361 }
362
363 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
364}
365
/// API-level routing options of a session path.
///
/// Converted to `hopr_lib::RoutingOptions` via `RoutingOptions::resolve`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({ "Hops": 1 }))]
pub enum RoutingOptions {
    /// Explicit list of intermediate node addresses (only with the `explicit-path` feature).
    #[cfg(feature = "explicit-path")]
    #[schema(value_type = Vec<String>)]
    IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
    /// Number of hops.
    Hops(usize),
}
376
377impl RoutingOptions {
378 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
379 Ok(match self {
380 #[cfg(feature = "explicit-path")]
381 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
382 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
383 })
384 }
385}
386
/// Request body for creating a new client session listener.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "target": {"Plain": "localhost:8080"},
        "listenHost": "127.0.0.1:10000",
        "capabilities": ["Retransmission", "Segmentation"],
        "responseBuffer": "2 MB",
        "sessionPool": 0,
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientRequest {
    /// Address of the destination node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// Target of the session.
    pub target: SessionTargetSpec,
    /// Listen host as `ip:port`, `ip` or `:port`; missing parts are taken from the
    /// node's default listen host (see `build_binding_host`).
    pub listen_host: Option<String>,
    /// Requested capabilities. When omitted, TCP defaults to segmentation plus both
    /// retransmission flags, and other protocols to segmentation only.
    #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
    pub capabilities: Option<Vec<SessionCapability>>,
    /// Requested response buffer size. Values of at least two session payloads configure
    /// the SURB balancer with the corresponding target; smaller values disable it;
    /// omitting the field uses the default SURB balancer configuration.
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[schema(value_type = String)]
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Number of sessions to pre-establish for a TCP listener (capped by
    /// `SessionPool::MAX_SESSION_POOL_SIZE`); defaults to 0.
    pub session_pool: Option<usize>,
}
442
443impl SessionClientRequest {
444 pub(crate) async fn into_protocol_session_config(
445 self,
446 target_protocol: IpProtocol,
447 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
448 Ok((
449 self.destination,
450 self.target.into_target(target_protocol)?,
451 SessionClientConfig {
452 forward_path_options: self.forward_path.resolve().await?,
453 return_path_options: self.return_path.resolve().await?,
454 capabilities: self
455 .capabilities
456 .map(|vs| {
457 let mut caps = SessionCapabilities::empty();
458 caps.extend(vs.into_iter().map(SessionCapabilities::from));
459 caps
460 })
461 .unwrap_or_else(|| match target_protocol {
462 IpProtocol::TCP => {
463 hopr_lib::SessionCapability::RetransmissionAck
464 | hopr_lib::SessionCapability::RetransmissionNack
465 | hopr_lib::SessionCapability::Segmentation
466 }
467 _ => SessionCapability::Segmentation.into(),
469 }),
470 surb_management: match self.response_buffer {
471 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_PAYLOAD_SIZE as u64 => {
473 Some(SurbBalancerConfig {
474 target_surb_buffer_size: buffer_size.as_u64() / SESSION_PAYLOAD_SIZE as u64,
475 ..Default::default()
476 })
477 }
478 Some(_) => None,
480 None => Some(SurbBalancerConfig::default()),
482 },
483 ..Default::default()
484 },
485 ))
486 }
487}
488
/// Description of a bound client session listener, returned by the create and list endpoints.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
#[schema(example = json!({
        "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
        "target": "example.com:80",
        "forwardPath": { "Hops": 1 },
        "returnPath": { "Hops": 1 },
        "protocol": "tcp",
        "ip": "127.0.0.1",
        "port": 5542,
        "mtu": 987
    }))]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionClientResponse {
    /// Session target in its textual form.
    #[schema(example = "example.com:80")]
    pub target: String,
    /// Address of the destination node.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(value_type = String)]
    pub destination: Address,
    /// Routing options of the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options of the return path.
    pub return_path: RoutingOptions,
    /// Transport protocol of the listener.
    #[serde_as(as = "DisplayFromStr")]
    #[schema(example = "tcp")]
    pub protocol: IpProtocol,
    /// IP address the listener is bound to.
    #[schema(example = "127.0.0.1")]
    pub ip: String,
    /// Port the listener is bound to.
    #[schema(example = 5542)]
    pub port: u16,
    /// Reported MTU; always set to `SESSION_PAYLOAD_SIZE` in this file.
    pub mtu: usize,
}
528
529fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
534 match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
535 Some(Err(requested)) => {
536 debug!(requested, %default, "using partially default listen host");
538 std::net::SocketAddr::new(
539 requested.parse().unwrap_or(default.ip()),
540 requested
541 .strip_prefix(":")
542 .and_then(|p| u16::from_str(p).ok())
543 .unwrap_or(default.port()),
544 )
545 }
546 Some(Ok(requested)) => {
547 debug!(%requested, "using requested listen host");
548 requested
549 }
550 None => {
551 debug!(%default, "using default listen host");
552 default
553 }
554 }
555}
556
/// Pool of pre-established sessions together with the task that keeps them alive.
struct SessionPool {
    /// Pooled sessions; `None` when no session was pre-established.
    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
    /// Abort handle of the keep-alive task; `None` when there is no pool.
    ah: Option<AbortHandle>,
}
561
562impl SessionPool {
563 pub const MAX_SESSION_POOL_SIZE: usize = 5;
564
565 async fn new(
566 size: usize,
567 dst: Address,
568 target: SessionTarget,
569 cfg: SessionClientConfig,
570 hopr: Arc<Hopr>,
571 ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
572 let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
573 let hopr_clone = hopr.clone();
574 let pool_clone = pool.clone();
575 futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
576 .map(Ok)
577 .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
578 let pool = pool_clone.clone();
579 let hopr = hopr_clone.clone();
580 let target = target.clone();
581 let cfg = cfg.clone();
582 async move {
583 match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
584 Ok(s) => {
585 debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
586 pool.lock()
587 .map_err(|_| {
588 (
589 StatusCode::INTERNAL_SERVER_ERROR,
590 ApiErrorStatus::UnknownFailure("lock failed".into()),
591 )
592 })?
593 .push_back(s);
594 Ok(())
595 }
596 Err(error) => {
597 error!(%error, num_session = i, "failed to establish session for pool");
598 Err((
599 StatusCode::INTERNAL_SERVER_ERROR,
600 ApiErrorStatus::UnknownFailure(format!(
601 "failed to establish session #{i} in pool to {dst}: {error}"
602 )),
603 ))
604 }
605 }
606 }
607 })
608 .await?;
609
610 if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
612 let pool_clone_1 = pool.clone();
613 let pool_clone_2 = pool.clone();
614 let pool_clone_3 = pool.clone();
615 Ok(Self {
616 pool: Some(pool),
617 ah: Some(hopr_async_runtime::spawn_as_abortable(
618 futures_time::stream::interval(futures_time::time::Duration::from(
619 std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
620 ))
621 .take_while(move |_| {
622 futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
624 })
625 .flat_map(move |_| {
626 let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
628 futures::stream::iter(ids.into_iter().flatten())
629 })
630 .for_each(move |id| {
631 let hopr = hopr.clone();
632 let pool = pool_clone_3.clone();
633 async move {
634 if let Err(error) = hopr.keep_alive_session(&id).await {
636 error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
637 if let Ok(mut pool) = pool.lock() {
638 pool.retain(|s| *s.id() != id);
639 }
640 }
641 }
642 })
643 ))
644 })
645 } else {
646 Ok(Self { pool: None, ah: None })
647 }
648 }
649
650 fn pop(&mut self) -> Option<HoprSession> {
651 self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
652 }
653}
654
655impl Drop for SessionPool {
656 fn drop(&mut self) {
657 if let Some(ah) = self.ah.take() {
658 ah.abort();
659 }
660 }
661}
662
/// Binds a TCP listener on `bind_host` and registers it among the open listeners.
///
/// Every accepted TCP connection is bridged to a HOPR session: a pre-established session
/// from the pool is used while available, otherwise a new session is established.
///
/// Returns the address the listener was actually bound to.
///
/// # Errors
/// * `UNPROCESSABLE_ENTITY` if the request cannot be converted or the listener fails to start
/// * `CONFLICT` if the address is already in use
/// * errors of `SessionPool::new` if the session pool cannot be created
async fn create_tcp_client_binding(
    bind_host: std::net::SocketAddr,
    state: Arc<InternalState>,
    args: SessionClientRequest,
) -> Result<std::net::SocketAddr, (StatusCode, ApiErrorStatus)> {
    let target_spec = args.target.clone();
    let (dst, target, data) = args
        .clone()
        .into_protocol_session_config(IpProtocol::TCP)
        .await
        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;

    let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
        // An occupied address is reported as a conflict; anything else as an unknown failure.
        if e.kind() == std::io::ErrorKind::AddrInUse {
            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
        } else {
            (
                StatusCode::UNPROCESSABLE_ENTITY,
                ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
            )
        }
    })?;
    info!(%bound_host, "TCP session listener bound");

    let hopr = state.hopr.clone();

    // Pre-establish the requested number of sessions (none by default).
    let mut session_pool = SessionPool::new(
        args.session_pool.unwrap_or(0),
        dst,
        target.clone(),
        data.clone(),
        hopr.clone(),
    )
    .await?;

    state.open_listeners.write_arc().await.insert(
        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
        StoredSessionEntry {
            destination: dst,
            target: target_spec.clone(),
            forward_path: args.forward_path.clone(),
            return_path: args.return_path.clone(),
            abort_handle: hopr_async_runtime::spawn_as_abortable(
                tokio_stream::wrappers::TcpListenerStream::new(tcp_listener)
                    .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
                    .for_each_concurrent(None, move |accepted_client| {
                        let data = data.clone();
                        let target = target.clone();
                        let hopr = hopr.clone();

                        // The pool is popped synchronously, before the per-connection future
                        // is created, and only for successfully accepted connections.
                        let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();

                        async move {
                            match accepted_client {
                                Ok((sock_addr, stream)) => {
                                    debug!(socket = ?sock_addr, "incoming TCP connection");
                                    let session = match maybe_pooled_session {
                                        Some(s) => {
                                            debug!(session_id = %s.id(), "using pooled session");
                                            s
                                        }
                                        None => {
                                            debug!("no more active sessions in pool, creating a new one");
                                            match hopr.connect_to(dst, target, data).await {
                                                Ok(s) => s,
                                                Err(error) => {
                                                    // The accepted connection is dropped in this case.
                                                    error!(%error, "failed to establish session");
                                                    return;
                                                }
                                            }
                                        }
                                    };

                                    debug!(
                                        socket = ?sock_addr,
                                        session_id = tracing::field::debug(*session.id()),
                                        "new session for incoming TCP connection",
                                    );

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);

                                    // Runs until the transfer between the session and the stream ends.
                                    bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await;

                                    #[cfg(all(feature = "prometheus", not(test)))]
                                    METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
                                }
                                Err(e) => error!(error = %e, "failed to accept connection"),
                            }
                        }
                    }),
            ),
        },
    );
    Ok(bound_host)
}
763
/// Binds a UDP socket on `bind_host`, establishes a single HOPR session and registers the
/// listener among the open listeners.
///
/// The spawned task bridges the UDP socket with the session and removes the listener
/// entry from the open listeners once the transfer ends.
///
/// Returns the address the socket was actually bound to.
///
/// # Errors
/// * `UNPROCESSABLE_ENTITY` if the request cannot be converted, the socket cannot be bound
///   or the session cannot be established
/// * `CONFLICT` if the address is already in use
async fn create_udp_client_binding(
    bind_host: std::net::SocketAddr,
    state: Arc<InternalState>,
    args: SessionClientRequest,
) -> Result<std::net::SocketAddr, (StatusCode, ApiErrorStatus)> {
    let target_spec = args.target.clone();
    let (dst, target, data) = args
        .clone()
        .into_protocol_session_config(IpProtocol::UDP)
        .await
        .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;

    let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
        // An occupied address is reported as a conflict; anything else as an unknown failure.
        if e.kind() == std::io::ErrorKind::AddrInUse {
            (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
        } else {
            (
                StatusCode::UNPROCESSABLE_ENTITY,
                ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
            )
        }
    })?;

    info!(%bound_host, "UDP session listener bound");

    let hopr = state.hopr.clone();

    // Unlike the TCP listener, the UDP listener uses one session established up front.
    let session = hopr.connect_to(dst, target, data).await.map_err(|e| {
        (
            StatusCode::UNPROCESSABLE_ENTITY,
            ApiErrorStatus::UnknownFailure(e.to_string()),
        )
    })?;

    let open_listeners_clone = state.open_listeners.clone();
    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);

    state.open_listeners.write_arc().await.insert(
        listener_id,
        StoredSessionEntry {
            destination: dst,
            target: target_spec.clone(),
            forward_path: args.forward_path.clone(),
            return_path: args.return_path.clone(),
            abort_handle: hopr_async_runtime::spawn_as_abortable(async move {
                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);

                // Runs until the transfer between the session and the socket ends.
                bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE).await;

                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);

                // The listener is no longer usable once the transfer has ended.
                open_listeners_clone.write_arc().await.remove(&listener_id);
            }),
        },
    );
    Ok(bound_host)
}
826
/// Creates a new client session listener for the given IP protocol.
///
/// The listen address is derived from the request and the node's default listen host.
/// A request for an explicit port that is already registered among the open listeners
/// is rejected with `409 CONFLICT` before any binding is attempted.
#[utoipa::path(
        post,
        path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
        description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
        params(
            ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
        ),
        request_body(
            content = SessionClientRequest,
            description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
            content_type = "application/json"),
        responses(
            (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
            (status = 400, description = "Invalid IP protocol.", body = ApiError),
            (status = 401, description = "Invalid authorization token.", body = ApiError),
            (status = 409, description = "Listening address and port already in use.", body = ApiError),
            (status = 422, description = "Unknown failure", body = ApiError),
        ),
        security(
            ("api_token" = []),
            ("bearer_token" = [])
        ),
        tag = "Session"
    )]
pub(crate) async fn create_client(
    State(state): State<Arc<InternalState>>,
    Path(protocol): Path<IpProtocol>,
    Json(args): Json<SessionClientRequest>,
) -> Result<impl IntoResponse, impl IntoResponse> {
    let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);

    // Port 0 means "pick any port", so the conflict check only applies to explicit ports.
    if bind_host.port() > 0
        && state
            .open_listeners
            .read_arc()
            .await
            .contains_key(&ListenerId(protocol.into(), bind_host))
    {
        return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
    }

    debug!("binding {protocol} session listening socket to {bind_host}");
    let bound_host = match protocol {
        IpProtocol::TCP => create_tcp_client_binding(bind_host, state, args.clone()).await?,
        IpProtocol::UDP => create_udp_client_binding(bind_host, state, args.clone()).await?,
    };

    // The response reports the address that was actually bound, which may differ from
    // the requested one (e.g. when port 0 was requested).
    Ok::<_, (StatusCode, ApiErrorStatus)>(
        (
            StatusCode::OK,
            Json(SessionClientResponse {
                protocol,
                ip: bound_host.ip().to_string(),
                port: bound_host.port(),
                target: args.target.to_string(),
                destination: args.destination,
                forward_path: args.forward_path.clone(),
                return_path: args.return_path.clone(),
                mtu: SESSION_PAYLOAD_SIZE,
            }),
        )
            .into_response(),
    )
}
907
908#[utoipa::path(
910 get,
911 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
912 description = "Lists existing Session listeners for the given IP protocol.",
913 params(
914 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
915 ),
916 responses(
917 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
918 {
919 "target": "example.com:80",
920 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
921 "forwardPath": { "Hops": 1 },
922 "returnPath": { "Hops": 1 },
923 "protocol": "tcp",
924 "ip": "127.0.0.1",
925 "port": 5542,
926 "mtu": 987
927 }
928 ])),
929 (status = 400, description = "Invalid IP protocol.", body = ApiError),
930 (status = 401, description = "Invalid authorization token.", body = ApiError),
931 (status = 422, description = "Unknown failure", body = ApiError)
932 ),
933 security(
934 ("api_token" = []),
935 ("bearer_token" = [])
936 ),
937 tag = "Session",
938)]
939pub(crate) async fn list_clients(
940 State(state): State<Arc<InternalState>>,
941 Path(protocol): Path<IpProtocol>,
942) -> Result<impl IntoResponse, impl IntoResponse> {
943 let response = state
944 .open_listeners
945 .read_arc()
946 .await
947 .iter()
948 .filter(|(id, _)| id.0 == protocol.into())
949 .map(|(id, entry)| SessionClientResponse {
950 protocol,
951 ip: id.1.ip().to_string(),
952 port: id.1.port(),
953 target: entry.target.to_string(),
954 forward_path: entry.forward_path.clone(),
955 return_path: entry.return_path.clone(),
956 destination: entry.destination,
957 mtu: SESSION_PAYLOAD_SIZE,
958 })
959 .collect::<Vec<_>>();
960
961 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
962}
963
/// IP transport protocol as exposed by the API.
///
/// Serialized and parsed in lowercase (`tcp`, `udp`); string parsing is ASCII
/// case-insensitive.
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
pub enum IpProtocol {
    /// Transmission Control Protocol.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
977
978impl From<IpProtocol> for hopr_lib::IpProtocol {
979 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
980 match protocol {
981 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
982 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
983 }
984 }
985}
986
987#[serde_as]
988#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
989pub struct SessionCloseClientQuery {
990 #[serde_as(as = "DisplayFromStr")]
991 #[schema(value_type = String, example = "tcp")]
992 pub protocol: IpProtocol,
994
995 #[schema(example = "127.0.0.1:8545")]
997 pub ip: String,
998
999 #[schema(value_type = u16, example = 10101)]
1001 pub port: u16,
1002}
1003
1004#[utoipa::path(
1010 delete,
1011 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1012 description = "Closes an existing Session listener.",
1013 params(SessionCloseClientQuery),
1014 responses(
1015 (status = 204, description = "Listener closed successfully"),
1016 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1017 (status = 401, description = "Invalid authorization token.", body = ApiError),
1018 (status = 404, description = "Listener not found.", body = ApiError),
1019 (status = 422, description = "Unknown failure", body = ApiError)
1020 ),
1021 security(
1022 ("api_token" = []),
1023 ("bearer_token" = [])
1024 ),
1025 tag = "Session",
1026)]
1027pub(crate) async fn close_client(
1028 State(state): State<Arc<InternalState>>,
1029 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1030) -> Result<impl IntoResponse, impl IntoResponse> {
1031 let listening_ip: IpAddr = ip
1032 .parse()
1033 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1034
1035 {
1036 let mut open_listeners = state.open_listeners.write_arc().await;
1037
1038 let mut to_remove = Vec::new();
1039
1040 open_listeners
1042 .iter()
1043 .filter(|(ListenerId(proto, addr), _)| {
1044 let protocol: hopr_lib::IpProtocol = protocol.into();
1045 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1046 })
1047 .for_each(|(id, _)| to_remove.push(*id));
1048
1049 if to_remove.is_empty() {
1050 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1051 }
1052
1053 for bound_addr in to_remove {
1054 let entry = open_listeners
1055 .remove(&bound_addr)
1056 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1057
1058 entry.abort_handle.abort();
1059 }
1060 }
1061
1062 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1063}
1064
1065async fn try_restricted_bind<F, S, Fut>(
1066 addrs: Vec<std::net::SocketAddr>,
1067 range_str: &str,
1068 binder: F,
1069) -> std::io::Result<S>
1070where
1071 F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1072 Fut: Future<Output = std::io::Result<S>>,
1073{
1074 if addrs.is_empty() {
1075 return Err(std::io::Error::other("no valid socket addresses found"));
1076 }
1077
1078 let range = range_str
1079 .split_once(":")
1080 .and_then(
1081 |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1082 Ok((a, b)) if a <= b => Some(a..=b),
1083 _ => None,
1084 },
1085 )
1086 .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1087
1088 for port in range {
1089 let addrs = addrs
1090 .iter()
1091 .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1092 .collect::<Vec<_>>();
1093 match binder(addrs).await {
1094 Ok(listener) => return Ok(listener),
1095 Err(error) => debug!(%error, "listen address not usable"),
1096 }
1097 }
1098
1099 Err(std::io::Error::new(
1100 std::io::ErrorKind::AddrNotAvailable,
1101 format!("no valid socket addresses found within range: {range_str}"),
1102 ))
1103}
1104
1105async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1106 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1107
1108 if addrs.iter().all(|a| a.port() == 0) {
1111 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1112 let tcp_listener =
1113 try_restricted_bind(
1114 addrs,
1115 &range_str,
1116 |a| async move { TcpListener::bind(a.as_slice()).await },
1117 )
1118 .await?;
1119 return Ok((tcp_listener.local_addr()?, tcp_listener));
1120 }
1121 }
1122
1123 let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1124 Ok((tcp_listener.local_addr()?, tcp_listener))
1125}
1126
1127async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1128 address: A,
1129) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1130 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1131
1132 let builder = ConnectedUdpStream::builder()
1133 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1134 .with_foreign_data_mode(ForeignDataMode::Discard) .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1136 .with_receiver_parallelism(UdpStreamParallelism::Auto);
1137
1138 if addrs.iter().all(|a| a.port() == 0) {
1141 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1142 let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1143 futures::future::ready(builder.clone().build(addrs.as_slice()))
1144 })
1145 .await?;
1146
1147 return Ok((*udp_listener.bound_address(), udp_listener));
1148 }
1149 }
1150
1151 let udp_socket = builder.build(address)?;
1152 Ok((*udp_socket.bound_address(), udp_socket))
1153}
1154
1155async fn bind_session_to_stream<T>(mut session: HoprSession, mut stream: T, max_buf: usize)
1156where
1157 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1158{
1159 let session_id = *session.id();
1160 match transfer_session(&mut session, &mut stream, max_buf).await {
1161 Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1162 session_id = ?session_id,
1163 session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1164 ),
1165 Err(error) => error!(
1166 session_id = ?session_id,
1167 %error,
1168 "error during data transfer"
1169 ),
1170 }
1171}
1172
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::channel::mpsc::UnboundedSender;
    use hopr_crypto_types::crypto_traits::Randomizable;
    use hopr_lib::{ApplicationData, HoprPseudonym, SendMsg};
    use hopr_network_types::prelude::DestinationRouting;
    use hopr_transport_session::errors::TransportSessionError;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::*;

    /// Chunks written to the socket under test and then read back chunk by chunk.
    ///
    /// Typed as byte slices so the table does not depend on all literals having equal length.
    const TEST_CHUNKS: [&[u8]; 6] = [b"hello", b"world", b"this ", b"is ", b" a", b" test"];

    /// [`SendMsg`] test double that pushes every outgoing payload into a channel.
    pub struct SendMsgResender {
        tx: UnboundedSender<Box<[u8]>>,
    }

    impl SendMsgResender {
        pub fn new(tx: UnboundedSender<Box<[u8]>>) -> Self {
            Self { tx }
        }
    }

    #[hopr_lib::async_trait]
    impl SendMsg for SendMsgResender {
        /// Forwards the plain text of `data` into the channel; the routing is ignored.
        async fn send_message(
            &self,
            data: ApplicationData,
            _: DestinationRouting,
        ) -> std::result::Result<(), TransportSessionError> {
            // `unbounded_send` takes `&self`, so the sender does not need to be cloned.
            self.tx
                .unbounded_send(data.plain_text)
                .map_err(|_| TransportSessionError::Closed)
        }
    }

    /// Builds a session whose outgoing messages are fed straight back in as its incoming data.
    fn loopback_session() -> anyhow::Result<HoprSession> {
        let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();

        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;

        Ok(hopr_lib::HoprSession::new(
            session_id,
            hopr_lib::DestinationRouting::forward_only(
                peer,
                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
            ),
            SessionCapabilities::empty(),
            Arc::new(SendMsgResender::new(tx)),
            Box::pin(rx),
            None,
        ))
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session = loopback_session()?;

        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;

        // Serve exactly one incoming connection with the loopback session.
        tokio::task::spawn(async move {
            match tcp_listener.accept().await {
                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE).await,
                Err(e) => error!("failed to accept connection: {e}"),
            }
        });

        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
            .await
            .context("connect failed")?;

        for chunk in TEST_CHUNKS.iter().copied() {
            tcp_stream.write_all(chunk).await.context("write failed")?;
        }

        // Only the amount of returned data is checked, not its content.
        for chunk in TEST_CHUNKS.iter() {
            let mut buf = vec![0; chunk.len()];
            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        Ok(())
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session = loopback_session()?;

        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;

        tokio::task::spawn(bind_session_to_stream(
            session,
            udp_listener,
            ApplicationData::PAYLOAD_SIZE,
        ));

        let mut udp_stream = ConnectedUdpStream::builder()
            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
            .with_counterparty(listen_addr)
            .build(("127.0.0.1", 0))
            .context("bind failed")?;

        for chunk in TEST_CHUNKS.iter().copied() {
            udp_stream.write_all(chunk).await.context("write failed")?;
        }

        // Only the amount of returned data is checked, not its content.
        for chunk in TEST_CHUNKS.iter() {
            let mut buf = vec![0; chunk.len()];
            udp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        Ok(())
    }

    #[test]
    fn test_build_binding_address() {
        let default = "10.0.0.1:10000".parse().unwrap();

        // (requested host, expected binding address)
        let cases: [(Option<&str>, &str); 6] = [
            (Some("127.0.0.1:10000"), "127.0.0.1:10000"),
            (None, "10.0.0.1:10000"),
            (Some("127.0.0.1"), "127.0.0.1:10000"),
            (Some(":1234"), "10.0.0.1:1234"),
            (Some(":"), "10.0.0.1:10000"),
            (Some(""), "10.0.0.1:10000"),
        ];

        for (requested, expected) in cases.iter().copied() {
            let result = build_binding_host(requested, default);
            assert_eq!(
                result,
                expected.parse::<std::net::SocketAddr>().unwrap(),
                "requested host: {requested:?}"
            );
        }
    }
}