// hopr_transport crate root (lib.rs)

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21
22mod multiaddrs;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30    sync::{Arc, OnceLock},
31    time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36    FutureExt, StreamExt,
37    channel::mpsc::{Sender, channel},
38    stream::select_with_strategy,
39};
40pub use hopr_api::{
41    Multiaddr, PeerId,
42    network::{Health, traits::NetworkView},
43    types::{
44        crypto::{
45            keypairs::{ChainKeypair, Keypair, OffchainKeypair},
46            types::{HalfKeyChallenge, Hash, OffchainPublicKey},
47        },
48        internal::{prelude::HoprPseudonym, routing::RoutingOptions},
49    },
50};
51use hopr_api::{
52    chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
53    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
54    graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
55    network::{BoxedProcessFn, NetworkStreamControl},
56    types::primitive::prelude::*,
57};
58use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
59use hopr_crypto_packet::prelude::PacketSignal;
60use hopr_network_types::prelude::*;
61pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
62use hopr_protocol_hopr::MemorySurbStore;
63use hopr_transport_mixer::MixerConfig;
64use hopr_transport_path::{BackgroundPathCacheRefreshable, HoprGraphPathSelector, PathPlanner};
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67    Probe,
68    ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_protocol::PeerProtocolCounterRegistry;
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77    errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83pub use multiaddr::Protocol;
84use tracing::{Instrument, debug, error, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88    constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, multiaddrs::strip_p2p_protocol,
89    pipeline::HoprPipelineComponents, socket::HoprSocket,
90};
91
92pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94pub use hopr_api as api;
95use hopr_api::{
96    chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
97    tickets::TicketFactory,
98    types::internal::routing::DestinationRouting,
99};
100
// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
lazy_static::lazy_static! {
    /// Upper bound on the Session initiation timeout: twice the base timeout, scaled by the
    /// maximum number of intermediate hops a route may contain.
    static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;

    /// Cache for `PeerId` -> [`OffchainPublicKey`] conversions (see [`peer_id_to_public_key`]).
    /// Entries idle out after 15 minutes and at most 10 000 entries are retained.
    // NOTE(review): `Duration::from_mins` is a relatively recent constructor — confirm the crate's MSRV supports it.
    static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
        .time_to_idle(Duration::from_mins(15))
        .max_capacity(10_000)
        .build();

    /// Pool of random bytes from which cover-traffic packet payloads are sliced (see `run`).
    static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
}
112
113/// PeerId -> OffchainPublicKey is a CPU-intensive blocking operation.
114///
115/// This helper uses a cached static object to speed up the lookup and avoid blocking the async
116/// runtime on repeated conversions for the same [`PeerId`]s.
117pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
118    PEER_ID_CACHE
119        .try_get_with_by_ref(peer_id, move || {
120            OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
121        })
122        .map_err(|e: Arc<HoprTransportError>| {
123            crate::errors::HoprTransportError::Other(anyhow::anyhow!(
124                "failed to convert peer_id ({:?}) to an offchain public key: {e}",
125                peer_id
126            ))
127        })
128}
129
/// Identifiers of the long-running background processes spawned by [`HoprTransport::run`].
///
/// The `strum::Display` strings are used in log messages when a task terminates.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    #[cfg(feature = "runtime-tokio")]
    #[strum(to_string = "path cache refresh")]
    PathRefresh,
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    #[strum(to_string = "periodic protocol counter flush")]
    CounterFlush,
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
151
/// HOPR protocol specific instantiation of the SessionManager.
///
/// Outgoing packets are handed over as `(DestinationRouting, ApplicationDataOut)` pairs;
/// newly accepted sessions are surfaced through the `IncomingSession` sender.
type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>;
154
/// Allows configuration of one specific [`HoprSession`].
///
/// The configurator does not prevent the Session from being closed
/// or the Session manager from being dropped.
#[derive(Debug, Clone)]
pub struct HoprSessionConfigurator {
    // Identifier of the Session this configurator operates on.
    id: SessionId,
    // Makes sure configurator does not extend lifetime of the SessionManager.
    smgr: std::sync::Weak<HoprSessionManager>,
}
165
166impl HoprSessionConfigurator {
167    /// [`SessionId`] of the session this object can configure.
168    pub fn id(&self) -> &SessionId {
169        &self.id
170    }
171
172    /// Sends a Session Keep-Alive packet over the Session.
173    ///
174    /// NOTE: This usually carries at least 2 SURBs on the HOPR protocol level and can be
175    /// used for manual SURB balancing.
176    ///
177    /// NOTE: This operation only sends the Session Keep-Alive packet and **DOES NOT** guarantee the other side
178    /// has received it.
179    pub async fn ping(&self) -> errors::Result<()> {
180        Ok(self
181            .smgr
182            .upgrade()
183            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
184            .ping_session(&self.id)
185            .await?)
186    }
187
188    /// Gets the configuration of the SURB balancer.
189    ///
190    /// Returns an error if the Session is closed, the Session manager is gone.
191    ///
192    /// Returns `Ok(None)` if the Session has been created without a SURB balancer.
193    pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
194        Ok(self
195            .smgr
196            .upgrade()
197            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
198            .get_surb_balancer_config(&self.id)
199            .await?)
200    }
201
202    /// Updates the configuration of the SURB balancer.
203    ///
204    /// Returns an error if the Session is closed, the Session manager is gone, or the
205    /// Session has been created without a SURB balancer.
206    pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
207        Ok(self
208            .smgr
209            .upgrade()
210            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
211            .update_surb_balancer_config(&self.id, config)
212            .await?)
213    }
214}
215
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport.
pub struct HoprTransport<Chain, Graph, Net> {
    // Off-chain (packet) identity of this node.
    packet_key: OffchainKeypair,
    // On-chain identity of this node.
    chain_key: ChainKeypair,
    // On-chain read/write API (accounts, channels, tickets, values).
    chain_api: Chain,
    // Manual-ping entry point; populated once by `run()`.
    ping: Arc<OnceLock<Pinger>>,
    // Transport network view/control; populated once at the end of `run()`.
    network: Arc<OnceLock<Net>>,
    // Network graph used for path selection and telemetry recording.
    graph: Graph,
    // Resolves `DestinationRouting` into concrete transport routes (incl. SURB handling).
    path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
    // Multiaddresses this node was configured with (fallback when network is not yet up).
    my_multiaddresses: Vec<Multiaddr>,
    // Session manager shared with `HoprSessionConfigurator`s via weak references.
    smgr: Arc<HoprSessionManager>,
    // Tag allocator partition for session terminal telemetry (also used for cover traffic).
    session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
    // Tag allocator partition for network probing.
    probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
    // Per-peer protocol conformance counters, periodically flushed into the graph.
    counters: PeerProtocolCounterRegistry,
    cfg: HoprProtocolConfig,
}
233
234impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
235where
236    Chain: ChainReadChannelOperations
237        + ChainReadAccountOperations
238        + ChainWriteTicketOperations
239        + ChainKeyOperations
240        + ChainReadTicketOperations
241        + ChainValues
242        + Clone
243        + Send
244        + Sync
245        + 'static,
246    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
247        + NetworkGraphUpdate
248        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
249        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
250        + Clone
251        + Send
252        + Sync
253        + 'static,
254    <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
255    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
256        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
257    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
258    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
259{
    /// Creates a new transport object.
    ///
    /// `identity` carries the (chain, packet/off-chain) keypairs of this node, `resolver`
    /// provides the on-chain API, and `graph` the network graph backing path selection.
    ///
    /// Fails if the tag-allocator configuration does not yield an allocator for each of the
    /// three required usages (session, session terminal telemetry, probing telemetry).
    pub fn new(
        identity: (&ChainKeypair, &OffchainKeypair),
        resolver: Chain,
        graph: Graph,
        my_multiaddresses: Vec<Multiaddr>,
        cfg: HoprProtocolConfig,
    ) -> errors::Result<Self> {
        let me_offchain = *identity.1.public();
        let planner_config = cfg.path_planner;
        let selector = HoprGraphPathSelector::new(me_offchain, graph.clone(), planner_config.max_cached_paths);

        let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;

        // Partition the configured allocators by usage; each usage must be present exactly once.
        let mut session_tag_allocator = None;
        let mut session_telemetry_tag_allocator = None;
        let mut probing_tag_allocator = None;
        for (usage, alloc) in tag_allocators {
            match usage {
                hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
                hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
                    session_telemetry_tag_allocator = Some(alloc)
                }
                hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
            }
        }
        let session_tag_allocator = session_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("session tag allocator missing".into()))?;
        let session_telemetry_tag_allocator = session_telemetry_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
        let probing_tag_allocator = probing_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("probing tag allocator missing".into()))?;

        Ok(Self {
            packet_key: identity.1.clone(),
            chain_key: identity.0.clone(),
            ping: Arc::new(OnceLock::new()),
            network: Arc::new(OnceLock::new()),
            graph,
            path_planner: PathPlanner::new(
                me_offchain,
                MemorySurbStore::new(cfg.packet.surb_store),
                resolver.clone(),
                selector,
                planner_config,
            ),
            my_multiaddresses,
            smgr: SessionManager::new(
                SessionManagerConfig {
                    // Frame MTU and timeout may be overridden via env vars; both are
                    // clamped to sane minimums below.
                    frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
                        .ok()
                        .and_then(|s| s.parse::<usize>().ok())
                        .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
                        .max(ApplicationData::PAYLOAD_SIZE),
                    max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
                        .ok()
                        .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
                        .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
                        .max(Duration::from_millis(100)),
                    initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
                    idle_timeout: cfg.session.idle_timeout,
                    balancer_sampling_interval: cfg.session.balancer_sampling_interval,
                    initial_return_session_egress_rate: 10,
                    minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
                    maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
                    surb_balance_notify_period: None,
                    surb_target_notify: true,
                },
                session_tag_allocator,
            )
            .into(),
            chain_api: resolver,
            session_telemetry_tag_allocator,
            probing_tag_allocator,
            counters: PeerProtocolCounterRegistry::default(),
            cfg,
        })
    }
337
338    /// Execute all processes of the [`HoprTransport`] object.
339    ///
340    /// This method will spawn the `HoprTransportProcess::Heartbeat`,
341    /// `HoprTransportProcess::BloomFilterSave`, `HoprTransportProcess::Swarm` and session-related
342    /// processes and return join handles to the calling function. These processes are not started immediately but
343    /// are waiting for a trigger from this piece of code.
344    pub async fn run<T, TFact, Ct>(
345        &self,
346        cover_traffic: Ct,
347        network: Net,
348        network_process: BoxedProcessFn,
349        ticket_events: T,
350        ticket_factory: TFact,
351        on_incoming_session: Sender<IncomingSession>,
352    ) -> errors::Result<(
353        HoprSocket<
354            futures::channel::mpsc::Receiver<ApplicationDataIn>,
355            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
356        >,
357        AbortableList<HoprTransportProcess>,
358    )>
359    where
360        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
361        T::Error: std::error::Error + Clone + Send,
362        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
363        TFact: TicketFactory + Clone + Send + Sync + 'static,
364    {
365        let mut processes = AbortableList::<HoprTransportProcess>::default();
366
367        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
368            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
369
370        // -- transport medium
371
372        let transport_network = network;
373        let transport_layer_process = network_process;
374
375        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
376        let (wire_msg_tx, wire_msg_rx) =
377            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
378
379        let (mixing_channel_tx, mixing_channel_rx) =
380            hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
381
382        // the process is terminated when the input stream runs out
383        let _mixing_process_before_sending_out = spawn(
384            mixing_channel_rx
385                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
386                .map(Ok)
387                .forward(wire_msg_tx)
388                .inspect(|_| {
389                    tracing::warn!(
390                        task = "mixer -> egress process",
391                        "long-running background task finished"
392                    )
393                }),
394        );
395
396        // -- path cache background refresh (only when tokio runtime is available)
397        #[cfg(feature = "runtime-tokio")]
398        processes.insert(
399            HoprTransportProcess::PathRefresh,
400            spawn_as_abortable!(self.path_planner.run_background_refresh()),
401        );
402
403        processes.insert(
404            HoprTransportProcess::Medium,
405            spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
406                task = %HoprTransportProcess::Medium,
407                "long-running background task finished"
408            ))),
409        );
410
411        let msg_protocol_bidirectional_channel_capacity =
412            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
413                .ok()
414                .and_then(|s| s.trim().parse::<usize>().ok())
415                .filter(|&c| c > 0)
416                .unwrap_or(16_384);
417
418        let (on_incoming_data_tx, on_incoming_data_rx) =
419            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
420
421        debug!(
422            capacity = msg_protocol_bidirectional_channel_capacity,
423            "creating protocol bidirectional channel"
424        );
425        let (tx_from_protocol, rx_from_protocol) =
426            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
427
428        // === START === cover traffic control
429        // Allocate a cover traffic tag from the session telemetry partition to avoid
430        // collisions with session and probing tags.
431        let cover_traffic_allocated_tag = self
432            .session_telemetry_tag_allocator
433            .allocate()
434            .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
435        let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
436
437        // filter out the known cover traffic not to lose processing time with it
438        // The allocated tag is moved into the closure to keep it alive for the transport lifetime.
439        let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
440            let _keep_alive = &cover_traffic_allocated_tag;
441            async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
442        });
443
444        // prepare a cover traffic stream
445        let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
446            let start =
447                hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
448            let data = &RANDOM_DATA[start..start + 100];
449
450            futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
451                Some((routing, ApplicationDataOut::with_no_packet_info(data)))
452            } else {
453                tracing::error!("failed to construct cover traffic packet");
454                None
455            })
456        });
457
458        // merge cover traffic with other outgoing data
459        let merged_unresolved_output_data =
460            select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
461                futures::stream::PollNext::Left
462            });
463
464        // === END === cover traffic control
465
466        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
467        // sending the external packets to the transport pipeline.
468        let path_planner = self.path_planner.clone();
469        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
470        let all_resolved_external_msg_rx = merged_unresolved_output_data.filter_map(move |(unresolved, mut data)| {
471            let path_planner = path_planner.clone();
472            async move {
473                trace!(?unresolved, "resolving routing for packet");
474                match path_planner
475                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
476                    .await
477                {
478                    Ok((resolved, rem_surbs)) => {
479                        // Set the SURB distress/out-of-SURBs flag if applicable.
480                        // These flags are translated into HOPR protocol packet signals and are
481                        // applicable only on the return path.
482                        let mut signals_to_dst = data
483                            .packet_info
484                            .as_ref()
485                            .map(|info| info.signals_to_destination)
486                            .unwrap_or_default();
487
488                        if resolved.is_return() {
489                            signals_to_dst = match rem_surbs {
490                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
491                                    signals_to_dst | PacketSignal::SurbDistress
492                                }
493                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
494                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
495                            };
496                        } else {
497                            // Unset these flags as they make no sense on the forward path.
498                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
499                        }
500
501                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
502                        trace!(?resolved, "resolved routing for packet");
503                        Some((resolved, data))
504                    }
505                    Err(error) => {
506                        error!(%error, "failed to resolve routing");
507                        None
508                    }
509                }
510            }
511            .in_current_span()
512        });
513
514        let channels_dst = self
515            .chain_api
516            .domain_separators()
517            .await
518            .map_err(HoprTransportError::chain)?
519            .channel;
520
521        processes.extend_from(pipeline::run_hopr_packet_pipeline(
522            (self.packet_key.clone(), self.chain_key.clone()),
523            (mixing_channel_tx, wire_msg_rx),
524            (tx_from_protocol, all_resolved_external_msg_rx),
525            HoprPipelineComponents {
526                surb_store: self.path_planner.surb_store.clone(),
527                chain_api: self.chain_api.clone(),
528                counters: self.counters.clone(),
529                ticket_factory,
530                ticket_events,
531            },
532            channels_dst,
533            self.cfg.packet,
534        ));
535
536        // -- periodic counter flush
537        let flush_counters = self.counters.clone();
538        let flush_graph = self.graph.clone();
539        let flush_me = *self.packet_key.public();
540        let flush_interval = self.cfg.counter_flush_interval;
541        processes.insert(
542            HoprTransportProcess::CounterFlush,
543            spawn_as_abortable!(async move {
544                use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
545
546                futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
547                    .for_each(|_| {
548                        for (peer, num_packets, num_acks) in flush_counters.drain() {
549                            tracing::trace!(
550                                %peer,
551                                num_packets,
552                                num_acks,
553                                "flushing protocol conformance counters"
554                            );
555                            flush_graph.upsert_edge(&flush_me, &peer, |obs| {
556                                obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
557                            });
558                        }
559                        futures::future::ready(())
560                    })
561                    .await;
562            }),
563        );
564
565        // -- network probing
566        debug!(
567            capacity = msg_protocol_bidirectional_channel_capacity,
568            note = "same as protocol bidirectional",
569            "Creating probing channel"
570        );
571
572        let (tx_from_probing, rx_from_probing) =
573            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
574
575        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
576            .ok()
577            .and_then(|s| s.trim().parse::<usize>().ok())
578            .filter(|&c| c > 0)
579            .unwrap_or(128);
580        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
581        let (manual_ping_tx, manual_ping_rx) =
582            channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
583
584        let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
585
586        let probing_processes = probe
587            .continuously_scan(
588                (unresolved_routing_msg_tx.clone(), rx_from_protocol),
589                manual_ping_rx,
590                tx_from_probing,
591                cover_traffic,
592                self.graph.clone(),
593            )
594            .await;
595
596        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
597
598        // manual ping
599        self.ping
600            .clone()
601            .set(Pinger::new(
602                PingConfig {
603                    timeout: self.cfg.probe.timeout,
604                },
605                manual_ping_tx,
606            ))
607            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
608
609        // -- session management
610        self.smgr
611            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
612            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
613            .into_iter()
614            .enumerate()
615            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
616            .for_each(|(k, v)| {
617                processes.insert(k, v);
618            });
619
620        let smgr = self.smgr.clone();
621        processes.insert(
622            HoprTransportProcess::SessionsManagement(0),
623            spawn_as_abortable!(
624                rx_from_probing
625                    .filter_map(move |(pseudonym, data)| {
626                        let smgr = smgr.clone();
627                        async move {
628                            match smgr.dispatch_message(pseudonym, data).await {
629                                Ok(DispatchResult::Processed) => {
630                                    tracing::trace!("message dispatch completed");
631                                    None
632                                }
633                                Ok(DispatchResult::Unrelated(data)) => {
634                                    tracing::trace!("unrelated message dispatch completed");
635                                    Some(data)
636                                }
637                                Err(error) => {
638                                    tracing::error!(%error, "error while dispatching packet in the session manager");
639                                    None
640                                }
641                            }
642                        }
643                    })
644                    .map(Ok)
645                    .forward(on_incoming_data_tx)
646                    .inspect(|_| tracing::warn!(
647                        task = %HoprTransportProcess::SessionsManagement(0),
648                        "long-running background task finished"
649                    ))
650            ),
651        );
652
653        // Populate the OnceLock at the end, making sure everything before didn't fail.
654        self.network
655            .clone()
656            .set(transport_network)
657            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
658
659        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
660    }
661
662    #[tracing::instrument(level = "debug", skip(self))]
663    pub async fn ping(
664        &self,
665        peer: &OffchainPublicKey,
666    ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
667        let me: &OffchainPublicKey = self.packet_key.public();
668        if peer == me {
669            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
670        }
671
672        let pinger = self
673            .ping
674            .get()
675            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
676
677        let latency = (*pinger).ping(peer).await?;
678
679        if let Some(observations) = self.graph.edge(me, peer) {
680            Ok((latency, observations))
681        } else {
682            Err(HoprTransportError::Api(format!(
683                "no observations available for peer {peer}",
684            )))
685        }
686    }
687
688    #[tracing::instrument(level = "debug", skip(self))]
689    pub async fn new_session(
690        &self,
691        destination: Address,
692        target: SessionTarget,
693        cfg: SessionClientConfig,
694    ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
695        let session = self.smgr.new_session(destination, target, cfg).await?;
696        let id = *session.id();
697        Ok((
698            session,
699            HoprSessionConfigurator {
700                id,
701                smgr: Arc::downgrade(&self.smgr),
702            },
703        ))
704    }
705
706    #[tracing::instrument(level = "debug", skip(self))]
707    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
708        self.network
709            .get()
710            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
711            .map(|network| network.listening_as().into_iter().collect())
712            .unwrap_or_default()
713    }
714
715    #[tracing::instrument(level = "debug", skip(self))]
716    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
717        let mut mas = self
718            .local_multiaddresses()
719            .into_iter()
720            .filter(|ma| {
721                crate::multiaddrs::is_supported(ma)
722                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
723            })
724            .map(|ma| strip_p2p_protocol(&ma))
725            .filter(|v| !v.is_empty())
726            .collect::<Vec<_>>();
727
728        mas.sort_by(|l, r| {
729            let is_left_dns = crate::multiaddrs::is_dns(l);
730            let is_right_dns = crate::multiaddrs::is_dns(r);
731
732            if !(is_left_dns ^ is_right_dns) {
733                std::cmp::Ordering::Equal
734            } else if is_left_dns {
735                std::cmp::Ordering::Less
736            } else {
737                std::cmp::Ordering::Greater
738            }
739        });
740
741        mas
742    }
743
    /// Returns a reference to the network graph.
    ///
    /// The graph is the source of per-edge peer observations queried by
    /// [`Self::ping`] and [`Self::all_network_peers`].
    pub fn graph(&self) -> &Graph {
        &self.graph
    }
748
749    #[tracing::instrument(level = "debug", skip(self))]
750    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
751        self.network
752            .get()
753            .map(|network| network.listening_as().into_iter().collect())
754            .unwrap_or_else(|| {
755                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
756                self.my_multiaddresses.clone()
757            })
758    }
759
760    #[tracing::instrument(level = "debug", skip(self))]
761    pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
762        match self
763            .network
764            .get()
765            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
766        {
767            Ok(network) => network
768                .multiaddress_of(&peer.into())
769                .unwrap_or_default()
770                .into_iter()
771                .collect(),
772            Err(error) => {
773                tracing::error!(%error, "failed to get observed multiaddresses");
774                return vec![];
775            }
776        }
777    }
778
779    #[tracing::instrument(level = "debug", skip(self))]
780    pub async fn network_health(&self) -> Health {
781        self.network
782            .get()
783            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
784            .map(|network| network.health())
785            .unwrap_or(Health::Red)
786    }
787
788    pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
789        Ok(futures::stream::iter(
790            self.network
791                .get()
792                .ok_or_else(|| {
793                    tracing::error!("transport network is not yet initialized");
794                    HoprTransportError::Api("transport network is not yet initialized".into())
795                })?
796                .connected_peers(),
797        )
798        .filter_map(|peer_id| async move {
799            match peer_id_to_public_key(&peer_id) {
800                Ok(key) => Some(key),
801                Err(error) => {
802                    tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
803                    None
804                }
805            }
806        })
807        .collect()
808        .await)
809    }
810
811    #[tracing::instrument(level = "debug", skip(self))]
812    pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
813        self.graph.edge(self.packet_key.public(), peer)
814    }
815
816    /// Get connected peers with quality higher than some value.
817    #[tracing::instrument(level = "debug", skip(self))]
818    pub async fn all_network_peers(
819        &self,
820        minimum_score: f64,
821    ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
822        let me = self.packet_key.public();
823        Ok(self
824            .network_connected_peers()
825            .await?
826            .into_iter()
827            .filter_map(|peer| {
828                let observation = self.graph.edge(me, &peer);
829                if let Some(info) = observation {
830                    if info.score() >= minimum_score {
831                        Some((peer, info))
832                    } else {
833                        None
834                    }
835                } else {
836                    None
837                }
838            })
839            .collect::<Vec<_>>())
840    }
841}
842
843fn build_mixer_cfg_from_env() -> MixerConfig {
844    let mixer_cfg = MixerConfig {
845        min_delay: std::time::Duration::from_millis(
846            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
847                .map(|v| {
848                    v.trim()
849                        .parse::<u64>()
850                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
851                })
852                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
853        ),
854        delay_range: std::time::Duration::from_millis(
855            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
856                .map(|v| {
857                    v.trim()
858                        .parse::<u64>()
859                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
860                })
861                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
862        ),
863        capacity: {
864            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
865                .ok()
866                .and_then(|s| s.trim().parse::<usize>().ok())
867                .filter(|&c| c > 0)
868                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
869            debug!(capacity = capacity, "Setting mixer capacity");
870            capacity
871        },
872        ..MixerConfig::default()
873    };
874    debug!(?mixer_cfg, "Mixer configuration");
875
876    mixer_cfg
877}
878
879// ---------------------------------------------------------------------------
880// NetworkView impl for HoprTransport — wraps OnceLock<Net> access
881// ---------------------------------------------------------------------------
882
883impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
884where
885    Net: NetworkView + Send + Sync + 'static,
886{
887    fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
888        self.network.get().map(|n| n.listening_as()).unwrap_or_default()
889    }
890
891    fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
892        self.network.get()?.multiaddress_of(peer)
893    }
894
895    fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
896        self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
897    }
898
899    fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
900        self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
901    }
902
903    fn is_connected(&self, peer: &PeerId) -> bool {
904        self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
905    }
906
907    fn health(&self) -> Health {
908        self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
909    }
910
911    fn subscribe_network_events(
912        &self,
913    ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
914        match self.network.get() {
915            Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
916            None => futures::future::Either::Right(futures::stream::empty()),
917        }
918    }
919}
920
921// ---------------------------------------------------------------------------
922// TransportOperations impl for HoprTransport
923// ---------------------------------------------------------------------------
924
/// Exposes the transport through the node-facing [`hopr_api::node::TransportOperations`]
/// trait by delegating to the inherent methods of [`HoprTransport`].
#[async_trait::async_trait]
impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
where
    Chain: ChainReadChannelOperations
        + ChainReadAccountOperations
        + hopr_api::chain::ChainWriteTicketOperations
        + ChainKeyOperations
        + hopr_api::chain::ChainReadTicketOperations
        + ChainValues
        + Clone
        + Send
        + Sync
        + 'static,
    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
        + NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
{
    type Error = errors::HoprTransportError;
    // Observations come from the network graph's view of the edge to the peer.
    type Observable = <Graph as NetworkGraphView>::Observed;

    /// Delegates to the inherent `ping`, returning the measured latency together
    /// with the graph's edge observations for the pinged peer.
    async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
        self.ping(key).await
    }

    /// Delegates to the inherent `network_observed_multiaddresses`; yields an
    /// empty list on any failure.
    async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
        self.network_observed_multiaddresses(key).await
    }
}
961}