// hopr_transport_session/manager.rs

1use std::{
2    ops::Range,
3    sync::{Arc, OnceLock},
4    time::Duration,
5};
6
7use anyhow::anyhow;
8use futures::{
9    FutureExt, SinkExt, StreamExt, TryStreamExt,
10    channel::mpsc::{Sender, UnboundedSender},
11    future::AbortHandle,
12    pin_mut,
13};
14use futures_time::future::FutureExt as TimeExt;
15use hopr_async_runtime::AbortableList;
16use hopr_crypto_packet::prelude::HoprPacket;
17use hopr_protocol_app::prelude::*;
18use hopr_protocol_start::{
19    KeepAliveFlag, KeepAliveMessage, StartChallenge, StartErrorReason, StartErrorType, StartEstablished,
20    StartInitiation,
21};
22use hopr_transport_tag_allocator::{AllocatedTag, TagAllocator};
23use hopr_types::{
24    crypto_random::Randomizable,
25    internal::{
26        prelude::HoprPseudonym,
27        routing::{DestinationRouting, RoutingOptions},
28    },
29    primitive::prelude::Address,
30};
31use tracing::{debug, error, info, trace, warn};
32
33#[cfg(feature = "telemetry")]
34use crate::telemetry::{
35    SessionLifecycleState, initialize_session_metrics, remove_session_metrics_state, set_session_balancer_data,
36    set_session_state,
37};
38use crate::{
39    Capability, HoprSession, IncomingSession, SESSION_MTU, SessionClientConfig, SessionId, SessionTarget,
40    SurbBalancerConfig,
41    balancer::{
42        AtomicSurbFlowEstimator, BalancerStateValues, RateController, RateLimitSinkExt, SurbBalancer,
43        SurbControllerWithCorrection,
44        pid::{PidBalancerController, PidControllerGains},
45        simple::SimpleBalancerController,
46    },
47    errors::{SessionManagerError, TransportSessionError},
48    types::{ByteCapabilities, ClosureReason, HoprSessionConfig, HoprStartProtocol},
49    utils,
50    utils::{SurbNotificationMode, insert_into_next_slot},
51};
52
// Session lifecycle metrics.
// Registered only when telemetry is enabled and outside of test builds.
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    // Gauge of currently open sessions; set to 0 in `SessionManager::new`,
    // incremented on insertion and decremented in `close_session`.
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    // Error counters are partitioned by the error "kind" label.
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
78
79fn close_session(session_id: SessionId, session_data: SessionSlot, reason: ClosureReason) {
80    debug!(?session_id, ?reason, "closing session");
81
82    #[cfg(feature = "telemetry")]
83    {
84        set_session_state(&session_id, SessionLifecycleState::Closed);
85        remove_session_metrics_state(&session_id);
86    }
87
88    if reason != ClosureReason::EmptyRead {
89        // Closing the data sender will also cause it to close from the read side
90        session_data.session_tx.close_channel();
91        trace!(?session_id, "data tx channel closed on session");
92    }
93
94    // Terminate any additional tasks spawned by the Session
95    session_data.abort_handles.lock().abort_all();
96
97    #[cfg(all(feature = "telemetry", not(test)))]
98    METRIC_ACTIVE_SESSIONS.decrement(1.0);
99}
100
/// Computes the maximum one-way initiation timeout by scaling the base
/// timeout with the number of hops on the path.
fn initiation_timeout_max_one_way(base: Duration, hops: usize) -> Duration {
    // The std library provides `u32 * Duration`; the cast mirrors the
    // original unchecked narrowing (hop counts are tiny in practice).
    let hop_count = hops as u32;
    hop_count * base
}
104
/// Minimum time the SURB buffer must endure if no SURBs are being produced.
pub const MIN_SURB_BUFFER_DURATION: Duration = Duration::from_secs(1);
/// Minimum time between SURB buffer notifications to the Entry.
pub const MIN_SURB_BUFFER_NOTIFICATION_PERIOD: Duration = Duration::from_secs(1);

/// The first challenge value used in Start protocol to initiate a session.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;

/// Maximum time to wait for counterparty to receive the target number of SURBs.
const SESSION_READINESS_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum timeout until an unfinished frame is discarded.
const MIN_FRAME_TIMEOUT: Duration = Duration::from_millis(10);

// Maps a pending Start-protocol challenge to the channel that resolves the
// matching session-establishment response (or error).
// Needs to use an UnboundedSender instead of oneshot
// because Moka cache requires the value to be Clone, which oneshot Sender is not.
// It also cannot be enclosed in an Arc, since calling `send` consumes the oneshot Sender.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
124
/// Keys identifying the auxiliary background tasks a Session may spawn,
/// used to index the session's [`AbortableList`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
enum SessionTasks {
    /// Keep-alive message production task.
    KeepAlive,
    /// SURB balancer task.
    Balancer,
}
130
/// Per-session bookkeeping kept in the manager's session cache.
#[derive(Clone)]
struct SessionSlot {
    /// Inbound data channel to the Session.
    ///
    /// Sender needs to be put in Arc, so that no clones are made by `moka`.
    /// This makes sure that the entire channel closes once the one and only sender is closed.
    session_tx: Arc<UnboundedSender<ApplicationDataIn>>,
    /// Routing used to reach the session counterparty.
    routing_opts: DestinationRouting,
    /// Additional tasks spawned by the Session (see [`SessionTasks`]).
    abort_handles: Arc<parking_lot::Mutex<AbortableList<SessionTasks>>>,
    /// Allows reconfiguring of the SURB balancer on-the-fly.
    /// Set on both Entry and Exit sides.
    surb_mgmt: Arc<BalancerStateValues>,
    /// SURB flow updates happening outside of Session protocol
    /// (e.g., due to Start protocol messages).
    surb_estimator: AtomicSurbFlowEstimator,
    /// Holds the allocated tag so it is returned to the pool when the session is evicted.
    /// `None` for outgoing sessions where the tag was assigned by the remote peer.
    #[allow(dead_code, reason = "kept alive for Drop-based tag deallocation")]
    allocated_tag: Option<Arc<AllocatedTag>>,
}
150
/// Indicates the result of processing a message.
///
/// Returned by message dispatch so the caller knows whether the payload was
/// consumed by the Start/Session machinery or should be handled elsewhere.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// Session or Start protocol message has been processed successfully.
    Processed,
    /// The message was not related to Start or Session protocol;
    /// the original payload is handed back to the caller untouched.
    Unrelated(ApplicationDataIn),
}
159
/// Configuration for the [`SessionManager`].
#[derive(Clone, Debug, PartialEq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// The maximum chunk of data that can be written to the Session's input buffer.
    ///
    /// Clamped in [`SessionManager::new`] to be at least [`SESSION_MTU`].
    ///
    /// Default is 1500.
    #[default(1500)]
    pub frame_mtu: usize,

    /// The maximum time for an incomplete frame to stay in the Session's output buffer.
    ///
    /// Clamped in [`SessionManager::new`] to be at least [`MIN_FRAME_TIMEOUT`].
    ///
    /// Default is 800 ms.
    #[default(Duration::from_millis(800))]
    pub max_frame_timeout: Duration,

    /// The per-hop base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = initiation_time_out_base * (num_forward_hops + num_return_hops + 2)`
    ///
    /// Default is 500 milliseconds.
    #[default(Duration::from_millis(500))]
    pub initiation_timeout_base: Duration,

    /// Timeout for Session to be closed due to inactivity.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,

    /// The sampling interval for SURB balancer.
    /// It will make SURB control decisions regularly at this interval.
    ///
    /// Default is 100 milliseconds.
    #[default(Duration::from_millis(100))]
    pub balancer_sampling_interval: Duration,

    /// Initial packets per second egress rate on an incoming Session.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 10 packets/second.
    #[default(10)]
    pub initial_return_session_egress_rate: usize,

    /// Minimum period of time for which a SURB buffer at the Exit must
    /// endure if no SURBs are being received.
    ///
    /// In other words, it is the minimum period of time an Exit must withstand when
    /// no SURBs are received from the Entry at all. To do so, the egress traffic
    /// will be shaped accordingly to meet this requirement.
    ///
    /// This only applies to incoming Sessions without the [`Capability::NoRateControl`] flag set.
    ///
    /// Default is 5 seconds, minimum is 1 second (see [`MIN_SURB_BUFFER_DURATION`]).
    #[default(Duration::from_secs(5))]
    pub minimum_surb_buffer_duration: Duration,

    /// Indicates the maximum number of SURBs in the SURB buffer to be requested when creating a new Session.
    ///
    /// This value is theoretically capped by the size of the global transport SURB ring buffer,
    /// so values greater than that do not make sense. This value should be ideally set equal
    /// to the size of the global transport SURB RB.
    ///
    /// Default is 10 000 SURBs.
    #[default(10_000)]
    pub maximum_surb_buffer_size: usize,

    /// If set, the Session recipient (Exit) will notify the Session initiator (Entry) about
    /// its SURB balance for the Session using keep-alive packets periodically.
    ///
    /// Keep in mind that each notification also costs 1 SURB, so the notification period should
    /// not be too frequent.
    ///
    /// Default is None (no notification sent to the client), minimum is 1 second
    /// (see [`MIN_SURB_BUFFER_NOTIFICATION_PERIOD`]).
    #[default(None)]
    pub surb_balance_notify_period: Option<Duration>,

    /// If set, the Session initiator (Entry) will notify the Session recipient (Exit) about
    /// the local SURB balancer target using keep-alive packets from the SURB balancer.
    ///
    /// This is useful when the client plans to change the SURB balancer target dynamically.
    ///
    /// Default is true.
    #[default(true)]
    pub surb_target_notify: bool,
}
247
/// Manages lifecycles of Sessions.
///
/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
/// should be called for each [`ApplicationData`] received by the node.
/// This way, the `SessionManager` takes care of proper Start sub-protocol message processing
/// and correct dispatch of Session-related packets to individual existing Sessions.
///
/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`],
/// probe sessions using [`SessionManager::ping_session`]
/// and list them via [`SessionManager::active_sessions`].
///
/// Since the `SessionManager` operates over the HOPR protocol,
/// the message transport `S` is required.
/// Such transport must also be `Clone`, since it will be cloned into all the created [`HoprSession`] objects.
///
/// ## SURB balancing
/// The manager also can take care of automatic [SURB balancing](SurbBalancerConfig) per Session.
///
/// With each packet sent from the session initiator over to the receiving party, zero to 2 SURBs might be delivered.
/// When the receiving party wants to send reply packets back, it must consume 1 SURB per packet. This
/// means that if the difference between the SURBs delivered and SURBs consumed is negative, the receiving party
/// might soon run out of SURBs. If SURBs run out, the reply packets will be dropped, causing likely quality of
/// service degradation.
///
/// In an attempt to counter this effect, there are two co-existing automated modes of SURB balancing:
/// *local SURB balancing* and *remote SURB balancing*.
///
/// ### Local SURB balancing
/// Local SURB balancing is performed on the sessions that were initiated by another party (and are
/// therefore incoming to us).
/// The local SURB balancing mechanism continuously evaluates the rate of SURB consumption and retrieval,
/// and if SURBs are running out, the packet egress shaping takes effect. This by itself does not
/// avoid the depletion of SURBs but slows it down in the hope that the initiating party can deliver
/// more SURBs over time. This might happen either organically by sending effective payloads that
/// allow non-zero number of SURBs in the packet, or non-organically by delivering KeepAlive messages
/// via *remote SURB balancing*.
///
/// The egress shaping is done automatically, unless the Session initiator sets the [`Capability::NoRateControl`]
/// flag during Session initiation.
///
/// ### Remote SURB balancing
/// Remote SURB balancing is performed by the Session initiator. The SURB balancer estimates the number of SURBs
/// delivered to the other party, and also the number of SURBs consumed by seeing the amount of traffic received
/// in replies.
/// When enabled, a desired target level of SURBs at the Session counterparty is set. According to measured
/// inflow and outflow of SURBs to/from the counterparty, the production of non-organic SURBs is started
/// via keep-alive messages (sent to counterparty) and is controlled to maintain that target level.
///
/// In other words, the Session initiator tries to compensate for the usage of SURBs by the counterparty by
/// sending new ones via the keep-alive messages.
///
/// This mechanism is configurable via the `surb_management` field in [`SessionClientConfig`].
///
/// ### Possible scenarios
/// There are 4 different scenarios of local vs. remote SURB balancing configuration, but
/// an equilibrium (= matching the SURB production and consumption) is most likely to be reached
/// only when both are configured (the ideal case below):
///
/// #### 1. Ideal local and remote SURB balancing
/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
/// 2. The Session initiator (Entry) sets the [`target_surb_buffer_size`](SurbBalancerConfig) which matches the
///    [`maximum_surb_buffer_size`](SessionManagerConfig) of the counterparty.
/// 3. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
///    Session.
/// 4. The Session initiator (Entry) sets [`max_surbs_per_sec`](SurbBalancerConfig) slightly higher than the
///    `maximum_surb_buffer_size / minimum_surb_buffer_duration` value configured at the counterparty.
///
/// In this situation, the maximum Session egress from Exit to the Entry is given by the
/// `maximum_surb_buffer_size / minimum_surb_buffer_duration` ratio. If there is enough bandwidth,
/// the (remote) SURB balancer sending SURBs to the Exit will stabilize roughly at this rate of SURBs/sec,
/// and the whole system will be in equilibrium during the Session's lifetime (under ideal network conditions).
///
/// #### 2. Remote SURB balancing only
/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
/// 2. The Session initiator (Entry) sets `max_surbs_per_sec` and `target_surb_buffer_size` values in
///    [`SurbBalancerConfig`]
///
/// In this one-sided situation, the Entry node floods the Exit node with SURBs,
/// only based on its estimated consumption of SURBs at the Exit. The Exit's egress is not
/// rate-limited at all. If the Exit runs out of SURBs at any point in time, it will simply drop egress packets.
///
/// This configuration could potentially only lead to an equilibrium
/// when the `SurbBalancer` at the Entry can react fast enough to Exit's demand.
///
/// #### 3. Local SURB balancing only
/// 1. The Session recipient (Exit) set the `initial_return_session_egress_rate`, `minimum_surb_buffer_duration` and
///    `maximum_surb_buffer_size` values in the [`SessionManagerConfig`].
/// 2. The Session initiator (Entry) does *NOT* set the [`Capability::NoRateControl`] capability flag when opening
///    Session.
/// 3. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
///
/// In this one-sided situation, the Entry node does not provide any additional SURBs at all (except the
/// ones that are naturally carried by the egress packets which have space to hold SURBs). It relies
/// only on the Session egress limiting of the Exit node.
/// The Exit will limit the egress roughly to the rate of natural SURB occurrence in the ingress.
///
/// This configuration could potentially only lead to an equilibrium when uploading non-full packets
/// (ones that can carry at least a single SURB), and the Exit's egress is limiting itself to such a rate.
/// If Exit's egress reaches low values due to SURB scarcity, the upper layer protocols over Session might break.
///
/// #### 4. No SURB balancing on each side
/// 1. The Session initiator (Entry) *DOES* set the [`Capability::NoRateControl`] capability flag when opening Session.
/// 2. The Session initiator (Entry) does *NOT* set the [`SurbBalancerConfig`] at all when opening Session.
///
/// In this situation, no additional SURBs are being produced by the Entry and no Session egress rate-limiting
/// takes place at the Exit.
///
/// This configuration can only lead to an equilibrium when Entry sends non-full packets (ones that carry
/// at least a single SURB) and the Exit is consuming the SURBs (Session egress) at a slower or equal rate.
/// Such configuration is very fragile, as any disturbances in the SURB flow might lead to a packet drop
/// at the Exit's egress.
///
/// ### SURB decay
/// In a hypothetical scenario of a non-zero packet loss, the Session initiator (Entry) might send a
/// certain number of SURBs to the Session recipient (Exit), but only a portion of it is actually delivered.
/// The Entry has no way of knowing that and assumes that everything has been delivered.
/// A similar problem happens when the Exit uses SURBs to construct return packets, but only a portion
/// of those packets is actually delivered to the Entry. At this point, the Entry also subtracts
/// fewer SURBs from its SURB estimate at the Exit.
///
/// In both situations, the Entry thinks there are more SURBs available at the Exit than there really are.
///
/// To compensate for a potential packet loss, the Entry's estimation of Exit's SURB buffer is regularly
/// diminished by a percentage of the `target_surb_buffer_size`, even if no incoming traffic from the
/// Exit is detected.
///
/// This behavior can be controlled via the `surb_decay` field of [`SurbBalancerConfig`].
///
/// ### SURB balance and target notification
/// The Session recipient (Exit) can notify the Session initiator (Entry) periodically about its estimated
/// number of SURBs for the Session. This can help the Entry to adjust its approximation of that level so
/// that its Local SURB balancer can better intervene.
/// This can be set using the `surb_balance_notify_period` field of [`SessionManagerConfig`] for the Exit.
///
/// Likewise, the Entry can inform the Exit about its desired SURB buffer target so that the Exit
/// can better accommodate its Remote SURB balancing.
/// This can be set using the `surb_target_notify` field of the [`SessionManagerConfig`] of each new Session.
///
/// Both mechanisms leverage the Keep Alive message to report the respective values.
pub struct SessionManager<S, T> {
    /// Pending Start-protocol initiations keyed by challenge.
    session_initiations: SessionInitiationCache,
    /// Sink for new incoming sessions plus the channel used by sessions to report their closure.
    #[allow(clippy::type_complexity)]
    session_notifiers: Arc<OnceLock<(T, Sender<(SessionId, ClosureReason)>)>>,
    /// Active sessions keyed by [`SessionId`].
    sessions: moka::future::Cache<SessionId, SessionSlot>,
    /// Outgoing message transport, set once in [`SessionManager::start`].
    msg_sender: Arc<OnceLock<S>>,
    /// Allocator for session tags on the incoming (Exit) side.
    tag_allocator: Arc<dyn TagAllocator + Send + Sync>,
    /// Tag value range for sessions, derived from the [`TagAllocator`].
    session_tag_range: Range<u64>,
    /// Maximum number of concurrent sessions, derived from the [`TagAllocator`] capacity.
    maximum_sessions: usize,
    /// Manager configuration (values clamped in [`SessionManager::new`]).
    cfg: SessionManagerConfig,
}
401
402impl<S, T> Clone for SessionManager<S, T> {
403    fn clone(&self) -> Self {
404        Self {
405            session_initiations: self.session_initiations.clone(),
406            session_notifiers: self.session_notifiers.clone(),
407            sessions: self.sessions.clone(),
408            cfg: self.cfg.clone(),
409            msg_sender: self.msg_sender.clone(),
410            tag_allocator: self.tag_allocator.clone(),
411            session_tag_range: self.session_tag_range.clone(),
412            maximum_sessions: self.maximum_sessions,
413        }
414    }
415}
416
/// Maximum time allowed for handing a message over to the external message `Sink`.
const EXTERNAL_SEND_TIMEOUT: Duration = Duration::from_millis(200);
418
419impl<S, T> SessionManager<S, T>
420where
421    S: futures::Sink<(DestinationRouting, ApplicationDataOut)> + Clone + Send + Sync + Unpin + 'static,
422    T: futures::Sink<IncomingSession> + Clone + Send + Sync + Unpin + 'static,
423    S::Error: std::error::Error + Send + Sync + Clone + 'static,
424    T::Error: std::error::Error + Send + Sync + Clone + 'static,
425{
426    /// Creates a new instance given the [`config`](SessionManagerConfig) and a [`TagAllocator`]
427    /// for allocating session tags on the Exit (incoming) side.
428    pub fn new(mut cfg: SessionManagerConfig, tag_allocator: Arc<dyn TagAllocator + Send + Sync>) -> Self {
429        let session_tag_range = tag_allocator.tag_range();
430        let maximum_sessions = tag_allocator.capacity().max(1) as usize;
431
432        debug_assert!(
433            session_tag_range.start >= ReservedTag::range().end,
434            "session tag range must not overlap with reserved tags"
435        );
436        cfg.surb_balance_notify_period = cfg
437            .surb_balance_notify_period
438            .map(|p| p.max(MIN_SURB_BUFFER_NOTIFICATION_PERIOD));
439        cfg.minimum_surb_buffer_duration = cfg.minimum_surb_buffer_duration.max(MIN_SURB_BUFFER_DURATION);
440
441        // Ensure the Frame MTU is at least the size of the Session segment MTU payload
442        cfg.frame_mtu = cfg.frame_mtu.max(SESSION_MTU);
443        cfg.max_frame_timeout = cfg.max_frame_timeout.max(MIN_FRAME_TIMEOUT);
444
445        #[cfg(all(feature = "telemetry", not(test)))]
446        METRIC_ACTIVE_SESSIONS.set(0.0);
447
448        let msg_sender = Arc::new(OnceLock::new());
449        Self {
450            msg_sender: msg_sender.clone(),
451            session_initiations: moka::future::Cache::builder()
452                .max_capacity(maximum_sessions as u64)
453                .time_to_live(
454                    2 * initiation_timeout_max_one_way(
455                        cfg.initiation_timeout_base,
456                        RoutingOptions::MAX_INTERMEDIATE_HOPS,
457                    ),
458                )
459                .build(),
460            sessions: moka::future::Cache::builder()
461                .max_capacity(maximum_sessions as u64)
462                .time_to_idle(cfg.idle_timeout)
463                .eviction_listener(|session_id: Arc<SessionId>, entry, reason| match &reason {
464                    moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size => {
465                        trace!(?session_id, ?reason, "session evicted from the cache");
466                        close_session(*session_id.as_ref(), entry, ClosureReason::Eviction);
467                    }
468                    _ => {}
469                })
470                .build(),
471            session_notifiers: Arc::new(OnceLock::new()),
472            tag_allocator,
473            session_tag_range,
474            maximum_sessions,
475            cfg,
476        }
477    }
478
    /// Starts the instance with the given `msg_sender` `Sink`
    /// and a channel `new_session_notifier` used to notify when a new incoming session is opened to us.
    ///
    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
    /// [`SessionManager::dispatch_message`].
    ///
    /// Spawns two background tasks (closure-notification processing and periodic
    /// cache eviction) and returns their abort handles. Calling this twice yields
    /// [`SessionManagerError::AlreadyStarted`].
    pub fn start(&self, msg_sender: S, new_session_notifier: T) -> crate::errors::Result<Vec<AbortHandle>> {
        self.msg_sender
            .set(msg_sender)
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        // Extra headroom so closure notifications aren't dropped at session capacity.
        let (session_close_tx, session_close_rx) = futures::channel::mpsc::channel(self.maximum_sessions + 10);
        self.session_notifiers
            .set((new_session_notifier, session_close_tx))
            .map_err(|_| SessionManagerError::AlreadyStarted)?;

        let myself = self.clone();
        let ah_closure_notifications = hopr_async_runtime::spawn_as_abortable!(session_close_rx.for_each_concurrent(
            None,
            move |(session_id, closure_reason)| {
                let myself = myself.clone();
                async move {
                    // These notifications come from the Sessions themselves once
                    // an empty read is encountered, which means the closure was done by the
                    // other party.
                    if let Some(session_data) = myself.sessions.remove(&session_id).await {
                        close_session(session_id, session_data, closure_reason);
                    } else {
                        // Do not treat this as an error
                        debug!(
                            ?session_id,
                            ?closure_reason,
                            "could not find session id to close, maybe the session is already closed"
                        );
                    }
                }
            },
        ));

        // This is necessary to evict expired entries from the caches if
        // no session-related operations happen at all.
        // This ensures the dangling expired sessions are properly closed
        // and their closure is timely notified to the other party.
        let myself = self.clone();
        let ah_session_expiration = hopr_async_runtime::spawn_as_abortable!(async move {
            // Random jitter de-synchronizes eviction runs across instances.
            let jitter = hopr_types::crypto_random::random_float_in_range(1.0..1.5);
            // NOTE(review): method calls bind tighter than `*` and `/`, so this
            // evaluates as `2 * (one_way.min(idle).mul_f64(jitter)) / 2`, where the
            // leading `2 *` cancels the trailing `/ 2`. If the intent was to halve
            // the shorter of the initiation cache TTL (`2 * one_way`, see `new`)
            // and the idle timeout — i.e. `(2 * one_way).min(idle).mul_f64(jitter) / 2`
            // — explicit parentheses are needed; confirm the intended grouping.
            let timeout = 2 * initiation_timeout_max_one_way(
                myself.cfg.initiation_timeout_base,
                RoutingOptions::MAX_INTERMEDIATE_HOPS,
            )
            .min(myself.cfg.idle_timeout)
            .mul_f64(jitter)
                / 2;
            futures_time::stream::interval(timeout.into())
                .for_each(|_| {
                    trace!("executing session cache evictions");
                    futures::future::join(
                        myself.sessions.run_pending_tasks(),
                        myself.session_initiations.run_pending_tasks(),
                    )
                    .map(|_| ())
                })
                .await;
        });

        Ok(vec![ah_closure_notifications, ah_session_expiration])
    }
545
    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
    ///
    /// The notifiers are set exactly once inside `start`, so their presence is
    /// equivalent to the manager having been started.
    pub fn is_started(&self) -> bool {
        self.session_notifiers.get().is_some()
    }
550
551    async fn insert_session_slot(&self, session_id: SessionId, slot: SessionSlot) -> crate::errors::Result<()> {
552        // We currently do not support loopback Sessions on ourselves.
553        if let moka::ops::compute::CompResult::Inserted(_) = self
554            .sessions
555            .entry(session_id)
556            .and_compute_with(|entry| {
557                futures::future::ready(if entry.is_none() {
558                    moka::ops::compute::Op::Put(slot)
559                } else {
560                    moka::ops::compute::Op::Nop
561                })
562            })
563            .await
564        {
565            #[cfg(all(feature = "telemetry", not(test)))]
566            {
567                METRIC_NUM_INITIATED_SESSIONS.increment();
568                METRIC_ACTIVE_SESSIONS.increment(1.0);
569            }
570
571            Ok(())
572        } else {
573            // Session already exists; it means it is most likely a loopback attempt
574            error!(%session_id, "session already exists - loopback attempt");
575            Err(SessionManagerError::Loopback.into())
576        }
577    }
578
579    /// Initiates a new outgoing Session to `destination` with the given configuration.
580    ///
581    /// If the Session's counterparty does not respond within
582    /// the [configured](SessionManagerConfig) period,
583    /// this method returns [`TransportSessionError::Timeout`].
584    ///
585    /// It will also fail if the instance has not been [started](SessionManager::start).
586    pub async fn new_session(
587        &self,
588        destination: Address,
589        target: SessionTarget,
590        cfg: SessionClientConfig,
591    ) -> crate::errors::Result<HoprSession> {
592        self.sessions.run_pending_tasks().await;
593        if self.maximum_sessions <= self.sessions.entry_count() as usize {
594            return Err(SessionManagerError::TooManySessions.into());
595        }
596
597        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
598
599        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
600        let (challenge, _) = insert_into_next_slot(
601            &self.session_initiations,
602            |ch| {
603                if let Some(challenge) = ch {
604                    ((challenge + 1) % hopr_types::crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
605                } else {
606                    hopr_types::crypto_random::random_integer(MIN_CHALLENGE, None)
607                }
608            },
609            |_| tx_initiation_done,
610        )
611        .await
612        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
613
614        // Prepare the session initiation message in the Start protocol
615        trace!(challenge, ?cfg, "initiating session with config");
616        let start_session_msg = HoprStartProtocol::StartSession(StartInitiation {
617            challenge,
618            target,
619            capabilities: ByteCapabilities(cfg.capabilities),
620            additional_data: if !cfg.capabilities.contains(Capability::NoRateControl) {
621                cfg.surb_management
622                    .map(|c| c.target_surb_buffer_size)
623                    .unwrap_or(
624                        self.cfg.initial_return_session_egress_rate as u64
625                            * self
626                                .cfg
627                                .minimum_surb_buffer_duration
628                                .max(MIN_SURB_BUFFER_DURATION)
629                                .as_secs(),
630                    )
631                    .min(u32::MAX as u64) as u32
632            } else {
633                0
634            },
635        });
636
637        let pseudonym = cfg.pseudonym.unwrap_or(HoprPseudonym::random());
638        let forward_routing = DestinationRouting::Forward {
639            destination: Box::new(destination.into()),
640            pseudonym: Some(pseudonym), // Session must use a fixed pseudonym already
641            forward_options: cfg.forward_path_options.clone(),
642            return_options: cfg.return_path_options.clone().into(),
643        };
644
645        // Send the Session initiation message
646        info!(challenge, %pseudonym, %destination, "new session request");
647        msg_sender
648            .send((
649                forward_routing.clone(),
650                ApplicationDataOut::with_no_packet_info(start_session_msg.try_into()?),
651            ))
652            .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
653            .await
654            .map_err(|_| {
655                error!(challenge, %pseudonym, %destination, "timeout sending session request message");
656                TransportSessionError::Timeout
657            })?
658            .map_err(TransportSessionError::packet_sending)?;
659
660        // The timeout is given by the number of hops requested
661        let initiation_timeout: futures_time::time::Duration = initiation_timeout_max_one_way(
662            self.cfg.initiation_timeout_base,
663            cfg.forward_path_options.count_hops() + cfg.return_path_options.count_hops() + 2,
664        )
665        .into();
666
667        // Await session establishment response from the Exit node or timeout
668        pin_mut!(rx_initiation_done);
669
670        trace!(challenge, "awaiting session establishment");
671        match rx_initiation_done.try_next().timeout(initiation_timeout).await {
672            Ok(Ok(Some(est))) => {
673                // Session has been established, construct it
674                let session_id = est.session_id;
675                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
676
677                let (tx, rx) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
678                let notifier = self
679                    .session_notifiers
680                    .get()
681                    .map(|(_, notifier)| {
682                        let mut notifier = notifier.clone();
683                        Box::new(move |session_id: SessionId, reason: ClosureReason| {
684                            let _ = notifier
685                                .try_send((session_id, reason))
686                                .inspect_err(|error| error!(%session_id, %error, "failed to notify session closure"));
687                        })
688                    })
689                    .ok_or(SessionManagerError::NotStarted)?;
690
691                // NOTE: the Exit node can have different `max_surb_buffer_size`
692                // setting on the Session manager, so it does not make sense to cap it here
693                // with our maximum value.
694                if let Some(balancer_config) = cfg.surb_management {
695                    let surb_estimator = AtomicSurbFlowEstimator::default();
696
697                    // Sender responsible for keep-alive and Session data will be counting produced SURBs
698                    let surb_estimator_clone = surb_estimator.clone();
699                    let full_surb_scoring_sender =
700                        msg_sender.with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
701                            let produced = data.estimate_surbs_with_msg() as u64;
702                            // Count how many SURBs we sent with each packet
703                            surb_estimator_clone
704                                .produced
705                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
706                            #[cfg(feature = "telemetry")]
707                            crate::telemetry::record_session_surb_produced(&session_id, produced);
708                            futures::future::ok::<_, S::Error>((routing, data))
709                        });
710
711                    // For standard Session data we first reduce the number of SURBs we want to produce,
712                    // unless requested to always max them out
713                    let max_out_organic_surbs = cfg.always_max_out_surbs;
714                    let reduced_surb_scoring_sender = full_surb_scoring_sender.clone().with(
715                        // NOTE: this is put in-front of the `full_surb_scoring_sender`,
716                        // so that its estimate of SURBs gets automatically updated based on
717                        // the `max_surbs_in_packets` set here.
718                        move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
719                            if !max_out_organic_surbs {
720                                // TODO: make this dynamic to honor the balancer target (#7439)
721                                data.packet_info
722                                    .get_or_insert_with(|| OutgoingPacketInfo {
723                                        max_surbs_in_packet: 1,
724                                        ..Default::default()
725                                    })
726                                    .max_surbs_in_packet = 1;
727                            }
728                            futures::future::ok::<_, S::Error>((routing, data))
729                        },
730                    );
731
732                    let mut abort_handles = AbortableList::default();
733                    let surb_mgmt = Arc::new(BalancerStateValues::from(balancer_config));
734
735                    // Spawn the SURB-bearing keep alive stream towards the Exit
736                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
737                        session_id,
738                        full_surb_scoring_sender,
739                        forward_routing.clone(),
740                        if self.cfg.surb_target_notify {
741                            SurbNotificationMode::Target
742                        } else {
743                            SurbNotificationMode::DoNotNotify
744                        },
745                        surb_mgmt.clone(),
746                    );
747                    abort_handles.insert(SessionTasks::KeepAlive, ka_abort_handle);
748
749                    // Spawn the SURB balancer, which will decide on the initial SURB rate.
750                    debug!(%session_id, ?balancer_config ,"spawning entry SURB balancer");
751                    let balancer = SurbBalancer::new(
752                        session_id,
753                        // The setpoint and output limit is immediately reconfigured by the SurbBalancer
754                        PidBalancerController::from_gains(PidControllerGains::from_env_or_default()),
755                        surb_estimator.clone(),
756                        // Currently, a keep-alive message can bear `HoprPacket::MAX_SURBS_IN_PACKET` SURBs,
757                        // so the correction by this factor is applied.
758                        SurbControllerWithCorrection(ka_controller, HoprPacket::MAX_SURBS_IN_PACKET as u32),
759                        surb_mgmt.clone(),
760                    );
761
762                    let (level_stream, balancer_abort_handle) =
763                        balancer.start_control_loop(self.cfg.balancer_sampling_interval);
764                    abort_handles.insert(SessionTasks::Balancer, balancer_abort_handle);
765
766                    // If the insertion fails prematurely, it will also kill all the abort handles
767                    self.insert_session_slot(
768                        session_id,
769                        SessionSlot {
770                            session_tx: Arc::new(tx),
771                            routing_opts: forward_routing.clone(),
772                            abort_handles: Arc::new(parking_lot::Mutex::new(abort_handles)),
773                            surb_mgmt: surb_mgmt.clone(),
774                            surb_estimator: surb_estimator.clone(),
775                            allocated_tag: None,
776                        },
777                    )
778                    .await?;
779
780                    // Wait for enough SURBs to be sent to the counterparty
781                    // TODO: consider making this interactive = other party reports the exact level periodically
782                    match level_stream
783                        .skip_while(|current_level| {
784                            futures::future::ready(*current_level < balancer_config.target_surb_buffer_size / 2)
785                        })
786                        .next()
787                        .timeout(futures_time::time::Duration::from(SESSION_READINESS_TIMEOUT))
788                        .await
789                    {
790                        Ok(Some(surb_level)) => {
791                            info!(%session_id, surb_level, "session is ready");
792                        }
793                        Ok(None) => {
794                            return Err(
795                                SessionManagerError::other(anyhow!("surb balancer was cancelled prematurely")).into(),
796                            );
797                        }
798                        Err(_) => {
799                            warn!(%session_id, "session didn't reach target SURB buffer size in time");
800                        }
801                    }
802
803                    let surb_estimator_for_rx = surb_estimator.clone();
804                    let session = HoprSession::new(
805                        session_id,
806                        forward_routing,
807                        HoprSessionConfig {
808                            capabilities: cfg.capabilities,
809                            frame_mtu: self.cfg.frame_mtu,
810                            frame_timeout: self.cfg.max_frame_timeout,
811                        },
812                        (
813                            reduced_surb_scoring_sender,
814                            rx.inspect(move |_| {
815                                // Received packets = SURB consumption estimate
816                                // The received packets always consume a single SURB.
817                                surb_estimator_for_rx
818                                    .consumed
819                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
820                                #[cfg(feature = "telemetry")]
821                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
822                            }),
823                        ),
824                        Some(notifier),
825                    )?;
826
827                    #[cfg(feature = "telemetry")]
828                    {
829                        initialize_session_metrics(
830                            session_id,
831                            HoprSessionConfig {
832                                capabilities: cfg.capabilities,
833                                frame_mtu: self.cfg.frame_mtu,
834                                frame_timeout: self.cfg.max_frame_timeout,
835                            },
836                        );
837                        set_session_state(&session_id, SessionLifecycleState::Active);
838                        set_session_balancer_data(&session_id, surb_estimator.clone(), surb_mgmt.clone());
839                    }
840
841                    Ok(session)
842                } else {
843                    warn!(%session_id, "session ready without SURB balancing");
844
845                    self.insert_session_slot(
846                        session_id,
847                        SessionSlot {
848                            session_tx: Arc::new(tx),
849                            routing_opts: forward_routing.clone(),
850                            abort_handles: Default::default(),
851                            surb_mgmt: Default::default(),      // Disabled SURB management
852                            surb_estimator: Default::default(), // No SURB estimator needed
853                            allocated_tag: None,
854                        },
855                    )
856                    .await?;
857
858                    // For standard Session data we first reduce the number of SURBs we want to produce,
859                    // unless requested to always max them out
860                    let max_out_organic_surbs = cfg.always_max_out_surbs;
861                    let reduced_surb_sender =
862                        msg_sender.with(move |(routing, mut data): (DestinationRouting, ApplicationDataOut)| {
863                            if !max_out_organic_surbs {
864                                data.packet_info
865                                    .get_or_insert_with(|| OutgoingPacketInfo {
866                                        max_surbs_in_packet: 1,
867                                        ..Default::default()
868                                    })
869                                    .max_surbs_in_packet = 1;
870                            }
871                            futures::future::ok::<_, S::Error>((routing, data))
872                        });
873
874                    let session = HoprSession::new(
875                        session_id,
876                        forward_routing,
877                        HoprSessionConfig {
878                            capabilities: cfg.capabilities,
879                            frame_mtu: self.cfg.frame_mtu,
880                            frame_timeout: self.cfg.max_frame_timeout,
881                        },
882                        (reduced_surb_sender, rx),
883                        Some(notifier),
884                    )?;
885
886                    #[cfg(feature = "telemetry")]
887                    {
888                        initialize_session_metrics(
889                            session_id,
890                            HoprSessionConfig {
891                                capabilities: cfg.capabilities,
892                                frame_mtu: self.cfg.frame_mtu,
893                                frame_timeout: self.cfg.max_frame_timeout,
894                            },
895                        );
896                        set_session_state(&session_id, SessionLifecycleState::Active);
897                    }
898
899                    Ok(session)
900                }
901            }
902            Ok(Ok(None)) => Err(SessionManagerError::other(anyhow!(
903                "internal error: sender has been closed without completing the session establishment"
904            ))
905            .into()),
906            Ok(Err(error)) => {
907                // The other side did not allow us to establish a session
908                error!(
909                    challenge = error.challenge,
910                    ?error,
911                    "the other party rejected the session initiation with error"
912                );
913                Err(TransportSessionError::Rejected(error.reason))
914            }
915            Err(_) => {
916                // Timeout waiting for a session establishment
917                error!(challenge, "session initiation attempt timed out");
918
919                #[cfg(all(feature = "telemetry", not(test)))]
920                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
921
922                Err(TransportSessionError::Timeout)
923            }
924        }
925    }
926
927    /// Sends a keep-alive packet with the given [`SessionId`].
928    ///
929    /// This currently "fires & forgets" and does not expect nor await any "pong" response.
930    pub async fn ping_session(&self, id: &SessionId) -> crate::errors::Result<()> {
931        if let Some(session_data) = self.sessions.get(id).await {
932            trace!(session_id = ?id, "pinging manually session");
933            Ok(self
934                .msg_sender
935                .get()
936                .cloned()
937                .ok_or(SessionManagerError::NotStarted)?
938                .send((
939                    session_data.routing_opts.clone(),
940                    ApplicationDataOut::with_no_packet_info(HoprStartProtocol::KeepAlive((*id).into()).try_into()?),
941                ))
942                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
943                .await
944                .map_err(|_| {
945                    error!("timeout sending session ping message");
946                    TransportSessionError::Timeout
947                })?
948                .map_err(TransportSessionError::packet_sending)?)
949        } else {
950            Err(SessionManagerError::NonExistingSession.into())
951        }
952    }
953
954    /// Returns [`SessionIds`](SessionId) of all currently active sessions.
955    pub async fn active_sessions(&self) -> Vec<SessionId> {
956        self.sessions.run_pending_tasks().await;
957        self.sessions.iter().map(|(k, _)| *k).collect()
958    }
959
960    /// Updates the configuration of the SURB balancer on the given [`SessionId`].
961    ///
962    /// Returns an error if the Session with the given `id` does not exist, or
963    /// if it does not use SURB balancing.
964    pub async fn update_surb_balancer_config(
965        &self,
966        id: &SessionId,
967        config: SurbBalancerConfig,
968    ) -> crate::errors::Result<()> {
969        let cfg = self
970            .sessions
971            .get(id)
972            .await
973            .ok_or(SessionManagerError::NonExistingSession)?
974            .surb_mgmt;
975
976        // Only update the config if there already was one before
977        if !cfg.is_disabled() {
978            cfg.update(&config);
979            Ok(())
980        } else {
981            Err(SessionManagerError::other(anyhow!("session does not use SURB balancing")).into())
982        }
983    }
984
985    /// Retrieves the configuration of SURB balancing for the given Session.
986    ///
987    /// Returns an error if the Session with the given `id` does not exist.
988    pub async fn get_surb_balancer_config(&self, id: &SessionId) -> crate::errors::Result<Option<SurbBalancerConfig>> {
989        match self.sessions.get(id).await {
990            Some(session) => Ok(Some(session.surb_mgmt.as_ref())
991                .filter(|c| !c.is_disabled())
992                .map(|d| d.as_config())),
993            None => Err(SessionManagerError::NonExistingSession.into()),
994        }
995    }
996
997    /// Gets estimations produced/received and consumed SURBs by the Session.
998    ///
999    /// For an outgoing Session (Entry) the pair is the number of SURBs sent (by us) and used (by the Exit).
1000    /// For an incoming Session (Exit) the pair is the number of SURBs received (from Entry) and used (by us).
1001    ///
1002    /// Returns an error if the Session with the given `id` does not exist.
1003    pub async fn get_surb_level_estimates(&self, id: &SessionId) -> crate::errors::Result<(u64, u64)> {
1004        match self.sessions.get(id).await {
1005            Some(session) => Ok((
1006                session
1007                    .surb_estimator
1008                    .produced
1009                    .load(std::sync::atomic::Ordering::Relaxed),
1010                session
1011                    .surb_estimator
1012                    .consumed
1013                    .load(std::sync::atomic::Ordering::Relaxed),
1014            )),
1015            None => Err(SessionManagerError::NonExistingSession.into()),
1016        }
1017    }
1018
1019    /// The main method to be called whenever data are received.
1020    ///
1021    /// It tries to recognize the message and correctly dispatches either
1022    /// the Session protocol or Start protocol messages.
1023    ///
1024    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
1025    pub async fn dispatch_message(
1026        &self,
1027        pseudonym: HoprPseudonym,
1028        in_data: ApplicationDataIn,
1029    ) -> crate::errors::Result<DispatchResult> {
1030        if in_data.data.application_tag == HoprStartProtocol::START_PROTOCOL_MESSAGE_TAG {
1031            // This is a Start protocol message, so we handle it
1032            trace!("dispatching Start protocol message");
1033            return self
1034                .handle_start_protocol_message(pseudonym, in_data)
1035                .await
1036                .map(|_| DispatchResult::Processed);
1037        } else if self.session_tag_range.contains(&in_data.data.application_tag.as_u64()) {
1038            let session_id = SessionId::new(in_data.data.application_tag, pseudonym);
1039
1040            return if let Some(session_slot) = self.sessions.get(&session_id).await {
1041                trace!(?session_id, "received data for a registered session");
1042
1043                Ok(session_slot
1044                    .session_tx
1045                    .unbounded_send(in_data)
1046                    .map(|_| DispatchResult::Processed)
1047                    .map_err(SessionManagerError::other)?)
1048            } else {
1049                error!(%session_id, "received data from an unestablished session");
1050                Err(TransportSessionError::UnknownData)
1051            };
1052        }
1053
1054        trace!(tag = %in_data.data.application_tag, "received data not associated with session protocol or any existing session");
1055        Ok(DispatchResult::Unrelated(in_data))
1056    }
1057
1058    async fn handle_incoming_session_initiation(
1059        &self,
1060        pseudonym: HoprPseudonym,
1061        session_req: StartInitiation<SessionTarget, ByteCapabilities>,
1062    ) -> crate::errors::Result<()> {
1063        trace!(challenge = session_req.challenge, "received session initiation request");
1064
1065        debug!(%pseudonym, "got new session request, searching for a free session slot");
1066
1067        let mut msg_sender = self.msg_sender.get().cloned().ok_or(SessionManagerError::NotStarted)?;
1068
1069        let (mut new_session_notifier, mut close_session_notifier) = self
1070            .session_notifiers
1071            .get()
1072            .cloned()
1073            .ok_or(SessionManagerError::NotStarted)?;
1074
1075        // Reply routing uses SURBs only with the pseudonym of this Session's ID
1076        let reply_routing = DestinationRouting::Return(pseudonym.into());
1077
1078        let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<ApplicationDataIn>();
1079
1080        // Allocate a unique tag for this incoming session
1081        self.sessions.run_pending_tasks().await; // Needed so that entry_count is updated
1082        let allocated_tag = if self.maximum_sessions > self.sessions.entry_count() as usize {
1083            self.tag_allocator.allocate()
1084        } else {
1085            error!(%pseudonym, "cannot accept incoming session, the maximum number of sessions has been reached");
1086            None
1087        };
1088
1089        if let Some(allocated_tag) = allocated_tag {
1090            let session_id = SessionId::new(allocated_tag.value(), pseudonym);
1091            let allocated_tag = Arc::new(allocated_tag);
1092
1093            let slot = SessionSlot {
1094                session_tx: Arc::new(tx_session_data),
1095                routing_opts: reply_routing.clone(),
1096                abort_handles: Default::default(),
1097                surb_mgmt: Default::default(),
1098                surb_estimator: Default::default(),
1099                allocated_tag: Some(allocated_tag),
1100            };
1101            self.sessions.insert(session_id, slot.clone()).await;
1102
1103            debug!(%session_id, ?session_req, "assigned a new session");
1104
1105            let closure_notifier = Box::new(move |session_id: SessionId, reason: ClosureReason| {
1106                if let Err(error) = close_session_notifier.try_send((session_id, reason)) {
1107                    error!(%session_id, %error, %reason, "failed to notify session closure");
1108                }
1109            });
1110
1111            let session = if !session_req.capabilities.0.contains(Capability::NoRateControl) {
1112                // Because of SURB scarcity, control the egress rate of incoming sessions
1113                let egress_rate_control =
1114                    RateController::new(self.cfg.initial_return_session_egress_rate, Duration::from_secs(1));
1115
1116                // The Session request carries a "hint" as additional data telling what
1117                // the Session initiator has configured as its target buffer size in the Balancer.
1118                let target_surb_buffer_size = if session_req.additional_data > 0 {
1119                    (session_req.additional_data as u64).min(self.cfg.maximum_surb_buffer_size as u64)
1120                } else {
1121                    self.cfg.initial_return_session_egress_rate as u64
1122                        * self
1123                            .cfg
1124                            .minimum_surb_buffer_duration
1125                            .max(MIN_SURB_BUFFER_DURATION)
1126                            .as_secs()
1127                };
1128
1129                let surb_estimator_clone = slot.surb_estimator.clone();
1130                let session = HoprSession::new(
1131                    session_id,
1132                    reply_routing.clone(),
1133                    HoprSessionConfig {
1134                        capabilities: session_req.capabilities.into(),
1135                        frame_mtu: self.cfg.frame_mtu,
1136                        frame_timeout: self.cfg.max_frame_timeout,
1137                    },
1138                    (
1139                        // Sent packets = SURB consumption estimate
1140                        msg_sender
1141                            .clone()
1142                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1143                                // Each outgoing packet consumes one SURB
1144                                surb_estimator_clone
1145                                    .consumed
1146                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1147                                #[cfg(feature = "telemetry")]
1148                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
1149                                futures::future::ok::<_, S::Error>((routing, data))
1150                            })
1151                            .rate_limit_with_controller(&egress_rate_control)
1152                            .buffer((2 * target_surb_buffer_size) as usize),
1153                        // Received packets = SURB retrieval estimate
1154                        rx_session_data.inspect(move |data| {
1155                            let produced = data.num_surbs_with_msg() as u64;
1156                            // Count the number of SURBs delivered with each incoming packet
1157                            surb_estimator_clone
1158                                .produced
1159                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
1160                            #[cfg(feature = "telemetry")]
1161                            crate::telemetry::record_session_surb_produced(&session_id, produced);
1162                        }),
1163                    ),
1164                    Some(closure_notifier),
1165                )?;
1166
1167                // The SURB balancer will start intervening by rate-limiting the
1168                // egress of the Session, once the estimated number of SURBs drops below
1169                // the target defined here. Otherwise, the maximum egress is allowed.
1170                let balancer_config = SurbBalancerConfig {
1171                    target_surb_buffer_size,
1172                    // At maximum egress, the SURB buffer drains in `minimum_surb_buffer_duration` seconds
1173                    max_surbs_per_sec: target_surb_buffer_size / self.cfg.minimum_surb_buffer_duration.as_secs(),
1174                    // No SURB decay at the Exit, since we know almost exactly how many SURBs
1175                    // were received
1176                    surb_decay: None,
1177                };
1178
1179                slot.surb_mgmt.update(&balancer_config);
1180
1181                // Spawn the SURB balancer only once we know we have registered the
1182                // abort handle with the pre-allocated Session slot
1183                debug!(%session_id, ?balancer_config ,"spawning exit SURB balancer");
1184                let balancer = SurbBalancer::new(
1185                    session_id,
1186                    SimpleBalancerController::default(),
1187                    slot.surb_estimator.clone(),
1188                    SurbControllerWithCorrection(egress_rate_control, 1), // 1 SURB per egress packet
1189                    slot.surb_mgmt.clone(),
1190                );
1191
1192                // Assign the SURB balancer and abort handles to the already allocated Session slot
1193                let (_, balancer_abort_handle) = balancer.start_control_loop(self.cfg.balancer_sampling_interval);
1194                slot.abort_handles
1195                    .lock()
1196                    .insert(SessionTasks::Balancer, balancer_abort_handle);
1197
1198                // Spawn a keep-alive stream notifying about the SURB buffer level towards the Entry
1199                if let Some(period) = self.cfg.surb_balance_notify_period {
1200                    let surb_estimator_clone = slot.surb_estimator.clone();
1201                    let (ka_controller, ka_abort_handle) = utils::spawn_keep_alive_stream(
1202                        session_id,
1203                        // Sent Keep-Alive packets also contribute to SURB consumption
1204                        msg_sender
1205                            .clone()
1206                            .with(move |(routing, data): (DestinationRouting, ApplicationDataOut)| {
1207                                // Each sent keepalive consumes 1 SURB
1208                                surb_estimator_clone
1209                                    .consumed
1210                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1211                                #[cfg(feature = "telemetry")]
1212                                crate::telemetry::record_session_surb_consumed(&session_id, 1);
1213                                futures::future::ok::<_, S::Error>((routing, data))
1214                            }),
1215                        slot.routing_opts.clone(),
1216                        SurbNotificationMode::Level(slot.surb_estimator.clone()),
1217                        slot.surb_mgmt.clone(),
1218                    );
1219
1220                    // Start keepalive stream towards the Entry with a predefined period
1221                    hopr_async_runtime::prelude::spawn(async move {
1222                        // Delay the stream execution by one period
1223                        hopr_async_runtime::prelude::sleep(period).await;
1224                        ka_controller.set_rate_per_unit(1, period);
1225                    });
1226
1227                    slot.abort_handles
1228                        .lock()
1229                        .insert(SessionTasks::KeepAlive, ka_abort_handle);
1230
1231                    debug!(%session_id, ?period, "started SURB level-notifying keep-alive stream");
1232                }
1233
1234                session
1235            } else {
1236                HoprSession::new(
1237                    session_id,
1238                    reply_routing.clone(),
1239                    HoprSessionConfig {
1240                        capabilities: session_req.capabilities.into(),
1241                        frame_mtu: self.cfg.frame_mtu,
1242                        frame_timeout: self.cfg.max_frame_timeout,
1243                    },
1244                    (msg_sender.clone(), rx_session_data),
1245                    Some(closure_notifier),
1246                )?
1247            };
1248
1249            // Extract useful information about the session from the Start protocol message
1250            let incoming_session = IncomingSession {
1251                session,
1252                target: session_req.target,
1253            };
1254
1255            // Notify that a new incoming session has been created
1256            match new_session_notifier
1257                .send(incoming_session)
1258                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1259                .await
1260            {
1261                Err(_) => {
1262                    error!(%session_id, "timeout to notify about new incoming session");
1263                    return Err(TransportSessionError::Timeout);
1264                }
1265                Ok(Err(error)) => {
1266                    error!(%session_id, %error, "failed to notify about new incoming session");
1267                    return Err(SessionManagerError::other(error).into());
1268                }
1269                _ => {}
1270            };
1271
1272            trace!(?session_id, "session notification sent");
1273
1274            // Notify the sender that the session has been established.
1275            // Set our peer ID in the session ID sent back to them.
1276            let data = HoprStartProtocol::SessionEstablished(StartEstablished {
1277                orig_challenge: session_req.challenge,
1278                session_id,
1279            });
1280
1281            msg_sender
1282                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1283                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1284                .await
1285                .map_err(|_| {
1286                    error!(%session_id, "timeout sending session establishment message");
1287                    TransportSessionError::Timeout
1288                })?
1289                .map_err(|error| {
1290                    error!(%session_id, %error, "failed to send session establishment message");
1291                    SessionManagerError::other(error)
1292                })?;
1293
1294            #[cfg(feature = "telemetry")]
1295            {
1296                initialize_session_metrics(
1297                    session_id,
1298                    HoprSessionConfig {
1299                        capabilities: session_req.capabilities.0,
1300                        frame_mtu: self.cfg.frame_mtu,
1301                        frame_timeout: self.cfg.max_frame_timeout,
1302                    },
1303                );
1304                set_session_state(&session_id, SessionLifecycleState::Active);
1305                set_session_balancer_data(&session_id, slot.surb_estimator.clone(), slot.surb_mgmt.clone());
1306            }
1307
1308            info!(%session_id, "new session established");
1309
1310            #[cfg(all(feature = "telemetry", not(test)))]
1311            {
1312                METRIC_NUM_ESTABLISHED_SESSIONS.increment();
1313                METRIC_ACTIVE_SESSIONS.increment(1.0);
1314            }
1315        } else {
1316            error!(%pseudonym,"failed to reserve a new session slot");
1317
1318            // Notify the sender that the session could not be established
1319            let reason = StartErrorReason::NoSlotsAvailable;
1320            let data = HoprStartProtocol::SessionError(StartErrorType {
1321                challenge: session_req.challenge,
1322                reason,
1323            });
1324
1325            msg_sender
1326                .send((reply_routing, ApplicationDataOut::with_no_packet_info(data.try_into()?)))
1327                .timeout(futures_time::time::Duration::from(EXTERNAL_SEND_TIMEOUT))
1328                .await
1329                .map_err(|_| {
1330                    error!("timeout sending session error message");
1331                    TransportSessionError::Timeout
1332                })?
1333                .map_err(|error| {
1334                    error!(%error, "failed to send session error message");
1335                    SessionManagerError::other(error)
1336                })?;
1337
1338            trace!(%pseudonym, "session establishment failure message sent");
1339
1340            #[cfg(all(feature = "telemetry", not(test)))]
1341            METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
1342        }
1343
1344        Ok(())
1345    }
1346
    /// Handles a single decoded Start protocol message received under `pseudonym`.
    ///
    /// Message variants:
    /// - `StartSession`: we act as the Exit and try to establish a new incoming session.
    /// - `SessionEstablished` / `SessionError`: we act as the Entry; the pending
    ///   initiation (keyed by its challenge) is resolved via its notification channel.
    /// - `KeepAlive`: updates SURB buffer/target values and the consumption/production
    ///   estimates of an existing session, depending on the session's direction.
    ///
    /// # Errors
    /// Fails if `data` cannot be parsed as a Start protocol message, if an incoming
    /// initiation cannot be processed, or if a pending initiator's channel is closed.
    async fn handle_start_protocol_message(
        &self,
        pseudonym: HoprPseudonym,
        data: ApplicationDataIn,
    ) -> crate::errors::Result<()> {
        match HoprStartProtocol::try_from(data.data)? {
            HoprStartProtocol::StartSession(session_req) => {
                // We are the Exit side: allocate a slot and reply to the Entry.
                self.handle_incoming_session_initiation(pseudonym, session_req).await?;
            }
            HoprStartProtocol::SessionEstablished(est) => {
                trace!(
                    session_id = ?est.session_id,
                    "received session establishment confirmation"
                );
                let challenge = est.orig_challenge;
                let session_id = est.session_id;
                // Resolve the pending initiation that matches the original challenge, if any.
                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
                    if let Err(error) = tx_est.unbounded_send(Ok(est)) {
                        error!(%challenge, %session_id, %error, "failed to send session establishment confirmation");
                        return Err(SessionManagerError::other(error).into());
                    }
                    debug!(?session_id, challenge, "session establishment complete");
                } else {
                    // Initiation already expired or was never ours; nothing to resolve.
                    error!(%session_id, challenge, "unknown session establishment attempt or expired");
                }
            }
            HoprStartProtocol::SessionError(error_type) => {
                trace!(
                    challenge = error_type.challenge,
                    error = ?error_type.reason,
                    "failed to initialize a session",
                );
                // Currently, we do not distinguish between individual error types
                // and just discard the initiation attempt and pass on the error.
                if let Some(tx_est) = self.session_initiations.remove(&error_type.challenge).await {
                    if let Err(error) = tx_est.unbounded_send(Err(error_type)) {
                        error!(%error, ?error_type, "could not send session error message");
                        return Err(SessionManagerError::other(error).into());
                    }
                    error!(
                        challenge = error_type.challenge,
                        ?error_type,
                        "session establishment error received"
                    );
                } else {
                    error!(
                        challenge = error_type.challenge,
                        ?error_type,
                        "session establishment attempt expired before error could be delivered"
                    );
                }

                #[cfg(all(feature = "telemetry", not(test)))]
                METRIC_RECEIVED_SESSION_ERRS.increment(&[&error_type.reason.to_string()])
            }
            HoprStartProtocol::KeepAlive(msg) => {
                let session_id = msg.session_id;
                if let Some(session_slot) = self.sessions.get(&session_id).await {
                    trace!(?session_id, "received keep-alive message");
                    // The session's routing direction determines who sent the keep-alive.
                    match &session_slot.routing_opts {
                        // Session is outgoing - keep-alive was received from the Exit
                        DestinationRouting::Forward { .. } => {
                            if msg.flags.contains(KeepAliveFlag::BalancerState)
                                && !session_slot.surb_mgmt.is_disabled()
                                && session_slot.surb_mgmt.buffer_level() != msg.additional_data
                            {
                                // Update the buffer level as sent to us from the Exit
                                session_slot
                                    .surb_mgmt
                                    .buffer_level
                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
                                debug!(%session_id, surb_level = msg.additional_data, "keep-alive updated SURB buffer size from the Exit");
                            }

                            // Increase the number of consumed SURBs in the estimator
                            session_slot
                                .surb_estimator
                                .consumed
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            #[cfg(feature = "telemetry")]
                            crate::telemetry::record_session_surb_consumed(&session_id, 1);
                        }
                        // Session is incoming - keep-alive was received from the Entry
                        DestinationRouting::Return(_) => {
                            // Allow updating SURB balancer target based on the received Keep-Alive message
                            if msg.flags.contains(KeepAliveFlag::BalancerTarget)
                                && msg.additional_data > 0
                                && !session_slot.surb_mgmt.is_disabled()
                                && session_slot.surb_mgmt.controller_bounds().target() != msg.additional_data
                            {
                                // Update the target buffer size as sent to us from the Entry
                                session_slot
                                    .surb_mgmt
                                    .target_surb_buffer_size
                                    .store(msg.additional_data, std::sync::atomic::Ordering::Relaxed);
                                // Update maximum SURBs per second based on the new target
                                // NOTE(review): `as_secs()` truncates sub-second durations to 0,
                                // which would make this a division by zero — confirm the config
                                // guarantees `minimum_surb_buffer_duration` >= 1 second.
                                session_slot.surb_mgmt.max_surbs_per_sec.store(
                                    msg.additional_data / self.cfg.minimum_surb_buffer_duration.as_secs(),
                                    std::sync::atomic::Ordering::Relaxed,
                                );
                                debug!(%session_id, target_surb_buffer_size = msg.additional_data, "keep-alive updated SURB balancer target buffer size from the Entry");
                            }

                            // Increase the number of received SURBs in the estimator.
                            // Typically, 2 SURBs per Keep-Alive message
                            let produced = KeepAliveMessage::<SessionId>::MIN_SURBS_PER_MESSAGE as u64;
                            session_slot
                                .surb_estimator
                                .produced
                                .fetch_add(produced, std::sync::atomic::Ordering::Relaxed);
                            #[cfg(feature = "telemetry")]
                            crate::telemetry::record_session_surb_produced(&session_id, produced);
                        }
                    }
                } else {
                    // Keep-alives for closed/unknown sessions are expected and benign.
                    debug!(%session_id, "received keep-alive request for an unknown session");
                }
            }
        }

        Ok(())
    }
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473    use anyhow::anyhow;
1474    use futures::{AsyncWriteExt, future::BoxFuture};
1475    use hopr_network_types::prelude::SealedHost;
1476    use hopr_protocol_start::{StartProtocol, StartProtocolDiscriminants};
1477    use hopr_types::{
1478        crypto::{keypairs::ChainKeypair, prelude::Keypair},
1479        crypto_random::Randomizable,
1480        internal::routing::SurbMatcher,
1481        primitive::prelude::Address,
1482    };
1483    use tokio::time::timeout;
1484
1485    use super::*;
1486    use crate::{Capabilities, Capability, balancer::SurbBalancerConfig, types::SessionTarget};
1487
1488    /// Create a test tag allocator with a large partition.
1489    fn test_tag_allocator() -> Arc<dyn TagAllocator + Send + Sync> {
1490        test_tag_allocator_with_session_capacity(10000)
1491    }
1492
1493    /// Create a test tag allocator with a specific session partition capacity.
1494    fn test_tag_allocator_with_session_capacity(session_capacity: u64) -> Arc<dyn TagAllocator + Send + Sync> {
1495        hopr_transport_tag_allocator::create_allocators(
1496            ReservedTag::range().end..u16::MAX as u64 + 1,
1497            [
1498                (hopr_transport_tag_allocator::Usage::Session, session_capacity),
1499                (hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry, 10000),
1500                (hopr_transport_tag_allocator::Usage::ProvingTelemetry, 10000),
1501            ],
1502        )
1503        .expect("test allocator creation must not fail")
1504        .into_iter()
1505        .find(|(u, _)| matches!(u, hopr_transport_tag_allocator::Usage::Session))
1506        .expect("session allocator must exist")
1507        .1
1508    }
1509
    /// Test-only abstraction of the transport's message-sending capability,
    /// so it can be mocked via `mockall` below.
    #[async_trait::async_trait]
    trait SendMsg {
        /// Sends a single application-level message along the given routing.
        async fn send_message(
            &self,
            routing: DestinationRouting,
            data: ApplicationDataOut,
        ) -> crate::errors::Result<()>;
    }
1518
    mockall::mock! {
        // Generates `MockMsgSender`. The desugared `BoxFuture` signature mirrors
        // what `#[async_trait]` produces for `SendMsg::send_message`, which lets
        // expectations return pinned async blocks.
        MsgSender {}
        impl SendMsg for MsgSender {
            fn send_message<'a, 'b>(&'a self, routing: DestinationRouting, data: ApplicationDataOut)
            -> BoxFuture<'b, crate::errors::Result<()>> where 'a: 'b, Self: Sync + 'b;
        }
    }
1526
1527    fn mock_packet_planning(sender: MockMsgSender) -> UnboundedSender<(DestinationRouting, ApplicationDataOut)> {
1528        let (tx, rx) = futures::channel::mpsc::unbounded();
1529        tokio::task::spawn(async move {
1530            pin_mut!(rx);
1531            while let Some((routing, data)) = rx.next().await {
1532                sender
1533                    .send_message(routing, data)
1534                    .await
1535                    .expect("send message must not fail in mock");
1536            }
1537        });
1538        tx
1539    }
1540
1541    fn msg_type(data: &ApplicationDataOut, expected: StartProtocolDiscriminants) -> bool {
1542        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1543            .map(|d| StartProtocolDiscriminants::from(d) == expected)
1544            .unwrap_or(false)
1545    }
1546
1547    fn start_msg_match(data: &ApplicationDataOut, msg: impl Fn(HoprStartProtocol) -> bool) -> bool {
1548        HoprStartProtocol::decode(data.data.application_tag, &data.data.plain_text)
1549            .map(msg)
1550            .unwrap_or(false)
1551    }
1552
    /// End-to-end Start protocol walkthrough: Alice initiates a session towards
    /// Bob, Bob confirms it with `SessionEstablished`, and Alice then closes it
    /// by sending a terminating segment. The mock transports shuttle each
    /// message directly into the peer manager's `dispatch_message`, and the
    /// mockall `Sequence` enforces the exact message order.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
    {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());

        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                info!("alice sends {}", data.data.application_tag);
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                // Hand the message over to Bob's manager, as the network would.
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionEstablished message
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                info!("bob sends {}", data.data.application_tag);
                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();

                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Alice sends the terminating segment to close the Session
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                // The closing message must be a terminating Session segment, not a Start message.
                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
                    data.data.plain_text.as_ref(),
                )
                .expect("must be a session message")
                .try_as_segment()
                .expect("must be a segment")
                .is_terminating()
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Abort handles for the background tasks spawned by both managers.
        let mut ahs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let target = SealedHost::Plain("127.0.0.1:80".parse()?);

        // Drive Alice's initiation and Bob's incoming-session notification concurrently.
        pin_mut!(new_session_rx_bob);
        let (alice_session, bob_session) = timeout(
            Duration::from_secs(2),
            futures::future::join(
                alice_mgr.new_session(
                    bob_peer,
                    SessionTarget::TcpStream(target.clone()),
                    SessionClientConfig {
                        pseudonym: alice_pseudonym.into(),
                        capabilities: Capability::NoRateControl | Capability::Segmentation,
                        surb_management: None,
                        ..Default::default()
                    },
                ),
                new_session_rx_bob.next(),
            ),
        )
        .await?;

        let mut alice_session = alice_session?;
        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;

        // Both sides must agree on the negotiated capabilities and target.
        assert_eq!(
            alice_session.config().capabilities,
            Capability::Segmentation | Capability::NoRateControl
        );
        assert_eq!(
            alice_session.config().capabilities,
            bob_session.session.config().capabilities
        );
        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));

        // SURB management was not requested, so balancer config reads return None
        // and updates must fail.
        assert_eq!(vec![*alice_session.id()], alice_mgr.active_sessions().await);
        assert_eq!(None, alice_mgr.get_surb_balancer_config(alice_session.id()).await?);
        assert!(
            alice_mgr
                .update_surb_balancer_config(alice_session.id(), SurbBalancerConfig::default())
                .await
                .is_err()
        );

        assert_eq!(vec![*bob_session.session.id()], bob_mgr.active_sessions().await);
        assert_eq!(None, bob_mgr.get_surb_balancer_config(bob_session.session.id()).await?);
        assert!(
            bob_mgr
                .update_surb_balancer_config(bob_session.session.id(), SurbBalancerConfig::default())
                .await
                .is_err()
        );

        tokio::time::sleep(Duration::from_millis(100)).await;
        alice_session.close().await?;

        tokio::time::sleep(Duration::from_millis(100)).await;

        // After closing, the session must be gone from Alice's manager.
        assert!(matches!(
            alice_mgr.ping_session(alice_session.id()).await,
            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
        ));

        futures::stream::iter(ahs)
            .for_each(|ah| async move { ah.abort() })
            .await;

        Ok(())
    }
1731
    /// Alice's manager is configured with a 200 ms idle timeout; after the
    /// session is established and left idle past that timeout, pinging it must
    /// fail with `NonExistingSession`, i.e. the manager closed it automatically.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
        let alice_pseudonym = HoprPseudonym::random();
        let bob_peer: Address = (&ChainKeypair::random()).into();

        // Short idle timeout so the test can observe the automatic closure quickly.
        let cfg = SessionManagerConfig {
            idle_timeout: Duration::from_millis(200),
            ..Default::default()
        };

        let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());

        let mut sequence = mockall::Sequence::new();
        let mut alice_transport = MockMsgSender::new();
        let mut bob_transport = MockMsgSender::new();

        // Alice sends the StartSession message
        let bob_mgr_clone = bob_mgr.clone();
        alice_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::StartSession)
                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
            })
            .returning(move |_, data| {
                // Deliver the message straight into Bob's manager.
                let bob_mgr_clone = bob_mgr_clone.clone();
                Box::pin(async move {
                    bob_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Bob sends the SessionEstablished message
        let alice_mgr_clone = alice_mgr.clone();
        bob_transport
            .expect_send_message()
            .once()
            .in_sequence(&mut sequence)
            .withf(move |peer, data| {
                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
            })
            .returning(move |_, data| {
                let alice_mgr_clone = alice_mgr_clone.clone();

                Box::pin(async move {
                    alice_mgr_clone
                        .dispatch_message(
                            alice_pseudonym,
                            ApplicationDataIn {
                                data: data.data,
                                packet_info: Default::default(),
                            },
                        )
                        .await?;
                    Ok(())
                })
            });

        // Abort handles for the background tasks spawned by both managers.
        let mut ahs = Vec::new();

        // Start Alice
        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);

        // Start Bob
        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);

        let target = SealedHost::Plain("127.0.0.1:80".parse()?);

        pin_mut!(new_session_rx_bob);
        let (alice_session, bob_session) = timeout(
            Duration::from_secs(2),
            futures::future::join(
                alice_mgr.new_session(
                    bob_peer,
                    SessionTarget::TcpStream(target.clone()),
                    SessionClientConfig {
                        pseudonym: alice_pseudonym.into(),
                        capabilities: Capability::NoRateControl | Capability::Segmentation,
                        surb_management: None,
                        ..Default::default()
                    },
                ),
                new_session_rx_bob.next(),
            ),
        )
        .await?;

        let alice_session = alice_session?;
        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;

        assert_eq!(
            alice_session.config().capabilities,
            Capability::Segmentation | Capability::NoRateControl,
        );
        assert_eq!(
            alice_session.config().capabilities,
            bob_session.session.config().capabilities
        );
        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));

        // Let the session timeout at Alice
        tokio::time::sleep(Duration::from_millis(300)).await;

        // The idle session must have been removed by the manager on its own.
        assert!(matches!(
            alice_mgr.ping_session(alice_session.id()).await,
            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
        ));

        futures::stream::iter(ahs)
            .for_each(|ah| async move { ah.abort() })
            .await;

        Ok(())
    }
1860
    /// Inserts a session slot directly into the manager (bypassing the Start
    /// protocol) and verifies that the SURB balancer configuration can be read
    /// back and updated through the public accessors.
    #[test_log::test(tokio::test)]
    async fn session_manager_should_update_surb_balancer_config() -> anyhow::Result<()> {
        let alice_pseudonym = HoprPseudonym::random();
        let session_id = SessionId::new(16u64, alice_pseudonym);
        let balancer_cfg = SurbBalancerConfig {
            target_surb_buffer_size: 1000,
            max_surbs_per_sec: 100,
            ..Default::default()
        };

        let alice_mgr = SessionManager::<
            UnboundedSender<(DestinationRouting, ApplicationDataOut)>,
            futures::channel::mpsc::Sender<IncomingSession>,
        >::new(Default::default(), test_tag_allocator());

        // Plant a fake slot with the initial balancer config; the sender half is
        // a dummy since no traffic is exchanged in this test.
        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
        alice_mgr
            .sessions
            .insert(
                session_id,
                SessionSlot {
                    session_tx: Arc::new(dummy_tx),
                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
                    abort_handles: Default::default(),
                    surb_mgmt: Arc::new(BalancerStateValues::from(balancer_cfg)),
                    surb_estimator: Default::default(),
                    allocated_tag: None,
                },
            )
            .await;

        // The initially stored config must be readable as-is.
        let actual_cfg = alice_mgr
            .get_surb_balancer_config(&session_id)
            .await?
            .ok_or(anyhow!("session must have a surb balancer config"))?;
        assert_eq!(actual_cfg, balancer_cfg);

        let new_cfg = SurbBalancerConfig {
            target_surb_buffer_size: 2000,
            max_surbs_per_sec: 200,
            ..Default::default()
        };
        alice_mgr.update_surb_balancer_config(&session_id, new_cfg).await?;

        // The update must be reflected on the next read.
        let actual_cfg = alice_mgr
            .get_surb_balancer_config(&session_id)
            .await?
            .ok_or(anyhow!("session must have a surb balancer config"))?;
        assert_eq!(actual_cfg, new_cfg);

        Ok(())
    }
1913
1914    #[test_log::test(tokio::test)]
1915    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1916        let alice_pseudonym = HoprPseudonym::random();
1917        let bob_peer: Address = (&ChainKeypair::random()).into();
1918
1919        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
1920        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
1921
1922        // Occupy the only free slot with tag 16
1923        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1924        bob_mgr
1925            .sessions
1926            .insert(
1927                SessionId::new(16u64, alice_pseudonym),
1928                SessionSlot {
1929                    session_tx: Arc::new(dummy_tx),
1930                    routing_opts: DestinationRouting::Return(SurbMatcher::Pseudonym(alice_pseudonym)),
1931                    abort_handles: Default::default(),
1932                    surb_mgmt: Default::default(),
1933                    surb_estimator: Default::default(),
1934                    allocated_tag: None,
1935                },
1936            )
1937            .await;
1938
1939        let mut sequence = mockall::Sequence::new();
1940        let mut alice_transport = MockMsgSender::new();
1941        let mut bob_transport = MockMsgSender::new();
1942
1943        // Alice sends the StartSession message
1944        let bob_mgr_clone = bob_mgr.clone();
1945        alice_transport
1946            .expect_send_message()
1947            .once()
1948            .in_sequence(&mut sequence)
1949            .withf(move |peer, data| {
1950                msg_type(data, StartProtocolDiscriminants::StartSession)
1951                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
1952            })
1953            .returning(move |_, data| {
1954                let bob_mgr_clone = bob_mgr_clone.clone();
1955                Box::pin(async move {
1956                    bob_mgr_clone
1957                        .dispatch_message(
1958                            alice_pseudonym,
1959                            ApplicationDataIn {
1960                                data: data.data,
1961                                packet_info: Default::default(),
1962                            },
1963                        )
1964                        .await?;
1965                    Ok(())
1966                })
1967            });
1968
1969        // Bob sends the SessionError message
1970        let alice_mgr_clone = alice_mgr.clone();
1971        bob_transport
1972            .expect_send_message()
1973            .once()
1974            .in_sequence(&mut sequence)
1975            .withf(move |peer, data| {
1976                msg_type(data, StartProtocolDiscriminants::SessionError)
1977                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
1978            })
1979            .returning(move |_, data| {
1980                let alice_mgr_clone = alice_mgr_clone.clone();
1981                Box::pin(async move {
1982                    alice_mgr_clone
1983                        .dispatch_message(
1984                            alice_pseudonym,
1985                            ApplicationDataIn {
1986                                data: data.data,
1987                                packet_info: Default::default(),
1988                            },
1989                        )
1990                        .await?;
1991                    Ok(())
1992                })
1993            });
1994
1995        let mut jhs = Vec::new();
1996
1997        // Start Alice
1998        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
1999        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2000
2001        // Start Bob
2002        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2003        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2004
2005        let result = alice_mgr
2006            .new_session(
2007                bob_peer,
2008                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2009                SessionClientConfig {
2010                    capabilities: Capabilities::empty(),
2011                    pseudonym: alice_pseudonym.into(),
2012                    surb_management: None,
2013                    ..Default::default()
2014                },
2015            )
2016            .await;
2017
2018        assert!(
2019            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2020        );
2021
2022        Ok(())
2023    }
2024
2025    #[test_log::test(tokio::test)]
2026    async fn session_manager_should_not_allow_establish_session_when_maximum_number_of_session_is_reached()
2027    -> anyhow::Result<()> {
2028        let alice_pseudonym = HoprPseudonym::random();
2029        let bob_peer: Address = (&ChainKeypair::random()).into();
2030
2031        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2032        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator_with_session_capacity(1));
2033
2034        // Occupy the only free slot with tag 16
2035        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
2036        bob_mgr
2037            .sessions
2038            .insert(
2039                SessionId::new(16u64, alice_pseudonym),
2040                SessionSlot {
2041                    session_tx: Arc::new(dummy_tx),
2042                    routing_opts: DestinationRouting::Return(alice_pseudonym.into()),
2043                    abort_handles: Default::default(),
2044                    surb_mgmt: Default::default(),
2045                    surb_estimator: Default::default(),
2046                    allocated_tag: None,
2047                },
2048            )
2049            .await;
2050
2051        let mut sequence = mockall::Sequence::new();
2052        let mut alice_transport = MockMsgSender::new();
2053        let mut bob_transport = MockMsgSender::new();
2054
2055        // Alice sends the StartSession message
2056        let bob_mgr_clone = bob_mgr.clone();
2057        alice_transport
2058            .expect_send_message()
2059            .once()
2060            .in_sequence(&mut sequence)
2061            .withf(move |peer, data| {
2062                msg_type(data, StartProtocolDiscriminants::StartSession)
2063                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2064            })
2065            .returning(move |_, data| {
2066                let bob_mgr_clone = bob_mgr_clone.clone();
2067                Box::pin(async move {
2068                    bob_mgr_clone
2069                        .dispatch_message(
2070                            alice_pseudonym,
2071                            ApplicationDataIn {
2072                                data: data.data,
2073                                packet_info: Default::default(),
2074                            },
2075                        )
2076                        .await?;
2077                    Ok(())
2078                })
2079            });
2080
2081        // Bob sends the SessionError message
2082        let alice_mgr_clone = alice_mgr.clone();
2083        bob_transport
2084            .expect_send_message()
2085            .once()
2086            .in_sequence(&mut sequence)
2087            .withf(move |peer, data| {
2088                msg_type(data, StartProtocolDiscriminants::SessionError)
2089                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2090            })
2091            .returning(move |_, data| {
2092                let alice_mgr_clone = alice_mgr_clone.clone();
2093                Box::pin(async move {
2094                    alice_mgr_clone
2095                        .dispatch_message(
2096                            alice_pseudonym,
2097                            ApplicationDataIn {
2098                                data: data.data,
2099                                packet_info: Default::default(),
2100                            },
2101                        )
2102                        .await?;
2103                    Ok(())
2104                })
2105            });
2106
2107        let mut jhs = Vec::new();
2108
2109        // Start Alice
2110        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2111        jhs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2112
2113        // Start Bob
2114        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2115        jhs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2116
2117        let result = alice_mgr
2118            .new_session(
2119                bob_peer,
2120                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2121                SessionClientConfig {
2122                    capabilities: None.into(),
2123                    pseudonym: alice_pseudonym.into(),
2124                    surb_management: None,
2125                    ..Default::default()
2126                },
2127            )
2128            .await;
2129
2130        assert!(
2131            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
2132        );
2133
2134        Ok(())
2135    }
2136
2137    #[test_log::test(tokio::test)]
2138    async fn session_manager_should_not_allow_loopback_sessions() -> anyhow::Result<()> {
2139        let alice_pseudonym = HoprPseudonym::random();
2140        let bob_peer: Address = (&ChainKeypair::random()).into();
2141
2142        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2143
2144        let mut sequence = mockall::Sequence::new();
2145        let mut alice_transport = MockMsgSender::new();
2146
2147        // Alice sends the StartSession message
2148        let alice_mgr_clone = alice_mgr.clone();
2149        alice_transport
2150            .expect_send_message()
2151            .once()
2152            .in_sequence(&mut sequence)
2153            .withf(move |peer, data| {
2154                msg_type(data, StartProtocolDiscriminants::StartSession)
2155                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2156            })
2157            .returning(move |_, data| {
2158                // But the message is again processed by Alice due to Loopback
2159                let alice_mgr_clone = alice_mgr_clone.clone();
2160                Box::pin(async move {
2161                    alice_mgr_clone
2162                        .dispatch_message(
2163                            alice_pseudonym,
2164                            ApplicationDataIn {
2165                                data: data.data,
2166                                packet_info: Default::default(),
2167                            },
2168                        )
2169                        .await?;
2170                    Ok(())
2171                })
2172            });
2173
2174        // Alice sends the SessionEstablished message (as Bob)
2175        let alice_mgr_clone = alice_mgr.clone();
2176        alice_transport
2177            .expect_send_message()
2178            .once()
2179            .in_sequence(&mut sequence)
2180            .withf(move |peer, data| {
2181                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2182                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2183            })
2184            .returning(move |_, data| {
2185                let alice_mgr_clone = alice_mgr_clone.clone();
2186
2187                Box::pin(async move {
2188                    alice_mgr_clone
2189                        .dispatch_message(
2190                            alice_pseudonym,
2191                            ApplicationDataIn {
2192                                data: data.data,
2193                                packet_info: Default::default(),
2194                            },
2195                        )
2196                        .await?;
2197                    Ok(())
2198                })
2199            });
2200
2201        // Start Alice
2202        let (new_session_tx_alice, new_session_rx_alice) = futures::channel::mpsc::channel(1024);
2203        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2204
2205        let alice_session = alice_mgr
2206            .new_session(
2207                bob_peer,
2208                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2209                SessionClientConfig {
2210                    capabilities: None.into(),
2211                    pseudonym: alice_pseudonym.into(),
2212                    surb_management: None,
2213                    ..Default::default()
2214                },
2215            )
2216            .await;
2217
2218        println!("{alice_session:?}");
2219        assert!(matches!(
2220            alice_session,
2221            Err(TransportSessionError::Manager(SessionManagerError::Loopback))
2222        ));
2223
2224        drop(new_session_rx_alice);
2225        Ok(())
2226    }
2227
2228    #[test_log::test(tokio::test)]
2229    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
2230        let bob_peer: Address = (&ChainKeypair::random()).into();
2231
2232        let cfg = SessionManagerConfig {
2233            initiation_timeout_base: Duration::from_millis(100),
2234            ..Default::default()
2235        };
2236
2237        let alice_mgr = SessionManager::new(cfg, test_tag_allocator());
2238        let bob_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2239
2240        let mut sequence = mockall::Sequence::new();
2241        let mut alice_transport = MockMsgSender::new();
2242        let bob_transport = MockMsgSender::new();
2243
2244        // Alice sends the StartSession message, but Bob does not handle it
2245        alice_transport
2246            .expect_send_message()
2247            .once()
2248            .in_sequence(&mut sequence)
2249            .withf(move |peer, data| {
2250                msg_type(data, StartProtocolDiscriminants::StartSession)
2251                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2252            })
2253            .returning(|_, _| Box::pin(async { Ok(()) }));
2254
2255        // Start Alice
2256        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2257        alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?;
2258
2259        // Start Bob
2260        let (new_session_tx_bob, _) = futures::channel::mpsc::channel(1024);
2261        bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?;
2262
2263        let result = alice_mgr
2264            .new_session(
2265                bob_peer,
2266                SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
2267                SessionClientConfig {
2268                    capabilities: None.into(),
2269                    pseudonym: None,
2270                    surb_management: None,
2271                    ..Default::default()
2272                },
2273            )
2274            .await;
2275
2276        assert!(matches!(result, Err(TransportSessionError::Timeout)));
2277
2278        Ok(())
2279    }
2280
    #[cfg(feature = "telemetry")]
    #[test_log::test(tokio::test)]
    async fn failed_incoming_session_establishment_does_not_register_telemetry() -> anyhow::Result<()> {
        // Verifies that an incoming session whose hand-off to the application fails
        // (the new-session receiver is dropped below) does not register telemetry.
        let mgr = SessionManager::new(Default::default(), test_tag_allocator());

        let transport = MockMsgSender::new();
        let (new_session_tx, new_session_rx) = futures::channel::mpsc::channel(1);
        // Dropping the receiver makes forwarding the established session fail,
        // which is what forces handle_incoming_session_initiation to error out.
        drop(new_session_rx);
        mgr.start(mock_packet_planning(transport), new_session_tx)?;

        // Directly inject a session initiation as if it arrived from a remote peer.
        let pseudonym = HoprPseudonym::random();
        let result = mgr
            .handle_incoming_session_initiation(
                pseudonym,
                StartInitiation {
                    challenge: MIN_CHALLENGE,
                    target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
                    capabilities: ByteCapabilities(Capabilities::empty()),
                    additional_data: 0,
                },
            )
            .await;

        assert!(result.is_err());

        // NOTE(review): despite the failed establishment, one session slot is still
        // expected to be registered — presumably reclaimed later by expiry/cleanup,
        // while the telemetry side stays unregistered (not asserted explicitly here).
        // TODO confirm this is the intended post-failure state.
        let allocated_session_ids = mgr.active_sessions().await;
        assert_eq!(1, allocated_session_ids.len());

        Ok(())
    }
2311
2312    #[test_log::test(tokio::test)]
2313    async fn session_manager_should_send_keep_alives_via_surb_balancer() -> anyhow::Result<()> {
2314        let alice_pseudonym = HoprPseudonym::random();
2315        let bob_peer: Address = (&ChainKeypair::random()).into();
2316
2317        let bob_cfg = SessionManagerConfig {
2318            surb_balance_notify_period: Some(Duration::from_millis(500)),
2319            ..Default::default()
2320        };
2321        let alice_mgr = SessionManager::new(Default::default(), test_tag_allocator());
2322        let bob_mgr = SessionManager::new(bob_cfg.clone(), test_tag_allocator());
2323
2324        let mut alice_transport = MockMsgSender::new();
2325        let mut bob_transport = MockMsgSender::new();
2326
2327        // Alice sends the StartSession message
2328        let mut open_sequence = mockall::Sequence::new();
2329        let bob_mgr_clone = bob_mgr.clone();
2330        alice_transport
2331            .expect_send_message()
2332            .once()
2333            .in_sequence(&mut open_sequence)
2334            .withf(move |peer, data| {
2335                msg_type(data, StartProtocolDiscriminants::StartSession)
2336                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2337            })
2338            .returning(move |_, data| {
2339                let bob_mgr_clone = bob_mgr_clone.clone();
2340                Box::pin(async move {
2341                    bob_mgr_clone
2342                        .dispatch_message(
2343                            alice_pseudonym,
2344                            ApplicationDataIn {
2345                                data: data.data,
2346                                packet_info: Default::default(),
2347                            },
2348                        )
2349                        .await?;
2350                    Ok(())
2351                })
2352            });
2353
2354        // Bob sends the SessionEstablished message
2355        let alice_mgr_clone = alice_mgr.clone();
2356        bob_transport
2357            .expect_send_message()
2358            .once()
2359            .in_sequence(&mut open_sequence)
2360            .withf(move |peer, data| {
2361                msg_type(data, StartProtocolDiscriminants::SessionEstablished)
2362                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2363            })
2364            .returning(move |_, data| {
2365                let alice_mgr_clone = alice_mgr_clone.clone();
2366                Box::pin(async move {
2367                    alice_mgr_clone
2368                        .dispatch_message(
2369                            alice_pseudonym,
2370                            ApplicationDataIn {
2371                                data: data.data,
2372                                packet_info: Default::default(),
2373                            },
2374                        )
2375                        .await?;
2376                    Ok(())
2377                })
2378            });
2379
2380        const INITIAL_BALANCER_TARGET: u64 = 10;
2381
2382        // Alice sends the KeepAlive messages reporting the initial balancer target
2383        let bob_mgr_clone = bob_mgr.clone();
2384        alice_transport
2385            .expect_send_message()
2386            .times(5..)
2387            //.in_sequence(&mut sequence)
2388            .withf(move |peer, data| {
2389                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == INITIAL_BALANCER_TARGET))
2390                //msg_type(data, StartProtocolDiscriminants::KeepAlive)
2391                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2392            })
2393            .returning(move |_, data| {
2394                let bob_mgr_clone = bob_mgr_clone.clone();
2395                Box::pin(async move {
2396                    bob_mgr_clone
2397                        .dispatch_message(
2398                            alice_pseudonym,
2399                            ApplicationDataIn {
2400                                data: data.data,
2401                                packet_info: Default::default(),
2402                            },
2403                        )
2404                        .await?;
2405                    Ok(())
2406                })
2407            });
2408
2409        const NEXT_BALANCER_TARGET: u64 = 50;
2410
2411        // Alice sends also the KeepAlive messages reporting the updated balancer target
2412        let bob_mgr_clone = bob_mgr.clone();
2413        alice_transport
2414            .expect_send_message()
2415            .times(5..)
2416            //.in_sequence(&mut sequence)
2417            .withf(move |peer, data| {
2418                start_msg_match(data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerTarget) && ka.additional_data == NEXT_BALANCER_TARGET))
2419                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2420            })
2421            .returning(move |_, data| {
2422                let bob_mgr_clone = bob_mgr_clone.clone();
2423                Box::pin(async move {
2424                    bob_mgr_clone
2425                        .dispatch_message(
2426                            alice_pseudonym,
2427                            ApplicationDataIn {
2428                                data: data.data,
2429                                packet_info: Default::default(),
2430                            },
2431                        )
2432                        .await?;
2433                    Ok(())
2434                })
2435            });
2436
2437        // Bob sends at least 1 Keep Alive back reporting its SURB estimate
2438        let alice_mgr_clone = alice_mgr.clone();
2439        bob_transport
2440            .expect_send_message()
2441            .times(1..)
2442            //.in_sequence(&mut open_sequence)
2443            .withf(move |peer, data| {
2444                start_msg_match(&data, |msg| matches!(msg, StartProtocol::KeepAlive(ka) if ka.flags.contains(KeepAliveFlag::BalancerState) && ka.additional_data > 0))
2445                    && matches!(peer, DestinationRouting::Return(SurbMatcher::Pseudonym(p)) if p == &alice_pseudonym)
2446            })
2447            .returning(move |_, data| {
2448                let alice_mgr_clone = alice_mgr_clone.clone();
2449                Box::pin(async move {
2450                    alice_mgr_clone
2451                        .dispatch_message(
2452                            alice_pseudonym,
2453                            ApplicationDataIn {
2454                                data: data.data,
2455                                packet_info: Default::default(),
2456                            },
2457                        )
2458                        .await?;
2459                    Ok(())
2460                })
2461            });
2462
2463        // Alice sends the terminating segment to close the Session
2464        let bob_mgr_clone = bob_mgr.clone();
2465        alice_transport
2466            .expect_send_message()
2467            .once()
2468            //.in_sequence(&mut sequence)
2469            .withf(move |peer, data| {
2470                hopr_protocol_session::types::SessionMessage::<{ ApplicationData::PAYLOAD_SIZE }>::try_from(
2471                    data.data.plain_text.as_ref(),
2472                )
2473                .ok()
2474                .and_then(|m| m.try_as_segment())
2475                .map(|s| s.is_terminating())
2476                .unwrap_or(false)
2477                    && matches!(peer, DestinationRouting::Forward { destination, .. } if destination.as_ref() == &bob_peer.into())
2478            })
2479            .returning(move |_, data| {
2480                let bob_mgr_clone = bob_mgr_clone.clone();
2481                Box::pin(async move {
2482                    bob_mgr_clone
2483                        .dispatch_message(
2484                            alice_pseudonym,
2485                            ApplicationDataIn {
2486                                data: data.data,
2487                                packet_info: Default::default(),
2488                            },
2489                        )
2490                        .await?;
2491                    Ok(())
2492                })
2493            });
2494
2495        let mut ahs = Vec::new();
2496
2497        // Start Alice
2498        let (new_session_tx_alice, _) = futures::channel::mpsc::channel(1024);
2499        ahs.extend(alice_mgr.start(mock_packet_planning(alice_transport), new_session_tx_alice)?);
2500
2501        // Start Bob
2502        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::channel(1024);
2503        ahs.extend(bob_mgr.start(mock_packet_planning(bob_transport), new_session_tx_bob)?);
2504
2505        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
2506
2507        let balancer_cfg = SurbBalancerConfig {
2508            target_surb_buffer_size: INITIAL_BALANCER_TARGET,
2509            max_surbs_per_sec: 100,
2510            ..Default::default()
2511        };
2512
2513        pin_mut!(new_session_rx_bob);
2514        let (alice_session, bob_session) = timeout(
2515            Duration::from_secs(2),
2516            futures::future::join(
2517                alice_mgr.new_session(
2518                    bob_peer,
2519                    SessionTarget::TcpStream(target.clone()),
2520                    SessionClientConfig {
2521                        pseudonym: alice_pseudonym.into(),
2522                        capabilities: Capability::Segmentation.into(),
2523                        surb_management: Some(balancer_cfg),
2524                        ..Default::default()
2525                    },
2526                ),
2527                new_session_rx_bob.next(),
2528            ),
2529        )
2530        .await?;
2531
2532        let mut alice_session = alice_session?;
2533        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
2534
2535        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
2536
2537        assert_eq!(
2538            Some(balancer_cfg),
2539            alice_mgr.get_surb_balancer_config(alice_session.id()).await?
2540        );
2541
2542        let remote_cfg = bob_mgr
2543            .get_surb_balancer_config(bob_session.session.id())
2544            .await?
2545            .ok_or(anyhow!("no remote config at bob"))?;
2546        assert_eq!(remote_cfg.target_surb_buffer_size, balancer_cfg.target_surb_buffer_size);
2547        assert_eq!(
2548            remote_cfg.max_surbs_per_sec,
2549            remote_cfg.target_surb_buffer_size
2550                / bob_cfg
2551                    .minimum_surb_buffer_duration
2552                    .max(MIN_SURB_BUFFER_DURATION)
2553                    .as_secs()
2554        );
2555
2556        // Let the Surb balancer send enough KeepAlive messages
2557        tokio::time::sleep(Duration::from_millis(1500)).await;
2558
2559        let new_balancer_cfg = SurbBalancerConfig {
2560            target_surb_buffer_size: NEXT_BALANCER_TARGET,
2561            max_surbs_per_sec: 100,
2562            ..Default::default()
2563        };
2564
2565        // Update to a higher target
2566        alice_mgr
2567            .update_surb_balancer_config(alice_session.id(), new_balancer_cfg)
2568            .await?;
2569
2570        // Let the Surb balancer send enough KeepAlive messages
2571        tokio::time::sleep(Duration::from_millis(1500)).await;
2572
2573        // Bob should know about the updated target
2574        let remote_cfg = bob_mgr
2575            .get_surb_balancer_config(bob_session.session.id())
2576            .await?
2577            .ok_or(anyhow!("no remote config at bob"))?;
2578        assert_eq!(
2579            remote_cfg.target_surb_buffer_size,
2580            new_balancer_cfg.target_surb_buffer_size
2581        );
2582        assert_eq!(
2583            remote_cfg.max_surbs_per_sec,
2584            new_balancer_cfg.target_surb_buffer_size / bob_cfg.minimum_surb_buffer_duration.as_secs()
2585        );
2586
2587        let (alice_surb_sent, alice_surb_used) = alice_mgr.get_surb_level_estimates(alice_session.id()).await?;
2588        let (bob_surb_recv, bob_surb_used) = bob_mgr.get_surb_level_estimates(bob_session.session.id()).await?;
2589
2590        alice_session.close().await?;
2591
2592        assert!(alice_surb_sent > 0, "alice must've sent surbs");
2593        assert!(bob_surb_recv > 0, "bob must've received surbs");
2594        assert!(
2595            bob_surb_recv <= alice_surb_sent,
2596            "bob cannot receive more surbs than alice sent"
2597        );
2598
2599        assert!(alice_surb_used > 0, "alice must see bob used surbs");
2600        assert!(bob_surb_used > 0, "bob must've used surbs");
2601        assert!(
2602            alice_surb_used <= bob_surb_used,
2603            "alice cannot see bob used more surbs than bob actually used"
2604        );
2605
2606        tokio::time::sleep(Duration::from_millis(300)).await;
2607        assert!(matches!(
2608            alice_mgr.ping_session(alice_session.id()).await,
2609            Err(TransportSessionError::Manager(SessionManagerError::NonExistingSession))
2610        ));
2611
2612        futures::stream::iter(ahs)
2613            .for_each(|ah| async move { ah.abort() })
2614            .await;
2615
2616        Ok(())
2617    }
2618}