1use std::{
8 collections::VecDeque, fmt::Formatter, future::Future, hash::Hash, net::SocketAddr, num::NonZeroUsize,
9 str::FromStr, sync::Arc,
10};
11
12use anyhow::anyhow;
13use base64::Engine;
14use bytesize::ByteSize;
15use dashmap::DashMap;
16use futures::{
17 FutureExt, StreamExt, TryStreamExt,
18 future::{AbortHandle, AbortRegistration},
19};
20use hopr_api::{chain::HoprChainApi, db::HoprNodeDbApi};
21use hopr_async_runtime::Abortable;
22use hopr_lib::{
23 Address, Hopr, HoprSession, SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget,
24 errors::HoprLibError, transfer_session,
25};
26use hopr_network_types::{
27 prelude::{ConnectedUdpStream, IpOrHost, IpProtocol, SealedHost, UdpStreamParallelism},
28 udp::ForeignDataMode,
29};
30use hopr_transport::RoutingOptions;
31use human_bandwidth::re::bandwidth::Bandwidth;
32use serde::{Deserialize, Serialize};
33use serde_with::serde_as;
34use tokio::net::TcpListener;
35use tracing::{debug, error, info};
36
37pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;
39
40pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;
42
43pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;
45
46#[cfg(all(feature = "prometheus", not(test)))]
47lazy_static::lazy_static! {
48 static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
49 "hopr_session_hoprd_clients",
50 "Number of clients connected at this Entry node",
51 &["type"]
52 ).unwrap();
53}
54
55#[serde_as]
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
57pub enum SessionTargetSpec {
59 Plain(String),
60 Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
61 Service(ServiceId),
62}
63
64impl std::fmt::Display for SessionTargetSpec {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 match self {
67 SessionTargetSpec::Plain(t) => write!(f, "{t}"),
68 SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
69 SessionTargetSpec::Service(t) => write!(f, "#{t}"),
70 }
71 }
72}
73
74impl FromStr for SessionTargetSpec {
75 type Err = HoprLibError;
76
77 fn from_str(s: &str) -> Result<Self, Self::Err> {
78 Ok(if let Some(stripped) = s.strip_prefix("$$") {
79 Self::Sealed(
80 base64::prelude::BASE64_URL_SAFE
81 .decode(stripped)
82 .map_err(|e| HoprLibError::Other(e.into()))?,
83 )
84 } else if let Some(stripped) = s.strip_prefix("#") {
85 Self::Service(
86 stripped
87 .parse()
88 .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
89 )
90 } else {
91 Self::Plain(s.to_owned())
92 })
93 }
94}
95
96impl SessionTargetSpec {
97 pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
98 Ok(match (protocol, self) {
99 (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => {
100 SessionTarget::TcpStream(IpOrHost::from_str(&plain).map(SealedHost::from)?)
101 }
102 (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => {
103 SessionTarget::UdpStream(IpOrHost::from_str(&plain).map(SealedHost::from)?)
104 }
105 (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
106 SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
107 }
108 (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
109 SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
110 }
111 (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
112 })
113 }
114}
115
116#[derive(Debug)]
118pub struct StoredSessionEntry {
119 pub destination: Address,
121 pub target: SessionTargetSpec,
123 pub forward_path: RoutingOptions,
125 pub return_path: RoutingOptions,
127 pub max_client_sessions: usize,
129 pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
131 pub response_buffer: Option<bytesize::ByteSize>,
134 pub session_pool: Option<usize>,
136 pub abort_handle: AbortHandle,
138
139 clients: Arc<DashMap<SessionId, (SocketAddr, AbortHandle)>>,
140}
141
142impl StoredSessionEntry {
143 pub fn get_clients(&self) -> &Arc<DashMap<SessionId, (SocketAddr, AbortHandle)>> {
144 &self.clients
145 }
146}
147
148pub fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
153 match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
154 Some(Err(requested)) => {
155 debug!(requested, %default, "using partially default listen host");
157 std::net::SocketAddr::new(
158 requested.parse().unwrap_or(default.ip()),
159 requested
160 .strip_prefix(":")
161 .and_then(|p| u16::from_str(p).ok())
162 .unwrap_or(default.port()),
163 )
164 }
165 Some(Ok(requested)) => {
166 debug!(%requested, "using requested listen host");
167 requested
168 }
169 None => {
170 debug!(%default, "using default listen host");
171 default
172 }
173 }
174}
175
176#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
177pub struct ListenerId(pub IpProtocol, pub std::net::SocketAddr);
178
179impl std::fmt::Display for ListenerId {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 write!(f, "{}://{}:{}", self.0, self.1.ip(), self.1.port())
182 }
183}
184
185#[derive(Default)]
186pub struct ListenerJoinHandles(pub DashMap<ListenerId, StoredSessionEntry>);
187
188impl Abortable for ListenerJoinHandles {
189 fn abort_task(&self) {
190 self.0.alter_all(|_, v| {
191 v.abort_handle.abort();
192 v
193 });
194 }
195
196 fn was_aborted(&self) -> bool {
197 self.0.iter().all(|v| v.abort_handle.is_aborted())
198 }
199}
200
201pub struct SessionPool {
202 pool: Option<Arc<parking_lot::Mutex<VecDeque<HoprSession>>>>,
203 ah: Option<AbortHandle>,
204}
205
206impl SessionPool {
207 pub const MAX_SESSION_POOL_SIZE: usize = 5;
208
209 pub async fn new<Chain, Db>(
210 size: usize,
211 dst: Address,
212 target: SessionTarget,
213 cfg: SessionClientConfig,
214 hopr: Arc<Hopr<Chain, Db>>,
215 ) -> Result<Self, anyhow::Error>
216 where
217 Chain: HoprChainApi + Clone + Send + Sync + 'static,
218 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
219 {
220 let pool = Arc::new(parking_lot::Mutex::new(VecDeque::with_capacity(size)));
221 let hopr_clone = hopr.clone();
222 let pool_clone = pool.clone();
223 futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
224 .map(Ok)
225 .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
226 let pool = pool_clone.clone();
227 let hopr = hopr_clone.clone();
228 let target = target.clone();
229 let cfg = cfg.clone();
230 async move {
231 match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
232 Ok(s) => {
233 debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
234 pool.lock().push_back(s);
235 Ok(())
236 }
237 Err(error) => {
238 error!(%error, num_session = i, "failed to establish session for pool");
239 Err(anyhow!("failed to establish session #{i} in pool to {dst}: {error}"))
240 }
241 }
242 }
243 })
244 .await?;
245
246 if !pool.lock().is_empty() {
248 let pool_clone_1 = pool.clone();
249 let pool_clone_2 = pool.clone();
250 let pool_clone_3 = pool.clone();
251 Ok(Self {
252 pool: Some(pool),
253 ah: Some(hopr_async_runtime::spawn_as_abortable!(
254 futures_time::stream::interval(futures_time::time::Duration::from(
255 std::time::Duration::from_secs(1).max(hopr.config().protocol.session.idle_timeout / 2)
256 ))
257 .take_while(move |_| {
258 futures::future::ready(!pool_clone_1.lock().is_empty())
260 })
261 .flat_map(move |_| {
262 let ids = pool_clone_2.lock().iter().map(|s| *s.id()).collect::<Vec<_>>();
264 futures::stream::iter(ids)
265 })
266 .for_each(move |id| {
267 let hopr = hopr.clone();
268 let pool = pool_clone_3.clone();
269 async move {
270 if let Err(error) = hopr.keep_alive_session(&id).await {
272 error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
273 pool.lock().retain(|s| *s.id() != id);
274 }
275 }
276 })
277 ))
278 })
279 } else {
280 Ok(Self { pool: None, ah: None })
281 }
282 }
283
284 pub fn pop(&mut self) -> Option<HoprSession> {
285 self.pool.as_ref().and_then(|pool| pool.lock().pop_front())
286 }
287}
288
289impl Drop for SessionPool {
290 fn drop(&mut self) {
291 if let Some(ah) = self.ah.take() {
292 ah.abort();
293 }
294 }
295}
296
297#[allow(clippy::too_many_arguments)]
298pub async fn create_tcp_client_binding<Chain, Db>(
299 bind_host: std::net::SocketAddr,
300 port_range: Option<String>,
301 hopr: Arc<Hopr<Chain, Db>>,
302 open_listeners: Arc<ListenerJoinHandles>,
303 destination: Address,
304 target_spec: SessionTargetSpec,
305 config: SessionClientConfig,
306 use_session_pool: Option<usize>,
307 max_client_sessions: Option<usize>,
308) -> Result<(std::net::SocketAddr, Option<SessionId>, usize), BindError>
309where
310 Chain: HoprChainApi + Clone + Send + Sync + 'static,
311 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
312{
313 let (bound_host, tcp_listener) = tcp_listen_on(bind_host, port_range).await.map_err(|e| {
315 if e.kind() == std::io::ErrorKind::AddrInUse {
316 BindError::ListenHostAlreadyUsed
317 } else {
318 BindError::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}"))
319 }
320 })?;
321 info!(%bound_host, "TCP session listener bound");
322
323 let target = target_spec
326 .clone()
327 .into_target(IpProtocol::TCP)
328 .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
329
330 let session_pool_size = use_session_pool.unwrap_or(0);
332 let mut session_pool = SessionPool::new(
333 session_pool_size,
334 destination,
335 target.clone(),
336 config.clone(),
337 hopr.clone(),
338 )
339 .await
340 .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
341
342 let active_sessions = Arc::new(DashMap::new());
343 let mut max_clients = max_client_sessions.unwrap_or(5).max(1);
344
345 if max_clients < session_pool_size {
346 max_clients = session_pool_size;
347 }
348
349 let config_clone = config.clone();
350 let (abort_handle, abort_reg) = AbortHandle::new_pair();
352 let active_sessions_clone = active_sessions.clone();
353 hopr_async_runtime::prelude::spawn(async move {
354 let active_sessions_clone_2 = active_sessions_clone.clone();
355
356 futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
357 .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
358 .for_each(move |accepted_client| {
359 let data = config_clone.clone();
360 let target = target.clone();
361 let hopr = hopr.clone();
362 let active_sessions = active_sessions_clone_2.clone();
363
364 let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
366 async move {
367 match accepted_client {
368 Ok((sock_addr, mut stream)) => {
369 debug!(?sock_addr, "incoming TCP connection");
370
371 if active_sessions.len() >= max_clients {
374 error!(?bind_host, "no more client slots available at listener");
375 use tokio::io::AsyncWriteExt;
376 if let Err(error) = stream.shutdown().await {
377 error!(%error, ?sock_addr, "failed to shutdown TCP connection");
378 }
379 return;
380 }
381
382 let session = match maybe_pooled_session {
384 Some(s) => {
385 debug!(session_id = %s.id(), "using pooled session");
386 s
387 }
388 None => {
389 debug!("no more active sessions in the pool, creating a new one");
390 match hopr.connect_to(destination, target, data).await {
391 Ok(s) => s,
392 Err(error) => {
393 error!(%error, "failed to establish session");
394 return;
395 }
396 }
397 }
398 };
399
400 let session_id = *session.id();
401 debug!(?sock_addr, %session_id, "new session for incoming TCP connection");
402
403 let (abort_handle, abort_reg) = AbortHandle::new_pair();
404 active_sessions.insert(session_id, (sock_addr, abort_handle));
405
406 #[cfg(all(feature = "prometheus", not(test)))]
407 METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);
408
409 hopr_async_runtime::prelude::spawn(
410 bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
413 move |_| async move {
414 active_sessions.remove(&session_id);
417
418 debug!(%session_id, "tcp session has ended");
419
420 #[cfg(all(feature = "prometheus", not(test)))]
421 METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
422 },
423 ),
424 );
425 }
426 Err(error) => error!(%error, "failed to accept connection"),
427 }
428 }
429 })
430 .await;
431
432 active_sessions_clone.iter().for_each(|entry| {
434 let (sock_addr, handle) = entry.value();
435 debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
436 handle.abort()
437 });
438 });
439
440 open_listeners.0.insert(
441 ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
442 StoredSessionEntry {
443 destination,
444 target: target_spec,
445 forward_path: config.forward_path_options,
446 return_path: config.return_path_options,
447 clients: active_sessions,
448 max_client_sessions: max_clients,
449 max_surb_upstream: config
450 .surb_management
451 .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
452 response_buffer: config
453 .surb_management
454 .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
455 session_pool: Some(session_pool_size),
456 abort_handle,
457 },
458 );
459 Ok((bound_host, None, max_clients))
460}
461
462#[derive(Debug, thiserror::Error)]
463pub enum BindError {
464 #[error("conflict detected: listen host already in use")]
465 ListenHostAlreadyUsed,
466
467 #[error("unknown failure: {0}")]
468 UnknownFailure(String),
469}
470
471pub async fn create_udp_client_binding<Chain, Db>(
472 bind_host: std::net::SocketAddr,
473 port_range: Option<String>,
474 hopr: Arc<Hopr<Chain, Db>>,
475 open_listeners: Arc<ListenerJoinHandles>,
476 destination: Address,
477 target_spec: SessionTargetSpec,
478 config: SessionClientConfig,
479) -> Result<(std::net::SocketAddr, Option<SessionId>, usize), BindError>
480where
481 Chain: HoprChainApi + Clone + Send + Sync + 'static,
482 Db: HoprNodeDbApi + Clone + Send + Sync + 'static,
483{
484 let (bound_host, udp_socket) = udp_bind_to(bind_host, port_range).await.map_err(|e| {
486 if e.kind() == std::io::ErrorKind::AddrInUse {
487 BindError::ListenHostAlreadyUsed
488 } else {
489 BindError::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}"))
490 }
491 })?;
492
493 info!(%bound_host, "UDP session listener bound");
494
495 let target = target_spec
496 .clone()
497 .into_target(IpProtocol::UDP)
498 .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
499
500 let session = hopr
502 .connect_to(destination, target, config.clone())
503 .await
504 .map_err(|e| BindError::UnknownFailure(e.to_string()))?;
505
506 let open_listeners_clone = open_listeners.clone();
507 let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);
508
509 let (abort_handle, abort_reg) = AbortHandle::new_pair();
520 let clients = Arc::new(DashMap::new());
521 let max_clients: usize = 1; let session_id = *session.id();
525 clients.insert(session_id, (bind_host, abort_handle.clone()));
526 hopr_async_runtime::prelude::spawn(async move {
527 #[cfg(all(feature = "prometheus", not(test)))]
528 METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);
529
530 bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;
531
532 #[cfg(all(feature = "prometheus", not(test)))]
533 METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);
534
535 open_listeners_clone.0.remove(&listener_id);
537 });
538
539 open_listeners.0.insert(
540 listener_id,
541 StoredSessionEntry {
542 destination,
543 target: target_spec,
544 forward_path: config.forward_path_options.clone(),
545 return_path: config.return_path_options.clone(),
546 max_client_sessions: max_clients,
547 max_surb_upstream: config
548 .surb_management
549 .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
550 response_buffer: config
551 .surb_management
552 .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
553 session_pool: None,
554 abort_handle,
555 clients,
556 },
557 );
558 Ok((bound_host, Some(session_id), max_clients))
559}
560
561async fn try_restricted_bind<F, S, Fut>(
562 addrs: Vec<std::net::SocketAddr>,
563 range_str: &str,
564 binder: F,
565) -> std::io::Result<S>
566where
567 F: Fn(Vec<std::net::SocketAddr>) -> Fut,
568 Fut: Future<Output = std::io::Result<S>>,
569{
570 if addrs.is_empty() {
571 return Err(std::io::Error::other("no valid socket addresses found"));
572 }
573
574 let range = range_str
575 .split_once(":")
576 .and_then(
577 |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
578 Ok((a, b)) if a <= b => Some(a..=b),
579 _ => None,
580 },
581 )
582 .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;
583
584 for port in range {
585 let addrs = addrs
586 .iter()
587 .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
588 .collect::<Vec<_>>();
589 match binder(addrs).await {
590 Ok(listener) => return Ok(listener),
591 Err(error) => debug!(%error, "listen address not usable"),
592 }
593 }
594
595 Err(std::io::Error::new(
596 std::io::ErrorKind::AddrNotAvailable,
597 format!("no valid socket addresses found within range: {range_str}"),
598 ))
599}
600
601async fn tcp_listen_on<A: std::net::ToSocketAddrs>(
603 address: A,
604 port_range: Option<String>,
605) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
606 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
607
608 if addrs.iter().all(|a| a.port() == 0)
611 && let Some(range_str) = port_range
612 {
613 let tcp_listener = try_restricted_bind(
614 addrs,
615 &range_str,
616 |a| async move { TcpListener::bind(a.as_slice()).await },
617 )
618 .await?;
619 return Ok((tcp_listener.local_addr()?, tcp_listener));
620 }
621
622 let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
623 Ok((tcp_listener.local_addr()?, tcp_listener))
624}
625
626pub async fn udp_bind_to<A: std::net::ToSocketAddrs>(
627 address: A,
628 port_range: Option<String>,
629) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
630 let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();
631
632 let builder = ConnectedUdpStream::builder()
633 .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
634 .with_foreign_data_mode(ForeignDataMode::Discard) .with_queue_size(HOPR_UDP_QUEUE_SIZE)
636 .with_receiver_parallelism(
637 std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
638 .ok()
639 .and_then(|s| s.parse::<NonZeroUsize>().ok())
640 .map(UdpStreamParallelism::Specific)
641 .unwrap_or(UdpStreamParallelism::Auto),
642 );
643
644 if addrs.iter().all(|a| a.port() == 0)
647 && let Some(range_str) = port_range
648 {
649 let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
650 futures::future::ready(builder.clone().build(addrs.as_slice()))
651 })
652 .await?;
653
654 return Ok((*udp_listener.bound_address(), udp_listener));
655 }
656
657 let udp_socket = builder.build(address)?;
658 Ok((*udp_socket.bound_address(), udp_socket))
659}
660
661async fn bind_session_to_stream<T>(
662 mut session: HoprSession,
663 mut stream: T,
664 max_buf: usize,
665 abort_reg: Option<AbortRegistration>,
666) where
667 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
668{
669 let session_id = *session.id();
670 match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
671 Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
672 session_id = ?session_id,
673 session_to_stream_bytes, stream_to_session_bytes, "client session ended",
674 ),
675 Err(error) => error!(
676 session_id = ?session_id,
677 %error,
678 "error during data transfer"
679 ),
680 }
681}
682
683#[cfg(test)]
684mod tests {
685 use anyhow::Context;
686 use futures::{
687 FutureExt, StreamExt,
688 channel::mpsc::{UnboundedReceiver, UnboundedSender},
689 };
690 use futures_time::future::FutureExt as TimeFutureExt;
691 use hopr_crypto_types::crypto_traits::Randomizable;
692 use hopr_lib::{
693 Address, ApplicationData, ApplicationDataIn, ApplicationDataOut, DestinationRouting, HoprPseudonym,
694 HoprSession, RoutingOptions, SessionId,
695 };
696 use tokio::io::{AsyncReadExt, AsyncWriteExt};
697
698 use super::*;
699
700 fn loopback_transport() -> (
701 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
702 UnboundedReceiver<ApplicationDataIn>,
703 ) {
704 let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
705 let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
706 tokio::task::spawn(
707 input_rx
708 .map(|(_, data)| {
709 Ok(ApplicationDataIn {
710 data: data.data,
711 packet_info: Default::default(),
712 })
713 })
714 .forward(output_tx)
715 .map(|e| tracing::debug!(?e, "loopback transport completed")),
716 );
717
718 (input_tx, output_rx)
719 }
720
721 #[tokio::test]
722 async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
723 -> anyhow::Result<()> {
724 let session_id = SessionId::new(4567u64, HoprPseudonym::random());
725 let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
726 let session = HoprSession::new(
727 session_id,
728 DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
729 Default::default(),
730 loopback_transport(),
731 None,
732 )?;
733
734 let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0), None)
735 .await
736 .context("listen_on failed")?;
737
738 tokio::task::spawn(async move {
739 match tcp_listener.accept().await {
740 Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
741 Err(e) => error!("failed to accept connection: {e}"),
742 }
743 });
744
745 let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
746 .await
747 .context("connect failed")?;
748
749 let data = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];
750
751 for d in data.clone().into_iter() {
752 tcp_stream.write_all(d).await.context("write failed")?;
753 }
754
755 for d in data.iter() {
756 let mut buf = vec![0; d.len()];
757 tcp_stream.read_exact(&mut buf).await.context("read failed")?;
758 }
759
760 Ok(())
761 }
762
763 #[test_log::test(tokio::test)]
764 async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
765 -> anyhow::Result<()> {
766 let session_id = SessionId::new(4567u64, HoprPseudonym::random());
767 let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
768 let session = HoprSession::new(
769 session_id,
770 DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
771 Default::default(),
772 loopback_transport(),
773 None,
774 )?;
775
776 let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0), None)
777 .await
778 .context("udp_bind_to failed")?;
779
780 let (abort_handle, abort_registration) = AbortHandle::new_pair();
781 let jh = tokio::task::spawn(bind_session_to_stream(
782 session,
783 udp_listener,
784 ApplicationData::PAYLOAD_SIZE,
785 Some(abort_registration),
786 ));
787
788 let mut udp_stream = ConnectedUdpStream::builder()
789 .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
790 .with_queue_size(HOPR_UDP_QUEUE_SIZE)
791 .with_counterparty(listen_addr)
792 .build(("127.0.0.1", 0))
793 .context("bind failed")?;
794
795 let data = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];
796
797 for d in data.clone().into_iter() {
798 udp_stream.write_all(d).await.context("write failed")?;
799 }
801
802 for d in data.iter() {
803 let mut buf = vec![0; d.len()];
804 udp_stream.read_exact(&mut buf).await.context("read failed")?;
805 }
806
807 abort_handle.abort();
809 jh.timeout(futures_time::time::Duration::from_millis(200)).await??;
810
811 Ok(())
812 }
813
814 #[test]
815 fn build_binding_address() {
816 let default = "10.0.0.1:10000".parse().unwrap();
817
818 let result = build_binding_host(Some("127.0.0.1:10000"), default);
819 assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
820
821 let result = build_binding_host(None, default);
822 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
823
824 let result = build_binding_host(Some("127.0.0.1"), default);
825 assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
826
827 let result = build_binding_host(Some(":1234"), default);
828 assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());
829
830 let result = build_binding_host(Some(":"), default);
831 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
832
833 let result = build_binding_host(Some(""), default);
834 assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
835 }
836}