1use std::{
2 fmt::{Debug, Display, Formatter},
3 hash::{Hash, Hasher},
4 pin::Pin,
5 str::FromStr,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use futures::{SinkExt, StreamExt, TryStreamExt};
11use hopr_internal_types::prelude::HoprPseudonym;
12use hopr_network_types::{
13 prelude::{DestinationRouting, SealedHost},
14 utils::{AsyncWriteSink, DuplexIO},
15};
16use hopr_primitive_types::{
17 errors::GeneralError,
18 prelude::{BytesRepresentable, ToHex},
19};
20use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
21use hopr_protocol_session::{
22 AcknowledgementMode, AcknowledgementState, AcknowledgementStateConfig, ReliableSocket, SessionSocketConfig,
23 UnreliableSocket,
24};
25use hopr_protocol_start::StartProtocol;
26use tracing::{debug, instrument};
27
28use crate::{Capabilities, Capability, errors::TransportSessionError};
29
/// Newtype around [`Capabilities`] that converts to and from a single byte.
///
/// The wrapped value is public, so it can be read directly as `.0`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ByteCapabilities(pub Capabilities);
33
34impl TryFrom<u8> for ByteCapabilities {
35 type Error = GeneralError;
36
37 fn try_from(value: u8) -> Result<Self, Self::Error> {
38 Capabilities::new(value)
39 .map(Self)
40 .map_err(|_| GeneralError::ParseError("capabilities".into()))
41 }
42}
43
44impl From<ByteCapabilities> for u8 {
45 fn from(value: ByteCapabilities) -> Self {
46 *value.0.as_ref()
47 }
48}
49
50impl From<ByteCapabilities> for Capabilities {
51 fn from(value: ByteCapabilities) -> Self {
52 value.0
53 }
54}
55
56impl From<Capabilities> for ByteCapabilities {
57 fn from(value: Capabilities) -> Self {
58 Self(value)
59 }
60}
61
62impl AsRef<Capabilities> for ByteCapabilities {
63 fn as_ref(&self) -> &Capabilities {
64 &self.0
65 }
66}
67
/// [`StartProtocol`] specialised with this module's [`SessionId`], [`SessionTarget`]
/// and [`ByteCapabilities`] types.
pub type HoprStartProtocol = StartProtocol<SessionId, SessionTarget, ByteCapabilities>;
70
/// Upper bound on the number of decimal digits needed to print an unsigned integer
/// that occupies `n` bytes.
///
/// Computed as `ceil(8 * n * log10(2))` in fixed-point arithmetic, with `log10(2)`
/// approximated as `301030 / 1_000_000`, so the function stays usable in `const` contexts.
const fn max_decimal_digits_for_n_bytes(n: usize) -> usize {
    const BITS_PER_BYTE: u64 = 8;
    // log10(2) scaled by `MICROS`.
    const LOG10_2_MICROS: u64 = 301_030;
    const MICROS: u64 = 1_000_000;

    let bit_count = BITS_PER_BYTE * n as u64;
    let scaled_digits = bit_count * LOG10_2_MICROS;

    // Round up: a partially used digit still takes a full character.
    scaled_digits.div_ceil(MICROS) as usize
}
85
/// Capacity, in bytes, of the string cached inside a [`SessionId`].
///
/// The terms are: 2 leading characters (presumably the `0x` hex prefix — confirm against the
/// pseudonym's `Display` output), two characters per pseudonym byte, one delimiter character,
/// and the widest decimal rendering of a value occupying `Tag::SIZE` bytes.
const MAX_SESSION_ID_STR_LEN: usize = 2 + 2 * HoprPseudonym::SIZE + 1 + max_decimal_digits_for_n_bytes(Tag::SIZE);
88
/// Identifier of a session, made of a [`Tag`] and a [`HoprPseudonym`].
///
/// The textual form is rendered once at construction and stored inline, so it can later
/// be borrowed as `&str` without formatting again.
#[derive(Clone, Copy)]
pub struct SessionId {
    /// Tag component of the identifier.
    tag: Tag,
    /// Pseudonym component of the identifier.
    pseudonym: HoprPseudonym,
    /// Pre-rendered `<pseudonym>:<tag>` text, capped at `MAX_SESSION_ID_STR_LEN` bytes.
    cached: arrayvec::ArrayString<MAX_SESSION_ID_STR_LEN>,
}
105
106impl SessionId {
107 const DELIMITER: char = ':';
108
109 pub fn new<T: Into<Tag>>(tag: T, pseudonym: HoprPseudonym) -> Self {
110 let tag = tag.into();
111 let mut cached = format!("{pseudonym}{}{tag}", Self::DELIMITER);
112 cached.truncate(MAX_SESSION_ID_STR_LEN);
113
114 Self {
115 tag,
116 pseudonym,
117 cached: cached.parse().expect("cannot fail due to truncation"),
118 }
119 }
120
121 pub fn tag(&self) -> Tag {
122 self.tag
123 }
124
125 pub fn pseudonym(&self) -> &HoprPseudonym {
126 &self.pseudonym
127 }
128
129 pub fn as_str(&self) -> &str {
130 &self.cached
131 }
132}
133
134impl FromStr for SessionId {
135 type Err = TransportSessionError;
136
137 fn from_str(s: &str) -> Result<Self, Self::Err> {
138 s.split_once(Self::DELIMITER)
139 .ok_or(TransportSessionError::InvalidSessionId)
140 .and_then(
141 |(pseudonym, tag)| match (HoprPseudonym::from_hex(pseudonym), Tag::from_str(tag)) {
142 (Ok(p), Ok(t)) => Ok(Self::new(t, p)),
143 _ => Err(TransportSessionError::InvalidSessionId),
144 },
145 )
146 }
147}
148
149impl serde::Serialize for SessionId {
150 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151 where
152 S: serde::Serializer,
153 {
154 use serde::ser::SerializeStruct;
155 let mut state = serializer.serialize_struct("SessionId", 2)?;
156 state.serialize_field("tag", &self.tag)?;
157 state.serialize_field("pseudonym", &self.pseudonym)?;
158 state.end()
159 }
160}
161
162impl<'de> serde::Deserialize<'de> for SessionId {
163 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
164 where
165 D: serde::Deserializer<'de>,
166 {
167 use serde::de;
168
169 #[derive(serde::Deserialize)]
170 #[serde(field_identifier, rename_all = "lowercase")]
171 enum Field {
172 Tag,
173 Pseudonym,
174 }
175
176 struct SessionIdVisitor;
177
178 impl<'de> de::Visitor<'de> for SessionIdVisitor {
179 type Value = SessionId;
180
181 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
182 formatter.write_str("struct SessionId")
183 }
184
185 fn visit_seq<A>(self, mut seq: A) -> Result<SessionId, A::Error>
186 where
187 A: de::SeqAccess<'de>,
188 {
189 Ok(SessionId::new(
190 seq.next_element::<Tag>()?
191 .ok_or_else(|| de::Error::invalid_length(0, &self))?,
192 seq.next_element()?.ok_or_else(|| de::Error::invalid_length(1, &self))?,
193 ))
194 }
195
196 fn visit_map<V>(self, mut map: V) -> Result<SessionId, V::Error>
197 where
198 V: de::MapAccess<'de>,
199 {
200 let mut tag: Option<Tag> = None;
201 let mut pseudonym: Option<HoprPseudonym> = None;
202 while let Some(key) = map.next_key()? {
203 match key {
204 Field::Tag => {
205 if tag.is_some() {
206 return Err(de::Error::duplicate_field("tag"));
207 }
208 tag = Some(map.next_value()?);
209 }
210 Field::Pseudonym => {
211 if pseudonym.is_some() {
212 return Err(de::Error::duplicate_field("pseudonym"));
213 }
214 pseudonym = Some(map.next_value()?);
215 }
216 }
217 }
218
219 Ok(SessionId::new(
220 tag.ok_or_else(|| de::Error::missing_field("tag"))?,
221 pseudonym.ok_or_else(|| de::Error::missing_field("pseudonym"))?,
222 ))
223 }
224 }
225
226 const FIELDS: &[&str] = &["tag", "pseudonym"];
227 deserializer.deserialize_struct("SessionId", FIELDS, SessionIdVisitor)
228 }
229}
230
231impl Display for SessionId {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 write!(f, "{}", self.as_str())
234 }
235}
236
237impl Debug for SessionId {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 write!(f, "{}", self.as_str())
240 }
241}
242
243impl PartialEq for SessionId {
244 fn eq(&self, other: &Self) -> bool {
245 self.tag == other.tag && self.pseudonym == other.pseudonym
246 }
247}
248
// Marker impl: equality is defined by the manual `PartialEq` over tag and pseudonym.
impl Eq for SessionId {}
250
251impl Hash for SessionId {
252 fn hash<H: Hasher>(&self, state: &mut H) {
253 self.tag.hash(state);
254 self.pseudonym.hash(state);
255 }
256}
257
258pub(crate) fn caps_to_ack_mode(caps: Capabilities) -> AcknowledgementMode {
259 if caps.contains(Capability::RetransmissionAck | Capability::RetransmissionNack) {
260 AcknowledgementMode::Both
261 } else if caps.contains(Capability::RetransmissionAck) {
262 AcknowledgementMode::Full
263 } else {
264 AcknowledgementMode::Partial
265 }
266}
267
/// Reason handed to a session's close notifier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::Display)]
pub enum ClosureReason {
    /// The write half of the session was closed.
    WriteClosed,
    /// A read on the session returned zero bytes.
    EmptyRead,
    /// NOTE(review): presumably the session was evicted by whatever tracks sessions; this
    /// variant is not produced in the code visible here — confirm with the caller.
    Eviction,
}
278
/// Shorthand for `AsyncWrite + AsyncRead + Send + Unpin`, so the session's inner socket
/// can be stored as a single boxed trait object.
trait AsyncReadWrite: futures::AsyncWrite + futures::AsyncRead + Send + Unpin {}
// Blanket impl: every type meeting the bounds is automatically an `AsyncReadWrite`.
impl<T: futures::AsyncWrite + futures::AsyncRead + Send + Unpin> AsyncReadWrite for T {}
282
/// Numeric identifier carried by [`SessionTarget::ExitNode`].
pub type ServiceId = u32;
289
/// Target of a session.
///
/// NOTE(review): the variant names suggest this is the endpoint the session's traffic is
/// forwarded to on the receiving side — confirm against the code that consumes it.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SessionTarget {
    /// A host (held as a [`SealedHost`]), presumably reached over UDP.
    UdpStream(SealedHost),
    /// A host (held as a [`SealedHost`]), presumably reached over TCP.
    TcpStream(SealedHost),
    /// A service identified by a [`ServiceId`].
    ExitNode(ServiceId),
}
301
/// A [`HoprSession`] bundled with the [`SessionTarget`] it is associated with.
#[derive(Debug)]
pub struct IncomingSession {
    /// The session itself.
    pub session: HoprSession,
    /// The target associated with the session.
    pub target: SessionTarget,
}
311
/// Configuration of a [`HoprSession`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct HoprSessionConfig {
    /// Capabilities of the session; they select which socket flavour `HoprSession::new` builds.
    ///
    /// Defaults to the empty set.
    #[default(Capabilities::empty())]
    pub capabilities: Capabilities,
    /// Frame size handed to the session socket; only read when the `Segmentation`
    /// capability is set.
    ///
    /// Defaults to 1500.
    #[default(1500)]
    pub frame_mtu: usize,
    /// Frame timeout handed to the session socket; only read when the `Segmentation`
    /// capability is set.
    ///
    /// Defaults to 800 ms.
    #[default(Duration::from_millis(800))]
    pub frame_timeout: Duration,
}
331
/// A session exposed as an asynchronous byte stream (`AsyncRead` + `AsyncWrite`).
///
/// Reads and writes are delegated to a boxed inner socket chosen at construction time.
/// An optional one-shot callback is invoked when the session is closed for writing or a
/// read returns zero bytes.
#[pin_project::pin_project]
pub struct HoprSession {
    /// Identifier of this session.
    id: SessionId,
    /// Socket that all I/O is forwarded to.
    #[pin]
    inner: Box<dyn AsyncReadWrite>,
    /// Routing attached to every outgoing packet.
    routing: DestinationRouting,
    /// Configuration the session was created with.
    cfg: HoprSessionConfig,
    /// One-shot close notifier; taken (and therefore invoked at most once) on first use.
    on_close: Option<Box<dyn FnOnce(SessionId, ClosureReason) + Send + Sync>>,
    /// Telemetry sink shared with the inner socket.
    #[cfg(feature = "telemetry")]
    metrics: std::sync::Arc<crate::telemetry::SessionTelemetry>,
}
347
/// Value passed as `capacity` in the `SessionSocketConfig` of segmented sessions.
///
/// NOTE(review): the unit (frames vs. bytes) is defined by `hopr_protocol_session` — confirm there.
pub(crate) const SESSION_SOCKET_CAPACITY: usize = 16384;
349
350impl HoprSession {
351 #[tracing::instrument(skip_all, fields(id, routing, cfg, session_id = %id))]
359 pub fn new<Tx, Rx>(
360 id: SessionId,
361 routing: DestinationRouting,
362 cfg: HoprSessionConfig,
363 hopr: (Tx, Rx),
364 on_close: Option<Box<dyn FnOnce(SessionId, ClosureReason) + Send + Sync>>,
365 #[cfg(feature = "telemetry")] metrics: std::sync::Arc<crate::telemetry::SessionTelemetry>,
366 ) -> Result<Self, TransportSessionError>
367 where
368 Tx: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Send + Sync + Unpin + 'static,
369 Rx: futures::Stream<Item = ApplicationDataIn> + Send + Sync + Unpin + 'static,
370 Tx::Error: std::error::Error + Send + Sync,
371 {
372 let routing_clone = routing.clone();
373
374 #[cfg(feature = "telemetry")]
375 let (metrics_write, metrics_read) = (metrics.clone(), metrics.clone());
376
377 let transport = DuplexIO(
379 AsyncWriteSink::<{ ApplicationData::PAYLOAD_SIZE }, _>(hopr.0.sink_map_err(std::io::Error::other).with(
380 move |buf: Box<[u8]>| {
381 #[cfg(feature = "telemetry")]
382 metrics_write.record_write(buf.len());
383 futures::future::ready(
386 ApplicationData::new(id.tag(), buf.into_vec())
387 .map(|data| (routing_clone.clone(), ApplicationDataOut::with_no_packet_info(data)))
388 .map_err(std::io::Error::other),
389 )
390 },
391 )),
392 hopr.1
395 .map(move |data| {
396 #[cfg(feature = "telemetry")]
397 metrics_read.record_read(data.data.plain_text.len());
398 Ok::<_, std::io::Error>(data.data.plain_text)
399 })
400 .into_async_read(),
401 );
402
403 let inner: Box<dyn AsyncReadWrite> = if cfg.capabilities.contains(Capability::Segmentation) {
405 let socket_cfg = SessionSocketConfig {
406 frame_size: cfg.frame_mtu,
407 frame_timeout: cfg.frame_timeout,
408 capacity: SESSION_SOCKET_CAPACITY,
409 flush_immediately: cfg.capabilities.contains(Capability::NoDelay),
410 ..Default::default()
411 };
412
413 if cfg.capabilities.contains(Capability::RetransmissionAck)
416 || cfg.capabilities.contains(Capability::RetransmissionNack)
417 {
418 let ack_cfg = AcknowledgementStateConfig {
420 expected_packet_latency: Duration::from_millis(200),
425 mode: caps_to_ack_mode(cfg.capabilities),
426 backoff_base: 0.2,
427 max_incoming_frame_retries: 1,
428 max_outgoing_frame_retries: 2,
429 ..Default::default()
430 };
431
432 debug!(?socket_cfg, ?ack_cfg, "opening new stateful session socket");
433
434 Box::new(ReliableSocket::new(
435 transport,
436 AcknowledgementState::<{ ApplicationData::PAYLOAD_SIZE }>::new(id, ack_cfg),
437 socket_cfg,
438 #[cfg(feature = "telemetry")]
439 metrics.clone(),
440 )?)
441 } else {
442 debug!(?socket_cfg, "opening new stateless session socket");
443
444 Box::new(UnreliableSocket::<{ ApplicationData::PAYLOAD_SIZE }>::new_stateless(
445 id,
446 transport,
447 socket_cfg,
448 #[cfg(feature = "telemetry")]
449 metrics.clone(),
450 )?)
451 }
452 } else {
453 debug!("opening raw session socket");
454 Box::new(transport)
455 };
456
457 Ok(Self {
458 id,
459 inner,
460 routing,
461 cfg,
462 on_close,
463 #[cfg(feature = "telemetry")]
464 metrics,
465 })
466 }
467
468 pub fn id(&self) -> &SessionId {
470 &self.id
471 }
472
473 pub fn routing(&self) -> &DestinationRouting {
475 &self.routing
476 }
477
478 pub fn config(&self) -> &HoprSessionConfig {
480 &self.cfg
481 }
482
483 #[cfg(feature = "telemetry")]
484 pub fn metrics(&self) -> &std::sync::Arc<crate::telemetry::SessionTelemetry> {
485 &self.metrics
486 }
487}
488
489impl std::fmt::Debug for HoprSession {
490 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
491 f.debug_struct("Session")
492 .field("id", &self.id)
493 .field("routing", &self.routing)
494 .finish_non_exhaustive()
495 }
496}
497
498impl futures::AsyncRead for HoprSession {
499 #[instrument(name = "Session::poll_read", level = "trace", skip_all, fields(session_id = %self.id), ret)]
500 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
501 let this = self.project();
502 let read = futures::ready!(this.inner.poll_read(cx, buf))?;
503 if read == 0 {
504 tracing::trace!("hopr session empty read");
505 if let Some(notifier) = this.on_close.take() {
507 tracing::trace!("notifying read half closure of session");
508 notifier(*this.id, ClosureReason::EmptyRead);
509 }
510 }
511 Poll::Ready(Ok(read))
512 }
513}
514
515impl futures::AsyncWrite for HoprSession {
516 #[instrument(name = "Session::poll_write", level = "trace", skip_all, fields(session_id = %self.id), ret)]
517 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
518 self.project().inner.poll_write(cx, buf)
519 }
520
521 #[instrument(name = "Session::poll_flush", level = "trace", skip_all, fields(session_id = %self.id), ret)]
522 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
523 self.project().inner.poll_flush(cx)
524 }
525
526 #[instrument(name = "Session::poll_close", level = "trace", skip_all, fields(session_id = %self.id), ret)]
527 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
528 let this = self.project();
529 futures::ready!(this.inner.poll_close(cx))?;
530 tracing::trace!("hopr session closed");
531
532 #[cfg(feature = "telemetry")]
533 this.metrics.set_state(crate::telemetry::SessionLifecycleState::Closing);
534
535 if let Some(notifier) = this.on_close.take() {
536 tracing::trace!("notifying write half closure of session");
537 notifier(*this.id, ClosureReason::WriteClosed);
538 }
539
540 Poll::Ready(Ok(()))
541 }
542}
543
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncRead for HoprSession {
    /// Adapts the `futures` read implementation to tokio's `ReadBuf` interface.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // The `futures` API needs an initialized byte slice to write into.
        let unfilled = buf.initialize_unfilled();
        let polled = futures::AsyncRead::poll_read(self, cx, unfilled);

        match polled {
            Poll::Ready(Ok(n)) => {
                buf.advance(n);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
            Poll::Pending => Poll::Pending,
        }
    }
}
557
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncWrite for HoprSession {
    /// Delegates to the `futures::AsyncWrite::poll_write` implementation.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
    }

    /// Delegates to the `futures::AsyncWrite::poll_flush` implementation.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_flush(self, cx)
    }

    /// Tokio's shutdown maps onto `futures::AsyncWrite::poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_close(self, cx)
    }
}
572
#[cfg(test)]
mod tests {
    #[cfg(feature = "telemetry")]
    use std::sync::Arc;

    use anyhow::Context;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use hopr_crypto_random::Randomizable;
    use hopr_crypto_types::prelude::*;
    use hopr_network_types::prelude::*;
    use hopr_primitive_types::prelude::*;

    use super::*;
    #[cfg(feature = "telemetry")]
    use crate::telemetry::SessionTelemetry;

    /// Connects two sessions ("alice" and "bob") back-to-back over in-memory channels, both
    /// using `cfg`, sends a random payload in each direction and checks it arrives intact.
    async fn assert_bidirectional_flow(cfg: HoprSessionConfig) -> anyhow::Result<()> {
        const DATA_LEN: usize = 5000;
        const IO_TIMEOUT: Duration = Duration::from_secs(1);

        let dst: Address = (&ChainKeypair::random()).into();
        let id = SessionId::new(1234_u64, HoprPseudonym::random());

        #[cfg(feature = "telemetry")]
        let alice_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));
        #[cfg(feature = "telemetry")]
        let bob_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));

        // Whatever alice sends, bob receives, and vice versa.
        let (alice_tx, bob_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
        let (bob_tx, alice_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();

        let mut alice_session = HoprSession::new(
            id,
            DestinationRouting::forward_only(dst, RoutingOptions::Hops(0.try_into()?)),
            cfg,
            (
                alice_tx,
                alice_rx
                    .map(|(_, data)| ApplicationDataIn {
                        data: data.data,
                        packet_info: Default::default(),
                    })
                    .inspect(|d| debug!("alice rcvd: {}", d.data.total_len())),
            ),
            None,
            #[cfg(feature = "telemetry")]
            alice_metrics,
        )?;

        let mut bob_session = HoprSession::new(
            id,
            DestinationRouting::Return(id.pseudonym().into()),
            cfg,
            (
                bob_tx,
                bob_rx
                    .map(|(_, data)| ApplicationDataIn {
                        data: data.data,
                        packet_info: Default::default(),
                    })
                    .inspect(|d| debug!("bob rcvd: {}", d.data.total_len())),
            ),
            None,
            #[cfg(feature = "telemetry")]
            bob_metrics,
        )?;

        let alice_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();
        let bob_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();

        let mut bob_recv = [0u8; DATA_LEN];
        let mut alice_recv = [0u8; DATA_LEN];

        // The outer `Result` comes from the timeout, the inner one from the I/O operation,
        // so the first context labels the timeout and the second labels the I/O failure.
        tokio::time::timeout(IO_TIMEOUT, alice_session.write_all(&alice_sent))
            .await
            .context("alice write timed out")?
            .context("alice write failed")?;
        alice_session.flush().await?;

        tokio::time::timeout(IO_TIMEOUT, bob_session.write_all(&bob_sent))
            .await
            .context("bob write timed out")?
            .context("bob write failed")?;
        bob_session.flush().await?;

        tokio::time::timeout(IO_TIMEOUT, bob_session.read_exact(&mut bob_recv))
            .await
            .context("bob read timed out")?
            .context("bob read failed")?;

        tokio::time::timeout(IO_TIMEOUT, alice_session.read_exact(&mut alice_recv))
            .await
            .context("alice read timed out")?
            .context("alice read failed")?;

        assert_eq!(alice_sent, bob_recv);
        assert_eq!(bob_sent, alice_recv);

        Ok(())
    }

    #[test]
    fn test_session_id_to_str_from_str() -> anyhow::Result<()> {
        let id = SessionId::new(1234_u64, HoprPseudonym::random());
        assert_eq!(id.as_str(), id.to_string());
        assert_eq!(id, SessionId::from_str(id.as_str())?);

        Ok(())
    }

    #[test]
    fn test_max_decimal_digits_for_n_bytes() {
        assert_eq!(3, max_decimal_digits_for_n_bytes(size_of::<u8>()));
        assert_eq!(5, max_decimal_digits_for_n_bytes(size_of::<u16>()));
        assert_eq!(10, max_decimal_digits_for_n_bytes(size_of::<u32>()));
        assert_eq!(20, max_decimal_digits_for_n_bytes(size_of::<u64>()));
    }

    #[test]
    fn standard_session_id_must_fit_within_limit() {
        let id = format!("{}:{}", SimplePseudonym::random(), Tag::Application(Tag::MAX));
        assert!(id.len() <= MAX_SESSION_ID_STR_LEN);
    }

    #[test]
    fn session_id_should_serialize_and_deserialize_correctly() -> anyhow::Result<()> {
        let pseudonym = HoprPseudonym::random();
        let tag: Tag = 1234u64.into();

        let session_id_1 = SessionId::new(tag, pseudonym);
        let data = serde_cbor_2::to_vec(&session_id_1)?;
        let session_id_2: SessionId = serde_cbor_2::from_slice(&data)?;

        assert_eq!(tag, session_id_2.tag());
        assert_eq!(pseudonym, *session_id_2.pseudonym());

        assert_eq!(session_id_1.as_str(), session_id_2.as_str());
        assert_eq!(session_id_1, session_id_2);

        Ok(())
    }

    #[test_log::test(tokio::test)]
    async fn test_session_bidirectional_flow_without_segmentation() -> anyhow::Result<()> {
        assert_bidirectional_flow(Default::default()).await
    }

    #[test_log::test(tokio::test)]
    async fn test_session_bidirectional_flow_with_segmentation() -> anyhow::Result<()> {
        assert_bidirectional_flow(HoprSessionConfig {
            capabilities: Capability::Segmentation.into(),
            ..Default::default()
        })
        .await
    }
}