1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock},
4 time::Duration,
5};
6
7use futures::{
8 FutureExt, SinkExt, StreamExt, TryStreamExt,
9 channel::mpsc::{Sender, UnboundedSender},
10 future::AbortHandle,
11 pin_mut,
12};
13use futures_time::future::FutureExt as TimeExt;
14use hopr_async_runtime::AbortableList;
15use hopr_crypto_random::Randomizable;
16use hopr_internal_types::prelude::HoprPseudonym;
17use hopr_network_types::prelude::*;
18use hopr_primitive_types::prelude::Address;
19use hopr_protocol_app::prelude::*;
20use hopr_protocol_start::{
21 KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
22};
23use tracing::{debug, error, info, trace, warn};
24
25#[cfg(feature = "telemetry")]
26use crate::telemetry::{SessionLifecycleState, SessionStatsSnapshot, SessionTelemetry};
27use crate::{
28 Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
29 SurbBalancerConfig,
30 balancer::{
31 AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
32 SurbControllerWithCorrection,
33 pid::{PidBalancerController, PidControllerGains},
34 simple::SimpleBalancerController,
35 },
36 errors::{SessionManagerError, TransportSessionError},
37 types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
38 utils,
39 utils::insert_into_next_slot,
40};
41
42#[cfg(all(feature = "prometheus", not(test)))]
43lazy_static::lazy_static! {
44 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
45 "hopr_session_num_active_sessions",
46 "Number of currently active HOPR sessions"
47 ).unwrap();
48 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
49 "hopr_session_established_sessions_count",
50 "Number of sessions that were successfully established as an Exit node"
51 ).unwrap();
52 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
53 "hopr_session_initiated_sessions_count",
54 "Number of sessions that were successfully initiated as an Entry node"
55 ).unwrap();
56 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
57 "hopr_session_received_error_count",
58 "Number of HOPR session errors received from an Exit node",
59 &["kind"]
60 ).unwrap();
61 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
62 "hopr_session_sent_error_count",
63 "Number of HOPR session errors sent to an Entry node",
64 &["kind"]
65 ).unwrap();
66}
67
68fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
69 debug!(?session_id, ?reason, "closing session");
70
71 #[cfg(feature = "telemetry")]
72 {
73 session_data.telemetry.set_state(SessionLifecycleState::Closed);
74 session_data.telemetry.touch_activity();
75 }
76
77 if reason != ClosureReason::EmptyRead {
78 session_data.session_tx.close_channel();
80 trace!(?session_id, "data tx channel closed on session");
81 }
82
83 session_data.abort_handles.lock().abort_all();
85
86 #[cfg(all(feature = "prometheus", not(test)))]
87 METRIC_ACTIVE_SESSIONS.decrement(1.0);
88}
89
90fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
91 base * (hops as u32)
92}
93
94pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
96
97pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
99
100const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
102
103const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
105
106type SessionInitiationCache =
110 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
111
112#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
113enum SessionTasks {
114 KeepAlive,
115 Balancer,
116}
117
118#[derive(Clone)]
119struct SessionSlot {
120 session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
123 routing_opts: DestinationRouting,
124 abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
125 #[cfg(feature = "telemetry")]
126 telemetry: Arc<SessionTelemetry>,
127 surb_mgmt: Option<SurbBalancerConfig>,
130 surb_estimator: Option<AtomicSurbFlowEstimator>,
132}
133
134struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
136
137#[async_trait::async_trait]
138impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
139 async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
140 self.0
143 .iter()
144 .find(|(sid, _)| sid.as_ref() == id)
145 .ok_or(SessionManagerError::NonExistingSession)?
146 .1
147 .surb_mgmt
148 .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
149 }
150
151 async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
152 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
153 .0
154 .entry_by_ref(id)
155 .and_compute_with(|entry| {
156 futures::future::ready(match entry.map(|e| e.into_value()) {
157 None => moka::ops::compute::Op::Nop,
158 Some(mut updated_slot) => {
159 if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
160 *balancer_cfg = cfg;
161 moka::ops::compute::Op::Put(updated_slot)
162 } else {
163 moka::ops::compute::Op::Nop
164 }
165 }
166 })
167 })
168 .await
169 {
170 Ok(())
171 } else {
172 Err(SessionManagerError::NonExistingSession.into())
173 }
174 }
175}
176
177#[derive(Clone, Debug, PartialEq, Eq)]
179pub enum DispatchResult {
180 Processed,
182 Unrelated(ApplicationDataIn),
184}
185
186#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
188pub struct SessionManagerConfig {
189 #[doc(hidden)]
197 #[default(_code = "16u64..1024u64")]
198 pub session_tag_range: Range<u64>,
199
200 #[default(128)]
209 pub maximum_sessions: usize,
210
211 #[default(1500)]
215 pub frame_mtu: usize,
216
217 #[default(Duration::from_millis(800))]
221 pub max_frame_timeout: Duration,
222
223 #[default(Duration::from_millis(500))]
230 pub initiation_timeout_base: Duration,
231
232 #[default(Duration::from_secs(180))]
236 pub idle_timeout: Duration,
237
238 #[default(Duration::from_millis(100))]
243 pub balancer_sampling_interval: Duration,
244
245 #[default(10)]
251 pub initial_return_session_egress_rate: usize,
252
253 #[default(Duration::from_secs(5))]
264 pub minimum_surb_buffer_duration: Duration,
265
266 #[default(10_000)]
274 pub maximum_surb_buffer_size: usize,
275
276 #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
284 pub growable_target_surb_buffer: Option<(Duration, f64)>,
285}
286
287pub struct SessionManager<S, T> {
430 session_initiations: SessionInitiationCache,
431 #[allow(clippy::type_complexity)]
432 session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
433 sessions: moka::future::Cache<SessionId, SessionSlot>,
434 msg_sender: Arc<OnceLock<S>>,
435 cfg: SessionManagerConfig,
436}
437
438impl<S, T> Clone for SessionManager<S, T> {
439 fn clone(&self) -> Self {
440 Self {
441 session_initiations: self.session_initiations.clone(),
442 session_notifiers: self.session_notifiers.clone(),
443 sessions: self.sessions.clone(),
444 cfg: self.cfg.clone(),
445 msg_sender: self.msg_sender.clone(),
446 }
447 }
448}
449
450const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
451
452impl<S, T> SessionManager<S, T>
453where
454 S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
455 T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
456 S::Error: std::error::Error + Send + Sync + Clone + 'static,
457 T::Error: std::error::Error + Send + Sync + Clone + 'static,
458{
459 pub fn new(mut cfg: SessionManagerConfig) -> Self {
461 let min_session_tag_range_reservation = ReservedTag::range().end;
462 debug_assert!(
463 min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
464 "invalid tag reservation range"
465 );
466
467 if cfg.session_tag_range.start < min_session_tag_range_reservation {
469 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
470 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
471 }
472 cfg.maximum_sessions = cfg
473 .maximum_sessions
474 .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
475
476 cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
478 cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
479
480 #[cfg(all(feature = "prometheus", not(test)))]
481 METRIC_ACTIVE_SESSIONS.set(0.0);
482
483 let msg_sender = Arc::new(OnceLock::new());
484 Self {
485 msg_sender: msg_sender.clone(),
486 session_initiations: moka::future::Cache::builder()
487 .max_capacity(cfg.maximum_sessions as u64)
488 .time_to_live(
489 2 * initiation_timeout_max_one_way(
490 cfg.initiation_timeout_base,
491 RoutingOptions::MAX_INTERMEDIATE_HOPS,
492 ),
493 )
494 .build(),
495 sessions: moka::future::Cache::builder()
496 .max_capacity(cfg.maximum_sessions as u64)
497 .time_to_idle(cfg.idle_timeout)
498 .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
499 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
500 trace!(?session_id, ?reason, "session evicted from the cache");
501 close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
502 }
503 _ => {}
504 })
505 .build(),
506 session_notifiers: Arc::new(OnceLock::new()),
507 cfg,
508 }
509 }
510
511 pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
517 self.msg_sender
518 .set(msg_sender)
519 .map_err(|_| SessionManagerError::AlreadyStarted)?;
520
521 let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
522 self.session_notifiers
523 .set((new_session_notifier, session_close_tx))
524 .map_err(|_| SessionManagerError::AlreadyStarted)?;
525
526 let myself = self.clone();
527 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
528 None,
529 move |(session_id, closure_reason)| {
530 let myself = myself.clone();
531 async move {
532 if let Some(session_data) = myself.sessions.remove(&session_id).await {
536 close_session(session_id, session_data, closure_reason);
537 } else {
538 debug!(
540 ?session_id,
541 ?closure_reason,
542 "could not find session id to close, maybe the session is already closed"
543 );
544 }
545 }
546 },
547 ));
548
549 let myself = self.clone();
554 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
555 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
556 let timeout = 2 * initiation_timeout_max_one_way(
557 myself.cfg.initiation_timeout_base,
558 RoutingOptions::MAX_INTERMEDIATE_HOPS,
559 )
560 .min(myself.cfg.idle_timeout)
561 .mul_f64(jitter)
562 / 2;
563 futures_time::stream::interval(timeout.into())
564 .for_each(|_| {
565 trace!("executing session cache evictions");
566 futures::future::join(
567 myself.sessions.run_pending_tasks(),
568 myself.session_initiations.run_pending_tasks(),
569 )
570 .map(|_| ())
571 })
572 .await;
573 });
574
575 Ok(vec![ah_closure_notifications, ah_session_expiration])
576 }
577
578 pub fn is_started(&self) -> bool {
580 self.session_notifiers.get().is_some()
581 }
582
583 async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
584 if let moka::ops::compute::CompResult::Inserted(_) = self
586 .sessions
587 .entry(session_id)
588 .and_compute_with(|entry| {
589 futures::future::ready(if entry.is_none() {
590 moka::ops::compute::Op::Put(slot)
591 } else {
592 moka::ops::compute::Op::Nop
593 })
594 })
595 .await
596 {
597 #[cfg(all(feature = "prometheus", not(test)))]
598 {
599 METRIC_NUM_INITIATED_SESSIONS.increment();
600 METRIC_ACTIVE_SESSIONS.increment(1.0);
601 }
602
603 Ok(())
604 } else {
605 error!(%session_id, "session already exists - loopback attempt");
607 Err(SessionManagerError::Loopback.into())
608 }
609 }
610
611 pub async fn new_session(
619 &self,
620 destination: Address,
621 target: SessionTarget,
622 cfg: SessionClientConfig,
623 ) -> crate::errors::Result<HoprSession> {
624 self.sessions.run_pending_tasks().await;
625 if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
626 return Err(SessionManagerError::TooManySessions.into());
627 }
628
629 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
630
631 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
632 let (challenge, _) = insert_into_next_slot(
633 &self.session_initiations,
634 |ch| {
635 if let Some(challenge) = ch {
636 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
637 } else {
638 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
639 }
640 },
641 |_| tx_initiation_done,
642 )
643 .await
644 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
648 let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
649 challenge,
650 target,
651 capabilities: ByteCapabilities(cfg.capabilities),
652 additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
653 cfg.surb_management
654 .map(|c| c.target_surb_buffer_size)
655 .unwrap_or(
656 self.cfg.initial_return_session_egress_rate as u64
657 * self
658 .cfg
659 .minimum_surb_buffer_duration
660 .max(MIN_SURB_BUFFER_DURATION)
661 .as_secs(),
662 )
663 .min(u32::MAX as u64) as u32
664 } else {
665 0
666 },
667 });
668
669 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
670 let forward_routing = DestinationRouting::Forward {
671 destination: Box::new(destination.into()),
672 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
674 return_options: cfg.return_path_options.clone().into(),
675 };
676
677 info!(challenge, %pseudonym, %destination, "new session request");
679 msg_sender
680 .send((
681 forward_routing.clone(),
682 ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
683 ))
684 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
685 .await
686 .map_err(|_| {
687 error!(challenge, %pseudonym, %destination, "timeout sending session request message");
688 TransportSessionError::Timeout
689 })?
690 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
691
692 let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
694 self.cfg.initiation_timeout_base,
695 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
696 )
697 .into();
698
699 pin_mut!(rx_initiation_done);
701
702 trace!(challenge, "awaiting session establishment");
703 match rx_initiation_done.try_next().timeout(initiation_timeout).await {
704 Ok(Ok(Some(est))) => {
705 let session_id = est.session_id;
707 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
708
709 let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
710 let notifier = self
711 .session_notifiers
712 .get()
713 .map(|(_, notifier)| {
714 let mut notifier = notifier.clone();
715 Box::new(move |session_id: SessionId, reason: ClosureReason| {
716 let _ = notifier
717 .try_send((session_id, reason))
718 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
719 })
720 })
721 .ok_or(SessionManagerError::NotStarted)?;
722
723 #[cfg(feature = "telemetry")]
724 let metrics = Arc::new(SessionTelemetry::new(
725 session_id,
726 HoprSessionConfig {
727 capabilities: cfg.capabilities,
728 frame_mtu: self.cfg.frame_mtu,
729 frame_timeout: self.cfg.max_frame_timeout,
730 },
731 ));
732
733 if let Some(balancer_config) = cfg.surb_management {
737 let surb_estimator = AtomicSurbFlowEstimator::default();
738
739 #[cfg(feature = "telemetry")]
740 metrics.set_surb_estimator(surb_estimator.clone(), balancer_config.target_surb_buffer_size);
741
742 let surb_estimator_clone = surb_estimator.clone();
744 let full_surb_scoring_sender =
745 msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
746 surb_estimator_clone.produced.fetch_add(
748 data.estimate_surbs_with_msg() as u64,
749 std::sync::atomic::Ordering::Relaxed,
750 );
751 futures::future::ok::<_, S::Error>((routing, data))
752 });
753
754 let max_out_organic_surbs = cfg.always_max_out_surbs;
757 let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
758 move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
762 if !max_out_organic_surbs {
763 data.packet_info
765 .get_or_insert_with(|| OutgoingPacketInfo {
766 max_surbs_in_packet: 1,
767 ..Default::default()
768 })
769 .max_surbs_in_packet = 1;
770 }
771 futures::future::ok::<_, S::Error>((routing, data))
772 },
773 );
774
775 let mut abort_handles = AbortableList::default();
776
777 let (ka_controller, ka_abort_handle) =
779 utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
780 abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
781
782 #[cfg(feature = "telemetry")]
783 metrics.set_refill_in_flight(true);
784
785 debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
787 let balancer = SurbBalancer::new(
788 session_id,
789 PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
791 surb_estimator.clone(),
792 ka_controller,
793 balancer_config,
794 );
795
796 let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
797 self.cfg.balancer_sampling_interval,
798 SessionCacheBalancerFeedback(self.sessions.clone()),
799 None,
800 );
801 abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
802
803 self.insert_session_slot(
805 session_id,
806 SessionSlot {
807 session_tx: Arc::new(tx),
808 routing_opts: forward_routing.clone(),
809 abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
810 surb_mgmt: Some(balancer_config),
811 surb_estimator: None, #[cfg(feature = "telemetry")]
813 telemetry: metrics.clone(),
814 },
815 )
816 .await?;
817
818 #[cfg(feature = "telemetry")]
819 metrics.set_state(SessionLifecycleState::Active);
820
821 match level_stream
824 .skip_while(|current_level| {
825 futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
826 })
827 .next()
828 .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
829 .await
830 {
831 Ok(Some(surb_level)) => {
832 info!(%session_id, surb_level, "session is ready");
833 }
834 Ok(None) => {
835 return Err(
836 SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
837 );
838 }
839 Err(_) => {
840 warn!(%session_id, "session didn't reach target SURB buffer size in time");
841 }
842 }
843
844 HoprSession::new(
845 session_id,
846 forward_routing,
847 HoprSessionConfig {
848 capabilities: cfg.capabilities,
849 frame_mtu: self.cfg.frame_mtu,
850 frame_timeout: self.cfg.max_frame_timeout,
851 },
852 (
853 reduced_surb_scoring_sender,
854 rx.inspect(move |_| {
855 surb_estimator
858 .consumed
859 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
860 }),
861 ),
862 Some(notifier),
863 #[cfg(feature = "telemetry")]
864 metrics,
865 )
866 } else {
867 warn!(%session_id, "session ready without SURB balancing");
868
869 self.insert_session_slot(
870 session_id,
871 SessionSlot {
872 session_tx: Arc::new(tx),
873 routing_opts: forward_routing.clone(),
874 abort_handles: Default::default(),
875 surb_mgmt: None,
876 surb_estimator: None,
877 #[cfg(feature = "telemetry")]
878 telemetry: metrics.clone(),
879 },
880 )
881 .await?;
882 #[cfg(feature = "telemetry")]
883 metrics.set_state(SessionLifecycleState::Active);
884
885 let max_out_organic_surbs = cfg.always_max_out_surbs;
888 let reduced_surb_sender =
889 msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
890 if !max_out_organic_surbs {
891 data.packet_info
892 .get_or_insert_with(|| OutgoingPacketInfo {
893 max_surbs_in_packet: 1,
894 ..Default::default()
895 })
896 .max_surbs_in_packet = 1;
897 }
898 futures::future::ok::<_, S::Error>((routing, data))
899 });
900
901 HoprSession::new(
902 session_id,
903 forward_routing,
904 HoprSessionConfig {
905 capabilities: cfg.capabilities,
906 frame_mtu: self.cfg.frame_mtu,
907 frame_timeout: self.cfg.max_frame_timeout,
908 },
909 (reduced_surb_sender, rx),
910 Some(notifier),
911 #[cfg(feature = "telemetry")]
912 metrics,
913 )
914 }
915 }
916 Ok(Ok(None)) => Err(SessionManagerError::Other(
917 "internal error: sender has been closed without completing the session establishment".into(),
918 )
919 .into()),
920 Ok(Err(error)) => {
921 error!(
923 challenge = error.challenge,
924 ?error,
925 "the other party rejected the session initiation with error"
926 );
927 Err(TransportSessionError::Rejected(error.reason))
928 }
929 Err(_) => {
930 error!(challenge, "session initiation attempt timed out");
932
933 #[cfg(all(feature = "prometheus", not(test)))]
934 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
935
936 Err(TransportSessionError::Timeout)
937 }
938 }
939 }
940
941 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
945 if let Some(session_data) = self.sessions.get(id).await {
946 trace!(session_id = ?id, "pinging manually session");
947 Ok(self
948 .msg_sender
949 .get()
950 .cloned()
951 .ok_or(SessionManagerError::NotStarted)?
952 .send((
953 session_data.routing_opts.clone(),
954 ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
955 ))
956 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
957 .await
958 .map_err(|_| {
959 error!("timeout sending session ping message");
960 TransportSessionError::Timeout
961 })?
962 .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
963 } else {
964 Err(SessionManagerError::NonExistingSession.into())
965 }
966 }
967
968 pub async fn active_sessions(&self) -> Vec<SessionId> {
970 self.sessions.run_pending_tasks().await;
971 self.sessions.iter().map(|(k, _)| *k).collect()
972 }
973
974 pub async fn update_surb_balancer_config(
979 &self,
980 id: &SessionId,
981 config: SurbBalancerConfig,
982 ) -> crate::errors::Result<()> {
983 match self
984 .sessions
985 .entry_by_ref(id)
986 .and_compute_with(|entry| {
987 futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
988 if cached_session.surb_mgmt.is_some() {
990 cached_session.surb_mgmt = Some(config);
991 moka::ops::compute::Op::Put(cached_session)
992 } else {
993 moka::ops::compute::Op::Nop
994 }
995 } else {
996 moka::ops::compute::Op::Nop
997 })
998 })
999 .await
1000 {
1001 moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
1002 moka::ops::compute::CompResult::Unchanged(_) => {
1003 Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
1004 }
1005 _ => Err(SessionManagerError::NonExistingSession.into()),
1006 }
1007 }
1008
1009 pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
1013 match self.sessions.get(id).await {
1014 Some(session) => Ok(session.surb_mgmt),
1015 None => Err(SessionManagerError::NonExistingSession.into()),
1016 }
1017 }
1018
1019 #[cfg(feature = "telemetry")]
1023 pub async fn get_session_stats(&self, id: &SessionId) -> crate::errors::Result<SessionStatsSnapshot> {
1024 match self.sessions.get(id).await {
1025 Some(session) => Ok(session.telemetry.snapshot()),
1026 None => Err(SessionManagerError::NonExistingSession.into()),
1027 }
1028 }
1029
1030 pub async fn dispatch_message(
1037 &self,
1038 pseudonym: HoprPseudonym,
1039 in_data: ApplicationDataIn,
1040 ) -> crate::errors::Result<DispatchResult> {
1041 if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1042 trace!("dispatching Start protocol message");
1044 return self
1045 .handle_start_protocol_message(pseudonym, in_data)
1046 .await
1047 .map(|_| DispatchResult::Processed);
1048 } else if self
1049 .cfg
1050 .session_tag_range
1051 .contains(&in_data.data.application_tag.as_u64())
1052 {
1053 let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1054
1055 return if let Some(session_data) = self.sessions.get(&session_id).await {
1056 trace!(?session_id, "received data for a registered session");
1057
1058 Ok(session_data
1059 .session_tx
1060 .unbounded_send(in_data)
1061 .map(|_| DispatchResult::Processed)
1062 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
1063 } else {
1064 error!(%session_id, "received data from an unestablished session");
1065 Err(TransportSessionError::UnknownData)
1066 };
1067 }
1068
1069 trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1070 Ok(DispatchResult::Unrelated(in_data))
1071 }
1072
1073 async fn handle_incoming_session_initiation(
1074 &self,
1075 pseudonym: HoprPseudonym,
1076 session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1077 ) -> crate::errors::Result<()> {
1078 trace!(challenge = session_req.challenge, "received session initiation request");
1079
1080 debug!(%pseudonym, "got new session request, searching for a free session slot");
1081
1082 let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1083
1084 let (mut new_session_notifier, mut close_session_notifier) = self
1085 .session_notifiers
1086 .get()
1087 .cloned()
1088 .ok_or(SessionManagerError::NotStarted)?;
1089
1090 let reply_routing = DestinationRouting::Return(pseudonym.into());
1092
1093 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1094
1095 self.sessions.run_pending_tasks().await; let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1098 insert_into_next_slot(
1099 &self.sessions,
1100 |sid| {
1101 let next_tag: Tag = match sid {
1104 Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1105 .max(self.cfg.session_tag_range.start)
1106 .into(),
1107 None => hopr_crypto_random::random_integer(
1108 self.cfg.session_tag_range.start,
1109 Some(self.cfg.session_tag_range.end),
1110 )
1111 .into(),
1112 };
1113 SessionId::new(next_tag, pseudonym)
1114 },
1115 |_sid| SessionSlot {
1116 session_tx: Arc::new(tx_session_data),
1117 routing_opts: reply_routing.clone(),
1118 abort_handles: Default::default(),
1119 surb_mgmt: None,
1120 surb_estimator: None,
1121 #[cfg(feature = "telemetry")]
1122 telemetry: SessionTelemetry::new(
1123 _sid,
1124 HoprSessionConfig {
1125 capabilities: session_req.capabilities.0,
1126 frame_mtu: self.cfg.frame_mtu,
1127 frame_timeout: self.cfg.max_frame_timeout,
1128 },
1129 )
1130 .into(),
1131 },
1132 )
1133 .await
1134 } else {
1135 error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1136 None
1137 };
1138
1139 if let Some((session_id, _slot)) = allocated_slot {
1142 debug!(%session_id, ?session_req, "assigned a new session");
1143
1144 #[cfg(feature = "telemetry")]
1145 let stats = _slot.telemetry;
1146
1147 let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1148 if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1149 error!(%session_id, %error, %reason, "failed to notify session closure");
1150 }
1151 });
1152
1153 let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1154 let surb_estimator = AtomicSurbFlowEstimator::default();
1155
1156 let egress_rate_control =
1158 RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1159
1160 let target_surb_buffer_size = if session_req.additional_data > 0 {
1163 (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1164 } else {
1165 self.cfg.initial_return_session_egress_rate as u64
1166 * self
1167 .cfg
1168 .minimum_surb_buffer_duration
1169 .max(MIN_SURB_BUFFER_DURATION)
1170 .as_secs()
1171 };
1172 #[cfg(feature = "telemetry")]
1173 stats.set_surb_estimator(surb_estimator.clone(), target_surb_buffer_size);
1174
1175 let surb_estimator_clone = surb_estimator.clone();
1176 let session = HoprSession::new(
1177 session_id,
1178 reply_routing.clone(),
1179 HoprSessionConfig {
1180 capabilities: session_req.capabilities.into(),
1181 frame_mtu: self.cfg.frame_mtu,
1182 frame_timeout: self.cfg.max_frame_timeout,
1183 },
1184 (
1185 msg_sender
1187 .clone()
1188 .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1189 surb_estimator_clone
1191 .consumed
1192 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1193 futures::future::ok::<_, S::Error>((routing, data))
1194 })
1195 .rate_limit_with_controller(&egress_rate_control)
1196 .buffer((2 * target_surb_buffer_size) as usize),
1197 rx_session_data.inspect(move |data| {
1199 surb_estimator_clone
1201 .produced
1202 .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1203 }),
1204 ),
1205 Some(closure_notifier),
1206 #[cfg(feature = "telemetry")]
1207 stats.clone(),
1208 )?;
1209
1210 let balancer_config = SurbBalancerConfig {
1214 target_surb_buffer_size,
1215 max_surbs_per_sec: target_surb_buffer_size
1217 / self
1218 .cfg
1219 .minimum_surb_buffer_duration
1220 .max(MIN_SURB_BUFFER_DURATION)
1221 .as_secs(),
1222 surb_decay: None,
1225 };
1226
1227 let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1229 if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1230 .sessions
1231 .entry(session_id)
1232 .and_compute_with(|entry| {
1233 if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1234 cached_session
1235 .abort_handles
1236 .lock()
1237 .insert(SessionTasks::Balancer, balancer_abort_handle);
1238 cached_session.surb_mgmt = Some(balancer_config);
1239 cached_session.surb_estimator = Some(surb_estimator.clone());
1240 #[cfg(feature = "telemetry")]
1241 {
1242 cached_session.telemetry = stats.clone();
1243 }
1244 futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1245 } else {
1246 futures::future::ready(moka::ops::compute::Op::Nop)
1247 }
1248 })
1249 .await
1250 {
1251 debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1254 let balancer = SurbBalancer::new(
1255 session_id,
1256 if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1257 SimpleBalancerController::with_increasing_setpoint(
1259 *ratio_threshold,
1260 (growth_window
1261 .div_duration_f64(self.cfg.balancer_sampling_interval)
1262 .round() as usize)
1263 .max(1),
1264 )
1265 } else {
1266 SimpleBalancerController::default()
1267 },
1268 surb_estimator,
1269 SurbControllerWithCorrection(egress_rate_control, 1), balancer_config,
1271 );
1272
1273 let _ = balancer.start_control_loop(
1274 self.cfg.balancer_sampling_interval,
1275 SessionCacheBalancerFeedback(self.sessions.clone()),
1276 Some(balancer_abort_reg),
1277 );
1278 } else {
1279 return Err(SessionManagerError::Other(
1281 "failed to spawn SURB balancer - inconsistent cache".into(),
1282 )
1283 .into());
1284 }
1285
1286 session
1287 } else {
1288 HoprSession::new(
1289 session_id,
1290 reply_routing.clone(),
1291 HoprSessionConfig {
1292 capabilities: session_req.capabilities.into(),
1293 frame_mtu: self.cfg.frame_mtu,
1294 frame_timeout: self.cfg.max_frame_timeout,
1295 },
1296 (msg_sender.clone(), rx_session_data),
1297 Some(closure_notifier),
1298 #[cfg(feature = "telemetry")]
1299 stats,
1300 )?
1301 };
1302
1303 let incoming_session = IncomingSession {
1305 session,
1306 target: session_req.target,
1307 };
1308
1309 match new_session_notifier
1311 .send(incoming_session)
1312 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1313 .await
1314 {
1315 Err(_) => {
1316 error!(%session_id, "timeout to notify about new incoming session");
1317 return Err(TransportSessionError::Timeout);
1318 }
1319 Ok(Err(error)) => {
1320 error!(%session_id, %error, "failed to notify about new incoming session");
1321 return Err(SessionManagerError::Other(error.to_string()).into());
1322 }
1323 _ => {}
1324 };
1325
1326 trace!(?session_id, "session notification sent");
1327
1328 let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1331 orig_challenge: session_req.challenge,
1332 session_id,
1333 });
1334
1335 msg_sender
1336 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1337 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1338 .await
1339 .map_err(|_| {
1340 error!(%session_id, "timeout sending session establishment message");
1341 TransportSessionError::Timeout
1342 })?
1343 .map_err(|e| {
1344 SessionManagerError::Other(format!(
1345 "failed to send session {session_id} establishment message: {e}"
1346 ))
1347 })?;
1348
1349 info!(%session_id, "new session established");
1350
1351 #[cfg(all(feature = "prometheus", not(test)))]
1352 {
1353 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1354 METRIC_ACTIVE_SESSIONS.increment(1.0);
1355 }
1356 } else {
1357 error!(%pseudonym,"failed to reserve a new session slot");
1358
1359 let reason = StartErrorReason::NoSlotsAvailable;
1361 let data = HoprStartProtocol::SessionError(StartErrorType {
1362 challenge: session_req.challenge,
1363 reason,
1364 });
1365
1366 msg_sender
1367 .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1368 .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1369 .await
1370 .map_err(|_| {
1371 error!("timeout sending session error message");
1372 TransportSessionError::Timeout
1373 })?
1374 .map_err(|e| {
1375 SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1376 })?;
1377
1378 trace!(%pseudonym, "session establishment failure message sent");
1379
1380 #[cfg(all(feature = "prometheus", not(test)))]
1381 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1382 }
1383
1384 Ok(())
1385 }
1386
1387 async fn handle_start_protocol_message(
1388 &self,
1389 pseudonym: HoprPseudonym,
1390 data: ApplicationDataIn,
1391 ) -> crate::errors::Result<()> {
1392 match HoprStartProtocol::try_from(data.data)? {
1393 HoprStartProtocol::StartSession(session_req) => {
1394 self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1395 }
1396 HoprStartProtocol::SessionEstablished(est) => {
1397 trace!(
1398 session_id = ?est.session_id,
1399 "received session establishment confirmation"
1400 );
1401 let challenge = est.orig_challenge;
1402 let session_id = est.session_id;
1403 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1404 if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1405 return Err(SessionManagerError::Other(format!(
1406 "could not notify session {session_id} establishment: {e}"
1407 ))
1408 .into());
1409 }
1410 debug!(?session_id, challenge, "session establishment complete");
1411 } else {
1412 error!(%session_id, challenge, "unknown session establishment attempt or expired");
1413 }
1414 }
1415 HoprStartProtocol::SessionError(error) => {
1416 trace!(
1417 challenge = error.challenge,
1418 error = ?error.reason,
1419 "failed to initialize a session",
1420 );
1421 if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1424 if let Err(e) = tx_est.unbounded_send(Err(error)) {
1425 return Err(SessionManagerError::Other(format!(
1426 "could not notify session establishment error {error:?}: {e}"
1427 ))
1428 .into());
1429 }
1430 error!(
1431 challenge = error.challenge,
1432 ?error,
1433 "session establishment error received"
1434 );
1435 } else {
1436 error!(
1437 challenge = error.challenge,
1438 ?error,
1439 "session establishment attempt expired before error could be delivered"
1440 );
1441 }
1442
1443 #[cfg(all(feature = "prometheus", not(test)))]
1444 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1445 }
1446 HoprStartProtocol::KeepAlive(msg) => {
1447 let session_id = msg.session_id;
1448 if let Some(session_slot) = self.sessions.get(&session_id).await {
1449 trace!(?session_id, "received keep-alive request");
1450 if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1453 estimator.produced.fetch_add(
1455 KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1456 std::sync::atomic::Ordering::Relaxed,
1457 );
1458 }
1459 } else {
1460 debug!(%session_id, "received keep-alive request for an unknown session");
1461 }
1462 }
1463 }
1464
1465 Ok(())
1466 }
1467}
1468
1469#[cfg(test)]
1470mod tests {
1471 use anyhow::anyhow;
1472 use futures::{AsyncWriteExt, future::BoxFuture};
1473 use hopr_crypto_random::Randomizable;
1474 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1475 use hopr_primitive_types::prelude::Address;
1476 use hopr_protocol_start::StartProtocolDiscriminants;
1477 use tokio::time::timeout;
1478
1479 use super::*;
1480 use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1481
1482 #[async_trait::async_trait]
1483 trait SendMsg {
1484 async fn send_message(
1485 &self,
1486 routing: DestinationRouting,
1487 data: ApplicationDataOut,
1488 ) -> crate::errors::Result<()>;
1489 }
1490
1491 mockall::mock! {
1492 MsgSender {}
1493 impl SendMsg for MsgSender {
1494 fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1495 -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1496 }
1497 }
1498
1499 fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1500 let (tx, rx) = futures::channel::mpsc::unbounded();
1501 tokio::task::spawn(async move {
1502 pin_mut!(rx);
1503 while let Some((routing, data)) = rx.next().await {
1504 sender
1505 .send_message(routing, data)
1506 .await
1507 .expect("send message must not fail in mock");
1508 }
1509 });
1510 tx
1511 }
1512
1513 fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1514 HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1515 .map(|d| StartProtocolDiscriminants::from(d) == expected)
1516 .unwrap_or(false)
1517 }
1518
1519 #[test_log::test(tokio::test)]
1520 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1521 {
1522 let alice_pseudonym = HoprPseudonym::random();
1523 let bob_peer: Address = (&ChainKeypair::random()).into();
1524
1525 let alice_mgr = SessionManager::new(Default::default());
1526 let bob_mgr = SessionManager::new(Default::default());
1527
1528 let mut sequence = mockall::Sequence::new();
1529 let mut alice_transport = MockMsgSender::new();
1530 let mut bob_transport = MockMsgSender::new();
1531
1532 let bob_mgr_clone = bob_mgr.clone();
1534 alice_transport
1535 .expect_send_message()
1536 .once()
1537 .in_sequence(&mut sequence)
1538 .withf(move |peer, data| {
1539 info!("alice sends {}", data.data.application_tag);
1540 msg_type(data, StartProtocolDiscriminants::StartSession)
1541 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1542 })
1543 .returning(move |_, data| {
1544 let bob_mgr_clone = bob_mgr_clone.clone();
1545 Box::pin(async move {
1546 bob_mgr_clone
1547 .dispatch_message(
1548 alice_pseudonym,
1549 ApplicationDataIn {
1550 data: data.data,
1551 packet_info: Default::default(),
1552 },
1553 )
1554 .await?;
1555 Ok(())
1556 })
1557 });
1558
1559 let alice_mgr_clone = alice_mgr.clone();
1561 bob_transport
1562 .expect_send_message()
1563 .once()
1564 .in_sequence(&mut sequence)
1565 .withf(move |peer, data| {
1566 info!("bob sends {}", data.data.application_tag);
1567 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1568 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1569 })
1570 .returning(move |_, data| {
1571 let alice_mgr_clone = alice_mgr_clone.clone();
1572
1573 Box::pin(async move {
1574 alice_mgr_clone
1575 .dispatch_message(
1576 alice_pseudonym,
1577 ApplicationDataIn {
1578 data: data.data,
1579 packet_info: Default::default(),
1580 },
1581 )
1582 .await?;
1583 Ok(())
1584 })
1585 });
1586
1587 let bob_mgr_clone = bob_mgr.clone();
1589 alice_transport
1590 .expect_send_message()
1591 .once()
1592 .in_sequence(&mut sequence)
1593 .withf(move |peer, data| {
1594 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1595 data.data.plain_text.as_ref(),
1596 )
1597 .expect("must be a session message")
1598 .try_as_segment()
1599 .expect("must be a segment")
1600 .is_terminating()
1601 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1602 })
1603 .returning(move |_, data| {
1604 let bob_mgr_clone = bob_mgr_clone.clone();
1605 Box::pin(async move {
1606 bob_mgr_clone
1607 .dispatch_message(
1608 alice_pseudonym,
1609 ApplicationDataIn {
1610 data: data.data,
1611 packet_info: Default::default(),
1612 },
1613 )
1614 .await?;
1615 Ok(())
1616 })
1617 });
1618
1619 let mut ahs = Vec::new();
1620
1621 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1623 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1624
1625 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1627 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1628
1629 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1630
1631 pin_mut!(new_session_rx_bob);
1632 let (alice_session, bob_session) = timeout(
1633 Duration::from_secs(2),
1634 futures::future::join(
1635 alice_mgr.new_session(
1636 bob_peer,
1637 SessionTarget::TcpStream(target.clone()),
1638 SessionClientConfig {
1639 pseudonym: alice_pseudonym.into(),
1640 capabilities: Capability::NoRateControl | Capability::Segmentation,
1641 surb_management: None,
1642 ..Default::default()
1643 },
1644 ),
1645 new_session_rx_bob.next(),
1646 ),
1647 )
1648 .await?;
1649
1650 let mut alice_session = alice_session?;
1651 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1652
1653 assert_eq!(
1654 alice_session.config().capabilities,
1655 Capability::Segmentation | Capability::NoRateControl
1656 );
1657 assert_eq!(
1658 alice_session.config().capabilities,
1659 bob_session.session.config().capabilities
1660 );
1661 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1662
1663 assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1664 assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1665 assert!(
1666 alice_mgr
1667 .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1668 .await
1669 .is_err()
1670 );
1671
1672 assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1673 assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1674 assert!(
1675 bob_mgr
1676 .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1677 .await
1678 .is_err()
1679 );
1680
1681 tokio::time::sleep(Duration::from_millis(100)).await;
1682 alice_session.close().await?;
1683
1684 tokio::time::sleep(Duration::from_millis(100)).await;
1685
1686 assert!(matches!(
1687 alice_mgr.ping_session(alice_session.id()).await,
1688 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1689 ));
1690
1691 futures::stream::iter(ahs)
1692 .for_each(|ah| async move { ah.abort() })
1693 .await;
1694
1695 Ok(())
1696 }
1697
1698 #[test_log::test(tokio::test)]
1699 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1700 let alice_pseudonym = HoprPseudonym::random();
1701 let bob_peer: Address = (&ChainKeypair::random()).into();
1702
1703 let cfg = SessionManagerConfig {
1704 idle_timeout: Duration::from_millis(200),
1705 ..Default::default()
1706 };
1707
1708 let alice_mgr = SessionManager::new(cfg);
1709 let bob_mgr = SessionManager::new(Default::default());
1710
1711 let mut sequence = mockall::Sequence::new();
1712 let mut alice_transport = MockMsgSender::new();
1713 let mut bob_transport = MockMsgSender::new();
1714
1715 let bob_mgr_clone = bob_mgr.clone();
1717 alice_transport
1718 .expect_send_message()
1719 .once()
1720 .in_sequence(&mut sequence)
1721 .withf(move |peer, data| {
1722 msg_type(data, StartProtocolDiscriminants::StartSession)
1723 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1724 })
1725 .returning(move |_, data| {
1726 let bob_mgr_clone = bob_mgr_clone.clone();
1727 Box::pin(async move {
1728 bob_mgr_clone
1729 .dispatch_message(
1730 alice_pseudonym,
1731 ApplicationDataIn {
1732 data: data.data,
1733 packet_info: Default::default(),
1734 },
1735 )
1736 .await?;
1737 Ok(())
1738 })
1739 });
1740
1741 let alice_mgr_clone = alice_mgr.clone();
1743 bob_transport
1744 .expect_send_message()
1745 .once()
1746 .in_sequence(&mut sequence)
1747 .withf(move |peer, data| {
1748 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1749 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1750 })
1751 .returning(move |_, data| {
1752 let alice_mgr_clone = alice_mgr_clone.clone();
1753
1754 Box::pin(async move {
1755 alice_mgr_clone
1756 .dispatch_message(
1757 alice_pseudonym,
1758 ApplicationDataIn {
1759 data: data.data,
1760 packet_info: Default::default(),
1761 },
1762 )
1763 .await?;
1764 Ok(())
1765 })
1766 });
1767
1768 let mut ahs = Vec::new();
1769
1770 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1772 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1773
1774 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1776 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1777
1778 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1779
1780 pin_mut!(new_session_rx_bob);
1781 let (alice_session, bob_session) = timeout(
1782 Duration::from_secs(2),
1783 futures::future::join(
1784 alice_mgr.new_session(
1785 bob_peer,
1786 SessionTarget::TcpStream(target.clone()),
1787 SessionClientConfig {
1788 pseudonym: alice_pseudonym.into(),
1789 capabilities: Capability::NoRateControl | Capability::Segmentation,
1790 surb_management: None,
1791 ..Default::default()
1792 },
1793 ),
1794 new_session_rx_bob.next(),
1795 ),
1796 )
1797 .await?;
1798
1799 let alice_session = alice_session?;
1800 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1801
1802 assert_eq!(
1803 alice_session.config().capabilities,
1804 Capability::Segmentation | Capability::NoRateControl,
1805 );
1806 assert_eq!(
1807 alice_session.config().capabilities,
1808 bob_session.session.config().capabilities
1809 );
1810 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1811
1812 tokio::time::sleep(Duration::from_millis(300)).await;
1814
1815 assert!(matches!(
1816 alice_mgr.ping_session(alice_session.id()).await,
1817 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1818 ));
1819
1820 futures::stream::iter(ahs)
1821 .for_each(|ah| async move { ah.abort() })
1822 .await;
1823
1824 Ok(())
1825 }
1826
1827 #[test_log::test(tokio::test)]
1828 async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1829 let alice_pseudonym = HoprPseudonym::random();
1830 let session_id = SessionId::new(16u64, alice_pseudonym);
1831 let balancer_cfg = SurbBalancerConfig {
1832 target_surb_buffer_size: 1000,
1833 max_surbs_per_sec: 100,
1834 ..Default::default()
1835 };
1836
1837 let alice_mgr = SessionManager::<
1838 UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1839 futures::channel::mpsc::Sender<IncomingSession>,
1840 >::new(Default::default());
1841
1842 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1843 alice_mgr
1844 .sessions
1845 .insert(
1846 session_id,
1847 SessionSlot {
1848 session_tx: Arc::new(dummy_tx),
1849 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1850 abort_handles: Default::default(),
1851 surb_mgmt: Some(balancer_cfg),
1852 surb_estimator: None,
1853 #[cfg(feature = "telemetry")]
1854 telemetry: Arc::new(SessionTelemetry::new(session_id, Default::default())),
1855 },
1856 )
1857 .await;
1858
1859 let actual_cfg = alice_mgr
1860 .get_surb_balancer_config(&session_id)
1861 .await?
1862 .ok_or(anyhow!("session must have a surb balancer config"))?;
1863 assert_eq!(actual_cfg, balancer_cfg);
1864
1865 let new_cfg = SurbBalancerConfig {
1866 target_surb_buffer_size: 2000,
1867 max_surbs_per_sec: 200,
1868 ..Default::default()
1869 };
1870 alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1871
1872 let actual_cfg = alice_mgr
1873 .get_surb_balancer_config(&session_id)
1874 .await?
1875 .ok_or(anyhow!("session must have a surb balancer config"))?;
1876 assert_eq!(actual_cfg, new_cfg);
1877
1878 Ok(())
1879 }
1880
1881 #[test_log::test(tokio::test)]
1882 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1883 let alice_pseudonym = HoprPseudonym::random();
1884 let bob_peer: Address = (&ChainKeypair::random()).into();
1885
1886 let cfg = SessionManagerConfig {
1887 session_tag_range: 16u64..17u64, ..Default::default()
1889 };
1890
1891 let alice_mgr = SessionManager::new(Default::default());
1892 let bob_mgr = SessionManager::new(cfg);
1893
1894 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1896 bob_mgr
1897 .sessions
1898 .insert(
1899 SessionId::new(16u64, alice_pseudonym),
1900 SessionSlot {
1901 session_tx: Arc::new(dummy_tx),
1902 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1903 abort_handles: Default::default(),
1904 #[cfg(feature = "telemetry")]
1905 telemetry: Arc::new(SessionTelemetry::new(
1906 SessionId::new(16u64, alice_pseudonym),
1907 Default::default(),
1908 )),
1909 surb_mgmt: None,
1910 surb_estimator: None,
1911 },
1912 )
1913 .await;
1914
1915 let mut sequence = mockall::Sequence::new();
1916 let mut alice_transport = MockMsgSender::new();
1917 let mut bob_transport = MockMsgSender::new();
1918
1919 let bob_mgr_clone = bob_mgr.clone();
1921 alice_transport
1922 .expect_send_message()
1923 .once()
1924 .in_sequence(&mut sequence)
1925 .withf(move |peer, data| {
1926 msg_type(data, StartProtocolDiscriminants::StartSession)
1927 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1928 })
1929 .returning(move |_, data| {
1930 let bob_mgr_clone = bob_mgr_clone.clone();
1931 Box::pin(async move {
1932 bob_mgr_clone
1933 .dispatch_message(
1934 alice_pseudonym,
1935 ApplicationDataIn {
1936 data: data.data,
1937 packet_info: Default::default(),
1938 },
1939 )
1940 .await?;
1941 Ok(())
1942 })
1943 });
1944
1945 let alice_mgr_clone = alice_mgr.clone();
1947 bob_transport
1948 .expect_send_message()
1949 .once()
1950 .in_sequence(&mut sequence)
1951 .withf(move |peer, data| {
1952 msg_type(data, StartProtocolDiscriminants::SessionError)
1953 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1954 })
1955 .returning(move |_, data| {
1956 let alice_mgr_clone = alice_mgr_clone.clone();
1957 Box::pin(async move {
1958 alice_mgr_clone
1959 .dispatch_message(
1960 alice_pseudonym,
1961 ApplicationDataIn {
1962 data: data.data,
1963 packet_info: Default::default(),
1964 },
1965 )
1966 .await?;
1967 Ok(())
1968 })
1969 });
1970
1971 let mut jhs = Vec::new();
1972
1973 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1975 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1976
1977 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
1979 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1980
1981 let result = alice_mgr
1982 .new_session(
1983 bob_peer,
1984 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1985 SessionClientConfig {
1986 capabilities: Capabilities::empty(),
1987 pseudonym: alice_pseudonym.into(),
1988 surb_management: None,
1989 ..Default::default()
1990 },
1991 )
1992 .await;
1993
1994 assert!(
1995 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1996 );
1997
1998 Ok(())
1999 }
2000
2001 #[test_log::test(tokio::test)]
2002 async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2003 -> anyhow::Result<()> {
2004 let alice_pseudonym = HoprPseudonym::random();
2005 let bob_peer: Address = (&ChainKeypair::random()).into();
2006
2007 let cfg = SessionManagerConfig {
2008 maximum_sessions: 1,
2009 ..Default::default()
2010 };
2011
2012 let alice_mgr = SessionManager::new(Default::default());
2013 let bob_mgr = SessionManager::new(cfg);
2014
2015 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2017 bob_mgr
2018 .sessions
2019 .insert(
2020 SessionId::new(16u64, alice_pseudonym),
2021 SessionSlot {
2022 session_tx: Arc::new(dummy_tx),
2023 routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2024 abort_handles: Default::default(),
2025 surb_mgmt: None,
2026 surb_estimator: None,
2027 #[cfg(feature = "telemetry")]
2028 telemetry: Arc::new(SessionTelemetry::new(
2029 SessionId::new(16u64, alice_pseudonym),
2030 Default::default(),
2031 )),
2032 },
2033 )
2034 .await;
2035
2036 let mut sequence = mockall::Sequence::new();
2037 let mut alice_transport = MockMsgSender::new();
2038 let mut bob_transport = MockMsgSender::new();
2039
2040 let bob_mgr_clone = bob_mgr.clone();
2042 alice_transport
2043 .expect_send_message()
2044 .once()
2045 .in_sequence(&mut sequence)
2046 .withf(move |peer, data| {
2047 msg_type(data, StartProtocolDiscriminants::StartSession)
2048 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2049 })
2050 .returning(move |_, data| {
2051 let bob_mgr_clone = bob_mgr_clone.clone();
2052 Box::pin(async move {
2053 bob_mgr_clone
2054 .dispatch_message(
2055 alice_pseudonym,
2056 ApplicationDataIn {
2057 data: data.data,
2058 packet_info: Default::default(),
2059 },
2060 )
2061 .await?;
2062 Ok(())
2063 })
2064 });
2065
2066 let alice_mgr_clone = alice_mgr.clone();
2068 bob_transport
2069 .expect_send_message()
2070 .once()
2071 .in_sequence(&mut sequence)
2072 .withf(move |peer, data| {
2073 msg_type(data, StartProtocolDiscriminants::SessionError)
2074 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2075 })
2076 .returning(move |_, data| {
2077 let alice_mgr_clone = alice_mgr_clone.clone();
2078 Box::pin(async move {
2079 alice_mgr_clone
2080 .dispatch_message(
2081 alice_pseudonym,
2082 ApplicationDataIn {
2083 data: data.data,
2084 packet_info: Default::default(),
2085 },
2086 )
2087 .await?;
2088 Ok(())
2089 })
2090 });
2091
2092 let mut jhs = Vec::new();
2093
2094 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2096 jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2097
2098 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2100 jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2101
2102 let result = alice_mgr
2103 .new_session(
2104 bob_peer,
2105 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2106 SessionClientConfig {
2107 capabilities: None.into(),
2108 pseudonym: alice_pseudonym.into(),
2109 surb_management: None,
2110 ..Default::default()
2111 },
2112 )
2113 .await;
2114
2115 assert!(
2116 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2117 );
2118
2119 Ok(())
2120 }
2121
2122 #[test_log::test(tokio::test)]
2123 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2124 let alice_pseudonym = HoprPseudonym::random();
2125 let bob_peer: Address = (&ChainKeypair::random()).into();
2126
2127 let alice_mgr = SessionManager::new(Default::default());
2128
2129 let mut sequence = mockall::Sequence::new();
2130 let mut alice_transport = MockMsgSender::new();
2131
2132 let alice_mgr_clone = alice_mgr.clone();
2134 alice_transport
2135 .expect_send_message()
2136 .once()
2137 .in_sequence(&mut sequence)
2138 .withf(move |peer, data| {
2139 msg_type(data, StartProtocolDiscriminants::StartSession)
2140 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2141 })
2142 .returning(move |_, data| {
2143 let alice_mgr_clone = alice_mgr_clone.clone();
2145 Box::pin(async move {
2146 alice_mgr_clone
2147 .dispatch_message(
2148 alice_pseudonym,
2149 ApplicationDataIn {
2150 data: data.data,
2151 packet_info: Default::default(),
2152 },
2153 )
2154 .await?;
2155 Ok(())
2156 })
2157 });
2158
2159 let alice_mgr_clone = alice_mgr.clone();
2161 alice_transport
2162 .expect_send_message()
2163 .once()
2164 .in_sequence(&mut sequence)
2165 .withf(move |peer, data| {
2166 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2167 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2168 })
2169 .returning(move |_, data| {
2170 let alice_mgr_clone = alice_mgr_clone.clone();
2171
2172 Box::pin(async move {
2173 alice_mgr_clone
2174 .dispatch_message(
2175 alice_pseudonym,
2176 ApplicationDataIn {
2177 data: data.data,
2178 packet_info: Default::default(),
2179 },
2180 )
2181 .await?;
2182 Ok(())
2183 })
2184 });
2185
2186 let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2188 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2189
2190 let alice_session = alice_mgr
2191 .new_session(
2192 bob_peer,
2193 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2194 SessionClientConfig {
2195 capabilities: None.into(),
2196 pseudonym: alice_pseudonym.into(),
2197 surb_management: None,
2198 ..Default::default()
2199 },
2200 )
2201 .await;
2202
2203 println!("{alice_session:?}");
2204 assert!(matches!(
2205 alice_session,
2206 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2207 ));
2208
2209 drop(new_session_rx_alice);
2210 Ok(())
2211 }
2212
2213 #[test_log::test(tokio::test)]
2214 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2215 let bob_peer: Address = (&ChainKeypair::random()).into();
2216
2217 let cfg = SessionManagerConfig {
2218 initiation_timeout_base: Duration::from_millis(100),
2219 ..Default::default()
2220 };
2221
2222 let alice_mgr = SessionManager::new(cfg);
2223 let bob_mgr = SessionManager::new(Default::default());
2224
2225 let mut sequence = mockall::Sequence::new();
2226 let mut alice_transport = MockMsgSender::new();
2227 let bob_transport = MockMsgSender::new();
2228
2229 alice_transport
2231 .expect_send_message()
2232 .once()
2233 .in_sequence(&mut sequence)
2234 .withf(move |peer, data| {
2235 msg_type(data, StartProtocolDiscriminants::StartSession)
2236 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2237 })
2238 .returning(|_, _| Box::pin(async { Ok(()) }));
2239
2240 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2242 alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2243
2244 let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2246 bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2247
2248 let result = alice_mgr
2249 .new_session(
2250 bob_peer,
2251 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2252 SessionClientConfig {
2253 capabilities: None.into(),
2254 pseudonym: None,
2255 surb_management: None,
2256 ..Default::default()
2257 },
2258 )
2259 .await;
2260
2261 assert!(matches!(result, Err(TransportSessionError::Timeout)));
2262
2263 Ok(())
2264 }
2265
2266 #[test_log::test(tokio::test)]
2267 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2268 let alice_pseudonym = HoprPseudonym::random();
2269 let bob_peer: Address = (&ChainKeypair::random()).into();
2270
2271 let bob_cfg = SessionManagerConfig::default();
2272 let alice_mgr = SessionManager::new(Default::default());
2273 let bob_mgr = SessionManager::new(bob_cfg.clone());
2274
2275 let mut alice_transport = MockMsgSender::new();
2276 let mut bob_transport = MockMsgSender::new();
2277
2278 let mut open_sequence = mockall::Sequence::new();
2280 let bob_mgr_clone = bob_mgr.clone();
2281 alice_transport
2282 .expect_send_message()
2283 .once()
2284 .in_sequence(&mut open_sequence)
2285 .withf(move |peer, data| {
2286 msg_type(data, StartProtocolDiscriminants::StartSession)
2287 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2288 })
2289 .returning(move |_, data| {
2290 let bob_mgr_clone = bob_mgr_clone.clone();
2291 Box::pin(async move {
2292 bob_mgr_clone
2293 .dispatch_message(
2294 alice_pseudonym,
2295 ApplicationDataIn {
2296 data: data.data,
2297 packet_info: Default::default(),
2298 },
2299 )
2300 .await?;
2301 Ok(())
2302 })
2303 });
2304
2305 let alice_mgr_clone = alice_mgr.clone();
2307 bob_transport
2308 .expect_send_message()
2309 .once()
2310 .in_sequence(&mut open_sequence)
2311 .withf(move |peer, data| {
2312 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2313 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2314 })
2315 .returning(move |_, data| {
2316 let alice_mgr_clone = alice_mgr_clone.clone();
2317 Box::pin(async move {
2318 alice_mgr_clone
2319 .dispatch_message(
2320 alice_pseudonym,
2321 ApplicationDataIn {
2322 data: data.data,
2323 packet_info: Default::default(),
2324 },
2325 )
2326 .await?;
2327 Ok(())
2328 })
2329 });
2330
2331 let bob_mgr_clone = bob_mgr.clone();
2333 alice_transport
2334 .expect_send_message()
2335 .times(5..)
2336 .withf(move |peer, data| {
2338 msg_type(data, StartProtocolDiscriminants::KeepAlive)
2339 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2340 })
2341 .returning(move |_, data| {
2342 let bob_mgr_clone = bob_mgr_clone.clone();
2343 Box::pin(async move {
2344 bob_mgr_clone
2345 .dispatch_message(
2346 alice_pseudonym,
2347 ApplicationDataIn {
2348 data: data.data,
2349 packet_info: Default::default(),
2350 },
2351 )
2352 .await?;
2353 Ok(())
2354 })
2355 });
2356
2357 let bob_mgr_clone = bob_mgr.clone();
2359 alice_transport
2360 .expect_send_message()
2361 .once()
2362 .withf(move |peer, data| {
2364 hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2365 data.data.plain_text.as_ref(),
2366 )
2367 .ok()
2368 .and_then(|m| m.try_as_segment())
2369 .map(|s| s.is_terminating())
2370 .unwrap_or(false)
2371 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2372 })
2373 .returning(move |_, data| {
2374 let bob_mgr_clone = bob_mgr_clone.clone();
2375 Box::pin(async move {
2376 bob_mgr_clone
2377 .dispatch_message(
2378 alice_pseudonym,
2379 ApplicationDataIn {
2380 data: data.data,
2381 packet_info: Default::default(),
2382 },
2383 )
2384 .await?;
2385 Ok(())
2386 })
2387 });
2388
2389 let mut ahs = Vec::new();
2390
2391 let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2393 ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2394
2395 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2397 ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2398
2399 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2400
2401 let balancer_cfg = SurbBalancerConfig {
2402 target_surb_buffer_size: 10,
2403 max_surbs_per_sec: 100,
2404 ..Default::default()
2405 };
2406
2407 pin_mut!(new_session_rx_bob);
2408 let (alice_session, bob_session) = timeout(
2409 Duration::from_secs(2),
2410 futures::future::join(
2411 alice_mgr.new_session(
2412 bob_peer,
2413 SessionTarget::TcpStream(target.clone()),
2414 SessionClientConfig {
2415 pseudonym: alice_pseudonym.into(),
2416 capabilities: Capability::Segmentation.into(),
2417 surb_management: Some(balancer_cfg),
2418 ..Default::default()
2419 },
2420 ),
2421 new_session_rx_bob.next(),
2422 ),
2423 )
2424 .await?;
2425
2426 let mut alice_session = alice_session?;
2427 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2428
2429 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2430
2431 assert_eq!(
2432 Some(balancer_cfg),
2433 alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2434 );
2435
2436 let remote_cfg = bob_mgr
2437 .get_surb_balancer_config(bob_session.session.id())
2438 .await?
2439 .ok_or(anyhow!("no remote config at bob"))?;
2440 assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2441 assert_eq!(
2442 remote_cfg.max_surbs_per_sec,
2443 remote_cfg.target_surb_buffer_size
2444 / bob_cfg
2445 .minimum_surb_buffer_duration
2446 .max(MIN_SURB_BUFFER_DURATION)
2447 .as_secs()
2448 );
2449
2450 tokio::time::sleep(Duration::from_millis(1500)).await;
2452 alice_session.close().await?;
2453
2454 tokio::time::sleep(Duration::from_millis(300)).await;
2455 assert!(matches!(
2456 alice_mgr.ping_session(alice_session.id()).await,
2457 Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2458 ));
2459
2460 futures::stream::iter(ahs)
2461 .for_each(|ah| async move { ah.abort() })
2462 .await;
2463
2464 Ok(())
2465 }
2466}