Skip to main content

hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
//! 4. run algorithms associated with the transport layer operational management
//! 5. specify interfaces that allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21mod helpers;
22
23#[cfg(feature = "telemetry")]
24pub mod stats;
25
26#[cfg(feature = "capture")]
27mod capture;
28mod pipeline;
29pub mod socket;
30
31use std::{
32    sync::{Arc, OnceLock},
33    time::Duration,
34};
35
36use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
37use futures::{
38    FutureExt, SinkExt, StreamExt,
39    channel::mpsc::{Sender, channel},
40};
41use helpers::PathPlanner;
42use hopr_api::{
43    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
44    db::HoprDbTicketOperations,
45};
46pub use hopr_api::{
47    db::ChannelTicketStatistics,
48    network::{Health, Observable, traits::NetworkView},
49};
50use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
51use hopr_crypto_packet::prelude::PacketSignal;
52pub use hopr_crypto_types::{
53    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
54    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
55};
56use hopr_ct_telemetry::ImmediateNeighborChannelGraph;
57pub use hopr_internal_types::prelude::HoprPseudonym;
58use hopr_internal_types::prelude::*;
59pub use hopr_network_types::prelude::RoutingOptions;
60use hopr_network_types::prelude::{DestinationRouting, *};
61use hopr_primitive_types::prelude::*;
62pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
63use hopr_protocol_hopr::MemorySurbStore;
64use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
65pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
66use hopr_transport_mixer::MixerConfig;
67pub use hopr_transport_network::observation::Observations;
68use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork};
69use hopr_transport_probe::{
70    Probe, TrafficGeneration,
71    ping::{PingConfig, Pinger},
72};
73pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
74pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
75pub use hopr_transport_session as session;
76#[cfg(feature = "runtime-tokio")]
77pub use hopr_transport_session::transfer_session;
78pub use hopr_transport_session::{
79    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
80    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
81    errors::{SessionManagerError, TransportSessionError},
82};
83use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
84#[cfg(feature = "telemetry")]
85pub use hopr_transport_session::{
86    SessionAckMode, SessionLifecycleState, SessionLifetimeSnapshot, SessionStatsSnapshot,
87};
88#[cfg(feature = "telemetry")]
89pub use stats::PeerPacketStats;
90use tracing::{Instrument, debug, error, info, trace, warn};
91
92pub use crate::config::HoprProtocolConfig;
93#[cfg(feature = "telemetry")]
94pub use crate::stats::PeerPacketStatsSnapshot;
95use crate::{
96    constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
97    socket::HoprSocket,
98};
99
/// Range of application tags, re-exported from [`Tag::APPLICATION_TAG_RANGE`] for convenience.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
101
102pub use hopr_api as api;
103
// Upper bound of the Session initiation timeout, computed as
// `2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS`.
//
// Needs lazy-static, since Duration multiplication by a constant is not yet a const-operation.
lazy_static::lazy_static! {
    static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
108
/// Identifies a long-running background process spawned by the transport layer.
///
/// Values of this type are used as keys of the `AbortableList` returned from
/// [`HoprTransport::run`]. The `Display` implementation (derived via `strum`)
/// yields a human-readable description of the process that is used in log messages.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the underlying transport medium.
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One of the HOPR packet pipeline processes; the inner value tells which one.
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    /// A session manager sub-process, distinguished by its index.
    ///
    /// In [`HoprTransport::run`], index `0` is the task dispatching incoming messages
    /// to the session manager, the indices `1..` are the processes started by the manager itself.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// One of the network probing processes; the inner value tells which one.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    /// Synchronization of outgoing ticket indices.
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    /// Packet capture process (only with the `capture` feature).
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
125
// TODO (4.1): implement path selector based on probing
/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
///
/// This is the selector type the path planner of [`HoprTransport`] is parameterized with;
/// it is instantiated via `Default` in [`HoprTransport::new`].
type CurrentPathSelector = NoPathSelector;
129
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport.
///
/// The object is created via [`HoprTransport::new`] and becomes operational once
/// [`HoprTransport::run`] has been called; until then the `ping` and `network` cells are empty
/// and the methods depending on them return an error or a fallback value.
pub struct HoprTransport<Chain, Db> {
    /// Off-chain keypair of this node; also determines the node's own `PeerId`.
    packet_key: OffchainKeypair,
    /// On-chain keypair of this node, handed to the packet pipeline.
    chain_key: ChainKeypair,
    /// Database handle, handed to the packet pipeline.
    db: Db,
    /// Chain API used to load the initial public peers and the channel domain separator.
    chain_api: Chain,
    /// Manual ping facility; set exactly once in [`HoprTransport::run`].
    ping: Arc<OnceLock<Pinger>>,
    /// Handle to the transport network; set exactly once in [`HoprTransport::run`].
    network: Arc<OnceLock<HoprNetwork>>,
    /// Resolves `DestinationRouting` of outgoing messages; also owns the SURB store.
    path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
    /// Multiaddresses given at construction; passed to the network builder and used as
    /// a fallback in [`HoprTransport::local_multiaddresses`] before the network is initialized.
    my_multiaddresses: Vec<Multiaddr>,
    /// Session manager handling outgoing and incoming Sessions.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
    /// Protocol configuration the transport was created with.
    cfg: HoprProtocolConfig,
}
144
145impl<Chain, Db> HoprTransport<Chain, Db>
146where
147    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
148    Chain: ChainReadChannelOperations
149        + ChainReadAccountOperations
150        + ChainKeyOperations
151        + ChainValues
152        + Clone
153        + Send
154        + Sync
155        + 'static,
156{
157    pub fn new(
158        identity: (&ChainKeypair, &OffchainKeypair),
159        resolver: Chain,
160        db: Db,
161        my_multiaddresses: Vec<Multiaddr>,
162        cfg: HoprProtocolConfig,
163    ) -> Self {
164        Self {
165            packet_key: identity.1.clone(),
166            chain_key: identity.0.clone(),
167            ping: Arc::new(OnceLock::new()),
168            network: Arc::new(OnceLock::new()),
169            path_planner: PathPlanner::new(
170                *identity.0.as_ref(),
171                MemorySurbStore::new(cfg.packet.surb_store),
172                resolver.clone(),
173                CurrentPathSelector::default(),
174            ),
175            my_multiaddresses,
176            smgr: SessionManager::new(SessionManagerConfig {
177                // TODO(v4.0): Use the entire range of tags properly
178                session_tag_range: (16..65535),
179                maximum_sessions: cfg.session.maximum_sessions as usize,
180                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
181                    .ok()
182                    .and_then(|s| s.parse::<usize>().ok())
183                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
184                    .max(ApplicationData::PAYLOAD_SIZE),
185                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
186                    .ok()
187                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
188                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
189                    .max(Duration::from_millis(100)),
190                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
191                idle_timeout: cfg.session.idle_timeout,
192                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
193                initial_return_session_egress_rate: 10,
194                minimum_surb_buffer_duration: Duration::from_secs(5),
195                maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
196                // Allow a 10% increase of the target SURB buffer on incoming Sessions
197                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
198                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
199            }),
200            db,
201            chain_api: resolver,
202            cfg,
203        }
204    }
205
206    /// Execute all processes of the [`HoprTransport`] object.
207    ///
208    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
209    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
210    /// processes and return join handles to the calling function. These processes are not started immediately but
211    /// are waiting for a trigger from this piece of code.
212    pub async fn run<S, T, Ct>(
213        &self,
214        cover_traffic: Ct,
215        discovery_updates: S,
216        ticket_events: T,
217        on_incoming_session: Sender<IncomingSession>,
218    ) -> errors::Result<(
219        HoprSocket<
220            futures::channel::mpsc::Receiver<ApplicationDataIn>,
221            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
222        >,
223        AbortableList<HoprTransportProcess>,
224    )>
225    where
226        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
227        T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
228        T::Error: std::error::Error,
229        Ct: TrafficGeneration + Send + Sync + 'static,
230    {
231        info!("loading initial peers from the chain");
232        let public_nodes = self
233            .chain_api
234            .stream_accounts(AccountSelector {
235                public_only: true,
236                ..Default::default()
237            })
238            .await
239            .map_err(|e| HoprTransportError::Other(e.into()))?
240            .collect::<Vec<_>>()
241            .await;
242
243        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
244        // plus 100 as an additional buffer
245        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
246
247        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
248            .ok()
249            .and_then(|s| s.trim().parse::<usize>().ok())
250            .filter(|&c| c > 0)
251            .unwrap_or(2048)
252            .max(minimum_capacity);
253
254        debug!(
255            capacity = internal_discovery_updates_capacity,
256            minimum_required = minimum_capacity,
257            "creating internal discovery updates channel"
258        );
259        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
260            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
261
262        let discovery_updates =
263            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx);
264
265        info!(
266            public_nodes = public_nodes.len(),
267            "initializing swarm with peers from chain"
268        );
269
270        for node_entry in public_nodes {
271            if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
272                let peer: PeerId = node_entry.public_key.into();
273
274                debug!(%peer, ?multiaddresses, "using initial public node");
275
276                internal_discovery_update_tx
277                    .send(PeerDiscovery::Announce(peer, multiaddresses))
278                    .await
279                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
280            }
281        }
282
283        let mut processes = AbortableList::<HoprTransportProcess>::default();
284
285        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
286            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
287
288        // -- transport medium
289
290        // NOTE: Private address filtering is implemented at multiple levels for defense-in-depth:
291        // 1. Discovery events are filtered before they reach the transport component
292        // 2. SwarmEvent::NewExternalAddrOfPeer events are filtered using is_public_address()
293        let allow_private_addresses = self.cfg.transport.prefer_local_addresses;
294        let (transport_network, transport_layer_process) = HoprLibp2pNetworkBuilder::new(
295            (&self.packet_key).into(),
296            discovery_updates.filter_map(move |event| async move {
297                match event {
298                    PeerDiscovery::Announce(peer, multiaddrs) => {
299                        let multiaddrs = multiaddrs
300                            .into_iter()
301                            .filter(|ma| {
302                                hopr_transport_identity::multiaddrs::is_supported(ma)
303                                    && (allow_private_addresses || is_public_address(ma))
304                            })
305                            .collect::<Vec<_>>();
306                        if multiaddrs.is_empty() {
307                            None
308                        } else {
309                            Some(PeerDiscovery::Announce(peer, multiaddrs))
310                        }
311                    }
312                }
313            }),
314            self.my_multiaddresses.clone(),
315        )
316        .await
317        .into_network_with_stream_protocol_process(
318            hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL,
319            allow_private_addresses,
320        );
321        self.network
322            .clone()
323            .set(transport_network.clone())
324            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
325
326        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
327        let (wire_msg_tx, wire_msg_rx) =
328            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
329
330        let (mixing_channel_tx, mixing_channel_rx) =
331            hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
332
333        // the process is terminated, when the input stream runs out
334        let _mixing_process_before_sending_out = spawn(
335            mixing_channel_rx
336                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
337                .map(Ok)
338                .forward(wire_msg_tx)
339                .inspect(|_| {
340                    tracing::warn!(
341                        task = "mixer -> egress process",
342                        "long-running background task finished"
343                    )
344                }),
345        );
346
347        processes.insert(
348            HoprTransportProcess::Medium,
349            spawn_as_abortable!(transport_layer_process.inspect(|_| tracing::warn!(
350                task = %HoprTransportProcess::Medium,
351                "long-running background task finished"
352            ))),
353        );
354
355        let msg_protocol_bidirectional_channel_capacity =
356            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
357                .ok()
358                .and_then(|s| s.trim().parse::<usize>().ok())
359                .filter(|&c| c > 0)
360                .unwrap_or(16_384);
361
362        let (on_incoming_data_tx, on_incoming_data_rx) =
363            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
364
365        debug!(
366            capacity = msg_protocol_bidirectional_channel_capacity,
367            "creating protocol bidirectional channel"
368        );
369        let (tx_from_protocol, rx_from_protocol) =
370            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
371
372        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
373        // sending the external packets to the transport pipeline.
374        let path_planner = self.path_planner.clone();
375        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
376        let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
377            let path_planner = path_planner.clone();
378            async move {
379                trace!(?unresolved, "resolving routing for packet");
380                match path_planner
381                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
382                    .await
383                {
384                    Ok((resolved, rem_surbs)) => {
385                        // Set the SURB distress/out-of-SURBs flag if applicable.
386                        // These flags are translated into HOPR protocol packet signals and are
387                        // applicable only on the return path.
388                        let mut signals_to_dst = data
389                            .packet_info
390                            .as_ref()
391                            .map(|info| info.signals_to_destination)
392                            .unwrap_or_default();
393
394                        if resolved.is_return() {
395                            signals_to_dst = match rem_surbs {
396                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
397                                    signals_to_dst | PacketSignal::SurbDistress
398                                }
399                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
400                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
401                            };
402                        } else {
403                            // Unset these flags as they make no sense on the forward path.
404                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
405                        }
406
407                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
408                        trace!(?resolved, "resolved routing for packet");
409                        Some((resolved, data))
410                    }
411                    Err(error) => {
412                        error!(%error, "failed to resolve routing");
413                        None
414                    }
415                }
416            }
417            .in_current_span()
418        });
419
420        let channels_dst = self
421            .chain_api
422            .domain_separators()
423            .await
424            .map_err(HoprTransportError::chain)?
425            .channel;
426
427        processes.extend_from(pipeline::run_hopr_packet_pipeline(
428            (self.packet_key.clone(), self.chain_key.clone()),
429            (mixing_channel_tx, wire_msg_rx),
430            (tx_from_protocol, all_resolved_external_msg_rx),
431            HoprPipelineComponents {
432                ticket_events,
433                surb_store: self.path_planner.surb_store.clone(),
434                chain_api: self.chain_api.clone(),
435                db: self.db.clone(),
436            },
437            channels_dst,
438            self.cfg.packet,
439        ));
440
441        // -- network probing
442        debug!(
443            capacity = msg_protocol_bidirectional_channel_capacity,
444            note = "same as protocol bidirectional",
445            "Creating probing channel"
446        );
447
448        let (tx_from_probing, rx_from_probing) =
449            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
450
451        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
452            .ok()
453            .and_then(|s| s.trim().parse::<usize>().ok())
454            .filter(|&c| c > 0)
455            .unwrap_or(128);
456        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
457        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
458
459        let probe = Probe::new(self.cfg.probe);
460
461        let probing_processes = probe
462            .continuously_scan(
463                (unresolved_routing_msg_tx.clone(), rx_from_protocol),
464                manual_ping_rx,
465                tx_from_probing,
466                cover_traffic,
467                ImmediateNeighborChannelGraph::new(transport_network.clone(), self.cfg.probe.recheck_threshold),
468            )
469            .await;
470
471        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
472
473        // manual ping
474        self.ping
475            .clone()
476            .set(Pinger::new(
477                PingConfig {
478                    timeout: self.cfg.probe.timeout,
479                },
480                manual_ping_tx,
481            ))
482            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
483
484        // -- session management
485        self.smgr
486            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
487            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
488            .into_iter()
489            .enumerate()
490            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
491            .for_each(|(k, v)| {
492                processes.insert(k, v);
493            });
494
495        let smgr = self.smgr.clone();
496        processes.insert(
497            HoprTransportProcess::SessionsManagement(0),
498            spawn_as_abortable!(
499                rx_from_probing
500                    .filter_map(move |(pseudonym, data)| {
501                        let smgr = smgr.clone();
502                        async move {
503                            match smgr.dispatch_message(pseudonym, data).await {
504                                Ok(DispatchResult::Processed) => {
505                                    tracing::trace!("message dispatch completed");
506                                    None
507                                }
508                                Ok(DispatchResult::Unrelated(data)) => {
509                                    tracing::trace!("unrelated message dispatch completed");
510                                    Some(data)
511                                }
512                                Err(error) => {
513                                    tracing::error!(%error, "error while dispatching packet in the session manager");
514                                    None
515                                }
516                            }
517                        }
518                    })
519                    .map(Ok)
520                    .forward(on_incoming_data_tx)
521                    .inspect(|_| tracing::warn!(
522                        task = %HoprTransportProcess::SessionsManagement(0),
523                        "long-running background task finished"
524                    ))
525            ),
526        );
527
528        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
529    }
530
531    #[tracing::instrument(level = "debug", skip(self))]
532    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
533        if peer == &self.packet_key.public().into() {
534            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
535        }
536
537        let pinger = self
538            .ping
539            .get()
540            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
541
542        let network = self
543            .network
544            .get()
545            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?;
546
547        let latency = (*pinger).ping(*peer).await?;
548
549        let observations = match network.observations_for(peer) {
550            Some(observations) => observations,
551            None => {
552                // artificial delay to allow the observations to be recorded, the 50ms is random value
553                futures_time::task::sleep(futures_time::time::Duration::from_millis(50)).await;
554
555                network
556                    .observations_for(peer)
557                    .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?
558            }
559        };
560
561        Ok((latency, observations))
562    }
563
564    #[tracing::instrument(level = "debug", skip(self))]
565    pub async fn new_session(
566        &self,
567        destination: Address,
568        target: SessionTarget,
569        cfg: SessionClientConfig,
570    ) -> errors::Result<HoprSession> {
571        Ok(self.smgr.new_session(destination, target, cfg).await?)
572    }
573
574    #[tracing::instrument(level = "debug", skip(self))]
575    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
576        Ok(self.smgr.ping_session(id).await?)
577    }
578
579    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
580        Ok(self.smgr.get_surb_balancer_config(id).await?)
581    }
582
583    #[cfg(feature = "telemetry")]
584    pub async fn session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
585        Ok(self.smgr.get_session_stats(id).await?)
586    }
587
588    pub async fn update_session_surb_balancing_cfg(
589        &self,
590        id: &SessionId,
591        cfg: SurbBalancerConfig,
592    ) -> errors::Result<()> {
593        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
594    }
595
596    #[tracing::instrument(level = "debug", skip(self))]
597    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
598        self.network
599            .get()
600            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
601            .map(|network| network.listening_as().into_iter().collect())
602            .unwrap_or_default()
603    }
604
605    #[tracing::instrument(level = "debug", skip(self))]
606    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
607        let mut mas = self
608            .local_multiaddresses()
609            .into_iter()
610            .filter(|ma| {
611                hopr_transport_identity::multiaddrs::is_supported(ma)
612                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
613            })
614            .map(|ma| strip_p2p_protocol(&ma))
615            .filter(|v| !v.is_empty())
616            .collect::<Vec<_>>();
617
618        mas.sort_by(|l, r| {
619            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
620            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
621
622            if !(is_left_dns ^ is_right_dns) {
623                std::cmp::Ordering::Equal
624            } else if is_left_dns {
625                std::cmp::Ordering::Less
626            } else {
627                std::cmp::Ordering::Greater
628            }
629        });
630
631        mas
632    }
633
634    #[tracing::instrument(level = "debug", skip(self))]
635    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
636        self.network
637            .get()
638            .map(|network| network.listening_as().into_iter().collect())
639            .unwrap_or_else(|| {
640                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
641                self.my_multiaddresses.clone()
642            })
643    }
644
645    #[tracing::instrument(level = "debug", skip(self))]
646    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
647        match self
648            .network
649            .get()
650            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
651        {
652            Ok(network) => network.multiaddress_of(peer).unwrap_or_default().into_iter().collect(),
653            Err(error) => {
654                tracing::error!(%error, "failed to get observed multiaddresses");
655                return vec![];
656            }
657        }
658    }
659
660    #[tracing::instrument(level = "debug", skip(self))]
661    pub async fn network_health(&self) -> Health {
662        self.network
663            .get()
664            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
665            .map(|network| network.health())
666            .unwrap_or(Health::Red)
667    }
668
669    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
670        Ok(self
671            .network
672            .get()
673            .ok_or_else(|| {
674                tracing::error!("transport network is not yet initialized");
675                HoprTransportError::Api("transport network is not yet initialized".into())
676            })?
677            .connected_peers()
678            .into_iter()
679            .collect())
680    }
681
682    #[tracing::instrument(level = "debug", skip(self))]
683    pub async fn network_peer_observations(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
684        Ok(self
685            .network
686            .get()
687            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?
688            .observations_for(peer))
689    }
690
    /// Get packet stats snapshot for a specific peer.
    ///
    /// NOTE: this is currently a stub. The `peer` argument is not used and the call
    /// always fails with a "not implemented yet" API error.
    #[cfg(feature = "telemetry")]
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
        Err(HoprTransportError::Api("not implemented yet".into()))
    }
697
    /// Get packet stats for all connected peers.
    ///
    /// NOTE: this is currently a stub that always fails with a "not implemented yet" API error.
    #[cfg(feature = "telemetry")]
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
        Err(HoprTransportError::Api("not implemented yet".into()))
    }
704}
705
706fn build_mixer_cfg_from_env() -> MixerConfig {
707    let mixer_cfg = MixerConfig {
708        min_delay: std::time::Duration::from_millis(
709            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
710                .map(|v| {
711                    v.trim()
712                        .parse::<u64>()
713                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
714                })
715                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
716        ),
717        delay_range: std::time::Duration::from_millis(
718            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
719                .map(|v| {
720                    v.trim()
721                        .parse::<u64>()
722                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
723                })
724                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
725        ),
726        capacity: {
727            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
728                .ok()
729                .and_then(|s| s.trim().parse::<usize>().ok())
730                .filter(|&c| c > 0)
731                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
732            debug!(capacity = capacity, "Setting mixer capacity");
733            capacity
734        },
735        ..MixerConfig::default()
736    };
737    debug!(?mixer_cfg, "Mixer configuration");
738
739    mixer_cfg
740}