hopr_transport_session/
manager.rs

1use futures::channel::mpsc::UnboundedSender;
2use futures::future::Either;
3use futures::{pin_mut, FutureExt, StreamExt, TryStreamExt};
4use hopr_internal_types::prelude::{ApplicationData, Tag};
5use hopr_network_types::prelude::*;
6use std::ops::Range;
7use std::sync::{Arc, OnceLock};
8use std::time::Duration;
9use tracing::{debug, error, info, trace, warn};
10
11use crate::errors::{SessionManagerError, TransportSessionError};
12use crate::initiation::{
13    StartChallenge, StartErrorReason, StartErrorType, StartEstablished, StartInitiation, StartProtocol,
14};
15use crate::traits::SendMsg;
16use crate::types::unwrap_offchain_key;
17use crate::{IncomingSession, Session, SessionClientConfig, SessionId};
18
// Prometheus metrics describing the Session lifecycle.
// They are compiled in only when the `prometheus` feature is enabled and the crate is not built for tests;
// every use site in this file is guarded by the same `cfg` condition.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Gauge of Sessions currently held by the manager: reset to zero in `SessionManager::new`,
    // incremented when a Session is initiated or established, decremented on closure or eviction.
    static ref METRIC_ACTIVE_SESSIONS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_session_num_active_sessions",
        "Number of currently active HOPR sessions"
    ).unwrap();
    // Counter incremented after an incoming Session request has been accepted and confirmed.
    static ref METRIC_NUM_ESTABLISHED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_established_sessions_count",
        "Number of sessions that were successfully established as an Exit node"
    ).unwrap();
    // Counter incremented after an outgoing Session has been confirmed by the counterparty.
    static ref METRIC_NUM_INITIATED_SESSIONS: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_session_initiated_sessions_count",
        "Number of sessions that were successfully initiated as an Entry node"
    ).unwrap();
    // Counter of initiation failures seen by us, labelled by error kind (including "timeout").
    static ref METRIC_RECEIVED_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_received_error_count",
        "Number of HOPR session errors received from an Exit node",
        &["kind"]
    ).unwrap();
    // Counter of Session errors we reported back to the initiating party, labelled by error kind.
    static ref METRIC_SENT_SESSION_ERRS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_session_sent_error_count",
        "Number of HOPR session errors sent to an Entry node",
        &["kind"]
    ).unwrap();
}
44
/// Configuration for the [`SessionManager`].
#[derive(Clone, Debug, PartialEq, Eq, smart_default::SmartDefault)]
pub struct SessionManagerConfig {
    /// Range of tags available for Sessions.
    ///
    /// **NOTE**: If the range starts lower than [`MIN_SESSION_TAG_RANGE_RESERVATION`],
    /// it is automatically shifted upwards so that it starts at this value.
    /// This is because the tags below that value are reserved for the Start sub-protocol.
    ///
    /// Default is 16..1024.
    #[default(_code = "16..1024")]
    pub session_tag_range: Range<Tag>,

    /// The base timeout for Session initiation.
    ///
    /// The actual timeout is adjusted according to the number of hops for that Session:
    /// `t = 2 * initiation_timeout_base * (num_hops + 1)`
    ///
    /// Default is 5 seconds.
    #[default(Duration::from_secs(5))]
    pub initiation_timeout_base: Duration,

    /// Period of inactivity after which a Session is closed.
    ///
    /// Default is 180 seconds.
    #[default(Duration::from_secs(180))]
    pub idle_timeout: Duration,
}
73
74fn close_session_after_eviction<S: SendMsg + Send + Sync + 'static>(
75    msg_sender: Arc<OnceLock<S>>,
76    me: PeerId,
77    session_id: SessionId,
78    session_data: CachedSession,
79    cause: moka::notification::RemovalCause,
80) -> moka::notification::ListenerFuture {
81    // When a Session is removed from the cache, we notify the other party only
82    // if this removal was due to expiration or cache size limit.
83    match cause {
84        r @ moka::notification::RemovalCause::Expired | r @ moka::notification::RemovalCause::Size
85            if msg_sender.get().is_some() =>
86        {
87            trace!(
88                ?session_id,
89                reason = ?r,
90                "session termination due to eviction from the cache"
91            );
92            let data = match ApplicationData::try_from(StartProtocol::CloseSession(session_id.with_peer(me))) {
93                Ok(data) => data,
94                Err(e) => {
95                    error!(
96                        ?session_id,
97                        error = %e,
98                        "failed to serialize CloseSession"
99                    );
100                    return futures::future::ready(()).boxed();
101                }
102            };
103
104            let msg_sender = msg_sender.clone();
105            async move {
106                // Unwrap cannot panic, since the value's existence is checked on L72
107                if let Err(err) = msg_sender
108                    .get()
109                    .unwrap()
110                    .send_message(data, *session_id.peer(), session_data.routing_opts)
111                    .await
112                {
113                    error!(
114                        ?session_id,
115                        error = %err,
116                        "could not send notification of session closure after cache eviction"
117                    );
118                }
119
120                session_data.session_tx.close_channel();
121                debug!(
122                    ?session_id,
123                    reason = ?r,
124                    "session has been closed due to cache eviction"
125                );
126
127                #[cfg(all(feature = "prometheus", not(test)))]
128                METRIC_ACTIVE_SESSIONS.decrement(1.0);
129            }
130            .boxed()
131        }
132        _ => futures::future::ready(()).boxed(),
133    }
134}
135
136/// This function will use the given generator to generate an initial seeding key.
137/// It will check whether the given cache already contains a value for that key, and if not,
138/// calls the generator (with the previous value) to generate a new seeding key and retry.
139/// The function either finds a suitable free slot, inserting the `value` and returns the found key,
140/// or terminates with `None` when `gen` returns the initial seed again.
141async fn insert_into_next_slot<K, V, F>(cache: &moka::future::Cache<K, V>, gen: F, value: V) -> Option<K>
142where
143    K: Copy + std::hash::Hash + Eq + Send + Sync + 'static,
144    V: Clone + Send + Sync + 'static,
145    F: Fn(Option<K>) -> K,
146{
147    let initial = gen(None);
148    let mut next = initial;
149    loop {
150        let insertion_result = cache
151            .entry(next)
152            .and_try_compute_with(|e| {
153                if e.is_none() {
154                    futures::future::ok::<_, ()>(moka::ops::compute::Op::Put(value.clone()))
155                } else {
156                    futures::future::ok::<_, ()>(moka::ops::compute::Op::Nop)
157                }
158            })
159            .await;
160
161        // If we inserted successfully, break the loop and return the insertion key
162        if let Ok(moka::ops::compute::CompResult::Inserted(_)) = insertion_result {
163            return Some(next);
164        }
165
166        // Otherwise, generate a next key
167        next = gen(Some(next));
168
169        // If generated keys made it to full loop, return failure
170        if next == initial {
171            return None;
172        }
173    }
174}
175
/// The lowest challenge value used in the Start protocol to initiate a session.
///
/// Challenges generated in [`SessionManager::new_session`] are never lower than this value.
pub(crate) const MIN_CHALLENGE: StartChallenge = 1;
178
// Cache of pending outgoing Session initiations, keyed by the challenge sent to the counterparty.
// The value is the channel over which the initiation outcome is handed back to the waiting initiator.
//
// An UnboundedSender must be used instead of a oneshot Sender,
// because the Moka cache requires the value to be Clone, which the oneshot Sender is not.
// The oneshot Sender also cannot be enclosed in an Arc, since calling `send` consumes it.
type SessionInitiationCache =
    moka::future::Cache<StartChallenge, UnboundedSender<Result<StartEstablished<SessionId>, StartErrorType>>>;
184
/// Per-Session data kept in the Session cache of the [`SessionManager`].
#[derive(Clone)]
struct CachedSession {
    // Sender of incoming data into the Session.
    // The sender needs to be put in an Arc, so that no clones of it are made by `moka`.
    // This makes sure that the entire channel closes once the one and only sender is closed.
    session_tx: Arc<UnboundedSender<Box<[u8]>>>,
    // Routing used when sending Start protocol messages about this Session to the counterparty.
    routing_opts: RoutingOptions,
}
192
/// Indicates the result of processing a message in [`SessionManager::dispatch_message`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchResult {
    /// Session or Start protocol message has been processed successfully.
    Processed,
    /// The message was not related to the Start or Session protocol.
    /// The original message is handed back to the caller unchanged.
    Unrelated(ApplicationData),
}
201
/// Manages lifecycles of Sessions.
///
/// Once the manager is [started](SessionManager::start), the [`SessionManager::dispatch_message`]
/// should be called for each [`ApplicationData`] received by the node.
/// This way, the `SessionManager` takes care of proper processing of the Start sub-protocol
/// and correct dispatch of Session-related packets to individual existing Sessions.
///
/// Secondly, the manager can initiate new outgoing sessions via [`SessionManager::new_session`].
///
/// Since the `SessionManager` operates over the HOPR protocol,
/// the [message transport `S`](SendMsg) is required.
/// Such transport must also be `Clone`, since it will be cloned into the created [`Session`] objects.
pub struct SessionManager<S> {
    // Pending outgoing Session initiations, keyed by challenge.
    session_initiations: SessionInitiationCache,
    // Set in `start`: notifier of new incoming Sessions and notifier of Sessions closed by us.
    session_notifiers: Arc<OnceLock<(UnboundedSender<IncomingSession>, UnboundedSender<SessionId>)>>,
    // Currently open Sessions; entries are evicted after the configured idle timeout.
    sessions: moka::future::Cache<SessionId, CachedSession>,
    // Peer ID of this node.
    me: PeerId,
    // Message transport, set in `start`.
    msg_sender: Arc<OnceLock<S>>,
    cfg: SessionManagerConfig,
}
222
223impl<S> Clone for SessionManager<S> {
224    fn clone(&self) -> Self {
225        Self {
226            session_initiations: self.session_initiations.clone(),
227            session_notifiers: self.session_notifiers.clone(),
228            sessions: self.sessions.clone(),
229            me: self.me,
230            cfg: self.cfg.clone(),
231            msg_sender: self.msg_sender.clone(),
232        }
233    }
234}
235
/// Computes the Session initiation timeout as `2 * base * hops`.
///
/// The `hops` argument is used as the multiplier as-is; callers that want the
/// `num_hops + 1` scaling pass the already incremented value.
///
/// The computation saturates at [`Duration::MAX`] instead of panicking on overflow,
/// and a `hops` value that does not fit into `u32` is clamped instead of being truncated.
fn initiation_timeout_max(base: Duration, hops: usize) -> Duration {
    let hops = u32::try_from(hops).unwrap_or(u32::MAX);
    base.saturating_mul(2).saturating_mul(hops)
}
239
/// The minimum Session tag.
///
/// Tags lower than this value are reserved for Start protocol messages,
/// therefore the Session tag range never starts below it.
pub const MIN_SESSION_TAG_RANGE_RESERVATION: Tag = 16;
242
243impl<S: SendMsg + Clone + Send + Sync + 'static> SessionManager<S> {
244    /// Creates a new instance given the `PeerId` and [config](SessionManagerConfig).
245    pub fn new(me: PeerId, mut cfg: SessionManagerConfig) -> Self {
246        // Accommodate the lower bound if too low.
247        if cfg.session_tag_range.start < MIN_SESSION_TAG_RANGE_RESERVATION {
248            let diff = MIN_SESSION_TAG_RANGE_RESERVATION - cfg.session_tag_range.start;
249            cfg.session_tag_range = MIN_SESSION_TAG_RANGE_RESERVATION..cfg.session_tag_range.end + diff;
250        }
251
252        #[cfg(all(feature = "prometheus", not(test)))]
253        METRIC_ACTIVE_SESSIONS.set(0.0);
254
255        let msg_sender = Arc::new(OnceLock::new());
256        Self {
257            msg_sender: msg_sender.clone(),
258            session_initiations: moka::future::Cache::builder()
259                .max_capacity(cfg.session_tag_range.clone().count() as u64)
260                .time_to_live(initiation_timeout_max(
261                    cfg.initiation_timeout_base,
262                    RoutingOptions::MAX_INTERMEDIATE_HOPS,
263                ))
264                .build(),
265            sessions: moka::future::Cache::builder()
266                .max_capacity(u16::MAX as u64)
267                .time_to_idle(cfg.idle_timeout)
268                .async_eviction_listener(move |k, v, c| {
269                    let msg_sender = msg_sender.clone();
270                    close_session_after_eviction(msg_sender, me, *k, v, c)
271                })
272                .build(),
273            session_notifiers: Arc::new(OnceLock::new()),
274            me,
275            cfg,
276        }
277    }
278
279    /// Starts the instance with the given [transport](SendMsg) implementation
280    /// and a channel that is used to notify when new incoming session is opened to us.
281    ///
282    /// This method must be called prior to any calls to [`SessionManager::new_session`] or
283    /// [`SessionManager::dispatch_message`].
284    pub fn start(
285        &self,
286        msg_sender: S,
287        new_session_notifier: UnboundedSender<IncomingSession>,
288    ) -> crate::errors::Result<Vec<hopr_async_runtime::prelude::JoinHandle<()>>> {
289        self.msg_sender
290            .set(msg_sender)
291            .map_err(|_| SessionManagerError::AlreadyStarted)?;
292
293        let (session_close_tx, session_close_rx) = futures::channel::mpsc::unbounded();
294        self.session_notifiers
295            .set((new_session_notifier, session_close_tx))
296            .map_err(|_| SessionManagerError::AlreadyStarted)?;
297
298        let myself = self.clone();
299        let jh_closure_notifications =
300            hopr_async_runtime::prelude::spawn(session_close_rx.for_each_concurrent(None, move |closed_session_id| {
301                let myself = myself.clone();
302                async move {
303                    trace!(
304                        session_id = ?closed_session_id,
305                        "sending notification of session closure done by us"
306                    );
307                    match myself.close_session(closed_session_id, true).await {
308                        Ok(closed) if closed => debug!(
309                            session_id = ?closed_session_id,
310                            "session has been closed by us"
311                        ),
312                        Err(e) => error!(
313                            session_id = ?closed_session_id,
314                            error = %e,
315                            "cannot initiate session closure notification"
316                        ),
317                        _ => {}
318                    }
319                }
320            }));
321
322        // This is necessary to evict expired entries from the caches if
323        // no session-related operations happen at all.
324        // This ensures the dangling expired sessions are properly closed
325        // and their closure is timely notified to the other party.
326        let myself = self.clone();
327        let jh_session_expiration = hopr_async_runtime::prelude::spawn(async move {
328            let jitter = hopr_crypto_random::random_float_in_range(1.0..1.5);
329            let timeout = initiation_timeout_max(
330                myself.cfg.initiation_timeout_base,
331                RoutingOptions::MAX_INTERMEDIATE_HOPS,
332            )
333            .min(myself.cfg.idle_timeout)
334            .mul_f64(jitter)
335                / 2;
336            loop {
337                hopr_async_runtime::prelude::sleep(timeout).await;
338                trace!("executing session cache evictions");
339                futures::join!(
340                    myself.sessions.run_pending_tasks(),
341                    myself.session_initiations.run_pending_tasks()
342                );
343            }
344        });
345
346        Ok(vec![jh_closure_notifications, jh_session_expiration])
347    }
348
349    /// Check if [`start`](SessionManager::start) has been called and the instance is running.
350    pub fn is_started(&self) -> bool {
351        self.session_notifiers.get().is_some()
352    }
353
354    /// Initiates a new outgoing Session with the given configuration.
355    ///
356    /// If the Session's counterparty does not respond within
357    /// the [configured](SessionManagerConfig) period,
358    /// this method returns [`TransportSessionError::Timeout`].
359    ///
360    /// It will also fail if the instance has not been [started](SessionManager::start).
361    pub async fn new_session(&self, cfg: SessionClientConfig) -> crate::errors::Result<Session> {
362        let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;
363
364        let (tx_initiation_done, rx_initiation_done) = futures::channel::mpsc::unbounded();
365        let challenge = insert_into_next_slot(
366            &self.session_initiations,
367            |ch| {
368                if let Some(challenge) = ch {
369                    ((challenge + 1) % hopr_crypto_random::MAX_RANDOM_INTEGER).max(MIN_CHALLENGE)
370                } else {
371                    hopr_crypto_random::random_integer(MIN_CHALLENGE, None)
372                }
373            },
374            tx_initiation_done,
375        )
376        .await
377        .ok_or(SessionManagerError::NoChallengeSlots)?; // almost impossible with u64
378
379        // Prepare the session initiation message in the Start protocol
380        trace!(challenge, ?cfg, "initiating session with config");
381        let start_session_msg = StartProtocol::<SessionId>::StartSession(StartInitiation {
382            challenge,
383            target: cfg.target,
384            capabilities: cfg.capabilities.iter().copied().collect(),
385            // Back-routing currently uses the same (inverted) route as session initiation
386            back_routing: Some((cfg.path_options.clone().invert(), self.me)),
387        });
388
389        // Send the Session initiation message
390        trace!(challenge, "sending new session request");
391        msg_sender
392            .send_message(start_session_msg.try_into()?, cfg.peer, cfg.path_options.clone())
393            .await?;
394
395        // Await session establishment response from the Exit node or timeout
396        pin_mut!(rx_initiation_done);
397        let initiation_done = TryStreamExt::try_next(&mut rx_initiation_done);
398
399        // The timeout is given by the number of hops requested
400        let timeout = hopr_async_runtime::prelude::sleep(initiation_timeout_max(
401            self.cfg.initiation_timeout_base,
402            cfg.path_options.count_hops() + 1,
403        ));
404        pin_mut!(timeout);
405
406        trace!(challenge, "awaiting session establishment");
407        match futures::future::select(initiation_done, timeout).await {
408            Either::Left((Ok(Some(est)), _)) => {
409                // Session has been established, construct it
410                let session_id = est.session_id;
411                debug!(challenge = est.orig_challenge, ?session_id, "started a new session");
412
413                // Insert the Session object, forcibly overwrite any other session with the same ID
414                let (tx, rx) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
415                self.sessions
416                    .insert(
417                        session_id,
418                        CachedSession {
419                            session_tx: Arc::new(tx),
420                            routing_opts: cfg.path_options.clone(),
421                        },
422                    )
423                    .await;
424
425                #[cfg(all(feature = "prometheus", not(test)))]
426                {
427                    METRIC_NUM_INITIATED_SESSIONS.increment();
428                    METRIC_ACTIVE_SESSIONS.increment(1.0);
429                }
430
431                Ok(Session::new(
432                    session_id,
433                    self.me,
434                    cfg.path_options,
435                    cfg.capabilities.into_iter().collect(),
436                    Arc::new(msg_sender.clone()),
437                    rx,
438                    self.session_notifiers.get().map(|(_, c)| c.clone()),
439                ))
440            }
441            Either::Left((Ok(None), _)) => Err(SessionManagerError::Other(
442                "internal error: sender has been closed without completing the session establishment".into(),
443            )
444            .into()),
445            Either::Left((Err(e), _)) => {
446                // The other side didn't allow us to establish a session
447                error!(
448                    challenge = e.challenge,
449                    error = ?e,
450                    "the other party rejected the session initiation with error"
451                );
452                Err(TransportSessionError::Rejected(e.reason))
453            }
454            Either::Right(_) => {
455                // Timeout waiting for a session establishment
456                error!(challenge, "session initiation attempt timed out");
457
458                #[cfg(all(feature = "prometheus", not(test)))]
459                METRIC_RECEIVED_SESSION_ERRS.increment(&["timeout"]);
460
461                Err(TransportSessionError::Timeout)
462            }
463        }
464    }
465
466    /// Main method to be called whenever data are received.
467    ///
468    /// It tries to recognize the message and correctly dispatches either
469    /// the Session protocol or Start protocol messages.
470    ///
471    /// If the data are not recognized, they are returned as [`DispatchResult::Unrelated`].
472    pub async fn dispatch_message(&self, data: ApplicationData) -> crate::errors::Result<DispatchResult> {
473        if let Some(app_tag) = &data.application_tag {
474            if (0..self.cfg.session_tag_range.start).contains(app_tag) {
475                trace!(tag = app_tag, "dispatching Start protocol message");
476                return self
477                    .handle_start_protocol_message(data)
478                    .await
479                    .map(|_| DispatchResult::Processed);
480            } else if self.cfg.session_tag_range.contains(app_tag) {
481                let (peer, data) = unwrap_offchain_key(data.plain_text.clone())?;
482
483                let session_id = SessionId::new(*app_tag, peer);
484
485                return if let Some(session_data) = self.sessions.get(&session_id).await {
486                    trace!(?session_id, "received data for a registered session");
487
488                    Ok(session_data
489                        .session_tx
490                        .unbounded_send(data)
491                        .map(|_| DispatchResult::Processed)
492                        .map_err(|e| SessionManagerError::Other(e.to_string()))?)
493                } else {
494                    error!(%session_id, "received data from an unestablished session");
495                    Err(TransportSessionError::UnknownData)
496                };
497            }
498        }
499
500        Ok(DispatchResult::Unrelated(data))
501    }
502
    /// Processes a single Start protocol message.
    ///
    /// - `StartSession`: a request of another party to open a Session to us. A free Session slot
    ///   is searched for; if found, the Session is announced via the new Session notifier and
    ///   confirmed to the requester, otherwise an error is sent back to the requester.
    /// - `SessionEstablished`: a confirmation of a Session initiated by us, handed over
    ///   to the initiation awaiting it.
    /// - `SessionError`: a rejection of a Session initiated by us, handed over
    ///   to the initiation awaiting it.
    /// - `CloseSession`: a request of the other party to close an existing Session.
    ///
    /// Fails if the message cannot be parsed, if the instance has not been started,
    /// if back-routing information is missing in a `StartSession` request,
    /// or if a response could not be sent or handed over.
    async fn handle_start_protocol_message(&self, data: ApplicationData) -> crate::errors::Result<()> {
        match StartProtocol::<SessionId>::try_from(data)? {
            StartProtocol::StartSession(session_req) => {
                trace!(challenge = session_req.challenge, "received session initiation request");

                // Back-routing information is mandatory until the Return Path is introduced
                let (route, peer) = session_req.back_routing.ok_or(SessionManagerError::NoBackRoutingInfo)?;

                debug!(%peer, "got new session request, searching for a free session slot");

                let msg_sender = self.msg_sender.get().ok_or(SessionManagerError::NotStarted)?;

                let (new_session_notifier, close_session_notifier) = self
                    .session_notifiers
                    .get()
                    .cloned()
                    .ok_or(SessionManagerError::NotStarted)?;

                // Construct the session
                let (tx_session_data, rx_session_data) = futures::channel::mpsc::unbounded::<Box<[u8]>>();
                // The search starts at a random tag from the configured range and continues
                // with consecutive tags, wrapping around at the end of the range.
                if let Some(session_id) = insert_into_next_slot(
                    &self.sessions,
                    |sid| {
                        let next_tag = match sid {
                            Some(session_id) => ((session_id.tag() + 1) % self.cfg.session_tag_range.end)
                                .max(self.cfg.session_tag_range.start),
                            None => hopr_crypto_random::random_integer(
                                self.cfg.session_tag_range.start as u64,
                                Some(self.cfg.session_tag_range.end as u64),
                            ) as u16,
                        };
                        SessionId::new(next_tag, peer)
                    },
                    CachedSession {
                        session_tx: Arc::new(tx_session_data),
                        routing_opts: route.clone(),
                    },
                )
                .await
                {
                    debug!(?session_id, "assigning a new session");

                    let session = Session::new(
                        session_id,
                        self.me,
                        route.clone(),
                        session_req.capabilities,
                        Arc::new(msg_sender.clone()),
                        rx_session_data,
                        close_session_notifier.into(),
                    );

                    // Extract useful information about the session from the Start protocol message
                    let incoming_session = IncomingSession {
                        session,
                        target: session_req.target,
                    };

                    // Notify that a new incoming session has been created
                    if let Err(e) = new_session_notifier.unbounded_send(incoming_session) {
                        warn!(error = %e, "failed to send session to incoming session queue");
                    }

                    trace!(?session_id, "session notification sent");

                    // Notify the sender that the session has been established.
                    // Set our peer ID in the session ID sent back to them.
                    let data = StartProtocol::SessionEstablished(StartEstablished {
                        orig_challenge: session_req.challenge,
                        session_id: session_id.with_peer(self.me),
                    });

                    // NOTE(review): if this send fails, the Session inserted above stays in the cache
                    // (presumably until it is evicted) and the metrics below are not updated — confirm this is intended.
                    msg_sender
                        .send_message(data.try_into()?, peer, route)
                        .await
                        .map_err(|e| {
                            SessionManagerError::Other(format!("failed to send session establishment message: {e}"))
                        })?;

                    info!(%session_id, "new session established");

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        METRIC_NUM_ESTABLISHED_SESSIONS.increment();
                        METRIC_ACTIVE_SESSIONS.increment(1.0);
                    }
                } else {
                    error!(
                        %peer,
                        "failed to reserve a new session slot"
                    );

                    // Notify the sender that the session could not be established
                    let reason = StartErrorReason::NoSlotsAvailable;
                    let data = StartProtocol::<SessionId>::SessionError(StartErrorType {
                        challenge: session_req.challenge,
                        reason,
                    });

                    msg_sender
                        .send_message(data.try_into()?, peer, route)
                        .await
                        .map_err(|e| {
                            SessionManagerError::Other(format!(
                                "failed to send session establishment error message: {e}"
                            ))
                        })?;

                    trace!(%peer, "session establishment failure message sent");

                    #[cfg(all(feature = "prometheus", not(test)))]
                    METRIC_SENT_SESSION_ERRS.increment(&[&reason.to_string()])
                }
            }
            StartProtocol::SessionEstablished(est) => {
                trace!(
                    session_id = ?est.session_id,
                    "received session establishment confirmation"
                );
                let challenge = est.orig_challenge;
                // The pending initiation is removed from the cache, so each challenge is answered at most once
                if let Some(tx_est) = self.session_initiations.remove(&est.orig_challenge).await {
                    if let Err(e) = tx_est.unbounded_send(Ok(est)) {
                        return Err(
                            SessionManagerError::Other(format!("could not notify session establishment: {e}")).into(),
                        );
                    }
                    debug!(challenge, "session establishment complete");
                } else {
                    // No pending initiation is registered under this challenge anymore
                    error!(challenge, "session establishment attempt expired");
                }
            }
            StartProtocol::SessionError(err) => {
                trace!(
                    challenge = err.challenge,
                    error = ?err.reason,
                    "failed to initialize a session",
                );
                // Currently, we do not distinguish between individual error types
                // and just discard the initiation attempt and pass on the error.
                if let Some(tx_est) = self.session_initiations.remove(&err.challenge).await {
                    if let Err(e) = tx_est.unbounded_send(Err(err)) {
                        return Err(SessionManagerError::Other(format!(
                            "could not notify session establishment error {err:?}: {e}"
                        ))
                        .into());
                    }
                    error!(
                        challenge = err.challenge,
                        error = ?err,
                        "session establishment error received"
                    );
                } else {
                    error!(
                        challenge = err.challenge,
                        error = ?err,
                        "session establishment attempt expired before error could be delivered"
                    );
                }

                #[cfg(all(feature = "prometheus", not(test)))]
                METRIC_RECEIVED_SESSION_ERRS.increment(&[&err.reason.to_string()])
            }
            StartProtocol::CloseSession(session_id) => {
                trace!(?session_id, "received session close request");
                // The other party already knows about the closure, so no notification is sent back.
                // A failure to close is only logged and not propagated to the caller.
                match self.close_session(session_id, false).await {
                    Ok(closed) if closed => debug!(?session_id, "session has been closed by the other party"),
                    Err(e) => error!(
                        ?session_id,
                        error = %e,
                        "session could not be closed on other party's request"
                    ),
                    _ => {}
                }
            }
        }

        Ok(())
    }
681
682    async fn close_session(&self, session_id: SessionId, notify_closure: bool) -> crate::errors::Result<bool> {
683        if let Some(session_data) = self.sessions.remove(&session_id).await {
684            // Notification is not sent only when closing in response to the other party's request
685            if notify_closure {
686                trace!(?session_id, "sending session termination");
687                self.msg_sender
688                    .get()
689                    .ok_or(SessionManagerError::NotStarted)?
690                    .send_message(
691                        StartProtocol::CloseSession(session_id.with_peer(self.me)).try_into()?,
692                        *session_id.peer(),
693                        session_data.routing_opts,
694                    )
695                    .await?;
696            }
697
698            // Closing the data sender on the session will cause the Session to terminate
699            session_data.session_tx.close_channel();
700            trace!(?session_id, "data tx channel closed on session");
701
702            #[cfg(all(feature = "prometheus", not(test)))]
703            METRIC_ACTIVE_SESSIONS.decrement(1.0);
704            Ok(true)
705        } else {
706            // Do not treat this as an error
707            debug!(
708                ?session_id,
709                "could not find session id to close, maybe the session is already closed"
710            );
711            Ok(false)
712        }
713    }
714}
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719
720    use crate::types::SessionTarget;
721    use crate::Capability;
722    use anyhow::anyhow;
723    use async_std::prelude::FutureExt;
724    use async_trait::async_trait;
725    use futures::AsyncWriteExt;
726    use hopr_primitive_types::bounded::BoundedSize;
727
    // Mock of the message transport used by the tests below (available as `MockMsgSender`).
    // It mocks both `Clone` and `SendMsg`, so that the tests can set expectations
    // on the transport being cloned and on the individual `send_message` calls.
    mockall::mock! {
        MsgSender {}
        impl Clone for MsgSender {
            fn clone(&self) -> Self;
        }
        #[async_trait]
        impl SendMsg for MsgSender {
            async fn send_message(
                &self,
                data: ApplicationData,
                destination: PeerId,
                options: RoutingOptions,
            ) -> std::result::Result<(), TransportSessionError>;
        }
    }
743
744    #[async_std::test]
745    async fn test_insert_into_next_slot() -> anyhow::Result<()> {
746        let cache = moka::future::Cache::new(10);
747
748        for i in 0..5 {
749            let v = insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
750                .await
751                .ok_or(anyhow!("should insert"))?;
752            assert_eq!(v, i);
753            assert_eq!(Some("foo".to_string()), cache.get(&i).await);
754        }
755
756        assert!(
757            insert_into_next_slot(&cache, |prev| prev.map(|v| (v + 1) % 5).unwrap_or(0), "foo".to_string())
758                .await
759                .is_none(),
760            "must not find slot when full"
761        );
762
763        Ok(())
764    }
765
766    #[test_log::test(async_std::test)]
767    async fn session_manager_should_follow_start_protocol_to_establish_new_session_and_close_it() -> anyhow::Result<()>
768    {
769        let alice_peer = PeerId::random();
770        let bob_peer = PeerId::random();
771
772        let alice_mgr = SessionManager::new(alice_peer, Default::default());
773        let bob_mgr = SessionManager::new(bob_peer, Default::default());
774
775        let mut sequence = mockall::Sequence::new();
776        let mut alice_transport = MockMsgSender::new();
777        let mut bob_transport = MockMsgSender::new();
778
779        // Alice sends the StartSession message
780        let bob_mgr_clone = bob_mgr.clone();
781        alice_transport
782            .expect_send_message()
783            .once()
784            .in_sequence(&mut sequence)
785            .withf(move |_, peer, _| *peer == bob_peer)
786            .returning(move |data, _, _| {
787                async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
788                Ok(())
789            });
790
791        // Bob clones transport for Session
792        bob_transport
793            .expect_clone()
794            .once()
795            .in_sequence(&mut sequence)
796            .return_once(|| MockMsgSender::new());
797
798        // Bob sends the SessionEstablished message
799        let alice_mgr_clone = alice_mgr.clone();
800        bob_transport
801            .expect_send_message()
802            .once()
803            .in_sequence(&mut sequence)
804            .withf(move |_, peer, _| *peer == alice_peer)
805            .returning(move |data, _, _| {
806                async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
807                Ok(())
808            });
809
810        // Alice clones transport for Session
811        alice_transport
812            .expect_clone()
813            .once()
814            .in_sequence(&mut sequence)
815            .return_once(|| MockMsgSender::new());
816
817        // Alice sends the CloseSession message to initiate closure
818        let bob_mgr_clone = bob_mgr.clone();
819        alice_transport
820            .expect_send_message()
821            .once()
822            .in_sequence(&mut sequence)
823            .withf(move |_, peer, _| *peer == bob_peer)
824            .returning(move |data, _, _| {
825                async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
826                Ok(())
827            });
828
829        // Bob sends the CloseSession message to confirm
830        let alice_mgr_clone = alice_mgr.clone();
831        bob_transport
832            .expect_send_message()
833            .once()
834            .in_sequence(&mut sequence)
835            .withf(move |_, peer, _| *peer == alice_peer)
836            .returning(move |data, _, _| {
837                async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
838                Ok(())
839            });
840
841        let mut jhs = Vec::new();
842
843        // Start Alice
844        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
845        jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
846
847        // Start Bob
848        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
849        jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
850
851        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
852
853        pin_mut!(new_session_rx_bob);
854        let (alice_session, bob_session) = futures::future::join(
855            alice_mgr.new_session(SessionClientConfig {
856                peer: bob_peer,
857                path_options: RoutingOptions::Hops(BoundedSize::MIN),
858                target: SessionTarget::TcpStream(target.clone()),
859                capabilities: vec![Capability::Segmentation],
860            }),
861            new_session_rx_bob.next(),
862        )
863        .timeout(Duration::from_secs(2))
864        .await?;
865
866        let mut alice_session = alice_session?;
867        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
868
869        assert!(
870            alice_session.capabilities().len() == 1 && alice_session.capabilities().contains(&Capability::Segmentation)
871        );
872        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
873        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
874
875        async_std::task::sleep(Duration::from_millis(100)).await;
876        alice_session.close().await?;
877
878        async_std::task::sleep(Duration::from_millis(100)).await;
879        futures::stream::iter(jhs)
880            .for_each(hopr_async_runtime::prelude::cancel_join_handle)
881            .await;
882
883        Ok(())
884    }
885
886    #[test_log::test(async_std::test)]
887    async fn session_manager_should_close_idle_session_automatically() -> anyhow::Result<()> {
888        let alice_peer = PeerId::random();
889        let bob_peer = PeerId::random();
890
891        let cfg = SessionManagerConfig {
892            idle_timeout: Duration::from_millis(200),
893            ..Default::default()
894        };
895
896        let alice_mgr = SessionManager::new(alice_peer, cfg);
897        let bob_mgr = SessionManager::new(bob_peer, Default::default());
898
899        let mut sequence = mockall::Sequence::new();
900        let mut alice_transport = MockMsgSender::new();
901        let mut bob_transport = MockMsgSender::new();
902
903        // Alice sends the StartSession message
904        let bob_mgr_clone = bob_mgr.clone();
905        alice_transport
906            .expect_send_message()
907            .once()
908            .in_sequence(&mut sequence)
909            .withf(move |_, peer, _| *peer == bob_peer)
910            .returning(move |data, _, _| {
911                async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
912                Ok(())
913            });
914
915        // Bob clones transport for Session
916        bob_transport
917            .expect_clone()
918            .once()
919            .in_sequence(&mut sequence)
920            .return_once(|| MockMsgSender::new());
921
922        // Bob sends the SessionEstablished message
923        let alice_mgr_clone = alice_mgr.clone();
924        bob_transport
925            .expect_send_message()
926            .once()
927            .in_sequence(&mut sequence)
928            .withf(move |_, peer, _| *peer == alice_peer)
929            .returning(move |data, _, _| {
930                async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
931                Ok(())
932            });
933
934        // Alice clones transport for Session
935        alice_transport
936            .expect_clone()
937            .once()
938            .in_sequence(&mut sequence)
939            .return_once(|| MockMsgSender::new());
940
941        // Alice sends the CloseSession message to initiate closure
942        let bob_mgr_clone = bob_mgr.clone();
943        alice_transport
944            .expect_send_message()
945            .once()
946            .in_sequence(&mut sequence)
947            .withf(move |_, peer, _| *peer == bob_peer)
948            .returning(move |data, _, _| {
949                async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
950                Ok(())
951            });
952
953        // Bob sends the CloseSession message to confirm
954        let alice_mgr_clone = alice_mgr.clone();
955        bob_transport
956            .expect_send_message()
957            .once()
958            .in_sequence(&mut sequence)
959            .withf(move |_, peer, _| *peer == alice_peer)
960            .returning(move |data, _, _| {
961                async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
962                Ok(())
963            });
964
965        let mut jhs = Vec::new();
966
967        // Start Alice
968        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
969        jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
970
971        // Start Bob
972        let (new_session_tx_bob, new_session_rx_bob) = futures::channel::mpsc::unbounded();
973        jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
974
975        let target = SealedHost::Plain("127.0.0.1:80".parse()?);
976
977        pin_mut!(new_session_rx_bob);
978        let (alice_session, bob_session) = futures::future::join(
979            alice_mgr.new_session(SessionClientConfig {
980                peer: bob_peer,
981                path_options: RoutingOptions::Hops(BoundedSize::MIN),
982                target: SessionTarget::TcpStream(target.clone()),
983                capabilities: vec![Capability::Segmentation],
984            }),
985            new_session_rx_bob.next(),
986        )
987        .timeout(Duration::from_secs(2))
988        .await?;
989
990        let alice_session = alice_session?;
991        let bob_session = bob_session.ok_or(anyhow!("bob must get an incoming session"))?;
992
993        assert!(
994            alice_session.capabilities().len() == 1 && alice_session.capabilities().contains(&Capability::Segmentation)
995        );
996        assert_eq!(alice_session.capabilities(), bob_session.session.capabilities());
997        assert!(matches!(bob_session.target, SessionTarget::TcpStream(host) if host == target));
998
999        // Let the session timeout
1000        async_std::task::sleep(Duration::from_millis(300)).await;
1001
1002        futures::stream::iter(jhs)
1003            .for_each(hopr_async_runtime::prelude::cancel_join_handle)
1004            .await;
1005
1006        Ok(())
1007    }
1008
1009    #[test_log::test(async_std::test)]
1010    async fn session_manager_should_not_allow_establish_session_when_tag_range_is_used_up() -> anyhow::Result<()> {
1011        let alice_peer = PeerId::random();
1012        let bob_peer = PeerId::random();
1013
1014        let cfg = SessionManagerConfig {
1015            session_tag_range: 16..17, // Slot for exactly one session
1016            ..Default::default()
1017        };
1018
1019        let alice_mgr = SessionManager::new(alice_peer, Default::default());
1020        let bob_mgr = SessionManager::new(bob_peer, cfg);
1021
1022        // Occupy the only free slot with tag 16
1023        let (dummy_tx, _) = futures::channel::mpsc::unbounded();
1024        bob_mgr
1025            .sessions
1026            .insert(
1027                SessionId::new(16, alice_peer),
1028                CachedSession {
1029                    session_tx: Arc::new(dummy_tx),
1030                    routing_opts: RoutingOptions::Hops(BoundedSize::MIN),
1031                },
1032            )
1033            .await;
1034
1035        let mut sequence = mockall::Sequence::new();
1036        let mut alice_transport = MockMsgSender::new();
1037        let mut bob_transport = MockMsgSender::new();
1038
1039        // Alice sends the StartSession message
1040        let bob_mgr_clone = bob_mgr.clone();
1041        alice_transport
1042            .expect_send_message()
1043            .once()
1044            .in_sequence(&mut sequence)
1045            .withf(move |_, peer, _| *peer == bob_peer)
1046            .returning(move |data, _, _| {
1047                async_std::task::block_on(bob_mgr_clone.dispatch_message(data))?;
1048                Ok(())
1049            });
1050
1051        // Bob sends the SessionError message
1052        let alice_mgr_clone = alice_mgr.clone();
1053        bob_transport
1054            .expect_send_message()
1055            .once()
1056            .in_sequence(&mut sequence)
1057            .withf(move |_, peer, _| *peer == alice_peer)
1058            .returning(move |data, _, _| {
1059                async_std::task::block_on(alice_mgr_clone.dispatch_message(data))?;
1060                Ok(())
1061            });
1062
1063        let mut jhs = Vec::new();
1064
1065        // Start Alice
1066        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1067        jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1068
1069        // Start Bob
1070        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1071        jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1072
1073        let result = alice_mgr
1074            .new_session(SessionClientConfig {
1075                peer: bob_peer,
1076                path_options: RoutingOptions::Hops(BoundedSize::MIN),
1077                target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1078                capabilities: vec![],
1079            })
1080            .await;
1081
1082        assert!(
1083            matches!(result, Err(TransportSessionError::Rejected(reason)) if reason == StartErrorReason::NoSlotsAvailable)
1084        );
1085
1086        Ok(())
1087    }
1088
1089    #[test_log::test(async_std::test)]
1090    async fn session_manager_should_timeout_new_session_attempt_when_no_response() -> anyhow::Result<()> {
1091        let alice_peer = PeerId::random();
1092        let bob_peer = PeerId::random();
1093
1094        let cfg = SessionManagerConfig {
1095            initiation_timeout_base: Duration::from_millis(100),
1096            ..Default::default()
1097        };
1098
1099        let alice_mgr = SessionManager::new(alice_peer, cfg);
1100        let bob_mgr = SessionManager::new(bob_peer, Default::default());
1101
1102        let mut sequence = mockall::Sequence::new();
1103        let mut alice_transport = MockMsgSender::new();
1104        let bob_transport = MockMsgSender::new();
1105
1106        // Alice sends the StartSession message, but Bob does not handle it
1107        alice_transport
1108            .expect_send_message()
1109            .once()
1110            .in_sequence(&mut sequence)
1111            .withf(move |_, peer, _| *peer == bob_peer)
1112            .returning(|_, _, _| Ok(()));
1113
1114        let mut jhs = Vec::new();
1115
1116        // Start Alice
1117        let (new_session_tx_alice, _) = futures::channel::mpsc::unbounded();
1118        jhs.extend(alice_mgr.start(alice_transport, new_session_tx_alice)?);
1119
1120        // Start Bob
1121        let (new_session_tx_bob, _) = futures::channel::mpsc::unbounded();
1122        jhs.extend(bob_mgr.start(bob_transport, new_session_tx_bob)?);
1123
1124        let result = alice_mgr
1125            .new_session(SessionClientConfig {
1126                peer: bob_peer,
1127                path_options: RoutingOptions::Hops(BoundedSize::MIN),
1128                target: SessionTarget::TcpStream(SealedHost::Plain("127.0.0.1:80".parse()?)),
1129                capabilities: vec![],
1130            })
1131            .await;
1132
1133        assert!(matches!(result, Err(TransportSessionError::Timeout)));
1134
1135        Ok(())
1136    }
1137}