1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use futures::{
8 FutureExt, SinkExt, StreamExt, TryStreamExt, channel::mpsc::UnboundedSender, future::AbortHandle, pin_mut,
9};
10use futures_time::future::FutureExt as TimeExt;
11use hopr_crypto_random::Randomizable;
12use hopr_internal_types::prelude::HoprPseudonym;
13use hopr_network_types::prelude::*;
14use hopr_primitive_types::prelude::Address;
15use hopr_protocol_app::prelude::*;
16use hopr_protocol_start::{
17 KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
18};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
23 SurbBalancerConfig,
24 balancer::{
25 AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
26 SurbControllerWithCorrection,
27 pid::{PidBalancerController, PidControllerGains},
28 simple::SimpleBalancerController,
29 },
30 errors::{SessionManagerError, TransportSessionError},
31 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
32 utils,
33 utils::insert_into_next_slot,
34};
35
36#[cfg(all(feature = "prometheus", not(test)))]
37lazy_static::lazy_static! {
38 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
39 "hopr_session_num_active_sessions",
40 "Number of currently active HOPR sessions"
41 ).unwrap();
42 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
43 "hopr_session_established_sessions_count",
44 "Number of sessions that were successfully established as an Exit node"
45 ).unwrap();
46 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
47 "hopr_session_initiated_sessions_count",
48 "Number of sessions that were successfully initiated as an Entry node"
49 ).unwrap();
50 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
51 "hopr_session_received_error_count",
52 "Number of HOPR session errors received from an Exit node",
53 &["kind"]
54 ).unwrap();
55 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
56 "hopr_session_sent_error_count",
57 "Number of HOPR session errors sent to an Entry node",
58 &["kind"]
59 ).unwrap();
60}
61
62fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
63 debug!(?session_id, ?reason, "closing session");
64
65 if reason != ClosureReason::EmptyRead {
66 session_data.session_tx.close_channel();
68 trace!(?session_id, "data tx channel closed on session");
69 }
70
71 session_data.abort_handles.into_iter().for_each(|h| h.abort());
73
74 #[cfg(all(feature = "prometheus", not(test)))]
75 METRIC_ACTIVE_SESSIONS.decrement(1.0);
76}
77
/// Computes the maximum one-way session initiation timeout as `base` multiplied by `hops`.
///
/// The computation never panics: a hop count that does not fit into `u32` is treated
/// as `u32::MAX` (instead of being silently truncated) and the multiplication saturates
/// at [`Duration::MAX`] (instead of panicking on overflow).
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
81
/// Lower bound applied to `SessionManagerConfig::minimum_surb_buffer_duration`
/// whenever SURB buffer sizes or SURB rates are derived from it.
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);

/// The smallest value of a challenge generated for an outgoing session initiation.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time `SessionManager::new_session` waits for the SURB level of a new session
/// to reach half of the target SURB buffer size. When it elapses, only a warning is logged.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Lower bound applied to `SessionManagerConfig::max_frame_timeout` in `SessionManager::new`.
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
93
/// Cache of pending outgoing session initiations.
///
/// Maps the challenge of an initiation to the sender used to deliver its outcome:
/// either the confirmation of the established session or the error sent by the counterparty.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
99
/// State kept in the session cache for each open session.
#[derive(Clone)]
struct SessionSlot {
    /// Sender used to pass incoming application data to the session.
    /// The channel gets closed when the session is closed (except on `ClosureReason::EmptyRead`).
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    /// Routing used for messages sent to the counterparty of the session (e.g. manual keep-alives).
    routing_opts: DestinationRouting,
    /// Handles of background tasks that are aborted when the session is closed.
    abort_handles: Vec<AbortHandle>,
    /// Current SURB balancer configuration, `None` if the session does not use SURB balancing.
    surb_mgmt: Option<SurbBalancerConfig>,
    /// SURB flow estimator, set only for incoming sessions that use rate control.
    /// It is updated when a keep-alive message is received for the session.
    surb_estimator: Option<AtomicSurbFlowEstimator>,
}
113
/// Gives SURB balancers read and write access to the balancer configuration
/// stored in the session slots of the wrapped session cache.
struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
116
117#[async_trait::async_trait]
118impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
119 async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
120 self.0
123 .iter()
124 .find(|(sid, _)| sid.as_ref() == id)
125 .ok_or(SessionManagerError::NonExistingSession)?
126 .1
127 .surb_mgmt
128 .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
129 }
130
131 async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
132 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
133 .0
134 .entry_by_ref(id)
135 .and_compute_with(|entry| {
136 futures::future::ready(match entry.map(|e| e.into_value()) {
137 None => moka::ops::compute::Op::Nop,
138 Some(mut updated_slot) => {
139 if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
140 *balancer_cfg = cfg;
141 moka::ops::compute::Op::Put(updated_slot)
142 } else {
143 moka::ops::compute::Op::Nop
144 }
145 }
146 })
147 })
148 .await
149 {
150 Ok(())
151 } else {
152 Err(SessionManagerError::NonExistingSession.into())
153 }
154 }
155}
156
/// Outcome of `SessionManager::dispatch_message`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// The message was a Start protocol message or data of a registered session and has been handled.
    Processed,
    /// The message is neither a Start protocol message nor tagged from the session tag range.
    /// The unmodified message is handed back to the caller.
    Unrelated(ApplicationDataIn),
}
165
/// Pair of channels set up when the manager is started:
/// the first one announces newly established incoming sessions,
/// the second one carries requests to close a session together with the closure reason.
type SessionNotifiers = (
    UnboundedSender<IncomingSession>,
    UnboundedSender<(SessionId, ClosureReason)>,
);
171
/// Configuration of the [`SessionManager`].
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// Range of application tags used for sessions.
    ///
    /// `SessionManager::new` shifts the range upwards if it starts below the reserved tag range.
    ///
    /// Default is `16..1024`.
    #[doc(hidden)]
    #[default(_code = "16u64..1024u64")]
    pub session_tag_range: Range<u64>,

    /// Maximum number of concurrently open sessions.
    ///
    /// `SessionManager::new` limits the value to be at least 1 and at most the size of
    /// `session_tag_range`.
    ///
    /// Default is 128.
    #[default(128)]
    pub maximum_sessions: usize,

    /// Frame MTU passed to each created session.
    /// `SessionManager::new` raises the value to `SESSION_MTU` if it is lower.
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// Frame timeout passed to each created session.
    /// `SessionManager::new` raises the value to 10 milliseconds if it is lower.
    ///
    /// Default is 800 milliseconds.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// Base of the session initiation timeout.
    ///
    /// The timeout of an outgoing session initiation is this value multiplied by the number
    /// of hops on the forward and return paths plus 2.
    ///
    /// Default is 500 milliseconds.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Time-to-idle of entries in the session cache; sessions expired from the cache get closed.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// Sampling interval passed to the control loop of the SURB balancers.
    ///
    /// Default is 100 milliseconds.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial egress rate of the rate controller of incoming sessions that use rate control.
    ///
    /// Multiplied by `minimum_surb_buffer_duration` (in seconds), it also gives the
    /// target SURB buffer size used when none is specified.
    ///
    /// Default is 10.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,
    /// Duration used to derive the default target SURB buffer size and the maximum SURB rate
    /// of incoming sessions. Values lower than [`MIN_SURB_BUFFER_DURATION`] are replaced by it.
    ///
    /// Default is 5 seconds.
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,
    /// Upper limit of the target SURB buffer size that the initiator of an incoming session can request.
    ///
    /// Default is 10 000.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// If set, the SURB balancer of incoming sessions is created with an increasing setpoint:
    /// the second value is passed as the ratio threshold and the first value, divided by
    /// `balancer_sampling_interval`, gives the number of samples in the growth window.
    ///
    /// Default is 60 seconds and 0.20.
    #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
    pub growable_target_surb_buffer: Option<(Duration, f64)>,
}
270
/// Manages the lifecycle of incoming and outgoing sessions.
///
/// The manager must be started via `start` before sessions can be created.
/// `S` is the sink used to send `(DestinationRouting, ApplicationDataOut)` messages.
/// Clones of the manager share the same caches, notifiers and message sender.
pub struct SessionManager<S> {
    /// Pending outgoing session initiations, keyed by their challenge.
    session_initiations: SessionInitiationCache,
    /// Notification channels, set once the manager is started.
    session_notifiers: Arc<OnceLock<SessionNotifiers>>,
    /// All currently open sessions.
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    /// Sink for outgoing messages, set once the manager is started.
    msg_sender: Arc<OnceLock<S>>,
    /// Configuration, adjusted in `new`.
    cfg: SessionManagerConfig,
}
420
421impl<S> Clone for SessionManager<S> {
422 fn clone(&self) -> Self {
423 Self {
424 session_initiations: self.session_initiations.clone(),
425 session_notifiers: self.session_notifiers.clone(),
426 sessions: self.sessions.clone(),
427 cfg: self.cfg.clone(),
428 msg_sender: self.msg_sender.clone(),
429 }
430 }
431}
432
433impl<S> SessionManager<S>
434where
435 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
436 S::Error: std::error::Error + Send + Sync + Clone + 'static,
437{
438 pub fn new(mut cfg: SessionManagerConfig) -> Self {
440 let min_session_tag_range_reservation = ReservedTag::range().end;
441 debug_assert!(
442 min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
443 "invalid tag reservation range"
444 );
445
446 if cfg.session_tag_range.start < min_session_tag_range_reservation {
448 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
449 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
450 }
451 cfg.maximum_sessions = cfg
452 .maximum_sessions
453 .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
454
455 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
457 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
458
459 #[cfg(all(feature = "prometheus", not(test)))]
460 METRIC_ACTIVE_SESSIONS.set(0.0);
461
462 let msg_sender = Arc::new(OnceLock::new());
463 Self {
464 msg_sender: msg_sender.clone(),
465 session_initiations: moka::future::Cache::builder()
466 .max_capacity(cfg.maximum_sessions as u64)
467 .time_to_live(
468 2 * initiation_timeout_max_one_way(
469 cfg.initiation_timeout_base,
470 RoutingOptions::MAX_INTERMEDIATE_HOPS,
471 ),
472 )
473 .build(),
474 sessions: moka::future::Cache::builder()
475 .max_capacity(cfg.maximum_sessions as u64)
476 .time_to_idle(cfg.idle_timeout)
477 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
478 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
479 trace!(?session_id, ?reason, "session evicted from the cache");
480 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
481 }
482 _ => {}
483 })
484 .build(),
485 session_notifiers: Arc::new(OnceLock::new()),
486 cfg,
487 }
488 }
489
    /// Starts the manager, so that it can create and accept sessions.
    ///
    /// The `msg_sender` is used for all outgoing messages, and each newly established incoming
    /// session is announced via `new_session_notifier`.
    ///
    /// Two background tasks are spawned: one closes sessions upon closure notifications, the other
    /// periodically runs the pending maintenance tasks of both caches. Their abort handles are returned.
    ///
    /// # Errors
    /// Returns `SessionManagerError::AlreadyStarted` if the manager has been started before.
    pub fn start(
        &self,
        msg_sender: S,
        new_session_notifier: UnboundedSender<IncomingSession>,
    ) -> crate::errors::Result<Vec<AbortHandle>> {
        self.msg_sender
            .set(msg_sender)
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
        self.session_notifiers
            .set((new_session_notifier, session_close_tx))
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        // Task 1: remove the session from the cache and close it for every closure notification
        let myself = self.clone();
        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
            None,
            move |(session_id, closure_reason)| {
                let myself = myself.clone();
                async move {
                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
                        close_session(session_id, session_data, closure_reason);
                    } else {
                        debug!(
                            ?session_id,
                            ?closure_reason,
                            "could not find session id to close, maybe the session is already closed"
                        );
                    }
                }
            },
        ));

        // Task 2: periodically run the pending maintenance tasks of both caches
        let myself = self.clone();
        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
            // The period is randomized by a factor drawn from the range 1.0..1.5
            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
            // NOTE(review): method calls bind tighter than `*` and `/`, so `.min()` and `.mul_f64()`
            // apply to the one-way timeout only and the leading `2 *` cancels out with the
            // trailing `/ 2` - confirm that this is intended.
            let timeout = 2 * initiation_timeout_max_one_way(
                myself.cfg.initiation_timeout_base,
                RoutingOptions::MAX_INTERMEDIATE_HOPS,
            )
            .min(myself.cfg.idle_timeout)
            .mul_f64(jitter)
                / 2;
            futures_time::stream::interval(timeout.into())
                .for_each(|_| {
                    trace!("executing session cache evictions");
                    futures::future::join(
                        myself.sessions.run_pending_tasks(),
                        myself.session_initiations.run_pending_tasks(),
                    )
                    .map(|_| ())
                })
                .await;
        });

        Ok(vec![ah_closure_notifications, ah_session_expiration])
    }
560
561 pub fn is_started(&self) -> bool {
563 self.session_notifiers.get().is_some()
564 }
565
566 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
567 let abort_handles_clone = slot.abort_handles.clone();
569 if let moka::ops::compute::CompResult::Inserted(_) = self
570 .sessions
571 .entry(session_id)
572 .and_compute_with(|entry| {
573 futures::future::ready(if entry.is_none() {
574 moka::ops::compute::Op::Put(slot)
575 } else {
576 moka::ops::compute::Op::Nop
577 })
578 })
579 .await
580 {
581 #[cfg(all(feature = "prometheus", not(test)))]
582 {
583 METRIC_NUM_INITIATED_SESSIONS.increment();
584 METRIC_ACTIVE_SESSIONS.increment(1.0);
585 }
586
587 Ok(())
588 } else {
589 error!(%session_id, "session already exists - loopback attempt");
591 abort_handles_clone.into_iter().for_each(|ah| ah.abort());
592 Err(SessionManagerError::Loopback.into())
593 }
594 }
595
    /// Initiates a new outgoing session to the `destination` with the given `target` and configuration.
    ///
    /// The method:
    /// 1. reserves a challenge for the initiation and sends the Start protocol initiation
    ///    message to the `destination`,
    /// 2. waits for the outcome of the initiation, which is delivered by the handler of incoming
    ///    Start protocol messages; the wait is limited by `initiation_timeout_base` multiplied by
    ///    the number of hops of the forward and return paths plus 2,
    /// 3. registers the session slot and creates the session. If `cfg.surb_management` is set,
    ///    a keep-alive stream and a SURB balancer are spawned for the session as well, and the
    ///    method waits up to `SESSION_READINESS_TIMEOUT` until the SURB level reaches half of the
    ///    target SURB buffer size.
    ///
    /// # Errors
    /// - `SessionManagerError::TooManySessions` if the maximum number of sessions is reached,
    /// - `SessionManagerError::NotStarted` if the manager has not been started,
    /// - `SessionManagerError::NoChallengeSlots` if no challenge could be reserved,
    /// - `TransportSessionError::PacketSendingError` if the initiation message could not be sent,
    /// - `TransportSessionError::Rejected` if the counterparty rejected the initiation,
    /// - `TransportSessionError::Timeout` if no response arrived in time,
    /// - `SessionManagerError::Loopback` if a session with the assigned ID is already registered,
    /// - `SessionManagerError::Other` if the initiation channel or the SURB balancer ended prematurely,
    /// - any error of the message encoding or of the session construction.
    pub async fn new_session(
        &self,
        destination: Address,
        target: SessionTarget,
        cfg: SessionClientConfig,
    ) -> crate::errors::Result<HoprSession> {
        // Run the pending cache maintenance first; presumably so that `entry_count` below
        // reflects recent insertions and evictions - see the moka documentation.
        self.sessions.run_pending_tasks().await;
        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
            return Err(SessionManagerError::TooManySessions.into());
        }

        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;

        // Reserve a challenge: start with a random one, on a collision try the following value
        // (wrapping around at the maximum random integer, never going below `MIN_CHALLENGE`)
        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
        let challenge = insert_into_next_slot(
            &self.session_initiations,
            |ch| {
                if let Some(challenge) = ch {
                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
                } else {
                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
                }
            },
            tx_initiation_done,
        )
        .await
        .ok_or(SessionManagerError::NoChallengeSlots)?;
        trace!(challenge, ?cfg, "initiating session with config");
        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
            challenge,
            target,
            capabilities: ByteCapabilities(cfg.capabilities),
            // The additional data carry the target SURB buffer size (capped at `u32::MAX`),
            // or 0 if the session does not use rate control. Without an explicit SURB management
            // configuration, the size is derived from the manager configuration.
            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
                cfg.surb_management
                    .map(|c| c.target_surb_buffer_size)
                    .unwrap_or(
                        self.cfg.initial_return_session_egress_rate as u64
                            * self
                                .cfg
                                .minimum_surb_buffer_duration
                                .max(MIN_SURB_BUFFER_DURATION)
                                .as_secs(),
                    )
                    .min(u32::MAX as u64) as u32
            } else {
                0
            },
        });

        // Use the configured pseudonym, or a random one if none was given
        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
        let forward_routing = DestinationRouting::Forward {
            destination,
            pseudonym: Some(pseudonym),
            forward_options: cfg.forward_path_options.clone(),
            return_options: cfg.return_path_options.clone().into(),
        };

        info!(challenge, %pseudonym, %destination, "new session request");
        msg_sender
            .send((
                forward_routing.clone(),
                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
            ))
            .await
            .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;

        // The timeout covers the forward and the return path, each with one extra hop
        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
            self.cfg.initiation_timeout_base,
            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
        )
        .into();

        pin_mut!(rx_initiation_done);

        trace!(challenge, "awaiting session establishment");
        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
            // The counterparty confirmed the session
            Ok(Ok(Some(est))) => {
                let session_id = est.session_id;
                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");

                // Channel delivering the incoming data to the session
                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
                // Closure that requests the session closure from the manager
                let notifier = self
                    .session_notifiers
                    .get()
                    .map(|(_, notifier)| {
                        let notifier = notifier.clone();
                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
                            let _ = notifier
                                .unbounded_send((session_id, reason))
                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
                        })
                    })
                    .ok_or(SessionManagerError::NotStarted)?;

                if let Some(balancer_config) = cfg.surb_management {
                    let surb_estimator = AtomicSurbFlowEstimator::default();

                    // Sender that adds the estimated number of SURBs in each outgoing message
                    // to the `produced` counter of the estimator
                    let surb_estimator_clone = surb_estimator.clone();
                    let full_surb_scoring_sender =
                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
                            surb_estimator_clone.produced.fetch_add(
                                data.estimate_surbs_with_msg() as u64,
                                std::sync::atomic::Ordering::Relaxed,
                            );
                            futures::future::ok::<_, S::Error>((routing, data))
                        });

                    // Sender for the session data: unless `always_max_out_surbs` is set,
                    // each message is limited to carry a single SURB
                    let max_out_organic_surbs = cfg.always_max_out_surbs;
                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
                            if !max_out_organic_surbs {
                                data.packet_info
                                    .get_or_insert_with(|| OutgoingPacketInfo {
                                        max_surbs_in_packet: 1,
                                        ..Default::default()
                                    })
                                    .max_surbs_in_packet = 1;
                            }
                            futures::future::ok::<_, S::Error>((routing, data))
                        },
                    );

                    let mut abort_handles = Vec::new();

                    // The keep-alive stream uses the sender without the single SURB limit
                    let (ka_controller, ka_abort_handle) =
                        utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
                    abort_handles.push(ka_abort_handle);

                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
                    let balancer = SurbBalancer::new(
                        session_id,
                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
                        surb_estimator.clone(),
                        ka_controller,
                        balancer_config,
                    );

                    let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
                        self.cfg.balancer_sampling_interval,
                        SessionCacheBalancerFeedback(self.sessions.clone()),
                        None,
                    );
                    abort_handles.push(balancer_abort_handle);

                    // The slot of an outgoing session is registered without a SURB estimator
                    self.insert_session_slot(
                        session_id,
                        SessionSlot {
                            session_tx: Arc::new(tx),
                            routing_opts: forward_routing.clone(),
                            abort_handles,
                            surb_mgmt: Some(balancer_config),
                            surb_estimator: None,
                        },
                    )
                    .await?;

                    // Wait until the SURB level reaches at least half of the target buffer size
                    match level_stream
                        .skip_while(|current_level| {
                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
                        })
                        .next()
                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
                        .await
                    {
                        Ok(Some(surb_level)) => {
                            info!(%session_id, surb_level, "session is ready");
                        }
                        // The level stream ended before the level was reached
                        Ok(None) => {
                            return Err(
                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
                            );
                        }
                        // Not fatal: the session is returned even if the level was not reached in time
                        Err(_) => {
                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
                        }
                    }

                    HoprSession::new(
                        session_id,
                        forward_routing,
                        HoprSessionConfig {
                            capabilities: cfg.capabilities,
                            frame_mtu: self.cfg.frame_mtu,
                            frame_timeout: self.cfg.max_frame_timeout,
                        },
                        (
                            reduced_surb_scoring_sender,
                            // Each incoming message increments the `consumed` counter of the estimator
                            rx.inspect(move |_| {
                                surb_estimator
                                    .consumed
                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            }),
                        ),
                        Some(notifier),
                    )
                } else {
                    warn!(%session_id, "session ready without SURB balancing");

                    self.insert_session_slot(
                        session_id,
                        SessionSlot {
                            session_tx: Arc::new(tx),
                            routing_opts: forward_routing.clone(),
                            abort_handles: vec![],
                            surb_mgmt: None,
                            surb_estimator: None,
                        },
                    )
                    .await?;

                    // Unless `always_max_out_surbs` is set, each message is limited to carry a single SURB
                    let max_out_organic_surbs = cfg.always_max_out_surbs;
                    let reduced_surb_sender =
                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
                            if !max_out_organic_surbs {
                                data.packet_info
                                    .get_or_insert_with(|| OutgoingPacketInfo {
                                        max_surbs_in_packet: 1,
                                        ..Default::default()
                                    })
                                    .max_surbs_in_packet = 1;
                            }
                            futures::future::ok::<_, S::Error>((routing, data))
                        });

                    HoprSession::new(
                        session_id,
                        forward_routing,
                        HoprSessionConfig {
                            capabilities: cfg.capabilities,
                            frame_mtu: self.cfg.frame_mtu,
                            frame_timeout: self.cfg.max_frame_timeout,
                        },
                        (reduced_surb_sender, rx),
                        Some(notifier),
                    )
                }
            }
            // The initiation channel ended without delivering any outcome
            Ok(Ok(None)) => Err(SessionManagerError::Other(
                "internal error: sender has been closed without completing the session establishment".into(),
            )
            .into()),
            // The counterparty responded with an error
            Ok(Err(e)) => {
                error!(
                    challenge = e.challenge,
                    error = ?e,
                    "the other party rejected the session initiation with error"
                );
                Err(TransportSessionError::Rejected(e.reason))
            }
            // No response arrived within the initiation timeout
            Err(_) => {
                error!(challenge, "session initiation attempt timed out");

                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);

                Err(TransportSessionError::Timeout)
            }
        }
    }
891
892 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
896 if let Some(session_data) = self.sessions.get(id).await {
897 trace!(session_id = ?id, "pinging manually session");
898 Ok(self
899 .msg_sender
900 .get()
901 .cloned()
902 .ok_or(SessionManagerError::NotStarted)?
903 .send((
904 session_data.routing_opts.clone(),
905 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
906 ))
907 .await
908 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
909 } else {
910 Err(SessionManagerError::NonExistingSession.into())
911 }
912 }
913
914 pub async fn active_sessions(&self) -> Vec<SessionId> {
916 self.sessions.run_pending_tasks().await;
917 self.sessions.iter().map(|(k, _)| *k).collect()
918 }
919
920 pub async fn update_surb_balancer_config(
925 &self,
926 id: &SessionId,
927 config: SurbBalancerConfig,
928 ) -> crate::errors::Result<()> {
929 match self
930 .sessions
931 .entry_by_ref(id)
932 .and_compute_with(|entry| {
933 futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
934 if cached_session.surb_mgmt.is_some() {
936 cached_session.surb_mgmt = Some(config);
937 moka::ops::compute::Op::Put(cached_session)
938 } else {
939 moka::ops::compute::Op::Nop
940 }
941 } else {
942 moka::ops::compute::Op::Nop
943 })
944 })
945 .await
946 {
947 moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
948 moka::ops::compute::CompResult::Unchanged(_) => {
949 Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
950 }
951 _ => Err(SessionManagerError::NonExistingSession.into()),
952 }
953 }
954
955 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
959 match self.sessions.get(id).await {
960 Some(session) => Ok(session.surb_mgmt),
961 None => Err(SessionManagerError::NonExistingSession.into()),
962 }
963 }
964
965 pub async fn dispatch_message(
972 &self,
973 pseudonym: HoprPseudonym,
974 in_data: ApplicationDataIn,
975 ) -> crate::errors::Result<DispatchResult> {
976 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
977 trace!("dispatching Start protocol message");
979 return self
980 .handle_start_protocol_message(pseudonym, in_data)
981 .await
982 .map(|_| DispatchResult::Processed);
983 } else if self
984 .cfg
985 .session_tag_range
986 .contains(&in_data.data.application_tag.as_u64())
987 {
988 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
989
990 return if let Some(session_data) = self.sessions.get(&session_id).await {
991 trace!(?session_id, "received data for a registered session");
992
993 Ok(session_data
994 .session_tx
995 .unbounded_send(in_data)
996 .map(|_| DispatchResult::Processed)
997 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
998 } else {
999 error!(%session_id, "received data from an unestablished session");
1000 Err(TransportSessionError::UnknownData)
1001 };
1002 }
1003
1004 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1005 Ok(DispatchResult::Unrelated(in_data))
1006 }
1007
    /// Handles a session initiation request received from the party identified by `pseudonym`.
    ///
    /// If a free session slot can be allocated, the session is created (with an egress rate
    /// controller and a SURB balancer, unless the request has the `NoRateControl` capability),
    /// announced via the new session notifier, and the establishment confirmation is sent
    /// back to the initiator. Otherwise, an error with the `NoSlotsAvailable` reason is sent back.
    ///
    /// # Errors
    /// - `SessionManagerError::NotStarted` if the manager has not been started,
    /// - `SessionManagerError::Other` if the slot could not be updated with the balancer data,
    ///   or if the reply to the initiator could not be sent,
    /// - any error of the message encoding or of the session construction.
    async fn handle_incoming_session_initiation(
        &self,
        pseudonym: HoprPseudonym,
        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
    ) -> crate::errors::Result<()> {
        trace!(challenge = session_req.challenge, "received session initiation request");

        debug!(%pseudonym, "got new session request, searching for a free session slot");

        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;

        let (new_session_notifier, close_session_notifier) = self
            .session_notifiers
            .get()
            .cloned()
            .ok_or(SessionManagerError::NotStarted)?;

        // All replies to the initiator are routed back using its pseudonym
        let reply_routing = DestinationRouting::Return(pseudonym.into());

        // Channel delivering the incoming data to the session
        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();

        // Run the pending cache maintenance before the number of sessions is checked
        self.sessions.run_pending_tasks().await;
        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
            insert_into_next_slot(
                &self.sessions,
                |sid| {
                    // Start with a random tag from the session tag range, on a collision try
                    // the following tag (wrapping around to the start of the range)
                    let next_tag: Tag = match sid {
                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
                            .max(self.cfg.session_tag_range.start)
                            .into(),
                        None => hopr_crypto_random::random_integer(
                            self.cfg.session_tag_range.start,
                            Some(self.cfg.session_tag_range.end),
                        )
                        .into(),
                    };
                    SessionId::new(next_tag, pseudonym)
                },
                // The balancer-related fields are filled in later, if rate control is used
                SessionSlot {
                    session_tx: Arc::new(tx_session_data),
                    routing_opts: reply_routing.clone(),
                    abort_handles: vec![],
                    surb_mgmt: None,
                    surb_estimator: None,
                },
            )
            .await
        } else {
            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
            None
        };

        if let Some(session_id) = allocated_slot {
            debug!(%session_id, ?session_req, "assigned a new session");

            // Closure that requests the session closure from the manager
            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
                if let Err(error) = close_session_notifier.unbounded_send((session_id, reason)) {
                    error!(%session_id, %error, %reason, "failed to notify session closure");
                }
            });

            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
                let surb_estimator = AtomicSurbFlowEstimator::default();

                let egress_rate_control =
                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));

                // Use the buffer size requested by the initiator (capped by the configured maximum),
                // or derive it from the manager configuration if none was requested
                let target_surb_buffer_size = if session_req.additional_data > 0 {
                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
                } else {
                    self.cfg.initial_return_session_egress_rate as u64
                        * self
                            .cfg
                            .minimum_surb_buffer_duration
                            .max(MIN_SURB_BUFFER_DURATION)
                            .as_secs()
                };

                // NOTE(review): both `move` closures below use `surb_estimator_clone`, each a different
                // field of it; presumably this relies on the closures capturing the fields separately
                // (edition 2021) - confirm.
                let surb_estimator_clone = surb_estimator.clone();
                let session = HoprSession::new(
                    session_id,
                    reply_routing.clone(),
                    HoprSessionConfig {
                        capabilities: session_req.capabilities.into(),
                        frame_mtu: self.cfg.frame_mtu,
                        frame_timeout: self.cfg.max_frame_timeout,
                    },
                    (
                        // Outgoing data: each message increments the `consumed` counter of the estimator
                        // and is subject to the egress rate limit
                        msg_sender
                            .clone()
                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
                                surb_estimator_clone
                                    .consumed
                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                futures::future::ok::<_, S::Error>((routing, data))
                            })
                            .rate_limit_with_controller(&egress_rate_control)
                            .buffer((2 * target_surb_buffer_size) as usize),
                        // Incoming data: the number of SURBs delivered with each message is added
                        // to the `produced` counter of the estimator
                        rx_session_data.inspect(move |data| {
                            surb_estimator_clone
                                .produced
                                .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
                        }),
                    ),
                    Some(closure_notifier),
                )?;

                let balancer_config = SurbBalancerConfig {
                    target_surb_buffer_size,
                    // The rate at which the whole buffer gets used within the minimum buffer duration
                    max_surbs_per_sec: target_surb_buffer_size
                        / self
                            .cfg
                            .minimum_surb_buffer_duration
                            .max(MIN_SURB_BUFFER_DURATION)
                            .as_secs(),
                    surb_decay: None,
                };

                // Store the balancer data in the already allocated slot before the balancer is spawned
                let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
                if let moka::ops::compute::CompResult::ReplacedWith(_) = self
                    .sessions
                    .entry(session_id)
                    .and_compute_with(|entry| {
                        if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
                            cached_session.abort_handles.push(balancer_abort_handle);
                            cached_session.surb_mgmt = Some(balancer_config);
                            cached_session.surb_estimator = Some(surb_estimator.clone());
                            futures::future::ready(moka::ops::compute::Op::Put(cached_session))
                        } else {
                            futures::future::ready(moka::ops::compute::Op::Nop)
                        }
                    })
                    .await
                {
                    debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
                    let balancer = SurbBalancer::new(
                        session_id,
                        // Use the controller with an increasing setpoint if so configured; the growth
                        // window is converted to the number of sampling intervals (at least 1)
                        if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
                            SimpleBalancerController::with_increasing_setpoint(
                                *ratio_threshold,
                                (growth_window
                                    .div_duration_f64(self.cfg.balancer_sampling_interval)
                                    .round() as usize)
                                    .max(1),
                            )
                        } else {
                            SimpleBalancerController::default()
                        },
                        surb_estimator,
                        SurbControllerWithCorrection(egress_rate_control, 1),
                        balancer_config,
                    );

                    // The control loop is stopped using the abort handle stored in the slot
                    let _ = balancer.start_control_loop(
                        self.cfg.balancer_sampling_interval,
                        SessionCacheBalancerFeedback(self.sessions.clone()),
                        Some(balancer_abort_reg),
                    );
                } else {
                    // The slot allocated above is no longer in the cache
                    return Err(SessionManagerError::Other(
                        "failed to spawn SURB balancer - inconsistent cache".into(),
                    )
                    .into());
                }

                session
            } else {
                // No rate control: the session uses the message sender and the data channel directly
                HoprSession::new(
                    session_id,
                    reply_routing.clone(),
                    HoprSessionConfig {
                        capabilities: session_req.capabilities.into(),
                        frame_mtu: self.cfg.frame_mtu,
                        frame_timeout: self.cfg.max_frame_timeout,
                    },
                    (msg_sender.clone(), rx_session_data),
                    Some(closure_notifier),
                )?
            };

            let incoming_session = IncomingSession {
                session,
                target: session_req.target,
            };

            // A failure to announce the session is only logged, the establishment continues
            if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
                warn!(%error, "failed to send session to incoming session queue");
            }

            trace!(?session_id, "session notification sent");

            // Confirm the establishment to the initiator
            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
                orig_challenge: session_req.challenge,
                session_id,
            });

            msg_sender
                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
                .await
                .map_err(|e| {
                    SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
                })?;

            info!(%session_id, "new session established");

            #[cfg(all(feature = "prometheus", not(test)))]
            {
                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
                METRIC_ACTIVE_SESSIONS.increment(1.0);
            }
        } else {
            error!(%pseudonym,"failed to reserve a new session slot");

            // Let the initiator know that the session cannot be accepted
            let reason = StartErrorReason::NoSlotsAvailable;
            let data = HoprStartProtocol::SessionError(StartErrorType {
                challenge: session_req.challenge,
                reason,
            });

            msg_sender
                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
                .await
                .map_err(|e| {
                    SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
                })?;

            trace!(%pseudonym, "session establishment failure message sent");

            #[cfg(all(feature = "prometheus", not(test)))]
            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
        }

        Ok(())
    }
1269
    /// Decodes and handles a Start protocol message received from the party identified by `pseudonym`.
    ///
    /// - `StartSession`: handled as an incoming session initiation.
    /// - `SessionEstablished` and `SessionError`: the outcome is delivered to the pending outgoing
    ///   initiation with the matching challenge, which is thereby removed from the initiation cache.
    ///   If no such initiation is pending, the message is only logged.
    /// - `KeepAlive`: the SURB estimator of the session (if it has one) is updated.
    ///
    /// # Errors
    /// Fails if the message cannot be decoded, if the session initiation handling fails,
    /// or if the outcome could not be delivered to the pending initiation.
    async fn handle_start_protocol_message(
        &self,
        pseudonym: HoprPseudonym,
        data: ApplicationDataIn,
    ) -> crate::errors::Result<()> {
        match HoprStartProtocol::try_from(data.data)? {
            HoprStartProtocol::StartSession(session_req) => {
                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
            }
            HoprStartProtocol::SessionEstablished(est) => {
                trace!(
                    session_id = ?est.session_id,
                    "received session establishment confirmation"
                );
                // Copied out, because `est` is moved into the channel below
                let challenge = est.orig_challenge;
                let session_id = est.session_id;
                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
                        return Err(SessionManagerError::Other(format!(
                            "could not notify session {session_id} establishment: {e}"
                        ))
                        .into());
                    }
                    debug!(?session_id, challenge, "session establishment complete");
                } else {
                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
                }
            }
            HoprStartProtocol::SessionError(error) => {
                trace!(
                    challenge = error.challenge,
                    error = ?error.reason,
                    "failed to initialize a session",
                );
                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
                        return Err(SessionManagerError::Other(format!(
                            "could not notify session establishment error {error:?}: {e}"
                        ))
                        .into());
                    }
                    error!(
                        challenge = error.challenge,
                        ?error,
                        "session establishment error received"
                    );
                } else {
                    error!(
                        challenge = error.challenge,
                        ?error,
                        "session establishment attempt expired before error could be delivered"
                    );
                }

                // Counted regardless of whether the error was delivered to a pending initiation
                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
            }
            HoprStartProtocol::KeepAlive(msg) => {
                let session_id = msg.session_id;
                if let Some(session_slot) = self.sessions.get(&session_id).await {
                    trace!(?session_id, "received keep-alive request");
                    // Only slots of incoming sessions with rate control have an estimator
                    if let Some(estimator) = session_slot.surb_estimator.as_ref() {
                        // Add the minimum number of SURBs per keep-alive message to the `produced` counter
                        estimator.produced.fetch_add(
                            KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
                            std::sync::atomic::Ordering::Relaxed,
                        );
                    }
                } else {
                    debug!(%session_id, "received keep-alive request for an unknown session");
                }
            }
        }

        Ok(())
    }
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354 use anyhow::anyhow;
1355 use futures::{AsyncWriteExt, future::BoxFuture};
1356 use hopr_crypto_random::Randomizable;
1357 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1358 use hopr_primitive_types::prelude::Address;
1359 use hopr_protocol_start::StartProtocolDiscriminants;
1360 use tokio::time::timeout;
1361
1362 use super::*;
1363 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1364
1365 #[async_trait::async_trait]
1366 trait SendMsg {
1367 async fn send_message(
1368 &self,
1369 routing: DestinationRouting,
1370 data: ApplicationDataOut,
1371 ) -> crate::errors::Result<()>;
1372 }
1373
1374 mockall::mock! {
1375 MsgSender {}
1376 impl SendMsg for MsgSender {
1377 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1378 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1379 }
1380 }
1381
1382 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1383 let (tx, rx) = futures::channel::mpsc::unbounded();
1384 tokio::task::spawn(async move {
1385 pin_mut!(rx);
1386 while let Some((routing, data)) = rx.next().await {
1387 sender
1388 .send_message(routing, data)
1389 .await
1390 .expect("send message must not fail in mock");
1391 }
1392 });
1393 tx
1394 }
1395
1396 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1397 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1398 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1399 .unwrap_or(false)
1400 }
1401
1402 #[test_log::test(tokio::test)]
1403 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1404 {
1405 let alice_pseudonym = HoprPseudonym::random();
1406 let bob_peer: Address = (&ChainKeypair::random()).into();
1407
1408 let alice_mgr = SessionManager::new(Default::default());
1409 let bob_mgr = SessionManager::new(Default::default());
1410
1411 let mut sequence = mockall::Sequence::new();
1412 let mut alice_transport = MockMsgSender::new();
1413 let mut bob_transport = MockMsgSender::new();
1414
1415 let bob_mgr_clone = bob_mgr.clone();
1417 alice_transport
1418 .expect_send_message()
1419 .once()
1420 .in_sequence(&mut sequence)
1421 .withf(move |peer, data| {
1422 info!("alice sends {}", data.data.application_tag);
1423 msg_type(data, StartProtocolDiscriminants::StartSession)
1424 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1425 })
1426 .returning(move |_, data| {
1427 let bob_mgr_clone = bob_mgr_clone.clone();
1428 Box::pin(async move {
1429 bob_mgr_clone
1430 .dispatch_message(
1431 alice_pseudonym,
1432 ApplicationDataIn {
1433 data: data.data,
1434 packet_info: Default::default(),
1435 },
1436 )
1437 .await?;
1438 Ok(())
1439 })
1440 });
1441
1442 let alice_mgr_clone = alice_mgr.clone();
1444 bob_transport
1445 .expect_send_message()
1446 .once()
1447 .in_sequence(&mut sequence)
1448 .withf(move |peer, data| {
1449 info!("bob sends {}", data.data.application_tag);
1450 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1451 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1452 })
1453 .returning(move |_, data| {
1454 let alice_mgr_clone = alice_mgr_clone.clone();
1455
1456 Box::pin(async move {
1457 alice_mgr_clone
1458 .dispatch_message(
1459 alice_pseudonym,
1460 ApplicationDataIn {
1461 data: data.data,
1462 packet_info: Default::default(),
1463 },
1464 )
1465 .await?;
1466 Ok(())
1467 })
1468 });
1469
1470 let bob_mgr_clone = bob_mgr.clone();
1472 alice_transport
1473 .expect_send_message()
1474 .once()
1475 .in_sequence(&mut sequence)
1476 .withf(move |peer, data| {
1477 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1478 data.data.plain_text.as_ref(),
1479 )
1480 .expect("must be a session message")
1481 .try_as_segment()
1482 .expect("must be a segment")
1483 .is_terminating()
1484 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1485 })
1486 .returning(move |_, data| {
1487 let bob_mgr_clone = bob_mgr_clone.clone();
1488 Box::pin(async move {
1489 bob_mgr_clone
1490 .dispatch_message(
1491 alice_pseudonym,
1492 ApplicationDataIn {
1493 data: data.data,
1494 packet_info: Default::default(),
1495 },
1496 )
1497 .await?;
1498 Ok(())
1499 })
1500 });
1501
1502 let mut ahs = Vec::new();
1503
1504 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1506 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1507
1508 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1510 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1511
1512 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1513
1514 pin_mut!(new_session_rx_bob);
1515 let (alice_session, bob_session) = timeout(
1516 Duration::from_secs(2),
1517 futures::future::join(
1518 alice_mgr.new_session(
1519 bob_peer,
1520 SessionTarget::TcpStream(target.clone()),
1521 SessionClientConfig {
1522 pseudonym: alice_pseudonym.into(),
1523 capabilities: Capability::NoRateControl | Capability::Segmentation,
1524 surb_management: None,
1525 ..Default::default()
1526 },
1527 ),
1528 new_session_rx_bob.next(),
1529 ),
1530 )
1531 .await?;
1532
1533 let mut alice_session = alice_session?;
1534 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1535
1536 assert_eq!(
1537 alice_session.config().capabilities,
1538 Capability::Segmentation | Capability::NoRateControl
1539 );
1540 assert_eq!(
1541 alice_session.config().capabilities,
1542 bob_session.session.config().capabilities
1543 );
1544 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1545
1546 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1547 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1548 assert!(
1549 alice_mgr
1550 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1551 .await
1552 .is_err()
1553 );
1554
1555 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1556 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1557 assert!(
1558 bob_mgr
1559 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1560 .await
1561 .is_err()
1562 );
1563
1564 tokio::time::sleep(Duration::from_millis(100)).await;
1565 alice_session.close().await?;
1566
1567 tokio::time::sleep(Duration::from_millis(100)).await;
1568
1569 assert!(matches!(
1570 alice_mgr.ping_session(alice_session.id()).await,
1571 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1572 ));
1573
1574 futures::stream::iter(ahs)
1575 .for_each(|ah| async move { ah.abort() })
1576 .await;
1577
1578 Ok(())
1579 }
1580
1581 #[test_log::test(tokio::test)]
1582 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1583 let alice_pseudonym = HoprPseudonym::random();
1584 let bob_peer: Address = (&ChainKeypair::random()).into();
1585
1586 let cfg = SessionManagerConfig {
1587 idle_timeout: Duration::from_millis(200),
1588 ..Default::default()
1589 };
1590
1591 let alice_mgr = SessionManager::new(cfg);
1592 let bob_mgr = SessionManager::new(Default::default());
1593
1594 let mut sequence = mockall::Sequence::new();
1595 let mut alice_transport = MockMsgSender::new();
1596 let mut bob_transport = MockMsgSender::new();
1597
1598 let bob_mgr_clone = bob_mgr.clone();
1600 alice_transport
1601 .expect_send_message()
1602 .once()
1603 .in_sequence(&mut sequence)
1604 .withf(move |peer, data| {
1605 msg_type(data, StartProtocolDiscriminants::StartSession)
1606 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1607 })
1608 .returning(move |_, data| {
1609 let bob_mgr_clone = bob_mgr_clone.clone();
1610 Box::pin(async move {
1611 bob_mgr_clone
1612 .dispatch_message(
1613 alice_pseudonym,
1614 ApplicationDataIn {
1615 data: data.data,
1616 packet_info: Default::default(),
1617 },
1618 )
1619 .await?;
1620 Ok(())
1621 })
1622 });
1623
1624 let alice_mgr_clone = alice_mgr.clone();
1626 bob_transport
1627 .expect_send_message()
1628 .once()
1629 .in_sequence(&mut sequence)
1630 .withf(move |peer, data| {
1631 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1632 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1633 })
1634 .returning(move |_, data| {
1635 let alice_mgr_clone = alice_mgr_clone.clone();
1636
1637 Box::pin(async move {
1638 alice_mgr_clone
1639 .dispatch_message(
1640 alice_pseudonym,
1641 ApplicationDataIn {
1642 data: data.data,
1643 packet_info: Default::default(),
1644 },
1645 )
1646 .await?;
1647 Ok(())
1648 })
1649 });
1650
1651 let mut ahs = Vec::new();
1652
1653 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1655 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1656
1657 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1659 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1660
1661 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1662
1663 pin_mut!(new_session_rx_bob);
1664 let (alice_session, bob_session) = timeout(
1665 Duration::from_secs(2),
1666 futures::future::join(
1667 alice_mgr.new_session(
1668 bob_peer,
1669 SessionTarget::TcpStream(target.clone()),
1670 SessionClientConfig {
1671 pseudonym: alice_pseudonym.into(),
1672 capabilities: Capability::NoRateControl | Capability::Segmentation,
1673 surb_management: None,
1674 ..Default::default()
1675 },
1676 ),
1677 new_session_rx_bob.next(),
1678 ),
1679 )
1680 .await?;
1681
1682 let alice_session = alice_session?;
1683 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1684
1685 assert_eq!(
1686 alice_session.config().capabilities,
1687 Capability::Segmentation | Capability::NoRateControl,
1688 );
1689 assert_eq!(
1690 alice_session.config().capabilities,
1691 bob_session.session.config().capabilities
1692 );
1693 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1694
1695 tokio::time::sleep(Duration::from_millis(300)).await;
1697
1698 assert!(matches!(
1699 alice_mgr.ping_session(alice_session.id()).await,
1700 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1701 ));
1702
1703 futures::stream::iter(ahs)
1704 .for_each(|ah| async move { ah.abort() })
1705 .await;
1706
1707 Ok(())
1708 }
1709
1710 #[test_log::test(tokio::test)]
1711 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1712 let alice_pseudonym = HoprPseudonym::random();
1713 let session_id = SessionId::new(16u64, alice_pseudonym);
1714 let balancer_cfg = SurbBalancerConfig {
1715 target_surb_buffer_size: 1000,
1716 max_surbs_per_sec: 100,
1717 ..Default::default()
1718 };
1719
1720 let alice_mgr =
1721 SessionManager::<UnboundedSender<(DestinationRouting, ApplicationDataOut)>>::new(Default::default());
1722
1723 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1724 alice_mgr
1725 .sessions
1726 .insert(
1727 session_id,
1728 SessionSlot {
1729 session_tx: Arc::new(dummy_tx),
1730 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1731 abort_handles: Vec::new(),
1732 surb_mgmt: Some(balancer_cfg.clone()),
1733 surb_estimator: None,
1734 },
1735 )
1736 .await;
1737
1738 let actual_cfg = alice_mgr
1739 .get_surb_balancer_config(&session_id)
1740 .await?
1741 .ok_or(anyhow!("session must have a surb balancer config"))?;
1742 assert_eq!(actual_cfg, balancer_cfg);
1743
1744 let new_cfg = SurbBalancerConfig {
1745 target_surb_buffer_size: 2000,
1746 max_surbs_per_sec: 200,
1747 ..Default::default()
1748 };
1749 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1750
1751 let actual_cfg = alice_mgr
1752 .get_surb_balancer_config(&session_id)
1753 .await?
1754 .ok_or(anyhow!("session must have a surb balancer config"))?;
1755 assert_eq!(actual_cfg, new_cfg);
1756
1757 Ok(())
1758 }
1759
1760 #[test_log::test(tokio::test)]
1761 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1762 let alice_pseudonym = HoprPseudonym::random();
1763 let bob_peer: Address = (&ChainKeypair::random()).into();
1764
1765 let cfg = SessionManagerConfig {
1766 session_tag_range: 16u64..17u64, ..Default::default()
1768 };
1769
1770 let alice_mgr = SessionManager::new(Default::default());
1771 let bob_mgr = SessionManager::new(cfg);
1772
1773 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1775 bob_mgr
1776 .sessions
1777 .insert(
1778 SessionId::new(16u64, alice_pseudonym),
1779 SessionSlot {
1780 session_tx: Arc::new(dummy_tx),
1781 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1782 abort_handles: Vec::new(),
1783 surb_mgmt: None,
1784 surb_estimator: None,
1785 },
1786 )
1787 .await;
1788
1789 let mut sequence = mockall::Sequence::new();
1790 let mut alice_transport = MockMsgSender::new();
1791 let mut bob_transport = MockMsgSender::new();
1792
1793 let bob_mgr_clone = bob_mgr.clone();
1795 alice_transport
1796 .expect_send_message()
1797 .once()
1798 .in_sequence(&mut sequence)
1799 .withf(move |peer, data| {
1800 msg_type(data, StartProtocolDiscriminants::StartSession)
1801 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1802 })
1803 .returning(move |_, data| {
1804 let bob_mgr_clone = bob_mgr_clone.clone();
1805 Box::pin(async move {
1806 bob_mgr_clone
1807 .dispatch_message(
1808 alice_pseudonym,
1809 ApplicationDataIn {
1810 data: data.data,
1811 packet_info: Default::default(),
1812 },
1813 )
1814 .await?;
1815 Ok(())
1816 })
1817 });
1818
1819 let alice_mgr_clone = alice_mgr.clone();
1821 bob_transport
1822 .expect_send_message()
1823 .once()
1824 .in_sequence(&mut sequence)
1825 .withf(move |peer, data| {
1826 msg_type(data, StartProtocolDiscriminants::SessionError)
1827 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1828 })
1829 .returning(move |_, data| {
1830 let alice_mgr_clone = alice_mgr_clone.clone();
1831 Box::pin(async move {
1832 alice_mgr_clone
1833 .dispatch_message(
1834 alice_pseudonym,
1835 ApplicationDataIn {
1836 data: data.data,
1837 packet_info: Default::default(),
1838 },
1839 )
1840 .await?;
1841 Ok(())
1842 })
1843 });
1844
1845 let mut jhs = Vec::new();
1846
1847 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1849 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1850
1851 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1853 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1854
1855 let result = alice_mgr
1856 .new_session(
1857 bob_peer,
1858 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1859 SessionClientConfig {
1860 capabilities: Capabilities::empty(),
1861 pseudonym: alice_pseudonym.into(),
1862 surb_management: None,
1863 ..Default::default()
1864 },
1865 )
1866 .await;
1867
1868 assert!(
1869 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1870 );
1871
1872 Ok(())
1873 }
1874
1875 #[test_log::test(tokio::test)]
1876 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1877 -> anyhow::Result<()> {
1878 let alice_pseudonym = HoprPseudonym::random();
1879 let bob_peer: Address = (&ChainKeypair::random()).into();
1880
1881 let cfg = SessionManagerConfig {
1882 maximum_sessions: 1,
1883 ..Default::default()
1884 };
1885
1886 let alice_mgr = SessionManager::new(Default::default());
1887 let bob_mgr = SessionManager::new(cfg);
1888
1889 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1891 bob_mgr
1892 .sessions
1893 .insert(
1894 SessionId::new(16u64, alice_pseudonym),
1895 SessionSlot {
1896 session_tx: Arc::new(dummy_tx),
1897 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1898 abort_handles: Vec::new(),
1899 surb_mgmt: None,
1900 surb_estimator: None,
1901 },
1902 )
1903 .await;
1904
1905 let mut sequence = mockall::Sequence::new();
1906 let mut alice_transport = MockMsgSender::new();
1907 let mut bob_transport = MockMsgSender::new();
1908
1909 let bob_mgr_clone = bob_mgr.clone();
1911 alice_transport
1912 .expect_send_message()
1913 .once()
1914 .in_sequence(&mut sequence)
1915 .withf(move |peer, data| {
1916 msg_type(data, StartProtocolDiscriminants::StartSession)
1917 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1918 })
1919 .returning(move |_, data| {
1920 let bob_mgr_clone = bob_mgr_clone.clone();
1921 Box::pin(async move {
1922 bob_mgr_clone
1923 .dispatch_message(
1924 alice_pseudonym,
1925 ApplicationDataIn {
1926 data: data.data,
1927 packet_info: Default::default(),
1928 },
1929 )
1930 .await?;
1931 Ok(())
1932 })
1933 });
1934
1935 let alice_mgr_clone = alice_mgr.clone();
1937 bob_transport
1938 .expect_send_message()
1939 .once()
1940 .in_sequence(&mut sequence)
1941 .withf(move |peer, data| {
1942 msg_type(data, StartProtocolDiscriminants::SessionError)
1943 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1944 })
1945 .returning(move |_, data| {
1946 let alice_mgr_clone = alice_mgr_clone.clone();
1947 Box::pin(async move {
1948 alice_mgr_clone
1949 .dispatch_message(
1950 alice_pseudonym,
1951 ApplicationDataIn {
1952 data: data.data,
1953 packet_info: Default::default(),
1954 },
1955 )
1956 .await?;
1957 Ok(())
1958 })
1959 });
1960
1961 let mut jhs = Vec::new();
1962
1963 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1965 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1966
1967 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1969 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1970
1971 let result = alice_mgr
1972 .new_session(
1973 bob_peer,
1974 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1975 SessionClientConfig {
1976 capabilities: None.into(),
1977 pseudonym: alice_pseudonym.into(),
1978 surb_management: None,
1979 ..Default::default()
1980 },
1981 )
1982 .await;
1983
1984 assert!(
1985 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1986 );
1987
1988 Ok(())
1989 }
1990
1991 #[test_log::test(tokio::test)]
1992 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1993 let alice_pseudonym = HoprPseudonym::random();
1994 let bob_peer: Address = (&ChainKeypair::random()).into();
1995
1996 let alice_mgr = SessionManager::new(Default::default());
1997
1998 let mut sequence = mockall::Sequence::new();
1999 let mut alice_transport = MockMsgSender::new();
2000
2001 let alice_mgr_clone = alice_mgr.clone();
2003 alice_transport
2004 .expect_send_message()
2005 .once()
2006 .in_sequence(&mut sequence)
2007 .withf(move |peer, data| {
2008 msg_type(data, StartProtocolDiscriminants::StartSession)
2009 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2010 })
2011 .returning(move |_, data| {
2012 let alice_mgr_clone = alice_mgr_clone.clone();
2014 Box::pin(async move {
2015 alice_mgr_clone
2016 .dispatch_message(
2017 alice_pseudonym,
2018 ApplicationDataIn {
2019 data: data.data,
2020 packet_info: Default::default(),
2021 },
2022 )
2023 .await?;
2024 Ok(())
2025 })
2026 });
2027
2028 let alice_mgr_clone = alice_mgr.clone();
2030 alice_transport
2031 .expect_send_message()
2032 .once()
2033 .in_sequence(&mut sequence)
2034 .withf(move |peer, data| {
2035 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2036 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2037 })
2038 .returning(move |_, data| {
2039 let alice_mgr_clone = alice_mgr_clone.clone();
2040
2041 Box::pin(async move {
2042 alice_mgr_clone
2043 .dispatch_message(
2044 alice_pseudonym,
2045 ApplicationDataIn {
2046 data: data.data,
2047 packet_info: Default::default(),
2048 },
2049 )
2050 .await?;
2051 Ok(())
2052 })
2053 });
2054
2055 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2057 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2058
2059 let alice_session = alice_mgr
2060 .new_session(
2061 bob_peer,
2062 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2063 SessionClientConfig {
2064 capabilities: None.into(),
2065 pseudonym: alice_pseudonym.into(),
2066 surb_management: None,
2067 ..Default::default()
2068 },
2069 )
2070 .await;
2071
2072 println!("{alice_session:?}");
2073 assert!(matches!(
2074 alice_session,
2075 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2076 ));
2077
2078 Ok(())
2079 }
2080
2081 #[test_log::test(tokio::test)]
2082 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2083 let bob_peer: Address = (&ChainKeypair::random()).into();
2084
2085 let cfg = SessionManagerConfig {
2086 initiation_timeout_base: Duration::from_millis(100),
2087 ..Default::default()
2088 };
2089
2090 let alice_mgr = SessionManager::new(cfg);
2091 let bob_mgr = SessionManager::new(Default::default());
2092
2093 let mut sequence = mockall::Sequence::new();
2094 let mut alice_transport = MockMsgSender::new();
2095 let bob_transport = MockMsgSender::new();
2096
2097 alice_transport
2099 .expect_send_message()
2100 .once()
2101 .in_sequence(&mut sequence)
2102 .withf(move |peer, data| {
2103 msg_type(data, StartProtocolDiscriminants::StartSession)
2104 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2105 })
2106 .returning(|_, _| Box::pin(async { Ok(()) }));
2107
2108 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2110 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2111
2112 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
2114 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2115
2116 let result = alice_mgr
2117 .new_session(
2118 bob_peer,
2119 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2120 SessionClientConfig {
2121 capabilities: None.into(),
2122 pseudonym: None,
2123 surb_management: None,
2124 ..Default::default()
2125 },
2126 )
2127 .await;
2128
2129 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2130
2131 Ok(())
2132 }
2133
2134 #[test_log::test(tokio::test)]
2135 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2136 let alice_pseudonym = HoprPseudonym::random();
2137 let bob_peer: Address = (&ChainKeypair::random()).into();
2138
2139 let bob_cfg = SessionManagerConfig::default();
2140 let alice_mgr = SessionManager::new(Default::default());
2141 let bob_mgr = SessionManager::new(bob_cfg.clone());
2142
2143 let mut alice_transport = MockMsgSender::new();
2144 let mut bob_transport = MockMsgSender::new();
2145
2146 let mut open_sequence = mockall::Sequence::new();
2148 let bob_mgr_clone = bob_mgr.clone();
2149 alice_transport
2150 .expect_send_message()
2151 .once()
2152 .in_sequence(&mut open_sequence)
2153 .withf(move |peer, data| {
2154 msg_type(data, StartProtocolDiscriminants::StartSession)
2155 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2156 })
2157 .returning(move |_, data| {
2158 let bob_mgr_clone = bob_mgr_clone.clone();
2159 Box::pin(async move {
2160 bob_mgr_clone
2161 .dispatch_message(
2162 alice_pseudonym,
2163 ApplicationDataIn {
2164 data: data.data,
2165 packet_info: Default::default(),
2166 },
2167 )
2168 .await?;
2169 Ok(())
2170 })
2171 });
2172
2173 let alice_mgr_clone = alice_mgr.clone();
2175 bob_transport
2176 .expect_send_message()
2177 .once()
2178 .in_sequence(&mut open_sequence)
2179 .withf(move |peer, data| {
2180 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2181 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2182 })
2183 .returning(move |_, data| {
2184 let alice_mgr_clone = alice_mgr_clone.clone();
2185 Box::pin(async move {
2186 alice_mgr_clone
2187 .dispatch_message(
2188 alice_pseudonym,
2189 ApplicationDataIn {
2190 data: data.data,
2191 packet_info: Default::default(),
2192 },
2193 )
2194 .await?;
2195 Ok(())
2196 })
2197 });
2198
2199 let bob_mgr_clone = bob_mgr.clone();
2201 alice_transport
2202 .expect_send_message()
2203 .times(5..)
2204 .withf(move |peer, data| {
2206 msg_type(data, StartProtocolDiscriminants::KeepAlive)
2207 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2208 })
2209 .returning(move |_, data| {
2210 let bob_mgr_clone = bob_mgr_clone.clone();
2211 Box::pin(async move {
2212 bob_mgr_clone
2213 .dispatch_message(
2214 alice_pseudonym,
2215 ApplicationDataIn {
2216 data: data.data,
2217 packet_info: Default::default(),
2218 },
2219 )
2220 .await?;
2221 Ok(())
2222 })
2223 });
2224
2225 let bob_mgr_clone = bob_mgr.clone();
2227 alice_transport
2228 .expect_send_message()
2229 .once()
2230 .withf(move |peer, data| {
2232 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2233 data.data.plain_text.as_ref(),
2234 )
2235 .ok()
2236 .and_then(|m| m.try_as_segment())
2237 .map(|s| s.is_terminating())
2238 .unwrap_or(false)
2239 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2240 })
2241 .returning(move |_, data| {
2242 let bob_mgr_clone = bob_mgr_clone.clone();
2243 Box::pin(async move {
2244 bob_mgr_clone
2245 .dispatch_message(
2246 alice_pseudonym,
2247 ApplicationDataIn {
2248 data: data.data,
2249 packet_info: Default::default(),
2250 },
2251 )
2252 .await?;
2253 Ok(())
2254 })
2255 });
2256
2257 let mut ahs = Vec::new();
2258
2259 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2261 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2262
2263 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
2265 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2266
2267 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2268
2269 let balancer_cfg = SurbBalancerConfig {
2270 target_surb_buffer_size: 10,
2271 max_surbs_per_sec: 100,
2272 ..Default::default()
2273 };
2274
2275 pin_mut!(new_session_rx_bob);
2276 let (alice_session, bob_session) = timeout(
2277 Duration::from_secs(2),
2278 futures::future::join(
2279 alice_mgr.new_session(
2280 bob_peer,
2281 SessionTarget::TcpStream(target.clone()),
2282 SessionClientConfig {
2283 pseudonym: alice_pseudonym.into(),
2284 capabilities: Capability::Segmentation.into(),
2285 surb_management: Some(balancer_cfg),
2286 ..Default::default()
2287 },
2288 ),
2289 new_session_rx_bob.next(),
2290 ),
2291 )
2292 .await?;
2293
2294 let mut alice_session = alice_session?;
2295 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2296
2297 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2298
2299 assert_eq!(
2300 Some(balancer_cfg),
2301 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2302 );
2303
2304 let remote_cfg = bob_mgr
2305 .get_surb_balancer_config(bob_session.session.id())
2306 .await?
2307 .ok_or(anyhow!("no remote config at bob"))?;
2308 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2309 assert_eq!(
2310 remote_cfg.max_surbs_per_sec,
2311 remote_cfg.target_surb_buffer_size
2312 / bob_cfg
2313 .minimum_surb_buffer_duration
2314 .max(MIN_SURB_BUFFER_DURATION)
2315 .as_secs()
2316 );
2317
2318 tokio::time::sleep(Duration::from_millis(1500)).await;
2320 alice_session.close().await?;
2321
2322 tokio::time::sleep(Duration::from_millis(300)).await;
2323 assert!(matches!(
2324 alice_mgr.ping_session(alice_session.id()).await,
2325 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2326 ));
2327
2328 futures::stream::iter(ahs)
2329 .for_each(|ah| async move { ah.abort() })
2330 .await;
2331
2332 Ok(())
2333 }
2334}