hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use futures::{
8    FutureExt, SinkExt, StreamExt, TryStreamExt, channel::mpsc::UnboundedSender, future::AbortHandle, pin_mut,
9};
10use futures_time::future::FutureExt as TimeExt;
11use hopr_crypto_random::Randomizable;
12use hopr_internal_types::prelude::HoprPseudonym;
13use hopr_network_types::prelude::*;
14use hopr_primitive_types::prelude::Address;
15use hopr_protocol_app::prelude::*;
16use hopr_protocol_start::{
17    KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
18};
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{
22    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
23    SurbBalancerConfig,
24    balancer::{
25        AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
26        SurbControllerWithCorrection,
27        pid::{PidBalancerController, PidControllerGains},
28        simple::SimpleBalancerController,
29    },
30    errors::{SessionManagerError, TransportSessionError},
31    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
32    utils,
33    utils::insert_into_next_slot,
34};
35
// Session-related metrics.
// They are compiled in only when the `prometheus` feature is enabled and the crate is not built for tests,
// therefore every use of these statics must be guarded by the same `cfg` condition.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
61
62fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
63    debug!(?session_id, ?reason, "closing session");
64
65    if reason != ClosureReason::EmptyRead {
66        // Closing the data sender will also cause it to close from the read side
67        session_data.session_tx.close_channel();
68        trace!(?session_id, "data tx channel closed on session");
69    }
70
71    // Terminate any additional tasks spawned by the Session
72    session_data.abort_handles.into_iter().for_each(|h| h.abort());
73
74    #[cfg(all(feature = "prometheus", not(test)))]
75    METRIC_ACTIVE_SESSIONS.decrement(1.0);
76}
77
/// Computes the maximum one-way Session initiation timeout as `base * hops`.
///
/// The multiplication saturates at [`Duration::MAX`] instead of panicking on overflow,
/// and a `hops` value that does not fit into `u32` is treated as `u32::MAX`
/// instead of being silently truncated.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
81
/// Minimum time the SURB buffer must endure if no SURBs are being produced.
///
/// Serves as the lower bound for [`SessionManagerConfig::minimum_surb_buffer_duration`].
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);

/// The first challenge value used in Start protocol to initiate a session.
///
/// Challenges generated for new outgoing Sessions are never lower than this value.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time to wait for counterparty to receive the target number of SURBs.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum timeout until an unfinished frame is discarded.
///
/// Serves as the lower bound for [`SessionManagerConfig::max_frame_timeout`].
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
93
/// Cache of pending outgoing Session initiations, keyed by the Start protocol challenge.
///
/// The value is the sender used to deliver the outcome of the initiation
/// (either the established Session or the Start protocol error).
// Needs to use an UnboundedSender instead of oneshot
// because Moka cache requires the value to be Clone, which oneshot Sender is not.
// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
99
/// Per-Session state kept in the session cache of the [`SessionManager`].
#[derive(Clone)]
struct SessionSlot {
    // Sender needs to be put in Arc, so that no clones are made by `moka`.
    // This makes sure that the entire channel closes once the one and only sender is closed.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    // Routing associated with the Session.
    routing_opts: DestinationRouting,
    // Handles of tasks spawned for the Session; all of them are aborted when the Session is closed.
    abort_handles: Vec<AbortHandle>,
    // Allows reconfiguring of the SURB balancer on-the-fly
    // Set on both Entry and Exit sides.
    surb_mgmt: Option<SurbBalancerConfig>,
    // Set only on the Exit side.
    surb_estimator: Option<AtomicSurbFlowEstimator>,
}
113
114/// Allows forward and backward propagation of SURB management configuration changes.
115struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
116
117#[async_trait::async_trait]
118impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
119    async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
120        // Intentionally using `iter().find()` instead of `get()` here,
121        // so that the popularity estimator is not hit.
122        self.0
123            .iter()
124            .find(|(sid, _)| sid.as_ref() == id)
125            .ok_or(SessionManagerError::NonExistingSession)?
126            .1
127            .surb_mgmt
128            .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
129    }
130
131    async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
132        if let moka::ops::compute::CompResult::ReplacedWith(_) = self
133            .0
134            .entry_by_ref(id)
135            .and_compute_with(|entry| {
136                futures::future::ready(match entry.map(|e| e.into_value()) {
137                    None => moka::ops::compute::Op::Nop,
138                    Some(mut updated_slot) => {
139                        if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
140                            *balancer_cfg = cfg;
141                            moka::ops::compute::Op::Put(updated_slot)
142                        } else {
143                            moka::ops::compute::Op::Nop
144                        }
145                    }
146                })
147            })
148            .await
149        {
150            Ok(())
151        } else {
152            Err(SessionManagerError::NonExistingSession.into())
153        }
154    }
155}
156
/// Indicates the result of processing a message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// Session or Start protocol message has been processed successfully.
    Processed,
    /// The message was not related to Start or Session protocol.
    ///
    /// The variant carries the unrelated message, so that the caller can process it further.
    Unrelated(ApplicationDataIn),
}
165
/// Incoming session notifier and session closure notifier
///
/// - The first element is the notifier of new incoming Sessions given to [`SessionManager::start`].
/// - The second element is the sender of Session closure notifications; its receiving end is processed by a task
///   spawned in [`SessionManager::start`].
type SessionNotifiers = (
    UnboundedSender<IncomingSession>,
    UnboundedSender<(SessionId, ClosureReason)>,
);
171
/// Configuration for the [`SessionManager`].
///
/// Some values are normalized by [`SessionManager::new`], see the individual fields for details.
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// Ranges of tags available for Sessions.
    ///
    /// **NOTE**: If the range starts lower than [`ReservedTag`]'s range end,
    /// it will be automatically transformed to start at this value
    /// (the end of the range is shifted by the same amount).
    /// This is due to the reserved range by the Start sub-protocol.
    ///
    /// Default is 16..1024.
    #[doc(hidden)]
    #[default(_code = "16u64..1024u64")]
    pub session_tag_range: Range<u64>,

    /// The maximum number of sessions (incoming and outgoing) that is allowed
    /// to be managed by the manager.
    ///
    /// When reached, creating [new sessions](SessionManager::new_session) will return
    /// the [`SessionManagerError::TooManySessions`] error, and incoming sessions will be rejected
    /// with [`StartErrorReason::NoSlotsAvailable`] Start protocol error.
    ///
    /// Default is 128, minimum is 1; maximum is given by the `session_tag_range`.
    #[default(128)]
    pub maximum_sessions: usize,

    /// The maximum chunk of data that can be written to the Session's input buffer.
    ///
    /// Values lower than [`SESSION_MTU`] are raised to [`SESSION_MTU`].
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
    ///
    /// Default is 800 ms, minimum is 10 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// The base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = initiation_timeout_base * (num_forward_hops + num_return_hops + 2)`
    ///
    /// Default is 500 milliseconds.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Timeout for Session to be closed due to inactivity.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// The sampling interval for SURB balancer.
    /// It will make SURB control decisions regularly at this interval.
    ///
    /// Default is 100 milliseconds.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial packets per second egress rate on an incoming Session.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 10 packets/second.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,
    /// Minimum period of time for which a SURB buffer at the Exit must
    /// endure if no SURBs are being received.
    ///
    /// In other words, it is the minimum period of time an Exit must withstand when
    /// no SURBs are received from the Entry at all. To do so, the egress traffic
    /// will be shaped accordingly to meet this requirement.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 5 seconds, minimum is 1 second (see [`MIN_SURB_BUFFER_DURATION`]).
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,
    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
    ///
    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
    /// so values greater than that do not make sense. This value should be ideally set equal
    /// to the size of the global transport SURB RB.
    ///
    /// Default is 10 000 SURBs.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// Determines if the `target_surb_buffer_size` can grow if the simple moving average of the
    /// current SURB buffer size estimate surpasses the given threshold over the given period.
    ///
    /// This only applies if the used [`SurbBalancerController`] has this option.
    ///
    /// The default is `Some((60 seconds, 0.20))` (20% growth if the average SURB buffer size surpasses 20%
    /// of the target buffer size in a 60-second window).
    #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
    pub growable_target_surb_buffer: Option<(Duration, f64)>,
}
270
271/// Manages lifecycles of Sessions.
272///
273/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
274/// should be called for each [`ApplicationData`] received by the node.
275/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
276/// and correct dispatch of Session-related packets to individual existing Sessions.
277///
278/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
279/// probe sessions using [`SessionManager::ping_session`]
/// and list them via [`SessionManager::active_sessions`].
281///
282/// Since the `SessionManager` operates over the HOPR protocol,
283/// the message transport `S` is required.
284/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
285///
286/// ## SURB balancing
287/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
288///
289/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
290/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
291/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
292/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
293/// service degradation.
294///
295/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
296/// *local SURB balancing* and *remote SURB balancing*.
297///
298/// ### Local SURB balancing
299/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
300/// therefore incoming to us).
301/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
302/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
303/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
304/// more SURBs over time. This might happen either organically by sending effective payloads that
305/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
306/// via *remote SURB balancing*.
307///
308/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
309/// flag during Session initiation.
310///
311/// ### Remote SURB balancing
312/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
313/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
314/// in replies.
315/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
316/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
317/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
318///
319/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
320/// sending new ones via the keep-alive messages.
321///
322/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
323///
324/// ### Possible scenarios
325/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
326/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
327/// only when both are configured (the ideal case below):
328///
329/// #### 1. Ideal local and remote SURB balancing
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration`
///    and `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
332/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
333///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
334/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
335///    Session.
/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
///    `maximum_surb_buffer_size / minimum_surb_buffer_duration` value configured at the counterparty.
338///
/// In this situation, the maximum Session egress from Exit to the Entry is given by the
/// `maximum_surb_buffer_size / minimum_surb_buffer_duration` ratio. If there is enough bandwidth,
341/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
342/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
343///
344/// #### 2. Remote SURB balancing only
345/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
346/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
347///    [`SurbBalancerConfig`]
348///
349/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
350/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
351/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
352///
353/// This configuration could potentially only lead to an equilibrium
354/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
355///
356/// #### 3. Local SURB balancing only
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration`
///    and `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
359/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
360///    Session.
361/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
362///
363/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
364/// ones which are naturally carried by the egress packets which have space to hold SURBs). It relies
365/// only on the Session egress limiting of the Exit node.
366/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
367///
368/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
369/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
370/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
371///
372/// #### 4. No SURB balancing on each side
373/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
374/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
375///
376/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
377/// takes place at the Exit.
378///
379/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
380/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
381/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
382/// at the Exit's egress.
383///
384/// ### SURB decay
385/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
386/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
387/// The Entry has no way of knowing that and assumes that everything has been delivered.
388/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
389/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
390/// fewer SURBs from its SURB estimate at the Exit.
391///
392/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
393///
394/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
395/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
396/// Exit is detected.
397///
398/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
399///
400/// ### Automatic `target_surb_buffer_size` increase
401/// This mechanism only applies to the Session recipient (Exit) and on Sessions without the
402/// [`Capability::NoRateControl`] flag set.
/// In this case, the Exit throttles the Session egress based on the ratio between
/// the estimated SURB balance and the *hint* of Entry's `target_surb_buffer_size` set during the Session initiation.
405///
406/// However, as the Entry might [increase](SessionManager::update_surb_balancer_config) the `target_surb_buffer_size`
407/// of the Session dynamically, the new value is never hinted again to the Exit (this only happens once during
408/// the Session initiation).
409/// For this reason, the Exit then might observe the ratio going higher than 1. When this happens consistently
410/// over some given time period, the Exit can decide to increase the initial hint to the newly observed value.
411///
412/// See the `growable_target_surb_buffer` field in the [`SessionManagerConfig`] for details.
pub struct SessionManager<S> {
    // Pending outgoing Session initiations, keyed by the Start protocol challenge.
    session_initiations: SessionInitiationCache,
    // Incoming Session and Session closure notifiers; set exactly once in `start`.
    session_notifiers: Arc<OnceLock<SessionNotifiers>>,
    // Slots of currently managed Sessions; entries evicted due to expiration or size get closed.
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    // The message transport; set exactly once in `start`.
    msg_sender: Arc<OnceLock<S>>,
    // Configuration, normalized in `new`.
    cfg: SessionManagerConfig,
}
420
421impl<S> Clone for SessionManager<S> {
422    fn clone(&self) -> Self {
423        Self {
424            session_initiations: self.session_initiations.clone(),
425            session_notifiers: self.session_notifiers.clone(),
426            sessions: self.sessions.clone(),
427            cfg: self.cfg.clone(),
428            msg_sender: self.msg_sender.clone(),
429        }
430    }
431}
432
433impl<S> SessionManager<S>
434where
435    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
436    S::Error: std::error::Error + Send + Sync + Clone + 'static,
437{
438    /// Creates a new instance given the [`config`](SessionManagerConfig).
439    pub fn new(mut cfg: SessionManagerConfig) -> Self {
440        let min_session_tag_range_reservation = ReservedTag::range().end;
441        debug_assert!(
442            min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
443            "invalid tag reservation range"
444        );
445
446        // Accommodate the lower bound if too low.
447        if cfg.session_tag_range.start < min_session_tag_range_reservation {
448            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
449            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
450        }
451        cfg.maximum_sessions = cfg
452            .maximum_sessions
453            .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
454
455        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
456        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
457        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
458
459        #[cfg(all(feature = "prometheus", not(test)))]
460        METRIC_ACTIVE_SESSIONS.set(0.0);
461
462        let msg_sender = Arc::new(OnceLock::new());
463        Self {
464            msg_sender: msg_sender.clone(),
465            session_initiations: moka::future::Cache::builder()
466                .max_capacity(cfg.maximum_sessions as u64)
467                .time_to_live(
468                    2 * initiation_timeout_max_one_way(
469                        cfg.initiation_timeout_base,
470                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
471                    ),
472                )
473                .build(),
474            sessions: moka::future::Cache::builder()
475                .max_capacity(cfg.maximum_sessions as u64)
476                .time_to_idle(cfg.idle_timeout)
477                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
478                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
479                        trace!(?session_id, ?reason, "session evicted from the cache");
480                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
481                    }
482                    _ => {}
483                })
484                .build(),
485            session_notifiers: Arc::new(OnceLock::new()),
486            cfg,
487        }
488    }
489
490    /// Starts the instance with the given `msg_sender` `Sink`
491    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
492    ///
493    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
494    /// [`SessionManager::dispatch_message`].
495    pub fn start(
496        &self,
497        msg_sender: S,
498        new_session_notifier: UnboundedSender<IncomingSession>,
499    ) -> crate::errors::Result<Vec<AbortHandle>> {
500        self.msg_sender
501            .set(msg_sender)
502            .map_err(|_| SessionManagerError::AlreadyStarted)?;
503
504        let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
505        self.session_notifiers
506            .set((new_session_notifier, session_close_tx))
507            .map_err(|_| SessionManagerError::AlreadyStarted)?;
508
509        let myself = self.clone();
510        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
511            None,
512            move |(session_id, closure_reason)| {
513                let myself = myself.clone();
514                async move {
515                    // These notifications come from the Sessions themselves once
516                    // an empty read is encountered, which means the closure was done by the
517                    // other party.
518                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
519                        close_session(session_id, session_data, closure_reason);
520                    } else {
521                        // Do not treat this as an error
522                        debug!(
523                            ?session_id,
524                            ?closure_reason,
525                            "could not find session id to close, maybe the session is already closed"
526                        );
527                    }
528                }
529            },
530        ));
531
532        // This is necessary to evict expired entries from the caches if
533        // no session-related operations happen at all.
534        // This ensures the dangling expired sessions are properly closed
535        // and their closure is timely notified to the other party.
536        let myself = self.clone();
537        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
538            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
539            let timeout = 2 * initiation_timeout_max_one_way(
540                myself.cfg.initiation_timeout_base,
541                RoutingOptions::MAX_INTERMEDIATE_HOPS,
542            )
543            .min(myself.cfg.idle_timeout)
544            .mul_f64(jitter)
545                / 2;
546            futures_time::stream::interval(timeout.into())
547                .for_each(|_| {
548                    trace!("executing session cache evictions");
549                    futures::future::join(
550                        myself.sessions.run_pending_tasks(),
551                        myself.session_initiations.run_pending_tasks(),
552                    )
553                    .map(|_| ())
554                })
555                .await;
556        });
557
558        Ok(vec![ah_closure_notifications, ah_session_expiration])
559    }
560
561    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
562    pub fn is_started(&self) -> bool {
563        self.session_notifiers.get().is_some()
564    }
565
566    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
567        // We currently do not support loopback Sessions on ourselves.
568        let abort_handles_clone = slot.abort_handles.clone();
569        if let moka::ops::compute::CompResult::Inserted(_) = self
570            .sessions
571            .entry(session_id)
572            .and_compute_with(|entry| {
573                futures::future::ready(if entry.is_none() {
574                    moka::ops::compute::Op::Put(slot)
575                } else {
576                    moka::ops::compute::Op::Nop
577                })
578            })
579            .await
580        {
581            #[cfg(all(feature = "prometheus", not(test)))]
582            {
583                METRIC_NUM_INITIATED_SESSIONS.increment();
584                METRIC_ACTIVE_SESSIONS.increment(1.0);
585            }
586
587            Ok(())
588        } else {
589            // Session already exists; it means it is most likely a loopback attempt
590            error!(%session_id, "session already exists - loopback attempt");
591            abort_handles_clone.into_iter().for_each(|ah| ah.abort());
592            Err(SessionManagerError::Loopback.into())
593        }
594    }
595
596    /// Initiates a new outgoing Session to `destination` with the given configuration.
597    ///
598    /// If the Session's counterparty does not respond within
599    /// the [configured](SessionManagerConfig) period,
600    /// this method returns [`TransportSessionError::Timeout`].
601    ///
602    /// It will also fail if the instance has not been [started](SessionManager::start).
603    pub async fn new_session(
604        &self,
605        destination: Address,
606        target: SessionTarget,
607        cfg: SessionClientConfig,
608    ) -> crate::errors::Result<HoprSession> {
609        self.sessions.run_pending_tasks().await;
610        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
611            return Err(SessionManagerError::TooManySessions.into());
612        }
613
614        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
615
616        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
617        let challenge = insert_into_next_slot(
618            &self.session_initiations,
619            |ch| {
620                if let Some(challenge) = ch {
621                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
622                } else {
623                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
624                }
625            },
626            tx_initiation_done,
627        )
628        .await
629        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
630
631        // Prepare the session initiation message in the Start protocol
632        trace!(challenge, ?cfg, "initiating session with config");
633        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
634            challenge,
635            target,
636            capabilities: ByteCapabilities(cfg.capabilities),
637            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
638                cfg.surb_management
639                    .map(|c| c.target_surb_buffer_size)
640                    .unwrap_or(
641                        self.cfg.initial_return_session_egress_rate as u64
642                            * self
643                                .cfg
644                                .minimum_surb_buffer_duration
645                                .max(MIN_SURB_BUFFER_DURATION)
646                                .as_secs(),
647                    )
648                    .min(u32::MAX as u64) as u32
649            } else {
650                0
651            },
652        });
653
654        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
655        let forward_routing = DestinationRouting::Forward {
656            destination,
657            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
658            forward_options: cfg.forward_path_options.clone(),
659            return_options: cfg.return_path_options.clone().into(),
660        };
661
662        // Send the Session initiation message
663        info!(challenge, %pseudonym, %destination, "new session request");
664        msg_sender
665            .send((
666                forward_routing.clone(),
667                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
668            ))
669            .await
670            .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
671
672        // The timeout is given by the number of hops requested
673        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
674            self.cfg.initiation_timeout_base,
675            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
676        )
677        .into();
678
679        // Await session establishment response from the Exit node or timeout
680        pin_mut!(rx_initiation_done);
681
682        trace!(challenge, "awaiting session establishment");
683        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
684            Ok(Ok(Some(est))) => {
685                // Session has been established, construct it
686                let session_id = est.session_id;
687                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
688
689                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
690                let notifier = self
691                    .session_notifiers
692                    .get()
693                    .map(|(_, notifier)| {
694                        let notifier = notifier.clone();
695                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
696                            let _ = notifier
697                                .unbounded_send((session_id, reason))
698                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
699                        })
700                    })
701                    .ok_or(SessionManagerError::NotStarted)?;
702
703                // NOTE: the Exit node can have different `max_surb_buffer_size`
704                // setting on the Session manager, so it does not make sense to cap it here
705                // with our maximum value.
706                if let Some(balancer_config) = cfg.surb_management {
707                    let surb_estimator = AtomicSurbFlowEstimator::default();
708
709                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
710                    let surb_estimator_clone = surb_estimator.clone();
711                    let full_surb_scoring_sender =
712                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
713                            // Count how many SURBs we sent with each packet
714                            surb_estimator_clone.produced.fetch_add(
715                                data.estimate_surbs_with_msg() as u64,
716                                std::sync::atomic::Ordering::Relaxed,
717                            );
718                            futures::future::ok::<_, S::Error>((routing, data))
719                        });
720
721                    // For standard Session data we first reduce the number of SURBs we want to produce,
722                    // unless requested to always max them out
723                    let max_out_organic_surbs = cfg.always_max_out_surbs;
724                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
725                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
726                        // so that its estimate of SURBs gets automatically updated based on
727                        // the `max_surbs_in_packets` set here.
728                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
729                            if !max_out_organic_surbs {
730                                // TODO: make this dynamic to honor the balancer target (#7439)
731                                data.packet_info
732                                    .get_or_insert_with(|| OutgoingPacketInfo {
733                                        max_surbs_in_packet: 1,
734                                        ..Default::default()
735                                    })
736                                    .max_surbs_in_packet = 1;
737                            }
738                            futures::future::ok::<_, S::Error>((routing, data))
739                        },
740                    );
741
742                    let mut abort_handles = Vec::new();
743
744                    // Spawn the SURB-bearing keep alive stream
745                    let (ka_controller, ka_abort_handle) =
746                        utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
747                    abort_handles.push(ka_abort_handle);
748
749                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
750                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
751                    let balancer = SurbBalancer::new(
752                        session_id,
753                        // The setpoint and output limit is immediately reconfigured by SurbBalancer
754                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
755                        surb_estimator.clone(),
756                        ka_controller,
757                        balancer_config,
758                    );
759
760                    let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
761                        self.cfg.balancer_sampling_interval,
762                        SessionCacheBalancerFeedback(self.sessions.clone()),
763                        None,
764                    );
765                    abort_handles.push(balancer_abort_handle);
766
767                    // If the insertion fails prematurely, it will also kill all the abort handles
768                    self.insert_session_slot(
769                        session_id,
770                        SessionSlot {
771                            session_tx: Arc::new(tx),
772                            routing_opts: forward_routing.clone(),
773                            abort_handles,
774                            surb_mgmt: Some(balancer_config),
775                            surb_estimator: None,
776                        },
777                    )
778                    .await?;
779
780                    // Wait for enough SURBs to be sent to the counterparty
781                    // TODO: consider making this interactive = other party reports the exact level periodically
782                    match level_stream
783                        .skip_while(|current_level| {
784                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
785                        })
786                        .next()
787                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
788                        .await
789                    {
790                        Ok(Some(surb_level)) => {
791                            info!(%session_id, surb_level, "session is ready");
792                        }
793                        Ok(None) => {
794                            return Err(
795                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
796                            );
797                        }
798                        Err(_) => {
799                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
800                        }
801                    }
802
803                    HoprSession::new(
804                        session_id,
805                        forward_routing,
806                        HoprSessionConfig {
807                            capabilities: cfg.capabilities,
808                            frame_mtu: self.cfg.frame_mtu,
809                            frame_timeout: self.cfg.max_frame_timeout,
810                        },
811                        (
812                            reduced_surb_scoring_sender,
813                            rx.inspect(move |_| {
814                                // Received packets = SURB consumption estimate
815                                // The received packets always consume a single SURB.
816                                surb_estimator
817                                    .consumed
818                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
819                            }),
820                        ),
821                        Some(notifier),
822                    )
823                } else {
824                    warn!(%session_id, "session ready without SURB balancing");
825
826                    self.insert_session_slot(
827                        session_id,
828                        SessionSlot {
829                            session_tx: Arc::new(tx),
830                            routing_opts: forward_routing.clone(),
831                            abort_handles: vec![],
832                            surb_mgmt: None,
833                            surb_estimator: None,
834                        },
835                    )
836                    .await?;
837
838                    // For standard Session data we first reduce the number of SURBs we want to produce,
839                    // unless requested to always max them out
840                    let max_out_organic_surbs = cfg.always_max_out_surbs;
841                    let reduced_surb_sender =
842                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
843                            if !max_out_organic_surbs {
844                                data.packet_info
845                                    .get_or_insert_with(|| OutgoingPacketInfo {
846                                        max_surbs_in_packet: 1,
847                                        ..Default::default()
848                                    })
849                                    .max_surbs_in_packet = 1;
850                            }
851                            futures::future::ok::<_, S::Error>((routing, data))
852                        });
853
854                    HoprSession::new(
855                        session_id,
856                        forward_routing,
857                        HoprSessionConfig {
858                            capabilities: cfg.capabilities,
859                            frame_mtu: self.cfg.frame_mtu,
860                            frame_timeout: self.cfg.max_frame_timeout,
861                        },
862                        (reduced_surb_sender, rx),
863                        Some(notifier),
864                    )
865                }
866            }
867            Ok(Ok(None)) => Err(SessionManagerError::Other(
868                "internal error: sender has been closed without completing the session establishment".into(),
869            )
870            .into()),
871            Ok(Err(e)) => {
872                // The other side did not allow us to establish a session
873                error!(
874                    challenge = e.challenge,
875                    error = ?e,
876                    "the other party rejected the session initiation with error"
877                );
878                Err(TransportSessionError::Rejected(e.reason))
879            }
880            Err(_) => {
881                // Timeout waiting for a session establishment
882                error!(challenge, "session initiation attempt timed out");
883
884                #[cfg(all(feature = "prometheus", not(test)))]
885                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
886
887                Err(TransportSessionError::Timeout)
888            }
889        }
890    }
891
892    /// Sends a keep-alive packet with the given [`SessionId`].
893    ///
894    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
895    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
896        if let Some(session_data) = self.sessions.get(id).await {
897            trace!(session_id = ?id, "pinging manually session");
898            Ok(self
899                .msg_sender
900                .get()
901                .cloned()
902                .ok_or(SessionManagerError::NotStarted)?
903                .send((
904                    session_data.routing_opts.clone(),
905                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
906                ))
907                .await
908                .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
909        } else {
910            Err(SessionManagerError::NonExistingSession.into())
911        }
912    }
913
914    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
915    pub async fn active_sessions(&self) -> Vec<SessionId> {
916        self.sessions.run_pending_tasks().await;
917        self.sessions.iter().map(|(k, _)| *k).collect()
918    }
919
920    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
921    ///
922    /// Returns an error if the Session with the given `id` does not exist, or
923    /// if it does not use SURB balancing.
924    pub async fn update_surb_balancer_config(
925        &self,
926        id: &SessionId,
927        config: SurbBalancerConfig,
928    ) -> crate::errors::Result<()> {
929        match self
930            .sessions
931            .entry_by_ref(id)
932            .and_compute_with(|entry| {
933                futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
934                    // Only update the config if there already was one before
935                    if cached_session.surb_mgmt.is_some() {
936                        cached_session.surb_mgmt = Some(config);
937                        moka::ops::compute::Op::Put(cached_session)
938                    } else {
939                        moka::ops::compute::Op::Nop
940                    }
941                } else {
942                    moka::ops::compute::Op::Nop
943                })
944            })
945            .await
946        {
947            moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
948            moka::ops::compute::CompResult::Unchanged(_) => {
949                Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
950            }
951            _ => Err(SessionManagerError::NonExistingSession.into()),
952        }
953    }
954
955    /// Retrieves the configuration of SURB balancing for the given Session.
956    ///
957    /// Returns an error if the Session with the given `id` does not exist.
958    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
959        match self.sessions.get(id).await {
960            Some(session) => Ok(session.surb_mgmt),
961            None => Err(SessionManagerError::NonExistingSession.into()),
962        }
963    }
964
965    /// The main method to be called whenever data are received.
966    ///
967    /// It tries to recognize the message and correctly dispatches either
968    /// the Session protocol or Start protocol messages.
969    ///
970    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
971    pub async fn dispatch_message(
972        &self,
973        pseudonym: HoprPseudonym,
974        in_data: ApplicationDataIn,
975    ) -> crate::errors::Result<DispatchResult> {
976        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
977            // This is a Start protocol message, so we handle it
978            trace!("dispatching Start protocol message");
979            return self
980                .handle_start_protocol_message(pseudonym, in_data)
981                .await
982                .map(|_| DispatchResult::Processed);
983        } else if self
984            .cfg
985            .session_tag_range
986            .contains(&in_data.data.application_tag.as_u64())
987        {
988            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
989
990            return if let Some(session_data) = self.sessions.get(&session_id).await {
991                trace!(?session_id, "received data for a registered session");
992
993                Ok(session_data
994                    .session_tx
995                    .unbounded_send(in_data)
996                    .map(|_| DispatchResult::Processed)
997                    .map_err(|e| SessionManagerError::Other(e.to_string()))?)
998            } else {
999                error!(%session_id, "received data from an unestablished session");
1000                Err(TransportSessionError::UnknownData)
1001            };
1002        }
1003
1004        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1005        Ok(DispatchResult::Unrelated(in_data))
1006    }
1007
    /// Handles a `StartSession` request of the Start protocol received from the peer
    /// identified by `pseudonym`.
    ///
    /// The method first tries to reserve a free Session slot. On success, it creates the
    /// incoming [`HoprSession`] (with SURB-based egress rate control, unless the request carries
    /// [`Capability::NoRateControl`]), passes it to the new-session notifier and replies with
    /// a `SessionEstablished` message. If no slot could be reserved, it replies with
    /// a `SessionError` message carrying [`StartErrorReason::NoSlotsAvailable`] instead.
    ///
    /// Both replies are sent using `DestinationRouting::Return` built from `pseudonym`.
    ///
    /// # Errors
    ///
    /// Returns [`SessionManagerError::NotStarted`] if the message sender or the Session notifiers
    /// have not been set. Errors from creating the Session, from encoding or sending the reply,
    /// and from an inconsistent Session cache are returned as well.
    /// Failing to reserve a slot is only reported to the counterparty; the method itself
    /// still returns `Ok(())` in that case.
1008    async fn handle_incoming_session_initiation(
1009        &self,
1010        pseudonym: HoprPseudonym,
1011        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1012    ) -> crate::errors::Result<()> {
1013        trace!(challenge = session_req.challenge, "received session initiation request");
1014
1015        debug!(%pseudonym, "got new session request, searching for a free session slot");
1016
        // A missing message sender or missing notifiers are both reported as `NotStarted`
1017        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1018
1019        let (new_session_notifier, close_session_notifier) = self
1020            .session_notifiers
1021            .get()
1022            .cloned()
1023            .ok_or(SessionManagerError::NotStarted)?;
1024
1025        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1026        let reply_routing = DestinationRouting::Return(pseudonym.into());
1027
        // Channel delivering the incoming data to the Session: the sending half is stored in
        // the Session slot, the receiving half is passed to the Session itself
1028        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1029
1030        // Search for a free Session ID slot
1031        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1032        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1033            insert_into_next_slot(
1034                &self.sessions,
1035                |sid| {
1036                    // NOTE: It is allowed to insert sessions using the same tag
1037                    // but different pseudonyms because the SessionId is different.
                    // With a given ID, take the tag following it (wrapped by the range end and
                    // clamped to the range start); otherwise pick a random tag from the range.
                    // NOTE(review): presumably `sid` is the previously tried ID - confirm
                    // against `insert_into_next_slot`.
1038                    let next_tag: Tag = match sid {
1039                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1040                            .max(self.cfg.session_tag_range.start)
1041                            .into(),
1042                        None => hopr_crypto_random::random_integer(
1043                            self.cfg.session_tag_range.start,
1044                            Some(self.cfg.session_tag_range.end),
1045                        )
1046                        .into(),
1047                    };
1048                    SessionId::new(next_tag, pseudonym)
1049                },
                // The slot starts without abort handles and SURB management;
                // these are filled in below if rate control is used
1050                SessionSlot {
1051                    session_tx: Arc::new(tx_session_data),
1052                    routing_opts: reply_routing.clone(),
1053                    abort_handles: vec![],
1054                    surb_mgmt: None,
1055                    surb_estimator: None,
1056                },
1057            )
1058            .await
1059        } else {
1060            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1061            None
1062        };
1063
1064        if let Some(session_id) = allocated_slot {
1065            debug!(%session_id, ?session_req, "assigned a new session");
1066
            // Forwards the Session closure to the closure notification channel;
            // a failure to do so is only logged
1067            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1068                if let Err(error) = close_session_notifier.unbounded_send((session_id, reason)) {
1069                    error!(%session_id, %error, %reason, "failed to notify session closure");
1070                }
1071            });
1072
1073            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1074                let surb_estimator = AtomicSurbFlowEstimator::default();
1075
1076                // Because of SURB scarcity, control the egress rate of incoming sessions
1077                let egress_rate_control =
1078                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1079
1080                // The Session request carries a "hint" as additional data telling what
1081                // the Session initiator has configured as its target buffer size in the Balancer.
                // The hint is capped by our `maximum_surb_buffer_size`; without a hint, the size is
                // derived from the initial egress rate and the minimum SURB buffer duration.
1082                let target_surb_buffer_size = if session_req.additional_data > 0 {
1083                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1084                } else {
1085                    self.cfg.initial_return_session_egress_rate as u64
1086                        * self
1087                            .cfg
1088                            .minimum_surb_buffer_duration
1089                            .max(MIN_SURB_BUFFER_DURATION)
1090                            .as_secs()
1091                };
1092
                // NOTE(review): both `move` closures below reference `surb_estimator_clone`, each
                // a different field (`consumed` / `produced`); this presumably relies on
                // per-field closure captures - confirm before refactoring.
1093                let surb_estimator_clone = surb_estimator.clone();
1094                let session = HoprSession::new(
1095                    session_id,
1096                    reply_routing.clone(),
1097                    HoprSessionConfig {
1098                        capabilities: session_req.capabilities.into(),
1099                        frame_mtu: self.cfg.frame_mtu,
1100                        frame_timeout: self.cfg.max_frame_timeout,
1101                    },
1102                    (
1103                        // Sent packets = SURB consumption estimate
1104                        msg_sender
1105                            .clone()
1106                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1107                                // Each outgoing packet consumes one SURB
1108                                surb_estimator_clone
1109                                    .consumed
1110                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1111                                futures::future::ok::<_, S::Error>((routing, data))
1112                            })
                            // The egress is rate-limited by the controller that is handed over
                            // to the SURB balancer below, and buffered up to twice the target size
1113                            .rate_limit_with_controller(&egress_rate_control)
1114                            .buffer((2 * target_surb_buffer_size) as usize),
1115                        // Received packets = SURB retrieval estimate
1116                        rx_session_data.inspect(move |data| {
1117                            // Count the number of SURBs delivered with each incoming packet
1118                            surb_estimator_clone
1119                                .produced
1120                                .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1121                        }),
1122                    ),
1123                    Some(closure_notifier),
1124                )?;
1125
1126                // The SURB balancer will start intervening by rate-limiting the
1127                // egress of the Session, once the estimated number of SURBs drops below
1128                // the target defined here. Otherwise, the maximum egress is allowed.
1129                let balancer_config = SurbBalancerConfig {
1130                    target_surb_buffer_size,
1131                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1132                    max_surbs_per_sec: target_surb_buffer_size
1133                        / self
1134                            .cfg
1135                            .minimum_surb_buffer_duration
1136                            .max(MIN_SURB_BUFFER_DURATION)
1137                            .as_secs(),
1138                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1139                    // were received
1140                    surb_decay: None,
1141                };
1142
1143                // Assign the SURB balancer and abort handles to the already allocated Session slot
1144                let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1145                if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1146                    .sessions
1147                    .entry(session_id)
1148                    .and_compute_with(|entry| {
                        // Update the slot only if it is still present in the cache
1149                        if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1150                            cached_session.abort_handles.push(balancer_abort_handle);
1151                            cached_session.surb_mgmt = Some(balancer_config);
1152                            cached_session.surb_estimator = Some(surb_estimator.clone());
1153                            futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1154                        } else {
1155                            futures::future::ready(moka::ops::compute::Op::Nop)
1156                        }
1157                    })
1158                    .await
1159                {
1160                    // Spawn the SURB balancer only once we know we have registered the
1161                    // abort handle with the pre-allocated Session slot
1162                    debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1163                    let balancer = SurbBalancer::new(
1164                        session_id,
1165                        if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1166                            // Allow automatic setpoint increase
                            // The growth window is converted to a number of sampling intervals (at least 1)
1167                            SimpleBalancerController::with_increasing_setpoint(
1168                                *ratio_threshold,
1169                                (growth_window
1170                                    .div_duration_f64(self.cfg.balancer_sampling_interval)
1171                                    .round() as usize)
1172                                    .max(1),
1173                            )
1174                        } else {
1175                            SimpleBalancerController::default()
1176                        },
1177                        surb_estimator,
1178                        SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1179                        balancer_config,
1180                    );
1181
                    // The return value is not needed here: the abort handle paired with
                    // `balancer_abort_reg` has already been stored in the Session slot above
1182                    let _ = balancer.start_control_loop(
1183                        self.cfg.balancer_sampling_interval,
1184                        SessionCacheBalancerFeedback(self.sessions.clone()),
1185                        Some(balancer_abort_reg),
1186                    );
1187                } else {
1188                    // This should never happen, but be sure we handle this error
1189                    return Err(SessionManagerError::Other(
1190                        "failed to spawn SURB balancer - inconsistent cache".into(),
1191                    )
1192                    .into());
1193                }
1194
1195                session
1196            } else {
                // No rate control requested: the Session uses the plain message sender
                // and the raw incoming data stream, without any SURB accounting
1197                HoprSession::new(
1198                    session_id,
1199                    reply_routing.clone(),
1200                    HoprSessionConfig {
1201                        capabilities: session_req.capabilities.into(),
1202                        frame_mtu: self.cfg.frame_mtu,
1203                        frame_timeout: self.cfg.max_frame_timeout,
1204                    },
1205                    (msg_sender.clone(), rx_session_data),
1206                    Some(closure_notifier),
1207                )?
1208            };
1209
1210            // Extract useful information about the session from the Start protocol message
1211            let incoming_session = IncomingSession {
1212                session,
1213                target: session_req.target,
1214            };
1215
1216            // Notify that a new incoming session has been created
            // NOTE(review): a failed notification is only logged; the establishment reply
            // below is sent regardless - confirm this is intended
1217            if let Err(error) = new_session_notifier.unbounded_send(incoming_session) {
1218                warn!(%error, "failed to send session to incoming session queue");
1219            }
1220
1221            trace!(?session_id, "session notification sent");
1222
1223            // Notify the sender that the session has been established.
1224            // Set our peer ID in the session ID sent back to them.
1225            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1226                orig_challenge: session_req.challenge,
1227                session_id,
1228            });
1229
1230            msg_sender
1231                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1232                .await
1233                .map_err(|e| {
1234                    SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
1235                })?;
1236
1237            info!(%session_id, "new session established");
1238
            // The metrics are updated only after the establishment reply has been sent successfully
1239            #[cfg(all(feature = "prometheus", not(test)))]
1240            {
1241                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1242                METRIC_ACTIVE_SESSIONS.increment(1.0);
1243            }
1244        } else {
1245            error!(%pseudonym,"failed to reserve a new session slot");
1246
1247            // Notify the sender that the session could not be established
1248            let reason = StartErrorReason::NoSlotsAvailable;
1249            let data = HoprStartProtocol::SessionError(StartErrorType {
1250                challenge: session_req.challenge,
1251                reason,
1252            });
1253
1254            msg_sender
1255                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1256                .await
1257                .map_err(|e| {
1258                    SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1259                })?;
1260
1261            trace!(%pseudonym, "session establishment failure message sent");
1262
1263            #[cfg(all(feature = "prometheus", not(test)))]
1264            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1265        }
1266
        // The rejection has been reported to the counterparty, so this is not an error for the caller
1267        Ok(())
1268    }
1269
1270    async fn handle_start_protocol_message(
1271        &self,
1272        pseudonym: HoprPseudonym,
1273        data: ApplicationDataIn,
1274    ) -> crate::errors::Result<()> {
1275        match HoprStartProtocol::try_from(data.data)? {
1276            HoprStartProtocol::StartSession(session_req) => {
1277                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1278            }
1279            HoprStartProtocol::SessionEstablished(est) => {
1280                trace!(
1281                    session_id = ?est.session_id,
1282                    "received session establishment confirmation"
1283                );
1284                let challenge = est.orig_challenge;
1285                let session_id = est.session_id;
1286                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1287                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1288                        return Err(SessionManagerError::Other(format!(
1289                            "could not notify session {session_id} establishment: {e}"
1290                        ))
1291                        .into());
1292                    }
1293                    debug!(?session_id, challenge, "session establishment complete");
1294                } else {
1295                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
1296                }
1297            }
1298            HoprStartProtocol::SessionError(error) => {
1299                trace!(
1300                    challenge = error.challenge,
1301                    error = ?error.reason,
1302                    "failed to initialize a session",
1303                );
1304                // Currently, we do not distinguish between individual error types
1305                // and just discard the initiation attempt and pass on the error.
1306                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1307                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
1308                        return Err(SessionManagerError::Other(format!(
1309                            "could not notify session establishment error {error:?}: {e}"
1310                        ))
1311                        .into());
1312                    }
1313                    error!(
1314                        challenge = error.challenge,
1315                        ?error,
1316                        "session establishment error received"
1317                    );
1318                } else {
1319                    error!(
1320                        challenge = error.challenge,
1321                        ?error,
1322                        "session establishment attempt expired before error could be delivered"
1323                    );
1324                }
1325
1326                #[cfg(all(feature = "prometheus", not(test)))]
1327                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1328            }
1329            HoprStartProtocol::KeepAlive(msg) => {
1330                let session_id = msg.session_id;
1331                if let Some(session_slot) = self.sessions.get(&session_id).await {
1332                    trace!(?session_id, "received keep-alive request");
1333                    // If the session has a SURB flow estimator, increase the number of received SURBs.
1334                    // This applies to the incoming sessions currently
1335                    if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1336                        // Two SURBs per Keep-Alive message
1337                        estimator.produced.fetch_add(
1338                            KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1339                            std::sync::atomic::Ordering::Relaxed,
1340                        );
1341                    }
1342                } else {
1343                    debug!(%session_id, "received keep-alive request for an unknown session");
1344                }
1345            }
1346        }
1347
1348        Ok(())
1349    }
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354    use anyhow::anyhow;
1355    use futures::{AsyncWriteExt, future::BoxFuture};
1356    use hopr_crypto_random::Randomizable;
1357    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1358    use hopr_primitive_types::prelude::Address;
1359    use hopr_protocol_start::StartProtocolDiscriminants;
1360    use tokio::time::timeout;
1361
1362    use super::*;
1363    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1364
1365    #[async_trait::async_trait]
1366    trait SendMsg {
1367        async fn send_message(
1368            &self,
1369            routing: DestinationRouting,
1370            data: ApplicationDataOut,
1371        ) -> crate::errors::Result<()>;
1372    }
1373
1374    mockall::mock! {
1375        MsgSender {}
1376        impl SendMsg for MsgSender {
1377            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1378            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1379        }
1380    }
1381
1382    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1383        let (tx, rx) = futures::channel::mpsc::unbounded();
1384        tokio::task::spawn(async move {
1385            pin_mut!(rx);
1386            while let Some((routing, data)) = rx.next().await {
1387                sender
1388                    .send_message(routing, data)
1389                    .await
1390                    .expect("send message must not fail in mock");
1391            }
1392        });
1393        tx
1394    }
1395
1396    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1397        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1398            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1399            .unwrap_or(false)
1400    }
1401
1402    #[test_log::test(tokio::test)]
1403    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1404    {
1405        let alice_pseudonym = HoprPseudonym::random();
1406        let bob_peer: Address = (&ChainKeypair::random()).into();
1407
1408        let alice_mgr = SessionManager::new(Default::default());
1409        let bob_mgr = SessionManager::new(Default::default());
1410
1411        let mut sequence = mockall::Sequence::new();
1412        let mut alice_transport = MockMsgSender::new();
1413        let mut bob_transport = MockMsgSender::new();
1414
1415        // Alice sends the StartSession message
1416        let bob_mgr_clone = bob_mgr.clone();
1417        alice_transport
1418            .expect_send_message()
1419            .once()
1420            .in_sequence(&mut sequence)
1421            .withf(move |peer, data| {
1422                info!("alice sends {}", data.data.application_tag);
1423                msg_type(data, StartProtocolDiscriminants::StartSession)
1424                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1425            })
1426            .returning(move |_, data| {
1427                let bob_mgr_clone = bob_mgr_clone.clone();
1428                Box::pin(async move {
1429                    bob_mgr_clone
1430                        .dispatch_message(
1431                            alice_pseudonym,
1432                            ApplicationDataIn {
1433                                data: data.data,
1434                                packet_info: Default::default(),
1435                            },
1436                        )
1437                        .await?;
1438                    Ok(())
1439                })
1440            });
1441
1442        // Bob sends the SessionEstablished message
1443        let alice_mgr_clone = alice_mgr.clone();
1444        bob_transport
1445            .expect_send_message()
1446            .once()
1447            .in_sequence(&mut sequence)
1448            .withf(move |peer, data| {
1449                info!("bob sends {}", data.data.application_tag);
1450                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1451                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1452            })
1453            .returning(move |_, data| {
1454                let alice_mgr_clone = alice_mgr_clone.clone();
1455
1456                Box::pin(async move {
1457                    alice_mgr_clone
1458                        .dispatch_message(
1459                            alice_pseudonym,
1460                            ApplicationDataIn {
1461                                data: data.data,
1462                                packet_info: Default::default(),
1463                            },
1464                        )
1465                        .await?;
1466                    Ok(())
1467                })
1468            });
1469
1470        // Alice sends the terminating segment to close the Session
1471        let bob_mgr_clone = bob_mgr.clone();
1472        alice_transport
1473            .expect_send_message()
1474            .once()
1475            .in_sequence(&mut sequence)
1476            .withf(move |peer, data| {
1477                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1478                    data.data.plain_text.as_ref(),
1479                )
1480                .expect("must be a session message")
1481                .try_as_segment()
1482                .expect("must be a segment")
1483                .is_terminating()
1484                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1485            })
1486            .returning(move |_, data| {
1487                let bob_mgr_clone = bob_mgr_clone.clone();
1488                Box::pin(async move {
1489                    bob_mgr_clone
1490                        .dispatch_message(
1491                            alice_pseudonym,
1492                            ApplicationDataIn {
1493                                data: data.data,
1494                                packet_info: Default::default(),
1495                            },
1496                        )
1497                        .await?;
1498                    Ok(())
1499                })
1500            });
1501
1502        let mut ahs = Vec::new();
1503
1504        // Start Alice
1505        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1506        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1507
1508        // Start Bob
1509        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1510        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1511
1512        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1513
1514        pin_mut!(new_session_rx_bob);
1515        let (alice_session, bob_session) = timeout(
1516            Duration::from_secs(2),
1517            futures::future::join(
1518                alice_mgr.new_session(
1519                    bob_peer,
1520                    SessionTarget::TcpStream(target.clone()),
1521                    SessionClientConfig {
1522                        pseudonym: alice_pseudonym.into(),
1523                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1524                        surb_management: None,
1525                        ..Default::default()
1526                    },
1527                ),
1528                new_session_rx_bob.next(),
1529            ),
1530        )
1531        .await?;
1532
1533        let mut alice_session = alice_session?;
1534        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1535
1536        assert_eq!(
1537            alice_session.config().capabilities,
1538            Capability::Segmentation | Capability::NoRateControl
1539        );
1540        assert_eq!(
1541            alice_session.config().capabilities,
1542            bob_session.session.config().capabilities
1543        );
1544        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1545
1546        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1547        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1548        assert!(
1549            alice_mgr
1550                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1551                .await
1552                .is_err()
1553        );
1554
1555        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1556        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1557        assert!(
1558            bob_mgr
1559                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1560                .await
1561                .is_err()
1562        );
1563
1564        tokio::time::sleep(Duration::from_millis(100)).await;
1565        alice_session.close().await?;
1566
1567        tokio::time::sleep(Duration::from_millis(100)).await;
1568
1569        assert!(matches!(
1570            alice_mgr.ping_session(alice_session.id()).await,
1571            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1572        ));
1573
1574        futures::stream::iter(ahs)
1575            .for_each(|ah| async move { ah.abort() })
1576            .await;
1577
1578        Ok(())
1579    }
1580
1581    #[test_log::test(tokio::test)]
1582    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1583        let alice_pseudonym = HoprPseudonym::random();
1584        let bob_peer: Address = (&ChainKeypair::random()).into();
1585
1586        let cfg = SessionManagerConfig {
1587            idle_timeout: Duration::from_millis(200),
1588            ..Default::default()
1589        };
1590
1591        let alice_mgr = SessionManager::new(cfg);
1592        let bob_mgr = SessionManager::new(Default::default());
1593
1594        let mut sequence = mockall::Sequence::new();
1595        let mut alice_transport = MockMsgSender::new();
1596        let mut bob_transport = MockMsgSender::new();
1597
1598        // Alice sends the StartSession message
1599        let bob_mgr_clone = bob_mgr.clone();
1600        alice_transport
1601            .expect_send_message()
1602            .once()
1603            .in_sequence(&mut sequence)
1604            .withf(move |peer, data| {
1605                msg_type(data, StartProtocolDiscriminants::StartSession)
1606                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1607            })
1608            .returning(move |_, data| {
1609                let bob_mgr_clone = bob_mgr_clone.clone();
1610                Box::pin(async move {
1611                    bob_mgr_clone
1612                        .dispatch_message(
1613                            alice_pseudonym,
1614                            ApplicationDataIn {
1615                                data: data.data,
1616                                packet_info: Default::default(),
1617                            },
1618                        )
1619                        .await?;
1620                    Ok(())
1621                })
1622            });
1623
1624        // Bob sends the SessionEstablished message
1625        let alice_mgr_clone = alice_mgr.clone();
1626        bob_transport
1627            .expect_send_message()
1628            .once()
1629            .in_sequence(&mut sequence)
1630            .withf(move |peer, data| {
1631                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1632                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1633            })
1634            .returning(move |_, data| {
1635                let alice_mgr_clone = alice_mgr_clone.clone();
1636
1637                Box::pin(async move {
1638                    alice_mgr_clone
1639                        .dispatch_message(
1640                            alice_pseudonym,
1641                            ApplicationDataIn {
1642                                data: data.data,
1643                                packet_info: Default::default(),
1644                            },
1645                        )
1646                        .await?;
1647                    Ok(())
1648                })
1649            });
1650
1651        let mut ahs = Vec::new();
1652
1653        // Start Alice
1654        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1655        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1656
1657        // Start Bob
1658        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
1659        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1660
1661        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1662
1663        pin_mut!(new_session_rx_bob);
1664        let (alice_session, bob_session) = timeout(
1665            Duration::from_secs(2),
1666            futures::future::join(
1667                alice_mgr.new_session(
1668                    bob_peer,
1669                    SessionTarget::TcpStream(target.clone()),
1670                    SessionClientConfig {
1671                        pseudonym: alice_pseudonym.into(),
1672                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1673                        surb_management: None,
1674                        ..Default::default()
1675                    },
1676                ),
1677                new_session_rx_bob.next(),
1678            ),
1679        )
1680        .await?;
1681
1682        let alice_session = alice_session?;
1683        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1684
1685        assert_eq!(
1686            alice_session.config().capabilities,
1687            Capability::Segmentation | Capability::NoRateControl,
1688        );
1689        assert_eq!(
1690            alice_session.config().capabilities,
1691            bob_session.session.config().capabilities
1692        );
1693        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1694
1695        // Let the session timeout at Alice
1696        tokio::time::sleep(Duration::from_millis(300)).await;
1697
1698        assert!(matches!(
1699            alice_mgr.ping_session(alice_session.id()).await,
1700            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1701        ));
1702
1703        futures::stream::iter(ahs)
1704            .for_each(|ah| async move { ah.abort() })
1705            .await;
1706
1707        Ok(())
1708    }
1709
1710    #[test_log::test(tokio::test)]
1711    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1712        let alice_pseudonym = HoprPseudonym::random();
1713        let session_id = SessionId::new(16u64, alice_pseudonym);
1714        let balancer_cfg = SurbBalancerConfig {
1715            target_surb_buffer_size: 1000,
1716            max_surbs_per_sec: 100,
1717            ..Default::default()
1718        };
1719
1720        let alice_mgr =
1721            SessionManager::<UnboundedSender<(DestinationRouting, ApplicationDataOut)>>::new(Default::default());
1722
1723        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1724        alice_mgr
1725            .sessions
1726            .insert(
1727                session_id,
1728                SessionSlot {
1729                    session_tx: Arc::new(dummy_tx),
1730                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1731                    abort_handles: Vec::new(),
1732                    surb_mgmt: Some(balancer_cfg.clone()),
1733                    surb_estimator: None,
1734                },
1735            )
1736            .await;
1737
1738        let actual_cfg = alice_mgr
1739            .get_surb_balancer_config(&session_id)
1740            .await?
1741            .ok_or(anyhow!("session must have a surb balancer config"))?;
1742        assert_eq!(actual_cfg, balancer_cfg);
1743
1744        let new_cfg = SurbBalancerConfig {
1745            target_surb_buffer_size: 2000,
1746            max_surbs_per_sec: 200,
1747            ..Default::default()
1748        };
1749        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1750
1751        let actual_cfg = alice_mgr
1752            .get_surb_balancer_config(&session_id)
1753            .await?
1754            .ok_or(anyhow!("session must have a surb balancer config"))?;
1755        assert_eq!(actual_cfg, new_cfg);
1756
1757        Ok(())
1758    }
1759
1760    #[test_log::test(tokio::test)]
1761    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1762        let alice_pseudonym = HoprPseudonym::random();
1763        let bob_peer: Address = (&ChainKeypair::random()).into();
1764
1765        let cfg = SessionManagerConfig {
1766            session_tag_range: 16u64..17u64, // Slot for exactly one session
1767            ..Default::default()
1768        };
1769
1770        let alice_mgr = SessionManager::new(Default::default());
1771        let bob_mgr = SessionManager::new(cfg);
1772
1773        // Occupy the only free slot with tag 16
1774        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1775        bob_mgr
1776            .sessions
1777            .insert(
1778                SessionId::new(16u64, alice_pseudonym),
1779                SessionSlot {
1780                    session_tx: Arc::new(dummy_tx),
1781                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1782                    abort_handles: Vec::new(),
1783                    surb_mgmt: None,
1784                    surb_estimator: None,
1785                },
1786            )
1787            .await;
1788
1789        let mut sequence = mockall::Sequence::new();
1790        let mut alice_transport = MockMsgSender::new();
1791        let mut bob_transport = MockMsgSender::new();
1792
1793        // Alice sends the StartSession message
1794        let bob_mgr_clone = bob_mgr.clone();
1795        alice_transport
1796            .expect_send_message()
1797            .once()
1798            .in_sequence(&mut sequence)
1799            .withf(move |peer, data| {
1800                msg_type(data, StartProtocolDiscriminants::StartSession)
1801                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1802            })
1803            .returning(move |_, data| {
1804                let bob_mgr_clone = bob_mgr_clone.clone();
1805                Box::pin(async move {
1806                    bob_mgr_clone
1807                        .dispatch_message(
1808                            alice_pseudonym,
1809                            ApplicationDataIn {
1810                                data: data.data,
1811                                packet_info: Default::default(),
1812                            },
1813                        )
1814                        .await?;
1815                    Ok(())
1816                })
1817            });
1818
1819        // Bob sends the SessionError message
1820        let alice_mgr_clone = alice_mgr.clone();
1821        bob_transport
1822            .expect_send_message()
1823            .once()
1824            .in_sequence(&mut sequence)
1825            .withf(move |peer, data| {
1826                msg_type(data, StartProtocolDiscriminants::SessionError)
1827                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1828            })
1829            .returning(move |_, data| {
1830                let alice_mgr_clone = alice_mgr_clone.clone();
1831                Box::pin(async move {
1832                    alice_mgr_clone
1833                        .dispatch_message(
1834                            alice_pseudonym,
1835                            ApplicationDataIn {
1836                                data: data.data,
1837                                packet_info: Default::default(),
1838                            },
1839                        )
1840                        .await?;
1841                    Ok(())
1842                })
1843            });
1844
1845        let mut jhs = Vec::new();
1846
1847        // Start Alice
1848        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1849        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1850
1851        // Start Bob
1852        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1853        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1854
1855        let result = alice_mgr
1856            .new_session(
1857                bob_peer,
1858                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1859                SessionClientConfig {
1860                    capabilities: Capabilities::empty(),
1861                    pseudonym: alice_pseudonym.into(),
1862                    surb_management: None,
1863                    ..Default::default()
1864                },
1865            )
1866            .await;
1867
1868        assert!(
1869            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1870        );
1871
1872        Ok(())
1873    }
1874
1875    #[test_log::test(tokio::test)]
1876    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1877    -> anyhow::Result<()> {
1878        let alice_pseudonym = HoprPseudonym::random();
1879        let bob_peer: Address = (&ChainKeypair::random()).into();
1880
1881        let cfg = SessionManagerConfig {
1882            maximum_sessions: 1,
1883            ..Default::default()
1884        };
1885
1886        let alice_mgr = SessionManager::new(Default::default());
1887        let bob_mgr = SessionManager::new(cfg);
1888
1889        // Occupy the only free slot with tag 16
1890        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1891        bob_mgr
1892            .sessions
1893            .insert(
1894                SessionId::new(16u64, alice_pseudonym),
1895                SessionSlot {
1896                    session_tx: Arc::new(dummy_tx),
1897                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1898                    abort_handles: Vec::new(),
1899                    surb_mgmt: None,
1900                    surb_estimator: None,
1901                },
1902            )
1903            .await;
1904
1905        let mut sequence = mockall::Sequence::new();
1906        let mut alice_transport = MockMsgSender::new();
1907        let mut bob_transport = MockMsgSender::new();
1908
1909        // Alice sends the StartSession message
1910        let bob_mgr_clone = bob_mgr.clone();
1911        alice_transport
1912            .expect_send_message()
1913            .once()
1914            .in_sequence(&mut sequence)
1915            .withf(move |peer, data| {
1916                msg_type(data, StartProtocolDiscriminants::StartSession)
1917                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
1918            })
1919            .returning(move |_, data| {
1920                let bob_mgr_clone = bob_mgr_clone.clone();
1921                Box::pin(async move {
1922                    bob_mgr_clone
1923                        .dispatch_message(
1924                            alice_pseudonym,
1925                            ApplicationDataIn {
1926                                data: data.data,
1927                                packet_info: Default::default(),
1928                            },
1929                        )
1930                        .await?;
1931                    Ok(())
1932                })
1933            });
1934
1935        // Bob sends the SessionError message
1936        let alice_mgr_clone = alice_mgr.clone();
1937        bob_transport
1938            .expect_send_message()
1939            .once()
1940            .in_sequence(&mut sequence)
1941            .withf(move |peer, data| {
1942                msg_type(data, StartProtocolDiscriminants::SessionError)
1943                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1944            })
1945            .returning(move |_, data| {
1946                let alice_mgr_clone = alice_mgr_clone.clone();
1947                Box::pin(async move {
1948                    alice_mgr_clone
1949                        .dispatch_message(
1950                            alice_pseudonym,
1951                            ApplicationDataIn {
1952                                data: data.data,
1953                                packet_info: Default::default(),
1954                            },
1955                        )
1956                        .await?;
1957                    Ok(())
1958                })
1959            });
1960
1961        let mut jhs = Vec::new();
1962
1963        // Start Alice
1964        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1965        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1966
1967        // Start Bob
1968        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1969        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1970
1971        let result = alice_mgr
1972            .new_session(
1973                bob_peer,
1974                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1975                SessionClientConfig {
1976                    capabilities: None.into(),
1977                    pseudonym: alice_pseudonym.into(),
1978                    surb_management: None,
1979                    ..Default::default()
1980                },
1981            )
1982            .await;
1983
1984        assert!(
1985            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1986        );
1987
1988        Ok(())
1989    }
1990
1991    #[test_log::test(tokio::test)]
1992    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
1993        let alice_pseudonym = HoprPseudonym::random();
1994        let bob_peer: Address = (&ChainKeypair::random()).into();
1995
1996        let alice_mgr = SessionManager::new(Default::default());
1997
1998        let mut sequence = mockall::Sequence::new();
1999        let mut alice_transport = MockMsgSender::new();
2000
2001        // Alice sends the StartSession message
2002        let alice_mgr_clone = alice_mgr.clone();
2003        alice_transport
2004            .expect_send_message()
2005            .once()
2006            .in_sequence(&mut sequence)
2007            .withf(move |peer, data| {
2008                msg_type(data, StartProtocolDiscriminants::StartSession)
2009                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2010            })
2011            .returning(move |_, data| {
2012                // But the message is again processed by Alice due to Loopback
2013                let alice_mgr_clone = alice_mgr_clone.clone();
2014                Box::pin(async move {
2015                    alice_mgr_clone
2016                        .dispatch_message(
2017                            alice_pseudonym,
2018                            ApplicationDataIn {
2019                                data: data.data,
2020                                packet_info: Default::default(),
2021                            },
2022                        )
2023                        .await?;
2024                    Ok(())
2025                })
2026            });
2027
2028        // Alice sends the SessionEstablished message (as Bob)
2029        let alice_mgr_clone = alice_mgr.clone();
2030        alice_transport
2031            .expect_send_message()
2032            .once()
2033            .in_sequence(&mut sequence)
2034            .withf(move |peer, data| {
2035                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2036                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2037            })
2038            .returning(move |_, data| {
2039                let alice_mgr_clone = alice_mgr_clone.clone();
2040
2041                Box::pin(async move {
2042                    alice_mgr_clone
2043                        .dispatch_message(
2044                            alice_pseudonym,
2045                            ApplicationDataIn {
2046                                data: data.data,
2047                                packet_info: Default::default(),
2048                            },
2049                        )
2050                        .await?;
2051                    Ok(())
2052                })
2053            });
2054
2055        // Start Alice
2056        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2057        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2058
2059        let alice_session = alice_mgr
2060            .new_session(
2061                bob_peer,
2062                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2063                SessionClientConfig {
2064                    capabilities: None.into(),
2065                    pseudonym: alice_pseudonym.into(),
2066                    surb_management: None,
2067                    ..Default::default()
2068                },
2069            )
2070            .await;
2071
2072        println!("{alice_session:?}");
2073        assert!(matches!(
2074            alice_session,
2075            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2076        ));
2077
2078        Ok(())
2079    }
2080
2081    #[test_log::test(tokio::test)]
2082    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2083        let bob_peer: Address = (&ChainKeypair::random()).into();
2084
2085        let cfg = SessionManagerConfig {
2086            initiation_timeout_base: Duration::from_millis(100),
2087            ..Default::default()
2088        };
2089
2090        let alice_mgr = SessionManager::new(cfg);
2091        let bob_mgr = SessionManager::new(Default::default());
2092
2093        let mut sequence = mockall::Sequence::new();
2094        let mut alice_transport = MockMsgSender::new();
2095        let bob_transport = MockMsgSender::new();
2096
2097        // Alice sends the StartSession message, but Bob does not handle it
2098        alice_transport
2099            .expect_send_message()
2100            .once()
2101            .in_sequence(&mut sequence)
2102            .withf(move |peer, data| {
2103                msg_type(data, StartProtocolDiscriminants::StartSession)
2104                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2105            })
2106            .returning(|_, _| Box::pin(async { Ok(()) }));
2107
2108        // Start Alice
2109        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2110        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2111
2112        // Start Bob
2113        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
2114        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2115
2116        let result = alice_mgr
2117            .new_session(
2118                bob_peer,
2119                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2120                SessionClientConfig {
2121                    capabilities: None.into(),
2122                    pseudonym: None,
2123                    surb_management: None,
2124                    ..Default::default()
2125                },
2126            )
2127            .await;
2128
2129        assert!(matches!(result, Err(TransportSessionError::Timeout)));
2130
2131        Ok(())
2132    }
2133
2134    #[test_log::test(tokio::test)]
2135    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2136        let alice_pseudonym = HoprPseudonym::random();
2137        let bob_peer: Address = (&ChainKeypair::random()).into();
2138
2139        let bob_cfg = SessionManagerConfig::default();
2140        let alice_mgr = SessionManager::new(Default::default());
2141        let bob_mgr = SessionManager::new(bob_cfg.clone());
2142
2143        let mut alice_transport = MockMsgSender::new();
2144        let mut bob_transport = MockMsgSender::new();
2145
2146        // Alice sends the StartSession message
2147        let mut open_sequence = mockall::Sequence::new();
2148        let bob_mgr_clone = bob_mgr.clone();
2149        alice_transport
2150            .expect_send_message()
2151            .once()
2152            .in_sequence(&mut open_sequence)
2153            .withf(move |peer, data| {
2154                msg_type(data, StartProtocolDiscriminants::StartSession)
2155                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2156            })
2157            .returning(move |_, data| {
2158                let bob_mgr_clone = bob_mgr_clone.clone();
2159                Box::pin(async move {
2160                    bob_mgr_clone
2161                        .dispatch_message(
2162                            alice_pseudonym,
2163                            ApplicationDataIn {
2164                                data: data.data,
2165                                packet_info: Default::default(),
2166                            },
2167                        )
2168                        .await?;
2169                    Ok(())
2170                })
2171            });
2172
2173        // Bob sends the SessionEstablished message
2174        let alice_mgr_clone = alice_mgr.clone();
2175        bob_transport
2176            .expect_send_message()
2177            .once()
2178            .in_sequence(&mut open_sequence)
2179            .withf(move |peer, data| {
2180                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2181                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2182            })
2183            .returning(move |_, data| {
2184                let alice_mgr_clone = alice_mgr_clone.clone();
2185                Box::pin(async move {
2186                    alice_mgr_clone
2187                        .dispatch_message(
2188                            alice_pseudonym,
2189                            ApplicationDataIn {
2190                                data: data.data,
2191                                packet_info: Default::default(),
2192                            },
2193                        )
2194                        .await?;
2195                    Ok(())
2196                })
2197            });
2198
2199        // Alice sends the KeepAlive messages
2200        let bob_mgr_clone = bob_mgr.clone();
2201        alice_transport
2202            .expect_send_message()
2203            .times(5..)
2204            //.in_sequence(&mut sequence)
2205            .withf(move |peer, data| {
2206                msg_type(data, StartProtocolDiscriminants::KeepAlive)
2207                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2208            })
2209            .returning(move |_, data| {
2210                let bob_mgr_clone = bob_mgr_clone.clone();
2211                Box::pin(async move {
2212                    bob_mgr_clone
2213                        .dispatch_message(
2214                            alice_pseudonym,
2215                            ApplicationDataIn {
2216                                data: data.data,
2217                                packet_info: Default::default(),
2218                            },
2219                        )
2220                        .await?;
2221                    Ok(())
2222                })
2223            });
2224
2225        // Alice sends the terminating segment to close the Session
2226        let bob_mgr_clone = bob_mgr.clone();
2227        alice_transport
2228            .expect_send_message()
2229            .once()
2230            //.in_sequence(&mut sequence)
2231            .withf(move |peer, data| {
2232                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2233                    data.data.plain_text.as_ref(),
2234                )
2235                .ok()
2236                .and_then(|m| m.try_as_segment())
2237                .map(|s| s.is_terminating())
2238                .unwrap_or(false)
2239                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination == &bob_peer)
2240            })
2241            .returning(move |_, data| {
2242                let bob_mgr_clone = bob_mgr_clone.clone();
2243                Box::pin(async move {
2244                    bob_mgr_clone
2245                        .dispatch_message(
2246                            alice_pseudonym,
2247                            ApplicationDataIn {
2248                                data: data.data,
2249                                packet_info: Default::default(),
2250                            },
2251                        )
2252                        .await?;
2253                    Ok(())
2254                })
2255            });
2256
2257        let mut ahs = Vec::new();
2258
2259        // Start Alice
2260        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
2261        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2262
2263        // Start Bob
2264        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
2265        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2266
2267        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2268
2269        let balancer_cfg = SurbBalancerConfig {
2270            target_surb_buffer_size: 10,
2271            max_surbs_per_sec: 100,
2272            ..Default::default()
2273        };
2274
2275        pin_mut!(new_session_rx_bob);
2276        let (alice_session, bob_session) = timeout(
2277            Duration::from_secs(2),
2278            futures::future::join(
2279                alice_mgr.new_session(
2280                    bob_peer,
2281                    SessionTarget::TcpStream(target.clone()),
2282                    SessionClientConfig {
2283                        pseudonym: alice_pseudonym.into(),
2284                        capabilities: Capability::Segmentation.into(),
2285                        surb_management: Some(balancer_cfg),
2286                        ..Default::default()
2287                    },
2288                ),
2289                new_session_rx_bob.next(),
2290            ),
2291        )
2292        .await?;
2293
2294        let mut alice_session = alice_session?;
2295        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2296
2297        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2298
2299        assert_eq!(
2300            Some(balancer_cfg),
2301            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2302        );
2303
2304        let remote_cfg = bob_mgr
2305            .get_surb_balancer_config(bob_session.session.id())
2306            .await?
2307            .ok_or(anyhow!("no remote config at bob"))?;
2308        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2309        assert_eq!(
2310            remote_cfg.max_surbs_per_sec,
2311            remote_cfg.target_surb_buffer_size
2312                / bob_cfg
2313                    .minimum_surb_buffer_duration
2314                    .max(MIN_SURB_BUFFER_DURATION)
2315                    .as_secs()
2316        );
2317
2318        // Let the Surb balancer send enough KeepAlive messages
2319        tokio::time::sleep(Duration::from_millis(1500)).await;
2320        alice_session.close().await?;
2321
2322        tokio::time::sleep(Duration::from_millis(300)).await;
2323        assert!(matches!(
2324            alice_mgr.ping_session(alice_session.id()).await,
2325            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2326        ));
2327
2328        futures::stream::iter(ahs)
2329            .for_each(|ah| async move { ah.abort() })
2330            .await;
2331
2332        Ok(())
2333    }
2334}