1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use futures::{
8 FutureExt, SinkExt, StreamExt, TryStreamExt, channel::mpsc::UnboundedSender, future::AbortHandle, pin_mut,
9};
10use futures_time::future::FutureExt as TimeExt;
11use hopr_crypto_random::Randomizable;
12use hopr_internal_types::prelude::HoprPseudonym;
13use hopr_network_types::prelude::*;
14use hopr_primitive_types::prelude::Address;
15use hopr_protocol_app::prelude::{ApplicationData, ReservedTag, Tag};
16use hopr_protocol_start::{
17 KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
18};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22 Capability, IncomingSession, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
23 balancer::{
24 AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
25 SurbControllerWithCorrection,
26 pid::{PidBalancerController, PidControllerGains},
27 simple::SimpleBalancerController,
28 },
29 errors::{SessionManagerError, TransportSessionError},
30 types::{ByteCapabilities, ClosureReason, HoprStartProtocol},
31 utils,
32 utils::insert_into_next_slot,
33};
34
35#[cfg(all(feature = "prometheus", not(test)))]
36lazy_static::lazy_static! {
37 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
38 "hopr_session_num_active_sessions",
39 "Number of currently active HOPR sessions"
40 ).unwrap();
41 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
42 "hopr_session_established_sessions_count",
43 "Number of sessions that were successfully established as an Exit node"
44 ).unwrap();
45 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
46 "hopr_session_initiated_sessions_count",
47 "Number of sessions that were successfully initiated as an Entry node"
48 ).unwrap();
49 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
50 "hopr_session_received_error_count",
51 "Number of HOPR session errors received from an Exit node",
52 &["kind"]
53 ).unwrap();
54 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
55 "hopr_session_sent_error_count",
56 "Number of HOPR session errors sent to an Entry node",
57 &["kind"]
58 ).unwrap();
59}
60
61fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
62 debug!(?session_id, ?reason, "closing session");
63
64 if reason != ClosureReason::EmptyRead {
65 session_data.session_tx.close_channel();
67 trace!(?session_id, "data tx channel closed on session");
68 }
69
70 session_data.abort_handles.into_iter().for_each(|h| h.abort());
72
73 #[cfg(all(feature = "prometheus", not(test)))]
74 METRIC_ACTIVE_SESSIONS.decrement(1.0);
75}
76
77fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
78 base * (hops as u32)
79}
80
81pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
83
84pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
86
87const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
89
90type SessionInitiationCache =
94 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
95
96#[derive(Clone)]
97struct SessionSlot {
98 session_tx: Arc<UnboundedSender<Box<[u8]>>>,
101 routing_opts: DestinationRouting,
102 abort_handles: Vec<AbortHandle>,
103 surb_mgmt: Option<SurbBalancerConfig>,
106 surb_estimator: Option<AtomicSurbFlowEstimator>,
108}
109
110struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
112
113#[async_trait::async_trait]
114impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
115 async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
116 self.0
117 .get(id)
118 .await
119 .ok_or(SessionManagerError::NonExistingSession)?
120 .surb_mgmt
121 .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
122 }
123
124 async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
125 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
126 .0
127 .entry_by_ref(id)
128 .and_compute_with(|entry| {
129 futures::future::ready(match entry.map(|e| e.into_value()) {
130 None => moka::ops::compute::Op::Nop,
131 Some(mut updated_slot) => {
132 if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
133 *balancer_cfg = cfg;
134 moka::ops::compute::Op::Put(updated_slot)
135 } else {
136 moka::ops::compute::Op::Nop
137 }
138 }
139 })
140 })
141 .await
142 {
143 Ok(())
144 } else {
145 Err(SessionManagerError::NonExistingSession.into())
146 }
147 }
148}
149
150#[derive(Clone, Debug, PartialEq, Eq)]
152pub enum DispatchResult {
153 Processed,
155 Unrelated(ApplicationData),
157}
158
159type SessionNotifiers = (
161 UnboundedSender<IncomingSession>,
162 UnboundedSender<(SessionId, ClosureReason)>,
163);
164
165#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
167pub struct SessionManagerConfig {
168 #[doc(hidden)]
176 #[default(_code = "16u64..1024u64")]
177 pub session_tag_range: Range<u64>,
178
179 #[default(128)]
188 pub maximum_sessions: usize,
189
190 #[default(Duration::from_millis(500))]
197 pub initiation_timeout_base: Duration,
198
199 #[default(Duration::from_secs(180))]
203 pub idle_timeout: Duration,
204
205 #[default(Duration::from_millis(100))]
210 pub balancer_sampling_interval: Duration,
211
212 #[default(10)]
218 pub initial_return_session_egress_rate: usize,
219 #[default(Duration::from_secs(5))]
230 pub minimum_surb_buffer_duration: Duration,
231 #[default(10_000)]
239 pub maximum_surb_buffer_size: usize,
240
241 #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
249 pub growable_target_surb_buffer: Option<(Duration, f64)>,
250}
251
252pub struct SessionManager<S> {
395 session_initiations: SessionInitiationCache,
396 session_notifiers: Arc<OnceLock<SessionNotifiers>>,
397 sessions: moka::future::Cache<SessionId, SessionSlot>,
398 msg_sender: Arc<OnceLock<S>>,
399 cfg: SessionManagerConfig,
400}
401
402impl<S> Clone for SessionManager<S> {
403 fn clone(&self) -> Self {
404 Self {
405 session_initiations: self.session_initiations.clone(),
406 session_notifiers: self.session_notifiers.clone(),
407 sessions: self.sessions.clone(),
408 cfg: self.cfg.clone(),
409 msg_sender: self.msg_sender.clone(),
410 }
411 }
412}
413
414impl<S> SessionManager<S>
415where
416 S: futures::Sink<(DestinationRouting, ApplicationData)> + Clone + Send + Sync + Unpin + 'static,
417 S::Error: std::error::Error + Send + Sync + Clone + 'static,
418{
419 pub fn new(mut cfg: SessionManagerConfig) -> Self {
421 let min_session_tag_range_reservation = ReservedTag::range().end;
422 debug_assert!(
423 min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
424 "invalid tag reservation range"
425 );
426
427 if cfg.session_tag_range.start < min_session_tag_range_reservation {
429 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
430 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
431 }
432 cfg.maximum_sessions = cfg
433 .maximum_sessions
434 .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
435
436 #[cfg(all(feature = "prometheus", not(test)))]
437 METRIC_ACTIVE_SESSIONS.set(0.0);
438
439 let msg_sender = Arc::new(OnceLock::new());
440 Self {
441 msg_sender: msg_sender.clone(),
442 session_initiations: moka::future::Cache::builder()
443 .max_capacity(cfg.maximum_sessions as u64)
444 .time_to_live(
445 2 * initiation_timeout_max_one_way(
446 cfg.initiation_timeout_base,
447 RoutingOptions::MAX_INTERMEDIATE_HOPS,
448 ),
449 )
450 .build(),
451 sessions: moka::future::Cache::builder()
452 .max_capacity(cfg.maximum_sessions as u64)
453 .time_to_idle(cfg.idle_timeout)
454 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
455 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
456 trace!(?session_id, ?reason, "session evicted from the cache");
457 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
458 }
459 _ => {}
460 })
461 .build(),
462 session_notifiers: Arc::new(OnceLock::new()),
463 cfg,
464 }
465 }
466
467 pub fn start(
473 &self,
474 msg_sender: S,
475 new_session_notifier: UnboundedSender<IncomingSession>,
476 ) -> crate::errors::Result<Vec<AbortHandle>> {
477 self.msg_sender
478 .set(msg_sender)
479 .map_err(|_| SessionManagerError::AlreadyStarted)?;
480
481 let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
482 self.session_notifiers
483 .set((new_session_notifier, session_close_tx))
484 .map_err(|_| SessionManagerError::AlreadyStarted)?;
485
486 let myself = self.clone();
487 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
488 None,
489 move |(session_id, closure_reason)| {
490 let myself = myself.clone();
491 async move {
492 if let Some(session_data) = myself.sessions.remove(&session_id).await {
496 close_session(session_id, session_data, closure_reason);
497 } else {
498 debug!(
500 ?session_id,
501 ?closure_reason,
502 "could not find session id to close, maybe the session is already closed"
503 );
504 }
505 }
506 },
507 ));
508
509 let myself = self.clone();
514 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
515 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
516 let timeout = 2 * initiation_timeout_max_one_way(
517 myself.cfg.initiation_timeout_base,
518 RoutingOptions::MAX_INTERMEDIATE_HOPS,
519 )
520 .min(myself.cfg.idle_timeout)
521 .mul_f64(jitter)
522 / 2;
523 futures_time::stream::interval(timeout.into())
524 .for_each(|_| {
525 trace!("executing session cache evictions");
526 futures::future::join(
527 myself.sessions.run_pending_tasks(),
528 myself.session_initiations.run_pending_tasks(),
529 )
530 .map(|_| ())
531 })
532 .await;
533 });
534
535 Ok(vec![ah_closure_notifications, ah_session_expiration])
536 }
537
538 pub fn is_started(&self) -> bool {
540 self.session_notifiers.get().is_some()
541 }
542
543 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
544 let abort_handles_clone = slot.abort_handles.clone();
546 if let moka::ops::compute::CompResult::Inserted(_) = self
547 .sessions
548 .entry(session_id)
549 .and_compute_with(|entry| {
550 futures::future::ready(if entry.is_none() {
551 moka::ops::compute::Op::Put(slot)
552 } else {
553 moka::ops::compute::Op::Nop
554 })
555 })
556 .await
557 {
558 #[cfg(all(feature = "prometheus", not(test)))]
559 {
560 METRIC_NUM_INITIATED_SESSIONS.increment();
561 METRIC_ACTIVE_SESSIONS.increment(1.0);
562 }
563
564 Ok(())
565 } else {
566 error!(%session_id, "session already exists - loopback attempt");
568 abort_handles_clone.into_iter().for_each(|ah| ah.abort());
569 Err(SessionManagerError::Loopback.into())
570 }
571 }
572
573 pub async fn new_session(
581 &self,
582 destination: Address,
583 target: SessionTarget,
584 cfg: SessionClientConfig,
585 ) -> crate::errors::Result<Session> {
586 self.sessions.run_pending_tasks().await;
587 if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
588 return Err(SessionManagerError::TooManySessions.into());
589 }
590
591 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
592
593 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
594 let challenge = insert_into_next_slot(
595 &self.session_initiations,
596 |ch| {
597 if let Some(challenge) = ch {
598 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
599 } else {
600 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
601 }
602 },
603 tx_initiation_done,
604 )
605 .await
606 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
610 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
611 challenge,
612 target,
613 capabilities: ByteCapabilities(cfg.capabilities),
614 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
615 cfg.surb_management
616 .map(|c| c.target_surb_buffer_size)
617 .unwrap_or(
618 self.cfg.initial_return_session_egress_rate as u64
619 * self
620 .cfg
621 .minimum_surb_buffer_duration
622 .max(MIN_SURB_BUFFER_DURATION)
623 .as_secs(),
624 )
625 .min(u32::MAX as u64) as u32
626 } else {
627 0
628 },
629 });
630
631 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
632 let forward_routing = DestinationRouting::Forward {
633 destination,
634 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
636 return_options: cfg.return_path_options.clone().into(),
637 };
638
639 info!(challenge, %pseudonym, %destination, "new session request");
641 msg_sender
642 .send((forward_routing.clone(), start_session_msg.try_into()?))
643 .await
644 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
645
646 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
648 self.cfg.initiation_timeout_base,
649 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
650 )
651 .into();
652
653 pin_mut!(rx_initiation_done);
655
656 trace!(challenge, "awaiting session establishment");
657 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
658 Ok(Ok(Some(est))) => {
659 let session_id = est.session_id;
661 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
662
663 let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
664 let notifier = self
665 .session_notifiers
666 .get()
667 .map(|(_, notifier)| {
668 let notifier = notifier.clone();
669 Box::new(move |session_id: SessionId, reason: ClosureReason| {
670 let _ = notifier
671 .unbounded_send((session_id, reason))
672 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
673 })
674 })
675 .ok_or(SessionManagerError::NotStarted)?;
676
677 if let Some(balancer_config) = cfg.surb_management {
681 let surb_estimator = AtomicSurbFlowEstimator::default();
682
683 let surb_estimator_clone = surb_estimator.clone();
685 let scoring_sender =
686 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationData)| {
687 surb_estimator_clone.produced.fetch_add(
689 ApplicationData::estimate_surbs_with_msg(&data.plain_text) as u64,
690 std::sync::atomic::Ordering::Relaxed,
691 );
692 futures::future::ok::<_, S::Error>((routing, data))
693 });
694
695 let mut abort_handles = Vec::new();
696
697 let (ka_controller, ka_abort_handle) =
699 utils::spawn_keep_alive_stream(session_id, scoring_sender.clone(), forward_routing.clone());
700 abort_handles.push(ka_abort_handle);
701
702 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
704 let balancer = SurbBalancer::new(
705 session_id,
706 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
708 surb_estimator.clone(),
709 ka_controller,
710 balancer_config,
711 );
712
713 let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
714 self.cfg.balancer_sampling_interval,
715 SessionCacheBalancerFeedback(self.sessions.clone()),
716 None,
717 );
718 abort_handles.push(balancer_abort_handle);
719
720 self.insert_session_slot(
722 session_id,
723 SessionSlot {
724 session_tx: Arc::new(tx),
725 routing_opts: forward_routing.clone(),
726 abort_handles,
727 surb_mgmt: Some(balancer_config),
728 surb_estimator: None,
729 },
730 )
731 .await?;
732
733 match level_stream
736 .skip_while(|current_level| {
737 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
738 })
739 .next()
740 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
741 .await
742 {
743 Ok(Some(surb_level)) => {
744 info!(%session_id, surb_level, "session is ready");
745 }
746 Ok(None) => {
747 return Err(
748 SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
749 );
750 }
751 Err(_) => {
752 warn!(%session_id, "session didn't reach target SURB buffer size in time");
753 }
754 }
755
756 Session::new(
757 session_id,
758 forward_routing,
759 cfg.capabilities,
760 (
761 scoring_sender,
762 rx.inspect(move |_| {
763 surb_estimator
765 .consumed
766 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
767 }),
768 ),
769 Some(notifier),
770 )
771 } else {
772 warn!(%session_id, "session ready without SURB balancing");
773
774 self.insert_session_slot(
775 session_id,
776 SessionSlot {
777 session_tx: Arc::new(tx),
778 routing_opts: forward_routing.clone(),
779 abort_handles: vec![],
780 surb_mgmt: None,
781 surb_estimator: None,
782 },
783 )
784 .await?;
785
786 Session::new(
787 session_id,
788 forward_routing,
789 cfg.capabilities,
790 (msg_sender, rx),
791 Some(notifier),
792 )
793 }
794 }
795 Ok(Ok(None)) => Err(SessionManagerError::Other(
796 "internal error: sender has been closed without completing the session establishment".into(),
797 )
798 .into()),
799 Ok(Err(e)) => {
800 error!(
802 challenge = e.challenge,
803 error = ?e,
804 "the other party rejected the session initiation with error"
805 );
806 Err(TransportSessionError::Rejected(e.reason))
807 }
808 Err(_) => {
809 error!(challenge, "session initiation attempt timed out");
811
812 #[cfg(all(feature = "prometheus", not(test)))]
813 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
814
815 Err(TransportSessionError::Timeout)
816 }
817 }
818 }
819
820 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
824 if let Some(session_data) = self.sessions.get(id).await {
825 trace!(session_id = ?id, "pinging manually session");
826 Ok(self
827 .msg_sender
828 .get()
829 .cloned()
830 .ok_or(SessionManagerError::NotStarted)?
831 .send((
832 session_data.routing_opts.clone(),
833 HoprStartProtocol::KeepAlive((*id).into()).try_into()?,
834 ))
835 .await
836 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
837 } else {
838 Err(SessionManagerError::NonExistingSession.into())
839 }
840 }
841
842 pub async fn active_sessions(&self) -> Vec<SessionId> {
844 self.sessions.run_pending_tasks().await;
845 self.sessions.iter().map(|(k, _)| *k).collect()
846 }
847
848 pub async fn update_surb_balancer_config(
853 &self,
854 id: &SessionId,
855 config: SurbBalancerConfig,
856 ) -> crate::errors::Result<()> {
857 match self
858 .sessions
859 .entry_by_ref(id)
860 .and_compute_with(|entry| {
861 futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
862 if cached_session.surb_mgmt.is_some() {
864 cached_session.surb_mgmt = Some(config);
865 moka::ops::compute::Op::Put(cached_session)
866 } else {
867 moka::ops::compute::Op::Nop
868 }
869 } else {
870 moka::ops::compute::Op::Nop
871 })
872 })
873 .await
874 {
875 moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
876 moka::ops::compute::CompResult::Unchanged(_) => {
877 Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
878 }
879 _ => Err(SessionManagerError::NonExistingSession.into()),
880 }
881 }
882
883 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
887 match self.sessions.get(id).await {
888 Some(session) => Ok(session.surb_mgmt),
889 None => Err(SessionManagerError::NonExistingSession.into()),
890 }
891 }
892
893 pub async fn dispatch_message(
900 &self,
901 pseudonym: HoprPseudonym,
902 data: ApplicationData,
903 ) -> crate::errors::Result<DispatchResult> {
904 if data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
905 trace!(tag = %data.application_tag, "dispatching Start protocol message");
907 return self
908 .handle_start_protocol_message(pseudonym, data)
909 .await
910 .map(|_| DispatchResult::Processed);
911 } else if self.cfg.session_tag_range.contains(&data.application_tag.as_u64()) {
912 let session_id = SessionId::new(data.application_tag, pseudonym);
913
914 return if let Some(session_data) = self.sessions.get(&session_id).await {
915 trace!(?session_id, "received data for a registered session");
916
917 Ok(session_data
918 .session_tx
919 .unbounded_send(data.plain_text)
920 .map(|_| DispatchResult::Processed)
921 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
922 } else {
923 error!(%session_id, "received data from an unestablished session");
924 Err(TransportSessionError::UnknownData)
925 };
926 }
927
928 trace!(%data.application_tag, "received data not associated with session protocol or any existing session");
929 Ok(DispatchResult::Unrelated(data))
930 }
931
932 async fn handle_incoming_session_initiation(
933 &self,
934 pseudonym: HoprPseudonym,
935 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
936 ) -> crate::errors::Result<()> {
937 trace!(challenge = session_req.challenge, "received session initiation request");
938
939 debug!(%pseudonym, "got new session request, searching for a free session slot");
940
941 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
942
943 let (new_session_notifier, close_session_notifier) = self
944 .session_notifiers
945 .get()
946 .cloned()
947 .ok_or(SessionManagerError::NotStarted)?;
948
949 let reply_routing = DestinationRouting::Return(pseudonym.into());
951
952 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
953
954 self.sessions.run_pending_tasks().await; let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
957 insert_into_next_slot(
958 &self.sessions,
959 |sid| {
960 let next_tag: Tag = match sid {
963 Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
964 .max(self.cfg.session_tag_range.start)
965 .into(),
966 None => hopr_crypto_random::random_integer(
967 self.cfg.session_tag_range.start,
968 Some(self.cfg.session_tag_range.end),
969 )
970 .into(),
971 };
972 SessionId::new(next_tag, pseudonym)
973 },
974 SessionSlot {
975 session_tx: Arc::new(tx_session_data),
976 routing_opts: reply_routing.clone(),
977 abort_handles: vec![],
978 surb_mgmt: None,
979 surb_estimator: None,
980 },
981 )
982 .await
983 } else {
984 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
985 None
986 };
987
988 if let Some(session_id) = allocated_slot {
989 debug!(%session_id, ?session_req, "assigned a new session");
990
991 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
992 if let Err(error) = close_session_notifier.unbounded_send((session_id, reason)) {
993 error!(%session_id, %error, %reason, "failed to notify session closure");
994 }
995 });
996
997 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
998 let surb_estimator = AtomicSurbFlowEstimator::default();
999
1000 let egress_rate_control =
1002 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1003
1004 let target_surb_buffer_size = if session_req.additional_data > 0 {
1007 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1008 } else {
1009 self.cfg.initial_return_session_egress_rate as u64
1010 * self
1011 .cfg
1012 .minimum_surb_buffer_duration
1013 .max(MIN_SURB_BUFFER_DURATION)
1014 .as_secs()
1015 };
1016
1017 let surb_estimator_clone = surb_estimator.clone();
1018 let session = Session::new(
1019 session_id,
1020 reply_routing.clone(),
1021 session_req.capabilities,
1022 (
1023 msg_sender
1025 .clone()
1026 .with(move |(routing, data): (DestinationRouting, ApplicationData)| {
1027 surb_estimator_clone
1029 .consumed
1030 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1031 futures::future::ok::<_, S::Error>((routing, data))
1032 })
1033 .rate_limit_with_controller(&egress_rate_control)
1034 .buffer((2 * target_surb_buffer_size) as usize),
1035 rx_session_data.inspect(move |data| {
1037 surb_estimator_clone.produced.fetch_add(
1039 ApplicationData::estimate_surbs_with_msg(data) as u64,
1040 std::sync::atomic::Ordering::Relaxed,
1041 );
1042 }),
1043 ),
1044 Some(closure_notifier),
1045 )?;
1046
1047 let balancer_config = SurbBalancerConfig {
1051 target_surb_buffer_size,
1052 max_surbs_per_sec: target_surb_buffer_size
1054 / self
1055 .cfg
1056 .minimum_surb_buffer_duration
1057 .max(MIN_SURB_BUFFER_DURATION)
1058 .as_secs(),
1059 surb_decay: None,
1062 };
1063
1064 let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1066 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1067 .sessions
1068 .entry(session_id)
1069 .and_compute_with(|entry| {
1070 if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1071 cached_session.abort_handles.push(balancer_abort_handle);
1072 cached_session.surb_mgmt = Some(balancer_config);
1073 cached_session.surb_estimator = Some(surb_estimator.clone());
1074 futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1075 } else {
1076 futures::future::ready(moka::ops::compute::Op::Nop)
1077 }
1078 })
1079 .await
1080 {
1081 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1084 let balancer = SurbBalancer::new(
1085 session_id,
1086 if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1087 SimpleBalancerController::with_increasing_setpoint(
1089 *ratio_threshold,
1090 (growth_window
1091 .div_duration_f64(self.cfg.balancer_sampling_interval)
1092 .round() as usize)
1093 .max(1),
1094 )
1095 } else {
1096 SimpleBalancerController::default()
1097 },
1098 surb_estimator,
1099 SurbControllerWithCorrection(egress_rate_control, 1), balancer_config,
1101 );
1102
1103 let _ = balancer.start_control_loop(
1104 self.cfg.balancer_sampling_interval,
1105 SessionCacheBalancerFeedback(self.sessions.clone()),
1106 Some(balancer_abort_reg),
1107 );
1108 } else {
1109 return Err(SessionManagerError::Other(
1111 "failed to spawn SURB balancer - inconsistent cache".into(),
1112 )
1113 .into());
1114 }
1115
1116 session
1117 } else {
1118 Session::new(
1119 session_id,
1120 reply_routing.clone(),
1121 session_req.capabilities,
1122 (msg_sender.clone(), rx_session_data),
1123 Some(closure_notifier),
1124 )?
1125 };
1126
1127 let incoming_session = IncomingSession {
1129 session,
1130 target: session_req.target,
1131 };
1132
1133 if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
1135 warn!(%error, "failed to send session to incoming session queue");
1136 }
1137
1138 trace!(?session_id, "session notification sent");
1139
1140 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1143 orig_challenge: session_req.challenge,
1144 session_id,
1145 });
1146
1147 msg_sender.send((reply_routing, data.try_into()?)).await.map_err(|e| {
1148 SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
1149 })?;
1150
1151 info!(%session_id, "new session established");
1152
1153 #[cfg(all(feature = "prometheus", not(test)))]
1154 {
1155 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1156 METRIC_ACTIVE_SESSIONS.increment(1.0);
1157 }
1158 } else {
1159 error!(%pseudonym,"failed to reserve a new session slot");
1160
1161 let reason = StartErrorReason::NoSlotsAvailable;
1163 let data = HoprStartProtocol::SessionError(StartErrorType {
1164 challenge: session_req.challenge,
1165 reason,
1166 });
1167
1168 msg_sender.send((reply_routing, data.try_into()?)).await.map_err(|e| {
1169 SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1170 })?;
1171
1172 trace!(%pseudonym, "session establishment failure message sent");
1173
1174 #[cfg(all(feature = "prometheus", not(test)))]
1175 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1176 }
1177
1178 Ok(())
1179 }
1180
1181 async fn handle_start_protocol_message(
1182 &self,
1183 pseudonym: HoprPseudonym,
1184 data: ApplicationData,
1185 ) -> crate::errors::Result<()> {
1186 match HoprStartProtocol::try_from(data)? {
1187 HoprStartProtocol::StartSession(session_req) => {
1188 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1189 }
1190 HoprStartProtocol::SessionEstablished(est) => {
1191 trace!(
1192 session_id = ?est.session_id,
1193 "received session establishment confirmation"
1194 );
1195 let challenge = est.orig_challenge;
1196 let session_id = est.session_id;
1197 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1198 if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1199 return Err(SessionManagerError::Other(format!(
1200 "could not notify session {session_id} establishment: {e}"
1201 ))
1202 .into());
1203 }
1204 debug!(?session_id, challenge, "session establishment complete");
1205 } else {
1206 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1207 }
1208 }
1209 HoprStartProtocol::SessionError(error) => {
1210 trace!(
1211 challenge = error.challenge,
1212 error = ?error.reason,
1213 "failed to initialize a session",
1214 );
1215 if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1218 if let Err(e) = tx_est.unbounded_send(Err(error)) {
1219 return Err(SessionManagerError::Other(format!(
1220 "could not notify session establishment error {error:?}: {e}"
1221 ))
1222 .into());
1223 }
1224 error!(
1225 challenge = error.challenge,
1226 ?error,
1227 "session establishment error received"
1228 );
1229 } else {
1230 error!(
1231 challenge = error.challenge,
1232 ?error,
1233 "session establishment attempt expired before error could be delivered"
1234 );
1235 }
1236
1237 #[cfg(all(feature = "prometheus", not(test)))]
1238 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1239 }
1240 HoprStartProtocol::KeepAlive(msg) => {
1241 let session_id = msg.session_id;
1242 if let Some(session_slot) = self.sessions.get(&session_id).await {
1243 trace!(?session_id, "received keep-alive request");
1244 if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1247 estimator.produced.fetch_add(
1249 KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1250 std::sync::atomic::Ordering::Relaxed,
1251 );
1252 }
1253 } else {
1254 debug!(%session_id, "received keep-alive request for an unknown session");
1255 }
1256 }
1257 }
1258
1259 Ok(())
1260 }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use anyhow::anyhow;
1266 use futures::{AsyncWriteExt, future::BoxFuture};
1267 use hopr_crypto_random::Randomizable;
1268 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1269 use hopr_primitive_types::prelude::Address;
1270 use hopr_protocol_start::StartProtocolDiscriminants;
1271 use tokio::time::timeout;
1272
1273 use super::*;
1274 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1275
1276 #[async_trait::async_trait]
1277 trait SendMsg {
1278 async fn send_message(&self, routing: DestinationRouting, data: ApplicationData) -> crate::errors::Result<()>;
1279 }
1280
1281 mockall::mock! {
1282 MsgSender {}
1283 impl SendMsg for MsgSender {
1284 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationData)
1285 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1286 }
1287 }
1288
1289 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationData)> {
1290 let (tx, rx) = futures::channel::mpsc::unbounded();
1291 tokio::task::spawn(async move {
1292 pin_mut!(rx);
1293 while let Some((routing, data)) = rx.next().await {
1294 sender
1295 .send_message(routing, data)
1296 .await
1297 .expect("send message must not fail in mock");
1298 }
1299 });
1300 tx
1301 }
1302
1303 fn msg_type(data: &ApplicationData, expected: StartProtocolDiscriminants) -> bool {
1304 HoprStartProtocol::decode(data.application_tag, &data.plain_text)
1305 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1306 .unwrap_or(false)
1307 }
1308
1309 #[test_log::test(tokio::test)]
1310 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1311 {
1312 let alice_pseudonym = HoprPseudonym::random();
1313 let bob_peer: Address = (&ChainKeypair::random()).into();
1314
1315 let alice_mgr = SessionManager::new(Default::default());
1316 let bob_mgr = SessionManager::new(Default::default());
1317
1318 let mut sequence = mockall::Sequence::new();
1319 let mut alice_transport = MockMsgSender::new();
1320 let mut bob_transport = MockMsgSender::new();
1321
1322 let bob_mgr_clone = bob_mgr.clone();
1324 alice_transport
1325 .expect_send_message()
1326 .once()
1327 .in_sequence(&mut sequence)
1328 .withf(move |peer, data| {
1329 info!("alice sends {}", data.application_tag);
1330 msg_type(data, StartProtocolDiscriminants::StartSession)
1331 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1332 })
1333 .returning(move |_, data| {
1334 let bob_mgr_clone = bob_mgr_clone.clone();
1335 Box::pin(async move {
1336 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1337 Ok(())
1338 })
1339 });
1340
1341 let alice_mgr_clone = alice_mgr.clone();
1343 bob_transport
1344 .expect_send_message()
1345 .once()
1346 .in_sequence(&mut sequence)
1347 .withf(move |peer, data| {
1348 info!("bob sends {}", data.application_tag);
1349 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1350 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1351 })
1352 .returning(move |_, data| {
1353 let alice_mgr_clone = alice_mgr_clone.clone();
1354
1355 Box::pin(async move {
1356 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1357 Ok(())
1358 })
1359 });
1360
1361 let bob_mgr_clone = bob_mgr.clone();
1363 alice_transport
1364 .expect_send_message()
1365 .once()
1366 .in_sequence(&mut sequence)
1367 .withf(move |peer, data| {
1368 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1369 data.plain_text.as_ref(),
1370 )
1371 .expect("must be a session message")
1372 .try_as_segment()
1373 .expect("must be a segment")
1374 .is_terminating()
1375 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1376 })
1377 .returning(move |_, data| {
1378 let bob_mgr_clone = bob_mgr_clone.clone();
1379 Box::pin(async move {
1380 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1381 Ok(())
1382 })
1383 });
1384
1385 let mut ahs = Vec::new();
1386
1387 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1389 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1390
1391 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1393 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1394
1395 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1396
1397 pin_mut!(new_session_rx_bob);
1398 let (alice_session, bob_session) = timeout(
1399 Duration::from_secs(2),
1400 futures::future::join(
1401 alice_mgr.new_session(
1402 bob_peer,
1403 SessionTarget::TcpStream(target.clone()),
1404 SessionClientConfig {
1405 pseudonym: alice_pseudonym.into(),
1406 capabilities: Capability::NoRateControl | Capability::Segmentation,
1407 surb_management: None,
1408 ..Default::default()
1409 },
1410 ),
1411 new_session_rx_bob.next(),
1412 ),
1413 )
1414 .await?;
1415
1416 let mut alice_session = alice_session?;
1417 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1418
1419 assert_eq!(
1420 *alice_session.capabilities(),
1421 Capability::Segmentation | Capability::NoRateControl
1422 );
1423 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1424 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1425
1426 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1427 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1428 assert!(
1429 alice_mgr
1430 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1431 .await
1432 .is_err()
1433 );
1434
1435 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1436 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1437 assert!(
1438 bob_mgr
1439 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1440 .await
1441 .is_err()
1442 );
1443
1444 tokio::time::sleep(Duration::from_millis(100)).await;
1445 alice_session.close().await?;
1446
1447 tokio::time::sleep(Duration::from_millis(100)).await;
1448
1449 assert!(matches!(
1450 alice_mgr.ping_session(alice_session.id()).await,
1451 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1452 ));
1453
1454 futures::stream::iter(ahs)
1455 .for_each(|ah| async move { ah.abort() })
1456 .await;
1457
1458 Ok(())
1459 }
1460
1461 #[test_log::test(tokio::test)]
1462 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1463 let alice_pseudonym = HoprPseudonym::random();
1464 let bob_peer: Address = (&ChainKeypair::random()).into();
1465
1466 let cfg = SessionManagerConfig {
1467 idle_timeout: Duration::from_millis(200),
1468 ..Default::default()
1469 };
1470
1471 let alice_mgr = SessionManager::new(cfg);
1472 let bob_mgr = SessionManager::new(Default::default());
1473
1474 let mut sequence = mockall::Sequence::new();
1475 let mut alice_transport = MockMsgSender::new();
1476 let mut bob_transport = MockMsgSender::new();
1477
1478 let bob_mgr_clone = bob_mgr.clone();
1480 alice_transport
1481 .expect_send_message()
1482 .once()
1483 .in_sequence(&mut sequence)
1484 .withf(move |peer, data| {
1485 msg_type(data, StartProtocolDiscriminants::StartSession)
1486 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1487 })
1488 .returning(move |_, data| {
1489 let bob_mgr_clone = bob_mgr_clone.clone();
1490 Box::pin(async move {
1491 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1492 Ok(())
1493 })
1494 });
1495
1496 let alice_mgr_clone = alice_mgr.clone();
1498 bob_transport
1499 .expect_send_message()
1500 .once()
1501 .in_sequence(&mut sequence)
1502 .withf(move |peer, data| {
1503 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1504 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1505 })
1506 .returning(move |_, data| {
1507 let alice_mgr_clone = alice_mgr_clone.clone();
1508
1509 Box::pin(async move {
1510 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1511 Ok(())
1512 })
1513 });
1514
1515 let mut ahs = Vec::new();
1516
1517 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1519 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1520
1521 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1523 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1524
1525 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1526
1527 pin_mut!(new_session_rx_bob);
1528 let (alice_session, bob_session) = timeout(
1529 Duration::from_secs(2),
1530 futures::future::join(
1531 alice_mgr.new_session(
1532 bob_peer,
1533 SessionTarget::TcpStream(target.clone()),
1534 SessionClientConfig {
1535 pseudonym: alice_pseudonym.into(),
1536 capabilities: Capability::NoRateControl | Capability::Segmentation,
1537 surb_management: None,
1538 ..Default::default()
1539 },
1540 ),
1541 new_session_rx_bob.next(),
1542 ),
1543 )
1544 .await?;
1545
1546 let alice_session = alice_session?;
1547 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1548
1549 assert_eq!(
1550 *alice_session.capabilities(),
1551 Capability::Segmentation | Capability::NoRateControl,
1552 );
1553 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1554 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1555
1556 tokio::time::sleep(Duration::from_millis(300)).await;
1558
1559 assert!(matches!(
1560 alice_mgr.ping_session(alice_session.id()).await,
1561 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1562 ));
1563
1564 futures::stream::iter(ahs)
1565 .for_each(|ah| async move { ah.abort() })
1566 .await;
1567
1568 Ok(())
1569 }
1570
1571 #[test_log::test(tokio::test)]
1572 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1573 let alice_pseudonym = HoprPseudonym::random();
1574 let session_id = SessionId::new(16u64, alice_pseudonym);
1575 let balancer_cfg = SurbBalancerConfig {
1576 target_surb_buffer_size: 1000,
1577 max_surbs_per_sec: 100,
1578 ..Default::default()
1579 };
1580
1581 let alice_mgr =
1582 SessionManager::<UnboundedSender<(DestinationRouting, ApplicationData)>>::new(Default::default());
1583
1584 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1585 alice_mgr
1586 .sessions
1587 .insert(
1588 session_id,
1589 SessionSlot {
1590 session_tx: Arc::new(dummy_tx),
1591 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1592 abort_handles: Vec::new(),
1593 surb_mgmt: Some(balancer_cfg.clone()),
1594 surb_estimator: None,
1595 },
1596 )
1597 .await;
1598
1599 let actual_cfg = alice_mgr
1600 .get_surb_balancer_config(&session_id)
1601 .await?
1602 .ok_or(anyhow!("session must have a surb balancer config"))?;
1603 assert_eq!(actual_cfg, balancer_cfg);
1604
1605 let new_cfg = SurbBalancerConfig {
1606 target_surb_buffer_size: 2000,
1607 max_surbs_per_sec: 200,
1608 ..Default::default()
1609 };
1610 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1611
1612 let actual_cfg = alice_mgr
1613 .get_surb_balancer_config(&session_id)
1614 .await?
1615 .ok_or(anyhow!("session must have a surb balancer config"))?;
1616 assert_eq!(actual_cfg, new_cfg);
1617
1618 Ok(())
1619 }
1620
1621 #[test_log::test(tokio::test)]
1622 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1623 let alice_pseudonym = HoprPseudonym::random();
1624 let bob_peer: Address = (&ChainKeypair::random()).into();
1625
1626 let cfg = SessionManagerConfig {
1627 session_tag_range: 16u64..17u64, ..Default::default()
1629 };
1630
1631 let alice_mgr = SessionManager::new(Default::default());
1632 let bob_mgr = SessionManager::new(cfg);
1633
1634 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1636 bob_mgr
1637 .sessions
1638 .insert(
1639 SessionId::new(16u64, alice_pseudonym),
1640 SessionSlot {
1641 session_tx: Arc::new(dummy_tx),
1642 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1643 abort_handles: Vec::new(),
1644 surb_mgmt: None,
1645 surb_estimator: None,
1646 },
1647 )
1648 .await;
1649
1650 let mut sequence = mockall::Sequence::new();
1651 let mut alice_transport = MockMsgSender::new();
1652 let mut bob_transport = MockMsgSender::new();
1653
1654 let bob_mgr_clone = bob_mgr.clone();
1656 alice_transport
1657 .expect_send_message()
1658 .once()
1659 .in_sequence(&mut sequence)
1660 .withf(move |peer, data| {
1661 msg_type(data, StartProtocolDiscriminants::StartSession)
1662 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1663 })
1664 .returning(move |_, data| {
1665 let bob_mgr_clone = bob_mgr_clone.clone();
1666 Box::pin(async move {
1667 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1668 Ok(())
1669 })
1670 });
1671
1672 let alice_mgr_clone = alice_mgr.clone();
1674 bob_transport
1675 .expect_send_message()
1676 .once()
1677 .in_sequence(&mut sequence)
1678 .withf(move |peer, data| {
1679 msg_type(data, StartProtocolDiscriminants::SessionError)
1680 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1681 })
1682 .returning(move |_, data| {
1683 let alice_mgr_clone = alice_mgr_clone.clone();
1684 Box::pin(async move {
1685 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1686 Ok(())
1687 })
1688 });
1689
1690 let mut jhs = Vec::new();
1691
1692 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1694 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1695
1696 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1698 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1699
1700 let result = alice_mgr
1701 .new_session(
1702 bob_peer,
1703 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1704 SessionClientConfig {
1705 capabilities: Capabilities::empty(),
1706 pseudonym: alice_pseudonym.into(),
1707 surb_management: None,
1708 ..Default::default()
1709 },
1710 )
1711 .await;
1712
1713 assert!(
1714 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1715 );
1716
1717 Ok(())
1718 }
1719
1720 #[test_log::test(tokio::test)]
1721 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1722 -> anyhow::Result<()> {
1723 let alice_pseudonym = HoprPseudonym::random();
1724 let bob_peer: Address = (&ChainKeypair::random()).into();
1725
1726 let cfg = SessionManagerConfig {
1727 maximum_sessions: 1,
1728 ..Default::default()
1729 };
1730
1731 let alice_mgr = SessionManager::new(Default::default());
1732 let bob_mgr = SessionManager::new(cfg);
1733
1734 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1736 bob_mgr
1737 .sessions
1738 .insert(
1739 SessionId::new(16u64, alice_pseudonym),
1740 SessionSlot {
1741 session_tx: Arc::new(dummy_tx),
1742 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1743 abort_handles: Vec::new(),
1744 surb_mgmt: None,
1745 surb_estimator: None,
1746 },
1747 )
1748 .await;
1749
1750 let mut sequence = mockall::Sequence::new();
1751 let mut alice_transport = MockMsgSender::new();
1752 let mut bob_transport = MockMsgSender::new();
1753
1754 let bob_mgr_clone = bob_mgr.clone();
1756 alice_transport
1757 .expect_send_message()
1758 .once()
1759 .in_sequence(&mut sequence)
1760 .withf(move |peer, data| {
1761 msg_type(data, StartProtocolDiscriminants::StartSession)
1762 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1763 })
1764 .returning(move |_, data| {
1765 let bob_mgr_clone = bob_mgr_clone.clone();
1766 Box::pin(async move {
1767 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1768 Ok(())
1769 })
1770 });
1771
1772 let alice_mgr_clone = alice_mgr.clone();
1774 bob_transport
1775 .expect_send_message()
1776 .once()
1777 .in_sequence(&mut sequence)
1778 .withf(move |peer, data| {
1779 msg_type(data, StartProtocolDiscriminants::SessionError)
1780 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1781 })
1782 .returning(move |_, data| {
1783 let alice_mgr_clone = alice_mgr_clone.clone();
1784 Box::pin(async move {
1785 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1786 Ok(())
1787 })
1788 });
1789
1790 let mut jhs = Vec::new();
1791
1792 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1794 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1795
1796 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1798 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1799
1800 let result = alice_mgr
1801 .new_session(
1802 bob_peer,
1803 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1804 SessionClientConfig {
1805 capabilities: None.into(),
1806 pseudonym: alice_pseudonym.into(),
1807 surb_management: None,
1808 ..Default::default()
1809 },
1810 )
1811 .await;
1812
1813 assert!(
1814 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1815 );
1816
1817 Ok(())
1818 }
1819
1820 #[test_log::test(tokio::test)]
1821 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1822 let alice_pseudonym = HoprPseudonym::random();
1823 let bob_peer: Address = (&ChainKeypair::random()).into();
1824
1825 let alice_mgr = SessionManager::new(Default::default());
1826
1827 let mut sequence = mockall::Sequence::new();
1828 let mut alice_transport = MockMsgSender::new();
1829
1830 let alice_mgr_clone = alice_mgr.clone();
1832 alice_transport
1833 .expect_send_message()
1834 .once()
1835 .in_sequence(&mut sequence)
1836 .withf(move |peer, data| {
1837 msg_type(data, StartProtocolDiscriminants::StartSession)
1838 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1839 })
1840 .returning(move |_, data| {
1841 let alice_mgr_clone = alice_mgr_clone.clone();
1843 Box::pin(async move {
1844 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1845 Ok(())
1846 })
1847 });
1848
1849 let alice_mgr_clone = alice_mgr.clone();
1851 alice_transport
1852 .expect_send_message()
1853 .once()
1854 .in_sequence(&mut sequence)
1855 .withf(move |peer, data| {
1856 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1857 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1858 })
1859 .returning(move |_, data| {
1860 let alice_mgr_clone = alice_mgr_clone.clone();
1861
1862 Box::pin(async move {
1863 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1864 Ok(())
1865 })
1866 });
1867
1868 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1870 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
1871
1872 let alice_session = alice_mgr
1873 .new_session(
1874 bob_peer,
1875 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1876 SessionClientConfig {
1877 capabilities: None.into(),
1878 pseudonym: alice_pseudonym.into(),
1879 surb_management: None,
1880 ..Default::default()
1881 },
1882 )
1883 .await;
1884
1885 println!("{alice_session:?}");
1886 assert!(matches!(
1887 alice_session,
1888 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
1889 ));
1890
1891 Ok(())
1892 }
1893
1894 #[test_log::test(tokio::test)]
1895 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1896 let bob_peer: Address = (&ChainKeypair::random()).into();
1897
1898 let cfg = SessionManagerConfig {
1899 initiation_timeout_base: Duration::from_millis(100),
1900 ..Default::default()
1901 };
1902
1903 let alice_mgr = SessionManager::new(cfg);
1904 let bob_mgr = SessionManager::new(Default::default());
1905
1906 let mut sequence = mockall::Sequence::new();
1907 let mut alice_transport = MockMsgSender::new();
1908 let bob_transport = MockMsgSender::new();
1909
1910 alice_transport
1912 .expect_send_message()
1913 .once()
1914 .in_sequence(&mut sequence)
1915 .withf(move |peer, data| {
1916 msg_type(data, StartProtocolDiscriminants::StartSession)
1917 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1918 })
1919 .returning(|_, _| Box::pin(async { Ok(()) }));
1920
1921 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1923 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
1924
1925 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1927 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
1928
1929 let result = alice_mgr
1930 .new_session(
1931 bob_peer,
1932 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1933 SessionClientConfig {
1934 capabilities: None.into(),
1935 pseudonym: None,
1936 surb_management: None,
1937 ..Default::default()
1938 },
1939 )
1940 .await;
1941
1942 assert!(matches!(result, Err(TransportSessionError::Timeout)));
1943
1944 Ok(())
1945 }
1946
1947 #[test_log::test(tokio::test)]
1948 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
1949 let alice_pseudonym = HoprPseudonym::random();
1950 let bob_peer: Address = (&ChainKeypair::random()).into();
1951
1952 let bob_cfg = SessionManagerConfig::default();
1953 let alice_mgr = SessionManager::new(Default::default());
1954 let bob_mgr = SessionManager::new(bob_cfg.clone());
1955
1956 let mut alice_transport = MockMsgSender::new();
1957 let mut bob_transport = MockMsgSender::new();
1958
1959 let mut open_sequence = mockall::Sequence::new();
1961 let bob_mgr_clone = bob_mgr.clone();
1962 alice_transport
1963 .expect_send_message()
1964 .once()
1965 .in_sequence(&mut open_sequence)
1966 .withf(move |peer, data| {
1967 msg_type(data, StartProtocolDiscriminants::StartSession)
1968 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1969 })
1970 .returning(move |_, data| {
1971 let bob_mgr_clone = bob_mgr_clone.clone();
1972 Box::pin(async move {
1973 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1974 Ok(())
1975 })
1976 });
1977
1978 let alice_mgr_clone = alice_mgr.clone();
1980 bob_transport
1981 .expect_send_message()
1982 .once()
1983 .in_sequence(&mut open_sequence)
1984 .withf(move |peer, data| {
1985 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1986 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1987 })
1988 .returning(move |_, data| {
1989 let alice_mgr_clone = alice_mgr_clone.clone();
1990 Box::pin(async move {
1991 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1992 Ok(())
1993 })
1994 });
1995
1996 let bob_mgr_clone = bob_mgr.clone();
1998 alice_transport
1999 .expect_send_message()
2000 .times(5..)
2001 .withf(move |peer, data| {
2003 msg_type(data, StartProtocolDiscriminants::KeepAlive)
2004 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2005 })
2006 .returning(move |_, data| {
2007 let bob_mgr_clone = bob_mgr_clone.clone();
2008 Box::pin(async move {
2009 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
2010 Ok(())
2011 })
2012 });
2013
2014 let bob_mgr_clone = bob_mgr.clone();
2016 alice_transport
2017 .expect_send_message()
2018 .once()
2019 .withf(move |peer, data| {
2021 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2022 data.plain_text.as_ref(),
2023 )
2024 .ok()
2025 .and_then(|m| m.try_as_segment())
2026 .map(|s| s.is_terminating())
2027 .unwrap_or(false)
2028 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2029 })
2030 .returning(move |_, data| {
2031 let bob_mgr_clone = bob_mgr_clone.clone();
2032 Box::pin(async move {
2033 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
2034 Ok(())
2035 })
2036 });
2037
2038 let mut ahs = Vec::new();
2039
2040 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2042 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2043
2044 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
2046 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2047
2048 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2049
2050 let balancer_cfg = SurbBalancerConfig {
2051 target_surb_buffer_size: 10,
2052 max_surbs_per_sec: 100,
2053 ..Default::default()
2054 };
2055
2056 pin_mut!(new_session_rx_bob);
2057 let (alice_session, bob_session) = timeout(
2058 Duration::from_secs(2),
2059 futures::future::join(
2060 alice_mgr.new_session(
2061 bob_peer,
2062 SessionTarget::TcpStream(target.clone()),
2063 SessionClientConfig {
2064 pseudonym: alice_pseudonym.into(),
2065 capabilities: Capability::Segmentation.into(),
2066 surb_management: Some(balancer_cfg),
2067 ..Default::default()
2068 },
2069 ),
2070 new_session_rx_bob.next(),
2071 ),
2072 )
2073 .await?;
2074
2075 let mut alice_session = alice_session?;
2076 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2077
2078 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2079
2080 assert_eq!(
2081 Some(balancer_cfg),
2082 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2083 );
2084
2085 let remote_cfg = bob_mgr
2086 .get_surb_balancer_config(bob_session.session.id())
2087 .await?
2088 .ok_or(anyhow!("no remote config at bob"))?;
2089 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2090 assert_eq!(
2091 remote_cfg.max_surbs_per_sec,
2092 remote_cfg.target_surb_buffer_size
2093 / bob_cfg
2094 .minimum_surb_buffer_duration
2095 .max(MIN_SURB_BUFFER_DURATION)
2096 .as_secs()
2097 );
2098
2099 tokio::time::sleep(Duration::from_millis(1500)).await;
2101 alice_session.close().await?;
2102
2103 tokio::time::sleep(Duration::from_millis(300)).await;
2104 assert!(matches!(
2105 alice_mgr.ping_session(alice_session.id()).await,
2106 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2107 ));
2108
2109 futures::stream::iter(ahs)
2110 .for_each(|ah| async move { ah.abort() })
2111 .await;
2112
2113 Ok(())
2114 }
2115}