hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock, atomic::AtomicU64},
4    time::Duration,
5};
6
7use futures::{
8    FutureExt, StreamExt, TryStreamExt,
9    channel::mpsc::UnboundedSender,
10    future::{AbortHandle, Either},
11    pin_mut,
12};
13use hopr_crypto_packet::prelude::HoprPacket;
14use hopr_crypto_random::Randomizable;
15use hopr_internal_types::prelude::HoprPseudonym;
16use hopr_network_types::prelude::*;
17use hopr_primitive_types::prelude::Address;
18use hopr_transport_packet::prelude::{ApplicationData, ReservedTag, Tag};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22    IncomingSession, Session, SessionClientConfig, SessionId, SessionTarget,
23    balancer::{RateController, RateLimitExt, SurbBalancer, SurbFlowController},
24    errors::{SessionManagerError, TransportSessionError},
25    initiation::{StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation, StartProtocol},
26    traits::SendMsg,
27};
28
29#[cfg(all(feature = "prometheus", not(test)))]
30lazy_static::lazy_static! {
31    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
32        "hopr_session_num_active_sessions",
33        "Number of currently active HOPR sessions"
34    ).unwrap();
35    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
36        "hopr_session_established_sessions_count",
37        "Number of sessions that were successfully established as an Exit node"
38    ).unwrap();
39    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
40        "hopr_session_initiated_sessions_count",
41        "Number of sessions that were successfully initiated as an Entry node"
42    ).unwrap();
43    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
44        "hopr_session_received_error_count",
45        "Number of HOPR session errors received from an Exit node",
46        &["kind"]
47    ).unwrap();
48    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
49        "hopr_session_sent_error_count",
50        "Number of HOPR session errors sent to an Entry node",
51        &["kind"]
52    ).unwrap();
53}
54
55/// Configuration for the [`SessionManager`].
56#[derive(Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
57pub struct SessionManagerConfig {
58    /// Ranges of tags available for Sessions.
59    ///
60    /// **NOTE**: If the range starts lower than [`MIN_SESSION_TAG_RANGE_RESERVATION`],
61    /// it will be automatically transformed to start at this value.
62    /// This is due to the reserved range by the Start sub-protocol.
63    ///
64    /// Default is 16..1024.
65    #[doc(hidden)]
66    #[default(_code = "16u64..1024u64")]
67    pub session_tag_range: Range<u64>,
68
69    /// The base timeout for initiation of Session initiation.
70    ///
71    /// The actual timeout is adjusted according to the number of hops for that Session:
72    /// `t = initiation_time_out_base * (num_forward_hops + num_return_hops + 2)`
73    ///
74    /// Default is 5 seconds.
75    #[default(Duration::from_secs(5))]
76    pub initiation_timeout_base: Duration,
77
78    /// Timeout for Session to be closed due to inactivity.
79    ///
80    /// Default is 180 seconds.
81    #[default(Duration::from_secs(180))]
82    pub idle_timeout: Duration,
83
84    /// The sampling interval for SURB balancer.
85    /// It will make SURB control decisions regularly at this interval.
86    ///
87    /// Default is 500 milliseconds.
88    #[default(Duration::from_millis(500))]
89    pub balancer_sampling_interval: Duration,
90}
91
92fn close_session_after_eviction<S: SendMsg + Send + Sync + 'static>(
93    msg_sender: Arc<OnceLock<S>>,
94    session_id: SessionId,
95    session_data: CachedSession,
96    cause: moka::notification::RemovalCause,
97) -> moka::notification::ListenerFuture {
98    // When a Session is removed from the cache, we notify the other party only
99    // if this removal was due to expiration or cache size limit.
100    match cause {
101        r @ moka::notification::RemovalCause::Expired | r @ moka::notification::RemovalCause::Size
102            if msg_sender.get().is_some() =>
103        {
104            trace!(
105                ?session_id,
106                reason = ?r,
107                "session termination due to eviction from the cache"
108            );
109            let data = match ApplicationData::try_from(StartProtocol::CloseSession(session_id)) {
110                Ok(data) => data,
111                Err(error) => {
112                    error!(%session_id, %error, "failed to serialize CloseSession");
113                    return futures::future::ready(()).boxed();
114                }
115            };
116
117            let msg_sender = msg_sender.clone();
118            async move {
119                // Unwrap cannot panic, since the value's existence is checked on L72
120                if let Err(error) = msg_sender
121                    .get()
122                    .unwrap()
123                    .send_message(data, session_data.routing_opts)
124                    .await
125                {
126                    error!(
127                        %session_id,
128                        %error,
129                        "could not send notification of session closure after cache eviction"
130                    );
131                }
132
133                session_data.session_tx.close_channel();
134                debug!(
135                    ?session_id,
136                    reason = ?r,
137                    "session has been closed due to cache eviction"
138                );
139
140                // Terminate any additional tasks spawned by the Session
141                session_data.abort_handles.into_iter().for_each(|h| h.abort());
142
143                #[cfg(all(feature = "prometheus", not(test)))]
144                METRIC_ACTIVE_SESSIONS.decrement(1.0);
145            }
146            .boxed()
147        }
148        _ => futures::future::ready(()).boxed(),
149    }
150}
151
152/// This function will use the given generator to generate an initial seeding key.
153/// It will check whether the given cache already contains a value for that key, and if not,
154/// calls the generator (with the previous value) to generate a new seeding key and retry.
155/// The function either finds a suitable free slot, inserting the `value` and returns the found key,
156/// or terminates with `None` when `gen` returns the initial seed again.
157async fn insert_into_next_slot<K, V, F>(cache: &moka::future::Cache<K, V>, generator: F, value: V) -> Option<K>
158where
159    K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
160    V: Clone + Send + Sync + 'static,
161    F: Fn(Option<K>) -> K,
162{
163    let initial = generator(None);
164    let mut next = initial;
165    loop {
166        let insertion_result = cache
167            .entry(next)
168            .and_try_compute_with(|e| {
169                if e.is_none() {
170                    futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
171                } else {
172                    futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
173                }
174            })
175            .await;
176
177        // If we inserted successfully, break the loop and return the insertion key
178        if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
179            return Some(next);
180        }
181
182        // Otherwise, generate the next key
183        next = generator(Some(next));
184
185        // If generated keys made it to full loop, return failure
186        if next == initial {
187            return None;
188        }
189    }
190}
191
192/// The first challenge value used in Start protocol to initiate a session.
193pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
194
195/// Maximum time to wait for counterparty to receive the target amount of SURBs.
196const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
197
198// Needs to use an UnboundedSender instead of oneshot
199// because Moka cache requires the value to be Clone, which oneshot Sender is not.
200// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
201type SessionInitiationCache =
202    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
203
204#[derive(Clone)]
205struct CachedSession {
206    // Sender needs to be put in Arc, so that no clones are made by `moka`.
207    // This makes sure that the entire channel closes once the one and only sender is closed.
208    session_tx: Arc<UnboundedSender<Box<[u8]>>>,
209    routing_opts: DestinationRouting,
210    abort_handles: Vec<AbortHandle>,
211}
212
213/// Indicates the result of processing a message.
214#[derive(Clone, Debug, PartialEq, Eq)]
215pub enum DispatchResult {
216    /// Session or Start protocol message has been processed successfully.
217    Processed,
218    /// The message was not related to Start or Session protocol.
219    Unrelated(ApplicationData),
220}
221
222pub(crate) struct KeepAliveController(pub(crate) RateController);
223
224impl SurbFlowController for KeepAliveController {
225    fn adjust_surb_flow(&self, surbs_per_sec: usize) {
226        // Currently, a keep-alive message can bear `HoprPacket::MAX_SURBS_IN_PACKET` SURBs,
227        // so the correction by this factor is applied.
228        self.0.set_rate_per_unit(
229            surbs_per_sec,
230            HoprPacket::MAX_SURBS_IN_PACKET as u32 * Duration::from_secs(1),
231        );
232    }
233}
234
235pub(crate) struct CountingSendMsg<T>(T, Arc<AtomicU64>);
236
237impl<T: SendMsg> CountingSendMsg<T> {
238    pub fn new(msg: T, counter: Arc<AtomicU64>) -> Self {
239        Self(msg, counter)
240    }
241}
242
243#[async_trait::async_trait]
244impl<T: SendMsg + Send + Sync> SendMsg for CountingSendMsg<T> {
245    async fn send_message(
246        &self,
247        data: ApplicationData,
248        destination: DestinationRouting,
249    ) -> Result<(), TransportSessionError> {
250        let num_surbs = HoprPacket::max_surbs_with_message(data.len()) as u64;
251        self.0.send_message(data, destination).await.inspect(|_| {
252            self.1.fetch_add(num_surbs, std::sync::atomic::Ordering::Relaxed);
253        })
254    }
255}
256
257/// Manages lifecycles of Sessions.
258///
259/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
260/// should be called for each [`ApplicationData`] received by the node.
261/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
262/// and correct dispatch of Session-related packets to individual existing Sessions.
263///
264/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`].
265///
266/// Since the `SessionManager` operates over the HOPR protocol,
267/// the [message transport `S`](SendMsg) is required.
268/// Such transport must also be `Clone`, since it will be cloned into the created [`Session`] objects.
269///
270/// The manager also can take care of [SURB balancing](SurbBalancerConfig) per Session. When enabled,
271/// a desired target level of SURBs at the Session counterparty is set. According to measured
272/// inflow and outflow of SURBS to/from the counterparty, the production of non-organic SURBs
273/// through keep-alive messages (sent to counterparty) is controlled to maintain that target level.
274pub struct SessionManager<S> {
275    session_initiations: SessionInitiationCache,
276    session_notifiers: Arc<OnceLock<(UnboundedSender<IncomingSession>, UnboundedSender<SessionId>)>>,
277    sessions: moka::future::Cache<SessionId, CachedSession>,
278    msg_sender: Arc<OnceLock<S>>,
279    cfg: SessionManagerConfig,
280}
281
282impl<S> Clone for SessionManager<S> {
283    fn clone(&self) -> Self {
284        Self {
285            session_initiations: self.session_initiations.clone(),
286            session_notifiers: self.session_notifiers.clone(),
287            sessions: self.sessions.clone(),
288            cfg: self.cfg.clone(),
289            msg_sender: self.msg_sender.clone(),
290        }
291    }
292}
293
294fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
295    base * (hops as u32)
296}
297
298/// Smallest possible interval for balancer sampling.
299pub const MIN_BALANCER_SAMPLING_INTERVAL: Duration = Duration::from_millis(100);
300
301impl<S: SendMsg + Clone + Send + Sync + 'static> SessionManager<S> {
302    /// Creates a new instance given the `PeerId` and [config](SessionManagerConfig).
303    pub fn new(mut cfg: SessionManagerConfig) -> Self {
304        let min_session_tag_range_reservation = ReservedTag::range().end;
305        debug_assert!(
306            min_session_tag_range_reservation > StartProtocol::<SessionId>::START_PROTOCOL_MESSAGE_TAG.as_u64(),
307            "invalid tag reservation range"
308        );
309
310        // Accommodate the lower bound if too low.
311        if cfg.session_tag_range.start < min_session_tag_range_reservation {
312            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
313            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
314        }
315
316        #[cfg(all(feature = "prometheus", not(test)))]
317        METRIC_ACTIVE_SESSIONS.set(0.0);
318
319        let msg_sender = Arc::new(OnceLock::new());
320        Self {
321            msg_sender: msg_sender.clone(),
322            session_initiations: moka::future::Cache::builder()
323                .max_capacity(
324                    std::ops::Range {
325                        start: cfg.session_tag_range.start,
326                        end: cfg.session_tag_range.end,
327                    }
328                    .count() as u64,
329                )
330                .time_to_live(
331                    2 * initiation_timeout_max_one_way(
332                        cfg.initiation_timeout_base,
333                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
334                    ),
335                )
336                .build(),
337            sessions: moka::future::Cache::builder()
338                .max_capacity(u16::MAX as u64)
339                .time_to_idle(cfg.idle_timeout)
340                .async_eviction_listener(move |k, v, c| {
341                    let msg_sender = msg_sender.clone();
342                    close_session_after_eviction(msg_sender, *k, v, c)
343                })
344                .build(),
345            session_notifiers: Arc::new(OnceLock::new()),
346            cfg,
347        }
348    }
349
350    /// Starts the instance with the given [transport](SendMsg) implementation
351    /// and a channel that is used to notify when a new incoming session is opened to us.
352    ///
353    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
354    /// [`SessionManager::dispatch_message`].
355    pub fn start(
356        &self,
357        msg_sender: S,
358        new_session_notifier: UnboundedSender<IncomingSession>,
359    ) -> crate::errors::Result<Vec<hopr_async_runtime::AbortHandle>> {
360        self.msg_sender
361            .set(msg_sender)
362            .map_err(|_| SessionManagerError::AlreadyStarted)?;
363
364        let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
365        self.session_notifiers
366            .set((new_session_notifier, session_close_tx))
367            .map_err(|_| SessionManagerError::AlreadyStarted)?;
368
369        let myself = self.clone();
370        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable(session_close_rx.for_each_concurrent(
371            None,
372            move |closed_session_id| {
373                let myself = myself.clone();
374                async move {
375                    trace!(
376                        session_id = ?closed_session_id,
377                        "sending notification of session closure done by us"
378                    );
379                    match myself.close_session(closed_session_id, true).await {
380                        Ok(closed) if closed => debug!(
381                            session_id = ?closed_session_id,
382                            "session has been closed by us"
383                        ),
384                        Err(e) => error!(
385                            session_id = ?closed_session_id,
386                            error = %e,
387                            "cannot initiate session closure notification"
388                        ),
389                        _ => {}
390                    }
391                }
392            },
393        ));
394
395        // This is necessary to evict expired entries from the caches if
396        // no session-related operations happen at all.
397        // This ensures the dangling expired sessions are properly closed
398        // and their closure is timely notified to the other party.
399        let myself = self.clone();
400        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable(async move {
401            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
402            let timeout = 2 * initiation_timeout_max_one_way(
403                myself.cfg.initiation_timeout_base,
404                RoutingOptions::MAX_INTERMEDIATE_HOPS,
405            )
406            .min(myself.cfg.idle_timeout)
407            .mul_f64(jitter)
408                / 2;
409            futures_time::stream::interval(timeout.into())
410                .for_each(|_| {
411                    trace!("executing session cache evictions");
412                    futures::future::join(
413                        myself.sessions.run_pending_tasks(),
414                        myself.session_initiations.run_pending_tasks(),
415                    )
416                    .map(|_| ())
417                })
418                .await;
419        });
420
421        Ok(vec![ah_closure_notifications, ah_session_expiration])
422    }
423
424    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
425    pub fn is_started(&self) -> bool {
426        self.session_notifiers.get().is_some()
427    }
428
429    fn spawn_keep_alive_stream(
430        &self,
431        session_id: SessionId,
432        sender: Arc<CountingSendMsg<S>>,
433        routing: DestinationRouting,
434    ) -> (KeepAliveController, AbortHandle) {
435        let elem = StartProtocol::KeepAlive(session_id.into());
436
437        // The stream is suspended until the caller sets a rate via the Controller
438        let (ka_stream, controller) = futures::stream::repeat(elem).rate_limit_per_unit(0, Duration::from_secs(1));
439
440        let (abort_handle, reg) = AbortHandle::new_pair();
441        let sender_clone = sender.clone();
442        let fwd_routing_clone = routing.clone();
443
444        // This task will automatically terminate once the returned abort handle is used.
445        debug!(%session_id, "spawning keep-alive stream");
446        hopr_async_runtime::prelude::spawn(
447            futures::stream::Abortable::new(ka_stream, reg)
448                .map(ApplicationData::try_from)
449                .try_for_each_concurrent(None, move |msg| {
450                    let sender_clone = sender_clone.clone();
451                    let fwd_routing_clone = fwd_routing_clone.clone();
452                    async move { sender_clone.send_message(msg, fwd_routing_clone).await }
453                })
454                .then(move |res| {
455                    match res {
456                        Ok(_) => debug!(%session_id, "keep-alive stream done"),
457                        Err(error) => error!(%session_id, %error, "keep-alive stream failed"),
458                    }
459                    futures::future::ready(())
460                }),
461        );
462
463        (KeepAliveController(controller), abort_handle)
464    }
465
466    /// Initiates a new outgoing Session to `destination` with the given configuration.
467    ///
468    /// If the Session's counterparty does not respond within
469    /// the [configured](SessionManagerConfig) period,
470    /// this method returns [`TransportSessionError::Timeout`].
471    ///
472    /// It will also fail if the instance has not been [started](SessionManager::start).
473    pub async fn new_session(
474        &self,
475        destination: Address,
476        target: SessionTarget,
477        cfg: SessionClientConfig,
478    ) -> crate::errors::Result<Session> {
479        let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
480
481        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
482        let challenge = insert_into_next_slot(
483            &self.session_initiations,
484            |ch| {
485                if let Some(challenge) = ch {
486                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
487                } else {
488                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
489                }
490            },
491            tx_initiation_done,
492        )
493        .await
494        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
495
496        // Prepare the session initiation message in the Start protocol
497        trace!(challenge, ?cfg, "initiating session with config");
498        let start_session_msg = StartProtocol::<SessionId>::StartSession(StartInitiation {
499            challenge,
500            target,
501            capabilities: cfg.capabilities,
502        });
503
504        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
505        let forward_routing = DestinationRouting::Forward {
506            destination,
507            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
508            forward_options: cfg.forward_path_options.clone(),
509            return_options: cfg.return_path_options.clone().into(),
510        };
511
512        // Send the Session initiation message
513        info!(challenge, %pseudonym, %destination, "new session request");
514        msg_sender
515            .send_message(start_session_msg.try_into()?, forward_routing.clone())
516            .await?;
517
518        // Await session establishment response from the Exit node or timeout
519        pin_mut!(rx_initiation_done);
520        let initiation_done = TryStreamExt::try_next(&mut rx_initiation_done);
521
522        // The timeout is given by the number of hops requested
523        let timeout = hopr_async_runtime::prelude::sleep(initiation_timeout_max_one_way(
524            self.cfg.initiation_timeout_base,
525            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
526        ));
527        pin_mut!(timeout);
528
529        trace!(challenge, "awaiting session establishment");
530        match futures::future::select(initiation_done, timeout).await {
531            Either::Left((Ok(Some(est)), _)) => {
532                // Session has been established, construct it
533                let session_id = est.session_id;
534                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
535
536                let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
537                let mut abort_handles = Vec::new();
538                let notifier = self
539                    .session_notifiers
540                    .get()
541                    .map(|(_, notifier)| {
542                        let notifier = notifier.clone();
543                        Box::new(move |session_id: SessionId| {
544                            let _ = notifier
545                                .unbounded_send(session_id)
546                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
547                        })
548                    })
549                    .ok_or(SessionManagerError::NotStarted)?;
550
551                let session = if let Some(balancer_config) = cfg.surb_management {
552                    let surb_production_counter = Arc::new(AtomicU64::new(0));
553                    let surb_consumption_counter = Arc::new(AtomicU64::new(0));
554
555                    // Sender responsible for keep-alive and Session data is counting produced SURBs
556                    let sender = Arc::new(CountingSendMsg::new(
557                        msg_sender.clone(),
558                        surb_production_counter.clone(),
559                    ));
560
561                    // Spawn the SURB-bearing keep alive stream
562                    let (ka_controller, ka_abort_handle) =
563                        self.spawn_keep_alive_stream(session_id, sender.clone(), forward_routing.clone());
564                    abort_handles.push(ka_abort_handle);
565
566                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
567                    debug!(%session_id, ?balancer_config, "spawning SURB balancer");
568                    let mut balancer = SurbBalancer::new(
569                        session_id,
570                        surb_production_counter,
571                        surb_consumption_counter.clone(),
572                        ka_controller,
573                        balancer_config,
574                    );
575
576                    let (surbs_ready_tx, surbs_ready_rx) = futures::channel::oneshot::channel();
577                    let mut surbs_ready_tx = Some(surbs_ready_tx);
578                    let (balancer_abort_handle, reg) = AbortHandle::new_pair();
579                    hopr_async_runtime::prelude::spawn(
580                        futures::stream::Abortable::new(
581                            futures_time::stream::interval(
582                                self.cfg
583                                    .balancer_sampling_interval
584                                    .max(MIN_BALANCER_SAMPLING_INTERVAL)
585                                    .into(),
586                            ),
587                            reg,
588                        )
589                        .for_each(move |_| {
590                            let level = balancer.update();
591                            // We will wait until at least half of the target buffer has been sent
592                            if surbs_ready_tx.is_some() && level >= balancer_config.target_surb_buffer_size / 2 {
593                                let _ = surbs_ready_tx.take().unwrap().send(level);
594                            }
595                            futures::future::ready(())
596                        })
597                        .then(move |_| {
598                            debug!(%session_id, "balancer done");
599                            futures::future::ready(())
600                        }),
601                    );
602                    abort_handles.push(balancer_abort_handle);
603
604                    // Wait for enough SURBs to be sent to the counterparty
605                    // TODO: consider making this interactive = other party reports the exact level periodically
606                    let timeout = hopr_async_runtime::prelude::sleep(SESSION_READINESS_TIMEOUT);
607                    pin_mut!(timeout);
608                    match futures::future::select(surbs_ready_rx, timeout).await {
609                        Either::Left((Ok(surb_level), _)) => {
610                            info!(%session_id, surb_level, "session is ready");
611                        }
612                        Either::Left((Err(_), _)) => {
613                            return Err(
614                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
615                            );
616                        }
617                        Either::Right(_) => {
618                            warn!(%session_id, "session didn't reach target SURB buffer size");
619                        }
620                    }
621
622                    Session::new(
623                        session_id,
624                        forward_routing.clone(),
625                        cfg.capabilities,
626                        sender,
627                        Box::pin(rx.inspect(move |_| {
628                            // Received packets = SURB consumption estimate
629                            surb_consumption_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
630                        })),
631                        Some(notifier),
632                    )
633                } else {
634                    warn!(%session_id, "session ready without SURB balancing");
635                    Session::new(
636                        session_id,
637                        forward_routing.clone(),
638                        cfg.capabilities,
639                        Arc::new(msg_sender.clone()),
640                        Box::pin(rx),
641                        Some(notifier),
642                    )
643                };
644
645                // We currently do not support loopback Sessions on ourselves.
646                if let moka::ops::compute::CompResult::Inserted(_) = self
647                    .sessions
648                    .entry(session_id)
649                    .and_compute_with(|entry| {
650                        futures::future::ready(if entry.is_none() {
651                            moka::ops::compute::Op::Put(CachedSession {
652                                session_tx: Arc::new(tx),
653                                routing_opts: forward_routing,
654                                abort_handles,
655                            })
656                        } else {
657                            moka::ops::compute::Op::Nop
658                        })
659                    })
660                    .await
661                {
662                    #[cfg(all(feature = "prometheus", not(test)))]
663                    {
664                        METRIC_NUM_INITIATED_SESSIONS.increment();
665                        METRIC_ACTIVE_SESSIONS.increment(1.0);
666                    }
667
668                    Ok(session)
669                } else {
670                    // Session already exists; it means it is most likely a loopback attempt
671                    error!(%session_id, "session already exists - loopback attempt");
672                    Err(SessionManagerError::Loopback.into())
673                }
674            }
675            Either::Left((Ok(None), _)) => Err(SessionManagerError::Other(
676                "internal error: sender has been closed without completing the session establishment".into(),
677            )
678            .into()),
679            Either::Left((Err(e), _)) => {
680                // The other side did not allow us to establish a session
681                error!(
682                    challenge = e.challenge,
683                    error = ?e,
684                    "the other party rejected the session initiation with error"
685                );
686                Err(TransportSessionError::Rejected(e.reason))
687            }
688            Either::Right(_) => {
689                // Timeout waiting for a session establishment
690                error!(challenge, "session initiation attempt timed out");
691
692                #[cfg(all(feature = "prometheus", not(test)))]
693                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
694
695                Err(TransportSessionError::Timeout)
696            }
697        }
698    }
699
700    /// Sends a keep-alive packet with the given [`SessionId`].
701    ///
702    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
703    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
704        if let Some(session_data) = self.sessions.get(id).await {
705            Ok(self
706                .msg_sender
707                .get()
708                .ok_or(SessionManagerError::NotStarted)?
709                .send_message(
710                    StartProtocol::KeepAlive((*id).into()).try_into()?,
711                    session_data.routing_opts.clone(),
712                )
713                .await?)
714        } else {
715            Err(SessionManagerError::NonExistingSession.into())
716        }
717    }
718
719    /// The main method to be called whenever data are received.
720    ///
721    /// It tries to recognize the message and correctly dispatches either
722    /// the Session protocol or Start protocol messages.
723    ///
724    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
725    pub async fn dispatch_message(
726        &self,
727        pseudonym: HoprPseudonym,
728        data: ApplicationData,
729    ) -> crate::errors::Result<DispatchResult> {
730        if data.application_tag == StartProtocol::<SessionId>::START_PROTOCOL_MESSAGE_TAG {
731            // This is a Start protocol message, so we handle it
732            trace!(tag = %data.application_tag, "dispatching Start protocol message");
733            return self
734                .handle_start_protocol_message(pseudonym, data)
735                .await
736                .map(|_| DispatchResult::Processed);
737        } else if self.cfg.session_tag_range.contains(&data.application_tag.as_u64()) {
738            let session_id = SessionId::new(data.application_tag, pseudonym);
739
740            return if let Some(session_data) = self.sessions.get(&session_id).await {
741                trace!(?session_id, "received data for a registered session");
742
743                Ok(session_data
744                    .session_tx
745                    .unbounded_send(data.plain_text)
746                    .map(|_| DispatchResult::Processed)
747                    .map_err(|e| SessionManagerError::Other(e.to_string()))?)
748            } else {
749                error!(%session_id, "received data from an unestablished session");
750                Err(TransportSessionError::UnknownData)
751            };
752        }
753
754        trace!(%data.application_tag, "received data not associated with session protocol or any existing session");
755        Ok(DispatchResult::Unrelated(data))
756    }
757
758    async fn handle_start_protocol_message(
759        &self,
760        pseudonym: HoprPseudonym,
761        data: ApplicationData,
762    ) -> crate::errors::Result<()> {
763        match StartProtocol::<SessionId>::try_from(data)? {
764            StartProtocol::StartSession(session_req) => {
765                trace!(challenge = session_req.challenge, "received session initiation request");
766
767                debug!(%pseudonym, "got new session request, searching for a free session slot");
768
769                let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
770
771                let (new_session_notifier, close_session_notifier) = self
772                    .session_notifiers
773                    .get()
774                    .cloned()
775                    .ok_or(SessionManagerError::NotStarted)?;
776
777                // Reply routing uses SURBs only with the pseudonym of this Session's ID
778                let reply_routing = DestinationRouting::Return(SurbMatcher::Pseudonym(pseudonym));
779
780                // Construct the session
781                let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
782                if let Some(session_id) = insert_into_next_slot(
783                    &self.sessions,
784                    |sid| {
785                        // NOTE: It is allowed to insert sessions using the same tag
786                        // but different pseudonyms because the SessionId is different.
787                        let next_tag: Tag = match sid {
788                            Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
789                                .max(self.cfg.session_tag_range.start)
790                                .into(),
791                            None => hopr_crypto_random::random_integer(
792                                self.cfg.session_tag_range.start,
793                                Some(self.cfg.session_tag_range.end),
794                            )
795                            .into(),
796                        };
797                        SessionId::new(next_tag, pseudonym)
798                    },
799                    CachedSession {
800                        session_tx: Arc::new(tx_session_data),
801                        routing_opts: reply_routing.clone(),
802                        abort_handles: vec![],
803                    },
804                )
805                .await
806                {
807                    debug!(?session_id, "assigning a new session");
808
809                    let session = Session::new(
810                        session_id,
811                        reply_routing.clone(),
812                        session_req.capabilities,
813                        Arc::new(msg_sender.clone()),
814                        Box::pin(rx_session_data),
815                        Some(Box::new(move |session_id: SessionId| {
816                            if let Err(error) = close_session_notifier.unbounded_send(session_id) {
817                                error!(%session_id, %error, "failed to notify session closure");
818                            }
819                        })),
820                    );
821
822                    // Extract useful information about the session from the Start protocol message
823                    let incoming_session = IncomingSession {
824                        session,
825                        target: session_req.target,
826                    };
827
828                    // Notify that a new incoming session has been created
829                    if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
830                        warn!(%error, "failed to send session to incoming session queue");
831                    }
832
833                    trace!(?session_id, "session notification sent");
834
835                    // Notify the sender that the session has been established.
836                    // Set our peer ID in the session ID sent back to them.
837                    let data = StartProtocol::SessionEstablished(StartEstablished {
838                        orig_challenge: session_req.challenge,
839                        session_id,
840                    });
841
842                    msg_sender
843                        .send_message(data.try_into()?, reply_routing)
844                        .await
845                        .map_err(|e| {
846                            SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
847                        })?;
848
849                    info!(%session_id, "new session established");
850
851                    #[cfg(all(feature = "prometheus", not(test)))]
852                    {
853                        METRIC_NUM_ESTABLISHED_SESSIONS.increment();
854                        METRIC_ACTIVE_SESSIONS.increment(1.0);
855                    }
856                } else {
857                    error!(
858                        %pseudonym,
859                        "failed to reserve a new session slot"
860                    );
861
862                    // Notify the sender that the session could not be established
863                    let reason = StartErrorReason::NoSlotsAvailable;
864                    let data = StartProtocol::<SessionId>::SessionError(StartErrorType {
865                        challenge: session_req.challenge,
866                        reason,
867                    });
868
869                    msg_sender
870                        .send_message(data.try_into()?, reply_routing.clone())
871                        .await
872                        .map_err(|e| {
873                            SessionManagerError::Other(format!(
874                                "failed to send session establishment error message: {e}"
875                            ))
876                        })?;
877
878                    trace!(%pseudonym, "session establishment failure message sent");
879
880                    #[cfg(all(feature = "prometheus", not(test)))]
881                    METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
882                }
883            }
884            StartProtocol::SessionEstablished(est) => {
885                trace!(
886                    session_id = ?est.session_id,
887                    "received session establishment confirmation"
888                );
889                let challenge = est.orig_challenge;
890                let session_id = est.session_id;
891                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
892                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
893                        return Err(SessionManagerError::Other(format!(
894                            "could not notify session {session_id} establishment: {e}"
895                        ))
896                        .into());
897                    }
898                    debug!(?session_id, challenge, "session establishment complete");
899                } else {
900                    error!(%session_id, challenge, "session establishment attempt expired");
901                }
902            }
903            StartProtocol::SessionError(error) => {
904                trace!(
905                    challenge = error.challenge,
906                    error = ?error.reason,
907                    "failed to initialize a session",
908                );
909                // Currently, we do not distinguish between individual error types
910                // and just discard the initiation attempt and pass on the error.
911                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
912                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
913                        return Err(SessionManagerError::Other(format!(
914                            "could not notify session establishment error {error:?}: {e}"
915                        ))
916                        .into());
917                    }
918                    error!(
919                        challenge = error.challenge,
920                        ?error,
921                        "session establishment error received"
922                    );
923                } else {
924                    error!(
925                        challenge = error.challenge,
926                        ?error,
927                        "session establishment attempt expired before error could be delivered"
928                    );
929                }
930
931                #[cfg(all(feature = "prometheus", not(test)))]
932                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
933            }
934            StartProtocol::CloseSession(session_id) => {
935                trace!(?session_id, "received session close request");
936                match self.close_session(session_id, false).await {
937                    Ok(closed) if closed => debug!(?session_id, "session has been closed by the other party"),
938                    Err(error) => error!(
939                        %session_id,
940                        %error,
941                        "session could not be closed on other party's request"
942                    ),
943                    _ => {}
944                }
945            }
946            StartProtocol::KeepAlive(msg) => {
947                let session_id = msg.id;
948                if self.sessions.get(&session_id).await.is_some() {
949                    trace!(?session_id, "received keep-alive request");
950                } else {
951                    error!(%session_id, "received keep-alive request for an unknown session");
952                }
953            }
954        }
955
956        Ok(())
957    }
958
959    async fn close_session(&self, session_id: SessionId, notify_closure: bool) -> crate::errors::Result<bool> {
960        if let Some(session_data) = self.sessions.remove(&session_id).await {
961            // Notification is not sent only when closing in response to the other party's request
962            if notify_closure {
963                trace!(?session_id, "sending session termination");
964                self.msg_sender
965                    .get()
966                    .ok_or(SessionManagerError::NotStarted)?
967                    .send_message(
968                        StartProtocol::CloseSession(session_id).try_into()?,
969                        session_data.routing_opts,
970                    )
971                    .await?;
972            }
973
974            // Closing the data sender on the session will cause the Session to terminate
975            session_data.session_tx.close_channel();
976            trace!(?session_id, "data tx channel closed on session");
977
978            // Terminate any additional tasks spawned by the Session
979            session_data.abort_handles.into_iter().for_each(|h| h.abort());
980
981            #[cfg(all(feature = "prometheus", not(test)))]
982            METRIC_ACTIVE_SESSIONS.decrement(1.0);
983            Ok(true)
984        } else {
985            // Do not treat this as an error
986            debug!(
987                ?session_id,
988                "could not find session id to close, maybe the session is already closed"
989            );
990            Ok(false)
991        }
992    }
993}
994
995#[cfg(test)]
996mod tests {
997    use anyhow::anyhow;
998    use futures::AsyncWriteExt;
999    use hopr_crypto_random::Randomizable;
1000    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1001    use hopr_primitive_types::prelude::Address;
1002    use tokio::time::timeout;
1003
1004    use super::*;
1005    use crate::{
1006        Capabilities, Capability, balancer::SurbBalancerConfig, initiation::StartProtocolDiscriminants,
1007        types::SessionTarget,
1008    };
1009
1010    mockall::mock! {
1011        MsgSender {}
1012        impl Clone for MsgSender {
1013            fn clone(&self) -> Self;
1014        }
1015        impl SendMsg for MsgSender {
1016            fn send_message<'life0, 'async_trait>
1017            (
1018                &'life0 self,
1019                data: ApplicationData,
1020                routing: DestinationRouting,
1021            )
1022            -> std::pin::Pin<Box<dyn std::future::Future<Output=std::result::Result<(),TransportSessionError>> + Send + 'async_trait>>
1023            where
1024                'life0: 'async_trait,
1025                Self: Sync + 'async_trait;
1026        }
1027    }
1028
1029    fn msg_type(data: &ApplicationData, expected: StartProtocolDiscriminants) -> bool {
1030        expected
1031            == StartProtocolDiscriminants::from(
1032                StartProtocol::<SessionId>::decode(data.application_tag, &data.plain_text)
1033                    .expect("failed to parse message"),
1034            )
1035    }
1036
1037    #[tokio::test]
1038    async fn test_insert_into_next_slot() -> anyhow::Result<()> {
1039        let cache = moka::future::Cache::new(10);
1040
1041        for i in 0..5 {
1042            let v = insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
1043                .await
1044                .ok_or(anyhow!("should insert"))?;
1045            assert_eq!(v, i);
1046            assert_eq!(Some("foo".to_string()), cache.get(&i).await);
1047        }
1048
1049        assert!(
1050            insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
1051                .await
1052                .is_none(),
1053            "must not find slot when full"
1054        );
1055
1056        Ok(())
1057    }
1058
1059    #[test_log::test(tokio::test)]
1060    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1061    {
1062        let alice_pseudonym = HoprPseudonym::random();
1063        let bob_peer: Address = (&ChainKeypair::random()).into();
1064
1065        let alice_mgr = SessionManager::new(Default::default());
1066        let bob_mgr = SessionManager::new(Default::default());
1067
1068        let mut sequence = mockall::Sequence::new();
1069        let mut alice_transport = MockMsgSender::new();
1070        let mut bob_transport = MockMsgSender::new();
1071
1072        // Alice sends the StartSession message
1073        let bob_mgr_clone = bob_mgr.clone();
1074        alice_transport
1075            .expect_send_message()
1076            .once()
1077            .in_sequence(&mut sequence)
1078            .withf(move |data, peer| {
1079                msg_type(data, StartProtocolDiscriminants::StartSession)
1080                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1081            })
1082            .returning(move |data, _| {
1083                let bob_mgr_clone = bob_mgr_clone.clone();
1084                Box::pin(async move {
1085                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1086                    Ok(())
1087                })
1088            });
1089
1090        // Bob clones transport for Session
1091        bob_transport
1092            .expect_clone()
1093            .once()
1094            .in_sequence(&mut sequence)
1095            .return_once(MockMsgSender::new);
1096
1097        // Bob sends the SessionEstablished message
1098        let alice_mgr_clone = alice_mgr.clone();
1099        bob_transport
1100            .expect_send_message()
1101            .once()
1102            .in_sequence(&mut sequence)
1103            .withf(move |data, peer| {
1104                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1105                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1106            })
1107            .returning(move |data, _| {
1108                let alice_mgr_clone = alice_mgr_clone.clone();
1109
1110                Box::pin(async move {
1111                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1112                    Ok(())
1113                })
1114            });
1115
1116        // Alice clones transport for Session
1117        alice_transport
1118            .expect_clone()
1119            .once()
1120            .in_sequence(&mut sequence)
1121            .return_once(MockMsgSender::new);
1122
1123        // Alice sends the CloseSession message to initiate closure
1124        let bob_mgr_clone = bob_mgr.clone();
1125        alice_transport
1126            .expect_send_message()
1127            .once()
1128            .in_sequence(&mut sequence)
1129            .withf(move |data, peer| {
1130                msg_type(data, StartProtocolDiscriminants::CloseSession)
1131                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1132            })
1133            .returning(move |data, _| {
1134                let bob_mgr_clone = bob_mgr_clone.clone();
1135                Box::pin(async move {
1136                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1137                    Ok(())
1138                })
1139            });
1140
1141        // Bob sends the CloseSession message to confirm
1142        let alice_mgr_clone = alice_mgr.clone();
1143        bob_transport
1144            .expect_send_message()
1145            .once()
1146            .in_sequence(&mut sequence)
1147            .withf(move |data, peer| {
1148                msg_type(data, StartProtocolDiscriminants::CloseSession)
1149                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1150            })
1151            .returning(move |data, _| {
1152                let alice_mgr_clone = alice_mgr_clone.clone();
1153                Box::pin(async move {
1154                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1155                    Ok(())
1156                })
1157            });
1158
1159        let mut ahs = Vec::new();
1160
1161        // Start Alice
1162        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1163        ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1164
1165        // Start Bob
1166        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1167        ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1168
1169        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1170
1171        pin_mut!(new_session_rx_bob);
1172        let (alice_session, bob_session) = timeout(
1173            Duration::from_secs(2),
1174            futures::future::join(
1175                alice_mgr.new_session(
1176                    bob_peer,
1177                    SessionTarget::TcpStream(target.clone()),
1178                    SessionClientConfig {
1179                        pseudonym: alice_pseudonym.into(),
1180                        surb_management: None,
1181                        ..Default::default()
1182                    },
1183                ),
1184                new_session_rx_bob.next(),
1185            ),
1186        )
1187        .await?;
1188
1189        let mut alice_session = alice_session?;
1190        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1191
1192        assert_eq!(
1193            alice_session.capabilities(),
1194            &Capabilities::from(Capability::Segmentation)
1195        );
1196        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1197        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1198
1199        tokio::time::sleep(Duration::from_millis(100)).await;
1200        alice_session.close().await?;
1201
1202        tokio::time::sleep(Duration::from_millis(100)).await;
1203        futures::stream::iter(ahs)
1204            .for_each(|ah| async move { ah.abort() })
1205            .await;
1206
1207        Ok(())
1208    }
1209
1210    #[test_log::test(tokio::test)]
1211    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1212        let alice_pseudonym = HoprPseudonym::random();
1213        let bob_peer: Address = (&ChainKeypair::random()).into();
1214
1215        let cfg = SessionManagerConfig {
1216            idle_timeout: Duration::from_millis(200),
1217            ..Default::default()
1218        };
1219
1220        let alice_mgr = SessionManager::new(cfg);
1221        let bob_mgr = SessionManager::new(Default::default());
1222
1223        let mut sequence = mockall::Sequence::new();
1224        let mut alice_transport = MockMsgSender::new();
1225        let mut bob_transport = MockMsgSender::new();
1226
1227        // Alice sends the StartSession message
1228        let bob_mgr_clone = bob_mgr.clone();
1229        alice_transport
1230            .expect_send_message()
1231            .once()
1232            .in_sequence(&mut sequence)
1233            .withf(move |data, peer| {
1234                msg_type(data, StartProtocolDiscriminants::StartSession)
1235                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1236            })
1237            .returning(move |data, _| {
1238                let bob_mgr_clone = bob_mgr_clone.clone();
1239                Box::pin(async move {
1240                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1241                    Ok(())
1242                })
1243            });
1244
1245        // Bob clones transport for Session
1246        bob_transport
1247            .expect_clone()
1248            .once()
1249            .in_sequence(&mut sequence)
1250            .return_once(MockMsgSender::new);
1251
1252        // Bob sends the SessionEstablished message
1253        let alice_mgr_clone = alice_mgr.clone();
1254        bob_transport
1255            .expect_send_message()
1256            .once()
1257            .in_sequence(&mut sequence)
1258            .withf(move |data, peer| {
1259                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1260                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1261            })
1262            .returning(move |data, _| {
1263                let alice_mgr_clone = alice_mgr_clone.clone();
1264
1265                Box::pin(async move {
1266                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1267                    Ok(())
1268                })
1269            });
1270
1271        // Alice clones transport for Session
1272        alice_transport
1273            .expect_clone()
1274            .once()
1275            .in_sequence(&mut sequence)
1276            .return_once(MockMsgSender::new);
1277
1278        // Alice sends the CloseSession message to initiate closure
1279        let bob_mgr_clone = bob_mgr.clone();
1280        alice_transport
1281            .expect_send_message()
1282            .once()
1283            .in_sequence(&mut sequence)
1284            .withf(move |data, peer| {
1285                msg_type(data, StartProtocolDiscriminants::CloseSession)
1286                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1287            })
1288            .returning(move |data, _| {
1289                let bob_mgr_clone = bob_mgr_clone.clone();
1290                Box::pin(async move {
1291                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1292                    Ok(())
1293                })
1294            });
1295
1296        // Bob sends the CloseSession message to confirm
1297        let alice_mgr_clone = alice_mgr.clone();
1298        bob_transport
1299            .expect_send_message()
1300            .once()
1301            .in_sequence(&mut sequence)
1302            .withf(move |data, peer| {
1303                msg_type(data, StartProtocolDiscriminants::CloseSession)
1304                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1305            })
1306            .returning(move |data, _| {
1307                let alice_mgr_clone = alice_mgr_clone.clone();
1308
1309                Box::pin(async move {
1310                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1311                    Ok(())
1312                })
1313            });
1314
1315        let mut ahs = Vec::new();
1316
1317        // Start Alice
1318        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1319        ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1320
1321        // Start Bob
1322        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1323        ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1324
1325        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1326
1327        pin_mut!(new_session_rx_bob);
1328        let (alice_session, bob_session) = timeout(
1329            Duration::from_secs(2),
1330            futures::future::join(
1331                alice_mgr.new_session(
1332                    bob_peer,
1333                    SessionTarget::TcpStream(target.clone()),
1334                    SessionClientConfig {
1335                        pseudonym: alice_pseudonym.into(),
1336                        surb_management: None,
1337                        ..Default::default()
1338                    },
1339                ),
1340                new_session_rx_bob.next(),
1341            ),
1342        )
1343        .await?;
1344
1345        let alice_session = alice_session?;
1346        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1347
1348        assert_eq!(
1349            alice_session.capabilities(),
1350            &Capabilities::from(Capability::Segmentation)
1351        );
1352        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1353        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1354
1355        // Let the session timeout
1356        tokio::time::sleep(Duration::from_millis(300)).await;
1357
1358        futures::stream::iter(ahs)
1359            .for_each(|ah| async move { ah.abort() })
1360            .await;
1361
1362        Ok(())
1363    }
1364
1365    #[test_log::test(tokio::test)]
1366    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1367        let alice_pseudonym = HoprPseudonym::random();
1368        let bob_peer: Address = (&ChainKeypair::random()).into();
1369
1370        let cfg = SessionManagerConfig {
1371            session_tag_range: 16u64..17u64, // Slot for exactly one session
1372            ..Default::default()
1373        };
1374
1375        let alice_mgr = SessionManager::new(Default::default());
1376        let bob_mgr = SessionManager::new(cfg);
1377
1378        // Occupy the only free slot with tag 16
1379        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1380        bob_mgr
1381            .sessions
1382            .insert(
1383                SessionId::new(16u64, alice_pseudonym),
1384                CachedSession {
1385                    session_tx: Arc::new(dummy_tx),
1386                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1387                    abort_handles: Vec::new(),
1388                },
1389            )
1390            .await;
1391
1392        let mut sequence = mockall::Sequence::new();
1393        let mut alice_transport = MockMsgSender::new();
1394        let mut bob_transport = MockMsgSender::new();
1395
1396        // Alice sends the StartSession message
1397        let bob_mgr_clone = bob_mgr.clone();
1398        alice_transport
1399            .expect_send_message()
1400            .once()
1401            .in_sequence(&mut sequence)
1402            .withf(move |data, peer| {
1403                msg_type(data, StartProtocolDiscriminants::StartSession)
1404                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1405            })
1406            .returning(move |data, _| {
1407                let bob_mgr_clone = bob_mgr_clone.clone();
1408                Box::pin(async move {
1409                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1410                    Ok(())
1411                })
1412            });
1413
1414        // Bob sends the SessionError message
1415        let alice_mgr_clone = alice_mgr.clone();
1416        bob_transport
1417            .expect_send_message()
1418            .once()
1419            .in_sequence(&mut sequence)
1420            .withf(move |data, peer| {
1421                msg_type(data, StartProtocolDiscriminants::SessionError)
1422                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1423            })
1424            .returning(move |data, _| {
1425                let alice_mgr_clone = alice_mgr_clone.clone();
1426                Box::pin(async move {
1427                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1428                    Ok(())
1429                })
1430            });
1431
1432        let mut jhs = Vec::new();
1433
1434        // Start Alice
1435        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1436        jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1437
1438        // Start Bob
1439        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1440        jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1441
1442        let result = alice_mgr
1443            .new_session(
1444                bob_peer,
1445                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1446                SessionClientConfig {
1447                    capabilities: Capabilities::empty(),
1448                    pseudonym: alice_pseudonym.into(),
1449                    surb_management: None,
1450                    ..Default::default()
1451                },
1452            )
1453            .await;
1454
1455        assert!(
1456            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1457        );
1458
1459        Ok(())
1460    }
1461
1462    #[test_log::test(tokio::test)]
1463    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1464        let alice_pseudonym = HoprPseudonym::random();
1465        let bob_peer: Address = (&ChainKeypair::random()).into();
1466
1467        let alice_mgr = SessionManager::new(Default::default());
1468
1469        let mut sequence = mockall::Sequence::new();
1470        let mut alice_transport = MockMsgSender::new();
1471
1472        // Alice sends the StartSession message
1473        let alice_mgr_clone = alice_mgr.clone();
1474        alice_transport
1475            .expect_send_message()
1476            .once()
1477            .in_sequence(&mut sequence)
1478            .withf(move |data, peer| {
1479                msg_type(data, StartProtocolDiscriminants::StartSession)
1480                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1481            })
1482            .returning(move |data, _| {
1483                // But the message is again processed by Alice due to Loopback
1484                let alice_mgr_clone = alice_mgr_clone.clone();
1485                Box::pin(async move {
1486                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1487                    Ok(())
1488                })
1489            });
1490
1491        // Alice clones transport for Session (as Bob)
1492        alice_transport
1493            .expect_clone()
1494            .once()
1495            .in_sequence(&mut sequence)
1496            .return_once(MockMsgSender::new);
1497
1498        // Alice sends the SessionEstablished message (as Bob)
1499        let alice_mgr_clone = alice_mgr.clone();
1500        alice_transport
1501            .expect_send_message()
1502            .once()
1503            .in_sequence(&mut sequence)
1504            .withf(move |data, peer| {
1505                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1506                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1507            })
1508            .returning(move |data, _| {
1509                let alice_mgr_clone = alice_mgr_clone.clone();
1510
1511                Box::pin(async move {
1512                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1513                    Ok(())
1514                })
1515            });
1516
1517        // Alice clones transport for Session
1518        alice_transport
1519            .expect_clone()
1520            .once()
1521            .in_sequence(&mut sequence)
1522            .return_once(MockMsgSender::new);
1523
1524        // Start Alice
1525        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1526        alice_mgr.start(alice_transport, new_session_tx_alice)?;
1527
1528        let alice_session = alice_mgr
1529            .new_session(
1530                bob_peer,
1531                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1532                SessionClientConfig {
1533                    capabilities: Capabilities::empty(),
1534                    pseudonym: alice_pseudonym.into(),
1535                    surb_management: None,
1536                    ..Default::default()
1537                },
1538            )
1539            .await;
1540
1541        println!("{:?}", alice_session);
1542        assert!(matches!(
1543            alice_session,
1544            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
1545        ));
1546
1547        Ok(())
1548    }
1549
1550    #[test_log::test(tokio::test)]
1551    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1552        let bob_peer: Address = (&ChainKeypair::random()).into();
1553
1554        let cfg = SessionManagerConfig {
1555            initiation_timeout_base: Duration::from_millis(100),
1556            ..Default::default()
1557        };
1558
1559        let alice_mgr = SessionManager::new(cfg);
1560        let bob_mgr = SessionManager::new(Default::default());
1561
1562        let mut sequence = mockall::Sequence::new();
1563        let mut alice_transport = MockMsgSender::new();
1564        let bob_transport = MockMsgSender::new();
1565
1566        // Alice sends the StartSession message, but Bob does not handle it
1567        alice_transport
1568            .expect_send_message()
1569            .once()
1570            .in_sequence(&mut sequence)
1571            .withf(move |data, peer| {
1572                msg_type(data, StartProtocolDiscriminants::StartSession)
1573                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1574            })
1575            .returning(|_, _| Box::pin(async { Ok(()) }));
1576
1577        // Start Alice
1578        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1579        alice_mgr.start(alice_transport, new_session_tx_alice)?;
1580
1581        // Start Bob
1582        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1583        bob_mgr.start(bob_transport, new_session_tx_bob)?;
1584
1585        let result = alice_mgr
1586            .new_session(
1587                bob_peer,
1588                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1589                SessionClientConfig {
1590                    capabilities: Capabilities::empty(),
1591                    pseudonym: None,
1592                    surb_management: None,
1593                    ..Default::default()
1594                },
1595            )
1596            .await;
1597
1598        assert!(matches!(result, Err(TransportSessionError::Timeout)));
1599
1600        Ok(())
1601    }
1602
1603    #[test_log::test(tokio::test)]
1604    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
1605        let alice_pseudonym = HoprPseudonym::random();
1606        let bob_peer: Address = (&ChainKeypair::random()).into();
1607
1608        let alice_mgr = SessionManager::new(Default::default());
1609        let bob_mgr = SessionManager::new(Default::default());
1610
1611        let mut sequence = mockall::Sequence::new();
1612        let mut alice_transport = MockMsgSender::new();
1613        let mut bob_transport = MockMsgSender::new();
1614
1615        // Alice sends the StartSession message
1616        let bob_mgr_clone = bob_mgr.clone();
1617        alice_transport
1618            .expect_send_message()
1619            .once()
1620            .in_sequence(&mut sequence)
1621            .withf(move |data, peer| {
1622                msg_type(data, StartProtocolDiscriminants::StartSession)
1623                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1624            })
1625            .returning(move |data, _| {
1626                let bob_mgr_clone = bob_mgr_clone.clone();
1627                Box::pin(async move {
1628                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1629                    Ok(())
1630                })
1631            });
1632
1633        // Bob clones transport for Session
1634        bob_transport
1635            .expect_clone()
1636            .once()
1637            .in_sequence(&mut sequence)
1638            .return_once(MockMsgSender::new);
1639
1640        // Bob sends the SessionEstablished message
1641        let alice_mgr_clone = alice_mgr.clone();
1642        bob_transport
1643            .expect_send_message()
1644            .once()
1645            .in_sequence(&mut sequence)
1646            .withf(move |data, peer| {
1647                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1648                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1649            })
1650            .returning(move |data, _| {
1651                let alice_mgr_clone = alice_mgr_clone.clone();
1652                Box::pin(async move {
1653                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1654                    Ok(())
1655                })
1656            });
1657
1658        // On cloned SendMsg: Alice sends the KeepAlive messages
1659        let bob_mgr_clone = bob_mgr.clone();
1660        let mut alice_session_transport = MockMsgSender::new();
1661        alice_session_transport
1662            .expect_send_message()
1663            .times(5..)
1664            .withf(move |data, peer| {
1665                msg_type(data, StartProtocolDiscriminants::KeepAlive)
1666                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1667            })
1668            .returning(move |data, _| {
1669                let bob_mgr_clone = bob_mgr_clone.clone();
1670                Box::pin(async move {
1671                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1672                    Ok(())
1673                })
1674            });
1675
1676        // Alice clones transport for Session
1677        alice_transport
1678            .expect_clone()
1679            .once()
1680            .in_sequence(&mut sequence)
1681            .return_once(|| alice_session_transport);
1682
1683        // Alice sends the CloseSession message to initiate closure
1684        let bob_mgr_clone = bob_mgr.clone();
1685        alice_transport
1686            .expect_send_message()
1687            .once()
1688            .in_sequence(&mut sequence)
1689            .withf(move |data, peer| {
1690                msg_type(data, StartProtocolDiscriminants::CloseSession)
1691                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1692            })
1693            .returning(move |data, _| {
1694                let bob_mgr_clone = bob_mgr_clone.clone();
1695                Box::pin(async move {
1696                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1697                    Ok(())
1698                })
1699            });
1700
1701        // Bob sends the CloseSession message to confirm
1702        let alice_mgr_clone = alice_mgr.clone();
1703        bob_transport
1704            .expect_send_message()
1705            .once()
1706            .in_sequence(&mut sequence)
1707            .withf(move |data, peer| {
1708                msg_type(data, StartProtocolDiscriminants::CloseSession)
1709                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1710            })
1711            .returning(move |data, _| {
1712                let alice_mgr_clone = alice_mgr_clone.clone();
1713                Box::pin(async move {
1714                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1715                    Ok(())
1716                })
1717            });
1718
1719        let mut ahs = Vec::new();
1720
1721        // Start Alice
1722        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1723        ahs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1724
1725        // Start Bob
1726        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1727        ahs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1728
1729        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1730
1731        pin_mut!(new_session_rx_bob);
1732        let (alice_session, bob_session) = timeout(
1733            Duration::from_secs(2),
1734            futures::future::join(
1735                alice_mgr.new_session(
1736                    bob_peer,
1737                    SessionTarget::TcpStream(target.clone()),
1738                    SessionClientConfig {
1739                        pseudonym: alice_pseudonym.into(),
1740                        surb_management: Some(SurbBalancerConfig {
1741                            target_surb_buffer_size: 10,
1742                            max_surbs_per_sec: 100,
1743                        }),
1744                        ..Default::default()
1745                    },
1746                ),
1747                new_session_rx_bob.next(),
1748            ),
1749        )
1750        .await?;
1751
1752        let mut alice_session = alice_session?;
1753        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1754
1755        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1756
1757        // Let the Surb balancer send enough KeepAlive messages
1758        tokio::time::sleep(Duration::from_millis(3000)).await;
1759        alice_session.close().await?;
1760
1761        tokio::time::sleep(Duration::from_millis(300)).await;
1762        futures::stream::iter(ahs)
1763            .for_each(|ah| async move { ah.abort() })
1764            .await;
1765
1766        Ok(())
1767    }
1768}