// hopr_transport/lib.rs
//! The crate aggregates and composes individual transport level objects and functionality
//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
//! and defining a fully specified transport API.
//!
//! See also the `hopr_protocol_start` crate for details on the Start sub-protocol, which initiates a Session.
//!
//! As such, the transport layer components should be only those that are directly necessary to:
//!
//! 1. send and receive a packet, acknowledgement or ticket aggregation request
//! 2. send and receive a network telemetry request
//! 3. automate transport level processes
//! 4. implement algorithms associated with the transport layer operational management
//! 5. provide interface specifications to allow modular behavioral extensions
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21
22mod multiaddrs;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30    sync::{Arc, OnceLock},
31    time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36    FutureExt, StreamExt,
37    channel::mpsc::{Sender, channel},
38    stream::select_with_strategy,
39};
40pub use hopr_api::{
41    Multiaddr, PeerId,
42    network::{Health, traits::NetworkView},
43    types::{
44        crypto::{
45            keypairs::{ChainKeypair, Keypair, OffchainKeypair},
46            types::{HalfKeyChallenge, Hash, OffchainPublicKey},
47        },
48        internal::{prelude::HoprPseudonym, routing::RoutingOptions},
49    },
50};
51use hopr_api::{
52    chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
53    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
54    graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
55    network::{BoxedProcessFn, NetworkStreamControl},
56    types::primitive::prelude::*,
57};
58use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
59use hopr_crypto_packet::prelude::PacketSignal;
60use hopr_network_types::prelude::*;
61pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
62use hopr_protocol_hopr::MemorySurbStore;
63use hopr_transport_mixer::MixerConfig;
64use hopr_transport_path::{BackgroundPathCacheRefreshable, HoprGraphPathSelector, PathPlanner};
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67    Probe,
68    ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_protocol::PeerProtocolCounterRegistry;
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77    errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83pub use multiaddr::Protocol;
84use tracing::{Instrument, debug, error, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88    constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, multiaddrs::strip_p2p_protocol,
89    pipeline::HoprPipelineComponents, socket::HoprSocket,
90};
91
/// Range of [`Tag`] values available for application-level use.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94pub use hopr_api as api;
95use hopr_api::{
96    chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
97    tickets::TicketFactory,
98    types::internal::routing::DestinationRouting,
99};
100
101// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
102lazy_static::lazy_static! {
103    static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
104
105    static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
106        .time_to_idle(Duration::from_mins(15))
107        .max_capacity(10_000)
108        .build();
109
110    static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
111}
112
113/// PeerId -> OffchainPublicKey is a CPU-intensive blocking operation.
114///
115/// This helper uses a cached static object to speed up the lookup and avoid blocking the async
116/// runtime on repeated conversions for the same [`PeerId`]s.
117pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
118    PEER_ID_CACHE
119        .try_get_with_by_ref(peer_id, move || {
120            OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
121        })
122        .map_err(|e: Arc<HoprTransportError>| {
123            crate::errors::HoprTransportError::Other(anyhow::anyhow!(
124                "failed to convert peer_id ({:?}) to an offchain public key: {e}",
125                peer_id
126            ))
127        })
128}
129
/// Identifies the long-running background processes spawned by [`HoprTransport::run`].
///
/// Used as the key type of the returned [`AbortableList`], so individual processes
/// can be referenced and aborted.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The transport medium itself (libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One stage of the HOPR packet processing pipeline.
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    /// A session manager sub-process (index 0 is the inbound dispatch loop).
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A network probing sub-process.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    /// Background refresh of the path cache (tokio runtime only).
    #[cfg(feature = "runtime-tokio")]
    #[strum(to_string = "path cache refresh")]
    PathRefresh,
    /// Synchronization of outgoing ticket indices.
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    /// Periodic flush of per-peer protocol conformance counters into the network graph.
    #[strum(to_string = "periodic protocol counter flush")]
    CounterFlush,
    /// Packet capture process (only with the `capture` feature).
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
151
/// HOPR protocol specific instantiation of the SessionManager.
///
/// Outgoing data is emitted as `(DestinationRouting, ApplicationDataOut)` pairs;
/// newly established inbound sessions are surfaced through the `IncomingSession` sender.
type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>;
154
/// Allows configuration of one specific [`HoprSession`].
///
/// The configurator does not prevent the Session from being closed
/// or the Session manager from being dropped.
#[derive(Debug, Clone)]
pub struct HoprSessionConfigurator {
    /// Identifier of the Session this configurator operates on.
    id: SessionId,
    // Makes sure configurator does not extend lifetime of the SessionManager.
    smgr: std::sync::Weak<HoprSessionManager>,
}
165
166impl HoprSessionConfigurator {
167    /// [`SessionId`] of the session this object can configure.
168    pub fn id(&self) -> &SessionId {
169        &self.id
170    }
171
172    /// Sends a Session Keep-Alive packet over the Session.
173    ///
174    /// NOTE: This usually carries at least 2 SURBs on the HOPR protocol level and can be
175    /// used for manual SURB balancing.
176    ///
177    /// NOTE: This operation only sends the Session Keep-Alive packet and **DOES NOT** guarantee the other side
178    /// has received it.
179    pub async fn ping(&self) -> errors::Result<()> {
180        Ok(self
181            .smgr
182            .upgrade()
183            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
184            .ping_session(&self.id)
185            .await?)
186    }
187
188    /// Gets the configuration of the SURB balancer.
189    ///
190    /// Returns an error if the Session is closed, the Session manager is gone.
191    ///
192    /// Returns `Ok(None)` if the Session has been created without a SURB balancer.
193    pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
194        Ok(self
195            .smgr
196            .upgrade()
197            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
198            .get_surb_balancer_config(&self.id)
199            .await?)
200    }
201
202    /// Updates the configuration of the SURB balancer.
203    ///
204    /// Returns an error if the Session is closed, the Session manager is gone, or the
205    /// Session has been created without a SURB balancer.
206    pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
207        Ok(self
208            .smgr
209            .upgrade()
210            .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
211            .update_surb_balancer_config(&self.id, config)
212            .await?)
213    }
214}
215
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport.
pub struct HoprTransport<Chain, Graph, Net> {
    /// Node's off-chain (packet) key pair.
    packet_key: OffchainKeypair,
    /// Node's on-chain key pair.
    chain_key: ChainKeypair,
    /// Handle to on-chain operations (see the `Chain` trait bounds on the impl block).
    chain_api: Chain,
    /// Manual-ping entry point; populated once during [`HoprTransport::run`].
    ping: Arc<OnceLock<Pinger>>,
    /// Transport network view; populated at the end of [`HoprTransport::run`].
    network: Arc<OnceLock<Net>>,
    /// Network graph holding per-edge observations.
    graph: Graph,
    /// Resolves `DestinationRouting` into concrete transport routing for outgoing packets.
    path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
    /// Multiaddresses of this node.
    my_multiaddresses: Vec<Multiaddr>,
    /// Manager of all HOPR sessions.
    smgr: Arc<HoprSessionManager>,
    /// Tag allocator for session terminal telemetry (also supplies the cover traffic tag).
    session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
    /// Tag allocator for probing telemetry.
    probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
    /// Per-peer protocol conformance counters, periodically flushed into the graph.
    counters: PeerProtocolCounterRegistry,
    /// Protocol configuration.
    cfg: HoprProtocolConfig,
}
233
234impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
235where
236    Chain: ChainReadChannelOperations
237        + ChainReadAccountOperations
238        + ChainWriteTicketOperations
239        + ChainKeyOperations
240        + ChainReadTicketOperations
241        + ChainValues
242        + Clone
243        + Send
244        + Sync
245        + 'static,
246    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
247        + NetworkGraphUpdate
248        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
249        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
250        + Clone
251        + Send
252        + Sync
253        + 'static,
254    <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
255    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
256        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
257    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
258    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
259{
    /// Creates a new [`HoprTransport`] from the node identity, chain resolver, network graph
    /// and configuration.
    ///
    /// This only wires the components (path selector, tag allocators, session manager);
    /// the background processes are started later via [`HoprTransport::run`].
    ///
    /// Returns an error if the tag allocators cannot be created, or if any of the three
    /// required allocator usages (session, session telemetry, probing) is missing from
    /// the configuration.
    pub fn new(
        identity: (&ChainKeypair, &OffchainKeypair),
        resolver: Chain,
        graph: Graph,
        my_multiaddresses: Vec<Multiaddr>,
        cfg: HoprProtocolConfig,
    ) -> errors::Result<Self> {
        let me_offchain = *identity.1.public();
        let planner_config = cfg.path_planner;
        let selector = HoprGraphPathSelector::new(
            me_offchain,
            graph.clone(),
            planner_config.max_cached_paths,
            planner_config.edge_penalty,
            planner_config.min_ack_rate,
        );

        let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;

        // Partition the configured allocators by their declared usage; each of the three
        // usages must be present exactly once for the transport to work.
        let mut session_tag_allocator = None;
        let mut session_telemetry_tag_allocator = None;
        let mut probing_tag_allocator = None;
        for (usage, alloc) in tag_allocators {
            match usage {
                hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
                hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
                    session_telemetry_tag_allocator = Some(alloc)
                }
                hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
            }
        }
        let session_tag_allocator = session_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("session tag allocator missing".into()))?;
        let session_telemetry_tag_allocator = session_telemetry_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
        let probing_tag_allocator = probing_tag_allocator
            .ok_or_else(|| errors::HoprTransportError::Api("probing tag allocator missing".into()))?;

        Ok(Self {
            packet_key: identity.1.clone(),
            chain_key: identity.0.clone(),
            ping: Arc::new(OnceLock::new()),
            network: Arc::new(OnceLock::new()),
            graph,
            path_planner: PathPlanner::new(
                me_offchain,
                MemorySurbStore::new(cfg.packet.surb_store),
                resolver.clone(),
                selector,
                planner_config,
            ),
            my_multiaddresses,
            smgr: SessionManager::new(
                SessionManagerConfig {
                    // Frame MTU can be overridden via env, but never below the HOPR payload size.
                    frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
                        .ok()
                        .and_then(|s| s.parse::<usize>().ok())
                        .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
                        .max(ApplicationData::PAYLOAD_SIZE),
                    // Frame timeout can be overridden via env, clamped to at least 100 ms.
                    max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
                        .ok()
                        .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
                        .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
                        .max(Duration::from_millis(100)),
                    initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
                    idle_timeout: cfg.session.idle_timeout,
                    balancer_sampling_interval: cfg.session.balancer_sampling_interval,
                    initial_return_session_egress_rate: 10,
                    minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
                    maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
                    surb_balance_notify_period: None,
                    surb_target_notify: true,
                },
                session_tag_allocator,
            )
            .into(),
            chain_api: resolver,
            session_telemetry_tag_allocator,
            probing_tag_allocator,
            counters: PeerProtocolCounterRegistry::default(),
            cfg,
        })
    }
343
344    /// Execute all processes of the [`HoprTransport`] object.
345    ///
346    /// This method will spawn the `HoprTransportProcess::Heartbeat`,
347    /// `HoprTransportProcess::BloomFilterSave`, `HoprTransportProcess::Swarm` and session-related
348    /// processes and return join handles to the calling function. These processes are not started immediately but
349    /// are waiting for a trigger from this piece of code.
350    pub async fn run<T, TFact, Ct>(
351        &self,
352        cover_traffic: Ct,
353        network: Net,
354        network_process: BoxedProcessFn,
355        ticket_events: T,
356        ticket_factory: TFact,
357        on_incoming_session: Sender<IncomingSession>,
358    ) -> errors::Result<(
359        HoprSocket<
360            futures::channel::mpsc::Receiver<ApplicationDataIn>,
361            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
362        >,
363        AbortableList<HoprTransportProcess>,
364    )>
365    where
366        T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
367        T::Error: std::error::Error + Clone + Send,
368        Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
369        TFact: TicketFactory + Clone + Send + Sync + 'static,
370    {
371        let mut processes = AbortableList::<HoprTransportProcess>::default();
372
373        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
374            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
375
376        // -- transport medium
377
378        let transport_network = network;
379        let transport_layer_process = network_process;
380
381        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
382        let (wire_msg_tx, wire_msg_rx) =
383            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
384
385        let (mixing_channel_tx, mixing_channel_rx) =
386            hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
387
388        // the process is terminated when the input stream runs out
389        let _mixing_process_before_sending_out = spawn(
390            mixing_channel_rx
391                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
392                .map(Ok)
393                .forward(wire_msg_tx)
394                .inspect(|_| {
395                    tracing::warn!(
396                        task = "mixer -> egress process",
397                        "long-running background task finished"
398                    )
399                }),
400        );
401
402        // -- path cache background refresh (only when tokio runtime is available)
403        #[cfg(feature = "runtime-tokio")]
404        processes.insert(
405            HoprTransportProcess::PathRefresh,
406            spawn_as_abortable!(self.path_planner.run_background_refresh()),
407        );
408
409        processes.insert(
410            HoprTransportProcess::Medium,
411            spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
412                task = %HoprTransportProcess::Medium,
413                "long-running background task finished"
414            ))),
415        );
416
417        let msg_protocol_bidirectional_channel_capacity =
418            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
419                .ok()
420                .and_then(|s| s.trim().parse::<usize>().ok())
421                .filter(|&c| c > 0)
422                .unwrap_or(16_384);
423
424        let (on_incoming_data_tx, on_incoming_data_rx) =
425            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
426
427        debug!(
428            capacity = msg_protocol_bidirectional_channel_capacity,
429            "creating protocol bidirectional channel"
430        );
431        let (tx_from_protocol, rx_from_protocol) =
432            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
433
434        // === START === cover traffic control
435        // Allocate a cover traffic tag from the session telemetry partition to avoid
436        // collisions with session and probing tags.
437        let cover_traffic_allocated_tag = self
438            .session_telemetry_tag_allocator
439            .allocate()
440            .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
441        let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
442
443        // filter out the known cover traffic not to lose processing time with it
444        // The allocated tag is moved into the closure to keep it alive for the transport lifetime.
445        let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
446            let _keep_alive = &cover_traffic_allocated_tag;
447            async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
448        });
449
450        // prepare a cover traffic stream
451        let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
452            let start =
453                hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
454            let data = &RANDOM_DATA[start..start + 100];
455
456            futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
457                Some((routing, ApplicationDataOut::with_no_packet_info(data)))
458            } else {
459                tracing::error!("failed to construct cover traffic packet");
460                None
461            })
462        });
463
464        // merge cover traffic with other outgoing data
465        let merged_unresolved_output_data =
466            select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
467                futures::stream::PollNext::Left
468            });
469
470        // === END === cover traffic control
471
472        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
473        // sending the external packets to the transport pipeline.
474        let path_planner = self.path_planner.clone();
475        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
476        let all_resolved_external_msg_rx = merged_unresolved_output_data.filter_map(move |(unresolved, mut data)| {
477            let path_planner = path_planner.clone();
478            async move {
479                trace!(?unresolved, "resolving routing for packet");
480                match path_planner
481                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
482                    .await
483                {
484                    Ok((resolved, rem_surbs)) => {
485                        // Set the SURB distress/out-of-SURBs flag if applicable.
486                        // These flags are translated into HOPR protocol packet signals and are
487                        // applicable only on the return path.
488                        let mut signals_to_dst = data
489                            .packet_info
490                            .as_ref()
491                            .map(|info| info.signals_to_destination)
492                            .unwrap_or_default();
493
494                        if resolved.is_return() {
495                            signals_to_dst = match rem_surbs {
496                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
497                                    signals_to_dst | PacketSignal::SurbDistress
498                                }
499                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
500                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
501                            };
502                        } else {
503                            // Unset these flags as they make no sense on the forward path.
504                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
505                        }
506
507                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
508                        trace!(?resolved, "resolved routing for packet");
509                        Some((resolved, data))
510                    }
511                    Err(error) => {
512                        error!(%error, "failed to resolve routing");
513                        None
514                    }
515                }
516            }
517            .in_current_span()
518        });
519
520        let channels_dst = self
521            .chain_api
522            .domain_separators()
523            .await
524            .map_err(HoprTransportError::chain)?
525            .channel;
526
527        processes.extend_from(pipeline::run_hopr_packet_pipeline(
528            (self.packet_key.clone(), self.chain_key.clone()),
529            (mixing_channel_tx, wire_msg_rx),
530            (tx_from_protocol, all_resolved_external_msg_rx),
531            HoprPipelineComponents {
532                surb_store: self.path_planner.surb_store.clone(),
533                chain_api: self.chain_api.clone(),
534                counters: self.counters.clone(),
535                ticket_factory,
536                ticket_events,
537            },
538            channels_dst,
539            self.cfg.packet,
540        ));
541
542        // -- periodic counter flush
543        let flush_counters = self.counters.clone();
544        let flush_graph = self.graph.clone();
545        let flush_me = *self.packet_key.public();
546        let flush_interval = self.cfg.counter_flush_interval;
547        processes.insert(
548            HoprTransportProcess::CounterFlush,
549            spawn_as_abortable!(async move {
550                use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
551
552                futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
553                    .for_each(|_| {
554                        for (peer, num_packets, num_acks) in flush_counters.drain() {
555                            tracing::trace!(
556                                %peer,
557                                num_packets,
558                                num_acks,
559                                "flushing protocol conformance counters"
560                            );
561                            flush_graph.upsert_edge(&flush_me, &peer, |obs| {
562                                obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
563                            });
564                        }
565                        futures::future::ready(())
566                    })
567                    .await;
568            }),
569        );
570
571        // -- network probing
572        debug!(
573            capacity = msg_protocol_bidirectional_channel_capacity,
574            note = "same as protocol bidirectional",
575            "Creating probing channel"
576        );
577
578        let (tx_from_probing, rx_from_probing) =
579            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
580
581        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
582            .ok()
583            .and_then(|s| s.trim().parse::<usize>().ok())
584            .filter(|&c| c > 0)
585            .unwrap_or(128);
586        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
587        let (manual_ping_tx, manual_ping_rx) =
588            channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
589
590        let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
591
592        let probing_processes = probe
593            .continuously_scan(
594                (unresolved_routing_msg_tx.clone(), rx_from_protocol),
595                manual_ping_rx,
596                tx_from_probing,
597                cover_traffic,
598                self.graph.clone(),
599            )
600            .await;
601
602        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
603
604        // manual ping
605        self.ping
606            .clone()
607            .set(Pinger::new(
608                PingConfig {
609                    timeout: self.cfg.probe.timeout,
610                },
611                manual_ping_tx,
612            ))
613            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
614
615        // -- session management
616        self.smgr
617            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
618            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
619            .into_iter()
620            .enumerate()
621            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
622            .for_each(|(k, v)| {
623                processes.insert(k, v);
624            });
625
626        let smgr = self.smgr.clone();
627        processes.insert(
628            HoprTransportProcess::SessionsManagement(0),
629            spawn_as_abortable!(
630                rx_from_probing
631                    .filter_map(move |(pseudonym, data)| {
632                        let smgr = smgr.clone();
633                        async move {
634                            match smgr.dispatch_message(pseudonym, data).await {
635                                Ok(DispatchResult::Processed) => {
636                                    tracing::trace!("message dispatch completed");
637                                    None
638                                }
639                                Ok(DispatchResult::Unrelated(data)) => {
640                                    tracing::trace!("unrelated message dispatch completed");
641                                    Some(data)
642                                }
643                                Err(error) => {
644                                    tracing::error!(%error, "error while dispatching packet in the session manager");
645                                    None
646                                }
647                            }
648                        }
649                    })
650                    .map(Ok)
651                    .forward(on_incoming_data_tx)
652                    .inspect(|_| tracing::warn!(
653                        task = %HoprTransportProcess::SessionsManagement(0),
654                        "long-running background task finished"
655                    ))
656            ),
657        );
658
659        // Populate the OnceLock at the end, making sure everything before didn't fail.
660        self.network
661            .clone()
662            .set(transport_network)
663            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
664
665        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
666    }
667
668    #[tracing::instrument(level = "debug", skip(self))]
669    pub async fn ping(
670        &self,
671        peer: &OffchainPublicKey,
672    ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
673        let me: &OffchainPublicKey = self.packet_key.public();
674        if peer == me {
675            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
676        }
677
678        let pinger = self
679            .ping
680            .get()
681            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
682
683        let latency = (*pinger).ping(peer).await?;
684
685        if let Some(observations) = self.graph.edge(me, peer) {
686            Ok((latency, observations))
687        } else {
688            Err(HoprTransportError::Api(format!(
689                "no observations available for peer {peer}",
690            )))
691        }
692    }
693
694    #[tracing::instrument(level = "debug", skip(self))]
695    pub async fn new_session(
696        &self,
697        destination: Address,
698        target: SessionTarget,
699        cfg: SessionClientConfig,
700    ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
701        let session = self.smgr.new_session(destination, target, cfg).await?;
702        let id = *session.id();
703        Ok((
704            session,
705            HoprSessionConfigurator {
706                id,
707                smgr: Arc::downgrade(&self.smgr),
708            },
709        ))
710    }
711
712    #[tracing::instrument(level = "debug", skip(self))]
713    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
714        self.network
715            .get()
716            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
717            .map(|network| network.listening_as().into_iter().collect())
718            .unwrap_or_default()
719    }
720
721    #[tracing::instrument(level = "debug", skip(self))]
722    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
723        let mut mas = self
724            .local_multiaddresses()
725            .into_iter()
726            .filter(|ma| {
727                crate::multiaddrs::is_supported(ma)
728                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
729            })
730            .map(|ma| strip_p2p_protocol(&ma))
731            .filter(|v| !v.is_empty())
732            .collect::<Vec<_>>();
733
734        mas.sort_by(|l, r| {
735            let is_left_dns = crate::multiaddrs::is_dns(l);
736            let is_right_dns = crate::multiaddrs::is_dns(r);
737
738            if !(is_left_dns ^ is_right_dns) {
739                std::cmp::Ordering::Equal
740            } else if is_left_dns {
741                std::cmp::Ordering::Less
742            } else {
743                std::cmp::Ordering::Greater
744            }
745        });
746
747        mas
748    }
749
    /// Returns a reference to the network graph.
    ///
    /// The graph stores per-edge observations keyed by peer public keys (see
    /// [`Self::ping`] and [`Self::all_network_peers`], which read edges from it).
    pub fn graph(&self) -> &Graph {
        &self.graph
    }
754
755    #[tracing::instrument(level = "debug", skip(self))]
756    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
757        self.network
758            .get()
759            .map(|network| network.listening_as().into_iter().collect())
760            .unwrap_or_else(|| {
761                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
762                self.my_multiaddresses.clone()
763            })
764    }
765
766    #[tracing::instrument(level = "debug", skip(self))]
767    pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
768        match self
769            .network
770            .get()
771            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
772        {
773            Ok(network) => network
774                .multiaddress_of(&peer.into())
775                .unwrap_or_default()
776                .into_iter()
777                .collect(),
778            Err(error) => {
779                tracing::error!(%error, "failed to get observed multiaddresses");
780                return vec![];
781            }
782        }
783    }
784
785    #[tracing::instrument(level = "debug", skip(self))]
786    pub async fn network_health(&self) -> Health {
787        self.network
788            .get()
789            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
790            .map(|network| network.health())
791            .unwrap_or(Health::Red)
792    }
793
794    pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
795        Ok(futures::stream::iter(
796            self.network
797                .get()
798                .ok_or_else(|| {
799                    tracing::error!("transport network is not yet initialized");
800                    HoprTransportError::Api("transport network is not yet initialized".into())
801                })?
802                .connected_peers(),
803        )
804        .filter_map(|peer_id| async move {
805            match peer_id_to_public_key(&peer_id) {
806                Ok(key) => Some(key),
807                Err(error) => {
808                    tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
809                    None
810                }
811            }
812        })
813        .collect()
814        .await)
815    }
816
    /// Returns the graph's edge observations from this node to the given peer,
    /// or `None` when no such edge has been recorded yet.
    #[tracing::instrument(level = "debug", skip(self))]
    pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
        self.graph.edge(self.packet_key.public(), peer)
    }
821
822    /// Get connected peers with quality higher than some value.
823    #[tracing::instrument(level = "debug", skip(self))]
824    pub async fn all_network_peers(
825        &self,
826        minimum_score: f64,
827    ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
828        let me = self.packet_key.public();
829        Ok(self
830            .network_connected_peers()
831            .await?
832            .into_iter()
833            .filter_map(|peer| {
834                let observation = self.graph.edge(me, &peer);
835                if let Some(info) = observation {
836                    if info.score() >= minimum_score {
837                        Some((peer, info))
838                    } else {
839                        None
840                    }
841                } else {
842                    None
843                }
844            })
845            .collect::<Vec<_>>())
846    }
847}
848
849fn build_mixer_cfg_from_env() -> MixerConfig {
850    let mixer_cfg = MixerConfig {
851        min_delay: std::time::Duration::from_millis(
852            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
853                .map(|v| {
854                    v.trim()
855                        .parse::<u64>()
856                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
857                })
858                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
859        ),
860        delay_range: std::time::Duration::from_millis(
861            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
862                .map(|v| {
863                    v.trim()
864                        .parse::<u64>()
865                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
866                })
867                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
868        ),
869        capacity: {
870            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
871                .ok()
872                .and_then(|s| s.trim().parse::<usize>().ok())
873                .filter(|&c| c > 0)
874                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
875            debug!(capacity = capacity, "Setting mixer capacity");
876            capacity
877        },
878        ..MixerConfig::default()
879    };
880    debug!(?mixer_cfg, "Mixer configuration");
881
882    mixer_cfg
883}
884
885// ---------------------------------------------------------------------------
886// NetworkView impl for HoprTransport — wraps OnceLock<Net> access
887// ---------------------------------------------------------------------------
888
889impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
890where
891    Net: NetworkView + Send + Sync + 'static,
892{
893    fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
894        self.network.get().map(|n| n.listening_as()).unwrap_or_default()
895    }
896
897    fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
898        self.network.get()?.multiaddress_of(peer)
899    }
900
901    fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
902        self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
903    }
904
905    fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
906        self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
907    }
908
909    fn is_connected(&self, peer: &PeerId) -> bool {
910        self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
911    }
912
913    fn health(&self) -> Health {
914        self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
915    }
916
917    fn subscribe_network_events(
918        &self,
919    ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
920        match self.network.get() {
921            Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
922            None => futures::future::Either::Right(futures::stream::empty()),
923        }
924    }
925}
926
927// ---------------------------------------------------------------------------
928// TransportOperations impl for HoprTransport
929// ---------------------------------------------------------------------------
930
/// Exposes the transport through the node-level [`hopr_api::node::TransportOperations`]
/// trait by delegating to the inherent `ping` and `network_observed_multiaddresses`
/// methods of [`HoprTransport`].
#[async_trait::async_trait]
impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
where
    // Chain handle: read access to channels/accounts/tickets plus ticket
    // writing, key operations and chain values.
    Chain: ChainReadChannelOperations
        + ChainReadAccountOperations
        + hopr_api::chain::ChainWriteTicketOperations
        + ChainKeyOperations
        + hopr_api::chain::ChainReadTicketOperations
        + ChainValues
        + Clone
        + Send
        + Sync
        + 'static,
    // Graph keyed by offchain public keys with readable/writable edge observations.
    Graph: NetworkGraphView<NodeId = OffchainPublicKey>
        + NetworkGraphUpdate
        + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
        + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
        + Clone
        + Send
        + Sync
        + 'static,
    <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
{
    type Error = errors::HoprTransportError;
    type Observable = <Graph as NetworkGraphView>::Observed;

    // Delegates to the inherent `ping` (inherent methods take precedence over
    // this trait method, so no recursion occurs).
    async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
        self.ping(key).await
    }

    // Delegates to the inherent `network_observed_multiaddresses`.
    async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
        self.network_observed_multiaddresses(key).await
    }
}