1use std::{
2 collections::VecDeque,
3 fmt::Formatter,
4 future::Future,
5 hash::Hash,
6 net::{IpAddr, SocketAddr},
7 str::FromStr,
8 sync::Arc,
9};
10
11use axum::{
12 Error,
13 extract::{
14 Json, Path, State,
15 ws::{Message, WebSocket, WebSocketUpgrade},
16 },
17 http::status::StatusCode,
18 response::IntoResponse,
19};
20use axum_extra::extract::Query;
21use base64::Engine;
22use dashmap::DashMap;
23use futures::{
24 AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt, StreamExt, TryStreamExt,
25 future::{AbortHandle, AbortRegistration},
26};
27use futures_concurrency::stream::Merge;
28use hopr_lib::{
29 Address, Hopr, HoprSession, HoprSessionId, HoprTransportError, SESSION_MTU, SURB_SIZE, ServiceId,
30 SessionCapabilities, SessionClientConfig, SessionManagerError, SessionTarget, SurbBalancerConfig,
31 TransportSessionError, errors::HoprLibError, transfer_session,
32};
33use hopr_network_types::{
34 prelude::{ConnectedUdpStream, IpOrHost, SealedHost, UdpStreamParallelism},
35 udp::ForeignDataMode,
36 utils::AsyncReadStreamer,
37};
38use serde::{Deserialize, Serialize};
39use serde_with::{DisplayFromStr, serde_as};
40use tokio::net::TcpListener;
41use tracing::{debug, error, info, trace};
42
43use crate::{ApiError, ApiErrorStatus, BASE_PATH, InternalState, ListenerId};
44
/// Buffer size passed to `bind_session_to_stream` for each accepted TCP client
/// (see `create_tcp_client_binding`).
pub const HOPR_TCP_BUFFER_SIZE: usize = 4 * 1024;

/// Buffer size passed to `bind_session_to_stream` for the UDP listener socket
/// (see `create_udp_client_binding`).
pub const HOPR_UDP_BUFFER_SIZE: usize = 16 * 1024;

/// Queue size for UDP listeners.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed where the UDP socket is bound — confirm.
pub const HOPR_UDP_QUEUE_SIZE: usize = 8 * 1024;
53
// Gauge of currently connected clients, labelled by transport ("tcp" / "udp").
// It is incremented when a client is bound to a session and decremented once
// that binding ends. Only compiled with the `prometheus` feature outside tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge =
        hopr_metrics::MultiGauge::new(
            "hopr_session_hoprd_clients",
            "Number of clients connected at this Entry node",
            &["type"]
        )
        .unwrap();
}
62
63#[allow(unused_imports)]
65use serde_json::json;
66
67#[serde_as]
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
69#[schema(
70 example = json!({"Plain": "example.com:80"}),
71 example = json!({"Sealed": "SGVsbG9Xb3JsZA"}), example = json!({"Service": 0})
73)]
74pub enum SessionTargetSpec {
76 Plain(String),
77 Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
78 #[schema(value_type = u32)]
79 Service(ServiceId),
80}
81
82impl std::fmt::Display for SessionTargetSpec {
83 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
84 match self {
85 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
86 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
87 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
88 }
89 }
90}
91
92impl FromStr for SessionTargetSpec {
93 type Err = HoprLibError;
94
95 fn from_str(s: &str) -> Result<Self, Self::Err> {
96 Ok(if let Some(stripped) = s.strip_prefix("$$") {
97 Self::Sealed(
98 base64::prelude::BASE64_URL_SAFE
99 .decode(stripped)
100 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
101 )
102 } else if let Some(stripped) = s.strip_prefix("#") {
103 Self::Service(
104 stripped
105 .parse()
106 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
107 )
108 } else {
109 Self::Plain(s.to_owned())
110 })
111 }
112}
113
114impl SessionTargetSpec {
115 pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
116 Ok(match (protocol, self) {
117 (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
118 IpOrHost::from_str(&plain)
119 .map(SealedHost::from)
120 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
121 ),
122 (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
123 IpOrHost::from_str(&plain)
124 .map(SealedHost::from)
125 .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
126 ),
127 (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
128 SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
129 }
130 (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
131 SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
132 }
133 (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
134 })
135 }
136}
137
138#[derive(Debug)]
140pub struct StoredSessionEntry {
141 pub destination: Address,
143 pub target: SessionTargetSpec,
145 pub forward_path: RoutingOptions,
147 pub return_path: RoutingOptions,
149 pub abort_handle: AbortHandle,
151
152 clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
153}
154
155#[repr(u8)]
156#[derive(
157 Debug,
158 Clone,
159 strum::EnumIter,
160 strum::Display,
161 strum::EnumString,
162 Serialize,
163 Deserialize,
164 PartialEq,
165 utoipa::ToSchema,
166)]
167#[schema(example = "Segmentation")]
168pub enum SessionCapability {
170 Segmentation,
172 Retransmission,
174 RetransmissionAckOnly,
176 NoDelay,
178 NoRateControl,
180}
181
182impl From<SessionCapability> for hopr_lib::SessionCapabilities {
183 fn from(cap: SessionCapability) -> hopr_lib::SessionCapabilities {
184 match cap {
185 SessionCapability::Segmentation => hopr_lib::SessionCapability::Segmentation.into(),
186 SessionCapability::Retransmission => {
187 hopr_lib::SessionCapability::RetransmissionNack | hopr_lib::SessionCapability::RetransmissionAck
188 }
189 SessionCapability::RetransmissionAckOnly => hopr_lib::SessionCapability::RetransmissionAck.into(),
190 SessionCapability::NoDelay => hopr_lib::SessionCapability::NoDelay.into(),
191 SessionCapability::NoRateControl => hopr_lib::SessionCapability::NoRateControl.into(),
192 }
193 }
194}
195
196#[serde_as]
197#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
198#[serde(rename_all = "camelCase")]
199pub(crate) struct SessionWebsocketClientQueryRequest {
200 #[serde_as(as = "DisplayFromStr")]
201 #[schema(required = true, value_type = String)]
202 pub destination: Address,
203 #[schema(required = true)]
204 pub hops: u8,
205 #[cfg(feature = "explicit-path")]
206 #[schema(required = false, value_type = String)]
207 pub path: Option<Vec<Address>>,
208 #[schema(required = true)]
209 #[serde_as(as = "Vec<DisplayFromStr>")]
210 pub capabilities: Vec<SessionCapability>,
211 #[schema(required = true)]
212 #[serde_as(as = "DisplayFromStr")]
213 pub target: SessionTargetSpec,
214 #[schema(required = false)]
215 #[serde(default = "default_protocol")]
216 pub protocol: IpProtocol,
217}
218
219#[inline]
220fn default_protocol() -> IpProtocol {
221 IpProtocol::TCP
222}
223
224impl SessionWebsocketClientQueryRequest {
225 pub(crate) async fn into_protocol_session_config(
226 self,
227 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
228 #[cfg(not(feature = "explicit-path"))]
229 let path_options = hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?);
230
231 #[cfg(feature = "explicit-path")]
232 let path_options = if let Some(path) = self.path {
233 hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?)
235 } else {
236 hopr_lib::RoutingOptions::Hops((self.hops as u32).try_into()?)
237 };
238
239 let mut capabilities = SessionCapabilities::empty();
240 capabilities.extend(self.capabilities.into_iter().flat_map(SessionCapabilities::from));
241
242 Ok((
243 self.destination,
244 self.target.into_target(self.protocol)?,
245 SessionClientConfig {
246 forward_path_options: path_options.clone(),
247 return_path_options: path_options.clone(), capabilities,
249 ..Default::default()
250 },
251 ))
252 }
253}
254
255#[derive(Debug, Default, Clone, Deserialize, utoipa::ToSchema)]
256#[schema(value_type = String, format = Binary)]
257#[allow(dead_code)] struct WssData(Vec<u8>);
259
260#[allow(dead_code)] #[utoipa::path(
272 get,
273 path = const_format::formatcp!("{BASE_PATH}/session/websocket"),
274 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
275 request_body(
276 content = SessionWebsocketClientQueryRequest,
277 content_type = "application/json",
278 description = "Websocket endpoint exposing a binary socket-like connection to a peer through websockets using underlying HOPR sessions.",
279 ),
280 responses(
281 (status = 200, description = "Successfully created a new client websocket session."),
282 (status = 401, description = "Invalid authorization token.", body = ApiError),
283 (status = 422, description = "Unknown failure", body = ApiError),
284 (status = 429, description = "Too many open websocket connections.", body = ApiError),
285 ),
286 security(
287 ("api_token" = []),
288 ("bearer_token" = [])
289 ),
290 tag = "Session",
291 )]
292
293pub(crate) async fn websocket(
294 ws: WebSocketUpgrade,
295 Query(query): Query<SessionWebsocketClientQueryRequest>,
296 State(state): State<Arc<InternalState>>,
297) -> Result<impl IntoResponse, impl IntoResponse> {
298 let (dst, target, data) = query
299 .into_protocol_session_config()
300 .await
301 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
302
303 let hopr = state.hopr.clone();
304 let session: HoprSession = hopr.connect_to(dst, target, data).await.map_err(|e| {
305 error!(error = %e, "Failed to establish session");
306 (
307 StatusCode::UNPROCESSABLE_ENTITY,
308 ApiErrorStatus::UnknownFailure(e.to_string()),
309 )
310 })?;
311
312 Ok::<_, (StatusCode, ApiErrorStatus)>(ws.on_upgrade(move |socket| websocket_connection(socket, session)))
313}
314
315enum WebSocketInput {
316 Network(Result<Box<[u8]>, std::io::Error>),
317 WsInput(Result<Message, Error>),
318}
319
320const WS_MAX_SESSION_READ_SIZE: usize = 4096;
322
323#[tracing::instrument(level = "debug", skip(socket, session))]
324async fn websocket_connection(socket: WebSocket, session: HoprSession) {
325 let session_id = *session.id();
326
327 let (rx, mut tx) = session.split();
328 let (mut sender, receiver) = socket.split();
329
330 let mut queue = (
331 receiver.map(WebSocketInput::WsInput),
332 AsyncReadStreamer::<WS_MAX_SESSION_READ_SIZE, _>(rx).map(WebSocketInput::Network),
333 )
334 .merge();
335
336 let (mut bytes_to_session, mut bytes_from_session) = (0, 0);
337
338 while let Some(v) = queue.next().await {
339 match v {
340 WebSocketInput::Network(bytes) => match bytes {
341 Ok(bytes) => {
342 let len = bytes.len();
343 if let Err(e) = sender.send(Message::Binary(bytes.into())).await {
344 error!(
345 error = %e,
346 "Failed to emit read data onto the websocket, closing connection"
347 );
348 break;
349 };
350 bytes_from_session += len;
351 }
352 Err(e) => {
353 error!(
354 error = %e,
355 "Failed to push data from network to socket, closing connection"
356 );
357 break;
358 }
359 },
360 WebSocketInput::WsInput(ws_in) => match ws_in {
361 Ok(Message::Binary(data)) => {
362 let len = data.len();
363 if let Err(e) = tx.write(data.as_ref()).await {
364 error!(error = %e, "Failed to write data to the session, closing connection");
365 break;
366 }
367 bytes_to_session += len;
368 }
369 Ok(Message::Text(_)) => {
370 error!("Received string instead of binary data, closing connection");
371 break;
372 }
373 Ok(Message::Close(_)) => {
374 debug!("Received close frame, closing connection");
375 break;
376 }
377 Ok(m) => trace!(message = ?m, "skipping an unsupported websocket message"),
378 Err(e) => {
379 error!(error = %e, "Failed to get a valid websocket message, closing connection");
380 break;
381 }
382 },
383 }
384 }
385
386 info!(%session_id, bytes_from_session, bytes_to_session, "WS session connection ended");
387}
388
389#[serde_as]
390#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
391#[schema(example = json!({ "Hops": 1 }))]
392pub enum RoutingOptions {
394 #[cfg(feature = "explicit-path")]
395 #[schema(value_type = Vec<String>)]
396 IntermediatePath(#[serde_as(as = "Vec<DisplayFromStr>")] Vec<Address>),
397 Hops(usize),
398}
399
400impl RoutingOptions {
401 pub(crate) async fn resolve(self) -> Result<hopr_lib::RoutingOptions, ApiErrorStatus> {
402 Ok(match self {
403 #[cfg(feature = "explicit-path")]
404 RoutingOptions::IntermediatePath(path) => hopr_lib::RoutingOptions::IntermediatePath(path.try_into()?),
405 RoutingOptions::Hops(hops) => hopr_lib::RoutingOptions::Hops(hops.try_into()?),
406 })
407 }
408}
409
410#[serde_as]
411#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
412#[schema(example = json!({
413 "destination": "0x1B482420Afa04aeC1Ef0e4a00C18451E84466c75",
414 "forwardPath": { "Hops": 1 },
415 "returnPath": { "Hops": 1 },
416 "target": {"Plain": "localhost:8080"},
417 "listenHost": "127.0.0.1:10000",
418 "capabilities": ["Retransmission", "Segmentation"],
419 "responseBuffer": "2 MB",
420 "maxSurbUpstream": "2000 kb/s",
421 "sessionPool": 0,
422 "maxClientSessions": 2
423 }))]
424#[serde(rename_all = "camelCase")]
425pub(crate) struct SessionClientRequest {
427 #[serde_as(as = "DisplayFromStr")]
429 #[schema(value_type = String)]
430 pub destination: Address,
431 pub forward_path: RoutingOptions,
433 pub return_path: RoutingOptions,
435 pub target: SessionTargetSpec,
437 pub listen_host: Option<String>,
442 #[serde_as(as = "Option<Vec<DisplayFromStr>>")]
443 pub capabilities: Option<Vec<SessionCapability>>,
447 #[serde_as(as = "Option<DisplayFromStr>")]
457 #[schema(value_type = String)]
458 pub response_buffer: Option<bytesize::ByteSize>,
459 #[serde(default)]
467 #[serde(with = "human_bandwidth::option")]
468 #[schema(value_type = String)]
469 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
470 pub session_pool: Option<usize>,
477 pub max_client_sessions: Option<usize>,
487}
488
489impl SessionClientRequest {
490 pub(crate) async fn into_protocol_session_config(
491 self,
492 target_protocol: IpProtocol,
493 ) -> Result<(Address, SessionTarget, SessionClientConfig), ApiErrorStatus> {
494 Ok((
495 self.destination,
496 self.target.into_target(target_protocol)?,
497 SessionClientConfig {
498 forward_path_options: self.forward_path.resolve().await?,
499 return_path_options: self.return_path.resolve().await?,
500 capabilities: self
501 .capabilities
502 .map(|vs| {
503 let mut caps = SessionCapabilities::empty();
504 caps.extend(vs.into_iter().map(SessionCapabilities::from));
505 caps
506 })
507 .unwrap_or_else(|| match target_protocol {
508 IpProtocol::TCP => {
509 hopr_lib::SessionCapability::RetransmissionAck
510 | hopr_lib::SessionCapability::RetransmissionNack
511 | hopr_lib::SessionCapability::Segmentation
512 }
513 _ => SessionCapability::Segmentation.into(),
515 }),
516 surb_management: SessionConfig {
517 response_buffer: self.response_buffer,
518 max_surb_upstream: self.max_surb_upstream,
519 }
520 .into(),
521 ..Default::default()
522 },
523 ))
524 }
525}
526
527#[serde_as]
528#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
529#[schema(example = json!({
530 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
531 "target": "example.com:80",
532 "forwardPath": { "Hops": 1 },
533 "returnPath": { "Hops": 1 },
534 "protocol": "tcp",
535 "ip": "127.0.0.1",
536 "port": 5542,
537 "mtu": 1020,
538 "surb_len": 400,
539 "active_clients": []
540 }))]
541#[serde(rename_all = "camelCase")]
542pub(crate) struct SessionClientResponse {
544 #[schema(example = "example.com:80")]
545 pub target: String,
547 #[serde_as(as = "DisplayFromStr")]
549 #[schema(value_type = String)]
550 pub destination: Address,
551 pub forward_path: RoutingOptions,
553 pub return_path: RoutingOptions,
555 #[serde_as(as = "DisplayFromStr")]
557 #[schema(example = "tcp")]
558 pub protocol: IpProtocol,
559 #[schema(example = "127.0.0.1")]
561 pub ip: String,
562 #[schema(example = 5542)]
563 pub port: u16,
565 pub mtu: usize,
567 pub surb_len: usize,
571 pub active_clients: Vec<String>,
576}
577
578fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
583 match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
584 Some(Err(requested)) => {
585 debug!(requested, %default, "using partially default listen host");
587 std::net::SocketAddr::new(
588 requested.parse().unwrap_or(default.ip()),
589 requested
590 .strip_prefix(":")
591 .and_then(|p| u16::from_str(p).ok())
592 .unwrap_or(default.port()),
593 )
594 }
595 Some(Ok(requested)) => {
596 debug!(%requested, "using requested listen host");
597 requested
598 }
599 None => {
600 debug!(%default, "using default listen host");
601 default
602 }
603 }
604}
605
606struct SessionPool {
607 pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
608 ah: Option<AbortHandle>,
609}
610
611impl SessionPool {
612 pub const MAX_SESSION_POOL_SIZE: usize = 5;
613
614 async fn new(
615 size: usize,
616 dst: Address,
617 target: SessionTarget,
618 cfg: SessionClientConfig,
619 hopr: Arc<Hopr>,
620 ) -> Result<Self, (StatusCode, ApiErrorStatus)> {
621 let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
622 let hopr_clone = hopr.clone();
623 let pool_clone = pool.clone();
624 futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
625 .map(Ok)
626 .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
627 let pool = pool_clone.clone();
628 let hopr = hopr_clone.clone();
629 let target = target.clone();
630 let cfg = cfg.clone();
631 async move {
632 match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
633 Ok(s) => {
634 debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
635 pool.lock()
636 .map_err(|_| {
637 (
638 StatusCode::INTERNAL_SERVER_ERROR,
639 ApiErrorStatus::UnknownFailure("lock failed".into()),
640 )
641 })?
642 .push_back(s);
643 Ok(())
644 }
645 Err(error) => {
646 error!(%error, num_session = i, "failed to establish session for pool");
647 Err((
648 StatusCode::INTERNAL_SERVER_ERROR,
649 ApiErrorStatus::UnknownFailure(format!(
650 "failed to establish session #{i} in pool to {dst}: {error}"
651 )),
652 ))
653 }
654 }
655 }
656 })
657 .await?;
658
659 if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
661 let pool_clone_1 = pool.clone();
662 let pool_clone_2 = pool.clone();
663 let pool_clone_3 = pool.clone();
664 Ok(Self {
665 pool: Some(pool),
666 ah: Some(hopr_async_runtime::spawn_as_abortable!(
667 futures_time::stream::interval(futures_time::time::Duration::from(
668 std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
669 ))
670 .take_while(move |_| {
671 futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
673 })
674 .flat_map(move |_| {
675 let ids = pool_clone_2.lock().ok().map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
677 futures::stream::iter(ids.into_iter().flatten())
678 })
679 .for_each(move |id| {
680 let hopr = hopr.clone();
681 let pool = pool_clone_3.clone();
682 async move {
683 if let Err(error) = hopr.keep_alive_session(&id).await {
685 error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
686 if let Ok(mut pool) = pool.lock() {
687 pool.retain(|s| *s.id() != id);
688 }
689 }
690 }
691 })
692 ))
693 })
694 } else {
695 Ok(Self { pool: None, ah: None })
696 }
697 }
698
699 fn pop(&mut self) -> Option<HoprSession> {
700 self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
701 }
702}
703
704impl Drop for SessionPool {
705 fn drop(&mut self) {
706 if let Some(ah) = self.ah.take() {
707 ah.abort();
708 }
709 }
710}
711
712async fn create_tcp_client_binding(
713 bind_host: std::net::SocketAddr,
714 state: Arc<InternalState>,
715 args: SessionClientRequest,
716) -> Result<(std::net::SocketAddr, Option<HoprSessionId>), (StatusCode, ApiErrorStatus)> {
717 let target_spec = args.target.clone();
718 let (dst, target, data) = args
719 .clone()
720 .into_protocol_session_config(IpProtocol::TCP)
721 .await
722 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
723
724 let (bound_host, tcp_listener) = tcp_listen_on(bind_host).await.map_err(|e| {
726 if e.kind() == std::io::ErrorKind::AddrInUse {
727 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
728 } else {
729 (
730 StatusCode::UNPROCESSABLE_ENTITY,
731 ApiErrorStatus::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}")),
732 )
733 }
734 })?;
735 info!(%bound_host, "TCP session listener bound");
736
737 let hopr = state.hopr.clone();
740
741 let session_pool_size = args.session_pool.unwrap_or(0);
743 let mut session_pool = SessionPool::new(session_pool_size, dst, target.clone(), data.clone(), hopr.clone()).await?;
744
745 let active_sessions = Arc::new(DashMap::new());
746 let mut max_clients = args.max_client_sessions.unwrap_or(5).max(1);
747
748 if max_clients < session_pool_size {
749 max_clients = session_pool_size;
750 }
751
752 let (abort_handle, abort_reg) = AbortHandle::new_pair();
754 let active_sessions_clone = active_sessions.clone();
755 hopr_async_runtime::prelude::spawn(async move {
756 let active_sessions_clone_2 = active_sessions_clone.clone();
757
758 futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
759 .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
760 .for_each(move |accepted_client| {
761 let data = data.clone();
762 let target = target.clone();
763 let hopr = hopr.clone();
764 let active_sessions = active_sessions_clone_2.clone();
765
766 let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
768 async move {
769 match accepted_client {
770 Ok((sock_addr, mut stream)) => {
771 debug!(?sock_addr, "incoming TCP connection");
772
773 if active_sessions.len() >= max_clients {
776 error!(?bind_host, "no more client slots available at listener");
777 use tokio::io::AsyncWriteExt;
778 if let Err(error) = stream.shutdown().await {
779 error!(%error, ?sock_addr, "failed to shutdown TCP connection");
780 }
781 return;
782 }
783
784 let session = match maybe_pooled_session {
786 Some(s) => {
787 debug!(session_id = %s.id(), "using pooled session");
788 s
789 }
790 None => {
791 debug!("no more active sessions in the pool, creating a new one");
792 match hopr.connect_to(dst, target, data).await {
793 Ok(s) => s,
794 Err(error) => {
795 error!(%error, "failed to establish session");
796 return;
797 }
798 }
799 }
800 };
801
802 let session_id = *session.id();
803 debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
804
805 let (abort_handle, abort_reg) = AbortHandle::new_pair();
806 active_sessions.insert(session_id, (sock_addr, abort_handle));
807
808 #[cfg(all(feature = "prometheus", not(test)))]
809 METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
810
811 hopr_async_runtime::prelude::spawn(
812 bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
815 move |_| async move {
816 active_sessions.remove(&session_id);
819
820 debug!(%session_id, "tcp session has ended");
821
822 #[cfg(all(feature = "prometheus", not(test)))]
823 METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
824 },
825 ),
826 );
827 }
828 Err(error) => error!(%error, "failed to accept connection"),
829 }
830 }
831 })
832 .await;
833
834 active_sessions_clone.iter().for_each(|entry| {
836 let (sock_addr, handle) = entry.value();
837 debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
838 handle.abort()
839 });
840 });
841
842 state.open_listeners.write_arc().await.insert(
843 ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
844 StoredSessionEntry {
845 destination: dst,
846 target: target_spec.clone(),
847 forward_path: args.forward_path.clone(),
848 return_path: args.return_path.clone(),
849 clients: active_sessions,
850 abort_handle,
851 },
852 );
853 Ok((bound_host, None))
854}
855
856async fn create_udp_client_binding(
857 bind_host: std::net::SocketAddr,
858 state: Arc<InternalState>,
859 args: SessionClientRequest,
860) -> Result<(std::net::SocketAddr, Option<HoprSessionId>), (StatusCode, ApiErrorStatus)> {
861 let target_spec = args.target.clone();
862 let (dst, target, data) = args
863 .clone()
864 .into_protocol_session_config(IpProtocol::UDP)
865 .await
866 .map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
867
868 let (bound_host, udp_socket) = udp_bind_to(bind_host).await.map_err(|e| {
870 if e.kind() == std::io::ErrorKind::AddrInUse {
871 (StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed)
872 } else {
873 (
874 StatusCode::UNPROCESSABLE_ENTITY,
875 ApiErrorStatus::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}")),
876 )
877 }
878 })?;
879
880 info!(%bound_host, "UDP session listener bound");
881
882 let hopr = state.hopr.clone();
883
884 let session = hopr.connect_to(dst, target, data).await.map_err(|e| {
886 (
887 StatusCode::UNPROCESSABLE_ENTITY,
888 ApiErrorStatus::UnknownFailure(e.to_string()),
889 )
890 })?;
891
892 let open_listeners_clone = state.open_listeners.clone();
893 let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
894
895 let (abort_handle, abort_reg) = AbortHandle::new_pair();
906 let clients = Arc::new(DashMap::new());
907 let session_id = *session.id();
909 clients.insert(session_id, (bind_host, abort_handle.clone()));
910 hopr_async_runtime::prelude::spawn(async move {
911 #[cfg(all(feature = "prometheus", not(test)))]
912 METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
913
914 bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
915
916 #[cfg(all(feature = "prometheus", not(test)))]
917 METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
918
919 open_listeners_clone.write_arc().await.remove(&listener_id);
921 });
922
923 state.open_listeners.write_arc().await.insert(
924 listener_id,
925 StoredSessionEntry {
926 destination: dst,
927 target: target_spec.clone(),
928 forward_path: args.forward_path.clone(),
929 return_path: args.return_path.clone(),
930 abort_handle,
931 clients,
932 },
933 );
934 Ok((bound_host, Some(session_id)))
935}
936
937#[utoipa::path(
954 post,
955 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
956 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
957 params(
958 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
959 ),
960 request_body(
961 content = SessionClientRequest,
962 description = "Creates a new client HOPR session that will start listening on a dedicated port. Once the port is bound, it is possible to use the socket for bidirectional read and write communication.",
963 content_type = "application/json"),
964 responses(
965 (status = 200, description = "Successfully created a new client session.", body = SessionClientResponse),
966 (status = 400, description = "Invalid IP protocol.", body = ApiError),
967 (status = 401, description = "Invalid authorization token.", body = ApiError),
968 (status = 409, description = "Listening address and port already in use.", body = ApiError),
969 (status = 422, description = "Unknown failure", body = ApiError),
970 ),
971 security(
972 ("api_token" = []),
973 ("bearer_token" = [])
974 ),
975 tag = "Session"
976 )]
977pub(crate) async fn create_client(
978 State(state): State<Arc<InternalState>>,
979 Path(protocol): Path<IpProtocol>,
980 Json(args): Json<SessionClientRequest>,
981) -> Result<impl IntoResponse, impl IntoResponse> {
982 let bind_host: std::net::SocketAddr = build_binding_host(args.listen_host.as_deref(), state.default_listen_host);
983
984 let listener_id = ListenerId(protocol.into(), bind_host);
985 if bind_host.port() > 0 && state.open_listeners.read_arc().await.contains_key(&listener_id) {
986 return Err((StatusCode::CONFLICT, ApiErrorStatus::ListenHostAlreadyUsed));
987 }
988
989 debug!("binding {protocol} session listening socket to {bind_host}");
990 let (bound_host, udp_session_id) = match protocol {
991 IpProtocol::TCP => create_tcp_client_binding(bind_host, state.clone(), args.clone()).await?,
992 IpProtocol::UDP => create_udp_client_binding(bind_host, state.clone(), args.clone()).await?,
993 };
994
995 Ok::<_, (StatusCode, ApiErrorStatus)>(
996 (
997 StatusCode::OK,
998 Json(SessionClientResponse {
999 protocol,
1000 ip: bound_host.ip().to_string(),
1001 port: bound_host.port(),
1002 target: args.target.to_string(),
1003 destination: args.destination,
1004 forward_path: args.forward_path.clone(),
1005 return_path: args.return_path.clone(),
1006 mtu: SESSION_MTU,
1007 surb_len: SURB_SIZE,
1008 active_clients: udp_session_id.into_iter().map(|s| s.to_string()).collect(),
1009 }),
1010 )
1011 .into_response(),
1012 )
1013}
1014
1015#[utoipa::path(
1017 get,
1018 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}"),
1019 description = "Lists existing Session listeners for the given IP protocol.",
1020 params(
1021 ("protocol" = String, Path, description = "IP transport protocol", example = "tcp"),
1022 ),
1023 responses(
1024 (status = 200, description = "Opened session listeners for the given IP protocol.", body = Vec<SessionClientResponse>, example = json!([
1025 {
1026 "target": "example.com:80",
1027 "destination": "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F",
1028 "forwardPath": { "Hops": 1 },
1029 "returnPath": { "Hops": 1 },
1030 "protocol": "tcp",
1031 "ip": "127.0.0.1",
1032 "port": 5542,
1033 "surbLen": 400,
1034 "mtu": 1020,
1035 "activeClients": []
1036 }
1037 ])),
1038 (status = 400, description = "Invalid IP protocol.", body = ApiError),
1039 (status = 401, description = "Invalid authorization token.", body = ApiError),
1040 (status = 422, description = "Unknown failure", body = ApiError)
1041 ),
1042 security(
1043 ("api_token" = []),
1044 ("bearer_token" = [])
1045 ),
1046 tag = "Session",
1047)]
1048pub(crate) async fn list_clients(
1049 State(state): State<Arc<InternalState>>,
1050 Path(protocol): Path<IpProtocol>,
1051) -> Result<impl IntoResponse, impl IntoResponse> {
1052 let response = state
1053 .open_listeners
1054 .read_arc()
1055 .await
1056 .iter()
1057 .filter(|(id, _)| id.0 == protocol.into())
1058 .map(|(id, entry)| SessionClientResponse {
1059 protocol,
1060 ip: id.1.ip().to_string(),
1061 port: id.1.port(),
1062 target: entry.target.to_string(),
1063 forward_path: entry.forward_path.clone(),
1064 return_path: entry.return_path.clone(),
1065 destination: entry.destination,
1066 mtu: SESSION_MTU,
1067 surb_len: SURB_SIZE,
1068 active_clients: entry.clients.iter().map(|e| e.key().to_string()).collect(),
1069 })
1070 .collect::<Vec<_>>();
1071
1072 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(response)).into_response())
1073}
1074
1075#[serde_as]
1076#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
1077#[schema(example = json!({
1078 "responseBuffer": "2 MB",
1079 "maxSurbUpstream": "2 Mbps"
1080 }))]
1081#[serde(rename_all = "camelCase")]
1082pub(crate) struct SessionConfig {
1083 #[serde(default)]
1093 #[serde_as(as = "Option<DisplayFromStr>")]
1094 #[schema(value_type = String)]
1095 pub response_buffer: Option<bytesize::ByteSize>,
1096 #[serde(default)]
1104 #[serde(with = "human_bandwidth::option")]
1105 #[schema(value_type = String)]
1106 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
1107}
1108
1109impl From<SessionConfig> for Option<SurbBalancerConfig> {
1110 fn from(value: SessionConfig) -> Self {
1111 match value.response_buffer {
1112 Some(buffer_size) if buffer_size.as_u64() >= 2 * SESSION_MTU as u64 => Some(SurbBalancerConfig {
1114 target_surb_buffer_size: buffer_size.as_u64() / SESSION_MTU as u64,
1115 max_surbs_per_sec: value
1116 .max_surb_upstream
1117 .map(|b| (b.as_bps() as usize / (8 * SURB_SIZE)) as u64)
1118 .unwrap_or_else(|| SurbBalancerConfig::default().max_surbs_per_sec),
1119 ..Default::default()
1120 }),
1121 Some(_) => None,
1123 None => Some(SurbBalancerConfig::default()),
1125 }
1126 }
1127}
1128
1129impl From<SurbBalancerConfig> for SessionConfig {
1130 fn from(value: SurbBalancerConfig) -> Self {
1131 Self {
1132 response_buffer: Some(bytesize::ByteSize::b(
1133 value.target_surb_buffer_size * SESSION_MTU as u64,
1134 )),
1135 max_surb_upstream: Some(human_bandwidth::re::bandwidth::Bandwidth::from_bps(
1136 value.max_surbs_per_sec * (8 * SURB_SIZE) as u64,
1137 )),
1138 }
1139 }
1140}
1141
1142#[utoipa::path(
1143 post,
1144 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1145 description = "Updates configuration of an existing active session.",
1146 params(
1147 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1148 ),
1149 request_body(
1150 content = SessionConfig,
1151 description = "Allows updating of several parameters of an existing active session.",
1152 content_type = "application/json"),
1153 responses(
1154 (status = 204, description = "Successfully updated the configuration"),
1155 (status = 400, description = "Invalid configuration.", body = ApiError),
1156 (status = 401, description = "Invalid authorization token.", body = ApiError),
1157 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1158 (status = 406, description = "Session cannot be reconfigured.", body = ApiError),
1159 (status = 422, description = "Unknown failure", body = ApiError),
1160 ),
1161 security(
1162 ("api_token" = []),
1163 ("bearer_token" = [])
1164 ),
1165 tag = "Session"
1166)]
1167pub(crate) async fn adjust_session(
1168 State(state): State<Arc<InternalState>>,
1169 Path(session_id): Path<String>,
1170 Json(args): Json<SessionConfig>,
1171) -> Result<impl IntoResponse, impl IntoResponse> {
1172 let session_id = HoprSessionId::from_str(&session_id)
1173 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1174
1175 if let Some(cfg) = Option::<SurbBalancerConfig>::from(args) {
1176 match state.hopr.update_session_surb_balancer_config(&session_id, cfg).await {
1177 Ok(_) => Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response()),
1178 Err(HoprLibError::TransportError(HoprTransportError::Session(TransportSessionError::Manager(
1179 SessionManagerError::NonExistingSession,
1180 )))) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1181 Err(e) => Err((
1182 StatusCode::NOT_ACCEPTABLE,
1183 ApiErrorStatus::UnknownFailure(e.to_string()),
1184 )),
1185 }
1186 } else {
1187 Err::<_, (StatusCode, ApiErrorStatus)>((StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))
1188 }
1189}
1190
1191#[utoipa::path(
1192 get,
1193 path = const_format::formatcp!("{BASE_PATH}/session/config/{{id}}"),
1194 description = "Gets configuration of an existing active session.",
1195 params(
1196 ("id" = String, Path, description = "Session ID", example = "0x5112D584a1C72Fc25017:487"),
1197 ),
1198 responses(
1199 (status = 200, description = "Retrieved session configuration.", body = SessionConfig),
1200 (status = 400, description = "Invalid session ID.", body = ApiError),
1201 (status = 401, description = "Invalid authorization token.", body = ApiError),
1202 (status = 404, description = "Given session ID does not refer to an existing Session", body = ApiError),
1203 (status = 422, description = "Unknown failure", body = ApiError),
1204 ),
1205 security(
1206 ("api_token" = []),
1207 ("bearer_token" = [])
1208 ),
1209 tag = "Session"
1210)]
1211pub(crate) async fn session_config(
1212 State(state): State<Arc<InternalState>>,
1213 Path(session_id): Path<String>,
1214) -> Result<impl IntoResponse, impl IntoResponse> {
1215 let session_id = HoprSessionId::from_str(&session_id)
1216 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidSessionId))?;
1217
1218 match state.hopr.get_session_surb_balancer_config(&session_id).await {
1219 Ok(Some(cfg)) => {
1220 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::OK, Json(SessionConfig::from(cfg))).into_response())
1221 }
1222 Ok(None) => Err((StatusCode::NOT_FOUND, ApiErrorStatus::SessionNotFound)),
1223 Err(e) => Err((
1224 StatusCode::UNPROCESSABLE_ENTITY,
1225 ApiErrorStatus::UnknownFailure(e.to_string()),
1226 )),
1227 }
1228}
1229
/// IP transport protocol selector used by the Session API.
///
/// The textual form is lowercase (`tcp`, `udp`) for both `serde` and `strum`;
/// parsing from a string via `strum` is ASCII case-insensitive.
/// Converts into [`hopr_lib::IpProtocol`].
#[derive(
    Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, strum::Display, strum::EnumString, utoipa::ToSchema,
)]
#[strum(serialize_all = "lowercase", ascii_case_insensitive)]
#[serde(rename_all = "lowercase")]
#[schema(example = "tcp")]
pub enum IpProtocol {
    /// Transmission Control Protocol.
    // The variant keeps the upper-case acronym, hence the lint exemption.
    #[allow(clippy::upper_case_acronyms)]
    TCP,
    /// User Datagram Protocol.
    // The variant keeps the upper-case acronym, hence the lint exemption.
    #[allow(clippy::upper_case_acronyms)]
    UDP,
}
1243
1244impl From<IpProtocol> for hopr_lib::IpProtocol {
1245 fn from(protocol: IpProtocol) -> hopr_lib::IpProtocol {
1246 match protocol {
1247 IpProtocol::TCP => hopr_lib::IpProtocol::TCP,
1248 IpProtocol::UDP => hopr_lib::IpProtocol::UDP,
1249 }
1250 }
1251}
1252
1253#[serde_as]
1254#[derive(Debug, Serialize, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
1255pub struct SessionCloseClientQuery {
1256 #[serde_as(as = "DisplayFromStr")]
1257 #[schema(value_type = String, example = "tcp")]
1258 pub protocol: IpProtocol,
1260
1261 #[schema(example = "127.0.0.1:8545")]
1263 pub ip: String,
1264
1265 #[schema(value_type = u16, example = 10101)]
1267 pub port: u16,
1268}
1269
1270#[utoipa::path(
1276 delete,
1277 path = const_format::formatcp!("{BASE_PATH}/session/{{protocol}}/{{ip}}/{{port}}"),
1278 description = "Closes an existing Session listener.",
1279 params(SessionCloseClientQuery),
1280 responses(
1281 (status = 204, description = "Listener closed successfully"),
1282 (status = 400, description = "Invalid IP protocol or port.", body = ApiError),
1283 (status = 401, description = "Invalid authorization token.", body = ApiError),
1284 (status = 404, description = "Listener not found.", body = ApiError),
1285 (status = 422, description = "Unknown failure", body = ApiError)
1286 ),
1287 security(
1288 ("api_token" = []),
1289 ("bearer_token" = [])
1290 ),
1291 tag = "Session",
1292)]
1293pub(crate) async fn close_client(
1294 State(state): State<Arc<InternalState>>,
1295 Path(SessionCloseClientQuery { protocol, ip, port }): Path<SessionCloseClientQuery>,
1296) -> Result<impl IntoResponse, impl IntoResponse> {
1297 let listening_ip: IpAddr = ip
1298 .parse()
1299 .map_err(|_| (StatusCode::BAD_REQUEST, ApiErrorStatus::InvalidInput))?;
1300
1301 {
1302 let mut open_listeners = state.open_listeners.write_arc().await;
1303
1304 let mut to_remove = Vec::new();
1305
1306 open_listeners
1308 .iter()
1309 .filter(|(ListenerId(proto, addr), _)| {
1310 let protocol: hopr_lib::IpProtocol = protocol.into();
1311 protocol == *proto && addr.ip() == listening_ip && (addr.port() == port || port == 0)
1312 })
1313 .for_each(|(id, _)| to_remove.push(*id));
1314
1315 if to_remove.is_empty() {
1316 return Err((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput));
1317 }
1318
1319 for bound_addr in to_remove {
1320 let entry = open_listeners
1321 .remove(&bound_addr)
1322 .ok_or((StatusCode::NOT_FOUND, ApiErrorStatus::InvalidInput))?;
1323
1324 entry.abort_handle.abort();
1325 }
1326 }
1327
1328 Ok::<_, (StatusCode, ApiErrorStatus)>((StatusCode::NO_CONTENT, "").into_response())
1329}
1330
1331async fn try_restricted_bind<F, S, Fut>(
1332 addrs: Vec<std::net::SocketAddr>,
1333 range_str: &str,
1334 binder: F,
1335) -> std::io::Result<S>
1336where
1337 F: Fn(Vec<std::net::SocketAddr>) -> Fut,
1338 Fut: Future<Output = std::io::Result<S>>,
1339{
1340 if addrs.is_empty() {
1341 return Err(std::io::Error::other("no valid socket addresses found"));
1342 }
1343
1344 let range = range_str
1345 .split_once(":")
1346 .and_then(
1347 |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
1348 Ok((a, b)) if a <= b => Some(a..=b),
1349 _ => None,
1350 },
1351 )
1352 .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
1353
1354 for port in range {
1355 let addrs = addrs
1356 .iter()
1357 .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
1358 .collect::<Vec<_>>();
1359 match binder(addrs).await {
1360 Ok(listener) => return Ok(listener),
1361 Err(error) => debug!(%error, "listen address not usable"),
1362 }
1363 }
1364
1365 Err(std::io::Error::new(
1366 std::io::ErrorKind::AddrNotAvailable,
1367 format!("no valid socket addresses found within range: {range_str}"),
1368 ))
1369}
1370
1371async fn tcp_listen_on<A: std::net::ToSocketAddrs>(address: A) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
1372 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1373
1374 if addrs.iter().all(|a| a.port() == 0) {
1377 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1378 let tcp_listener =
1379 try_restricted_bind(
1380 addrs,
1381 &range_str,
1382 |a| async move { TcpListener::bind(a.as_slice()).await },
1383 )
1384 .await?;
1385 return Ok((tcp_listener.local_addr()?, tcp_listener));
1386 }
1387 }
1388
1389 let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
1390 Ok((tcp_listener.local_addr()?, tcp_listener))
1391}
1392
1393async fn udp_bind_to<A: std::net::ToSocketAddrs>(
1394 address: A,
1395) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
1396 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
1397
1398 let builder = ConnectedUdpStream::builder()
1399 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
1400 .with_foreign_data_mode(ForeignDataMode::Discard) .with_queue_size(HOPR_UDP_QUEUE_SIZE)
1402 .with_receiver_parallelism(UdpStreamParallelism::Auto);
1403
1404 if addrs.iter().all(|a| a.port() == 0) {
1407 if let Ok(range_str) = std::env::var(crate::env::HOPRD_SESSION_PORT_RANGE) {
1408 let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
1409 futures::future::ready(builder.clone().build(addrs.as_slice()))
1410 })
1411 .await?;
1412
1413 return Ok((*udp_listener.bound_address(), udp_listener));
1414 }
1415 }
1416
1417 let udp_socket = builder.build(address)?;
1418 Ok((*udp_socket.bound_address(), udp_socket))
1419}
1420
1421async fn bind_session_to_stream<T>(
1422 mut session: HoprSession,
1423 mut stream: T,
1424 max_buf: usize,
1425 abort_reg: Option<AbortRegistration>,
1426) where
1427 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
1428{
1429 let session_id = *session.id();
1430 match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
1431 Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
1432 session_id = ?session_id,
1433 session_to_stream_bytes, stream_to_session_bytes, "client session ended",
1434 ),
1435 Err(error) => error!(
1436 session_id = ?session_id,
1437 %error,
1438 "error during data transfer"
1439 ),
1440 }
1441}
1442
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::{
        FutureExt, StreamExt,
        channel::mpsc::{UnboundedReceiver, UnboundedSender},
    };
    use futures_time::future::FutureExt as TimeFutureExt;
    use hopr_crypto_types::crypto_traits::Randomizable;
    use hopr_lib::{ApplicationData, HoprPseudonym};
    use hopr_network_types::prelude::DestinationRouting;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::*;

    /// Creates a loopback transport for a test Session.
    ///
    /// A background task forwards the plain text of every `ApplicationData` sent into the
    /// returned sender to the returned receiver, ignoring the routing information.
    fn loopback_transport() -> (
        UnboundedSender<(DestinationRouting, ApplicationData)>,
        UnboundedReceiver<Box<[u8]>>,
    ) {
        let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationData)>();
        let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
        tokio::task::spawn(
            input_rx
                .map(|(_, data)| Ok(data.plain_text))
                .forward(output_tx)
                .map(|e| tracing::debug!(?e, "loopback transport completed")),
        );

        (input_tx, output_rx)
    }

    /// Binds a loopback Session to an accepted TCP connection, writes several chunks into
    /// the socket and reads the same number of bytes back.
    ///
    /// NOTE(review): only the amount of data read back is checked, not its content.
    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
        let session = hopr_lib::HoprSession::new(
            session_id,
            hopr_lib::DestinationRouting::forward_only(
                peer,
                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
            ),
            None,
            loopback_transport(),
            None,
        )?;

        // Port 0 lets the OS pick a free port; the actual address is returned in `bound_addr`.
        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0)).await.context("listen_on failed")?;

        // Serve exactly one incoming connection and bridge it with the Session.
        tokio::task::spawn(async move {
            match tcp_listener.accept().await {
                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
                Err(e) => error!("failed to accept connection: {e}"),
            }
        });

        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
            .await
            .context("connect failed")?;

        let data = vec![b"hello", b"world", b"this ", b"is ", b"    a", b" test"];

        for d in data.clone().into_iter() {
            tcp_stream.write_all(d).await.context("write failed")?;
        }

        // Read back as many bytes as were written, chunk by chunk.
        for d in data.iter() {
            let mut buf = vec![0; d.len()];
            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        Ok(())
    }

    /// Binds a loopback Session to a UDP stream, sends several chunks to it from a second
    /// UDP stream and reads the same number of bytes back, then aborts the transfer task.
    ///
    /// NOTE(review): only the amount of data read back is checked, not its content.
    #[test_log::test(tokio::test)]
    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session_id = hopr_lib::HoprSessionId::new(4567u64, HoprPseudonym::random());
        let peer: hopr_lib::Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
        let session = hopr_lib::HoprSession::new(
            session_id,
            hopr_lib::DestinationRouting::forward_only(
                peer,
                hopr_lib::RoutingOptions::IntermediatePath(Default::default()),
            ),
            None,
            loopback_transport(),
            None,
        )?;

        // Port 0 lets the OS pick a free port; the actual address is returned in `listen_addr`.
        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0)).await.context("udp_bind_to failed")?;

        // The abort handle is used at the end of the test to stop the transfer task.
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let jh = tokio::task::spawn(bind_session_to_stream(
            session,
            udp_listener,
            ApplicationData::PAYLOAD_SIZE,
            Some(abort_registration),
        ));

        // Client side: a second UDP stream with the listener as its counterparty.
        let mut udp_stream = ConnectedUdpStream::builder()
            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
            .with_counterparty(listen_addr)
            .build(("127.0.0.1", 0))
            .context("bind failed")?;

        let data = vec![b"hello", b"world", b"this ", b"is ", b"    a", b" test"];

        for d in data.clone().into_iter() {
            udp_stream.write_all(d).await.context("write failed")?;
        }

        // Read back as many bytes as were written, chunk by chunk.
        for d in data.iter() {
            let mut buf = vec![0; d.len()];
            udp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        // Stop the transfer and require the task to finish within 200 ms;
        // the double `?` surfaces both a timeout and a task join error.
        abort_handle.abort();
        jh.timeout(futures_time::time::Duration::from_millis(200)).await??;

        Ok(())
    }

    /// Checks how `build_binding_host` combines a requested host string with the default address.
    #[test]
    fn test_build_binding_address() {
        let default = "10.0.0.1:10000".parse().unwrap();

        // A full `ip:port` is taken as is.
        let result = build_binding_host(Some("127.0.0.1:10000"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        // No request yields the default.
        let result = build_binding_host(None, default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        // Only an IP address: the port comes from the default.
        let result = build_binding_host(Some("127.0.0.1"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        // Only a port: the IP address comes from the default.
        let result = build_binding_host(Some(":1234"), default);
        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());

        // A lone separator or an empty string both yield the default.
        let result = build_binding_host(Some(":"), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(""), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
    }
}