1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use anyhow::anyhow;
8use futures::{
9 FutureExt, SinkExt, StreamExt, TryStreamExt,
10 channel::mpsc::{Sender, UnboundedSender},
11 future::AbortHandle,
12 pin_mut,
13};
14use futures_time::future::FutureExt as TimeExt;
15use hopr_async_runtime::AbortableList;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_crypto_random::Randomizable;
18use hopr_internal_types::prelude::HoprPseudonym;
19use hopr_network_types::prelude::*;
20use hopr_primitive_types::prelude::Address;
21use hopr_protocol_app::prelude::*;
22use hopr_protocol_start::{
23 KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
24 StartInitiation,
25};
26use tracing::{debug, error, info, trace, warn};
27
28#[cfg(feature = "telemetry")]
29use crate::telemetry::{SessionLifecycleState, SessionStatsSnapshot, SessionTelemetry};
30use crate::{
31 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
32 SurbBalancerConfig,
33 balancer::{
34 AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
35 SurbControllerWithCorrection,
36 pid::{PidBalancerController, PidControllerGains},
37 simple::SimpleBalancerController,
38 },
39 errors::{SessionManagerError, TransportSessionError},
40 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
41 utils,
42 utils::{SurbNotificationMode, insert_into_next_slot},
43};
44
// Prometheus metrics of the session manager.
// Only compiled when the `prometheus` feature is enabled, and never in test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge of currently active sessions: reset to zero in `SessionManager::new`,
    // incremented when a session is registered and decremented in `close_session`.
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    // Counter of incoming sessions for which the establishment message was sent out.
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    // Counter of outgoing sessions whose slot was inserted into the session cache.
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    // Counter of session errors observed while initiating a session, labelled by `kind`
    // (e.g. "timeout" when the initiation attempt timed out).
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    // Counter of session errors sent back to the initiating party, labelled by `kind`
    // (the string form of the `StartErrorReason`).
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
70
71fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
72 debug!(?session_id, ?reason, "closing session");
73
74 #[cfg(feature = "telemetry")]
75 {
76 session_data.telemetry.set_state(SessionLifecycleState::Closed);
77 session_data.telemetry.touch_activity();
78 }
79
80 if reason != ClosureReason::EmptyRead {
81 session_data.session_tx.close_channel();
83 trace!(?session_id, "data tx channel closed on session");
84 }
85
86 session_data.abort_handles.lock().abort_all();
88
89 #[cfg(all(feature = "prometheus", not(test)))]
90 METRIC_ACTIVE_SESSIONS.decrement(1.0);
91}
92
/// Computes the maximum one-way session initiation timeout: `base` multiplied by `hops`.
///
/// The computation saturates instead of misbehaving on extreme inputs:
/// - a hop count that does not fit into `u32` is treated as `u32::MAX` (rather than being
///   silently truncated),
/// - a product that exceeds the representable range yields [`Duration::MAX`] (rather than
///   panicking).
///
/// A hop count of `0` yields a zero duration.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
96
/// Lower bound enforced on [`SessionManagerConfig::minimum_surb_buffer_duration`].
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);

/// Lower bound enforced on [`SessionManagerConfig::surb_balance_notify_period`] (when it is set).
pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);

/// Smallest challenge value generated for a session initiation.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time a newly initiated session with SURB balancing waits for the SURB level
/// to reach half of the target buffer size before it is handed out anyway.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Lower bound enforced on [`SessionManagerConfig::max_frame_timeout`].
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
110
/// Cache of pending outgoing session initiations, keyed by the initiation challenge.
///
/// The value is the sending half of the channel on which `new_session` awaits the outcome of
/// the initiation: either the established session or the error reported by the counterparty.
// NOTE(review): the sender is presumably resolved by the Start protocol message handler - confirm.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
116
/// Kinds of background tasks that can be registered (and later aborted) for a session.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
enum SessionTasks {
    /// Task driving the keep-alive stream of the session.
    KeepAlive,
    /// Task running the SURB balancer control loop of the session.
    Balancer,
}
122
/// Per-session state kept in the session cache of the [`SessionManager`].
#[derive(Clone)]
struct SessionSlot {
    /// Sender used to forward incoming application data to the session.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    /// Routing used to send messages (e.g. keep-alive pings) to the session counterparty.
    routing_opts: DestinationRouting,
    /// Background tasks of the session; all of them are aborted when the session is closed.
    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
    /// Telemetry collected for the session.
    #[cfg(feature = "telemetry")]
    telemetry: Arc<SessionTelemetry>,
    /// State of the SURB balancer; reports itself as disabled when the session
    /// does not use SURB balancing.
    surb_mgmt: Arc<BalancerStateValues>,
    /// Estimator counting SURBs produced and consumed by the session.
    surb_estimator: AtomicSurbFlowEstimator,
}
141
/// Outcome of [`SessionManager::dispatch_message`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// The message was a Start protocol message or session data and was consumed by the manager.
    Processed,
    /// The message belongs neither to the Start protocol nor to the session tag range;
    /// it is handed back to the caller untouched.
    Unrelated(ApplicationDataIn),
}
150
/// Configuration of the [`SessionManager`].
///
/// Several values are sanitized by [`SessionManager::new`]; see the individual fields.
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    // Range of application tags used to identify sessions.
    // `SessionManager::new` shifts the range upwards if it starts below the end of the
    // reserved tag range.
    //
    // Default is 16..1024.
    #[doc(hidden)]
    #[default(_code = "16u64..1024u64")]
    pub session_tag_range: Range<u64>,

    /// Maximum number of sessions (incoming and outgoing) held by the manager.
    ///
    /// Clamped by `SessionManager::new` to at least 1 and at most the size of the session tag range.
    ///
    /// Default is 128.
    #[default(128)]
    pub maximum_sessions: usize,

    /// Frame MTU used for sessions; raised to at least `SESSION_MTU` by `SessionManager::new`.
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// Frame timeout used for sessions; raised to at least 10 ms by `SessionManager::new`.
    ///
    /// Default is 800 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// Per-hop base of the session initiation timeout.
    ///
    /// The timeout of an initiation attempt is this value multiplied by the number of hops
    /// on the forward and return paths plus 2.
    ///
    /// Default is 500 ms.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Time after which an idle session is evicted from the session cache and closed.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// Sampling interval of the SURB balancer control loop.
    ///
    /// Default is 100 ms.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial egress rate (per second) of the rate controller of an incoming session.
    ///
    /// Together with `minimum_surb_buffer_duration` it also determines the default
    /// target SURB buffer size.
    ///
    /// Default is 10.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,

    /// Duration used to derive the default SURB buffer size and the maximum SURB rate
    /// of incoming sessions; raised to at least [`MIN_SURB_BUFFER_DURATION`] by
    /// `SessionManager::new`.
    ///
    /// Default is 5 seconds.
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,

    /// Upper bound on the target SURB buffer size requested by the initiator of an
    /// incoming session.
    ///
    /// Default is 10 000.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// When set, incoming sessions with rate control start a keep-alive stream that
    /// notifies the counterparty about the SURB level with this period.
    /// Raised to at least [`MIN_SURB_BUFFER_NOTIFICATION_PERIOD`] by `SessionManager::new`.
    ///
    /// Default is `None` (no notifications).
    #[default(None)]
    pub surb_balance_notify_period: Option<Duration>,

    /// Whether the keep-alive stream of outgoing sessions with SURB balancing notifies
    /// the counterparty about the SURB target.
    ///
    /// Default is `true`.
    #[default(true)]
    pub surb_target_notify: bool,
}
260
/// Manages the lifecycle of HOPR sessions: initiation of outgoing sessions, acceptance of
/// incoming sessions, dispatching of incoming data to sessions, and session closure.
///
/// `S` is the sink used to send `(DestinationRouting, ApplicationDataOut)` messages, and
/// `T` is the sink notified about each new [`IncomingSession`]. Both are supplied via
/// [`SessionManager::start`].
///
/// All state is held behind shared handles, so clones of the manager operate on the same state.
pub struct SessionManager<S, T> {
    /// Pending outgoing session initiations, keyed by challenge.
    session_initiations: SessionInitiationCache,
    /// Incoming-session notifier and the sender of session closure notifications;
    /// set exactly once by `start`.
    #[allow(clippy::type_complexity)]
    session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
    /// All currently registered sessions.
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    /// Sink for outgoing messages; set exactly once by `start`.
    msg_sender: Arc<OnceLock<S>>,
    /// Sanitized configuration of the manager.
    cfg: SessionManagerConfig,
}
409
410impl<S, T> Clone for SessionManager<S, T> {
411 fn clone(&self) -> Self {
412 Self {
413 session_initiations: self.session_initiations.clone(),
414 session_notifiers: self.session_notifiers.clone(),
415 sessions: self.sessions.clone(),
416 cfg: self.cfg.clone(),
417 msg_sender: self.msg_sender.clone(),
418 }
419 }
420}
421
/// Maximum time to wait for an externally supplied sink (the message sender or the
/// incoming-session notifier) to accept an item.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
423
424impl<S, T> SessionManager<S, T>
425where
426 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
427 T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
428 S::Error: std::error::Error + Send + Sync + Clone + 'static,
429 T::Error: std::error::Error + Send + Sync + Clone + 'static,
430{
431 pub fn new(mut cfg: SessionManagerConfig) -> Self {
433 let min_session_tag_range_reservation = ReservedTag::range().end;
434 debug_assert!(
435 min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
436 "invalid tag reservation range"
437 );
438
439 if cfg.session_tag_range.start < min_session_tag_range_reservation {
441 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
442 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
443 }
444 cfg.maximum_sessions = cfg
445 .maximum_sessions
446 .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
447 cfg.surb_balance_notify_period = cfg
448 .surb_balance_notify_period
449 .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
450 cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);
451
452 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
454 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
455
456 #[cfg(all(feature = "prometheus", not(test)))]
457 METRIC_ACTIVE_SESSIONS.set(0.0);
458
459 let msg_sender = Arc::new(OnceLock::new());
460 Self {
461 msg_sender: msg_sender.clone(),
462 session_initiations: moka::future::Cache::builder()
463 .max_capacity(cfg.maximum_sessions as u64)
464 .time_to_live(
465 2 * initiation_timeout_max_one_way(
466 cfg.initiation_timeout_base,
467 RoutingOptions::MAX_INTERMEDIATE_HOPS,
468 ),
469 )
470 .build(),
471 sessions: moka::future::Cache::builder()
472 .max_capacity(cfg.maximum_sessions as u64)
473 .time_to_idle(cfg.idle_timeout)
474 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
475 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
476 trace!(?session_id, ?reason, "session evicted from the cache");
477 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
478 }
479 _ => {}
480 })
481 .build(),
482 session_notifiers: Arc::new(OnceLock::new()),
483 cfg,
484 }
485 }
486
487 pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
493 self.msg_sender
494 .set(msg_sender)
495 .map_err(|_| SessionManagerError::AlreadyStarted)?;
496
497 let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
498 self.session_notifiers
499 .set((new_session_notifier, session_close_tx))
500 .map_err(|_| SessionManagerError::AlreadyStarted)?;
501
502 let myself = self.clone();
503 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
504 None,
505 move |(session_id, closure_reason)| {
506 let myself = myself.clone();
507 async move {
508 if let Some(session_data) = myself.sessions.remove(&session_id).await {
512 close_session(session_id, session_data, closure_reason);
513 } else {
514 debug!(
516 ?session_id,
517 ?closure_reason,
518 "could not find session id to close, maybe the session is already closed"
519 );
520 }
521 }
522 },
523 ));
524
525 let myself = self.clone();
530 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
531 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
532 let timeout = 2 * initiation_timeout_max_one_way(
533 myself.cfg.initiation_timeout_base,
534 RoutingOptions::MAX_INTERMEDIATE_HOPS,
535 )
536 .min(myself.cfg.idle_timeout)
537 .mul_f64(jitter)
538 / 2;
539 futures_time::stream::interval(timeout.into())
540 .for_each(|_| {
541 trace!("executing session cache evictions");
542 futures::future::join(
543 myself.sessions.run_pending_tasks(),
544 myself.session_initiations.run_pending_tasks(),
545 )
546 .map(|_| ())
547 })
548 .await;
549 });
550
551 Ok(vec![ah_closure_notifications, ah_session_expiration])
552 }
553
554 pub fn is_started(&self) -> bool {
556 self.session_notifiers.get().is_some()
557 }
558
559 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
560 if let moka::ops::compute::CompResult::Inserted(_) = self
562 .sessions
563 .entry(session_id)
564 .and_compute_with(|entry| {
565 futures::future::ready(if entry.is_none() {
566 moka::ops::compute::Op::Put(slot)
567 } else {
568 moka::ops::compute::Op::Nop
569 })
570 })
571 .await
572 {
573 #[cfg(all(feature = "prometheus", not(test)))]
574 {
575 METRIC_NUM_INITIATED_SESSIONS.increment();
576 METRIC_ACTIVE_SESSIONS.increment(1.0);
577 }
578
579 Ok(())
580 } else {
581 error!(%session_id, "session already exists - loopback attempt");
583 Err(SessionManagerError::Loopback.into())
584 }
585 }
586
587 pub async fn new_session(
595 &self,
596 destination: Address,
597 target: SessionTarget,
598 cfg: SessionClientConfig,
599 ) -> crate::errors::Result<HoprSession> {
600 self.sessions.run_pending_tasks().await;
601 if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
602 return Err(SessionManagerError::TooManySessions.into());
603 }
604
605 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
606
607 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
608 let (challenge, _) = insert_into_next_slot(
609 &self.session_initiations,
610 |ch| {
611 if let Some(challenge) = ch {
612 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
613 } else {
614 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
615 }
616 },
617 |_| tx_initiation_done,
618 )
619 .await
620 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
624 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
625 challenge,
626 target,
627 capabilities: ByteCapabilities(cfg.capabilities),
628 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
629 cfg.surb_management
630 .map(|c| c.target_surb_buffer_size)
631 .unwrap_or(
632 self.cfg.initial_return_session_egress_rate as u64
633 * self
634 .cfg
635 .minimum_surb_buffer_duration
636 .max(MIN_SURB_BUFFER_DURATION)
637 .as_secs(),
638 )
639 .min(u32::MAX as u64) as u32
640 } else {
641 0
642 },
643 });
644
645 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
646 let forward_routing = DestinationRouting::Forward {
647 destination: Box::new(destination.into()),
648 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
650 return_options: cfg.return_path_options.clone().into(),
651 };
652
653 info!(challenge, %pseudonym, %destination, "new session request");
655 msg_sender
656 .send((
657 forward_routing.clone(),
658 ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
659 ))
660 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
661 .await
662 .map_err(|_| {
663 error!(challenge, %pseudonym, %destination, "timeout sending session request message");
664 TransportSessionError::Timeout
665 })?
666 .map_err(TransportSessionError::packet_sending)?;
667
668 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
670 self.cfg.initiation_timeout_base,
671 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
672 )
673 .into();
674
675 pin_mut!(rx_initiation_done);
677
678 trace!(challenge, "awaiting session establishment");
679 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
680 Ok(Ok(Some(est))) => {
681 let session_id = est.session_id;
683 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
684
685 let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
686 let notifier = self
687 .session_notifiers
688 .get()
689 .map(|(_, notifier)| {
690 let mut notifier = notifier.clone();
691 Box::new(move |session_id: SessionId, reason: ClosureReason| {
692 let _ = notifier
693 .try_send((session_id, reason))
694 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
695 })
696 })
697 .ok_or(SessionManagerError::NotStarted)?;
698
699 #[cfg(feature = "telemetry")]
700 let telemetry = Arc::new(SessionTelemetry::new(
701 session_id,
702 HoprSessionConfig {
703 capabilities: cfg.capabilities,
704 frame_mtu: self.cfg.frame_mtu,
705 frame_timeout: self.cfg.max_frame_timeout,
706 },
707 ));
708
709 if let Some(balancer_config) = cfg.surb_management {
713 let surb_estimator = AtomicSurbFlowEstimator::default();
714
715 let surb_estimator_clone = surb_estimator.clone();
717 let full_surb_scoring_sender =
718 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
719 surb_estimator_clone.produced.fetch_add(
721 data.estimate_surbs_with_msg() as u64,
722 std::sync::atomic::Ordering::Relaxed,
723 );
724 futures::future::ok::<_, S::Error>((routing, data))
725 });
726
727 let max_out_organic_surbs = cfg.always_max_out_surbs;
730 let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
731 move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
735 if !max_out_organic_surbs {
736 data.packet_info
738 .get_or_insert_with(|| OutgoingPacketInfo {
739 max_surbs_in_packet: 1,
740 ..Default::default()
741 })
742 .max_surbs_in_packet = 1;
743 }
744 futures::future::ok::<_, S::Error>((routing, data))
745 },
746 );
747
748 let mut abort_handles = AbortableList::default();
749 let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
750
751 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
753 session_id,
754 full_surb_scoring_sender,
755 forward_routing.clone(),
756 if self.cfg.surb_target_notify {
757 SurbNotificationMode::Target
758 } else {
759 SurbNotificationMode::DoNotNotify
760 },
761 surb_mgmt.clone(),
762 );
763 abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
764
765 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
767 let balancer = SurbBalancer::new(
768 session_id,
769 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
771 surb_estimator.clone(),
772 SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
775 surb_mgmt.clone(),
776 );
777
778 let (level_stream, balancer_abort_handle) =
779 balancer.start_control_loop(self.cfg.balancer_sampling_interval);
780 abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
781
782 self.insert_session_slot(
784 session_id,
785 SessionSlot {
786 session_tx: Arc::new(tx),
787 routing_opts: forward_routing.clone(),
788 abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
789 surb_mgmt: surb_mgmt.clone(),
790 surb_estimator: surb_estimator.clone(),
791 #[cfg(feature = "telemetry")]
792 telemetry: telemetry.clone(),
793 },
794 )
795 .await?;
796
797 #[cfg(feature = "telemetry")]
798 {
799 telemetry.set_state(SessionLifecycleState::Active);
800 telemetry.set_balancer_data(surb_estimator.clone(), surb_mgmt);
801 }
802
803 match level_stream
806 .skip_while(|current_level| {
807 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
808 })
809 .next()
810 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
811 .await
812 {
813 Ok(Some(surb_level)) => {
814 info!(%session_id, surb_level, "session is ready");
815 }
816 Ok(None) => {
817 return Err(
818 SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
819 );
820 }
821 Err(_) => {
822 warn!(%session_id, "session didn't reach target SURB buffer size in time");
823 }
824 }
825
826 HoprSession::new(
827 session_id,
828 forward_routing,
829 HoprSessionConfig {
830 capabilities: cfg.capabilities,
831 frame_mtu: self.cfg.frame_mtu,
832 frame_timeout: self.cfg.max_frame_timeout,
833 },
834 (
835 reduced_surb_scoring_sender,
836 rx.inspect(move |_| {
837 surb_estimator
840 .consumed
841 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
842 }),
843 ),
844 Some(notifier),
845 #[cfg(feature = "telemetry")]
846 telemetry,
847 )
848 } else {
849 warn!(%session_id, "session ready without SURB balancing");
850
851 self.insert_session_slot(
852 session_id,
853 SessionSlot {
854 session_tx: Arc::new(tx),
855 routing_opts: forward_routing.clone(),
856 abort_handles: Default::default(),
857 surb_mgmt: Default::default(), surb_estimator: Default::default(), #[cfg(feature = "telemetry")]
860 telemetry: telemetry.clone(),
861 },
862 )
863 .await?;
864 #[cfg(feature = "telemetry")]
865 telemetry.set_state(SessionLifecycleState::Active);
866
867 let max_out_organic_surbs = cfg.always_max_out_surbs;
870 let reduced_surb_sender =
871 msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
872 if !max_out_organic_surbs {
873 data.packet_info
874 .get_or_insert_with(|| OutgoingPacketInfo {
875 max_surbs_in_packet: 1,
876 ..Default::default()
877 })
878 .max_surbs_in_packet = 1;
879 }
880 futures::future::ok::<_, S::Error>((routing, data))
881 });
882
883 HoprSession::new(
884 session_id,
885 forward_routing,
886 HoprSessionConfig {
887 capabilities: cfg.capabilities,
888 frame_mtu: self.cfg.frame_mtu,
889 frame_timeout: self.cfg.max_frame_timeout,
890 },
891 (reduced_surb_sender, rx),
892 Some(notifier),
893 #[cfg(feature = "telemetry")]
894 telemetry,
895 )
896 }
897 }
898 Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
899 "internal error: sender has been closed without completing the session establishment"
900 ))
901 .into()),
902 Ok(Err(error)) => {
903 error!(
905 challenge = error.challenge,
906 ?error,
907 "the other party rejected the session initiation with error"
908 );
909 Err(TransportSessionError::Rejected(error.reason))
910 }
911 Err(_) => {
912 error!(challenge, "session initiation attempt timed out");
914
915 #[cfg(all(feature = "prometheus", not(test)))]
916 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
917
918 Err(TransportSessionError::Timeout)
919 }
920 }
921 }
922
923 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
927 if let Some(session_data) = self.sessions.get(id).await {
928 trace!(session_id = ?id, "pinging manually session");
929 Ok(self
930 .msg_sender
931 .get()
932 .cloned()
933 .ok_or(SessionManagerError::NotStarted)?
934 .send((
935 session_data.routing_opts.clone(),
936 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
937 ))
938 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
939 .await
940 .map_err(|_| {
941 error!("timeout sending session ping message");
942 TransportSessionError::Timeout
943 })?
944 .map_err(TransportSessionError::packet_sending)?)
945 } else {
946 Err(SessionManagerError::NonExistingSession.into())
947 }
948 }
949
950 pub async fn active_sessions(&self) -> Vec<SessionId> {
952 self.sessions.run_pending_tasks().await;
953 self.sessions.iter().map(|(k, _)| *k).collect()
954 }
955
956 pub async fn update_surb_balancer_config(
961 &self,
962 id: &SessionId,
963 config: SurbBalancerConfig,
964 ) -> crate::errors::Result<()> {
965 let cfg = self
966 .sessions
967 .get(id)
968 .await
969 .ok_or(SessionManagerError::NonExistingSession)?
970 .surb_mgmt;
971
972 if !cfg.is_disabled() {
974 cfg.update(&config);
975 Ok(())
976 } else {
977 Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
978 }
979 }
980
981 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
985 match self.sessions.get(id).await {
986 Some(session) => Ok(Some(session.surb_mgmt.as_ref())
987 .filter(|c| !c.is_disabled())
988 .map(|d| d.as_config())),
989 None => Err(SessionManagerError::NonExistingSession.into()),
990 }
991 }
992
993 pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1000 match self.sessions.get(id).await {
1001 Some(session) => Ok((
1002 session
1003 .surb_estimator
1004 .produced
1005 .load(std::sync::atomic::Ordering::Relaxed),
1006 session
1007 .surb_estimator
1008 .consumed
1009 .load(std::sync::atomic::Ordering::Relaxed),
1010 )),
1011 None => Err(SessionManagerError::NonExistingSession.into()),
1012 }
1013 }
1014
1015 #[cfg(feature = "telemetry")]
1019 pub async fn get_session_telemetry(&self, id: &SessionId) -> crate::errors::Result<SessionStatsSnapshot> {
1020 match self.sessions.get(id).await.map(|session| session.telemetry) {
1021 Some(telemetry) => Ok(telemetry.snapshot()),
1022 None => Err(SessionManagerError::NonExistingSession.into()),
1023 }
1024 }
1025
1026 pub async fn dispatch_message(
1033 &self,
1034 pseudonym: HoprPseudonym,
1035 in_data: ApplicationDataIn,
1036 ) -> crate::errors::Result<DispatchResult> {
1037 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1038 trace!("dispatching Start protocol message");
1040 return self
1041 .handle_start_protocol_message(pseudonym, in_data)
1042 .await
1043 .map(|_| DispatchResult::Processed);
1044 } else if self
1045 .cfg
1046 .session_tag_range
1047 .contains(&in_data.data.application_tag.as_u64())
1048 {
1049 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1050
1051 return if let Some(session_slot) = self.sessions.get(&session_id).await {
1052 trace!(?session_id, "received data for a registered session");
1053
1054 Ok(session_slot
1055 .session_tx
1056 .unbounded_send(in_data)
1057 .map(|_| DispatchResult::Processed)
1058 .map_err(SessionManagerError::other)?)
1059 } else {
1060 error!(%session_id, "received data from an unestablished session");
1061 Err(TransportSessionError::UnknownData)
1062 };
1063 }
1064
1065 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1066 Ok(DispatchResult::Unrelated(in_data))
1067 }
1068
1069 async fn handle_incoming_session_initiation(
1070 &self,
1071 pseudonym: HoprPseudonym,
1072 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1073 ) -> crate::errors::Result<()> {
1074 trace!(challenge = session_req.challenge, "received session initiation request");
1075
1076 debug!(%pseudonym, "got new session request, searching for a free session slot");
1077
1078 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1079
1080 let (mut new_session_notifier, mut close_session_notifier) = self
1081 .session_notifiers
1082 .get()
1083 .cloned()
1084 .ok_or(SessionManagerError::NotStarted)?;
1085
1086 let reply_routing = DestinationRouting::Return(pseudonym.into());
1088
1089 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1090
1091 self.sessions.run_pending_tasks().await; let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1094 insert_into_next_slot(
1095 &self.sessions,
1096 |sid| {
1097 let next_tag: Tag = match sid {
1100 Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1101 .max(self.cfg.session_tag_range.start)
1102 .into(),
1103 None => hopr_crypto_random::random_integer(
1104 self.cfg.session_tag_range.start,
1105 Some(self.cfg.session_tag_range.end),
1106 )
1107 .into(),
1108 };
1109 SessionId::new(next_tag, pseudonym)
1110 },
1111 |_sid| SessionSlot {
1112 session_tx: Arc::new(tx_session_data),
1113 routing_opts: reply_routing.clone(),
1114 abort_handles: Default::default(),
1115 surb_mgmt: Default::default(),
1116 surb_estimator: Default::default(),
1117 #[cfg(feature = "telemetry")]
1118 telemetry: Arc::new(SessionTelemetry::new(
1119 _sid,
1120 HoprSessionConfig {
1121 capabilities: session_req.capabilities.0,
1122 frame_mtu: self.cfg.frame_mtu,
1123 frame_timeout: self.cfg.max_frame_timeout,
1124 },
1125 )),
1126 },
1127 )
1128 .await
1129 } else {
1130 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1131 None
1132 };
1133
1134 if let Some((session_id, slot)) = allocated_slot {
1137 debug!(%session_id, ?session_req, "assigned a new session");
1138
1139 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1140 if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1141 error!(%session_id, %error, %reason, "failed to notify session closure");
1142 }
1143 });
1144
1145 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1146 let egress_rate_control =
1148 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1149
1150 let target_surb_buffer_size = if session_req.additional_data > 0 {
1153 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1154 } else {
1155 self.cfg.initial_return_session_egress_rate as u64
1156 * self
1157 .cfg
1158 .minimum_surb_buffer_duration
1159 .max(MIN_SURB_BUFFER_DURATION)
1160 .as_secs()
1161 };
1162
1163 let surb_estimator_clone = slot.surb_estimator.clone();
1164 let session = HoprSession::new(
1165 session_id,
1166 reply_routing.clone(),
1167 HoprSessionConfig {
1168 capabilities: session_req.capabilities.into(),
1169 frame_mtu: self.cfg.frame_mtu,
1170 frame_timeout: self.cfg.max_frame_timeout,
1171 },
1172 (
1173 msg_sender
1175 .clone()
1176 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1177 surb_estimator_clone
1179 .consumed
1180 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1181 futures::future::ok::<_, S::Error>((routing, data))
1182 })
1183 .rate_limit_with_controller(&egress_rate_control)
1184 .buffer((2 * target_surb_buffer_size) as usize),
1185 rx_session_data.inspect(move |data| {
1187 surb_estimator_clone
1189 .produced
1190 .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1191 }),
1192 ),
1193 Some(closure_notifier),
1194 #[cfg(feature = "telemetry")]
1195 slot.telemetry.clone(),
1196 )?;
1197
1198 let balancer_config = SurbBalancerConfig {
1202 target_surb_buffer_size,
1203 max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1205 surb_decay: None,
1208 };
1209
1210 slot.surb_mgmt.update(&balancer_config);
1211
1212 #[cfg(feature = "telemetry")]
1213 {
1214 slot.telemetry.set_state(SessionLifecycleState::Active);
1215 slot.telemetry
1216 .set_balancer_data(slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1217 }
1218
1219 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1222 let balancer = SurbBalancer::new(
1223 session_id,
1224 SimpleBalancerController::default(),
1225 slot.surb_estimator.clone(),
1226 SurbControllerWithCorrection(egress_rate_control, 1), slot.surb_mgmt.clone(),
1228 );
1229
1230 let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1232 slot.abort_handles
1233 .lock()
1234 .insert(SessionTasks::Balancer, balancer_abort_handle);
1235
1236 if let Some(period) = self.cfg.surb_balance_notify_period {
1238 let surb_estimator_clone = slot.surb_estimator.clone();
1239 let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1240 session_id,
1241 msg_sender
1243 .clone()
1244 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1245 surb_estimator_clone
1247 .consumed
1248 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1249 futures::future::ok::<_, S::Error>((routing, data))
1250 }),
1251 slot.routing_opts.clone(),
1252 SurbNotificationMode::Level(slot.surb_estimator.clone()),
1253 slot.surb_mgmt.clone(),
1254 );
1255
1256 hopr_async_runtime::prelude::spawn(async move {
1258 hopr_async_runtime::prelude::sleep(period).await;
1260 ka_controller.set_rate_per_unit(1, period);
1261 });
1262
1263 slot.abort_handles
1264 .lock()
1265 .insert(SessionTasks::KeepAlive, ka_abort_handle);
1266
1267 debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1268 }
1269
1270 session
1271 } else {
1272 HoprSession::new(
1273 session_id,
1274 reply_routing.clone(),
1275 HoprSessionConfig {
1276 capabilities: session_req.capabilities.into(),
1277 frame_mtu: self.cfg.frame_mtu,
1278 frame_timeout: self.cfg.max_frame_timeout,
1279 },
1280 (msg_sender.clone(), rx_session_data),
1281 Some(closure_notifier),
1282 #[cfg(feature = "telemetry")]
1283 slot.telemetry.clone(),
1284 )?
1285 };
1286
1287 let incoming_session = IncomingSession {
1289 session,
1290 target: session_req.target,
1291 };
1292
1293 match new_session_notifier
1295 .send(incoming_session)
1296 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1297 .await
1298 {
1299 Err(_) => {
1300 error!(%session_id, "timeout to notify about new incoming session");
1301 return Err(TransportSessionError::Timeout);
1302 }
1303 Ok(Err(error)) => {
1304 error!(%session_id, %error, "failed to notify about new incoming session");
1305 return Err(SessionManagerError::other(error).into());
1306 }
1307 _ => {}
1308 };
1309
1310 trace!(?session_id, "session notification sent");
1311
1312 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1315 orig_challenge: session_req.challenge,
1316 session_id,
1317 });
1318
1319 msg_sender
1320 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1321 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1322 .await
1323 .map_err(|_| {
1324 error!(%session_id, "timeout sending session establishment message");
1325 TransportSessionError::Timeout
1326 })?
1327 .map_err(|error| {
1328 error!(%session_id, %error, "failed to send session establishment message");
1329 SessionManagerError::other(error)
1330 })?;
1331
1332 info!(%session_id, "new session established");
1333
1334 #[cfg(all(feature = "prometheus", not(test)))]
1335 {
1336 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1337 METRIC_ACTIVE_SESSIONS.increment(1.0);
1338 }
1339 } else {
1340 error!(%pseudonym,"failed to reserve a new session slot");
1341
1342 let reason = StartErrorReason::NoSlotsAvailable;
1344 let data = HoprStartProtocol::SessionError(StartErrorType {
1345 challenge: session_req.challenge,
1346 reason,
1347 });
1348
1349 msg_sender
1350 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1351 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1352 .await
1353 .map_err(|_| {
1354 error!("timeout sending session error message");
1355 TransportSessionError::Timeout
1356 })?
1357 .map_err(|error| {
1358 error!(%error, "failed to send session error message");
1359 SessionManagerError::other(error)
1360 })?;
1361
1362 trace!(%pseudonym, "session establishment failure message sent");
1363
1364 #[cfg(all(feature = "prometheus", not(test)))]
1365 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1366 }
1367
1368 Ok(())
1369 }
1370
1371 async fn handle_start_protocol_message(
1372 &self,
1373 pseudonym: HoprPseudonym,
1374 data: ApplicationDataIn,
1375 ) -> crate::errors::Result<()> {
1376 match HoprStartProtocol::try_from(data.data)? {
1377 HoprStartProtocol::StartSession(session_req) => {
1378 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1379 }
1380 HoprStartProtocol::SessionEstablished(est) => {
1381 trace!(
1382 session_id = ?est.session_id,
1383 "received session establishment confirmation"
1384 );
1385 let challenge = est.orig_challenge;
1386 let session_id = est.session_id;
1387 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1388 if let Err(error) = tx_est.unbounded_send(Ok(est)) {
1389 error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
1390 return Err(SessionManagerError::other(error).into());
1391 }
1392 debug!(?session_id, challenge, "session establishment complete");
1393 } else {
1394 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1395 }
1396 }
1397 HoprStartProtocol::SessionError(error_type) => {
1398 trace!(
1399 challenge = error_type.challenge,
1400 error = ?error_type.reason,
1401 "failed to initialize a session",
1402 );
1403 if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
1406 if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
1407 error!(%error, ?error_type, "could not send session error message");
1408 return Err(SessionManagerError::other(error).into());
1409 }
1410 error!(
1411 challenge = error_type.challenge,
1412 ?error_type,
1413 "session establishment error received"
1414 );
1415 } else {
1416 error!(
1417 challenge = error_type.challenge,
1418 ?error_type,
1419 "session establishment attempt expired before error could be delivered"
1420 );
1421 }
1422
1423 #[cfg(all(feature = "prometheus", not(test)))]
1424 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
1425 }
1426 HoprStartProtocol::KeepAlive(msg) => {
1427 let session_id = msg.session_id;
1428 if let Some(session_slot) = self.sessions.get(&session_id).await {
1429 trace!(?session_id, "received keep-alive message");
1430 match &session_slot.routing_opts {
1431 DestinationRouting::Forward { .. } => {
1433 if msg.flags.contains(KeepAliveFlag::BalancerState)
1434 && !session_slot.surb_mgmt.is_disabled()
1435 && session_slot.surb_mgmt.buffer_level() != msg.additional_data
1436 {
1437 session_slot
1439 .surb_mgmt
1440 .buffer_level
1441 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1442 debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
1443 }
1444
1445 session_slot
1447 .surb_estimator
1448 .consumed
1449 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1450 }
1451 DestinationRouting::Return(_) => {
1453 if msg.flags.contains(KeepAliveFlag::BalancerTarget)
1455 && msg.additional_data > 0
1456 && !session_slot.surb_mgmt.is_disabled()
1457 && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
1458 {
1459 session_slot
1461 .surb_mgmt
1462 .target_surb_buffer_size
1463 .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1464 session_slot.surb_mgmt.max_surbs_per_sec.store(
1466 msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
1467 std::sync::atomic::Ordering::Relaxed,
1468 );
1469 debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
1470 }
1471
1472 session_slot.surb_estimator.produced.fetch_add(
1475 KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1476 std::sync::atomic::Ordering::Relaxed,
1477 );
1478 }
1479 }
1480 } else {
1481 debug!(%session_id, "received keep-alive request for an unknown session");
1482 }
1483 }
1484 }
1485
1486 Ok(())
1487 }
1488}
1489
1490#[cfg(test)]
1491mod tests {
1492 use anyhow::anyhow;
1493 use futures::{AsyncWriteExt, future::BoxFuture};
1494 use hopr_crypto_random::Randomizable;
1495 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1496 use hopr_primitive_types::prelude::Address;
1497 use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1498 use tokio::time::timeout;
1499
1500 use super::*;
1501 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1502
    /// Test-only abstraction of an outgoing message transport.
    ///
    /// It exists so that a mock can be generated for it (see `MockMsgSender`).
    #[async_trait::async_trait]
    trait SendMsg {
        /// Sends `data` using the given `routing`.
        async fn send_message(
            &self,
            routing: DestinationRouting,
            data: ApplicationDataOut,
        ) -> crate::errors::Result<()>;
    }
1511
    // Generates `MockMsgSender`, a mock implementation of `SendMsg`.
    // The method is declared in its boxed-future form with explicit lifetimes, so the
    // `returning` closures in the tests below hand back a pinned, boxed future.
    mockall::mock! {
        MsgSender {}
        impl SendMsg for MsgSender {
            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
                -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
        }
    }
1519
1520 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1521 let (tx, rx) = futures::channel::mpsc::unbounded();
1522 tokio::task::spawn(async move {
1523 pin_mut!(rx);
1524 while let Some((routing, data)) = rx.next().await {
1525 sender
1526 .send_message(routing, data)
1527 .await
1528 .expect("send message must not fail in mock");
1529 }
1530 });
1531 tx
1532 }
1533
1534 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1535 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1536 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1537 .unwrap_or(false)
1538 }
1539
1540 fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1541 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1542 .map(msg)
1543 .unwrap_or(false)
1544 }
1545
1546 #[test_log::test(tokio::test)]
1547 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1548 {
1549 let alice_pseudonym = HoprPseudonym::random();
1550 let bob_peer: Address = (&ChainKeypair::random()).into();
1551
1552 let alice_mgr = SessionManager::new(Default::default());
1553 let bob_mgr = SessionManager::new(Default::default());
1554
1555 let mut sequence = mockall::Sequence::new();
1556 let mut alice_transport = MockMsgSender::new();
1557 let mut bob_transport = MockMsgSender::new();
1558
1559 let bob_mgr_clone = bob_mgr.clone();
1561 alice_transport
1562 .expect_send_message()
1563 .once()
1564 .in_sequence(&mut sequence)
1565 .withf(move |peer, data| {
1566 info!("alice sends {}", data.data.application_tag);
1567 msg_type(data, StartProtocolDiscriminants::StartSession)
1568 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1569 })
1570 .returning(move |_, data| {
1571 let bob_mgr_clone = bob_mgr_clone.clone();
1572 Box::pin(async move {
1573 bob_mgr_clone
1574 .dispatch_message(
1575 alice_pseudonym,
1576 ApplicationDataIn {
1577 data: data.data,
1578 packet_info: Default::default(),
1579 },
1580 )
1581 .await?;
1582 Ok(())
1583 })
1584 });
1585
1586 let alice_mgr_clone = alice_mgr.clone();
1588 bob_transport
1589 .expect_send_message()
1590 .once()
1591 .in_sequence(&mut sequence)
1592 .withf(move |peer, data| {
1593 info!("bob sends {}", data.data.application_tag);
1594 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1595 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1596 })
1597 .returning(move |_, data| {
1598 let alice_mgr_clone = alice_mgr_clone.clone();
1599
1600 Box::pin(async move {
1601 alice_mgr_clone
1602 .dispatch_message(
1603 alice_pseudonym,
1604 ApplicationDataIn {
1605 data: data.data,
1606 packet_info: Default::default(),
1607 },
1608 )
1609 .await?;
1610 Ok(())
1611 })
1612 });
1613
1614 let bob_mgr_clone = bob_mgr.clone();
1616 alice_transport
1617 .expect_send_message()
1618 .once()
1619 .in_sequence(&mut sequence)
1620 .withf(move |peer, data| {
1621 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1622 data.data.plain_text.as_ref(),
1623 )
1624 .expect("must be a session message")
1625 .try_as_segment()
1626 .expect("must be a segment")
1627 .is_terminating()
1628 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1629 })
1630 .returning(move |_, data| {
1631 let bob_mgr_clone = bob_mgr_clone.clone();
1632 Box::pin(async move {
1633 bob_mgr_clone
1634 .dispatch_message(
1635 alice_pseudonym,
1636 ApplicationDataIn {
1637 data: data.data,
1638 packet_info: Default::default(),
1639 },
1640 )
1641 .await?;
1642 Ok(())
1643 })
1644 });
1645
1646 let mut ahs = Vec::new();
1647
1648 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1650 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1651
1652 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1654 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1655
1656 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1657
1658 pin_mut!(new_session_rx_bob);
1659 let (alice_session, bob_session) = timeout(
1660 Duration::from_secs(2),
1661 futures::future::join(
1662 alice_mgr.new_session(
1663 bob_peer,
1664 SessionTarget::TcpStream(target.clone()),
1665 SessionClientConfig {
1666 pseudonym: alice_pseudonym.into(),
1667 capabilities: Capability::NoRateControl | Capability::Segmentation,
1668 surb_management: None,
1669 ..Default::default()
1670 },
1671 ),
1672 new_session_rx_bob.next(),
1673 ),
1674 )
1675 .await?;
1676
1677 let mut alice_session = alice_session?;
1678 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1679
1680 assert_eq!(
1681 alice_session.config().capabilities,
1682 Capability::Segmentation | Capability::NoRateControl
1683 );
1684 assert_eq!(
1685 alice_session.config().capabilities,
1686 bob_session.session.config().capabilities
1687 );
1688 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1689
1690 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1691 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1692 assert!(
1693 alice_mgr
1694 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1695 .await
1696 .is_err()
1697 );
1698
1699 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1700 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1701 assert!(
1702 bob_mgr
1703 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1704 .await
1705 .is_err()
1706 );
1707
1708 tokio::time::sleep(Duration::from_millis(100)).await;
1709 alice_session.close().await?;
1710
1711 tokio::time::sleep(Duration::from_millis(100)).await;
1712
1713 assert!(matches!(
1714 alice_mgr.ping_session(alice_session.id()).await,
1715 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1716 ));
1717
1718 futures::stream::iter(ahs)
1719 .for_each(|ah| async move { ah.abort() })
1720 .await;
1721
1722 Ok(())
1723 }
1724
1725 #[test_log::test(tokio::test)]
1726 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1727 let alice_pseudonym = HoprPseudonym::random();
1728 let bob_peer: Address = (&ChainKeypair::random()).into();
1729
1730 let cfg = SessionManagerConfig {
1731 idle_timeout: Duration::from_millis(200),
1732 ..Default::default()
1733 };
1734
1735 let alice_mgr = SessionManager::new(cfg);
1736 let bob_mgr = SessionManager::new(Default::default());
1737
1738 let mut sequence = mockall::Sequence::new();
1739 let mut alice_transport = MockMsgSender::new();
1740 let mut bob_transport = MockMsgSender::new();
1741
1742 let bob_mgr_clone = bob_mgr.clone();
1744 alice_transport
1745 .expect_send_message()
1746 .once()
1747 .in_sequence(&mut sequence)
1748 .withf(move |peer, data| {
1749 msg_type(data, StartProtocolDiscriminants::StartSession)
1750 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1751 })
1752 .returning(move |_, data| {
1753 let bob_mgr_clone = bob_mgr_clone.clone();
1754 Box::pin(async move {
1755 bob_mgr_clone
1756 .dispatch_message(
1757 alice_pseudonym,
1758 ApplicationDataIn {
1759 data: data.data,
1760 packet_info: Default::default(),
1761 },
1762 )
1763 .await?;
1764 Ok(())
1765 })
1766 });
1767
1768 let alice_mgr_clone = alice_mgr.clone();
1770 bob_transport
1771 .expect_send_message()
1772 .once()
1773 .in_sequence(&mut sequence)
1774 .withf(move |peer, data| {
1775 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1776 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1777 })
1778 .returning(move |_, data| {
1779 let alice_mgr_clone = alice_mgr_clone.clone();
1780
1781 Box::pin(async move {
1782 alice_mgr_clone
1783 .dispatch_message(
1784 alice_pseudonym,
1785 ApplicationDataIn {
1786 data: data.data,
1787 packet_info: Default::default(),
1788 },
1789 )
1790 .await?;
1791 Ok(())
1792 })
1793 });
1794
1795 let mut ahs = Vec::new();
1796
1797 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1799 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1800
1801 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1803 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1804
1805 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1806
1807 pin_mut!(new_session_rx_bob);
1808 let (alice_session, bob_session) = timeout(
1809 Duration::from_secs(2),
1810 futures::future::join(
1811 alice_mgr.new_session(
1812 bob_peer,
1813 SessionTarget::TcpStream(target.clone()),
1814 SessionClientConfig {
1815 pseudonym: alice_pseudonym.into(),
1816 capabilities: Capability::NoRateControl | Capability::Segmentation,
1817 surb_management: None,
1818 ..Default::default()
1819 },
1820 ),
1821 new_session_rx_bob.next(),
1822 ),
1823 )
1824 .await?;
1825
1826 let alice_session = alice_session?;
1827 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1828
1829 assert_eq!(
1830 alice_session.config().capabilities,
1831 Capability::Segmentation | Capability::NoRateControl,
1832 );
1833 assert_eq!(
1834 alice_session.config().capabilities,
1835 bob_session.session.config().capabilities
1836 );
1837 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1838
1839 tokio::time::sleep(Duration::from_millis(300)).await;
1841
1842 assert!(matches!(
1843 alice_mgr.ping_session(alice_session.id()).await,
1844 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1845 ));
1846
1847 futures::stream::iter(ahs)
1848 .for_each(|ah| async move { ah.abort() })
1849 .await;
1850
1851 Ok(())
1852 }
1853
1854 #[test_log::test(tokio::test)]
1855 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1856 let alice_pseudonym = HoprPseudonym::random();
1857 let session_id = SessionId::new(16u64, alice_pseudonym);
1858 let balancer_cfg = SurbBalancerConfig {
1859 target_surb_buffer_size: 1000,
1860 max_surbs_per_sec: 100,
1861 ..Default::default()
1862 };
1863
1864 let alice_mgr = SessionManager::<
1865 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1866 futures::channel::mpsc::Sender<IncomingSession>,
1867 >::new(Default::default());
1868
1869 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1870 alice_mgr
1871 .sessions
1872 .insert(
1873 session_id,
1874 SessionSlot {
1875 session_tx: Arc::new(dummy_tx),
1876 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1877 abort_handles: Default::default(),
1878 surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
1879 surb_estimator: Default::default(),
1880 #[cfg(feature = "telemetry")]
1881 telemetry: Arc::new(SessionTelemetry::new(session_id, Default::default())),
1882 },
1883 )
1884 .await;
1885
1886 let actual_cfg = alice_mgr
1887 .get_surb_balancer_config(&session_id)
1888 .await?
1889 .ok_or(anyhow!("session must have a surb balancer config"))?;
1890 assert_eq!(actual_cfg, balancer_cfg);
1891
1892 let new_cfg = SurbBalancerConfig {
1893 target_surb_buffer_size: 2000,
1894 max_surbs_per_sec: 200,
1895 ..Default::default()
1896 };
1897 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1898
1899 let actual_cfg = alice_mgr
1900 .get_surb_balancer_config(&session_id)
1901 .await?
1902 .ok_or(anyhow!("session must have a surb balancer config"))?;
1903 assert_eq!(actual_cfg, new_cfg);
1904
1905 Ok(())
1906 }
1907
1908 #[test_log::test(tokio::test)]
1909 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1910 let alice_pseudonym = HoprPseudonym::random();
1911 let bob_peer: Address = (&ChainKeypair::random()).into();
1912
1913 let cfg = SessionManagerConfig {
1914 session_tag_range: 16u64..17u64, ..Default::default()
1916 };
1917
1918 let alice_mgr = SessionManager::new(Default::default());
1919 let bob_mgr = SessionManager::new(cfg);
1920
1921 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1923 bob_mgr
1924 .sessions
1925 .insert(
1926 SessionId::new(16u64, alice_pseudonym),
1927 SessionSlot {
1928 session_tx: Arc::new(dummy_tx),
1929 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1930 abort_handles: Default::default(),
1931 #[cfg(feature = "telemetry")]
1932 telemetry: Arc::new(SessionTelemetry::new(
1933 SessionId::new(16u64, alice_pseudonym),
1934 Default::default(),
1935 )),
1936 surb_mgmt: Default::default(),
1937 surb_estimator: Default::default(),
1938 },
1939 )
1940 .await;
1941
1942 let mut sequence = mockall::Sequence::new();
1943 let mut alice_transport = MockMsgSender::new();
1944 let mut bob_transport = MockMsgSender::new();
1945
1946 let bob_mgr_clone = bob_mgr.clone();
1948 alice_transport
1949 .expect_send_message()
1950 .once()
1951 .in_sequence(&mut sequence)
1952 .withf(move |peer, data| {
1953 msg_type(data, StartProtocolDiscriminants::StartSession)
1954 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1955 })
1956 .returning(move |_, data| {
1957 let bob_mgr_clone = bob_mgr_clone.clone();
1958 Box::pin(async move {
1959 bob_mgr_clone
1960 .dispatch_message(
1961 alice_pseudonym,
1962 ApplicationDataIn {
1963 data: data.data,
1964 packet_info: Default::default(),
1965 },
1966 )
1967 .await?;
1968 Ok(())
1969 })
1970 });
1971
1972 let alice_mgr_clone = alice_mgr.clone();
1974 bob_transport
1975 .expect_send_message()
1976 .once()
1977 .in_sequence(&mut sequence)
1978 .withf(move |peer, data| {
1979 msg_type(data, StartProtocolDiscriminants::SessionError)
1980 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1981 })
1982 .returning(move |_, data| {
1983 let alice_mgr_clone = alice_mgr_clone.clone();
1984 Box::pin(async move {
1985 alice_mgr_clone
1986 .dispatch_message(
1987 alice_pseudonym,
1988 ApplicationDataIn {
1989 data: data.data,
1990 packet_info: Default::default(),
1991 },
1992 )
1993 .await?;
1994 Ok(())
1995 })
1996 });
1997
1998 let mut jhs = Vec::new();
1999
2000 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2002 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2003
2004 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2006 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2007
2008 let result = alice_mgr
2009 .new_session(
2010 bob_peer,
2011 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2012 SessionClientConfig {
2013 capabilities: Capabilities::empty(),
2014 pseudonym: alice_pseudonym.into(),
2015 surb_management: None,
2016 ..Default::default()
2017 },
2018 )
2019 .await;
2020
2021 assert!(
2022 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2023 );
2024
2025 Ok(())
2026 }
2027
2028 #[test_log::test(tokio::test)]
2029 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2030 -> anyhow::Result<()> {
2031 let alice_pseudonym = HoprPseudonym::random();
2032 let bob_peer: Address = (&ChainKeypair::random()).into();
2033
2034 let cfg = SessionManagerConfig {
2035 maximum_sessions: 1,
2036 ..Default::default()
2037 };
2038
2039 let alice_mgr = SessionManager::new(Default::default());
2040 let bob_mgr = SessionManager::new(cfg);
2041
2042 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2044 bob_mgr
2045 .sessions
2046 .insert(
2047 SessionId::new(16u64, alice_pseudonym),
2048 SessionSlot {
2049 session_tx: Arc::new(dummy_tx),
2050 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2051 abort_handles: Default::default(),
2052 surb_mgmt: Default::default(),
2053 surb_estimator: Default::default(),
2054 #[cfg(feature = "telemetry")]
2055 telemetry: Arc::new(SessionTelemetry::new(
2056 SessionId::new(16u64, alice_pseudonym),
2057 Default::default(),
2058 )),
2059 },
2060 )
2061 .await;
2062
2063 let mut sequence = mockall::Sequence::new();
2064 let mut alice_transport = MockMsgSender::new();
2065 let mut bob_transport = MockMsgSender::new();
2066
2067 let bob_mgr_clone = bob_mgr.clone();
2069 alice_transport
2070 .expect_send_message()
2071 .once()
2072 .in_sequence(&mut sequence)
2073 .withf(move |peer, data| {
2074 msg_type(data, StartProtocolDiscriminants::StartSession)
2075 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2076 })
2077 .returning(move |_, data| {
2078 let bob_mgr_clone = bob_mgr_clone.clone();
2079 Box::pin(async move {
2080 bob_mgr_clone
2081 .dispatch_message(
2082 alice_pseudonym,
2083 ApplicationDataIn {
2084 data: data.data,
2085 packet_info: Default::default(),
2086 },
2087 )
2088 .await?;
2089 Ok(())
2090 })
2091 });
2092
2093 let alice_mgr_clone = alice_mgr.clone();
2095 bob_transport
2096 .expect_send_message()
2097 .once()
2098 .in_sequence(&mut sequence)
2099 .withf(move |peer, data| {
2100 msg_type(data, StartProtocolDiscriminants::SessionError)
2101 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2102 })
2103 .returning(move |_, data| {
2104 let alice_mgr_clone = alice_mgr_clone.clone();
2105 Box::pin(async move {
2106 alice_mgr_clone
2107 .dispatch_message(
2108 alice_pseudonym,
2109 ApplicationDataIn {
2110 data: data.data,
2111 packet_info: Default::default(),
2112 },
2113 )
2114 .await?;
2115 Ok(())
2116 })
2117 });
2118
2119 let mut jhs = Vec::new();
2120
2121 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2123 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2124
2125 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2127 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2128
2129 let result = alice_mgr
2130 .new_session(
2131 bob_peer,
2132 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2133 SessionClientConfig {
2134 capabilities: None.into(),
2135 pseudonym: alice_pseudonym.into(),
2136 surb_management: None,
2137 ..Default::default()
2138 },
2139 )
2140 .await;
2141
2142 assert!(
2143 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2144 );
2145
2146 Ok(())
2147 }
2148
2149 #[test_log::test(tokio::test)]
2150 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2151 let alice_pseudonym = HoprPseudonym::random();
2152 let bob_peer: Address = (&ChainKeypair::random()).into();
2153
2154 let alice_mgr = SessionManager::new(Default::default());
2155
2156 let mut sequence = mockall::Sequence::new();
2157 let mut alice_transport = MockMsgSender::new();
2158
2159 let alice_mgr_clone = alice_mgr.clone();
2161 alice_transport
2162 .expect_send_message()
2163 .once()
2164 .in_sequence(&mut sequence)
2165 .withf(move |peer, data| {
2166 msg_type(data, StartProtocolDiscriminants::StartSession)
2167 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2168 })
2169 .returning(move |_, data| {
2170 let alice_mgr_clone = alice_mgr_clone.clone();
2172 Box::pin(async move {
2173 alice_mgr_clone
2174 .dispatch_message(
2175 alice_pseudonym,
2176 ApplicationDataIn {
2177 data: data.data,
2178 packet_info: Default::default(),
2179 },
2180 )
2181 .await?;
2182 Ok(())
2183 })
2184 });
2185
2186 let alice_mgr_clone = alice_mgr.clone();
2188 alice_transport
2189 .expect_send_message()
2190 .once()
2191 .in_sequence(&mut sequence)
2192 .withf(move |peer, data| {
2193 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2194 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2195 })
2196 .returning(move |_, data| {
2197 let alice_mgr_clone = alice_mgr_clone.clone();
2198
2199 Box::pin(async move {
2200 alice_mgr_clone
2201 .dispatch_message(
2202 alice_pseudonym,
2203 ApplicationDataIn {
2204 data: data.data,
2205 packet_info: Default::default(),
2206 },
2207 )
2208 .await?;
2209 Ok(())
2210 })
2211 });
2212
2213 let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2215 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2216
2217 let alice_session = alice_mgr
2218 .new_session(
2219 bob_peer,
2220 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2221 SessionClientConfig {
2222 capabilities: None.into(),
2223 pseudonym: alice_pseudonym.into(),
2224 surb_management: None,
2225 ..Default::default()
2226 },
2227 )
2228 .await;
2229
2230 println!("{alice_session:?}");
2231 assert!(matches!(
2232 alice_session,
2233 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2234 ));
2235
2236 drop(new_session_rx_alice);
2237 Ok(())
2238 }
2239
2240 #[test_log::test(tokio::test)]
2241 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2242 let bob_peer: Address = (&ChainKeypair::random()).into();
2243
2244 let cfg = SessionManagerConfig {
2245 initiation_timeout_base: Duration::from_millis(100),
2246 ..Default::default()
2247 };
2248
2249 let alice_mgr = SessionManager::new(cfg);
2250 let bob_mgr = SessionManager::new(Default::default());
2251
2252 let mut sequence = mockall::Sequence::new();
2253 let mut alice_transport = MockMsgSender::new();
2254 let bob_transport = MockMsgSender::new();
2255
2256 alice_transport
2258 .expect_send_message()
2259 .once()
2260 .in_sequence(&mut sequence)
2261 .withf(move |peer, data| {
2262 msg_type(data, StartProtocolDiscriminants::StartSession)
2263 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2264 })
2265 .returning(|_, _| Box::pin(async { Ok(()) }));
2266
2267 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2269 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2270
2271 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2273 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2274
2275 let result = alice_mgr
2276 .new_session(
2277 bob_peer,
2278 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2279 SessionClientConfig {
2280 capabilities: None.into(),
2281 pseudonym: None,
2282 surb_management: None,
2283 ..Default::default()
2284 },
2285 )
2286 .await;
2287
2288 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2289
2290 Ok(())
2291 }
2292
2293 #[test_log::test(tokio::test)]
2294 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2295 let alice_pseudonym = HoprPseudonym::random();
2296 let bob_peer: Address = (&ChainKeypair::random()).into();
2297
2298 let bob_cfg = SessionManagerConfig {
2299 surb_balance_notify_period: Some(Duration::from_millis(500)),
2300 ..Default::default()
2301 };
2302 let alice_mgr = SessionManager::new(Default::default());
2303 let bob_mgr = SessionManager::new(bob_cfg.clone());
2304
2305 let mut alice_transport = MockMsgSender::new();
2306 let mut bob_transport = MockMsgSender::new();
2307
2308 let mut open_sequence = mockall::Sequence::new();
2310 let bob_mgr_clone = bob_mgr.clone();
2311 alice_transport
2312 .expect_send_message()
2313 .once()
2314 .in_sequence(&mut open_sequence)
2315 .withf(move |peer, data| {
2316 msg_type(data, StartProtocolDiscriminants::StartSession)
2317 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2318 })
2319 .returning(move |_, data| {
2320 let bob_mgr_clone = bob_mgr_clone.clone();
2321 Box::pin(async move {
2322 bob_mgr_clone
2323 .dispatch_message(
2324 alice_pseudonym,
2325 ApplicationDataIn {
2326 data: data.data,
2327 packet_info: Default::default(),
2328 },
2329 )
2330 .await?;
2331 Ok(())
2332 })
2333 });
2334
2335 let alice_mgr_clone = alice_mgr.clone();
2337 bob_transport
2338 .expect_send_message()
2339 .once()
2340 .in_sequence(&mut open_sequence)
2341 .withf(move |peer, data| {
2342 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2343 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2344 })
2345 .returning(move |_, data| {
2346 let alice_mgr_clone = alice_mgr_clone.clone();
2347 Box::pin(async move {
2348 alice_mgr_clone
2349 .dispatch_message(
2350 alice_pseudonym,
2351 ApplicationDataIn {
2352 data: data.data,
2353 packet_info: Default::default(),
2354 },
2355 )
2356 .await?;
2357 Ok(())
2358 })
2359 });
2360
2361 const INITIAL_BALANCER_TARGET: u64 = 10;
2362
2363 let bob_mgr_clone = bob_mgr.clone();
2365 alice_transport
2366 .expect_send_message()
2367 .times(5..)
2368 .withf(move |peer, data| {
2370 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2371 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2373 })
2374 .returning(move |_, data| {
2375 let bob_mgr_clone = bob_mgr_clone.clone();
2376 Box::pin(async move {
2377 bob_mgr_clone
2378 .dispatch_message(
2379 alice_pseudonym,
2380 ApplicationDataIn {
2381 data: data.data,
2382 packet_info: Default::default(),
2383 },
2384 )
2385 .await?;
2386 Ok(())
2387 })
2388 });
2389
2390 const NEXT_BALANCER_TARGET: u64 = 50;
2391
2392 let bob_mgr_clone = bob_mgr.clone();
2394 alice_transport
2395 .expect_send_message()
2396 .times(5..)
2397 .withf(move |peer, data| {
2399 start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2400 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2401 })
2402 .returning(move |_, data| {
2403 let bob_mgr_clone = bob_mgr_clone.clone();
2404 Box::pin(async move {
2405 bob_mgr_clone
2406 .dispatch_message(
2407 alice_pseudonym,
2408 ApplicationDataIn {
2409 data: data.data,
2410 packet_info: Default::default(),
2411 },
2412 )
2413 .await?;
2414 Ok(())
2415 })
2416 });
2417
2418 let alice_mgr_clone = alice_mgr.clone();
2420 bob_transport
2421 .expect_send_message()
2422 .times(1..)
2423 .withf(move |peer, data| {
2425 start_msg_match(&data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2426 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2427 })
2428 .returning(move |_, data| {
2429 let alice_mgr_clone = alice_mgr_clone.clone();
2430 Box::pin(async move {
2431 alice_mgr_clone
2432 .dispatch_message(
2433 alice_pseudonym,
2434 ApplicationDataIn {
2435 data: data.data,
2436 packet_info: Default::default(),
2437 },
2438 )
2439 .await?;
2440 Ok(())
2441 })
2442 });
2443
2444 let bob_mgr_clone = bob_mgr.clone();
2446 alice_transport
2447 .expect_send_message()
2448 .once()
2449 .withf(move |peer, data| {
2451 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2452 data.data.plain_text.as_ref(),
2453 )
2454 .ok()
2455 .and_then(|m| m.try_as_segment())
2456 .map(|s| s.is_terminating())
2457 .unwrap_or(false)
2458 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2459 })
2460 .returning(move |_, data| {
2461 let bob_mgr_clone = bob_mgr_clone.clone();
2462 Box::pin(async move {
2463 bob_mgr_clone
2464 .dispatch_message(
2465 alice_pseudonym,
2466 ApplicationDataIn {
2467 data: data.data,
2468 packet_info: Default::default(),
2469 },
2470 )
2471 .await?;
2472 Ok(())
2473 })
2474 });
2475
2476 let mut ahs = Vec::new();
2477
2478 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2480 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2481
2482 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2484 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2485
2486 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2487
2488 let balancer_cfg = SurbBalancerConfig {
2489 target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2490 max_surbs_per_sec: 100,
2491 ..Default::default()
2492 };
2493
2494 pin_mut!(new_session_rx_bob);
2495 let (alice_session, bob_session) = timeout(
2496 Duration::from_secs(2),
2497 futures::future::join(
2498 alice_mgr.new_session(
2499 bob_peer,
2500 SessionTarget::TcpStream(target.clone()),
2501 SessionClientConfig {
2502 pseudonym: alice_pseudonym.into(),
2503 capabilities: Capability::Segmentation.into(),
2504 surb_management: Some(balancer_cfg),
2505 ..Default::default()
2506 },
2507 ),
2508 new_session_rx_bob.next(),
2509 ),
2510 )
2511 .await?;
2512
2513 let mut alice_session = alice_session?;
2514 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2515
2516 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2517
2518 assert_eq!(
2519 Some(balancer_cfg),
2520 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2521 );
2522
2523 let remote_cfg = bob_mgr
2524 .get_surb_balancer_config(bob_session.session.id())
2525 .await?
2526 .ok_or(anyhow!("no remote config at bob"))?;
2527 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2528 assert_eq!(
2529 remote_cfg.max_surbs_per_sec,
2530 remote_cfg.target_surb_buffer_size
2531 / bob_cfg
2532 .minimum_surb_buffer_duration
2533 .max(MIN_SURB_BUFFER_DURATION)
2534 .as_secs()
2535 );
2536
2537 tokio::time::sleep(Duration::from_millis(1500)).await;
2539
2540 let new_balancer_cfg = SurbBalancerConfig {
2541 target_surb_buffer_size: NEXT_BALANCER_TARGET,
2542 max_surbs_per_sec: 100,
2543 ..Default::default()
2544 };
2545
2546 alice_mgr
2548 .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2549 .await?;
2550
2551 tokio::time::sleep(Duration::from_millis(1500)).await;
2553
2554 let remote_cfg = bob_mgr
2556 .get_surb_balancer_config(bob_session.session.id())
2557 .await?
2558 .ok_or(anyhow!("no remote config at bob"))?;
2559 assert_eq!(
2560 remote_cfg.target_surb_buffer_size,
2561 new_balancer_cfg.target_surb_buffer_size
2562 );
2563 assert_eq!(
2564 remote_cfg.max_surbs_per_sec,
2565 new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2566 );
2567
2568 let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2569 let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2570
2571 alice_session.close().await?;
2572
2573 assert!(alice_surb_sent > 0, "alice must've sent surbs");
2574 assert!(bob_surb_recv > 0, "bob must've received surbs");
2575 assert!(
2576 bob_surb_recv <= alice_surb_sent,
2577 "bob cannot receive more surbs than alice sent"
2578 );
2579
2580 assert!(alice_surb_used > 0, "alice must see bob used surbs");
2581 assert!(bob_surb_used > 0, "bob must've used surbs");
2582 assert!(
2583 alice_surb_used <= bob_surb_used,
2584 "alice cannot see bob used more surbs than bob actually used"
2585 );
2586
2587 tokio::time::sleep(Duration::from_millis(300)).await;
2588 assert!(matches!(
2589 alice_mgr.ping_session(alice_session.id()).await,
2590 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2591 ));
2592
2593 futures::stream::iter(ahs)
2594 .for_each(|ah| async move { ah.abort() })
2595 .await;
2596
2597 Ok(())
2598 }
2599}