Skip to main content

hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21/// Graph-based path planning for the HOPR transport layer.
22pub mod path;
23/// Transport binary protocol layer (codec, pipeline, heartbeat, stream).
24pub mod protocol;
25
26mod multiaddrs;
27
28#[cfg(feature = "capture")]
29mod capture;
30mod pipeline;
31pub mod socket;
32
33use std::{
34    sync::{Arc, OnceLock},
35    time::Duration,
36};
37
38use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
39use futures::{
40    FutureExt, StreamExt,
41    channel::mpsc::{Sender, channel},
42    stream::select_with_strategy,
43};
44pub use hopr_api::{
45    Multiaddr, PeerId,
46    network::{Health, traits::NetworkView},
47    types::{
48        crypto::{
49            keypairs::{ChainKeypair, Keypair, OffchainKeypair},
50            types::{HalfKeyChallenge, Hash, OffchainPublicKey},
51        },
52        internal::{prelude::HoprPseudonym, routing::RoutingOptions},
53    },
54};
55use hopr_api::{
56    chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
57    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
58    graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
59    network::{BoxedProcessFn, NetworkStreamControl},
60    types::primitive::prelude::*,
61};
62use hopr_crypto_packet::prelude::PacketSignal;
63pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
64use hopr_protocol_hopr::MemorySurbStore;
65use hopr_transport_mixer::MixerConfig;
66pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
67use hopr_transport_probe::{
68    Probe,
69    ping::{PingConfig, Pinger},
70};
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77    errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83use hopr_utils::{network_types::prelude::*, runtime::AbortableList};
84pub use multiaddr::Protocol;
85use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
86use tracing::{Instrument, debug, error, trace, warn};
87
88#[cfg(feature = "runtime-tokio")]
89use crate::path::BackgroundPathCacheRefreshable;
90pub use crate::{config::HoprProtocolConfig, protocol::PeerProtocolCounterRegistry};
91use crate::{
92    constants::SESSION_INITIATION_TIMEOUT_BASE,
93    errors::HoprTransportError,
94    multiaddrs::strip_p2p_protocol,
95    path::{HoprGraphPathSelector, PathPlanner},
96    pipeline::HoprPacketPipelineBuilder,
97    socket::HoprSocket,
98};
99
100pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
101
102pub use hopr_api as api;
103use hopr_api::{
104    chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
105    tickets::TicketFactory,
106    types::internal::routing::DestinationRouting,
107};
108
109// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
110lazy_static::lazy_static! {
111    static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
112
113    static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
114        .time_to_idle(Duration::from_mins(15))
115        .max_capacity(10_000)
116        .build();
117
118    static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
119}
120
121/// PeerId -> OffchainPublicKey is a CPU-intensive blocking operation.
122///
123/// This helper uses a cached static object to speed up the lookup and avoid blocking the async
124/// runtime on repeated conversions for the same [`PeerId`]s.
125pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
126    PEER_ID_CACHE
127        .try_get_with_by_ref(peer_id, move || {
128            OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
129        })
130        .map_err(|e: Arc<HoprTransportError>| {
131            crate::errors::HoprTransportError::Other(anyhow::anyhow!(
132                "failed to convert peer_id ({:?}) to an offchain public key: {e}",
133                peer_id
134            ))
135        })
136}
137
138#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
139pub enum HoprTransportProcess {
140    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
141    Medium,
142    #[strum(to_string = "HOPR packet pipeline ({0})")]
143    Pipeline(protocol::PacketPipelineProcesses),
144    #[strum(to_string = "session manager sub-process #{0}")]
145    SessionsManagement(usize),
146    #[strum(to_string = "network probing sub-process: {0}")]
147    Probing(hopr_transport_probe::HoprProbeProcess),
148    #[cfg(feature = "runtime-tokio")]
149    #[strum(to_string = "path cache refresh")]
150    PathRefresh,
151    #[strum(to_string = "sync of outgoing ticket indices")]
152    OutgoingIndexSync,
153    #[strum(to_string = "periodic protocol counter flush")]
154    CounterFlush,
155    #[cfg(feature = "capture")]
156    #[strum(to_string = "packet capture")]
157    Capture,
158}
159
160/// HOPR protocol specific instantiation of the SessionManager.
161type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>;
162
163/// Allows configuration of one specific [`HoprSession`].
164///
165/// The configurator does not prevent the Session from being closed
166/// or the Session manager from being dropped.
167#[derive(Debug, Clone)]
168pub struct HoprSessionConfigurator {
169    id: SessionId,
170    // Makes sure configurator does not extend lifetime of the SessionManager.
171    smgr: std::sync::Weak<HoprSessionManager>,
172}
173
174impl HoprSessionConfigurator {
175    /// [`SessionId`] of the session this object can configure.
176    pub fn id(&self) -> &SessionId {
177        &self.id
178    }
179
180    /// Sends a Session Keep-Alive packet over the Session.
181    ///
182    /// NOTE: This usually carries at least 2 SURBs on the HOPR protocol level and can be
183    /// used for manual SURB balancing.
184    ///
185    /// NOTE: This operation only sends the Session Keep-Alive packet and **DOES NOT** guarantee the other side
186    /// has received it.
187    pub async fn ping(&self) -> errors::Result<()> {
188        Ok(self
189            .smgr
190            .upgrade()
191            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
192            .ping_session(&self.id)
193            .await?)
194    }
195
196    /// Gets the configuration of the SURB balancer.
197    ///
198    /// Returns an error if the Session is closed, the Session manager is gone.
199    ///
200    /// Returns `Ok(None)` if the Session has been created without a SURB balancer.
201    pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
202        Ok(self
203            .smgr
204            .upgrade()
205            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
206            .get_surb_balancer_config(&self.id)
207            .await?)
208    }
209
210    /// Updates the configuration of the SURB balancer.
211    ///
212    /// Returns an error if the Session is closed, the Session manager is gone, or the
213    /// Session has been created without a SURB balancer.
214    pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
215        Ok(self
216            .smgr
217            .upgrade()
218            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
219            .update_surb_balancer_config(&self.id, config)
220            .await?)
221    }
222
223    /// Explicitly closes the underlying Session in the [`SessionManager`].
224    ///
225    /// Returns `true` if the session was found and closed, `false` if it was
226    /// already gone (or the manager is dropped). Frees the per-session state
227    /// (frame reassembly buffers, control channels, …) immediately rather than
228    /// waiting for the manager's idle-timeout eviction.
229    pub async fn close(&self) -> bool {
230        match self.smgr.upgrade() {
231            Some(smgr) => smgr.close_session(&self.id).await,
232            None => false,
233        }
234    }
235}
236
237/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
238/// the transport.
239pub struct HoprTransport<Chain, Graph, Net> {
240    packet_key: OffchainKeypair,
241    chain_key: ChainKeypair,
242    chain_api: Chain,
243    ping: Arc<OnceLock<Pinger>>,
244    network: Arc<OnceLock<Net>>,
245    graph: Graph,
246    path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
247    my_multiaddresses: Vec<Multiaddr>,
248    smgr: Arc<HoprSessionManager>,
249    session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
250    probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
251    counters: PeerProtocolCounterRegistry,
252    cfg: HoprProtocolConfig,
253}
254
255impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
256where
257    Chain: ChainReadChannelOperations
258        + ChainReadAccountOperations
259        + ChainWriteTicketOperations
260        + ChainKeyOperations
261        + ChainReadTicketOperations
262        + ChainValues
263        + Clone
264        + Send
265        + Sync
266        + 'static,
267    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
268        + NetworkGraphUpdate
269        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
270        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
271        + Clone
272        + Send
273        + Sync
274        + 'static,
275    <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
276    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
277        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
278    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
279    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
280{
281    pub fn new(
282        identity: (&ChainKeypair, &OffchainKeypair),
283        resolver: Chain,
284        graph: Graph,
285        my_multiaddresses: Vec<Multiaddr>,
286        cfg: HoprProtocolConfig,
287    ) -> errors::Result<Self> {
288        let me_offchain = *identity.1.public();
289        let planner_config = cfg.path_planner;
290        let selector = HoprGraphPathSelector::new(
291            me_offchain,
292            graph.clone(),
293            planner_config.max_cached_paths,
294            planner_config.edge_penalty,
295            planner_config.min_ack_rate,
296        );
297
298        let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
299
300        let mut session_tag_allocator = None;
301        let mut session_telemetry_tag_allocator = None;
302        let mut probing_tag_allocator = None;
303        for (usage, alloc) in tag_allocators {
304            match usage {
305                hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
306                hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
307                    session_telemetry_tag_allocator = Some(alloc)
308                }
309                hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
310            }
311        }
312        let session_tag_allocator =
313            session_tag_allocator.ok_or_else(|| HoprTransportError::Api("session tag allocator missing".into()))?;
314        let session_telemetry_tag_allocator = session_telemetry_tag_allocator
315            .ok_or_else(|| HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
316        let probing_tag_allocator =
317            probing_tag_allocator.ok_or_else(|| HoprTransportError::Api("probing tag allocator missing".into()))?;
318
319        Ok(Self {
320            packet_key: identity.1.clone(),
321            chain_key: identity.0.clone(),
322            ping: Arc::new(OnceLock::new()),
323            network: Arc::new(OnceLock::new()),
324            graph,
325            path_planner: PathPlanner::new(
326                me_offchain,
327                MemorySurbStore::new(cfg.packet.surb_store),
328                resolver.clone(),
329                selector,
330                planner_config,
331            ),
332            my_multiaddresses,
333            smgr: Arc::new(SessionManager::new(
334                SessionManagerConfig {
335                    frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
336                        .ok()
337                        .and_then(|s| s.parse::<usize>().ok())
338                        .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
339                        .max(ApplicationData::PAYLOAD_SIZE),
340                    max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
341                        .ok()
342                        .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
343                        .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
344                        .max(Duration::from_millis(100)),
345                    initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
346                    idle_timeout: cfg.session.idle_timeout,
347                    balancer_sampling_interval: cfg.session.balancer_sampling_interval,
348                    initial_return_session_egress_rate: 10,
349                    minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
350                    maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
351                    surb_balance_notify_period: None,
352                    surb_target_notify: true,
353                },
354                session_tag_allocator,
355            )),
356            chain_api: resolver,
357            session_telemetry_tag_allocator,
358            probing_tag_allocator,
359            counters: PeerProtocolCounterRegistry::default(),
360            cfg,
361        })
362    }
363
364    /// Execute all processes of the [`HoprTransport`] object as a **Relay** node.
365    ///
366    /// Relay nodes run the full packet pipeline including incoming ticket/acknowledgement
367    /// processing and require a [`futures::Sink`] for ticket events as well as an
368    /// `on_incoming_session` channel from the SessionManager (they can accept incoming sessions).
369    pub async fn run_relay<T, TFact, Ct>(
370        &self,
371        cover_traffic: Ct,
372        network: Net,
373        network_process: BoxedProcessFn,
374        ticket_events: T,
375        ticket_factory: TFact,
376        on_incoming_session: Sender<IncomingSession>,
377    ) -> errors::Result<(
378        HoprSocket<
379            futures::stream::BoxStream<'static, ApplicationDataIn>,
380            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
381        >,
382        AbortableList<HoprTransportProcess>,
383    )>
384    where
385        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
386        T::Error: std::error::Error + Clone + Send,
387        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
388        TFact: TicketFactory + Clone + Send + Sync + 'static,
389    {
390        self.run_inner(
391            protocol::NodeType::Relay,
392            cover_traffic,
393            network,
394            network_process,
395            ticket_events,
396            ticket_factory,
397            Some(on_incoming_session),
398        )
399        .await
400    }
401
402    /// Execute all processes of the [`HoprTransport`] object as an **Exit** (destination) node.
403    ///
404    /// Exit nodes do not process tickets but keep the incoming acknowledgement
405    /// pipeline running and can accept incoming sessions via SessionManager.
406    pub async fn run_exit<TFact, Ct>(
407        &self,
408        cover_traffic: Ct,
409        network: Net,
410        network_process: BoxedProcessFn,
411        ticket_factory: TFact,
412        on_incoming_session: Sender<IncomingSession>,
413    ) -> errors::Result<(
414        HoprSocket<
415            futures::stream::BoxStream<'static, ApplicationDataIn>,
416            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
417        >,
418        AbortableList<HoprTransportProcess>,
419    )>
420    where
421        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
422        TFact: TicketFactory + Clone + Send + Sync + 'static,
423    {
424        self.run_inner(
425            protocol::NodeType::Exit,
426            cover_traffic,
427            network,
428            network_process,
429            futures::sink::drain(),
430            ticket_factory,
431            Some(on_incoming_session),
432        )
433        .await
434    }
435
436    /// Execute all processes of the [`HoprTransport`] object as an **Entry** (source) node.
437    ///
438    /// Entry nodes do not process tickets, do not start the incoming acknowledgement
439    /// pipeline, and do not accept incoming sessions — therefore, they require neither a
440    /// `ticket_events` sink nor an `on_incoming_session` channel.
441    pub async fn run_entry<TFact, Ct>(
442        &self,
443        cover_traffic: Ct,
444        network: Net,
445        network_process: BoxedProcessFn,
446        ticket_factory: TFact,
447    ) -> errors::Result<(
448        HoprSocket<
449            futures::stream::BoxStream<'static, ApplicationDataIn>,
450            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
451        >,
452        AbortableList<HoprTransportProcess>,
453    )>
454    where
455        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
456        TFact: TicketFactory + Clone + Send + Sync + 'static,
457    {
458        self.run_inner(
459            protocol::NodeType::Entry,
460            cover_traffic,
461            network,
462            network_process,
463            futures::sink::drain(),
464            ticket_factory,
465            None,
466        )
467        .await
468    }
469
470    /// Internal worker driving all node-type variants of `HoprTransport::run_*`.
471    ///
472    /// Branches on `role`:
473    /// - [`protocol::NodeType::Relay`]: full packet pipeline + SessionManager.
474    /// - [`protocol::NodeType::Exit`]: ack-drain pipeline + incoming Sessions.
475    /// - [`protocol::NodeType::Entry`]: no ack pipeline, no incoming Sessions.
476    #[allow(clippy::too_many_arguments)]
477    async fn run_inner<T, TFact, Ct>(
478        &self,
479        role: protocol::NodeType,
480        cover_traffic: Ct,
481        network: Net,
482        network_process: BoxedProcessFn,
483        ticket_events: T,
484        ticket_factory: TFact,
485        on_incoming_session: Option<Sender<IncomingSession>>,
486    ) -> errors::Result<(
487        HoprSocket<
488            futures::stream::BoxStream<'static, ApplicationDataIn>,
489            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
490        >,
491        AbortableList<HoprTransportProcess>,
492    )>
493    where
494        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
495        T::Error: std::error::Error + Clone + Send,
496        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
497        TFact: TicketFactory + Clone + Send + Sync + 'static,
498    {
499        let mut processes = AbortableList::<HoprTransportProcess>::default();
500
501        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
502            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
503
504        // -- transport medium
505
506        let transport_network = network;
507        let transport_layer_process = network_process;
508
509        let msg_codec = crate::protocol::HoprBinaryCodec {};
510        let (wire_msg_tx, wire_msg_rx) =
511            protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
512
513        let mixing_channel_tx = hopr_transport_mixer::MixerSink::new(wire_msg_tx, build_mixer_cfg_from_env());
514
515        // -- path cache background refresh (only when tokio runtime is available)
516        #[cfg(feature = "runtime-tokio")]
517        processes.insert(
518            HoprTransportProcess::PathRefresh,
519            hopr_utils::spawn_as_abortable!(self.path_planner.run_background_refresh()),
520        );
521
522        processes.insert(
523            HoprTransportProcess::Medium,
524            hopr_utils::spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
525                task = %HoprTransportProcess::Medium,
526                "long-running background task finished"
527            ))),
528        );
529
530        let msg_protocol_bidirectional_channel_capacity =
531            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
532                .ok()
533                .and_then(|s| s.trim().parse::<usize>().ok())
534                .filter(|&c| c > 0)
535                .unwrap_or(16_384);
536
537        debug!(
538            capacity = msg_protocol_bidirectional_channel_capacity,
539            "creating protocol bidirectional channel"
540        );
541        let (tx_from_protocol, rx_from_protocol) =
542            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
543
544        // === START === cover traffic control
545        // Allocate a cover traffic tag from the session telemetry partition to avoid
546        // collisions with session and probing tags.
547        let cover_traffic_allocated_tag = self
548            .session_telemetry_tag_allocator
549            .allocate()
550            .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
551        let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
552
553        // filter out the known cover traffic not to lose processing time with it
554        // The allocated tag is moved into the closure to keep it alive for the transport lifetime.
555        let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
556            let _keep_alive = &cover_traffic_allocated_tag;
557            async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
558        });
559
560        // prepare a cover traffic stream
561        let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
562            let start =
563                hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
564            let data = &RANDOM_DATA[start..start + 100];
565
566            futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
567                Some((routing, ApplicationDataOut::with_no_packet_info(data)))
568            } else {
569                tracing::error!("failed to construct cover traffic packet");
570                None
571            })
572        });
573
574        // merge cover traffic with other outgoing data
575        let merged_unresolved_output_data =
576            select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
577                futures::stream::PollNext::Left
578            });
579
580        // === END === cover traffic control
581
582        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
583        // sending the external packets to the transport pipeline. Concurrency matches
584        // the encoder stage (output_concurrency) to avoid head-of-line blocking on
585        // cache-miss path lookups.
586        let path_planner = self.path_planner.clone();
587        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
588        let routing_concurrency = {
589            let avail = std::thread::available_parallelism()
590                .ok()
591                .map(|n| n.get())
592                .unwrap_or(1)
593                .max(1)
594                * 8;
595            self.cfg
596                .packet
597                .pipeline
598                .output_concurrency
599                .filter(|&n| n > 0)
600                .unwrap_or(avail)
601        };
602        let all_resolved_external_msg_rx = merged_unresolved_output_data
603            .then_concurrent(
604                move |(unresolved, mut data)| {
605                    let path_planner = path_planner.clone();
606                    async move {
607                        trace!(?unresolved, "resolving routing for packet");
608                        match path_planner
609                            .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
610                            .await
611                        {
612                            Ok((resolved, rem_surbs)) => {
613                                // Set the SURB distress/out-of-SURBs flag if applicable.
614                                // These flags are translated into HOPR protocol packet signals and are
615                                // applicable only on the return path.
616                                let mut signals_to_dst = data
617                                    .packet_info
618                                    .as_ref()
619                                    .map(|info| info.signals_to_destination)
620                                    .unwrap_or_default();
621
622                                if resolved.is_return() {
623                                    signals_to_dst = match rem_surbs {
624                                        Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
625                                            signals_to_dst | PacketSignal::SurbDistress
626                                        }
627                                        Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
628                                        _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
629                                    };
630                                } else {
631                                    // Unset these flags as they make no sense on the forward path.
632                                    signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
633                                }
634
635                                data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
636                                trace!(?resolved, "resolved routing for packet");
637                                Some((resolved, data))
638                            }
639                            Err(error) => {
640                                error!(%error, "failed to resolve routing");
641                                None
642                            }
643                        }
644                    }
645                    .in_current_span()
646                },
647                routing_concurrency,
648            )
649            .filter_map(futures::future::ready);
650
651        let channels_dst = self
652            .chain_api
653            .domain_separators()
654            .await
655            .map_err(HoprTransportError::chain)?
656            .channel;
657
658        let pipeline_builder = HoprPacketPipelineBuilder::new()
659            .identity((&self.chain_key, &self.packet_key))
660            .transport((mixing_channel_tx, wire_msg_rx))
661            .api((tx_from_protocol, all_resolved_external_msg_rx))
662            .surb_store(self.path_planner.surb_store.clone())
663            .chain_api(self.chain_api.clone())
664            .ticket_factory(ticket_factory)
665            .channels_dst(channels_dst)
666            .with_counters(self.counters.clone())
667            .with_config(self.cfg.packet);
668
669        let pipeline_processes = match role {
670            protocol::NodeType::Relay => pipeline_builder.with_ticket_events(ticket_events).build_for_relay(),
671            protocol::NodeType::Exit => pipeline_builder.build_for_exit(),
672            protocol::NodeType::Entry => pipeline_builder.build_for_entry(),
673        };
674        processes.extend_from(pipeline_processes);
675
676        // -- periodic counter flush
677        let flush_counters = self.counters.clone();
678        let flush_graph = self.graph.clone();
679        let flush_me = *self.packet_key.public();
680        let flush_interval = self.cfg.counter_flush_interval;
681        processes.insert(
682            HoprTransportProcess::CounterFlush,
683            hopr_utils::spawn_as_abortable!(async move {
684                use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
685
686                futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
687                    .for_each(|_| {
688                        for (peer, num_packets, num_acks) in flush_counters.drain() {
689                            tracing::trace!(
690                                %peer,
691                                num_packets,
692                                num_acks,
693                                "flushing protocol conformance counters"
694                            );
695                            flush_graph.upsert_edge(&flush_me, &peer, |obs| {
696                                obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
697                            });
698                        }
699                        futures::future::ready(())
700                    })
701                    .await;
702            }),
703        );
704
705        // -- network probing
706        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
707            .ok()
708            .and_then(|s| s.trim().parse::<usize>().ok())
709            .filter(|&c| c > 0)
710            .unwrap_or(128);
711        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
712        let (manual_ping_tx, manual_ping_rx) =
713            channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
714
715        let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
716
717        let (probing_processes, probe_classifier) = probe
718            .continuously_scan(
719                unresolved_routing_msg_tx.clone(),
720                manual_ping_rx,
721                cover_traffic,
722                self.graph.clone(),
723            )
724            .await;
725
726        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
727
728        // manual ping
729        self.ping
730            .clone()
731            .set(Pinger::new(
732                PingConfig {
733                    timeout: self.cfg.probe.timeout,
734                },
735                manual_ping_tx,
736            ))
737            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
738
739        // -- session management
740        let smgr_start_res = if role != protocol::NodeType::Entry {
741            // Relays and Exits can accept incoming Sessions
742            self.smgr.start(
743                unresolved_routing_msg_tx.clone(),
744                on_incoming_session.ok_or_else(|| {
745                    HoprTransportError::Api("on_incoming_session channel is required for relay/exit nodes".into())
746                })?,
747            )
748        } else {
749            // Entry nodes cannot accept incoming Sessions
750            self.smgr
751                .start(unresolved_routing_msg_tx.clone(), futures::sink::drain())
752        };
753
754        smgr_start_res
755            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
756            .into_iter()
757            .enumerate()
758            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
759            .for_each(|(k, v)| {
760                processes.insert(k, v);
761            });
762
763        // Wire incoming: cover-traffic-filtered stream → probe classify → (session dispatch).
764        // This stage must run in a background task, so the pipeline drains even when the
765        // caller discards the returned HoprSocket (e.g. edge-node builder).
766        let (on_incoming_data_tx, on_incoming_data_rx) =
767            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
768        let smgr = self.smgr.clone();
769        processes.insert(
770            HoprTransportProcess::SessionsManagement(0),
771            hopr_utils::spawn_as_abortable!(
772                probe_classifier
773                    .filter_stream(unresolved_routing_msg_tx.clone(), rx_from_protocol)
774                    .filter_map(move |(pseudonym, data)| {
775                        let smgr = smgr.clone();
776                        async move {
777                            match smgr.dispatch_message(pseudonym, data).await {
778                                Ok(DispatchResult::Processed) => {
779                                    tracing::trace!("message dispatch completed");
780                                    None
781                                }
782                                Ok(DispatchResult::Unrelated(data)) => {
783                                    tracing::trace!("unrelated message dispatch completed");
784                                    Some(data)
785                                }
786                                Err(error) => {
787                                    tracing::error!(%error, "error while dispatching packet in the session manager");
788                                    None
789                                }
790                            }
791                        }
792                    })
793                    .map(Ok)
794                    .forward(on_incoming_data_tx)
795                    .inspect(|_| tracing::warn!(
796                        task = %HoprTransportProcess::SessionsManagement(0),
797                        "long-running background task finished"
798                    ))
799            ),
800        );
801
802        // Populate the OnceLock at the end, making sure everything before didn't fail.
803        self.network
804            .clone()
805            .set(transport_network)
806            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
807
808        Ok((
809            (on_incoming_data_rx.boxed(), unresolved_routing_msg_tx).into(),
810            processes,
811        ))
812    }
813
814    #[tracing::instrument(level = "debug", skip(self))]
815    pub async fn ping(
816        &self,
817        peer: &OffchainPublicKey,
818    ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
819        let me: &OffchainPublicKey = self.packet_key.public();
820        if peer == me {
821            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
822        }
823
824        let pinger = self
825            .ping
826            .get()
827            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
828
829        let latency = (*pinger).ping(peer).await?;
830
831        if let Some(observations) = self.graph.edge(me, peer) {
832            Ok((latency, observations))
833        } else {
834            Err(HoprTransportError::Api(format!(
835                "no observations available for peer {peer}",
836            )))
837        }
838    }
839
840    #[tracing::instrument(level = "debug", skip(self))]
841    pub async fn new_session(
842        &self,
843        destination: Address,
844        target: SessionTarget,
845        cfg: SessionClientConfig,
846    ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
847        let session = self.smgr.new_session(destination, target, cfg).await?;
848        let id = *session.id();
849        Ok((
850            session,
851            HoprSessionConfigurator {
852                id,
853                smgr: Arc::downgrade(&self.smgr),
854            },
855        ))
856    }
857
858    #[tracing::instrument(level = "debug", skip(self))]
859    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
860        self.network
861            .get()
862            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
863            .map(|network| network.listening_as().into_iter().collect())
864            .unwrap_or_default()
865    }
866
867    #[tracing::instrument(level = "debug", skip(self))]
868    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
869        let mut mas = self
870            .local_multiaddresses()
871            .into_iter()
872            .filter(|ma| {
873                crate::multiaddrs::is_supported(ma)
874                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
875            })
876            .map(|ma| strip_p2p_protocol(&ma))
877            .filter(|v| !v.is_empty())
878            .collect::<Vec<_>>();
879
880        mas.sort_by(|l, r| {
881            let is_left_dns = crate::multiaddrs::is_dns(l);
882            let is_right_dns = crate::multiaddrs::is_dns(r);
883
884            if !(is_left_dns ^ is_right_dns) {
885                std::cmp::Ordering::Equal
886            } else if is_left_dns {
887                std::cmp::Ordering::Less
888            } else {
889                std::cmp::Ordering::Greater
890            }
891        });
892
893        mas
894    }
895
896    /// Returns a reference to the network graph.
897    pub fn graph(&self) -> &Graph {
898        &self.graph
899    }
900
901    #[tracing::instrument(level = "debug", skip(self))]
902    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
903        self.network
904            .get()
905            .map(|network| network.listening_as().into_iter().collect())
906            .unwrap_or_else(|| {
907                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
908                self.my_multiaddresses.clone()
909            })
910    }
911
912    #[tracing::instrument(level = "debug", skip(self))]
913    pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
914        match self
915            .network
916            .get()
917            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
918        {
919            Ok(network) => network
920                .multiaddress_of(&peer.into())
921                .unwrap_or_default()
922                .into_iter()
923                .collect(),
924            Err(error) => {
925                tracing::error!(%error, "failed to get observed multiaddresses");
926                return vec![];
927            }
928        }
929    }
930
931    #[tracing::instrument(level = "debug", skip(self))]
932    pub async fn network_health(&self) -> Health {
933        self.network
934            .get()
935            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
936            .map(|network| network.health())
937            .unwrap_or(Health::Red)
938    }
939
940    pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
941        Ok(futures::stream::iter(
942            self.network
943                .get()
944                .ok_or_else(|| {
945                    tracing::error!("transport network is not yet initialized");
946                    HoprTransportError::Api("transport network is not yet initialized".into())
947                })?
948                .connected_peers(),
949        )
950        .filter_map(|peer_id| async move {
951            match peer_id_to_public_key(&peer_id) {
952                Ok(key) => Some(key),
953                Err(error) => {
954                    tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
955                    None
956                }
957            }
958        })
959        .collect()
960        .await)
961    }
962
963    #[tracing::instrument(level = "debug", skip(self))]
964    pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
965        self.graph.edge(self.packet_key.public(), peer)
966    }
967
968    /// Get connected peers with quality higher than some value.
969    #[tracing::instrument(level = "debug", skip(self))]
970    pub async fn all_network_peers(
971        &self,
972        minimum_score: f64,
973    ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
974        let me = self.packet_key.public();
975        Ok(self
976            .network_connected_peers()
977            .await?
978            .into_iter()
979            .filter_map(|peer| {
980                let observation = self.graph.edge(me, &peer);
981                if let Some(info) = observation {
982                    if info.score() >= minimum_score {
983                        Some((peer, info))
984                    } else {
985                        None
986                    }
987                } else {
988                    None
989                }
990            })
991            .collect::<Vec<_>>())
992    }
993}
994
995fn build_mixer_cfg_from_env() -> MixerConfig {
996    let mixer_cfg = MixerConfig {
997        min_delay: std::time::Duration::from_millis(
998            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
999                .map(|v| {
1000                    v.trim()
1001                        .parse::<u64>()
1002                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
1003                })
1004                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
1005        ),
1006        delay_range: std::time::Duration::from_millis(
1007            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
1008                .map(|v| {
1009                    v.trim()
1010                        .parse::<u64>()
1011                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
1012                })
1013                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
1014        ),
1015        capacity: {
1016            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
1017                .ok()
1018                .and_then(|s| s.trim().parse::<usize>().ok())
1019                .filter(|&c| c > 0)
1020                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
1021            debug!(capacity = capacity, "Setting mixer capacity");
1022            capacity
1023        },
1024        ..MixerConfig::default()
1025    };
1026    debug!(?mixer_cfg, "Mixer configuration");
1027
1028    mixer_cfg
1029}
1030
1031// ---------------------------------------------------------------------------
1032// NetworkView impl for HoprTransport — wraps OnceLock<Net> access
1033// ---------------------------------------------------------------------------
1034
1035impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
1036where
1037    Net: NetworkView + Send + Sync + 'static,
1038{
1039    fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
1040        self.network.get().map(|n| n.listening_as()).unwrap_or_default()
1041    }
1042
1043    fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
1044        self.network.get()?.multiaddress_of(peer)
1045    }
1046
1047    fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
1048        self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
1049    }
1050
1051    fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
1052        self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
1053    }
1054
1055    fn is_connected(&self, peer: &PeerId) -> bool {
1056        self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
1057    }
1058
1059    fn health(&self) -> Health {
1060        self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
1061    }
1062
1063    fn subscribe_network_events(
1064        &self,
1065    ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
1066        match self.network.get() {
1067            Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
1068            None => futures::future::Either::Right(futures::stream::empty()),
1069        }
1070    }
1071}
1072
1073// ---------------------------------------------------------------------------
1074// TransportOperations impl for HoprTransport
1075// ---------------------------------------------------------------------------
1076
1077#[async_trait::async_trait]
1078impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
1079where
1080    Chain: ChainReadChannelOperations
1081        + ChainReadAccountOperations
1082        + hopr_api::chain::ChainWriteTicketOperations
1083        + ChainKeyOperations
1084        + hopr_api::chain::ChainReadTicketOperations
1085        + ChainValues
1086        + Clone
1087        + Send
1088        + Sync
1089        + 'static,
1090    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
1091        + NetworkGraphUpdate
1092        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1093        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1094        + Clone
1095        + Send
1096        + Sync
1097        + 'static,
1098    <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
1099    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
1100    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1101    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
1102{
1103    type Error = errors::HoprTransportError;
1104    type Observable = <Graph as NetworkGraphView>::Observed;
1105
1106    async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
1107        self.ping(key).await
1108    }
1109
1110    async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
1111        self.network_observed_multiaddresses(key).await
1112    }
1113}