1use std::{
2 ops::Range,
3 sync::{Arc, OnceLock, atomic::AtomicU64},
4 time::Duration,
5};
6
7use futures::{
8 FutureExt, StreamExt, TryStreamExt,
9 channel::mpsc::UnboundedSender,
10 future::{AbortHandle, Either},
11 pin_mut,
12};
13use hopr_crypto_packet::prelude::HoprPacket;
14use hopr_crypto_random::Randomizable;
15use hopr_internal_types::prelude::HoprPseudonym;
16use hopr_network_types::prelude::*;
17use hopr_primitive_types::prelude::Address;
18use hopr_transport_packet::prelude::{ApplicationData, ReservedTag, Tag};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22 IncomingSession, Session, SessionClientConfig, SessionId, SessionTarget,
23 balancer::{RateController, RateLimitExt, SurbBalancer, SurbFlowController},
24 errors::{SessionManagerError, TransportSessionError},
25 initiation::{StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation, StartProtocol},
26 traits::SendMsg,
27};
28
29#[cfg(all(feature = "prometheus", not(test)))]
30lazy_static::lazy_static! {
31 static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
32 "hopr_session_num_active_sessions",
33 "Number of currently active HOPR sessions"
34 ).unwrap();
35 static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
36 "hopr_session_established_sessions_count",
37 "Number of sessions that were successfully established as an Exit node"
38 ).unwrap();
39 static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
40 "hopr_session_initiated_sessions_count",
41 "Number of sessions that were successfully initiated as an Entry node"
42 ).unwrap();
43 static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
44 "hopr_session_received_error_count",
45 "Number of HOPR session errors received from an Exit node",
46 &["kind"]
47 ).unwrap();
48 static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
49 "hopr_session_sent_error_count",
50 "Number of HOPR session errors sent to an Entry node",
51 &["kind"]
52 ).unwrap();
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
57pub struct SessionManagerConfig {
58 #[doc(hidden)]
66 #[default(_code = "16u64..1024u64")]
67 pub session_tag_range: Range<u64>,
68
69 #[default(Duration::from_secs(5))]
76 pub initiation_timeout_base: Duration,
77
78 #[default(Duration::from_secs(180))]
82 pub idle_timeout: Duration,
83
84 #[default(Duration::from_millis(500))]
89 pub balancer_sampling_interval: Duration,
90}
91
92fn close_session_after_eviction<S: SendMsg + Send + Sync + 'static>(
93 msg_sender: Arc<OnceLock<S>>,
94 session_id: SessionId,
95 session_data: CachedSession,
96 cause: moka::notification::RemovalCause,
97) -> moka::notification::ListenerFuture {
98 match cause {
101 r @ moka::notification::RemovalCause::Expired | r @ moka::notification::RemovalCause::Size
102 if msg_sender.get().is_some() =>
103 {
104 trace!(
105 ?session_id,
106 reason = ?r,
107 "session termination due to eviction from the cache"
108 );
109 let data = match ApplicationData::try_from(StartProtocol::CloseSession(session_id)) {
110 Ok(data) => data,
111 Err(error) => {
112 error!(%session_id, %error, "failed to serialize CloseSession");
113 return futures::future::ready(()).boxed();
114 }
115 };
116
117 let msg_sender = msg_sender.clone();
118 async move {
119 if let Err(error) = msg_sender
121 .get()
122 .unwrap()
123 .send_message(data, session_data.routing_opts)
124 .await
125 {
126 error!(
127 %session_id,
128 %error,
129 "could not send notification of session closure after cache eviction"
130 );
131 }
132
133 session_data.session_tx.close_channel();
134 debug!(
135 ?session_id,
136 reason = ?r,
137 "session has been closed due to cache eviction"
138 );
139
140 session_data.abort_handles.into_iter().for_each(|h| h.abort());
142
143 #[cfg(all(feature = "prometheus", not(test)))]
144 METRIC_ACTIVE_SESSIONS.decrement(1.0);
145 }
146 .boxed()
147 }
148 _ => futures::future::ready(()).boxed(),
149 }
150}
151
152async fn insert_into_next_slot<K, V, F>(cache: &moka::future::Cache<K, V>, generator: F, value: V) -> Option<K>
158where
159 K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
160 V: Clone + Send + Sync + 'static,
161 F: Fn(Option<K>) -> K,
162{
163 let initial = generator(None);
164 let mut next = initial;
165 loop {
166 let insertion_result = cache
167 .entry(next)
168 .and_try_compute_with(|e| {
169 if e.is_none() {
170 futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
171 } else {
172 futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
173 }
174 })
175 .await;
176
177 if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
179 return Some(next);
180 }
181
182 next = generator(Some(next));
184
185 if next == initial {
187 return None;
188 }
189 }
190}
191
192pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
194
195const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
197
198type SessionInitiationCache =
202 moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
203
204#[derive(Clone)]
205struct CachedSession {
206 session_tx: Arc<UnboundedSender<Box<[u8]>>>,
209 routing_opts: DestinationRouting,
210 abort_handles: Vec<AbortHandle>,
211}
212
213#[derive(Clone, Debug, PartialEq, Eq)]
215pub enum DispatchResult {
216 Processed,
218 Unrelated(ApplicationData),
220}
221
222pub(crate) struct KeepAliveController(pub(crate) RateController);
223
224impl SurbFlowController for KeepAliveController {
225 fn adjust_surb_flow(&self, surbs_per_sec: usize) {
226 self.0.set_rate_per_unit(
229 surbs_per_sec,
230 HoprPacket::MAX_SURBS_IN_PACKET as u32 * Duration::from_secs(1),
231 );
232 }
233}
234
235pub(crate) struct CountingSendMsg<T>(T, Arc<AtomicU64>);
236
237impl<T: SendMsg> CountingSendMsg<T> {
238 pub fn new(msg: T, counter: Arc<AtomicU64>) -> Self {
239 Self(msg, counter)
240 }
241}
242
243#[async_trait::async_trait]
244impl<T: SendMsg + Send + Sync> SendMsg for CountingSendMsg<T> {
245 async fn send_message(
246 &self,
247 data: ApplicationData,
248 destination: DestinationRouting,
249 ) -> Result<(), TransportSessionError> {
250 let num_surbs = HoprPacket::max_surbs_with_message(data.len()) as u64;
251 self.0.send_message(data, destination).await.inspect(|_| {
252 self.1.fetch_add(num_surbs, std::sync::atomic::Ordering::Relaxed);
253 })
254 }
255}
256
257pub struct SessionManager<S> {
275 session_initiations: SessionInitiationCache,
276 session_notifiers: Arc<OnceLock<(UnboundedSender<IncomingSession>, UnboundedSender<SessionId>)>>,
277 sessions: moka::future::Cache<SessionId, CachedSession>,
278 msg_sender: Arc<OnceLock<S>>,
279 cfg: SessionManagerConfig,
280}
281
282impl<S> Clone for SessionManager<S> {
283 fn clone(&self) -> Self {
284 Self {
285 session_initiations: self.session_initiations.clone(),
286 session_notifiers: self.session_notifiers.clone(),
287 sessions: self.sessions.clone(),
288 cfg: self.cfg.clone(),
289 msg_sender: self.msg_sender.clone(),
290 }
291 }
292}
293
294fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
295 base * (hops as u32)
296}
297
298pub const MIN_BALANCER_SAMPLING_INTERVAL: Duration = Duration::from_millis(100);
300
301impl<S: SendMsg + Clone + Send + Sync + 'static> SessionManager<S> {
302 pub fn new(mut cfg: SessionManagerConfig) -> Self {
304 let min_session_tag_range_reservation = ReservedTag::range().end;
305 debug_assert!(
306 min_session_tag_range_reservation > StartProtocol::<SessionId>::START_PROTOCOL_MESSAGE_TAG.as_u64(),
307 "invalid tag reservation range"
308 );
309
310 if cfg.session_tag_range.start < min_session_tag_range_reservation {
312 let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
313 cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
314 }
315
316 #[cfg(all(feature = "prometheus", not(test)))]
317 METRIC_ACTIVE_SESSIONS.set(0.0);
318
319 let msg_sender = Arc::new(OnceLock::new());
320 Self {
321 msg_sender: msg_sender.clone(),
322 session_initiations: moka::future::Cache::builder()
323 .max_capacity(
324 std::ops::Range {
325 start: cfg.session_tag_range.start,
326 end: cfg.session_tag_range.end,
327 }
328 .count() as u64,
329 )
330 .time_to_live(
331 2 * initiation_timeout_max_one_way(
332 cfg.initiation_timeout_base,
333 RoutingOptions::MAX_INTERMEDIATE_HOPS,
334 ),
335 )
336 .build(),
337 sessions: moka::future::Cache::builder()
338 .max_capacity(u16::MAX as u64)
339 .time_to_idle(cfg.idle_timeout)
340 .async_eviction_listener(move |k, v, c| {
341 let msg_sender = msg_sender.clone();
342 close_session_after_eviction(msg_sender, *k, v, c)
343 })
344 .build(),
345 session_notifiers: Arc::new(OnceLock::new()),
346 cfg,
347 }
348 }
349
350 pub fn start(
356 &self,
357 msg_sender: S,
358 new_session_notifier: UnboundedSender<IncomingSession>,
359 ) -> crate::errors::Result<Vec<hopr_async_runtime::AbortHandle>> {
360 self.msg_sender
361 .set(msg_sender)
362 .map_err(|_| SessionManagerError::AlreadyStarted)?;
363
364 let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
365 self.session_notifiers
366 .set((new_session_notifier, session_close_tx))
367 .map_err(|_| SessionManagerError::AlreadyStarted)?;
368
369 let myself = self.clone();
370 let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable(session_close_rx.for_each_concurrent(
371 None,
372 move |closed_session_id| {
373 let myself = myself.clone();
374 async move {
375 trace!(
376 session_id = ?closed_session_id,
377 "sending notification of session closure done by us"
378 );
379 match myself.close_session(closed_session_id, true).await {
380 Ok(closed) if closed => debug!(
381 session_id = ?closed_session_id,
382 "session has been closed by us"
383 ),
384 Err(e) => error!(
385 session_id = ?closed_session_id,
386 error = %e,
387 "cannot initiate session closure notification"
388 ),
389 _ => {}
390 }
391 }
392 },
393 ));
394
395 let myself = self.clone();
400 let ah_session_expiration = hopr_async_runtime::spawn_as_abortable(async move {
401 let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
402 let timeout = 2 * initiation_timeout_max_one_way(
403 myself.cfg.initiation_timeout_base,
404 RoutingOptions::MAX_INTERMEDIATE_HOPS,
405 )
406 .min(myself.cfg.idle_timeout)
407 .mul_f64(jitter)
408 / 2;
409 futures_time::stream::interval(timeout.into())
410 .for_each(|_| {
411 trace!("executing session cache evictions");
412 futures::future::join(
413 myself.sessions.run_pending_tasks(),
414 myself.session_initiations.run_pending_tasks(),
415 )
416 .map(|_| ())
417 })
418 .await;
419 });
420
421 Ok(vec![ah_closure_notifications, ah_session_expiration])
422 }
423
424 pub fn is_started(&self) -> bool {
426 self.session_notifiers.get().is_some()
427 }
428
429 fn spawn_keep_alive_stream(
430 &self,
431 session_id: SessionId,
432 sender: Arc<CountingSendMsg<S>>,
433 routing: DestinationRouting,
434 ) -> (KeepAliveController, AbortHandle) {
435 let elem = StartProtocol::KeepAlive(session_id.into());
436
437 let (ka_stream, controller) = futures::stream::repeat(elem).rate_limit_per_unit(0, Duration::from_secs(1));
439
440 let (abort_handle, reg) = AbortHandle::new_pair();
441 let sender_clone = sender.clone();
442 let fwd_routing_clone = routing.clone();
443
444 debug!(%session_id, "spawning keep-alive stream");
446 hopr_async_runtime::prelude::spawn(
447 futures::stream::Abortable::new(ka_stream, reg)
448 .map(ApplicationData::try_from)
449 .try_for_each_concurrent(None, move |msg| {
450 let sender_clone = sender_clone.clone();
451 let fwd_routing_clone = fwd_routing_clone.clone();
452 async move { sender_clone.send_message(msg, fwd_routing_clone).await }
453 })
454 .then(move |res| {
455 match res {
456 Ok(_) => debug!(%session_id, "keep-alive stream done"),
457 Err(error) => error!(%session_id, %error, "keep-alive stream failed"),
458 }
459 futures::future::ready(())
460 }),
461 );
462
463 (KeepAliveController(controller), abort_handle)
464 }
465
466 pub async fn new_session(
474 &self,
475 destination: Address,
476 target: SessionTarget,
477 cfg: SessionClientConfig,
478 ) -> crate::errors::Result<Session> {
479 let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
480
481 let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
482 let challenge = insert_into_next_slot(
483 &self.session_initiations,
484 |ch| {
485 if let Some(challenge) = ch {
486 ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
487 } else {
488 hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
489 }
490 },
491 tx_initiation_done,
492 )
493 .await
494 .ok_or(SessionManagerError::NoChallengeSlots)?; trace!(challenge, ?cfg, "initiating session with config");
498 let start_session_msg = StartProtocol::<SessionId>::StartSession(StartInitiation {
499 challenge,
500 target,
501 capabilities: cfg.capabilities,
502 });
503
504 let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
505 let forward_routing = DestinationRouting::Forward {
506 destination,
507 pseudonym: Some(pseudonym), forward_options: cfg.forward_path_options.clone(),
509 return_options: cfg.return_path_options.clone().into(),
510 };
511
512 info!(challenge, %pseudonym, %destination, "new session request");
514 msg_sender
515 .send_message(start_session_msg.try_into()?, forward_routing.clone())
516 .await?;
517
518 pin_mut!(rx_initiation_done);
520 let initiation_done = TryStreamExt::try_next(&mut rx_initiation_done);
521
522 let timeout = hopr_async_runtime::prelude::sleep(initiation_timeout_max_one_way(
524 self.cfg.initiation_timeout_base,
525 cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
526 ));
527 pin_mut!(timeout);
528
529 trace!(challenge, "awaiting session establishment");
530 match futures::future::select(initiation_done, timeout).await {
531 Either::Left((Ok(Some(est)), _)) => {
532 let session_id = est.session_id;
534 debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
535
536 let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
537 let mut abort_handles = Vec::new();
538 let notifier = self
539 .session_notifiers
540 .get()
541 .map(|(_, notifier)| {
542 let notifier = notifier.clone();
543 Box::new(move |session_id: SessionId| {
544 let _ = notifier
545 .unbounded_send(session_id)
546 .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
547 })
548 })
549 .ok_or(SessionManagerError::NotStarted)?;
550
551 let session = if let Some(balancer_config) = cfg.surb_management {
552 let surb_production_counter = Arc::new(AtomicU64::new(0));
553 let surb_consumption_counter = Arc::new(AtomicU64::new(0));
554
555 let sender = Arc::new(CountingSendMsg::new(
557 msg_sender.clone(),
558 surb_production_counter.clone(),
559 ));
560
561 let (ka_controller, ka_abort_handle) =
563 self.spawn_keep_alive_stream(session_id, sender.clone(), forward_routing.clone());
564 abort_handles.push(ka_abort_handle);
565
566 debug!(%session_id, ?balancer_config, "spawning SURB balancer");
568 let mut balancer = SurbBalancer::new(
569 session_id,
570 surb_production_counter,
571 surb_consumption_counter.clone(),
572 ka_controller,
573 balancer_config,
574 );
575
576 let (surbs_ready_tx, surbs_ready_rx) = futures::channel::oneshot::channel();
577 let mut surbs_ready_tx = Some(surbs_ready_tx);
578 let (balancer_abort_handle, reg) = AbortHandle::new_pair();
579 hopr_async_runtime::prelude::spawn(
580 futures::stream::Abortable::new(
581 futures_time::stream::interval(
582 self.cfg
583 .balancer_sampling_interval
584 .max(MIN_BALANCER_SAMPLING_INTERVAL)
585 .into(),
586 ),
587 reg,
588 )
589 .for_each(move |_| {
590 let level = balancer.update();
591 if surbs_ready_tx.is_some() && level >= balancer_config.target_surb_buffer_size / 2 {
593 let _ = surbs_ready_tx.take().unwrap().send(level);
594 }
595 futures::future::ready(())
596 })
597 .then(move |_| {
598 debug!(%session_id, "balancer done");
599 futures::future::ready(())
600 }),
601 );
602 abort_handles.push(balancer_abort_handle);
603
604 let timeout = hopr_async_runtime::prelude::sleep(SESSION_READINESS_TIMEOUT);
607 pin_mut!(timeout);
608 match futures::future::select(surbs_ready_rx, timeout).await {
609 Either::Left((Ok(surb_level), _)) => {
610 info!(%session_id, surb_level, "session is ready");
611 }
612 Either::Left((Err(_), _)) => {
613 return Err(
614 SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
615 );
616 }
617 Either::Right(_) => {
618 warn!(%session_id, "session didn't reach target SURB buffer size");
619 }
620 }
621
622 Session::new(
623 session_id,
624 forward_routing.clone(),
625 cfg.capabilities,
626 sender,
627 Box::pin(rx.inspect(move |_| {
628 surb_consumption_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
630 })),
631 Some(notifier),
632 )
633 } else {
634 warn!(%session_id, "session ready without SURB balancing");
635 Session::new(
636 session_id,
637 forward_routing.clone(),
638 cfg.capabilities,
639 Arc::new(msg_sender.clone()),
640 Box::pin(rx),
641 Some(notifier),
642 )
643 };
644
645 if let moka::ops::compute::CompResult::Inserted(_) = self
647 .sessions
648 .entry(session_id)
649 .and_compute_with(|entry| {
650 futures::future::ready(if entry.is_none() {
651 moka::ops::compute::Op::Put(CachedSession {
652 session_tx: Arc::new(tx),
653 routing_opts: forward_routing,
654 abort_handles,
655 })
656 } else {
657 moka::ops::compute::Op::Nop
658 })
659 })
660 .await
661 {
662 #[cfg(all(feature = "prometheus", not(test)))]
663 {
664 METRIC_NUM_INITIATED_SESSIONS.increment();
665 METRIC_ACTIVE_SESSIONS.increment(1.0);
666 }
667
668 Ok(session)
669 } else {
670 error!(%session_id, "session already exists - loopback attempt");
672 Err(SessionManagerError::Loopback.into())
673 }
674 }
675 Either::Left((Ok(None), _)) => Err(SessionManagerError::Other(
676 "internal error: sender has been closed without completing the session establishment".into(),
677 )
678 .into()),
679 Either::Left((Err(e), _)) => {
680 error!(
682 challenge = e.challenge,
683 error = ?e,
684 "the other party rejected the session initiation with error"
685 );
686 Err(TransportSessionError::Rejected(e.reason))
687 }
688 Either::Right(_) => {
689 error!(challenge, "session initiation attempt timed out");
691
692 #[cfg(all(feature = "prometheus", not(test)))]
693 METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
694
695 Err(TransportSessionError::Timeout)
696 }
697 }
698 }
699
700 pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
704 if let Some(session_data) = self.sessions.get(id).await {
705 Ok(self
706 .msg_sender
707 .get()
708 .ok_or(SessionManagerError::NotStarted)?
709 .send_message(
710 StartProtocol::KeepAlive((*id).into()).try_into()?,
711 session_data.routing_opts.clone(),
712 )
713 .await?)
714 } else {
715 Err(SessionManagerError::NonExistingSession.into())
716 }
717 }
718
719 pub async fn dispatch_message(
726 &self,
727 pseudonym: HoprPseudonym,
728 data: ApplicationData,
729 ) -> crate::errors::Result<DispatchResult> {
730 if data.application_tag == StartProtocol::<SessionId>::START_PROTOCOL_MESSAGE_TAG {
731 trace!(tag = %data.application_tag, "dispatching Start protocol message");
733 return self
734 .handle_start_protocol_message(pseudonym, data)
735 .await
736 .map(|_| DispatchResult::Processed);
737 } else if self.cfg.session_tag_range.contains(&data.application_tag.as_u64()) {
738 let session_id = SessionId::new(data.application_tag, pseudonym);
739
740 return if let Some(session_data) = self.sessions.get(&session_id).await {
741 trace!(?session_id, "received data for a registered session");
742
743 Ok(session_data
744 .session_tx
745 .unbounded_send(data.plain_text)
746 .map(|_| DispatchResult::Processed)
747 .map_err(|e| SessionManagerError::Other(e.to_string()))?)
748 } else {
749 error!(%session_id, "received data from an unestablished session");
750 Err(TransportSessionError::UnknownData)
751 };
752 }
753
754 trace!(%data.application_tag, "received data not associated with session protocol or any existing session");
755 Ok(DispatchResult::Unrelated(data))
756 }
757
758 async fn handle_start_protocol_message(
759 &self,
760 pseudonym: HoprPseudonym,
761 data: ApplicationData,
762 ) -> crate::errors::Result<()> {
763 match StartProtocol::<SessionId>::try_from(data)? {
764 StartProtocol::StartSession(session_req) => {
765 trace!(challenge = session_req.challenge, "received session initiation request");
766
767 debug!(%pseudonym, "got new session request, searching for a free session slot");
768
769 let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
770
771 let (new_session_notifier, close_session_notifier) = self
772 .session_notifiers
773 .get()
774 .cloned()
775 .ok_or(SessionManagerError::NotStarted)?;
776
777 let reply_routing = DestinationRouting::Return(SurbMatcher::Pseudonym(pseudonym));
779
780 let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
782 if let Some(session_id) = insert_into_next_slot(
783 &self.sessions,
784 |sid| {
785 let next_tag: Tag = match sid {
788 Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
789 .max(self.cfg.session_tag_range.start)
790 .into(),
791 None => hopr_crypto_random::random_integer(
792 self.cfg.session_tag_range.start,
793 Some(self.cfg.session_tag_range.end),
794 )
795 .into(),
796 };
797 SessionId::new(next_tag, pseudonym)
798 },
799 CachedSession {
800 session_tx: Arc::new(tx_session_data),
801 routing_opts: reply_routing.clone(),
802 abort_handles: vec![],
803 },
804 )
805 .await
806 {
807 debug!(?session_id, "assigning a new session");
808
809 let session = Session::new(
810 session_id,
811 reply_routing.clone(),
812 session_req.capabilities,
813 Arc::new(msg_sender.clone()),
814 Box::pin(rx_session_data),
815 Some(Box::new(move |session_id: SessionId| {
816 if let Err(error) = close_session_notifier.unbounded_send(session_id) {
817 error!(%session_id, %error, "failed to notify session closure");
818 }
819 })),
820 );
821
822 let incoming_session = IncomingSession {
824 session,
825 target: session_req.target,
826 };
827
828 if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
830 warn!(%error, "failed to send session to incoming session queue");
831 }
832
833 trace!(?session_id, "session notification sent");
834
835 let data = StartProtocol::SessionEstablished(StartEstablished {
838 orig_challenge: session_req.challenge,
839 session_id,
840 });
841
842 msg_sender
843 .send_message(data.try_into()?, reply_routing)
844 .await
845 .map_err(|e| {
846 SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
847 })?;
848
849 info!(%session_id, "new session established");
850
851 #[cfg(all(feature = "prometheus", not(test)))]
852 {
853 METRIC_NUM_ESTABLISHED_SESSIONS.increment();
854 METRIC_ACTIVE_SESSIONS.increment(1.0);
855 }
856 } else {
857 error!(
858 %pseudonym,
859 "failed to reserve a new session slot"
860 );
861
862 let reason = StartErrorReason::NoSlotsAvailable;
864 let data = StartProtocol::<SessionId>::SessionError(StartErrorType {
865 challenge: session_req.challenge,
866 reason,
867 });
868
869 msg_sender
870 .send_message(data.try_into()?, reply_routing.clone())
871 .await
872 .map_err(|e| {
873 SessionManagerError::Other(format!(
874 "failed to send session establishment error message: {e}"
875 ))
876 })?;
877
878 trace!(%pseudonym, "session establishment failure message sent");
879
880 #[cfg(all(feature = "prometheus", not(test)))]
881 METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
882 }
883 }
884 StartProtocol::SessionEstablished(est) => {
885 trace!(
886 session_id = ?est.session_id,
887 "received session establishment confirmation"
888 );
889 let challenge = est.orig_challenge;
890 let session_id = est.session_id;
891 if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
892 if let Err(e) = tx_est.unbounded_send(Ok(est)) {
893 return Err(SessionManagerError::Other(format!(
894 "could not notify session {session_id} establishment: {e}"
895 ))
896 .into());
897 }
898 debug!(?session_id, challenge, "session establishment complete");
899 } else {
900 error!(%session_id, challenge, "session establishment attempt expired");
901 }
902 }
903 StartProtocol::SessionError(error) => {
904 trace!(
905 challenge = error.challenge,
906 error = ?error.reason,
907 "failed to initialize a session",
908 );
909 if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
912 if let Err(e) = tx_est.unbounded_send(Err(error)) {
913 return Err(SessionManagerError::Other(format!(
914 "could not notify session establishment error {error:?}: {e}"
915 ))
916 .into());
917 }
918 error!(
919 challenge = error.challenge,
920 ?error,
921 "session establishment error received"
922 );
923 } else {
924 error!(
925 challenge = error.challenge,
926 ?error,
927 "session establishment attempt expired before error could be delivered"
928 );
929 }
930
931 #[cfg(all(feature = "prometheus", not(test)))]
932 METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
933 }
934 StartProtocol::CloseSession(session_id) => {
935 trace!(?session_id, "received session close request");
936 match self.close_session(session_id, false).await {
937 Ok(closed) if closed => debug!(?session_id, "session has been closed by the other party"),
938 Err(error) => error!(
939 %session_id,
940 %error,
941 "session could not be closed on other party's request"
942 ),
943 _ => {}
944 }
945 }
946 StartProtocol::KeepAlive(msg) => {
947 let session_id = msg.id;
948 if self.sessions.get(&session_id).await.is_some() {
949 trace!(?session_id, "received keep-alive request");
950 } else {
951 error!(%session_id, "received keep-alive request for an unknown session");
952 }
953 }
954 }
955
956 Ok(())
957 }
958
959 async fn close_session(&self, session_id: SessionId, notify_closure: bool) -> crate::errors::Result<bool> {
960 if let Some(session_data) = self.sessions.remove(&session_id).await {
961 if notify_closure {
963 trace!(?session_id, "sending session termination");
964 self.msg_sender
965 .get()
966 .ok_or(SessionManagerError::NotStarted)?
967 .send_message(
968 StartProtocol::CloseSession(session_id).try_into()?,
969 session_data.routing_opts,
970 )
971 .await?;
972 }
973
974 session_data.session_tx.close_channel();
976 trace!(?session_id, "data tx channel closed on session");
977
978 session_data.abort_handles.into_iter().for_each(|h| h.abort());
980
981 #[cfg(all(feature = "prometheus", not(test)))]
982 METRIC_ACTIVE_SESSIONS.decrement(1.0);
983 Ok(true)
984 } else {
985 debug!(
987 ?session_id,
988 "could not find session id to close, maybe the session is already closed"
989 );
990 Ok(false)
991 }
992 }
993}
994
995#[cfg(test)]
996mod tests {
997 use anyhow::anyhow;
998 use futures::AsyncWriteExt;
999 use hopr_crypto_random::Randomizable;
1000 use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1001 use hopr_primitive_types::prelude::Address;
1002 use tokio::time::timeout;
1003
1004 use super::*;
1005 use crate::{
1006 Capabilities, Capability, balancer::SurbBalancerConfig, initiation::StartProtocolDiscriminants,
1007 types::SessionTarget,
1008 };
1009
1010 mockall::mock! {
1011 MsgSender {}
1012 impl Clone for MsgSender {
1013 fn clone(&self) -> Self;
1014 }
1015 impl SendMsg for MsgSender {
1016 fn send_message<'life0, 'async_trait>
1017 (
1018 &'life0 self,
1019 data: ApplicationData,
1020 routing: DestinationRouting,
1021 )
1022 -> std::pin::Pin<Box<dyn std::future::Future<Output=std::result::Result<(),TransportSessionError>> + Send + 'async_trait>>
1023 where
1024 'life0: 'async_trait,
1025 Self: Sync + 'async_trait;
1026 }
1027 }
1028
1029 fn msg_type(data: &ApplicationData, expected: StartProtocolDiscriminants) -> bool {
1030 expected
1031 == StartProtocolDiscriminants::from(
1032 StartProtocol::<SessionId>::decode(data.application_tag, &data.plain_text)
1033 .expect("failed to parse message"),
1034 )
1035 }
1036
1037 #[tokio::test]
1038 async fn test_insert_into_next_slot() -> anyhow::Result<()> {
1039 let cache = moka::future::Cache::new(10);
1040
1041 for i in 0..5 {
1042 let v = insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
1043 .await
1044 .ok_or(anyhow!("should insert"))?;
1045 assert_eq!(v, i);
1046 assert_eq!(Some("foo".to_string()), cache.get(&i).await);
1047 }
1048
1049 assert!(
1050 insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
1051 .await
1052 .is_none(),
1053 "must not find slot when full"
1054 );
1055
1056 Ok(())
1057 }
1058
1059 #[test_log::test(tokio::test)]
1060 async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1061 {
1062 let alice_pseudonym = HoprPseudonym::random();
1063 let bob_peer: Address = (&ChainKeypair::random()).into();
1064
1065 let alice_mgr = SessionManager::new(Default::default());
1066 let bob_mgr = SessionManager::new(Default::default());
1067
1068 let mut sequence = mockall::Sequence::new();
1069 let mut alice_transport = MockMsgSender::new();
1070 let mut bob_transport = MockMsgSender::new();
1071
1072 let bob_mgr_clone = bob_mgr.clone();
1074 alice_transport
1075 .expect_send_message()
1076 .once()
1077 .in_sequence(&mut sequence)
1078 .withf(move |data, peer| {
1079 msg_type(data, StartProtocolDiscriminants::StartSession)
1080 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1081 })
1082 .returning(move |data, _| {
1083 let bob_mgr_clone = bob_mgr_clone.clone();
1084 Box::pin(async move {
1085 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1086 Ok(())
1087 })
1088 });
1089
1090 bob_transport
1092 .expect_clone()
1093 .once()
1094 .in_sequence(&mut sequence)
1095 .return_once(MockMsgSender::new);
1096
1097 let alice_mgr_clone = alice_mgr.clone();
1099 bob_transport
1100 .expect_send_message()
1101 .once()
1102 .in_sequence(&mut sequence)
1103 .withf(move |data, peer| {
1104 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1105 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1106 })
1107 .returning(move |data, _| {
1108 let alice_mgr_clone = alice_mgr_clone.clone();
1109
1110 Box::pin(async move {
1111 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1112 Ok(())
1113 })
1114 });
1115
1116 alice_transport
1118 .expect_clone()
1119 .once()
1120 .in_sequence(&mut sequence)
1121 .return_once(MockMsgSender::new);
1122
1123 let bob_mgr_clone = bob_mgr.clone();
1125 alice_transport
1126 .expect_send_message()
1127 .once()
1128 .in_sequence(&mut sequence)
1129 .withf(move |data, peer| {
1130 msg_type(data, StartProtocolDiscriminants::CloseSession)
1131 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1132 })
1133 .returning(move |data, _| {
1134 let bob_mgr_clone = bob_mgr_clone.clone();
1135 Box::pin(async move {
1136 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1137 Ok(())
1138 })
1139 });
1140
1141 let alice_mgr_clone = alice_mgr.clone();
1143 bob_transport
1144 .expect_send_message()
1145 .once()
1146 .in_sequence(&mut sequence)
1147 .withf(move |data, peer| {
1148 msg_type(data, StartProtocolDiscriminants::CloseSession)
1149 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1150 })
1151 .returning(move |data, _| {
1152 let alice_mgr_clone = alice_mgr_clone.clone();
1153 Box::pin(async move {
1154 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1155 Ok(())
1156 })
1157 });
1158
1159 let mut ahs = Vec::new();
1160
1161 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1163 ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1164
1165 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1167 ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1168
1169 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1170
1171 pin_mut!(new_session_rx_bob);
1172 let (alice_session, bob_session) = timeout(
1173 Duration::from_secs(2),
1174 futures::future::join(
1175 alice_mgr.new_session(
1176 bob_peer,
1177 SessionTarget::TcpStream(target.clone()),
1178 SessionClientConfig {
1179 pseudonym: alice_pseudonym.into(),
1180 surb_management: None,
1181 ..Default::default()
1182 },
1183 ),
1184 new_session_rx_bob.next(),
1185 ),
1186 )
1187 .await?;
1188
1189 let mut alice_session = alice_session?;
1190 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1191
1192 assert_eq!(
1193 alice_session.capabilities(),
1194 &Capabilities::from(Capability::Segmentation)
1195 );
1196 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1197 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1198
1199 tokio::time::sleep(Duration::from_millis(100)).await;
1200 alice_session.close().await?;
1201
1202 tokio::time::sleep(Duration::from_millis(100)).await;
1203 futures::stream::iter(ahs)
1204 .for_each(|ah| async move { ah.abort() })
1205 .await;
1206
1207 Ok(())
1208 }
1209
1210 #[test_log::test(tokio::test)]
1211 async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1212 let alice_pseudonym = HoprPseudonym::random();
1213 let bob_peer: Address = (&ChainKeypair::random()).into();
1214
1215 let cfg = SessionManagerConfig {
1216 idle_timeout: Duration::from_millis(200),
1217 ..Default::default()
1218 };
1219
1220 let alice_mgr = SessionManager::new(cfg);
1221 let bob_mgr = SessionManager::new(Default::default());
1222
1223 let mut sequence = mockall::Sequence::new();
1224 let mut alice_transport = MockMsgSender::new();
1225 let mut bob_transport = MockMsgSender::new();
1226
1227 let bob_mgr_clone = bob_mgr.clone();
1229 alice_transport
1230 .expect_send_message()
1231 .once()
1232 .in_sequence(&mut sequence)
1233 .withf(move |data, peer| {
1234 msg_type(data, StartProtocolDiscriminants::StartSession)
1235 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1236 })
1237 .returning(move |data, _| {
1238 let bob_mgr_clone = bob_mgr_clone.clone();
1239 Box::pin(async move {
1240 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1241 Ok(())
1242 })
1243 });
1244
1245 bob_transport
1247 .expect_clone()
1248 .once()
1249 .in_sequence(&mut sequence)
1250 .return_once(MockMsgSender::new);
1251
1252 let alice_mgr_clone = alice_mgr.clone();
1254 bob_transport
1255 .expect_send_message()
1256 .once()
1257 .in_sequence(&mut sequence)
1258 .withf(move |data, peer| {
1259 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1260 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1261 })
1262 .returning(move |data, _| {
1263 let alice_mgr_clone = alice_mgr_clone.clone();
1264
1265 Box::pin(async move {
1266 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1267 Ok(())
1268 })
1269 });
1270
1271 alice_transport
1273 .expect_clone()
1274 .once()
1275 .in_sequence(&mut sequence)
1276 .return_once(MockMsgSender::new);
1277
1278 let bob_mgr_clone = bob_mgr.clone();
1280 alice_transport
1281 .expect_send_message()
1282 .once()
1283 .in_sequence(&mut sequence)
1284 .withf(move |data, peer| {
1285 msg_type(data, StartProtocolDiscriminants::CloseSession)
1286 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1287 })
1288 .returning(move |data, _| {
1289 let bob_mgr_clone = bob_mgr_clone.clone();
1290 Box::pin(async move {
1291 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1292 Ok(())
1293 })
1294 });
1295
1296 let alice_mgr_clone = alice_mgr.clone();
1298 bob_transport
1299 .expect_send_message()
1300 .once()
1301 .in_sequence(&mut sequence)
1302 .withf(move |data, peer| {
1303 msg_type(data, StartProtocolDiscriminants::CloseSession)
1304 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1305 })
1306 .returning(move |data, _| {
1307 let alice_mgr_clone = alice_mgr_clone.clone();
1308
1309 Box::pin(async move {
1310 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1311 Ok(())
1312 })
1313 });
1314
1315 let mut ahs = Vec::new();
1316
1317 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1319 ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1320
1321 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1323 ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1324
1325 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1326
1327 pin_mut!(new_session_rx_bob);
1328 let (alice_session, bob_session) = timeout(
1329 Duration::from_secs(2),
1330 futures::future::join(
1331 alice_mgr.new_session(
1332 bob_peer,
1333 SessionTarget::TcpStream(target.clone()),
1334 SessionClientConfig {
1335 pseudonym: alice_pseudonym.into(),
1336 surb_management: None,
1337 ..Default::default()
1338 },
1339 ),
1340 new_session_rx_bob.next(),
1341 ),
1342 )
1343 .await?;
1344
1345 let alice_session = alice_session?;
1346 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1347
1348 assert_eq!(
1349 alice_session.capabilities(),
1350 &Capabilities::from(Capability::Segmentation)
1351 );
1352 assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1353 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1354
1355 tokio::time::sleep(Duration::from_millis(300)).await;
1357
1358 futures::stream::iter(ahs)
1359 .for_each(|ah| async move { ah.abort() })
1360 .await;
1361
1362 Ok(())
1363 }
1364
1365 #[test_log::test(tokio::test)]
1366 async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1367 let alice_pseudonym = HoprPseudonym::random();
1368 let bob_peer: Address = (&ChainKeypair::random()).into();
1369
1370 let cfg = SessionManagerConfig {
1371 session_tag_range: 16u64..17u64, ..Default::default()
1373 };
1374
1375 let alice_mgr = SessionManager::new(Default::default());
1376 let bob_mgr = SessionManager::new(cfg);
1377
1378 let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1380 bob_mgr
1381 .sessions
1382 .insert(
1383 SessionId::new(16u64, alice_pseudonym),
1384 CachedSession {
1385 session_tx: Arc::new(dummy_tx),
1386 routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1387 abort_handles: Vec::new(),
1388 },
1389 )
1390 .await;
1391
1392 let mut sequence = mockall::Sequence::new();
1393 let mut alice_transport = MockMsgSender::new();
1394 let mut bob_transport = MockMsgSender::new();
1395
1396 let bob_mgr_clone = bob_mgr.clone();
1398 alice_transport
1399 .expect_send_message()
1400 .once()
1401 .in_sequence(&mut sequence)
1402 .withf(move |data, peer| {
1403 msg_type(data, StartProtocolDiscriminants::StartSession)
1404 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1405 })
1406 .returning(move |data, _| {
1407 let bob_mgr_clone = bob_mgr_clone.clone();
1408 Box::pin(async move {
1409 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1410 Ok(())
1411 })
1412 });
1413
1414 let alice_mgr_clone = alice_mgr.clone();
1416 bob_transport
1417 .expect_send_message()
1418 .once()
1419 .in_sequence(&mut sequence)
1420 .withf(move |data, peer| {
1421 msg_type(data, StartProtocolDiscriminants::SessionError)
1422 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1423 })
1424 .returning(move |data, _| {
1425 let alice_mgr_clone = alice_mgr_clone.clone();
1426 Box::pin(async move {
1427 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1428 Ok(())
1429 })
1430 });
1431
1432 let mut jhs = Vec::new();
1433
1434 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1436 jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1437
1438 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1440 jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1441
1442 let result = alice_mgr
1443 .new_session(
1444 bob_peer,
1445 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1446 SessionClientConfig {
1447 capabilities: Capabilities::empty(),
1448 pseudonym: alice_pseudonym.into(),
1449 surb_management: None,
1450 ..Default::default()
1451 },
1452 )
1453 .await;
1454
1455 assert!(
1456 matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1457 );
1458
1459 Ok(())
1460 }
1461
1462 #[test_log::test(tokio::test)]
1463 async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1464 let alice_pseudonym = HoprPseudonym::random();
1465 let bob_peer: Address = (&ChainKeypair::random()).into();
1466
1467 let alice_mgr = SessionManager::new(Default::default());
1468
1469 let mut sequence = mockall::Sequence::new();
1470 let mut alice_transport = MockMsgSender::new();
1471
1472 let alice_mgr_clone = alice_mgr.clone();
1474 alice_transport
1475 .expect_send_message()
1476 .once()
1477 .in_sequence(&mut sequence)
1478 .withf(move |data, peer| {
1479 msg_type(data, StartProtocolDiscriminants::StartSession)
1480 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1481 })
1482 .returning(move |data, _| {
1483 let alice_mgr_clone = alice_mgr_clone.clone();
1485 Box::pin(async move {
1486 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1487 Ok(())
1488 })
1489 });
1490
1491 alice_transport
1493 .expect_clone()
1494 .once()
1495 .in_sequence(&mut sequence)
1496 .return_once(MockMsgSender::new);
1497
1498 let alice_mgr_clone = alice_mgr.clone();
1500 alice_transport
1501 .expect_send_message()
1502 .once()
1503 .in_sequence(&mut sequence)
1504 .withf(move |data, peer| {
1505 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1506 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1507 })
1508 .returning(move |data, _| {
1509 let alice_mgr_clone = alice_mgr_clone.clone();
1510
1511 Box::pin(async move {
1512 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1513 Ok(())
1514 })
1515 });
1516
1517 alice_transport
1519 .expect_clone()
1520 .once()
1521 .in_sequence(&mut sequence)
1522 .return_once(MockMsgSender::new);
1523
1524 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1526 alice_mgr.start(alice_transport, new_session_tx_alice)?;
1527
1528 let alice_session = alice_mgr
1529 .new_session(
1530 bob_peer,
1531 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1532 SessionClientConfig {
1533 capabilities: Capabilities::empty(),
1534 pseudonym: alice_pseudonym.into(),
1535 surb_management: None,
1536 ..Default::default()
1537 },
1538 )
1539 .await;
1540
1541 println!("{:?}", alice_session);
1542 assert!(matches!(
1543 alice_session,
1544 Err(TransportSessionError::Manager(SessionManagerError::Loopback))
1545 ));
1546
1547 Ok(())
1548 }
1549
1550 #[test_log::test(tokio::test)]
1551 async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1552 let bob_peer: Address = (&ChainKeypair::random()).into();
1553
1554 let cfg = SessionManagerConfig {
1555 initiation_timeout_base: Duration::from_millis(100),
1556 ..Default::default()
1557 };
1558
1559 let alice_mgr = SessionManager::new(cfg);
1560 let bob_mgr = SessionManager::new(Default::default());
1561
1562 let mut sequence = mockall::Sequence::new();
1563 let mut alice_transport = MockMsgSender::new();
1564 let bob_transport = MockMsgSender::new();
1565
1566 alice_transport
1568 .expect_send_message()
1569 .once()
1570 .in_sequence(&mut sequence)
1571 .withf(move |data, peer| {
1572 msg_type(data, StartProtocolDiscriminants::StartSession)
1573 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1574 })
1575 .returning(|_, _| Box::pin(async { Ok(()) }));
1576
1577 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1579 alice_mgr.start(alice_transport, new_session_tx_alice)?;
1580
1581 let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1583 bob_mgr.start(bob_transport, new_session_tx_bob)?;
1584
1585 let result = alice_mgr
1586 .new_session(
1587 bob_peer,
1588 SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1589 SessionClientConfig {
1590 capabilities: Capabilities::empty(),
1591 pseudonym: None,
1592 surb_management: None,
1593 ..Default::default()
1594 },
1595 )
1596 .await;
1597
1598 assert!(matches!(result, Err(TransportSessionError::Timeout)));
1599
1600 Ok(())
1601 }
1602
1603 #[test_log::test(tokio::test)]
1604 async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
1605 let alice_pseudonym = HoprPseudonym::random();
1606 let bob_peer: Address = (&ChainKeypair::random()).into();
1607
1608 let alice_mgr = SessionManager::new(Default::default());
1609 let bob_mgr = SessionManager::new(Default::default());
1610
1611 let mut sequence = mockall::Sequence::new();
1612 let mut alice_transport = MockMsgSender::new();
1613 let mut bob_transport = MockMsgSender::new();
1614
1615 let bob_mgr_clone = bob_mgr.clone();
1617 alice_transport
1618 .expect_send_message()
1619 .once()
1620 .in_sequence(&mut sequence)
1621 .withf(move |data, peer| {
1622 msg_type(data, StartProtocolDiscriminants::StartSession)
1623 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1624 })
1625 .returning(move |data, _| {
1626 let bob_mgr_clone = bob_mgr_clone.clone();
1627 Box::pin(async move {
1628 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1629 Ok(())
1630 })
1631 });
1632
1633 bob_transport
1635 .expect_clone()
1636 .once()
1637 .in_sequence(&mut sequence)
1638 .return_once(MockMsgSender::new);
1639
1640 let alice_mgr_clone = alice_mgr.clone();
1642 bob_transport
1643 .expect_send_message()
1644 .once()
1645 .in_sequence(&mut sequence)
1646 .withf(move |data, peer| {
1647 msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1648 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1649 })
1650 .returning(move |data, _| {
1651 let alice_mgr_clone = alice_mgr_clone.clone();
1652 Box::pin(async move {
1653 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1654 Ok(())
1655 })
1656 });
1657
1658 let bob_mgr_clone = bob_mgr.clone();
1660 let mut alice_session_transport = MockMsgSender::new();
1661 alice_session_transport
1662 .expect_send_message()
1663 .times(5..)
1664 .withf(move |data, peer| {
1665 msg_type(data, StartProtocolDiscriminants::KeepAlive)
1666 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1667 })
1668 .returning(move |data, _| {
1669 let bob_mgr_clone = bob_mgr_clone.clone();
1670 Box::pin(async move {
1671 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1672 Ok(())
1673 })
1674 });
1675
1676 alice_transport
1678 .expect_clone()
1679 .once()
1680 .in_sequence(&mut sequence)
1681 .return_once(|| alice_session_transport);
1682
1683 let bob_mgr_clone = bob_mgr.clone();
1685 alice_transport
1686 .expect_send_message()
1687 .once()
1688 .in_sequence(&mut sequence)
1689 .withf(move |data, peer| {
1690 msg_type(data, StartProtocolDiscriminants::CloseSession)
1691 && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1692 })
1693 .returning(move |data, _| {
1694 let bob_mgr_clone = bob_mgr_clone.clone();
1695 Box::pin(async move {
1696 bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1697 Ok(())
1698 })
1699 });
1700
1701 let alice_mgr_clone = alice_mgr.clone();
1703 bob_transport
1704 .expect_send_message()
1705 .once()
1706 .in_sequence(&mut sequence)
1707 .withf(move |data, peer| {
1708 msg_type(data, StartProtocolDiscriminants::CloseSession)
1709 && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1710 })
1711 .returning(move |data, _| {
1712 let alice_mgr_clone = alice_mgr_clone.clone();
1713 Box::pin(async move {
1714 alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1715 Ok(())
1716 })
1717 });
1718
1719 let mut ahs = Vec::new();
1720
1721 let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1723 ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1724
1725 let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1727 ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1728
1729 let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1730
1731 pin_mut!(new_session_rx_bob);
1732 let (alice_session, bob_session) = timeout(
1733 Duration::from_secs(2),
1734 futures::future::join(
1735 alice_mgr.new_session(
1736 bob_peer,
1737 SessionTarget::TcpStream(target.clone()),
1738 SessionClientConfig {
1739 pseudonym: alice_pseudonym.into(),
1740 surb_management: Some(SurbBalancerConfig {
1741 target_surb_buffer_size: 10,
1742 max_surbs_per_sec: 100,
1743 }),
1744 ..Default::default()
1745 },
1746 ),
1747 new_session_rx_bob.next(),
1748 ),
1749 )
1750 .await?;
1751
1752 let mut alice_session = alice_session?;
1753 let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1754
1755 assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1756
1757 tokio::time::sleep(Duration::from_millis(3000)).await;
1759 alice_session.close().await?;
1760
1761 tokio::time::sleep(Duration::from_millis(300)).await;
1762 futures::stream::iter(ahs)
1763 .for_each(|ah| async move { ah.abort() })
1764 .await;
1765
1766 Ok(())
1767 }
1768}