hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
//! 4. run algorithms associated with the transport layer operational management
//! 5. specify interfaces that allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21mod helpers;
22
23#[cfg(feature = "capture")]
24mod capture;
25mod pipeline;
26pub mod socket;
27
28use std::{
29    sync::{Arc, OnceLock},
30    time::Duration,
31};
32
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    FutureExt, SinkExt, StreamExt,
36    channel::mpsc::{Sender, channel},
37};
38use helpers::PathPlanner;
39use hopr_api::{
40    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
41    db::HoprDbTicketOperations,
42};
43pub use hopr_api::{
44    db::ChannelTicketStatistics,
45    network::{Health, Observable, traits::NetworkView},
46};
47use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
48use hopr_crypto_packet::prelude::PacketSignal;
49pub use hopr_crypto_types::{
50    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
51    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
52};
53use hopr_ct_telemetry::ImmediateNeighborChannelGraph;
54pub use hopr_internal_types::prelude::HoprPseudonym;
55use hopr_internal_types::prelude::*;
56pub use hopr_network_types::prelude::RoutingOptions;
57use hopr_network_types::prelude::{DestinationRouting, *};
58use hopr_primitive_types::prelude::*;
59pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
60use hopr_protocol_hopr::MemorySurbStore;
61use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
62pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
63use hopr_transport_mixer::MixerConfig;
64pub use hopr_transport_network::observation::Observations;
65use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork};
66use hopr_transport_probe::{
67    Probe, TrafficGeneration,
68    ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
71pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
72pub use hopr_transport_session as session;
73#[cfg(feature = "runtime-tokio")]
74pub use hopr_transport_session::transfer_session;
75pub use hopr_transport_session::{
76    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
77    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
78    errors::{SessionManagerError, TransportSessionError},
79};
80use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
81use tracing::{Instrument, debug, error, info, trace, warn};
82
83pub use crate::config::HoprProtocolConfig;
84use crate::{
85    constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
86    socket::HoprSocket,
87};
88
89pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
91pub use hopr_api as api;
92
// `lazy_static` is needed here, since multiplying a `Duration` by a constant is not yet
// a const operation.
//
// The value is twice the session initiation timeout base, multiplied by the maximum number
// of intermediate hops.
lazy_static::lazy_static! {
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
97
/// Identifies a long-running background process spawned by the transport layer.
///
/// Values of this type are used as keys of the process list returned from
/// [`HoprTransport::run`], and their `Display` rendering (generated by `strum`) is used
/// as a human-readable task name in log messages.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// Process driving the transport medium.
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// A sub-process of the HOPR packet pipeline, identified by the wrapped value.
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    /// A numbered sub-process of the session manager.
    ///
    /// In [`HoprTransport::run`], index `0` is the task dispatching incoming messages to the
    /// session manager and indices `1..` are the processes returned when the session manager
    /// is started.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A sub-process of the network probing, identified by the wrapped value.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    /// Synchronization of outgoing ticket indices.
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    /// Packet capture; only available with the `capture` feature.
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
114
// TODO (4.1): implement path selector based on probing
/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
///
/// The alias is the selector type parameter of the path planner held by [`HoprTransport`]
/// and is instantiated via `Default` in [`HoprTransport::new`].
type CurrentPathSelector = NoPathSelector;
118
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport.
///
/// The object is created via [`HoprTransport::new`] and becomes fully operational only after
/// [`HoprTransport::run`] has been called, because `run` is what populates the lazily
/// initialized `ping` and `network` members.
pub struct HoprTransport<Chain, Db> {
    // Off-chain keypair; used as the identity of the transport medium and of the packet pipeline.
    packet_key: OffchainKeypair,
    // On-chain keypair; handed to the packet pipeline and the path planner.
    chain_key: ChainKeypair,
    // Database handle passed on to the packet pipeline.
    db: Db,
    // Chain API used to load announced accounts and domain separators.
    chain_api: Chain,
    // Manual pinger; empty until `run` sets it.
    ping: Arc<OnceLock<Pinger>>,
    // Handle to the transport network; empty until `run` sets it.
    network: Arc<OnceLock<HoprNetwork>>,
    // Resolves `DestinationRouting` of outgoing messages before they enter the packet pipeline.
    path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
    // Multiaddresses given at construction; passed to the network builder and used as a
    // fallback for `local_multiaddresses` while the network is not initialized.
    my_multiaddresses: Vec<Multiaddr>,
    // Session manager handling the Session-related traffic.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
    // Protocol configuration this instance was created with.
    cfg: HoprProtocolConfig,
}
133
134impl<Chain, Db> HoprTransport<Chain, Db>
135where
136    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
137    Chain: ChainReadChannelOperations
138        + ChainReadAccountOperations
139        + ChainKeyOperations
140        + ChainValues
141        + Clone
142        + Send
143        + Sync
144        + 'static,
145{
146    pub fn new(
147        identity: (&ChainKeypair, &OffchainKeypair),
148        resolver: Chain,
149        db: Db,
150        my_multiaddresses: Vec<Multiaddr>,
151        cfg: HoprProtocolConfig,
152    ) -> Self {
153        Self {
154            packet_key: identity.1.clone(),
155            chain_key: identity.0.clone(),
156            ping: Arc::new(OnceLock::new()),
157            network: Arc::new(OnceLock::new()),
158            path_planner: PathPlanner::new(
159                *identity.0.as_ref(),
160                MemorySurbStore::new(cfg.packet.surb_store),
161                resolver.clone(),
162                CurrentPathSelector::default(),
163            ),
164            my_multiaddresses,
165            smgr: SessionManager::new(SessionManagerConfig {
166                // TODO(v4.0): Use the entire range of tags properly
167                session_tag_range: (16..65535),
168                maximum_sessions: cfg.session.maximum_sessions as usize,
169                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
170                    .ok()
171                    .and_then(|s| s.parse::<usize>().ok())
172                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
173                    .max(ApplicationData::PAYLOAD_SIZE),
174                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
175                    .ok()
176                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
177                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
178                    .max(Duration::from_millis(100)),
179                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
180                idle_timeout: cfg.session.idle_timeout,
181                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
182                initial_return_session_egress_rate: 10,
183                minimum_surb_buffer_duration: Duration::from_secs(5),
184                maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
185                // Allow a 10% increase of the target SURB buffer on incoming Sessions
186                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
187                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
188            }),
189            db,
190            chain_api: resolver,
191            cfg,
192        }
193    }
194
195    /// Execute all processes of the [`HoprTransport`] object.
196    ///
197    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
198    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
199    /// processes and return join handles to the calling function. These processes are not started immediately but
200    /// are waiting for a trigger from this piece of code.
201    pub async fn run<S, T, Ct>(
202        &self,
203        cover_traffic: Ct,
204        discovery_updates: S,
205        ticket_events: T,
206        on_incoming_session: Sender<IncomingSession>,
207    ) -> errors::Result<(
208        HoprSocket<
209            futures::channel::mpsc::Receiver<ApplicationDataIn>,
210            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
211        >,
212        AbortableList<HoprTransportProcess>,
213    )>
214    where
215        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
216        T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
217        T::Error: std::error::Error,
218        Ct: TrafficGeneration + Send + Sync + 'static,
219    {
220        info!("loading initial peers from the chain");
221        let public_nodes = self
222            .chain_api
223            .stream_accounts(AccountSelector {
224                public_only: true,
225                ..Default::default()
226            })
227            .await
228            .map_err(|e| HoprTransportError::Other(e.into()))?
229            .collect::<Vec<_>>()
230            .await;
231
232        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
233        // plus 100 as an additional buffer
234        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
235
236        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
237            .ok()
238            .and_then(|s| s.trim().parse::<usize>().ok())
239            .filter(|&c| c > 0)
240            .unwrap_or(2048)
241            .max(minimum_capacity);
242
243        debug!(
244            capacity = internal_discovery_updates_capacity,
245            minimum_required = minimum_capacity,
246            "creating internal discovery updates channel"
247        );
248        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
249            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
250
251        let discovery_updates =
252            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx);
253
254        info!(
255            public_nodes = public_nodes.len(),
256            "initializing swarm with peers from chain"
257        );
258
259        for node_entry in public_nodes {
260            if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
261                let peer: PeerId = node_entry.public_key.into();
262
263                debug!(%peer, ?multiaddresses, "using initial public node");
264
265                internal_discovery_update_tx
266                    .send(PeerDiscovery::Announce(peer, multiaddresses))
267                    .await
268                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
269            }
270        }
271
272        let mut processes = AbortableList::<HoprTransportProcess>::default();
273
274        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
275            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
276
277        // -- transport medium
278
279        // NOTE: Private address filtering is implemented at multiple levels for defense-in-depth:
280        // 1. Discovery events are filtered before they reach the transport component
281        // 2. SwarmEvent::NewExternalAddrOfPeer events are filtered using is_public_address()
282        let allow_private_addresses = self.cfg.transport.prefer_local_addresses;
283        let (transport_network, transport_layer_process) = HoprLibp2pNetworkBuilder::new(
284            (&self.packet_key).into(),
285            discovery_updates.filter_map(move |event| async move {
286                match event {
287                    PeerDiscovery::Announce(peer, multiaddrs) => {
288                        let multiaddrs = multiaddrs
289                            .into_iter()
290                            .filter(|ma| {
291                                hopr_transport_identity::multiaddrs::is_supported(ma)
292                                    && (allow_private_addresses || is_public_address(ma))
293                            })
294                            .collect::<Vec<_>>();
295                        if multiaddrs.is_empty() {
296                            None
297                        } else {
298                            Some(PeerDiscovery::Announce(peer, multiaddrs))
299                        }
300                    }
301                }
302            }),
303            self.my_multiaddresses.clone(),
304        )
305        .await
306        .into_network_with_stream_protocol_process(
307            hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL,
308            allow_private_addresses,
309        );
310        self.network
311            .clone()
312            .set(transport_network.clone())
313            .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
314
315        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
316        let (wire_msg_tx, wire_msg_rx) =
317            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
318
319        let (mixing_channel_tx, mixing_channel_rx) =
320            hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
321
322        // the process is terminated, when the input stream runs out
323        let _mixing_process_before_sending_out = spawn(
324            mixing_channel_rx
325                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
326                .map(Ok)
327                .forward(wire_msg_tx)
328                .inspect(|_| {
329                    tracing::warn!(
330                        task = "mixer -> egress process",
331                        "long-running background task finished"
332                    )
333                }),
334        );
335
336        processes.insert(
337            HoprTransportProcess::Medium,
338            spawn_as_abortable!(transport_layer_process.inspect(|_| tracing::warn!(
339                task = %HoprTransportProcess::Medium,
340                "long-running background task finished"
341            ))),
342        );
343
344        let msg_protocol_bidirectional_channel_capacity =
345            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
346                .ok()
347                .and_then(|s| s.trim().parse::<usize>().ok())
348                .filter(|&c| c > 0)
349                .unwrap_or(16_384);
350
351        let (on_incoming_data_tx, on_incoming_data_rx) =
352            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
353
354        debug!(
355            capacity = msg_protocol_bidirectional_channel_capacity,
356            "creating protocol bidirectional channel"
357        );
358        let (tx_from_protocol, rx_from_protocol) =
359            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
360
361        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
362        // sending the external packets to the transport pipeline.
363        let path_planner = self.path_planner.clone();
364        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
365        let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
366            let path_planner = path_planner.clone();
367            async move {
368                trace!(?unresolved, "resolving routing for packet");
369                match path_planner
370                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
371                    .await
372                {
373                    Ok((resolved, rem_surbs)) => {
374                        // Set the SURB distress/out-of-SURBs flag if applicable.
375                        // These flags are translated into HOPR protocol packet signals and are
376                        // applicable only on the return path.
377                        let mut signals_to_dst = data
378                            .packet_info
379                            .as_ref()
380                            .map(|info| info.signals_to_destination)
381                            .unwrap_or_default();
382
383                        if resolved.is_return() {
384                            signals_to_dst = match rem_surbs {
385                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
386                                    signals_to_dst | PacketSignal::SurbDistress
387                                }
388                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
389                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
390                            };
391                        } else {
392                            // Unset these flags as they make no sense on the forward path.
393                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
394                        }
395
396                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
397                        trace!(?resolved, "resolved routing for packet");
398                        Some((resolved, data))
399                    }
400                    Err(error) => {
401                        error!(%error, "failed to resolve routing");
402                        None
403                    }
404                }
405            }
406            .in_current_span()
407        });
408
409        let channels_dst = self
410            .chain_api
411            .domain_separators()
412            .await
413            .map_err(HoprTransportError::chain)?
414            .channel;
415
416        processes.extend_from(pipeline::run_hopr_packet_pipeline(
417            (self.packet_key.clone(), self.chain_key.clone()),
418            (mixing_channel_tx, wire_msg_rx),
419            (tx_from_protocol, all_resolved_external_msg_rx),
420            HoprPipelineComponents {
421                ticket_events,
422                surb_store: self.path_planner.surb_store.clone(),
423                chain_api: self.chain_api.clone(),
424                db: self.db.clone(),
425            },
426            channels_dst,
427            self.cfg.packet,
428        ));
429
430        // -- network probing
431        debug!(
432            capacity = msg_protocol_bidirectional_channel_capacity,
433            note = "same as protocol bidirectional",
434            "Creating probing channel"
435        );
436
437        let (tx_from_probing, rx_from_probing) =
438            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
439
440        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
441            .ok()
442            .and_then(|s| s.trim().parse::<usize>().ok())
443            .filter(|&c| c > 0)
444            .unwrap_or(128);
445        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
446        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
447
448        let probe = Probe::new(self.cfg.probe);
449
450        let probing_processes = probe
451            .continuously_scan(
452                (unresolved_routing_msg_tx.clone(), rx_from_protocol),
453                manual_ping_rx,
454                tx_from_probing,
455                cover_traffic,
456                ImmediateNeighborChannelGraph::new(transport_network.clone(), self.cfg.probe.recheck_threshold),
457            )
458            .await;
459
460        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
461
462        // manual ping
463        self.ping
464            .clone()
465            .set(Pinger::new(
466                PingConfig {
467                    timeout: self.cfg.probe.timeout,
468                },
469                manual_ping_tx,
470            ))
471            .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
472
473        // -- session management
474        self.smgr
475            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
476            .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
477            .into_iter()
478            .enumerate()
479            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
480            .for_each(|(k, v)| {
481                processes.insert(k, v);
482            });
483
484        let smgr = self.smgr.clone();
485        processes.insert(
486            HoprTransportProcess::SessionsManagement(0),
487            spawn_as_abortable!(
488                rx_from_probing
489                    .filter_map(move |(pseudonym, data)| {
490                        let smgr = smgr.clone();
491                        async move {
492                            match smgr.dispatch_message(pseudonym, data).await {
493                                Ok(DispatchResult::Processed) => {
494                                    tracing::trace!("message dispatch completed");
495                                    None
496                                }
497                                Ok(DispatchResult::Unrelated(data)) => {
498                                    tracing::trace!("unrelated message dispatch completed");
499                                    Some(data)
500                                }
501                                Err(error) => {
502                                    tracing::error!(%error, "error while dispatching packet in the session manager");
503                                    None
504                                }
505                            }
506                        }
507                    })
508                    .map(Ok)
509                    .forward(on_incoming_data_tx)
510                    .inspect(|_| tracing::warn!(
511                        task = %HoprTransportProcess::SessionsManagement(0),
512                        "long-running background task finished"
513                    ))
514            ),
515        );
516
517        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
518    }
519
520    #[tracing::instrument(level = "debug", skip(self))]
521    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
522        if peer == &self.packet_key.public().into() {
523            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
524        }
525
526        let pinger = self
527            .ping
528            .get()
529            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
530
531        let network = self
532            .network
533            .get()
534            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?;
535
536        let latency = (*pinger).ping(*peer).await?;
537
538        let observations = network
539            .observations_for(peer)
540            .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?;
541
542        Ok((latency, observations))
543    }
544
545    #[tracing::instrument(level = "debug", skip(self))]
546    pub async fn new_session(
547        &self,
548        destination: Address,
549        target: SessionTarget,
550        cfg: SessionClientConfig,
551    ) -> errors::Result<HoprSession> {
552        Ok(self.smgr.new_session(destination, target, cfg).await?)
553    }
554
555    #[tracing::instrument(level = "debug", skip(self))]
556    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
557        Ok(self.smgr.ping_session(id).await?)
558    }
559
560    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
561        Ok(self.smgr.get_surb_balancer_config(id).await?)
562    }
563
564    pub async fn update_session_surb_balancing_cfg(
565        &self,
566        id: &SessionId,
567        cfg: SurbBalancerConfig,
568    ) -> errors::Result<()> {
569        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
570    }
571
572    #[tracing::instrument(level = "debug", skip(self))]
573    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
574        self.network
575            .get()
576            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
577            .map(|network| network.listening_as().into_iter().collect())
578            .unwrap_or_default()
579    }
580
581    #[tracing::instrument(level = "debug", skip(self))]
582    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
583        let mut mas = self
584            .local_multiaddresses()
585            .into_iter()
586            .filter(|ma| {
587                hopr_transport_identity::multiaddrs::is_supported(ma)
588                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
589            })
590            .map(|ma| strip_p2p_protocol(&ma))
591            .filter(|v| !v.is_empty())
592            .collect::<Vec<_>>();
593
594        mas.sort_by(|l, r| {
595            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
596            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
597
598            if !(is_left_dns ^ is_right_dns) {
599                std::cmp::Ordering::Equal
600            } else if is_left_dns {
601                std::cmp::Ordering::Less
602            } else {
603                std::cmp::Ordering::Greater
604            }
605        });
606
607        mas
608    }
609
610    #[tracing::instrument(level = "debug", skip(self))]
611    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
612        self.network
613            .get()
614            .map(|network| network.listening_as().into_iter().collect())
615            .unwrap_or_else(|| {
616                tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
617                self.my_multiaddresses.clone()
618            })
619    }
620
621    #[tracing::instrument(level = "debug", skip(self))]
622    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
623        match self
624            .network
625            .get()
626            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
627        {
628            Ok(network) => network.multiaddress_of(peer).unwrap_or_default().into_iter().collect(),
629            Err(error) => {
630                tracing::error!(%error, "failed to get observed multiaddresses");
631                return vec![];
632            }
633        }
634    }
635
636    #[tracing::instrument(level = "debug", skip(self))]
637    pub async fn network_health(&self) -> Health {
638        self.network
639            .get()
640            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
641            .map(|network| network.health())
642            .unwrap_or(Health::Red)
643    }
644
645    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
646        Ok(self
647            .network
648            .get()
649            .ok_or_else(|| {
650                tracing::error!("transport network is not yet initialized");
651                HoprTransportError::Api("transport network is not yet initialized".into())
652            })?
653            .connected_peers()
654            .into_iter()
655            .collect())
656    }
657
658    #[tracing::instrument(level = "debug", skip(self))]
659    pub async fn network_peer_observations(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
660        Ok(self
661            .network
662            .get()
663            .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?
664            .observations_for(peer))
665    }
666}
667
668fn build_mixer_cfg_from_env() -> MixerConfig {
669    let mixer_cfg = MixerConfig {
670        min_delay: std::time::Duration::from_millis(
671            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
672                .map(|v| {
673                    v.trim()
674                        .parse::<u64>()
675                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
676                })
677                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
678        ),
679        delay_range: std::time::Duration::from_millis(
680            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
681                .map(|v| {
682                    v.trim()
683                        .parse::<u64>()
684                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
685                })
686                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
687        ),
688        capacity: {
689            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
690                .ok()
691                .and_then(|s| s.trim().parse::<usize>().ok())
692                .filter(|&c| c > 0)
693                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
694            debug!(capacity = capacity, "Setting mixer capacity");
695            capacity
696        },
697        ..MixerConfig::default()
698    };
699    debug!(?mixer_cfg, "Mixer configuration");
700
701    mixer_cfg
702}