Skip to main content

hopr_transport_session/
types.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    hash::{Hash, Hasher},
4    pin::Pin,
5    str::FromStr,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use futures::{SinkExt, StreamExt, TryStreamExt};
11use hopr_internal_types::prelude::HoprPseudonym;
12use hopr_network_types::{
13    prelude::{DestinationRouting, SealedHost},
14    utils::{AsyncWriteSink, DuplexIO},
15};
16use hopr_primitive_types::{
17    errors::GeneralError,
18    prelude::{BytesRepresentable, ToHex},
19};
20use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
21use hopr_protocol_session::{
22    AcknowledgementMode, AcknowledgementState, AcknowledgementStateConfig, ReliableSocket, SessionSocketConfig,
23    UnreliableSocket,
24};
25use hopr_protocol_start::StartProtocol;
26use tracing::{debug, instrument};
27
28use crate::{Capabilities, Capability, errors::TransportSessionError};
29
30/// Wrapper for [`Capabilities`] that makes conversion to/from `u8` possible.
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
32pub struct ByteCapabilities(pub Capabilities);
33
34impl TryFrom<u8> for ByteCapabilities {
35    type Error = GeneralError;
36
37    fn try_from(value: u8) -> Result<Self, Self::Error> {
38        Capabilities::new(value)
39            .map(Self)
40            .map_err(|_| GeneralError::ParseError("capabilities".into()))
41    }
42}
43
44impl From<ByteCapabilities> for u8 {
45    fn from(value: ByteCapabilities) -> Self {
46        *value.0.as_ref()
47    }
48}
49
50impl From<ByteCapabilities> for Capabilities {
51    fn from(value: ByteCapabilities) -> Self {
52        value.0
53    }
54}
55
56impl From<Capabilities> for ByteCapabilities {
57    fn from(value: Capabilities) -> Self {
58        Self(value)
59    }
60}
61
62impl AsRef<Capabilities> for ByteCapabilities {
63    fn as_ref(&self) -> &Capabilities {
64        &self.0
65    }
66}
67
68/// Start protocol instantiation for HOPR.
69pub type HoprStartProtocol = StartProtocol<SessionId, SessionTarget, ByteCapabilities>;
70
/// Calculates the maximum number of decimal digits needed to represent an N-byte unsigned integer.
///
/// The calculation is based on the formula: ⌈8n × log_10(2)⌉
/// where n is the number of bytes.
///
/// The scaled `log_10(2)` constant is rounded up, so the result never underestimates
/// the number of digits. The intermediate product is computed in `u128` so that it cannot
/// wrap around for large `n`, and a result that does not fit into `usize`
/// saturates to `usize::MAX` instead of being truncated.
const fn max_decimal_digits_for_n_bytes(n: usize) -> usize {
    // log_10(2) = 0.301029995664 multiplied by 1 000 000 (and rounded up) to work with integers in a const function
    const LOG10_2_SCALED: u128 = 301_030;
    const SCALE: u128 = 1_000_000;

    // 8n * log_10(2) scaled; widened to u128 so the multiplication cannot overflow
    let scaled = 8 * (n as u128) * LOG10_2_SCALED;
    let digits = scaled.div_ceil(SCALE);

    // Saturate rather than silently truncate on the narrowing cast
    if digits > usize::MAX as u128 {
        usize::MAX
    } else {
        digits as usize
    }
}
85
86// Enough to fit HoprPseudonym in hex (with 0x prefix), delimiter and tag number
87const MAX_SESSION_ID_STR_LEN: usize = 2 + 2 * HoprPseudonym::SIZE + 1 + max_decimal_digits_for_n_bytes(Tag::SIZE);
88
89/// Unique ID of a specific Session in a certain direction.
90///
91/// Simple wrapper around the maximum range of the port like session unique identifier.
92/// It is a simple combination of an application tag for the Session and
93/// a [`HoprPseudonym`].
94#[derive(Clone, Copy)]
95pub struct SessionId {
96    tag: Tag,
97    pseudonym: HoprPseudonym,
98    // Since this SessionId is commonly represented as a string,
99    // we cache its string representation here.
100    // Also, by using a statically allocated ArrayString, we allow the SessionId to remain Copy.
101    // This representation is possibly truncated to MAX_SESSION_ID_STR_LEN.
102    // This member is always computed and is therefore not serialized.
103    cached: arrayvec::ArrayString<MAX_SESSION_ID_STR_LEN>,
104}
105
106impl SessionId {
107    const DELIMITER: char = ':';
108
109    pub fn new<T: Into<Tag>>(tag: T, pseudonym: HoprPseudonym) -> Self {
110        let tag = tag.into();
111        let mut cached = format!("{pseudonym}{}{tag}", Self::DELIMITER);
112        cached.truncate(MAX_SESSION_ID_STR_LEN);
113
114        Self {
115            tag,
116            pseudonym,
117            cached: cached.parse().expect("cannot fail due to truncation"),
118        }
119    }
120
121    pub fn tag(&self) -> Tag {
122        self.tag
123    }
124
125    pub fn pseudonym(&self) -> &HoprPseudonym {
126        &self.pseudonym
127    }
128
129    pub fn as_str(&self) -> &str {
130        &self.cached
131    }
132}
133
134impl FromStr for SessionId {
135    type Err = TransportSessionError;
136
137    fn from_str(s: &str) -> Result<Self, Self::Err> {
138        s.split_once(Self::DELIMITER)
139            .ok_or(TransportSessionError::InvalidSessionId)
140            .and_then(
141                |(pseudonym, tag)| match (HoprPseudonym::from_hex(pseudonym), Tag::from_str(tag)) {
142                    (Ok(p), Ok(t)) => Ok(Self::new(t, p)),
143                    _ => Err(TransportSessionError::InvalidSessionId),
144                },
145            )
146    }
147}
148
149impl serde::Serialize for SessionId {
150    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151    where
152        S: serde::Serializer,
153    {
154        use serde::ser::SerializeStruct;
155        let mut state = serializer.serialize_struct("SessionId", 2)?;
156        state.serialize_field("tag", &self.tag)?;
157        state.serialize_field("pseudonym", &self.pseudonym)?;
158        state.end()
159    }
160}
161
162impl<'de> serde::Deserialize<'de> for SessionId {
163    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
164    where
165        D: serde::Deserializer<'de>,
166    {
167        use serde::de;
168
169        #[derive(serde::Deserialize)]
170        #[serde(field_identifier, rename_all = "lowercase")]
171        enum Field {
172            Tag,
173            Pseudonym,
174        }
175
176        struct SessionIdVisitor;
177
178        impl<'de> de::Visitor<'de> for SessionIdVisitor {
179            type Value = SessionId;
180
181            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
182                formatter.write_str("struct SessionId")
183            }
184
185            fn visit_seq<A>(self, mut seq: A) -> Result<SessionId, A::Error>
186            where
187                A: de::SeqAccess<'de>,
188            {
189                Ok(SessionId::new(
190                    seq.next_element::<Tag>()?
191                        .ok_or_else(|| de::Error::invalid_length(0, &self))?,
192                    seq.next_element()?.ok_or_else(|| de::Error::invalid_length(1, &self))?,
193                ))
194            }
195
196            fn visit_map<V>(self, mut map: V) -> Result<SessionId, V::Error>
197            where
198                V: de::MapAccess<'de>,
199            {
200                let mut tag: Option<Tag> = None;
201                let mut pseudonym: Option<HoprPseudonym> = None;
202                while let Some(key) = map.next_key()? {
203                    match key {
204                        Field::Tag => {
205                            if tag.is_some() {
206                                return Err(de::Error::duplicate_field("tag"));
207                            }
208                            tag = Some(map.next_value()?);
209                        }
210                        Field::Pseudonym => {
211                            if pseudonym.is_some() {
212                                return Err(de::Error::duplicate_field("pseudonym"));
213                            }
214                            pseudonym = Some(map.next_value()?);
215                        }
216                    }
217                }
218
219                Ok(SessionId::new(
220                    tag.ok_or_else(|| de::Error::missing_field("tag"))?,
221                    pseudonym.ok_or_else(|| de::Error::missing_field("pseudonym"))?,
222                ))
223            }
224        }
225
226        const FIELDS: &[&str] = &["tag", "pseudonym"];
227        deserializer.deserialize_struct("SessionId", FIELDS, SessionIdVisitor)
228    }
229}
230
231impl Display for SessionId {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        write!(f, "{}", self.as_str())
234    }
235}
236
237impl Debug for SessionId {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        write!(f, "{}", self.as_str())
240    }
241}
242
243impl PartialEq for SessionId {
244    fn eq(&self, other: &Self) -> bool {
245        self.tag == other.tag && self.pseudonym == other.pseudonym
246    }
247}
248
249impl Eq for SessionId {}
250
251impl Hash for SessionId {
252    fn hash<H: Hasher>(&self, state: &mut H) {
253        self.tag.hash(state);
254        self.pseudonym.hash(state);
255    }
256}
257
258pub(crate) fn caps_to_ack_mode(caps: Capabilities) -> AcknowledgementMode {
259    if caps.contains(Capability::RetransmissionAck | Capability::RetransmissionNack) {
260        AcknowledgementMode::Both
261    } else if caps.contains(Capability::RetransmissionAck) {
262        AcknowledgementMode::Full
263    } else {
264        AcknowledgementMode::Partial
265    }
266}
267
268/// Indicates the closure reason of a [`HoprSession`].
269#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::Display)]
270pub enum ClosureReason {
271    /// Write-half of the Session has been closed.
272    WriteClosed,
273    /// Read-part of the Session has been closed (encountered empty read).
274    EmptyRead,
275    /// Session has been evicted from the cache due to inactivity or capacity reasons.
276    Eviction,
277}
278
279/// Helper trait to allow Box aliasing
280trait AsyncReadWrite: futures::AsyncWrite + futures::AsyncRead + Send + Unpin {}
281impl<T: futures::AsyncWrite + futures::AsyncRead + Send + Unpin> AsyncReadWrite for T {}
282
/// Identifies a node service target.
///
/// Services are specialized [`SessionTargets`](SessionTarget::ExitNode) that are local
/// to the Exit node and serve different purposes, such as Cover Traffic.
///
/// Such targets cannot be [sealed](SealedHost) from the Entry node.
pub type ServiceId = u32;
289
290/// Defines what should happen with the data at the recipient where the
291/// data from the established session are supposed to be forwarded to some `target`.
292#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
293pub enum SessionTarget {
294    /// Target is running over UDP with the given IP address and port.
295    UdpStream(SealedHost),
296    /// Target is running over TCP with the given address and port.
297    TcpStream(SealedHost),
298    /// Target is a service directly at the exit node with the given service ID.
299    ExitNode(ServiceId),
300}
301
302/// Wrapper for incoming [`HoprSession`] along with other information
303/// extracted from the Start protocol during the session establishment.
304#[derive(Debug)]
305pub struct IncomingSession {
306    /// Actual incoming session.
307    pub session: HoprSession,
308    /// Desired [target](SessionTarget) of the data received over the session.
309    pub target: SessionTarget,
310}
311
312/// Configures the Session protocol socket over HOPR.
313#[derive(Copy, Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
314pub struct HoprSessionConfig {
315    /// Capabilities of the Session protocol socket.
316    ///
317    /// Default is no capabilities.
318    #[default(Capabilities::empty())]
319    pub capabilities: Capabilities,
320    /// Expected frame size of the Session protocol socket.
321    ///
322    /// Default is 1500.
323    #[default(1500)]
324    pub frame_mtu: usize,
325    /// Maximum amount of time an incomplete frame can be kept in the buffer.
326    ///
327    /// Default is 800 ms
328    #[default(Duration::from_millis(800))]
329    pub frame_timeout: Duration,
330}
331
332/// Represents the Session protocol socket over HOPR.
333///
334/// This is essentially a HOPR-specific wrapper for [`ReliableSocket`] and [`UnreliableSocket`]
335/// Session protocol sockets.
336#[pin_project::pin_project]
337pub struct HoprSession {
338    id: SessionId,
339    #[pin]
340    inner: Box<dyn AsyncReadWrite>,
341    routing: DestinationRouting,
342    cfg: HoprSessionConfig,
343    on_close: Option<Box<dyn FnOnce(SessionId, ClosureReason) + Send + Sync>>,
344    #[cfg(feature = "telemetry")]
345    metrics: std::sync::Arc<crate::telemetry::SessionTelemetry>,
346}
347
/// Value passed as `capacity` in the `SessionSocketConfig` of every segmented Session socket.
pub(crate) const SESSION_SOCKET_CAPACITY: usize = 16 * 1024;
349
350impl HoprSession {
351    /// Creates a new HOPR Session.
352    ///
353    /// It builds an [`futures::io::AsyncRead`] + [`futures::io::AsyncWrite`] transport
354    /// from the given `hopr` interface and passing it to the appropriate [`UnreliableSocket`] or [`ReliableSocket`]
355    /// based on the given `capabilities`.
356    ///
357    /// The `on_close` closure can be optionally called when the Session has been closed via `poll_close`.
358    #[tracing::instrument(skip_all, fields(id, routing, cfg, session_id = %id))]
359    pub fn new<Tx, Rx>(
360        id: SessionId,
361        routing: DestinationRouting,
362        cfg: HoprSessionConfig,
363        hopr: (Tx, Rx),
364        on_close: Option<Box<dyn FnOnce(SessionId, ClosureReason) + Send + Sync>>,
365        #[cfg(feature = "telemetry")] metrics: std::sync::Arc<crate::telemetry::SessionTelemetry>,
366    ) -> Result<Self, TransportSessionError>
367    where
368        Tx: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Send + Sync + Unpin + 'static,
369        Rx: futures::Stream<Item = ApplicationDataIn> + Send + Sync + Unpin + 'static,
370        Tx::Error: std::error::Error + Send + Sync,
371    {
372        let routing_clone = routing.clone();
373
374        #[cfg(feature = "telemetry")]
375        let (metrics_write, metrics_read) = (metrics.clone(), metrics.clone());
376
377        // Wrap the HOPR transport so that it appears as regular transport to the SessionSocket
378        let transport = DuplexIO(
379            AsyncWriteSink::<{ ApplicationData::PAYLOAD_SIZE }, _>(hopr.0.sink_map_err(std::io::Error::other).with(
380                move |buf: Box<[u8]>| {
381                    #[cfg(feature = "telemetry")]
382                    metrics_write.record_write(buf.len());
383                    // The Session protocol does not set any packet info on outgoing packets.
384                    // However, the SessionManager on top usually overrides this.
385                    futures::future::ready(
386                        ApplicationData::new(id.tag(), buf.into_vec())
387                            .map(|data| (routing_clone.clone(), ApplicationDataOut::with_no_packet_info(data)))
388                            .map_err(std::io::Error::other),
389                    )
390                },
391            )),
392            // The Session protocol ignores the packet info on incoming packets.
393            // It is typically SessionManager's job to interpret those.
394            hopr.1
395                .map(move |data| {
396                    #[cfg(feature = "telemetry")]
397                    metrics_read.record_read(data.data.plain_text.len());
398                    Ok::<_, std::io::Error>(data.data.plain_text)
399                })
400                .into_async_read(),
401        );
402
403        // Based on the requested capabilities, see if we should use the Session protocol
404        let inner: Box<dyn AsyncReadWrite> = if cfg.capabilities.contains(Capability::Segmentation) {
405            let socket_cfg = SessionSocketConfig {
406                frame_size: cfg.frame_mtu,
407                frame_timeout: cfg.frame_timeout,
408                capacity: SESSION_SOCKET_CAPACITY,
409                flush_immediately: cfg.capabilities.contains(Capability::NoDelay),
410                ..Default::default()
411            };
412
413            // Need to test the capabilities separately, because any Retransmission capability
414            // implies Segmentation, and therefore `is_disjoint` would fail
415            if cfg.capabilities.contains(Capability::RetransmissionAck)
416                || cfg.capabilities.contains(Capability::RetransmissionNack)
417            {
418                // TODO: update config values
419                let ack_cfg = AcknowledgementStateConfig {
420                    // This is a very coarse assumption, that a single 3-hop packet
421                    // takes on average 200 ms to deliver.
422                    // We can no longer base this timeout on the number of hops because
423                    // it is not known for SURB-based routing.
424                    expected_packet_latency: Duration::from_millis(200),
425                    mode: caps_to_ack_mode(cfg.capabilities),
426                    backoff_base: 0.2,
427                    max_incoming_frame_retries: 1,
428                    max_outgoing_frame_retries: 2,
429                    ..Default::default()
430                };
431
432                debug!(?socket_cfg, ?ack_cfg, "opening new stateful session socket");
433
434                Box::new(ReliableSocket::new(
435                    transport,
436                    AcknowledgementState::<{ ApplicationData::PAYLOAD_SIZE }>::new(id, ack_cfg),
437                    socket_cfg,
438                    #[cfg(feature = "telemetry")]
439                    metrics.clone(),
440                )?)
441            } else {
442                debug!(?socket_cfg, "opening new stateless session socket");
443
444                Box::new(UnreliableSocket::<{ ApplicationData::PAYLOAD_SIZE }>::new_stateless(
445                    id,
446                    transport,
447                    socket_cfg,
448                    #[cfg(feature = "telemetry")]
449                    metrics.clone(),
450                )?)
451            }
452        } else {
453            debug!("opening raw session socket");
454            Box::new(transport)
455        };
456
457        Ok(Self {
458            id,
459            inner,
460            routing,
461            cfg,
462            on_close,
463            #[cfg(feature = "telemetry")]
464            metrics,
465        })
466    }
467
468    /// ID of this Session.
469    pub fn id(&self) -> &SessionId {
470        &self.id
471    }
472
473    /// Routing options used to deliver data.
474    pub fn routing(&self) -> &DestinationRouting {
475        &self.routing
476    }
477
478    /// Configuration of this Session.
479    pub fn config(&self) -> &HoprSessionConfig {
480        &self.cfg
481    }
482
483    #[cfg(feature = "telemetry")]
484    pub fn metrics(&self) -> &std::sync::Arc<crate::telemetry::SessionTelemetry> {
485        &self.metrics
486    }
487}
488
489impl std::fmt::Debug for HoprSession {
490    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
491        f.debug_struct("Session")
492            .field("id", &self.id)
493            .field("routing", &self.routing)
494            .finish_non_exhaustive()
495    }
496}
497
498impl futures::AsyncRead for HoprSession {
499    #[instrument(name = "Session::poll_read", level = "trace", skip_all, fields(session_id = %self.id), ret)]
500    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
501        let this = self.project();
502        let read = futures::ready!(this.inner.poll_read(cx, buf))?;
503        if read == 0 {
504            tracing::trace!("hopr session empty read");
505            // Empty read signals end of the socket, notify if needed
506            if let Some(notifier) = this.on_close.take() {
507                tracing::trace!("notifying read half closure of session");
508                notifier(*this.id, ClosureReason::EmptyRead);
509            }
510        }
511        Poll::Ready(Ok(read))
512    }
513}
514
515impl futures::AsyncWrite for HoprSession {
516    #[instrument(name = "Session::poll_write", level = "trace", skip_all, fields(session_id = %self.id), ret)]
517    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
518        self.project().inner.poll_write(cx, buf)
519    }
520
521    #[instrument(name = "Session::poll_flush", level = "trace", skip_all, fields(session_id = %self.id), ret)]
522    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
523        self.project().inner.poll_flush(cx)
524    }
525
526    #[instrument(name = "Session::poll_close", level = "trace", skip_all, fields(session_id = %self.id), ret)]
527    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
528        let this = self.project();
529        futures::ready!(this.inner.poll_close(cx))?;
530        tracing::trace!("hopr session closed");
531
532        #[cfg(feature = "telemetry")]
533        this.metrics.set_state(crate::telemetry::SessionLifecycleState::Closing);
534
535        if let Some(notifier) = this.on_close.take() {
536            tracing::trace!("notifying write half closure of session");
537            notifier(*this.id, ClosureReason::WriteClosed);
538        }
539
540        Poll::Ready(Ok(()))
541    }
542}
543
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncRead for HoprSession {
    /// Adapts the `futures::AsyncRead` implementation to the Tokio `ReadBuf` interface.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // The futures-based read needs an initialized byte slice to read into
        let unfilled = buf.initialize_unfilled();

        match futures::AsyncRead::poll_read(self, cx, unfilled) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
            Poll::Ready(Ok(count)) => {
                buf.advance(count);
                Poll::Ready(Ok(()))
            }
        }
    }
}
557
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncWrite for HoprSession {
    /// Forwards to the `futures::AsyncWrite::poll_write` implementation.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
    }

    /// Forwards to the `futures::AsyncWrite::poll_flush` implementation.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_flush(self, cx)
    }

    /// Tokio's shutdown is mapped to the `futures::AsyncWrite::poll_close` implementation.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        <Self as futures::AsyncWrite>::poll_close(self, cx)
    }
}
572
573#[cfg(test)]
574mod tests {
575    #[cfg(feature = "telemetry")]
576    use std::sync::Arc;
577
578    use anyhow::Context;
579    use futures::{AsyncReadExt, AsyncWriteExt};
580    use hopr_crypto_random::Randomizable;
581    use hopr_crypto_types::prelude::*;
582    use hopr_network_types::prelude::*;
583    use hopr_primitive_types::prelude::*;
584
585    use super::*;
586    #[cfg(feature = "telemetry")]
587    use crate::telemetry::SessionTelemetry;
588
589    #[test]
590    fn test_session_id_to_str_from_str() -> anyhow::Result<()> {
591        let id = SessionId::new(1234_u64, HoprPseudonym::random());
592        assert_eq!(id.as_str(), id.to_string());
593        assert_eq!(id, SessionId::from_str(id.as_str())?);
594
595        Ok(())
596    }
597
598    #[test]
599    fn test_max_decimal_digits_for_n_bytes() {
600        assert_eq!(3, max_decimal_digits_for_n_bytes(size_of::<u8>()));
601        assert_eq!(5, max_decimal_digits_for_n_bytes(size_of::<u16>()));
602        assert_eq!(10, max_decimal_digits_for_n_bytes(size_of::<u32>()));
603        assert_eq!(20, max_decimal_digits_for_n_bytes(size_of::<u64>()));
604    }
605
606    #[test]
607    fn standard_session_id_must_fit_within_limit() {
608        let id = format!("{}:{}", SimplePseudonym::random(), Tag::Application(Tag::MAX));
609        assert!(id.len() <= MAX_SESSION_ID_STR_LEN);
610    }
611
612    #[test]
613    fn session_id_should_serialize_and_deserialize_correctly() -> anyhow::Result<()> {
614        let pseudonym = HoprPseudonym::random();
615        let tag: Tag = 1234u64.into();
616
617        let session_id_1 = SessionId::new(tag, pseudonym);
618        let data = serde_cbor_2::to_vec(&session_id_1)?;
619        let session_id_2: SessionId = serde_cbor_2::from_slice(&data)?;
620
621        assert_eq!(tag, session_id_2.tag());
622        assert_eq!(pseudonym, *session_id_2.pseudonym());
623
624        assert_eq!(session_id_1.as_str(), session_id_2.as_str());
625        assert_eq!(session_id_1, session_id_2);
626
627        Ok(())
628    }
629
630    #[test_log::test(tokio::test)]
631    async fn test_session_bidirectional_flow_without_segmentation() -> anyhow::Result<()> {
632        let dst: Address = (&ChainKeypair::random()).into();
633        let id = SessionId::new(1234_u64, HoprPseudonym::random());
634        const DATA_LEN: usize = 5000;
635
636        #[cfg(feature = "telemetry")]
637        let alice_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));
638        #[cfg(feature = "telemetry")]
639        let bob_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));
640
641        let (alice_tx, bob_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
642        let (bob_tx, alice_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
643
644        let mut alice_session = HoprSession::new(
645            id,
646            DestinationRouting::forward_only(dst, RoutingOptions::Hops(0.try_into()?)),
647            Default::default(),
648            (
649                alice_tx,
650                alice_rx
651                    .map(|(_, data)| ApplicationDataIn {
652                        data: data.data,
653                        packet_info: Default::default(),
654                    })
655                    .inspect(|d| debug!("alice rcvd: {}", d.data.total_len())),
656            ),
657            None,
658            #[cfg(feature = "telemetry")]
659            alice_metrics,
660        )?;
661
662        let mut bob_session = HoprSession::new(
663            id,
664            DestinationRouting::Return(id.pseudonym().into()),
665            Default::default(),
666            (
667                bob_tx,
668                bob_rx
669                    .map(|(_, data)| ApplicationDataIn {
670                        data: data.data,
671                        packet_info: Default::default(),
672                    })
673                    .inspect(|d| debug!("bob rcvd: {}", d.data.total_len())),
674            ),
675            None,
676            #[cfg(feature = "telemetry")]
677            bob_metrics,
678        )?;
679
680        let alice_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();
681        let bob_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();
682
683        let mut bob_recv = [0u8; DATA_LEN];
684        let mut alice_recv = [0u8; DATA_LEN];
685
686        tokio::time::timeout(Duration::from_secs(1), alice_session.write_all(&alice_sent))
687            .await
688            .context("alice write failed")?
689            .context("alice write timed out")?;
690        alice_session.flush().await?;
691
692        tokio::time::timeout(Duration::from_secs(1), bob_session.write_all(&bob_sent))
693            .await
694            .context("bob write failed")?
695            .context("bob write timed out")?;
696        bob_session.flush().await?;
697
698        tokio::time::timeout(Duration::from_secs(1), bob_session.read_exact(&mut bob_recv))
699            .await
700            .context("bob read failed")?
701            .context("bob read timed out")?;
702
703        tokio::time::timeout(Duration::from_secs(1), alice_session.read_exact(&mut alice_recv))
704            .await
705            .context("alice read failed")?
706            .context("alice read timed out")?;
707
708        assert_eq!(&alice_sent, bob_recv.as_slice());
709        assert_eq!(bob_sent, alice_recv);
710
711        Ok(())
712    }
713
714    #[test_log::test(tokio::test)]
715    async fn test_session_bidirectional_flow_with_segmentation() -> anyhow::Result<()> {
716        let dst: Address = (&ChainKeypair::random()).into();
717        let id = SessionId::new(1234_u64, HoprPseudonym::random());
718        const DATA_LEN: usize = 5000;
719
720        #[cfg(feature = "telemetry")]
721        let alice_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));
722        #[cfg(feature = "telemetry")]
723        let bob_metrics = Arc::new(SessionTelemetry::new(id, Default::default()));
724
725        let (alice_tx, bob_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
726        let (bob_tx, alice_rx) = futures::channel::mpsc::unbounded::<(DestinationRouting, ApplicationDataOut)>();
727
728        let mut alice_session = HoprSession::new(
729            id,
730            DestinationRouting::forward_only(dst, RoutingOptions::Hops(0.try_into()?)),
731            HoprSessionConfig {
732                capabilities: Capability::Segmentation.into(),
733                ..Default::default()
734            },
735            (
736                alice_tx,
737                alice_rx
738                    .map(|(_, data)| ApplicationDataIn {
739                        data: data.data,
740                        packet_info: Default::default(),
741                    })
742                    .inspect(|d| debug!("alice rcvd: {}", d.data.total_len())),
743            ),
744            None,
745            #[cfg(feature = "telemetry")]
746            alice_metrics,
747        )?;
748
749        let mut bob_session = HoprSession::new(
750            id,
751            DestinationRouting::Return(id.pseudonym().into()),
752            HoprSessionConfig {
753                capabilities: Capability::Segmentation.into(),
754                ..Default::default()
755            },
756            (
757                bob_tx,
758                bob_rx
759                    .map(|(_, data)| ApplicationDataIn {
760                        data: data.data,
761                        packet_info: Default::default(),
762                    })
763                    .inspect(|d| debug!("bob rcvd: {}", d.data.total_len())),
764            ),
765            None,
766            #[cfg(feature = "telemetry")]
767            bob_metrics,
768        )?;
769
770        let alice_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();
771        let bob_sent = hopr_crypto_random::random_bytes::<DATA_LEN>();
772
773        let mut bob_recv = [0u8; DATA_LEN];
774        let mut alice_recv = [0u8; DATA_LEN];
775
776        tokio::time::timeout(Duration::from_secs(1), alice_session.write_all(&alice_sent))
777            .await
778            .context("alice write failed")?
779            .context("alice write timed out")?;
780        alice_session.flush().await?;
781
782        tokio::time::timeout(Duration::from_secs(1), bob_session.write_all(&bob_sent))
783            .await
784            .context("bob write failed")?
785            .context("bob write timed out")?;
786        bob_session.flush().await?;
787
788        tokio::time::timeout(Duration::from_secs(1), bob_session.read_exact(&mut bob_recv))
789            .await
790            .context("bob read failed")?
791            .context("bob read timed out")?;
792
793        tokio::time::timeout(Duration::from_secs(1), alice_session.read_exact(&mut alice_recv))
794            .await
795            .context("alice read failed")?
796            .context("alice read timed out")?;
797
798        assert_eq!(alice_sent, bob_recv);
799        assert_eq!(bob_sent, alice_recv);
800
801        Ok(())
802    }
803}