1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use futures::{
8 FutureExt, SinkExt, StreamExt, TryStreamExt,
9 channel::mpsc::{Sender, UnboundedSender},
10 future::AbortHandle,
11 pin_mut,
12};
13use futures_time::future::FutureExt as TimeExt;
14use hopr_crypto_random::Randomizable;
15use hopr_internal_types::prelude::HoprPseudonym;
16use hopr_network_types::prelude::*;
17use hopr_primitive_types::prelude::Address;
18use hopr_protocol_app::prelude::*;
19use hopr_protocol_start::{
20 KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
21};
22use tracing::{debug, error, info, trace, warn};
23
24use crate::{
25 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
26 SurbBalancerConfig,
27 balancer::{
28 AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
29 SurbControllerWithCorrection,
30 pid::{PidBalancerController, PidControllerGains},
31 simple::SimpleBalancerController,
32 },
33 errors::{SessionManagerError, TransportSessionError},
34 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
35 utils,
36 utils::insert_into_next_slot,
37};
38
39#[cfg(all(feature = "prometheus", not(test)))]
40lazy_static::lazy_static! {
41 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
42 "hopr_session_num_active_sessions",
43 "Number of currently active HOPR sessions"
44 ).unwrap();
45 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
46 "hopr_session_established_sessions_count",
47 "Number of sessions that were successfully established as an Exit node"
48 ).unwrap();
49 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
50 "hopr_session_initiated_sessions_count",
51 "Number of sessions that were successfully initiated as an Entry node"
52 ).unwrap();
53 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
54 "hopr_session_received_error_count",
55 "Number of HOPR session errors received from an Exit node",
56 &["kind"]
57 ).unwrap();
58 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
59 "hopr_session_sent_error_count",
60 "Number of HOPR session errors sent to an Entry node",
61 &["kind"]
62 ).unwrap();
63}
64
65fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
66 debug!(?session_id, ?reason, "closing session");
67
68 if reason != ClosureReason::EmptyRead {
69 session_data.session_tx.close_channel();
71 trace!(?session_id, "data tx channel closed on session");
72 }
73
74 session_data.abort_handles.into_iter().for_each(|h| h.abort());
76
77 #[cfg(all(feature = "prometheus", not(test)))]
78 METRIC_ACTIVE_SESSIONS.decrement(1.0);
79}
80
81fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
82 base * (hops as u32)
83}
84
85pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
87
88pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
90
91const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
93
94const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
96
97type SessionInitiationCache =
101 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
102
103#[derive(Clone)]
104struct SessionSlot {
105 session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
108 routing_opts: DestinationRouting,
109 abort_handles: Vec<AbortHandle>,
110 surb_mgmt: Option<SurbBalancerConfig>,
113 surb_estimator: Option<AtomicSurbFlowEstimator>,
115}
116
117struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
119
120#[async_trait::async_trait]
121impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
122 async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
123 self.0
126 .iter()
127 .find(|(sid, _)| sid.as_ref() == id)
128 .ok_or(SessionManagerError::NonExistingSession)?
129 .1
130 .surb_mgmt
131 .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
132 }
133
134 async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
135 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
136 .0
137 .entry_by_ref(id)
138 .and_compute_with(|entry| {
139 futures::future::ready(match entry.map(|e| e.into_value()) {
140 None => moka::ops::compute::Op::Nop,
141 Some(mut updated_slot) => {
142 if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
143 *balancer_cfg = cfg;
144 moka::ops::compute::Op::Put(updated_slot)
145 } else {
146 moka::ops::compute::Op::Nop
147 }
148 }
149 })
150 })
151 .await
152 {
153 Ok(())
154 } else {
155 Err(SessionManagerError::NonExistingSession.into())
156 }
157 }
158}
159
160#[derive(Clone, Debug, PartialEq, Eq)]
162pub enum DispatchResult {
163 Processed,
165 Unrelated(ApplicationDataIn),
167}
168
169#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
171pub struct SessionManagerConfig {
172 #[doc(hidden)]
180 #[default(_code = "16u64..1024u64")]
181 pub session_tag_range: Range<u64>,
182
183 #[default(128)]
192 pub maximum_sessions: usize,
193
194 #[default(1500)]
198 pub frame_mtu: usize,
199
200 #[default(Duration::from_millis(800))]
204 pub max_frame_timeout: Duration,
205
206 #[default(Duration::from_millis(500))]
213 pub initiation_timeout_base: Duration,
214
215 #[default(Duration::from_secs(180))]
219 pub idle_timeout: Duration,
220
221 #[default(Duration::from_millis(100))]
226 pub balancer_sampling_interval: Duration,
227
228 #[default(10)]
234 pub initial_return_session_egress_rate: usize,
235 #[default(Duration::from_secs(5))]
246 pub minimum_surb_buffer_duration: Duration,
247 #[default(10_000)]
255 pub maximum_surb_buffer_size: usize,
256
257 #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
265 pub growable_target_surb_buffer: Option<(Duration, f64)>,
266}
267
268pub struct SessionManager<S, T> {
411 session_initiations: SessionInitiationCache,
412 #[allow(clippy::type_complexity)]
413 session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
414 sessions: moka::future::Cache<SessionId, SessionSlot>,
415 msg_sender: Arc<OnceLock<S>>,
416 cfg: SessionManagerConfig,
417}
418
419impl<S, T> Clone for SessionManager<S, T> {
420 fn clone(&self) -> Self {
421 Self {
422 session_initiations: self.session_initiations.clone(),
423 session_notifiers: self.session_notifiers.clone(),
424 sessions: self.sessions.clone(),
425 cfg: self.cfg.clone(),
426 msg_sender: self.msg_sender.clone(),
427 }
428 }
429}
430
431const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
432
433impl<S, T> SessionManager<S, T>
434where
435 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
436 T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
437 S::Error: std::error::Error + Send + Sync + Clone + 'static,
438 T::Error: std::error::Error + Send + Sync + Clone + 'static,
439{
440 pub fn new(mut cfg: SessionManagerConfig) -> Self {
442 let min_session_tag_range_reservation = ReservedTag::range().end;
443 debug_assert!(
444 min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
445 "invalid tag reservation range"
446 );
447
448 if cfg.session_tag_range.start < min_session_tag_range_reservation {
450 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
451 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
452 }
453 cfg.maximum_sessions = cfg
454 .maximum_sessions
455 .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
456
457 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
459 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
460
461 #[cfg(all(feature = "prometheus", not(test)))]
462 METRIC_ACTIVE_SESSIONS.set(0.0);
463
464 let msg_sender = Arc::new(OnceLock::new());
465 Self {
466 msg_sender: msg_sender.clone(),
467 session_initiations: moka::future::Cache::builder()
468 .max_capacity(cfg.maximum_sessions as u64)
469 .time_to_live(
470 2 * initiation_timeout_max_one_way(
471 cfg.initiation_timeout_base,
472 RoutingOptions::MAX_INTERMEDIATE_HOPS,
473 ),
474 )
475 .build(),
476 sessions: moka::future::Cache::builder()
477 .max_capacity(cfg.maximum_sessions as u64)
478 .time_to_idle(cfg.idle_timeout)
479 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
480 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
481 trace!(?session_id, ?reason, "session evicted from the cache");
482 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
483 }
484 _ => {}
485 })
486 .build(),
487 session_notifiers: Arc::new(OnceLock::new()),
488 cfg,
489 }
490 }
491
492 pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
498 self.msg_sender
499 .set(msg_sender)
500 .map_err(|_| SessionManagerError::AlreadyStarted)?;
501
502 let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
503 self.session_notifiers
504 .set((new_session_notifier, session_close_tx))
505 .map_err(|_| SessionManagerError::AlreadyStarted)?;
506
507 let myself = self.clone();
508 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
509 None,
510 move |(session_id, closure_reason)| {
511 let myself = myself.clone();
512 async move {
513 if let Some(session_data) = myself.sessions.remove(&session_id).await {
517 close_session(session_id, session_data, closure_reason);
518 } else {
519 debug!(
521 ?session_id,
522 ?closure_reason,
523 "could not find session id to close, maybe the session is already closed"
524 );
525 }
526 }
527 },
528 ));
529
530 let myself = self.clone();
535 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
536 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
537 let timeout = 2 * initiation_timeout_max_one_way(
538 myself.cfg.initiation_timeout_base,
539 RoutingOptions::MAX_INTERMEDIATE_HOPS,
540 )
541 .min(myself.cfg.idle_timeout)
542 .mul_f64(jitter)
543 / 2;
544 futures_time::stream::interval(timeout.into())
545 .for_each(|_| {
546 trace!("executing session cache evictions");
547 futures::future::join(
548 myself.sessions.run_pending_tasks(),
549 myself.session_initiations.run_pending_tasks(),
550 )
551 .map(|_| ())
552 })
553 .await;
554 });
555
556 Ok(vec![ah_closure_notifications, ah_session_expiration])
557 }
558
559 pub fn is_started(&self) -> bool {
561 self.session_notifiers.get().is_some()
562 }
563
564 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
565 let abort_handles_clone = slot.abort_handles.clone();
567 if let moka::ops::compute::CompResult::Inserted(_) = self
568 .sessions
569 .entry(session_id)
570 .and_compute_with(|entry| {
571 futures::future::ready(if entry.is_none() {
572 moka::ops::compute::Op::Put(slot)
573 } else {
574 moka::ops::compute::Op::Nop
575 })
576 })
577 .await
578 {
579 #[cfg(all(feature = "prometheus", not(test)))]
580 {
581 METRIC_NUM_INITIATED_SESSIONS.increment();
582 METRIC_ACTIVE_SESSIONS.increment(1.0);
583 }
584
585 Ok(())
586 } else {
587 error!(%session_id, "session already exists - loopback attempt");
589 abort_handles_clone.into_iter().for_each(|ah| ah.abort());
590 Err(SessionManagerError::Loopback.into())
591 }
592 }
593
594 pub async fn new_session(
602 &self,
603 destination: Address,
604 target: SessionTarget,
605 cfg: SessionClientConfig,
606 ) -> crate::errors::Result<HoprSession> {
607 self.sessions.run_pending_tasks().await;
608 if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
609 return Err(SessionManagerError::TooManySessions.into());
610 }
611
612 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
613
614 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
615 let challenge = insert_into_next_slot(
616 &self.session_initiations,
617 |ch| {
618 if let Some(challenge) = ch {
619 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
620 } else {
621 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
622 }
623 },
624 tx_initiation_done,
625 )
626 .await
627 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
631 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
632 challenge,
633 target,
634 capabilities: ByteCapabilities(cfg.capabilities),
635 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
636 cfg.surb_management
637 .map(|c| c.target_surb_buffer_size)
638 .unwrap_or(
639 self.cfg.initial_return_session_egress_rate as u64
640 * self
641 .cfg
642 .minimum_surb_buffer_duration
643 .max(MIN_SURB_BUFFER_DURATION)
644 .as_secs(),
645 )
646 .min(u32::MAX as u64) as u32
647 } else {
648 0
649 },
650 });
651
652 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
653 let forward_routing = DestinationRouting::Forward {
654 destination: Box::new(destination.into()),
655 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
657 return_options: cfg.return_path_options.clone().into(),
658 };
659
660 info!(challenge, %pseudonym, %destination, "new session request");
662 msg_sender
663 .send((
664 forward_routing.clone(),
665 ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
666 ))
667 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
668 .await
669 .map_err(|_| {
670 error!(challenge, %pseudonym, %destination, "timeout sending session request message");
671 TransportSessionError::Timeout
672 })?
673 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
674
675 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
677 self.cfg.initiation_timeout_base,
678 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
679 )
680 .into();
681
682 pin_mut!(rx_initiation_done);
684
685 trace!(challenge, "awaiting session establishment");
686 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
687 Ok(Ok(Some(est))) => {
688 let session_id = est.session_id;
690 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
691
692 let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
693 let notifier = self
694 .session_notifiers
695 .get()
696 .map(|(_, notifier)| {
697 let mut notifier = notifier.clone();
698 Box::new(move |session_id: SessionId, reason: ClosureReason| {
699 let _ = notifier
700 .try_send((session_id, reason))
701 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
702 })
703 })
704 .ok_or(SessionManagerError::NotStarted)?;
705
706 if let Some(balancer_config) = cfg.surb_management {
710 let surb_estimator = AtomicSurbFlowEstimator::default();
711
712 let surb_estimator_clone = surb_estimator.clone();
714 let full_surb_scoring_sender =
715 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
716 surb_estimator_clone.produced.fetch_add(
718 data.estimate_surbs_with_msg() as u64,
719 std::sync::atomic::Ordering::Relaxed,
720 );
721 futures::future::ok::<_, S::Error>((routing, data))
722 });
723
724 let max_out_organic_surbs = cfg.always_max_out_surbs;
727 let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
728 move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
732 if !max_out_organic_surbs {
733 data.packet_info
735 .get_or_insert_with(|| OutgoingPacketInfo {
736 max_surbs_in_packet: 1,
737 ..Default::default()
738 })
739 .max_surbs_in_packet = 1;
740 }
741 futures::future::ok::<_, S::Error>((routing, data))
742 },
743 );
744
745 let mut abort_handles = Vec::new();
746
747 let (ka_controller, ka_abort_handle) =
749 utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
750 abort_handles.push(ka_abort_handle);
751
752 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
754 let balancer = SurbBalancer::new(
755 session_id,
756 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
758 surb_estimator.clone(),
759 ka_controller,
760 balancer_config,
761 );
762
763 let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
764 self.cfg.balancer_sampling_interval,
765 SessionCacheBalancerFeedback(self.sessions.clone()),
766 None,
767 );
768 abort_handles.push(balancer_abort_handle);
769
770 self.insert_session_slot(
772 session_id,
773 SessionSlot {
774 session_tx: Arc::new(tx),
775 routing_opts: forward_routing.clone(),
776 abort_handles,
777 surb_mgmt: Some(balancer_config),
778 surb_estimator: None,
779 },
780 )
781 .await?;
782
783 match level_stream
786 .skip_while(|current_level| {
787 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
788 })
789 .next()
790 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
791 .await
792 {
793 Ok(Some(surb_level)) => {
794 info!(%session_id, surb_level, "session is ready");
795 }
796 Ok(None) => {
797 return Err(
798 SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
799 );
800 }
801 Err(_) => {
802 warn!(%session_id, "session didn't reach target SURB buffer size in time");
803 }
804 }
805
806 HoprSession::new(
807 session_id,
808 forward_routing,
809 HoprSessionConfig {
810 capabilities: cfg.capabilities,
811 frame_mtu: self.cfg.frame_mtu,
812 frame_timeout: self.cfg.max_frame_timeout,
813 },
814 (
815 reduced_surb_scoring_sender,
816 rx.inspect(move |_| {
817 surb_estimator
820 .consumed
821 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
822 }),
823 ),
824 Some(notifier),
825 )
826 } else {
827 warn!(%session_id, "session ready without SURB balancing");
828
829 self.insert_session_slot(
830 session_id,
831 SessionSlot {
832 session_tx: Arc::new(tx),
833 routing_opts: forward_routing.clone(),
834 abort_handles: vec![],
835 surb_mgmt: None,
836 surb_estimator: None,
837 },
838 )
839 .await?;
840
841 let max_out_organic_surbs = cfg.always_max_out_surbs;
844 let reduced_surb_sender =
845 msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
846 if !max_out_organic_surbs {
847 data.packet_info
848 .get_or_insert_with(|| OutgoingPacketInfo {
849 max_surbs_in_packet: 1,
850 ..Default::default()
851 })
852 .max_surbs_in_packet = 1;
853 }
854 futures::future::ok::<_, S::Error>((routing, data))
855 });
856
857 HoprSession::new(
858 session_id,
859 forward_routing,
860 HoprSessionConfig {
861 capabilities: cfg.capabilities,
862 frame_mtu: self.cfg.frame_mtu,
863 frame_timeout: self.cfg.max_frame_timeout,
864 },
865 (reduced_surb_sender, rx),
866 Some(notifier),
867 )
868 }
869 }
870 Ok(Ok(None)) => Err(SessionManagerError::Other(
871 "internal error: sender has been closed without completing the session establishment".into(),
872 )
873 .into()),
874 Ok(Err(error)) => {
875 error!(
877 challenge = error.challenge,
878 ?error,
879 "the other party rejected the session initiation with error"
880 );
881 Err(TransportSessionError::Rejected(error.reason))
882 }
883 Err(_) => {
884 error!(challenge, "session initiation attempt timed out");
886
887 #[cfg(all(feature = "prometheus", not(test)))]
888 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
889
890 Err(TransportSessionError::Timeout)
891 }
892 }
893 }
894
895 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
899 if let Some(session_data) = self.sessions.get(id).await {
900 trace!(session_id = ?id, "pinging manually session");
901 Ok(self
902 .msg_sender
903 .get()
904 .cloned()
905 .ok_or(SessionManagerError::NotStarted)?
906 .send((
907 session_data.routing_opts.clone(),
908 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
909 ))
910 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
911 .await
912 .map_err(|_| {
913 error!("timeout sending session ping message");
914 TransportSessionError::Timeout
915 })?
916 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
917 } else {
918 Err(SessionManagerError::NonExistingSession.into())
919 }
920 }
921
922 pub async fn active_sessions(&self) -> Vec<SessionId> {
924 self.sessions.run_pending_tasks().await;
925 self.sessions.iter().map(|(k, _)| *k).collect()
926 }
927
928 pub async fn update_surb_balancer_config(
933 &self,
934 id: &SessionId,
935 config: SurbBalancerConfig,
936 ) -> crate::errors::Result<()> {
937 match self
938 .sessions
939 .entry_by_ref(id)
940 .and_compute_with(|entry| {
941 futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
942 if cached_session.surb_mgmt.is_some() {
944 cached_session.surb_mgmt = Some(config);
945 moka::ops::compute::Op::Put(cached_session)
946 } else {
947 moka::ops::compute::Op::Nop
948 }
949 } else {
950 moka::ops::compute::Op::Nop
951 })
952 })
953 .await
954 {
955 moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
956 moka::ops::compute::CompResult::Unchanged(_) => {
957 Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
958 }
959 _ => Err(SessionManagerError::NonExistingSession.into()),
960 }
961 }
962
963 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
967 match self.sessions.get(id).await {
968 Some(session) => Ok(session.surb_mgmt),
969 None => Err(SessionManagerError::NonExistingSession.into()),
970 }
971 }
972
973 pub async fn dispatch_message(
980 &self,
981 pseudonym: HoprPseudonym,
982 in_data: ApplicationDataIn,
983 ) -> crate::errors::Result<DispatchResult> {
984 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
985 trace!("dispatching Start protocol message");
987 return self
988 .handle_start_protocol_message(pseudonym, in_data)
989 .await
990 .map(|_| DispatchResult::Processed);
991 } else if self
992 .cfg
993 .session_tag_range
994 .contains(&in_data.data.application_tag.as_u64())
995 {
996 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
997
998 return if let Some(session_data) = self.sessions.get(&session_id).await {
999 trace!(?session_id, "received data for a registered session");
1000
1001 Ok(session_data
1002 .session_tx
1003 .unbounded_send(in_data)
1004 .map(|_| DispatchResult::Processed)
1005 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
1006 } else {
1007 error!(%session_id, "received data from an unestablished session");
1008 Err(TransportSessionError::UnknownData)
1009 };
1010 }
1011
1012 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1013 Ok(DispatchResult::Unrelated(in_data))
1014 }
1015
1016 async fn handle_incoming_session_initiation(
1017 &self,
1018 pseudonym: HoprPseudonym,
1019 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1020 ) -> crate::errors::Result<()> {
1021 trace!(challenge = session_req.challenge, "received session initiation request");
1022
1023 debug!(%pseudonym, "got new session request, searching for a free session slot");
1024
1025 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1026
1027 let (mut new_session_notifier, mut close_session_notifier) = self
1028 .session_notifiers
1029 .get()
1030 .cloned()
1031 .ok_or(SessionManagerError::NotStarted)?;
1032
1033 let reply_routing = DestinationRouting::Return(pseudonym.into());
1035
1036 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1037
1038 self.sessions.run_pending_tasks().await; let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1041 insert_into_next_slot(
1042 &self.sessions,
1043 |sid| {
1044 let next_tag: Tag = match sid {
1047 Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1048 .max(self.cfg.session_tag_range.start)
1049 .into(),
1050 None => hopr_crypto_random::random_integer(
1051 self.cfg.session_tag_range.start,
1052 Some(self.cfg.session_tag_range.end),
1053 )
1054 .into(),
1055 };
1056 SessionId::new(next_tag, pseudonym)
1057 },
1058 SessionSlot {
1059 session_tx: Arc::new(tx_session_data),
1060 routing_opts: reply_routing.clone(),
1061 abort_handles: vec![],
1062 surb_mgmt: None,
1063 surb_estimator: None,
1064 },
1065 )
1066 .await
1067 } else {
1068 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1069 None
1070 };
1071
1072 if let Some(session_id) = allocated_slot {
1075 debug!(%session_id, ?session_req, "assigned a new session");
1076
1077 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1078 if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1079 error!(%session_id, %error, %reason, "failed to notify session closure");
1080 }
1081 });
1082
1083 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1084 let surb_estimator = AtomicSurbFlowEstimator::default();
1085
1086 let egress_rate_control =
1088 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1089
1090 let target_surb_buffer_size = if session_req.additional_data > 0 {
1093 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1094 } else {
1095 self.cfg.initial_return_session_egress_rate as u64
1096 * self
1097 .cfg
1098 .minimum_surb_buffer_duration
1099 .max(MIN_SURB_BUFFER_DURATION)
1100 .as_secs()
1101 };
1102
1103 let surb_estimator_clone = surb_estimator.clone();
1104 let session = HoprSession::new(
1105 session_id,
1106 reply_routing.clone(),
1107 HoprSessionConfig {
1108 capabilities: session_req.capabilities.into(),
1109 frame_mtu: self.cfg.frame_mtu,
1110 frame_timeout: self.cfg.max_frame_timeout,
1111 },
1112 (
1113 msg_sender
1115 .clone()
1116 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1117 surb_estimator_clone
1119 .consumed
1120 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1121 futures::future::ok::<_, S::Error>((routing, data))
1122 })
1123 .rate_limit_with_controller(&egress_rate_control)
1124 .buffer((2 * target_surb_buffer_size) as usize),
1125 rx_session_data.inspect(move |data| {
1127 surb_estimator_clone
1129 .produced
1130 .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1131 }),
1132 ),
1133 Some(closure_notifier),
1134 )?;
1135
1136 let balancer_config = SurbBalancerConfig {
1140 target_surb_buffer_size,
1141 max_surbs_per_sec: target_surb_buffer_size
1143 / self
1144 .cfg
1145 .minimum_surb_buffer_duration
1146 .max(MIN_SURB_BUFFER_DURATION)
1147 .as_secs(),
1148 surb_decay: None,
1151 };
1152
1153 let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1155 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1156 .sessions
1157 .entry(session_id)
1158 .and_compute_with(|entry| {
1159 if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1160 cached_session.abort_handles.push(balancer_abort_handle);
1161 cached_session.surb_mgmt = Some(balancer_config);
1162 cached_session.surb_estimator = Some(surb_estimator.clone());
1163 futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1164 } else {
1165 futures::future::ready(moka::ops::compute::Op::Nop)
1166 }
1167 })
1168 .await
1169 {
1170 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1173 let balancer = SurbBalancer::new(
1174 session_id,
1175 if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1176 SimpleBalancerController::with_increasing_setpoint(
1178 *ratio_threshold,
1179 (growth_window
1180 .div_duration_f64(self.cfg.balancer_sampling_interval)
1181 .round() as usize)
1182 .max(1),
1183 )
1184 } else {
1185 SimpleBalancerController::default()
1186 },
1187 surb_estimator,
1188 SurbControllerWithCorrection(egress_rate_control, 1), balancer_config,
1190 );
1191
1192 let _ = balancer.start_control_loop(
1193 self.cfg.balancer_sampling_interval,
1194 SessionCacheBalancerFeedback(self.sessions.clone()),
1195 Some(balancer_abort_reg),
1196 );
1197 } else {
1198 return Err(SessionManagerError::Other(
1200 "failed to spawn SURB balancer - inconsistent cache".into(),
1201 )
1202 .into());
1203 }
1204
1205 session
1206 } else {
1207 HoprSession::new(
1208 session_id,
1209 reply_routing.clone(),
1210 HoprSessionConfig {
1211 capabilities: session_req.capabilities.into(),
1212 frame_mtu: self.cfg.frame_mtu,
1213 frame_timeout: self.cfg.max_frame_timeout,
1214 },
1215 (msg_sender.clone(), rx_session_data),
1216 Some(closure_notifier),
1217 )?
1218 };
1219
1220 let incoming_session = IncomingSession {
1222 session,
1223 target: session_req.target,
1224 };
1225
1226 match new_session_notifier
1228 .send(incoming_session)
1229 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1230 .await
1231 {
1232 Err(_) => {
1233 error!(%session_id, "timeout to notify about new incoming session");
1234 return Err(TransportSessionError::Timeout);
1235 }
1236 Ok(Err(error)) => {
1237 error!(%session_id, %error, "failed to notify about new incoming session");
1238 return Err(SessionManagerError::Other(error.to_string()).into());
1239 }
1240 _ => {}
1241 };
1242
1243 trace!(?session_id, "session notification sent");
1244
1245 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1248 orig_challenge: session_req.challenge,
1249 session_id,
1250 });
1251
1252 msg_sender
1253 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1254 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1255 .await
1256 .map_err(|_| {
1257 error!(%session_id, "timeout sending session establishment message");
1258 TransportSessionError::Timeout
1259 })?
1260 .map_err(|e| {
1261 SessionManagerError::Other(format!(
1262 "failed to send session {session_id} establishment message: {e}"
1263 ))
1264 })?;
1265
1266 info!(%session_id, "new session established");
1267
1268 #[cfg(all(feature = "prometheus", not(test)))]
1269 {
1270 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1271 METRIC_ACTIVE_SESSIONS.increment(1.0);
1272 }
1273 } else {
1274 error!(%pseudonym,"failed to reserve a new session slot");
1275
1276 let reason = StartErrorReason::NoSlotsAvailable;
1278 let data = HoprStartProtocol::SessionError(StartErrorType {
1279 challenge: session_req.challenge,
1280 reason,
1281 });
1282
1283 msg_sender
1284 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1285 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1286 .await
1287 .map_err(|_| {
1288 error!("timeout sending session error message");
1289 TransportSessionError::Timeout
1290 })?
1291 .map_err(|e| {
1292 SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1293 })?;
1294
1295 trace!(%pseudonym, "session establishment failure message sent");
1296
1297 #[cfg(all(feature = "prometheus", not(test)))]
1298 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1299 }
1300
1301 Ok(())
1302 }
1303
1304 async fn handle_start_protocol_message(
1305 &self,
1306 pseudonym: HoprPseudonym,
1307 data: ApplicationDataIn,
1308 ) -> crate::errors::Result<()> {
1309 match HoprStartProtocol::try_from(data.data)? {
1310 HoprStartProtocol::StartSession(session_req) => {
1311 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1312 }
1313 HoprStartProtocol::SessionEstablished(est) => {
1314 trace!(
1315 session_id = ?est.session_id,
1316 "received session establishment confirmation"
1317 );
1318 let challenge = est.orig_challenge;
1319 let session_id = est.session_id;
1320 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1321 if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1322 return Err(SessionManagerError::Other(format!(
1323 "could not notify session {session_id} establishment: {e}"
1324 ))
1325 .into());
1326 }
1327 debug!(?session_id, challenge, "session establishment complete");
1328 } else {
1329 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1330 }
1331 }
1332 HoprStartProtocol::SessionError(error) => {
1333 trace!(
1334 challenge = error.challenge,
1335 error = ?error.reason,
1336 "failed to initialize a session",
1337 );
1338 if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1341 if let Err(e) = tx_est.unbounded_send(Err(error)) {
1342 return Err(SessionManagerError::Other(format!(
1343 "could not notify session establishment error {error:?}: {e}"
1344 ))
1345 .into());
1346 }
1347 error!(
1348 challenge = error.challenge,
1349 ?error,
1350 "session establishment error received"
1351 );
1352 } else {
1353 error!(
1354 challenge = error.challenge,
1355 ?error,
1356 "session establishment attempt expired before error could be delivered"
1357 );
1358 }
1359
1360 #[cfg(all(feature = "prometheus", not(test)))]
1361 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1362 }
1363 HoprStartProtocol::KeepAlive(msg) => {
1364 let session_id = msg.session_id;
1365 if let Some(session_slot) = self.sessions.get(&session_id).await {
1366 trace!(?session_id, "received keep-alive request");
1367 if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1370 estimator.produced.fetch_add(
1372 KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1373 std::sync::atomic::Ordering::Relaxed,
1374 );
1375 }
1376 } else {
1377 debug!(%session_id, "received keep-alive request for an unknown session");
1378 }
1379 }
1380 }
1381
1382 Ok(())
1383 }
1384}
1385
1386#[cfg(test)]
1387mod tests {
1388 use anyhow::anyhow;
1389 use futures::{AsyncWriteExt, future::BoxFuture};
1390 use hopr_crypto_random::Randomizable;
1391 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1392 use hopr_primitive_types::prelude::Address;
1393 use hopr_protocol_start::StartProtocolDiscriminants;
1394 use tokio::time::timeout;
1395
1396 use super::*;
1397 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1398
1399 #[async_trait::async_trait]
1400 trait SendMsg {
1401 async fn send_message(
1402 &self,
1403 routing: DestinationRouting,
1404 data: ApplicationDataOut,
1405 ) -> crate::errors::Result<()>;
1406 }
1407
1408 mockall::mock! {
1409 MsgSender {}
1410 impl SendMsg for MsgSender {
1411 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1412 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1413 }
1414 }
1415
1416 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1417 let (tx, rx) = futures::channel::mpsc::unbounded();
1418 tokio::task::spawn(async move {
1419 pin_mut!(rx);
1420 while let Some((routing, data)) = rx.next().await {
1421 sender
1422 .send_message(routing, data)
1423 .await
1424 .expect("send message must not fail in mock");
1425 }
1426 });
1427 tx
1428 }
1429
1430 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1431 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1432 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1433 .unwrap_or(false)
1434 }
1435
1436 #[test_log::test(tokio::test)]
1437 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1438 {
1439 let alice_pseudonym = HoprPseudonym::random();
1440 let bob_peer: Address = (&ChainKeypair::random()).into();
1441
1442 let alice_mgr = SessionManager::new(Default::default());
1443 let bob_mgr = SessionManager::new(Default::default());
1444
1445 let mut sequence = mockall::Sequence::new();
1446 let mut alice_transport = MockMsgSender::new();
1447 let mut bob_transport = MockMsgSender::new();
1448
1449 let bob_mgr_clone = bob_mgr.clone();
1451 alice_transport
1452 .expect_send_message()
1453 .once()
1454 .in_sequence(&mut sequence)
1455 .withf(move |peer, data| {
1456 info!("alice sends {}", data.data.application_tag);
1457 msg_type(data, StartProtocolDiscriminants::StartSession)
1458 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1459 })
1460 .returning(move |_, data| {
1461 let bob_mgr_clone = bob_mgr_clone.clone();
1462 Box::pin(async move {
1463 bob_mgr_clone
1464 .dispatch_message(
1465 alice_pseudonym,
1466 ApplicationDataIn {
1467 data: data.data,
1468 packet_info: Default::default(),
1469 },
1470 )
1471 .await?;
1472 Ok(())
1473 })
1474 });
1475
1476 let alice_mgr_clone = alice_mgr.clone();
1478 bob_transport
1479 .expect_send_message()
1480 .once()
1481 .in_sequence(&mut sequence)
1482 .withf(move |peer, data| {
1483 info!("bob sends {}", data.data.application_tag);
1484 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1485 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1486 })
1487 .returning(move |_, data| {
1488 let alice_mgr_clone = alice_mgr_clone.clone();
1489
1490 Box::pin(async move {
1491 alice_mgr_clone
1492 .dispatch_message(
1493 alice_pseudonym,
1494 ApplicationDataIn {
1495 data: data.data,
1496 packet_info: Default::default(),
1497 },
1498 )
1499 .await?;
1500 Ok(())
1501 })
1502 });
1503
1504 let bob_mgr_clone = bob_mgr.clone();
1506 alice_transport
1507 .expect_send_message()
1508 .once()
1509 .in_sequence(&mut sequence)
1510 .withf(move |peer, data| {
1511 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1512 data.data.plain_text.as_ref(),
1513 )
1514 .expect("must be a session message")
1515 .try_as_segment()
1516 .expect("must be a segment")
1517 .is_terminating()
1518 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1519 })
1520 .returning(move |_, data| {
1521 let bob_mgr_clone = bob_mgr_clone.clone();
1522 Box::pin(async move {
1523 bob_mgr_clone
1524 .dispatch_message(
1525 alice_pseudonym,
1526 ApplicationDataIn {
1527 data: data.data,
1528 packet_info: Default::default(),
1529 },
1530 )
1531 .await?;
1532 Ok(())
1533 })
1534 });
1535
1536 let mut ahs = Vec::new();
1537
1538 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1540 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1541
1542 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1544 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1545
1546 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1547
1548 pin_mut!(new_session_rx_bob);
1549 let (alice_session, bob_session) = timeout(
1550 Duration::from_secs(2),
1551 futures::future::join(
1552 alice_mgr.new_session(
1553 bob_peer,
1554 SessionTarget::TcpStream(target.clone()),
1555 SessionClientConfig {
1556 pseudonym: alice_pseudonym.into(),
1557 capabilities: Capability::NoRateControl | Capability::Segmentation,
1558 surb_management: None,
1559 ..Default::default()
1560 },
1561 ),
1562 new_session_rx_bob.next(),
1563 ),
1564 )
1565 .await?;
1566
1567 let mut alice_session = alice_session?;
1568 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1569
1570 assert_eq!(
1571 alice_session.config().capabilities,
1572 Capability::Segmentation | Capability::NoRateControl
1573 );
1574 assert_eq!(
1575 alice_session.config().capabilities,
1576 bob_session.session.config().capabilities
1577 );
1578 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1579
1580 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1581 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1582 assert!(
1583 alice_mgr
1584 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1585 .await
1586 .is_err()
1587 );
1588
1589 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1590 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1591 assert!(
1592 bob_mgr
1593 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1594 .await
1595 .is_err()
1596 );
1597
1598 tokio::time::sleep(Duration::from_millis(100)).await;
1599 alice_session.close().await?;
1600
1601 tokio::time::sleep(Duration::from_millis(100)).await;
1602
1603 assert!(matches!(
1604 alice_mgr.ping_session(alice_session.id()).await,
1605 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1606 ));
1607
1608 futures::stream::iter(ahs)
1609 .for_each(|ah| async move { ah.abort() })
1610 .await;
1611
1612 Ok(())
1613 }
1614
1615 #[test_log::test(tokio::test)]
1616 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1617 let alice_pseudonym = HoprPseudonym::random();
1618 let bob_peer: Address = (&ChainKeypair::random()).into();
1619
1620 let cfg = SessionManagerConfig {
1621 idle_timeout: Duration::from_millis(200),
1622 ..Default::default()
1623 };
1624
1625 let alice_mgr = SessionManager::new(cfg);
1626 let bob_mgr = SessionManager::new(Default::default());
1627
1628 let mut sequence = mockall::Sequence::new();
1629 let mut alice_transport = MockMsgSender::new();
1630 let mut bob_transport = MockMsgSender::new();
1631
1632 let bob_mgr_clone = bob_mgr.clone();
1634 alice_transport
1635 .expect_send_message()
1636 .once()
1637 .in_sequence(&mut sequence)
1638 .withf(move |peer, data| {
1639 msg_type(data, StartProtocolDiscriminants::StartSession)
1640 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1641 })
1642 .returning(move |_, data| {
1643 let bob_mgr_clone = bob_mgr_clone.clone();
1644 Box::pin(async move {
1645 bob_mgr_clone
1646 .dispatch_message(
1647 alice_pseudonym,
1648 ApplicationDataIn {
1649 data: data.data,
1650 packet_info: Default::default(),
1651 },
1652 )
1653 .await?;
1654 Ok(())
1655 })
1656 });
1657
1658 let alice_mgr_clone = alice_mgr.clone();
1660 bob_transport
1661 .expect_send_message()
1662 .once()
1663 .in_sequence(&mut sequence)
1664 .withf(move |peer, data| {
1665 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1666 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1667 })
1668 .returning(move |_, data| {
1669 let alice_mgr_clone = alice_mgr_clone.clone();
1670
1671 Box::pin(async move {
1672 alice_mgr_clone
1673 .dispatch_message(
1674 alice_pseudonym,
1675 ApplicationDataIn {
1676 data: data.data,
1677 packet_info: Default::default(),
1678 },
1679 )
1680 .await?;
1681 Ok(())
1682 })
1683 });
1684
1685 let mut ahs = Vec::new();
1686
1687 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1689 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1690
1691 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1693 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1694
1695 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1696
1697 pin_mut!(new_session_rx_bob);
1698 let (alice_session, bob_session) = timeout(
1699 Duration::from_secs(2),
1700 futures::future::join(
1701 alice_mgr.new_session(
1702 bob_peer,
1703 SessionTarget::TcpStream(target.clone()),
1704 SessionClientConfig {
1705 pseudonym: alice_pseudonym.into(),
1706 capabilities: Capability::NoRateControl | Capability::Segmentation,
1707 surb_management: None,
1708 ..Default::default()
1709 },
1710 ),
1711 new_session_rx_bob.next(),
1712 ),
1713 )
1714 .await?;
1715
1716 let alice_session = alice_session?;
1717 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1718
1719 assert_eq!(
1720 alice_session.config().capabilities,
1721 Capability::Segmentation | Capability::NoRateControl,
1722 );
1723 assert_eq!(
1724 alice_session.config().capabilities,
1725 bob_session.session.config().capabilities
1726 );
1727 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1728
1729 tokio::time::sleep(Duration::from_millis(300)).await;
1731
1732 assert!(matches!(
1733 alice_mgr.ping_session(alice_session.id()).await,
1734 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1735 ));
1736
1737 futures::stream::iter(ahs)
1738 .for_each(|ah| async move { ah.abort() })
1739 .await;
1740
1741 Ok(())
1742 }
1743
1744 #[test_log::test(tokio::test)]
1745 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1746 let alice_pseudonym = HoprPseudonym::random();
1747 let session_id = SessionId::new(16u64, alice_pseudonym);
1748 let balancer_cfg = SurbBalancerConfig {
1749 target_surb_buffer_size: 1000,
1750 max_surbs_per_sec: 100,
1751 ..Default::default()
1752 };
1753
1754 let alice_mgr = SessionManager::<
1755 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1756 futures::channel::mpsc::Sender<IncomingSession>,
1757 >::new(Default::default());
1758
1759 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1760 alice_mgr
1761 .sessions
1762 .insert(
1763 session_id,
1764 SessionSlot {
1765 session_tx: Arc::new(dummy_tx),
1766 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1767 abort_handles: Vec::new(),
1768 surb_mgmt: Some(balancer_cfg.clone()),
1769 surb_estimator: None,
1770 },
1771 )
1772 .await;
1773
1774 let actual_cfg = alice_mgr
1775 .get_surb_balancer_config(&session_id)
1776 .await?
1777 .ok_or(anyhow!("session must have a surb balancer config"))?;
1778 assert_eq!(actual_cfg, balancer_cfg);
1779
1780 let new_cfg = SurbBalancerConfig {
1781 target_surb_buffer_size: 2000,
1782 max_surbs_per_sec: 200,
1783 ..Default::default()
1784 };
1785 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1786
1787 let actual_cfg = alice_mgr
1788 .get_surb_balancer_config(&session_id)
1789 .await?
1790 .ok_or(anyhow!("session must have a surb balancer config"))?;
1791 assert_eq!(actual_cfg, new_cfg);
1792
1793 Ok(())
1794 }
1795
1796 #[test_log::test(tokio::test)]
1797 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1798 let alice_pseudonym = HoprPseudonym::random();
1799 let bob_peer: Address = (&ChainKeypair::random()).into();
1800
1801 let cfg = SessionManagerConfig {
1802 session_tag_range: 16u64..17u64, ..Default::default()
1804 };
1805
1806 let alice_mgr = SessionManager::new(Default::default());
1807 let bob_mgr = SessionManager::new(cfg);
1808
1809 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1811 bob_mgr
1812 .sessions
1813 .insert(
1814 SessionId::new(16u64, alice_pseudonym),
1815 SessionSlot {
1816 session_tx: Arc::new(dummy_tx),
1817 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1818 abort_handles: Vec::new(),
1819 surb_mgmt: None,
1820 surb_estimator: None,
1821 },
1822 )
1823 .await;
1824
1825 let mut sequence = mockall::Sequence::new();
1826 let mut alice_transport = MockMsgSender::new();
1827 let mut bob_transport = MockMsgSender::new();
1828
1829 let bob_mgr_clone = bob_mgr.clone();
1831 alice_transport
1832 .expect_send_message()
1833 .once()
1834 .in_sequence(&mut sequence)
1835 .withf(move |peer, data| {
1836 msg_type(data, StartProtocolDiscriminants::StartSession)
1837 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1838 })
1839 .returning(move |_, data| {
1840 let bob_mgr_clone = bob_mgr_clone.clone();
1841 Box::pin(async move {
1842 bob_mgr_clone
1843 .dispatch_message(
1844 alice_pseudonym,
1845 ApplicationDataIn {
1846 data: data.data,
1847 packet_info: Default::default(),
1848 },
1849 )
1850 .await?;
1851 Ok(())
1852 })
1853 });
1854
1855 let alice_mgr_clone = alice_mgr.clone();
1857 bob_transport
1858 .expect_send_message()
1859 .once()
1860 .in_sequence(&mut sequence)
1861 .withf(move |peer, data| {
1862 msg_type(data, StartProtocolDiscriminants::SessionError)
1863 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1864 })
1865 .returning(move |_, data| {
1866 let alice_mgr_clone = alice_mgr_clone.clone();
1867 Box::pin(async move {
1868 alice_mgr_clone
1869 .dispatch_message(
1870 alice_pseudonym,
1871 ApplicationDataIn {
1872 data: data.data,
1873 packet_info: Default::default(),
1874 },
1875 )
1876 .await?;
1877 Ok(())
1878 })
1879 });
1880
1881 let mut jhs = Vec::new();
1882
1883 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1885 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1886
1887 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
1889 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1890
1891 let result = alice_mgr
1892 .new_session(
1893 bob_peer,
1894 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1895 SessionClientConfig {
1896 capabilities: Capabilities::empty(),
1897 pseudonym: alice_pseudonym.into(),
1898 surb_management: None,
1899 ..Default::default()
1900 },
1901 )
1902 .await;
1903
1904 assert!(
1905 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1906 );
1907
1908 Ok(())
1909 }
1910
1911 #[test_log::test(tokio::test)]
1912 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1913 -> anyhow::Result<()> {
1914 let alice_pseudonym = HoprPseudonym::random();
1915 let bob_peer: Address = (&ChainKeypair::random()).into();
1916
1917 let cfg = SessionManagerConfig {
1918 maximum_sessions: 1,
1919 ..Default::default()
1920 };
1921
1922 let alice_mgr = SessionManager::new(Default::default());
1923 let bob_mgr = SessionManager::new(cfg);
1924
1925 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1927 bob_mgr
1928 .sessions
1929 .insert(
1930 SessionId::new(16u64, alice_pseudonym),
1931 SessionSlot {
1932 session_tx: Arc::new(dummy_tx),
1933 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1934 abort_handles: Vec::new(),
1935 surb_mgmt: None,
1936 surb_estimator: None,
1937 },
1938 )
1939 .await;
1940
1941 let mut sequence = mockall::Sequence::new();
1942 let mut alice_transport = MockMsgSender::new();
1943 let mut bob_transport = MockMsgSender::new();
1944
1945 let bob_mgr_clone = bob_mgr.clone();
1947 alice_transport
1948 .expect_send_message()
1949 .once()
1950 .in_sequence(&mut sequence)
1951 .withf(move |peer, data| {
1952 msg_type(data, StartProtocolDiscriminants::StartSession)
1953 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1954 })
1955 .returning(move |_, data| {
1956 let bob_mgr_clone = bob_mgr_clone.clone();
1957 Box::pin(async move {
1958 bob_mgr_clone
1959 .dispatch_message(
1960 alice_pseudonym,
1961 ApplicationDataIn {
1962 data: data.data,
1963 packet_info: Default::default(),
1964 },
1965 )
1966 .await?;
1967 Ok(())
1968 })
1969 });
1970
1971 let alice_mgr_clone = alice_mgr.clone();
1973 bob_transport
1974 .expect_send_message()
1975 .once()
1976 .in_sequence(&mut sequence)
1977 .withf(move |peer, data| {
1978 msg_type(data, StartProtocolDiscriminants::SessionError)
1979 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1980 })
1981 .returning(move |_, data| {
1982 let alice_mgr_clone = alice_mgr_clone.clone();
1983 Box::pin(async move {
1984 alice_mgr_clone
1985 .dispatch_message(
1986 alice_pseudonym,
1987 ApplicationDataIn {
1988 data: data.data,
1989 packet_info: Default::default(),
1990 },
1991 )
1992 .await?;
1993 Ok(())
1994 })
1995 });
1996
1997 let mut jhs = Vec::new();
1998
1999 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2001 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2002
2003 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2005 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2006
2007 let result = alice_mgr
2008 .new_session(
2009 bob_peer,
2010 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2011 SessionClientConfig {
2012 capabilities: None.into(),
2013 pseudonym: alice_pseudonym.into(),
2014 surb_management: None,
2015 ..Default::default()
2016 },
2017 )
2018 .await;
2019
2020 assert!(
2021 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2022 );
2023
2024 Ok(())
2025 }
2026
2027 #[test_log::test(tokio::test)]
2028 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2029 let alice_pseudonym = HoprPseudonym::random();
2030 let bob_peer: Address = (&ChainKeypair::random()).into();
2031
2032 let alice_mgr = SessionManager::new(Default::default());
2033
2034 let mut sequence = mockall::Sequence::new();
2035 let mut alice_transport = MockMsgSender::new();
2036
2037 let alice_mgr_clone = alice_mgr.clone();
2039 alice_transport
2040 .expect_send_message()
2041 .once()
2042 .in_sequence(&mut sequence)
2043 .withf(move |peer, data| {
2044 msg_type(data, StartProtocolDiscriminants::StartSession)
2045 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2046 })
2047 .returning(move |_, data| {
2048 let alice_mgr_clone = alice_mgr_clone.clone();
2050 Box::pin(async move {
2051 alice_mgr_clone
2052 .dispatch_message(
2053 alice_pseudonym,
2054 ApplicationDataIn {
2055 data: data.data,
2056 packet_info: Default::default(),
2057 },
2058 )
2059 .await?;
2060 Ok(())
2061 })
2062 });
2063
2064 let alice_mgr_clone = alice_mgr.clone();
2066 alice_transport
2067 .expect_send_message()
2068 .once()
2069 .in_sequence(&mut sequence)
2070 .withf(move |peer, data| {
2071 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2072 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2073 })
2074 .returning(move |_, data| {
2075 let alice_mgr_clone = alice_mgr_clone.clone();
2076
2077 Box::pin(async move {
2078 alice_mgr_clone
2079 .dispatch_message(
2080 alice_pseudonym,
2081 ApplicationDataIn {
2082 data: data.data,
2083 packet_info: Default::default(),
2084 },
2085 )
2086 .await?;
2087 Ok(())
2088 })
2089 });
2090
2091 let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2093 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2094
2095 let alice_session = alice_mgr
2096 .new_session(
2097 bob_peer,
2098 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2099 SessionClientConfig {
2100 capabilities: None.into(),
2101 pseudonym: alice_pseudonym.into(),
2102 surb_management: None,
2103 ..Default::default()
2104 },
2105 )
2106 .await;
2107
2108 println!("{alice_session:?}");
2109 assert!(matches!(
2110 alice_session,
2111 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2112 ));
2113
2114 drop(new_session_rx_alice);
2115 Ok(())
2116 }
2117
2118 #[test_log::test(tokio::test)]
2119 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2120 let bob_peer: Address = (&ChainKeypair::random()).into();
2121
2122 let cfg = SessionManagerConfig {
2123 initiation_timeout_base: Duration::from_millis(100),
2124 ..Default::default()
2125 };
2126
2127 let alice_mgr = SessionManager::new(cfg);
2128 let bob_mgr = SessionManager::new(Default::default());
2129
2130 let mut sequence = mockall::Sequence::new();
2131 let mut alice_transport = MockMsgSender::new();
2132 let bob_transport = MockMsgSender::new();
2133
2134 alice_transport
2136 .expect_send_message()
2137 .once()
2138 .in_sequence(&mut sequence)
2139 .withf(move |peer, data| {
2140 msg_type(data, StartProtocolDiscriminants::StartSession)
2141 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2142 })
2143 .returning(|_, _| Box::pin(async { Ok(()) }));
2144
2145 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2147 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2148
2149 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2151 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2152
2153 let result = alice_mgr
2154 .new_session(
2155 bob_peer,
2156 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2157 SessionClientConfig {
2158 capabilities: None.into(),
2159 pseudonym: None,
2160 surb_management: None,
2161 ..Default::default()
2162 },
2163 )
2164 .await;
2165
2166 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2167
2168 Ok(())
2169 }
2170
2171 #[test_log::test(tokio::test)]
2172 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2173 let alice_pseudonym = HoprPseudonym::random();
2174 let bob_peer: Address = (&ChainKeypair::random()).into();
2175
2176 let bob_cfg = SessionManagerConfig::default();
2177 let alice_mgr = SessionManager::new(Default::default());
2178 let bob_mgr = SessionManager::new(bob_cfg.clone());
2179
2180 let mut alice_transport = MockMsgSender::new();
2181 let mut bob_transport = MockMsgSender::new();
2182
2183 let mut open_sequence = mockall::Sequence::new();
2185 let bob_mgr_clone = bob_mgr.clone();
2186 alice_transport
2187 .expect_send_message()
2188 .once()
2189 .in_sequence(&mut open_sequence)
2190 .withf(move |peer, data| {
2191 msg_type(data, StartProtocolDiscriminants::StartSession)
2192 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2193 })
2194 .returning(move |_, data| {
2195 let bob_mgr_clone = bob_mgr_clone.clone();
2196 Box::pin(async move {
2197 bob_mgr_clone
2198 .dispatch_message(
2199 alice_pseudonym,
2200 ApplicationDataIn {
2201 data: data.data,
2202 packet_info: Default::default(),
2203 },
2204 )
2205 .await?;
2206 Ok(())
2207 })
2208 });
2209
2210 let alice_mgr_clone = alice_mgr.clone();
2212 bob_transport
2213 .expect_send_message()
2214 .once()
2215 .in_sequence(&mut open_sequence)
2216 .withf(move |peer, data| {
2217 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2218 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2219 })
2220 .returning(move |_, data| {
2221 let alice_mgr_clone = alice_mgr_clone.clone();
2222 Box::pin(async move {
2223 alice_mgr_clone
2224 .dispatch_message(
2225 alice_pseudonym,
2226 ApplicationDataIn {
2227 data: data.data,
2228 packet_info: Default::default(),
2229 },
2230 )
2231 .await?;
2232 Ok(())
2233 })
2234 });
2235
2236 let bob_mgr_clone = bob_mgr.clone();
2238 alice_transport
2239 .expect_send_message()
2240 .times(5..)
2241 .withf(move |peer, data| {
2243 msg_type(data, StartProtocolDiscriminants::KeepAlive)
2244 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2245 })
2246 .returning(move |_, data| {
2247 let bob_mgr_clone = bob_mgr_clone.clone();
2248 Box::pin(async move {
2249 bob_mgr_clone
2250 .dispatch_message(
2251 alice_pseudonym,
2252 ApplicationDataIn {
2253 data: data.data,
2254 packet_info: Default::default(),
2255 },
2256 )
2257 .await?;
2258 Ok(())
2259 })
2260 });
2261
2262 let bob_mgr_clone = bob_mgr.clone();
2264 alice_transport
2265 .expect_send_message()
2266 .once()
2267 .withf(move |peer, data| {
2269 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2270 data.data.plain_text.as_ref(),
2271 )
2272 .ok()
2273 .and_then(|m| m.try_as_segment())
2274 .map(|s| s.is_terminating())
2275 .unwrap_or(false)
2276 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2277 })
2278 .returning(move |_, data| {
2279 let bob_mgr_clone = bob_mgr_clone.clone();
2280 Box::pin(async move {
2281 bob_mgr_clone
2282 .dispatch_message(
2283 alice_pseudonym,
2284 ApplicationDataIn {
2285 data: data.data,
2286 packet_info: Default::default(),
2287 },
2288 )
2289 .await?;
2290 Ok(())
2291 })
2292 });
2293
2294 let mut ahs = Vec::new();
2295
2296 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2298 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2299
2300 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2302 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2303
2304 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2305
2306 let balancer_cfg = SurbBalancerConfig {
2307 target_surb_buffer_size: 10,
2308 max_surbs_per_sec: 100,
2309 ..Default::default()
2310 };
2311
2312 pin_mut!(new_session_rx_bob);
2313 let (alice_session, bob_session) = timeout(
2314 Duration::from_secs(2),
2315 futures::future::join(
2316 alice_mgr.new_session(
2317 bob_peer,
2318 SessionTarget::TcpStream(target.clone()),
2319 SessionClientConfig {
2320 pseudonym: alice_pseudonym.into(),
2321 capabilities: Capability::Segmentation.into(),
2322 surb_management: Some(balancer_cfg),
2323 ..Default::default()
2324 },
2325 ),
2326 new_session_rx_bob.next(),
2327 ),
2328 )
2329 .await?;
2330
2331 let mut alice_session = alice_session?;
2332 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2333
2334 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2335
2336 assert_eq!(
2337 Some(balancer_cfg),
2338 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2339 );
2340
2341 let remote_cfg = bob_mgr
2342 .get_surb_balancer_config(bob_session.session.id())
2343 .await?
2344 .ok_or(anyhow!("no remote config at bob"))?;
2345 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2346 assert_eq!(
2347 remote_cfg.max_surbs_per_sec,
2348 remote_cfg.target_surb_buffer_size
2349 / bob_cfg
2350 .minimum_surb_buffer_duration
2351 .max(MIN_SURB_BUFFER_DURATION)
2352 .as_secs()
2353 );
2354
2355 tokio::time::sleep(Duration::from_millis(1500)).await;
2357 alice_session.close().await?;
2358
2359 tokio::time::sleep(Duration::from_millis(300)).await;
2360 assert!(matches!(
2361 alice_mgr.ping_session(alice_session.id()).await,
2362 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2363 ));
2364
2365 futures::stream::iter(ahs)
2366 .for_each(|ah| async move { ah.abort() })
2367 .await;
2368
2369 Ok(())
2370 }
2371}