hopr_transport_session/manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use futures::{
8    FutureExt, SinkExt, StreamExt, TryStreamExt, channel::mpsc::UnboundedSender, future::AbortHandle, pin_mut,
9};
10use futures_time::future::FutureExt as TimeExt;
11use hopr_crypto_random::Randomizable;
12use hopr_internal_types::prelude::HoprPseudonym;
13use hopr_network_types::prelude::*;
14use hopr_primitive_types::prelude::Address;
15use hopr_protocol_app::prelude::{ApplicationData, ReservedTag, Tag};
16use hopr_protocol_start::{
17    KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
18};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22    Capability, IncomingSession, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
23    balancer::{
24        AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
25        SurbControllerWithCorrection,
26        pid::{PidBalancerController, PidControllerGains},
27        simple::SimpleBalancerController,
28    },
29    errors::{SessionManagerError, TransportSessionError},
30    types::{ByteCapabilities, ClosureReason, HoprStartProtocol},
31    utils,
32    utils::insert_into_next_slot,
33};
34
35#[cfg(all(feature = "prometheus", not(test)))]
36lazy_static::lazy_static! {
37    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
38        "hopr_session_num_active_sessions",
39        "Number of currently active HOPR sessions"
40    ).unwrap();
41    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
42        "hopr_session_established_sessions_count",
43        "Number of sessions that were successfully established as an Exit node"
44    ).unwrap();
45    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
46        "hopr_session_initiated_sessions_count",
47        "Number of sessions that were successfully initiated as an Entry node"
48    ).unwrap();
49    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
50        "hopr_session_received_error_count",
51        "Number of HOPR session errors received from an Exit node",
52        &["kind"]
53    ).unwrap();
54    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
55        "hopr_session_sent_error_count",
56        "Number of HOPR session errors sent to an Entry node",
57        &["kind"]
58    ).unwrap();
59}
60
61fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
62    debug!(?session_id, ?reason, "closing session");
63
64    if reason != ClosureReason::EmptyRead {
65        // Closing the data sender will also cause it to close from the read side
66        session_data.session_tx.close_channel();
67        trace!(?session_id, "data tx channel closed on session");
68    }
69
70    // Terminate any additional tasks spawned by the Session
71    session_data.abort_handles.into_iter().for_each(|h| h.abort());
72
73    #[cfg(all(feature = "prometheus", not(test)))]
74    METRIC_ACTIVE_SESSIONS.decrement(1.0);
75}
76
77fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
78    base * (hops as u32)
79}
80
81/// Minimum time the SURB buffer must endure if no SURBs are being produced.
82pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
83
84/// The first challenge value used in Start protocol to initiate a session.
85pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
86
87/// Maximum time to wait for counterparty to receive the target number of SURBs.
88const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
89
90// Needs to use an UnboundedSender instead of oneshot
91// because Moka cache requires the value to be Clone, which oneshot Sender is not.
92// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
93type SessionInitiationCache =
94    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
95
96#[derive(Clone)]
97struct SessionSlot {
98    // Sender needs to be put in Arc, so that no clones are made by `moka`.
99    // This makes sure that the entire channel closes once the one and only sender is closed.
100    session_tx: Arc<UnboundedSender<Box<[u8]>>>,
101    routing_opts: DestinationRouting,
102    abort_handles: Vec<AbortHandle>,
103    // Allows reconfiguring of the SURB balancer on-the-fly
104    // Set on both Entry and Exit sides.
105    surb_mgmt: Option<SurbBalancerConfig>,
106    // Set only on the Exit side.
107    surb_estimator: Option<AtomicSurbFlowEstimator>,
108}
109
110/// Allows forward and backward propagation of SURB management configuration changes.
111struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
112
113#[async_trait::async_trait]
114impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
115    async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
116        self.0
117            .get(id)
118            .await
119            .ok_or(SessionManagerError::NonExistingSession)?
120            .surb_mgmt
121            .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
122    }
123
124    async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
125        if let moka::ops::compute::CompResult::ReplacedWith(_) = self
126            .0
127            .entry_by_ref(id)
128            .and_compute_with(|entry| {
129                futures::future::ready(match entry.map(|e| e.into_value()) {
130                    None => moka::ops::compute::Op::Nop,
131                    Some(mut updated_slot) => {
132                        if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
133                            *balancer_cfg = cfg;
134                            moka::ops::compute::Op::Put(updated_slot)
135                        } else {
136                            moka::ops::compute::Op::Nop
137                        }
138                    }
139                })
140            })
141            .await
142        {
143            Ok(())
144        } else {
145            Err(SessionManagerError::NonExistingSession.into())
146        }
147    }
148}
149
150/// Indicates the result of processing a message.
151#[derive(Clone, Debug, PartialEq, Eq)]
152pub enum DispatchResult {
153    /// Session or Start protocol message has been processed successfully.
154    Processed,
155    /// The message was not related to Start or Session protocol.
156    Unrelated(ApplicationData),
157}
158
159/// Incoming session notifier and session closure notifier
160type SessionNotifiers = (
161    UnboundedSender<IncomingSession>,
162    UnboundedSender<(SessionId, ClosureReason)>,
163);
164
165/// Configuration for the [`SessionManager`].
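///
/// A construction sketch (the concrete values below are illustrative, not recommendations):
/// ```ignore
/// let cfg = SessionManagerConfig {
///     maximum_sessions: 64,
///     idle_timeout: std::time::Duration::from_secs(120),
///     ..Default::default()
/// };
/// ```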
166#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
167pub struct SessionManagerConfig {
168    /// Ranges of tags available for Sessions.
169    ///
170    /// **NOTE**: If the range starts lower than [`ReservedTag`]'s range end,
171    /// it will be automatically transformed to start at this value.
    /// This is due to the range reserved by the Start sub-protocol.
173    ///
174    /// Default is 16..1024.
175    #[doc(hidden)]
176    #[default(_code = "16u64..1024u64")]
177    pub session_tag_range: Range<u64>,
178
    /// The maximum number of sessions (incoming and outgoing) that the manager
    /// is allowed to manage.
    ///
    /// When reached, creating [new sessions](SessionManager::new_session) will return
    /// the [`SessionManagerError::TooManySessions`] error, and incoming sessions will be rejected
    /// with the [`StartErrorReason::NoSlotsAvailable`] Start protocol error.
    ///
    /// Default is 128, minimum is 1; the maximum is given by the length of the `session_tag_range`.
187    #[default(128)]
188    pub maximum_sessions: usize,
189
    /// The base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = initiation_timeout_base * (num_forward_hops + num_return_hops + 2)`
194    ///
195    /// Default is 500 milliseconds.
196    #[default(Duration::from_millis(500))]
197    pub initiation_timeout_base: Duration,
198
199    /// Timeout for Session to be closed due to inactivity.
200    ///
201    /// Default is 180 seconds.
202    #[default(Duration::from_secs(180))]
203    pub idle_timeout: Duration,
204
205    /// The sampling interval for SURB balancer.
206    /// It will make SURB control decisions regularly at this interval.
207    ///
208    /// Default is 100 milliseconds.
209    #[default(Duration::from_millis(100))]
210    pub balancer_sampling_interval: Duration,
211
    /// Initial egress rate (in packets per second) of an incoming Session.
213    ///
214    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
215    ///
216    /// Default is 10 packets/second.
217    #[default(10)]
218    pub initial_return_session_egress_rate: usize,
    /// Minimum period of time for which a SURB buffer at the Exit must
    /// last if no SURBs are being received.
    ///
    /// In other words, it is the minimum period of time an Exit must withstand when
    /// no SURBs are received from the Entry at all. To meet this requirement,
    /// the egress traffic is shaped accordingly.
225    ///
226    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
227    ///
228    /// Default is 5 seconds, minimum is 1 second.
229    #[default(Duration::from_secs(5))]
230    pub minimum_surb_buffer_duration: Duration,
    /// The maximum number of SURBs in the SURB buffer that can be requested when creating a new Session.
    ///
    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
    /// so values greater than that do not make sense. Ideally, this value should be set equal
    /// to the size of the global transport SURB ring buffer.
    ///
    /// Default is 10 000 SURBs.
238    #[default(10_000)]
239    pub maximum_surb_buffer_size: usize,
240
    /// Determines whether the `target_surb_buffer_size` can grow when the simple moving average of the
    /// current SURB buffer size estimate surpasses the given threshold over the given period.
    ///
    /// This only applies if the [`SurbBalancerController`] in use supports this option.
    ///
    /// The default is `(60 s, 0.20)`: the target may grow when the average SURB buffer size exceeds
    /// the target buffer size by more than 20% within a 60-second window.
248    #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
249    pub growable_target_surb_buffer: Option<(Duration, f64)>,
250}
251
252/// Manages lifecycles of Sessions.
253///
254/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
255/// should be called for each [`ApplicationData`] received by the node.
256/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
257/// and correct dispatch of Session-related packets to individual existing Sessions.
258///
259/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
260/// probe sessions using [`SessionManager::ping_session`]
/// and list them via [`SessionManager::active_sessions`].
262///
263/// Since the `SessionManager` operates over the HOPR protocol,
264/// the message transport `S` is required.
265/// Such transport must also be `Clone`, since it will be cloned into all the created [`Session`] objects.
266///
267/// ## SURB balancing
268/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
269///
/// With each packet sent from the Session initiator to the receiving party, zero to two SURBs may be delivered.
/// When the receiving party wants to send reply packets back, it must consume one SURB per packet. This
/// means that if the difference between SURBs delivered and SURBs consumed is negative, the receiving party
/// might soon run out of SURBs. If SURBs run out, reply packets will be dropped, likely causing
/// quality-of-service degradation.
275///
276/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
277/// *local SURB balancing* and *remote SURB balancing*.
278///
279/// ### Local SURB balancing
280/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
281/// therefore incoming to us).
/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
/// and if SURBs are running out, packet egress shaping takes effect. This by itself does not
/// prevent the depletion of SURBs but slows it down in the hope that the initiating party can deliver
/// more SURBs over time. This might happen either organically, by sending effective payloads that
/// leave room for a non-zero number of SURBs in the packet, or non-organically, by delivering KeepAlive
/// messages via *remote SURB balancing*.
288///
289/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
290/// flag during Session initiation.
291///
292/// ### Remote SURB balancing
/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
/// delivered to the other party, and infers the number of SURBs consumed from the amount of reply traffic
/// received.
/// When enabled, a desired target level of SURBs at the Session counterparty is set. Based on the measured
/// inflow and outflow of SURBs to/from the counterparty, non-organic SURBs are produced via keep-alive
/// messages (sent to the counterparty), and their production is controlled to maintain that target level.
299///
300/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
301/// sending new ones via the keep-alive messages.
302///
303/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
304///
305/// ### Possible scenarios
306/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
307/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
308/// only when both are configured (the ideal case below):
309///
/// #### 1. Ideal local and remote SURB balancing
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) to match the
///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
///    Session.
/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
///    `maximum_surb_buffer_size / minimum_surb_buffer_duration` value configured at the counterparty.
///
/// In this situation, the maximum Session egress from the Exit to the Entry is given by the
/// `maximum_surb_buffer_size / minimum_surb_buffer_duration` ratio, as illustrated in the sketch below.
/// If there is enough bandwidth, the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly
/// at this rate of SURBs/sec, and the whole system will be in equilibrium during the Session's lifetime
/// (under ideal network conditions).
324///
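/// As a numeric sketch (using the documented defaults of 10 000 SURBs for `maximum_surb_buffer_size`
/// and 5 seconds for `minimum_surb_buffer_duration`; the figures are illustrative only):
///
/// ```text
/// max Exit egress               = maximum_surb_buffer_size / minimum_surb_buffer_duration
///                               = 10 000 SURBs / 5 s = 2 000 packets/s
/// Entry target_surb_buffer_size = 10 000 (matches the Exit's maximum_surb_buffer_size)
/// Entry max_surbs_per_sec       = slightly above 2 000 (e.g., 2 100)
/// ```
///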
325/// #### 2. Remote SURB balancing only
326/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
/// 2. The Session initiator (Entry) sets the `max_surbs_per_sec` and `target_surb_buffer_size` values in
///    [`SurbBalancerConfig`].
329///
330/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
331/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
332/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
333///
/// This configuration could potentially lead to an equilibrium only
/// when the `SurbBalancer` at the Entry can react fast enough to the Exit's demand.
336///
337/// #### 3. Local SURB balancing only
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
340/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
341///    Session.
342/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
343///
/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
/// ones naturally carried by egress packets that have space to hold SURBs). It relies
/// only on the Session egress limiting done by the Exit node.
/// The Exit will limit its egress roughly to the rate at which SURBs naturally occur in the ingress.
///
/// This configuration could potentially lead to an equilibrium only when non-full packets are uploaded
/// (ones that can carry at least a single SURB) and the Exit's egress limits itself to such a rate.
/// If the Exit's egress rate drops too low due to SURB scarcity, upper-layer protocols running over the
/// Session might break.
352///
/// #### 4. No SURB balancing on either side
354/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
355/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
356///
357/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
358/// takes place at the Exit.
359///
/// This configuration can only lead to an equilibrium when the Entry sends non-full packets (ones that carry
/// at least a single SURB) and the Exit consumes the SURBs (Session egress) at a slower or equal rate.
/// Such a configuration is very fragile, as any disturbance in the SURB flow might lead to packet drops
/// at the Exit's egress.
364///
365/// ### SURB decay
/// In a hypothetical scenario with non-zero packet loss, the Session initiator (Entry) might send a
/// certain number of SURBs to the Session recipient (Exit), but only a portion of them is actually delivered.
/// The Entry has no way of knowing that and assumes that everything has been delivered.
/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
/// of those packets is actually delivered to the Entry. In that case, the Entry also subtracts
/// fewer SURBs from its estimate of the Exit's SURB balance.
372///
373/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
374///
375/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
376/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
377/// Exit is detected.
378///
379/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
380///
381/// ### Automatic `target_surb_buffer_size` increase
382/// This mechanism only applies to the Session recipient (Exit) and on Sessions without the
383/// [`Capability::NoRateControl`] flag set.
/// In this case, the Exit throttles the Session egress based on the ratio between the estimated SURB balance
/// and the *hint* of the Entry's `target_surb_buffer_size` set during the Session initiation.
386///
387/// However, as the Entry might [increase](SessionManager::update_surb_balancer_config) the `target_surb_buffer_size`
388/// of the Session dynamically, the new value is never hinted again to the Exit (this only happens once during
389/// the Session initiation).
/// For this reason, the Exit might then observe the ratio going above 1. When this happens consistently
391/// over some given time period, the Exit can decide to increase the initial hint to the newly observed value.
392///
393/// See the `growable_target_surb_buffer` field in the [`SessionManagerConfig`] for details.
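///
/// ## Example
/// A minimal wiring sketch (the `transport` sink and the surrounding async context are assumptions,
/// not provided by this module):
/// ```ignore
/// let manager = SessionManager::new(SessionManagerConfig::default());
///
/// // Channel on which newly accepted incoming sessions are delivered.
/// let (new_session_tx, mut new_session_rx) = futures::channel::mpsc::unbounded();
///
/// // `transport` must be a Sink of `(DestinationRouting, ApplicationData)` pairs.
/// let abort_handles = manager.start(transport, new_session_tx)?;
/// ```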
394pub struct SessionManager<S> {
395    session_initiations: SessionInitiationCache,
396    session_notifiers: Arc<OnceLock<SessionNotifiers>>,
397    sessions: moka::future::Cache<SessionId, SessionSlot>,
398    msg_sender: Arc<OnceLock<S>>,
399    cfg: SessionManagerConfig,
400}
401
402impl<S> Clone for SessionManager<S> {
403    fn clone(&self) -> Self {
404        Self {
405            session_initiations: self.session_initiations.clone(),
406            session_notifiers: self.session_notifiers.clone(),
407            sessions: self.sessions.clone(),
408            cfg: self.cfg.clone(),
409            msg_sender: self.msg_sender.clone(),
410        }
411    }
412}
413
414impl<S> SessionManager<S>
415where
416    S: futures::Sink<(DestinationRouting, ApplicationData)> + Clone + Send + Sync + Unpin + 'static,
417    S::Error: std::error::Error + Send + Sync + Clone + 'static,
418{
419    /// Creates a new instance given the [`config`](SessionManagerConfig).
420    pub fn new(mut cfg: SessionManagerConfig) -> Self {
421        let min_session_tag_range_reservation = ReservedTag::range().end;
422        debug_assert!(
423            min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
424            "invalid tag reservation range"
425        );
426
427        // Accommodate the lower bound if too low.
428        if cfg.session_tag_range.start < min_session_tag_range_reservation {
429            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
430            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
431        }
432        cfg.maximum_sessions = cfg
433            .maximum_sessions
434            .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
435
436        #[cfg(all(feature = "prometheus", not(test)))]
437        METRIC_ACTIVE_SESSIONS.set(0.0);
438
439        let msg_sender = Arc::new(OnceLock::new());
440        Self {
441            msg_sender: msg_sender.clone(),
442            session_initiations: moka::future::Cache::builder()
443                .max_capacity(cfg.maximum_sessions as u64)
444                .time_to_live(
445                    2 * initiation_timeout_max_one_way(
446                        cfg.initiation_timeout_base,
447                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
448                    ),
449                )
450                .build(),
451            sessions: moka::future::Cache::builder()
452                .max_capacity(cfg.maximum_sessions as u64)
453                .time_to_idle(cfg.idle_timeout)
454                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
455                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
456                        trace!(?session_id, ?reason, "session evicted from the cache");
457                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
458                    }
459                    _ => {}
460                })
461                .build(),
462            session_notifiers: Arc::new(OnceLock::new()),
463            cfg,
464        }
465    }
466
467    /// Starts the instance with the given `msg_sender` `Sink`
468    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
469    ///
470    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
471    /// [`SessionManager::dispatch_message`].
472    pub fn start(
473        &self,
474        msg_sender: S,
475        new_session_notifier: UnboundedSender<IncomingSession>,
476    ) -> crate::errors::Result<Vec<AbortHandle>> {
477        self.msg_sender
478            .set(msg_sender)
479            .map_err(|_| SessionManagerError::AlreadyStarted)?;
480
481        let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
482        self.session_notifiers
483            .set((new_session_notifier, session_close_tx))
484            .map_err(|_| SessionManagerError::AlreadyStarted)?;
485
486        let myself = self.clone();
487        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
488            None,
489            move |(session_id, closure_reason)| {
490                let myself = myself.clone();
491                async move {
492                    // These notifications come from the Sessions themselves once
493                    // an empty read is encountered, which means the closure was done by the
494                    // other party.
495                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
496                        close_session(session_id, session_data, closure_reason);
497                    } else {
498                        // Do not treat this as an error
499                        debug!(
500                            ?session_id,
501                            ?closure_reason,
502                            "could not find session id to close, maybe the session is already closed"
503                        );
504                    }
505                }
506            },
507        ));
508
509        // This is necessary to evict expired entries from the caches if
510        // no session-related operations happen at all.
        // This ensures that dangling expired sessions are properly closed
        // and their closure is notified to the other party in a timely manner.
513        let myself = self.clone();
514        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
515            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
516            let timeout = 2 * initiation_timeout_max_one_way(
517                myself.cfg.initiation_timeout_base,
518                RoutingOptions::MAX_INTERMEDIATE_HOPS,
519            )
520            .min(myself.cfg.idle_timeout)
521            .mul_f64(jitter)
522                / 2;
523            futures_time::stream::interval(timeout.into())
524                .for_each(|_| {
525                    trace!("executing session cache evictions");
526                    futures::future::join(
527                        myself.sessions.run_pending_tasks(),
528                        myself.session_initiations.run_pending_tasks(),
529                    )
530                    .map(|_| ())
531                })
532                .await;
533        });
534
535        Ok(vec![ah_closure_notifications, ah_session_expiration])
536    }
537
538    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
539    pub fn is_started(&self) -> bool {
540        self.session_notifiers.get().is_some()
541    }
542
543    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
544        // We currently do not support loopback Sessions on ourselves.
545        let abort_handles_clone = slot.abort_handles.clone();
546        if let moka::ops::compute::CompResult::Inserted(_) = self
547            .sessions
548            .entry(session_id)
549            .and_compute_with(|entry| {
550                futures::future::ready(if entry.is_none() {
551                    moka::ops::compute::Op::Put(slot)
552                } else {
553                    moka::ops::compute::Op::Nop
554                })
555            })
556            .await
557        {
558            #[cfg(all(feature = "prometheus", not(test)))]
559            {
560                METRIC_NUM_INITIATED_SESSIONS.increment();
561                METRIC_ACTIVE_SESSIONS.increment(1.0);
562            }
563
564            Ok(())
565        } else {
566            // Session already exists; it means it is most likely a loopback attempt
567            error!(%session_id, "session already exists - loopback attempt");
568            abort_handles_clone.into_iter().for_each(|ah| ah.abort());
569            Err(SessionManagerError::Loopback.into())
570        }
571    }
572
573    /// Initiates a new outgoing Session to `destination` with the given configuration.
574    ///
575    /// If the Session's counterparty does not respond within
576    /// the [configured](SessionManagerConfig) period,
577    /// this method returns [`TransportSessionError::Timeout`].
578    ///
579    /// It will also fail if the instance has not been [started](SessionManager::start).
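    ///
    /// A call sketch (assuming `destination` and `target` are prepared by the caller, and that both
    /// config types provide `Default`; the values are illustrative):
    /// ```ignore
    /// let session = manager
    ///     .new_session(
    ///         destination,
    ///         target,
    ///         SessionClientConfig {
    ///             surb_management: Some(SurbBalancerConfig {
    ///                 target_surb_buffer_size: 10_000,
    ///                 ..Default::default()
    ///             }),
    ///             ..Default::default()
    ///         },
    ///     )
    ///     .await?;
    /// ```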
580    pub async fn new_session(
581        &self,
582        destination: Address,
583        target: SessionTarget,
584        cfg: SessionClientConfig,
585    ) -> crate::errors::Result<Session> {
586        self.sessions.run_pending_tasks().await;
587        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
588            return Err(SessionManagerError::TooManySessions.into());
589        }
590
591        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
592
593        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
594        let challenge = insert_into_next_slot(
595            &self.session_initiations,
596            |ch| {
597                if let Some(challenge) = ch {
598                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
599                } else {
600                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
601                }
602            },
603            tx_initiation_done,
604        )
605        .await
606        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
607
608        // Prepare the session initiation message in the Start protocol
609        trace!(challenge, ?cfg, "initiating session with config");
610        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
611            challenge,
612            target,
613            capabilities: ByteCapabilities(cfg.capabilities),
614            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
615                cfg.surb_management
616                    .map(|c| c.target_surb_buffer_size)
617                    .unwrap_or(
618                        self.cfg.initial_return_session_egress_rate as u64
619                            * self
620                                .cfg
621                                .minimum_surb_buffer_duration
622                                .max(MIN_SURB_BUFFER_DURATION)
623                                .as_secs(),
624                    )
625                    .min(u32::MAX as u64) as u32
626            } else {
627                0
628            },
629        });
630
631        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
632        let forward_routing = DestinationRouting::Forward {
633            destination,
634            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
635            forward_options: cfg.forward_path_options.clone(),
636            return_options: cfg.return_path_options.clone().into(),
637        };
638
639        // Send the Session initiation message
640        info!(challenge, %pseudonym, %destination, "new session request");
641        msg_sender
642            .send((forward_routing.clone(), start_session_msg.try_into()?))
643            .await
644            .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
645
646        // The timeout is given by the number of hops requested
647        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
648            self.cfg.initiation_timeout_base,
649            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
650        )
651        .into();
652
653        // Await session establishment response from the Exit node or timeout
654        pin_mut!(rx_initiation_done);
655
656        trace!(challenge, "awaiting session establishment");
657        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
658            Ok(Ok(Some(est))) => {
659                // Session has been established, construct it
660                let session_id = est.session_id;
661                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
662
663                let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
664                let notifier = self
665                    .session_notifiers
666                    .get()
667                    .map(|(_, notifier)| {
668                        let notifier = notifier.clone();
669                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
670                            let _ = notifier
671                                .unbounded_send((session_id, reason))
672                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
673                        })
674                    })
675                    .ok_or(SessionManagerError::NotStarted)?;
676
                // NOTE: the Exit node can have a different `maximum_surb_buffer_size`
                // setting in its Session manager, so it does not make sense to cap the value here
                // with our own maximum.
680                if let Some(balancer_config) = cfg.surb_management {
681                    let surb_estimator = AtomicSurbFlowEstimator::default();
682
683                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
684                    let surb_estimator_clone = surb_estimator.clone();
685                    let scoring_sender =
686                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationData)| {
687                            // Count how many SURBs we sent with each packet
688                            surb_estimator_clone.produced.fetch_add(
689                                ApplicationData::estimate_surbs_with_msg(&data.plain_text) as u64,
690                                std::sync::atomic::Ordering::Relaxed,
691                            );
692                            futures::future::ok::<_, S::Error>((routing, data))
693                        });
694
695                    let mut abort_handles = Vec::new();
696
697                    // Spawn the SURB-bearing keep alive stream
698                    let (ka_controller, ka_abort_handle) =
699                        utils::spawn_keep_alive_stream(session_id, scoring_sender.clone(), forward_routing.clone());
700                    abort_handles.push(ka_abort_handle);
701
702                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
                    debug!(%session_id, ?balancer_config, "spawning entry SURB balancer");
704                    let balancer = SurbBalancer::new(
705                        session_id,
                        // The setpoint and output limit are immediately reconfigured by the SurbBalancer
707                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
708                        surb_estimator.clone(),
709                        ka_controller,
710                        balancer_config,
711                    );
712
713                    let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
714                        self.cfg.balancer_sampling_interval,
715                        SessionCacheBalancerFeedback(self.sessions.clone()),
716                        None,
717                    );
718                    abort_handles.push(balancer_abort_handle);
719
720                    // If the insertion fails prematurely, it will also kill all the abort handles
721                    self.insert_session_slot(
722                        session_id,
723                        SessionSlot {
724                            session_tx: Arc::new(tx),
725                            routing_opts: forward_routing.clone(),
726                            abort_handles,
727                            surb_mgmt: Some(balancer_config),
728                            surb_estimator: None,
729                        },
730                    )
731                    .await?;
732
733                    // Wait for enough SURBs to be sent to the counterparty
734                    // TODO: consider making this interactive = other party reports the exact level periodically
735                    match level_stream
736                        .skip_while(|current_level| {
737                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
738                        })
739                        .next()
740                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
741                        .await
742                    {
743                        Ok(Some(surb_level)) => {
744                            info!(%session_id, surb_level, "session is ready");
745                        }
746                        Ok(None) => {
747                            return Err(
748                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
749                            );
750                        }
751                        Err(_) => {
752                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
753                        }
754                    }
755
756                    Session::new(
757                        session_id,
758                        forward_routing,
759                        cfg.capabilities,
760                        (
761                            scoring_sender,
762                            rx.inspect(move |_| {
763                                // Received packets = SURB consumption estimate
764                                surb_estimator
765                                    .consumed
766                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
767                            }),
768                        ),
769                        Some(notifier),
770                    )
771                } else {
772                    warn!(%session_id, "session ready without SURB balancing");
773
774                    self.insert_session_slot(
775                        session_id,
776                        SessionSlot {
777                            session_tx: Arc::new(tx),
778                            routing_opts: forward_routing.clone(),
779                            abort_handles: vec![],
780                            surb_mgmt: None,
781                            surb_estimator: None,
782                        },
783                    )
784                    .await?;
785
786                    Session::new(
787                        session_id,
788                        forward_routing,
789                        cfg.capabilities,
790                        (msg_sender, rx),
791                        Some(notifier),
792                    )
793                }
794            }
795            Ok(Ok(None)) => Err(SessionManagerError::Other(
796                "internal error: sender has been closed without completing the session establishment".into(),
797            )
798            .into()),
799            Ok(Err(e)) => {
800                // The other side did not allow us to establish a session
801                error!(
802                    challenge = e.challenge,
803                    error = ?e,
804                    "the other party rejected the session initiation with error"
805                );
806                Err(TransportSessionError::Rejected(e.reason))
807            }
808            Err(_) => {
809                // Timeout waiting for a session establishment
810                error!(challenge, "session initiation attempt timed out");
811
812                #[cfg(all(feature = "prometheus", not(test)))]
813                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
814
815                Err(TransportSessionError::Timeout)
816            }
817        }
818    }
819
820    /// Sends a keep-alive packet with the given [`SessionId`].
821    ///
822    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
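    ///
    /// A call sketch for an existing `session_id`:
    /// ```ignore
    /// manager.ping_session(&session_id).await?;
    /// ```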
823    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
824        if let Some(session_data) = self.sessions.get(id).await {
            trace!(session_id = ?id, "manually pinging session");
826            Ok(self
827                .msg_sender
828                .get()
829                .cloned()
830                .ok_or(SessionManagerError::NotStarted)?
831                .send((
832                    session_data.routing_opts.clone(),
833                    HoprStartProtocol::KeepAlive((*id).into()).try_into()?,
834                ))
835                .await
836                .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
837        } else {
838            Err(SessionManagerError::NonExistingSession.into())
839        }
840    }
841
842    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
843    pub async fn active_sessions(&self) -> Vec<SessionId> {
844        self.sessions.run_pending_tasks().await;
845        self.sessions.iter().map(|(k, _)| *k).collect()
846    }
847
848    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
849    ///
850    /// Returns an error if the Session with the given `id` does not exist, or
851    /// if it does not use SURB balancing.
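    ///
    /// An update sketch (doubling the target; assumes the Session currently uses SURB balancing):
    /// ```ignore
    /// if let Some(mut cfg) = manager.get_surb_balancer_config(&session_id).await? {
    ///     cfg.target_surb_buffer_size *= 2;
    ///     manager.update_surb_balancer_config(&session_id, cfg).await?;
    /// }
    /// ```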
852    pub async fn update_surb_balancer_config(
853        &self,
854        id: &SessionId,
855        config: SurbBalancerConfig,
856    ) -> crate::errors::Result<()> {
857        match self
858            .sessions
859            .entry_by_ref(id)
860            .and_compute_with(|entry| {
861                futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
862                    // Only update the config if there already was one before
863                    if cached_session.surb_mgmt.is_some() {
864                        cached_session.surb_mgmt = Some(config);
865                        moka::ops::compute::Op::Put(cached_session)
866                    } else {
867                        moka::ops::compute::Op::Nop
868                    }
869                } else {
870                    moka::ops::compute::Op::Nop
871                })
872            })
873            .await
874        {
875            moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
876            moka::ops::compute::CompResult::Unchanged(_) => {
877                Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
878            }
879            _ => Err(SessionManagerError::NonExistingSession.into()),
880        }
881    }
882
883    /// Retrieves the configuration of SURB balancing for the given Session.
884    ///
885    /// Returns an error if the Session with the given `id` does not exist.
886    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
887        match self.sessions.get(id).await {
888            Some(session) => Ok(session.surb_mgmt),
889            None => Err(SessionManagerError::NonExistingSession.into()),
890        }
891    }
892
893    /// The main method to be called whenever data are received.
894    ///
    /// It tries to recognize the message and correctly dispatch either
    /// Session protocol or Start protocol messages.
897    ///
898    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
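    ///
    /// A dispatch-loop sketch (how `pseudonym` and `data` are obtained from the transport is an assumption):
    /// ```ignore
    /// match manager.dispatch_message(pseudonym, data).await? {
    ///     DispatchResult::Processed => {
    ///         // consumed by the Start protocol or an existing Session
    ///     }
    ///     DispatchResult::Unrelated(data) => {
    ///         // not Session-related; hand the data to other protocol handlers
    ///     }
    /// }
    /// ```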
899    pub async fn dispatch_message(
900        &self,
901        pseudonym: HoprPseudonym,
902        data: ApplicationData,
903    ) -> crate::errors::Result<DispatchResult> {
904        if data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
905            // This is a Start protocol message, so we handle it
906            trace!(tag = %data.application_tag, "dispatching Start protocol message");
907            return self
908                .handle_start_protocol_message(pseudonym, data)
909                .await
910                .map(|_| DispatchResult::Processed);
911        } else if self.cfg.session_tag_range.contains(&data.application_tag.as_u64()) {
912            let session_id = SessionId::new(data.application_tag, pseudonym);
913
914            return if let Some(session_data) = self.sessions.get(&session_id).await {
915                trace!(?session_id, "received data for a registered session");
916
917                Ok(session_data
918                    .session_tx
919                    .unbounded_send(data.plain_text)
920                    .map(|_| DispatchResult::Processed)
921                    .map_err(|e| SessionManagerError::Other(e.to_string()))?)
922            } else {
923                error!(%session_id, "received data from an unestablished session");
924                Err(TransportSessionError::UnknownData)
925            };
926        }
927
928        trace!(%data.application_tag, "received data not associated with session protocol or any existing session");
929        Ok(DispatchResult::Unrelated(data))
930    }
931
932    async fn handle_incoming_session_initiation(
933        &self,
934        pseudonym: HoprPseudonym,
935        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
936    ) -> crate::errors::Result<()> {
937        trace!(challenge = session_req.challenge, "received session initiation request");
938
939        debug!(%pseudonym, "got new session request, searching for a free session slot");
940
941        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
942
943        let (new_session_notifier, close_session_notifier) = self
944            .session_notifiers
945            .get()
946            .cloned()
947            .ok_or(SessionManagerError::NotStarted)?;
948
949        // Reply routing uses SURBs only with the pseudonym of this Session's ID
950        let reply_routing = DestinationRouting::Return(pseudonym.into());
951
952        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
953
954        // Search for a free Session ID slot
955        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
956        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
957            insert_into_next_slot(
958                &self.sessions,
959                |sid| {
960                    // NOTE: It is allowed to insert sessions using the same tag
961                    // but different pseudonyms because the SessionId is different.
962                    let next_tag: Tag = match sid {
963                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
964                            .max(self.cfg.session_tag_range.start)
965                            .into(),
966                        None => hopr_crypto_random::random_integer(
967                            self.cfg.session_tag_range.start,
968                            Some(self.cfg.session_tag_range.end),
969                        )
970                        .into(),
971                    };
972                    SessionId::new(next_tag, pseudonym)
973                },
974                SessionSlot {
975                    session_tx: Arc::new(tx_session_data),
976                    routing_opts: reply_routing.clone(),
977                    abort_handles: vec![],
978                    surb_mgmt: None,
979                    surb_estimator: None,
980                },
981            )
982            .await
983        } else {
984            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
985            None
986        };
987
988        if let Some(session_id) = allocated_slot {
989            debug!(%session_id, ?session_req, "assigned a new session");
990
991            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
992                if let Err(error) = close_session_notifier.unbounded_send((session_id, reason)) {
993                    error!(%session_id, %error, %reason, "failed to notify session closure");
994                }
995            });
996
997            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
998                let surb_estimator = AtomicSurbFlowEstimator::default();
999
1000                // Because of SURB scarcity, control the egress rate of incoming sessions
1001                let egress_rate_control =
1002                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1003
1004                // The Session request carries a "hint" as additional data telling what
1005                // the Session initiator has configured as its target buffer size in the Balancer.
1006                let target_surb_buffer_size = if session_req.additional_data > 0 {
1007                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1008                } else {
1009                    self.cfg.initial_return_session_egress_rate as u64
1010                        * self
1011                            .cfg
1012                            .minimum_surb_buffer_duration
1013                            .max(MIN_SURB_BUFFER_DURATION)
1014                            .as_secs()
1015                };
1016
                let surb_estimator_clone = surb_estimator.clone();
                // A separate clone for the receive-side closure below: both closures capture
                // their estimator by move, so they cannot share a single binding.
                let surb_estimator_recv = surb_estimator.clone();
1018                let session = Session::new(
1019                    session_id,
1020                    reply_routing.clone(),
1021                    session_req.capabilities,
1022                    (
1023                        // Sent packets = SURB consumption estimate
1024                        msg_sender
1025                            .clone()
1026                            .with(move |(routing, data): (DestinationRouting, ApplicationData)| {
1027                                // Each outgoing packet consumes one SURB
1028                                surb_estimator_clone
1029                                    .consumed
1030                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1031                                futures::future::ok::<_, S::Error>((routing, data))
1032                            })
1033                            .rate_limit_with_controller(&egress_rate_control)
1034                            .buffer((2 * target_surb_buffer_size) as usize),
1035                        // Received packets = SURB retrieval estimate
1036                        rx_session_data.inspect(move |data| {
1037                            // Count the number of SURBs delivered with each incoming packet
                            surb_estimator_recv.produced.fetch_add(
1039                                ApplicationData::estimate_surbs_with_msg(data) as u64,
1040                                std::sync::atomic::Ordering::Relaxed,
1041                            );
1042                        }),
1043                    ),
1044                    Some(closure_notifier),
1045                )?;
1046
1047                // The SURB balancer will start intervening by rate-limiting the
1048                // egress of the Session, once the estimated number of SURBs drops below
1049                // the target defined here. Otherwise, the maximum egress is allowed.
1050                let balancer_config = SurbBalancerConfig {
1051                    target_surb_buffer_size,
1052                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1053                    max_surbs_per_sec: target_surb_buffer_size
1054                        / self
1055                            .cfg
1056                            .minimum_surb_buffer_duration
1057                            .max(MIN_SURB_BUFFER_DURATION)
1058                            .as_secs(),
1059                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1060                    // were received
1061                    surb_decay: None,
1062                };
1063
1064                // Assign the SURB balancer and abort handles to the already allocated Session slot
1065                let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1066                if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1067                    .sessions
1068                    .entry(session_id)
1069                    .and_compute_with(|entry| {
1070                        if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1071                            cached_session.abort_handles.push(balancer_abort_handle);
1072                            cached_session.surb_mgmt = Some(balancer_config);
1073                            cached_session.surb_estimator = Some(surb_estimator.clone());
1074                            futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1075                        } else {
1076                            futures::future::ready(moka::ops::compute::Op::Nop)
1077                        }
1078                    })
1079                    .await
1080                {
1081                    // Spawn the SURB balancer only once we know we have registered the
1082                    // abort handle with the pre-allocated Session slot
                    debug!(%session_id, ?balancer_config, "spawning exit SURB balancer");
1084                    let balancer = SurbBalancer::new(
1085                        session_id,
1086                        if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1087                            // Allow automatic setpoint increase
1088                            SimpleBalancerController::with_increasing_setpoint(
1089                                *ratio_threshold,
1090                                (growth_window
1091                                    .div_duration_f64(self.cfg.balancer_sampling_interval)
1092                                    .round() as usize)
1093                                    .max(1),
1094                            )
1095                        } else {
1096                            SimpleBalancerController::default()
1097                        },
1098                        surb_estimator,
1099                        SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1100                        balancer_config,
1101                    );
1102
1103                    let _ = balancer.start_control_loop(
1104                        self.cfg.balancer_sampling_interval,
1105                        SessionCacheBalancerFeedback(self.sessions.clone()),
1106                        Some(balancer_abort_reg),
1107                    );
1108                } else {
1109                    // This should never happen, but be sure we handle this error
1110                    return Err(SessionManagerError::Other(
1111                        "failed to spawn SURB balancer - inconsistent cache".into(),
1112                    )
1113                    .into());
1114                }
1115
1116                session
1117            } else {
1118                Session::new(
1119                    session_id,
1120                    reply_routing.clone(),
1121                    session_req.capabilities,
1122                    (msg_sender.clone(), rx_session_data),
1123                    Some(closure_notifier),
1124                )?
1125            };
1126
1127            // Pair the session with the target requested in the Start protocol message
1128            let incoming_session = IncomingSession {
1129                session,
1130                target: session_req.target,
1131            };
1132
1133            // Notify that a new incoming session has been created
1134            if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
1135                warn!(%error, "failed to send session to incoming session queue");
1136            }
1137
1138            trace!(?session_id, "session notification sent");
1139
1140            // Notify the sender that the session has been established.
1141            // Set our peer ID in the session ID sent back to them.
1142            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1143                orig_challenge: session_req.challenge,
1144                session_id,
1145            });
1146
1147            msg_sender.send((reply_routing, data.try_into()?)).await.map_err(|e| {
1148                SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
1149            })?;
1150
1151            info!(%session_id, "new session established");
1152
1153            #[cfg(all(feature = "prometheus", not(test)))]
1154            {
1155                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1156                METRIC_ACTIVE_SESSIONS.increment(1.0);
1157            }
1158        } else {
1159            error!(%pseudonym, "failed to reserve a new session slot");
1160
1161            // Notify the sender that the session could not be established
1162            let reason = StartErrorReason::NoSlotsAvailable;
1163            let data = HoprStartProtocol::SessionError(StartErrorType {
1164                challenge: session_req.challenge,
1165                reason,
1166            });
1167
1168            msg_sender.send((reply_routing, data.try_into()?)).await.map_err(|e| {
1169                SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1170            })?;
1171
1172            trace!(%pseudonym, "session establishment failure message sent");
1173
1174            #[cfg(all(feature = "prometheus", not(test)))]
1175            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1176        }
1177
1178        Ok(())
1179    }
1180
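    /// Dispatches an incoming Start protocol message received under the given `pseudonym`:
    /// session initiation requests, establishment confirmations, establishment errors
    /// and keep-alive messages.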
1181    async fn handle_start_protocol_message(
1182        &self,
1183        pseudonym: HoprPseudonym,
1184        data: ApplicationData,
1185    ) -> crate::errors::Result<()> {
1186        match HoprStartProtocol::try_from(data)? {
1187            HoprStartProtocol::StartSession(session_req) => {
1188                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1189            }
1190            HoprStartProtocol::SessionEstablished(est) => {
1191                trace!(
1192                    session_id = ?est.session_id,
1193                    "received session establishment confirmation"
1194                );
1195                let challenge = est.orig_challenge;
1196                let session_id = est.session_id;
1197                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1198                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1199                        return Err(SessionManagerError::Other(format!(
1200                            "could not notify session {session_id} establishment: {e}"
1201                        ))
1202                        .into());
1203                    }
1204                    debug!(?session_id, challenge, "session establishment complete");
1205                } else {
1206                    error!(%session_id, challenge, "unknown or expired session establishment attempt");
1207                }
1208            }
1209            HoprStartProtocol::SessionError(error) => {
1210                trace!(
1211                    challenge = error.challenge,
1212                    error = ?error.reason,
1213                    "failed to initialize a session",
1214                );
1215                // Currently, we do not distinguish between individual error types
1216                // and just discard the initiation attempt and pass on the error.
1217                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1218                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
1219                        return Err(SessionManagerError::Other(format!(
1220                            "could not notify session establishment error {error:?}: {e}"
1221                        ))
1222                        .into());
1223                    }
1224                    error!(
1225                        challenge = error.challenge,
1226                        ?error,
1227                        "session establishment error received"
1228                    );
1229                } else {
1230                    error!(
1231                        challenge = error.challenge,
1232                        ?error,
1233                        "session establishment attempt expired before error could be delivered"
1234                    );
1235                }
1236
1237                #[cfg(all(feature = "prometheus", not(test)))]
1238                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1239            }
1240            HoprStartProtocol::KeepAlive(msg) => {
1241                let session_id = msg.session_id;
1242                if let Some(session_slot) = self.sessions.get(&session_id).await {
1243                    trace!(?session_id, "received keep-alive request");
1244                    // If the session has a SURB flow estimator, increase the number of received SURBs.
1245                    // Currently, this applies only to incoming sessions.
1246                    if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1247                        // Each Keep-Alive message carries at least MIN_SURBS_PER_MESSAGE SURBs (currently two)
1248                        estimator.produced.fetch_add(
1249                            KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1250                            std::sync::atomic::Ordering::Relaxed,
1251                        );
1252                    }
1253                } else {
1254                    debug!(%session_id, "received keep-alive request for an unknown session");
1255                }
1256            }
1257        }
1258
1259        Ok(())
1260    }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use anyhow::anyhow;
1266    use futures::{AsyncWriteExt, future::BoxFuture};
1267    use hopr_crypto_random::Randomizable;
1268    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1269    use hopr_primitive_types::prelude::Address;
1270    use hopr_protocol_start::StartProtocolDiscriminants;
1271    use tokio::time::timeout;
1272
1273    use super::*;
1274    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1275
1276    #[async_trait::async_trait]
1277    trait SendMsg {
1278        async fn send_message(&self, routing: DestinationRouting, data: ApplicationData) -> crate::errors::Result<()>;
1279    }
1280
1281    mockall::mock! {
1282        MsgSender {}
1283        impl SendMsg for MsgSender {
1284            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationData)
1285            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1286        }
1287    }
1288
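    // Bridges the manager's outgoing message channel to the mocked transport: everything
    // pushed into the returned sender is forwarded to `send_message` on a background task.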
1289    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationData)> {
1290        let (tx, rx) = futures::channel::mpsc::unbounded();
1291        tokio::task::spawn(async move {
1292            pin_mut!(rx);
1293            while let Some((routing, data)) = rx.next().await {
1294                sender
1295                    .send_message(routing, data)
1296                    .await
1297                    .expect("send message must not fail in mock");
1298            }
1299        });
1300        tx
1301    }
1302
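    // Returns true if `data` decodes as a Start protocol message of the expected variant.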
1303    fn msg_type(data: &ApplicationData, expected: StartProtocolDiscriminants) -> bool {
1304        HoprStartProtocol::decode(data.application_tag, &data.plain_text)
1305            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1306            .unwrap_or(false)
1307    }
1308
1309    #[test_log::test(tokio::test)]
1310    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1311    {
1312        let alice_pseudonym = HoprPseudonym::random();
1313        let bob_peer: Address = (&ChainKeypair::random()).into();
1314
1315        let alice_mgr = SessionManager::new(Default::default());
1316        let bob_mgr = SessionManager::new(Default::default());
1317
1318        let mut sequence = mockall::Sequence::new();
1319        let mut alice_transport = MockMsgSender::new();
1320        let mut bob_transport = MockMsgSender::new();
1321
1322        // Alice sends the StartSession message
1323        let bob_mgr_clone = bob_mgr.clone();
1324        alice_transport
1325            .expect_send_message()
1326            .once()
1327            .in_sequence(&mut sequence)
1328            .withf(move |peer, data| {
1329                info!("alice sends {}", data.application_tag);
1330                msg_type(data, StartProtocolDiscriminants::StartSession)
1331                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1332            })
1333            .returning(move |_, data| {
1334                let bob_mgr_clone = bob_mgr_clone.clone();
1335                Box::pin(async move {
1336                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1337                    Ok(())
1338                })
1339            });
1340
1341        // Bob sends the SessionEstablished message
1342        let alice_mgr_clone = alice_mgr.clone();
1343        bob_transport
1344            .expect_send_message()
1345            .once()
1346            .in_sequence(&mut sequence)
1347            .withf(move |peer, data| {
1348                info!("bob sends {}", data.application_tag);
1349                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1350                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1351            })
1352            .returning(move |_, data| {
1353                let alice_mgr_clone = alice_mgr_clone.clone();
1354
1355                Box::pin(async move {
1356                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1357                    Ok(())
1358                })
1359            });
1360
1361        // Alice sends the terminating segment to close the Session
1362        let bob_mgr_clone = bob_mgr.clone();
1363        alice_transport
1364            .expect_send_message()
1365            .once()
1366            .in_sequence(&mut sequence)
1367            .withf(move |peer, data| {
1368                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1369                    data.plain_text.as_ref(),
1370                )
1371                .expect("must be a session message")
1372                .try_as_segment()
1373                .expect("must be a segment")
1374                .is_terminating()
1375                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1376            })
1377            .returning(move |_, data| {
1378                let bob_mgr_clone = bob_mgr_clone.clone();
1379                Box::pin(async move {
1380                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1381                    Ok(())
1382                })
1383            });
1384
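        // Collect the abort handles of the managers' background tasks so they can be
        // shut down at the end of the test.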
1385        let mut ahs = Vec::new();
1386
1387        // Start Alice
1388        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1389        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1390
1391        // Start Bob
1392        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1393        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1394
1395        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1396
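        // Open the session from Alice's side while Bob concurrently awaits the incoming
        // session notification; both must complete within the timeout.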
1397        pin_mut!(new_session_rx_bob);
1398        let (alice_session, bob_session) = timeout(
1399            Duration::from_secs(2),
1400            futures::future::join(
1401                alice_mgr.new_session(
1402                    bob_peer,
1403                    SessionTarget::TcpStream(target.clone()),
1404                    SessionClientConfig {
1405                        pseudonym: alice_pseudonym.into(),
1406                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1407                        surb_management: None,
1408                        ..Default::default()
1409                    },
1410                ),
1411                new_session_rx_bob.next(),
1412            ),
1413        )
1414        .await?;
1415
1416        let mut alice_session = alice_session?;
1417        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1418
1419        assert_eq!(
1420            *alice_session.capabilities(),
1421            Capability::Segmentation | Capability::NoRateControl
1422        );
1423        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1424        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1425
1426        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1427        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1428        assert!(
1429            alice_mgr
1430                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1431                .await
1432                .is_err()
1433        );
1434
1435        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1436        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1437        assert!(
1438            bob_mgr
1439                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1440                .await
1441                .is_err()
1442        );
1443
1444        tokio::time::sleep(Duration::from_millis(100)).await;
1445        alice_session.close().await?;
1446
1447        tokio::time::sleep(Duration::from_millis(100)).await;
1448
1449        assert!(matches!(
1450            alice_mgr.ping_session(alice_session.id()).await,
1451            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1452        ));
1453
1454        futures::stream::iter(ahs)
1455            .for_each(|ah| async move { ah.abort() })
1456            .await;
1457
1458        Ok(())
1459    }
1460
1461    #[test_log::test(tokio::test)]
1462    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1463        let alice_pseudonym = HoprPseudonym::random();
1464        let bob_peer: Address = (&ChainKeypair::random()).into();
1465
1466        let cfg = SessionManagerConfig {
1467            idle_timeout: Duration::from_millis(200),
1468            ..Default::default()
1469        };
1470
1471        let alice_mgr = SessionManager::new(cfg);
1472        let bob_mgr = SessionManager::new(Default::default());
1473
1474        let mut sequence = mockall::Sequence::new();
1475        let mut alice_transport = MockMsgSender::new();
1476        let mut bob_transport = MockMsgSender::new();
1477
1478        // Alice sends the StartSession message
1479        let bob_mgr_clone = bob_mgr.clone();
1480        alice_transport
1481            .expect_send_message()
1482            .once()
1483            .in_sequence(&mut sequence)
1484            .withf(move |peer, data| {
1485                msg_type(data, StartProtocolDiscriminants::StartSession)
1486                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1487            })
1488            .returning(move |_, data| {
1489                let bob_mgr_clone = bob_mgr_clone.clone();
1490                Box::pin(async move {
1491                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1492                    Ok(())
1493                })
1494            });
1495
1496        // Bob sends the SessionEstablished message
1497        let alice_mgr_clone = alice_mgr.clone();
1498        bob_transport
1499            .expect_send_message()
1500            .once()
1501            .in_sequence(&mut sequence)
1502            .withf(move |peer, data| {
1503                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1504                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1505            })
1506            .returning(move |_, data| {
1507                let alice_mgr_clone = alice_mgr_clone.clone();
1508
1509                Box::pin(async move {
1510                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1511                    Ok(())
1512                })
1513            });
1514
1515        let mut ahs = Vec::new();
1516
1517        // Start Alice
1518        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1519        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1520
1521        // Start Bob
1522        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1523        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1524
1525        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1526
1527        pin_mut!(new_session_rx_bob);
1528        let (alice_session, bob_session) = timeout(
1529            Duration::from_secs(2),
1530            futures::future::join(
1531                alice_mgr.new_session(
1532                    bob_peer,
1533                    SessionTarget::TcpStream(target.clone()),
1534                    SessionClientConfig {
1535                        pseudonym: alice_pseudonym.into(),
1536                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1537                        surb_management: None,
1538                        ..Default::default()
1539                    },
1540                ),
1541                new_session_rx_bob.next(),
1542            ),
1543        )
1544        .await?;
1545
1546        let alice_session = alice_session?;
1547        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1548
1549        assert_eq!(
1550            *alice_session.capabilities(),
1551            Capability::Segmentation | Capability::NoRateControl,
1552        );
1553        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
1554        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1555
1556        // Let the session timeout at Alice
1557        tokio::time::sleep(Duration::from_millis(300)).await;
1558
1559        assert!(matches!(
1560            alice_mgr.ping_session(alice_session.id()).await,
1561            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1562        ));
1563
1564        futures::stream::iter(ahs)
1565            .for_each(|ah| async move { ah.abort() })
1566            .await;
1567
1568        Ok(())
1569    }
1570
1571    #[test_log::test(tokio::test)]
1572    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1573        let alice_pseudonym = HoprPseudonym::random();
1574        let session_id = SessionId::new(16u64, alice_pseudonym);
1575        let balancer_cfg = SurbBalancerConfig {
1576            target_surb_buffer_size: 1000,
1577            max_surbs_per_sec: 100,
1578            ..Default::default()
1579        };
1580
1581        let alice_mgr =
1582            SessionManager::<UnboundedSender<(DestinationRouting, ApplicationData)>>::new(Default::default());
1583
1584        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1585        alice_mgr
1586            .sessions
1587            .insert(
1588                session_id,
1589                SessionSlot {
1590                    session_tx: Arc::new(dummy_tx),
1591                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1592                    abort_handles: Vec::new(),
1593                    surb_mgmt: Some(balancer_cfg.clone()),
1594                    surb_estimator: None,
1595                },
1596            )
1597            .await;
1598
1599        let actual_cfg = alice_mgr
1600            .get_surb_balancer_config(&session_id)
1601            .await?
1602            .ok_or(anyhow!("session must have a surb balancer config"))?;
1603        assert_eq!(actual_cfg, balancer_cfg);
1604
1605        let new_cfg = SurbBalancerConfig {
1606            target_surb_buffer_size: 2000,
1607            max_surbs_per_sec: 200,
1608            ..Default::default()
1609        };
1610        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1611
1612        let actual_cfg = alice_mgr
1613            .get_surb_balancer_config(&session_id)
1614            .await?
1615            .ok_or(anyhow!("session must have a surb balancer config"))?;
1616        assert_eq!(actual_cfg, new_cfg);
1617
1618        Ok(())
1619    }
1620
1621    #[test_log::test(tokio::test)]
1622    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1623        let alice_pseudonym = HoprPseudonym::random();
1624        let bob_peer: Address = (&ChainKeypair::random()).into();
1625
1626        let cfg = SessionManagerConfig {
1627            session_tag_range: 16u64..17u64, // Slot for exactly one session
1628            ..Default::default()
1629        };
1630
1631        let alice_mgr = SessionManager::new(Default::default());
1632        let bob_mgr = SessionManager::new(cfg);
1633
1634        // Occupy the only free slot with tag 16
1635        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1636        bob_mgr
1637            .sessions
1638            .insert(
1639                SessionId::new(16u64, alice_pseudonym),
1640                SessionSlot {
1641                    session_tx: Arc::new(dummy_tx),
1642                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1643                    abort_handles: Vec::new(),
1644                    surb_mgmt: None,
1645                    surb_estimator: None,
1646                },
1647            )
1648            .await;
1649
1650        let mut sequence = mockall::Sequence::new();
1651        let mut alice_transport = MockMsgSender::new();
1652        let mut bob_transport = MockMsgSender::new();
1653
1654        // Alice sends the StartSession message
1655        let bob_mgr_clone = bob_mgr.clone();
1656        alice_transport
1657            .expect_send_message()
1658            .once()
1659            .in_sequence(&mut sequence)
1660            .withf(move |peer, data| {
1661                msg_type(data, StartProtocolDiscriminants::StartSession)
1662                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1663            })
1664            .returning(move |_, data| {
1665                let bob_mgr_clone = bob_mgr_clone.clone();
1666                Box::pin(async move {
1667                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1668                    Ok(())
1669                })
1670            });
1671
1672        // Bob sends the SessionError message
1673        let alice_mgr_clone = alice_mgr.clone();
1674        bob_transport
1675            .expect_send_message()
1676            .once()
1677            .in_sequence(&mut sequence)
1678            .withf(move |peer, data| {
1679                msg_type(data, StartProtocolDiscriminants::SessionError)
1680                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1681            })
1682            .returning(move |_, data| {
1683                let alice_mgr_clone = alice_mgr_clone.clone();
1684                Box::pin(async move {
1685                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1686                    Ok(())
1687                })
1688            });
1689
1690        let mut jhs = Vec::new();
1691
1692        // Start Alice
1693        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1694        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1695
1696        // Start Bob
1697        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1698        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1699
1700        let result = alice_mgr
1701            .new_session(
1702                bob_peer,
1703                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1704                SessionClientConfig {
1705                    capabilities: Capabilities::empty(),
1706                    pseudonym: alice_pseudonym.into(),
1707                    surb_management: None,
1708                    ..Default::default()
1709                },
1710            )
1711            .await;
1712
1713        assert!(
1714            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1715        );
1716
1717        Ok(())
1718    }
1719
1720    #[test_log::test(tokio::test)]
1721    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1722    -> anyhow::Result<()> {
1723        let alice_pseudonym = HoprPseudonym::random();
1724        let bob_peer: Address = (&ChainKeypair::random()).into();
1725
1726        let cfg = SessionManagerConfig {
1727            maximum_sessions: 1,
1728            ..Default::default()
1729        };
1730
1731        let alice_mgr = SessionManager::new(Default::default());
1732        let bob_mgr = SessionManager::new(cfg);
1733
1734        // Occupy the only allowed session slot (maximum_sessions = 1)
1735        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1736        bob_mgr
1737            .sessions
1738            .insert(
1739                SessionId::new(16u64, alice_pseudonym),
1740                SessionSlot {
1741                    session_tx: Arc::new(dummy_tx),
1742                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1743                    abort_handles: Vec::new(),
1744                    surb_mgmt: None,
1745                    surb_estimator: None,
1746                },
1747            )
1748            .await;
1749
1750        let mut sequence = mockall::Sequence::new();
1751        let mut alice_transport = MockMsgSender::new();
1752        let mut bob_transport = MockMsgSender::new();
1753
1754        // Alice sends the StartSession message
1755        let bob_mgr_clone = bob_mgr.clone();
1756        alice_transport
1757            .expect_send_message()
1758            .once()
1759            .in_sequence(&mut sequence)
1760            .withf(move |peer, data| {
1761                msg_type(data, StartProtocolDiscriminants::StartSession)
1762                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1763            })
1764            .returning(move |_, data| {
1765                let bob_mgr_clone = bob_mgr_clone.clone();
1766                Box::pin(async move {
1767                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1768                    Ok(())
1769                })
1770            });
1771
1772        // Bob sends the SessionError message
1773        let alice_mgr_clone = alice_mgr.clone();
1774        bob_transport
1775            .expect_send_message()
1776            .once()
1777            .in_sequence(&mut sequence)
1778            .withf(move |peer, data| {
1779                msg_type(data, StartProtocolDiscriminants::SessionError)
1780                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1781            })
1782            .returning(move |_, data| {
1783                let alice_mgr_clone = alice_mgr_clone.clone();
1784                Box::pin(async move {
1785                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1786                    Ok(())
1787                })
1788            });
1789
1790        let mut jhs = Vec::new();
1791
1792        // Start Alice
1793        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1794        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1795
1796        // Start Bob
1797        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1798        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1799
1800        let result = alice_mgr
1801            .new_session(
1802                bob_peer,
1803                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1804                SessionClientConfig {
1805                    capabilities: None.into(),
1806                    pseudonym: alice_pseudonym.into(),
1807                    surb_management: None,
1808                    ..Default::default()
1809                },
1810            )
1811            .await;
1812
1813        assert!(
1814            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1815        );
1816
1817        Ok(())
1818    }
1819
1820    #[test_log::test(tokio::test)]
1821    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1822        let alice_pseudonym = HoprPseudonym::random();
1823        let bob_peer: Address = (&ChainKeypair::random()).into();
1824
1825        let alice_mgr = SessionManager::new(Default::default());
1826
1827        let mut sequence = mockall::Sequence::new();
1828        let mut alice_transport = MockMsgSender::new();
1829
1830        // Alice sends the StartSession message
1831        let alice_mgr_clone = alice_mgr.clone();
1832        alice_transport
1833            .expect_send_message()
1834            .once()
1835            .in_sequence(&mut sequence)
1836            .withf(move |peer, data| {
1837                msg_type(data, StartProtocolDiscriminants::StartSession)
1838                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1839            })
1840            .returning(move |_, data| {
1841                // But the message is processed by Alice again due to the loopback
1842                let alice_mgr_clone = alice_mgr_clone.clone();
1843                Box::pin(async move {
1844                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1845                    Ok(())
1846                })
1847            });
1848
1849        // Alice sends the SessionEstablished message (as Bob)
1850        let alice_mgr_clone = alice_mgr.clone();
1851        alice_transport
1852            .expect_send_message()
1853            .once()
1854            .in_sequence(&mut sequence)
1855            .withf(move |peer, data| {
1856                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1857                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1858            })
1859            .returning(move |_, data| {
1860                let alice_mgr_clone = alice_mgr_clone.clone();
1861
1862                Box::pin(async move {
1863                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1864                    Ok(())
1865                })
1866            });
1867
1868        // Start Alice
1869        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1870        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
1871
1872        let alice_session = alice_mgr
1873            .new_session(
1874                bob_peer,
1875                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1876                SessionClientConfig {
1877                    capabilities: None.into(),
1878                    pseudonym: alice_pseudonym.into(),
1879                    surb_management: None,
1880                    ..Default::default()
1881                },
1882            )
1883            .await;
1884
1885        println!("{alice_session:?}");
1886        assert!(matches!(
1887            alice_session,
1888            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
1889        ));
1890
1891        Ok(())
1892    }
1893
1894    #[test_log::test(tokio::test)]
1895    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1896        let bob_peer: Address = (&ChainKeypair::random()).into();
1897
1898        let cfg = SessionManagerConfig {
1899            initiation_timeout_base: Duration::from_millis(100),
1900            ..Default::default()
1901        };
1902
1903        let alice_mgr = SessionManager::new(cfg);
1904        let bob_mgr = SessionManager::new(Default::default());
1905
1906        let mut sequence = mockall::Sequence::new();
1907        let mut alice_transport = MockMsgSender::new();
1908        let bob_transport = MockMsgSender::new();
1909
1910        // Alice sends the StartSession message, but Bob does not handle it
1911        alice_transport
1912            .expect_send_message()
1913            .once()
1914            .in_sequence(&mut sequence)
1915            .withf(move |peer, data| {
1916                msg_type(data, StartProtocolDiscriminants::StartSession)
1917                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1918            })
1919            .returning(|_, _| Box::pin(async { Ok(()) }));
1920
1921        // Start Alice
1922        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1923        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
1924
1925        // Start Bob
1926        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1927        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
1928
1929        let result = alice_mgr
1930            .new_session(
1931                bob_peer,
1932                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1933                SessionClientConfig {
1934                    capabilities: None.into(),
1935                    pseudonym: None,
1936                    surb_management: None,
1937                    ..Default::default()
1938                },
1939            )
1940            .await;
1941
1942        assert!(matches!(result, Err(TransportSessionError::Timeout)));
1943
1944        Ok(())
1945    }
1946
1947    #[test_log::test(tokio::test)]
1948    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
1949        let alice_pseudonym = HoprPseudonym::random();
1950        let bob_peer: Address = (&ChainKeypair::random()).into();
1951
1952        let bob_cfg = SessionManagerConfig::default();
1953        let alice_mgr = SessionManager::new(Default::default());
1954        let bob_mgr = SessionManager::new(bob_cfg.clone());
1955
1956        let mut alice_transport = MockMsgSender::new();
1957        let mut bob_transport = MockMsgSender::new();
1958
1959        // Alice sends the StartSession message
1960        let mut open_sequence = mockall::Sequence::new();
1961        let bob_mgr_clone = bob_mgr.clone();
1962        alice_transport
1963            .expect_send_message()
1964            .once()
1965            .in_sequence(&mut open_sequence)
1966            .withf(move |peer, data| {
1967                msg_type(data, StartProtocolDiscriminants::StartSession)
1968                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1969            })
1970            .returning(move |_, data| {
1971                let bob_mgr_clone = bob_mgr_clone.clone();
1972                Box::pin(async move {
1973                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1974                    Ok(())
1975                })
1976            });
1977
1978        // Bob sends the SessionEstablished message
1979        let alice_mgr_clone = alice_mgr.clone();
1980        bob_transport
1981            .expect_send_message()
1982            .once()
1983            .in_sequence(&mut open_sequence)
1984            .withf(move |peer, data| {
1985                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1986                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1987            })
1988            .returning(move |_, data| {
1989                let alice_mgr_clone = alice_mgr_clone.clone();
1990                Box::pin(async move {
1991                    alice_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
1992                    Ok(())
1993                })
1994            });
1995
1996        // Alice sends the KeepAlive messages
1997        let bob_mgr_clone = bob_mgr.clone();
1998        alice_transport
1999            .expect_send_message()
2000            .times(5..)
2001            //.in_sequence(&mut sequence)
2002            .withf(move |peer, data| {
2003                msg_type(data, StartProtocolDiscriminants::KeepAlive)
2004                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2005            })
2006            .returning(move |_, data| {
2007                let bob_mgr_clone = bob_mgr_clone.clone();
2008                Box::pin(async move {
2009                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
2010                    Ok(())
2011                })
2012            });
2013
2014        // Alice sends the terminating segment to close the Session
2015        let bob_mgr_clone = bob_mgr.clone();
2016        alice_transport
2017            .expect_send_message()
2018            .once()
2019            //.in_sequence(&mut sequence)
2020            .withf(move |peer, data| {
2021                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2022                    data.plain_text.as_ref(),
2023                )
2024                .ok()
2025                .and_then(|m| m.try_as_segment())
2026                .map(|s| s.is_terminating())
2027                .unwrap_or(false)
2028                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2029            })
2030            .returning(move |_, data| {
2031                let bob_mgr_clone = bob_mgr_clone.clone();
2032                Box::pin(async move {
2033                    bob_mgr_clone.dispatch_message(alice_pseudonym, data).await?;
2034                    Ok(())
2035                })
2036            });
2037
2038        let mut ahs = Vec::new();
2039
2040        // Start Alice
2041        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2042        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2043
2044        // Start Bob
2045        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
2046        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2047
2048        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2049
2050        let balancer_cfg = SurbBalancerConfig {
2051            target_surb_buffer_size: 10,
2052            max_surbs_per_sec: 100,
2053            ..Default::default()
2054        };
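        // A small target buffer that the balancer fills via keep-alive messages;
        // the mock above expects at least five of them.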
2055
2056        pin_mut!(new_session_rx_bob);
2057        let (alice_session, bob_session) = timeout(
2058            Duration::from_secs(2),
2059            futures::future::join(
2060                alice_mgr.new_session(
2061                    bob_peer,
2062                    SessionTarget::TcpStream(target.clone()),
2063                    SessionClientConfig {
2064                        pseudonym: alice_pseudonym.into(),
2065                        capabilities: Capability::Segmentation.into(),
2066                        surb_management: Some(balancer_cfg),
2067                        ..Default::default()
2068                    },
2069                ),
2070                new_session_rx_bob.next(),
2071            ),
2072        )
2073        .await?;
2074
2075        let mut alice_session = alice_session?;
2076        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2077
2078        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2079
2080        assert_eq!(
2081            Some(balancer_cfg),
2082            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2083        );
2084
2085        let remote_cfg = bob_mgr
2086            .get_surb_balancer_config(bob_session.session.id())
2087            .await?
2088            .ok_or(anyhow!("no remote config at bob"))?;
2089        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
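        // The Exit caps the SURB production rate so that the requested target buffer
        // lasts at least the configured minimum buffer duration.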
2090        assert_eq!(
2091            remote_cfg.max_surbs_per_sec,
2092            remote_cfg.target_surb_buffer_size
2093                / bob_cfg
2094                    .minimum_surb_buffer_duration
2095                    .max(MIN_SURB_BUFFER_DURATION)
2096                    .as_secs()
2097        );
2098
2099        // Let the SURB balancer send enough KeepAlive messages
2100        tokio::time::sleep(Duration::from_millis(1500)).await;
2101        alice_session.close().await?;
2102
2103        tokio::time::sleep(Duration::from_millis(300)).await;
2104        assert!(matches!(
2105            alice_mgr.ping_session(alice_session.id()).await,
2106            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2107        ));
2108
2109        futures::stream::iter(ahs)
2110            .for_each(|ah| async move { ah.abort() })
2111            .await;
2112
2113        Ok(())
2114    }
2115}