hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use futures::{
8    FutureExt, SinkExt, StreamExt, TryStreamExt,
9    channel::mpsc::{Sender, UnboundedSender},
10    future::AbortHandle,
11    pin_mut,
12};
13use futures_time::future::FutureExt as TimeExt;
14use hopr_crypto_random::Randomizable;
15use hopr_internal_types::prelude::HoprPseudonym;
16use hopr_network_types::prelude::*;
17use hopr_primitive_types::prelude::Address;
18use hopr_protocol_app::prelude::*;
19use hopr_protocol_start::{
20    KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
21};
22use tracing::{debug, error, info, trace, warn};
23
24use crate::{
25    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
26    SurbBalancerConfig,
27    balancer::{
28        AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
29        SurbControllerWithCorrection,
30        pid::{PidBalancerController, PidControllerGains},
31        simple::SimpleBalancerController,
32    },
33    errors::{SessionManagerError, TransportSessionError},
34    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
35    utils,
36    utils::insert_into_next_slot,
37};
38
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge holding the number of Sessions that are currently open.
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_session_num_active_sessions",
            "Number of currently active HOPR sessions",
        )
        .unwrap();

    /// Counter of Sessions established while acting as the Exit.
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new(
            "hopr_session_established_sessions_count",
            "Number of sessions that were successfully established as an Exit node",
        )
        .unwrap();

    /// Counter of Sessions initiated while acting as the Entry.
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter =
        hopr_metrics::SimpleCounter::new(
            "hopr_session_initiated_sessions_count",
            "Number of sessions that were successfully initiated as an Entry node",
        )
        .unwrap();

    /// Session errors received from an Exit, labelled by `kind`.
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_session_received_error_count",
            "Number of HOPR session errors received from an Exit node",
            &["kind"],
        )
        .unwrap();

    /// Session errors sent to an Entry, labelled by `kind`.
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter =
        hopr_metrics::MultiCounter::new(
            "hopr_session_sent_error_count",
            "Number of HOPR session errors sent to an Entry node",
            &["kind"],
        )
        .unwrap();
}
64
65fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
66    debug!(?session_id, ?reason, "closing session");
67
68    if reason != ClosureReason::EmptyRead {
69        // Closing the data sender will also cause it to close from the read side
70        session_data.session_tx.close_channel();
71        trace!(?session_id, "data tx channel closed on session");
72    }
73
74    // Terminate any additional tasks spawned by the Session
75    session_data.abort_handles.into_iter().for_each(|h| h.abort());
76
77    #[cfg(all(feature = "prometheus", not(test)))]
78    METRIC_ACTIVE_SESSIONS.decrement(1.0);
79}
80
/// Computes the maximum one-way initiation timeout over `hops` hops, i.e. `base * hops`.
///
/// The hop count is converted without silent truncation (values above `u32::MAX` are capped),
/// and the multiplication saturates at [`Duration::MAX`] instead of panicking on overflow.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
84
/// Lower bound on how long the SURB buffer has to last while no new SURBs are being produced.
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_millis(1_000);
87
88/// The first challenge value used in Start protocol to initiate a session.
89pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
90
/// Upper bound on the wait for the counterparty to receive the target number of SURBs.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Floor applied to the configured maximum frame timeout: an unfinished frame is never
/// discarded sooner than this.
const MIN_FRAME_TIMEOUT: Duration = Duration::from_micros(10_000);
96
97// Needs to use an UnboundedSender instead of oneshot
98// because Moka cache requires the value to be Clone, which oneshot Sender is not.
99// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
100type SessionInitiationCache =
101    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
102
103#[derive(Clone)]
104struct SessionSlot {
105    // Sender needs to be put in Arc, so that no clones are made by `moka`.
106    // This makes sure that the entire channel closes once the one and only sender is closed.
107    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
108    routing_opts: DestinationRouting,
109    abort_handles: Vec<AbortHandle>,
110    // Allows reconfiguring of the SURB balancer on-the-fly
111    // Set on both Entry and Exit sides.
112    surb_mgmt: Option<SurbBalancerConfig>,
113    // Set only on the Exit side.
114    surb_estimator: Option<AtomicSurbFlowEstimator>,
115}
116
117/// Allows forward and backward propagation of SURB management configuration changes.
118struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
119
120#[async_trait::async_trait]
121impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
122    async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
123        // Intentionally using `iter().find()` instead of `get()` here,
124        // so that the popularity estimator is not hit.
125        self.0
126            .iter()
127            .find(|(sid, _)| sid.as_ref() == id)
128            .ok_or(SessionManagerError::NonExistingSession)?
129            .1
130            .surb_mgmt
131            .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
132    }
133
134    async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
135        if let moka::ops::compute::CompResult::ReplacedWith(_) = self
136            .0
137            .entry_by_ref(id)
138            .and_compute_with(|entry| {
139                futures::future::ready(match entry.map(|e| e.into_value()) {
140                    None => moka::ops::compute::Op::Nop,
141                    Some(mut updated_slot) => {
142                        if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
143                            *balancer_cfg = cfg;
144                            moka::ops::compute::Op::Put(updated_slot)
145                        } else {
146                            moka::ops::compute::Op::Nop
147                        }
148                    }
149                })
150            })
151            .await
152        {
153            Ok(())
154        } else {
155            Err(SessionManagerError::NonExistingSession.into())
156        }
157    }
158}
159
160/// Indicates the result of processing a message.
161#[derive(Clone, Debug, PartialEq, Eq)]
162pub enum DispatchResult {
163    /// Session or Start protocol message has been processed successfully.
164    Processed,
165    /// The message was not related to Start or Session protocol.
166    Unrelated(ApplicationDataIn),
167}
168
169/// Configuration for the [`SessionManager`].
170#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
171pub struct SessionManagerConfig {
172    /// Ranges of tags available for Sessions.
173    ///
174    /// **NOTE**: If the range starts lower than [`ReservedTag`]'s range end,
175    /// it will be automatically transformed to start at this value.
176    /// This is due to the reserved range by the Start sub-protocol.
177    ///
178    /// Default is 16..1024.
179    #[doc(hidden)]
180    #[default(_code = "16u64..1024u64")]
181    pub session_tag_range: Range<u64>,
182
183    /// The maximum number of sessions (incoming and outgoing) that is allowed
184    /// to be managed by the manager.
185    ///
186    /// When reached, creating [new sessions](SessionManager::new_session) will return
187    /// the [`SessionManagerError::TooManySessions`] error, and incoming sessions will be rejected
188    /// with [`StartErrorReason::NoSlotsAvailable`] Start protocol error.
189    ///
190    /// Default is 128, minimum is 1; maximum is given by the `session_tag_range`.
191    #[default(128)]
192    pub maximum_sessions: usize,
193
194    /// The maximum chunk of data that can be written to the Session's input buffer.
195    ///
196    /// Default is 1500.
197    #[default(1500)]
198    pub frame_mtu: usize,
199
200    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
201    ///
202    /// Default is 800 ms.
203    #[default(Duration::from_millis(800))]
204    pub max_frame_timeout: Duration,
205
206    /// The base timeout for initiation of Session initiation.
207    ///
208    /// The actual timeout is adjusted according to the number of hops for that Session:
209    /// `t = initiation_time_out_base * (num_forward_hops + num_return_hops + 2)`
210    ///
211    /// Default is 500 milliseconds.
212    #[default(Duration::from_millis(500))]
213    pub initiation_timeout_base: Duration,
214
215    /// Timeout for Session to be closed due to inactivity.
216    ///
217    /// Default is 180 seconds.
218    #[default(Duration::from_secs(180))]
219    pub idle_timeout: Duration,
220
221    /// The sampling interval for SURB balancer.
222    /// It will make SURB control decisions regularly at this interval.
223    ///
224    /// Default is 100 milliseconds.
225    #[default(Duration::from_millis(100))]
226    pub balancer_sampling_interval: Duration,
227
228    /// Initial packets per second egress rate on an incoming Session.
229    ///
230    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
231    ///
232    /// Default is 10 packets/second.
233    #[default(10)]
234    pub initial_return_session_egress_rate: usize,
235    /// Minimum period of time for which a SURB buffer at the Exit must
236    /// endure if no SURBs are being received.
237    ///
238    /// In other words, it is the minimum period of time an Exit must withstand when
239    /// no SURBs are received from the Entry at all. To do so, the egress traffic
240    /// will be shaped accordingly to meet this requirement.
241    ///
242    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
243    ///
244    /// Default is 5 seconds, minimum is 1 second.
245    #[default(Duration::from_secs(5))]
246    pub minimum_surb_buffer_duration: Duration,
247    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
248    ///
249    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
250    /// so values greater than that do not make sense. This value should be ideally set equal
251    /// to the size of the global transport SURB RB.
252    ///
253    /// Default is 10 000 SURBs.
254    #[default(10_000)]
255    pub maximum_surb_buffer_size: usize,
256
257    /// Determines if the `target_surb_buffer_size` can grow if the simple moving average of the
258    /// current SURB buffer size estimate surpasses the given threshold over the given period.
259    ///
260    /// This only applies if the used [`SurbBalancerController`] has this option.
261    ///
262    /// The default is `(60, 0.20)` (20% growth if the average SURB buffer size surpasses 20%
263    /// of the target buffer size in a 60-second window).
264    #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
265    pub growable_target_surb_buffer: Option<(Duration, f64)>,
266}
267
268/// Manages lifecycles of Sessions.
269///
270/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
271/// should be called for each [`ApplicationData`] received by the node.
272/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
273/// and correct dispatch of Session-related packets to individual existing Sessions.
274///
275/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
276/// probe sessions using [`SessionManager::ping_session`]
277/// and list them via [SessionManager::active_sessions].
278///
279/// Since the `SessionManager` operates over the HOPR protocol,
280/// the message transport `S` is required.
281/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
282///
283/// ## SURB balancing
284/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
285///
286/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
287/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
288/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
289/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
290/// service degradation.
291///
292/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
293/// *local SURB balancing* and *remote SURB balancing*.
294///
295/// ### Local SURB balancing
296/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
297/// therefore incoming to us).
298/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
299/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
300/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
301/// more SURBs over time. This might happen either organically by sending effective payloads that
302/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
303/// via *remote SURB balancing*.
304///
305/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
306/// flag during Session initiation.
307///
308/// ### Remote SURB balancing
309/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
310/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
311/// in replies.
312/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
313/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
314/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
315///
316/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
317/// sending new ones via the keep-alive messages.
318///
319/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
320///
321/// ### Possible scenarios
322/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
323/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
324/// only when both are configured (the ideal case below):
325///
326/// #### 1. Ideal local and remote SURB balancing
327/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
328///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
329/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
330///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
331/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
332///    Session.
333/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
334///    `maximum_surb_buffer_size / max_surb_buffer_duration` value configured at the counterparty.
335///
336/// In this situation, the maximum Session egress from Exit to the Entry is given by the
337/// `maximum_surb_buffer_size / max_surb_buffer_duration` ratio. If there is enough bandwidth,
338/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
339/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
340///
341/// #### 2. Remote SURB balancing only
342/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
343/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
344///    [`SurbBalancerConfig`]
345///
346/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
347/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
348/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
349///
350/// This configuration could potentially only lead to an equilibrium
351/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
352///
353/// #### 3. Local SURB balancing only
354/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
355///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
356/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
357///    Session.
358/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
359///
360/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
361/// ones which are naturally carried by the egress packets which have space to hold SURBs). It relies
362/// only on the Session egress limiting of the Exit node.
363/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
364///
365/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
366/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
367/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
368///
369/// #### 4. No SURB balancing on each side
370/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
371/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
372///
373/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
374/// takes place at the Exit.
375///
376/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
377/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
378/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
379/// at the Exit's egress.
380///
381/// ### SURB decay
382/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
383/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
384/// The Entry has no way of knowing that and assumes that everything has been delivered.
385/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
386/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
387/// fewer SURBs from its SURB estimate at the Exit.
388///
389/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
390///
391/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
392/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
393/// Exit is detected.
394///
395/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
396///
397/// ### Automatic `target_surb_buffer_size` increase
398/// This mechanism only applies to the Session recipient (Exit) and on Sessions without the
399/// [`Capability::NoRateControl`] flag set.
400/// In this case, the Exit throttles the Session egress based on the ratio between
401/// of the estimated SURB balance and the *hint* of Entry's `target_surb_buffer_size` set during the Session initiation.
402///
403/// However, as the Entry might [increase](SessionManager::update_surb_balancer_config) the `target_surb_buffer_size`
404/// of the Session dynamically, the new value is never hinted again to the Exit (this only happens once during
405/// the Session initiation).
406/// For this reason, the Exit then might observe the ratio going higher than 1. When this happens consistently
407/// over some given time period, the Exit can decide to increase the initial hint to the newly observed value.
408///
409/// See the `growable_target_surb_buffer` field in the [`SessionManagerConfig`] for details.
410pub struct SessionManager<S, T> {
411    session_initiations: SessionInitiationCache,
412    #[allow(clippy::type_complexity)]
413    session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
414    sessions: moka::future::Cache<SessionId, SessionSlot>,
415    msg_sender: Arc<OnceLock<S>>,
416    cfg: SessionManagerConfig,
417}
418
419impl<S, T> Clone for SessionManager<S, T> {
420    fn clone(&self) -> Self {
421        Self {
422            session_initiations: self.session_initiations.clone(),
423            session_notifiers: self.session_notifiers.clone(),
424            sessions: self.sessions.clone(),
425            cfg: self.cfg.clone(),
426            msg_sender: self.msg_sender.clone(),
427        }
428    }
429}
430
/// Upper bound on how long a single send into the external message sink may take.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_micros(200_000);
432
433impl<S, T> SessionManager<S, T>
434where
435    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
436    T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
437    S::Error: std::error::Error + Send + Sync + Clone + 'static,
438    T::Error: std::error::Error + Send + Sync + Clone + 'static,
439{
440    /// Creates a new instance given the [`config`](SessionManagerConfig).
441    pub fn new(mut cfg: SessionManagerConfig) -> Self {
442        let min_session_tag_range_reservation = ReservedTag::range().end;
443        debug_assert!(
444            min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
445            "invalid tag reservation range"
446        );
447
448        // Accommodate the lower bound if too low.
449        if cfg.session_tag_range.start < min_session_tag_range_reservation {
450            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
451            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
452        }
453        cfg.maximum_sessions = cfg
454            .maximum_sessions
455            .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
456
457        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
458        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
459        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
460
461        #[cfg(all(feature = "prometheus", not(test)))]
462        METRIC_ACTIVE_SESSIONS.set(0.0);
463
464        let msg_sender = Arc::new(OnceLock::new());
465        Self {
466            msg_sender: msg_sender.clone(),
467            session_initiations: moka::future::Cache::builder()
468                .max_capacity(cfg.maximum_sessions as u64)
469                .time_to_live(
470                    2 * initiation_timeout_max_one_way(
471                        cfg.initiation_timeout_base,
472                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
473                    ),
474                )
475                .build(),
476            sessions: moka::future::Cache::builder()
477                .max_capacity(cfg.maximum_sessions as u64)
478                .time_to_idle(cfg.idle_timeout)
479                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
480                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
481                        trace!(?session_id, ?reason, "session evicted from the cache");
482                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
483                    }
484                    _ => {}
485                })
486                .build(),
487            session_notifiers: Arc::new(OnceLock::new()),
488            cfg,
489        }
490    }
491
492    /// Starts the instance with the given `msg_sender` `Sink`
493    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
494    ///
495    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
496    /// [`SessionManager::dispatch_message`].
497    pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
498        self.msg_sender
499            .set(msg_sender)
500            .map_err(|_| SessionManagerError::AlreadyStarted)?;
501
502        let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
503        self.session_notifiers
504            .set((new_session_notifier, session_close_tx))
505            .map_err(|_| SessionManagerError::AlreadyStarted)?;
506
507        let myself = self.clone();
508        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
509            None,
510            move |(session_id, closure_reason)| {
511                let myself = myself.clone();
512                async move {
513                    // These notifications come from the Sessions themselves once
514                    // an empty read is encountered, which means the closure was done by the
515                    // other party.
516                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
517                        close_session(session_id, session_data, closure_reason);
518                    } else {
519                        // Do not treat this as an error
520                        debug!(
521                            ?session_id,
522                            ?closure_reason,
523                            "could not find session id to close, maybe the session is already closed"
524                        );
525                    }
526                }
527            },
528        ));
529
530        // This is necessary to evict expired entries from the caches if
531        // no session-related operations happen at all.
532        // This ensures the dangling expired sessions are properly closed
533        // and their closure is timely notified to the other party.
534        let myself = self.clone();
535        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
536            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
537            let timeout = 2 * initiation_timeout_max_one_way(
538                myself.cfg.initiation_timeout_base,
539                RoutingOptions::MAX_INTERMEDIATE_HOPS,
540            )
541            .min(myself.cfg.idle_timeout)
542            .mul_f64(jitter)
543                / 2;
544            futures_time::stream::interval(timeout.into())
545                .for_each(|_| {
546                    trace!("executing session cache evictions");
547                    futures::future::join(
548                        myself.sessions.run_pending_tasks(),
549                        myself.session_initiations.run_pending_tasks(),
550                    )
551                    .map(|_| ())
552                })
553                .await;
554        });
555
556        Ok(vec![ah_closure_notifications, ah_session_expiration])
557    }
558
559    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
560    pub fn is_started(&self) -> bool {
561        self.session_notifiers.get().is_some()
562    }
563
564    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
565        // We currently do not support loopback Sessions on ourselves.
566        let abort_handles_clone = slot.abort_handles.clone();
567        if let moka::ops::compute::CompResult::Inserted(_) = self
568            .sessions
569            .entry(session_id)
570            .and_compute_with(|entry| {
571                futures::future::ready(if entry.is_none() {
572                    moka::ops::compute::Op::Put(slot)
573                } else {
574                    moka::ops::compute::Op::Nop
575                })
576            })
577            .await
578        {
579            #[cfg(all(feature = "prometheus", not(test)))]
580            {
581                METRIC_NUM_INITIATED_SESSIONS.increment();
582                METRIC_ACTIVE_SESSIONS.increment(1.0);
583            }
584
585            Ok(())
586        } else {
587            // Session already exists; it means it is most likely a loopback attempt
588            error!(%session_id, "session already exists - loopback attempt");
589            abort_handles_clone.into_iter().for_each(|ah| ah.abort());
590            Err(SessionManagerError::Loopback.into())
591        }
592    }
593
594    /// Initiates a new outgoing Session to `destination` with the given configuration.
595    ///
596    /// If the Session's counterparty does not respond within
597    /// the [configured](SessionManagerConfig) period,
598    /// this method returns [`TransportSessionError::Timeout`].
599    ///
600    /// It will also fail if the instance has not been [started](SessionManager::start).
601    pub async fn new_session(
602        &self,
603        destination: Address,
604        target: SessionTarget,
605        cfg: SessionClientConfig,
606    ) -> crate::errors::Result<HoprSession> {
607        self.sessions.run_pending_tasks().await;
608        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
609            return Err(SessionManagerError::TooManySessions.into());
610        }
611
612        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
613
614        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
615        let challenge = insert_into_next_slot(
616            &self.session_initiations,
617            |ch| {
618                if let Some(challenge) = ch {
619                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
620                } else {
621                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
622                }
623            },
624            tx_initiation_done,
625        )
626        .await
627        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
628
629        // Prepare the session initiation message in the Start protocol
630        trace!(challenge, ?cfg, "initiating session with config");
631        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
632            challenge,
633            target,
634            capabilities: ByteCapabilities(cfg.capabilities),
635            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
636                cfg.surb_management
637                    .map(|c| c.target_surb_buffer_size)
638                    .unwrap_or(
639                        self.cfg.initial_return_session_egress_rate as u64
640                            * self
641                                .cfg
642                                .minimum_surb_buffer_duration
643                                .max(MIN_SURB_BUFFER_DURATION)
644                                .as_secs(),
645                    )
646                    .min(u32::MAX as u64) as u32
647            } else {
648                0
649            },
650        });
651
652        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
653        let forward_routing = DestinationRouting::Forward {
654            destination: Box::new(destination.into()),
655            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
656            forward_options: cfg.forward_path_options.clone(),
657            return_options: cfg.return_path_options.clone().into(),
658        };
659
660        // Send the Session initiation message
661        info!(challenge, %pseudonym, %destination, "new session request");
662        msg_sender
663            .send((
664                forward_routing.clone(),
665                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
666            ))
667            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
668            .await
669            .map_err(|_| {
670                error!(challenge, %pseudonym, %destination, "timeout sending session request message");
671                TransportSessionError::Timeout
672            })?
673            .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
674
675        // The timeout is given by the number of hops requested
676        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
677            self.cfg.initiation_timeout_base,
678            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
679        )
680        .into();
681
682        // Await session establishment response from the Exit node or timeout
683        pin_mut!(rx_initiation_done);
684
685        trace!(challenge, "awaiting session establishment");
686        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
687            Ok(Ok(Some(est))) => {
688                // Session has been established, construct it
689                let session_id = est.session_id;
690                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
691
692                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
693                let notifier = self
694                    .session_notifiers
695                    .get()
696                    .map(|(_, notifier)| {
697                        let mut notifier = notifier.clone();
698                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
699                            let _ = notifier
700                                .try_send((session_id, reason))
701                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
702                        })
703                    })
704                    .ok_or(SessionManagerError::NotStarted)?;
705
706                // NOTE: the Exit node can have different `max_surb_buffer_size`
707                // setting on the Session manager, so it does not make sense to cap it here
708                // with our maximum value.
709                if let Some(balancer_config) = cfg.surb_management {
710                    let surb_estimator = AtomicSurbFlowEstimator::default();
711
712                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
713                    let surb_estimator_clone = surb_estimator.clone();
714                    let full_surb_scoring_sender =
715                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
716                            // Count how many SURBs we sent with each packet
717                            surb_estimator_clone.produced.fetch_add(
718                                data.estimate_surbs_with_msg() as u64,
719                                std::sync::atomic::Ordering::Relaxed,
720                            );
721                            futures::future::ok::<_, S::Error>((routing, data))
722                        });
723
724                    // For standard Session data we first reduce the number of SURBs we want to produce,
725                    // unless requested to always max them out
726                    let max_out_organic_surbs = cfg.always_max_out_surbs;
727                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
728                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
729                        // so that its estimate of SURBs gets automatically updated based on
730                        // the `max_surbs_in_packets` set here.
731                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
732                            if !max_out_organic_surbs {
733                                // TODO: make this dynamic to honor the balancer target (#7439)
734                                data.packet_info
735                                    .get_or_insert_with(|| OutgoingPacketInfo {
736                                        max_surbs_in_packet: 1,
737                                        ..Default::default()
738                                    })
739                                    .max_surbs_in_packet = 1;
740                            }
741                            futures::future::ok::<_, S::Error>((routing, data))
742                        },
743                    );
744
745                    let mut abort_handles = Vec::new();
746
747                    // Spawn the SURB-bearing keep alive stream
748                    let (ka_controller, ka_abort_handle) =
749                        utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
750                    abort_handles.push(ka_abort_handle);
751
752                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
753                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
754                    let balancer = SurbBalancer::new(
755                        session_id,
756                        // The setpoint and output limit is immediately reconfigured by SurbBalancer
757                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
758                        surb_estimator.clone(),
759                        ka_controller,
760                        balancer_config,
761                    );
762
763                    let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
764                        self.cfg.balancer_sampling_interval,
765                        SessionCacheBalancerFeedback(self.sessions.clone()),
766                        None,
767                    );
768                    abort_handles.push(balancer_abort_handle);
769
770                    // If the insertion fails prematurely, it will also kill all the abort handles
771                    self.insert_session_slot(
772                        session_id,
773                        SessionSlot {
774                            session_tx: Arc::new(tx),
775                            routing_opts: forward_routing.clone(),
776                            abort_handles,
777                            surb_mgmt: Some(balancer_config),
778                            surb_estimator: None,
779                        },
780                    )
781                    .await?;
782
783                    // Wait for enough SURBs to be sent to the counterparty
784                    // TODO: consider making this interactive = other party reports the exact level periodically
785                    match level_stream
786                        .skip_while(|current_level| {
787                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
788                        })
789                        .next()
790                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
791                        .await
792                    {
793                        Ok(Some(surb_level)) => {
794                            info!(%session_id, surb_level, "session is ready");
795                        }
796                        Ok(None) => {
797                            return Err(
798                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
799                            );
800                        }
801                        Err(_) => {
802                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
803                        }
804                    }
805
806                    HoprSession::new(
807                        session_id,
808                        forward_routing,
809                        HoprSessionConfig {
810                            capabilities: cfg.capabilities,
811                            frame_mtu: self.cfg.frame_mtu,
812                            frame_timeout: self.cfg.max_frame_timeout,
813                        },
814                        (
815                            reduced_surb_scoring_sender,
816                            rx.inspect(move |_| {
817                                // Received packets = SURB consumption estimate
818                                // The received packets always consume a single SURB.
819                                surb_estimator
820                                    .consumed
821                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
822                            }),
823                        ),
824                        Some(notifier),
825                    )
826                } else {
827                    warn!(%session_id, "session ready without SURB balancing");
828
829                    self.insert_session_slot(
830                        session_id,
831                        SessionSlot {
832                            session_tx: Arc::new(tx),
833                            routing_opts: forward_routing.clone(),
834                            abort_handles: vec![],
835                            surb_mgmt: None,
836                            surb_estimator: None,
837                        },
838                    )
839                    .await?;
840
841                    // For standard Session data we first reduce the number of SURBs we want to produce,
842                    // unless requested to always max them out
843                    let max_out_organic_surbs = cfg.always_max_out_surbs;
844                    let reduced_surb_sender =
845                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
846                            if !max_out_organic_surbs {
847                                data.packet_info
848                                    .get_or_insert_with(|| OutgoingPacketInfo {
849                                        max_surbs_in_packet: 1,
850                                        ..Default::default()
851                                    })
852                                    .max_surbs_in_packet = 1;
853                            }
854                            futures::future::ok::<_, S::Error>((routing, data))
855                        });
856
857                    HoprSession::new(
858                        session_id,
859                        forward_routing,
860                        HoprSessionConfig {
861                            capabilities: cfg.capabilities,
862                            frame_mtu: self.cfg.frame_mtu,
863                            frame_timeout: self.cfg.max_frame_timeout,
864                        },
865                        (reduced_surb_sender, rx),
866                        Some(notifier),
867                    )
868                }
869            }
870            Ok(Ok(None)) => Err(SessionManagerError::Other(
871                "internal error: sender has been closed without completing the session establishment".into(),
872            )
873            .into()),
874            Ok(Err(error)) => {
875                // The other side did not allow us to establish a session
876                error!(
877                    challenge = error.challenge,
878                    ?error,
879                    "the other party rejected the session initiation with error"
880                );
881                Err(TransportSessionError::Rejected(error.reason))
882            }
883            Err(_) => {
884                // Timeout waiting for a session establishment
885                error!(challenge, "session initiation attempt timed out");
886
887                #[cfg(all(feature = "prometheus", not(test)))]
888                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
889
890                Err(TransportSessionError::Timeout)
891            }
892        }
893    }
894
895    /// Sends a keep-alive packet with the given [`SessionId`].
896    ///
897    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
898    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
899        if let Some(session_data) = self.sessions.get(id).await {
900            trace!(session_id = ?id, "pinging manually session");
901            Ok(self
902                .msg_sender
903                .get()
904                .cloned()
905                .ok_or(SessionManagerError::NotStarted)?
906                .send((
907                    session_data.routing_opts.clone(),
908                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
909                ))
910                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
911                .await
912                .map_err(|_| {
913                    error!("timeout sending session ping message");
914                    TransportSessionError::Timeout
915                })?
916                .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
917        } else {
918            Err(SessionManagerError::NonExistingSession.into())
919        }
920    }
921
922    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
923    pub async fn active_sessions(&self) -> Vec<SessionId> {
924        self.sessions.run_pending_tasks().await;
925        self.sessions.iter().map(|(k, _)| *k).collect()
926    }
927
928    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
929    ///
930    /// Returns an error if the Session with the given `id` does not exist, or
931    /// if it does not use SURB balancing.
932    pub async fn update_surb_balancer_config(
933        &self,
934        id: &SessionId,
935        config: SurbBalancerConfig,
936    ) -> crate::errors::Result<()> {
937        match self
938            .sessions
939            .entry_by_ref(id)
940            .and_compute_with(|entry| {
941                futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
942                    // Only update the config if there already was one before
943                    if cached_session.surb_mgmt.is_some() {
944                        cached_session.surb_mgmt = Some(config);
945                        moka::ops::compute::Op::Put(cached_session)
946                    } else {
947                        moka::ops::compute::Op::Nop
948                    }
949                } else {
950                    moka::ops::compute::Op::Nop
951                })
952            })
953            .await
954        {
955            moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
956            moka::ops::compute::CompResult::Unchanged(_) => {
957                Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
958            }
959            _ => Err(SessionManagerError::NonExistingSession.into()),
960        }
961    }
962
963    /// Retrieves the configuration of SURB balancing for the given Session.
964    ///
965    /// Returns an error if the Session with the given `id` does not exist.
966    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
967        match self.sessions.get(id).await {
968            Some(session) => Ok(session.surb_mgmt),
969            None => Err(SessionManagerError::NonExistingSession.into()),
970        }
971    }
972
973    /// The main method to be called whenever data are received.
974    ///
975    /// It tries to recognize the message and correctly dispatches either
976    /// the Session protocol or Start protocol messages.
977    ///
978    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
979    pub async fn dispatch_message(
980        &self,
981        pseudonym: HoprPseudonym,
982        in_data: ApplicationDataIn,
983    ) -> crate::errors::Result<DispatchResult> {
984        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
985            // This is a Start protocol message, so we handle it
986            trace!("dispatching Start protocol message");
987            return self
988                .handle_start_protocol_message(pseudonym, in_data)
989                .await
990                .map(|_| DispatchResult::Processed);
991        } else if self
992            .cfg
993            .session_tag_range
994            .contains(&in_data.data.application_tag.as_u64())
995        {
996            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
997
998            return if let Some(session_data) = self.sessions.get(&session_id).await {
999                trace!(?session_id, "received data for a registered session");
1000
1001                Ok(session_data
1002                    .session_tx
1003                    .unbounded_send(in_data)
1004                    .map(|_| DispatchResult::Processed)
1005                    .map_err(|e| SessionManagerError::Other(e.to_string()))?)
1006            } else {
1007                error!(%session_id, "received data from an unestablished session");
1008                Err(TransportSessionError::UnknownData)
1009            };
1010        }
1011
1012        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1013        Ok(DispatchResult::Unrelated(in_data))
1014    }
1015
1016    async fn handle_incoming_session_initiation(
1017        &self,
1018        pseudonym: HoprPseudonym,
1019        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1020    ) -> crate::errors::Result<()> {
1021        trace!(challenge = session_req.challenge, "received session initiation request");
1022
1023        debug!(%pseudonym, "got new session request, searching for a free session slot");
1024
1025        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1026
1027        let (mut new_session_notifier, mut close_session_notifier) = self
1028            .session_notifiers
1029            .get()
1030            .cloned()
1031            .ok_or(SessionManagerError::NotStarted)?;
1032
1033        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1034        let reply_routing = DestinationRouting::Return(pseudonym.into());
1035
1036        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1037
1038        // Search for a free Session ID slot
1039        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1040        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1041            insert_into_next_slot(
1042                &self.sessions,
1043                |sid| {
1044                    // NOTE: It is allowed to insert sessions using the same tag
1045                    // but different pseudonyms because the SessionId is different.
1046                    let next_tag: Tag = match sid {
1047                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1048                            .max(self.cfg.session_tag_range.start)
1049                            .into(),
1050                        None => hopr_crypto_random::random_integer(
1051                            self.cfg.session_tag_range.start,
1052                            Some(self.cfg.session_tag_range.end),
1053                        )
1054                        .into(),
1055                    };
1056                    SessionId::new(next_tag, pseudonym)
1057                },
1058                SessionSlot {
1059                    session_tx: Arc::new(tx_session_data),
1060                    routing_opts: reply_routing.clone(),
1061                    abort_handles: vec![],
1062                    surb_mgmt: None,
1063                    surb_estimator: None,
1064                },
1065            )
1066            .await
1067        } else {
1068            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1069            None
1070        };
1071
1072        // If some of the following code throws an error, the allocated slot will be kept
1073        // but will be later re-claimed when it expires.
1074        if let Some(session_id) = allocated_slot {
1075            debug!(%session_id, ?session_req, "assigned a new session");
1076
1077            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1078                if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1079                    error!(%session_id, %error, %reason, "failed to notify session closure");
1080                }
1081            });
1082
1083            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1084                let surb_estimator = AtomicSurbFlowEstimator::default();
1085
1086                // Because of SURB scarcity, control the egress rate of incoming sessions
1087                let egress_rate_control =
1088                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1089
1090                // The Session request carries a "hint" as additional data telling what
1091                // the Session initiator has configured as its target buffer size in the Balancer.
1092                let target_surb_buffer_size = if session_req.additional_data > 0 {
1093                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1094                } else {
1095                    self.cfg.initial_return_session_egress_rate as u64
1096                        * self
1097                            .cfg
1098                            .minimum_surb_buffer_duration
1099                            .max(MIN_SURB_BUFFER_DURATION)
1100                            .as_secs()
1101                };
1102
1103                let surb_estimator_clone = surb_estimator.clone();
1104                let session = HoprSession::new(
1105                    session_id,
1106                    reply_routing.clone(),
1107                    HoprSessionConfig {
1108                        capabilities: session_req.capabilities.into(),
1109                        frame_mtu: self.cfg.frame_mtu,
1110                        frame_timeout: self.cfg.max_frame_timeout,
1111                    },
1112                    (
1113                        // Sent packets = SURB consumption estimate
1114                        msg_sender
1115                            .clone()
1116                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1117                                // Each outgoing packet consumes one SURB
1118                                surb_estimator_clone
1119                                    .consumed
1120                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1121                                futures::future::ok::<_, S::Error>((routing, data))
1122                            })
1123                            .rate_limit_with_controller(&egress_rate_control)
1124                            .buffer((2 * target_surb_buffer_size) as usize),
1125                        // Received packets = SURB retrieval estimate
1126                        rx_session_data.inspect(move |data| {
1127                            // Count the number of SURBs delivered with each incoming packet
1128                            surb_estimator_clone
1129                                .produced
1130                                .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1131                        }),
1132                    ),
1133                    Some(closure_notifier),
1134                )?;
1135
1136                // The SURB balancer will start intervening by rate-limiting the
1137                // egress of the Session, once the estimated number of SURBs drops below
1138                // the target defined here. Otherwise, the maximum egress is allowed.
1139                let balancer_config = SurbBalancerConfig {
1140                    target_surb_buffer_size,
1141                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1142                    max_surbs_per_sec: target_surb_buffer_size
1143                        / self
1144                            .cfg
1145                            .minimum_surb_buffer_duration
1146                            .max(MIN_SURB_BUFFER_DURATION)
1147                            .as_secs(),
1148                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1149                    // were received
1150                    surb_decay: None,
1151                };
1152
1153                // Assign the SURB balancer and abort handles to the already allocated Session slot
1154                let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1155                if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1156                    .sessions
1157                    .entry(session_id)
1158                    .and_compute_with(|entry| {
1159                        if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1160                            cached_session.abort_handles.push(balancer_abort_handle);
1161                            cached_session.surb_mgmt = Some(balancer_config);
1162                            cached_session.surb_estimator = Some(surb_estimator.clone());
1163                            futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1164                        } else {
1165                            futures::future::ready(moka::ops::compute::Op::Nop)
1166                        }
1167                    })
1168                    .await
1169                {
1170                    // Spawn the SURB balancer only once we know we have registered the
1171                    // abort handle with the pre-allocated Session slot
1172                    debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1173                    let balancer = SurbBalancer::new(
1174                        session_id,
1175                        if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1176                            // Allow automatic setpoint increase
1177                            SimpleBalancerController::with_increasing_setpoint(
1178                                *ratio_threshold,
1179                                (growth_window
1180                                    .div_duration_f64(self.cfg.balancer_sampling_interval)
1181                                    .round() as usize)
1182                                    .max(1),
1183                            )
1184                        } else {
1185                            SimpleBalancerController::default()
1186                        },
1187                        surb_estimator,
1188                        SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1189                        balancer_config,
1190                    );
1191
1192                    let _ = balancer.start_control_loop(
1193                        self.cfg.balancer_sampling_interval,
1194                        SessionCacheBalancerFeedback(self.sessions.clone()),
1195                        Some(balancer_abort_reg),
1196                    );
1197                } else {
1198                    // This should never happen, but be sure we handle this error
1199                    return Err(SessionManagerError::Other(
1200                        "failed to spawn SURB balancer - inconsistent cache".into(),
1201                    )
1202                    .into());
1203                }
1204
1205                session
1206            } else {
1207                HoprSession::new(
1208                    session_id,
1209                    reply_routing.clone(),
1210                    HoprSessionConfig {
1211                        capabilities: session_req.capabilities.into(),
1212                        frame_mtu: self.cfg.frame_mtu,
1213                        frame_timeout: self.cfg.max_frame_timeout,
1214                    },
1215                    (msg_sender.clone(), rx_session_data),
1216                    Some(closure_notifier),
1217                )?
1218            };
1219
1220            // Extract useful information about the session from the Start protocol message
1221            let incoming_session = IncomingSession {
1222                session,
1223                target: session_req.target,
1224            };
1225
1226            // Notify that a new incoming session has been created
1227            match new_session_notifier
1228                .send(incoming_session)
1229                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1230                .await
1231            {
1232                Err(_) => {
1233                    error!(%session_id, "timeout to notify about new incoming session");
1234                    return Err(TransportSessionError::Timeout);
1235                }
1236                Ok(Err(error)) => {
1237                    error!(%session_id, %error, "failed to notify about new incoming session");
1238                    return Err(SessionManagerError::Other(error.to_string()).into());
1239                }
1240                _ => {}
1241            };
1242
1243            trace!(?session_id, "session notification sent");
1244
1245            // Notify the sender that the session has been established.
1246            // Set our peer ID in the session ID sent back to them.
1247            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1248                orig_challenge: session_req.challenge,
1249                session_id,
1250            });
1251
1252            msg_sender
1253                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1254                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1255                .await
1256                .map_err(|_| {
1257                    error!(%session_id, "timeout sending session establishment message");
1258                    TransportSessionError::Timeout
1259                })?
1260                .map_err(|e| {
1261                    SessionManagerError::Other(format!(
1262                        "failed to send session {session_id} establishment message: {e}"
1263                    ))
1264                })?;
1265
1266            info!(%session_id, "new session established");
1267
1268            #[cfg(all(feature = "prometheus", not(test)))]
1269            {
1270                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1271                METRIC_ACTIVE_SESSIONS.increment(1.0);
1272            }
1273        } else {
1274            error!(%pseudonym,"failed to reserve a new session slot");
1275
1276            // Notify the sender that the session could not be established
1277            let reason = StartErrorReason::NoSlotsAvailable;
1278            let data = HoprStartProtocol::SessionError(StartErrorType {
1279                challenge: session_req.challenge,
1280                reason,
1281            });
1282
1283            msg_sender
1284                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1285                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1286                .await
1287                .map_err(|_| {
1288                    error!("timeout sending session error message");
1289                    TransportSessionError::Timeout
1290                })?
1291                .map_err(|e| {
1292                    SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1293                })?;
1294
1295            trace!(%pseudonym, "session establishment failure message sent");
1296
1297            #[cfg(all(feature = "prometheus", not(test)))]
1298            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1299        }
1300
1301        Ok(())
1302    }
1303
1304    async fn handle_start_protocol_message(
1305        &self,
1306        pseudonym: HoprPseudonym,
1307        data: ApplicationDataIn,
1308    ) -> crate::errors::Result<()> {
1309        match HoprStartProtocol::try_from(data.data)? {
1310            HoprStartProtocol::StartSession(session_req) => {
1311                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1312            }
1313            HoprStartProtocol::SessionEstablished(est) => {
1314                trace!(
1315                    session_id = ?est.session_id,
1316                    "received session establishment confirmation"
1317                );
1318                let challenge = est.orig_challenge;
1319                let session_id = est.session_id;
1320                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1321                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1322                        return Err(SessionManagerError::Other(format!(
1323                            "could not notify session {session_id} establishment: {e}"
1324                        ))
1325                        .into());
1326                    }
1327                    debug!(?session_id, challenge, "session establishment complete");
1328                } else {
1329                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
1330                }
1331            }
1332            HoprStartProtocol::SessionError(error) => {
1333                trace!(
1334                    challenge = error.challenge,
1335                    error = ?error.reason,
1336                    "failed to initialize a session",
1337                );
1338                // Currently, we do not distinguish between individual error types
1339                // and just discard the initiation attempt and pass on the error.
1340                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1341                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
1342                        return Err(SessionManagerError::Other(format!(
1343                            "could not notify session establishment error {error:?}: {e}"
1344                        ))
1345                        .into());
1346                    }
1347                    error!(
1348                        challenge = error.challenge,
1349                        ?error,
1350                        "session establishment error received"
1351                    );
1352                } else {
1353                    error!(
1354                        challenge = error.challenge,
1355                        ?error,
1356                        "session establishment attempt expired before error could be delivered"
1357                    );
1358                }
1359
1360                #[cfg(all(feature = "prometheus", not(test)))]
1361                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1362            }
1363            HoprStartProtocol::KeepAlive(msg) => {
1364                let session_id = msg.session_id;
1365                if let Some(session_slot) = self.sessions.get(&session_id).await {
1366                    trace!(?session_id, "received keep-alive request");
1367                    // If the session has a SURB flow estimator, increase the number of received SURBs.
1368                    // This applies to the incoming sessions currently
1369                    if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1370                        // Two SURBs per Keep-Alive message
1371                        estimator.produced.fetch_add(
1372                            KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1373                            std::sync::atomic::Ordering::Relaxed,
1374                        );
1375                    }
1376                } else {
1377                    debug!(%session_id, "received keep-alive request for an unknown session");
1378                }
1379            }
1380        }
1381
1382        Ok(())
1383    }
1384}
1385
1386#[cfg(test)]
1387mod tests {
1388    use anyhow::anyhow;
1389    use futures::{AsyncWriteExt, future::BoxFuture};
1390    use hopr_crypto_random::Randomizable;
1391    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1392    use hopr_primitive_types::prelude::Address;
1393    use hopr_protocol_start::StartProtocolDiscriminants;
1394    use tokio::time::timeout;
1395
1396    use super::*;
1397    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1398
    /// Test-only abstraction of "deliver one outgoing message", mocked below as `MockMsgSender`.
    ///
    /// `mock_packet_planning` forwards every `(routing, data)` pair a `SessionManager` under test
    /// emits to an implementation of this trait.
    #[async_trait::async_trait]
    trait SendMsg {
        /// Delivers `data` using the given `routing`; an error fails the forwarding task.
        async fn send_message(
            &self,
            routing: DestinationRouting,
            data: ApplicationDataOut,
        ) -> crate::errors::Result<()>;
    }
1407
    // Generates `MockMsgSender`, a mockall mock of `SendMsg`. The method is written out in the
    // boxed-future form (`BoxFuture` with explicit lifetimes) rather than as an `async fn`,
    // mirroring the shape `#[async_trait]` gives the trait method above.
    mockall::mock! {
        MsgSender {}
        impl SendMsg for MsgSender {
            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
        }
    }
1415
1416    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1417        let (tx, rx) = futures::channel::mpsc::unbounded();
1418        tokio::task::spawn(async move {
1419            pin_mut!(rx);
1420            while let Some((routing, data)) = rx.next().await {
1421                sender
1422                    .send_message(routing, data)
1423                    .await
1424                    .expect("send message must not fail in mock");
1425            }
1426        });
1427        tx
1428    }
1429
1430    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1431        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1432            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1433            .unwrap_or(false)
1434    }
1435
1436    #[test_log::test(tokio::test)]
1437    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1438    {
1439        let alice_pseudonym = HoprPseudonym::random();
1440        let bob_peer: Address = (&ChainKeypair::random()).into();
1441
1442        let alice_mgr = SessionManager::new(Default::default());
1443        let bob_mgr = SessionManager::new(Default::default());
1444
1445        let mut sequence = mockall::Sequence::new();
1446        let mut alice_transport = MockMsgSender::new();
1447        let mut bob_transport = MockMsgSender::new();
1448
1449        // Alice sends the StartSession message
1450        let bob_mgr_clone = bob_mgr.clone();
1451        alice_transport
1452            .expect_send_message()
1453            .once()
1454            .in_sequence(&mut sequence)
1455            .withf(move |peer, data| {
1456                info!("alice sends {}", data.data.application_tag);
1457                msg_type(data, StartProtocolDiscriminants::StartSession)
1458                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1459            })
1460            .returning(move |_, data| {
1461                let bob_mgr_clone = bob_mgr_clone.clone();
1462                Box::pin(async move {
1463                    bob_mgr_clone
1464                        .dispatch_message(
1465                            alice_pseudonym,
1466                            ApplicationDataIn {
1467                                data: data.data,
1468                                packet_info: Default::default(),
1469                            },
1470                        )
1471                        .await?;
1472                    Ok(())
1473                })
1474            });
1475
1476        // Bob sends the SessionEstablished message
1477        let alice_mgr_clone = alice_mgr.clone();
1478        bob_transport
1479            .expect_send_message()
1480            .once()
1481            .in_sequence(&mut sequence)
1482            .withf(move |peer, data| {
1483                info!("bob sends {}", data.data.application_tag);
1484                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1485                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1486            })
1487            .returning(move |_, data| {
1488                let alice_mgr_clone = alice_mgr_clone.clone();
1489
1490                Box::pin(async move {
1491                    alice_mgr_clone
1492                        .dispatch_message(
1493                            alice_pseudonym,
1494                            ApplicationDataIn {
1495                                data: data.data,
1496                                packet_info: Default::default(),
1497                            },
1498                        )
1499                        .await?;
1500                    Ok(())
1501                })
1502            });
1503
1504        // Alice sends the terminating segment to close the Session
1505        let bob_mgr_clone = bob_mgr.clone();
1506        alice_transport
1507            .expect_send_message()
1508            .once()
1509            .in_sequence(&mut sequence)
1510            .withf(move |peer, data| {
1511                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1512                    data.data.plain_text.as_ref(),
1513                )
1514                .expect("must be a session message")
1515                .try_as_segment()
1516                .expect("must be a segment")
1517                .is_terminating()
1518                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1519            })
1520            .returning(move |_, data| {
1521                let bob_mgr_clone = bob_mgr_clone.clone();
1522                Box::pin(async move {
1523                    bob_mgr_clone
1524                        .dispatch_message(
1525                            alice_pseudonym,
1526                            ApplicationDataIn {
1527                                data: data.data,
1528                                packet_info: Default::default(),
1529                            },
1530                        )
1531                        .await?;
1532                    Ok(())
1533                })
1534            });
1535
1536        let mut ahs = Vec::new();
1537
1538        // Start Alice
1539        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1540        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1541
1542        // Start Bob
1543        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1544        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1545
1546        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1547
1548        pin_mut!(new_session_rx_bob);
1549        let (alice_session, bob_session) = timeout(
1550            Duration::from_secs(2),
1551            futures::future::join(
1552                alice_mgr.new_session(
1553                    bob_peer,
1554                    SessionTarget::TcpStream(target.clone()),
1555                    SessionClientConfig {
1556                        pseudonym: alice_pseudonym.into(),
1557                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1558                        surb_management: None,
1559                        ..Default::default()
1560                    },
1561                ),
1562                new_session_rx_bob.next(),
1563            ),
1564        )
1565        .await?;
1566
1567        let mut alice_session = alice_session?;
1568        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1569
1570        assert_eq!(
1571            alice_session.config().capabilities,
1572            Capability::Segmentation | Capability::NoRateControl
1573        );
1574        assert_eq!(
1575            alice_session.config().capabilities,
1576            bob_session.session.config().capabilities
1577        );
1578        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1579
1580        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1581        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1582        assert!(
1583            alice_mgr
1584                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1585                .await
1586                .is_err()
1587        );
1588
1589        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1590        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1591        assert!(
1592            bob_mgr
1593                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1594                .await
1595                .is_err()
1596        );
1597
1598        tokio::time::sleep(Duration::from_millis(100)).await;
1599        alice_session.close().await?;
1600
1601        tokio::time::sleep(Duration::from_millis(100)).await;
1602
1603        assert!(matches!(
1604            alice_mgr.ping_session(alice_session.id()).await,
1605            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1606        ));
1607
1608        futures::stream::iter(ahs)
1609            .for_each(|ah| async move { ah.abort() })
1610            .await;
1611
1612        Ok(())
1613    }
1614
1615    #[test_log::test(tokio::test)]
1616    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1617        let alice_pseudonym = HoprPseudonym::random();
1618        let bob_peer: Address = (&ChainKeypair::random()).into();
1619
1620        let cfg = SessionManagerConfig {
1621            idle_timeout: Duration::from_millis(200),
1622            ..Default::default()
1623        };
1624
1625        let alice_mgr = SessionManager::new(cfg);
1626        let bob_mgr = SessionManager::new(Default::default());
1627
1628        let mut sequence = mockall::Sequence::new();
1629        let mut alice_transport = MockMsgSender::new();
1630        let mut bob_transport = MockMsgSender::new();
1631
1632        // Alice sends the StartSession message
1633        let bob_mgr_clone = bob_mgr.clone();
1634        alice_transport
1635            .expect_send_message()
1636            .once()
1637            .in_sequence(&mut sequence)
1638            .withf(move |peer, data| {
1639                msg_type(data, StartProtocolDiscriminants::StartSession)
1640                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1641            })
1642            .returning(move |_, data| {
1643                let bob_mgr_clone = bob_mgr_clone.clone();
1644                Box::pin(async move {
1645                    bob_mgr_clone
1646                        .dispatch_message(
1647                            alice_pseudonym,
1648                            ApplicationDataIn {
1649                                data: data.data,
1650                                packet_info: Default::default(),
1651                            },
1652                        )
1653                        .await?;
1654                    Ok(())
1655                })
1656            });
1657
1658        // Bob sends the SessionEstablished message
1659        let alice_mgr_clone = alice_mgr.clone();
1660        bob_transport
1661            .expect_send_message()
1662            .once()
1663            .in_sequence(&mut sequence)
1664            .withf(move |peer, data| {
1665                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1666                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1667            })
1668            .returning(move |_, data| {
1669                let alice_mgr_clone = alice_mgr_clone.clone();
1670
1671                Box::pin(async move {
1672                    alice_mgr_clone
1673                        .dispatch_message(
1674                            alice_pseudonym,
1675                            ApplicationDataIn {
1676                                data: data.data,
1677                                packet_info: Default::default(),
1678                            },
1679                        )
1680                        .await?;
1681                    Ok(())
1682                })
1683            });
1684
1685        let mut ahs = Vec::new();
1686
1687        // Start Alice
1688        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1689        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1690
1691        // Start Bob
1692        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1693        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1694
1695        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1696
1697        pin_mut!(new_session_rx_bob);
1698        let (alice_session, bob_session) = timeout(
1699            Duration::from_secs(2),
1700            futures::future::join(
1701                alice_mgr.new_session(
1702                    bob_peer,
1703                    SessionTarget::TcpStream(target.clone()),
1704                    SessionClientConfig {
1705                        pseudonym: alice_pseudonym.into(),
1706                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1707                        surb_management: None,
1708                        ..Default::default()
1709                    },
1710                ),
1711                new_session_rx_bob.next(),
1712            ),
1713        )
1714        .await?;
1715
1716        let alice_session = alice_session?;
1717        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1718
1719        assert_eq!(
1720            alice_session.config().capabilities,
1721            Capability::Segmentation | Capability::NoRateControl,
1722        );
1723        assert_eq!(
1724            alice_session.config().capabilities,
1725            bob_session.session.config().capabilities
1726        );
1727        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1728
1729        // Let the session timeout at Alice
1730        tokio::time::sleep(Duration::from_millis(300)).await;
1731
1732        assert!(matches!(
1733            alice_mgr.ping_session(alice_session.id()).await,
1734            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1735        ));
1736
1737        futures::stream::iter(ahs)
1738            .for_each(|ah| async move { ah.abort() })
1739            .await;
1740
1741        Ok(())
1742    }
1743
1744    #[test_log::test(tokio::test)]
1745    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1746        let alice_pseudonym = HoprPseudonym::random();
1747        let session_id = SessionId::new(16u64, alice_pseudonym);
1748        let balancer_cfg = SurbBalancerConfig {
1749            target_surb_buffer_size: 1000,
1750            max_surbs_per_sec: 100,
1751            ..Default::default()
1752        };
1753
1754        let alice_mgr = SessionManager::<
1755            UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1756            futures::channel::mpsc::Sender<IncomingSession>,
1757        >::new(Default::default());
1758
1759        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1760        alice_mgr
1761            .sessions
1762            .insert(
1763                session_id,
1764                SessionSlot {
1765                    session_tx: Arc::new(dummy_tx),
1766                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1767                    abort_handles: Vec::new(),
1768                    surb_mgmt: Some(balancer_cfg.clone()),
1769                    surb_estimator: None,
1770                },
1771            )
1772            .await;
1773
1774        let actual_cfg = alice_mgr
1775            .get_surb_balancer_config(&session_id)
1776            .await?
1777            .ok_or(anyhow!("session must have a surb balancer config"))?;
1778        assert_eq!(actual_cfg, balancer_cfg);
1779
1780        let new_cfg = SurbBalancerConfig {
1781            target_surb_buffer_size: 2000,
1782            max_surbs_per_sec: 200,
1783            ..Default::default()
1784        };
1785        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1786
1787        let actual_cfg = alice_mgr
1788            .get_surb_balancer_config(&session_id)
1789            .await?
1790            .ok_or(anyhow!("session must have a surb balancer config"))?;
1791        assert_eq!(actual_cfg, new_cfg);
1792
1793        Ok(())
1794    }
1795
1796    #[test_log::test(tokio::test)]
1797    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1798        let alice_pseudonym = HoprPseudonym::random();
1799        let bob_peer: Address = (&ChainKeypair::random()).into();
1800
1801        let cfg = SessionManagerConfig {
1802            session_tag_range: 16u64..17u64, // Slot for exactly one session
1803            ..Default::default()
1804        };
1805
1806        let alice_mgr = SessionManager::new(Default::default());
1807        let bob_mgr = SessionManager::new(cfg);
1808
1809        // Occupy the only free slot with tag 16
1810        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1811        bob_mgr
1812            .sessions
1813            .insert(
1814                SessionId::new(16u64, alice_pseudonym),
1815                SessionSlot {
1816                    session_tx: Arc::new(dummy_tx),
1817                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1818                    abort_handles: Vec::new(),
1819                    surb_mgmt: None,
1820                    surb_estimator: None,
1821                },
1822            )
1823            .await;
1824
1825        let mut sequence = mockall::Sequence::new();
1826        let mut alice_transport = MockMsgSender::new();
1827        let mut bob_transport = MockMsgSender::new();
1828
1829        // Alice sends the StartSession message
1830        let bob_mgr_clone = bob_mgr.clone();
1831        alice_transport
1832            .expect_send_message()
1833            .once()
1834            .in_sequence(&mut sequence)
1835            .withf(move |peer, data| {
1836                msg_type(data, StartProtocolDiscriminants::StartSession)
1837                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1838            })
1839            .returning(move |_, data| {
1840                let bob_mgr_clone = bob_mgr_clone.clone();
1841                Box::pin(async move {
1842                    bob_mgr_clone
1843                        .dispatch_message(
1844                            alice_pseudonym,
1845                            ApplicationDataIn {
1846                                data: data.data,
1847                                packet_info: Default::default(),
1848                            },
1849                        )
1850                        .await?;
1851                    Ok(())
1852                })
1853            });
1854
1855        // Bob sends the SessionError message
1856        let alice_mgr_clone = alice_mgr.clone();
1857        bob_transport
1858            .expect_send_message()
1859            .once()
1860            .in_sequence(&mut sequence)
1861            .withf(move |peer, data| {
1862                msg_type(data, StartProtocolDiscriminants::SessionError)
1863                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1864            })
1865            .returning(move |_, data| {
1866                let alice_mgr_clone = alice_mgr_clone.clone();
1867                Box::pin(async move {
1868                    alice_mgr_clone
1869                        .dispatch_message(
1870                            alice_pseudonym,
1871                            ApplicationDataIn {
1872                                data: data.data,
1873                                packet_info: Default::default(),
1874                            },
1875                        )
1876                        .await?;
1877                    Ok(())
1878                })
1879            });
1880
1881        let mut jhs = Vec::new();
1882
1883        // Start Alice
1884        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1885        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1886
1887        // Start Bob
1888        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
1889        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1890
1891        let result = alice_mgr
1892            .new_session(
1893                bob_peer,
1894                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1895                SessionClientConfig {
1896                    capabilities: Capabilities::empty(),
1897                    pseudonym: alice_pseudonym.into(),
1898                    surb_management: None,
1899                    ..Default::default()
1900                },
1901            )
1902            .await;
1903
1904        assert!(
1905            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1906        );
1907
1908        Ok(())
1909    }
1910
1911    #[test_log::test(tokio::test)]
1912    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
1913    -> anyhow::Result<()> {
1914        let alice_pseudonym = HoprPseudonym::random();
1915        let bob_peer: Address = (&ChainKeypair::random()).into();
1916
1917        let cfg = SessionManagerConfig {
1918            maximum_sessions: 1,
1919            ..Default::default()
1920        };
1921
1922        let alice_mgr = SessionManager::new(Default::default());
1923        let bob_mgr = SessionManager::new(cfg);
1924
1925        // Occupy the only free slot with tag 16
1926        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1927        bob_mgr
1928            .sessions
1929            .insert(
1930                SessionId::new(16u64, alice_pseudonym),
1931                SessionSlot {
1932                    session_tx: Arc::new(dummy_tx),
1933                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
1934                    abort_handles: Vec::new(),
1935                    surb_mgmt: None,
1936                    surb_estimator: None,
1937                },
1938            )
1939            .await;
1940
1941        let mut sequence = mockall::Sequence::new();
1942        let mut alice_transport = MockMsgSender::new();
1943        let mut bob_transport = MockMsgSender::new();
1944
1945        // Alice sends the StartSession message
1946        let bob_mgr_clone = bob_mgr.clone();
1947        alice_transport
1948            .expect_send_message()
1949            .once()
1950            .in_sequence(&mut sequence)
1951            .withf(move |peer, data| {
1952                msg_type(data, StartProtocolDiscriminants::StartSession)
1953                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1954            })
1955            .returning(move |_, data| {
1956                let bob_mgr_clone = bob_mgr_clone.clone();
1957                Box::pin(async move {
1958                    bob_mgr_clone
1959                        .dispatch_message(
1960                            alice_pseudonym,
1961                            ApplicationDataIn {
1962                                data: data.data,
1963                                packet_info: Default::default(),
1964                            },
1965                        )
1966                        .await?;
1967                    Ok(())
1968                })
1969            });
1970
1971        // Bob sends the SessionError message
1972        let alice_mgr_clone = alice_mgr.clone();
1973        bob_transport
1974            .expect_send_message()
1975            .once()
1976            .in_sequence(&mut sequence)
1977            .withf(move |peer, data| {
1978                msg_type(data, StartProtocolDiscriminants::SessionError)
1979                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1980            })
1981            .returning(move |_, data| {
1982                let alice_mgr_clone = alice_mgr_clone.clone();
1983                Box::pin(async move {
1984                    alice_mgr_clone
1985                        .dispatch_message(
1986                            alice_pseudonym,
1987                            ApplicationDataIn {
1988                                data: data.data,
1989                                packet_info: Default::default(),
1990                            },
1991                        )
1992                        .await?;
1993                    Ok(())
1994                })
1995            });
1996
1997        let mut jhs = Vec::new();
1998
1999        // Start Alice
2000        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2001        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2002
2003        // Start Bob
2004        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2005        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2006
2007        let result = alice_mgr
2008            .new_session(
2009                bob_peer,
2010                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2011                SessionClientConfig {
2012                    capabilities: None.into(),
2013                    pseudonym: alice_pseudonym.into(),
2014                    surb_management: None,
2015                    ..Default::default()
2016                },
2017            )
2018            .await;
2019
2020        assert!(
2021            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2022        );
2023
2024        Ok(())
2025    }
2026
2027    #[test_log::test(tokio::test)]
2028    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2029        let alice_pseudonym = HoprPseudonym::random();
2030        let bob_peer: Address = (&ChainKeypair::random()).into();
2031
2032        let alice_mgr = SessionManager::new(Default::default());
2033
2034        let mut sequence = mockall::Sequence::new();
2035        let mut alice_transport = MockMsgSender::new();
2036
2037        // Alice sends the StartSession message
2038        let alice_mgr_clone = alice_mgr.clone();
2039        alice_transport
2040            .expect_send_message()
2041            .once()
2042            .in_sequence(&mut sequence)
2043            .withf(move |peer, data| {
2044                msg_type(data, StartProtocolDiscriminants::StartSession)
2045                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2046            })
2047            .returning(move |_, data| {
2048                // But the message is again processed by Alice due to Loopback
2049                let alice_mgr_clone = alice_mgr_clone.clone();
2050                Box::pin(async move {
2051                    alice_mgr_clone
2052                        .dispatch_message(
2053                            alice_pseudonym,
2054                            ApplicationDataIn {
2055                                data: data.data,
2056                                packet_info: Default::default(),
2057                            },
2058                        )
2059                        .await?;
2060                    Ok(())
2061                })
2062            });
2063
2064        // Alice sends the SessionEstablished message (as Bob)
2065        let alice_mgr_clone = alice_mgr.clone();
2066        alice_transport
2067            .expect_send_message()
2068            .once()
2069            .in_sequence(&mut sequence)
2070            .withf(move |peer, data| {
2071                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2072                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2073            })
2074            .returning(move |_, data| {
2075                let alice_mgr_clone = alice_mgr_clone.clone();
2076
2077                Box::pin(async move {
2078                    alice_mgr_clone
2079                        .dispatch_message(
2080                            alice_pseudonym,
2081                            ApplicationDataIn {
2082                                data: data.data,
2083                                packet_info: Default::default(),
2084                            },
2085                        )
2086                        .await?;
2087                    Ok(())
2088                })
2089            });
2090
2091        // Start Alice
2092        let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2093        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2094
2095        let alice_session = alice_mgr
2096            .new_session(
2097                bob_peer,
2098                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2099                SessionClientConfig {
2100                    capabilities: None.into(),
2101                    pseudonym: alice_pseudonym.into(),
2102                    surb_management: None,
2103                    ..Default::default()
2104                },
2105            )
2106            .await;
2107
2108        println!("{alice_session:?}");
2109        assert!(matches!(
2110            alice_session,
2111            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2112        ));
2113
2114        drop(new_session_rx_alice);
2115        Ok(())
2116    }
2117
2118    #[test_log::test(tokio::test)]
2119    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2120        let bob_peer: Address = (&ChainKeypair::random()).into();
2121
2122        let cfg = SessionManagerConfig {
2123            initiation_timeout_base: Duration::from_millis(100),
2124            ..Default::default()
2125        };
2126
2127        let alice_mgr = SessionManager::new(cfg);
2128        let bob_mgr = SessionManager::new(Default::default());
2129
2130        let mut sequence = mockall::Sequence::new();
2131        let mut alice_transport = MockMsgSender::new();
2132        let bob_transport = MockMsgSender::new();
2133
2134        // Alice sends the StartSession message, but Bob does not handle it
2135        alice_transport
2136            .expect_send_message()
2137            .once()
2138            .in_sequence(&mut sequence)
2139            .withf(move |peer, data| {
2140                msg_type(data, StartProtocolDiscriminants::StartSession)
2141                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2142            })
2143            .returning(|_, _| Box::pin(async { Ok(()) }));
2144
2145        // Start Alice
2146        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2147        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2148
2149        // Start Bob
2150        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2151        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2152
2153        let result = alice_mgr
2154            .new_session(
2155                bob_peer,
2156                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2157                SessionClientConfig {
2158                    capabilities: None.into(),
2159                    pseudonym: None,
2160                    surb_management: None,
2161                    ..Default::default()
2162                },
2163            )
2164            .await;
2165
2166        assert!(matches!(result, Err(TransportSessionError::Timeout)));
2167
2168        Ok(())
2169    }
2170
2171    #[test_log::test(tokio::test)]
2172    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2173        let alice_pseudonym = HoprPseudonym::random();
2174        let bob_peer: Address = (&ChainKeypair::random()).into();
2175
2176        let bob_cfg = SessionManagerConfig::default();
2177        let alice_mgr = SessionManager::new(Default::default());
2178        let bob_mgr = SessionManager::new(bob_cfg.clone());
2179
2180        let mut alice_transport = MockMsgSender::new();
2181        let mut bob_transport = MockMsgSender::new();
2182
2183        // Alice sends the StartSession message
2184        let mut open_sequence = mockall::Sequence::new();
2185        let bob_mgr_clone = bob_mgr.clone();
2186        alice_transport
2187            .expect_send_message()
2188            .once()
2189            .in_sequence(&mut open_sequence)
2190            .withf(move |peer, data| {
2191                msg_type(data, StartProtocolDiscriminants::StartSession)
2192                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2193            })
2194            .returning(move |_, data| {
2195                let bob_mgr_clone = bob_mgr_clone.clone();
2196                Box::pin(async move {
2197                    bob_mgr_clone
2198                        .dispatch_message(
2199                            alice_pseudonym,
2200                            ApplicationDataIn {
2201                                data: data.data,
2202                                packet_info: Default::default(),
2203                            },
2204                        )
2205                        .await?;
2206                    Ok(())
2207                })
2208            });
2209
2210        // Bob sends the SessionEstablished message
2211        let alice_mgr_clone = alice_mgr.clone();
2212        bob_transport
2213            .expect_send_message()
2214            .once()
2215            .in_sequence(&mut open_sequence)
2216            .withf(move |peer, data| {
2217                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2218                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2219            })
2220            .returning(move |_, data| {
2221                let alice_mgr_clone = alice_mgr_clone.clone();
2222                Box::pin(async move {
2223                    alice_mgr_clone
2224                        .dispatch_message(
2225                            alice_pseudonym,
2226                            ApplicationDataIn {
2227                                data: data.data,
2228                                packet_info: Default::default(),
2229                            },
2230                        )
2231                        .await?;
2232                    Ok(())
2233                })
2234            });
2235
2236        // Alice sends the KeepAlive messages
2237        let bob_mgr_clone = bob_mgr.clone();
2238        alice_transport
2239            .expect_send_message()
2240            .times(5..)
2241            //.in_sequence(&mut sequence)
2242            .withf(move |peer, data| {
2243                msg_type(data, StartProtocolDiscriminants::KeepAlive)
2244                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2245            })
2246            .returning(move |_, data| {
2247                let bob_mgr_clone = bob_mgr_clone.clone();
2248                Box::pin(async move {
2249                    bob_mgr_clone
2250                        .dispatch_message(
2251                            alice_pseudonym,
2252                            ApplicationDataIn {
2253                                data: data.data,
2254                                packet_info: Default::default(),
2255                            },
2256                        )
2257                        .await?;
2258                    Ok(())
2259                })
2260            });
2261
2262        // Alice sends the terminating segment to close the Session
2263        let bob_mgr_clone = bob_mgr.clone();
2264        alice_transport
2265            .expect_send_message()
2266            .once()
2267            //.in_sequence(&mut sequence)
2268            .withf(move |peer, data| {
2269                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2270                    data.data.plain_text.as_ref(),
2271                )
2272                .ok()
2273                .and_then(|m| m.try_as_segment())
2274                .map(|s| s.is_terminating())
2275                .unwrap_or(false)
2276                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2277            })
2278            .returning(move |_, data| {
2279                let bob_mgr_clone = bob_mgr_clone.clone();
2280                Box::pin(async move {
2281                    bob_mgr_clone
2282                        .dispatch_message(
2283                            alice_pseudonym,
2284                            ApplicationDataIn {
2285                                data: data.data,
2286                                packet_info: Default::default(),
2287                            },
2288                        )
2289                        .await?;
2290                    Ok(())
2291                })
2292            });
2293
2294        let mut ahs = Vec::new();
2295
2296        // Start Alice
2297        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2298        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2299
2300        // Start Bob
2301        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2302        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2303
2304        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2305
2306        let balancer_cfg = SurbBalancerConfig {
2307            target_surb_buffer_size: 10,
2308            max_surbs_per_sec: 100,
2309            ..Default::default()
2310        };
2311
2312        pin_mut!(new_session_rx_bob);
2313        let (alice_session, bob_session) = timeout(
2314            Duration::from_secs(2),
2315            futures::future::join(
2316                alice_mgr.new_session(
2317                    bob_peer,
2318                    SessionTarget::TcpStream(target.clone()),
2319                    SessionClientConfig {
2320                        pseudonym: alice_pseudonym.into(),
2321                        capabilities: Capability::Segmentation.into(),
2322                        surb_management: Some(balancer_cfg),
2323                        ..Default::default()
2324                    },
2325                ),
2326                new_session_rx_bob.next(),
2327            ),
2328        )
2329        .await?;
2330
2331        let mut alice_session = alice_session?;
2332        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2333
2334        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2335
2336        assert_eq!(
2337            Some(balancer_cfg),
2338            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2339        );
2340
2341        let remote_cfg = bob_mgr
2342            .get_surb_balancer_config(bob_session.session.id())
2343            .await?
2344            .ok_or(anyhow!("no remote config at bob"))?;
2345        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2346        assert_eq!(
2347            remote_cfg.max_surbs_per_sec,
2348            remote_cfg.target_surb_buffer_size
2349                / bob_cfg
2350                    .minimum_surb_buffer_duration
2351                    .max(MIN_SURB_BUFFER_DURATION)
2352                    .as_secs()
2353        );
2354
2355        // Let the Surb balancer send enough KeepAlive messages
2356        tokio::time::sleep(Duration::from_millis(1500)).await;
2357        alice_session.close().await?;
2358
2359        tokio::time::sleep(Duration::from_millis(300)).await;
2360        assert!(matches!(
2361            alice_mgr.ping_session(alice_session.id()).await,
2362            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2363        ));
2364
2365        futures::stream::iter(ahs)
2366            .for_each(|ah| async move { ah.abort() })
2367            .await;
2368
2369        Ok(())
2370    }
2371}