Skip to main content

hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use anyhow::anyhow;
8use futures::{
9    FutureExt, SinkExt, StreamExt, TryStreamExt,
10    channel::mpsc::{Sender, UnboundedSender},
11    future::AbortHandle,
12    pin_mut,
13};
14use futures_time::future::FutureExt as TimeExt;
15use hopr_async_runtime::AbortableList;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_crypto_random::Randomizable;
18use hopr_internal_types::prelude::HoprPseudonym;
19use hopr_network_types::prelude::*;
20use hopr_primitive_types::prelude::Address;
21use hopr_protocol_app::prelude::*;
22use hopr_protocol_start::{
23    KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
24    StartInitiation,
25};
26use tracing::{debug, error, info, trace, warn};
27
28#[cfg(feature = "telemetry")]
29use crate::telemetry::{SessionLifecycleState, SessionStatsSnapshot, SessionTelemetry};
30use crate::{
31    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
32    SurbBalancerConfig,
33    balancer::{
34        AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
35        SurbControllerWithCorrection,
36        pid::{PidBalancerController, PidControllerGains},
37        simple::SimpleBalancerController,
38    },
39    errors::{SessionManagerError, TransportSessionError},
40    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
41    utils,
42    utils::{SurbNotificationMode, insert_into_next_slot},
43};
44
// Prometheus metrics tracking Session lifecycle events; compiled only with the
// `prometheus` feature enabled and outside of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
70
71fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
72    debug!(?session_id, ?reason, "closing session");
73
74    #[cfg(feature = "telemetry")]
75    {
76        session_data.telemetry.set_state(SessionLifecycleState::Closed);
77        session_data.telemetry.touch_activity();
78    }
79
80    if reason != ClosureReason::EmptyRead {
81        // Closing the data sender will also cause it to close from the read side
82        session_data.session_tx.close_channel();
83        trace!(?session_id, "data tx channel closed on session");
84    }
85
86    // Terminate any additional tasks spawned by the Session
87    session_data.abort_handles.lock().abort_all();
88
89    #[cfg(all(feature = "prometheus", not(test)))]
90    METRIC_ACTIVE_SESSIONS.decrement(1.0);
91}
92
/// Computes the maximum one-way Session initiation timeout by scaling the
/// configured base timeout with the number of hops.
///
/// Uses saturating arithmetic so that pathological inputs (a huge `base` or
/// `hops` value) cannot overflow: the result saturates at [`Duration::MAX`]
/// instead of panicking, which the previous plain `*` could do.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    // `hops` is small in practice (bounded by the routing hop limit), but
    // saturate the conversion defensively instead of truncating with `as`.
    base.saturating_mul(u32::try_from(hops).unwrap_or(u32::MAX))
}
96
/// Minimum time the SURB buffer must endure if no SURBs are being produced.
///
/// Acts as the lower bound for [`SessionManagerConfig::minimum_surb_buffer_duration`].
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
/// Minimum time between SURB buffer notifications to the Entry.
///
/// Acts as the lower bound for [`SessionManagerConfig::surb_balance_notify_period`].
pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);

/// The first challenge value used in Start protocol to initiate a session.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time to wait for counterparty to receive the target number of SURBs.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum timeout until an unfinished frame is discarded.
///
/// Acts as the lower bound for [`SessionManagerConfig::max_frame_timeout`].
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);

/// Maps pending Start-protocol challenges to the channel that resolves
/// the outcome of the corresponding Session initiation.
// Needs to use an UnboundedSender instead of oneshot
// because Moka cache requires the value to be Clone, which oneshot Sender is not.
// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
116
/// Labels for the auxiliary background tasks spawned per Session;
/// used as keys in the per-Session abortable task list (`SessionSlot::abort_handles`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
enum SessionTasks {
    /// Keep-alive message task.
    KeepAlive,
    /// SURB balancer task.
    Balancer,
}
122
/// Internal per-Session bookkeeping record stored in the manager's session cache.
#[derive(Clone)]
struct SessionSlot {
    // Sender needs to be put in Arc, so that no clones are made by `moka`.
    // This makes sure that the entire channel closes once the one and only sender is closed.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    // Destination routing for this Session's outgoing packets.
    routing_opts: DestinationRouting,
    // Additional tasks spawned by the Session.
    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
    // Additional per-session telemetry
    #[cfg(feature = "telemetry")]
    telemetry: Arc<SessionTelemetry>,
    // Allows reconfiguring of the SURB balancer on-the-fly
    // Set on both Entry and Exit sides.
    surb_mgmt: Arc<BalancerStateValues>,
    // SURB flow updates happening outside of Session protocol
    // (e.g., due to Start protocol messages).
    surb_estimator: AtomicSurbFlowEstimator,
}
141
/// Indicates the result of processing a message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// Session or Start protocol message has been processed successfully.
    Processed,
    /// The message was not related to Start or Session protocol.
    /// The unconsumed data is handed back to the caller for further processing.
    Unrelated(ApplicationDataIn),
}
150
/// Configuration for the [`SessionManager`].
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// Ranges of tags available for Sessions.
    ///
    /// **NOTE**: If the range starts lower than [`ReservedTag`]'s range end,
    /// it will be automatically transformed to start at this value.
    /// This is due to the reserved range by the Start sub-protocol.
    ///
    /// Default is 16..1024.
    #[doc(hidden)]
    #[default(_code = "16u64..1024u64")]
    pub session_tag_range: Range<u64>,

    /// The maximum number of sessions (incoming and outgoing) that is allowed
    /// to be managed by the manager.
    ///
    /// When reached, creating [new sessions](SessionManager::new_session) will return
    /// the [`SessionManagerError::TooManySessions`] error, and incoming sessions will be rejected
    /// with [`StartErrorReason::NoSlotsAvailable`] Start protocol error.
    ///
    /// Default is 128, minimum is 1; maximum is given by the `session_tag_range`.
    #[default(128)]
    pub maximum_sessions: usize,

    /// The maximum chunk of data that can be written to the Session's input buffer.
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
    ///
    /// Default is 800 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// The base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = initiation_timeout_base * (num_forward_hops + num_return_hops + 2)`
    ///
    /// Default is 500 milliseconds.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Timeout for Session to be closed due to inactivity.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// The sampling interval for SURB balancer.
    /// It will make SURB control decisions regularly at this interval.
    ///
    /// Default is 100 milliseconds.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial packets per second egress rate on an incoming Session.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 10 packets/second.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,

    /// Minimum period of time for which a SURB buffer at the Exit must
    /// endure if no SURBs are being received.
    ///
    /// In other words, it is the minimum period of time an Exit must withstand when
    /// no SURBs are received from the Entry at all. To do so, the egress traffic
    /// will be shaped accordingly to meet this requirement.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 5 seconds, minimum is 1 second.
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,

    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
    ///
    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
    /// so values greater than that do not make sense. This value should be ideally set equal
    /// to the size of the global transport SURB RB.
    ///
    /// Default is 10 000 SURBs.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// If set, the Session recipient (Exit) will notify the Session initiator (Entry) about
    /// its SURB balance for the Session using keep-alive packets periodically.
    ///
    /// Keep in mind that each notification also costs 1 SURB, so the notification period should
    /// not be too frequent.
    ///
    /// Default is None (no notification sent to the client), minimum is 1 second.
    #[default(None)]
    pub surb_balance_notify_period: Option<Duration>,

    /// If set, the Session initiator (Entry) will notify the Session recipient (Exit) about
    /// the local SURB balancer target using keep-alive packets from the SURB balancer.
    ///
    /// This is useful when the client plans to change the SURB balancer target dynamically.
    ///
    /// Default is true.
    #[default(true)]
    pub surb_target_notify: bool,
}
260
261/// Manages lifecycles of Sessions.
262///
263/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
264/// should be called for each [`ApplicationData`] received by the node.
265/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
266/// and correct dispatch of Session-related packets to individual existing Sessions.
267///
268/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
269/// probe sessions using [`SessionManager::ping_session`]
270/// and list them via [`SessionManager::active_sessions`].
271///
272/// Since the `SessionManager` operates over the HOPR protocol,
273/// the message transport `S` is required.
274/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
275///
276/// ## SURB balancing
277/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
278///
279/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
280/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
281/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
282/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
283/// service degradation.
284///
285/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
286/// *local SURB balancing* and *remote SURB balancing*.
287///
288/// ### Local SURB balancing
289/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
290/// therefore incoming to us).
291/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
292/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
293/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
294/// more SURBs over time. This might happen either organically by sending effective payloads that
295/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
296/// via *remote SURB balancing*.
297///
298/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
299/// flag during Session initiation.
300///
301/// ### Remote SURB balancing
302/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
303/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
304/// in replies.
305/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
306/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
307/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
308///
309/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
310/// sending new ones via the keep-alive messages.
311///
312/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
313///
314/// ### Possible scenarios
315/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
316/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
317/// only when both are configured (the ideal case below):
318///
319/// #### 1. Ideal local and remote SURB balancing
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
322/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
323///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
324/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
325///    Session.
326/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
327///    `maximum_surb_buffer_size / max_surb_buffer_duration` value configured at the counterparty.
328///
329/// In this situation, the maximum Session egress from Exit to the Entry is given by the
330/// `maximum_surb_buffer_size / max_surb_buffer_duration` ratio. If there is enough bandwidth,
331/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
332/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
333///
334/// #### 2. Remote SURB balancing only
335/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
336/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
337///    [`SurbBalancerConfig`]
338///
339/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
340/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
341/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
342///
343/// This configuration could potentially only lead to an equilibrium
344/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
345///
346/// #### 3. Local SURB balancing only
/// 1. The Session recipient (Exit) sets the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
349/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
350///    Session.
351/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
352///
353/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
354/// ones that are naturally carried by the egress packets which have space to hold SURBs). It relies
355/// only on the Session egress limiting of the Exit node.
356/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
357///
358/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
359/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
360/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
361///
362/// #### 4. No SURB balancing on each side
363/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
364/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
365///
366/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
367/// takes place at the Exit.
368///
369/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
370/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
371/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
372/// at the Exit's egress.
373///
374/// ### SURB decay
375/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
376/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
377/// The Entry has no way of knowing that and assumes that everything has been delivered.
378/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
379/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
380/// fewer SURBs from its SURB estimate at the Exit.
381///
382/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
383///
384/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
385/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
386/// Exit is detected.
387///
388/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
389///
390/// ### SURB balance and target notification
391/// The Session recipient (Exit) can notify the Session initiator (Entry) periodically about its estimated
392/// number of SURBs for the Session. This can help the Entry to adjust its approximation of that level so
393/// that its Local SURB balancer can better intervene.
394/// This can be set using the `surb_balance_notify_period` field of [`SessionManagerConfig`] for the Exit.
395///
396/// Likewise, the Entry can inform the Exit about its desired SURB buffer target so that the Exit
397/// can better accommodate its Remote SURB balancing.
398/// This can be set using the `surb_target_notify` field of the [`SessionManagerConfig`] of each new Session.
399///
400/// Both mechanisms leverage the Keep Alive message to report the respective values.
pub struct SessionManager<S, T> {
    // Pending outgoing Session initiations, keyed by their Start challenge.
    session_initiations: SessionInitiationCache,
    #[allow(clippy::type_complexity)]
    // Set once by `start()`: the incoming-session notifier and the sender
    // through which Sessions report their closure.
    session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
    // All currently active Sessions (incoming and outgoing).
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    // The message transport sink, set once by `start()`.
    msg_sender: Arc<OnceLock<S>>,
    cfg: SessionManagerConfig,
}
409
410impl<S, T> Clone for SessionManager<S, T> {
411    fn clone(&self) -> Self {
412        Self {
413            session_initiations: self.session_initiations.clone(),
414            session_notifiers: self.session_notifiers.clone(),
415            sessions: self.sessions.clone(),
416            cfg: self.cfg.clone(),
417            msg_sender: self.msg_sender.clone(),
418        }
419    }
420}
421
/// Maximum time to wait when pushing a message into the external message transport sink.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
423
424impl<S, T> SessionManager<S, T>
425where
426    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
427    T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
428    S::Error: std::error::Error + Send + Sync + Clone + 'static,
429    T::Error: std::error::Error + Send + Sync + Clone + 'static,
430{
    /// Creates a new instance given the [`config`](SessionManagerConfig).
    ///
    /// The configuration is normalized first: the tag range is shifted above the
    /// reserved Start-protocol range, and several values are clamped to their
    /// documented minima (see the individual [`SessionManagerConfig`] fields).
    pub fn new(mut cfg: SessionManagerConfig) -> Self {
        let min_session_tag_range_reservation = ReservedTag::range().end;
        debug_assert!(
            min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
            "invalid tag reservation range"
        );

        // Accommodate the lower bound if too low.
        // The whole range is shifted upwards, so its length is preserved.
        if cfg.session_tag_range.start < min_session_tag_range_reservation {
            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
        }
        // NOTE(review): assumes `session_tag_range` is non-empty; an empty range
        // would make this `clamp` panic (lower bound greater than upper bound)
        // and the subtraction underflow — confirm upstream config validation.
        cfg.maximum_sessions = cfg
            .maximum_sessions
            .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
        cfg.surb_balance_notify_period = cfg
            .surb_balance_notify_period
            .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
        cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);

        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);

        #[cfg(all(feature = "prometheus", not(test)))]
        METRIC_ACTIVE_SESSIONS.set(0.0);

        let msg_sender = Arc::new(OnceLock::new());
        Self {
            msg_sender: msg_sender.clone(),
            // Pending initiations expire after twice the maximum one-way
            // initiation timeout (worst-case number of hops).
            session_initiations: moka::future::Cache::builder()
                .max_capacity(cfg.maximum_sessions as u64)
                .time_to_live(
                    2 * initiation_timeout_max_one_way(
                        cfg.initiation_timeout_base,
                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
                    ),
                )
                .build(),
            // Sessions are closed here only when evicted due to idleness or the
            // capacity limit; explicit removals are not handled by this listener,
            // since the removing code paths call `close_session` themselves.
            sessions: moka::future::Cache::builder()
                .max_capacity(cfg.maximum_sessions as u64)
                .time_to_idle(cfg.idle_timeout)
                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
                        trace!(?session_id, ?reason, "session evicted from the cache");
                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
                    }
                    _ => {}
                })
                .build(),
            session_notifiers: Arc::new(OnceLock::new()),
            cfg,
        }
    }
486
    /// Starts the instance with the given `msg_sender` `Sink`
    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
    ///
    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
    /// [`SessionManager::dispatch_message`].
    ///
    /// Returns abort handles for the two background tasks spawned here:
    /// processing of session-closure notifications and periodic cache eviction.
    ///
    /// # Errors
    /// Returns [`SessionManagerError::AlreadyStarted`] if called more than once.
    pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
        self.msg_sender
            .set(msg_sender)
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        // Sized to the session limit with some slack, so closure notifications
        // are unlikely to block the notifying Sessions.
        let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
        self.session_notifiers
            .set((new_session_notifier, session_close_tx))
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        // Task 1: remove and close Sessions whose closure was reported via the channel.
        let myself = self.clone();
        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
            None,
            move |(session_id, closure_reason)| {
                let myself = myself.clone();
                async move {
                    // These notifications come from the Sessions themselves once
                    // an empty read is encountered, which means the closure was done by the
                    // other party.
                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
                        close_session(session_id, session_data, closure_reason);
                    } else {
                        // Do not treat this as an error
                        debug!(
                            ?session_id,
                            ?closure_reason,
                            "could not find session id to close, maybe the session is already closed"
                        );
                    }
                }
            },
        ));

        // Task 2: periodically force cache maintenance.
        // This is necessary to evict expired entries from the caches if
        // no session-related operations happen at all.
        // This ensures the dangling expired sessions are properly closed
        // and their closure is timely notified to the other party.
        let myself = self.clone();
        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
            // Randomized multiplier (1.0–1.5) applied to the eviction interval.
            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
            // Interval derived from the smaller of the worst-case initiation
            // timeout window and the idle timeout, scaled by the jitter.
            let timeout = 2 * initiation_timeout_max_one_way(
                myself.cfg.initiation_timeout_base,
                RoutingOptions::MAX_INTERMEDIATE_HOPS,
            )
            .min(myself.cfg.idle_timeout)
            .mul_f64(jitter)
                / 2;
            futures_time::stream::interval(timeout.into())
                .for_each(|_| {
                    trace!("executing session cache evictions");
                    futures::future::join(
                        myself.sessions.run_pending_tasks(),
                        myself.session_initiations.run_pending_tasks(),
                    )
                    .map(|_| ())
                })
                .await;
        });

        Ok(vec![ah_closure_notifications, ah_session_expiration])
    }
553
    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
    pub fn is_started(&self) -> bool {
        // The notifier pair is set exactly once, in `start()`, so its presence
        // implies the manager has been started.
        self.session_notifiers.get().is_some()
    }
558
    /// Atomically inserts a new [`SessionSlot`] for `session_id` into the session cache.
    ///
    /// Fails with [`SessionManagerError::Loopback`] if an entry for this id already
    /// exists, which indicates an unsupported loopback Session attempt.
    /// On success, the initiated/active session metrics are updated.
    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
        // We currently do not support loopback Sessions on ourselves.
        if let moka::ops::compute::CompResult::Inserted(_) = self
            .sessions
            .entry(session_id)
            .and_compute_with(|entry| {
                // Insert only if no entry exists yet (atomic check-and-insert).
                futures::future::ready(if entry.is_none() {
                    moka::ops::compute::Op::Put(slot)
                } else {
                    moka::ops::compute::Op::Nop
                })
            })
            .await
        {
            #[cfg(all(feature = "prometheus", not(test)))]
            {
                METRIC_NUM_INITIATED_SESSIONS.increment();
                METRIC_ACTIVE_SESSIONS.increment(1.0);
            }

            Ok(())
        } else {
            // Session already exists; it means it is most likely a loopback attempt
            error!(%session_id, "session already exists - loopback attempt");
            Err(SessionManagerError::Loopback.into())
        }
    }
586
587    /// Initiates a new outgoing Session to `destination` with the given configuration.
588    ///
589    /// If the Session's counterparty does not respond within
590    /// the [configured](SessionManagerConfig) period,
591    /// this method returns [`TransportSessionError::Timeout`].
592    ///
593    /// It will also fail if the instance has not been [started](SessionManager::start).
594    pub async fn new_session(
595        &self,
596        destination: Address,
597        target: SessionTarget,
598        cfg: SessionClientConfig,
599    ) -> crate::errors::Result<HoprSession> {
600        self.sessions.run_pending_tasks().await;
601        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
602            return Err(SessionManagerError::TooManySessions.into());
603        }
604
605        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
606
607        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
608        let (challenge, _) = insert_into_next_slot(
609            &self.session_initiations,
610            |ch| {
611                if let Some(challenge) = ch {
612                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
613                } else {
614                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
615                }
616            },
617            |_| tx_initiation_done,
618        )
619        .await
620        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
621
622        // Prepare the session initiation message in the Start protocol
623        trace!(challenge, ?cfg, "initiating session with config");
624        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
625            challenge,
626            target,
627            capabilities: ByteCapabilities(cfg.capabilities),
628            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
629                cfg.surb_management
630                    .map(|c| c.target_surb_buffer_size)
631                    .unwrap_or(
632                        self.cfg.initial_return_session_egress_rate as u64
633                            * self
634                                .cfg
635                                .minimum_surb_buffer_duration
636                                .max(MIN_SURB_BUFFER_DURATION)
637                                .as_secs(),
638                    )
639                    .min(u32::MAX as u64) as u32
640            } else {
641                0
642            },
643        });
644
645        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
646        let forward_routing = DestinationRouting::Forward {
647            destination: Box::new(destination.into()),
648            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
649            forward_options: cfg.forward_path_options.clone(),
650            return_options: cfg.return_path_options.clone().into(),
651        };
652
653        // Send the Session initiation message
654        info!(challenge, %pseudonym, %destination, "new session request");
655        msg_sender
656            .send((
657                forward_routing.clone(),
658                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
659            ))
660            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
661            .await
662            .map_err(|_| {
663                error!(challenge, %pseudonym, %destination, "timeout sending session request message");
664                TransportSessionError::Timeout
665            })?
666            .map_err(TransportSessionError::packet_sending)?;
667
668        // The timeout is given by the number of hops requested
669        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
670            self.cfg.initiation_timeout_base,
671            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
672        )
673        .into();
674
675        // Await session establishment response from the Exit node or timeout
676        pin_mut!(rx_initiation_done);
677
678        trace!(challenge, "awaiting session establishment");
679        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
680            Ok(Ok(Some(est))) => {
681                // Session has been established, construct it
682                let session_id = est.session_id;
683                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
684
685                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
686                let notifier = self
687                    .session_notifiers
688                    .get()
689                    .map(|(_, notifier)| {
690                        let mut notifier = notifier.clone();
691                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
692                            let _ = notifier
693                                .try_send((session_id, reason))
694                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
695                        })
696                    })
697                    .ok_or(SessionManagerError::NotStarted)?;
698
699                #[cfg(feature = "telemetry")]
700                let telemetry = Arc::new(SessionTelemetry::new(
701                    session_id,
702                    HoprSessionConfig {
703                        capabilities: cfg.capabilities,
704                        frame_mtu: self.cfg.frame_mtu,
705                        frame_timeout: self.cfg.max_frame_timeout,
706                    },
707                ));
708
709                // NOTE: the Exit node can have different `max_surb_buffer_size`
710                // setting on the Session manager, so it does not make sense to cap it here
711                // with our maximum value.
712                if let Some(balancer_config) = cfg.surb_management {
713                    let surb_estimator = AtomicSurbFlowEstimator::default();
714
715                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
716                    let surb_estimator_clone = surb_estimator.clone();
717                    let full_surb_scoring_sender =
718                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
719                            // Count how many SURBs we sent with each packet
720                            surb_estimator_clone.produced.fetch_add(
721                                data.estimate_surbs_with_msg() as u64,
722                                std::sync::atomic::Ordering::Relaxed,
723                            );
724                            futures::future::ok::<_, S::Error>((routing, data))
725                        });
726
727                    // For standard Session data we first reduce the number of SURBs we want to produce,
728                    // unless requested to always max them out
729                    let max_out_organic_surbs = cfg.always_max_out_surbs;
730                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
731                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
732                        // so that its estimate of SURBs gets automatically updated based on
733                        // the `max_surbs_in_packets` set here.
734                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
735                            if !max_out_organic_surbs {
736                                // TODO: make this dynamic to honor the balancer target (#7439)
737                                data.packet_info
738                                    .get_or_insert_with(|| OutgoingPacketInfo {
739                                        max_surbs_in_packet: 1,
740                                        ..Default::default()
741                                    })
742                                    .max_surbs_in_packet = 1;
743                            }
744                            futures::future::ok::<_, S::Error>((routing, data))
745                        },
746                    );
747
748                    let mut abort_handles = AbortableList::default();
749                    let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
750
751                    // Spawn the SURB-bearing keep alive stream towards the Exit
752                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
753                        session_id,
754                        full_surb_scoring_sender,
755                        forward_routing.clone(),
756                        if self.cfg.surb_target_notify {
757                            SurbNotificationMode::Target
758                        } else {
759                            SurbNotificationMode::DoNotNotify
760                        },
761                        surb_mgmt.clone(),
762                    );
763                    abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
764
765                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
766                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
767                    let balancer = SurbBalancer::new(
768                        session_id,
769                        // The setpoint and output limit is immediately reconfigured by the SurbBalancer
770                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
771                        surb_estimator.clone(),
772                        // Currently, a keep-alive message can bear `HoprPacket::MAX_SURBS_IN_PACKET` SURBs,
773                        // so the correction by this factor is applied.
774                        SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
775                        surb_mgmt.clone(),
776                    );
777
778                    let (level_stream, balancer_abort_handle) =
779                        balancer.start_control_loop(self.cfg.balancer_sampling_interval);
780                    abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
781
782                    // If the insertion fails prematurely, it will also kill all the abort handles
783                    self.insert_session_slot(
784                        session_id,
785                        SessionSlot {
786                            session_tx: Arc::new(tx),
787                            routing_opts: forward_routing.clone(),
788                            abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
789                            surb_mgmt: surb_mgmt.clone(),
790                            surb_estimator: surb_estimator.clone(),
791                            #[cfg(feature = "telemetry")]
792                            telemetry: telemetry.clone(),
793                        },
794                    )
795                    .await?;
796
797                    #[cfg(feature = "telemetry")]
798                    {
799                        telemetry.set_state(SessionLifecycleState::Active);
800                        telemetry.set_balancer_data(surb_estimator.clone(), surb_mgmt);
801                    }
802
803                    // Wait for enough SURBs to be sent to the counterparty
804                    // TODO: consider making this interactive = other party reports the exact level periodically
805                    match level_stream
806                        .skip_while(|current_level| {
807                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
808                        })
809                        .next()
810                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
811                        .await
812                    {
813                        Ok(Some(surb_level)) => {
814                            info!(%session_id, surb_level, "session is ready");
815                        }
816                        Ok(None) => {
817                            return Err(
818                                SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
819                            );
820                        }
821                        Err(_) => {
822                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
823                        }
824                    }
825
826                    HoprSession::new(
827                        session_id,
828                        forward_routing,
829                        HoprSessionConfig {
830                            capabilities: cfg.capabilities,
831                            frame_mtu: self.cfg.frame_mtu,
832                            frame_timeout: self.cfg.max_frame_timeout,
833                        },
834                        (
835                            reduced_surb_scoring_sender,
836                            rx.inspect(move |_| {
837                                // Received packets = SURB consumption estimate
838                                // The received packets always consume a single SURB.
839                                surb_estimator
840                                    .consumed
841                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
842                            }),
843                        ),
844                        Some(notifier),
845                        #[cfg(feature = "telemetry")]
846                        telemetry,
847                    )
848                } else {
849                    warn!(%session_id, "session ready without SURB balancing");
850
851                    self.insert_session_slot(
852                        session_id,
853                        SessionSlot {
854                            session_tx: Arc::new(tx),
855                            routing_opts: forward_routing.clone(),
856                            abort_handles: Default::default(),
857                            surb_mgmt: Default::default(),      // Disabled SURB management
858                            surb_estimator: Default::default(), // No SURB estimator needed
859                            #[cfg(feature = "telemetry")]
860                            telemetry: telemetry.clone(),
861                        },
862                    )
863                    .await?;
864                    #[cfg(feature = "telemetry")]
865                    telemetry.set_state(SessionLifecycleState::Active);
866
867                    // For standard Session data we first reduce the number of SURBs we want to produce,
868                    // unless requested to always max them out
869                    let max_out_organic_surbs = cfg.always_max_out_surbs;
870                    let reduced_surb_sender =
871                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
872                            if !max_out_organic_surbs {
873                                data.packet_info
874                                    .get_or_insert_with(|| OutgoingPacketInfo {
875                                        max_surbs_in_packet: 1,
876                                        ..Default::default()
877                                    })
878                                    .max_surbs_in_packet = 1;
879                            }
880                            futures::future::ok::<_, S::Error>((routing, data))
881                        });
882
883                    HoprSession::new(
884                        session_id,
885                        forward_routing,
886                        HoprSessionConfig {
887                            capabilities: cfg.capabilities,
888                            frame_mtu: self.cfg.frame_mtu,
889                            frame_timeout: self.cfg.max_frame_timeout,
890                        },
891                        (reduced_surb_sender, rx),
892                        Some(notifier),
893                        #[cfg(feature = "telemetry")]
894                        telemetry,
895                    )
896                }
897            }
898            Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
899                "internal error: sender has been closed without completing the session establishment"
900            ))
901            .into()),
902            Ok(Err(error)) => {
903                // The other side did not allow us to establish a session
904                error!(
905                    challenge = error.challenge,
906                    ?error,
907                    "the other party rejected the session initiation with error"
908                );
909                Err(TransportSessionError::Rejected(error.reason))
910            }
911            Err(_) => {
912                // Timeout waiting for a session establishment
913                error!(challenge, "session initiation attempt timed out");
914
915                #[cfg(all(feature = "prometheus", not(test)))]
916                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
917
918                Err(TransportSessionError::Timeout)
919            }
920        }
921    }
922
923    /// Sends a keep-alive packet with the given [`SessionId`].
924    ///
925    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
926    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
927        if let Some(session_data) = self.sessions.get(id).await {
928            trace!(session_id = ?id, "pinging manually session");
929            Ok(self
930                .msg_sender
931                .get()
932                .cloned()
933                .ok_or(SessionManagerError::NotStarted)?
934                .send((
935                    session_data.routing_opts.clone(),
936                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
937                ))
938                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
939                .await
940                .map_err(|_| {
941                    error!("timeout sending session ping message");
942                    TransportSessionError::Timeout
943                })?
944                .map_err(TransportSessionError::packet_sending)?)
945        } else {
946            Err(SessionManagerError::NonExistingSession.into())
947        }
948    }
949
950    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
951    pub async fn active_sessions(&self) -> Vec<SessionId> {
952        self.sessions.run_pending_tasks().await;
953        self.sessions.iter().map(|(k, _)| *k).collect()
954    }
955
956    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
957    ///
958    /// Returns an error if the Session with the given `id` does not exist, or
959    /// if it does not use SURB balancing.
960    pub async fn update_surb_balancer_config(
961        &self,
962        id: &SessionId,
963        config: SurbBalancerConfig,
964    ) -> crate::errors::Result<()> {
965        let cfg = self
966            .sessions
967            .get(id)
968            .await
969            .ok_or(SessionManagerError::NonExistingSession)?
970            .surb_mgmt;
971
972        // Only update the config if there already was one before
973        if !cfg.is_disabled() {
974            cfg.update(&config);
975            Ok(())
976        } else {
977            Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
978        }
979    }
980
981    /// Retrieves the configuration of SURB balancing for the given Session.
982    ///
983    /// Returns an error if the Session with the given `id` does not exist.
984    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
985        match self.sessions.get(id).await {
986            Some(session) => Ok(Some(session.surb_mgmt.as_ref())
987                .filter(|c| !c.is_disabled())
988                .map(|d| d.as_config())),
989            None => Err(SessionManagerError::NonExistingSession.into()),
990        }
991    }
992
993    /// Gets estimations produced/received and consumed SURBs by the Session.
994    ///
995    /// For an outgoing Session (Entry) the pair is the number of SURBs sent (by us) and used (by the Exit).
996    /// For an incoming Session (Exit) the pair is the number of SURBs received (from Entry) and used (by us).
997    ///
998    /// Returns an error if the Session with the given `id` does not exist.
999    pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1000        match self.sessions.get(id).await {
1001            Some(session) => Ok((
1002                session
1003                    .surb_estimator
1004                    .produced
1005                    .load(std::sync::atomic::Ordering::Relaxed),
1006                session
1007                    .surb_estimator
1008                    .consumed
1009                    .load(std::sync::atomic::Ordering::Relaxed),
1010            )),
1011            None => Err(SessionManagerError::NonExistingSession.into()),
1012        }
1013    }
1014
1015    /// Retrieves useful statistics of the given [session](HoprSession).
1016    ///
1017    /// Returns an error if the Session with the given `id` does not exist.
1018    #[cfg(feature = "telemetry")]
1019    pub async fn get_session_telemetry(&self, id: &SessionId) -> crate::errors::Result<SessionStatsSnapshot> {
1020        match self.sessions.get(id).await.map(|session| session.telemetry) {
1021            Some(telemetry) => Ok(telemetry.snapshot()),
1022            None => Err(SessionManagerError::NonExistingSession.into()),
1023        }
1024    }
1025
1026    /// The main method to be called whenever data are received.
1027    ///
1028    /// It tries to recognize the message and correctly dispatches either
1029    /// the Session protocol or Start protocol messages.
1030    ///
1031    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
1032    pub async fn dispatch_message(
1033        &self,
1034        pseudonym: HoprPseudonym,
1035        in_data: ApplicationDataIn,
1036    ) -> crate::errors::Result<DispatchResult> {
1037        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1038            // This is a Start protocol message, so we handle it
1039            trace!("dispatching Start protocol message");
1040            return self
1041                .handle_start_protocol_message(pseudonym, in_data)
1042                .await
1043                .map(|_| DispatchResult::Processed);
1044        } else if self
1045            .cfg
1046            .session_tag_range
1047            .contains(&in_data.data.application_tag.as_u64())
1048        {
1049            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1050
1051            return if let Some(session_slot) = self.sessions.get(&session_id).await {
1052                trace!(?session_id, "received data for a registered session");
1053
1054                Ok(session_slot
1055                    .session_tx
1056                    .unbounded_send(in_data)
1057                    .map(|_| DispatchResult::Processed)
1058                    .map_err(SessionManagerError::other)?)
1059            } else {
1060                error!(%session_id, "received data from an unestablished session");
1061                Err(TransportSessionError::UnknownData)
1062            };
1063        }
1064
1065        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1066        Ok(DispatchResult::Unrelated(in_data))
1067    }
1068
1069    async fn handle_incoming_session_initiation(
1070        &self,
1071        pseudonym: HoprPseudonym,
1072        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1073    ) -> crate::errors::Result<()> {
1074        trace!(challenge = session_req.challenge, "received session initiation request");
1075
1076        debug!(%pseudonym, "got new session request, searching for a free session slot");
1077
1078        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1079
1080        let (mut new_session_notifier, mut close_session_notifier) = self
1081            .session_notifiers
1082            .get()
1083            .cloned()
1084            .ok_or(SessionManagerError::NotStarted)?;
1085
1086        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1087        let reply_routing = DestinationRouting::Return(pseudonym.into());
1088
1089        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1090
1091        // Search for a free Session ID slot
1092        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1093        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1094            insert_into_next_slot(
1095                &self.sessions,
1096                |sid| {
1097                    // NOTE: It is allowed to insert sessions using the same tag
1098                    // but different pseudonyms because the SessionId is different.
1099                    let next_tag: Tag = match sid {
1100                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1101                            .max(self.cfg.session_tag_range.start)
1102                            .into(),
1103                        None => hopr_crypto_random::random_integer(
1104                            self.cfg.session_tag_range.start,
1105                            Some(self.cfg.session_tag_range.end),
1106                        )
1107                        .into(),
1108                    };
1109                    SessionId::new(next_tag, pseudonym)
1110                },
1111                |_sid| SessionSlot {
1112                    session_tx: Arc::new(tx_session_data),
1113                    routing_opts: reply_routing.clone(),
1114                    abort_handles: Default::default(),
1115                    surb_mgmt: Default::default(),
1116                    surb_estimator: Default::default(),
1117                    #[cfg(feature = "telemetry")]
1118                    telemetry: Arc::new(SessionTelemetry::new(
1119                        _sid,
1120                        HoprSessionConfig {
1121                            capabilities: session_req.capabilities.0,
1122                            frame_mtu: self.cfg.frame_mtu,
1123                            frame_timeout: self.cfg.max_frame_timeout,
1124                        },
1125                    )),
1126                },
1127            )
1128            .await
1129        } else {
1130            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1131            None
1132        };
1133
1134        // If some of the following code throws an error, the allocated slot will be kept
1135        // but will be later re-claimed when it expires.
1136        if let Some((session_id, slot)) = allocated_slot {
1137            debug!(%session_id, ?session_req, "assigned a new session");
1138
1139            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1140                if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1141                    error!(%session_id, %error, %reason, "failed to notify session closure");
1142                }
1143            });
1144
1145            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1146                // Because of SURB scarcity, control the egress rate of incoming sessions
1147                let egress_rate_control =
1148                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1149
1150                // The Session request carries a "hint" as additional data telling what
1151                // the Session initiator has configured as its target buffer size in the Balancer.
1152                let target_surb_buffer_size = if session_req.additional_data > 0 {
1153                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1154                } else {
1155                    self.cfg.initial_return_session_egress_rate as u64
1156                        * self
1157                            .cfg
1158                            .minimum_surb_buffer_duration
1159                            .max(MIN_SURB_BUFFER_DURATION)
1160                            .as_secs()
1161                };
1162
1163                let surb_estimator_clone = slot.surb_estimator.clone();
1164                let session = HoprSession::new(
1165                    session_id,
1166                    reply_routing.clone(),
1167                    HoprSessionConfig {
1168                        capabilities: session_req.capabilities.into(),
1169                        frame_mtu: self.cfg.frame_mtu,
1170                        frame_timeout: self.cfg.max_frame_timeout,
1171                    },
1172                    (
1173                        // Sent packets = SURB consumption estimate
1174                        msg_sender
1175                            .clone()
1176                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1177                                // Each outgoing packet consumes one SURB
1178                                surb_estimator_clone
1179                                    .consumed
1180                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1181                                futures::future::ok::<_, S::Error>((routing, data))
1182                            })
1183                            .rate_limit_with_controller(&egress_rate_control)
1184                            .buffer((2 * target_surb_buffer_size) as usize),
1185                        // Received packets = SURB retrieval estimate
1186                        rx_session_data.inspect(move |data| {
1187                            // Count the number of SURBs delivered with each incoming packet
1188                            surb_estimator_clone
1189                                .produced
1190                                .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1191                        }),
1192                    ),
1193                    Some(closure_notifier),
1194                    #[cfg(feature = "telemetry")]
1195                    slot.telemetry.clone(),
1196                )?;
1197
1198                // The SURB balancer will start intervening by rate-limiting the
1199                // egress of the Session, once the estimated number of SURBs drops below
1200                // the target defined here. Otherwise, the maximum egress is allowed.
1201                let balancer_config = SurbBalancerConfig {
1202                    target_surb_buffer_size,
1203                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1204                    max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1205                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1206                    // were received
1207                    surb_decay: None,
1208                };
1209
1210                slot.surb_mgmt.update(&balancer_config);
1211
1212                #[cfg(feature = "telemetry")]
1213                {
1214                    slot.telemetry.set_state(SessionLifecycleState::Active);
1215                    slot.telemetry
1216                        .set_balancer_data(slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1217                }
1218
1219                // Spawn the SURB balancer only once we know we have registered the
1220                // abort handle with the pre-allocated Session slot
1221                debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1222                let balancer = SurbBalancer::new(
1223                    session_id,
1224                    SimpleBalancerController::default(),
1225                    slot.surb_estimator.clone(),
1226                    SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1227                    slot.surb_mgmt.clone(),
1228                );
1229
1230                // Assign the SURB balancer and abort handles to the already allocated Session slot
1231                let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1232                slot.abort_handles
1233                    .lock()
1234                    .insert(SessionTasks::Balancer, balancer_abort_handle);
1235
1236                // Spawn a keep-alive stream notifying about the SURB buffer level towards the Entry
1237                if let Some(period) = self.cfg.surb_balance_notify_period {
1238                    let surb_estimator_clone = slot.surb_estimator.clone();
1239                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1240                        session_id,
1241                        // Sent Keep-Alive packets also contribute to SURB consumption
1242                        msg_sender
1243                            .clone()
1244                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1245                                // Each sent keepalive consumes 1 SURB
1246                                surb_estimator_clone
1247                                    .consumed
1248                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1249                                futures::future::ok::<_, S::Error>((routing, data))
1250                            }),
1251                        slot.routing_opts.clone(),
1252                        SurbNotificationMode::Level(slot.surb_estimator.clone()),
1253                        slot.surb_mgmt.clone(),
1254                    );
1255
1256                    // Start keepalive stream towards the Entry with a predefined period
1257                    hopr_async_runtime::prelude::spawn(async move {
1258                        // Delay the stream execution by one period
1259                        hopr_async_runtime::prelude::sleep(period).await;
1260                        ka_controller.set_rate_per_unit(1, period);
1261                    });
1262
1263                    slot.abort_handles
1264                        .lock()
1265                        .insert(SessionTasks::KeepAlive, ka_abort_handle);
1266
1267                    debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1268                }
1269
1270                session
1271            } else {
1272                HoprSession::new(
1273                    session_id,
1274                    reply_routing.clone(),
1275                    HoprSessionConfig {
1276                        capabilities: session_req.capabilities.into(),
1277                        frame_mtu: self.cfg.frame_mtu,
1278                        frame_timeout: self.cfg.max_frame_timeout,
1279                    },
1280                    (msg_sender.clone(), rx_session_data),
1281                    Some(closure_notifier),
1282                    #[cfg(feature = "telemetry")]
1283                    slot.telemetry.clone(),
1284                )?
1285            };
1286
1287            // Extract useful information about the session from the Start protocol message
1288            let incoming_session = IncomingSession {
1289                session,
1290                target: session_req.target,
1291            };
1292
1293            // Notify that a new incoming session has been created
1294            match new_session_notifier
1295                .send(incoming_session)
1296                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1297                .await
1298            {
1299                Err(_) => {
1300                    error!(%session_id, "timeout to notify about new incoming session");
1301                    return Err(TransportSessionError::Timeout);
1302                }
1303                Ok(Err(error)) => {
1304                    error!(%session_id, %error, "failed to notify about new incoming session");
1305                    return Err(SessionManagerError::other(error).into());
1306                }
1307                _ => {}
1308            };
1309
1310            trace!(?session_id, "session notification sent");
1311
1312            // Notify the sender that the session has been established.
1313            // Set our peer ID in the session ID sent back to them.
1314            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1315                orig_challenge: session_req.challenge,
1316                session_id,
1317            });
1318
1319            msg_sender
1320                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1321                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1322                .await
1323                .map_err(|_| {
1324                    error!(%session_id, "timeout sending session establishment message");
1325                    TransportSessionError::Timeout
1326                })?
1327                .map_err(|error| {
1328                    error!(%session_id, %error, "failed to send session establishment message");
1329                    SessionManagerError::other(error)
1330                })?;
1331
1332            info!(%session_id, "new session established");
1333
1334            #[cfg(all(feature = "prometheus", not(test)))]
1335            {
1336                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1337                METRIC_ACTIVE_SESSIONS.increment(1.0);
1338            }
1339        } else {
1340            error!(%pseudonym,"failed to reserve a new session slot");
1341
1342            // Notify the sender that the session could not be established
1343            let reason = StartErrorReason::NoSlotsAvailable;
1344            let data = HoprStartProtocol::SessionError(StartErrorType {
1345                challenge: session_req.challenge,
1346                reason,
1347            });
1348
1349            msg_sender
1350                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1351                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1352                .await
1353                .map_err(|_| {
1354                    error!("timeout sending session error message");
1355                    TransportSessionError::Timeout
1356                })?
1357                .map_err(|error| {
1358                    error!(%error, "failed to send session error message");
1359                    SessionManagerError::other(error)
1360                })?;
1361
1362            trace!(%pseudonym, "session establishment failure message sent");
1363
1364            #[cfg(all(feature = "prometheus", not(test)))]
1365            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1366        }
1367
1368        Ok(())
1369    }
1370
1371    async fn handle_start_protocol_message(
1372        &self,
1373        pseudonym: HoprPseudonym,
1374        data: ApplicationDataIn,
1375    ) -> crate::errors::Result<()> {
1376        match HoprStartProtocol::try_from(data.data)? {
1377            HoprStartProtocol::StartSession(session_req) => {
1378                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1379            }
1380            HoprStartProtocol::SessionEstablished(est) => {
1381                trace!(
1382                    session_id = ?est.session_id,
1383                    "received session establishment confirmation"
1384                );
1385                let challenge = est.orig_challenge;
1386                let session_id = est.session_id;
1387                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1388                    if let Err(error) = tx_est.unbounded_send(Ok(est)) {
1389                        error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
1390                        return Err(SessionManagerError::other(error).into());
1391                    }
1392                    debug!(?session_id, challenge, "session establishment complete");
1393                } else {
1394                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
1395                }
1396            }
1397            HoprStartProtocol::SessionError(error_type) => {
1398                trace!(
1399                    challenge = error_type.challenge,
1400                    error = ?error_type.reason,
1401                    "failed to initialize a session",
1402                );
1403                // Currently, we do not distinguish between individual error types
1404                // and just discard the initiation attempt and pass on the error.
1405                if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
1406                    if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
1407                        error!(%error, ?error_type, "could not send session error message");
1408                        return Err(SessionManagerError::other(error).into());
1409                    }
1410                    error!(
1411                        challenge = error_type.challenge,
1412                        ?error_type,
1413                        "session establishment error received"
1414                    );
1415                } else {
1416                    error!(
1417                        challenge = error_type.challenge,
1418                        ?error_type,
1419                        "session establishment attempt expired before error could be delivered"
1420                    );
1421                }
1422
1423                #[cfg(all(feature = "prometheus", not(test)))]
1424                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
1425            }
1426            HoprStartProtocol::KeepAlive(msg) => {
1427                let session_id = msg.session_id;
1428                if let Some(session_slot) = self.sessions.get(&session_id).await {
1429                    trace!(?session_id, "received keep-alive message");
1430                    match &session_slot.routing_opts {
1431                        // Session is outgoing - keep-alive was received from the Exit
1432                        DestinationRouting::Forward { .. } => {
1433                            if msg.flags.contains(KeepAliveFlag::BalancerState)
1434                                && !session_slot.surb_mgmt.is_disabled()
1435                                && session_slot.surb_mgmt.buffer_level() != msg.additional_data
1436                            {
1437                                // Update the buffer level as sent to us from the Exit
1438                                session_slot
1439                                    .surb_mgmt
1440                                    .buffer_level
1441                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1442                                debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
1443                            }
1444
1445                            // Increase the number of consumed SURBs in the estimator
1446                            session_slot
1447                                .surb_estimator
1448                                .consumed
1449                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1450                        }
1451                        // Session is incoming - keep-alive was received from the Entry
1452                        DestinationRouting::Return(_) => {
1453                            // Allow updating SURB balancer target based on the received Keep-Alive message
1454                            if msg.flags.contains(KeepAliveFlag::BalancerTarget)
1455                                && msg.additional_data > 0
1456                                && !session_slot.surb_mgmt.is_disabled()
1457                                && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
1458                            {
1459                                // Update the target buffer size as sent to us from the Entry
1460                                session_slot
1461                                    .surb_mgmt
1462                                    .target_surb_buffer_size
1463                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
1464                                // Update maximum SURBs per second based on the new target
1465                                session_slot.surb_mgmt.max_surbs_per_sec.store(
1466                                    msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
1467                                    std::sync::atomic::Ordering::Relaxed,
1468                                );
1469                                debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
1470                            }
1471
1472                            // Increase the number of received SURBs in the estimator.
1473                            // Typically, 2 SURBs per Keep-Alive message
1474                            session_slot.surb_estimator.produced.fetch_add(
1475                                KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1476                                std::sync::atomic::Ordering::Relaxed,
1477                            );
1478                        }
1479                    }
1480                } else {
1481                    debug!(%session_id, "received keep-alive request for an unknown session");
1482                }
1483            }
1484        }
1485
1486        Ok(())
1487    }
1488}
1489
1490#[cfg(test)]
1491mod tests {
1492    use anyhow::anyhow;
1493    use futures::{AsyncWriteExt, future::BoxFuture};
1494    use hopr_crypto_random::Randomizable;
1495    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1496    use hopr_primitive_types::prelude::Address;
1497    use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1498    use tokio::time::timeout;
1499
1500    use super::*;
1501    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1502
    /// Test-only abstraction of the outgoing message transport, allowing the
    /// packet-planning channel to be backed by a mockall-generated mock.
    #[async_trait::async_trait]
    trait SendMsg {
        /// Sends a single application-level payload along the given routing.
        async fn send_message(
            &self,
            routing: DestinationRouting,
            data: ApplicationDataOut,
        ) -> crate::errors::Result<()>;
    }
1511
    // Generates `MockMsgSender` implementing `SendMsg`. The manual signature with
    // explicit lifetimes mirrors the desugared form produced by `#[async_trait]`
    // (a boxed future borrowing `self`), which mockall needs spelled out.
    mockall::mock! {
        MsgSender {}
        impl SendMsg for MsgSender {
            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
        }
    }
1519
1520    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1521        let (tx, rx) = futures::channel::mpsc::unbounded();
1522        tokio::task::spawn(async move {
1523            pin_mut!(rx);
1524            while let Some((routing, data)) = rx.next().await {
1525                sender
1526                    .send_message(routing, data)
1527                    .await
1528                    .expect("send message must not fail in mock");
1529            }
1530        });
1531        tx
1532    }
1533
1534    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1535        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1536            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1537            .unwrap_or(false)
1538    }
1539
1540    fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1541        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1542            .map(msg)
1543            .unwrap_or(false)
1544    }
1545
    /// End-to-end happy path of the Start protocol over mocked transports:
    /// Alice initiates a session towards Bob, Bob confirms it, and Alice then
    /// closes it with a terminating segment. Also checks that sessions created
    /// without SURB management expose no balancer configuration.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
    {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        let alice_mgr = SessionManager::new(Default::default());
        let bob_mgr = SessionManager::new(Default::default());

        // The mock expectations below must be satisfied in this exact order.
        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                info!("alice sends {}", data.data.application_tag);
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                // Short-circuit the network: deliver Alice's message straight to Bob's manager.
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionEstablished message
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                info!("bob sends {}", data.data.application_tag);
                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();

                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Alice sends the terminating segment to close the Session
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                // The closing message is a Session-protocol segment flagged as terminating,
                // not a Start-protocol message.
                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
                    data.data.plain_text.as_ref(),
                )
                .expect("must be a session message")
                .try_as_segment()
                .expect("must be a segment")
                .is_terminating()
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Abort handles of both managers' background tasks, cleaned up at the end.
        let mut ahs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let target = SealedHost::Plain("127.0.0.1:80".parse()?);

        // Run Alice's initiation and Bob's incoming-session reception concurrently.
        pin_mut!(new_session_rx_bob);
        let (alice_session, bob_session) = timeout(
            Duration::from_secs(2),
            futures::future::join(
                alice_mgr.new_session(
                    bob_peer,
                    SessionTarget::TcpStream(target.clone()),
                    SessionClientConfig {
                        pseudonym: alice_pseudonym.into(),
                        capabilities: Capability::NoRateControl | Capability::Segmentation,
                        surb_management: None,
                        ..Default::default()
                    },
                ),
                new_session_rx_bob.next(),
            ),
        )
        .await?;

        let mut alice_session = alice_session?;
        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;

        // Both sides must agree on the negotiated capabilities and target.
        assert_eq!(
            alice_session.config().capabilities,
            Capability::Segmentation | Capability::NoRateControl
        );
        assert_eq!(
            alice_session.config().capabilities,
            bob_session.session.config().capabilities
        );
        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));

        // SURB management was disabled, so no balancer config exists and updates must fail.
        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
        assert!(
            alice_mgr
                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
                .await
                .is_err()
        );

        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
        assert!(
            bob_mgr
                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
                .await
                .is_err()
        );

        // Give the session tasks time to settle before (and after) closing.
        tokio::time::sleep(Duration::from_millis(100)).await;
        alice_session.close().await?;

        tokio::time::sleep(Duration::from_millis(100)).await;

        // After closure the session must be gone from the manager.
        assert!(matches!(
            alice_mgr.ping_session(alice_session.id()).await,
            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
        ));

        futures::stream::iter(ahs)
            .for_each(|ah| async move { ah.abort() })
            .await;

        Ok(())
    }
1724
    /// Establishes a session with a very short idle timeout configured on Alice's
    /// manager and verifies that the session disappears on its own once the
    /// timeout elapses, without an explicit close.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        // Short idle timeout so the test can observe the automatic closure quickly.
        let cfg = SessionManagerConfig {
            idle_timeout: Duration::from_millis(200),
            ..Default::default()
        };

        let alice_mgr = SessionManager::new(cfg);
        let bob_mgr = SessionManager::new(Default::default());

        // The mock expectations below must be satisfied in this exact order.
        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                // Short-circuit the network: deliver Alice's message straight to Bob's manager.
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionEstablished message
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();

                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Abort handles of both managers' background tasks, cleaned up at the end.
        let mut ahs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let target = SealedHost::Plain("127.0.0.1:80".parse()?);

        // Run Alice's initiation and Bob's incoming-session reception concurrently.
        pin_mut!(new_session_rx_bob);
        let (alice_session, bob_session) = timeout(
            Duration::from_secs(2),
            futures::future::join(
                alice_mgr.new_session(
                    bob_peer,
                    SessionTarget::TcpStream(target.clone()),
                    SessionClientConfig {
                        pseudonym: alice_pseudonym.into(),
                        capabilities: Capability::NoRateControl | Capability::Segmentation,
                        surb_management: None,
                        ..Default::default()
                    },
                ),
                new_session_rx_bob.next(),
            ),
        )
        .await?;

        let alice_session = alice_session?;
        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;

        // Both sides must agree on the negotiated capabilities and target.
        assert_eq!(
            alice_session.config().capabilities,
            Capability::Segmentation | Capability::NoRateControl,
        );
        assert_eq!(
            alice_session.config().capabilities,
            bob_session.session.config().capabilities
        );
        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));

        // Let the session timeout at Alice
        tokio::time::sleep(Duration::from_millis(300)).await;

        // The idle timeout must have removed the session from Alice's manager.
        assert!(matches!(
            alice_mgr.ping_session(alice_session.id()).await,
            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
        ));

        futures::stream::iter(ahs)
            .for_each(|ah| async move { ah.abort() })
            .await;

        Ok(())
    }
1853
1854    #[test_log::test(tokio::test)]
1855    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1856        let alice_pseudonym = HoprPseudonym::random();
1857        let session_id = SessionId::new(16u64, alice_pseudonym);
1858        let balancer_cfg = SurbBalancerConfig {
1859            target_surb_buffer_size: 1000,
1860            max_surbs_per_sec: 100,
1861            ..Default::default()
1862        };
1863
1864        let alice_mgr = SessionManager::<
1865            UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1866            futures::channel::mpsc::Sender<IncomingSession>,
1867        >::new(Default::default());
1868
1869        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1870        alice_mgr
1871            .sessions
1872            .insert(
1873                session_id,
1874                SessionSlot {
1875                    session_tx: Arc::new(dummy_tx),
1876                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1877                    abort_handles: Default::default(),
1878                    surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
1879                    surb_estimator: Default::default(),
1880                    #[cfg(feature = "telemetry")]
1881                    telemetry: Arc::new(SessionTelemetry::new(session_id, Default::default())),
1882                },
1883            )
1884            .await;
1885
1886        let actual_cfg = alice_mgr
1887            .get_surb_balancer_config(&session_id)
1888            .await?
1889            .ok_or(anyhow!("session must have a surb balancer config"))?;
1890        assert_eq!(actual_cfg, balancer_cfg);
1891
1892        let new_cfg = SurbBalancerConfig {
1893            target_surb_buffer_size: 2000,
1894            max_surbs_per_sec: 200,
1895            ..Default::default()
1896        };
1897        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1898
1899        let actual_cfg = alice_mgr
1900            .get_surb_balancer_config(&session_id)
1901            .await?
1902            .ok_or(anyhow!("session must have a surb balancer config"))?;
1903        assert_eq!(actual_cfg, new_cfg);
1904
1905        Ok(())
1906    }
1907
    // Verifies that a session cannot be established when the responder's entire
    // session tag range is already occupied: Bob must reply with a SessionError
    // carrying StartErrorReason::NoSlotsAvailable.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        // Bob's tag range has width 1, i.e. room for exactly one session.
        let cfg = SessionManagerConfig {
            session_tag_range: 16u64..17u64, // Slot for exactly one session
            ..Default::default()
        };

        let alice_mgr = SessionManager::new(Default::default());
        let bob_mgr = SessionManager::new(cfg);

        // Occupy the only free slot with tag 16
        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
        bob_mgr
            .sessions
            .insert(
                SessionId::new(16u64, alice_pseudonym),
                SessionSlot {
                    session_tx: Arc::new(dummy_tx),
                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
                    abort_handles: Default::default(),
                    #[cfg(feature = "telemetry")]
                    telemetry: Arc::new(SessionTelemetry::new(
                        SessionId::new(16u64, alice_pseudonym),
                        Default::default(),
                    )),
                    surb_mgmt: Default::default(),
                    surb_estimator: Default::default(),
                },
            )
            .await;

        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message; the mocked transport hands it
        // straight to Bob's manager for dispatch.
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionError message back to Alice, since no tag slot is free.
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::SessionError)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();
                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        let mut jhs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let result = alice_mgr
            .new_session(
                bob_peer,
                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
                SessionClientConfig {
                    capabilities: Capabilities::empty(),
                    pseudonym: alice_pseudonym.into(),
                    surb_management: None,
                    ..Default::default()
                },
            )
            .await;

        // The initiation must be rejected because Bob has no free tag slots.
        assert!(
            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
        );

        Ok(())
    }
2027
    // Verifies that a session cannot be established when the responder has
    // already reached its configured maximum number of concurrent sessions:
    // Bob must reply with a SessionError carrying StartErrorReason::NoSlotsAvailable.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
    -> anyhow::Result<()> {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        // Bob accepts at most one session at a time.
        let cfg = SessionManagerConfig {
            maximum_sessions: 1,
            ..Default::default()
        };

        let alice_mgr = SessionManager::new(Default::default());
        let bob_mgr = SessionManager::new(cfg);

        // Pre-insert one session so Bob is already at his maximum_sessions limit
        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
        bob_mgr
            .sessions
            .insert(
                SessionId::new(16u64, alice_pseudonym),
                SessionSlot {
                    session_tx: Arc::new(dummy_tx),
                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
                    abort_handles: Default::default(),
                    surb_mgmt: Default::default(),
                    surb_estimator: Default::default(),
                    #[cfg(feature = "telemetry")]
                    telemetry: Arc::new(SessionTelemetry::new(
                        SessionId::new(16u64, alice_pseudonym),
                        Default::default(),
                    )),
                },
            )
            .await;

        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message; the mocked transport hands it
        // straight to Bob's manager for dispatch.
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionError message back, since the session limit is reached.
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::SessionError)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();
                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        let mut jhs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let result = alice_mgr
            .new_session(
                bob_peer,
                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
                SessionClientConfig {
                    capabilities: None.into(),
                    pseudonym: alice_pseudonym.into(),
                    surb_management: None,
                    ..Default::default()
                },
            )
            .await;

        // The initiation must be rejected because Bob is at his session limit.
        assert!(
            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
        );

        Ok(())
    }
2148
2149    #[test_log::test(tokio::test)]
2150    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2151        let alice_pseudonym = HoprPseudonym::random();
2152        let bob_peer: Address = (&ChainKeypair::random()).into();
2153
2154        let alice_mgr = SessionManager::new(Default::default());
2155
2156        let mut sequence = mockall::Sequence::new();
2157        let mut alice_transport = MockMsgSender::new();
2158
2159        // Alice sends the StartSession message
2160        let alice_mgr_clone = alice_mgr.clone();
2161        alice_transport
2162            .expect_send_message()
2163            .once()
2164            .in_sequence(&mut sequence)
2165            .withf(move |peer, data| {
2166                msg_type(data, StartProtocolDiscriminants::StartSession)
2167                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2168            })
2169            .returning(move |_, data| {
2170                // But the message is again processed by Alice due to Loopback
2171                let alice_mgr_clone = alice_mgr_clone.clone();
2172                Box::pin(async move {
2173                    alice_mgr_clone
2174                        .dispatch_message(
2175                            alice_pseudonym,
2176                            ApplicationDataIn {
2177                                data: data.data,
2178                                packet_info: Default::default(),
2179                            },
2180                        )
2181                        .await?;
2182                    Ok(())
2183                })
2184            });
2185
2186        // Alice sends the SessionEstablished message (as Bob)
2187        let alice_mgr_clone = alice_mgr.clone();
2188        alice_transport
2189            .expect_send_message()
2190            .once()
2191            .in_sequence(&mut sequence)
2192            .withf(move |peer, data| {
2193                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2194                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2195            })
2196            .returning(move |_, data| {
2197                let alice_mgr_clone = alice_mgr_clone.clone();
2198
2199                Box::pin(async move {
2200                    alice_mgr_clone
2201                        .dispatch_message(
2202                            alice_pseudonym,
2203                            ApplicationDataIn {
2204                                data: data.data,
2205                                packet_info: Default::default(),
2206                            },
2207                        )
2208                        .await?;
2209                    Ok(())
2210                })
2211            });
2212
2213        // Start Alice
2214        let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2215        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2216
2217        let alice_session = alice_mgr
2218            .new_session(
2219                bob_peer,
2220                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2221                SessionClientConfig {
2222                    capabilities: None.into(),
2223                    pseudonym: alice_pseudonym.into(),
2224                    surb_management: None,
2225                    ..Default::default()
2226                },
2227            )
2228            .await;
2229
2230        println!("{alice_session:?}");
2231        assert!(matches!(
2232            alice_session,
2233            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2234        ));
2235
2236        drop(new_session_rx_alice);
2237        Ok(())
2238    }
2239
    // Verifies that a session initiation attempt fails with a Timeout error
    // when the counterparty never responds to the StartSession message.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
        let bob_peer: Address = (&ChainKeypair::random()).into();

        // Use a short initiation timeout so the test completes quickly.
        let cfg = SessionManagerConfig {
            initiation_timeout_base: Duration::from_millis(100),
            ..Default::default()
        };

        let alice_mgr = SessionManager::new(cfg);
        let bob_mgr = SessionManager::new(Default::default());

        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        // Bob's transport has no expectations set: he never sees the message.
        let bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message, but Bob does not handle it
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(|_, _| Box::pin(async { Ok(()) }));

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;

        // Start Bob
        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;

        let result = alice_mgr
            .new_session(
                bob_peer,
                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
                SessionClientConfig {
                    capabilities: None.into(),
                    pseudonym: None,
                    surb_management: None,
                    ..Default::default()
                },
            )
            .await;

        // Without a response, the attempt must fail with a Timeout.
        assert!(matches!(result, Err(TransportSessionError::Timeout)));

        Ok(())
    }
2292
2293    #[test_log::test(tokio::test)]
2294    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2295        let alice_pseudonym = HoprPseudonym::random();
2296        let bob_peer: Address = (&ChainKeypair::random()).into();
2297
2298        let bob_cfg = SessionManagerConfig {
2299            surb_balance_notify_period: Some(Duration::from_millis(500)),
2300            ..Default::default()
2301        };
2302        let alice_mgr = SessionManager::new(Default::default());
2303        let bob_mgr = SessionManager::new(bob_cfg.clone());
2304
2305        let mut alice_transport = MockMsgSender::new();
2306        let mut bob_transport = MockMsgSender::new();
2307
2308        // Alice sends the StartSession message
2309        let mut open_sequence = mockall::Sequence::new();
2310        let bob_mgr_clone = bob_mgr.clone();
2311        alice_transport
2312            .expect_send_message()
2313            .once()
2314            .in_sequence(&mut open_sequence)
2315            .withf(move |peer, data| {
2316                msg_type(data, StartProtocolDiscriminants::StartSession)
2317                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2318            })
2319            .returning(move |_, data| {
2320                let bob_mgr_clone = bob_mgr_clone.clone();
2321                Box::pin(async move {
2322                    bob_mgr_clone
2323                        .dispatch_message(
2324                            alice_pseudonym,
2325                            ApplicationDataIn {
2326                                data: data.data,
2327                                packet_info: Default::default(),
2328                            },
2329                        )
2330                        .await?;
2331                    Ok(())
2332                })
2333            });
2334
2335        // Bob sends the SessionEstablished message
2336        let alice_mgr_clone = alice_mgr.clone();
2337        bob_transport
2338            .expect_send_message()
2339            .once()
2340            .in_sequence(&mut open_sequence)
2341            .withf(move |peer, data| {
2342                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2343                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2344            })
2345            .returning(move |_, data| {
2346                let alice_mgr_clone = alice_mgr_clone.clone();
2347                Box::pin(async move {
2348                    alice_mgr_clone
2349                        .dispatch_message(
2350                            alice_pseudonym,
2351                            ApplicationDataIn {
2352                                data: data.data,
2353                                packet_info: Default::default(),
2354                            },
2355                        )
2356                        .await?;
2357                    Ok(())
2358                })
2359            });
2360
2361        const INITIAL_BALANCER_TARGET: u64 = 10;
2362
2363        // Alice sends the KeepAlive messages reporting the initial balancer target
2364        let bob_mgr_clone = bob_mgr.clone();
2365        alice_transport
2366            .expect_send_message()
2367            .times(5..)
2368            //.in_sequence(&mut sequence)
2369            .withf(move |peer, data| {
2370                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2371                //msg_type(data, StartProtocolDiscriminants::KeepAlive)
2372                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2373            })
2374            .returning(move |_, data| {
2375                let bob_mgr_clone = bob_mgr_clone.clone();
2376                Box::pin(async move {
2377                    bob_mgr_clone
2378                        .dispatch_message(
2379                            alice_pseudonym,
2380                            ApplicationDataIn {
2381                                data: data.data,
2382                                packet_info: Default::default(),
2383                            },
2384                        )
2385                        .await?;
2386                    Ok(())
2387                })
2388            });
2389
2390        const NEXT_BALANCER_TARGET: u64 = 50;
2391
2392        // Alice sends also the KeepAlive messages reporting the updated balancer target
2393        let bob_mgr_clone = bob_mgr.clone();
2394        alice_transport
2395            .expect_send_message()
2396            .times(5..)
2397            //.in_sequence(&mut sequence)
2398            .withf(move |peer, data| {
2399                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2400                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2401            })
2402            .returning(move |_, data| {
2403                let bob_mgr_clone = bob_mgr_clone.clone();
2404                Box::pin(async move {
2405                    bob_mgr_clone
2406                        .dispatch_message(
2407                            alice_pseudonym,
2408                            ApplicationDataIn {
2409                                data: data.data,
2410                                packet_info: Default::default(),
2411                            },
2412                        )
2413                        .await?;
2414                    Ok(())
2415                })
2416            });
2417
2418        // Bob sends at least 1 Keep Alive back reporting its SURB estimate
2419        let alice_mgr_clone = alice_mgr.clone();
2420        bob_transport
2421            .expect_send_message()
2422            .times(1..)
2423            //.in_sequence(&mut open_sequence)
2424            .withf(move |peer, data| {
2425                start_msg_match(&data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2426                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2427            })
2428            .returning(move |_, data| {
2429                let alice_mgr_clone = alice_mgr_clone.clone();
2430                Box::pin(async move {
2431                    alice_mgr_clone
2432                        .dispatch_message(
2433                            alice_pseudonym,
2434                            ApplicationDataIn {
2435                                data: data.data,
2436                                packet_info: Default::default(),
2437                            },
2438                        )
2439                        .await?;
2440                    Ok(())
2441                })
2442            });
2443
2444        // Alice sends the terminating segment to close the Session
2445        let bob_mgr_clone = bob_mgr.clone();
2446        alice_transport
2447            .expect_send_message()
2448            .once()
2449            //.in_sequence(&mut sequence)
2450            .withf(move |peer, data| {
2451                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2452                    data.data.plain_text.as_ref(),
2453                )
2454                .ok()
2455                .and_then(|m| m.try_as_segment())
2456                .map(|s| s.is_terminating())
2457                .unwrap_or(false)
2458                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2459            })
2460            .returning(move |_, data| {
2461                let bob_mgr_clone = bob_mgr_clone.clone();
2462                Box::pin(async move {
2463                    bob_mgr_clone
2464                        .dispatch_message(
2465                            alice_pseudonym,
2466                            ApplicationDataIn {
2467                                data: data.data,
2468                                packet_info: Default::default(),
2469                            },
2470                        )
2471                        .await?;
2472                    Ok(())
2473                })
2474            });
2475
2476        let mut ahs = Vec::new();
2477
2478        // Start Alice
2479        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2480        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2481
2482        // Start Bob
2483        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2484        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2485
2486        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2487
2488        let balancer_cfg = SurbBalancerConfig {
2489            target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2490            max_surbs_per_sec: 100,
2491            ..Default::default()
2492        };
2493
2494        pin_mut!(new_session_rx_bob);
2495        let (alice_session, bob_session) = timeout(
2496            Duration::from_secs(2),
2497            futures::future::join(
2498                alice_mgr.new_session(
2499                    bob_peer,
2500                    SessionTarget::TcpStream(target.clone()),
2501                    SessionClientConfig {
2502                        pseudonym: alice_pseudonym.into(),
2503                        capabilities: Capability::Segmentation.into(),
2504                        surb_management: Some(balancer_cfg),
2505                        ..Default::default()
2506                    },
2507                ),
2508                new_session_rx_bob.next(),
2509            ),
2510        )
2511        .await?;
2512
2513        let mut alice_session = alice_session?;
2514        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2515
2516        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2517
2518        assert_eq!(
2519            Some(balancer_cfg),
2520            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2521        );
2522
2523        let remote_cfg = bob_mgr
2524            .get_surb_balancer_config(bob_session.session.id())
2525            .await?
2526            .ok_or(anyhow!("no remote config at bob"))?;
2527        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2528        assert_eq!(
2529            remote_cfg.max_surbs_per_sec,
2530            remote_cfg.target_surb_buffer_size
2531                / bob_cfg
2532                    .minimum_surb_buffer_duration
2533                    .max(MIN_SURB_BUFFER_DURATION)
2534                    .as_secs()
2535        );
2536
2537        // Let the Surb balancer send enough KeepAlive messages
2538        tokio::time::sleep(Duration::from_millis(1500)).await;
2539
2540        let new_balancer_cfg = SurbBalancerConfig {
2541            target_surb_buffer_size: NEXT_BALANCER_TARGET,
2542            max_surbs_per_sec: 100,
2543            ..Default::default()
2544        };
2545
2546        // Update to a higher target
2547        alice_mgr
2548            .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2549            .await?;
2550
2551        // Let the Surb balancer send enough KeepAlive messages
2552        tokio::time::sleep(Duration::from_millis(1500)).await;
2553
2554        // Bob should know about the updated target
2555        let remote_cfg = bob_mgr
2556            .get_surb_balancer_config(bob_session.session.id())
2557            .await?
2558            .ok_or(anyhow!("no remote config at bob"))?;
2559        assert_eq!(
2560            remote_cfg.target_surb_buffer_size,
2561            new_balancer_cfg.target_surb_buffer_size
2562        );
2563        assert_eq!(
2564            remote_cfg.max_surbs_per_sec,
2565            new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2566        );
2567
2568        let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2569        let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2570
2571        alice_session.close().await?;
2572
2573        assert!(alice_surb_sent > 0, "alice must've sent surbs");
2574        assert!(bob_surb_recv > 0, "bob must've received surbs");
2575        assert!(
2576            bob_surb_recv <= alice_surb_sent,
2577            "bob cannot receive more surbs than alice sent"
2578        );
2579
2580        assert!(alice_surb_used > 0, "alice must see bob used surbs");
2581        assert!(bob_surb_used > 0, "bob must've used surbs");
2582        assert!(
2583            alice_surb_used <= bob_surb_used,
2584            "alice cannot see bob used more surbs than bob actually used"
2585        );
2586
2587        tokio::time::sleep(Duration::from_millis(300)).await;
2588        assert!(matches!(
2589            alice_mgr.ping_session(alice_session.id()).await,
2590            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2591        ));
2592
2593        futures::stream::iter(ahs)
2594            .for_each(|ah| async move { ah.abort() })
2595            .await;
2596
2597        Ok(())
2598    }
2599}