use std::{
    collections::{HashMap, VecDeque},
    fmt::Formatter,
    future::Future,
    hash::Hash,
    net::SocketAddr,
    num::NonZeroUsize,
    str::FromStr,
    sync::Arc,
};

use async_lock::RwLock;
use base64::Engine;
use bytesize::ByteSize;
use dashmap::DashMap;
use futures::{
    FutureExt, StreamExt, TryStreamExt,
    future::{AbortHandle, AbortRegistration},
};
use hopr_network_types::{
    prelude::{ConnectedUdpStream, IpOrHost, IpProtocol, SealedHost, UdpStreamParallelism},
    udp::ForeignDataMode,
};
use hopr_transport::RoutingOptions;
use human_bandwidth::re::bandwidth::Bandwidth;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::net::TcpListener;
use tracing::{debug, error, info};

use crate::{
    Address, Hopr, HoprSession, HoprSessionId, SURB_SIZE, ServiceId, SessionClientConfig, SessionTarget,
    errors::HoprLibError, transfer_session,
};

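/// Size (in bytes) of the buffer used when relaying data between an accepted TCP connection and a HOPR session.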
pub const HOPR_TCP_BUFFER_SIZE: usize = 4096;

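/// Size (in bytes) of the buffer used when relaying data between a bound UDP socket and a HOPR session.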
pub const HOPR_UDP_BUFFER_SIZE: usize = 16384;

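/// Queue size used for UDP sockets created by this module.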
pub const HOPR_UDP_QUEUE_SIZE: usize = 8192;

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_CLIENTS: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_session_hoprd_clients",
        "Number of clients connected at this Entry node",
        &["type"]
    ).unwrap();
}

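/// Specification of a session target parsed from user input.
///
/// Three string forms are recognized by the `FromStr` implementation: a plain host
/// (`host:port`), a sealed (encrypted) host prefixed with `$$` and encoded as URL-safe
/// base64, and an exit-node service prefixed with `#`.
///
/// ```ignore
/// // Illustrative only; mirrors the `FromStr`/`Display` implementations below.
/// let plain: SessionTargetSpec = "example.com:80".parse()?;   // Plain
/// let service: SessionTargetSpec = "#42".parse()?;            // Service(42)
/// let sealed: SessionTargetSpec = "$$aGVsbG8=".parse()?;      // Sealed(b"hello")
/// ```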
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SessionTargetSpec {
    Plain(String),
    Sealed(#[serde_as(as = "serde_with::base64::Base64")] Vec<u8>),
    Service(ServiceId),
}

impl std::fmt::Display for SessionTargetSpec {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            SessionTargetSpec::Plain(t) => write!(f, "{t}"),
            SessionTargetSpec::Sealed(t) => write!(f, "$${}", base64::prelude::BASE64_URL_SAFE.encode(t)),
            SessionTargetSpec::Service(t) => write!(f, "#{t}"),
        }
    }
}

impl FromStr for SessionTargetSpec {
    type Err = HoprLibError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(if let Some(stripped) = s.strip_prefix("$$") {
            Self::Sealed(
                base64::prelude::BASE64_URL_SAFE
                    .decode(stripped)
                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
            )
        } else if let Some(stripped) = s.strip_prefix("#") {
            Self::Service(
                stripped
                    .parse()
                    .map_err(|_| HoprLibError::GeneralError("cannot parse service id".into()))?,
            )
        } else {
            Self::Plain(s.to_owned())
        })
    }
}

impl SessionTargetSpec {
    pub fn into_target(self, protocol: IpProtocol) -> Result<SessionTarget, HoprLibError> {
        Ok(match (protocol, self) {
            (IpProtocol::TCP, SessionTargetSpec::Plain(plain)) => SessionTarget::TcpStream(
                IpOrHost::from_str(&plain)
                    .map(SealedHost::from)
                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
            ),
            (IpProtocol::UDP, SessionTargetSpec::Plain(plain)) => SessionTarget::UdpStream(
                IpOrHost::from_str(&plain)
                    .map(SealedHost::from)
                    .map_err(|e| HoprLibError::GeneralError(e.to_string()))?,
            ),
            (IpProtocol::TCP, SessionTargetSpec::Sealed(enc)) => {
                SessionTarget::TcpStream(SealedHost::Sealed(enc.into_boxed_slice()))
            }
            (IpProtocol::UDP, SessionTargetSpec::Sealed(enc)) => {
                SessionTarget::UdpStream(SealedHost::Sealed(enc.into_boxed_slice()))
            }
            (_, SessionTargetSpec::Service(id)) => SessionTarget::ExitNode(id),
        })
    }
}

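/// Entry stored for each open client listener, describing where and how its sessions are routed.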
#[derive(Debug)]
pub struct StoredSessionEntry {
    /// Destination address of the session counterparty (the exit node).
    pub destination: Address,
    /// Target of the session at the exit node.
    pub target: SessionTargetSpec,
    /// Routing options for the forward path.
    pub forward_path: RoutingOptions,
    /// Routing options for the return path.
    pub return_path: RoutingOptions,
    /// Maximum number of client sessions that may be open on this listener at the same time.
    pub max_client_sessions: usize,
    /// Maximum upstream SURB bandwidth, if configured.
    pub max_surb_upstream: Option<human_bandwidth::re::bandwidth::Bandwidth>,
    /// Target size of the response buffer, if configured.
    pub response_buffer: Option<bytesize::ByteSize>,
    /// Size of the pre-established session pool, if one is used.
    pub session_pool: Option<usize>,
    /// Handle used to abort the listener task.
    pub abort_handle: AbortHandle,

    clients: Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>>,
}

impl StoredSessionEntry {
    pub fn get_clients(&self) -> &Arc<DashMap<HoprSessionId, (SocketAddr, AbortHandle)>> {
        &self.clients
    }
}

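/// Builds the socket address a listener should bind to from an optional user-supplied string.
///
/// If `requested` does not parse as a full `SocketAddr`, the missing parts are taken from
/// `default`: an IP-only value keeps the default port, a `:port` value keeps the default IP.
///
/// ```ignore
/// // Illustrative only; mirrors the `build_binding_address` unit test below.
/// let default = "10.0.0.1:10000".parse().unwrap();
/// assert_eq!(build_binding_host(Some(":1234"), default), "10.0.0.1:1234".parse().unwrap());
/// assert_eq!(build_binding_host(Some("127.0.0.1"), default), "127.0.0.1:10000".parse().unwrap());
/// ```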
pub fn build_binding_host(requested: Option<&str>, default: std::net::SocketAddr) -> std::net::SocketAddr {
    match requested.map(|r| std::net::SocketAddr::from_str(r).map_err(|_| r)) {
        Some(Err(requested)) => {
            debug!(requested, %default, "using partially default listen host");
            std::net::SocketAddr::new(
                requested.parse().unwrap_or(default.ip()),
                requested
                    .strip_prefix(":")
                    .and_then(|p| u16::from_str(p).ok())
                    .unwrap_or(default.port()),
            )
        }
        Some(Ok(requested)) => {
            debug!(%requested, "using requested listen host");
            requested
        }
        None => {
            debug!(%default, "using default listen host");
            default
        }
    }
}

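/// Identifier of an open listener: the transport protocol and the locally bound socket address.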
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct ListenerId(pub IpProtocol, pub std::net::SocketAddr);

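/// Shared map of all open listeners and their stored session entries.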
pub type ListenerJoinHandles = Arc<RwLock<HashMap<ListenerId, StoredSessionEntry>>>;

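/// Pool of pre-established sessions to a fixed destination and target.
///
/// The pool is filled on construction (up to [`SessionPool::MAX_SESSION_POOL_SIZE`]) and a background
/// task periodically sends keep-alives for the pooled sessions, removing any that are no longer alive.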
pub struct SessionPool {
    pool: Option<Arc<std::sync::Mutex<VecDeque<HoprSession>>>>,
    ah: Option<AbortHandle>,
}

impl SessionPool {
    pub const MAX_SESSION_POOL_SIZE: usize = 5;

    pub async fn new(
        size: usize,
        dst: Address,
        target: SessionTarget,
        cfg: SessionClientConfig,
        hopr: Arc<Hopr>,
    ) -> Result<Self, String> {
        let pool = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(size)));
        let hopr_clone = hopr.clone();
        let pool_clone = pool.clone();
        futures::stream::iter(0..size.min(Self::MAX_SESSION_POOL_SIZE))
            .map(Ok)
            .try_for_each_concurrent(Self::MAX_SESSION_POOL_SIZE, move |i| {
                let pool = pool_clone.clone();
                let hopr = hopr_clone.clone();
                let target = target.clone();
                let cfg = cfg.clone();
                async move {
                    match hopr.connect_to(dst, target.clone(), cfg.clone()).await {
                        Ok(s) => {
                            debug!(session_id = %s.id(), num_session = i, "created a new session in pool");
                            pool.lock().map_err(|_| "lock failed".to_string())?.push_back(s);
                            Ok(())
                        }
                        Err(error) => {
                            error!(%error, num_session = i, "failed to establish session for pool");
                            Err(format!("failed to establish session #{i} in pool to {dst}: {error}"))
                        }
                    }
                }
            })
            .await?;

        if !pool.lock().map(|p| p.is_empty()).unwrap_or(true) {
            let pool_clone_1 = pool.clone();
            let pool_clone_2 = pool.clone();
            let pool_clone_3 = pool.clone();
            Ok(Self {
                pool: Some(pool),
                ah: Some(hopr_async_runtime::spawn_as_abortable!(
                    futures_time::stream::interval(futures_time::time::Duration::from(
                        std::time::Duration::from_secs(1).max(hopr.config().session.idle_timeout / 2)
                    ))
                    .take_while(move |_| {
                        futures::future::ready(pool_clone_1.lock().is_ok_and(|p| !p.is_empty()))
                    })
                    .flat_map(move |_| {
                        let ids = pool_clone_2
                            .lock()
                            .ok()
                            .map(|v| v.iter().map(|s| *s.id()).collect::<Vec<_>>());
                        futures::stream::iter(ids.into_iter().flatten())
                    })
                    .for_each(move |id| {
                        let hopr = hopr.clone();
                        let pool = pool_clone_3.clone();
                        async move {
                            if let Err(error) = hopr.keep_alive_session(&id).await {
                                error!(%error, %dst, session_id = %id, "session in pool is not alive, removing from pool");
                                if let Ok(mut pool) = pool.lock() {
                                    pool.retain(|s| *s.id() != id);
                                }
                            }
                        }
                    })
                ))
            })
        } else {
            Ok(Self { pool: None, ah: None })
        }
    }

    pub fn pop(&mut self) -> Option<HoprSession> {
        self.pool.as_ref().and_then(|pool| pool.lock().ok()?.pop_front())
    }
}

impl Drop for SessionPool {
    fn drop(&mut self) {
        if let Some(ah) = self.ah.take() {
            ah.abort();
        }
    }
}

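/// Binds a TCP listener on `bind_host` (optionally restricted to `port_range`) and, for each accepted
/// TCP connection, takes a session from the optional session pool or opens a new HOPR session to
/// `destination`, relaying data between the two. The listener is registered in `open_listeners`.
///
/// Returns the bound address, `None` for the session id (sessions are created per client) and the
/// effective maximum number of concurrent client sessions.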
#[allow(clippy::too_many_arguments)]
pub async fn create_tcp_client_binding(
    bind_host: std::net::SocketAddr,
    port_range: Option<String>,
    hopr: Arc<Hopr>,
    open_listeners: ListenerJoinHandles,
    destination: Address,
    target_spec: SessionTargetSpec,
    config: SessionClientConfig,
    use_session_pool: Option<usize>,
    max_client_sessions: Option<usize>,
) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), BindError> {
    let (bound_host, tcp_listener) = tcp_listen_on(bind_host, port_range).await.map_err(|e| {
        if e.kind() == std::io::ErrorKind::AddrInUse {
            BindError::ListenHostAlreadyUsed
        } else {
            BindError::UnknownFailure(format!("failed to start TCP listener on {bind_host}: {e}"))
        }
    })?;
    info!(%bound_host, "TCP session listener bound");

    let target = target_spec
        .clone()
        .into_target(IpProtocol::TCP)
        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;

    let session_pool_size = use_session_pool.unwrap_or(0);
    let mut session_pool = SessionPool::new(
        session_pool_size,
        destination,
        target.clone(),
        config.clone(),
        hopr.clone(),
    )
    .await
    .map_err(|e| BindError::UnknownFailure(e.to_string()))?;

    let active_sessions = Arc::new(DashMap::new());
    let mut max_clients = max_client_sessions.unwrap_or(5).max(1);

    if max_clients < session_pool_size {
        max_clients = session_pool_size;
    }

    let config_clone = config.clone();
    let (abort_handle, abort_reg) = AbortHandle::new_pair();
    let active_sessions_clone = active_sessions.clone();
    hopr_async_runtime::prelude::spawn(async move {
        let active_sessions_clone_2 = active_sessions_clone.clone();

        futures::stream::Abortable::new(tokio_stream::wrappers::TcpListenerStream::new(tcp_listener), abort_reg)
            .and_then(|sock| async { Ok((sock.peer_addr()?, sock)) })
            .for_each(move |accepted_client| {
                let data = config_clone.clone();
                let target = target.clone();
                let hopr = hopr.clone();
                let active_sessions = active_sessions_clone_2.clone();

                let maybe_pooled_session = accepted_client.is_ok().then(|| session_pool.pop()).flatten();
                async move {
                    match accepted_client {
                        Ok((sock_addr, mut stream)) => {
                            debug!(?sock_addr, "incoming TCP connection");

                            if active_sessions.len() >= max_clients {
                                error!(?bind_host, "no more client slots available at listener");
                                use tokio::io::AsyncWriteExt;
                                if let Err(error) = stream.shutdown().await {
                                    error!(%error, ?sock_addr, "failed to shutdown TCP connection");
                                }
                                return;
                            }

                            let session = match maybe_pooled_session {
                                Some(s) => {
                                    debug!(session_id = %s.id(), "using pooled session");
                                    s
                                }
                                None => {
                                    debug!("no more active sessions in the pool, creating a new one");
                                    match hopr.connect_to(destination, target, data).await {
                                        Ok(s) => s,
                                        Err(error) => {
                                            error!(%error, "failed to establish session");
                                            return;
                                        }
                                    }
                                }
                            };

                            let session_id = *session.id();
                            debug!(?sock_addr, %session_id, "new session for incoming TCP connection");

                            let (abort_handle, abort_reg) = AbortHandle::new_pair();
                            active_sessions.insert(session_id, (sock_addr, abort_handle));

                            #[cfg(all(feature = "prometheus", not(test)))]
                            METRIC_ACTIVE_CLIENTS.increment(&["tcp"], 1.0);

                            hopr_async_runtime::prelude::spawn(
                                bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, Some(abort_reg)).then(
                                    move |_| async move {
                                        active_sessions.remove(&session_id);

                                        debug!(%session_id, "tcp session has ended");

                                        #[cfg(all(feature = "prometheus", not(test)))]
                                        METRIC_ACTIVE_CLIENTS.decrement(&["tcp"], 1.0);
                                    },
                                ),
                            );
                        }
                        Err(error) => error!(%error, "failed to accept connection"),
                    }
                }
            })
            .await;

        active_sessions_clone.iter().for_each(|entry| {
            let (sock_addr, handle) = entry.value();
            debug!(session_id = %entry.key(), ?sock_addr, "aborting opened TCP session after listener has been closed");
            handle.abort()
        });
    });

    open_listeners.write_arc().await.insert(
        ListenerId(hopr_network_types::types::IpProtocol::TCP, bound_host),
        StoredSessionEntry {
            destination,
            target: target_spec,
            forward_path: config.forward_path_options,
            return_path: config.return_path_options,
            clients: active_sessions,
            max_client_sessions: max_clients,
            max_surb_upstream: config
                .surb_management
                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
            response_buffer: config
                .surb_management
                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
            session_pool: Some(session_pool_size),
            abort_handle,
        },
    );
    Ok((bound_host, None, max_clients))
}

#[derive(Debug, thiserror::Error)]
pub enum BindError {
    #[error("conflict detected: listen host already in use")]
    ListenHostAlreadyUsed,

    #[error("unknown failure: {0}")]
    UnknownFailure(String),
}

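/// Binds a UDP socket on `bind_host` (optionally restricted to `port_range`), opens a single HOPR
/// session to `destination` and relays datagrams between the socket and that session until it ends.
/// The listener is registered in `open_listeners` and removed again once the session terminates.
///
/// Returns the bound address, the id of the created session and the maximum number of client
/// sessions (always 1 for UDP bindings).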
pub async fn create_udp_client_binding(
    bind_host: std::net::SocketAddr,
    port_range: Option<String>,
    hopr: Arc<Hopr>,
    open_listeners: ListenerJoinHandles,
    destination: Address,
    target_spec: SessionTargetSpec,
    config: SessionClientConfig,
) -> Result<(std::net::SocketAddr, Option<HoprSessionId>, usize), BindError> {
    let (bound_host, udp_socket) = udp_bind_to(bind_host, port_range).await.map_err(|e| {
        if e.kind() == std::io::ErrorKind::AddrInUse {
            BindError::ListenHostAlreadyUsed
        } else {
            BindError::UnknownFailure(format!("failed to start UDP listener on {bind_host}: {e}"))
        }
    })?;

    info!(%bound_host, "UDP session listener bound");

    let target = target_spec
        .clone()
        .into_target(IpProtocol::UDP)
        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;

    let session = hopr
        .connect_to(destination, target, config.clone())
        .await
        .map_err(|e| BindError::UnknownFailure(e.to_string()))?;

    let open_listeners_clone = open_listeners.clone();
    let listener_id = ListenerId(hopr_network_types::types::IpProtocol::UDP, bound_host);

    let (abort_handle, abort_reg) = AbortHandle::new_pair();
    let clients = Arc::new(DashMap::new());
    let max_clients: usize = 1;
    let session_id = *session.id();
    clients.insert(session_id, (bind_host, abort_handle.clone()));
    hopr_async_runtime::prelude::spawn(async move {
        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_ACTIVE_CLIENTS.increment(&["udp"], 1.0);

        bind_session_to_stream(session, udp_socket, HOPR_UDP_BUFFER_SIZE, Some(abort_reg)).await;

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_ACTIVE_CLIENTS.decrement(&["udp"], 1.0);

        open_listeners_clone.write_arc().await.remove(&listener_id);
    });

    open_listeners.write_arc().await.insert(
        listener_id,
        StoredSessionEntry {
            destination,
            target: target_spec,
            forward_path: config.forward_path_options.clone(),
            return_path: config.return_path_options.clone(),
            max_client_sessions: max_clients,
            max_surb_upstream: config
                .surb_management
                .map(|v| Bandwidth::from_bps(v.max_surbs_per_sec * SURB_SIZE as u64)),
            response_buffer: config
                .surb_management
                .map(|v| ByteSize::b(v.target_surb_buffer_size * SURB_SIZE as u64)),
            session_pool: None,
            abort_handle,
            clients,
        },
    );
    Ok((bound_host, Some(session_id), max_clients))
}

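/// Attempts to bind `binder` on the given addresses, trying every port in `range_str`
/// (formatted as `"min:max"`, inclusive) until one succeeds.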
async fn try_restricted_bind<F, S, Fut>(
    addrs: Vec<std::net::SocketAddr>,
    range_str: &str,
    binder: F,
) -> std::io::Result<S>
where
    F: Fn(Vec<std::net::SocketAddr>) -> Fut,
    Fut: Future<Output = std::io::Result<S>>,
{
    if addrs.is_empty() {
        return Err(std::io::Error::other("no valid socket addresses found"));
    }

    let range = range_str
        .split_once(":")
        .and_then(
            |(a, b)| match u16::from_str(a).and_then(|a| Ok((a, u16::from_str(b)?))) {
                Ok((a, b)) if a <= b => Some(a..=b),
                _ => None,
            },
        )
        .ok_or(std::io::Error::other(format!("invalid port range {range_str}")))?;

    for port in range {
        let addrs = addrs
            .iter()
            .map(|addr| std::net::SocketAddr::new(addr.ip(), port))
            .collect::<Vec<_>>();
        match binder(addrs).await {
            Ok(listener) => return Ok(listener),
            Err(error) => debug!(%error, "listen address not usable"),
        }
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::AddrNotAvailable,
        format!("no valid socket addresses found within range: {range_str}"),
    ))
}

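/// Binds a TCP listener on `address`; if the requested port is 0 and a `port_range` is given,
/// the first free port within that range is used instead.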
async fn tcp_listen_on<A: std::net::ToSocketAddrs>(
    address: A,
    port_range: Option<String>,
) -> std::io::Result<(std::net::SocketAddr, TcpListener)> {
    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();

    if addrs.iter().all(|a| a.port() == 0) {
        if let Some(range_str) = port_range {
            let tcp_listener =
                try_restricted_bind(addrs, &range_str, |a| async move { TcpListener::bind(a.as_slice()).await })
                    .await?;
            return Ok((tcp_listener.local_addr()?, tcp_listener));
        }
    }

    let tcp_listener = TcpListener::bind(addrs.as_slice()).await?;
    Ok((tcp_listener.local_addr()?, tcp_listener))
}

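/// Binds a [`ConnectedUdpStream`] on `address`; if the requested port is 0 and a `port_range` is given,
/// the first free port within that range is used instead.
///
/// Receiver parallelism can be overridden via the `HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM` environment variable.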
pub async fn udp_bind_to<A: std::net::ToSocketAddrs>(
    address: A,
    port_range: Option<String>,
) -> std::io::Result<(std::net::SocketAddr, ConnectedUdpStream)> {
    let addrs = address.to_socket_addrs()?.collect::<Vec<_>>();

    let builder = ConnectedUdpStream::builder()
        .with_buffer_size(HOPR_UDP_BUFFER_SIZE)
        .with_foreign_data_mode(ForeignDataMode::Discard)
        .with_queue_size(HOPR_UDP_QUEUE_SIZE)
        .with_receiver_parallelism(
            std::env::var("HOPRD_SESSION_ENTRY_UDP_RX_PARALLELISM")
                .ok()
                .and_then(|s| s.parse::<NonZeroUsize>().ok())
                .map(UdpStreamParallelism::Specific)
                .unwrap_or(UdpStreamParallelism::Auto),
        );

    if addrs.iter().all(|a| a.port() == 0) {
        if let Some(range_str) = port_range {
            let udp_listener = try_restricted_bind(addrs, &range_str, |addrs| {
                futures::future::ready(builder.clone().build(addrs.as_slice()))
            })
            .await?;

            return Ok((*udp_listener.bound_address(), udp_listener));
        }
    }

    let udp_socket = builder.build(address)?;
    Ok((*udp_socket.bound_address(), udp_socket))
}

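/// Relays data in both directions between the given HOPR `session` and `stream` until one side
/// terminates or the optional abort registration fires, logging the transferred byte counts.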
async fn bind_session_to_stream<T>(
    mut session: HoprSession,
    mut stream: T,
    max_buf: usize,
    abort_reg: Option<AbortRegistration>,
) where
    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let session_id = *session.id();
    match transfer_session(&mut session, &mut stream, max_buf, abort_reg).await {
        Ok((session_to_stream_bytes, stream_to_session_bytes)) => info!(
            session_id = ?session_id,
            session_to_stream_bytes, stream_to_session_bytes, "client session ended",
        ),
        Err(error) => error!(
            session_id = ?session_id,
            %error,
            "error during data transfer"
        ),
    }
}

#[cfg(test)]
mod tests {
    use anyhow::Context;
    use futures::{
        FutureExt, StreamExt,
        channel::mpsc::{UnboundedReceiver, UnboundedSender},
    };
    use futures_time::future::FutureExt as TimeFutureExt;
    use hopr_crypto_types::crypto_traits::Randomizable;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::*;
    use crate::{
        Address, ApplicationData, ApplicationDataIn, ApplicationDataOut, DestinationRouting, HoprPseudonym,
        HoprSession, HoprSessionId, RoutingOptions,
    };

    fn loopback_transport() -> (
        UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
        UnboundedReceiver<ApplicationDataIn>,
    ) {
        let (input_tx, input_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
        let (output_tx, output_rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
        tokio::task::spawn(
            input_rx
                .map(|(_, data)| {
                    Ok(ApplicationDataIn {
                        data: data.data,
                        packet_info: Default::default(),
                    })
                })
                .forward(output_tx)
                .map(|e| tracing::debug!(?e, "loopback transport completed")),
        );

        (input_tx, output_rx)
    }

    #[tokio::test]
    async fn hoprd_session_connection_should_create_a_working_tcp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session_id = HoprSessionId::new(4567u64, HoprPseudonym::random());
        let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
        let session = HoprSession::new(
            session_id,
            DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
            Default::default(),
            loopback_transport(),
            None,
        )?;

        let (bound_addr, tcp_listener) = tcp_listen_on(("127.0.0.1", 0), None)
            .await
            .context("listen_on failed")?;

        tokio::task::spawn(async move {
            match tcp_listener.accept().await {
                Ok((stream, _)) => bind_session_to_stream(session, stream, HOPR_TCP_BUFFER_SIZE, None).await,
                Err(e) => error!("failed to accept connection: {e}"),
            }
        });

        let mut tcp_stream = tokio::net::TcpStream::connect(bound_addr)
            .await
            .context("connect failed")?;

        // Explicit element type so the differently-sized byte-string literals coerce to `&[u8]`.
        let data: Vec<&[u8]> = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];

        for d in data.clone().into_iter() {
            tcp_stream.write_all(d).await.context("write failed")?;
        }

        for d in data.iter() {
            let mut buf = vec![0; d.len()];
            tcp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn hoprd_session_connection_should_create_a_working_udp_socket_through_which_data_can_be_sent_and_received()
    -> anyhow::Result<()> {
        let session_id = HoprSessionId::new(4567u64, HoprPseudonym::random());
        let peer: Address = "0x5112D584a1C72Fc250176B57aEba5fFbbB287D8F".parse()?;
        let session = HoprSession::new(
            session_id,
            DestinationRouting::forward_only(peer, RoutingOptions::IntermediatePath(Default::default())),
            Default::default(),
            loopback_transport(),
            None,
        )?;

        let (listen_addr, udp_listener) = udp_bind_to(("127.0.0.1", 0), None)
            .await
            .context("udp_bind_to failed")?;

        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let jh = tokio::task::spawn(bind_session_to_stream(
            session,
            udp_listener,
            ApplicationData::PAYLOAD_SIZE,
            Some(abort_registration),
        ));

        let mut udp_stream = ConnectedUdpStream::builder()
            .with_buffer_size(ApplicationData::PAYLOAD_SIZE)
            .with_queue_size(HOPR_UDP_QUEUE_SIZE)
            .with_counterparty(listen_addr)
            .build(("127.0.0.1", 0))
            .context("bind failed")?;

        // Explicit element type so the differently-sized byte-string literals coerce to `&[u8]`.
        let data: Vec<&[u8]> = vec![b"hello", b"world", b"this ", b"is ", b" a", b" test"];

        for d in data.clone().into_iter() {
            udp_stream.write_all(d).await.context("write failed")?;
        }

        for d in data.iter() {
            let mut buf = vec![0; d.len()];
            udp_stream.read_exact(&mut buf).await.context("read failed")?;
        }

        abort_handle.abort();
        jh.timeout(futures_time::time::Duration::from_millis(200)).await??;

        Ok(())
    }

    #[test]
    fn build_binding_address() {
        let default = "10.0.0.1:10000".parse().unwrap();

        let result = build_binding_host(Some("127.0.0.1:10000"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(None, default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some("127.0.0.1"), default);
        assert_eq!(result, "127.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(":1234"), default);
        assert_eq!(result, "10.0.0.1:1234".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(":"), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());

        let result = build_binding_host(Some(""), default);
        assert_eq!(result, "10.0.0.1:10000".parse::<std::net::SocketAddr>().unwrap());
    }
}