1use std::{
2 ops::Range,
3 pin::Pin,
4 sync::{Arc, OnceLock},
5 time::Duration,
6};
7
8use anyhow::anyhow;
9use futures::{
10 FutureExt, Sink, SinkExt, StreamExt, TryStreamExt,
11 channel::mpsc::{Sender, UnboundedSender},
12 future::AbortHandle,
13 pin_mut,
14};
15use futures_time::future::FutureExt as TimeExt;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_start::{
19 KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
20 StartInitiation,
21};
22use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
23use hopr_api::types::{
24 crypto_random::Randomizable,
25 internal::{
26 prelude::HoprPseudonym,
27 routing::{DestinationRouting, RoutingOptions},
28 },
29 primitive::prelude::Address,
30};
31use hopr_utils::runtime::AbortableList;
32use tracing::{debug, error, info, trace, warn};
33
34#[cfg(feature = "telemetry")]
35use crate::telemetry::{
36 SessionLifecycleState, initialize_session_metrics, remove_session_metrics_state, set_session_balancer_data,
37 set_session_state,
38};
39use crate::{
40 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
41 SurbBalancerConfig,
42 balancer::{
43 AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
44 SurbControllerWithCorrection,
45 pid::{PidBalancerController, PidControllerGains},
46 simple::SimpleBalancerController,
47 },
48 errors::{SessionManagerError, TransportSessionError},
49 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
50 utils,
51 utils::{SurbNotificationMode, insert_into_next_slot},
52};
53
// Session manager metrics. They are only compiled in when the `telemetry` feature is enabled
// and the crate is not being built for tests.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Gauge: incremented when a session is registered, decremented in `close_session`,
    // and reset to zero when a new `SessionManager` is constructed.
    static ref METRIC_ACTIVE_SESSIONS: hopr_api::types::telemetry::SimpleGauge = hopr_api::types::telemetry::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    // Counter: incremented once an incoming session has been fully established.
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_api::types::telemetry::SimpleCounter = hopr_api::types::telemetry::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    // Counter: incremented when an outgoing session slot has been inserted into the session cache.
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_api::types::telemetry::SimpleCounter = hopr_api::types::telemetry::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    // Multi-counter labelled by error `kind`; the visible code uses the `timeout` label
    // when a session initiation attempt times out.
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_api::types::telemetry::MultiCounter = hopr_api::types::telemetry::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    // Multi-counter labelled by error `kind` for errors reported back to the session initiator.
    static ref METRIC_SENT_SESSION_ERRS: hopr_api::types::telemetry::MultiCounter = hopr_api::types::telemetry::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
79
80fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
81 debug!(?session_id, ?reason, "closing session");
82
83 #[cfg(feature = "telemetry")]
84 {
85 set_session_state(&session_id, SessionLifecycleState::Closed);
86 remove_session_metrics_state(&session_id);
87 }
88
89 if reason != ClosureReason::EmptyRead {
90 session_data.session_tx.close_channel();
92 trace!(?session_id, "data tx channel closed on session");
93 }
94
95 session_data.abort_handles.lock().abort_all();
97
98 #[cfg(all(feature = "telemetry", not(test)))]
99 METRIC_ACTIVE_SESSIONS.decrement(1.0);
100}
101
/// Computes the maximum one-way session initiation timeout for a path with `hops` hops.
///
/// The result is `base` multiplied by `hops`. The computation never panics: a hop count
/// that does not fit into `u32` is clamped to `u32::MAX` (instead of being silently
/// truncated), and the multiplication saturates at [`Duration::MAX`] on overflow.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
105
/// Lower bound applied to [`SessionManagerConfig::minimum_surb_buffer_duration`].
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
/// Lower bound applied to [`SessionManagerConfig::surb_balance_notify_period`] when it is set.
pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);

/// Smallest challenge value generated for an outgoing session initiation.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time an outgoing session waits for its SURB level to reach half of the target
/// buffer size before it is handed out anyway.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Lower bound applied to [`SessionManagerConfig::max_frame_timeout`].
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
119
/// Cache of pending outgoing session initiations, keyed by the challenge sent to the counterparty.
///
/// Each value is the sending half of the channel on which `new_session` awaits either the
/// establishment confirmation or an error for that challenge.
// NOTE(review): the sender is presumably resolved by the Start protocol message handler,
// which is not visible in this part of the file - confirm.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
125
/// Keys identifying the background tasks that can be attached to a session slot.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
enum SessionTasks {
    /// The keep-alive stream of the session.
    KeepAlive,
    /// The control loop of the session's SURB balancer.
    Balancer,
}
131
/// Per-session state kept in the session cache of the [`SessionManager`].
#[derive(Clone)]
struct SessionSlot {
    /// Sender used to hand incoming application data over to the session.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    /// Routing used for messages sent to the counterparty of this session.
    routing_opts: DestinationRouting,
    /// Abort handles of the background tasks attached to this session; all are aborted on closure.
    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
    /// Shared state of the SURB balancer configuration (reports itself as disabled when unused).
    surb_mgmt: Arc<BalancerStateValues>,
    /// Counters of SURBs produced and consumed within this session.
    surb_estimator: AtomicSurbFlowEstimator,
    /// Tag allocated for an incoming session; `None` for outgoing sessions.
    #[allow(dead_code, reason = "kept alive for Drop-based tag deallocation")]
    allocated_tag: Option<Arc<AllocatedTag>>,
}
151
/// Outcome of [`SessionManager::dispatch_message`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// The message was a Start protocol message or session data and has been consumed.
    Processed,
    /// The message belongs neither to the Start protocol nor to the session tag range;
    /// it is handed back to the caller unchanged.
    Unrelated(ApplicationDataIn),
}
160
/// Configuration of the [`SessionManager`].
///
/// Several values are sanitized by [`SessionManager::new`]; see the individual fields.
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// Frame MTU passed to every created session.
    ///
    /// Raised to at least `SESSION_MTU` on construction of the manager. Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// Frame timeout passed to every created session.
    ///
    /// Raised to at least `MIN_FRAME_TIMEOUT` on construction of the manager. Default is 800 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// Base of the session initiation timeout; it is multiplied by the number of hops
    /// to obtain the actual timeout. Default is 500 ms.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Time-to-idle of entries in the session cache; idle sessions are evicted and closed.
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// Sampling interval of the SURB balancer control loop. Default is 100 ms.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial egress rate (per second) of the rate controller of incoming sessions.
    ///
    /// Together with `minimum_surb_buffer_duration` it also determines the default
    /// target SURB buffer size. Default is 10.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,

    /// Duration multiplied with `initial_return_session_egress_rate` to derive the default
    /// target SURB buffer size.
    ///
    /// Raised to at least [`MIN_SURB_BUFFER_DURATION`] on construction of the manager.
    /// Default is 5 seconds.
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,

    /// Upper bound on the target SURB buffer size requested by the initiator of an
    /// incoming session. Default is 10 000.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// If set, incoming sessions with rate control start a keep-alive stream that reports
    /// the SURB level with this period.
    ///
    /// Raised to at least [`MIN_SURB_BUFFER_NOTIFICATION_PERIOD`] on construction of the
    /// manager. Default is `None`.
    #[default(None)]
    pub surb_balance_notify_period: Option<Duration>,

    /// If `true`, keep-alive messages of outgoing sessions with SURB management use
    /// `SurbNotificationMode::Target`, otherwise `SurbNotificationMode::DoNotNotify`.
    /// Default is `true`.
    #[default(true)]
    pub surb_target_notify: bool,
}
248
/// Type-erased, pinned sink that receives newly established incoming sessions.
type IncomingSessionSink = Pin<Box<dyn Sink<IncomingSession, Error = SessionManagerError> + Send>>;
252
/// Notification channels set up by `SessionManager::start`:
/// the shared sink for new incoming sessions and the sender of session closure notifications.
type SessionNotifiers = (
    Arc<hopr_utils::runtime::prelude::Mutex<IncomingSessionSink>>,
    Sender<(SessionId, ClosureReason)>,
);
257
/// Manages the lifecycle of HOPR sessions: initiation of outgoing sessions, registration of
/// incoming sessions, dispatching of incoming data to sessions and session closure.
///
/// `S` is the sink used to send outgoing messages; it is supplied via `start`.
pub struct SessionManager<S> {
    /// Pending outgoing session initiations keyed by challenge.
    session_initiations: SessionInitiationCache,
    /// Notification channels; set exactly once when the manager is started.
    session_notifiers: Arc<OnceLock<SessionNotifiers>>,
    /// Active sessions; entries expire after the configured idle timeout.
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    /// Sink for outgoing messages; set exactly once when the manager is started.
    msg_sender: Arc<OnceLock<S>>,
    /// Allocator of application tags for incoming sessions.
    tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
    /// Range of application tags treated as session data in `dispatch_message`.
    session_tag_range: Range<u64>,
    /// Maximum number of concurrently tracked sessions (at least 1).
    maximum_sessions: usize,
    /// Sanitized configuration of the manager.
    cfg: SessionManagerConfig,
}
410
411impl<S> Clone for SessionManager<S> {
412 fn clone(&self) -> Self {
413 Self {
414 session_initiations: self.session_initiations.clone(),
415 session_notifiers: self.session_notifiers.clone(),
416 sessions: self.sessions.clone(),
417 cfg: self.cfg.clone(),
418 msg_sender: self.msg_sender.clone(),
419 tag_allocator: self.tag_allocator.clone(),
420 session_tag_range: self.session_tag_range.clone(),
421 maximum_sessions: self.maximum_sessions,
422 }
423 }
424}
425
/// Timeout applied when sending a message via the outgoing message sink or when notifying
/// about a new incoming session.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
427
428impl<S> SessionManager<S>
429where
430 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
431 S::Error: std::error::Error + Send + Sync + Clone + 'static,
432{
433 pub fn new(mut cfg: SessionManagerConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
436 let session_tag_range = tag_allocator.tag_range();
437 let maximum_sessions = tag_allocator.capacity().max(1) as usize;
438
439 debug_assert!(
440 session_tag_range.start >= ReservedTag::range().end,
441 "session tag range must not overlap with reserved tags"
442 );
443 cfg.surb_balance_notify_period = cfg
444 .surb_balance_notify_period
445 .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
446 cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);
447
448 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
450 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
451
452 #[cfg(all(feature = "telemetry", not(test)))]
453 METRIC_ACTIVE_SESSIONS.set(0.0);
454
455 let msg_sender = Arc::new(OnceLock::new());
456 Self {
457 msg_sender: msg_sender.clone(),
458 session_initiations: moka::future::Cache::builder()
459 .max_capacity(maximum_sessions as u64)
460 .time_to_live(
461 2 * initiation_timeout_max_one_way(
462 cfg.initiation_timeout_base,
463 RoutingOptions::MAX_INTERMEDIATE_HOPS,
464 ),
465 )
466 .build(),
467 sessions: moka::future::Cache::builder()
468 .max_capacity(maximum_sessions as u64)
469 .time_to_idle(cfg.idle_timeout)
470 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
471 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
472 trace!(?session_id, ?reason, "session evicted from the cache");
473 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
474 }
475 _ => {}
476 })
477 .build(),
478 session_notifiers: Arc::new(OnceLock::new()),
479 tag_allocator,
480 session_tag_range,
481 maximum_sessions,
482 cfg,
483 }
484 }
485
486 pub fn start<T>(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>>
492 where
493 T: futures::Sink<IncomingSession> + Send + 'static,
494 T::Error: std::error::Error + Send + Sync + 'static,
495 {
496 self.msg_sender
497 .set(msg_sender)
498 .map_err(|_| SessionManagerError::AlreadyStarted)?;
499
500 let new_session_notifier: IncomingSessionSink =
505 Box::pin(new_session_notifier.sink_map_err(SessionManagerError::other));
506 let new_session_notifier = Arc::new(hopr_utils::runtime::prelude::Mutex::new(new_session_notifier));
507
508 let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.maximum_sessions + 10);
509 self.session_notifiers
510 .set((new_session_notifier, session_close_tx))
511 .map_err(|_| SessionManagerError::AlreadyStarted)?;
512
513 let myself = self.clone();
514 let closure_diag = hopr_utils::runtime::diagnostics::ConcurrentDiagnostics::new(
515 "session_close_for_each_concurrent",
516 module_path!(),
517 file!(),
518 line!(),
519 );
520 let ah_closure_notifications = hopr_utils::spawn_as_abortable_named!(
521 "session_close_notifications",
522 session_close_rx.for_each_concurrent(None, move |(session_id, closure_reason)| {
523 let myself = myself.clone();
524 let closure_diag = closure_diag.clone();
525 closure_diag.wrap(async move {
526 if let Some(session_data) = myself.sessions.remove(&session_id).await {
530 close_session(session_id, session_data, closure_reason);
531 } else {
532 debug!(
534 ?session_id,
535 ?closure_reason,
536 "could not find session id to close, maybe the session is already closed"
537 );
538 }
539 })
540 },)
541 );
542
543 let myself = self.clone();
548 let ah_session_expiration = hopr_utils::spawn_as_abortable!(async move {
549 let jitter = hopr_api::types::crypto_random::random_float_in_range(1.0..1.5);
550 let timeout = 2 * initiation_timeout_max_one_way(
551 myself.cfg.initiation_timeout_base,
552 RoutingOptions::MAX_INTERMEDIATE_HOPS,
553 )
554 .min(myself.cfg.idle_timeout)
555 .mul_f64(jitter)
556 / 2;
557 futures_time::stream::interval(timeout.into())
558 .for_each(|_| {
559 trace!("executing session cache evictions");
560 futures::future::join(
561 myself.sessions.run_pending_tasks(),
562 myself.session_initiations.run_pending_tasks(),
563 )
564 .map(|_| ())
565 })
566 .await;
567 });
568
569 Ok(vec![ah_closure_notifications, ah_session_expiration])
570 }
571
572 pub fn is_started(&self) -> bool {
574 self.session_notifiers.get().is_some()
575 }
576
577 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
578 if let moka::ops::compute::CompResult::Inserted(_) = self
580 .sessions
581 .entry(session_id)
582 .and_compute_with(|entry| {
583 futures::future::ready(if entry.is_none() {
584 moka::ops::compute::Op::Put(slot)
585 } else {
586 moka::ops::compute::Op::Nop
587 })
588 })
589 .await
590 {
591 #[cfg(all(feature = "telemetry", not(test)))]
592 {
593 METRIC_NUM_INITIATED_SESSIONS.increment();
594 METRIC_ACTIVE_SESSIONS.increment(1.0);
595 }
596
597 Ok(())
598 } else {
599 error!(%session_id, "session already exists - loopback attempt");
601 Err(SessionManagerError::Loopback.into())
602 }
603 }
604
605 pub async fn new_session(
613 &self,
614 destination: Address,
615 target: SessionTarget,
616 cfg: SessionClientConfig,
617 ) -> crate::errors::Result<HoprSession> {
618 self.sessions.run_pending_tasks().await;
619 if self.maximum_sessions <= self.sessions.entry_count() as usize {
620 return Err(SessionManagerError::TooManySessions.into());
621 }
622
623 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
624
625 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
626 let (challenge, _) = insert_into_next_slot(
627 &self.session_initiations,
628 |ch| {
629 if let Some(challenge) = ch {
630 ((challenge + 1) % hopr_api::types::crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
631 } else {
632 hopr_api::types::crypto_random::random_integer(MIN_CHALLENGE, None)
633 }
634 },
635 |_| tx_initiation_done,
636 )
637 .await
638 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
642 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
643 challenge,
644 target,
645 capabilities: ByteCapabilities(cfg.capabilities),
646 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
647 cfg.surb_management
648 .map(|c| c.target_surb_buffer_size)
649 .unwrap_or(
650 self.cfg.initial_return_session_egress_rate as u64
651 * self
652 .cfg
653 .minimum_surb_buffer_duration
654 .max(MIN_SURB_BUFFER_DURATION)
655 .as_secs(),
656 )
657 .min(u32::MAX as u64) as u32
658 } else {
659 0
660 },
661 });
662
663 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
664 let forward_routing = DestinationRouting::Forward {
665 destination: Box::new(destination.into()),
666 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
668 return_options: cfg.return_path_options.clone().into(),
669 };
670
671 info!(challenge, %pseudonym, %destination, "new session request");
673 msg_sender
674 .send((
675 forward_routing.clone(),
676 ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
677 ))
678 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
679 .await
680 .map_err(|_| {
681 error!(challenge, %pseudonym, %destination, "timeout sending session request message");
682 TransportSessionError::Timeout
683 })?
684 .map_err(TransportSessionError::packet_sending)?;
685
686 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
688 self.cfg.initiation_timeout_base,
689 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
690 )
691 .into();
692
693 pin_mut!(rx_initiation_done);
695
696 trace!(challenge, "awaiting session establishment");
697 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
698 Ok(Ok(Some(est))) => {
699 let session_id = est.session_id;
701 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
702
703 let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
704 let notifier = self
705 .session_notifiers
706 .get()
707 .map(|(_, notifier)| {
708 let mut notifier = notifier.clone();
709 Box::new(move |session_id: SessionId, reason: ClosureReason| {
710 let _ = notifier
711 .try_send((session_id, reason))
712 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
713 })
714 })
715 .ok_or(SessionManagerError::NotStarted)?;
716
717 if let Some(balancer_config) = cfg.surb_management {
721 let surb_estimator = AtomicSurbFlowEstimator::default();
722
723 let surb_estimator_clone = surb_estimator.clone();
725 let full_surb_scoring_sender =
726 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
727 let produced = data.estimate_surbs_with_msg() as u64;
728 surb_estimator_clone
730 .produced
731 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
732 #[cfg(feature = "telemetry")]
733 crate::telemetry::record_session_surb_produced(&session_id, produced);
734 futures::future::ok::<_, S::Error>((routing, data))
735 });
736
737 let max_out_organic_surbs = cfg.always_max_out_surbs;
740 let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
741 move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
745 if !max_out_organic_surbs {
746 data.packet_info
748 .get_or_insert_with(|| OutgoingPacketInfo {
749 max_surbs_in_packet: 1,
750 ..Default::default()
751 })
752 .max_surbs_in_packet = 1;
753 }
754 futures::future::ok::<_, S::Error>((routing, data))
755 },
756 );
757
758 let mut abort_handles = AbortableList::default();
759 let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
760
761 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
763 session_id,
764 full_surb_scoring_sender,
765 forward_routing.clone(),
766 if self.cfg.surb_target_notify {
767 SurbNotificationMode::Target
768 } else {
769 SurbNotificationMode::DoNotNotify
770 },
771 surb_mgmt.clone(),
772 );
773 abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
774
775 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
777 let balancer = SurbBalancer::new(
778 session_id,
779 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
781 surb_estimator.clone(),
782 SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
785 surb_mgmt.clone(),
786 );
787
788 let (level_stream, balancer_abort_handle) =
789 balancer.start_control_loop(self.cfg.balancer_sampling_interval);
790 abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
791
792 self.insert_session_slot(
794 session_id,
795 SessionSlot {
796 session_tx: Arc::new(tx),
797 routing_opts: forward_routing.clone(),
798 abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
799 surb_mgmt: surb_mgmt.clone(),
800 surb_estimator: surb_estimator.clone(),
801 allocated_tag: None,
802 },
803 )
804 .await?;
805
806 match level_stream
809 .skip_while(|current_level| {
810 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
811 })
812 .next()
813 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
814 .await
815 {
816 Ok(Some(surb_level)) => {
817 info!(%session_id, surb_level, "session is ready");
818 }
819 Ok(None) => {
820 return Err(
821 SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
822 );
823 }
824 Err(_) => {
825 warn!(%session_id, "session didn't reach target SURB buffer size in time");
826 }
827 }
828
829 let surb_estimator_for_rx = surb_estimator.clone();
830 let session = HoprSession::new(
831 session_id,
832 forward_routing,
833 HoprSessionConfig {
834 capabilities: cfg.capabilities,
835 frame_mtu: self.cfg.frame_mtu,
836 frame_timeout: self.cfg.max_frame_timeout,
837 },
838 (
839 reduced_surb_scoring_sender,
840 rx.inspect(move |_| {
841 surb_estimator_for_rx
844 .consumed
845 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
846 #[cfg(feature = "telemetry")]
847 crate::telemetry::record_session_surb_consumed(&session_id, 1);
848 }),
849 ),
850 Some(notifier),
851 )?;
852
853 #[cfg(feature = "telemetry")]
854 {
855 initialize_session_metrics(
856 session_id,
857 HoprSessionConfig {
858 capabilities: cfg.capabilities,
859 frame_mtu: self.cfg.frame_mtu,
860 frame_timeout: self.cfg.max_frame_timeout,
861 },
862 );
863 set_session_state(&session_id, SessionLifecycleState::Active);
864 set_session_balancer_data(&session_id, surb_estimator.clone(), surb_mgmt.clone());
865 }
866
867 Ok(session)
868 } else {
869 warn!(%session_id, "session ready without SURB balancing");
870
871 self.insert_session_slot(
872 session_id,
873 SessionSlot {
874 session_tx: Arc::new(tx),
875 routing_opts: forward_routing.clone(),
876 abort_handles: Default::default(),
877 surb_mgmt: Default::default(), surb_estimator: Default::default(), allocated_tag: None,
880 },
881 )
882 .await?;
883
884 let max_out_organic_surbs = cfg.always_max_out_surbs;
887 let reduced_surb_sender =
888 msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
889 if !max_out_organic_surbs {
890 data.packet_info
891 .get_or_insert_with(|| OutgoingPacketInfo {
892 max_surbs_in_packet: 1,
893 ..Default::default()
894 })
895 .max_surbs_in_packet = 1;
896 }
897 futures::future::ok::<_, S::Error>((routing, data))
898 });
899
900 let session = HoprSession::new(
901 session_id,
902 forward_routing,
903 HoprSessionConfig {
904 capabilities: cfg.capabilities,
905 frame_mtu: self.cfg.frame_mtu,
906 frame_timeout: self.cfg.max_frame_timeout,
907 },
908 (reduced_surb_sender, rx),
909 Some(notifier),
910 )?;
911
912 #[cfg(feature = "telemetry")]
913 {
914 initialize_session_metrics(
915 session_id,
916 HoprSessionConfig {
917 capabilities: cfg.capabilities,
918 frame_mtu: self.cfg.frame_mtu,
919 frame_timeout: self.cfg.max_frame_timeout,
920 },
921 );
922 set_session_state(&session_id, SessionLifecycleState::Active);
923 }
924
925 Ok(session)
926 }
927 }
928 Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
929 "internal error: sender has been closed without completing the session establishment"
930 ))
931 .into()),
932 Ok(Err(error)) => {
933 error!(
935 challenge = error.challenge,
936 ?error,
937 "the other party rejected the session initiation with error"
938 );
939 Err(TransportSessionError::Rejected(error.reason))
940 }
941 Err(_) => {
942 error!(challenge, "session initiation attempt timed out");
944
945 #[cfg(all(feature = "telemetry", not(test)))]
946 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
947
948 Err(TransportSessionError::Timeout)
949 }
950 }
951 }
952
953 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
957 if let Some(session_data) = self.sessions.get(id).await {
958 trace!(session_id = ?id, "pinging manually session");
959 Ok(self
960 .msg_sender
961 .get()
962 .cloned()
963 .ok_or(SessionManagerError::NotStarted)?
964 .send((
965 session_data.routing_opts.clone(),
966 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
967 ))
968 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
969 .await
970 .map_err(|_| {
971 error!("timeout sending session ping message");
972 TransportSessionError::Timeout
973 })?
974 .map_err(TransportSessionError::packet_sending)?)
975 } else {
976 Err(SessionManagerError::NonExistingSession.into())
977 }
978 }
979
980 pub async fn active_sessions(&self) -> Vec<SessionId> {
982 self.sessions.run_pending_tasks().await;
983 self.sessions.iter().map(|(k, _)| *k).collect()
984 }
985
986 pub async fn close_session(&self, id: &SessionId) -> bool {
996 if let Some(slot) = self.sessions.remove(id).await {
997 close_session(*id, slot, ClosureReason::Eviction);
998 true
999 } else {
1000 false
1001 }
1002 }
1003
1004 pub async fn update_surb_balancer_config(
1009 &self,
1010 id: &SessionId,
1011 config: SurbBalancerConfig,
1012 ) -> crate::errors::Result<()> {
1013 let cfg = self
1014 .sessions
1015 .get(id)
1016 .await
1017 .ok_or(SessionManagerError::NonExistingSession)?
1018 .surb_mgmt;
1019
1020 if !cfg.is_disabled() {
1022 cfg.update(&config);
1023 Ok(())
1024 } else {
1025 Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
1026 }
1027 }
1028
1029 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
1033 match self.sessions.get(id).await {
1034 Some(session) => Ok(Some(session.surb_mgmt.as_ref())
1035 .filter(|c| !c.is_disabled())
1036 .map(|d| d.as_config())),
1037 None => Err(SessionManagerError::NonExistingSession.into()),
1038 }
1039 }
1040
1041 pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1048 match self.sessions.get(id).await {
1049 Some(session) => Ok((
1050 session
1051 .surb_estimator
1052 .produced
1053 .load(std::sync::atomic::Ordering::Relaxed),
1054 session
1055 .surb_estimator
1056 .consumed
1057 .load(std::sync::atomic::Ordering::Relaxed),
1058 )),
1059 None => Err(SessionManagerError::NonExistingSession.into()),
1060 }
1061 }
1062
1063 pub async fn dispatch_message(
1070 &self,
1071 pseudonym: HoprPseudonym,
1072 in_data: ApplicationDataIn,
1073 ) -> crate::errors::Result<DispatchResult> {
1074 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1075 trace!("dispatching Start protocol message");
1077 return self
1078 .handle_start_protocol_message(pseudonym, in_data)
1079 .await
1080 .map(|_| DispatchResult::Processed);
1081 } else if self.session_tag_range.contains(&in_data.data.application_tag.as_u64()) {
1082 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1083
1084 return if let Some(session_slot) = self.sessions.get(&session_id).await {
1085 trace!(?session_id, "received data for a registered session");
1086
1087 Ok(session_slot
1088 .session_tx
1089 .unbounded_send(in_data)
1090 .map(|_| DispatchResult::Processed)
1091 .map_err(SessionManagerError::other)?)
1092 } else {
1093 error!(%session_id, "received data from an unestablished session");
1094 Err(TransportSessionError::UnknownData)
1095 };
1096 }
1097
1098 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1099 Ok(DispatchResult::Unrelated(in_data))
1100 }
1101
1102 async fn handle_incoming_session_initiation(
1103 &self,
1104 pseudonym: HoprPseudonym,
1105 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1106 ) -> crate::errors::Result<()> {
1107 trace!(challenge = session_req.challenge, "received session initiation request");
1108
1109 debug!(%pseudonym, "got new session request, searching for a free session slot");
1110
1111 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1112
1113 let (new_session_notifier, mut close_session_notifier) = self
1114 .session_notifiers
1115 .get()
1116 .cloned()
1117 .ok_or(SessionManagerError::NotStarted)?;
1118
1119 let reply_routing = DestinationRouting::Return(pseudonym.into());
1121
1122 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1123
1124 self.sessions.run_pending_tasks().await; let allocated_tag = if self.maximum_sessions > self.sessions.entry_count() as usize {
1127 self.tag_allocator.allocate()
1128 } else {
1129 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1130 None
1131 };
1132
1133 if let Some(allocated_tag) = allocated_tag {
1134 let session_id = SessionId::new(allocated_tag.value(), pseudonym);
1135 let allocated_tag = Arc::new(allocated_tag);
1136
1137 let slot = SessionSlot {
1138 session_tx: Arc::new(tx_session_data),
1139 routing_opts: reply_routing.clone(),
1140 abort_handles: Default::default(),
1141 surb_mgmt: Default::default(),
1142 surb_estimator: Default::default(),
1143 allocated_tag: Some(allocated_tag),
1144 };
1145 self.sessions.insert(session_id, slot.clone()).await;
1146
1147 debug!(%session_id, ?session_req, "assigned a new session");
1148
1149 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1150 if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1151 error!(%session_id, %error, %reason, "failed to notify session closure");
1152 }
1153 });
1154
1155 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1156 let egress_rate_control =
1158 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1159
1160 let target_surb_buffer_size = if session_req.additional_data > 0 {
1163 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1164 } else {
1165 self.cfg.initial_return_session_egress_rate as u64
1166 * self
1167 .cfg
1168 .minimum_surb_buffer_duration
1169 .max(MIN_SURB_BUFFER_DURATION)
1170 .as_secs()
1171 };
1172
1173 let surb_estimator_clone = slot.surb_estimator.clone();
1174 let session = HoprSession::new(
1175 session_id,
1176 reply_routing.clone(),
1177 HoprSessionConfig {
1178 capabilities: session_req.capabilities.into(),
1179 frame_mtu: self.cfg.frame_mtu,
1180 frame_timeout: self.cfg.max_frame_timeout,
1181 },
1182 (
1183 msg_sender
1185 .clone()
1186 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1187 surb_estimator_clone
1189 .consumed
1190 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1191 #[cfg(feature = "telemetry")]
1192 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1193 futures::future::ok::<_, S::Error>((routing, data))
1194 })
1195 .rate_limit_with_controller(&egress_rate_control)
1196 .buffer((2 * target_surb_buffer_size) as usize),
1197 rx_session_data.inspect(move |data| {
1199 let produced = data.num_surbs_with_msg() as u64;
1200 surb_estimator_clone
1202 .produced
1203 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1204 #[cfg(feature = "telemetry")]
1205 crate::telemetry::record_session_surb_produced(&session_id, produced);
1206 }),
1207 ),
1208 Some(closure_notifier),
1209 )?;
1210
1211 let balancer_config = SurbBalancerConfig {
1215 target_surb_buffer_size,
1216 max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1218 surb_decay: None,
1221 };
1222
1223 slot.surb_mgmt.update(&balancer_config);
1224
1225 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1228 let balancer = SurbBalancer::new(
1229 session_id,
1230 SimpleBalancerController::default(),
1231 slot.surb_estimator.clone(),
1232 SurbControllerWithCorrection(egress_rate_control, 1), slot.surb_mgmt.clone(),
1234 );
1235
1236 let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1238 slot.abort_handles
1239 .lock()
1240 .insert(SessionTasks::Balancer, balancer_abort_handle);
1241
1242 if let Some(period) = self.cfg.surb_balance_notify_period {
1244 let surb_estimator_clone = slot.surb_estimator.clone();
1245 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1246 session_id,
1247 msg_sender
1249 .clone()
1250 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1251 surb_estimator_clone
1253 .consumed
1254 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1255 #[cfg(feature = "telemetry")]
1256 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1257 futures::future::ok::<_, S::Error>((routing, data))
1258 }),
1259 slot.routing_opts.clone(),
1260 SurbNotificationMode::Level(slot.surb_estimator.clone()),
1261 slot.surb_mgmt.clone(),
1262 );
1263
1264 hopr_utils::runtime::prelude::spawn(async move {
1266 hopr_utils::runtime::prelude::sleep(period).await;
1268 ka_controller.set_rate_per_unit(1, period);
1269 });
1270
1271 slot.abort_handles
1272 .lock()
1273 .insert(SessionTasks::KeepAlive, ka_abort_handle);
1274
1275 debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1276 }
1277
1278 session
1279 } else {
1280 HoprSession::new(
1281 session_id,
1282 reply_routing.clone(),
1283 HoprSessionConfig {
1284 capabilities: session_req.capabilities.into(),
1285 frame_mtu: self.cfg.frame_mtu,
1286 frame_timeout: self.cfg.max_frame_timeout,
1287 },
1288 (msg_sender.clone(), rx_session_data),
1289 Some(closure_notifier),
1290 )?
1291 };
1292
1293 let incoming_session = IncomingSession {
1295 session,
1296 target: session_req.target,
1297 };
1298
1299 match async {
1302 let mut guard = new_session_notifier.lock().await;
1303 guard.send(incoming_session).await
1304 }
1305 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1306 .await
1307 {
1308 Err(_) => {
1309 error!(%session_id, "timeout to notify about new incoming session");
1310 return Err(TransportSessionError::Timeout);
1311 }
1312 Ok(Err(error)) => {
1313 error!(%session_id, %error, "failed to notify about new incoming session");
1314 return Err(SessionManagerError::other(error).into());
1315 }
1316 _ => {}
1317 };
1318
1319 trace!(?session_id, "session notification sent");
1320
1321 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1324 orig_challenge: session_req.challenge,
1325 session_id,
1326 });
1327
1328 msg_sender
1329 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1330 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1331 .await
1332 .map_err(|_| {
1333 error!(%session_id, "timeout sending session establishment message");
1334 TransportSessionError::Timeout
1335 })?
1336 .map_err(|error| {
1337 error!(%session_id, %error, "failed to send session establishment message");
1338 SessionManagerError::other(error)
1339 })?;
1340
1341 #[cfg(feature = "telemetry")]
1342 {
1343 initialize_session_metrics(
1344 session_id,
1345 HoprSessionConfig {
1346 capabilities: session_req.capabilities.0,
1347 frame_mtu: self.cfg.frame_mtu,
1348 frame_timeout: self.cfg.max_frame_timeout,
1349 },
1350 );
1351 set_session_state(&session_id, SessionLifecycleState::Active);
1352 set_session_balancer_data(&session_id, slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1353 }
1354
1355 info!(%session_id, "new session established");
1356
1357 #[cfg(all(feature = "telemetry", not(test)))]
1358 {
1359 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1360 METRIC_ACTIVE_SESSIONS.increment(1.0);
1361 }
1362 } else {
1363 error!(%pseudonym,"failed to reserve a new session slot");
1364
1365 let reason = StartErrorReason::NoSlotsAvailable;
1367 let data = HoprStartProtocol::SessionError(StartErrorType {
1368 challenge: session_req.challenge,
1369 reason,
1370 });
1371
1372 msg_sender
1373 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1374 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1375 .await
1376 .map_err(|_| {
1377 error!("timeout sending session error message");
1378 TransportSessionError::Timeout
1379 })?
1380 .map_err(|error| {
1381 error!(%error, "failed to send session error message");
1382 SessionManagerError::other(error)
1383 })?;
1384
1385 trace!(%pseudonym, "session establishment failure message sent");
1386
1387 #[cfg(all(feature = "telemetry", not(test)))]
1388 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1389 }
1390
1391 Ok(())
1392 }
1393
1394 async fn handle_start_protocol_message(
1395 &self,
1396 pseudonym: HoprPseudonym,
1397 data: ApplicationDataIn,
1398 ) -> crate::errors::Result<()> {
1399 match HoprStartProtocol::try_from(data.data)? {
1400 HoprStartProtocol::StartSession(session_req) => {
1401 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1402 }
1403 HoprStartProtocol::SessionEstablished(est) => {
1404 trace!(
1405 session_id = ?est.session_id,
1406 "received session establishment confirmation"
1407 );
1408 let challenge = est.orig_challenge;
1409 let session_id = est.session_id;
1410 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1411 if let Err(error) = tx_est.unbounded_send(Ok(est)) {
1412 error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
1413 return Err(SessionManagerError::other(error).into());
1414 }
1415 debug!(?session_id, challenge, "session establishment complete");
1416 } else {
1417 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1418 }
1419 }
1420 HoprStartProtocol::SessionError(error_type) => {
1421 trace!(
1422 challenge = error_type.challenge,
1423 error = ?error_type.reason,
1424 "failed to initialize a session",
1425 );
1426 if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
1429 if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
1430 error!(%error, ?error_type, "could not send session error message");
1431 return Err(SessionManagerError::other(error).into());
1432 }
1433 error!(
1434 challenge = error_type.challenge,
1435 ?error_type,
1436 "session establishment error received"
1437 );
1438 } else {
1439 error!(
1440 challenge = error_type.challenge,
1441 ?error_type,
1442 "session establishment attempt expired before error could be delivered"
1443 );
1444 }
1445
1446 #[cfg(all(feature = "telemetry", not(test)))]
1447 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
1448 }
1449 HoprStartProtocol::KeepAlive(msg) => {
1450 let session_id = msg.session_id;
1451 if let Some(session_slot) = self.sessions.get(&session_id).await {
1452 trace!(?session_id, "received keep-alive message");
1453 match &session_slot.routing_opts {
1454 DestinationRouting::Forward { .. } => {
1456 if msg.flags.contains(KeepAliveFlag::BalancerState)
1457 && !session_slot.surb_mgmt.is_disabled()
1458 && session_slot.surb_mgmt.buffer_level() != msg.additional_data
1459 {
1460 session_slot
1462 .surb_mgmt
1463 .buffer_level
1464 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1465 debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
1466 }
1467
1468 session_slot
1470 .surb_estimator
1471 .consumed
1472 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1473 #[cfg(feature = "telemetry")]
1474 crate::telemetry::record_session_surb_consumed(&session_id, 1);
1475 }
1476 DestinationRouting::Return(_) => {
1478 if msg.flags.contains(KeepAliveFlag::BalancerTarget)
1480 && msg.additional_data > 0
1481 && !session_slot.surb_mgmt.is_disabled()
1482 && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
1483 {
1484 session_slot
1486 .surb_mgmt
1487 .target_surb_buffer_size
1488 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1489 session_slot.surb_mgmt.max_surbs_per_sec.store(
1491 msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
1492 std::sync::atomic::Ordering::Relaxed,
1493 );
1494 debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
1495 }
1496
1497 let produced = KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64;
1500 session_slot
1501 .surb_estimator
1502 .produced
1503 .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1504 #[cfg(feature = "telemetry")]
1505 crate::telemetry::record_session_surb_produced(&session_id, produced);
1506 }
1507 }
1508 } else {
1509 debug!(%session_id, "received keep-alive request for an unknown session");
1510 }
1511 }
1512 }
1513
1514 Ok(())
1515 }
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520 use anyhow::anyhow;
1521 use futures::{AsyncWriteExt, future::BoxFuture};
1522 use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1523 use hopr_api::types::{
1524 crypto::{keypairs::ChainKeypair, prelude::Keypair},
1525 crypto_random::Randomizable,
1526 internal::routing::SurbMatcher,
1527 primitive::prelude::Address,
1528 };
1529 use hopr_utils::network_types::prelude::SealedHost;
1530 use tokio::time::timeout;
1531
1532 use super::*;
1533 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1534
1535 fn test_tag_allocator() -> Arc<dyn TagAllocator + Send + Sync> {
1537 test_tag_allocator_with_session_capacity(10000)
1538 }
1539
1540 fn test_tag_allocator_with_session_capacity(session_capacity: u64) -> Arc<dyn TagAllocator + Send + Sync> {
1542 hopr_transport_tag_allocator::create_allocators(
1543 ReservedTag::range().end..u16::MAX as u64 + 1,
1544 [
1545 (hopr_transport_tag_allocator::Usage::Session, session_capacity),
1546 (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 10000),
1547 (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1548 ],
1549 )
1550 .expect("test allocator creation must not fail")
1551 .into_iter()
1552 .find(|(u, _)| matches!(u, hopr_transport_tag_allocator::Usage::Session))
1553 .expect("session allocator must exist")
1554 .1
1555 }
1556
1557 #[async_trait::async_trait]
1558 trait SendMsg {
1559 async fn send_message(
1560 &self,
1561 routing: DestinationRouting,
1562 data: ApplicationDataOut,
1563 ) -> crate::errors::Result<()>;
1564 }
1565
1566 mockall::mock! {
1567 MsgSender {}
1568 impl SendMsg for MsgSender {
1569 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1570 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1571 }
1572 }
1573
1574 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1575 let (tx, rx) = futures::channel::mpsc::unbounded();
1576 tokio::task::spawn(async move {
1577 pin_mut!(rx);
1578 while let Some((routing, data)) = rx.next().await {
1579 sender
1580 .send_message(routing, data)
1581 .await
1582 .expect("send message must not fail in mock");
1583 }
1584 });
1585 tx
1586 }
1587
1588 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1589 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1590 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1591 .unwrap_or(false)
1592 }
1593
1594 fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1595 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1596 .map(msg)
1597 .unwrap_or(false)
1598 }
1599
1600 #[test_log::test(tokio::test)]
1601 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1602 {
1603 let alice_pseudonym = HoprPseudonym::random();
1604 let bob_peer: Address = (&ChainKeypair::random()).into();
1605
1606 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1607 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1608
1609 let mut sequence = mockall::Sequence::new();
1610 let mut alice_transport = MockMsgSender::new();
1611 let mut bob_transport = MockMsgSender::new();
1612
1613 let bob_mgr_clone = bob_mgr.clone();
1615 alice_transport
1616 .expect_send_message()
1617 .once()
1618 .in_sequence(&mut sequence)
1619 .withf(move |peer, data| {
1620 info!("alice sends {}", data.data.application_tag);
1621 msg_type(data, StartProtocolDiscriminants::StartSession)
1622 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1623 })
1624 .returning(move |_, data| {
1625 let bob_mgr_clone = bob_mgr_clone.clone();
1626 Box::pin(async move {
1627 bob_mgr_clone
1628 .dispatch_message(
1629 alice_pseudonym,
1630 ApplicationDataIn {
1631 data: data.data,
1632 packet_info: Default::default(),
1633 },
1634 )
1635 .await?;
1636 Ok(())
1637 })
1638 });
1639
1640 let alice_mgr_clone = alice_mgr.clone();
1642 bob_transport
1643 .expect_send_message()
1644 .once()
1645 .in_sequence(&mut sequence)
1646 .withf(move |peer, data| {
1647 info!("bob sends {}", data.data.application_tag);
1648 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1649 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1650 })
1651 .returning(move |_, data| {
1652 let alice_mgr_clone = alice_mgr_clone.clone();
1653
1654 Box::pin(async move {
1655 alice_mgr_clone
1656 .dispatch_message(
1657 alice_pseudonym,
1658 ApplicationDataIn {
1659 data: data.data,
1660 packet_info: Default::default(),
1661 },
1662 )
1663 .await?;
1664 Ok(())
1665 })
1666 });
1667
1668 let bob_mgr_clone = bob_mgr.clone();
1670 alice_transport
1671 .expect_send_message()
1672 .once()
1673 .in_sequence(&mut sequence)
1674 .withf(move |peer, data| {
1675 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1676 data.data.plain_text.as_ref(),
1677 )
1678 .expect("must be a session message")
1679 .try_as_segment()
1680 .expect("must be a segment")
1681 .is_terminating()
1682 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1683 })
1684 .returning(move |_, data| {
1685 let bob_mgr_clone = bob_mgr_clone.clone();
1686 Box::pin(async move {
1687 bob_mgr_clone
1688 .dispatch_message(
1689 alice_pseudonym,
1690 ApplicationDataIn {
1691 data: data.data,
1692 packet_info: Default::default(),
1693 },
1694 )
1695 .await?;
1696 Ok(())
1697 })
1698 });
1699
1700 let mut ahs = Vec::new();
1701
1702 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1704 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1705
1706 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1708 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1709
1710 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1711
1712 pin_mut!(new_session_rx_bob);
1713 let (alice_session, bob_session) = timeout(
1714 Duration::from_secs(2),
1715 futures::future::join(
1716 alice_mgr.new_session(
1717 bob_peer,
1718 SessionTarget::TcpStream(target.clone()),
1719 SessionClientConfig {
1720 pseudonym: alice_pseudonym.into(),
1721 capabilities: Capability::NoRateControl | Capability::Segmentation,
1722 surb_management: None,
1723 ..Default::default()
1724 },
1725 ),
1726 new_session_rx_bob.next(),
1727 ),
1728 )
1729 .await?;
1730
1731 let mut alice_session = alice_session?;
1732 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1733
1734 assert_eq!(
1735 alice_session.config().capabilities,
1736 Capability::Segmentation | Capability::NoRateControl
1737 );
1738 assert_eq!(
1739 alice_session.config().capabilities,
1740 bob_session.session.config().capabilities
1741 );
1742 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1743
1744 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1745 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1746 assert!(
1747 alice_mgr
1748 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1749 .await
1750 .is_err()
1751 );
1752
1753 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1754 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1755 assert!(
1756 bob_mgr
1757 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1758 .await
1759 .is_err()
1760 );
1761
1762 tokio::time::sleep(Duration::from_millis(100)).await;
1763 alice_session.close().await?;
1764
1765 tokio::time::sleep(Duration::from_millis(100)).await;
1766
1767 assert!(matches!(
1768 alice_mgr.ping_session(alice_session.id()).await,
1769 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1770 ));
1771
1772 futures::stream::iter(ahs)
1773 .for_each(|ah| async move { ah.abort() })
1774 .await;
1775
1776 Ok(())
1777 }
1778
1779 #[test_log::test(tokio::test)]
1780 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1781 let alice_pseudonym = HoprPseudonym::random();
1782 let bob_peer: Address = (&ChainKeypair::random()).into();
1783
1784 let cfg = SessionManagerConfig {
1785 idle_timeout: Duration::from_millis(200),
1786 ..Default::default()
1787 };
1788
1789 let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
1790 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1791
1792 let mut sequence = mockall::Sequence::new();
1793 let mut alice_transport = MockMsgSender::new();
1794 let mut bob_transport = MockMsgSender::new();
1795
1796 let bob_mgr_clone = bob_mgr.clone();
1798 alice_transport
1799 .expect_send_message()
1800 .once()
1801 .in_sequence(&mut sequence)
1802 .withf(move |peer, data| {
1803 msg_type(data, StartProtocolDiscriminants::StartSession)
1804 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1805 })
1806 .returning(move |_, data| {
1807 let bob_mgr_clone = bob_mgr_clone.clone();
1808 Box::pin(async move {
1809 bob_mgr_clone
1810 .dispatch_message(
1811 alice_pseudonym,
1812 ApplicationDataIn {
1813 data: data.data,
1814 packet_info: Default::default(),
1815 },
1816 )
1817 .await?;
1818 Ok(())
1819 })
1820 });
1821
1822 let alice_mgr_clone = alice_mgr.clone();
1824 bob_transport
1825 .expect_send_message()
1826 .once()
1827 .in_sequence(&mut sequence)
1828 .withf(move |peer, data| {
1829 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1830 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1831 })
1832 .returning(move |_, data| {
1833 let alice_mgr_clone = alice_mgr_clone.clone();
1834
1835 Box::pin(async move {
1836 alice_mgr_clone
1837 .dispatch_message(
1838 alice_pseudonym,
1839 ApplicationDataIn {
1840 data: data.data,
1841 packet_info: Default::default(),
1842 },
1843 )
1844 .await?;
1845 Ok(())
1846 })
1847 });
1848
1849 let mut ahs = Vec::new();
1850
1851 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1853 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1854
1855 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1857 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1858
1859 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1860
1861 pin_mut!(new_session_rx_bob);
1862 let (alice_session, bob_session) = timeout(
1863 Duration::from_secs(2),
1864 futures::future::join(
1865 alice_mgr.new_session(
1866 bob_peer,
1867 SessionTarget::TcpStream(target.clone()),
1868 SessionClientConfig {
1869 pseudonym: alice_pseudonym.into(),
1870 capabilities: Capability::NoRateControl | Capability::Segmentation,
1871 surb_management: None,
1872 ..Default::default()
1873 },
1874 ),
1875 new_session_rx_bob.next(),
1876 ),
1877 )
1878 .await?;
1879
1880 let alice_session = alice_session?;
1881 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1882
1883 assert_eq!(
1884 alice_session.config().capabilities,
1885 Capability::Segmentation | Capability::NoRateControl,
1886 );
1887 assert_eq!(
1888 alice_session.config().capabilities,
1889 bob_session.session.config().capabilities
1890 );
1891 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1892
1893 tokio::time::sleep(Duration::from_millis(300)).await;
1895
1896 assert!(matches!(
1897 alice_mgr.ping_session(alice_session.id()).await,
1898 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1899 ));
1900
1901 futures::stream::iter(ahs)
1902 .for_each(|ah| async move { ah.abort() })
1903 .await;
1904
1905 Ok(())
1906 }
1907
1908 #[test_log::test(tokio::test)]
1909 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1910 let alice_pseudonym = HoprPseudonym::random();
1911 let session_id = SessionId::new(16u64, alice_pseudonym);
1912 let balancer_cfg = SurbBalancerConfig {
1913 target_surb_buffer_size: 1000,
1914 max_surbs_per_sec: 100,
1915 ..Default::default()
1916 };
1917
1918 let alice_mgr = SessionManager::<UnboundedSender<(DestinationRouting, ApplicationDataOut)>>::new(
1919 Default::default(),
1920 test_tag_allocator(),
1921 );
1922
1923 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1924 alice_mgr
1925 .sessions
1926 .insert(
1927 session_id,
1928 SessionSlot {
1929 session_tx: Arc::new(dummy_tx),
1930 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1931 abort_handles: Default::default(),
1932 surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
1933 surb_estimator: Default::default(),
1934 allocated_tag: None,
1935 },
1936 )
1937 .await;
1938
1939 let actual_cfg = alice_mgr
1940 .get_surb_balancer_config(&session_id)
1941 .await?
1942 .ok_or(anyhow!("session must have a surb balancer config"))?;
1943 assert_eq!(actual_cfg, balancer_cfg);
1944
1945 let new_cfg = SurbBalancerConfig {
1946 target_surb_buffer_size: 2000,
1947 max_surbs_per_sec: 200,
1948 ..Default::default()
1949 };
1950 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1951
1952 let actual_cfg = alice_mgr
1953 .get_surb_balancer_config(&session_id)
1954 .await?
1955 .ok_or(anyhow!("session must have a surb balancer config"))?;
1956 assert_eq!(actual_cfg, new_cfg);
1957
1958 Ok(())
1959 }
1960
1961 #[test_log::test(tokio::test)]
1962 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1963 let alice_pseudonym = HoprPseudonym::random();
1964 let bob_peer: Address = (&ChainKeypair::random()).into();
1965
1966 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1967 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
1968
1969 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1971 bob_mgr
1972 .sessions
1973 .insert(
1974 SessionId::new(16u64, alice_pseudonym),
1975 SessionSlot {
1976 session_tx: Arc::new(dummy_tx),
1977 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1978 abort_handles: Default::default(),
1979 surb_mgmt: Default::default(),
1980 surb_estimator: Default::default(),
1981 allocated_tag: None,
1982 },
1983 )
1984 .await;
1985
1986 let mut sequence = mockall::Sequence::new();
1987 let mut alice_transport = MockMsgSender::new();
1988 let mut bob_transport = MockMsgSender::new();
1989
1990 let bob_mgr_clone = bob_mgr.clone();
1992 alice_transport
1993 .expect_send_message()
1994 .once()
1995 .in_sequence(&mut sequence)
1996 .withf(move |peer, data| {
1997 msg_type(data, StartProtocolDiscriminants::StartSession)
1998 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1999 })
2000 .returning(move |_, data| {
2001 let bob_mgr_clone = bob_mgr_clone.clone();
2002 Box::pin(async move {
2003 bob_mgr_clone
2004 .dispatch_message(
2005 alice_pseudonym,
2006 ApplicationDataIn {
2007 data: data.data,
2008 packet_info: Default::default(),
2009 },
2010 )
2011 .await?;
2012 Ok(())
2013 })
2014 });
2015
2016 let alice_mgr_clone = alice_mgr.clone();
2018 bob_transport
2019 .expect_send_message()
2020 .once()
2021 .in_sequence(&mut sequence)
2022 .withf(move |peer, data| {
2023 msg_type(data, StartProtocolDiscriminants::SessionError)
2024 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2025 })
2026 .returning(move |_, data| {
2027 let alice_mgr_clone = alice_mgr_clone.clone();
2028 Box::pin(async move {
2029 alice_mgr_clone
2030 .dispatch_message(
2031 alice_pseudonym,
2032 ApplicationDataIn {
2033 data: data.data,
2034 packet_info: Default::default(),
2035 },
2036 )
2037 .await?;
2038 Ok(())
2039 })
2040 });
2041
2042 let mut jhs = Vec::new();
2043
2044 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2046 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2047
2048 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2050 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2051
2052 let result = alice_mgr
2053 .new_session(
2054 bob_peer,
2055 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2056 SessionClientConfig {
2057 capabilities: Capabilities::empty(),
2058 pseudonym: alice_pseudonym.into(),
2059 surb_management: None,
2060 ..Default::default()
2061 },
2062 )
2063 .await;
2064
2065 assert!(
2066 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2067 );
2068
2069 Ok(())
2070 }
2071
2072 #[test_log::test(tokio::test)]
2073 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2074 -> anyhow::Result<()> {
2075 let alice_pseudonym = HoprPseudonym::random();
2076 let bob_peer: Address = (&ChainKeypair::random()).into();
2077
2078 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2079 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
2080
2081 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2083 bob_mgr
2084 .sessions
2085 .insert(
2086 SessionId::new(16u64, alice_pseudonym),
2087 SessionSlot {
2088 session_tx: Arc::new(dummy_tx),
2089 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2090 abort_handles: Default::default(),
2091 surb_mgmt: Default::default(),
2092 surb_estimator: Default::default(),
2093 allocated_tag: None,
2094 },
2095 )
2096 .await;
2097
2098 let mut sequence = mockall::Sequence::new();
2099 let mut alice_transport = MockMsgSender::new();
2100 let mut bob_transport = MockMsgSender::new();
2101
2102 let bob_mgr_clone = bob_mgr.clone();
2104 alice_transport
2105 .expect_send_message()
2106 .once()
2107 .in_sequence(&mut sequence)
2108 .withf(move |peer, data| {
2109 msg_type(data, StartProtocolDiscriminants::StartSession)
2110 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2111 })
2112 .returning(move |_, data| {
2113 let bob_mgr_clone = bob_mgr_clone.clone();
2114 Box::pin(async move {
2115 bob_mgr_clone
2116 .dispatch_message(
2117 alice_pseudonym,
2118 ApplicationDataIn {
2119 data: data.data,
2120 packet_info: Default::default(),
2121 },
2122 )
2123 .await?;
2124 Ok(())
2125 })
2126 });
2127
2128 let alice_mgr_clone = alice_mgr.clone();
2130 bob_transport
2131 .expect_send_message()
2132 .once()
2133 .in_sequence(&mut sequence)
2134 .withf(move |peer, data| {
2135 msg_type(data, StartProtocolDiscriminants::SessionError)
2136 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2137 })
2138 .returning(move |_, data| {
2139 let alice_mgr_clone = alice_mgr_clone.clone();
2140 Box::pin(async move {
2141 alice_mgr_clone
2142 .dispatch_message(
2143 alice_pseudonym,
2144 ApplicationDataIn {
2145 data: data.data,
2146 packet_info: Default::default(),
2147 },
2148 )
2149 .await?;
2150 Ok(())
2151 })
2152 });
2153
2154 let mut jhs = Vec::new();
2155
2156 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2158 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2159
2160 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2162 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2163
2164 let result = alice_mgr
2165 .new_session(
2166 bob_peer,
2167 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2168 SessionClientConfig {
2169 capabilities: None.into(),
2170 pseudonym: alice_pseudonym.into(),
2171 surb_management: None,
2172 ..Default::default()
2173 },
2174 )
2175 .await;
2176
2177 assert!(
2178 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2179 );
2180
2181 Ok(())
2182 }
2183
2184 #[test_log::test(tokio::test)]
2185 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2186 let alice_pseudonym = HoprPseudonym::random();
2187 let bob_peer: Address = (&ChainKeypair::random()).into();
2188
2189 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2190
2191 let mut sequence = mockall::Sequence::new();
2192 let mut alice_transport = MockMsgSender::new();
2193
2194 let alice_mgr_clone = alice_mgr.clone();
2196 alice_transport
2197 .expect_send_message()
2198 .once()
2199 .in_sequence(&mut sequence)
2200 .withf(move |peer, data| {
2201 msg_type(data, StartProtocolDiscriminants::StartSession)
2202 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2203 })
2204 .returning(move |_, data| {
2205 let alice_mgr_clone = alice_mgr_clone.clone();
2207 Box::pin(async move {
2208 alice_mgr_clone
2209 .dispatch_message(
2210 alice_pseudonym,
2211 ApplicationDataIn {
2212 data: data.data,
2213 packet_info: Default::default(),
2214 },
2215 )
2216 .await?;
2217 Ok(())
2218 })
2219 });
2220
2221 let alice_mgr_clone = alice_mgr.clone();
2223 alice_transport
2224 .expect_send_message()
2225 .once()
2226 .in_sequence(&mut sequence)
2227 .withf(move |peer, data| {
2228 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2229 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2230 })
2231 .returning(move |_, data| {
2232 let alice_mgr_clone = alice_mgr_clone.clone();
2233
2234 Box::pin(async move {
2235 alice_mgr_clone
2236 .dispatch_message(
2237 alice_pseudonym,
2238 ApplicationDataIn {
2239 data: data.data,
2240 packet_info: Default::default(),
2241 },
2242 )
2243 .await?;
2244 Ok(())
2245 })
2246 });
2247
2248 let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2250 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2251
2252 let alice_session = alice_mgr
2253 .new_session(
2254 bob_peer,
2255 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2256 SessionClientConfig {
2257 capabilities: None.into(),
2258 pseudonym: alice_pseudonym.into(),
2259 surb_management: None,
2260 ..Default::default()
2261 },
2262 )
2263 .await;
2264
2265 println!("{alice_session:?}");
2266 assert!(matches!(
2267 alice_session,
2268 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2269 ));
2270
2271 drop(new_session_rx_alice);
2272 Ok(())
2273 }
2274
2275 #[test_log::test(tokio::test)]
2276 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2277 let bob_peer: Address = (&ChainKeypair::random()).into();
2278
2279 let cfg = SessionManagerConfig {
2280 initiation_timeout_base: Duration::from_millis(100),
2281 ..Default::default()
2282 };
2283
2284 let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
2285 let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2286
2287 let mut sequence = mockall::Sequence::new();
2288 let mut alice_transport = MockMsgSender::new();
2289 let bob_transport = MockMsgSender::new();
2290
2291 alice_transport
2293 .expect_send_message()
2294 .once()
2295 .in_sequence(&mut sequence)
2296 .withf(move |peer, data| {
2297 msg_type(data, StartProtocolDiscriminants::StartSession)
2298 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2299 })
2300 .returning(|_, _| Box::pin(async { Ok(()) }));
2301
2302 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2304 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2305
2306 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2308 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2309
2310 let result = alice_mgr
2311 .new_session(
2312 bob_peer,
2313 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2314 SessionClientConfig {
2315 capabilities: None.into(),
2316 pseudonym: None,
2317 surb_management: None,
2318 ..Default::default()
2319 },
2320 )
2321 .await;
2322
2323 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2324
2325 Ok(())
2326 }
2327
/// Handles an incoming session initiation while the receiving end of the new-session channel is
/// already gone, so notifying about the incoming session fails.
///
/// The handler must return an error, and exactly one session is still reported as active.
///
/// NOTE(review): despite the test name, no telemetry state is asserted here; only the number of
/// active sessions is checked. Confirm whether a telemetry assertion is missing.
#[cfg(feature = "telemetry")]
#[test_log::test(tokio::test)]
async fn failed_incoming_session_establishment_does_not_register_telemetry() -> anyhow::Result<()> {
    let manager = SessionManager::new(Default::default(), test_tag_allocator());

    let transport = MockMsgSender::new();
    // The `_` pattern drops the receiving end immediately.
    let (new_session_tx, _) = futures::channel::mpsc::channel(1);
    manager.start(mock_packet_planning(transport), new_session_tx)?;

    let pseudonym = HoprPseudonym::random();
    let initiation = StartInitiation {
        challenge: MIN_CHALLENGE,
        target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
        capabilities: ByteCapabilities(Capabilities::empty()),
        additional_data: 0,
    };
    let result = manager.handle_incoming_session_initiation(pseudonym, initiation).await;

    assert!(result.is_err());

    let allocated_session_ids = manager.active_sessions().await;
    assert_eq!(1, allocated_session_ids.len());

    Ok(())
}
2358
2359 #[test_log::test(tokio::test)]
2360 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2361 let alice_pseudonym = HoprPseudonym::random();
2362 let bob_peer: Address = (&ChainKeypair::random()).into();
2363
2364 let bob_cfg = SessionManagerConfig {
2365 surb_balance_notify_period: Some(Duration::from_millis(500)),
2366 ..Default::default()
2367 };
2368 let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2369 let bob_mgr = SessionManager::new(bob_cfg.clone(), test_tag_allocator());
2370
2371 let mut alice_transport = MockMsgSender::new();
2372 let mut bob_transport = MockMsgSender::new();
2373
2374 let mut open_sequence = mockall::Sequence::new();
2376 let bob_mgr_clone = bob_mgr.clone();
2377 alice_transport
2378 .expect_send_message()
2379 .once()
2380 .in_sequence(&mut open_sequence)
2381 .withf(move |peer, data| {
2382 msg_type(data, StartProtocolDiscriminants::StartSession)
2383 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2384 })
2385 .returning(move |_, data| {
2386 let bob_mgr_clone = bob_mgr_clone.clone();
2387 Box::pin(async move {
2388 bob_mgr_clone
2389 .dispatch_message(
2390 alice_pseudonym,
2391 ApplicationDataIn {
2392 data: data.data,
2393 packet_info: Default::default(),
2394 },
2395 )
2396 .await?;
2397 Ok(())
2398 })
2399 });
2400
2401 let alice_mgr_clone = alice_mgr.clone();
2403 bob_transport
2404 .expect_send_message()
2405 .once()
2406 .in_sequence(&mut open_sequence)
2407 .withf(move |peer, data| {
2408 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2409 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2410 })
2411 .returning(move |_, data| {
2412 let alice_mgr_clone = alice_mgr_clone.clone();
2413 Box::pin(async move {
2414 alice_mgr_clone
2415 .dispatch_message(
2416 alice_pseudonym,
2417 ApplicationDataIn {
2418 data: data.data,
2419 packet_info: Default::default(),
2420 },
2421 )
2422 .await?;
2423 Ok(())
2424 })
2425 });
2426
2427 const INITIAL_BALANCER_TARGET: u64 = 10;
2428
2429 let bob_mgr_clone = bob_mgr.clone();
2431 alice_transport
2432 .expect_send_message()
2433 .times(5..)
2434 .withf(move |peer, data| {
2436 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2437 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2439 })
2440 .returning(move |_, data| {
2441 let bob_mgr_clone = bob_mgr_clone.clone();
2442 Box::pin(async move {
2443 bob_mgr_clone
2444 .dispatch_message(
2445 alice_pseudonym,
2446 ApplicationDataIn {
2447 data: data.data,
2448 packet_info: Default::default(),
2449 },
2450 )
2451 .await?;
2452 Ok(())
2453 })
2454 });
2455
2456 const NEXT_BALANCER_TARGET: u64 = 50;
2457
2458 let bob_mgr_clone = bob_mgr.clone();
2460 alice_transport
2461 .expect_send_message()
2462 .times(5..)
2463 .withf(move |peer, data| {
2465 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2466 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2467 })
2468 .returning(move |_, data| {
2469 let bob_mgr_clone = bob_mgr_clone.clone();
2470 Box::pin(async move {
2471 bob_mgr_clone
2472 .dispatch_message(
2473 alice_pseudonym,
2474 ApplicationDataIn {
2475 data: data.data,
2476 packet_info: Default::default(),
2477 },
2478 )
2479 .await?;
2480 Ok(())
2481 })
2482 });
2483
2484 let alice_mgr_clone = alice_mgr.clone();
2486 bob_transport
2487 .expect_send_message()
2488 .times(1..)
2489 .withf(move |peer, data| {
2491 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2492 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2493 })
2494 .returning(move |_, data| {
2495 let alice_mgr_clone = alice_mgr_clone.clone();
2496 Box::pin(async move {
2497 alice_mgr_clone
2498 .dispatch_message(
2499 alice_pseudonym,
2500 ApplicationDataIn {
2501 data: data.data,
2502 packet_info: Default::default(),
2503 },
2504 )
2505 .await?;
2506 Ok(())
2507 })
2508 });
2509
2510 let bob_mgr_clone = bob_mgr.clone();
2512 alice_transport
2513 .expect_send_message()
2514 .once()
2515 .withf(move |peer, data| {
2517 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2518 data.data.plain_text.as_ref(),
2519 )
2520 .ok()
2521 .and_then(|m| m.try_as_segment())
2522 .map(|s| s.is_terminating())
2523 .unwrap_or(false)
2524 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2525 })
2526 .returning(move |_, data| {
2527 let bob_mgr_clone = bob_mgr_clone.clone();
2528 Box::pin(async move {
2529 bob_mgr_clone
2530 .dispatch_message(
2531 alice_pseudonym,
2532 ApplicationDataIn {
2533 data: data.data,
2534 packet_info: Default::default(),
2535 },
2536 )
2537 .await?;
2538 Ok(())
2539 })
2540 });
2541
2542 let mut ahs = Vec::new();
2543
2544 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2546 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2547
2548 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2550 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2551
2552 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2553
2554 let balancer_cfg = SurbBalancerConfig {
2555 target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2556 max_surbs_per_sec: 100,
2557 ..Default::default()
2558 };
2559
2560 pin_mut!(new_session_rx_bob);
2561 let (alice_session, bob_session) = timeout(
2562 Duration::from_secs(2),
2563 futures::future::join(
2564 alice_mgr.new_session(
2565 bob_peer,
2566 SessionTarget::TcpStream(target.clone()),
2567 SessionClientConfig {
2568 pseudonym: alice_pseudonym.into(),
2569 capabilities: Capability::Segmentation.into(),
2570 surb_management: Some(balancer_cfg),
2571 ..Default::default()
2572 },
2573 ),
2574 new_session_rx_bob.next(),
2575 ),
2576 )
2577 .await?;
2578
2579 let mut alice_session = alice_session?;
2580 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2581
2582 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2583
2584 assert_eq!(
2585 Some(balancer_cfg),
2586 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2587 );
2588
2589 let remote_cfg = bob_mgr
2590 .get_surb_balancer_config(bob_session.session.id())
2591 .await?
2592 .ok_or(anyhow!("no remote config at bob"))?;
2593 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2594 assert_eq!(
2595 remote_cfg.max_surbs_per_sec,
2596 remote_cfg.target_surb_buffer_size
2597 / bob_cfg
2598 .minimum_surb_buffer_duration
2599 .max(MIN_SURB_BUFFER_DURATION)
2600 .as_secs()
2601 );
2602
2603 tokio::time::sleep(Duration::from_millis(1500)).await;
2605
2606 let new_balancer_cfg = SurbBalancerConfig {
2607 target_surb_buffer_size: NEXT_BALANCER_TARGET,
2608 max_surbs_per_sec: 100,
2609 ..Default::default()
2610 };
2611
2612 alice_mgr
2614 .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2615 .await?;
2616
2617 tokio::time::sleep(Duration::from_millis(1500)).await;
2619
2620 let remote_cfg = bob_mgr
2622 .get_surb_balancer_config(bob_session.session.id())
2623 .await?
2624 .ok_or(anyhow!("no remote config at bob"))?;
2625 assert_eq!(
2626 remote_cfg.target_surb_buffer_size,
2627 new_balancer_cfg.target_surb_buffer_size
2628 );
2629 assert_eq!(
2630 remote_cfg.max_surbs_per_sec,
2631 new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2632 );
2633
2634 let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2635 let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2636
2637 alice_session.close().await?;
2638
2639 assert!(alice_surb_sent > 0, "alice must've sent surbs");
2640 assert!(bob_surb_recv > 0, "bob must've received surbs");
2641 assert!(
2642 bob_surb_recv <= alice_surb_sent,
2643 "bob cannot receive more surbs than alice sent"
2644 );
2645
2646 assert!(alice_surb_used > 0, "alice must see bob used surbs");
2647 assert!(bob_surb_used > 0, "bob must've used surbs");
2648 assert!(
2649 alice_surb_used <= bob_surb_used,
2650 "alice cannot see bob used more surbs than bob actually used"
2651 );
2652
2653 tokio::time::sleep(Duration::from_millis(300)).await;
2654 assert!(matches!(
2655 alice_mgr.ping_session(alice_session.id()).await,
2656 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2657 ));
2658
2659 futures::stream::iter(ahs)
2660 .for_each(|ah| async move { ah.abort() })
2661 .await;
2662
2663 Ok(())
2664 }
2665}