1use futures::channel::mpsc::UnboundedSender;
2use futures::future::Either;
3use futures::{pin_mut, FutureExt, StreamExt, TryStreamExt};
4use hopr_internal_types::prelude::{ApplicationData, Tag};
5use hopr_network_types::prelude::*;
6use std::ops::Range;
7use std::sync::{Arc, OnceLock};
8use std::time::Duration;
9use tracing::{debug, error, info, trace, warn};
10
11use crate::errors::{SessionManagerError, TransportSessionError};
12use crate::initiation::{
13 StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation, StartProtocol,
14};
15use crate::traits::SendMsg;
16use crate::types::unwrap_offchain_key;
17use crate::{IncomingSession, Session, SessionClientConfig, SessionId};
18
19#[cfg(all(feature = "prometheus", not(test)))]
20lazy_static::lazy_static! {
21 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
22 "hopr_session_num_active_sessions",
23 "Number of currently active HOPR sessions"
24 ).unwrap();
25 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
26 "hopr_session_established_sessions_count",
27 "Number of sessions that were successfully established as an Exit node"
28 ).unwrap();
29 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
30 "hopr_session_initiated_sessions_count",
31 "Number of sessions that were successfully initiated as an Entry node"
32 ).unwrap();
33 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
34 "hopr_session_received_error_count",
35 "Number of HOPR session errors received from an Exit node",
36 &["kind"]
37 ).unwrap();
38 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
39 "hopr_session_sent_error_count",
40 "Number of HOPR session errors sent to an Entry node",
41 &["kind"]
42 ).unwrap();
43}
44
45#[derive(Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
47pub struct SessionManagerConfig {
48 #[default(_code = "16..1024")]
56 pub session_tag_range: Range<Tag>,
57
58 #[default(Duration::from_secs(5))]
65 pub initiation_timeout_base: Duration,
66
67 #[default(Duration::from_secs(180))]
71 pub idle_timeout: Duration,
72}
73
74fn close_session_after_eviction<S: SendMsg + Send + Sync + 'static>(
75 msg_sender: Arc<OnceLock<S>>,
76 me: PeerId,
77 session_id: SessionId,
78 session_data: CachedSession,
79 cause: moka::notification::RemovalCause,
80) -> moka::notification::ListenerFuture {
81 match cause {
84 r @ moka::notification::RemovalCause::Expired | r @ moka::notification::RemovalCause::Size
85 if msg_sender.get().is_some() =>
86 {
87 trace!(
88 ?session_id,
89 reason = ?r,
90 "session termination due to eviction from the cache"
91 );
92 let data = match ApplicationData::try_from(StartProtocol::CloseSession(session_id.with_peer(me))) {
93 Ok(data) => data,
94 Err(e) => {
95 error!(
96 ?session_id,
97 error = %e,
98 "failed to serialize CloseSession"
99 );
100 return futures::future::ready(()).boxed();
101 }
102 };
103
104 let msg_sender = msg_sender.clone();
105 async move {
106 if let Err(err) = msg_sender
108 .get()
109 .unwrap()
110 .send_message(data, *session_id.peer(), session_data.routing_opts)
111 .await
112 {
113 error!(
114 ?session_id,
115 error = %err,
116 "could not send notification of session closure after cache eviction"
117 );
118 }
119
120 session_data.session_tx.close_channel();
121 debug!(
122 ?session_id,
123 reason = ?r,
124 "session has been closed due to cache eviction"
125 );
126
127 #[cfg(all(feature = "prometheus", not(test)))]
128 METRIC_ACTIVE_SESSIONS.decrement(1.0);
129 }
130 .boxed()
131 }
132 _ => futures::future::ready(()).boxed(),
133 }
134}
135
136async fn insert_into_next_slot<K, V, F>(cache: &moka::future::Cache<K, V>, gen: F, value: V) -> Option<K>
142where
143 K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
144 V: Clone + Send + Sync + 'static,
145 F: Fn(Option<K>) -> K,
146{
147 let initial = gen(None);
148 let mut next = initial;
149 loop {
150 let insertion_result = cache
151 .entry(next)
152 .and_try_compute_with(|e| {
153 if e.is_none() {
154 futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
155 } else {
156 futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
157 }
158 })
159 .await;
160
161 if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
163 return Some(next);
164 }
165
166 next = gen(Some(next));
168
169 if next == initial {
171 return None;
172 }
173 }
174}
175
176pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
178
179type SessionInitiationCache =
183 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
184
185#[derive(Clone)]
186struct CachedSession {
187 session_tx: Arc<UnboundedSender<Box<[u8]>>>,
190 routing_opts: RoutingOptions,
191}
192
193#[derive(Clone, Debug, PartialEq, Eq)]
195pub enum DispatchResult {
196 Processed,
198 Unrelated(ApplicationData),
200}
201
202pub struct SessionManager<S> {
215 session_initiations: SessionInitiationCache,
216 session_notifiers: Arc<OnceLock<(UnboundedSender<IncomingSession>, UnboundedSender<SessionId>)>>,
217 sessions: moka::future::Cache<SessionId, CachedSession>,
218 me: PeerId,
219 msg_sender: Arc<OnceLock<S>>,
220 cfg: SessionManagerConfig,
221}
222
223impl<S> Clone for SessionManager<S> {
224 fn clone(&self) -> Self {
225 Self {
226 session_initiations: self.session_initiations.clone(),
227 session_notifiers: self.session_notifiers.clone(),
228 sessions: self.sessions.clone(),
229 me: self.me,
230 cfg: self.cfg.clone(),
231 msg_sender: self.msg_sender.clone(),
232 }
233 }
234}
235
236fn initiation_timeout_max(base: Duration, hops: usize) -> Duration {
237 2 * base * (hops as u32)
238}
239
240pub const MIN_SESSION_TAG_RANGE_RESERVATION: Tag = 16;
242
243impl<S: SendMsg + Clone + Send + Sync + 'static> SessionManager<S> {
244 pub fn new(me: PeerId, mut cfg: SessionManagerConfig) -> Self {
246 if cfg.session_tag_range.start < MIN_SESSION_TAG_RANGE_RESERVATION {
248 let diff = MIN_SESSION_TAG_RANGE_RESERVATION - cfg.session_tag_range.start;
249 cfg.session_tag_range = MIN_SESSION_TAG_RANGE_RESERVATION..cfg.session_tag_range.end + diff;
250 }
251
252 #[cfg(all(feature = "prometheus", not(test)))]
253 METRIC_ACTIVE_SESSIONS.set(0.0);
254
255 let msg_sender = Arc::new(OnceLock::new());
256 Self {
257 msg_sender: msg_sender.clone(),
258 session_initiations: moka::future::Cache::builder()
259 .max_capacity(cfg.session_tag_range.clone().count() as u64)
260 .time_to_live(initiation_timeout_max(
261 cfg.initiation_timeout_base,
262 RoutingOptions::MAX_INTERMEDIATE_HOPS,
263 ))
264 .build(),
265 sessions: moka::future::Cache::builder()
266 .max_capacity(u16::MAX as u64)
267 .time_to_idle(cfg.idle_timeout)
268 .async_eviction_listener(move |k, v, c| {
269 let msg_sender = msg_sender.clone();
270 close_session_after_eviction(msg_sender, me, *k, v, c)
271 })
272 .build(),
273 session_notifiers: Arc::new(OnceLock::new()),
274 me,
275 cfg,
276 }
277 }
278
279 pub fn start(
285 &self,
286 msg_sender: S,
287 new_session_notifier: UnboundedSender<IncomingSession>,
288 ) -> crate::errors::Result<Vec<hopr_async_runtime::prelude::JoinHandle<()>>> {
289 self.msg_sender
290 .set(msg_sender)
291 .map_err(|_| SessionManagerError::AlreadyStarted)?;
292
293 let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
294 self.session_notifiers
295 .set((new_session_notifier, session_close_tx))
296 .map_err(|_| SessionManagerError::AlreadyStarted)?;
297
298 let myself = self.clone();
299 let jh_closure_notifications =
300 hopr_async_runtime::prelude::spawn(session_close_rx.for_each_concurrent(None, move |closed_session_id| {
301 let myself = myself.clone();
302 async move {
303 trace!(
304 session_id = ?closed_session_id,
305 "sending notification of session closure done by us"
306 );
307 match myself.close_session(closed_session_id, true).await {
308 Ok(closed) if closed => debug!(
309 session_id = ?closed_session_id,
310 "session has been closed by us"
311 ),
312 Err(e) => error!(
313 session_id = ?closed_session_id,
314 error = %e,
315 "cannot initiate session closure notification"
316 ),
317 _ => {}
318 }
319 }
320 }));
321
322 let myself = self.clone();
327 let jh_session_expiration = hopr_async_runtime::prelude::spawn(async move {
328 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
329 let timeout = initiation_timeout_max(
330 myself.cfg.initiation_timeout_base,
331 RoutingOptions::MAX_INTERMEDIATE_HOPS,
332 )
333 .min(myself.cfg.idle_timeout)
334 .mul_f64(jitter)
335 / 2;
336 loop {
337 hopr_async_runtime::prelude::sleep(timeout).await;
338 trace!("executing session cache evictions");
339 futures::join!(
340 myself.sessions.run_pending_tasks(),
341 myself.session_initiations.run_pending_tasks()
342 );
343 }
344 });
345
346 Ok(vec![jh_closure_notifications, jh_session_expiration])
347 }
348
349 pub fn is_started(&self) -> bool {
351 self.session_notifiers.get().is_some()
352 }
353
354 pub async fn new_session(&self, cfg: SessionClientConfig) -> crate::errors::Result<Session> {
362 let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
363
364 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
365 let challenge = insert_into_next_slot(
366 &self.session_initiations,
367 |ch| {
368 if let Some(challenge) = ch {
369 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
370 } else {
371 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
372 }
373 },
374 tx_initiation_done,
375 )
376 .await
377 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
381 let start_session_msg = StartProtocol::<SessionId>::StartSession(StartInitiation {
382 challenge,
383 target: cfg.target,
384 capabilities: cfg.capabilities.iter().copied().collect(),
385 back_routing: Some((cfg.path_options.clone().invert(), self.me)),
387 });
388
389 trace!(challenge, "sending new session request");
391 msg_sender
392 .send_message(start_session_msg.try_into()?, cfg.peer, cfg.path_options.clone())
393 .await?;
394
395 pin_mut!(rx_initiation_done);
397 let initiation_done = TryStreamExt::try_next(&mut rx_initiation_done);
398
399 let timeout = hopr_async_runtime::prelude::sleep(initiation_timeout_max(
401 self.cfg.initiation_timeout_base,
402 cfg.path_options.count_hops() + 1,
403 ));
404 pin_mut!(timeout);
405
406 trace!(challenge, "awaiting session establishment");
407 match futures::future::select(initiation_done, timeout).await {
408 Either::Left((Ok(Some(est)), _)) => {
409 let session_id = est.session_id;
411 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
412
413 let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
415 self.sessions
416 .insert(
417 session_id,
418 CachedSession {
419 session_tx: Arc::new(tx),
420 routing_opts: cfg.path_options.clone(),
421 },
422 )
423 .await;
424
425 #[cfg(all(feature = "prometheus", not(test)))]
426 {
427 METRIC_NUM_INITIATED_SESSIONS.increment();
428 METRIC_ACTIVE_SESSIONS.increment(1.0);
429 }
430
431 Ok(Session::new(
432 session_id,
433 self.me,
434 cfg.path_options,
435 cfg.capabilities.into_iter().collect(),
436 Arc::new(msg_sender.clone()),
437 rx,
438 self.session_notifiers.get().map(|(_, c)| c.clone()),
439 ))
440 }
441 Either::Left((Ok(None), _)) => Err(SessionManagerError::Other(
442 "internal error: sender has been closed without completing the session establishment".into(),
443 )
444 .into()),
445 Either::Left((Err(e), _)) => {
446 error!(
448 challenge = e.challenge,
449 error = ?e,
450 "the other party rejected the session initiation with error"
451 );
452 Err(TransportSessionError::Rejected(e.reason))
453 }
454 Either::Right(_) => {
455 error!(challenge, "session initiation attempt timed out");
457
458 #[cfg(all(feature = "prometheus", not(test)))]
459 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
460
461 Err(TransportSessionError::Timeout)
462 }
463 }
464 }
465
466 pub async fn dispatch_message(&self, data: ApplicationData) -> crate::errors::Result<DispatchResult> {
473 if let Some(app_tag) = &data.application_tag {
474 if (0..self.cfg.session_tag_range.start).contains(app_tag) {
475 trace!(tag = app_tag, "dispatching Start protocol message");
476 return self
477 .handle_start_protocol_message(data)
478 .await
479 .map(|_| DispatchResult::Processed);
480 } else if self.cfg.session_tag_range.contains(app_tag) {
481 let (peer, data) = unwrap_offchain_key(data.plain_text.clone())?;
482
483 let session_id = SessionId::new(*app_tag, peer);
484
485 return if let Some(session_data) = self.sessions.get(&session_id).await {
486 trace!(?session_id, "received data for a registered session");
487
488 Ok(session_data
489 .session_tx
490 .unbounded_send(data)
491 .map(|_| DispatchResult::Processed)
492 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
493 } else {
494 error!(%session_id, "received data from an unestablished session");
495 Err(TransportSessionError::UnknownData)
496 };
497 }
498 }
499
500 Ok(DispatchResult::Unrelated(data))
501 }
502
503 async fn handle_start_protocol_message(&self, data: ApplicationData) -> crate::errors::Result<()> {
504 match StartProtocol::<SessionId>::try_from(data)? {
505 StartProtocol::StartSession(session_req) => {
506 trace!(challenge = session_req.challenge, "received session initiation request");
507
508 let (route, peer) = session_req.back_routing.ok_or(SessionManagerError::NoBackRoutingInfo)?;
510
511 debug!(%peer, "got new session request, searching for a free session slot");
512
513 let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
514
515 let (new_session_notifier, close_session_notifier) = self
516 .session_notifiers
517 .get()
518 .cloned()
519 .ok_or(SessionManagerError::NotStarted)?;
520
521 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
523 if let Some(session_id) = insert_into_next_slot(
524 &self.sessions,
525 |sid| {
526 let next_tag = match sid {
527 Some(session_id) => ((session_id.tag() + 1) % self.cfg.session_tag_range.end)
528 .max(self.cfg.session_tag_range.start),
529 None => hopr_crypto_random::random_integer(
530 self.cfg.session_tag_range.start as u64,
531 Some(self.cfg.session_tag_range.end as u64),
532 ) as u16,
533 };
534 SessionId::new(next_tag, peer)
535 },
536 CachedSession {
537 session_tx: Arc::new(tx_session_data),
538 routing_opts: route.clone(),
539 },
540 )
541 .await
542 {
543 debug!(?session_id, "assigning a new session");
544
545 let session = Session::new(
546 session_id,
547 self.me,
548 route.clone(),
549 session_req.capabilities,
550 Arc::new(msg_sender.clone()),
551 rx_session_data,
552 close_session_notifier.into(),
553 );
554
555 let incoming_session = IncomingSession {
557 session,
558 target: session_req.target,
559 };
560
561 if let Err(e) = new_session_notifier.unbounded_send(incoming_session) {
563 warn!(error = %e, "failed to send session to incoming session queue");
564 }
565
566 trace!(?session_id, "session notification sent");
567
568 let data = StartProtocol::SessionEstablished(StartEstablished {
571 orig_challenge: session_req.challenge,
572 session_id: session_id.with_peer(self.me),
573 });
574
575 msg_sender
576 .send_message(data.try_into()?, peer, route)
577 .await
578 .map_err(|e| {
579 SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
580 })?;
581
582 info!(%session_id, "new session established");
583
584 #[cfg(all(feature = "prometheus", not(test)))]
585 {
586 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
587 METRIC_ACTIVE_SESSIONS.increment(1.0);
588 }
589 } else {
590 error!(
591 %peer,
592 "failed to reserve a new session slot"
593 );
594
595 let reason = StartErrorReason::NoSlotsAvailable;
597 let data = StartProtocol::<SessionId>::SessionError(StartErrorType {
598 challenge: session_req.challenge,
599 reason,
600 });
601
602 msg_sender
603 .send_message(data.try_into()?, peer, route)
604 .await
605 .map_err(|e| {
606 SessionManagerError::Other(format!(
607 "failed to send session establishment error message: {e}"
608 ))
609 })?;
610
611 trace!(%peer, "session establishment failure message sent");
612
613 #[cfg(all(feature = "prometheus", not(test)))]
614 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
615 }
616 }
617 StartProtocol::SessionEstablished(est) => {
618 trace!(
619 session_id = ?est.session_id,
620 "received session establishment confirmation"
621 );
622 let challenge = est.orig_challenge;
623 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
624 if let Err(e) = tx_est.unbounded_send(Ok(est)) {
625 return Err(
626 SessionManagerError::Other(format!("could not notify session establishment: {e}")).into(),
627 );
628 }
629 debug!(challenge, "session establishment complete");
630 } else {
631 error!(challenge, "session establishment attempt expired");
632 }
633 }
634 StartProtocol::SessionError(err) => {
635 trace!(
636 challenge = err.challenge,
637 error = ?err.reason,
638 "failed to initialize a session",
639 );
640 if let Some(tx_est) = self.session_initiations.remove(&err.challenge).await {
643 if let Err(e) = tx_est.unbounded_send(Err(err)) {
644 return Err(SessionManagerError::Other(format!(
645 "could not notify session establishment error {err:?}: {e}"
646 ))
647 .into());
648 }
649 error!(
650 challenge = err.challenge,
651 error = ?err,
652 "session establishment error received"
653 );
654 } else {
655 error!(
656 challenge = err.challenge,
657 error = ?err,
658 "session establishment attempt expired before error could be delivered"
659 );
660 }
661
662 #[cfg(all(feature = "prometheus", not(test)))]
663 METRIC_RECEIVED_SESSION_ERRS.increment(&[&err.reason.to_string()])
664 }
665 StartProtocol::CloseSession(session_id) => {
666 trace!(?session_id, "received session close request");
667 match self.close_session(session_id, false).await {
668 Ok(closed) if closed => debug!(?session_id, "session has been closed by the other party"),
669 Err(e) => error!(
670 ?session_id,
671 error = %e,
672 "session could not be closed on other party's request"
673 ),
674 _ => {}
675 }
676 }
677 }
678
679 Ok(())
680 }
681
682 async fn close_session(&self, session_id: SessionId, notify_closure: bool) -> crate::errors::Result<bool> {
683 if let Some(session_data) = self.sessions.remove(&session_id).await {
684 if notify_closure {
686 trace!(?session_id, "sending session termination");
687 self.msg_sender
688 .get()
689 .ok_or(SessionManagerError::NotStarted)?
690 .send_message(
691 StartProtocol::CloseSession(session_id.with_peer(self.me)).try_into()?,
692 *session_id.peer(),
693 session_data.routing_opts,
694 )
695 .await?;
696 }
697
698 session_data.session_tx.close_channel();
700 trace!(?session_id, "data tx channel closed on session");
701
702 #[cfg(all(feature = "prometheus", not(test)))]
703 METRIC_ACTIVE_SESSIONS.decrement(1.0);
704 Ok(true)
705 } else {
706 debug!(
708 ?session_id,
709 "could not find session id to close, maybe the session is already closed"
710 );
711 Ok(false)
712 }
713 }
714}
715
716#[cfg(test)]
717mod tests {
718 use super::*;
719
720 use crate::types::SessionTarget;
721 use crate::Capability;
722 use anyhow::anyhow;
723 use async_std::prelude::FutureExt;
724 use async_trait::async_trait;
725 use futures::AsyncWriteExt;
726 use hopr_primitive_types::bounded::BoundedSize;
727
728 mockall::mock! {
729 MsgSender {}
730 impl Clone for MsgSender {
731 fn clone(&self) -> Self;
732 }
733 #[async_trait]
734 impl SendMsg for MsgSender {
735 async fn send_message(
736 &self,
737 data: ApplicationData,
738 destination: PeerId,
739 options: RoutingOptions,
740 ) -> std::result::Result<(), TransportSessionError>;
741 }
742 }
743
744 #[async_std::test]
745 async fn test_insert_into_next_slot() -> anyhow::Result<()> {
746 let cache = moka::future::Cache::new(10);
747
748 for i in 0..5 {
749 let v = insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
750 .await
751 .ok_or(anyhow!("should insert"))?;
752 assert_eq!(v, i);
753 assert_eq!(Some("foo".to_string()), cache.get(&i).await);
754 }
755
756 assert!(
757 insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
758 .await
759 .is_none(),
760 "must not find slot when full"
761 );
762
763 Ok(())
764 }
765
766 #[test_log::test(async_std::test)]
767 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
768 {
769 let alice_peer = PeerId::random();
770 let bob_peer = PeerId::random();
771
772 let alice_mgr = SessionManager::new(alice_peer, Default::default());
773 let bob_mgr = SessionManager::new(bob_peer, Default::default());
774
775 let mut sequence = mockall::Sequence::new();
776 let mut alice_transport = MockMsgSender::new();
777 let mut bob_transport = MockMsgSender::new();
778
779 let bob_mgr_clone = bob_mgr.clone();
781 alice_transport
782 .expect_send_message()
783 .once()
784 .in_sequence(&mut sequence)
785 .withf(move |_, peer, _| *peer == bob_peer)
786 .returning(move |data, _, _| {
787 async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
788 Ok(())
789 });
790
791 bob_transport
793 .expect_clone()
794 .once()
795 .in_sequence(&mut sequence)
796 .return_once(|| MockMsgSender::new());
797
798 let alice_mgr_clone = alice_mgr.clone();
800 bob_transport
801 .expect_send_message()
802 .once()
803 .in_sequence(&mut sequence)
804 .withf(move |_, peer, _| *peer == alice_peer)
805 .returning(move |data, _, _| {
806 async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
807 Ok(())
808 });
809
810 alice_transport
812 .expect_clone()
813 .once()
814 .in_sequence(&mut sequence)
815 .return_once(|| MockMsgSender::new());
816
817 let bob_mgr_clone = bob_mgr.clone();
819 alice_transport
820 .expect_send_message()
821 .once()
822 .in_sequence(&mut sequence)
823 .withf(move |_, peer, _| *peer == bob_peer)
824 .returning(move |data, _, _| {
825 async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
826 Ok(())
827 });
828
829 let alice_mgr_clone = alice_mgr.clone();
831 bob_transport
832 .expect_send_message()
833 .once()
834 .in_sequence(&mut sequence)
835 .withf(move |_, peer, _| *peer == alice_peer)
836 .returning(move |data, _, _| {
837 async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
838 Ok(())
839 });
840
841 let mut jhs = Vec::new();
842
843 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
845 jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
846
847 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
849 jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
850
851 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
852
853 pin_mut!(new_session_rx_bob);
854 let (alice_session, bob_session) = futures::future::join(
855 alice_mgr.new_session(SessionClientConfig {
856 peer: bob_peer,
857 path_options: RoutingOptions::Hops(BoundedSize::MIN),
858 target: SessionTarget::TcpStream(target.clone()),
859 capabilities: vec![Capability::Segmentation],
860 }),
861 new_session_rx_bob.next(),
862 )
863 .timeout(Duration::from_secs(2))
864 .await?;
865
866 let mut alice_session = alice_session?;
867 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
868
869 assert!(
870 alice_session.capabilities().len() == 1 && alice_session.capabilities().contains(&Capability::Segmentation)
871 );
872 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
873 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
874
875 async_std::task::sleep(Duration::from_millis(100)).await;
876 alice_session.close().await?;
877
878 async_std::task::sleep(Duration::from_millis(100)).await;
879 futures::stream::iter(jhs)
880 .for_each(hopr_async_runtime::prelude::cancel_join_handle)
881 .await;
882
883 Ok(())
884 }
885
886 #[test_log::test(async_std::test)]
887 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
888 let alice_peer = PeerId::random();
889 let bob_peer = PeerId::random();
890
891 let cfg = SessionManagerConfig {
892 idle_timeout: Duration::from_millis(200),
893 ..Default::default()
894 };
895
896 let alice_mgr = SessionManager::new(alice_peer, cfg);
897 let bob_mgr = SessionManager::new(bob_peer, Default::default());
898
899 let mut sequence = mockall::Sequence::new();
900 let mut alice_transport = MockMsgSender::new();
901 let mut bob_transport = MockMsgSender::new();
902
903 let bob_mgr_clone = bob_mgr.clone();
905 alice_transport
906 .expect_send_message()
907 .once()
908 .in_sequence(&mut sequence)
909 .withf(move |_, peer, _| *peer == bob_peer)
910 .returning(move |data, _, _| {
911 async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
912 Ok(())
913 });
914
915 bob_transport
917 .expect_clone()
918 .once()
919 .in_sequence(&mut sequence)
920 .return_once(|| MockMsgSender::new());
921
922 let alice_mgr_clone = alice_mgr.clone();
924 bob_transport
925 .expect_send_message()
926 .once()
927 .in_sequence(&mut sequence)
928 .withf(move |_, peer, _| *peer == alice_peer)
929 .returning(move |data, _, _| {
930 async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
931 Ok(())
932 });
933
934 alice_transport
936 .expect_clone()
937 .once()
938 .in_sequence(&mut sequence)
939 .return_once(|| MockMsgSender::new());
940
941 let bob_mgr_clone = bob_mgr.clone();
943 alice_transport
944 .expect_send_message()
945 .once()
946 .in_sequence(&mut sequence)
947 .withf(move |_, peer, _| *peer == bob_peer)
948 .returning(move |data, _, _| {
949 async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
950 Ok(())
951 });
952
953 let alice_mgr_clone = alice_mgr.clone();
955 bob_transport
956 .expect_send_message()
957 .once()
958 .in_sequence(&mut sequence)
959 .withf(move |_, peer, _| *peer == alice_peer)
960 .returning(move |data, _, _| {
961 async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
962 Ok(())
963 });
964
965 let mut jhs = Vec::new();
966
967 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
969 jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
970
971 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
973 jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
974
975 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
976
977 pin_mut!(new_session_rx_bob);
978 let (alice_session, bob_session) = futures::future::join(
979 alice_mgr.new_session(SessionClientConfig {
980 peer: bob_peer,
981 path_options: RoutingOptions::Hops(BoundedSize::MIN),
982 target: SessionTarget::TcpStream(target.clone()),
983 capabilities: vec![Capability::Segmentation],
984 }),
985 new_session_rx_bob.next(),
986 )
987 .timeout(Duration::from_secs(2))
988 .await?;
989
990 let alice_session = alice_session?;
991 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
992
993 assert!(
994 alice_session.capabilities().len() == 1 && alice_session.capabilities().contains(&Capability::Segmentation)
995 );
996 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
997 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
998
999 async_std::task::sleep(Duration::from_millis(300)).await;
1001
1002 futures::stream::iter(jhs)
1003 .for_each(hopr_async_runtime::prelude::cancel_join_handle)
1004 .await;
1005
1006 Ok(())
1007 }
1008
1009 #[test_log::test(async_std::test)]
1010 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1011 let alice_peer = PeerId::random();
1012 let bob_peer = PeerId::random();
1013
1014 let cfg = SessionManagerConfig {
1015 session_tag_range: 16..17, ..Default::default()
1017 };
1018
1019 let alice_mgr = SessionManager::new(alice_peer, Default::default());
1020 let bob_mgr = SessionManager::new(bob_peer, cfg);
1021
1022 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1024 bob_mgr
1025 .sessions
1026 .insert(
1027 SessionId::new(16, alice_peer),
1028 CachedSession {
1029 session_tx: Arc::new(dummy_tx),
1030 routing_opts: RoutingOptions::Hops(BoundedSize::MIN),
1031 },
1032 )
1033 .await;
1034
1035 let mut sequence = mockall::Sequence::new();
1036 let mut alice_transport = MockMsgSender::new();
1037 let mut bob_transport = MockMsgSender::new();
1038
1039 let bob_mgr_clone = bob_mgr.clone();
1041 alice_transport
1042 .expect_send_message()
1043 .once()
1044 .in_sequence(&mut sequence)
1045 .withf(move |_, peer, _| *peer == bob_peer)
1046 .returning(move |data, _, _| {
1047 async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
1048 Ok(())
1049 });
1050
1051 let alice_mgr_clone = alice_mgr.clone();
1053 bob_transport
1054 .expect_send_message()
1055 .once()
1056 .in_sequence(&mut sequence)
1057 .withf(move |_, peer, _| *peer == alice_peer)
1058 .returning(move |data, _, _| {
1059 async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
1060 Ok(())
1061 });
1062
1063 let mut jhs = Vec::new();
1064
1065 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1067 jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1068
1069 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1071 jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1072
1073 let result = alice_mgr
1074 .new_session(SessionClientConfig {
1075 peer: bob_peer,
1076 path_options: RoutingOptions::Hops(BoundedSize::MIN),
1077 target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1078 capabilities: vec![],
1079 })
1080 .await;
1081
1082 assert!(
1083 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1084 );
1085
1086 Ok(())
1087 }
1088
1089 #[test_log::test(async_std::test)]
1090 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1091 let alice_peer = PeerId::random();
1092 let bob_peer = PeerId::random();
1093
1094 let cfg = SessionManagerConfig {
1095 initiation_timeout_base: Duration::from_millis(100),
1096 ..Default::default()
1097 };
1098
1099 let alice_mgr = SessionManager::new(alice_peer, cfg);
1100 let bob_mgr = SessionManager::new(bob_peer, Default::default());
1101
1102 let mut sequence = mockall::Sequence::new();
1103 let mut alice_transport = MockMsgSender::new();
1104 let bob_transport = MockMsgSender::new();
1105
1106 alice_transport
1108 .expect_send_message()
1109 .once()
1110 .in_sequence(&mut sequence)
1111 .withf(move |_, peer, _| *peer == bob_peer)
1112 .returning(|_, _, _| Ok(()));
1113
1114 let mut jhs = Vec::new();
1115
1116 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1118 jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1119
1120 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1122 jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1123
1124 let result = alice_mgr
1125 .new_session(SessionClientConfig {
1126 peer: bob_peer,
1127 path_options: RoutingOptions::Hops(BoundedSize::MIN),
1128 target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1129 capabilities: vec![],
1130 })
1131 .await;
1132
1133 assert!(matches!(result, Err(TransportSessionError::Timeout)));
1134
1135 Ok(())
1136 }
1137}