Skip to main content

hopr_transport_session/
manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use futures::{
8    FutureExt, SinkExt, StreamExt, TryStreamExt,
9    channel::mpsc::{Sender, UnboundedSender},
10    future::AbortHandle,
11    pin_mut,
12};
13use futures_time::future::FutureExt as TimeExt;
14use hopr_async_runtime::AbortableList;
15use hopr_crypto_random::Randomizable;
16use hopr_internal_types::prelude::HoprPseudonym;
17use hopr_network_types::prelude::*;
18use hopr_primitive_types::prelude::Address;
19use hopr_protocol_app::prelude::*;
20use hopr_protocol_start::{
21    KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation,
22};
23use tracing::{debug, error, info, trace, warn};
24
25#[cfg(feature = "telemetry")]
26use crate::telemetry::{SessionLifecycleState, SessionStatsSnapshot, SessionTelemetry};
27use crate::{
28    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
29    SurbBalancerConfig,
30    balancer::{
31        AtomicSurbFlowEstimator, BalancerConfigFeedback, RateController, RateLimitSinkExt, SurbBalancer,
32        SurbControllerWithCorrection,
33        pid::{PidBalancerController, PidControllerGains},
34        simple::SimpleBalancerController,
35    },
36    errors::{SessionManagerError, TransportSessionError},
37    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
38    utils,
39    utils::insert_into_next_slot,
40};
41
42#[cfg(all(feature = "prometheus", not(test)))]
43lazy_static::lazy_static! {
44    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
45        "hopr_session_num_active_sessions",
46        "Number of currently active HOPR sessions"
47    ).unwrap();
48    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
49        "hopr_session_established_sessions_count",
50        "Number of sessions that were successfully established as an Exit node"
51    ).unwrap();
52    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
53        "hopr_session_initiated_sessions_count",
54        "Number of sessions that were successfully initiated as an Entry node"
55    ).unwrap();
56    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
57        "hopr_session_received_error_count",
58        "Number of HOPR session errors received from an Exit node",
59        &["kind"]
60    ).unwrap();
61    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
62        "hopr_session_sent_error_count",
63        "Number of HOPR session errors sent to an Entry node",
64        &["kind"]
65    ).unwrap();
66}
67
68fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
69    debug!(?session_id, ?reason, "closing session");
70
71    #[cfg(feature = "telemetry")]
72    {
73        session_data.telemetry.set_state(SessionLifecycleState::Closed);
74        session_data.telemetry.touch_activity();
75    }
76
77    if reason != ClosureReason::EmptyRead {
78        // Closing the data sender will also cause it to close from the read side
79        session_data.session_tx.close_channel();
80        trace!(?session_id, "data tx channel closed on session");
81    }
82
83    // Terminate any additional tasks spawned by the Session
84    session_data.abort_handles.lock().abort_all();
85
86    #[cfg(all(feature = "prometheus", not(test)))]
87    METRIC_ACTIVE_SESSIONS.decrement(1.0);
88}
89
90fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
91    base * (hops as u32)
92}
93
94/// Minimum time the SURB buffer must endure if no SURBs are being produced.
95pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
96
97/// The first challenge value used in Start protocol to initiate a session.
98pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
99
100/// Maximum time to wait for counterparty to receive the target number of SURBs.
101const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);
102
103/// Minimum timeout until an unfinished frame is discarded.
104const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);
105
106// Needs to use an UnboundedSender instead of oneshot
107// because Moka cache requires the value to be Clone, which oneshot Sender is not.
108// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
109type SessionInitiationCache =
110    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
111
112#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
113enum SessionTasks {
114    KeepAlive,
115    Balancer,
116}
117
118#[derive(Clone)]
119struct SessionSlot {
120    // Sender needs to be put in Arc, so that no clones are made by `moka`.
121    // This makes sure that the entire channel closes once the one and only sender is closed.
122    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
123    routing_opts: DestinationRouting,
124    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
125    #[cfg(feature = "telemetry")]
126    telemetry: Arc<SessionTelemetry>,
127    // Allows reconfiguring of the SURB balancer on-the-fly
128    // Set on both Entry and Exit sides.
129    surb_mgmt: Option<SurbBalancerConfig>,
130    // Set only on the Exit side.
131    surb_estimator: Option<AtomicSurbFlowEstimator>,
132}
133
134/// Allows forward and backward propagation of SURB management configuration changes.
135struct SessionCacheBalancerFeedback(moka::future::Cache<SessionId, SessionSlot>);
136
137#[async_trait::async_trait]
138impl BalancerConfigFeedback for SessionCacheBalancerFeedback {
139    async fn get_config(&self, id: &SessionId) -> crate::errors::Result<SurbBalancerConfig> {
140        // Intentionally using `iter().find()` instead of `get()` here,
141        // so that the popularity estimator is not hit.
142        self.0
143            .iter()
144            .find(|(sid, _)| sid.as_ref() == id)
145            .ok_or(SessionManagerError::NonExistingSession)?
146            .1
147            .surb_mgmt
148            .ok_or(SessionManagerError::Other("missing surb balancer config".into()).into())
149    }
150
151    async fn on_config_update(&self, id: &SessionId, cfg: SurbBalancerConfig) -> crate::errors::Result<()> {
152        if let moka::ops::compute::CompResult::ReplacedWith(_) = self
153            .0
154            .entry_by_ref(id)
155            .and_compute_with(|entry| {
156                futures::future::ready(match entry.map(|e| e.into_value()) {
157                    None => moka::ops::compute::Op::Nop,
158                    Some(mut updated_slot) => {
159                        if let Some(balancer_cfg) = &mut updated_slot.surb_mgmt {
160                            *balancer_cfg = cfg;
161                            moka::ops::compute::Op::Put(updated_slot)
162                        } else {
163                            moka::ops::compute::Op::Nop
164                        }
165                    }
166                })
167            })
168            .await
169        {
170            Ok(())
171        } else {
172            Err(SessionManagerError::NonExistingSession.into())
173        }
174    }
175}
176
177/// Indicates the result of processing a message.
178#[derive(Clone, Debug, PartialEq, Eq)]
179pub enum DispatchResult {
180    /// Session or Start protocol message has been processed successfully.
181    Processed,
182    /// The message was not related to Start or Session protocol.
183    Unrelated(ApplicationDataIn),
184}
185
186/// Configuration for the [`SessionManager`].
187#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
188pub struct SessionManagerConfig {
189    /// Ranges of tags available for Sessions.
190    ///
191    /// **NOTE**: If the range starts lower than [`ReservedTag`]'s range end,
192    /// it will be automatically transformed to start at this value.
193    /// This is due to the reserved range by the Start sub-protocol.
194    ///
195    /// Default is 16..1024.
196    #[doc(hidden)]
197    #[default(_code = "16u64..1024u64")]
198    pub session_tag_range: Range<u64>,
199
200    /// The maximum number of sessions (incoming and outgoing) that is allowed
201    /// to be managed by the manager.
202    ///
203    /// When reached, creating [new sessions](SessionManager::new_session) will return
204    /// the [`SessionManagerError::TooManySessions`] error, and incoming sessions will be rejected
205    /// with [`StartErrorReason::NoSlotsAvailable`] Start protocol error.
206    ///
207    /// Default is 128, minimum is 1; maximum is given by the `session_tag_range`.
208    #[default(128)]
209    pub maximum_sessions: usize,
210
211    /// The maximum chunk of data that can be written to the Session's input buffer.
212    ///
213    /// Default is 1500.
214    #[default(1500)]
215    pub frame_mtu: usize,
216
217    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
218    ///
219    /// Default is 800 ms.
220    #[default(Duration::from_millis(800))]
221    pub max_frame_timeout: Duration,
222
223    /// The base timeout for initiation of Session initiation.
224    ///
225    /// The actual timeout is adjusted according to the number of hops for that Session:
226    /// `t = initiation_time_out_base * (num_forward_hops + num_return_hops + 2)`
227    ///
228    /// Default is 500 milliseconds.
229    #[default(Duration::from_millis(500))]
230    pub initiation_timeout_base: Duration,
231
232    /// Timeout for Session to be closed due to inactivity.
233    ///
234    /// Default is 180 seconds.
235    #[default(Duration::from_secs(180))]
236    pub idle_timeout: Duration,
237
238    /// The sampling interval for SURB balancer.
239    /// It will make SURB control decisions regularly at this interval.
240    ///
241    /// Default is 100 milliseconds.
242    #[default(Duration::from_millis(100))]
243    pub balancer_sampling_interval: Duration,
244
245    /// Initial packets per second egress rate on an incoming Session.
246    ///
247    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
248    ///
249    /// Default is 10 packets/second.
250    #[default(10)]
251    pub initial_return_session_egress_rate: usize,
252
253    /// Minimum period of time for which a SURB buffer at the Exit must
254    /// endure if no SURBs are being received.
255    ///
256    /// In other words, it is the minimum period of time an Exit must withstand when
257    /// no SURBs are received from the Entry at all. To do so, the egress traffic
258    /// will be shaped accordingly to meet this requirement.
259    ///
260    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
261    ///
262    /// Default is 5 seconds, minimum is 1 second.
263    #[default(Duration::from_secs(5))]
264    pub minimum_surb_buffer_duration: Duration,
265
266    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
267    ///
268    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
269    /// so values greater than that do not make sense. This value should be ideally set equal
270    /// to the size of the global transport SURB RB.
271    ///
272    /// Default is 10 000 SURBs.
273    #[default(10_000)]
274    pub maximum_surb_buffer_size: usize,
275
276    /// Determines if the `target_surb_buffer_size` can grow if the simple moving average of the
277    /// current SURB buffer size estimate surpasses the given threshold over the given period.
278    ///
279    /// This only applies if the used [`SurbBalancerController`] has this option.
280    ///
281    /// The default is `(60, 0.20)` (20% growth if the average SURB buffer size surpasses 20%
282    /// of the target buffer size in a 60-second window).
283    #[default(_code = "Some((Duration::from_secs(60), 0.20))")]
284    pub growable_target_surb_buffer: Option<(Duration, f64)>,
285}
286
287/// Manages lifecycles of Sessions.
288///
289/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
290/// should be called for each [`ApplicationData`] received by the node.
291/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
292/// and correct dispatch of Session-related packets to individual existing Sessions.
293///
294/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
295/// probe sessions using [`SessionManager::ping_session`]
296/// and list them via [`SessionManager::active_sessions`].
297///
298/// Since the `SessionManager` operates over the HOPR protocol,
299/// the message transport `S` is required.
300/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
301///
302/// ## SURB balancing
303/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
304///
305/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
306/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
307/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
308/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
309/// service degradation.
310///
311/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
312/// *local SURB balancing* and *remote SURB balancing*.
313///
314/// ### Local SURB balancing
315/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
316/// therefore incoming to us).
317/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
318/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
319/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
320/// more SURBs over time. This might happen either organically by sending effective payloads that
321/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
322/// via *remote SURB balancing*.
323///
324/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
325/// flag during Session initiation.
326///
327/// ### Remote SURB balancing
328/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
329/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
330/// in replies.
331/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
332/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
333/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
334///
335/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
336/// sending new ones via the keep-alive messages.
337///
338/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
339///
340/// ### Possible scenarios
341/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
342/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
343/// only when both are configured (the ideal case below):
344///
345/// #### 1. Ideal local and remote SURB balancing
346/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
347///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
348/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
349///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
350/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
351///    Session.
352/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
353///    `maximum_surb_buffer_size / max_surb_buffer_duration` value configured at the counterparty.
354///
355/// In this situation, the maximum Session egress from Exit to the Entry is given by the
356/// `maximum_surb_buffer_size / max_surb_buffer_duration` ratio. If there is enough bandwidth,
357/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
358/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
359///
360/// #### 2. Remote SURB balancing only
361/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
362/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
363///    [`SurbBalancerConfig`]
364///
365/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
366/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
367/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
368///
369/// This configuration could potentially only lead to an equilibrium
370/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
371///
372/// #### 3. Local SURB balancing only
373/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `max_surb_buffer_duration` and
374///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
375/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
376///    Session.
377/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
378///
379/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
380/// ones that are naturally carried by the egress packets which have space to hold SURBs). It relies
381/// only on the Session egress limiting of the Exit node.
382/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
383///
384/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
385/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
386/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
387///
388/// #### 4. No SURB balancing on each side
389/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
390/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
391///
392/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
393/// takes place at the Exit.
394///
395/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
396/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
397/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
398/// at the Exit's egress.
399///
400/// ### SURB decay
401/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
402/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
403/// The Entry has no way of knowing that and assumes that everything has been delivered.
404/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
405/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
406/// fewer SURBs from its SURB estimate at the Exit.
407///
408/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
409///
410/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
411/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
412/// Exit is detected.
413///
414/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
415///
416/// ### Automatic `target_surb_buffer_size` increase
417/// This mechanism only applies to the Session recipient (Exit) and on Sessions without the
418/// [`Capability::NoRateControl`] flag set.
419/// In this case, the Exit throttles the Session egress based on the ratio between
420/// of the estimated SURB balance and the *hint* of Entry's `target_surb_buffer_size` set during the Session initiation.
421///
422/// However, as the Entry might [increase](SessionManager::update_surb_balancer_config) the `target_surb_buffer_size`
423/// of the Session dynamically, the new value is never hinted again to the Exit (this only happens once during
424/// the Session initiation).
425/// For this reason, the Exit then might observe the ratio going higher than 1. When this happens consistently
426/// over some given time period, the Exit can decide to increase the initial hint to the newly observed value.
427///
428/// See the `growable_target_surb_buffer` field in the [`SessionManagerConfig`] for details.
429pub struct SessionManager<S, T> {
430    session_initiations: SessionInitiationCache,
431    #[allow(clippy::type_complexity)]
432    session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
433    sessions: moka::future::Cache<SessionId, SessionSlot>,
434    msg_sender: Arc<OnceLock<S>>,
435    cfg: SessionManagerConfig,
436}
437
438impl<S, T> Clone for SessionManager<S, T> {
439    fn clone(&self) -> Self {
440        Self {
441            session_initiations: self.session_initiations.clone(),
442            session_notifiers: self.session_notifiers.clone(),
443            sessions: self.sessions.clone(),
444            cfg: self.cfg.clone(),
445            msg_sender: self.msg_sender.clone(),
446        }
447    }
448}
449
450const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
451
452impl<S, T> SessionManager<S, T>
453where
454    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
455    T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
456    S::Error: std::error::Error + Send + Sync + Clone + 'static,
457    T::Error: std::error::Error + Send + Sync + Clone + 'static,
458{
459    /// Creates a new instance given the [`config`](SessionManagerConfig).
460    pub fn new(mut cfg: SessionManagerConfig) -> Self {
461        let min_session_tag_range_reservation = ReservedTag::range().end;
462        debug_assert!(
463            min_session_tag_range_reservation > HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG.as_u64(),
464            "invalid tag reservation range"
465        );
466
467        // Accommodate the lower bound if too low.
468        if cfg.session_tag_range.start < min_session_tag_range_reservation {
469            let diff = min_session_tag_range_reservation - cfg.session_tag_range.start;
470            cfg.session_tag_range = min_session_tag_range_reservation..cfg.session_tag_range.end + diff;
471        }
472        cfg.maximum_sessions = cfg
473            .maximum_sessions
474            .clamp(1, (cfg.session_tag_range.end - cfg.session_tag_range.start) as usize);
475
476        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
477        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
478        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
479
480        #[cfg(all(feature = "prometheus", not(test)))]
481        METRIC_ACTIVE_SESSIONS.set(0.0);
482
483        let msg_sender = Arc::new(OnceLock::new());
484        Self {
485            msg_sender: msg_sender.clone(),
486            session_initiations: moka::future::Cache::builder()
487                .max_capacity(cfg.maximum_sessions as u64)
488                .time_to_live(
489                    2 * initiation_timeout_max_one_way(
490                        cfg.initiation_timeout_base,
491                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
492                    ),
493                )
494                .build(),
495            sessions: moka::future::Cache::builder()
496                .max_capacity(cfg.maximum_sessions as u64)
497                .time_to_idle(cfg.idle_timeout)
498                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
499                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
500                        trace!(?session_id, ?reason, "session evicted from the cache");
501                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
502                    }
503                    _ => {}
504                })
505                .build(),
506            session_notifiers: Arc::new(OnceLock::new()),
507            cfg,
508        }
509    }
510
511    /// Starts the instance with the given `msg_sender` `Sink`
512    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
513    ///
514    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
515    /// [`SessionManager::dispatch_message`].
516    pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
517        self.msg_sender
518            .set(msg_sender)
519            .map_err(|_| SessionManagerError::AlreadyStarted)?;
520
521        let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.cfg.maximum_sessions + 10);
522        self.session_notifiers
523            .set((new_session_notifier, session_close_tx))
524            .map_err(|_| SessionManagerError::AlreadyStarted)?;
525
526        let myself = self.clone();
527        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
528            None,
529            move |(session_id, closure_reason)| {
530                let myself = myself.clone();
531                async move {
532                    // These notifications come from the Sessions themselves once
533                    // an empty read is encountered, which means the closure was done by the
534                    // other party.
535                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
536                        close_session(session_id, session_data, closure_reason);
537                    } else {
538                        // Do not treat this as an error
539                        debug!(
540                            ?session_id,
541                            ?closure_reason,
542                            "could not find session id to close, maybe the session is already closed"
543                        );
544                    }
545                }
546            },
547        ));
548
549        // This is necessary to evict expired entries from the caches if
550        // no session-related operations happen at all.
551        // This ensures the dangling expired sessions are properly closed
552        // and their closure is timely notified to the other party.
553        let myself = self.clone();
554        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
555            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
556            let timeout = 2 * initiation_timeout_max_one_way(
557                myself.cfg.initiation_timeout_base,
558                RoutingOptions::MAX_INTERMEDIATE_HOPS,
559            )
560            .min(myself.cfg.idle_timeout)
561            .mul_f64(jitter)
562                / 2;
563            futures_time::stream::interval(timeout.into())
564                .for_each(|_| {
565                    trace!("executing session cache evictions");
566                    futures::future::join(
567                        myself.sessions.run_pending_tasks(),
568                        myself.session_initiations.run_pending_tasks(),
569                    )
570                    .map(|_| ())
571                })
572                .await;
573        });
574
575        Ok(vec![ah_closure_notifications, ah_session_expiration])
576    }
577
578    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
579    pub fn is_started(&self) -> bool {
580        self.session_notifiers.get().is_some()
581    }
582
583    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
584        // We currently do not support loopback Sessions on ourselves.
585        if let moka::ops::compute::CompResult::Inserted(_) = self
586            .sessions
587            .entry(session_id)
588            .and_compute_with(|entry| {
589                futures::future::ready(if entry.is_none() {
590                    moka::ops::compute::Op::Put(slot)
591                } else {
592                    moka::ops::compute::Op::Nop
593                })
594            })
595            .await
596        {
597            #[cfg(all(feature = "prometheus", not(test)))]
598            {
599                METRIC_NUM_INITIATED_SESSIONS.increment();
600                METRIC_ACTIVE_SESSIONS.increment(1.0);
601            }
602
603            Ok(())
604        } else {
605            // Session already exists; it means it is most likely a loopback attempt
606            error!(%session_id, "session already exists - loopback attempt");
607            Err(SessionManagerError::Loopback.into())
608        }
609    }
610
611    /// Initiates a new outgoing Session to `destination` with the given configuration.
612    ///
613    /// If the Session's counterparty does not respond within
614    /// the [configured](SessionManagerConfig) period,
615    /// this method returns [`TransportSessionError::Timeout`].
616    ///
617    /// It will also fail if the instance has not been [started](SessionManager::start).
618    pub async fn new_session(
619        &self,
620        destination: Address,
621        target: SessionTarget,
622        cfg: SessionClientConfig,
623    ) -> crate::errors::Result<HoprSession> {
624        self.sessions.run_pending_tasks().await;
625        if self.cfg.maximum_sessions <= self.sessions.entry_count() as usize {
626            return Err(SessionManagerError::TooManySessions.into());
627        }
628
629        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
630
631        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
632        let (challenge, _) = insert_into_next_slot(
633            &self.session_initiations,
634            |ch| {
635                if let Some(challenge) = ch {
636                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
637                } else {
638                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
639                }
640            },
641            |_| tx_initiation_done,
642        )
643        .await
644        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
645
646        // Prepare the session initiation message in the Start protocol
647        trace!(challenge, ?cfg, "initiating session with config");
648        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
649            challenge,
650            target,
651            capabilities: ByteCapabilities(cfg.capabilities),
652            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
653                cfg.surb_management
654                    .map(|c| c.target_surb_buffer_size)
655                    .unwrap_or(
656                        self.cfg.initial_return_session_egress_rate as u64
657                            * self
658                                .cfg
659                                .minimum_surb_buffer_duration
660                                .max(MIN_SURB_BUFFER_DURATION)
661                                .as_secs(),
662                    )
663                    .min(u32::MAX as u64) as u32
664            } else {
665                0
666            },
667        });
668
669        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
670        let forward_routing = DestinationRouting::Forward {
671            destination: Box::new(destination.into()),
672            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
673            forward_options: cfg.forward_path_options.clone(),
674            return_options: cfg.return_path_options.clone().into(),
675        };
676
677        // Send the Session initiation message
678        info!(challenge, %pseudonym, %destination, "new session request");
679        msg_sender
680            .send((
681                forward_routing.clone(),
682                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
683            ))
684            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
685            .await
686            .map_err(|_| {
687                error!(challenge, %pseudonym, %destination, "timeout sending session request message");
688                TransportSessionError::Timeout
689            })?
690            .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?;
691
692        // The timeout is given by the number of hops requested
693        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
694            self.cfg.initiation_timeout_base,
695            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
696        )
697        .into();
698
699        // Await session establishment response from the Exit node or timeout
700        pin_mut!(rx_initiation_done);
701
702        trace!(challenge, "awaiting session establishment");
703        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
704            Ok(Ok(Some(est))) => {
705                // Session has been established, construct it
706                let session_id = est.session_id;
707                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
708
709                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
710                let notifier = self
711                    .session_notifiers
712                    .get()
713                    .map(|(_, notifier)| {
714                        let mut notifier = notifier.clone();
715                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
716                            let _ = notifier
717                                .try_send((session_id, reason))
718                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
719                        })
720                    })
721                    .ok_or(SessionManagerError::NotStarted)?;
722
723                #[cfg(feature = "telemetry")]
724                let metrics = Arc::new(SessionTelemetry::new(
725                    session_id,
726                    HoprSessionConfig {
727                        capabilities: cfg.capabilities,
728                        frame_mtu: self.cfg.frame_mtu,
729                        frame_timeout: self.cfg.max_frame_timeout,
730                    },
731                ));
732
733                // NOTE: the Exit node can have different `max_surb_buffer_size`
734                // setting on the Session manager, so it does not make sense to cap it here
735                // with our maximum value.
736                if let Some(balancer_config) = cfg.surb_management {
737                    let surb_estimator = AtomicSurbFlowEstimator::default();
738
739                    #[cfg(feature = "telemetry")]
740                    metrics.set_surb_estimator(surb_estimator.clone(), balancer_config.target_surb_buffer_size);
741
742                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
743                    let surb_estimator_clone = surb_estimator.clone();
744                    let full_surb_scoring_sender =
745                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
746                            // Count how many SURBs we sent with each packet
747                            surb_estimator_clone.produced.fetch_add(
748                                data.estimate_surbs_with_msg() as u64,
749                                std::sync::atomic::Ordering::Relaxed,
750                            );
751                            futures::future::ok::<_, S::Error>((routing, data))
752                        });
753
754                    // For standard Session data we first reduce the number of SURBs we want to produce,
755                    // unless requested to always max them out
756                    let max_out_organic_surbs = cfg.always_max_out_surbs;
757                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
758                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
759                        // so that its estimate of SURBs gets automatically updated based on
760                        // the `max_surbs_in_packets` set here.
761                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
762                            if !max_out_organic_surbs {
763                                // TODO: make this dynamic to honor the balancer target (#7439)
764                                data.packet_info
765                                    .get_or_insert_with(|| OutgoingPacketInfo {
766                                        max_surbs_in_packet: 1,
767                                        ..Default::default()
768                                    })
769                                    .max_surbs_in_packet = 1;
770                            }
771                            futures::future::ok::<_, S::Error>((routing, data))
772                        },
773                    );
774
775                    let mut abort_handles = AbortableList::default();
776
777                    // Spawn the SURB-bearing keep alive stream
778                    let (ka_controller, ka_abort_handle) =
779                        utils::spawn_keep_alive_stream(session_id, full_surb_scoring_sender, forward_routing.clone());
780                    abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
781
782                    #[cfg(feature = "telemetry")]
783                    metrics.set_refill_in_flight(true);
784
785                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
786                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
787                    let balancer = SurbBalancer::new(
788                        session_id,
789                        // The setpoint and output limit is immediately reconfigured by SurbBalancer
790                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
791                        surb_estimator.clone(),
792                        ka_controller,
793                        balancer_config,
794                    );
795
796                    let (level_stream, balancer_abort_handle) = balancer.start_control_loop(
797                        self.cfg.balancer_sampling_interval,
798                        SessionCacheBalancerFeedback(self.sessions.clone()),
799                        None,
800                    );
801                    abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
802
803                    // If the insertion fails prematurely, it will also kill all the abort handles
804                    self.insert_session_slot(
805                        session_id,
806                        SessionSlot {
807                            session_tx: Arc::new(tx),
808                            routing_opts: forward_routing.clone(),
809                            abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
810                            surb_mgmt: Some(balancer_config),
811                            surb_estimator: None, // Outgoing sessions do not have SURB estimator
812                            #[cfg(feature = "telemetry")]
813                            telemetry: metrics.clone(),
814                        },
815                    )
816                    .await?;
817
818                    #[cfg(feature = "telemetry")]
819                    metrics.set_state(SessionLifecycleState::Active);
820
821                    // Wait for enough SURBs to be sent to the counterparty
822                    // TODO: consider making this interactive = other party reports the exact level periodically
823                    match level_stream
824                        .skip_while(|current_level| {
825                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
826                        })
827                        .next()
828                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
829                        .await
830                    {
831                        Ok(Some(surb_level)) => {
832                            info!(%session_id, surb_level, "session is ready");
833                        }
834                        Ok(None) => {
835                            return Err(
836                                SessionManagerError::Other("surb balancer was cancelled prematurely".into()).into(),
837                            );
838                        }
839                        Err(_) => {
840                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
841                        }
842                    }
843
844                    HoprSession::new(
845                        session_id,
846                        forward_routing,
847                        HoprSessionConfig {
848                            capabilities: cfg.capabilities,
849                            frame_mtu: self.cfg.frame_mtu,
850                            frame_timeout: self.cfg.max_frame_timeout,
851                        },
852                        (
853                            reduced_surb_scoring_sender,
854                            rx.inspect(move |_| {
855                                // Received packets = SURB consumption estimate
856                                // The received packets always consume a single SURB.
857                                surb_estimator
858                                    .consumed
859                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
860                            }),
861                        ),
862                        Some(notifier),
863                        #[cfg(feature = "telemetry")]
864                        metrics,
865                    )
866                } else {
867                    warn!(%session_id, "session ready without SURB balancing");
868
869                    self.insert_session_slot(
870                        session_id,
871                        SessionSlot {
872                            session_tx: Arc::new(tx),
873                            routing_opts: forward_routing.clone(),
874                            abort_handles: Default::default(),
875                            surb_mgmt: None,
876                            surb_estimator: None,
877                            #[cfg(feature = "telemetry")]
878                            telemetry: metrics.clone(),
879                        },
880                    )
881                    .await?;
882                    #[cfg(feature = "telemetry")]
883                    metrics.set_state(SessionLifecycleState::Active);
884
885                    // For standard Session data we first reduce the number of SURBs we want to produce,
886                    // unless requested to always max them out
887                    let max_out_organic_surbs = cfg.always_max_out_surbs;
888                    let reduced_surb_sender =
889                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
890                            if !max_out_organic_surbs {
891                                data.packet_info
892                                    .get_or_insert_with(|| OutgoingPacketInfo {
893                                        max_surbs_in_packet: 1,
894                                        ..Default::default()
895                                    })
896                                    .max_surbs_in_packet = 1;
897                            }
898                            futures::future::ok::<_, S::Error>((routing, data))
899                        });
900
901                    HoprSession::new(
902                        session_id,
903                        forward_routing,
904                        HoprSessionConfig {
905                            capabilities: cfg.capabilities,
906                            frame_mtu: self.cfg.frame_mtu,
907                            frame_timeout: self.cfg.max_frame_timeout,
908                        },
909                        (reduced_surb_sender, rx),
910                        Some(notifier),
911                        #[cfg(feature = "telemetry")]
912                        metrics,
913                    )
914                }
915            }
916            Ok(Ok(None)) => Err(SessionManagerError::Other(
917                "internal error: sender has been closed without completing the session establishment".into(),
918            )
919            .into()),
920            Ok(Err(error)) => {
921                // The other side did not allow us to establish a session
922                error!(
923                    challenge = error.challenge,
924                    ?error,
925                    "the other party rejected the session initiation with error"
926                );
927                Err(TransportSessionError::Rejected(error.reason))
928            }
929            Err(_) => {
930                // Timeout waiting for a session establishment
931                error!(challenge, "session initiation attempt timed out");
932
933                #[cfg(all(feature = "prometheus", not(test)))]
934                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
935
936                Err(TransportSessionError::Timeout)
937            }
938        }
939    }
940
941    /// Sends a keep-alive packet with the given [`SessionId`].
942    ///
943    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
944    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
945        if let Some(session_data) = self.sessions.get(id).await {
946            trace!(session_id = ?id, "pinging manually session");
947            Ok(self
948                .msg_sender
949                .get()
950                .cloned()
951                .ok_or(SessionManagerError::NotStarted)?
952                .send((
953                    session_data.routing_opts.clone(),
954                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
955                ))
956                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
957                .await
958                .map_err(|_| {
959                    error!("timeout sending session ping message");
960                    TransportSessionError::Timeout
961                })?
962                .map_err(|e| TransportSessionError::PacketSendingError(e.to_string()))?)
963        } else {
964            Err(SessionManagerError::NonExistingSession.into())
965        }
966    }
967
968    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
969    pub async fn active_sessions(&self) -> Vec<SessionId> {
970        self.sessions.run_pending_tasks().await;
971        self.sessions.iter().map(|(k, _)| *k).collect()
972    }
973
974    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
975    ///
976    /// Returns an error if the Session with the given `id` does not exist, or
977    /// if it does not use SURB balancing.
978    pub async fn update_surb_balancer_config(
979        &self,
980        id: &SessionId,
981        config: SurbBalancerConfig,
982    ) -> crate::errors::Result<()> {
983        match self
984            .sessions
985            .entry_by_ref(id)
986            .and_compute_with(|entry| {
987                futures::future::ready(if let Some(mut cached_session) = entry.map(|e| e.into_value()) {
988                    // Only update the config if there already was one before
989                    if cached_session.surb_mgmt.is_some() {
990                        cached_session.surb_mgmt = Some(config);
991                        moka::ops::compute::Op::Put(cached_session)
992                    } else {
993                        moka::ops::compute::Op::Nop
994                    }
995                } else {
996                    moka::ops::compute::Op::Nop
997                })
998            })
999            .await
1000        {
1001            moka::ops::compute::CompResult::ReplacedWith(_) => Ok(()),
1002            moka::ops::compute::CompResult::Unchanged(_) => {
1003                Err(SessionManagerError::Other("session does not use SURB balancing".into()).into())
1004            }
1005            _ => Err(SessionManagerError::NonExistingSession.into()),
1006        }
1007    }
1008
1009    /// Retrieves the configuration of SURB balancing for the given Session.
1010    ///
1011    /// Returns an error if the Session with the given `id` does not exist.
1012    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
1013        match self.sessions.get(id).await {
1014            Some(session) => Ok(session.surb_mgmt),
1015            None => Err(SessionManagerError::NonExistingSession.into()),
1016        }
1017    }
1018
1019    /// Retrieves useful statistics of the given [session](HoprSession).
1020    ///
1021    /// Returns an error if the Session with the given `id` does not exist.
1022    #[cfg(feature = "telemetry")]
1023    pub async fn get_session_stats(&self, id: &SessionId) -> crate::errors::Result<SessionStatsSnapshot> {
1024        match self.sessions.get(id).await {
1025            Some(session) => Ok(session.telemetry.snapshot()),
1026            None => Err(SessionManagerError::NonExistingSession.into()),
1027        }
1028    }
1029
1030    /// The main method to be called whenever data are received.
1031    ///
1032    /// It tries to recognize the message and correctly dispatches either
1033    /// the Session protocol or Start protocol messages.
1034    ///
1035    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
1036    pub async fn dispatch_message(
1037        &self,
1038        pseudonym: HoprPseudonym,
1039        in_data: ApplicationDataIn,
1040    ) -> crate::errors::Result<DispatchResult> {
1041        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1042            // This is a Start protocol message, so we handle it
1043            trace!("dispatching Start protocol message");
1044            return self
1045                .handle_start_protocol_message(pseudonym, in_data)
1046                .await
1047                .map(|_| DispatchResult::Processed);
1048        } else if self
1049            .cfg
1050            .session_tag_range
1051            .contains(&in_data.data.application_tag.as_u64())
1052        {
1053            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1054
1055            return if let Some(session_data) = self.sessions.get(&session_id).await {
1056                trace!(?session_id, "received data for a registered session");
1057
1058                Ok(session_data
1059                    .session_tx
1060                    .unbounded_send(in_data)
1061                    .map(|_| DispatchResult::Processed)
1062                    .map_err(|e| SessionManagerError::Other(e.to_string()))?)
1063            } else {
1064                error!(%session_id, "received data from an unestablished session");
1065                Err(TransportSessionError::UnknownData)
1066            };
1067        }
1068
1069        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1070        Ok(DispatchResult::Unrelated(in_data))
1071    }
1072
1073    async fn handle_incoming_session_initiation(
1074        &self,
1075        pseudonym: HoprPseudonym,
1076        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1077    ) -> crate::errors::Result<()> {
1078        trace!(challenge = session_req.challenge, "received session initiation request");
1079
1080        debug!(%pseudonym, "got new session request, searching for a free session slot");
1081
1082        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1083
1084        let (mut new_session_notifier, mut close_session_notifier) = self
1085            .session_notifiers
1086            .get()
1087            .cloned()
1088            .ok_or(SessionManagerError::NotStarted)?;
1089
1090        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1091        let reply_routing = DestinationRouting::Return(pseudonym.into());
1092
1093        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1094
1095        // Search for a free Session ID slot
1096        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1097        let allocated_slot = if self.cfg.maximum_sessions > self.sessions.entry_count() as usize {
1098            insert_into_next_slot(
1099                &self.sessions,
1100                |sid| {
1101                    // NOTE: It is allowed to insert sessions using the same tag
1102                    // but different pseudonyms because the SessionId is different.
1103                    let next_tag: Tag = match sid {
1104                        Some(session_id) => ((session_id.tag().as_u64() + 1) % self.cfg.session_tag_range.end)
1105                            .max(self.cfg.session_tag_range.start)
1106                            .into(),
1107                        None => hopr_crypto_random::random_integer(
1108                            self.cfg.session_tag_range.start,
1109                            Some(self.cfg.session_tag_range.end),
1110                        )
1111                        .into(),
1112                    };
1113                    SessionId::new(next_tag, pseudonym)
1114                },
1115                |_sid| SessionSlot {
1116                    session_tx: Arc::new(tx_session_data),
1117                    routing_opts: reply_routing.clone(),
1118                    abort_handles: Default::default(),
1119                    surb_mgmt: None,
1120                    surb_estimator: None,
1121                    #[cfg(feature = "telemetry")]
1122                    telemetry: SessionTelemetry::new(
1123                        _sid,
1124                        HoprSessionConfig {
1125                            capabilities: session_req.capabilities.0,
1126                            frame_mtu: self.cfg.frame_mtu,
1127                            frame_timeout: self.cfg.max_frame_timeout,
1128                        },
1129                    )
1130                    .into(),
1131                },
1132            )
1133            .await
1134        } else {
1135            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1136            None
1137        };
1138
1139        // If some of the following code throws an error, the allocated slot will be kept
1140        // but will be later re-claimed when it expires.
1141        if let Some((session_id, _slot)) = allocated_slot {
1142            debug!(%session_id, ?session_req, "assigned a new session");
1143
1144            #[cfg(feature = "telemetry")]
1145            let stats = _slot.telemetry;
1146
1147            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1148                if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1149                    error!(%session_id, %error, %reason, "failed to notify session closure");
1150                }
1151            });
1152
1153            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1154                let surb_estimator = AtomicSurbFlowEstimator::default();
1155
1156                // Because of SURB scarcity, control the egress rate of incoming sessions
1157                let egress_rate_control =
1158                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1159
1160                // The Session request carries a "hint" as additional data telling what
1161                // the Session initiator has configured as its target buffer size in the Balancer.
1162                let target_surb_buffer_size = if session_req.additional_data > 0 {
1163                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1164                } else {
1165                    self.cfg.initial_return_session_egress_rate as u64
1166                        * self
1167                            .cfg
1168                            .minimum_surb_buffer_duration
1169                            .max(MIN_SURB_BUFFER_DURATION)
1170                            .as_secs()
1171                };
1172                #[cfg(feature = "telemetry")]
1173                stats.set_surb_estimator(surb_estimator.clone(), target_surb_buffer_size);
1174
1175                let surb_estimator_clone = surb_estimator.clone();
1176                let session = HoprSession::new(
1177                    session_id,
1178                    reply_routing.clone(),
1179                    HoprSessionConfig {
1180                        capabilities: session_req.capabilities.into(),
1181                        frame_mtu: self.cfg.frame_mtu,
1182                        frame_timeout: self.cfg.max_frame_timeout,
1183                    },
1184                    (
1185                        // Sent packets = SURB consumption estimate
1186                        msg_sender
1187                            .clone()
1188                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1189                                // Each outgoing packet consumes one SURB
1190                                surb_estimator_clone
1191                                    .consumed
1192                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1193                                futures::future::ok::<_, S::Error>((routing, data))
1194                            })
1195                            .rate_limit_with_controller(&egress_rate_control)
1196                            .buffer((2 * target_surb_buffer_size) as usize),
1197                        // Received packets = SURB retrieval estimate
1198                        rx_session_data.inspect(move |data| {
1199                            // Count the number of SURBs delivered with each incoming packet
1200                            surb_estimator_clone
1201                                .produced
1202                                .fetch_add(data.num_surbs_with_msg() as u64, std::sync::atomic::Ordering::Relaxed);
1203                        }),
1204                    ),
1205                    Some(closure_notifier),
1206                    #[cfg(feature = "telemetry")]
1207                    stats.clone(),
1208                )?;
1209
1210                // The SURB balancer will start intervening by rate-limiting the
1211                // egress of the Session, once the estimated number of SURBs drops below
1212                // the target defined here. Otherwise, the maximum egress is allowed.
1213                let balancer_config = SurbBalancerConfig {
1214                    target_surb_buffer_size,
1215                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1216                    max_surbs_per_sec: target_surb_buffer_size
1217                        / self
1218                            .cfg
1219                            .minimum_surb_buffer_duration
1220                            .max(MIN_SURB_BUFFER_DURATION)
1221                            .as_secs(),
1222                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1223                    // were received
1224                    surb_decay: None,
1225                };
1226
1227                // Assign the SURB balancer and abort handles to the already allocated Session slot
1228                let (balancer_abort_handle, balancer_abort_reg) = AbortHandle::new_pair();
1229                if let moka::ops::compute::CompResult::ReplacedWith(_) = self
1230                    .sessions
1231                    .entry(session_id)
1232                    .and_compute_with(|entry| {
1233                        if let Some(mut cached_session) = entry.map(|c| c.into_value()) {
1234                            cached_session
1235                                .abort_handles
1236                                .lock()
1237                                .insert(SessionTasks::Balancer, balancer_abort_handle);
1238                            cached_session.surb_mgmt = Some(balancer_config);
1239                            cached_session.surb_estimator = Some(surb_estimator.clone());
1240                            #[cfg(feature = "telemetry")]
1241                            {
1242                                cached_session.telemetry = stats.clone();
1243                            }
1244                            futures::future::ready(moka::ops::compute::Op::Put(cached_session))
1245                        } else {
1246                            futures::future::ready(moka::ops::compute::Op::Nop)
1247                        }
1248                    })
1249                    .await
1250                {
1251                    // Spawn the SURB balancer only once we know we have registered the
1252                    // abort handle with the pre-allocated Session slot
1253                    debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1254                    let balancer = SurbBalancer::new(
1255                        session_id,
1256                        if let Some((growth_window, ratio_threshold)) = self.cfg.growable_target_surb_buffer.as_ref() {
1257                            // Allow automatic setpoint increase
1258                            SimpleBalancerController::with_increasing_setpoint(
1259                                *ratio_threshold,
1260                                (growth_window
1261                                    .div_duration_f64(self.cfg.balancer_sampling_interval)
1262                                    .round() as usize)
1263                                    .max(1),
1264                            )
1265                        } else {
1266                            SimpleBalancerController::default()
1267                        },
1268                        surb_estimator,
1269                        SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1270                        balancer_config,
1271                    );
1272
1273                    let _ = balancer.start_control_loop(
1274                        self.cfg.balancer_sampling_interval,
1275                        SessionCacheBalancerFeedback(self.sessions.clone()),
1276                        Some(balancer_abort_reg),
1277                    );
1278                } else {
1279                    // This should never happen, but be sure we handle this error
1280                    return Err(SessionManagerError::Other(
1281                        "failed to spawn SURB balancer - inconsistent cache".into(),
1282                    )
1283                    .into());
1284                }
1285
1286                session
1287            } else {
1288                HoprSession::new(
1289                    session_id,
1290                    reply_routing.clone(),
1291                    HoprSessionConfig {
1292                        capabilities: session_req.capabilities.into(),
1293                        frame_mtu: self.cfg.frame_mtu,
1294                        frame_timeout: self.cfg.max_frame_timeout,
1295                    },
1296                    (msg_sender.clone(), rx_session_data),
1297                    Some(closure_notifier),
1298                    #[cfg(feature = "telemetry")]
1299                    stats,
1300                )?
1301            };
1302
1303            // Extract useful information about the session from the Start protocol message
1304            let incoming_session = IncomingSession {
1305                session,
1306                target: session_req.target,
1307            };
1308
1309            // Notify that a new incoming session has been created
1310            match new_session_notifier
1311                .send(incoming_session)
1312                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1313                .await
1314            {
1315                Err(_) => {
1316                    error!(%session_id, "timeout to notify about new incoming session");
1317                    return Err(TransportSessionError::Timeout);
1318                }
1319                Ok(Err(error)) => {
1320                    error!(%session_id, %error, "failed to notify about new incoming session");
1321                    return Err(SessionManagerError::Other(error.to_string()).into());
1322                }
1323                _ => {}
1324            };
1325
1326            trace!(?session_id, "session notification sent");
1327
1328            // Notify the sender that the session has been established.
1329            // Set our peer ID in the session ID sent back to them.
1330            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1331                orig_challenge: session_req.challenge,
1332                session_id,
1333            });
1334
1335            msg_sender
1336                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1337                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1338                .await
1339                .map_err(|_| {
1340                    error!(%session_id, "timeout sending session establishment message");
1341                    TransportSessionError::Timeout
1342                })?
1343                .map_err(|e| {
1344                    SessionManagerError::Other(format!(
1345                        "failed to send session {session_id} establishment message: {e}"
1346                    ))
1347                })?;
1348
1349            info!(%session_id, "new session established");
1350
1351            #[cfg(all(feature = "prometheus", not(test)))]
1352            {
1353                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1354                METRIC_ACTIVE_SESSIONS.increment(1.0);
1355            }
1356        } else {
1357            error!(%pseudonym,"failed to reserve a new session slot");
1358
1359            // Notify the sender that the session could not be established
1360            let reason = StartErrorReason::NoSlotsAvailable;
1361            let data = HoprStartProtocol::SessionError(StartErrorType {
1362                challenge: session_req.challenge,
1363                reason,
1364            });
1365
1366            msg_sender
1367                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1368                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1369                .await
1370                .map_err(|_| {
1371                    error!("timeout sending session error message");
1372                    TransportSessionError::Timeout
1373                })?
1374                .map_err(|e| {
1375                    SessionManagerError::Other(format!("failed to send session establishment error message: {e}"))
1376                })?;
1377
1378            trace!(%pseudonym, "session establishment failure message sent");
1379
1380            #[cfg(all(feature = "prometheus", not(test)))]
1381            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1382        }
1383
1384        Ok(())
1385    }
1386
1387    async fn handle_start_protocol_message(
1388        &self,
1389        pseudonym: HoprPseudonym,
1390        data: ApplicationDataIn,
1391    ) -> crate::errors::Result<()> {
1392        match HoprStartProtocol::try_from(data.data)? {
1393            HoprStartProtocol::StartSession(session_req) => {
1394                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
1395            }
1396            HoprStartProtocol::SessionEstablished(est) => {
1397                trace!(
1398                    session_id = ?est.session_id,
1399                    "received session establishment confirmation"
1400                );
1401                let challenge = est.orig_challenge;
1402                let session_id = est.session_id;
1403                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
1404                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
1405                        return Err(SessionManagerError::Other(format!(
1406                            "could not notify session {session_id} establishment: {e}"
1407                        ))
1408                        .into());
1409                    }
1410                    debug!(?session_id, challenge, "session establishment complete");
1411                } else {
1412                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
1413                }
1414            }
1415            HoprStartProtocol::SessionError(error) => {
1416                trace!(
1417                    challenge = error.challenge,
1418                    error = ?error.reason,
1419                    "failed to initialize a session",
1420                );
1421                // Currently, we do not distinguish between individual error types
1422                // and just discard the initiation attempt and pass on the error.
1423                if let Some(tx_est) = self.session_initiations.remove(&error.challenge).await {
1424                    if let Err(e) = tx_est.unbounded_send(Err(error)) {
1425                        return Err(SessionManagerError::Other(format!(
1426                            "could not notify session establishment error {error:?}: {e}"
1427                        ))
1428                        .into());
1429                    }
1430                    error!(
1431                        challenge = error.challenge,
1432                        ?error,
1433                        "session establishment error received"
1434                    );
1435                } else {
1436                    error!(
1437                        challenge = error.challenge,
1438                        ?error,
1439                        "session establishment attempt expired before error could be delivered"
1440                    );
1441                }
1442
1443                #[cfg(all(feature = "prometheus", not(test)))]
1444                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error.reason.to_string()])
1445            }
1446            HoprStartProtocol::KeepAlive(msg) => {
1447                let session_id = msg.session_id;
1448                if let Some(session_slot) = self.sessions.get(&session_id).await {
1449                    trace!(?session_id, "received keep-alive request");
1450                    // If the session has a SURB flow estimator, increase the number of received SURBs.
1451                    // This applies to the incoming sessions currently
1452                    if let Some(estimator) = session_slot.surb_estimator.as_ref() {
1453                        // Two SURBs per Keep-Alive message
1454                        estimator.produced.fetch_add(
1455                            KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64,
1456                            std::sync::atomic::Ordering::Relaxed,
1457                        );
1458                    }
1459                } else {
1460                    debug!(%session_id, "received keep-alive request for an unknown session");
1461                }
1462            }
1463        }
1464
1465        Ok(())
1466    }
1467}
1468
1469#[cfg(test)]
1470mod tests {
1471    use anyhow::anyhow;
1472    use futures::{AsyncWriteExt, future::BoxFuture};
1473    use hopr_crypto_random::Randomizable;
1474    use hopr_crypto_types::{keypairs::ChainKeypair, prelude::Keypair};
1475    use hopr_primitive_types::prelude::Address;
1476    use hopr_protocol_start::StartProtocolDiscriminants;
1477    use tokio::time::timeout;
1478
1479    use super::*;
1480    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1481
1482    #[async_trait::async_trait]
1483    trait SendMsg {
1484        async fn send_message(
1485            &self,
1486            routing: DestinationRouting,
1487            data: ApplicationDataOut,
1488        ) -> crate::errors::Result<()>;
1489    }
1490
1491    mockall::mock! {
1492        MsgSender {}
1493        impl SendMsg for MsgSender {
1494            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
1495            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
1496        }
1497    }
1498
1499    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1500        let (tx, rx) = futures::channel::mpsc::unbounded();
1501        tokio::task::spawn(async move {
1502            pin_mut!(rx);
1503            while let Some((routing, data)) = rx.next().await {
1504                sender
1505                    .send_message(routing, data)
1506                    .await
1507                    .expect("send message must not fail in mock");
1508            }
1509        });
1510        tx
1511    }
1512
1513    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1514        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1515            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1516            .unwrap_or(false)
1517    }
1518
1519    #[test_log::test(tokio::test)]
1520    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
1521    {
1522        let alice_pseudonym = HoprPseudonym::random();
1523        let bob_peer: Address = (&ChainKeypair::random()).into();
1524
1525        let alice_mgr = SessionManager::new(Default::default());
1526        let bob_mgr = SessionManager::new(Default::default());
1527
1528        let mut sequence = mockall::Sequence::new();
1529        let mut alice_transport = MockMsgSender::new();
1530        let mut bob_transport = MockMsgSender::new();
1531
1532        // Alice sends the StartSession message
1533        let bob_mgr_clone = bob_mgr.clone();
1534        alice_transport
1535            .expect_send_message()
1536            .once()
1537            .in_sequence(&mut sequence)
1538            .withf(move |peer, data| {
1539                info!("alice sends {}", data.data.application_tag);
1540                msg_type(data, StartProtocolDiscriminants::StartSession)
1541                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1542            })
1543            .returning(move |_, data| {
1544                let bob_mgr_clone = bob_mgr_clone.clone();
1545                Box::pin(async move {
1546                    bob_mgr_clone
1547                        .dispatch_message(
1548                            alice_pseudonym,
1549                            ApplicationDataIn {
1550                                data: data.data,
1551                                packet_info: Default::default(),
1552                            },
1553                        )
1554                        .await?;
1555                    Ok(())
1556                })
1557            });
1558
1559        // Bob sends the SessionEstablished message
1560        let alice_mgr_clone = alice_mgr.clone();
1561        bob_transport
1562            .expect_send_message()
1563            .once()
1564            .in_sequence(&mut sequence)
1565            .withf(move |peer, data| {
1566                info!("bob sends {}", data.data.application_tag);
1567                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1568                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1569            })
1570            .returning(move |_, data| {
1571                let alice_mgr_clone = alice_mgr_clone.clone();
1572
1573                Box::pin(async move {
1574                    alice_mgr_clone
1575                        .dispatch_message(
1576                            alice_pseudonym,
1577                            ApplicationDataIn {
1578                                data: data.data,
1579                                packet_info: Default::default(),
1580                            },
1581                        )
1582                        .await?;
1583                    Ok(())
1584                })
1585            });
1586
1587        // Alice sends the terminating segment to close the Session
1588        let bob_mgr_clone = bob_mgr.clone();
1589        alice_transport
1590            .expect_send_message()
1591            .once()
1592            .in_sequence(&mut sequence)
1593            .withf(move |peer, data| {
1594                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
1595                    data.data.plain_text.as_ref(),
1596                )
1597                .expect("must be a session message")
1598                .try_as_segment()
1599                .expect("must be a segment")
1600                .is_terminating()
1601                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1602            })
1603            .returning(move |_, data| {
1604                let bob_mgr_clone = bob_mgr_clone.clone();
1605                Box::pin(async move {
1606                    bob_mgr_clone
1607                        .dispatch_message(
1608                            alice_pseudonym,
1609                            ApplicationDataIn {
1610                                data: data.data,
1611                                packet_info: Default::default(),
1612                            },
1613                        )
1614                        .await?;
1615                    Ok(())
1616                })
1617            });
1618
1619        let mut ahs = Vec::new();
1620
1621        // Start Alice
1622        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1623        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1624
1625        // Start Bob
1626        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1627        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1628
1629        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1630
1631        pin_mut!(new_session_rx_bob);
1632        let (alice_session, bob_session) = timeout(
1633            Duration::from_secs(2),
1634            futures::future::join(
1635                alice_mgr.new_session(
1636                    bob_peer,
1637                    SessionTarget::TcpStream(target.clone()),
1638                    SessionClientConfig {
1639                        pseudonym: alice_pseudonym.into(),
1640                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1641                        surb_management: None,
1642                        ..Default::default()
1643                    },
1644                ),
1645                new_session_rx_bob.next(),
1646            ),
1647        )
1648        .await?;
1649
1650        let mut alice_session = alice_session?;
1651        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1652
1653        assert_eq!(
1654            alice_session.config().capabilities,
1655            Capability::Segmentation | Capability::NoRateControl
1656        );
1657        assert_eq!(
1658            alice_session.config().capabilities,
1659            bob_session.session.config().capabilities
1660        );
1661        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1662
1663        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
1664        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
1665        assert!(
1666            alice_mgr
1667                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
1668                .await
1669                .is_err()
1670        );
1671
1672        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
1673        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
1674        assert!(
1675            bob_mgr
1676                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
1677                .await
1678                .is_err()
1679        );
1680
1681        tokio::time::sleep(Duration::from_millis(100)).await;
1682        alice_session.close().await?;
1683
1684        tokio::time::sleep(Duration::from_millis(100)).await;
1685
1686        assert!(matches!(
1687            alice_mgr.ping_session(alice_session.id()).await,
1688            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1689        ));
1690
1691        futures::stream::iter(ahs)
1692            .for_each(|ah| async move { ah.abort() })
1693            .await;
1694
1695        Ok(())
1696    }
1697
1698    #[test_log::test(tokio::test)]
1699    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
1700        let alice_pseudonym = HoprPseudonym::random();
1701        let bob_peer: Address = (&ChainKeypair::random()).into();
1702
1703        let cfg = SessionManagerConfig {
1704            idle_timeout: Duration::from_millis(200),
1705            ..Default::default()
1706        };
1707
1708        let alice_mgr = SessionManager::new(cfg);
1709        let bob_mgr = SessionManager::new(Default::default());
1710
1711        let mut sequence = mockall::Sequence::new();
1712        let mut alice_transport = MockMsgSender::new();
1713        let mut bob_transport = MockMsgSender::new();
1714
1715        // Alice sends the StartSession message
1716        let bob_mgr_clone = bob_mgr.clone();
1717        alice_transport
1718            .expect_send_message()
1719            .once()
1720            .in_sequence(&mut sequence)
1721            .withf(move |peer, data| {
1722                msg_type(data, StartProtocolDiscriminants::StartSession)
1723                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1724            })
1725            .returning(move |_, data| {
1726                let bob_mgr_clone = bob_mgr_clone.clone();
1727                Box::pin(async move {
1728                    bob_mgr_clone
1729                        .dispatch_message(
1730                            alice_pseudonym,
1731                            ApplicationDataIn {
1732                                data: data.data,
1733                                packet_info: Default::default(),
1734                            },
1735                        )
1736                        .await?;
1737                    Ok(())
1738                })
1739            });
1740
1741        // Bob sends the SessionEstablished message
1742        let alice_mgr_clone = alice_mgr.clone();
1743        bob_transport
1744            .expect_send_message()
1745            .once()
1746            .in_sequence(&mut sequence)
1747            .withf(move |peer, data| {
1748                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
1749                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1750            })
1751            .returning(move |_, data| {
1752                let alice_mgr_clone = alice_mgr_clone.clone();
1753
1754                Box::pin(async move {
1755                    alice_mgr_clone
1756                        .dispatch_message(
1757                            alice_pseudonym,
1758                            ApplicationDataIn {
1759                                data: data.data,
1760                                packet_info: Default::default(),
1761                            },
1762                        )
1763                        .await?;
1764                    Ok(())
1765                })
1766            });
1767
1768        let mut ahs = Vec::new();
1769
1770        // Start Alice
1771        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1772        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1773
1774        // Start Bob
1775        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
1776        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1777
1778        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
1779
1780        pin_mut!(new_session_rx_bob);
1781        let (alice_session, bob_session) = timeout(
1782            Duration::from_secs(2),
1783            futures::future::join(
1784                alice_mgr.new_session(
1785                    bob_peer,
1786                    SessionTarget::TcpStream(target.clone()),
1787                    SessionClientConfig {
1788                        pseudonym: alice_pseudonym.into(),
1789                        capabilities: Capability::NoRateControl | Capability::Segmentation,
1790                        surb_management: None,
1791                        ..Default::default()
1792                    },
1793                ),
1794                new_session_rx_bob.next(),
1795            ),
1796        )
1797        .await?;
1798
1799        let alice_session = alice_session?;
1800        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
1801
1802        assert_eq!(
1803            alice_session.config().capabilities,
1804            Capability::Segmentation | Capability::NoRateControl,
1805        );
1806        assert_eq!(
1807            alice_session.config().capabilities,
1808            bob_session.session.config().capabilities
1809        );
1810        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
1811
1812        // Let the session timeout at Alice
1813        tokio::time::sleep(Duration::from_millis(300)).await;
1814
1815        assert!(matches!(
1816            alice_mgr.ping_session(alice_session.id()).await,
1817            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
1818        ));
1819
1820        futures::stream::iter(ahs)
1821            .for_each(|ah| async move { ah.abort() })
1822            .await;
1823
1824        Ok(())
1825    }
1826
1827    #[test_log::test(tokio::test)]
1828    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
1829        let alice_pseudonym = HoprPseudonym::random();
1830        let session_id = SessionId::new(16u64, alice_pseudonym);
1831        let balancer_cfg = SurbBalancerConfig {
1832            target_surb_buffer_size: 1000,
1833            max_surbs_per_sec: 100,
1834            ..Default::default()
1835        };
1836
1837        let alice_mgr = SessionManager::<
1838            UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
1839            futures::channel::mpsc::Sender<IncomingSession>,
1840        >::new(Default::default());
1841
1842        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1843        alice_mgr
1844            .sessions
1845            .insert(
1846                session_id,
1847                SessionSlot {
1848                    session_tx: Arc::new(dummy_tx),
1849                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1850                    abort_handles: Default::default(),
1851                    surb_mgmt: Some(balancer_cfg),
1852                    surb_estimator: None,
1853                    #[cfg(feature = "telemetry")]
1854                    telemetry: Arc::new(SessionTelemetry::new(session_id, Default::default())),
1855                },
1856            )
1857            .await;
1858
1859        let actual_cfg = alice_mgr
1860            .get_surb_balancer_config(&session_id)
1861            .await?
1862            .ok_or(anyhow!("session must have a surb balancer config"))?;
1863        assert_eq!(actual_cfg, balancer_cfg);
1864
1865        let new_cfg = SurbBalancerConfig {
1866            target_surb_buffer_size: 2000,
1867            max_surbs_per_sec: 200,
1868            ..Default::default()
1869        };
1870        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;
1871
1872        let actual_cfg = alice_mgr
1873            .get_surb_balancer_config(&session_id)
1874            .await?
1875            .ok_or(anyhow!("session must have a surb balancer config"))?;
1876        assert_eq!(actual_cfg, new_cfg);
1877
1878        Ok(())
1879    }
1880
1881    #[test_log::test(tokio::test)]
1882    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1883        let alice_pseudonym = HoprPseudonym::random();
1884        let bob_peer: Address = (&ChainKeypair::random()).into();
1885
1886        let cfg = SessionManagerConfig {
1887            session_tag_range: 16u64..17u64, // Slot for exactly one session
1888            ..Default::default()
1889        };
1890
1891        let alice_mgr = SessionManager::new(Default::default());
1892        let bob_mgr = SessionManager::new(cfg);
1893
1894        // Occupy the only free slot with tag 16
1895        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1896        bob_mgr
1897            .sessions
1898            .insert(
1899                SessionId::new(16u64, alice_pseudonym),
1900                SessionSlot {
1901                    session_tx: Arc::new(dummy_tx),
1902                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1903                    abort_handles: Default::default(),
1904                    #[cfg(feature = "telemetry")]
1905                    telemetry: Arc::new(SessionTelemetry::new(
1906                        SessionId::new(16u64, alice_pseudonym),
1907                        Default::default(),
1908                    )),
1909                    surb_mgmt: None,
1910                    surb_estimator: None,
1911                },
1912            )
1913            .await;
1914
1915        let mut sequence = mockall::Sequence::new();
1916        let mut alice_transport = MockMsgSender::new();
1917        let mut bob_transport = MockMsgSender::new();
1918
1919        // Alice sends the StartSession message
1920        let bob_mgr_clone = bob_mgr.clone();
1921        alice_transport
1922            .expect_send_message()
1923            .once()
1924            .in_sequence(&mut sequence)
1925            .withf(move |peer, data| {
1926                msg_type(data, StartProtocolDiscriminants::StartSession)
1927                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1928            })
1929            .returning(move |_, data| {
1930                let bob_mgr_clone = bob_mgr_clone.clone();
1931                Box::pin(async move {
1932                    bob_mgr_clone
1933                        .dispatch_message(
1934                            alice_pseudonym,
1935                            ApplicationDataIn {
1936                                data: data.data,
1937                                packet_info: Default::default(),
1938                            },
1939                        )
1940                        .await?;
1941                    Ok(())
1942                })
1943            });
1944
1945        // Bob sends the SessionError message
1946        let alice_mgr_clone = alice_mgr.clone();
1947        bob_transport
1948            .expect_send_message()
1949            .once()
1950            .in_sequence(&mut sequence)
1951            .withf(move |peer, data| {
1952                msg_type(data, StartProtocolDiscriminants::SessionError)
1953                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1954            })
1955            .returning(move |_, data| {
1956                let alice_mgr_clone = alice_mgr_clone.clone();
1957                Box::pin(async move {
1958                    alice_mgr_clone
1959                        .dispatch_message(
1960                            alice_pseudonym,
1961                            ApplicationDataIn {
1962                                data: data.data,
1963                                packet_info: Default::default(),
1964                            },
1965                        )
1966                        .await?;
1967                    Ok(())
1968                })
1969            });
1970
1971        let mut jhs = Vec::new();
1972
1973        // Start Alice
1974        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1975        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
1976
1977        // Start Bob
1978        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
1979        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
1980
1981        let result = alice_mgr
1982            .new_session(
1983                bob_peer,
1984                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1985                SessionClientConfig {
1986                    capabilities: Capabilities::empty(),
1987                    pseudonym: alice_pseudonym.into(),
1988                    surb_management: None,
1989                    ..Default::default()
1990                },
1991            )
1992            .await;
1993
1994        assert!(
1995            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1996        );
1997
1998        Ok(())
1999    }
2000
2001    #[test_log::test(tokio::test)]
2002    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2003    -> anyhow::Result<()> {
2004        let alice_pseudonym = HoprPseudonym::random();
2005        let bob_peer: Address = (&ChainKeypair::random()).into();
2006
2007        let cfg = SessionManagerConfig {
2008            maximum_sessions: 1,
2009            ..Default::default()
2010        };
2011
2012        let alice_mgr = SessionManager::new(Default::default());
2013        let bob_mgr = SessionManager::new(cfg);
2014
2015        // Occupy the only free slot with tag 16
2016        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2017        bob_mgr
2018            .sessions
2019            .insert(
2020                SessionId::new(16u64, alice_pseudonym),
2021                SessionSlot {
2022                    session_tx: Arc::new(dummy_tx),
2023                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2024                    abort_handles: Default::default(),
2025                    surb_mgmt: None,
2026                    surb_estimator: None,
2027                    #[cfg(feature = "telemetry")]
2028                    telemetry: Arc::new(SessionTelemetry::new(
2029                        SessionId::new(16u64, alice_pseudonym),
2030                        Default::default(),
2031                    )),
2032                },
2033            )
2034            .await;
2035
2036        let mut sequence = mockall::Sequence::new();
2037        let mut alice_transport = MockMsgSender::new();
2038        let mut bob_transport = MockMsgSender::new();
2039
2040        // Alice sends the StartSession message
2041        let bob_mgr_clone = bob_mgr.clone();
2042        alice_transport
2043            .expect_send_message()
2044            .once()
2045            .in_sequence(&mut sequence)
2046            .withf(move |peer, data| {
2047                msg_type(data, StartProtocolDiscriminants::StartSession)
2048                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2049            })
2050            .returning(move |_, data| {
2051                let bob_mgr_clone = bob_mgr_clone.clone();
2052                Box::pin(async move {
2053                    bob_mgr_clone
2054                        .dispatch_message(
2055                            alice_pseudonym,
2056                            ApplicationDataIn {
2057                                data: data.data,
2058                                packet_info: Default::default(),
2059                            },
2060                        )
2061                        .await?;
2062                    Ok(())
2063                })
2064            });
2065
2066        // Bob sends the SessionError message
2067        let alice_mgr_clone = alice_mgr.clone();
2068        bob_transport
2069            .expect_send_message()
2070            .once()
2071            .in_sequence(&mut sequence)
2072            .withf(move |peer, data| {
2073                msg_type(data, StartProtocolDiscriminants::SessionError)
2074                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2075            })
2076            .returning(move |_, data| {
2077                let alice_mgr_clone = alice_mgr_clone.clone();
2078                Box::pin(async move {
2079                    alice_mgr_clone
2080                        .dispatch_message(
2081                            alice_pseudonym,
2082                            ApplicationDataIn {
2083                                data: data.data,
2084                                packet_info: Default::default(),
2085                            },
2086                        )
2087                        .await?;
2088                    Ok(())
2089                })
2090            });
2091
2092        let mut jhs = Vec::new();
2093
2094        // Start Alice
2095        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2096        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2097
2098        // Start Bob
2099        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2100        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2101
2102        let result = alice_mgr
2103            .new_session(
2104                bob_peer,
2105                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2106                SessionClientConfig {
2107                    capabilities: None.into(),
2108                    pseudonym: alice_pseudonym.into(),
2109                    surb_management: None,
2110                    ..Default::default()
2111                },
2112            )
2113            .await;
2114
2115        assert!(
2116            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2117        );
2118
2119        Ok(())
2120    }
2121
2122    #[test_log::test(tokio::test)]
2123    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2124        let alice_pseudonym = HoprPseudonym::random();
2125        let bob_peer: Address = (&ChainKeypair::random()).into();
2126
2127        let alice_mgr = SessionManager::new(Default::default());
2128
2129        let mut sequence = mockall::Sequence::new();
2130        let mut alice_transport = MockMsgSender::new();
2131
2132        // Alice sends the StartSession message
2133        let alice_mgr_clone = alice_mgr.clone();
2134        alice_transport
2135            .expect_send_message()
2136            .once()
2137            .in_sequence(&mut sequence)
2138            .withf(move |peer, data| {
2139                msg_type(data, StartProtocolDiscriminants::StartSession)
2140                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2141            })
2142            .returning(move |_, data| {
2143                // But the message is again processed by Alice due to Loopback
2144                let alice_mgr_clone = alice_mgr_clone.clone();
2145                Box::pin(async move {
2146                    alice_mgr_clone
2147                        .dispatch_message(
2148                            alice_pseudonym,
2149                            ApplicationDataIn {
2150                                data: data.data,
2151                                packet_info: Default::default(),
2152                            },
2153                        )
2154                        .await?;
2155                    Ok(())
2156                })
2157            });
2158
2159        // Alice sends the SessionEstablished message (as Bob)
2160        let alice_mgr_clone = alice_mgr.clone();
2161        alice_transport
2162            .expect_send_message()
2163            .once()
2164            .in_sequence(&mut sequence)
2165            .withf(move |peer, data| {
2166                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2167                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2168            })
2169            .returning(move |_, data| {
2170                let alice_mgr_clone = alice_mgr_clone.clone();
2171
2172                Box::pin(async move {
2173                    alice_mgr_clone
2174                        .dispatch_message(
2175                            alice_pseudonym,
2176                            ApplicationDataIn {
2177                                data: data.data,
2178                                packet_info: Default::default(),
2179                            },
2180                        )
2181                        .await?;
2182                    Ok(())
2183                })
2184            });
2185
2186        // Start Alice
2187        let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2188        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2189
2190        let alice_session = alice_mgr
2191            .new_session(
2192                bob_peer,
2193                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2194                SessionClientConfig {
2195                    capabilities: None.into(),
2196                    pseudonym: alice_pseudonym.into(),
2197                    surb_management: None,
2198                    ..Default::default()
2199                },
2200            )
2201            .await;
2202
2203        println!("{alice_session:?}");
2204        assert!(matches!(
2205            alice_session,
2206            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2207        ));
2208
2209        drop(new_session_rx_alice);
2210        Ok(())
2211    }
2212
2213    #[test_log::test(tokio::test)]
2214    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2215        let bob_peer: Address = (&ChainKeypair::random()).into();
2216
2217        let cfg = SessionManagerConfig {
2218            initiation_timeout_base: Duration::from_millis(100),
2219            ..Default::default()
2220        };
2221
2222        let alice_mgr = SessionManager::new(cfg);
2223        let bob_mgr = SessionManager::new(Default::default());
2224
2225        let mut sequence = mockall::Sequence::new();
2226        let mut alice_transport = MockMsgSender::new();
2227        let bob_transport = MockMsgSender::new();
2228
2229        // Alice sends the StartSession message, but Bob does not handle it
2230        alice_transport
2231            .expect_send_message()
2232            .once()
2233            .in_sequence(&mut sequence)
2234            .withf(move |peer, data| {
2235                msg_type(data, StartProtocolDiscriminants::StartSession)
2236                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2237            })
2238            .returning(|_, _| Box::pin(async { Ok(()) }));
2239
2240        // Start Alice
2241        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2242        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2243
2244        // Start Bob
2245        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2246        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2247
2248        let result = alice_mgr
2249            .new_session(
2250                bob_peer,
2251                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2252                SessionClientConfig {
2253                    capabilities: None.into(),
2254                    pseudonym: None,
2255                    surb_management: None,
2256                    ..Default::default()
2257                },
2258            )
2259            .await;
2260
2261        assert!(matches!(result, Err(TransportSessionError::Timeout)));
2262
2263        Ok(())
2264    }
2265
2266    #[test_log::test(tokio::test)]
2267    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2268        let alice_pseudonym = HoprPseudonym::random();
2269        let bob_peer: Address = (&ChainKeypair::random()).into();
2270
2271        let bob_cfg = SessionManagerConfig::default();
2272        let alice_mgr = SessionManager::new(Default::default());
2273        let bob_mgr = SessionManager::new(bob_cfg.clone());
2274
2275        let mut alice_transport = MockMsgSender::new();
2276        let mut bob_transport = MockMsgSender::new();
2277
2278        // Alice sends the StartSession message
2279        let mut open_sequence = mockall::Sequence::new();
2280        let bob_mgr_clone = bob_mgr.clone();
2281        alice_transport
2282            .expect_send_message()
2283            .once()
2284            .in_sequence(&mut open_sequence)
2285            .withf(move |peer, data| {
2286                msg_type(data, StartProtocolDiscriminants::StartSession)
2287                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2288            })
2289            .returning(move |_, data| {
2290                let bob_mgr_clone = bob_mgr_clone.clone();
2291                Box::pin(async move {
2292                    bob_mgr_clone
2293                        .dispatch_message(
2294                            alice_pseudonym,
2295                            ApplicationDataIn {
2296                                data: data.data,
2297                                packet_info: Default::default(),
2298                            },
2299                        )
2300                        .await?;
2301                    Ok(())
2302                })
2303            });
2304
2305        // Bob sends the SessionEstablished message
2306        let alice_mgr_clone = alice_mgr.clone();
2307        bob_transport
2308            .expect_send_message()
2309            .once()
2310            .in_sequence(&mut open_sequence)
2311            .withf(move |peer, data| {
2312                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2313                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2314            })
2315            .returning(move |_, data| {
2316                let alice_mgr_clone = alice_mgr_clone.clone();
2317                Box::pin(async move {
2318                    alice_mgr_clone
2319                        .dispatch_message(
2320                            alice_pseudonym,
2321                            ApplicationDataIn {
2322                                data: data.data,
2323                                packet_info: Default::default(),
2324                            },
2325                        )
2326                        .await?;
2327                    Ok(())
2328                })
2329            });
2330
2331        // Alice sends the KeepAlive messages
2332        let bob_mgr_clone = bob_mgr.clone();
2333        alice_transport
2334            .expect_send_message()
2335            .times(5..)
2336            //.in_sequence(&mut sequence)
2337            .withf(move |peer, data| {
2338                msg_type(data, StartProtocolDiscriminants::KeepAlive)
2339                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2340            })
2341            .returning(move |_, data| {
2342                let bob_mgr_clone = bob_mgr_clone.clone();
2343                Box::pin(async move {
2344                    bob_mgr_clone
2345                        .dispatch_message(
2346                            alice_pseudonym,
2347                            ApplicationDataIn {
2348                                data: data.data,
2349                                packet_info: Default::default(),
2350                            },
2351                        )
2352                        .await?;
2353                    Ok(())
2354                })
2355            });
2356
2357        // Alice sends the terminating segment to close the Session
2358        let bob_mgr_clone = bob_mgr.clone();
2359        alice_transport
2360            .expect_send_message()
2361            .once()
2362            //.in_sequence(&mut sequence)
2363            .withf(move |peer, data| {
2364                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2365                    data.data.plain_text.as_ref(),
2366                )
2367                .ok()
2368                .and_then(|m| m.try_as_segment())
2369                .map(|s| s.is_terminating())
2370                .unwrap_or(false)
2371                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2372            })
2373            .returning(move |_, data| {
2374                let bob_mgr_clone = bob_mgr_clone.clone();
2375                Box::pin(async move {
2376                    bob_mgr_clone
2377                        .dispatch_message(
2378                            alice_pseudonym,
2379                            ApplicationDataIn {
2380                                data: data.data,
2381                                packet_info: Default::default(),
2382                            },
2383                        )
2384                        .await?;
2385                    Ok(())
2386                })
2387            });
2388
2389        let mut ahs = Vec::new();
2390
2391        // Start Alice
2392        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2393        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2394
2395        // Start Bob
2396        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2397        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2398
2399        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2400
2401        let balancer_cfg = SurbBalancerConfig {
2402            target_surb_buffer_size: 10,
2403            max_surbs_per_sec: 100,
2404            ..Default::default()
2405        };
2406
2407        pin_mut!(new_session_rx_bob);
2408        let (alice_session, bob_session) = timeout(
2409            Duration::from_secs(2),
2410            futures::future::join(
2411                alice_mgr.new_session(
2412                    bob_peer,
2413                    SessionTarget::TcpStream(target.clone()),
2414                    SessionClientConfig {
2415                        pseudonym: alice_pseudonym.into(),
2416                        capabilities: Capability::Segmentation.into(),
2417                        surb_management: Some(balancer_cfg),
2418                        ..Default::default()
2419                    },
2420                ),
2421                new_session_rx_bob.next(),
2422            ),
2423        )
2424        .await?;
2425
2426        let mut alice_session = alice_session?;
2427        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2428
2429        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2430
2431        assert_eq!(
2432            Some(balancer_cfg),
2433            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2434        );
2435
2436        let remote_cfg = bob_mgr
2437            .get_surb_balancer_config(bob_session.session.id())
2438            .await?
2439            .ok_or(anyhow!("no remote config at bob"))?;
2440        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2441        assert_eq!(
2442            remote_cfg.max_surbs_per_sec,
2443            remote_cfg.target_surb_buffer_size
2444                / bob_cfg
2445                    .minimum_surb_buffer_duration
2446                    .max(MIN_SURB_BUFFER_DURATION)
2447                    .as_secs()
2448        );
2449
2450        // Let the Surb balancer send enough KeepAlive messages
2451        tokio::time::sleep(Duration::from_millis(1500)).await;
2452        alice_session.close().await?;
2453
2454        tokio::time::sleep(Duration::from_millis(300)).await;
2455        assert!(matches!(
2456            alice_mgr.ping_session(alice_session.id()).await,
2457            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2458        ));
2459
2460        futures::stream::iter(ahs)
2461            .for_each(|ah| async move { ah.abort() })
2462            .await;
2463
2464        Ok(())
2465    }
2466}