Skip to main content

hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    pin::Pin,
4    sync::{Arc, OnceLock},
5    time::Duration,
6};
7
8use anyhow::anyhow;
9use futures::{
10    FutureExt, Sink, SinkExt, StreamExt, TryStreamExt,
11    channel::mpsc::{Sender, UnboundedSender},
12    future::AbortHandle,
13    pin_mut,
14};
15use futures_time::future::FutureExt as TimeExt;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_start::{
19    KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
20    StartInitiation,
21};
22use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
23use hopr_api::types::{
24    crypto_random::Randomizable,
25    internal::{
26        prelude::HoprPseudonym,
27        routing::{DestinationRouting, RoutingOptions},
28    },
29    primitive::prelude::Address,
30};
31use hopr_utils::runtime::AbortableList;
32use tracing::{debug, error, info, trace, warn};
33
34#[cfg(feature = "telemetry")]
35use crate::telemetry::{
36    SessionLifecycleState, initialize_session_metrics, remove_session_metrics_state, set_session_balancer_data,
37    set_session_state,
38};
39use crate::{
40    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
41    SurbBalancerConfig,
42    balancer::{
43        AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
44        SurbControllerWithCorrection,
45        pid::{PidBalancerController, PidControllerGains},
46        simple::SimpleBalancerController,
47    },
48    errors::{SessionManagerError, TransportSessionError},
49    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
50    utils,
51    utils::{SurbNotificationMode, insert_into_next_slot},
52};
53
// Session telemetry metrics.
//
// Only compiled when the `telemetry` feature is enabled and the crate is not built for tests.
// Each metric is created lazily on first access; the `unwrap()` calls panic at that point
// if the metric cannot be constructed.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Reset to 0 when a `SessionManager` is created, incremented when a session slot is inserted
    // and decremented in `close_session`.
    static ref METRIC_ACTIVE_SESSIONS: hopr_api::types::telemetry::SimpleGauge = hopr_api::types::telemetry::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_api::types::telemetry::SimpleCounter = hopr_api::types::telemetry::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    // Incremented whenever a new session slot is successfully inserted into the session cache.
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_api::types::telemetry::SimpleCounter = hopr_api::types::telemetry::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    // Labelled by the error `kind`.
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_api::types::telemetry::MultiCounter = hopr_api::types::telemetry::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    // Labelled by the error `kind`.
    static ref METRIC_SENT_SESSION_ERRS: hopr_api::types::telemetry::MultiCounter = hopr_api::types::telemetry::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
79
80fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
81    debug!(?session_id, ?reason, "closing session");
82
83    #[cfg(feature = "telemetry")]
84    {
85        set_session_state(&session_id, SessionLifecycleState::Closed);
86        remove_session_metrics_state(&session_id);
87    }
88
89    if reason != ClosureReason::EmptyRead {
90        // Closing the data sender will also cause it to close from the read side
91        session_data.session_tx.close_channel();
92        trace!(?session_id, "data tx channel closed on session");
93    }
94
95    // Terminate any additional tasks spawned by the Session
96    session_data.abort_handles.lock().abort_all();
97
98    #[cfg(all(feature = "telemetry", not(test)))]
99    METRIC_ACTIVE_SESSIONS.decrement(1.0);
100}
101
/// Scales the `base` initiation timeout by the number of `hops`.
///
/// Callers in this file double the result to derive the lifetime of a pending
/// Session initiation.
///
/// The multiplication saturates at [`Duration::MAX`] instead of panicking on overflow,
/// and a `hops` value that does not fit into `u32` is clamped to `u32::MAX` rather than
/// being silently truncated.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(hops)
}
105
/// Minimum time the SURB buffer must endure if no SURBs are being produced.
///
/// Used as the lower bound for [`SessionManagerConfig::minimum_surb_buffer_duration`].
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
/// Minimum time between SURB buffer notifications to the Entry.
///
/// Used as the lower bound for [`SessionManagerConfig::surb_balance_notify_period`] (when set).
pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);

/// The first challenge value used in Start protocol to initiate a session.
///
/// Challenges generated for new Session initiations are never lower than this value.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time to wait for counterparty to receive the target number of SURBs.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum timeout until an unfinished frame is discarded.
///
/// Used as the lower bound for [`SessionManagerConfig::max_frame_timeout`].
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
119
// Cache of pending outgoing Session initiations, keyed by the Start protocol challenge.
// The value is the channel over which the outcome of the initiation (either the
// established Session ID or the Start protocol error) is reported.
//
// Needs to use an UnboundedSender instead of oneshot
// because Moka cache requires the value to be Clone, which oneshot Sender is not.
// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
125
/// Identifies the additional tasks a Session can have registered in its
/// [`SessionSlot`] abort list, so that they can be aborted when the Session closes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
enum SessionTasks {
    // NOTE(review): presumably the task sending keep-alive messages - confirm at the spawn site.
    KeepAlive,
    // NOTE(review): presumably the task driving the SURB balancer - confirm at the spawn site.
    Balancer,
}
131
/// Per-Session state kept by the [`SessionManager`] in its session cache.
///
/// All shared parts are reference-counted, so cloning the slot (as required by `moka`)
/// does not duplicate the underlying channel, task list or balancer state.
#[derive(Clone)]
struct SessionSlot {
    // Channel used to pass incoming application data to the Session.
    //
    // Sender needs to be put in Arc, so that no clones are made by `moka`.
    // This makes sure that the entire channel closes once the one and only sender is closed.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    // Routing used for the Session.
    routing_opts: DestinationRouting,
    // Additional tasks spawned by the Session.
    // They are all aborted when the Session is closed.
    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
    // Allows reconfiguring of the SURB balancer on-the-fly
    // Set on both Entry and Exit sides.
    surb_mgmt: Arc<BalancerStateValues>,
    // SURB flow updates happening outside of Session protocol
    // (e.g., due to Start protocol messages).
    surb_estimator: AtomicSurbFlowEstimator,
    // Holds the allocated tag so it is returned to the pool when the session is evicted.
    // `None` for outgoing sessions where the tag was assigned by the remote peer.
    #[allow(dead_code, reason = "kept alive for Drop-based tag deallocation")]
    allocated_tag: Option<Arc<AllocatedTag>>,
}
151
/// Indicates the result of processing a message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// Session or Start protocol message has been processed successfully.
    Processed,
    /// The message was not related to Start or Session protocol.
    ///
    /// The variant carries the message, so the caller can process it further.
    Unrelated(ApplicationDataIn),
}
160
/// Configuration for the [`SessionManager`].
///
/// Some of the values are clamped to their allowed minimum when the
/// [`SessionManager`] is [created](SessionManager::new).
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// The maximum chunk of data that can be written to the Session's input buffer.
    ///
    /// Values lower than [`SESSION_MTU`] are raised to [`SESSION_MTU`].
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
    ///
    /// Default is 800 ms, minimum is 10 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// The base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = initiation_timeout_base * (num_forward_hops + num_return_hops + 2)`
    ///
    /// Default is 500 milliseconds.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Timeout for Session to be closed due to inactivity.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// The sampling interval for SURB balancer.
    /// It will make SURB control decisions regularly at this interval.
    ///
    /// Default is 100 milliseconds.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial packets per second egress rate on an incoming Session.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 10 packets/second.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,

    /// Minimum period of time for which a SURB buffer at the Exit must
    /// endure if no SURBs are being received.
    ///
    /// In other words, it is the minimum period of time an Exit must withstand when
    /// no SURBs are received from the Entry at all. To do so, the egress traffic
    /// will be shaped accordingly to meet this requirement.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 5 seconds, minimum is 1 second.
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,

    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
    ///
    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
    /// so values greater than that do not make sense. Ideally, this value should be set equal
    /// to the size of the global transport SURB ring buffer.
    ///
    /// Default is 10 000 SURBs.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// If set, the Session recipient (Exit) will periodically notify the Session initiator (Entry) about
    /// its SURB balance for the Session using keep-alive packets.
    ///
    /// Keep in mind that each notification also costs 1 SURB, so the notifications should
    /// not be too frequent.
    ///
    /// Default is None (no notification sent to the client), minimum is 1 second.
    #[default(None)]
    pub surb_balance_notify_period: Option<Duration>,

    /// If set, the Session initiator (Entry) will notify the Session recipient (Exit) about
    /// the local SURB balancer target using keep-alive packets from the SURB balancer.
    ///
    /// This is useful when the client plans to change the SURB balancer target dynamically.
    ///
    /// Default is true.
    #[default(true)]
    pub surb_target_notify: bool,
}
248
// Type-erased sink used by the `SessionManager` to notify about newly incoming sessions.
// The errors produced by the underlying sink are remapped into `SessionManagerError`.
type IncomingSessionSink = Pin<Box<dyn Sink<IncomingSession, Error = SessionManagerError> + Send>>;

// Notification channels installed once the `SessionManager` is started:
// - the shared (mutex-protected) sink announcing newly incoming sessions,
// - the sender of Session closure notifications (Session ID and the closure reason),
//   which are consumed by a background task of the `SessionManager`.
type SessionNotifiers = (
    Arc<hopr_utils::runtime::prelude::Mutex<IncomingSessionSink>>,
    Sender<(SessionId, ClosureReason)>,
);
257
258/// Manages lifecycles of Sessions.
259///
260/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
261/// should be called for each [`ApplicationData`] received by the node.
262/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
263/// and correct dispatch of Session-related packets to individual existing Sessions.
264///
265/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
266/// probe sessions using [`SessionManager::ping_session`]
267/// and list them via [`SessionManager::active_sessions`].
268///
269/// Since the `SessionManager` operates over the HOPR protocol,
270/// the message transport `S` is required.
271/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
272///
273/// ## SURB balancing
274/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
275///
276/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
277/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
278/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
279/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
280/// service degradation.
281///
282/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
283/// *local SURB balancing* and *remote SURB balancing*.
284///
285/// ### Local SURB balancing
286/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
287/// therefore incoming to us).
288/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
289/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
290/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
291/// more SURBs over time. This might happen either organically by sending effective payloads that
292/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
293/// via *remote SURB balancing*.
294///
295/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
296/// flag during Session initiation.
297///
298/// ### Remote SURB balancing
299/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
300/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
301/// in replies.
302/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
303/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
304/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
305///
306/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
307/// sending new ones via the keep-alive messages.
308///
309/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
310///
311/// ### Possible scenarios
312/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
313/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
314/// only when both are configured (the ideal case below):
315///
316/// #### 1. Ideal local and remote SURB balancing
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`,
///    `minimum_surb_buffer_duration` and `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
319/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
320///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
321/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
322///    Session.
/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
///    `maximum_surb_buffer_size / minimum_surb_buffer_duration` value configured at the counterparty.
325///
/// In this situation, the maximum Session egress from Exit to the Entry is given by the
/// `maximum_surb_buffer_size / minimum_surb_buffer_duration` ratio. If there is enough bandwidth,
328/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
329/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
330///
331/// #### 2. Remote SURB balancing only
332/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
333/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
334///    [`SurbBalancerConfig`]
335///
336/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
337/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
338/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
339///
340/// This configuration could potentially only lead to an equilibrium
341/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
342///
343/// #### 3. Local SURB balancing only
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`,
///    `minimum_surb_buffer_duration` and `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
346/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
347///    Session.
348/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
349///
350/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
351/// ones that are naturally carried by the egress packets which have space to hold SURBs). It relies
352/// only on the Session egress limiting of the Exit node.
353/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
354///
355/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
356/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
357/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
358///
/// #### 4. No SURB balancing on either side
360/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
361/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
362///
363/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
364/// takes place at the Exit.
365///
366/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
367/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
368/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
369/// at the Exit's egress.
370///
371/// ### SURB decay
372/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
373/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
374/// The Entry has no way of knowing that and assumes that everything has been delivered.
375/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
376/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
377/// fewer SURBs from its SURB estimate at the Exit.
378///
379/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
380///
381/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
382/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
383/// Exit is detected.
384///
385/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
386///
387/// ### SURB balance and target notification
/// The Session recipient (Exit) can notify the Session initiator (Entry) periodically about its estimated
/// number of SURBs for the Session. This can help the Entry to adjust its approximation of that level so
/// that its (remote) SURB balancer can better intervene.
/// This can be set using the `surb_balance_notify_period` field of [`SessionManagerConfig`] for the Exit.
392///
/// Likewise, the Entry can inform the Exit about its desired SURB buffer target so that the Exit
/// can better accommodate its local SURB balancing.
/// This can be set using the `surb_target_notify` field of the [`SessionManagerConfig`] at the Entry.
396///
397/// Both mechanisms leverage the Keep Alive message to report the respective values.
pub struct SessionManager<S> {
    /// Pending outgoing Session initiations, keyed by the Start protocol challenge.
    /// Entries expire after a fixed time-to-live derived from the initiation timeout.
    session_initiations: SessionInitiationCache,
    /// Notification channels for incoming and closed Sessions.
    /// Set exactly once when the manager is started.
    session_notifiers: Arc<OnceLock<SessionNotifiers>>,
    /// Currently tracked Sessions. Entries expire when idle for longer than the configured idle timeout.
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    /// The message transport. Set exactly once when the manager is started.
    msg_sender: Arc<OnceLock<S>>,
    /// Allocator of Session tags on the Exit (incoming) side.
    tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
    /// Tag value range for sessions, derived from the [`TagAllocator`].
    session_tag_range: Range<u64>,
    /// Maximum number of concurrent sessions, derived from the [`TagAllocator`] capacity.
    maximum_sessions: usize,
    /// The configuration, with the values already clamped to their allowed minimums.
    cfg: SessionManagerConfig,
}
410
411impl<S> Clone for SessionManager<S> {
412    fn clone(&self) -> Self {
413        Self {
414            session_initiations: self.session_initiations.clone(),
415            session_notifiers: self.session_notifiers.clone(),
416            sessions: self.sessions.clone(),
417            cfg: self.cfg.clone(),
418            msg_sender: self.msg_sender.clone(),
419            tag_allocator: self.tag_allocator.clone(),
420            session_tag_range: self.session_tag_range.clone(),
421            maximum_sessions: self.maximum_sessions,
422        }
423    }
424}
425
// Timeout of 200 ms.
// NOTE(review): presumably bounds sends into sinks owned by external parties
// (e.g., the incoming session notifier) - the usage is not visible here, confirm.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
427
428impl<S> SessionManager<S>
429where
430    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
431    S::Error: std::error::Error + Send + Sync + Clone + 'static,
432{
433    /// Creates a new instance given the [`config`](SessionManagerConfig) and a [`TagAllocator`]
434    /// for allocating session tags on the Exit (incoming) side.
435    pub fn new(mut cfg: SessionManagerConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
436        let session_tag_range = tag_allocator.tag_range();
437        let maximum_sessions = tag_allocator.capacity().max(1) as usize;
438
439        debug_assert!(
440            session_tag_range.start >= ReservedTag::range().end,
441            "session tag range must not overlap with reserved tags"
442        );
443        cfg.surb_balance_notify_period = cfg
444            .surb_balance_notify_period
445            .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
446        cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);
447
448        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
449        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
450        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
451
452        #[cfg(all(feature = "telemetry", not(test)))]
453        METRIC_ACTIVE_SESSIONS.set(0.0);
454
455        let msg_sender = Arc::new(OnceLock::new());
456        Self {
457            msg_sender: msg_sender.clone(),
458            session_initiations: moka::future::Cache::builder()
459                .max_capacity(maximum_sessions as u64)
460                .time_to_live(
461                    2 * initiation_timeout_max_one_way(
462                        cfg.initiation_timeout_base,
463                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
464                    ),
465                )
466                .build(),
467            sessions: moka::future::Cache::builder()
468                .max_capacity(maximum_sessions as u64)
469                .time_to_idle(cfg.idle_timeout)
470                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
471                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
472                        trace!(?session_id, ?reason, "session evicted from the cache");
473                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
474                    }
475                    _ => {}
476                })
477                .build(),
478            session_notifiers: Arc::new(OnceLock::new()),
479            tag_allocator,
480            session_tag_range,
481            maximum_sessions,
482            cfg,
483        }
484    }
485
486    /// Starts the instance with the given `msg_sender` `Sink`
487    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
488    ///
489    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
490    /// [`SessionManager::dispatch_message`].
491    pub fn start<T>(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>>
492    where
493        T: futures::Sink<IncomingSession> + Send + 'static,
494        T::Error: std::error::Error + Send + Sync + 'static,
495    {
496        self.msg_sender
497            .set(msg_sender)
498            .map_err(|_| SessionManagerError::AlreadyStarted)?;
499
500        // Re-map the user-provided sink errors to `SessionManagerError` and erase the concrete
501        // type, so that the `SessionManager` does not need to be generic over it. This also avoids
502        // having to spawn a separate task to forward items between channels: senders simply lock
503        // the sink and send directly.
504        let new_session_notifier: IncomingSessionSink =
505            Box::pin(new_session_notifier.sink_map_err(SessionManagerError::other));
506        let new_session_notifier = Arc::new(hopr_utils::runtime::prelude::Mutex::new(new_session_notifier));
507
508        let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.maximum_sessions + 10);
509        self.session_notifiers
510            .set((new_session_notifier, session_close_tx))
511            .map_err(|_| SessionManagerError::AlreadyStarted)?;
512
513        let myself = self.clone();
514        let closure_diag = hopr_utils::runtime::diagnostics::ConcurrentDiagnostics::new(
515            "session_close_for_each_concurrent",
516            module_path!(),
517            file!(),
518            line!(),
519        );
520        let ah_closure_notifications = hopr_utils::spawn_as_abortable_named!(
521            "session_close_notifications",
522            session_close_rx.for_each_concurrent(None, move |(session_id, closure_reason)| {
523                let myself = myself.clone();
524                let closure_diag = closure_diag.clone();
525                closure_diag.wrap(async move {
526                    // These notifications come from the Sessions themselves once
527                    // an empty read is encountered, which means the closure was done by the
528                    // other party.
529                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
530                        close_session(session_id, session_data, closure_reason);
531                    } else {
532                        // Do not treat this as an error
533                        debug!(
534                            ?session_id,
535                            ?closure_reason,
536                            "could not find session id to close, maybe the session is already closed"
537                        );
538                    }
539                })
540            },)
541        );
542
543        // This is necessary to evict expired entries from the caches if
544        // no session-related operations happen at all.
545        // This ensures the dangling expired sessions are properly closed
546        // and their closure is timely notified to the other party.
547        let myself = self.clone();
548        let ah_session_expiration = hopr_utils::spawn_as_abortable!(async move {
549            let jitter = hopr_api::types::crypto_random::random_float_in_range(1.0..1.5);
550            let timeout = 2 * initiation_timeout_max_one_way(
551                myself.cfg.initiation_timeout_base,
552                RoutingOptions::MAX_INTERMEDIATE_HOPS,
553            )
554            .min(myself.cfg.idle_timeout)
555            .mul_f64(jitter)
556                / 2;
557            futures_time::stream::interval(timeout.into())
558                .for_each(|_| {
559                    trace!("executing session cache evictions");
560                    futures::future::join(
561                        myself.sessions.run_pending_tasks(),
562                        myself.session_initiations.run_pending_tasks(),
563                    )
564                    .map(|_| ())
565                })
566                .await;
567        });
568
569        Ok(vec![ah_closure_notifications, ah_session_expiration])
570    }
571
572    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
573    pub fn is_started(&self) -> bool {
574        self.session_notifiers.get().is_some()
575    }
576
577    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
578        // We currently do not support loopback Sessions on ourselves.
579        if let moka::ops::compute::CompResult::Inserted(_) = self
580            .sessions
581            .entry(session_id)
582            .and_compute_with(|entry| {
583                futures::future::ready(if entry.is_none() {
584                    moka::ops::compute::Op::Put(slot)
585                } else {
586                    moka::ops::compute::Op::Nop
587                })
588            })
589            .await
590        {
591            #[cfg(all(feature = "telemetry", not(test)))]
592            {
593                METRIC_NUM_INITIATED_SESSIONS.increment();
594                METRIC_ACTIVE_SESSIONS.increment(1.0);
595            }
596
597            Ok(())
598        } else {
599            // Session already exists; it means it is most likely a loopback attempt
600            error!(%session_id, "session already exists - loopback attempt");
601            Err(SessionManagerError::Loopback.into())
602        }
603    }
604
605    /// Initiates a new outgoing Session to `destination` with the given configuration.
606    ///
607    /// If the Session's counterparty does not respond within
608    /// the [configured](SessionManagerConfig) period,
609    /// this method returns [`TransportSessionError::Timeout`].
610    ///
611    /// It will also fail if the instance has not been [started](SessionManager::start).
612    pub async fn new_session(
613        &self,
614        destination: Address,
615        target: SessionTarget,
616        cfg: SessionClientConfig,
617    ) -> crate::errors::Result<HoprSession> {
618        self.sessions.run_pending_tasks().await;
619        if self.maximum_sessions <= self.sessions.entry_count() as usize {
620            return Err(SessionManagerError::TooManySessions.into());
621        }
622
623        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
624
625        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
626        let (challenge, _) = insert_into_next_slot(
627            &self.session_initiations,
628            |ch| {
629                if let Some(challenge) = ch {
630                    ((challenge + 1) % hopr_api::types::crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
631                } else {
632                    hopr_api::types::crypto_random::random_integer(MIN_CHALLENGE, None)
633                }
634            },
635            |_| tx_initiation_done,
636        )
637        .await
638        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
639
640        // Prepare the session initiation message in the Start protocol
641        trace!(challenge, ?cfg, "initiating session with config");
642        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
643            challenge,
644            target,
645            capabilities: ByteCapabilities(cfg.capabilities),
646            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
647                cfg.surb_management
648                    .map(|c| c.target_surb_buffer_size)
649                    .unwrap_or(
650                        self.cfg.initial_return_session_egress_rate as u64
651                            * self
652                                .cfg
653                                .minimum_surb_buffer_duration
654                                .max(MIN_SURB_BUFFER_DURATION)
655                                .as_secs(),
656                    )
657                    .min(u32::MAX as u64) as u32
658            } else {
659                0
660            },
661        });
662
663        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
664        let forward_routing = DestinationRouting::Forward {
665            destination: Box::new(destination.into()),
666            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
667            forward_options: cfg.forward_path_options.clone(),
668            return_options: cfg.return_path_options.clone().into(),
669        };
670
671        // Send the Session initiation message
672        info!(challenge, %pseudonym, %destination, "new session request");
673        msg_sender
674            .send((
675                forward_routing.clone(),
676                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
677            ))
678            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
679            .await
680            .map_err(|_| {
681                error!(challenge, %pseudonym, %destination, "timeout sending session request message");
682                TransportSessionError::Timeout
683            })?
684            .map_err(TransportSessionError::packet_sending)?;
685
686        // The timeout is given by the number of hops requested
687        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
688            self.cfg.initiation_timeout_base,
689            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
690        )
691        .into();
692
693        // Await session establishment response from the Exit node or timeout
694        pin_mut!(rx_initiation_done);
695
696        trace!(challenge, "awaiting session establishment");
697        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
698            Ok(Ok(Some(est))) => {
699                // Session has been established, construct it
700                let session_id = est.session_id;
701                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
702
703                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
704                let notifier = self
705                    .session_notifiers
706                    .get()
707                    .map(|(_, notifier)| {
708                        let mut notifier = notifier.clone();
709                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
710                            let _ = notifier
711                                .try_send((session_id, reason))
712                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
713                        })
714                    })
715                    .ok_or(SessionManagerError::NotStarted)?;
716
717                // NOTE: the Exit node can have different `max_surb_buffer_size`
718                // setting on the Session manager, so it does not make sense to cap it here
719                // with our maximum value.
720                if let Some(balancer_config) = cfg.surb_management {
721                    let surb_estimator = AtomicSurbFlowEstimator::default();
722
723                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
724                    let surb_estimator_clone = surb_estimator.clone();
725                    let full_surb_scoring_sender =
726                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
727                            let produced = data.estimate_surbs_with_msg() as u64;
728                            // Count how many SURBs we sent with each packet
729                            surb_estimator_clone
730                                .produced
731                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
732                            #[cfg(feature = "telemetry")]
733                            crate::telemetry::record_session_surb_produced(&session_id, produced);
734                            futures::future::ok::<_, S::Error>((routing, data))
735                        });
736
737                    // For standard Session data we first reduce the number of SURBs we want to produce,
738                    // unless requested to always max them out
739                    let max_out_organic_surbs = cfg.always_max_out_surbs;
740                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
741                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
742                        // so that its estimate of SURBs gets automatically updated based on
743                        // the `max_surbs_in_packets` set here.
744                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
745                            if !max_out_organic_surbs {
746                                // TODO: make this dynamic to honor the balancer target (#7439)
747                                data.packet_info
748                                    .get_or_insert_with(|| OutgoingPacketInfo {
749                                        max_surbs_in_packet: 1,
750                                        ..Default::default()
751                                    })
752                                    .max_surbs_in_packet = 1;
753                            }
754                            futures::future::ok::<_, S::Error>((routing, data))
755                        },
756                    );
757
758                    let mut abort_handles = AbortableList::default();
759                    let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
760
761                    // Spawn the SURB-bearing keep alive stream towards the Exit
762                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
763                        session_id,
764                        full_surb_scoring_sender,
765                        forward_routing.clone(),
766                        if self.cfg.surb_target_notify {
767                            SurbNotificationMode::Target
768                        } else {
769                            SurbNotificationMode::DoNotNotify
770                        },
771                        surb_mgmt.clone(),
772                    );
773                    abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
774
775                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
776                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
777                    let balancer = SurbBalancer::new(
778                        session_id,
779                        // The setpoint and output limit is immediately reconfigured by the SurbBalancer
780                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
781                        surb_estimator.clone(),
782                        // Currently, a keep-alive message can bear `HoprPacket::MAX_SURBS_IN_PACKET` SURBs,
783                        // so the correction by this factor is applied.
784                        SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
785                        surb_mgmt.clone(),
786                    );
787
788                    let (level_stream, balancer_abort_handle) =
789                        balancer.start_control_loop(self.cfg.balancer_sampling_interval);
790                    abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
791
792                    // If the insertion fails prematurely, it will also kill all the abort handles
793                    self.insert_session_slot(
794                        session_id,
795                        SessionSlot {
796                            session_tx: Arc::new(tx),
797                            routing_opts: forward_routing.clone(),
798                            abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
799                            surb_mgmt: surb_mgmt.clone(),
800                            surb_estimator: surb_estimator.clone(),
801                            allocated_tag: None,
802                        },
803                    )
804                    .await?;
805
806                    // Wait for enough SURBs to be sent to the counterparty
807                    // TODO: consider making this interactive = other party reports the exact level periodically
808                    match level_stream
809                        .skip_while(|current_level| {
810                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
811                        })
812                        .next()
813                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
814                        .await
815                    {
816                        Ok(Some(surb_level)) => {
817                            info!(%session_id, surb_level, "session is ready");
818                        }
819                        Ok(None) => {
820                            return Err(
821                                SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
822                            );
823                        }
824                        Err(_) => {
825                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
826                        }
827                    }
828
829                    let surb_estimator_for_rx = surb_estimator.clone();
830                    let session = HoprSession::new(
831                        session_id,
832                        forward_routing,
833                        HoprSessionConfig {
834                            capabilities: cfg.capabilities,
835                            frame_mtu: self.cfg.frame_mtu,
836                            frame_timeout: self.cfg.max_frame_timeout,
837                        },
838                        (
839                            reduced_surb_scoring_sender,
840                            rx.inspect(move |_| {
841                                // Received packets = SURB consumption estimate
842                                // The received packets always consume a single SURB.
843                                surb_estimator_for_rx
844                                    .consumed
845                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
846                                #[cfg(feature = "telemetry")]
847                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
848                            }),
849                        ),
850                        Some(notifier),
851                    )?;
852
853                    #[cfg(feature = "telemetry")]
854                    {
855                        initialize_session_metrics(
856                            session_id,
857                            HoprSessionConfig {
858                                capabilities: cfg.capabilities,
859                                frame_mtu: self.cfg.frame_mtu,
860                                frame_timeout: self.cfg.max_frame_timeout,
861                            },
862                        );
863                        set_session_state(&session_id, SessionLifecycleState::Active);
864                        set_session_balancer_data(&session_id, surb_estimator.clone(), surb_mgmt.clone());
865                    }
866
867                    Ok(session)
868                } else {
869                    warn!(%session_id, "session ready without SURB balancing");
870
871                    self.insert_session_slot(
872                        session_id,
873                        SessionSlot {
874                            session_tx: Arc::new(tx),
875                            routing_opts: forward_routing.clone(),
876                            abort_handles: Default::default(),
877                            surb_mgmt: Default::default(),      // Disabled SURB management
878                            surb_estimator: Default::default(), // No SURB estimator needed
879                            allocated_tag: None,
880                        },
881                    )
882                    .await?;
883
884                    // For standard Session data we first reduce the number of SURBs we want to produce,
885                    // unless requested to always max them out
886                    let max_out_organic_surbs = cfg.always_max_out_surbs;
887                    let reduced_surb_sender =
888                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
889                            if !max_out_organic_surbs {
890                                data.packet_info
891                                    .get_or_insert_with(|| OutgoingPacketInfo {
892                                        max_surbs_in_packet: 1,
893                                        ..Default::default()
894                                    })
895                                    .max_surbs_in_packet = 1;
896                            }
897                            futures::future::ok::<_, S::Error>((routing, data))
898                        });
899
900                    let session = HoprSession::new(
901                        session_id,
902                        forward_routing,
903                        HoprSessionConfig {
904                            capabilities: cfg.capabilities,
905                            frame_mtu: self.cfg.frame_mtu,
906                            frame_timeout: self.cfg.max_frame_timeout,
907                        },
908                        (reduced_surb_sender, rx),
909                        Some(notifier),
910                    )?;
911
912                    #[cfg(feature = "telemetry")]
913                    {
914                        initialize_session_metrics(
915                            session_id,
916                            HoprSessionConfig {
917                                capabilities: cfg.capabilities,
918                                frame_mtu: self.cfg.frame_mtu,
919                                frame_timeout: self.cfg.max_frame_timeout,
920                            },
921                        );
922                        set_session_state(&session_id, SessionLifecycleState::Active);
923                    }
924
925                    Ok(session)
926                }
927            }
928            Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
929                "internal error: sender has been closed without completing the session establishment"
930            ))
931            .into()),
932            Ok(Err(error)) => {
933                // The other side did not allow us to establish a session
934                error!(
935                    challenge = error.challenge,
936                    ?error,
937                    "the other party rejected the session initiation with error"
938                );
939                Err(TransportSessionError::Rejected(error.reason))
940            }
941            Err(_) => {
942                // Timeout waiting for a session establishment
943                error!(challenge, "session initiation attempt timed out");
944
945                #[cfg(all(feature = "telemetry", not(test)))]
946                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
947
948                Err(TransportSessionError::Timeout)
949            }
950        }
951    }
952
953    /// Sends a keep-alive packet with the given [`SessionId`].
954    ///
955    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
956    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
957        if let Some(session_data) = self.sessions.get(id).await {
958            trace!(session_id = ?id, "pinging manually session");
959            Ok(self
960                .msg_sender
961                .get()
962                .cloned()
963                .ok_or(SessionManagerError::NotStarted)?
964                .send((
965                    session_data.routing_opts.clone(),
966                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
967                ))
968                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
969                .await
970                .map_err(|_| {
971                    error!("timeout sending session ping message");
972                    TransportSessionError::Timeout
973                })?
974                .map_err(TransportSessionError::packet_sending)?)
975        } else {
976            Err(SessionManagerError::NonExistingSession.into())
977        }
978    }
979
980    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
981    pub async fn active_sessions(&self) -> Vec<SessionId> {
982        self.sessions.run_pending_tasks().await;
983        self.sessions.iter().map(|(k, _)| *k).collect()
984    }
985
986    /// Explicitly closes the session with the given `id`.
987    ///
988    /// Removes the entry from the internal session cache, closes the data channel,
989    /// and aborts any auxiliary tasks. Returns `true` if a session was found and
990    /// closed, `false` otherwise.
991    ///
992    /// This avoids waiting for the idle timeout (`time_to_idle`) or the LRU
993    /// capacity bound to evict the entry, which is the desired behaviour when
994    /// the caller (e.g. REST `DELETE /session`) knows the session is finished.
995    pub async fn close_session(&self, id: &SessionId) -> bool {
996        if let Some(slot) = self.sessions.remove(id).await {
997            close_session(*id, slot, ClosureReason::Eviction);
998            true
999        } else {
1000            false
1001        }
1002    }
1003
1004    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
1005    ///
1006    /// Returns an error if the Session with the given `id` does not exist, or
1007    /// if it does not use SURB balancing.
1008    pub async fn update_surb_balancer_config(
1009        &self,
1010        id: &SessionId,
1011        config: SurbBalancerConfig,
1012    ) -> crate::errors::Result<()> {
1013        let cfg = self
1014            .sessions
1015            .get(id)
1016            .await
1017            .ok_or(SessionManagerError::NonExistingSession)?
1018            .surb_mgmt;
1019
1020        // Only update the config if there already was one before
1021        if !cfg.is_disabled() {
1022            cfg.update(&config);
1023            Ok(())
1024        } else {
1025            Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
1026        }
1027    }
1028
1029    /// Retrieves the configuration of SURB balancing for the given Session.
1030    ///
1031    /// Returns an error if the Session with the given `id` does not exist.
1032    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
1033        match self.sessions.get(id).await {
1034            Some(session) => Ok(Some(session.surb_mgmt.as_ref())
1035                .filter(|c| !c.is_disabled())
1036                .map(|d| d.as_config())),
1037            None => Err(SessionManagerError::NonExistingSession.into()),
1038        }
1039    }
1040
1041    /// Gets estimations produced/received and consumed SURBs by the Session.
1042    ///
1043    /// For an outgoing Session (Entry) the pair is the number of SURBs sent (by us) and used (by the Exit).
1044    /// For an incoming Session (Exit) the pair is the number of SURBs received (from Entry) and used (by us).
1045    ///
1046    /// Returns an error if the Session with the given `id` does not exist.
1047    pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1048        match self.sessions.get(id).await {
1049            Some(session) => Ok((
1050                session
1051                    .surb_estimator
1052                    .produced
1053                    .load(std::sync::atomic::Ordering::Relaxed),
1054                session
1055                    .surb_estimator
1056                    .consumed
1057                    .load(std::sync::atomic::Ordering::Relaxed),
1058            )),
1059            None => Err(SessionManagerError::NonExistingSession.into()),
1060        }
1061    }
1062
1063    /// The main method to be called whenever data are received.
1064    ///
1065    /// It tries to recognize the message and correctly dispatches either
1066    /// the Session protocol or Start protocol messages.
1067    ///
1068    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
1069    pub async fn dispatch_message(
1070        &self,
1071        pseudonym: HoprPseudonym,
1072        in_data: ApplicationDataIn,
1073    ) -> crate::errors::Result<DispatchResult> {
1074        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1075            // This is a Start protocol message, so we handle it
1076            trace!("dispatching Start protocol message");
1077            return self
1078                .handle_start_protocol_message(pseudonym, in_data)
1079                .await
1080                .map(|_| DispatchResult::Processed);
1081        } else if self.session_tag_range.contains(&in_data.data.application_tag.as_u64()) {
1082            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1083
1084            return if let Some(session_slot) = self.sessions.get(&session_id).await {
1085                trace!(?session_id, "received data for a registered session");
1086
1087                Ok(session_slot
1088                    .session_tx
1089                    .unbounded_send(in_data)
1090                    .map(|_| DispatchResult::Processed)
1091                    .map_err(SessionManagerError::other)?)
1092            } else {
1093                error!(%session_id, "received data from an unestablished session");
1094                Err(TransportSessionError::UnknownData)
1095            };
1096        }
1097
1098        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1099        Ok(DispatchResult::Unrelated(in_data))
1100    }
1101
1102    async fn handle_incoming_session_initiation(
1103        &self,
1104        pseudonym: HoprPseudonym,
1105        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1106    ) -> crate::errors::Result<()> {
1107        trace!(challenge = session_req.challenge, "received session initiation request");
1108
1109        debug!(%pseudonym, "got new session request, searching for a free session slot");
1110
1111        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1112
1113        let (new_session_notifier, mut close_session_notifier) = self
1114            .session_notifiers
1115            .get()
1116            .cloned()
1117            .ok_or(SessionManagerError::NotStarted)?;
1118
1119        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1120        let reply_routing = DestinationRouting::Return(pseudonym.into());
1121
1122        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1123
1124        // Allocate a unique tag for this incoming session
1125        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1126        let allocated_tag = if self.maximum_sessions > self.sessions.entry_count() as usize {
1127            self.tag_allocator.allocate()
1128        } else {
1129            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1130            None
1131        };
1132
1133        if let Some(allocated_tag) = allocated_tag {
1134            let session_id = SessionId::new(allocated_tag.value(), pseudonym);
1135            let allocated_tag = Arc::new(allocated_tag);
1136
1137            let slot = SessionSlot {
1138                session_tx: Arc::new(tx_session_data),
1139                routing_opts: reply_routing.clone(),
1140                abort_handles: Default::default(),
1141                surb_mgmt: Default::default(),
1142                surb_estimator: Default::default(),
1143                allocated_tag: Some(allocated_tag),
1144            };
1145            self.sessions.insert(session_id, slot.clone()).await;
1146
1147            debug!(%session_id, ?session_req, "assigned a new session");
1148
1149            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1150                if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1151                    error!(%session_id, %error, %reason, "failed to notify session closure");
1152                }
1153            });
1154
1155            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1156                // Because of SURB scarcity, control the egress rate of incoming sessions
1157                let egress_rate_control =
1158                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1159
1160                // The Session request carries a "hint" as additional data telling what
1161                // the Session initiator has configured as its target buffer size in the Balancer.
1162                let target_surb_buffer_size = if session_req.additional_data > 0 {
1163                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1164                } else {
1165                    self.cfg.initial_return_session_egress_rate as u64
1166                        * self
1167                            .cfg
1168                            .minimum_surb_buffer_duration
1169                            .max(MIN_SURB_BUFFER_DURATION)
1170                            .as_secs()
1171                };
1172
1173                let surb_estimator_clone = slot.surb_estimator.clone();
1174                let session = HoprSession::new(
1175                    session_id,
1176                    reply_routing.clone(),
1177                    HoprSessionConfig {
1178                        capabilities: session_req.capabilities.into(),
1179                        frame_mtu: self.cfg.frame_mtu,
1180                        frame_timeout: self.cfg.max_frame_timeout,
1181                    },
1182                    (
1183                        // Sent packets = SURB consumption estimate
1184                        msg_sender
1185                            .clone()
1186                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1187                                // Each outgoing packet consumes one SURB
1188                                surb_estimator_clone
1189                                    .consumed
1190                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1191                                #[cfg(feature = "telemetry")]
1192                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
1193                                futures::future::ok::<_, S::Error>((routing, data))
1194                            })
1195                            .rate_limit_with_controller(&egress_rate_control)
1196                            .buffer((2 * target_surb_buffer_size) as usize),
1197                        // Received packets = SURB retrieval estimate
1198                        rx_session_data.inspect(move |data| {
1199                            let produced = data.num_surbs_with_msg() as u64;
1200                            // Count the number of SURBs delivered with each incoming packet
1201                            surb_estimator_clone
1202                                .produced
1203                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1204                            #[cfg(feature = "telemetry")]
1205                            crate::telemetry::record_session_surb_produced(&session_id, produced);
1206                        }),
1207                    ),
1208                    Some(closure_notifier),
1209                )?;
1210
1211                // The SURB balancer will start intervening by rate-limiting the
1212                // egress of the Session, once the estimated number of SURBs drops below
1213                // the target defined here. Otherwise, the maximum egress is allowed.
1214                let balancer_config = SurbBalancerConfig {
1215                    target_surb_buffer_size,
1216                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1217                    max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1218                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1219                    // were received
1220                    surb_decay: None,
1221                };
1222
1223                slot.surb_mgmt.update(&balancer_config);
1224
1225                // Spawn the SURB balancer only once we know we have registered the
1226                // abort handle with the pre-allocated Session slot
1227                debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1228                let balancer = SurbBalancer::new(
1229                    session_id,
1230                    SimpleBalancerController::default(),
1231                    slot.surb_estimator.clone(),
1232                    SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1233                    slot.surb_mgmt.clone(),
1234                );
1235
1236                // Assign the SURB balancer and abort handles to the already allocated Session slot
1237                let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1238                slot.abort_handles
1239                    .lock()
1240                    .insert(SessionTasks::Balancer, balancer_abort_handle);
1241
1242                // Spawn a keep-alive stream notifying about the SURB buffer level towards the Entry
1243                if let Some(period) = self.cfg.surb_balance_notify_period {
1244                    let surb_estimator_clone = slot.surb_estimator.clone();
1245                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1246                        session_id,
1247                        // Sent Keep-Alive packets also contribute to SURB consumption
1248                        msg_sender
1249                            .clone()
1250                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1251                                // Each sent keepalive consumes 1 SURB
1252                                surb_estimator_clone
1253                                    .consumed
1254                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1255                                #[cfg(feature = "telemetry")]
1256                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
1257                                futures::future::ok::<_, S::Error>((routing, data))
1258                            }),
1259                        slot.routing_opts.clone(),
1260                        SurbNotificationMode::Level(slot.surb_estimator.clone()),
1261                        slot.surb_mgmt.clone(),
1262                    );
1263
1264                    // Start keepalive stream towards the Entry with a predefined period
1265                    hopr_utils::runtime::prelude::spawn(async move {
1266                        // Delay the stream execution by one period
1267                        hopr_utils::runtime::prelude::sleep(period).await;
1268                        ka_controller.set_rate_per_unit(1, period);
1269                    });
1270
1271                    slot.abort_handles
1272                        .lock()
1273                        .insert(SessionTasks::KeepAlive, ka_abort_handle);
1274
1275                    debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1276                }
1277
1278                session
1279            } else {
1280                HoprSession::new(
1281                    session_id,
1282                    reply_routing.clone(),
1283                    HoprSessionConfig {
1284                        capabilities: session_req.capabilities.into(),
1285                        frame_mtu: self.cfg.frame_mtu,
1286                        frame_timeout: self.cfg.max_frame_timeout,
1287                    },
1288                    (msg_sender.clone(), rx_session_data),
1289                    Some(closure_notifier),
1290                )?
1291            };
1292
1293            // Extract useful information about the session from the Start protocol message
1294            let incoming_session = IncomingSession {
1295                session,
1296                target: session_req.target,
1297            };
1298
1299            // Notify that a new incoming session has been created. Lock the sink and send
1300            // directly into it, so no extra forwarding task between channels is needed.
1301            match async {
1302                let mut guard = new_session_notifier.lock().await;
1303                guard.send(incoming_session).await
1304            }
1305            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1306            .await
1307            {
1308                Err(_) => {
1309                    error!(%session_id, "timeout to notify about new incoming session");
1310                    return Err(TransportSessionError::Timeout);
1311                }
1312                Ok(Err(error)) => {
1313                    error!(%session_id, %error, "failed to notify about new incoming session");
1314                    return Err(SessionManagerError::other(error).into());
1315                }
1316                _ => {}
1317            };
1318
1319            trace!(?session_id, "session notification sent");
1320
1321            // Notify the sender that the session has been established.
1322            // Set our peer ID in the session ID sent back to them.
1323            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1324                orig_challenge: session_req.challenge,
1325                session_id,
1326            });
1327
1328            msg_sender
1329                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1330                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1331                .await
1332                .map_err(|_| {
1333                    error!(%session_id, "timeout sending session establishment message");
1334                    TransportSessionError::Timeout
1335                })?
1336                .map_err(|error| {
1337                    error!(%session_id, %error, "failed to send session establishment message");
1338                    SessionManagerError::other(error)
1339                })?;
1340
1341            #[cfg(feature = "telemetry")]
1342            {
1343                initialize_session_metrics(
1344                    session_id,
1345                    HoprSessionConfig {
1346                        capabilities: session_req.capabilities.0,
1347                        frame_mtu: self.cfg.frame_mtu,
1348                        frame_timeout: self.cfg.max_frame_timeout,
1349                    },
1350                );
1351                set_session_state(&session_id, SessionLifecycleState::Active);
1352                set_session_balancer_data(&session_id, slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1353            }
1354
1355            info!(%session_id, "new session established");
1356
1357            #[cfg(all(feature = "telemetry", not(test)))]
1358            {
1359                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1360                METRIC_ACTIVE_SESSIONS.increment(1.0);
1361            }
1362        } else {
1363            error!(%pseudonym,"failed to reserve a new session slot");
1364
1365            // Notify the sender that the session could not be established
1366            let reason = StartErrorReason::NoSlotsAvailable;
1367            let data = HoprStartProtocol::SessionError(StartErrorType {
1368                challenge: session_req.challenge,
1369                reason,
1370            });
1371
1372            msg_sender
1373                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1374                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1375                .await
1376                .map_err(|_| {
1377                    error!("timeout sending session error message");
1378                    TransportSessionError::Timeout
1379                })?
1380                .map_err(|error| {
1381                    error!(%error, "failed to send session error message");
1382                    SessionManagerError::other(error)
1383                })?;
1384
1385            trace!(%pseudonym, "session establishment failure message sent");
1386
1387            #[cfg(all(feature = "telemetry", not(test)))]
1388            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1389        }
1390
1391        Ok(())
1392    }
1393
1394    async fn handle_start_protocol_message(
1395        &self,
1396        pseudonym: HoprPseudonym,
1397        data: ApplicationDataIn,
1398    ) -> crate::errors::Result<()> {
1399        match HoprStartProtocol::try_from(data.data)? {
1400            HoprStartProtocol::StartSession(session_req) => {
1401                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1402            }
1403            HoprStartProtocol::SessionEstablished(est) => {
1404                trace!(
1405                    session_id = ?est.session_id,
1406                    "received session establishment confirmation"
1407                );
1408                let challenge = est.orig_challenge;
1409                let session_id = est.session_id;
1410                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1411                    if let Err(error) = tx_est.unbounded_send(Ok(est)) {
1412                        error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
1413                        return Err(SessionManagerError::other(error).into());
1414                    }
1415                    debug!(?session_id, challenge, "session establishment complete");
1416                } else {
1417                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
1418                }
1419            }
1420            HoprStartProtocol::SessionError(error_type) => {
1421                trace!(
1422                    challenge = error_type.challenge,
1423                    error = ?error_type.reason,
1424                    "failed to initialize a session",
1425                );
1426                // Currently, we do not distinguish between individual error types
1427                // and just discard the initiation attempt and pass on the error.
1428                if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
1429                    if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
1430                        error!(%error, ?error_type, "could not send session error message");
1431                        return Err(SessionManagerError::other(error).into());
1432                    }
1433                    error!(
1434                        challenge = error_type.challenge,
1435                        ?error_type,
1436                        "session establishment error received"
1437                    );
1438                } else {
1439                    error!(
1440                        challenge = error_type.challenge,
1441                        ?error_type,
1442                        "session establishment attempt expired before error could be delivered"
1443                    );
1444                }
1445
1446                #[cfg(all(feature = "telemetry", not(test)))]
1447                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
1448            }
1449            HoprStartProtocol::KeepAlive(msg) => {
1450                let session_id = msg.session_id;
1451                if let Some(session_slot) = self.sessions.get(&session_id).await {
1452                    trace!(?session_id, "received keep-alive message");
1453                    match &session_slot.routing_opts {
1454                        // Session is outgoing - keep-alive was received from the Exit
1455                        DestinationRouting::Forward { .. } => {
1456                            if msg.flags.contains(KeepAliveFlag::BalancerState)
1457                                && !session_slot.surb_mgmt.is_disabled()
1458                                && session_slot.surb_mgmt.buffer_level() != msg.additional_data
1459                            {
1460                                // Update the buffer level as sent to us from the Exit
1461                                session_slot
1462                                    .surb_mgmt
1463                                    .buffer_level
1464                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1465                                debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
1466                            }
1467
1468                            // Increase the number of consumed SURBs in the estimator
1469                            session_slot
1470                                .surb_estimator
1471                                .consumed
1472                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1473                            #[cfg(feature = "telemetry")]
1474                            crate::telemetry::record_session_surb_consumed(&session_id, 1);
1475                        }
1476                        // Session is incoming - keep-alive was received from the Entry
1477                        DestinationRouting::Return(_) => {
1478                            // Allow updating SURB balancer target based on the received Keep-Alive message
1479                            if msg.flags.contains(KeepAliveFlag::BalancerTarget)
1480                                && msg.additional_data > 0
1481                                && !session_slot.surb_mgmt.is_disabled()
1482                                && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
1483                            {
1484                                // Update the target buffer size as sent to us from the Entry
1485                                session_slot
1486                                    .surb_mgmt
1487                                    .target_surb_buffer_size
1488                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1489                                // Update maximum SURBs per second based on the new target
1490                                session_slot.surb_mgmt.max_surbs_per_sec.store(
1491                                    msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
1492                                    std::sync::atomic::Ordering::Relaxed,
1493                                );
1494                                debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
1495                            }
1496
1497                            // Increase the number of received SURBs in the estimator.
1498                            // Typically, 2 SURBs per Keep-Alive message
1499                            let produced = KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64;
1500                            session_slot
1501                                .surb_estimator
1502                                .produced
1503                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1504                            #[cfg(feature = "telemetry")]
1505                            crate::telemetry::record_session_surb_produced(&session_id, produced);
1506                        }
1507                    }
1508                } else {
1509                    debug!(%session_id, "received keep-alive request for an unknown session");
1510                }
1511            }
1512        }
1513
1514        Ok(())
1515    }
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520    use anyhow::anyhow;
1521    use futures::{AsyncWriteExt, future::BoxFuture};
1522    use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1523    use hopr_api::types::{
1524        crypto::{keypairs::ChainKeypair, prelude::Keypair},
1525        crypto_random::Randomizable,
1526        internal::routing::SurbMatcher,
1527        primitive::prelude::Address,
1528    };
1529    use hopr_utils::network_types::prelude::SealedHost;
1530    use tokio::time::timeout;
1531
1532    use super::*;
1533    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1534
1535    /// Create a test tag allocator with a large partition.
1536    fn test_tag_allocator() -> Arc<dyn TagAllocator + Send + Sync> {
1537        test_tag_allocator_with_session_capacity(10000)
1538    }
1539
1540    /// Create a test tag allocator with a specific session partition capacity.
1541    fn test_tag_allocator_with_session_capacity(session_capacity: u64) -> Arc<dyn TagAllocator + Send + Sync> {
1542        hopr_transport_tag_allocator::create_allocators(
1543            ReservedTag::range().end..u16::MAX as u64 + 1,
1544            [
1545                (hopr_transport_tag_allocator::Usage::Session, session_capacity),
1546                (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 10000),
1547                (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1548            ],
1549        )
1550        .expect("test allocator creation must not fail")
1551        .into_iter()
1552        .find(|(u, _)| matches!(u, hopr_transport_tag_allocator::Usage::Session))
1553        .expect("session allocator must exist")
1554        .1
1555    }
1556
1557    #[async_trait::async_trait]
1558    trait SendMsg {
1559        async fn send_message(
1560            &self,
1561            routing: DestinationRouting,
1562            data: ApplicationDataOut,
1563        ) -> crate::errors::Result<()>;
1564    }
1565
1566    mockall::mock! {
1567        MsgSender {}
1568        impl SendMsg for MsgSender {
1569            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1570            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1571        }
1572    }
1573
1574    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1575        let (tx, rx) = futures::channel::mpsc::unbounded();
1576        tokio::task::spawn(async move {
1577            pin_mut!(rx);
1578            while let Some((routing, data)) = rx.next().await {
1579                sender
1580                    .send_message(routing, data)
1581                    .await
1582                    .expect("send message must not fail in mock");
1583            }
1584        });
1585        tx
1586    }
1587
1588    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1589        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1590            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1591            .unwrap_or(false)
1592    }
1593
1594    fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1595        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1596            .map(msg)
1597            .unwrap_or(false)
1598    }
1599
1600    #[test_log::test(tokio::test)]
1601    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1602    {
1603        let alice_pseudonym = HoprPseudonym::random();
1604        let bob_peer: Address = (&ChainKeypair::random()).into();
1605
1606        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1607        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1608
1609        let mut sequence = mockall::Sequence::new();
1610        let mut alice_transport = MockMsgSender::new();
1611        let mut bob_transport = MockMsgSender::new();
1612
1613        // Alice sends the StartSession message
1614        let bob_mgr_clone = bob_mgr.clone();
1615        alice_transport
1616            .expect_send_message()
1617            .once()
1618            .in_sequence(&mut sequence)
1619            .withf(move |peer, data| {
1620                info!("alice sends {}", data.data.application_tag);
1621                msg_type(data, StartProtocolDiscriminants::StartSession)
1622                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1623            })
1624            .returning(move |_, data| {
1625                let bob_mgr_clone = bob_mgr_clone.clone();
1626                Box::pin(async move {
1627                    bob_mgr_clone
1628                        .dispatch_message(
1629                            alice_pseudonym,
1630                            ApplicationDataIn {
1631                                data: data.data,
1632                                packet_info: Default::default(),
1633                            },
1634                        )
1635                        .await?;
1636                    Ok(())
1637                })
1638            });
1639
1640        // Bob sends the SessionEstablished message
1641        let alice_mgr_clone = alice_mgr.clone();
1642        bob_transport
1643            .expect_send_message()
1644            .once()
1645            .in_sequence(&mut sequence)
1646            .withf(move |peer, data| {
1647                info!("bob sends {}", data.data.application_tag);
1648                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1649                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1650            })
1651            .returning(move |_, data| {
1652                let alice_mgr_clone = alice_mgr_clone.clone();
1653
1654                Box::pin(async move {
1655                    alice_mgr_clone
1656                        .dispatch_message(
1657                            alice_pseudonym,
1658                            ApplicationDataIn {
1659                                data: data.data,
1660                                packet_info: Default::default(),
1661                            },
1662                        )
1663                        .await?;
1664                    Ok(())
1665                })
1666            });
1667
1668        // Alice sends the terminating segment to close the Session
1669        let bob_mgr_clone = bob_mgr.clone();
1670        alice_transport
1671            .expect_send_message()
1672            .once()
1673            .in_sequence(&mut sequence)
1674            .withf(move |peer, data| {
1675                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1676                    data.data.plain_text.as_ref(),
1677                )
1678                .expect("must be a session message")
1679                .try_as_segment()
1680                .expect("must be a segment")
1681                .is_terminating()
1682                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1683            })
1684            .returning(move |_, data| {
1685                let bob_mgr_clone = bob_mgr_clone.clone();
1686                Box::pin(async move {
1687                    bob_mgr_clone
1688                        .dispatch_message(
1689                            alice_pseudonym,
1690                            ApplicationDataIn {
1691                                data: data.data,
1692                                packet_info: Default::default(),
1693                            },
1694                        )
1695                        .await?;
1696                    Ok(())
1697                })
1698            });
1699
1700        let mut ahs = Vec::new();
1701
1702        // Start Alice
1703        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1704        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1705
1706        // Start Bob
1707        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1708        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1709
1710        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1711
1712        pin_mut!(new_session_rx_bob);
1713        let (alice_session, bob_session) = timeout(
1714            Duration::from_secs(2),
1715            futures::future::join(
1716                alice_mgr.new_session(
1717                    bob_peer,
1718                    SessionTarget::TcpStream(target.clone()),
1719                    SessionClientConfig {
1720                        pseudonym: alice_pseudonym.into(),
1721                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1722                        surb_management: None,
1723                        ..Default::default()
1724                    },
1725                ),
1726                new_session_rx_bob.next(),
1727            ),
1728        )
1729        .await?;
1730
1731        let mut alice_session = alice_session?;
1732        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1733
1734        assert_eq!(
1735            alice_session.config().capabilities,
1736            Capability::Segmentation | Capability::NoRateControl
1737        );
1738        assert_eq!(
1739            alice_session.config().capabilities,
1740            bob_session.session.config().capabilities
1741        );
1742        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1743
1744        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1745        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1746        assert!(
1747            alice_mgr
1748                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1749                .await
1750                .is_err()
1751        );
1752
1753        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1754        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1755        assert!(
1756            bob_mgr
1757                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1758                .await
1759                .is_err()
1760        );
1761
1762        tokio::time::sleep(Duration::from_millis(100)).await;
1763        alice_session.close().await?;
1764
1765        tokio::time::sleep(Duration::from_millis(100)).await;
1766
1767        assert!(matches!(
1768            alice_mgr.ping_session(alice_session.id()).await,
1769            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1770        ));
1771
1772        futures::stream::iter(ahs)
1773            .for_each(|ah| async move { ah.abort() })
1774            .await;
1775
1776        Ok(())
1777    }
1778
1779    #[test_log::test(tokio::test)]
1780    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1781        let alice_pseudonym = HoprPseudonym::random();
1782        let bob_peer: Address = (&ChainKeypair::random()).into();
1783
1784        let cfg = SessionManagerConfig {
1785            idle_timeout: Duration::from_millis(200),
1786            ..Default::default()
1787        };
1788
1789        let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
1790        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1791
1792        let mut sequence = mockall::Sequence::new();
1793        let mut alice_transport = MockMsgSender::new();
1794        let mut bob_transport = MockMsgSender::new();
1795
1796        // Alice sends the StartSession message
1797        let bob_mgr_clone = bob_mgr.clone();
1798        alice_transport
1799            .expect_send_message()
1800            .once()
1801            .in_sequence(&mut sequence)
1802            .withf(move |peer, data| {
1803                msg_type(data, StartProtocolDiscriminants::StartSession)
1804                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1805            })
1806            .returning(move |_, data| {
1807                let bob_mgr_clone = bob_mgr_clone.clone();
1808                Box::pin(async move {
1809                    bob_mgr_clone
1810                        .dispatch_message(
1811                            alice_pseudonym,
1812                            ApplicationDataIn {
1813                                data: data.data,
1814                                packet_info: Default::default(),
1815                            },
1816                        )
1817                        .await?;
1818                    Ok(())
1819                })
1820            });
1821
1822        // Bob sends the SessionEstablished message
1823        let alice_mgr_clone = alice_mgr.clone();
1824        bob_transport
1825            .expect_send_message()
1826            .once()
1827            .in_sequence(&mut sequence)
1828            .withf(move |peer, data| {
1829                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1830                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1831            })
1832            .returning(move |_, data| {
1833                let alice_mgr_clone = alice_mgr_clone.clone();
1834
1835                Box::pin(async move {
1836                    alice_mgr_clone
1837                        .dispatch_message(
1838                            alice_pseudonym,
1839                            ApplicationDataIn {
1840                                data: data.data,
1841                                packet_info: Default::default(),
1842                            },
1843                        )
1844                        .await?;
1845                    Ok(())
1846                })
1847            });
1848
1849        let mut ahs = Vec::new();
1850
1851        // Start Alice
1852        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1853        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1854
1855        // Start Bob
1856        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1857        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1858
1859        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1860
1861        pin_mut!(new_session_rx_bob);
1862        let (alice_session, bob_session) = timeout(
1863            Duration::from_secs(2),
1864            futures::future::join(
1865                alice_mgr.new_session(
1866                    bob_peer,
1867                    SessionTarget::TcpStream(target.clone()),
1868                    SessionClientConfig {
1869                        pseudonym: alice_pseudonym.into(),
1870                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1871                        surb_management: None,
1872                        ..Default::default()
1873                    },
1874                ),
1875                new_session_rx_bob.next(),
1876            ),
1877        )
1878        .await?;
1879
1880        let alice_session = alice_session?;
1881        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1882
1883        assert_eq!(
1884            alice_session.config().capabilities,
1885            Capability::Segmentation | Capability::NoRateControl,
1886        );
1887        assert_eq!(
1888            alice_session.config().capabilities,
1889            bob_session.session.config().capabilities
1890        );
1891        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1892
1893        // Let the session timeout at Alice
1894        tokio::time::sleep(Duration::from_millis(300)).await;
1895
1896        assert!(matches!(
1897            alice_mgr.ping_session(alice_session.id()).await,
1898            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1899        ));
1900
1901        futures::stream::iter(ahs)
1902            .for_each(|ah| async move { ah.abort() })
1903            .await;
1904
1905        Ok(())
1906    }
1907
1908    #[test_log::test(tokio::test)]
1909    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1910        let alice_pseudonym = HoprPseudonym::random();
1911        let session_id = SessionId::new(16u64, alice_pseudonym);
1912        let balancer_cfg = SurbBalancerConfig {
1913            target_surb_buffer_size: 1000,
1914            max_surbs_per_sec: 100,
1915            ..Default::default()
1916        };
1917
1918        let alice_mgr = SessionManager::<UnboundedSender<(DestinationRouting, ApplicationDataOut)>>::new(
1919            Default::default(),
1920            test_tag_allocator(),
1921        );
1922
1923        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1924        alice_mgr
1925            .sessions
1926            .insert(
1927                session_id,
1928                SessionSlot {
1929                    session_tx: Arc::new(dummy_tx),
1930                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1931                    abort_handles: Default::default(),
1932                    surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
1933                    surb_estimator: Default::default(),
1934                    allocated_tag: None,
1935                },
1936            )
1937            .await;
1938
1939        let actual_cfg = alice_mgr
1940            .get_surb_balancer_config(&session_id)
1941            .await?
1942            .ok_or(anyhow!("session must have a surb balancer config"))?;
1943        assert_eq!(actual_cfg, balancer_cfg);
1944
1945        let new_cfg = SurbBalancerConfig {
1946            target_surb_buffer_size: 2000,
1947            max_surbs_per_sec: 200,
1948            ..Default::default()
1949        };
1950        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1951
1952        let actual_cfg = alice_mgr
1953            .get_surb_balancer_config(&session_id)
1954            .await?
1955            .ok_or(anyhow!("session must have a surb balancer config"))?;
1956        assert_eq!(actual_cfg, new_cfg);
1957
1958        Ok(())
1959    }
1960
1961    #[test_log::test(tokio::test)]
1962    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1963        let alice_pseudonym = HoprPseudonym::random();
1964        let bob_peer: Address = (&ChainKeypair::random()).into();
1965
1966        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1967        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
1968
1969        // Occupy the only free slot with tag 16
1970        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1971        bob_mgr
1972            .sessions
1973            .insert(
1974                SessionId::new(16u64, alice_pseudonym),
1975                SessionSlot {
1976                    session_tx: Arc::new(dummy_tx),
1977                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1978                    abort_handles: Default::default(),
1979                    surb_mgmt: Default::default(),
1980                    surb_estimator: Default::default(),
1981                    allocated_tag: None,
1982                },
1983            )
1984            .await;
1985
1986        let mut sequence = mockall::Sequence::new();
1987        let mut alice_transport = MockMsgSender::new();
1988        let mut bob_transport = MockMsgSender::new();
1989
1990        // Alice sends the StartSession message
1991        let bob_mgr_clone = bob_mgr.clone();
1992        alice_transport
1993            .expect_send_message()
1994            .once()
1995            .in_sequence(&mut sequence)
1996            .withf(move |peer, data| {
1997                msg_type(data, StartProtocolDiscriminants::StartSession)
1998                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1999            })
2000            .returning(move |_, data| {
2001                let bob_mgr_clone = bob_mgr_clone.clone();
2002                Box::pin(async move {
2003                    bob_mgr_clone
2004                        .dispatch_message(
2005                            alice_pseudonym,
2006                            ApplicationDataIn {
2007                                data: data.data,
2008                                packet_info: Default::default(),
2009                            },
2010                        )
2011                        .await?;
2012                    Ok(())
2013                })
2014            });
2015
2016        // Bob sends the SessionError message
2017        let alice_mgr_clone = alice_mgr.clone();
2018        bob_transport
2019            .expect_send_message()
2020            .once()
2021            .in_sequence(&mut sequence)
2022            .withf(move |peer, data| {
2023                msg_type(data, StartProtocolDiscriminants::SessionError)
2024                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2025            })
2026            .returning(move |_, data| {
2027                let alice_mgr_clone = alice_mgr_clone.clone();
2028                Box::pin(async move {
2029                    alice_mgr_clone
2030                        .dispatch_message(
2031                            alice_pseudonym,
2032                            ApplicationDataIn {
2033                                data: data.data,
2034                                packet_info: Default::default(),
2035                            },
2036                        )
2037                        .await?;
2038                    Ok(())
2039                })
2040            });
2041
2042        let mut jhs = Vec::new();
2043
2044        // Start Alice
2045        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2046        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2047
2048        // Start Bob
2049        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2050        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2051
2052        let result = alice_mgr
2053            .new_session(
2054                bob_peer,
2055                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2056                SessionClientConfig {
2057                    capabilities: Capabilities::empty(),
2058                    pseudonym: alice_pseudonym.into(),
2059                    surb_management: None,
2060                    ..Default::default()
2061                },
2062            )
2063            .await;
2064
2065        assert!(
2066            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2067        );
2068
2069        Ok(())
2070    }
2071
2072    #[test_log::test(tokio::test)]
2073    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2074    -> anyhow::Result<()> {
2075        let alice_pseudonym = HoprPseudonym::random();
2076        let bob_peer: Address = (&ChainKeypair::random()).into();
2077
2078        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2079        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
2080
2081        // Occupy the only free slot with tag 16
2082        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2083        bob_mgr
2084            .sessions
2085            .insert(
2086                SessionId::new(16u64, alice_pseudonym),
2087                SessionSlot {
2088                    session_tx: Arc::new(dummy_tx),
2089                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2090                    abort_handles: Default::default(),
2091                    surb_mgmt: Default::default(),
2092                    surb_estimator: Default::default(),
2093                    allocated_tag: None,
2094                },
2095            )
2096            .await;
2097
2098        let mut sequence = mockall::Sequence::new();
2099        let mut alice_transport = MockMsgSender::new();
2100        let mut bob_transport = MockMsgSender::new();
2101
2102        // Alice sends the StartSession message
2103        let bob_mgr_clone = bob_mgr.clone();
2104        alice_transport
2105            .expect_send_message()
2106            .once()
2107            .in_sequence(&mut sequence)
2108            .withf(move |peer, data| {
2109                msg_type(data, StartProtocolDiscriminants::StartSession)
2110                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2111            })
2112            .returning(move |_, data| {
2113                let bob_mgr_clone = bob_mgr_clone.clone();
2114                Box::pin(async move {
2115                    bob_mgr_clone
2116                        .dispatch_message(
2117                            alice_pseudonym,
2118                            ApplicationDataIn {
2119                                data: data.data,
2120                                packet_info: Default::default(),
2121                            },
2122                        )
2123                        .await?;
2124                    Ok(())
2125                })
2126            });
2127
2128        // Bob sends the SessionError message
2129        let alice_mgr_clone = alice_mgr.clone();
2130        bob_transport
2131            .expect_send_message()
2132            .once()
2133            .in_sequence(&mut sequence)
2134            .withf(move |peer, data| {
2135                msg_type(data, StartProtocolDiscriminants::SessionError)
2136                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2137            })
2138            .returning(move |_, data| {
2139                let alice_mgr_clone = alice_mgr_clone.clone();
2140                Box::pin(async move {
2141                    alice_mgr_clone
2142                        .dispatch_message(
2143                            alice_pseudonym,
2144                            ApplicationDataIn {
2145                                data: data.data,
2146                                packet_info: Default::default(),
2147                            },
2148                        )
2149                        .await?;
2150                    Ok(())
2151                })
2152            });
2153
2154        let mut jhs = Vec::new();
2155
2156        // Start Alice
2157        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2158        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2159
2160        // Start Bob
2161        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2162        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2163
2164        let result = alice_mgr
2165            .new_session(
2166                bob_peer,
2167                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2168                SessionClientConfig {
2169                    capabilities: None.into(),
2170                    pseudonym: alice_pseudonym.into(),
2171                    surb_management: None,
2172                    ..Default::default()
2173                },
2174            )
2175            .await;
2176
2177        assert!(
2178            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2179        );
2180
2181        Ok(())
2182    }
2183
2184    #[test_log::test(tokio::test)]
2185    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2186        let alice_pseudonym = HoprPseudonym::random();
2187        let bob_peer: Address = (&ChainKeypair::random()).into();
2188
2189        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2190
2191        let mut sequence = mockall::Sequence::new();
2192        let mut alice_transport = MockMsgSender::new();
2193
2194        // Alice sends the StartSession message
2195        let alice_mgr_clone = alice_mgr.clone();
2196        alice_transport
2197            .expect_send_message()
2198            .once()
2199            .in_sequence(&mut sequence)
2200            .withf(move |peer, data| {
2201                msg_type(data, StartProtocolDiscriminants::StartSession)
2202                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2203            })
2204            .returning(move |_, data| {
2205                // But the message is again processed by Alice due to Loopback
2206                let alice_mgr_clone = alice_mgr_clone.clone();
2207                Box::pin(async move {
2208                    alice_mgr_clone
2209                        .dispatch_message(
2210                            alice_pseudonym,
2211                            ApplicationDataIn {
2212                                data: data.data,
2213                                packet_info: Default::default(),
2214                            },
2215                        )
2216                        .await?;
2217                    Ok(())
2218                })
2219            });
2220
2221        // Alice sends the SessionEstablished message (as Bob)
2222        let alice_mgr_clone = alice_mgr.clone();
2223        alice_transport
2224            .expect_send_message()
2225            .once()
2226            .in_sequence(&mut sequence)
2227            .withf(move |peer, data| {
2228                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2229                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2230            })
2231            .returning(move |_, data| {
2232                let alice_mgr_clone = alice_mgr_clone.clone();
2233
2234                Box::pin(async move {
2235                    alice_mgr_clone
2236                        .dispatch_message(
2237                            alice_pseudonym,
2238                            ApplicationDataIn {
2239                                data: data.data,
2240                                packet_info: Default::default(),
2241                            },
2242                        )
2243                        .await?;
2244                    Ok(())
2245                })
2246            });
2247
2248        // Start Alice
2249        let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2250        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2251
2252        let alice_session = alice_mgr
2253            .new_session(
2254                bob_peer,
2255                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2256                SessionClientConfig {
2257                    capabilities: None.into(),
2258                    pseudonym: alice_pseudonym.into(),
2259                    surb_management: None,
2260                    ..Default::default()
2261                },
2262            )
2263            .await;
2264
2265        println!("{alice_session:?}");
2266        assert!(matches!(
2267            alice_session,
2268            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2269        ));
2270
2271        drop(new_session_rx_alice);
2272        Ok(())
2273    }
2274
2275    #[test_log::test(tokio::test)]
2276    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2277        let bob_peer: Address = (&ChainKeypair::random()).into();
2278
2279        let cfg = SessionManagerConfig {
2280            initiation_timeout_base: Duration::from_millis(100),
2281            ..Default::default()
2282        };
2283
2284        let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
2285        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2286
2287        let mut sequence = mockall::Sequence::new();
2288        let mut alice_transport = MockMsgSender::new();
2289        let bob_transport = MockMsgSender::new();
2290
2291        // Alice sends the StartSession message, but Bob does not handle it
2292        alice_transport
2293            .expect_send_message()
2294            .once()
2295            .in_sequence(&mut sequence)
2296            .withf(move |peer, data| {
2297                msg_type(data, StartProtocolDiscriminants::StartSession)
2298                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2299            })
2300            .returning(|_, _| Box::pin(async { Ok(()) }));
2301
2302        // Start Alice
2303        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2304        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2305
2306        // Start Bob
2307        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2308        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2309
2310        let result = alice_mgr
2311            .new_session(
2312                bob_peer,
2313                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2314                SessionClientConfig {
2315                    capabilities: None.into(),
2316                    pseudonym: None,
2317                    surb_management: None,
2318                    ..Default::default()
2319                },
2320            )
2321            .await;
2322
2323        assert!(matches!(result, Err(TransportSessionError::Timeout)));
2324
2325        Ok(())
2326    }
2327
2328    #[cfg(feature = "telemetry")]
2329    #[test_log::test(tokio::test)]
2330    async fn failed_incoming_session_establishment_does_not_register_telemetry() -> anyhow::Result<()> {
2331        let mgr = SessionManager::new(Default::default(), test_tag_allocator());
2332
2333        let transport = MockMsgSender::new();
2334        let (new_session_tx, new_session_rx) = futures::channel::mpsc::channel(1);
2335        drop(new_session_rx);
2336        mgr.start(mock_packet_planning(transport), new_session_tx)?;
2337
2338        let pseudonym = HoprPseudonym::random();
2339        let result = mgr
2340            .handle_incoming_session_initiation(
2341                pseudonym,
2342                StartInitiation {
2343                    challenge: MIN_CHALLENGE,
2344                    target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2345                    capabilities: ByteCapabilities(Capabilities::empty()),
2346                    additional_data: 0,
2347                },
2348            )
2349            .await;
2350
2351        assert!(result.is_err());
2352
2353        let allocated_session_ids = mgr.active_sessions().await;
2354        assert_eq!(1, allocated_session_ids.len());
2355
2356        Ok(())
2357    }
2358
2359    #[test_log::test(tokio::test)]
2360    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2361        let alice_pseudonym = HoprPseudonym::random();
2362        let bob_peer: Address = (&ChainKeypair::random()).into();
2363
2364        let bob_cfg = SessionManagerConfig {
2365            surb_balance_notify_period: Some(Duration::from_millis(500)),
2366            ..Default::default()
2367        };
2368        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2369        let bob_mgr = SessionManager::new(bob_cfg.clone(), test_tag_allocator());
2370
2371        let mut alice_transport = MockMsgSender::new();
2372        let mut bob_transport = MockMsgSender::new();
2373
2374        // Alice sends the StartSession message
2375        let mut open_sequence = mockall::Sequence::new();
2376        let bob_mgr_clone = bob_mgr.clone();
2377        alice_transport
2378            .expect_send_message()
2379            .once()
2380            .in_sequence(&mut open_sequence)
2381            .withf(move |peer, data| {
2382                msg_type(data, StartProtocolDiscriminants::StartSession)
2383                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2384            })
2385            .returning(move |_, data| {
2386                let bob_mgr_clone = bob_mgr_clone.clone();
2387                Box::pin(async move {
2388                    bob_mgr_clone
2389                        .dispatch_message(
2390                            alice_pseudonym,
2391                            ApplicationDataIn {
2392                                data: data.data,
2393                                packet_info: Default::default(),
2394                            },
2395                        )
2396                        .await?;
2397                    Ok(())
2398                })
2399            });
2400
2401        // Bob sends the SessionEstablished message
2402        let alice_mgr_clone = alice_mgr.clone();
2403        bob_transport
2404            .expect_send_message()
2405            .once()
2406            .in_sequence(&mut open_sequence)
2407            .withf(move |peer, data| {
2408                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2409                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2410            })
2411            .returning(move |_, data| {
2412                let alice_mgr_clone = alice_mgr_clone.clone();
2413                Box::pin(async move {
2414                    alice_mgr_clone
2415                        .dispatch_message(
2416                            alice_pseudonym,
2417                            ApplicationDataIn {
2418                                data: data.data,
2419                                packet_info: Default::default(),
2420                            },
2421                        )
2422                        .await?;
2423                    Ok(())
2424                })
2425            });
2426
2427        const INITIAL_BALANCER_TARGET: u64 = 10;
2428
2429        // Alice sends the KeepAlive messages reporting the initial balancer target
2430        let bob_mgr_clone = bob_mgr.clone();
2431        alice_transport
2432            .expect_send_message()
2433            .times(5..)
2434            //.in_sequence(&mut sequence)
2435            .withf(move |peer, data| {
2436                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2437                //msg_type(data, StartProtocolDiscriminants::KeepAlive)
2438                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2439            })
2440            .returning(move |_, data| {
2441                let bob_mgr_clone = bob_mgr_clone.clone();
2442                Box::pin(async move {
2443                    bob_mgr_clone
2444                        .dispatch_message(
2445                            alice_pseudonym,
2446                            ApplicationDataIn {
2447                                data: data.data,
2448                                packet_info: Default::default(),
2449                            },
2450                        )
2451                        .await?;
2452                    Ok(())
2453                })
2454            });
2455
2456        const NEXT_BALANCER_TARGET: u64 = 50;
2457
2458        // Alice sends also the KeepAlive messages reporting the updated balancer target
2459        let bob_mgr_clone = bob_mgr.clone();
2460        alice_transport
2461            .expect_send_message()
2462            .times(5..)
2463            //.in_sequence(&mut sequence)
2464            .withf(move |peer, data| {
2465                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2466                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2467            })
2468            .returning(move |_, data| {
2469                let bob_mgr_clone = bob_mgr_clone.clone();
2470                Box::pin(async move {
2471                    bob_mgr_clone
2472                        .dispatch_message(
2473                            alice_pseudonym,
2474                            ApplicationDataIn {
2475                                data: data.data,
2476                                packet_info: Default::default(),
2477                            },
2478                        )
2479                        .await?;
2480                    Ok(())
2481                })
2482            });
2483
2484        // Bob sends at least 1 Keep Alive back reporting its SURB estimate
2485        let alice_mgr_clone = alice_mgr.clone();
2486        bob_transport
2487            .expect_send_message()
2488            .times(1..)
2489            //.in_sequence(&mut open_sequence)
2490            .withf(move |peer, data| {
2491                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2492                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2493            })
2494            .returning(move |_, data| {
2495                let alice_mgr_clone = alice_mgr_clone.clone();
2496                Box::pin(async move {
2497                    alice_mgr_clone
2498                        .dispatch_message(
2499                            alice_pseudonym,
2500                            ApplicationDataIn {
2501                                data: data.data,
2502                                packet_info: Default::default(),
2503                            },
2504                        )
2505                        .await?;
2506                    Ok(())
2507                })
2508            });
2509
2510        // Alice sends the terminating segment to close the Session
2511        let bob_mgr_clone = bob_mgr.clone();
2512        alice_transport
2513            .expect_send_message()
2514            .once()
2515            //.in_sequence(&mut sequence)
2516            .withf(move |peer, data| {
2517                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2518                    data.data.plain_text.as_ref(),
2519                )
2520                .ok()
2521                .and_then(|m| m.try_as_segment())
2522                .map(|s| s.is_terminating())
2523                .unwrap_or(false)
2524                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2525            })
2526            .returning(move |_, data| {
2527                let bob_mgr_clone = bob_mgr_clone.clone();
2528                Box::pin(async move {
2529                    bob_mgr_clone
2530                        .dispatch_message(
2531                            alice_pseudonym,
2532                            ApplicationDataIn {
2533                                data: data.data,
2534                                packet_info: Default::default(),
2535                            },
2536                        )
2537                        .await?;
2538                    Ok(())
2539                })
2540            });
2541
2542        let mut ahs = Vec::new();
2543
2544        // Start Alice
2545        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2546        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2547
2548        // Start Bob
2549        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2550        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2551
2552        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2553
2554        let balancer_cfg = SurbBalancerConfig {
2555            target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2556            max_surbs_per_sec: 100,
2557            ..Default::default()
2558        };
2559
2560        pin_mut!(new_session_rx_bob);
2561        let (alice_session, bob_session) = timeout(
2562            Duration::from_secs(2),
2563            futures::future::join(
2564                alice_mgr.new_session(
2565                    bob_peer,
2566                    SessionTarget::TcpStream(target.clone()),
2567                    SessionClientConfig {
2568                        pseudonym: alice_pseudonym.into(),
2569                        capabilities: Capability::Segmentation.into(),
2570                        surb_management: Some(balancer_cfg),
2571                        ..Default::default()
2572                    },
2573                ),
2574                new_session_rx_bob.next(),
2575            ),
2576        )
2577        .await?;
2578
2579        let mut alice_session = alice_session?;
2580        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2581
2582        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2583
2584        assert_eq!(
2585            Some(balancer_cfg),
2586            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2587        );
2588
2589        let remote_cfg = bob_mgr
2590            .get_surb_balancer_config(bob_session.session.id())
2591            .await?
2592            .ok_or(anyhow!("no remote config at bob"))?;
2593        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2594        assert_eq!(
2595            remote_cfg.max_surbs_per_sec,
2596            remote_cfg.target_surb_buffer_size
2597                / bob_cfg
2598                    .minimum_surb_buffer_duration
2599                    .max(MIN_SURB_BUFFER_DURATION)
2600                    .as_secs()
2601        );
2602
2603        // Let the Surb balancer send enough KeepAlive messages
2604        tokio::time::sleep(Duration::from_millis(1500)).await;
2605
2606        let new_balancer_cfg = SurbBalancerConfig {
2607            target_surb_buffer_size: NEXT_BALANCER_TARGET,
2608            max_surbs_per_sec: 100,
2609            ..Default::default()
2610        };
2611
2612        // Update to a higher target
2613        alice_mgr
2614            .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2615            .await?;
2616
2617        // Let the Surb balancer send enough KeepAlive messages
2618        tokio::time::sleep(Duration::from_millis(1500)).await;
2619
2620        // Bob should know about the updated target
2621        let remote_cfg = bob_mgr
2622            .get_surb_balancer_config(bob_session.session.id())
2623            .await?
2624            .ok_or(anyhow!("no remote config at bob"))?;
2625        assert_eq!(
2626            remote_cfg.target_surb_buffer_size,
2627            new_balancer_cfg.target_surb_buffer_size
2628        );
2629        assert_eq!(
2630            remote_cfg.max_surbs_per_sec,
2631            new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2632        );
2633
2634        let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2635        let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2636
2637        alice_session.close().await?;
2638
2639        assert!(alice_surb_sent > 0, "alice must've sent surbs");
2640        assert!(bob_surb_recv > 0, "bob must've received surbs");
2641        assert!(
2642            bob_surb_recv <= alice_surb_sent,
2643            "bob cannot receive more surbs than alice sent"
2644        );
2645
2646        assert!(alice_surb_used > 0, "alice must see bob used surbs");
2647        assert!(bob_surb_used > 0, "bob must've used surbs");
2648        assert!(
2649            alice_surb_used <= bob_surb_used,
2650            "alice cannot see bob used more surbs than bob actually used"
2651        );
2652
2653        tokio::time::sleep(Duration::from_millis(300)).await;
2654        assert!(matches!(
2655            alice_mgr.ping_session(alice_session.id()).await,
2656            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2657        ));
2658
2659        futures::stream::iter(ahs)
2660            .for_each(|ah| async move { ah.abort() })
2661            .await;
2662
2663        Ok(())
2664    }
2665}