1use std::{
2 collections::VecDeque,
3 fmt::Formatter,
4 future::Future,
5 hash::Hash,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroUsize,
8 str::FromStr,
9 sync::Arc,
10};
11
12use axum::{
13 Error,
14 extract::{
15 Json, Path, State,
16 ws::{Message, WebSocket, WebSocketUpgrade},
17 },
18 http::status::StatusCode,
19 response::IntoResponse,
20};
21use axum_extra::extract::Query;
22use base64::Engine;
23use dashmap::DashMap;
24use futures::{
25 AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt, StreamExt, TryStreamExt,
26 future::{AbortHandle, AbortRegistration},
27};
28use futures_concurrency::stream::Merge;
29use hopr_lib::{
30 Address, Hopr, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId,
31 SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
32 TransportSessionError, errors::HoprLibError, transfer_session,
33};
34use hopr_network_types::{
35 prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
36 udp::ForeignDataMode,
37 utils::AsyncReadStreamer,
38};
39use serde::{Deserialize, Serialize};
40use serde_with::{DisplayFromStr, serde_as};
41use tokio::net::TcpListener;
42use tracing::{debug, error, info, trace};
43
44use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
45
46pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
48
49pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
51
52pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
54
55#[cfg(all(feature = "prometheus", not(test)))]
56lazy_static::lazy_static! {
57 static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
58 "hopr_session_hoprd_clients",
59 "Number of clients connected at this Entry node",
60 &["type"]
61 ).unwrap();
62}
63
64#[allow(unused_imports)]
66use serde_json::json;
67
68#[serde_as]
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
70#[schema(
71 example = json!({"Plain": "example.com:80"}),
72 example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), example = json!({"Service": 0})
74)]
75pub enum SessionTargetSpec {
77 Plain(String),
78 Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
79 #[schema(value_type = u32)]
80 Service(ServiceId),
81}
82
83impl std::fmt::Display for SessionTargetSpec {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 match self {
86 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
87 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
88 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
89 }
90 }
91}
92
93impl FromStr for SessionTargetSpec {
94 type Err = HoprLibError;
95
96 fn from_str(s: &str) -> Result<Self, Self::Err> {
97 Ok(if let Some(stripped) = s.strip_prefix("$$") {
98 Self::Sealed(
99 base64::prelude::BASE64_URL_SAFE
100 .decode(stripped)
101 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
102 )
103 } else if let Some(stripped) = s.strip_prefix("#") {
104 Self::Service(
105 stripped
106 .parse()
107 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
108 )
109 } else {
110 Self::Plain(s.to_owned())
111 })
112 }
113}
114
115impl SessionTargetSpec {
116 pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
117 Ok(match (protocol, self) {
118 (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
119 IpOrHost::from_str(&plain)
120 .map(SealedHost::from)
121 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
122 ),
123 (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
124 IpOrHost::from_str(&plain)
125 .map(SealedHost::from)
126 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
127 ),
128 (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
129 SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
130 }
131 (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
132 SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
133 }
134 (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
135 })
136 }
137}
138
139#[derive(Debug)]
141pub struct StoredSessionEntry {
142 pub destination: Address,
144 pub target: SessionTargetSpec,
146 pub forward_path: RoutingOptions,
148 pub return_path: RoutingOptions,
150 pub max_client_sessions: usize,
152 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
154 pub response_buffer: Option<bytesize::ByteSize>,
157 pub session_pool: Option<usize>,
159 pub abort_handle: AbortHandle,
161
162 clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
163}
164
165#[repr(u8)]
166#[derive(
167 Debug,
168 Clone,
169 strum::EnumIter,
170 strum::Display,
171 strum::EnumString,
172 Serialize,
173 Deserialize,
174 PartialEq,
175 utoipa::ToSchema,
176)]
177#[schema(example = "Segmentation")]
178pub enum SessionCapability {
180 Segmentation,
182 Retransmission,
184 RetransmissionAckOnly,
186 NoDelay,
188 NoRateControl,
190}
191
192impl From<SessionCapability> for hopr_lib::SessionCapabilities {
193 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
194 match cap {
195 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
196 SessionCapability::Retransmission => {
197 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
198 }
199 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
200 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
201 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
202 }
203 }
204}
205
206#[serde_as]
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
208#[serde(rename_all = "camelCase")]
209pub(crate) struct SessionWebsocketClientQueryRequest {
210 #[serde_as(as = "DisplayFromStr")]
211 #[schema(required = true, value_type = String)]
212 pub destination: Address,
213 #[schema(required = true)]
214 pub hops: u8,
215 #[cfg(feature = "explicit-path")]
216 #[schema(required = false, value_type = String)]
217 pub path: Option<Vec<Address>>,
218 #[schema(required = true)]
219 #[serde_as(as = "Vec<DisplayFromStr>")]
220 pub capabilities: Vec<SessionCapability>,
221 #[schema(required = true)]
222 #[serde_as(as = "DisplayFromStr")]
223 pub target: SessionTargetSpec,
224 #[schema(required = false)]
225 #[serde(default = "default_protocol")]
226 pub protocol: IpProtocol,
227}
228
229#[inline]
230fn default_protocol() -> IpProtocol {
231 IpProtocol::TCP
232}
233
234impl SessionWebsocketClientQueryRequest {
235 pub(crate) async fn into_protocol_session_config(
236 self,
237 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
238 #[cfg(not(feature = "explicit-path"))]
239 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
240
241 #[cfg(feature = "explicit-path")]
242 let path_options = if let Some(path) = self.path {
243 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
245 } else {
246 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
247 };
248
249 let mut capabilities = SessionCapabilities::empty();
250 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
251
252 Ok((
253 self.destination,
254 self.target.into_target(self.protocol)?,
255 SessionClientConfig {
256 forward_path_options: path_options.clone(),
257 return_path_options: path_options.clone(), capabilities,
259 ..Default::default()
260 },
261 ))
262 }
263}
264
265#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
266#[schema(value_type = String, format = Binary)]
267#[allow(dead_code)] struct WssData(Vec<u8>);
269
270#[allow(dead_code)] #[utoipa::path(
282 get,
283 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
284 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
285 request_body(
286 content = SessionWebsocketClientQueryRequest,
287 content_type = "application/json",
288 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
289 ),
290 responses(
291 (status = 200, description = "Successfully created a new client websocket session."),
292 (status = 401, description = "Invalid authorization token.", body = ApiError),
293 (status = 422, description = "Unknown failure", body = ApiError),
294 (status = 429, description = "Too many open websocket connections.", body = ApiError),
295 ),
296 security(
297 ("api_token" = []),
298 ("bearer_token" = [])
299 ),
300 tag = "Session",
301 )]
302
303pub(crate) async fn websocket(
304 ws: WebSocketUpgrade,
305 Query(query): Query<SessionWebsocketClientQueryRequest>,
306 State(state): State<Arc<InternalState>>,
307) -> Result<impl IntoResponse, impl IntoResponse> {
308 let (dst, target, data) = query
309 .into_protocol_session_config()
310 .await
311 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
312
313 let hopr = state.hopr.clone();
314 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
315 error!(error = %e, "Failed to establish session");
316 (
317 StatusCode::UNPROCESSABLE_ENTITY,
318 ApiErrorStatus::UnknownFailure(e.to_string()),
319 )
320 })?;
321
322 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
323}
324
325enum WebSocketInput {
326 Network(Result<Box<[u8]>, std::io::Error>),
327 WsInput(Result<Message, Error>),
328}
329
330const WS_MAX_SESSION_READ_SIZE: usize = 4096;
332
333#[tracing::instrument(level = "debug", skip(socket, session))]
334async fn websocket_connection(socket: WebSocket, session: HoprSession) {
335 let session_id = *session.id();
336
337 let (rx, mut tx) = session.split();
338 let (mut sender, receiver) = socket.split();
339
340 let mut queue = (
341 receiver.map(WebSocketInput::WsInput),
342 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
343 )
344 .merge();
345
346 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
347
348 while let Some(v) = queue.next().await {
349 match v {
350 WebSocketInput::Network(bytes) => match bytes {
351 Ok(bytes) => {
352 let len = bytes.len();
353 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
354 error!(
355 error = %e,
356 "Failed to emit read data onto the websocket, closing connection"
357 );
358 break;
359 };
360 bytes_from_session += len;
361 }
362 Err(e) => {
363 error!(
364 error = %e,
365 "Failed to push data from network to socket, closing connection"
366 );
367 break;
368 }
369 },
370 WebSocketInput::WsInput(ws_in) => match ws_in {
371 Ok(Message::Binary(data)) => {
372 let len = data.len();
373 if let Err(e) = tx.write(data.as_ref()).await {
374 error!(error = %e, "Failed to write data to the session, closing connection");
375 break;
376 }
377 bytes_to_session += len;
378 }
379 Ok(Message::Text(_)) => {
380 error!("Received string instead of binary data, closing connection");
381 break;
382 }
383 Ok(Message::Close(_)) => {
384 debug!("Received close frame, closing connection");
385 break;
386 }
387 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
388 Err(e) => {
389 error!(error = %e, "Failed to get a valid websocket message, closing connection");
390 break;
391 }
392 },
393 }
394 }
395
396 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
397}
398
399#[serde_as]
400#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
401#[schema(example = json!({ "Hops": 1 }))]
402pub enum RoutingOptions {
404 #[cfg(feature = "explicit-path")]
405 #[schema(value_type = Vec<String>)]
406 IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
407 Hops(usize),
408}
409
410impl RoutingOptions {
411 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
412 Ok(match self {
413 #[cfg(feature = "explicit-path")]
414 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
415 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
416 })
417 }
418}
419
420#[serde_as]
421#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
422#[schema(example = json!({
423 "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
424 "forwardPath": { "Hops": 1 },
425 "returnPath": { "Hops": 1 },
426 "target": {"Plain": "localhost:8080"},
427 "listenHost": "127.0.0.1:10000",
428 "capabilities": ["Retransmission", "Segmentation"],
429 "responseBuffer": "2 MB",
430 "maxSurbUpstream": "2000 kb/s",
431 "sessionPool": 0,
432 "maxClientSessions": 2
433 }))]
434#[serde(rename_all = "camelCase")]
435pub(crate) struct SessionClientRequest {
437 #[serde_as(as = "DisplayFromStr")]
439 #[schema(value_type = String)]
440 pub destination: Address,
441 pub forward_path: RoutingOptions,
443 pub return_path: RoutingOptions,
445 pub target: SessionTargetSpec,
447 pub listen_host: Option<String>,
452 #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
453 pub capabilities: Option<Vec<SessionCapability>>,
457 #[serde_as(as = "Option<DisplayFromStr>")]
467 #[schema(value_type = Option<String>)]
468 pub response_buffer: Option<bytesize::ByteSize>,
469 #[serde(default)]
477 #[serde(with = "human_bandwidth::option")]
478 #[schema(value_type = Option<String>)]
479 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
480 pub session_pool: Option<usize>,
487 pub max_client_sessions: Option<usize>,
497}
498
499impl SessionClientRequest {
500 pub(crate) async fn into_protocol_session_config(
501 self,
502 target_protocol: IpProtocol,
503 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
504 Ok((
505 self.destination,
506 self.target.into_target(target_protocol)?,
507 SessionClientConfig {
508 forward_path_options: self.forward_path.resolve().await?,
509 return_path_options: self.return_path.resolve().await?,
510 capabilities: self
511 .capabilities
512 .map(|vs| {
513 let mut caps = SessionCapabilities::empty();
514 caps.extend(vs.into_iter().map(SessionCapabilities::from));
515 caps
516 })
517 .unwrap_or_else(|| match target_protocol {
518 IpProtocol::TCP => {
519 hopr_lib::SessionCapability::RetransmissionAck
520 | hopr_lib::SessionCapability::RetransmissionNack
521 | hopr_lib::SessionCapability::Segmentation
522 }
523 _ => SessionCapability::Segmentation.into(),
525 }),
526 surb_management: SessionConfig {
527 response_buffer: self.response_buffer,
528 max_surb_upstream: self.max_surb_upstream,
529 }
530 .into(),
531 ..Default::default()
532 },
533 ))
534 }
535}
536
537#[serde_as]
538#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
539#[schema(example = json!({
540 "target": "example.com:80",
541 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
542 "forwardPath": { "Hops": 1 },
543 "returnPath": { "Hops": 1 },
544 "protocol": "tcp",
545 "ip": "127.0.0.1",
546 "port": 5542,
547 "hoprMtu": 1002,
548 "surbLen": 398,
549 "activeClients": [],
550 "maxClientSessions": 2,
551 "maxSurbUpstream": "2000 kb/s",
552 "responseBuffer": "2 MB",
553 "sessionPool": 0
554 }))]
555#[serde(rename_all = "camelCase")]
556pub(crate) struct SessionClientResponse {
558 #[schema(example = "example.com:80")]
559 pub target: String,
561 #[serde_as(as = "DisplayFromStr")]
563 #[schema(value_type = String)]
564 pub destination: Address,
565 pub forward_path: RoutingOptions,
567 pub return_path: RoutingOptions,
569 #[serde_as(as = "DisplayFromStr")]
571 #[schema(example = "tcp")]
572 pub protocol: IpProtocol,
573 #[schema(example = "127.0.0.1")]
575 pub ip: String,
576 #[schema(example = 5542)]
577 pub port: u16,
579 pub hopr_mtu: usize,
581 pub surb_len: usize,
585 pub active_clients: Vec<String>,
590 pub max_client_sessions: usize,
595 #[serde(default)]
598 #[serde(with = "human_bandwidth::option")]
599 #[schema(value_type = Option<String>)]
600 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
601 #[serde_as(as = "Option<DisplayFromStr>")]
604 #[schema(value_type = Option<String>)]
605 pub response_buffer: Option<bytesize::ByteSize>,
606 pub session_pool: Option<usize>,
608}
609
610fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
615 match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
616 Some(Err(requested)) => {
617 debug!(requested, %default, "using partially default listen host");
619 std::net::SocketAddr::new(
620 requested.parse().unwrap_or(default.ip()),
621 requested
622 .strip_prefix(":")
623 .and_then(|p| u16::from_str(p).ok())
624 .unwrap_or(default.port()),
625 )
626 }
627 Some(Ok(requested)) => {
628 debug!(%requested, "using requested listen host");
629 requested
630 }
631 None => {
632 debug!(%default, "using default listen host");
633 default
634 }
635 }
636}
637
638struct SessionPool {
639 pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
640 ah: Option<AbortHandle>,
641}
642
643impl SessionPool {
644 pub const MAX_SESSION_POOL_SIZE: usize = 5;
645
646 async fn new(
647 size: usize,
648 dst: Address,
649 target: SessionTarget,
650 cfg: SessionClientConfig,
651 hopr: Arc<Hopr>,
652 ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
653 let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
654 let hopr_clone = hopr.clone();
655 let pool_clone = pool.clone();
656 futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
657 .map(Ok)
658 .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
659 let pool = pool_clone.clone();
660 let hopr = hopr_clone.clone();
661 let target = target.clone();
662 let cfg = cfg.clone();
663 async move {
664 match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
665 Ok(s) => {
666 debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
667 pool.lock()
668 .map_err(|_| {
669 (
670 StatusCode::INTERNAL_SERVER_ERROR,
671 ApiErrorStatus::UnknownFailure("lock failed".into()),
672 )
673 })?
674 .push_back(s);
675 Ok(())
676 }
677 Err(error) => {
678 error!(%error, num_session = i, "failed to establish session for pool");
679 Err((
680 StatusCode::INTERNAL_SERVER_ERROR,
681 ApiErrorStatus::UnknownFailure(format!(
682 "failed to establish session #{i} in pool to {dst}: {error}"
683 )),
684 ))
685 }
686 }
687 }
688 })
689 .await?;
690
691 if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
693 let pool_clone_1 = pool.clone();
694 let pool_clone_2 = pool.clone();
695 let pool_clone_3 = pool.clone();
696 Ok(Self {
697 pool: Some(pool),
698 ah: Some(hopr_async_runtime::spawn_as_abortable!(
699 futures_time::stream::interval(futures_time::time::Duration::from(
700 std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
701 ))
702 .take_while(move |_| {
703 futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
705 })
706 .flat_map(move |_| {
707 let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
709 futures::stream::iter(ids.into_iter().flatten())
710 })
711 .for_each(move |id| {
712 let hopr = hopr.clone();
713 let pool = pool_clone_3.clone();
714 async move {
715 if let Err(error) = hopr.keep_alive_session(&id).await {
717 error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
718 if let Ok(mut pool) = pool.lock() {
719 pool.retain(|s| *s.id() != id);
720 }
721 }
722 }
723 })
724 ))
725 })
726 } else {
727 Ok(Self { pool: None, ah: None })
728 }
729 }
730
731 fn pop(&mut self) -> Option<HoprSession> {
732 self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
733 }
734}
735
736impl Drop for SessionPool {
737 fn drop(&mut self) {
738 if let Some(ah) = self.ah.take() {
739 ah.abort();
740 }
741 }
742}
743
744async fn create_tcp_client_binding(
745 bind_host: std::net::SocketAddr,
746 state: Arc<InternalState>,
747 args: SessionClientRequest,
748) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), (StatusCode, ApiErrorStatus)> {
749 let target_spec = args.target.clone();
750 let (dst, target, data) = args
751 .clone()
752 .into_protocol_session_config(IpProtocol::TCP)
753 .await
754 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
755
756 let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
758 if e.kind() == std::io::ErrorKind::AddrInUse {
759 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
760 } else {
761 (
762 StatusCode::UNPROCESSABLE_ENTITY,
763 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
764 )
765 }
766 })?;
767 info!(%bound_host, "TCP session listener bound");
768
769 let hopr = state.hopr.clone();
772
773 let session_pool_size = args.session_pool.unwrap_or(0);
775 let mut session_pool = SessionPool::new(session_pool_size, dst, target.clone(), data.clone(), hopr.clone()).await?;
776
777 let active_sessions = Arc::new(DashMap::new());
778 let mut max_clients = args.max_client_sessions.unwrap_or(5).max(1);
779
780 if max_clients < session_pool_size {
781 max_clients = session_pool_size;
782 }
783
784 let (abort_handle, abort_reg) = AbortHandle::new_pair();
786 let active_sessions_clone = active_sessions.clone();
787 hopr_async_runtime::prelude::spawn(async move {
788 let active_sessions_clone_2 = active_sessions_clone.clone();
789
790 futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
791 .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
792 .for_each(move |accepted_client| {
793 let data = data.clone();
794 let target = target.clone();
795 let hopr = hopr.clone();
796 let active_sessions = active_sessions_clone_2.clone();
797
798 let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
800 async move {
801 match accepted_client {
802 Ok((sock_addr, mut stream)) => {
803 debug!(?sock_addr, "incoming TCP connection");
804
805 if active_sessions.len() >= max_clients {
808 error!(?bind_host, "no more client slots available at listener");
809 use tokio::io::AsyncWriteExt;
810 if let Err(error) = stream.shutdown().await {
811 error!(%error, ?sock_addr, "failed to shutdown TCP connection");
812 }
813 return;
814 }
815
816 let session = match maybe_pooled_session {
818 Some(s) => {
819 debug!(session_id = %s.id(), "using pooled session");
820 s
821 }
822 None => {
823 debug!("no more active sessions in the pool, creating a new one");
824 match hopr.connect_to(dst, target, data).await {
825 Ok(s) => s,
826 Err(error) => {
827 error!(%error, "failed to establish session");
828 return;
829 }
830 }
831 }
832 };
833
834 let session_id = *session.id();
835 debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
836
837 let (abort_handle, abort_reg) = AbortHandle::new_pair();
838 active_sessions.insert(session_id, (sock_addr, abort_handle));
839
840 #[cfg(all(feature = "prometheus", not(test)))]
841 METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
842
843 hopr_async_runtime::prelude::spawn(
844 bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
847 move |_| async move {
848 active_sessions.remove(&session_id);
851
852 debug!(%session_id, "tcp session has ended");
853
854 #[cfg(all(feature = "prometheus", not(test)))]
855 METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
856 },
857 ),
858 );
859 }
860 Err(error) => error!(%error, "failed to accept connection"),
861 }
862 }
863 })
864 .await;
865
866 active_sessions_clone.iter().for_each(|entry| {
868 let (sock_addr, handle) = entry.value();
869 debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
870 handle.abort()
871 });
872 });
873
874 state.open_listeners.write_arc().await.insert(
875 ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
876 StoredSessionEntry {
877 destination: dst,
878 target: target_spec.clone(),
879 forward_path: args.forward_path.clone(),
880 return_path: args.return_path.clone(),
881 clients: active_sessions,
882 max_client_sessions: max_clients,
883 max_surb_upstream: args.max_surb_upstream,
884 response_buffer: args.response_buffer,
885 session_pool: Some(session_pool_size),
886 abort_handle,
887 },
888 );
889 Ok((bound_host, None, max_clients))
890}
891
892async fn create_udp_client_binding(
893 bind_host: std::net::SocketAddr,
894 state: Arc<InternalState>,
895 args: SessionClientRequest,
896) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), (StatusCode, ApiErrorStatus)> {
897 let target_spec = args.target.clone();
898 let (dst, target, data) = args
899 .clone()
900 .into_protocol_session_config(IpProtocol::UDP)
901 .await
902 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
903
904 let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
906 if e.kind() == std::io::ErrorKind::AddrInUse {
907 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
908 } else {
909 (
910 StatusCode::UNPROCESSABLE_ENTITY,
911 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
912 )
913 }
914 })?;
915
916 info!(%bound_host, "UDP session listener bound");
917
918 let hopr = state.hopr.clone();
919
920 let session = hopr.connect_to(dst, target, data.clone()).await.map_err(|e| {
922 (
923 StatusCode::UNPROCESSABLE_ENTITY,
924 ApiErrorStatus::UnknownFailure(e.to_string()),
925 )
926 })?;
927
928 let open_listeners_clone = state.open_listeners.clone();
929 let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
930
931 let (abort_handle, abort_reg) = AbortHandle::new_pair();
942 let clients = Arc::new(DashMap::new());
943 let max_clients: usize = 1; let session_id = *session.id();
947 clients.insert(session_id, (bind_host, abort_handle.clone()));
948 hopr_async_runtime::prelude::spawn(async move {
949 #[cfg(all(feature = "prometheus", not(test)))]
950 METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
951
952 bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
953
954 #[cfg(all(feature = "prometheus", not(test)))]
955 METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
956
957 open_listeners_clone.write_arc().await.remove(&listener_id);
959 });
960
961 state.open_listeners.write_arc().await.insert(
962 listener_id,
963 StoredSessionEntry {
964 destination: dst,
965 target: target_spec.clone(),
966 forward_path: args.forward_path.clone(),
967 return_path: args.return_path.clone(),
968 max_client_sessions: max_clients,
969 max_surb_upstream: args.max_surb_upstream,
970 response_buffer: args.response_buffer,
971 session_pool: None,
972 abort_handle,
973 clients,
974 },
975 );
976 Ok((bound_host, Some(session_id), max_clients))
977}
978
979#[utoipa::path(
996 post,
997 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
998 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
999 params(
1000 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1001 ),
1002 request_body(
1003 content = SessionClientRequest,
1004 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
1005 content_type = "application/json"),
1006 responses(
1007 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
1008 (status = 400, description = "Invalid IP protocol.", body = ApiError),
1009 (status = 401, description = "Invalid authorization token.", body = ApiError),
1010 (status = 409, description = "Listening address and port already in use.", body = ApiError),
1011 (status = 422, description = "Unknown failure", body = ApiError),
1012 ),
1013 security(
1014 ("api_token" = []),
1015 ("bearer_token" = [])
1016 ),
1017 tag = "Session"
1018 )]
1019pub(crate) async fn create_client(
1020 State(state): State<Arc<InternalState>>,
1021 Path(protocol): Path<IpProtocol>,
1022 Json(args): Json<SessionClientRequest>,
1023) -> Result<impl IntoResponse, impl IntoResponse> {
1024 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
1025
1026 let listener_id = ListenerId(protocol.into(), bind_host);
1027 if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
1028 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
1029 }
1030
1031 debug!("binding {protocol} session listening socket to {bind_host}");
1032 let (bound_host, udp_session_id, max_clients) = match protocol {
1033 IpProtocol::TCP => create_tcp_client_binding(bind_host, state.clone(), args.clone()).await?,
1034 IpProtocol::UDP => create_udp_client_binding(bind_host, state.clone(), args.clone()).await?,
1035 };
1036
1037 Ok::<_, (StatusCode, ApiErrorStatus)>(
1038 (
1039 StatusCode::OK,
1040 Json(SessionClientResponse {
1041 protocol,
1042 ip: bound_host.ip().to_string(),
1043 port: bound_host.port(),
1044 target: args.target.to_string(),
1045 destination: args.destination,
1046 forward_path: args.forward_path.clone(),
1047 return_path: args.return_path.clone(),
1048 hopr_mtu: SESSION_MTU,
1049 surb_len: SURB_SIZE,
1050 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
1051 max_client_sessions: max_clients,
1052 max_surb_upstream: args.max_surb_upstream,
1053 response_buffer: args.response_buffer,
1054 session_pool: args.session_pool,
1055 }),
1056 )
1057 .into_response(),
1058 )
1059}
1060
1061#[utoipa::path(
1063 get,
1064 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
1065 description = "Lists existing Session listeners for the given IP protocol.",
1066 params(
1067 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1068 ),
1069 responses(
1070 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
1071 {
1072 "target": "example.com:80",
1073 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
1074 "forwardPath": { "Hops": 1 },
1075 "returnPath": { "Hops": 1 },
1076 "protocol": "tcp",
1077 "ip": "127.0.0.1",
1078 "port": 5542,
1079 "surbLen": 400,
1080 "hoprMtu": 1020,
1081 "activeClients": [],
1082 "maxClientSessions": 2,
1083 "maxSurbUpstream": "2000 kb/s",
1084 "responseBuffer": "2 MB",
1085 "sessionPool": 0
1086 }
1087 ])),
1088 (status = 400, description = "Invalid IP protocol.", body = ApiError),
1089 (status = 401, description = "Invalid authorization token.", body = ApiError),
1090 (status = 422, description = "Unknown failure", body = ApiError)
1091 ),
1092 security(
1093 ("api_token" = []),
1094 ("bearer_token" = [])
1095 ),
1096 tag = "Session",
1097)]
1098pub(crate) async fn list_clients(
1099 State(state): State<Arc<InternalState>>,
1100 Path(protocol): Path<IpProtocol>,
1101) -> Result<impl IntoResponse, impl IntoResponse> {
1102 let response = state
1103 .open_listeners
1104 .read_arc()
1105 .await
1106 .iter()
1107 .filter(|(id, _)| id.0 == protocol.into())
1108 .map(|(id, entry)| SessionClientResponse {
1109 protocol,
1110 ip: id.1.ip().to_string(),
1111 port: id.1.port(),
1112 target: entry.target.to_string(),
1113 forward_path: entry.forward_path.clone(),
1114 return_path: entry.return_path.clone(),
1115 destination: entry.destination,
1116 hopr_mtu: SESSION_MTU,
1117 surb_len: SURB_SIZE,
1118 active_clients: entry.clients.iter().map(|e| e.key().to_string()).collect(),
1119 max_client_sessions: entry.max_client_sessions,
1120 max_surb_upstream: entry.max_surb_upstream,
1121 response_buffer: entry.response_buffer,
1122 session_pool: entry.session_pool,
1123 })
1124 .collect::<Vec<_>>();
1125
1126 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
1127}
1128
1129#[serde_as]
1130#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
1131#[schema(example = json!({
1132 "responseBuffer": "2 MB",
1133 "maxSurbUpstream": "2 Mbps"
1134 }))]
1135#[serde(rename_all = "camelCase")]
1136pub(crate) struct SessionConfig {
1137 #[serde(default)]
1147 #[serde_as(as = "Option<DisplayFromStr>")]
1148 #[schema(value_type = String)]
1149 pub response_buffer: Option<bytesize::ByteSize>,
1150 #[serde(default)]
1158 #[serde(with = "human_bandwidth::option")]
1159 #[schema(value_type = String)]
1160 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
1161}
1162
1163impl From<SessionConfig> for Option<SurbBalancerConfig> {
1164 fn from(value: SessionConfig) -> Self {
1165 match value.response_buffer {
1166 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
1168 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
1169 max_surbs_per_sec: value
1170 .max_surb_upstream
1171 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
1172 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
1173 ..Default::default()
1174 }),
1175 Some(_) => None,
1177 None => Some(SurbBalancerConfig::default()),
1179 }
1180 }
1181}
1182
1183impl From<SurbBalancerConfig> for SessionConfig {
1184 fn from(value: SurbBalancerConfig) -> Self {
1185 Self {
1186 response_buffer: Some(bytesize::ByteSize::b(
1187 value.target_surb_buffer_size * SESSION_MTU as u64,
1188 )),
1189 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
1190 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
1191 )),
1192 }
1193 }
1194}
1195
1196#[utoipa::path(
1197 post,
1198 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1199 description = "Updates configuration of an existing active session.",
1200 params(
1201 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1202 ),
1203 request_body(
1204 content = SessionConfig,
1205 description = "Allows updating of several parameters of an existing active session.",
1206 content_type = "application/json"),
1207 responses(
1208 (status = 204, description = "Successfully updated the configuration"),
1209 (status = 400, description = "Invalid configuration.", body = ApiError),
1210 (status = 401, description = "Invalid authorization token.", body = ApiError),
1211 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1212 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
1213 (status = 422, description = "Unknown failure", body = ApiError),
1214 ),
1215 security(
1216 ("api_token" = []),
1217 ("bearer_token" = [])
1218 ),
1219 tag = "Session"
1220)]
1221pub(crate) async fn adjust_session(
1222 State(state): State<Arc<InternalState>>,
1223 Path(session_id): Path<String>,
1224 Json(args): Json<SessionConfig>,
1225) -> Result<impl IntoResponse, impl IntoResponse> {
1226 let session_id = HoprSessionId::from_str(&session_id)
1227 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1228
1229 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
1230 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
1231 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
1232 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
1233 SessionManagerError::NonExistingSession,
1234 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1235 Err(e) => Err((
1236 StatusCode::NOT_ACCEPTABLE,
1237 ApiErrorStatus::UnknownFailure(e.to_string()),
1238 )),
1239 }
1240 } else {
1241 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
1242 }
1243}
1244
1245#[utoipa::path(
1246 get,
1247 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1248 description = "Gets configuration of an existing active session.",
1249 params(
1250 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1251 ),
1252 responses(
1253 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
1254 (status = 400, description = "Invalid session ID.", body = ApiError),
1255 (status = 401, description = "Invalid authorization token.", body = ApiError),
1256 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1257 (status = 422, description = "Unknown failure", body = ApiError),
1258 ),
1259 security(
1260 ("api_token" = []),
1261 ("bearer_token" = [])
1262 ),
1263 tag = "Session"
1264)]
1265pub(crate) async fn session_config(
1266 State(state): State<Arc<InternalState>>,
1267 Path(session_id): Path<String>,
1268) -> Result<impl IntoResponse, impl IntoResponse> {
1269 let session_id = HoprSessionId::from_str(&session_id)
1270 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1271
1272 match state.hopr.get_session_surb_balancer_config(&session_id).await {
1273 Ok(Some(cfg)) => {
1274 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
1275 }
1276 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1277 Err(e) => Err((
1278 StatusCode::UNPROCESSABLE_ENTITY,
1279 ApiErrorStatus::UnknownFailure(e.to_string()),
1280 )),
1281 }
1282}
1283
1284#[derive(
1285 Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
1286)]
1287#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
1288#[serde(rename_all = "lowercase")]
1289#[schema(example = "tcp")]
1290pub enum IpProtocol {
1292 #[allow(clippy::upper_case_acronyms)]
1293 TCP,
1294 #[allow(clippy::upper_case_acronyms)]
1295 UDP,
1296}
1297
1298impl From<IpProtocol> for hopr_lib::IpProtocol {
1299 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
1300 match protocol {
1301 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
1302 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
1303 }
1304 }
1305}
1306
1307#[serde_as]
1308#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
1309pub struct SessionCloseClientQuery {
1310 #[serde_as(as = "DisplayFromStr")]
1311 #[schema(value_type = String, example = "tcp")]
1312 pub protocol: IpProtocol,
1314
1315 #[schema(example = "127.0.0.1:8545")]
1317 pub ip: String,
1318
1319 #[schema(value_type = u16, example = 10101)]
1321 pub port: u16,
1322}
1323
1324#[utoipa::path(
1330 delete,
1331 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1332 description = "Closes an existing Session listener.",
1333 params(SessionCloseClientQuery),
1334 responses(
1335 (status = 204, description = "Listener closed successfully"),
1336 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1337 (status = 401, description = "Invalid authorization token.", body = ApiError),
1338 (status = 404, description = "Listener not found.", body = ApiError),
1339 (status = 422, description = "Unknown failure", body = ApiError)
1340 ),
1341 security(
1342 ("api_token" = []),
1343 ("bearer_token" = [])
1344 ),
1345 tag = "Session",
1346)]
1347pub(crate) async fn close_client(
1348 State(state): State<Arc<InternalState>>,
1349 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1350) -> Result<impl IntoResponse, impl IntoResponse> {
1351 let listening_ip: IpAddr = ip
1352 .parse()
1353 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1354
1355 {
1356 let mut open_listeners = state.open_listeners.write_arc().await;
1357
1358 let mut to_remove = Vec::new();
1359
1360 open_listeners
1362 .iter()
1363 .filter(|(ListenerId(proto, addr), _)| {
1364 let protocol: hopr_lib::IpProtocol = protocol.into();
1365 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1366 })
1367 .for_each(|(id, _)| to_remove.push(*id));
1368
1369 if to_remove.is_empty() {
1370 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1371 }
1372
1373 for bound_addr in to_remove {
1374 let entry = open_listeners
1375 .remove(&bound_addr)
1376 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1377
1378 entry.abort_handle.abort();
1379 }
1380 }
1381
1382 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1383}
1384
1385async fn try_restricted_bind<F, S, Fut>(
1386 addrs: Vec<std::net::SocketAddr>,
1387 range_str: &str,
1388 binder: F,
1389) -> std::io::Result<S>
1390where
1391 F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1392 Fut: Future<Output = std::io::Result<S>>,
1393{
1394 if addrs.is_empty() {
1395 return Err(std::io::Error::other("no valid socket addresses found"));
1396 }
1397
1398 let range = range_str
1399 .split_once(":")
1400 .and_then(
1401 |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1402 Ok((a, b)) if a <= b => Some(a..=b),
1403 _ => None,
1404 },
1405 )
1406 .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1407
1408 for port in range {
1409 let addrs = addrs
1410 .iter()
1411 .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1412 .collect::<Vec<_>>();
1413 match binder(addrs).await {
1414 Ok(listener) => return Ok(listener),
1415 Err(error) => debug!(%error, "listen address not usable"),
1416 }
1417 }
1418
1419 Err(std::io::Error::new(
1420 std::io::ErrorKind::AddrNotAvailable,
1421 format!("no valid socket addresses found within range: {range_str}"),
1422 ))
1423}
1424
1425async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1426 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1427
1428 if addrs.iter().all(|a| a.port() == 0) {
1431 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1432 let tcp_listener =
1433 try_restricted_bind(
1434 addrs,
1435 &range_str,
1436 |a| async move { TcpListener::bind(a.as_slice()).await },
1437 )
1438 .await?;
1439 return Ok((tcp_listener.local_addr()?, tcp_listener));
1440 }
1441 }
1442
1443 let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1444 Ok((tcp_listener.local_addr()?, tcp_listener))
1445}
1446
1447async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1448 address: A,
1449) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1450 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1451
1452 let builder = ConnectedUdpStream::builder()
1453 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1454 .with_foreign_data_mode(ForeignDataMode::Discard) .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1456 .with_receiver_parallelism(
1457 std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
1458 .ok()
1459 .and_then(|s| s.parse::<NonZeroUsize>().ok())
1460 .map(UdpStreamParallelism::Specific)
1461 .unwrap_or(UdpStreamParallelism::Auto),
1462 );
1463
1464 if addrs.iter().all(|a| a.port() == 0) {
1467 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1468 let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1469 futures::future::ready(builder.clone().build(addrs.as_slice()))
1470 })
1471 .await?;
1472
1473 return Ok((*udp_listener.bound_address(), udp_listener));
1474 }
1475 }
1476
1477 let udp_socket = builder.build(address)?;
1478 Ok((*udp_socket.bound_address(), udp_socket))
1479}
1480
1481async fn bind_session_to_stream<T>(
1482 mut session: HoprSession,
1483 mut stream: T,
1484 max_buf: usize,
1485 abort_reg: Option<AbortRegistration>,
1486) where
1487 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1488{
1489 let session_id = *session.id();
1490 match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
1491 Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1492 session_id = ?session_id,
1493 session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1494 ),
1495 Err(error) => error!(
1496 session_id = ?session_id,
1497 %error,
1498 "error during data transfer"
1499 ),
1500 }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505 use anyhow::Context;
1506 use futures::{
1507 FutureExt, StreamExt,
1508 channel::mpsc::{UnboundedReceiver, UnboundedSender},
1509 };
1510 use futures_time::future::FutureExt as TimeFutureExt;
1511 use hopr_crypto_types::crypto_traits::Randomizable;
1512 use hopr_lib::{ApplicationData, ApplicationDataIn, ApplicationDataOut, HoprPseudonym};
1513 use hopr_network_types::prelude::DestinationRouting;
1514 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1515
1516 use super::*;
1517
1518 fn loopback_transport() -> (
1519 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1520 UnboundedReceiver<ApplicationDataIn>,
1521 ) {
1522 let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
1523 let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1524 tokio::task::spawn(
1525 input_rx
1526 .map(|(_, data)| {
1527 Ok(ApplicationDataIn {
1528 data: data.data,
1529 packet_info: Default::default(),
1530 })
1531 })
1532 .forward(output_tx)
1533 .map(|e| tracing::debug!(?e, "loopback transport completed")),
1534 );
1535
1536 (input_tx, output_rx)
1537 }
1538
1539 #[tokio::test]
1540 async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
1541 -> anyhow::Result<()> {
1542 let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1543 let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1544 let session = hopr_lib::HoprSession::new(
1545 session_id,
1546 hopr_lib::DestinationRouting::forward_only(
1547 peer,
1548 hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1549 ),
1550 Default::default(),
1551 loopback_transport(),
1552 None,
1553 )?;
1554
1555 let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;
1556
1557 tokio::task::spawn(async move {
1558 match tcp_listener.accept().await {
1559 Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
1560 Err(e) => error!("failed to accept connection: {e}"),
1561 }
1562 });
1563
1564 let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
1565 .await
1566 .context("connect failed")?;
1567
1568 let data = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];
1569
1570 for d in data.clone().into_iter() {
1571 tcp_stream.write_all(d).await.context("write failed")?;
1572 }
1573
1574 for d in data.iter() {
1575 let mut buf = vec![0; d.len()];
1576 tcp_stream.read_exact(&mut buf).await.context("read failed")?;
1577 }
1578
1579 Ok(())
1580 }
1581
1582 #[test_log::test(tokio::test)]
1583 async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
1584 -> anyhow::Result<()> {
1585 let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
1586 let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
1587 let session = hopr_lib::HoprSession::new(
1588 session_id,
1589 hopr_lib::DestinationRouting::forward_only(
1590 peer,
1591 hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
1592 ),
1593 Default::default(),
1594 loopback_transport(),
1595 None,
1596 )?;
1597
1598 let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;
1599
1600 let (abort_handle, abort_registration) = AbortHandle::new_pair();
1601 let jh = tokio::task::spawn(bind_session_to_stream(
1602 session,
1603 udp_listener,
1604 ApplicationData::PAYLOAD_SIZE,
1605 Some(abort_registration),
1606 ));
1607
1608 let mut udp_stream = ConnectedUdpStream::builder()
1609 .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
1610 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1611 .with_counterparty(listen_addr)
1612 .build(("127.0.0.1", 0))
1613 .context("bind failed")?;
1614
1615 let data = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];
1616
1617 for d in data.clone().into_iter() {
1618 udp_stream.write_all(d).await.context("write failed")?;
1619 }
1621
1622 for d in data.iter() {
1623 let mut buf = vec![0; d.len()];
1624 udp_stream.read_exact(&mut buf).await.context("read failed")?;
1625 }
1626
1627 abort_handle.abort();
1629 jh.timeout(futures_time::time::Duration::from_millis(200)).await??;
1630
1631 Ok(())
1632 }
1633
1634 #[test]
1635 fn test_build_binding_address() {
1636 let default = "10.0.0.1:10000".parse().unwrap();
1637
1638 let result = build_binding_host(Some("127.0.0.1:10000"), default);
1639 assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1640
1641 let result = build_binding_host(None, default);
1642 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1643
1644 let result = build_binding_host(Some("127.0.0.1"), default);
1645 assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1646
1647 let result = build_binding_host(Some(":1234"), default);
1648 assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
1649
1650 let result = build_binding_host(Some(":"), default);
1651 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1652
1653 let result = build_binding_host(Some(""), default);
1654 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
1655 }
1656}