1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use anyhow::anyhow;
8use futures::{
9 FutureExt, SinkExt, StreamExt, TryStreamExt,
10 channel::mpsc::{Sender, UnboundedSender},
11 future::AbortHandle,
12 pin_mut,
13};
14use futures_time::future::FutureExt as TimeExt;
15use hopr_async_runtime::AbortableList;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_start::{
19 KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
20 StartInitiation,
21};
22use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
23use hopr_types::{
24 crypto_random::Randomizable,
25 internal::{
26 prelude::HoprPseudonym,
27 routing::{DestinationRouting, RoutingOptions},
28 },
29 primitive::prelude::Address,
30};
31use tracing::{debug, error, info, trace, warn};
32
33#[cfg(feature = "telemetry")]
34use crate::telemetry::{
35 SessionLifecycleState, initialize_session_metrics, remove_session_metrics_state, set_session_balancer_data,
36 set_session_state,
37};
38use crate::{
39 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
40 SurbBalancerConfig,
41 balancer::{
42 AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
43 SurbControllerWithCorrection,
44 pid::{PidBalancerController, PidControllerGains},
45 simple::SimpleBalancerController,
46 },
47 errors::{SessionManagerError, TransportSessionError},
48 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
49 utils,
50 utils::{SurbNotificationMode, insert_into_next_slot},
51};
52
53#[cfg(all(feature = "telemetry", not(test)))]
54lazy_static::lazy_static! {
55 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
56 "hopr_session_num_active_sessions",
57 "Number of currently active HOPR sessions"
58 ).unwrap();
59 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
60 "hopr_session_established_sessions_count",
61 "Number of sessions that were successfully established as an Exit node"
62 ).unwrap();
63 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
64 "hopr_session_initiated_sessions_count",
65 "Number of sessions that were successfully initiated as an Entry node"
66 ).unwrap();
67 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
68 "hopr_session_received_error_count",
69 "Number of HOPR session errors received from an Exit node",
70 &["kind"]
71 ).unwrap();
72 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
73 "hopr_session_sent_error_count",
74 "Number of HOPR session errors sent to an Entry node",
75 &["kind"]
76 ).unwrap();
77}
78
79fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
80 debug!(?session_id, ?reason, "closing session");
81
82 #[cfg(feature = "telemetry")]
83 {
84 set_session_state(&session_id, SessionLifecycleState::Closed);
85 remove_session_metrics_state(&session_id);
86 }
87
88 if reason != ClosureReason::EmptyRead {
89 session_data.session_tx.close_channel();
91 trace!(?session_id, "data tx channel closed on session");
92 }
93
94 session_data.abort_handles.lock().abort_all();
96
97 #[cfg(all(feature = "telemetry", not(test)))]
98 METRIC_ACTIVE_SESSIONS.decrement(1.0);
99}
100
101fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
102 base * (hops as u32)
103}
104
105pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
107pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);
109
110pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
112
113const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
115
116const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
118
119type SessionInitiationCache =
123 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
124
125#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
126enum SessionTasks {
127 KeepAlive,
128 Balancer,
129}
130
131#[derive(Clone)]
132struct SessionSlot {
133 session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
136 routing_opts: DestinationRouting,
137 abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
139 surb_mgmt: Arc<BalancerStateValues>,
142 surb_estimator: AtomicSurbFlowEstimator,
145 #[allow(dead_code, reason = "kept alive for Drop-based tag deallocation")]
148 allocated_tag: Option<Arc<AllocatedTag>>,
149}
150
151#[derive(Clone, Debug, PartialEq, Eq)]
153pub enum DispatchResult {
154 Processed,
156 Unrelated(ApplicationDataIn),
158}
159
160#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
162pub struct SessionManagerConfig {
163 #[default(1500)]
167 pub frame_mtu: usize,
168
169 #[default(Duration::from_millis(800))]
173 pub max_frame_timeout: Duration,
174
175 #[default(Duration::from_millis(500))]
182 pub initiation_timeout_base: Duration,
183
184 #[default(Duration::from_secs(180))]
188 pub idle_timeout: Duration,
189
190 #[default(Duration::from_millis(100))]
195 pub balancer_sampling_interval: Duration,
196
197 #[default(10)]
203 pub initial_return_session_egress_rate: usize,
204
205 #[default(Duration::from_secs(5))]
216 pub minimum_surb_buffer_duration: Duration,
217
218 #[default(10_000)]
226 pub maximum_surb_buffer_size: usize,
227
228 #[default(None)]
236 pub surb_balance_notify_period: Option<Duration>,
237
238 #[default(true)]
245 pub surb_target_notify: bool,
246}
247
248pub struct SessionManager<S, T> {
389 session_initiations: SessionInitiationCache,
390 #[allow(clippy::type_complexity)]
391 session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
392 sessions: moka::future::Cache<SessionId, SessionSlot>,
393 msg_sender: Arc<OnceLock<S>>,
394 tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
395 session_tag_range: Range<u64>,
397 maximum_sessions: usize,
399 cfg: SessionManagerConfig,
400}
401
402impl<S, T> Clone for SessionManager<S, T> {
403 fn clone(&self) -> Self {
404 Self {
405 session_initiations: self.session_initiations.clone(),
406 session_notifiers: self.session_notifiers.clone(),
407 sessions: self.sessions.clone(),
408 cfg: self.cfg.clone(),
409 msg_sender: self.msg_sender.clone(),
410 tag_allocator: self.tag_allocator.clone(),
411 session_tag_range: self.session_tag_range.clone(),
412 maximum_sessions: self.maximum_sessions,
413 }
414 }
415}
416
417const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
418
419impl<S, T> SessionManager<S, T>
420where
421 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
422 T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
423 S::Error: std::error::Error + Send + Sync + Clone + 'static,
424 T::Error: std::error::Error + Send + Sync + Clone + 'static,
425{
426 pub fn new(mut cfg: SessionManagerConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
429 let session_tag_range = tag_allocator.tag_range();
430 let maximum_sessions = tag_allocator.capacity().max(1) as usize;
431
432 debug_assert!(
433 session_tag_range.start >= ReservedTag::range().end,
434 "session tag range must not overlap with reserved tags"
435 );
436 cfg.surb_balance_notify_period = cfg
437 .surb_balance_notify_period
438 .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
439 cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);
440
441 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
443 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
444
445 #[cfg(all(feature = "telemetry", not(test)))]
446 METRIC_ACTIVE_SESSIONS.set(0.0);
447
448 let msg_sender = Arc::new(OnceLock::new());
449 Self {
450 msg_sender: msg_sender.clone(),
451 session_initiations: moka::future::Cache::builder()
452 .max_capacity(maximum_sessions as u64)
453 .time_to_live(
454 2 * initiation_timeout_max_one_way(
455 cfg.initiation_timeout_base,
456 RoutingOptions::MAX_INTERMEDIATE_HOPS,
457 ),
458 )
459 .build(),
460 sessions: moka::future::Cache::builder()
461 .max_capacity(maximum_sessions as u64)
462 .time_to_idle(cfg.idle_timeout)
463 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
464 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
465 trace!(?session_id, ?reason, "session evicted from the cache");
466 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
467 }
468 _ => {}
469 })
470 .build(),
471 session_notifiers: Arc::new(OnceLock::new()),
472 tag_allocator,
473 session_tag_range,
474 maximum_sessions,
475 cfg,
476 }
477 }
478
479 pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
485 self.msg_sender
486 .set(msg_sender)
487 .map_err(|_| SessionManagerError::AlreadyStarted)?;
488
489 let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.maximum_sessions + 10);
490 self.session_notifiers
491 .set((new_session_notifier, session_close_tx))
492 .map_err(|_| SessionManagerError::AlreadyStarted)?;
493
494 let myself = self.clone();
495 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
496 None,
497 move |(session_id, closure_reason)| {
498 let myself = myself.clone();
499 async move {
500 if let Some(session_data) = myself.sessions.remove(&session_id).await {
504 close_session(session_id, session_data, closure_reason);
505 } else {
506 debug!(
508 ?session_id,
509 ?closure_reason,
510 "could not find session id to close, maybe the session is already closed"
511 );
512 }
513 }
514 },
515 ));
516
517 let myself = self.clone();
522 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
523 let jitter = hopr_types::crypto_random::random_float_in_range(1.0..1.5);
524 let timeout = 2 * initiation_timeout_max_one_way(
525 myself.cfg.initiation_timeout_base,
526 RoutingOptions::MAX_INTERMEDIATE_HOPS,
527 )
528 .min(myself.cfg.idle_timeout)
529 .mul_f64(jitter)
530 / 2;
531 futures_time::stream::interval(timeout.into())
532 .for_each(|_| {
533 trace!("executing session cache evictions");
534 futures::future::join(
535 myself.sessions.run_pending_tasks(),
536 myself.session_initiations.run_pending_tasks(),
537 )
538 .map(|_| ())
539 })
540 .await;
541 });
542
543 Ok(vec![ah_closure_notifications, ah_session_expiration])
544 }
545
546 pub fn is_started(&self) -> bool {
548 self.session_notifiers.get().is_some()
549 }
550
551 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
552 if let moka::ops::compute::CompResult::Inserted(_) = self
554 .sessions
555 .entry(session_id)
556 .and_compute_with(|entry| {
557 futures::future::ready(if entry.is_none() {
558 moka::ops::compute::Op::Put(slot)
559 } else {
560 moka::ops::compute::Op::Nop
561 })
562 })
563 .await
564 {
565 #[cfg(all(feature = "telemetry", not(test)))]
566 {
567 METRIC_NUM_INITIATED_SESSIONS.increment();
568 METRIC_ACTIVE_SESSIONS.increment(1.0);
569 }
570
571 Ok(())
572 } else {
573 error!(%session_id, "session already exists - loopback attempt");
575 Err(SessionManagerError::Loopback.into())
576 }
577 }
578
579 pub async fn new_session(
587 &self,
588 destination: Address,
589 target: SessionTarget,
590 cfg: SessionClientConfig,
591 ) -> crate::errors::Result<HoprSession> {
592 self.sessions.run_pending_tasks().await;
593 if self.maximum_sessions <= self.sessions.entry_count() as usize {
594 return Err(SessionManagerError::TooManySessions.into());
595 }
596
597 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
598
599 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
600 let (challenge, _) = insert_into_next_slot(
601 &self.session_initiations,
602 |ch| {
603 if let Some(challenge) = ch {
604 ((challenge + 1) % hopr_types::crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
605 } else {
606 hopr_types::crypto_random::random_integer(MIN_CHALLENGE, None)
607 }
608 },
609 |_| tx_initiation_done,
610 )
611 .await
612 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
616 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
617 challenge,
618 target,
619 capabilities: ByteCapabilities(cfg.capabilities),
620 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
621 cfg.surb_management
622 .map(|c| c.target_surb_buffer_size)
623 .unwrap_or(
624 self.cfg.initial_return_session_egress_rate as u64
625 * self
626 .cfg
627 .minimum_surb_buffer_duration
628 .max(MIN_SURB_BUFFER_DURATION)
629 .as_secs(),
630 )
631 .min(u32::MAX as u64) as u32
632 } else {
633 0
634 },
635 });
636
637 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
638 let forward_routing = DestinationRouting::Forward {
639 destination: Box::new(destination.into()),
640 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
642 return_options: cfg.return_path_options.clone().into(),
643 };
644
645 info!(challenge, %pseudonym, %destination, "new session request");
647 msg_sender
648 .send((
649 forward_routing.clone(),
650 ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
651 ))
652 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
653 .await
654 .map_err(|_| {
655 error!(challenge, %pseudonym, %destination, "timeout sending session request message");
656 TransportSessionError::Timeout
657 })?
658 .map_err(TransportSessionError::packet_sending)?;
659
660 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
662 self.cfg.initiation_timeout_base,
663 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
664 )
665 .into();
666
667 pin_mut!(rx_initiation_done);
669
670 trace!(challenge, "awaiting session establishment");
671 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
672 Ok(Ok(Some(est))) => {
673 let session_id = est.session_id;
675 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
676
677 let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
678 let notifier = self
679 .session_notifiers
680 .get()
681 .map(|(_, notifier)| {
682 let mut notifier = notifier.clone();
683 Box::new(move |session_id: SessionId, reason: ClosureReason| {
684 let _ = notifier
685 .try_send((session_id, reason))
686 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
687 })
688 })
689 .ok_or(SessionManagerError::NotStarted)?;
690
691 if let Some(balancer_config) = cfg.surb_management {
695 let surb_estimator = AtomicSurbFlowEstimator::default();
696
697 let surb_estimator_clone = surb_estimator.clone();
699 let full_surb_scoring_sender =
700 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
701 let produced = data.estimate_surbs_with_msg() as u64;
702 surb_estimator_clone
704 .produced
705 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
706 #[cfg(feature = "telemetry")]
707 crate::telemetry::record_session_surb_produced(&session_id, produced);
708 futures::future::ok::<_, S::Error>((routing, data))
709 });
710
711 let max_out_organic_surbs = cfg.always_max_out_surbs;
714 let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
715 move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
719 if !max_out_organic_surbs {
720 data.packet_info
722 .get_or_insert_with(|| OutgoingPacketInfo {
723 max_surbs_in_packet: 1,
724 ..Default::default()
725 })
726 .max_surbs_in_packet = 1;
727 }
728 futures::future::ok::<_, S::Error>((routing, data))
729 },
730 );
731
732 let mut abort_handles = AbortableList::default();
733 let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
734
735 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
737 session_id,
738 full_surb_scoring_sender,
739 forward_routing.clone(),
740 if self.cfg.surb_target_notify {
741 SurbNotificationMode::Target
742 } else {
743 SurbNotificationMode::DoNotNotify
744 },
745 surb_mgmt.clone(),
746 );
747 abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
748
749 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
751 let balancer = SurbBalancer::new(
752 session_id,
753 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
755 surb_estimator.clone(),
756 SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
759 surb_mgmt.clone(),
760 );
761
762 let (level_stream, balancer_abort_handle) =
763 balancer.start_control_loop(self.cfg.balancer_sampling_interval);
764 abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
765
766 self.insert_session_slot(
768 session_id,
769 SessionSlot {
770 session_tx: Arc::new(tx),
771 routing_opts: forward_routing.clone(),
772 abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
773 surb_mgmt: surb_mgmt.clone(),
774 surb_estimator: surb_estimator.clone(),
775 allocated_tag: None,
776 },
777 )
778 .await?;
779
780 match level_stream
783 .skip_while(|current_level| {
784 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
785 })
786 .next()
787 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
788 .await
789 {
790 Ok(Some(surb_level)) => {
791 info!(%session_id, surb_level, "session is ready");
792 }
793 Ok(None) => {
794 return Err(
795 SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
796 );
797 }
798 Err(_) => {
799 warn!(%session_id, "session didn't reach target SURB buffer size in time");
800 }
801 }
802
803 let surb_estimator_for_rx = surb_estimator.clone();
804 let session = HoprSession::new(
805 session_id,
806 forward_routing,
807 HoprSessionConfig {
808 capabilities: cfg.capabilities,
809 frame_mtu: self.cfg.frame_mtu,
810 frame_timeout: self.cfg.max_frame_timeout,
811 },
812 (
813 reduced_surb_scoring_sender,
814 rx.inspect(move |_| {
815 surb_estimator_for_rx
818 .consumed
819 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
820 #[cfg(feature = "telemetry")]
821 crate::telemetry::record_session_surb_consumed(&session_id, 1);
822 }),
823 ),
824 Some(notifier),
825 )?;
826
827 #[cfg(feature = "telemetry")]
828 {
829 initialize_session_metrics(
830 session_id,
831 HoprSessionConfig {
832 capabilities: cfg.capabilities,
833 frame_mtu: self.cfg.frame_mtu,
834 frame_timeout: self.cfg.max_frame_timeout,
835 },
836 );
837 set_session_state(&session_id, SessionLifecycleState::Active);
838 set_session_balancer_data(&session_id, surb_estimator.clone(), surb_mgmt.clone());
839 }
840
841 Ok(session)
842 } else {
843 warn!(%session_id, "session ready without SURB balancing");
844
845 self.insert_session_slot(
846 session_id,
847 SessionSlot {
848 session_tx: Arc::new(tx),
849 routing_opts: forward_routing.clone(),
850 abort_handles: Default::default(),
851 surb_mgmt: Default::default(), surb_estimator: Default::default(), allocated_tag: None,
854 },
855 )
856 .await?;
857
858 let max_out_organic_surbs = cfg.always_max_out_surbs;
861 let reduced_surb_sender =
862 msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
863 if !max_out_organic_surbs {
864 data.packet_info
865 .get_or_insert_with(|| OutgoingPacketInfo {
866 max_surbs_in_packet: 1,
867 ..Default::default()
868 })
869 .max_surbs_in_packet = 1;
870 }
871 futures::future::ok::<_, S::Error>((routing, data))
872 });
873
874 let session = HoprSession::new(
875 session_id,
876 forward_routing,
877 HoprSessionConfig {
878 capabilities: cfg.capabilities,
879 frame_mtu: self.cfg.frame_mtu,
880 frame_timeout: self.cfg.max_frame_timeout,
881 },
882 (reduced_surb_sender, rx),
883 Some(notifier),
884 )?;
885
886 #[cfg(feature = "telemetry")]
887 {
888 initialize_session_metrics(
889 session_id,
890 HoprSessionConfig {
891 capabilities: cfg.capabilities,
892 frame_mtu: self.cfg.frame_mtu,
893 frame_timeout: self.cfg.max_frame_timeout,
894 },
895 );
896 set_session_state(&session_id, SessionLifecycleState::Active);
897 }
898
899 Ok(session)
900 }
901 }
902 Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
903 "internal error: sender has been closed without completing the session establishment"
904 ))
905 .into()),
906 Ok(Err(error)) => {
907 error!(
909 challenge = error.challenge,
910 ?error,
911 "the other party rejected the session initiation with error"
912 );
913 Err(TransportSessionError::Rejected(error.reason))
914 }
915 Err(_) => {
916 error!(challenge, "session initiation attempt timed out");
918
919 #[cfg(all(feature = "telemetry", not(test)))]
920 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
921
922 Err(TransportSessionError::Timeout)
923 }
924 }
925 }
926
927 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
931 if let Some(session_data) = self.sessions.get(id).await {
932 trace!(session_id = ?id, "pinging manually session");
933 Ok(self
934 .msg_sender
935 .get()
936 .cloned()
937 .ok_or(SessionManagerError::NotStarted)?
938 .send((
939 session_data.routing_opts.clone(),
940 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
941 ))
942 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
943 .await
944 .map_err(|_| {
945 error!("timeout sending session ping message");
946 TransportSessionError::Timeout
947 })?
948 .map_err(TransportSessionError::packet_sending)?)
949 } else {
950 Err(SessionManagerError::NonExistingSession.into())
951 }
952 }
953
954 pub async fn active_sessions(&self) -> Vec<SessionId> {
956 self.sessions.run_pending_tasks().await;
957 self.sessions.iter().map(|(k, _)| *k).collect()
958 }
959
960 pub async fn update_surb_balancer_config(
965 &self,
966 id: &SessionId,
967 config: SurbBalancerConfig,
968 ) -> crate::errors::Result<()> {
969 let cfg = self
970 .sessions
971 .get(id)
972 .await
973 .ok_or(SessionManagerError::NonExistingSession)?
974 .surb_mgmt;
975
976 if !cfg.is_disabled() {
978 cfg.update(&config);
979 Ok(())
980 } else {
981 Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
982 }
983 }
984
985 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
989 match self.sessions.get(id).await {
990 Some(session) => Ok(Some(session.surb_mgmt.as_ref())
991 .filter(|c| !c.is_disabled())
992 .map(|d| d.as_config())),
993 None => Err(SessionManagerError::NonExistingSession.into()),
994 }
995 }
996
997 pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1004 match self.sessions.get(id).await {
1005 Some(session) => Ok((
1006 session
1007 .surb_estimator
1008 .produced
1009 .load(std::sync::atomic::Ordering::Relaxed),
1010 session
1011 .surb_estimator
1012 .consumed
1013 .load(std::sync::atomic::Ordering::Relaxed),
1014 )),
1015 None => Err(SessionManagerError::NonExistingSession.into()),
1016 }
1017 }
1018
1019 pub async fn dispatch_message(
1026 &self,
1027 pseudonym: HoprPseudonym,
1028 in_data: ApplicationDataIn,
1029 ) -> crate::errors::Result<DispatchResult> {
1030 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1031 trace!("dispatching Start protocol message");
1033 return self
1034 .handle_start_protocol_message(pseudonym, in_data)
1035 .await
1036 .map(|_| DispatchResult::Processed);
1037 } else if self.session_tag_range.contains(&in_data.data.application_tag.as_u64()) {
1038 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1039
1040 return if let Some(session_slot) = self.sessions.get(&session_id).await {
1041 trace!(?session_id, "received data for a registered session");
1042
1043 Ok(session_slot
1044 .session_tx
1045 .unbounded_send(in_data)
1046 .map(|_| DispatchResult::Processed)
1047 .map_err(SessionManagerError::other)?)
1048 } else {
1049 error!(%session_id, "received data from an unestablished session");
1050 Err(TransportSessionError::UnknownData)
1051 };
1052 }
1053
1054 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1055 Ok(DispatchResult::Unrelated(in_data))
1056 }
1057
1058 async fn handle_incoming_session_initiation(
1059 &self,
1060 pseudonym: HoprPseudonym,
1061 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1062 ) -> crate::errors::Result<()> {
1063 trace!(challenge = session_req.challenge, "received session initiation request");
1064
1065 debug!(%pseudonym, "got new session request, searching for a free session slot");
1066
1067 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1068
1069 let (mut new_session_notifier, mut close_session_notifier) = self
1070 .session_notifiers
1071 .get()
1072 .cloned()
1073 .ok_or(SessionManagerError::NotStarted)?;
1074
1075 let reply_routing = DestinationRouting::Return(pseudonym.into());
1077
1078 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1079
1080 self.sessions.run_pending_tasks().await; let allocated_tag = if self.maximum_sessions > self.sessions.entry_count() as usize {
1083 self.tag_allocator.allocate()
1084 } else {
1085 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1086 None
1087 };
1088
1089 if let Some(allocated_tag) = allocated_tag {
1090 let session_id = SessionId::new(allocated_tag.value(), pseudonym);
1091 let allocated_tag = Arc::new(allocated_tag);
1092
1093 let slot = SessionSlot {
1094 session_tx: Arc::new(tx_session_data),
1095 routing_opts: reply_routing.clone(),
1096 abort_handles: Default::default(),
1097 surb_mgmt: Default::default(),
1098 surb_estimator: Default::default(),
1099 allocated_tag: Some(allocated_tag),
1100 };
1101 self.sessions.insert(session_id, slot.clone()).await;
1102
1103 debug!(%session_id, ?session_req, "assigned a new session");
1104
1105 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1106 if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1107 error!(%session_id, %error, %reason, "failed to notify session closure");
1108 }
1109 });
1110
1111 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1112 let egress_rate_control =
1114 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1115
1116 let target_surb_buffer_size = if session_req.additional_data > 0 {
1119 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1120 } else {
1121 self.cfg.initial_return_session_egress_rate as u64
1122 * self
1123 .cfg
1124 .minimum_surb_buffer_duration
1125 .max(MIN_SURB_BUFFER_DURATION)
1126 .as_secs()
1127 };
1128
1129 let surb_estimator_clone = slot.surb_estimator.clone();
1130 let session = HoprSession::new(
1131 session_id,
1132 reply_routing.clone(),
1133 HoprSessionConfig {
1134 capabilities: session_req.capabilities.into(),
1135 frame_mtu: self.cfg.frame_mtu,
1136 frame_timeout: self.cfg.max_frame_timeout,
1137 },
1138 (
1139 msg_sender
1141 .clone()
1142 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1143 surb_estimator_clone
1145 .consumed
1146 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1147 #[cfg(feature = "telemetry")]
1148 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1149 futures::future::ok::<_, S::Error>((routing, data))
1150 })
1151 .rate_limit_with_controller(&egress_rate_control)
1152 .buffer((2 * target_surb_buffer_size) as usize),
1153 rx_session_data.inspect(move |data| {
1155 let produced = data.num_surbs_with_msg() as u64;
1156 surb_estimator_clone
1158 .produced
1159 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1160 #[cfg(feature = "telemetry")]
1161 crate::telemetry::record_session_surb_produced(&session_id, produced);
1162 }),
1163 ),
1164 Some(closure_notifier),
1165 )?;
1166
1167 let balancer_config = SurbBalancerConfig {
1171 target_surb_buffer_size,
1172 max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1174 surb_decay: None,
1177 };
1178
1179 slot.surb_mgmt.update(&balancer_config);
1180
1181 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1184 let balancer = SurbBalancer::new(
1185 session_id,
1186 SimpleBalancerController::default(),
1187 slot.surb_estimator.clone(),
1188 SurbControllerWithCorrection(egress_rate_control, 1), slot.surb_mgmt.clone(),
1190 );
1191
1192 let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1194 slot.abort_handles
1195 .lock()
1196 .insert(SessionTasks::Balancer, balancer_abort_handle);
1197
1198 if let Some(period) = self.cfg.surb_balance_notify_period {
1200 let surb_estimator_clone = slot.surb_estimator.clone();
1201 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1202 session_id,
1203 msg_sender
1205 .clone()
1206 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1207 surb_estimator_clone
1209 .consumed
1210 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1211 #[cfg(feature = "telemetry")]
1212 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1213 futures::future::ok::<_, S::Error>((routing, data))
1214 }),
1215 slot.routing_opts.clone(),
1216 SurbNotificationMode::Level(slot.surb_estimator.clone()),
1217 slot.surb_mgmt.clone(),
1218 );
1219
1220 hopr_async_runtime::prelude::spawn(async move {
1222 hopr_async_runtime::prelude::sleep(period).await;
1224 ka_controller.set_rate_per_unit(1, period);
1225 });
1226
1227 slot.abort_handles
1228 .lock()
1229 .insert(SessionTasks::KeepAlive, ka_abort_handle);
1230
1231 debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1232 }
1233
1234 session
1235 } else {
1236 HoprSession::new(
1237 session_id,
1238 reply_routing.clone(),
1239 HoprSessionConfig {
1240 capabilities: session_req.capabilities.into(),
1241 frame_mtu: self.cfg.frame_mtu,
1242 frame_timeout: self.cfg.max_frame_timeout,
1243 },
1244 (msg_sender.clone(), rx_session_data),
1245 Some(closure_notifier),
1246 )?
1247 };
1248
1249 let incoming_session = IncomingSession {
1251 session,
1252 target: session_req.target,
1253 };
1254
1255 match new_session_notifier
1257 .send(incoming_session)
1258 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1259 .await
1260 {
1261 Err(_) => {
1262 error!(%session_id, "timeout to notify about new incoming session");
1263 return Err(TransportSessionError::Timeout);
1264 }
1265 Ok(Err(error)) => {
1266 error!(%session_id, %error, "failed to notify about new incoming session");
1267 return Err(SessionManagerError::other(error).into());
1268 }
1269 _ => {}
1270 };
1271
1272 trace!(?session_id, "session notification sent");
1273
1274 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1277 orig_challenge: session_req.challenge,
1278 session_id,
1279 });
1280
1281 msg_sender
1282 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1283 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1284 .await
1285 .map_err(|_| {
1286 error!(%session_id, "timeout sending session establishment message");
1287 TransportSessionError::Timeout
1288 })?
1289 .map_err(|error| {
1290 error!(%session_id, %error, "failed to send session establishment message");
1291 SessionManagerError::other(error)
1292 })?;
1293
1294 #[cfg(feature = "telemetry")]
1295 {
1296 initialize_session_metrics(
1297 session_id,
1298 HoprSessionConfig {
1299 capabilities: session_req.capabilities.0,
1300 frame_mtu: self.cfg.frame_mtu,
1301 frame_timeout: self.cfg.max_frame_timeout,
1302 },
1303 );
1304 set_session_state(&session_id, SessionLifecycleState::Active);
1305 set_session_balancer_data(&session_id, slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1306 }
1307
1308 info!(%session_id, "new session established");
1309
1310 #[cfg(all(feature = "telemetry", not(test)))]
1311 {
1312 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1313 METRIC_ACTIVE_SESSIONS.increment(1.0);
1314 }
1315 } else {
1316 error!(%pseudonym,"failed to reserve a new session slot");
1317
1318 let reason = StartErrorReason::NoSlotsAvailable;
1320 let data = HoprStartProtocol::SessionError(StartErrorType {
1321 challenge: session_req.challenge,
1322 reason,
1323 });
1324
1325 msg_sender
1326 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1327 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1328 .await
1329 .map_err(|_| {
1330 error!("timeout sending session error message");
1331 TransportSessionError::Timeout
1332 })?
1333 .map_err(|error| {
1334 error!(%error, "failed to send session error message");
1335 SessionManagerError::other(error)
1336 })?;
1337
1338 trace!(%pseudonym, "session establishment failure message sent");
1339
1340 #[cfg(all(feature = "telemetry", not(test)))]
1341 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1342 }
1343
1344 Ok(())
1345 }
1346
1347 async fn handle_start_protocol_message(
1348 &self,
1349 pseudonym: HoprPseudonym,
1350 data: ApplicationDataIn,
1351 ) -> crate::errors::Result<()> {
1352 match HoprStartProtocol::try_from(data.data)? {
1353 HoprStartProtocol::StartSession(session_req) => {
1354 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1355 }
1356 HoprStartProtocol::SessionEstablished(est) => {
1357 trace!(
1358 session_id = ?est.session_id,
1359 "received session establishment confirmation"
1360 );
1361 let challenge = est.orig_challenge;
1362 let session_id = est.session_id;
1363 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1364 if let Err(error) = tx_est.unbounded_send(Ok(est)) {
1365 error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
1366 return Err(SessionManagerError::other(error).into());
1367 }
1368 debug!(?session_id, challenge, "session establishment complete");
1369 } else {
1370 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1371 }
1372 }
1373 HoprStartProtocol::SessionError(error_type) => {
1374 trace!(
1375 challenge = error_type.challenge,
1376 error = ?error_type.reason,
1377 "failed to initialize a session",
1378 );
1379 if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
1382 if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
1383 error!(%error, ?error_type, "could not send session error message");
1384 return Err(SessionManagerError::other(error).into());
1385 }
1386 error!(
1387 challenge = error_type.challenge,
1388 ?error_type,
1389 "session establishment error received"
1390 );
1391 } else {
1392 error!(
1393 challenge = error_type.challenge,
1394 ?error_type,
1395 "session establishment attempt expired before error could be delivered"
1396 );
1397 }
1398
1399 #[cfg(all(feature = "telemetry", not(test)))]
1400 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
1401 }
1402 HoprStartProtocol::KeepAlive(msg) => {
1403 let session_id = msg.session_id;
1404 if let Some(session_slot) = self.sessions.get(&session_id).await {
1405 trace!(?session_id, "received keep-alive message");
1406 match &session_slot.routing_opts {
1407 DestinationRouting::Forward { .. } => {
1409 if msg.flags.contains(KeepAliveFlag::BalancerState)
1410 && !session_slot.surb_mgmt.is_disabled()
1411 && session_slot.surb_mgmt.buffer_level() != msg.additional_data
1412 {
1413 session_slot
1415 .surb_mgmt
1416 .buffer_level
1417 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1418 debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
1419 }
1420
1421 session_slot
1423 .surb_estimator
1424 .consumed
1425 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1426 #[cfg(feature = "telemetry")]
1427 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1428 }
1429 DestinationRouting::Return(_) => {
1431 if msg.flags.contains(KeepAliveFlag::BalancerTarget)
1433 && msg.additional_data > 0
1434 && !session_slot.surb_mgmt.is_disabled()
1435 && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
1436 {
1437 session_slot
1439 .surb_mgmt
1440 .target_surb_buffer_size
1441 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1442 session_slot.surb_mgmt.max_surbs_per_sec.store(
1444 msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
1445 std::sync::atomic::Ordering::Relaxed,
1446 );
1447 debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
1448 }
1449
1450 let produced = KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64;
1453 session_slot
1454 .surb_estimator
1455 .produced
1456 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1457 #[cfg(feature = "telemetry")]
1458 crate::telemetry::record_session_surb_produced(&session_id, produced);
1459 }
1460 }
1461 } else {
1462 debug!(%session_id, "received keep-alive request for an unknown session");
1463 }
1464 }
1465 }
1466
1467 Ok(())
1468 }
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473 use anyhow::anyhow;
1474 use futures::{AsyncWriteExt, future::BoxFuture};
1475 use hopr_network_types::prelude::SealedHost;
1476 use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1477 use hopr_types::{
1478 crypto::{keypairs::ChainKeypair, prelude::Keypair},
1479 crypto_random::Randomizable,
1480 internal::routing::SurbMatcher,
1481 primitive::prelude::Address,
1482 };
1483 use tokio::time::timeout;
1484
1485 use super::*;
1486 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1487
1488 fn test_tag_allocator() -> Arc<dyn TagAllocator + Send + Sync> {
1490 test_tag_allocator_with_session_capacity(10000)
1491 }
1492
1493 fn test_tag_allocator_with_session_capacity(session_capacity: u64) -> Arc<dyn TagAllocator + Send + Sync> {
1495 hopr_transport_tag_allocator::create_allocators(
1496 ReservedTag::range().end..u16::MAX as u64 + 1,
1497 [
1498 (hopr_transport_tag_allocator::Usage::Session, session_capacity),
1499 (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 10000),
1500 (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1501 ],
1502 )
1503 .expect("test allocator creation must not fail")
1504 .into_iter()
1505 .find(|(u, _)| matches!(u, hopr_transport_tag_allocator::Usage::Session))
1506 .expect("session allocator must exist")
1507 .1
1508 }
1509
1510 #[async_trait::async_trait]
1511 trait SendMsg {
1512 async fn send_message(
1513 &self,
1514 routing: DestinationRouting,
1515 data: ApplicationDataOut,
1516 ) -> crate::errors::Result<()>;
1517 }
1518
1519 mockall::mock! {
1520 MsgSender {}
1521 impl SendMsg for MsgSender {
1522 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1523 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1524 }
1525 }
1526
1527 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1528 let (tx, rx) = futures::channel::mpsc::unbounded();
1529 tokio::task::spawn(async move {
1530 pin_mut!(rx);
1531 while let Some((routing, data)) = rx.next().await {
1532 sender
1533 .send_message(routing, data)
1534 .await
1535 .expect("send message must not fail in mock");
1536 }
1537 });
1538 tx
1539 }
1540
1541 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1542 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1543 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1544 .unwrap_or(false)
1545 }
1546
1547 fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1548 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1549 .map(msg)
1550 .unwrap_or(false)
1551 }
1552
1553 #[test_log::test(tokio::test)]
1554 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1555 {
1556 let alice_pseudonym = HoprPseudonym::random();
1557 let bob_peer: Address = (&ChainKeypair::random()).into();
1558
1559 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1560 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1561
1562 let mut sequence = mockall::Sequence::new();
1563 let mut alice_transport = MockMsgSender::new();
1564 let mut bob_transport = MockMsgSender::new();
1565
1566 let bob_mgr_clone = bob_mgr.clone();
1568 alice_transport
1569 .expect_send_message()
1570 .once()
1571 .in_sequence(&mut sequence)
1572 .withf(move |peer, data| {
1573 info!("alice sends {}", data.data.application_tag);
1574 msg_type(data, StartProtocolDiscriminants::StartSession)
1575 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1576 })
1577 .returning(move |_, data| {
1578 let bob_mgr_clone = bob_mgr_clone.clone();
1579 Box::pin(async move {
1580 bob_mgr_clone
1581 .dispatch_message(
1582 alice_pseudonym,
1583 ApplicationDataIn {
1584 data: data.data,
1585 packet_info: Default::default(),
1586 },
1587 )
1588 .await?;
1589 Ok(())
1590 })
1591 });
1592
1593 let alice_mgr_clone = alice_mgr.clone();
1595 bob_transport
1596 .expect_send_message()
1597 .once()
1598 .in_sequence(&mut sequence)
1599 .withf(move |peer, data| {
1600 info!("bob sends {}", data.data.application_tag);
1601 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1602 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1603 })
1604 .returning(move |_, data| {
1605 let alice_mgr_clone = alice_mgr_clone.clone();
1606
1607 Box::pin(async move {
1608 alice_mgr_clone
1609 .dispatch_message(
1610 alice_pseudonym,
1611 ApplicationDataIn {
1612 data: data.data,
1613 packet_info: Default::default(),
1614 },
1615 )
1616 .await?;
1617 Ok(())
1618 })
1619 });
1620
1621 let bob_mgr_clone = bob_mgr.clone();
1623 alice_transport
1624 .expect_send_message()
1625 .once()
1626 .in_sequence(&mut sequence)
1627 .withf(move |peer, data| {
1628 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1629 data.data.plain_text.as_ref(),
1630 )
1631 .expect("must be a session message")
1632 .try_as_segment()
1633 .expect("must be a segment")
1634 .is_terminating()
1635 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1636 })
1637 .returning(move |_, data| {
1638 let bob_mgr_clone = bob_mgr_clone.clone();
1639 Box::pin(async move {
1640 bob_mgr_clone
1641 .dispatch_message(
1642 alice_pseudonym,
1643 ApplicationDataIn {
1644 data: data.data,
1645 packet_info: Default::default(),
1646 },
1647 )
1648 .await?;
1649 Ok(())
1650 })
1651 });
1652
1653 let mut ahs = Vec::new();
1654
1655 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1657 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1658
1659 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1661 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1662
1663 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1664
1665 pin_mut!(new_session_rx_bob);
1666 let (alice_session, bob_session) = timeout(
1667 Duration::from_secs(2),
1668 futures::future::join(
1669 alice_mgr.new_session(
1670 bob_peer,
1671 SessionTarget::TcpStream(target.clone()),
1672 SessionClientConfig {
1673 pseudonym: alice_pseudonym.into(),
1674 capabilities: Capability::NoRateControl | Capability::Segmentation,
1675 surb_management: None,
1676 ..Default::default()
1677 },
1678 ),
1679 new_session_rx_bob.next(),
1680 ),
1681 )
1682 .await?;
1683
1684 let mut alice_session = alice_session?;
1685 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1686
1687 assert_eq!(
1688 alice_session.config().capabilities,
1689 Capability::Segmentation | Capability::NoRateControl
1690 );
1691 assert_eq!(
1692 alice_session.config().capabilities,
1693 bob_session.session.config().capabilities
1694 );
1695 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1696
1697 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1698 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1699 assert!(
1700 alice_mgr
1701 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1702 .await
1703 .is_err()
1704 );
1705
1706 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1707 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1708 assert!(
1709 bob_mgr
1710 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1711 .await
1712 .is_err()
1713 );
1714
1715 tokio::time::sleep(Duration::from_millis(100)).await;
1716 alice_session.close().await?;
1717
1718 tokio::time::sleep(Duration::from_millis(100)).await;
1719
1720 assert!(matches!(
1721 alice_mgr.ping_session(alice_session.id()).await,
1722 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1723 ));
1724
1725 futures::stream::iter(ahs)
1726 .for_each(|ah| async move { ah.abort() })
1727 .await;
1728
1729 Ok(())
1730 }
1731
1732 #[test_log::test(tokio::test)]
1733 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1734 let alice_pseudonym = HoprPseudonym::random();
1735 let bob_peer: Address = (&ChainKeypair::random()).into();
1736
1737 let cfg = SessionManagerConfig {
1738 idle_timeout: Duration::from_millis(200),
1739 ..Default::default()
1740 };
1741
1742 let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
1743 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1744
1745 let mut sequence = mockall::Sequence::new();
1746 let mut alice_transport = MockMsgSender::new();
1747 let mut bob_transport = MockMsgSender::new();
1748
1749 let bob_mgr_clone = bob_mgr.clone();
1751 alice_transport
1752 .expect_send_message()
1753 .once()
1754 .in_sequence(&mut sequence)
1755 .withf(move |peer, data| {
1756 msg_type(data, StartProtocolDiscriminants::StartSession)
1757 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1758 })
1759 .returning(move |_, data| {
1760 let bob_mgr_clone = bob_mgr_clone.clone();
1761 Box::pin(async move {
1762 bob_mgr_clone
1763 .dispatch_message(
1764 alice_pseudonym,
1765 ApplicationDataIn {
1766 data: data.data,
1767 packet_info: Default::default(),
1768 },
1769 )
1770 .await?;
1771 Ok(())
1772 })
1773 });
1774
1775 let alice_mgr_clone = alice_mgr.clone();
1777 bob_transport
1778 .expect_send_message()
1779 .once()
1780 .in_sequence(&mut sequence)
1781 .withf(move |peer, data| {
1782 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1783 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1784 })
1785 .returning(move |_, data| {
1786 let alice_mgr_clone = alice_mgr_clone.clone();
1787
1788 Box::pin(async move {
1789 alice_mgr_clone
1790 .dispatch_message(
1791 alice_pseudonym,
1792 ApplicationDataIn {
1793 data: data.data,
1794 packet_info: Default::default(),
1795 },
1796 )
1797 .await?;
1798 Ok(())
1799 })
1800 });
1801
1802 let mut ahs = Vec::new();
1803
1804 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1806 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1807
1808 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1810 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1811
1812 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1813
1814 pin_mut!(new_session_rx_bob);
1815 let (alice_session, bob_session) = timeout(
1816 Duration::from_secs(2),
1817 futures::future::join(
1818 alice_mgr.new_session(
1819 bob_peer,
1820 SessionTarget::TcpStream(target.clone()),
1821 SessionClientConfig {
1822 pseudonym: alice_pseudonym.into(),
1823 capabilities: Capability::NoRateControl | Capability::Segmentation,
1824 surb_management: None,
1825 ..Default::default()
1826 },
1827 ),
1828 new_session_rx_bob.next(),
1829 ),
1830 )
1831 .await?;
1832
1833 let alice_session = alice_session?;
1834 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1835
1836 assert_eq!(
1837 alice_session.config().capabilities,
1838 Capability::Segmentation | Capability::NoRateControl,
1839 );
1840 assert_eq!(
1841 alice_session.config().capabilities,
1842 bob_session.session.config().capabilities
1843 );
1844 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1845
1846 tokio::time::sleep(Duration::from_millis(300)).await;
1848
1849 assert!(matches!(
1850 alice_mgr.ping_session(alice_session.id()).await,
1851 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1852 ));
1853
1854 futures::stream::iter(ahs)
1855 .for_each(|ah| async move { ah.abort() })
1856 .await;
1857
1858 Ok(())
1859 }
1860
1861 #[test_log::test(tokio::test)]
1862 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1863 let alice_pseudonym = HoprPseudonym::random();
1864 let session_id = SessionId::new(16u64, alice_pseudonym);
1865 let balancer_cfg = SurbBalancerConfig {
1866 target_surb_buffer_size: 1000,
1867 max_surbs_per_sec: 100,
1868 ..Default::default()
1869 };
1870
1871 let alice_mgr = SessionManager::<
1872 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1873 futures::channel::mpsc::Sender<IncomingSession>,
1874 >::new(Default::default(), test_tag_allocator());
1875
1876 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1877 alice_mgr
1878 .sessions
1879 .insert(
1880 session_id,
1881 SessionSlot {
1882 session_tx: Arc::new(dummy_tx),
1883 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1884 abort_handles: Default::default(),
1885 surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
1886 surb_estimator: Default::default(),
1887 allocated_tag: None,
1888 },
1889 )
1890 .await;
1891
1892 let actual_cfg = alice_mgr
1893 .get_surb_balancer_config(&session_id)
1894 .await?
1895 .ok_or(anyhow!("session must have a surb balancer config"))?;
1896 assert_eq!(actual_cfg, balancer_cfg);
1897
1898 let new_cfg = SurbBalancerConfig {
1899 target_surb_buffer_size: 2000,
1900 max_surbs_per_sec: 200,
1901 ..Default::default()
1902 };
1903 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1904
1905 let actual_cfg = alice_mgr
1906 .get_surb_balancer_config(&session_id)
1907 .await?
1908 .ok_or(anyhow!("session must have a surb balancer config"))?;
1909 assert_eq!(actual_cfg, new_cfg);
1910
1911 Ok(())
1912 }
1913
1914 #[test_log::test(tokio::test)]
1915 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1916 let alice_pseudonym = HoprPseudonym::random();
1917 let bob_peer: Address = (&ChainKeypair::random()).into();
1918
1919 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1920 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
1921
1922 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1924 bob_mgr
1925 .sessions
1926 .insert(
1927 SessionId::new(16u64, alice_pseudonym),
1928 SessionSlot {
1929 session_tx: Arc::new(dummy_tx),
1930 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1931 abort_handles: Default::default(),
1932 surb_mgmt: Default::default(),
1933 surb_estimator: Default::default(),
1934 allocated_tag: None,
1935 },
1936 )
1937 .await;
1938
1939 let mut sequence = mockall::Sequence::new();
1940 let mut alice_transport = MockMsgSender::new();
1941 let mut bob_transport = MockMsgSender::new();
1942
1943 let bob_mgr_clone = bob_mgr.clone();
1945 alice_transport
1946 .expect_send_message()
1947 .once()
1948 .in_sequence(&mut sequence)
1949 .withf(move |peer, data| {
1950 msg_type(data, StartProtocolDiscriminants::StartSession)
1951 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1952 })
1953 .returning(move |_, data| {
1954 let bob_mgr_clone = bob_mgr_clone.clone();
1955 Box::pin(async move {
1956 bob_mgr_clone
1957 .dispatch_message(
1958 alice_pseudonym,
1959 ApplicationDataIn {
1960 data: data.data,
1961 packet_info: Default::default(),
1962 },
1963 )
1964 .await?;
1965 Ok(())
1966 })
1967 });
1968
1969 let alice_mgr_clone = alice_mgr.clone();
1971 bob_transport
1972 .expect_send_message()
1973 .once()
1974 .in_sequence(&mut sequence)
1975 .withf(move |peer, data| {
1976 msg_type(data, StartProtocolDiscriminants::SessionError)
1977 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1978 })
1979 .returning(move |_, data| {
1980 let alice_mgr_clone = alice_mgr_clone.clone();
1981 Box::pin(async move {
1982 alice_mgr_clone
1983 .dispatch_message(
1984 alice_pseudonym,
1985 ApplicationDataIn {
1986 data: data.data,
1987 packet_info: Default::default(),
1988 },
1989 )
1990 .await?;
1991 Ok(())
1992 })
1993 });
1994
1995 let mut jhs = Vec::new();
1996
1997 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1999 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2000
2001 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2003 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2004
2005 let result = alice_mgr
2006 .new_session(
2007 bob_peer,
2008 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2009 SessionClientConfig {
2010 capabilities: Capabilities::empty(),
2011 pseudonym: alice_pseudonym.into(),
2012 surb_management: None,
2013 ..Default::default()
2014 },
2015 )
2016 .await;
2017
2018 assert!(
2019 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2020 );
2021
2022 Ok(())
2023 }
2024
2025 #[test_log::test(tokio::test)]
2026 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2027 -> anyhow::Result<()> {
2028 let alice_pseudonym = HoprPseudonym::random();
2029 let bob_peer: Address = (&ChainKeypair::random()).into();
2030
2031 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2032 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
2033
2034 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2036 bob_mgr
2037 .sessions
2038 .insert(
2039 SessionId::new(16u64, alice_pseudonym),
2040 SessionSlot {
2041 session_tx: Arc::new(dummy_tx),
2042 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2043 abort_handles: Default::default(),
2044 surb_mgmt: Default::default(),
2045 surb_estimator: Default::default(),
2046 allocated_tag: None,
2047 },
2048 )
2049 .await;
2050
2051 let mut sequence = mockall::Sequence::new();
2052 let mut alice_transport = MockMsgSender::new();
2053 let mut bob_transport = MockMsgSender::new();
2054
2055 let bob_mgr_clone = bob_mgr.clone();
2057 alice_transport
2058 .expect_send_message()
2059 .once()
2060 .in_sequence(&mut sequence)
2061 .withf(move |peer, data| {
2062 msg_type(data, StartProtocolDiscriminants::StartSession)
2063 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2064 })
2065 .returning(move |_, data| {
2066 let bob_mgr_clone = bob_mgr_clone.clone();
2067 Box::pin(async move {
2068 bob_mgr_clone
2069 .dispatch_message(
2070 alice_pseudonym,
2071 ApplicationDataIn {
2072 data: data.data,
2073 packet_info: Default::default(),
2074 },
2075 )
2076 .await?;
2077 Ok(())
2078 })
2079 });
2080
2081 let alice_mgr_clone = alice_mgr.clone();
2083 bob_transport
2084 .expect_send_message()
2085 .once()
2086 .in_sequence(&mut sequence)
2087 .withf(move |peer, data| {
2088 msg_type(data, StartProtocolDiscriminants::SessionError)
2089 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2090 })
2091 .returning(move |_, data| {
2092 let alice_mgr_clone = alice_mgr_clone.clone();
2093 Box::pin(async move {
2094 alice_mgr_clone
2095 .dispatch_message(
2096 alice_pseudonym,
2097 ApplicationDataIn {
2098 data: data.data,
2099 packet_info: Default::default(),
2100 },
2101 )
2102 .await?;
2103 Ok(())
2104 })
2105 });
2106
2107 let mut jhs = Vec::new();
2108
2109 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2111 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2112
2113 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2115 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2116
2117 let result = alice_mgr
2118 .new_session(
2119 bob_peer,
2120 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2121 SessionClientConfig {
2122 capabilities: None.into(),
2123 pseudonym: alice_pseudonym.into(),
2124 surb_management: None,
2125 ..Default::default()
2126 },
2127 )
2128 .await;
2129
2130 assert!(
2131 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2132 );
2133
2134 Ok(())
2135 }
2136
2137 #[test_log::test(tokio::test)]
2138 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2139 let alice_pseudonym = HoprPseudonym::random();
2140 let bob_peer: Address = (&ChainKeypair::random()).into();
2141
2142 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2143
2144 let mut sequence = mockall::Sequence::new();
2145 let mut alice_transport = MockMsgSender::new();
2146
2147 let alice_mgr_clone = alice_mgr.clone();
2149 alice_transport
2150 .expect_send_message()
2151 .once()
2152 .in_sequence(&mut sequence)
2153 .withf(move |peer, data| {
2154 msg_type(data, StartProtocolDiscriminants::StartSession)
2155 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2156 })
2157 .returning(move |_, data| {
2158 let alice_mgr_clone = alice_mgr_clone.clone();
2160 Box::pin(async move {
2161 alice_mgr_clone
2162 .dispatch_message(
2163 alice_pseudonym,
2164 ApplicationDataIn {
2165 data: data.data,
2166 packet_info: Default::default(),
2167 },
2168 )
2169 .await?;
2170 Ok(())
2171 })
2172 });
2173
2174 let alice_mgr_clone = alice_mgr.clone();
2176 alice_transport
2177 .expect_send_message()
2178 .once()
2179 .in_sequence(&mut sequence)
2180 .withf(move |peer, data| {
2181 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2182 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2183 })
2184 .returning(move |_, data| {
2185 let alice_mgr_clone = alice_mgr_clone.clone();
2186
2187 Box::pin(async move {
2188 alice_mgr_clone
2189 .dispatch_message(
2190 alice_pseudonym,
2191 ApplicationDataIn {
2192 data: data.data,
2193 packet_info: Default::default(),
2194 },
2195 )
2196 .await?;
2197 Ok(())
2198 })
2199 });
2200
2201 let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2203 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2204
2205 let alice_session = alice_mgr
2206 .new_session(
2207 bob_peer,
2208 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2209 SessionClientConfig {
2210 capabilities: None.into(),
2211 pseudonym: alice_pseudonym.into(),
2212 surb_management: None,
2213 ..Default::default()
2214 },
2215 )
2216 .await;
2217
2218 println!("{alice_session:?}");
2219 assert!(matches!(
2220 alice_session,
2221 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2222 ));
2223
2224 drop(new_session_rx_alice);
2225 Ok(())
2226 }
2227
2228 #[test_log::test(tokio::test)]
2229 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2230 let bob_peer: Address = (&ChainKeypair::random()).into();
2231
2232 let cfg = SessionManagerConfig {
2233 initiation_timeout_base: Duration::from_millis(100),
2234 ..Default::default()
2235 };
2236
2237 let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
2238 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2239
2240 let mut sequence = mockall::Sequence::new();
2241 let mut alice_transport = MockMsgSender::new();
2242 let bob_transport = MockMsgSender::new();
2243
2244 alice_transport
2246 .expect_send_message()
2247 .once()
2248 .in_sequence(&mut sequence)
2249 .withf(move |peer, data| {
2250 msg_type(data, StartProtocolDiscriminants::StartSession)
2251 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2252 })
2253 .returning(|_, _| Box::pin(async { Ok(()) }));
2254
2255 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2257 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2258
2259 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2261 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2262
2263 let result = alice_mgr
2264 .new_session(
2265 bob_peer,
2266 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2267 SessionClientConfig {
2268 capabilities: None.into(),
2269 pseudonym: None,
2270 surb_management: None,
2271 ..Default::default()
2272 },
2273 )
2274 .await;
2275
2276 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2277
2278 Ok(())
2279 }
2280
2281 #[cfg(feature = "telemetry")]
2282 #[test_log::test(tokio::test)]
2283 async fn failed_incoming_session_establishment_does_not_register_telemetry() -> anyhow::Result<()> {
2284 let mgr = SessionManager::new(Default::default(), test_tag_allocator());
2285
2286 let transport = MockMsgSender::new();
2287 let (new_session_tx, new_session_rx) = futures::channel::mpsc::channel(1);
2288 drop(new_session_rx);
2289 mgr.start(mock_packet_planning(transport), new_session_tx)?;
2290
2291 let pseudonym = HoprPseudonym::random();
2292 let result = mgr
2293 .handle_incoming_session_initiation(
2294 pseudonym,
2295 StartInitiation {
2296 challenge: MIN_CHALLENGE,
2297 target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2298 capabilities: ByteCapabilities(Capabilities::empty()),
2299 additional_data: 0,
2300 },
2301 )
2302 .await;
2303
2304 assert!(result.is_err());
2305
2306 let allocated_session_ids = mgr.active_sessions().await;
2307 assert_eq!(1, allocated_session_ids.len());
2308
2309 Ok(())
2310 }
2311
2312 #[test_log::test(tokio::test)]
2313 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2314 let alice_pseudonym = HoprPseudonym::random();
2315 let bob_peer: Address = (&ChainKeypair::random()).into();
2316
2317 let bob_cfg = SessionManagerConfig {
2318 surb_balance_notify_period: Some(Duration::from_millis(500)),
2319 ..Default::default()
2320 };
2321 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2322 let bob_mgr = SessionManager::new(bob_cfg.clone(), test_tag_allocator());
2323
2324 let mut alice_transport = MockMsgSender::new();
2325 let mut bob_transport = MockMsgSender::new();
2326
2327 let mut open_sequence = mockall::Sequence::new();
2329 let bob_mgr_clone = bob_mgr.clone();
2330 alice_transport
2331 .expect_send_message()
2332 .once()
2333 .in_sequence(&mut open_sequence)
2334 .withf(move |peer, data| {
2335 msg_type(data, StartProtocolDiscriminants::StartSession)
2336 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2337 })
2338 .returning(move |_, data| {
2339 let bob_mgr_clone = bob_mgr_clone.clone();
2340 Box::pin(async move {
2341 bob_mgr_clone
2342 .dispatch_message(
2343 alice_pseudonym,
2344 ApplicationDataIn {
2345 data: data.data,
2346 packet_info: Default::default(),
2347 },
2348 )
2349 .await?;
2350 Ok(())
2351 })
2352 });
2353
2354 let alice_mgr_clone = alice_mgr.clone();
2356 bob_transport
2357 .expect_send_message()
2358 .once()
2359 .in_sequence(&mut open_sequence)
2360 .withf(move |peer, data| {
2361 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2362 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2363 })
2364 .returning(move |_, data| {
2365 let alice_mgr_clone = alice_mgr_clone.clone();
2366 Box::pin(async move {
2367 alice_mgr_clone
2368 .dispatch_message(
2369 alice_pseudonym,
2370 ApplicationDataIn {
2371 data: data.data,
2372 packet_info: Default::default(),
2373 },
2374 )
2375 .await?;
2376 Ok(())
2377 })
2378 });
2379
2380 const INITIAL_BALANCER_TARGET: u64 = 10;
2381
2382 let bob_mgr_clone = bob_mgr.clone();
2384 alice_transport
2385 .expect_send_message()
2386 .times(5..)
2387 .withf(move |peer, data| {
2389 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2390 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2392 })
2393 .returning(move |_, data| {
2394 let bob_mgr_clone = bob_mgr_clone.clone();
2395 Box::pin(async move {
2396 bob_mgr_clone
2397 .dispatch_message(
2398 alice_pseudonym,
2399 ApplicationDataIn {
2400 data: data.data,
2401 packet_info: Default::default(),
2402 },
2403 )
2404 .await?;
2405 Ok(())
2406 })
2407 });
2408
2409 const NEXT_BALANCER_TARGET: u64 = 50;
2410
2411 let bob_mgr_clone = bob_mgr.clone();
2413 alice_transport
2414 .expect_send_message()
2415 .times(5..)
2416 .withf(move |peer, data| {
2418 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2419 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2420 })
2421 .returning(move |_, data| {
2422 let bob_mgr_clone = bob_mgr_clone.clone();
2423 Box::pin(async move {
2424 bob_mgr_clone
2425 .dispatch_message(
2426 alice_pseudonym,
2427 ApplicationDataIn {
2428 data: data.data,
2429 packet_info: Default::default(),
2430 },
2431 )
2432 .await?;
2433 Ok(())
2434 })
2435 });
2436
2437 let alice_mgr_clone = alice_mgr.clone();
2439 bob_transport
2440 .expect_send_message()
2441 .times(1..)
2442 .withf(move |peer, data| {
2444 start_msg_match(&data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2445 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2446 })
2447 .returning(move |_, data| {
2448 let alice_mgr_clone = alice_mgr_clone.clone();
2449 Box::pin(async move {
2450 alice_mgr_clone
2451 .dispatch_message(
2452 alice_pseudonym,
2453 ApplicationDataIn {
2454 data: data.data,
2455 packet_info: Default::default(),
2456 },
2457 )
2458 .await?;
2459 Ok(())
2460 })
2461 });
2462
2463 let bob_mgr_clone = bob_mgr.clone();
2465 alice_transport
2466 .expect_send_message()
2467 .once()
2468 .withf(move |peer, data| {
2470 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2471 data.data.plain_text.as_ref(),
2472 )
2473 .ok()
2474 .and_then(|m| m.try_as_segment())
2475 .map(|s| s.is_terminating())
2476 .unwrap_or(false)
2477 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2478 })
2479 .returning(move |_, data| {
2480 let bob_mgr_clone = bob_mgr_clone.clone();
2481 Box::pin(async move {
2482 bob_mgr_clone
2483 .dispatch_message(
2484 alice_pseudonym,
2485 ApplicationDataIn {
2486 data: data.data,
2487 packet_info: Default::default(),
2488 },
2489 )
2490 .await?;
2491 Ok(())
2492 })
2493 });
2494
2495 let mut ahs = Vec::new();
2496
2497 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2499 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2500
2501 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2503 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2504
2505 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2506
2507 let balancer_cfg = SurbBalancerConfig {
2508 target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2509 max_surbs_per_sec: 100,
2510 ..Default::default()
2511 };
2512
2513 pin_mut!(new_session_rx_bob);
2514 let (alice_session, bob_session) = timeout(
2515 Duration::from_secs(2),
2516 futures::future::join(
2517 alice_mgr.new_session(
2518 bob_peer,
2519 SessionTarget::TcpStream(target.clone()),
2520 SessionClientConfig {
2521 pseudonym: alice_pseudonym.into(),
2522 capabilities: Capability::Segmentation.into(),
2523 surb_management: Some(balancer_cfg),
2524 ..Default::default()
2525 },
2526 ),
2527 new_session_rx_bob.next(),
2528 ),
2529 )
2530 .await?;
2531
2532 let mut alice_session = alice_session?;
2533 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2534
2535 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2536
2537 assert_eq!(
2538 Some(balancer_cfg),
2539 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2540 );
2541
2542 let remote_cfg = bob_mgr
2543 .get_surb_balancer_config(bob_session.session.id())
2544 .await?
2545 .ok_or(anyhow!("no remote config at bob"))?;
2546 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2547 assert_eq!(
2548 remote_cfg.max_surbs_per_sec,
2549 remote_cfg.target_surb_buffer_size
2550 / bob_cfg
2551 .minimum_surb_buffer_duration
2552 .max(MIN_SURB_BUFFER_DURATION)
2553 .as_secs()
2554 );
2555
2556 tokio::time::sleep(Duration::from_millis(1500)).await;
2558
2559 let new_balancer_cfg = SurbBalancerConfig {
2560 target_surb_buffer_size: NEXT_BALANCER_TARGET,
2561 max_surbs_per_sec: 100,
2562 ..Default::default()
2563 };
2564
2565 alice_mgr
2567 .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2568 .await?;
2569
2570 tokio::time::sleep(Duration::from_millis(1500)).await;
2572
2573 let remote_cfg = bob_mgr
2575 .get_surb_balancer_config(bob_session.session.id())
2576 .await?
2577 .ok_or(anyhow!("no remote config at bob"))?;
2578 assert_eq!(
2579 remote_cfg.target_surb_buffer_size,
2580 new_balancer_cfg.target_surb_buffer_size
2581 );
2582 assert_eq!(
2583 remote_cfg.max_surbs_per_sec,
2584 new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2585 );
2586
2587 let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2588 let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2589
2590 alice_session.close().await?;
2591
2592 assert!(alice_surb_sent > 0, "alice must've sent surbs");
2593 assert!(bob_surb_recv > 0, "bob must've received surbs");
2594 assert!(
2595 bob_surb_recv <= alice_surb_sent,
2596 "bob cannot receive more surbs than alice sent"
2597 );
2598
2599 assert!(alice_surb_used > 0, "alice must see bob used surbs");
2600 assert!(bob_surb_used > 0, "bob must've used surbs");
2601 assert!(
2602 alice_surb_used <= bob_surb_used,
2603 "alice cannot see bob used more surbs than bob actually used"
2604 );
2605
2606 tokio::time::sleep(Duration::from_millis(300)).await;
2607 assert!(matches!(
2608 alice_mgr.ping_session(alice_session.id()).await,
2609 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2610 ));
2611
2612 futures::stream::iter(ahs)
2613 .for_each(|ah| async move { ah.abort() })
2614 .await;
2615
2616 Ok(())
2617 }
2618}