hopr_transport_session/
types.rs

1use std::{
2    fmt::{Debug, Display, Formatter},
3    hash::{Hash, Hasher},
4    io::{Error, ErrorKind},
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::{StreamExt, pin_mut};
12use hopr_crypto_packet::prelude::HoprPacket;
13use hopr_internal_types::prelude::HoprPseudonym;
14use hopr_network_types::{
15    prelude::{DestinationRouting, SealedHost},
16    session::state::{SessionConfig, SessionSocket},
17};
18use hopr_primitive_types::prelude::BytesRepresentable;
19use hopr_transport_packet::prelude::{ApplicationData, Tag};
20use tracing::{debug, error};
21
22use crate::{Capabilities, Capability, capabilities_to_features, errors::TransportSessionError, traits::SendMsg};
23
// Histogram of the sizes (in bytes) of the buffers handed to the inner session for sending,
// labelled by the textual Session ID.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_SESSION_INNER_SIZES: hopr_metrics::MultiHistogram =
        hopr_metrics::MultiHistogram::new(
            "hopr_session_inner_sizes",
            "Sizes of data chunks fed from inner session to HOPR protocol",
            vec![20.0, 40.0, 80.0, 160.0, 320.0, 640.0, 1280.0],
            &["session_id"]
    ).unwrap();
}
34
/// Returns an upper bound on the number of decimal digits required to print
/// any unsigned integer that occupies `n` bytes.
///
/// An `n`-byte integer has `8n` bits and each bit contributes `log_10(2)` decimal digits,
/// so the result is ⌈8n × log_10(2)⌉.
const fn max_decimal_digits_for_n_bytes(n: usize) -> usize {
    // Floating point is avoided in this `const fn`: log_10(2) ≈ 0.301029995664
    // is represented in millionths, rounded up so the bound is never underestimated.
    const LOG10_2_MILLIONTHS: u64 = 301030;
    const MILLION: u64 = 1_000_000;

    let bits = 8 * n as u64;
    let digits_in_millionths = bits * LOG10_2_MILLIONTHS;

    digits_in_millionths.div_ceil(MILLION) as usize
}
49
// Upper bound on the length of the textual Session ID.
// Enough to fit HoprPseudonym in hex (2 characters per byte) with the `0x` prefix (2 characters),
// the delimiter (1 character) and the decimal digits of the largest possible tag number.
const MAX_SESSION_ID_STR_LEN: usize = 2 + 2 * HoprPseudonym::SIZE + 1 + max_decimal_digits_for_n_bytes(Tag::SIZE);
52
/// Unique ID of a specific Session in a certain direction.
///
/// It is a simple combination of an application [`Tag`] assigned to the Session
/// (playing a role similar to a port number) and a [`HoprPseudonym`].
///
/// Equality and hashing consider only the tag and the pseudonym; the cached
/// string representation is derived from them.
#[derive(Clone, Copy)]
pub struct SessionId {
    // Application tag of the Session.
    tag: Tag,
    // Pseudonym associated with the Session.
    pseudonym: HoprPseudonym,
    // Since this SessionId is commonly represented as a string,
    // we cache its string representation here.
    // Also, by using a statically allocated ArrayString, we allow the SessionId to remain Copy.
    // This representation is possibly truncated to MAX_SESSION_ID_STR_LEN.
    // This member is always computed and is therefore not serialized.
    cached: arrayvec::ArrayString<MAX_SESSION_ID_STR_LEN>,
}
69
70impl SessionId {
71    pub fn new<T: Into<Tag>>(tag: T, pseudonym: HoprPseudonym) -> Self {
72        let tag = tag.into();
73        let mut cached = format!("{pseudonym}:{tag}");
74        cached.truncate(MAX_SESSION_ID_STR_LEN);
75
76        Self {
77            tag,
78            pseudonym,
79            cached: cached.parse().expect("cannot fail due to truncation"),
80        }
81    }
82
83    pub fn tag(&self) -> Tag {
84        self.tag
85    }
86
87    pub fn pseudonym(&self) -> &HoprPseudonym {
88        &self.pseudonym
89    }
90
91    pub fn as_str(&self) -> &str {
92        &self.cached
93    }
94}
95
#[cfg(feature = "serde")]
impl serde::Serialize for SessionId {
    /// Serializes the ID as a two-field struct (`tag`, `pseudonym`).
    ///
    /// The cached string representation is intentionally left out.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;

        let Self { tag, pseudonym, .. } = self;

        let mut fields = serializer.serialize_struct("SessionId", 2)?;
        fields.serialize_field("tag", tag)?;
        fields.serialize_field("pseudonym", pseudonym)?;
        fields.end()
    }
}
109
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for SessionId {
    /// Deserializes the ID from either a sequence (`tag`, `pseudonym`) or a map with those keys.
    ///
    /// The value is always built via [`SessionId::new`], so the cached string is recomputed.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        use serde::de;

        const FIELDS: &[&str] = &["tag", "pseudonym"];

        #[derive(serde::Deserialize)]
        #[serde(field_identifier, rename_all = "lowercase")]
        enum Field {
            Tag,
            Pseudonym,
        }

        struct SessionIdVisitor;

        impl<'de> de::Visitor<'de> for SessionIdVisitor {
            type Value = SessionId;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct SessionId")
            }

            // Positional form: the tag comes first, the pseudonym second.
            fn visit_seq<A>(self, mut seq: A) -> Result<SessionId, A::Error>
            where
                A: de::SeqAccess<'de>,
            {
                let tag: Tag = match seq.next_element()? {
                    Some(tag) => tag,
                    None => return Err(de::Error::invalid_length(0, &self)),
                };
                let pseudonym: HoprPseudonym = match seq.next_element()? {
                    Some(pseudonym) => pseudonym,
                    None => return Err(de::Error::invalid_length(1, &self)),
                };

                Ok(SessionId::new(tag, pseudonym))
            }

            // Keyed form: each field may appear at most once and both must be present.
            fn visit_map<V>(self, mut map: V) -> Result<SessionId, V::Error>
            where
                V: de::MapAccess<'de>,
            {
                let (mut tag, mut pseudonym) = (None::<Tag>, None::<HoprPseudonym>);

                while let Some(field) = map.next_key::<Field>()? {
                    match field {
                        // Duplicates are rejected before the value is even read
                        Field::Tag if tag.is_some() => return Err(de::Error::duplicate_field("tag")),
                        Field::Tag => tag = Some(map.next_value()?),
                        Field::Pseudonym if pseudonym.is_some() => {
                            return Err(de::Error::duplicate_field("pseudonym"));
                        }
                        Field::Pseudonym => pseudonym = Some(map.next_value()?),
                    }
                }

                match (tag, pseudonym) {
                    (Some(tag), Some(pseudonym)) => Ok(SessionId::new(tag, pseudonym)),
                    // A missing tag is reported with priority over a missing pseudonym
                    (None, _) => Err(de::Error::missing_field("tag")),
                    (_, None) => Err(de::Error::missing_field("pseudonym")),
                }
            }
        }

        deserializer.deserialize_struct("SessionId", FIELDS, SessionIdVisitor)
    }
}
179
180impl Display for SessionId {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        write!(f, "{}", self.as_str())
183    }
184}
185
186impl Debug for SessionId {
187    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188        write!(f, "{}", self.as_str())
189    }
190}
191
192impl PartialEq for SessionId {
193    fn eq(&self, other: &Self) -> bool {
194        self.tag == other.tag && self.pseudonym == other.pseudonym
195    }
196}
197
198impl Eq for SessionId {}
199
200impl Hash for SessionId {
201    fn hash<H: Hasher>(&self, state: &mut H) {
202        self.tag.hash(state);
203        self.pseudonym.hash(state);
204    }
205}
206
207/// Helper trait to allow Box aliasing
208trait AsyncReadWrite: futures::AsyncWrite + futures::AsyncRead + Send {}
209impl<T: futures::AsyncWrite + futures::AsyncRead + Send> AsyncReadWrite for T {}
210
/// Describes a node service target.
///
/// These are specialized session targets (see [`SessionTarget::ExitNode`])
/// that are local to the Exit node and have different purposes, such as Cover Traffic.
///
/// These targets cannot be [sealed](SealedHost) from the Entry node.
pub type ServiceId = u32;
217
/// Defines what should happen with the data at the recipient, where the
/// data from the established session is supposed to be forwarded to some `target`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SessionTarget {
    /// Target is running over UDP with the given IP address and port.
    UdpStream(SealedHost),
    /// Target is running over TCP with the given address and port.
    TcpStream(SealedHost),
    /// Target is a service directly at the exit node with the given service ID.
    ExitNode(ServiceId),
}
230
/// Wrapper for an incoming [`Session`] along with other information
/// extracted from the Start protocol during the session establishment.
#[derive(Debug)]
pub struct IncomingSession {
    /// Actual incoming session.
    pub session: Session,
    /// Desired [target](SessionTarget) of the data received over the session.
    pub target: SessionTarget,
}
240
/// Bidirectional byte stream identified by a [`SessionId`].
///
/// The Session implements asynchronous reading and writing by delegating to an inner
/// transport object. Depending on the requested [`Capabilities`], the inner object is either
/// the bare packet-based transport or that transport wrapped in the Session protocol socket.
pub struct Session {
    // Identifier of this Session.
    id: SessionId,
    // Object performing the actual reads and writes.
    inner: Pin<Box<dyn AsyncReadWrite>>,
    // Routing used to deliver the outgoing data.
    routing: DestinationRouting,
    // Capabilities this Session was created with.
    capabilities: Capabilities,
    // Optional callback invoked (at most once) with the Session ID once closing of the Session completes.
    on_close: Option<Box<dyn FnOnce(SessionId) + Send + Sync>>,
}
249
250impl Session {
251    pub fn new(
252        id: SessionId,
253        routing: DestinationRouting,
254        capabilities: Capabilities,
255        tx: Arc<dyn SendMsg + Send + Sync>,
256        rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
257        on_close: Option<Box<dyn FnOnce(SessionId) + Send + Sync>>,
258    ) -> Self {
259        let inner_session = InnerSession::new(id, routing.clone(), tx, rx);
260
261        // If we request any capability, we need to use Session protocol
262        if !capabilities.is_empty() {
263            // This is a very coarse assumption, that 3-hop takes at most 3 seconds.
264            // We can no longer base this timeout on the number of hops because
265            // it is not known for SURB-based routing.
266            let rto_base = Duration::from_secs(3);
267
268            let expiration_coefficient =
269                if !capabilities.is_disjoint(Capability::RetransmissionAck | Capability::RetransmissionNack) {
270                    4
271                } else {
272                    1
273                };
274
275            // TODO: tweak the default Session protocol config
276            let cfg = SessionConfig {
277                enabled_features: capabilities_to_features(&capabilities),
278                acknowledged_frames_buffer: 100_000, // Can hold frames for > 40 sec at 2000 frames/sec
279                frame_expiration_age: rto_base * expiration_coefficient,
280                rto_base_receiver: rto_base, // Ask for segment resend, if not yet complete after this period
281                rto_base_sender: rto_base * 2, // Resend frame if is not acknowledged after this period
282                ..Default::default()
283            };
284            debug!(
285                session_id = ?id,
286                ?cfg,
287                "opening new session socket"
288            );
289
290            Self {
291                id,
292                inner: Box::pin(SessionSocket::<{ ApplicationData::PAYLOAD_SIZE }>::new(
293                    id,
294                    inner_session,
295                    cfg,
296                )),
297                routing,
298                capabilities,
299                on_close,
300            }
301        } else {
302            // Otherwise, no additional sub protocol is necessary
303            Self {
304                id,
305                inner: Box::pin(inner_session),
306                routing,
307                capabilities,
308                on_close,
309            }
310        }
311    }
312
313    /// ID of this Session.
314    pub fn id(&self) -> &SessionId {
315        &self.id
316    }
317
318    /// Routing options used to deliver data.
319    pub fn routing(&self) -> &DestinationRouting {
320        &self.routing
321    }
322
323    /// Capabilities of this Session.
324    pub fn capabilities(&self) -> &Capabilities {
325        &self.capabilities
326    }
327}
328
329impl std::fmt::Debug for Session {
330    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
331        f.debug_struct("Session")
332            .field("id", &self.id)
333            .field("routing", &self.routing)
334            .finish_non_exhaustive()
335    }
336}
337
338impl futures::AsyncRead for Session {
339    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
340        let inner = self.inner.as_mut();
341        pin_mut!(inner);
342        inner.poll_read(cx, buf)
343    }
344}
345
346impl futures::AsyncWrite for Session {
347    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
348        let inner = &mut self.inner;
349        pin_mut!(inner);
350        inner.poll_write(cx, buf)
351    }
352
353    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
354        let inner = &mut self.inner;
355        pin_mut!(inner);
356        inner.poll_flush(cx)
357    }
358
359    fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
360        let inner = &mut self.inner;
361        pin_mut!(inner);
362        match inner.poll_close(cx) {
363            Poll::Ready(res) => {
364                // Notify about closure if desired
365                if let Some(notifier) = self.on_close.take() {
366                    notifier(self.id);
367                }
368                Poll::Ready(res)
369            }
370            Poll::Pending => Poll::Pending,
371        }
372    }
373}
374
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncRead for Session {
    /// Adapts the `futures`-style read of the Session to the Tokio `ReadBuf` interface.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // The unfilled part must be initialized before it can be exposed as a plain byte slice
        let unfilled = buf.initialize_unfilled();

        match futures::AsyncRead::poll_read(self, cx, unfilled) {
            Poll::Ready(Ok(read)) => {
                buf.advance(read);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
            Poll::Pending => Poll::Pending,
        }
    }
}
388
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncWrite for Session {
    /// Forwards to the `futures`-style write of the Session.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
        <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
    }

    /// Forwards to the `futures`-style flush of the Session.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        <Self as futures::AsyncWrite>::poll_flush(self, cx)
    }

    /// Tokio shutdown maps onto the `futures`-style close of the Session.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        <Self as futures::AsyncWrite>::poll_close(self, cx)
    }
}
403
// Set of in-flight futures, each sending a single chunk of data.
type FuturesBuffer = futures::stream::FuturesUnordered<
    Pin<Box<dyn std::future::Future<Output = Result<(), TransportSessionError>> + Send>>,
>;

// Bare packet-based transport of a Session: splits written data into chunks that are sent
// via `tx`, and serves reads from the chunks received on `rx`.
struct InnerSession {
    // Identifier of the owning Session; its tag is attached to all outgoing chunks.
    id: SessionId,
    // Routing passed along with every outgoing chunk.
    routing: DestinationRouting,
    // Stream of incoming data chunks.
    rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
    // Sender of outgoing data chunks.
    tx: Arc<dyn SendMsg + Send + Sync>,
    // Total number of bytes covered by the chunks of the write currently in progress.
    tx_bytes: usize,
    // In-flight sends of the write currently in progress.
    tx_buffer: FuturesBuffer,
    // Holds the rest of a received chunk that did not fit into the reader's buffer.
    rx_buffer: [u8; HoprPacket::PAYLOAD_SIZE],
    // Range (start, end) of the not-yet-read bytes in `rx_buffer`; equal bounds mean it is empty.
    rx_buffer_range: (usize, usize),
    // Set once the session has been closed; writes and flushes fail afterwards.
    closed: bool,
}
418
419impl InnerSession {
420    pub fn new(
421        id: SessionId,
422        routing: DestinationRouting,
423        tx: Arc<dyn SendMsg + Send + Sync>,
424        rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
425    ) -> Self {
426        Self {
427            id,
428            routing,
429            rx,
430            tx,
431            tx_bytes: 0,
432            tx_buffer: futures::stream::FuturesUnordered::new(),
433            rx_buffer: [0; HoprPacket::PAYLOAD_SIZE],
434            rx_buffer_range: (0, 0),
435            closed: false,
436        }
437    }
438}
439
440impl futures::AsyncWrite for InnerSession {
441    fn poll_write(
442        mut self: std::pin::Pin<&mut Self>,
443        cx: &mut std::task::Context<'_>,
444        buf: &[u8],
445    ) -> Poll<std::io::Result<usize>> {
446        if self.closed {
447            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "session closed")));
448        }
449
450        #[cfg(all(feature = "prometheus", not(test)))]
451        METRIC_SESSION_INNER_SIZES.observe(&[self.id.as_str()], buf.len() as f64);
452
453        if !self.tx_buffer.is_empty() {
454            loop {
455                match self.tx_buffer.poll_next_unpin(cx) {
456                    Poll::Ready(Some(Ok(()))) => {
457                        continue;
458                    }
459                    Poll::Ready(Some(Err(TransportSessionError::OutOfSurbs))) => {
460                        // Discard messages until SURBs are available
461                        error!(session_id = %self.id, "message discarded due to missing SURB for reply");
462                        continue;
463                    }
464                    Poll::Ready(Some(Err(e))) => {
465                        error!(session_id = %self.id, error = %e, "failed to send the message chunk inside a session");
466                        return Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
467                    }
468                    Poll::Ready(None) => {
469                        self.tx_buffer.clear();
470                        return Poll::Ready(Ok(self.tx_bytes));
471                    }
472                    Poll::Pending => {
473                        return Poll::Pending;
474                    }
475                }
476            }
477        }
478
479        let tag = self.id.tag();
480
481        self.tx_buffer.clear();
482        self.tx_bytes = 0;
483
484        for i in
485            0..(buf.len() / ApplicationData::PAYLOAD_SIZE + ((buf.len() % ApplicationData::PAYLOAD_SIZE != 0) as usize))
486        {
487            let start = i * ApplicationData::PAYLOAD_SIZE;
488            let end = ((i + 1) * ApplicationData::PAYLOAD_SIZE).min(buf.len());
489
490            let payload = ApplicationData::new(tag, &buf[start..end]);
491            let sender = self.tx.clone();
492            let routing = self.routing.clone();
493
494            self.tx_buffer
495                .push(Box::pin(async move { sender.send_message(payload, routing).await }));
496
497            self.tx_bytes += end - start;
498        }
499
500        loop {
501            match self.tx_buffer.poll_next_unpin(cx) {
502                Poll::Ready(Some(Ok(_))) => {
503                    continue;
504                }
505                Poll::Ready(Some(Err(TransportSessionError::OutOfSurbs))) => {
506                    // Discard messages until SURBs are available
507                    error!(session_id = %self.id, "message discarded due to missing SURB for reply");
508                    continue;
509                }
510                Poll::Ready(Some(Err(error))) => {
511                    error!(session_id = %self.id, %error, "failed to send the message chunk inside a session");
512                    break Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
513                }
514                Poll::Ready(None) => {
515                    self.tx_buffer.clear();
516                    break Poll::Ready(Ok(self.tx_bytes));
517                }
518                Poll::Pending => {
519                    break Poll::Pending;
520                }
521            }
522        }
523    }
524
525    fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
526        if self.closed {
527            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "session closed")));
528        }
529
530        while let Poll::Ready(Some(result)) = self.tx_buffer.poll_next_unpin(cx) {
531            if let Err(error) = result {
532                error!(session_id = %self.id, %error, "failed to send message chunk inside session during flush");
533                return Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
534            }
535        }
536        Poll::Ready(Ok(()))
537    }
538
539    fn poll_close(mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
540        self.closed = true;
541        Poll::Ready(Ok(()))
542    }
543}
544
545impl futures::AsyncRead for InnerSession {
546    fn poll_read(
547        mut self: std::pin::Pin<&mut Self>,
548        cx: &mut std::task::Context<'_>,
549        buf: &mut [u8],
550    ) -> Poll<std::io::Result<usize>> {
551        if self.rx_buffer_range.0 != self.rx_buffer_range.1 {
552            let start = self.rx_buffer_range.0;
553            let copy_len = self.rx_buffer_range.1.min(buf.len());
554
555            buf[..copy_len].copy_from_slice(&self.rx_buffer[start..start + copy_len]);
556
557            self.rx_buffer_range.0 += copy_len;
558            if self.rx_buffer_range.0 == self.rx_buffer_range.1 {
559                self.rx_buffer_range = (0, 0);
560            }
561
562            return Poll::Ready(Ok(copy_len));
563        }
564
565        match self.rx.poll_next_unpin(cx) {
566            Poll::Ready(Some(data)) => {
567                let data_len = data.len();
568                let copy_len = data_len.min(buf.len());
569                if copy_len < data_len {
570                    self.rx_buffer[0..data_len - copy_len].copy_from_slice(&data[copy_len..]);
571                    self.rx_buffer_range = (0, data_len - copy_len);
572                }
573
574                buf[..copy_len].copy_from_slice(&data[..copy_len]);
575
576                Poll::Ready(Ok(copy_len))
577            }
578            Poll::Ready(None) => {
579                Poll::Ready(Ok(0)) // due to convention, Ok(0) indicates EOF
580            }
581            Poll::Pending => Poll::Pending,
582        }
583    }
584}
585
/// Copies data in both directions between a [Session] and an arbitrary Tokio async IO stream
/// until the transfer finishes, returning the pair of byte counts reported by the copy.
///
/// At most `max_buffer` bytes are buffered in each direction.
/// This function is only compiled when the `runtime-tokio` feature is enabled.
#[cfg(feature = "runtime-tokio")]
pub async fn transfer_session<S>(
    session: &mut Session,
    stream: &mut S,
    max_buffer: usize,
) -> std::io::Result<(usize, usize)>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    // We can always read as much as possible from the Session and then write it to the Stream.
    // The opposite direction depends on who splits the data:
    // - with the Session protocol doing segmentation, a single write is capped at MAX_WRITE_SIZE,
    // - the bare session chunks the data itself, so writes of arbitrary sizes are fine.
    let uses_segmentation = session.capabilities().contains(Capability::Segmentation);
    let into_session_len = match uses_segmentation {
        true => max_buffer.min(SessionSocket::<{ ApplicationData::PAYLOAD_SIZE }>::MAX_WRITE_SIZE),
        false => max_buffer,
    };

    debug!(
        session_id = ?session.id(),
        egress_buffer = max_buffer,
        ingress_buffer = into_session_len,
        "session buffers"
    );

    let (first, second) =
        hopr_network_types::utils::copy_duplex(session, stream, max_buffer, into_session_len).await?;

    Ok((first as usize, second as usize))
}
619
620#[cfg(test)]
621mod tests {
622    use futures::{AsyncReadExt, AsyncWriteExt};
623    use hopr_crypto_random::Randomizable;
624    use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
625    use hopr_network_types::prelude::{RoutingOptions, SurbMatcher};
626    use hopr_primitive_types::prelude::Address;
627
628    use super::*;
629    use crate::traits::MockSendMsg;
630
631    #[test]
632    fn test_max_decimal_digits_for_n_bytes() {
633        assert_eq!(3, max_decimal_digits_for_n_bytes(size_of::<u8>()));
634        assert_eq!(5, max_decimal_digits_for_n_bytes(size_of::<u16>()));
635        assert_eq!(10, max_decimal_digits_for_n_bytes(size_of::<u32>()));
636        assert_eq!(20, max_decimal_digits_for_n_bytes(size_of::<u64>()));
637    }
638
639    #[cfg(feature = "serde")]
640    #[test]
641    fn session_id_should_serialize_and_deserialize_correctly() -> anyhow::Result<()> {
642        const SESSION_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
643            .with_little_endian()
644            .with_variable_int_encoding();
645
646        let pseudonym = HoprPseudonym::random();
647        let tag: Tag = 1234u64.into();
648
649        let session_id_1 = SessionId::new(tag, pseudonym);
650        let data = bincode::serde::encode_to_vec(session_id_1, SESSION_BINCODE_CONFIGURATION)?;
651        let session_id_2: SessionId = bincode::serde::decode_from_slice(&data, SESSION_BINCODE_CONFIGURATION)?.0;
652
653        assert_eq!(tag, session_id_2.tag());
654        assert_eq!(pseudonym, *session_id_2.pseudonym());
655
656        assert_eq!(session_id_1.as_str(), session_id_2.as_str());
657        assert_eq!(session_id_1, session_id_2);
658
659        Ok(())
660    }
661
662    #[test]
663    fn session_should_identify_with_its_own_id() -> anyhow::Result<()> {
664        let addr: Address = (&ChainKeypair::random()).into();
665        let id = SessionId::new(1u64, HoprPseudonym::random());
666        let (_tx, rx) = futures::channel::mpsc::unbounded();
667        let mock = MockSendMsg::new();
668
669        let session = InnerSession::new(
670            id,
671            DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
672            Arc::new(mock),
673            Box::pin(rx),
674        );
675
676        assert_eq!(session.id, id);
677
678        Ok(())
679    }
680
681    #[tokio::test]
682    async fn session_should_read_data_in_one_swoop_if_the_buffer_is_sufficiently_large() -> anyhow::Result<()> {
683        let addr: Address = (&ChainKeypair::random()).into();
684        let id = SessionId::new(1u64, HoprPseudonym::random());
685        let (tx, rx) = futures::channel::mpsc::unbounded();
686        let mock = MockSendMsg::new();
687
688        let mut session = InnerSession::new(
689            id,
690            DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
691            Arc::new(mock),
692            Box::pin(rx),
693        );
694
695        let random_data = hopr_crypto_random::random_bytes::<{ HoprPacket::PAYLOAD_SIZE }>()
696            .as_ref()
697            .to_vec()
698            .into_boxed_slice();
699
700        assert!(tx.unbounded_send(random_data.clone()).is_ok());
701
702        let mut buffer = vec![0; HoprPacket::PAYLOAD_SIZE * 2];
703
704        let bytes_read = session.read(&mut buffer[..]).await?;
705
706        assert_eq!(bytes_read, random_data.len());
707        assert_eq!(&buffer[..bytes_read], random_data.as_ref());
708
709        Ok(())
710    }
711
712    #[tokio::test]
713    async fn session_should_read_data_in_multiple_rounds_if_the_buffer_is_not_sufficiently_large() -> anyhow::Result<()>
714    {
715        let addr: Address = (&ChainKeypair::random()).into();
716        let id = SessionId::new(1u64, HoprPseudonym::random());
717        let (tx, rx) = futures::channel::mpsc::unbounded();
718        let mock = MockSendMsg::new();
719
720        let mut session = InnerSession::new(
721            id,
722            DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
723            Arc::new(mock),
724            Box::pin(rx),
725        );
726
727        let random_data = hopr_crypto_random::random_bytes::<{ HoprPacket::PAYLOAD_SIZE }>()
728            .as_ref()
729            .to_vec()
730            .into_boxed_slice();
731
732        assert!(tx.unbounded_send(random_data.clone()).is_ok());
733
734        const BUFFER_SIZE: usize = HoprPacket::PAYLOAD_SIZE - 1;
735        let mut buffer = vec![0; BUFFER_SIZE];
736
737        let bytes_read = session.read(&mut buffer[..]).await?;
738
739        assert_eq!(bytes_read, BUFFER_SIZE);
740        assert_eq!(&buffer[..bytes_read], &random_data[..BUFFER_SIZE]);
741
742        let bytes_read = session.read(&mut buffer[..]).await?;
743
744        assert_eq!(bytes_read, HoprPacket::PAYLOAD_SIZE - BUFFER_SIZE);
745        assert_eq!(&buffer[..bytes_read], &random_data[BUFFER_SIZE..]);
746
747        Ok(())
748    }
749
750    #[tokio::test]
751    async fn session_should_write_data_on_forward_path() -> anyhow::Result<()> {
752        let addr: Address = (&ChainKeypair::random()).into();
753        let id = SessionId::new(1u64, HoprPseudonym::random());
754        let (_tx, rx) = futures::channel::mpsc::unbounded();
755        let mut mock = MockSendMsg::new();
756
757        let data = b"Hello, world!".to_vec().into_boxed_slice();
758
759        mock.expect_send_message()
760            .times(1)
761            .withf(move |data, routing,| {
762                assert_eq!(data.plain_text, b"Hello, world!".to_vec().into_boxed_slice());
763                assert!(matches!(routing, DestinationRouting::Forward {forward_options,..} if forward_options == &RoutingOptions::Hops(1_u32.try_into().expect("must be convertible"))));
764                true
765            })
766            .returning(|_, _| Ok(()));
767
768        let mut session = InnerSession::new(
769            id,
770            DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
771            Arc::new(mock),
772            Box::pin(rx),
773        );
774
775        let bytes_written = session.write(&data).await?;
776        assert_eq!(bytes_written, data.len());
777
778        Ok(())
779    }
780
781    #[tokio::test]
782    async fn session_should_write_data_on_return_path() -> anyhow::Result<()> {
783        let id = SessionId::new(1u64, HoprPseudonym::random());
784        let (_tx, rx) = futures::channel::mpsc::unbounded();
785        let mut mock = MockSendMsg::new();
786
787        let data = b"Hello, world!".to_vec().into_boxed_slice();
788
789        mock.expect_send_message()
790            .times(1)
791            .withf(move |data, routing,| {
792                assert_eq!(data.plain_text, b"Hello, world!".to_vec().into_boxed_slice());
793                assert!(matches!(routing, DestinationRouting::Return(SurbMatcher::Pseudonym(pseudonym)) if pseudonym == &id.pseudonym));
794                true
795            })
796            .returning(|_, _| Ok(()));
797
798        let mut session = InnerSession::new(
799            id,
800            DestinationRouting::Return(SurbMatcher::Pseudonym(id.pseudonym)),
801            Arc::new(mock),
802            Box::pin(rx),
803        );
804
805        let bytes_written = session.write(&data).await?;
806        assert_eq!(bytes_written, data.len());
807
808        Ok(())
809    }
810
811    #[tokio::test]
812    async fn session_should_chunk_the_data_if_without_segmentation_the_write_size_is_greater_than_the_usable_mtu_size()
813    -> anyhow::Result<()> {
814        const TO_SEND: usize = ApplicationData::PAYLOAD_SIZE * 2 + 10;
815
816        let addr: Address = (&ChainKeypair::random()).into();
817        let id = SessionId::new(1u64, HoprPseudonym::random());
818        let (_tx, rx) = futures::channel::mpsc::unbounded();
819        let mut mock = MockSendMsg::new();
820
821        let data = hopr_crypto_random::random_bytes::<TO_SEND>()
822            .as_ref()
823            .to_vec()
824            .into_boxed_slice();
825
826        mock.expect_send_message().times(3).returning(|_, _| Ok(()));
827
828        let mut session = InnerSession::new(
829            id,
830            DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
831            Arc::new(mock),
832            Box::pin(rx),
833        );
834
835        let bytes_written = session.write(&data).await?;
836        assert_eq!(bytes_written, TO_SEND);
837
838        Ok(())
839    }
840}