1use std::{
2 fmt::{Debug, Display, Formatter},
3 hash::{Hash, Hasher},
4 io::{Error, ErrorKind},
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::{StreamExt, pin_mut};
12use hopr_crypto_packet::prelude::HoprPacket;
13use hopr_internal_types::prelude::HoprPseudonym;
14use hopr_network_types::{
15 prelude::{DestinationRouting, SealedHost},
16 session::state::{SessionConfig, SessionSocket},
17};
18use hopr_primitive_types::prelude::BytesRepresentable;
19use hopr_transport_packet::prelude::{ApplicationData, Tag};
20use tracing::{debug, error};
21
22use crate::{Capabilities, Capability, capabilities_to_features, errors::TransportSessionError, traits::SendMsg};
23
// Histogram of the sizes of the buffers handed to `InnerSession::poll_write`,
// labelled by the textual session ID.
// Only compiled when the `prometheus` feature is enabled and the crate is not built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_SESSION_INNER_SIZES: hopr_metrics::MultiHistogram =
        hopr_metrics::MultiHistogram::new(
            "hopr_session_inner_sizes",
            "Sizes of data chunks fed from inner session to HOPR protocol",
            // Histogram bucket boundaries (each one doubles the previous).
            vec![20.0, 40.0, 80.0, 160.0, 320.0, 640.0, 1280.0],
            &["session_id"]
        ).unwrap();
}
34
/// Upper bound on the number of decimal digits needed to print any unsigned
/// integer that fits into `n` bytes.
///
/// Computed as `ceil(8 * n * log10(2))` in fixed-point arithmetic so that it can be
/// evaluated in a `const` context.
const fn max_decimal_digits_for_n_bytes(n: usize) -> usize {
    // log10(2) ~= 0.301030, expressed in millionths.
    const LOG10_OF_2_MILLIONTHS: u64 = 301030;
    const MILLION: u64 = 1_000_000;

    let bit_count = 8 * n as u64;
    let digits_in_millionths = bit_count * LOG10_OF_2_MILLIONTHS;

    digits_in_millionths.div_ceil(MILLION) as usize
}
49
/// Capacity, in bytes, of the cached textual form of a [`SessionId`].
///
/// The text is rendered as `{pseudonym}:{tag}`. The budget reserves two characters per
/// `HoprPseudonym::SIZE` unit, one character for the `:` separator and the maximum number of
/// decimal digits of a value occupying `Tag::SIZE` bytes.
/// NOTE(review): the leading `2` presumably accounts for a `0x` prefix printed by the pseudonym's
/// `Display` implementation, and the `2 *` factor for hex encoding; confirm against `HoprPseudonym`.
const MAX_SESSION_ID_STR_LEN: usize = 2 + 2 * HoprPseudonym::SIZE + 1 + max_decimal_digits_for_n_bytes(Tag::SIZE);
52
/// Identifier of a session, made of an application [`Tag`] and a [`HoprPseudonym`].
///
/// The textual form (`{pseudonym}:{tag}`) is rendered once at construction and kept inline,
/// so the type stays `Copy` and string access needs no allocation.
/// Equality and hashing consider only `tag` and `pseudonym`.
#[derive(Clone, Copy)]
pub struct SessionId {
    /// Application tag of the session.
    tag: Tag,
    /// Pseudonym of the session.
    pseudonym: HoprPseudonym,
    /// Pre-rendered `{pseudonym}:{tag}` string, returned by `as_str`.
    cached: arrayvec::ArrayString<MAX_SESSION_ID_STR_LEN>,
}
69
70impl SessionId {
71 pub fn new<T: Into<Tag>>(tag: T, pseudonym: HoprPseudonym) -> Self {
72 let tag = tag.into();
73 let mut cached = format!("{pseudonym}:{tag}");
74 cached.truncate(MAX_SESSION_ID_STR_LEN);
75
76 Self {
77 tag,
78 pseudonym,
79 cached: cached.parse().expect("cannot fail due to truncation"),
80 }
81 }
82
83 pub fn tag(&self) -> Tag {
84 self.tag
85 }
86
87 pub fn pseudonym(&self) -> &HoprPseudonym {
88 &self.pseudonym
89 }
90
91 pub fn as_str(&self) -> &str {
92 &self.cached
93 }
94}
95
#[cfg(feature = "serde")]
impl serde::Serialize for SessionId {
    /// Serializes the ID as a struct named `SessionId` with the fields `tag` and `pseudonym`,
    /// in that order. The cached string form is not serialized.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;

        let mut fields = serializer.serialize_struct("SessionId", 2)?;
        SerializeStruct::serialize_field(&mut fields, "tag", &self.tag)?;
        SerializeStruct::serialize_field(&mut fields, "pseudonym", &self.pseudonym)?;
        SerializeStruct::end(fields)
    }
}
109
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for SessionId {
    /// Deserializes a `SessionId` from a struct with the fields `tag` and `pseudonym`.
    ///
    /// Both the sequence form (fields in the order `tag`, `pseudonym`) and the map form are
    /// accepted. The value is rebuilt through [`SessionId::new`], so the cached string form
    /// is regenerated rather than read from the input.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        use serde::de;

        // Identifiers of the struct fields as they appear in map-like formats.
        #[derive(serde::Deserialize)]
        #[serde(field_identifier, rename_all = "lowercase")]
        enum Field {
            Tag,
            Pseudonym,
        }

        struct SessionIdVisitor;

        impl<'de> de::Visitor<'de> for SessionIdVisitor {
            type Value = SessionId;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct SessionId")
            }

            // Sequence form: first element is the tag, second is the pseudonym.
            fn visit_seq<A>(self, mut seq: A) -> Result<SessionId, A::Error>
            where
                A: de::SeqAccess<'de>,
            {
                Ok(SessionId::new(
                    seq.next_element::<Tag>()?
                        .ok_or_else(|| de::Error::invalid_length(0, &self))?,
                    seq.next_element()?.ok_or_else(|| de::Error::invalid_length(1, &self))?,
                ))
            }

            // Map form: fields may come in any order, but each exactly once.
            fn visit_map<V>(self, mut map: V) -> Result<SessionId, V::Error>
            where
                V: de::MapAccess<'de>,
            {
                let mut tag: Option<Tag> = None;
                let mut pseudonym: Option<HoprPseudonym> = None;
                while let Some(key) = map.next_key()? {
                    match key {
                        Field::Tag => {
                            if tag.is_some() {
                                return Err(de::Error::duplicate_field("tag"));
                            }
                            tag = Some(map.next_value()?);
                        }
                        Field::Pseudonym => {
                            if pseudonym.is_some() {
                                return Err(de::Error::duplicate_field("pseudonym"));
                            }
                            pseudonym = Some(map.next_value()?);
                        }
                    }
                }

                Ok(SessionId::new(
                    tag.ok_or_else(|| de::Error::missing_field("tag"))?,
                    pseudonym.ok_or_else(|| de::Error::missing_field("pseudonym"))?,
                ))
            }
        }

        const FIELDS: &[&str] = &["tag", "pseudonym"];
        deserializer.deserialize_struct("SessionId", FIELDS, SessionIdVisitor)
    }
}
179
180impl Display for SessionId {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 write!(f, "{}", self.as_str())
183 }
184}
185
186impl Debug for SessionId {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 write!(f, "{}", self.as_str())
189 }
190}
191
192impl PartialEq for SessionId {
193 fn eq(&self, other: &Self) -> bool {
194 self.tag == other.tag && self.pseudonym == other.pseudonym
195 }
196}
197
198impl Eq for SessionId {}
199
200impl Hash for SessionId {
201 fn hash<H: Hasher>(&self, state: &mut H) {
202 self.tag.hash(state);
203 self.pseudonym.hash(state);
204 }
205}
206
/// Helper trait combining `futures::AsyncWrite`, `futures::AsyncRead` and `Send`, so that a
/// single trait object (`dyn AsyncReadWrite`) can be stored inside [`Session`].
trait AsyncReadWrite: futures::AsyncWrite + futures::AsyncRead + Send {}
// Blanket implementation: every type satisfying the three bounds is an `AsyncReadWrite`.
impl<T: futures::AsyncWrite + futures::AsyncRead + Send> AsyncReadWrite for T {}
210
/// Numeric identifier of a service; carried by [`SessionTarget::ExitNode`].
pub type ServiceId = u32;
217
/// Describes what an incoming session should be connected to.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names;
/// confirm against the code that consumes this type.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SessionTarget {
    /// Presumably a UDP endpoint, given as a [`SealedHost`].
    UdpStream(SealedHost),
    /// Presumably a TCP endpoint, given as a [`SealedHost`].
    TcpStream(SealedHost),
    /// Presumably a service on the exit node itself, selected by its [`ServiceId`].
    ExitNode(ServiceId),
}
230
/// A [`Session`] paired with the [`SessionTarget`] it is associated with.
#[derive(Debug)]
pub struct IncomingSession {
    /// The session itself.
    pub session: Session,
    /// The target associated with the session.
    pub target: SessionTarget,
}
240
/// A session that can be read from and written to through the `futures` (and, with the
/// `runtime-tokio` feature, `tokio`) asynchronous I/O traits.
pub struct Session {
    /// Identifier of this session.
    id: SessionId,
    /// Underlying transport all reads and writes are delegated to. `Session::new` puts either
    /// a `SessionSocket` wrapping an `InnerSession`, or the bare `InnerSession`, here.
    inner: Pin<Box<dyn AsyncReadWrite>>,
    /// Routing used for outgoing data (a clone of it is handed to the `InnerSession`).
    routing: DestinationRouting,
    /// Capabilities the session was created with.
    capabilities: Capabilities,
    /// Optional callback invoked at most once, with the session ID, when `poll_close`
    /// of the underlying transport completes.
    on_close: Option<Box<dyn FnOnce(SessionId) + Send + Sync>>,
}
249
250impl Session {
251 pub fn new(
252 id: SessionId,
253 routing: DestinationRouting,
254 capabilities: Capabilities,
255 tx: Arc<dyn SendMsg + Send + Sync>,
256 rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
257 on_close: Option<Box<dyn FnOnce(SessionId) + Send + Sync>>,
258 ) -> Self {
259 let inner_session = InnerSession::new(id, routing.clone(), tx, rx);
260
261 if !capabilities.is_empty() {
263 let rto_base = Duration::from_secs(3);
267
268 let expiration_coefficient =
269 if !capabilities.is_disjoint(Capability::RetransmissionAck | Capability::RetransmissionNack) {
270 4
271 } else {
272 1
273 };
274
275 let cfg = SessionConfig {
277 enabled_features: capabilities_to_features(&capabilities),
278 acknowledged_frames_buffer: 100_000, frame_expiration_age: rto_base * expiration_coefficient,
280 rto_base_receiver: rto_base, rto_base_sender: rto_base * 2, ..Default::default()
283 };
284 debug!(
285 session_id = ?id,
286 ?cfg,
287 "opening new session socket"
288 );
289
290 Self {
291 id,
292 inner: Box::pin(SessionSocket::<{ ApplicationData::PAYLOAD_SIZE }>::new(
293 id,
294 inner_session,
295 cfg,
296 )),
297 routing,
298 capabilities,
299 on_close,
300 }
301 } else {
302 Self {
304 id,
305 inner: Box::pin(inner_session),
306 routing,
307 capabilities,
308 on_close,
309 }
310 }
311 }
312
313 pub fn id(&self) -> &SessionId {
315 &self.id
316 }
317
318 pub fn routing(&self) -> &DestinationRouting {
320 &self.routing
321 }
322
323 pub fn capabilities(&self) -> &Capabilities {
325 &self.capabilities
326 }
327}
328
329impl std::fmt::Debug for Session {
330 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
331 f.debug_struct("Session")
332 .field("id", &self.id)
333 .field("routing", &self.routing)
334 .finish_non_exhaustive()
335 }
336}
337
338impl futures::AsyncRead for Session {
339 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
340 let inner = self.inner.as_mut();
341 pin_mut!(inner);
342 inner.poll_read(cx, buf)
343 }
344}
345
346impl futures::AsyncWrite for Session {
347 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
348 let inner = &mut self.inner;
349 pin_mut!(inner);
350 inner.poll_write(cx, buf)
351 }
352
353 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
354 let inner = &mut self.inner;
355 pin_mut!(inner);
356 inner.poll_flush(cx)
357 }
358
359 fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
360 let inner = &mut self.inner;
361 pin_mut!(inner);
362 match inner.poll_close(cx) {
363 Poll::Ready(res) => {
364 if let Some(notifier) = self.on_close.take() {
366 notifier(self.id);
367 }
368 Poll::Ready(res)
369 }
370 Poll::Pending => Poll::Pending,
371 }
372 }
373}
374
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncRead for Session {
    /// Adapts the `futures::AsyncRead` implementation of the session to tokio's `ReadBuf` API:
    /// the unfilled part of `buf` is initialized, read into, and then advanced by the number
    /// of bytes that were read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let unfilled = buf.initialize_unfilled();

        match futures::AsyncRead::poll_read(self, cx, unfilled) {
            Poll::Ready(Ok(read)) => {
                buf.advance(read);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
            Poll::Pending => Poll::Pending,
        }
    }
}
388
#[cfg(feature = "runtime-tokio")]
impl tokio::io::AsyncWrite for Session {
    /// Forwards to the `futures::AsyncWrite::poll_write` implementation of the session.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
        <Self as futures::AsyncWrite>::poll_write(self, cx, buf)
    }

    /// Forwards to the `futures::AsyncWrite::poll_flush` implementation of the session.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        <Self as futures::AsyncWrite>::poll_flush(self, cx)
    }

    /// tokio's shutdown maps onto `futures::AsyncWrite::poll_close` of the session.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        <Self as futures::AsyncWrite>::poll_close(self, cx)
    }
}
403
/// Unordered set of in-flight send operations; each one resolves to `Ok(())` or to a
/// [`TransportSessionError`].
type FuturesBuffer = futures::stream::FuturesUnordered<
    Pin<Box<dyn std::future::Future<Output = Result<(), TransportSessionError>> + Send>>,
>;
/// Raw session transport: writes are split into chunks and sent through `tx`, reads are
/// served from the `rx` stream of received payloads.
struct InnerSession {
    /// Identifier of the session; its tag is attached to every outgoing chunk.
    id: SessionId,
    /// Routing passed along with every outgoing chunk.
    routing: DestinationRouting,
    /// Stream of received payloads.
    rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
    /// Sender used to send out the individual chunks.
    tx: Arc<dyn SendMsg + Send + Sync>,
    /// Number of bytes queued by the write that is currently in flight.
    tx_bytes: usize,
    /// Send operations of the write that is currently in flight.
    tx_buffer: FuturesBuffer,
    /// Holds the part of a received payload that did not fit into the reader's buffer.
    rx_buffer: [u8; HoprPacket::PAYLOAD_SIZE],
    /// `(start, end)` range of not yet consumed bytes in `rx_buffer`; `(0, 0)` when empty.
    rx_buffer_range: (usize, usize),
    /// Set by `poll_close`; writes and flushes fail afterwards.
    closed: bool,
}
418
419impl InnerSession {
420 pub fn new(
421 id: SessionId,
422 routing: DestinationRouting,
423 tx: Arc<dyn SendMsg + Send + Sync>,
424 rx: Pin<Box<dyn futures::Stream<Item = Box<[u8]>> + Send + Sync>>,
425 ) -> Self {
426 Self {
427 id,
428 routing,
429 rx,
430 tx,
431 tx_bytes: 0,
432 tx_buffer: futures::stream::FuturesUnordered::new(),
433 rx_buffer: [0; HoprPacket::PAYLOAD_SIZE],
434 rx_buffer_range: (0, 0),
435 closed: false,
436 }
437 }
438}
439
440impl futures::AsyncWrite for InnerSession {
441 fn poll_write(
442 mut self: std::pin::Pin<&mut Self>,
443 cx: &mut std::task::Context<'_>,
444 buf: &[u8],
445 ) -> Poll<std::io::Result<usize>> {
446 if self.closed {
447 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "session closed")));
448 }
449
450 #[cfg(all(feature = "prometheus", not(test)))]
451 METRIC_SESSION_INNER_SIZES.observe(&[self.id.as_str()], buf.len() as f64);
452
453 if !self.tx_buffer.is_empty() {
454 loop {
455 match self.tx_buffer.poll_next_unpin(cx) {
456 Poll::Ready(Some(Ok(()))) => {
457 continue;
458 }
459 Poll::Ready(Some(Err(TransportSessionError::OutOfSurbs))) => {
460 error!(session_id = %self.id, "message discarded due to missing SURB for reply");
462 continue;
463 }
464 Poll::Ready(Some(Err(e))) => {
465 error!(session_id = %self.id, error = %e, "failed to send the message chunk inside a session");
466 return Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
467 }
468 Poll::Ready(None) => {
469 self.tx_buffer.clear();
470 return Poll::Ready(Ok(self.tx_bytes));
471 }
472 Poll::Pending => {
473 return Poll::Pending;
474 }
475 }
476 }
477 }
478
479 let tag = self.id.tag();
480
481 self.tx_buffer.clear();
482 self.tx_bytes = 0;
483
484 for i in
485 0..(buf.len() / ApplicationData::PAYLOAD_SIZE + ((buf.len() % ApplicationData::PAYLOAD_SIZE != 0) as usize))
486 {
487 let start = i * ApplicationData::PAYLOAD_SIZE;
488 let end = ((i + 1) * ApplicationData::PAYLOAD_SIZE).min(buf.len());
489
490 let payload = ApplicationData::new(tag, &buf[start..end]);
491 let sender = self.tx.clone();
492 let routing = self.routing.clone();
493
494 self.tx_buffer
495 .push(Box::pin(async move { sender.send_message(payload, routing).await }));
496
497 self.tx_bytes += end - start;
498 }
499
500 loop {
501 match self.tx_buffer.poll_next_unpin(cx) {
502 Poll::Ready(Some(Ok(_))) => {
503 continue;
504 }
505 Poll::Ready(Some(Err(TransportSessionError::OutOfSurbs))) => {
506 error!(session_id = %self.id, "message discarded due to missing SURB for reply");
508 continue;
509 }
510 Poll::Ready(Some(Err(error))) => {
511 error!(session_id = %self.id, %error, "failed to send the message chunk inside a session");
512 break Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
513 }
514 Poll::Ready(None) => {
515 self.tx_buffer.clear();
516 break Poll::Ready(Ok(self.tx_bytes));
517 }
518 Poll::Pending => {
519 break Poll::Pending;
520 }
521 }
522 }
523 }
524
525 fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
526 if self.closed {
527 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "session closed")));
528 }
529
530 while let Poll::Ready(Some(result)) = self.tx_buffer.poll_next_unpin(cx) {
531 if let Err(error) = result {
532 error!(session_id = %self.id, %error, "failed to send message chunk inside session during flush");
533 return Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe)));
534 }
535 }
536 Poll::Ready(Ok(()))
537 }
538
539 fn poll_close(mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
540 self.closed = true;
541 Poll::Ready(Ok(()))
542 }
543}
544
545impl futures::AsyncRead for InnerSession {
546 fn poll_read(
547 mut self: std::pin::Pin<&mut Self>,
548 cx: &mut std::task::Context<'_>,
549 buf: &mut [u8],
550 ) -> Poll<std::io::Result<usize>> {
551 if self.rx_buffer_range.0 != self.rx_buffer_range.1 {
552 let start = self.rx_buffer_range.0;
553 let copy_len = self.rx_buffer_range.1.min(buf.len());
554
555 buf[..copy_len].copy_from_slice(&self.rx_buffer[start..start + copy_len]);
556
557 self.rx_buffer_range.0 += copy_len;
558 if self.rx_buffer_range.0 == self.rx_buffer_range.1 {
559 self.rx_buffer_range = (0, 0);
560 }
561
562 return Poll::Ready(Ok(copy_len));
563 }
564
565 match self.rx.poll_next_unpin(cx) {
566 Poll::Ready(Some(data)) => {
567 let data_len = data.len();
568 let copy_len = data_len.min(buf.len());
569 if copy_len < data_len {
570 self.rx_buffer[0..data_len - copy_len].copy_from_slice(&data[copy_len..]);
571 self.rx_buffer_range = (0, data_len - copy_len);
572 }
573
574 buf[..copy_len].copy_from_slice(&data[..copy_len]);
575
576 Poll::Ready(Ok(copy_len))
577 }
578 Poll::Ready(None) => {
579 Poll::Ready(Ok(0)) }
581 Poll::Pending => Poll::Pending,
582 }
583 }
584}
585
/// Copies data in both directions between `session` and `stream` until the transfer ends,
/// and returns the two byte counters reported by `copy_duplex`.
///
/// `max_buffer` is passed to `copy_duplex` as the first buffer size. The second buffer size is
/// also `max_buffer`, except for sessions with the `Segmentation` capability, where it is
/// capped at `SessionSocket::MAX_WRITE_SIZE`.
/// NOTE(review): which counter belongs to which direction is defined by `copy_duplex`; confirm there.
#[cfg(feature = "runtime-tokio")]
pub async fn transfer_session<S>(
    session: &mut Session,
    stream: &mut S,
    max_buffer: usize,
) -> std::io::Result<(usize, usize)>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let segmented = session.capabilities().contains(Capability::Segmentation);
    let into_session_len = if segmented {
        max_buffer.min(SessionSocket::<{ ApplicationData::PAYLOAD_SIZE }>::MAX_WRITE_SIZE)
    } else {
        max_buffer
    };

    debug!(
        session_id = ?session.id(),
        egress_buffer = max_buffer,
        ingress_buffer = into_session_len,
        "session buffers"
    );

    let (first, second) =
        hopr_network_types::utils::copy_duplex(session, stream, max_buffer, into_session_len).await?;

    Ok((first as usize, second as usize))
}
619
620#[cfg(test)]
621mod tests {
622 use futures::{AsyncReadExt, AsyncWriteExt};
623 use hopr_crypto_random::Randomizable;
624 use hopr_crypto_types::keypairs::{ChainKeypair, Keypair};
625 use hopr_network_types::prelude::{RoutingOptions, SurbMatcher};
626 use hopr_primitive_types::prelude::Address;
627
628 use super::*;
629 use crate::traits::MockSendMsg;
630
631 #[test]
632 fn test_max_decimal_digits_for_n_bytes() {
633 assert_eq!(3, max_decimal_digits_for_n_bytes(size_of::<u8>()));
634 assert_eq!(5, max_decimal_digits_for_n_bytes(size_of::<u16>()));
635 assert_eq!(10, max_decimal_digits_for_n_bytes(size_of::<u32>()));
636 assert_eq!(20, max_decimal_digits_for_n_bytes(size_of::<u64>()));
637 }
638
639 #[cfg(feature = "serde")]
640 #[test]
641 fn session_id_should_serialize_and_deserialize_correctly() -> anyhow::Result<()> {
642 const SESSION_BINCODE_CONFIGURATION: bincode::config::Configuration = bincode::config::standard()
643 .with_little_endian()
644 .with_variable_int_encoding();
645
646 let pseudonym = HoprPseudonym::random();
647 let tag: Tag = 1234u64.into();
648
649 let session_id_1 = SessionId::new(tag, pseudonym);
650 let data = bincode::serde::encode_to_vec(session_id_1, SESSION_BINCODE_CONFIGURATION)?;
651 let session_id_2: SessionId = bincode::serde::decode_from_slice(&data, SESSION_BINCODE_CONFIGURATION)?.0;
652
653 assert_eq!(tag, session_id_2.tag());
654 assert_eq!(pseudonym, *session_id_2.pseudonym());
655
656 assert_eq!(session_id_1.as_str(), session_id_2.as_str());
657 assert_eq!(session_id_1, session_id_2);
658
659 Ok(())
660 }
661
662 #[test]
663 fn session_should_identify_with_its_own_id() -> anyhow::Result<()> {
664 let addr: Address = (&ChainKeypair::random()).into();
665 let id = SessionId::new(1u64, HoprPseudonym::random());
666 let (_tx, rx) = futures::channel::mpsc::unbounded();
667 let mock = MockSendMsg::new();
668
669 let session = InnerSession::new(
670 id,
671 DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
672 Arc::new(mock),
673 Box::pin(rx),
674 );
675
676 assert_eq!(session.id, id);
677
678 Ok(())
679 }
680
681 #[tokio::test]
682 async fn session_should_read_data_in_one_swoop_if_the_buffer_is_sufficiently_large() -> anyhow::Result<()> {
683 let addr: Address = (&ChainKeypair::random()).into();
684 let id = SessionId::new(1u64, HoprPseudonym::random());
685 let (tx, rx) = futures::channel::mpsc::unbounded();
686 let mock = MockSendMsg::new();
687
688 let mut session = InnerSession::new(
689 id,
690 DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
691 Arc::new(mock),
692 Box::pin(rx),
693 );
694
695 let random_data = hopr_crypto_random::random_bytes::<{ HoprPacket::PAYLOAD_SIZE }>()
696 .as_ref()
697 .to_vec()
698 .into_boxed_slice();
699
700 assert!(tx.unbounded_send(random_data.clone()).is_ok());
701
702 let mut buffer = vec![0; HoprPacket::PAYLOAD_SIZE * 2];
703
704 let bytes_read = session.read(&mut buffer[..]).await?;
705
706 assert_eq!(bytes_read, random_data.len());
707 assert_eq!(&buffer[..bytes_read], random_data.as_ref());
708
709 Ok(())
710 }
711
712 #[tokio::test]
713 async fn session_should_read_data_in_multiple_rounds_if_the_buffer_is_not_sufficiently_large() -> anyhow::Result<()>
714 {
715 let addr: Address = (&ChainKeypair::random()).into();
716 let id = SessionId::new(1u64, HoprPseudonym::random());
717 let (tx, rx) = futures::channel::mpsc::unbounded();
718 let mock = MockSendMsg::new();
719
720 let mut session = InnerSession::new(
721 id,
722 DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
723 Arc::new(mock),
724 Box::pin(rx),
725 );
726
727 let random_data = hopr_crypto_random::random_bytes::<{ HoprPacket::PAYLOAD_SIZE }>()
728 .as_ref()
729 .to_vec()
730 .into_boxed_slice();
731
732 assert!(tx.unbounded_send(random_data.clone()).is_ok());
733
734 const BUFFER_SIZE: usize = HoprPacket::PAYLOAD_SIZE - 1;
735 let mut buffer = vec![0; BUFFER_SIZE];
736
737 let bytes_read = session.read(&mut buffer[..]).await?;
738
739 assert_eq!(bytes_read, BUFFER_SIZE);
740 assert_eq!(&buffer[..bytes_read], &random_data[..BUFFER_SIZE]);
741
742 let bytes_read = session.read(&mut buffer[..]).await?;
743
744 assert_eq!(bytes_read, HoprPacket::PAYLOAD_SIZE - BUFFER_SIZE);
745 assert_eq!(&buffer[..bytes_read], &random_data[BUFFER_SIZE..]);
746
747 Ok(())
748 }
749
750 #[tokio::test]
751 async fn session_should_write_data_on_forward_path() -> anyhow::Result<()> {
752 let addr: Address = (&ChainKeypair::random()).into();
753 let id = SessionId::new(1u64, HoprPseudonym::random());
754 let (_tx, rx) = futures::channel::mpsc::unbounded();
755 let mut mock = MockSendMsg::new();
756
757 let data = b"Hello, world!".to_vec().into_boxed_slice();
758
759 mock.expect_send_message()
760 .times(1)
761 .withf(move |data, routing,| {
762 assert_eq!(data.plain_text, b"Hello, world!".to_vec().into_boxed_slice());
763 assert!(matches!(routing, DestinationRouting::Forward {forward_options,..} if forward_options == &RoutingOptions::Hops(1_u32.try_into().expect("must be convertible"))));
764 true
765 })
766 .returning(|_, _| Ok(()));
767
768 let mut session = InnerSession::new(
769 id,
770 DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
771 Arc::new(mock),
772 Box::pin(rx),
773 );
774
775 let bytes_written = session.write(&data).await?;
776 assert_eq!(bytes_written, data.len());
777
778 Ok(())
779 }
780
781 #[tokio::test]
782 async fn session_should_write_data_on_return_path() -> anyhow::Result<()> {
783 let id = SessionId::new(1u64, HoprPseudonym::random());
784 let (_tx, rx) = futures::channel::mpsc::unbounded();
785 let mut mock = MockSendMsg::new();
786
787 let data = b"Hello, world!".to_vec().into_boxed_slice();
788
789 mock.expect_send_message()
790 .times(1)
791 .withf(move |data, routing,| {
792 assert_eq!(data.plain_text, b"Hello, world!".to_vec().into_boxed_slice());
793 assert!(matches!(routing, DestinationRouting::Return(SurbMatcher::Pseudonym(pseudonym)) if pseudonym == &id.pseudonym));
794 true
795 })
796 .returning(|_, _| Ok(()));
797
798 let mut session = InnerSession::new(
799 id,
800 DestinationRouting::Return(SurbMatcher::Pseudonym(id.pseudonym)),
801 Arc::new(mock),
802 Box::pin(rx),
803 );
804
805 let bytes_written = session.write(&data).await?;
806 assert_eq!(bytes_written, data.len());
807
808 Ok(())
809 }
810
811 #[tokio::test]
812 async fn session_should_chunk_the_data_if_without_segmentation_the_write_size_is_greater_than_the_usable_mtu_size()
813 -> anyhow::Result<()> {
814 const TO_SEND: usize = ApplicationData::PAYLOAD_SIZE * 2 + 10;
815
816 let addr: Address = (&ChainKeypair::random()).into();
817 let id = SessionId::new(1u64, HoprPseudonym::random());
818 let (_tx, rx) = futures::channel::mpsc::unbounded();
819 let mut mock = MockSendMsg::new();
820
821 let data = hopr_crypto_random::random_bytes::<TO_SEND>()
822 .as_ref()
823 .to_vec()
824 .into_boxed_slice();
825
826 mock.expect_send_message().times(3).returning(|_, _| Ok(()));
827
828 let mut session = InnerSession::new(
829 id,
830 DestinationRouting::forward_only(addr, RoutingOptions::Hops(1_u32.try_into()?)),
831 Arc::new(mock),
832 Box::pin(rx),
833 );
834
835 let bytes_written = session.write(&data).await?;
836 assert_eq!(bytes_written, TO_SEND);
837
838 Ok(())
839 }
840}