Skip to main content

hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
//! See also the `hopr_protocol_start` crate for details on the Start sub-protocol, which initiates a Session.
6//!
//! As such, the transport layer should contain only those components that are directly necessary to:
//!
//! 1. send and receive a packet, acknowledgement or ticket aggregation request
//! 2. send and receive a network telemetry request
//! 3. automate transport level processes
//! 4. run the algorithms associated with the operational management of the transport layer
//! 5. specify interfaces that allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21/// Graph-based path planning for the HOPR transport layer.
22pub mod path;
23/// Transport binary protocol layer (codec, pipeline, heartbeat, stream).
24pub mod protocol;
25
26mod multiaddrs;
27
28#[cfg(feature = "capture")]
29mod capture;
30mod pipeline;
31pub mod socket;
32
33use std::{
34    sync::{Arc, OnceLock},
35    time::Duration,
36};
37
38use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
39use futures::{
40    FutureExt, SinkExt, StreamExt,
41    channel::mpsc::{Sender, channel},
42    stream::select_with_strategy,
43};
44pub use hopr_api::{
45    Multiaddr, PeerId,
46    network::{Health, traits::NetworkView},
47    types::{
48        crypto::{
49            keypairs::{ChainKeypair, Keypair, OffchainKeypair},
50            types::{HalfKeyChallenge, Hash, OffchainPublicKey},
51        },
52        internal::{prelude::HoprPseudonym, routing::RoutingOptions},
53    },
54};
55use hopr_api::{
56    chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
57    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
58    graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
59    network::{BoxedProcessFn, NetworkStreamControl},
60    types::primitive::prelude::*,
61};
62use hopr_crypto_packet::prelude::PacketSignal;
63pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
64use hopr_protocol_hopr::MemorySurbStore;
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67    Probe,
68    ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_session as session;
71#[cfg(feature = "runtime-tokio")]
72pub use hopr_transport_session::transfer_session;
73pub use hopr_transport_session::{
74    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
75    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
76    errors::{SessionManagerError, TransportSessionError},
77};
78use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
79#[cfg(feature = "telemetry")]
80pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
81pub use hopr_transport_tag_allocator::TagAllocatorConfig;
82use hopr_utils::{network_types::prelude::*, runtime::AbortableList};
83pub use multiaddr::Protocol;
84use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
85use tracing::{Instrument, debug, error, trace, warn};
86
87#[cfg(feature = "runtime-tokio")]
88use crate::path::BackgroundPathCacheRefreshable;
89pub use crate::{config::HoprProtocolConfig, protocol::PeerProtocolCounterRegistry};
90use crate::{
91    constants::SESSION_INITIATION_TIMEOUT_BASE,
92    errors::HoprTransportError,
93    multiaddrs::strip_p2p_protocol,
94    path::{HoprGraphPathSelector, PathPlanner},
95    pipeline::HoprPacketPipelineBuilder,
96    socket::HoprSocket,
97};
98
99pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
100
101pub use hopr_api as api;
102use hopr_api::{
103    chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
104    tickets::TicketFactory,
105    types::internal::routing::DestinationRouting,
106};
107
108// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
109lazy_static::lazy_static! {
110    static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
111
112    static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
113        .time_to_idle(Duration::from_mins(15))
114        .max_capacity(10_000)
115        .build();
116
117    static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
118}
119
120/// PeerId -> OffchainPublicKey is a CPU-intensive blocking operation.
121///
122/// This helper uses a cached static object to speed up the lookup and avoid blocking the async
123/// runtime on repeated conversions for the same [`PeerId`]s.
124pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
125    PEER_ID_CACHE
126        .try_get_with_by_ref(peer_id, move || {
127            OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
128        })
129        .map_err(|e: Arc<HoprTransportError>| {
130            crate::errors::HoprTransportError::Other(anyhow::anyhow!(
131                "failed to convert peer_id ({:?}) to an offchain public key: {e}",
132                peer_id
133            ))
134        })
135}
136
137#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
138pub enum HoprTransportProcess {
139    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
140    Medium,
141    #[strum(to_string = "HOPR packet pipeline ({0})")]
142    Pipeline(protocol::PacketPipelineProcesses),
143    #[strum(to_string = "session manager sub-process #{0}")]
144    SessionsManagement(usize),
145    #[strum(to_string = "network probing sub-process: {0}")]
146    Probing(hopr_transport_probe::HoprProbeProcess),
147    #[cfg(feature = "runtime-tokio")]
148    #[strum(to_string = "path cache refresh")]
149    PathRefresh,
150    #[strum(to_string = "sync of outgoing ticket indices")]
151    OutgoingIndexSync,
152    #[strum(to_string = "periodic protocol counter flush")]
153    CounterFlush,
154    #[strum(to_string = "mixer→wire forwarder")]
155    MixerForwarder,
156    #[cfg(feature = "capture")]
157    #[strum(to_string = "packet capture")]
158    Capture,
159}
160
161/// HOPR protocol specific instantiation of the SessionManager.
162type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>;
163
164/// Allows configuration of one specific [`HoprSession`].
165///
166/// The configurator does not prevent the Session from being closed
167/// or the Session manager from being dropped.
168#[derive(Debug, Clone)]
169pub struct HoprSessionConfigurator {
170    id: SessionId,
171    // Makes sure configurator does not extend lifetime of the SessionManager.
172    smgr: std::sync::Weak<HoprSessionManager>,
173}
174
175impl HoprSessionConfigurator {
176    /// [`SessionId`] of the session this object can configure.
177    pub fn id(&self) -> &SessionId {
178        &self.id
179    }
180
181    /// Sends a Session Keep-Alive packet over the Session.
182    ///
183    /// NOTE: This usually carries at least 2 SURBs on the HOPR protocol level and can be
184    /// used for manual SURB balancing.
185    ///
186    /// NOTE: This operation only sends the Session Keep-Alive packet and **DOES NOT** guarantee the other side
187    /// has received it.
188    pub async fn ping(&self) -> errors::Result<()> {
189        Ok(self
190            .smgr
191            .upgrade()
192            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
193            .ping_session(&self.id)
194            .await?)
195    }
196
197    /// Gets the configuration of the SURB balancer.
198    ///
199    /// Returns an error if the Session is closed, the Session manager is gone.
200    ///
201    /// Returns `Ok(None)` if the Session has been created without a SURB balancer.
202    pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
203        Ok(self
204            .smgr
205            .upgrade()
206            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
207            .get_surb_balancer_config(&self.id)
208            .await?)
209    }
210
211    /// Updates the configuration of the SURB balancer.
212    ///
213    /// Returns an error if the Session is closed, the Session manager is gone, or the
214    /// Session has been created without a SURB balancer.
215    pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
216        Ok(self
217            .smgr
218            .upgrade()
219            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
220            .update_surb_balancer_config(&self.id, config)
221            .await?)
222    }
223
224    /// Explicitly closes the underlying Session in the [`SessionManager`].
225    ///
226    /// Returns `true` if the session was found and closed, `false` if it was
227    /// already gone (or the manager is dropped). Frees the per-session state
228    /// (frame reassembly buffers, control channels, …) immediately rather than
229    /// waiting for the manager's idle-timeout eviction.
230    pub async fn close(&self) -> bool {
231        match self.smgr.upgrade() {
232            Some(smgr) => smgr.close_session(&self.id).await,
233            None => false,
234        }
235    }
236}
237
238/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
239/// the transport.
240pub struct HoprTransport<Chain, Graph, Net> {
241    packet_key: OffchainKeypair,
242    chain_key: ChainKeypair,
243    chain_api: Chain,
244    ping: Arc<OnceLock<Pinger>>,
245    network: Arc<OnceLock<Net>>,
246    graph: Graph,
247    path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
248    my_multiaddresses: Vec<Multiaddr>,
249    smgr: Arc<HoprSessionManager>,
250    session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
251    probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
252    counters: PeerProtocolCounterRegistry,
253    cfg: HoprProtocolConfig,
254}
255
256impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
257where
258    Chain: ChainReadChannelOperations
259        + ChainReadAccountOperations
260        + ChainWriteTicketOperations
261        + ChainKeyOperations
262        + ChainReadTicketOperations
263        + ChainValues
264        + Clone
265        + Send
266        + Sync
267        + 'static,
268    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
269        + NetworkGraphUpdate
270        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
271        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
272        + Clone
273        + Send
274        + Sync
275        + 'static,
276    <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
277    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
278        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
279    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
280    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
281{
282    pub fn new(
283        identity: (&ChainKeypair, &OffchainKeypair),
284        resolver: Chain,
285        graph: Graph,
286        my_multiaddresses: Vec<Multiaddr>,
287        cfg: HoprProtocolConfig,
288    ) -> errors::Result<Self> {
289        let me_offchain = *identity.1.public();
290        let planner_config = cfg.path_planner;
291        let selector = HoprGraphPathSelector::new(
292            me_offchain,
293            graph.clone(),
294            planner_config.max_cached_paths,
295            planner_config.edge_penalty,
296            planner_config.min_ack_rate,
297            planner_config.min_paths_anonymity_floor,
298        );
299
300        let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
301
302        let mut session_tag_allocator = None;
303        let mut session_telemetry_tag_allocator = None;
304        let mut probing_tag_allocator = None;
305        for (usage, alloc) in tag_allocators {
306            match usage {
307                hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
308                hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
309                    session_telemetry_tag_allocator = Some(alloc)
310                }
311                hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
312            }
313        }
314        let session_telemetry_tag_allocator = session_telemetry_tag_allocator
315            .ok_or_else(|| HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
316        let probing_tag_allocator =
317            probing_tag_allocator.ok_or_else(|| HoprTransportError::Api("probing tag allocator missing".into()))?;
318
319        Ok(Self {
320            packet_key: identity.1.clone(),
321            chain_key: identity.0.clone(),
322            ping: Arc::new(OnceLock::new()),
323            network: Arc::new(OnceLock::new()),
324            graph,
325            path_planner: PathPlanner::new(
326                me_offchain,
327                MemorySurbStore::new(cfg.packet.surb_store),
328                resolver.clone(),
329                selector,
330                planner_config,
331            ),
332            my_multiaddresses,
333            smgr: Arc::new(SessionManager::new(SessionManagerConfig {
334                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
335                    .ok()
336                    .and_then(|s| s.parse::<usize>().ok())
337                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
338                    .max(ApplicationData::PAYLOAD_SIZE),
339                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
340                    .ok()
341                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
342                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
343                    .max(Duration::from_millis(100)),
344                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
345                idle_timeout: cfg.session.idle_timeout,
346                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
347                initial_return_session_egress_rate: 10,
348                minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
349                maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
350                surb_balance_notify_period: None,
351                surb_target_notify: true,
352                maximum_sessions: cfg.session.maximum_managed_sessions,
353            })),
354            chain_api: resolver,
355            session_telemetry_tag_allocator,
356            probing_tag_allocator,
357            counters: PeerProtocolCounterRegistry::default(),
358            cfg,
359        })
360    }
361
362    /// Execute all processes of the [`HoprTransport`] object as a **Relay** node.
363    ///
364    /// Relay nodes run the full packet pipeline including incoming ticket/acknowledgement
365    /// processing and require a [`futures::Sink`] for ticket events as well as an
366    /// `on_incoming_session` channel from the SessionManager (they can accept incoming sessions).
367    pub async fn run_relay<T, TFact, Ct>(
368        &self,
369        cover_traffic: Ct,
370        network: Net,
371        network_process: BoxedProcessFn,
372        ticket_events: T,
373        ticket_factory: TFact,
374        on_incoming_session: Sender<IncomingSession>,
375    ) -> errors::Result<(
376        HoprSocket<
377            futures::stream::BoxStream<'static, ApplicationDataIn>,
378            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
379        >,
380        AbortableList<HoprTransportProcess>,
381    )>
382    where
383        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
384        T::Error: std::error::Error + Clone + Send,
385        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
386        TFact: TicketFactory + Clone + Send + Sync + 'static,
387    {
388        self.run_inner(
389            protocol::NodeType::Relay,
390            cover_traffic,
391            network,
392            network_process,
393            ticket_events,
394            ticket_factory,
395            Some(on_incoming_session),
396        )
397        .await
398    }
399
400    /// Execute all processes of the [`HoprTransport`] object as an **Exit** (destination) node.
401    ///
402    /// Exit nodes do not process tickets but keep the incoming acknowledgement
403    /// pipeline running and can accept incoming sessions via SessionManager.
404    pub async fn run_exit<TFact, Ct>(
405        &self,
406        cover_traffic: Ct,
407        network: Net,
408        network_process: BoxedProcessFn,
409        ticket_factory: TFact,
410        on_incoming_session: Sender<IncomingSession>,
411    ) -> errors::Result<(
412        HoprSocket<
413            futures::stream::BoxStream<'static, ApplicationDataIn>,
414            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
415        >,
416        AbortableList<HoprTransportProcess>,
417    )>
418    where
419        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
420        TFact: TicketFactory + Clone + Send + Sync + 'static,
421    {
422        self.run_inner(
423            protocol::NodeType::Exit,
424            cover_traffic,
425            network,
426            network_process,
427            futures::sink::drain(),
428            ticket_factory,
429            Some(on_incoming_session),
430        )
431        .await
432    }
433
434    /// Execute all processes of the [`HoprTransport`] object as an **Entry** (source) node.
435    ///
436    /// Entry nodes do not process tickets, do not start the incoming acknowledgement
437    /// pipeline, and do not accept incoming sessions — therefore, they require neither a
438    /// `ticket_events` sink nor an `on_incoming_session` channel.
439    pub async fn run_entry<TFact, Ct>(
440        &self,
441        cover_traffic: Ct,
442        network: Net,
443        network_process: BoxedProcessFn,
444        ticket_factory: TFact,
445    ) -> errors::Result<(
446        HoprSocket<
447            futures::stream::BoxStream<'static, ApplicationDataIn>,
448            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
449        >,
450        AbortableList<HoprTransportProcess>,
451    )>
452    where
453        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
454        TFact: TicketFactory + Clone + Send + Sync + 'static,
455    {
456        self.run_inner(
457            protocol::NodeType::Entry,
458            cover_traffic,
459            network,
460            network_process,
461            futures::sink::drain(),
462            ticket_factory,
463            None,
464        )
465        .await
466    }
467
468    /// Internal worker driving all node-type variants of `HoprTransport::run_*`.
469    ///
470    /// Branches on `role`:
471    /// - [`protocol::NodeType::Relay`]: full packet pipeline + SessionManager.
472    /// - [`protocol::NodeType::Exit`]: ack-drain pipeline + incoming Sessions.
473    /// - [`protocol::NodeType::Entry`]: no ack pipeline, no incoming Sessions.
474    #[allow(clippy::too_many_arguments)]
475    async fn run_inner<T, TFact, Ct>(
476        &self,
477        role: protocol::NodeType,
478        cover_traffic: Ct,
479        network: Net,
480        network_process: BoxedProcessFn,
481        ticket_events: T,
482        ticket_factory: TFact,
483        on_incoming_session: Option<Sender<IncomingSession>>,
484    ) -> errors::Result<(
485        HoprSocket<
486            futures::stream::BoxStream<'static, ApplicationDataIn>,
487            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
488        >,
489        AbortableList<HoprTransportProcess>,
490    )>
491    where
492        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
493        T::Error: std::error::Error + Clone + Send,
494        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
495        TFact: TicketFactory + Clone + Send + Sync + 'static,
496    {
497        let mut processes = AbortableList::<HoprTransportProcess>::default();
498
499        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
500            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
501
502        // -- transport medium
503
504        let transport_network = network;
505        let transport_layer_process = network_process;
506
507        let msg_codec = crate::protocol::HoprBinaryCodec {};
508        let (wire_msg_tx, wire_msg_rx) =
509            protocol::stream::process_stream_protocol(msg_codec, transport_network.clone(), self.cfg.stream).await?;
510
511        // Shared mixing channel: all per-destination clones of `mixing_channel_tx` push into one
512        // heap, so cross-destination packets are mixed together rather than each destination
513        // getting its own independent delay queue. The single forwarder task owns the receiver
514        // (and therefore the heap timer) — no per-clone waker coordination is needed.
515        let mut mixer_cfg = self.cfg.mixer;
516        mixer_cfg.metric_delay_window = u64::try_from(5 * mixer_cfg.delay_range.as_millis())
517            .unwrap_or(u64::MAX)
518            .max(1);
519        let (mixing_channel_tx, mix_rx) = hopr_transport_mixer::channel(mixer_cfg);
520        processes.insert(
521            HoprTransportProcess::MixerForwarder,
522            hopr_utils::spawn_as_abortable!(async move {
523                mix_rx
524                    .fold(wire_msg_tx, |mut sink, item| async move {
525                        if sink.send(item).await.is_err() {
526                            tracing::error!(
527                                task = %HoprTransportProcess::MixerForwarder,
528                                "wire sink dropped — discarding mixed packet"
529                            );
530                        }
531                        sink
532                    })
533                    .await;
534                tracing::warn!(
535                    task = %HoprTransportProcess::MixerForwarder,
536                    "long-running background task finished"
537                );
538            }),
539        );
540
541        // -- path cache background refresh (only when tokio runtime is available)
542        #[cfg(feature = "runtime-tokio")]
543        processes.insert(
544            HoprTransportProcess::PathRefresh,
545            hopr_utils::spawn_as_abortable!(self.path_planner.run_background_refresh()),
546        );
547
548        processes.insert(
549            HoprTransportProcess::Medium,
550            hopr_utils::spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
551                task = %HoprTransportProcess::Medium,
552                "long-running background task finished"
553            ))),
554        );
555
556        let msg_protocol_bidirectional_channel_capacity =
557            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
558                .ok()
559                .and_then(|s| s.trim().parse::<usize>().ok())
560                .filter(|&c| c > 0)
561                .unwrap_or(16_384);
562
563        debug!(
564            capacity = msg_protocol_bidirectional_channel_capacity,
565            "creating protocol bidirectional channel"
566        );
567        let (tx_from_protocol, rx_from_protocol) =
568            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
569
570        // === START === cover traffic control
571        // Allocate a cover traffic tag from the session telemetry partition to avoid
572        // collisions with session and probing tags.
573        let cover_traffic_allocated_tag = self
574            .session_telemetry_tag_allocator
575            .allocate()
576            .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
577        let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
578
579        // filter out the known cover traffic not to lose processing time with it
580        // The allocated tag is moved into the closure to keep it alive for the transport lifetime.
581        let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
582            let _keep_alive = &cover_traffic_allocated_tag;
583            async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
584        });
585
586        // prepare a cover traffic stream
587        let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
588            let start =
589                hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
590            let data = &RANDOM_DATA[start..start + 100];
591
592            futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
593                Some((routing, ApplicationDataOut::with_no_packet_info(data)))
594            } else {
595                tracing::error!("failed to construct cover traffic packet");
596                None
597            })
598        });
599
600        // merge cover traffic with other outgoing data
601        let merged_unresolved_output_data =
602            select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
603                futures::stream::PollNext::Left
604            });
605
606        // === END === cover traffic control
607
608        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
609        // sending the external packets to the transport pipeline. Concurrency matches
610        // the encoder stage (output_concurrency) to avoid head-of-line blocking on
611        // cache-miss path lookups.
612        let path_planner = self.path_planner.clone();
613        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
614        let routing_concurrency = {
615            let avail = std::thread::available_parallelism()
616                .ok()
617                .map(|n| n.get())
618                .unwrap_or(1)
619                .max(1)
620                * 8;
621            self.cfg
622                .packet
623                .pipeline
624                .output_concurrency
625                .filter(|&n| n > 0)
626                .unwrap_or(avail)
627        };
628        let all_resolved_external_msg_rx = merged_unresolved_output_data
629            .then_concurrent(
630                move |(unresolved, mut data)| {
631                    let path_planner = path_planner.clone();
632                    async move {
633                        trace!(?unresolved, "resolving routing for packet");
634                        match path_planner
635                            .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
636                            .await
637                        {
638                            Ok((resolved, rem_surbs)) => {
639                                // Set the SURB distress/out-of-SURBs flag if applicable.
640                                // These flags are translated into HOPR protocol packet signals and are
641                                // applicable only on the return path.
642                                let mut signals_to_dst = data
643                                    .packet_info
644                                    .as_ref()
645                                    .map(|info| info.signals_to_destination)
646                                    .unwrap_or_default();
647
648                                if resolved.is_return() {
649                                    signals_to_dst = match rem_surbs {
650                                        Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
651                                            signals_to_dst | PacketSignal::SurbDistress
652                                        }
653                                        Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
654                                        _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
655                                    };
656                                } else {
657                                    // Unset these flags as they make no sense on the forward path.
658                                    signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
659                                }
660
661                                data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
662                                trace!(?resolved, "resolved routing for packet");
663                                Some((resolved, data))
664                            }
665                            Err(error) => {
666                                error!(%error, "failed to resolve routing");
667                                None
668                            }
669                        }
670                    }
671                    .in_current_span()
672                },
673                routing_concurrency,
674            )
675            .filter_map(futures::future::ready);
676
677        let channels_dst = self
678            .chain_api
679            .domain_separators()
680            .await
681            .map_err(HoprTransportError::chain)?
682            .channel;
683
684        let pipeline_builder = HoprPacketPipelineBuilder::new()
685            .identity((&self.chain_key, &self.packet_key))
686            .transport((mixing_channel_tx, wire_msg_rx))
687            .api((tx_from_protocol, all_resolved_external_msg_rx))
688            .surb_store(self.path_planner.surb_store.clone())
689            .chain_api(self.chain_api.clone())
690            .ticket_factory(ticket_factory)
691            .channels_dst(channels_dst)
692            .with_counters(self.counters.clone())
693            .with_config(self.cfg.packet);
694
695        let pipeline_processes = match role {
696            protocol::NodeType::Relay => pipeline_builder.with_ticket_events(ticket_events).build_for_relay(),
697            protocol::NodeType::Exit => pipeline_builder.build_for_exit(),
698            protocol::NodeType::Entry => pipeline_builder.build_for_entry(),
699        };
700        processes.extend_from(pipeline_processes);
701
702        // -- periodic counter flush
703        let flush_counters = self.counters.clone();
704        let flush_graph = self.graph.clone();
705        let flush_me = *self.packet_key.public();
706        let flush_interval = self.cfg.counter_flush_interval;
707        processes.insert(
708            HoprTransportProcess::CounterFlush,
709            hopr_utils::spawn_as_abortable!(async move {
710                use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
711
712                futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
713                    .for_each(|_| {
714                        for (peer, num_packets, num_acks) in flush_counters.drain() {
715                            tracing::trace!(
716                                %peer,
717                                num_packets,
718                                num_acks,
719                                "flushing protocol conformance counters"
720                            );
721                            flush_graph.upsert_edge(&flush_me, &peer, |obs| {
722                                obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
723                            });
724                        }
725                        futures::future::ready(())
726                    })
727                    .await;
728            }),
729        );
730
731        // -- network probing
732        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
733            .ok()
734            .and_then(|s| s.trim().parse::<usize>().ok())
735            .filter(|&c| c > 0)
736            .unwrap_or(128);
737        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
738        let (manual_ping_tx, manual_ping_rx) =
739            channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
740
741        let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
742
743        let (probing_processes, probe_classifier) = probe
744            .continuously_scan(
745                unresolved_routing_msg_tx.clone(),
746                manual_ping_rx,
747                cover_traffic,
748                self.graph.clone(),
749            )
750            .await;
751
752        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
753
754        // manual ping
755        self.ping
756            .clone()
757            .set(Pinger::new(
758                PingConfig {
759                    timeout: self.cfg.probe.timeout,
760                },
761                manual_ping_tx,
762            ))
763            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
764
765        // -- session management
766        let smgr_start_res = if role != protocol::NodeType::Entry {
767            // Relays and Exits can accept incoming Sessions
768            self.smgr.start(
769                unresolved_routing_msg_tx.clone(),
770                on_incoming_session.ok_or_else(|| {
771                    HoprTransportError::Api("on_incoming_session channel is required for relay/exit nodes".into())
772                })?,
773            )
774        } else {
775            // Entry nodes cannot accept incoming Sessions
776            self.smgr
777                .start(unresolved_routing_msg_tx.clone(), futures::sink::drain())
778        };
779
780        smgr_start_res
781            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
782            .into_iter()
783            .enumerate()
784            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
785            .for_each(|(k, v)| {
786                processes.insert(k, v);
787            });
788
789        // Wire incoming: cover-traffic-filtered stream → probe classify → (session dispatch).
790        // This stage must run in a background task, so the pipeline drains even when the
791        // caller discards the returned HoprSocket (e.g. edge-node builder).
792        //
793        // The channel uses a resilient for_each rather than .forward() so that a disconnected
794        // receiver (HoprSocket dropped without consuming) logs an error and continues rather
795        // than collapsing the entire ingress pipeline. Callers should use HoprSocket::reader()
796        // and actively drain the stream; see hopr-lib builder for the reference drain.
797        let (on_incoming_data_tx, on_incoming_data_rx) =
798            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
799        let smgr = self.smgr.clone();
800        let unresolved_routing_msg_tx_for_task = unresolved_routing_msg_tx.clone();
801        processes.insert(
802            HoprTransportProcess::SessionsManagement(0),
803            hopr_utils::spawn_as_abortable!(async move {
804                probe_classifier
805                    .filter_stream(unresolved_routing_msg_tx_for_task, rx_from_protocol)
806                    .filter_map(move |(pseudonym, data)| {
807                        let smgr = smgr.clone();
808                        async move {
809                            match smgr.dispatch_message(pseudonym, data).await {
810                                Ok(DispatchResult::Processed) => {
811                                    tracing::trace!("message dispatch completed");
812                                    None
813                                }
814                                Ok(DispatchResult::Unrelated(data)) => {
815                                    tracing::trace!("unrelated message dispatch completed");
816                                    Some(data)
817                                }
818                                Err(error) => {
819                                    tracing::error!(%error, "error while dispatching packet in the session manager");
820                                    None
821                                }
822                            }
823                        }
824                    })
825                    .fold(on_incoming_data_tx, |mut tx, data| async move {
826                        if tx.send(data).await.is_err() {
827                            tracing::error!(
828                                task = %HoprTransportProcess::SessionsManagement(0),
829                                "incoming-data channel disconnected — dropping unrelated packet; \
830                                 HoprSocket must be consumed or drained by the caller"
831                            );
832                        }
833                        tx
834                    })
835                    .await;
836                tracing::warn!(
837                    task = %HoprTransportProcess::SessionsManagement(0),
838                    "long-running background task finished"
839                );
840            }),
841        );
842
843        // Populate the OnceLock at the end, making sure everything before didn't fail.
844        self.network
845            .clone()
846            .set(transport_network)
847            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
848
849        Ok((
850            (on_incoming_data_rx.boxed(), unresolved_routing_msg_tx).into(),
851            processes,
852        ))
853    }
854
855    #[tracing::instrument(level = "debug", skip(self))]
856    pub async fn ping(
857        &self,
858        peer: &OffchainPublicKey,
859    ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
860        let me: &OffchainPublicKey = self.packet_key.public();
861        if peer == me {
862            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
863        }
864
865        let pinger = self
866            .ping
867            .get()
868            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
869
870        let latency = (*pinger).ping(peer).await?;
871
872        if let Some(observations) = self.graph.edge(me, peer) {
873            Ok((latency, observations))
874        } else {
875            Err(HoprTransportError::Api(format!(
876                "no observations available for peer {peer}",
877            )))
878        }
879    }
880
881    #[tracing::instrument(level = "debug", skip(self))]
882    pub async fn new_session(
883        &self,
884        destination: Address,
885        target: SessionTarget,
886        cfg: SessionClientConfig,
887    ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
888        let session = self.smgr.new_session(destination, target, cfg).await?;
889        let id = *session.id();
890        Ok((
891            session,
892            HoprSessionConfigurator {
893                id,
894                smgr: Arc::downgrade(&self.smgr),
895            },
896        ))
897    }
898
899    #[tracing::instrument(level = "debug", skip(self))]
900    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
901        self.network
902            .get()
903            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
904            .map(|network| network.listening_as().into_iter().collect())
905            .unwrap_or_default()
906    }
907
908    #[tracing::instrument(level = "debug", skip(self))]
909    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
910        let mut mas = self
911            .local_multiaddresses()
912            .into_iter()
913            .filter(|ma| {
914                crate::multiaddrs::is_supported(ma)
915                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
916            })
917            .map(|ma| strip_p2p_protocol(&ma))
918            .filter(|v| !v.is_empty())
919            .collect::<Vec<_>>();
920
921        mas.sort_by(|l, r| {
922            let is_left_dns = crate::multiaddrs::is_dns(l);
923            let is_right_dns = crate::multiaddrs::is_dns(r);
924
925            if !(is_left_dns ^ is_right_dns) {
926                std::cmp::Ordering::Equal
927            } else if is_left_dns {
928                std::cmp::Ordering::Less
929            } else {
930                std::cmp::Ordering::Greater
931            }
932        });
933
934        mas
935    }
936
    /// Returns a shared reference to the network graph held by this transport.
    ///
    /// This is the same graph instance that `ping` and `network_peer_observations`
    /// query for edge observations.
    pub fn graph(&self) -> &Graph {
        &self.graph
    }
941
942    #[tracing::instrument(level = "debug", skip(self))]
943    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
944        self.network
945            .get()
946            .map(|network| network.listening_as().into_iter().collect())
947            .unwrap_or_else(|| {
948                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
949                self.my_multiaddresses.clone()
950            })
951    }
952
953    #[tracing::instrument(level = "debug", skip(self))]
954    pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
955        match self
956            .network
957            .get()
958            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
959        {
960            Ok(network) => network
961                .multiaddress_of(&peer.into())
962                .unwrap_or_default()
963                .into_iter()
964                .collect(),
965            Err(error) => {
966                tracing::error!(%error, "failed to get observed multiaddresses");
967                return vec![];
968            }
969        }
970    }
971
972    #[tracing::instrument(level = "debug", skip(self))]
973    pub async fn network_health(&self) -> Health {
974        self.network
975            .get()
976            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
977            .map(|network| network.health())
978            .unwrap_or(Health::Red)
979    }
980
981    pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
982        Ok(futures::stream::iter(
983            self.network
984                .get()
985                .ok_or_else(|| {
986                    tracing::error!("transport network is not yet initialized");
987                    HoprTransportError::Api("transport network is not yet initialized".into())
988                })?
989                .connected_peers(),
990        )
991        .filter_map(|peer_id| async move {
992            match peer_id_to_public_key(&peer_id) {
993                Ok(key) => Some(key),
994                Err(error) => {
995                    tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
996                    None
997                }
998            }
999        })
1000        .collect()
1001        .await)
1002    }
1003
1004    #[tracing::instrument(level = "debug", skip(self))]
1005    pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
1006        self.graph.edge(self.packet_key.public(), peer)
1007    }
1008
1009    /// Get connected peers with quality higher than some value.
1010    #[tracing::instrument(level = "debug", skip(self))]
1011    pub async fn all_network_peers(
1012        &self,
1013        minimum_score: f64,
1014    ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
1015        let me = self.packet_key.public();
1016        Ok(self
1017            .network_connected_peers()
1018            .await?
1019            .into_iter()
1020            .filter_map(|peer| {
1021                let observation = self.graph.edge(me, &peer);
1022                if let Some(info) = observation {
1023                    if info.score() >= minimum_score {
1024                        Some((peer, info))
1025                    } else {
1026                        None
1027                    }
1028                } else {
1029                    None
1030                }
1031            })
1032            .collect::<Vec<_>>())
1033    }
1034}
1035
1036// ---------------------------------------------------------------------------
1037// NetworkView impl for HoprTransport — wraps OnceLock<Net> access
1038// ---------------------------------------------------------------------------
1039
1040impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
1041where
1042    Net: NetworkView + Send + Sync + 'static,
1043{
1044    fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
1045        self.network.get().map(|n| n.listening_as()).unwrap_or_default()
1046    }
1047
1048    fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
1049        self.network.get()?.multiaddress_of(peer)
1050    }
1051
1052    fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
1053        self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
1054    }
1055
1056    fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
1057        self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
1058    }
1059
1060    fn is_connected(&self, peer: &PeerId) -> bool {
1061        self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
1062    }
1063
1064    fn health(&self) -> Health {
1065        self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
1066    }
1067
1068    fn subscribe_network_events(
1069        &self,
1070    ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
1071        match self.network.get() {
1072            Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
1073            None => futures::future::Either::Right(futures::stream::empty()),
1074        }
1075    }
1076}
1077
1078// ---------------------------------------------------------------------------
1079// TransportOperations impl for HoprTransport
1080// ---------------------------------------------------------------------------
1081
/// Exposes [`HoprTransport`] through the `hopr_api::node::TransportOperations` trait.
///
/// Both trait methods are thin forwards to methods of the same purpose on
/// `HoprTransport`; no extra logic is added here.
#[async_trait::async_trait]
impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
where
    Chain: ChainReadChannelOperations
        + ChainReadAccountOperations
        + hopr_api::chain::ChainWriteTicketOperations
        + ChainKeyOperations
        + hopr_api::chain::ChainReadTicketOperations
        + ChainValues
        + Clone
        + Send
        + Sync
        + 'static,
    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
        + NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
{
    // Errors are reported with the crate's own transport error type.
    type Error = errors::HoprTransportError;
    // Observations are whatever the graph's view yields for an edge.
    type Observable = <Graph as NetworkGraphView>::Observed;

    /// Pings the peer identified by `key`, returning the latency and its observations.
    async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
        // NOTE(review): this relies on method resolution preferring the inherent
        // `HoprTransport::ping` over this trait method; otherwise the call would
        // recurse. Confirm the inherent impl's bounds stay covered by the ones above.
        self.ping(key).await
    }

    /// Returns the multiaddresses observed for the peer identified by `key`.
    async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
        self.network_observed_multiaddresses(key).await
    }
}
1114
1115    async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
1116        self.network_observed_multiaddresses(key).await
1117    }
1118}