hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21mod helpers;
22pub mod network_notifier;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30    sync::{Arc, OnceLock},
31    time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36    FutureExt, SinkExt, StreamExt,
37    channel::mpsc::{Sender, channel},
38};
39use helpers::PathPlanner;
40pub use hopr_api::db::ChannelTicketStatistics;
41use hopr_api::{
42    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
43    db::{HoprDbPeersOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
44};
45use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
46use hopr_crypto_packet::prelude::PacketSignal;
47pub use hopr_crypto_types::{
48    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
49    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
50};
51pub use hopr_internal_types::prelude::HoprPseudonym;
52use hopr_internal_types::prelude::*;
53pub use hopr_network_types::prelude::RoutingOptions;
54use hopr_network_types::prelude::{DestinationRouting, *};
55use hopr_primitive_types::prelude::*;
56pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
57use hopr_protocol_hopr::MemorySurbStore;
58use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
59pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
60use hopr_transport_mixer::MixerConfig;
61pub use hopr_transport_network::network::{Health, Network};
62use hopr_transport_p2p::HoprSwarm;
63use hopr_transport_probe::{
64    Probe,
65    neighbors::ImmediateNeighborProber,
66    ping::{PingConfig, Pinger},
67};
68pub use hopr_transport_probe::{
69    errors::ProbeError,
70    ping::PingQueryReplier,
71    traits::TrafficGeneration,
72    types::{NeighborTelemetry, Telemetry},
73};
74pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
75pub use hopr_transport_session as session;
76#[cfg(feature = "runtime-tokio")]
77pub use hopr_transport_session::transfer_session;
78pub use hopr_transport_session::{
79    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
80    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
81    errors::{SessionManagerError, TransportSessionError},
82};
83use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
84use tracing::{Instrument, debug, error, info, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88    constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
89    socket::HoprSocket,
90};
91
92pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
95lazy_static::lazy_static! {
96    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
97}
98
99#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
100pub enum HoprTransportProcess {
101    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
102    Medium,
103    #[strum(to_string = "HOPR packet pipeline ({0})")]
104    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
105    #[strum(to_string = "session manager sub-process #{0}")]
106    SessionsManagement(usize),
107    #[strum(to_string = "network probing sub-process: {0}")]
108    Probing(hopr_transport_probe::HoprProbeProcess),
109    #[strum(to_string = "sync of outgoing ticket indices")]
110    OutgoingIndexSync,
111    #[cfg(feature = "capture")]
112    #[strum(to_string = "packet capture")]
113    Capture,
114}
115
116// TODO (4.1): implement path selector based on probing
117/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
118type CurrentPathSelector = NoPathSelector;
119
120/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
121/// the transport.
122pub struct HoprTransport<Chain, Db> {
123    packet_key: OffchainKeypair,
124    chain_key: ChainKeypair,
125    db: Db,
126    chain_api: Chain,
127    ping: Arc<OnceLock<Pinger>>,
128    network: Arc<Network<Db>>,
129    path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
130    my_multiaddresses: Vec<Multiaddr>,
131    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
132    cfg: HoprProtocolConfig,
133}
134
135impl<Chain, Db> HoprTransport<Chain, Db>
136where
137    Db: HoprDbTicketOperations + HoprDbPeersOperations + Clone + Send + Sync + 'static,
138    Chain: ChainReadChannelOperations
139        + ChainReadAccountOperations
140        + ChainKeyOperations
141        + ChainValues
142        + Clone
143        + Send
144        + Sync
145        + 'static,
146{
147    pub fn new(
148        identity: (&ChainKeypair, &OffchainKeypair),
149        resolver: Chain,
150        db: Db,
151        my_multiaddresses: Vec<Multiaddr>,
152        cfg: HoprProtocolConfig,
153    ) -> Self {
154        Self {
155            packet_key: identity.1.clone(),
156            chain_key: identity.0.clone(),
157            ping: Arc::new(OnceLock::new()),
158            network: Arc::new(Network::new(
159                identity.1.into(),
160                my_multiaddresses.clone(),
161                cfg.network,
162                db.clone(),
163            )),
164            path_planner: PathPlanner::new(
165                *identity.0.as_ref(),
166                MemorySurbStore::new(cfg.packet.surb_store),
167                resolver.clone(),
168                CurrentPathSelector::default(),
169            ),
170            my_multiaddresses,
171            smgr: SessionManager::new(SessionManagerConfig {
172                // TODO(v3.1): Use the entire range of tags properly
173                session_tag_range: (16..65535),
174                maximum_sessions: cfg.session.maximum_sessions as usize,
175                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
176                    .ok()
177                    .and_then(|s| s.parse::<usize>().ok())
178                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
179                    .max(ApplicationData::PAYLOAD_SIZE),
180                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
181                    .ok()
182                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
183                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
184                    .max(Duration::from_millis(100)),
185                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
186                idle_timeout: cfg.session.idle_timeout,
187                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
188                initial_return_session_egress_rate: 10,
189                minimum_surb_buffer_duration: Duration::from_secs(5),
190                maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
191                // Allow a 10% increase of the target SURB buffer on incoming Sessions
192                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
193                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
194            }),
195            db,
196            chain_api: resolver,
197            cfg,
198        }
199    }
200
201    /// Execute all processes of the [`HoprTransport`] object.
202    ///
203    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
204    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
205    /// processes and return join handles to the calling function. These processes are not started immediately but
206    /// are waiting for a trigger from this piece of code.
207    pub async fn run<S, T, Ct>(
208        &self,
209        cover_traffic: Option<Ct>,
210        discovery_updates: S,
211        ticket_events: T,
212        on_incoming_session: Sender<IncomingSession>,
213    ) -> errors::Result<(
214        HoprSocket<
215            futures::channel::mpsc::Receiver<ApplicationDataIn>,
216            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
217        >,
218        AbortableList<HoprTransportProcess>,
219    )>
220    where
221        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
222        T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
223        T::Error: std::error::Error,
224        Ct: TrafficGeneration + Send + Sync + 'static,
225    {
226        info!("loading initial peers from the chain");
227        let public_nodes = self
228            .chain_api
229            .stream_accounts(AccountSelector {
230                public_only: true,
231                ..Default::default()
232            })
233            .await
234            .map_err(|e| HoprTransportError::Other(e.into()))?
235            .collect::<Vec<_>>()
236            .await;
237
238        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
239        // plus 100 as an additional buffer
240        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
241
242        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
243            .ok()
244            .and_then(|s| s.trim().parse::<usize>().ok())
245            .filter(|&c| c > 0)
246            .unwrap_or(2048)
247            .max(minimum_capacity);
248
249        debug!(
250            capacity = internal_discovery_updates_capacity,
251            minimum_required = minimum_capacity,
252            "creating internal discovery updates channel"
253        );
254        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
255            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
256
257        let me_peerid: PeerId = self.packet_key.public().into();
258        let network = self.network.clone();
259        let discovery_updates = futures_concurrency::stream::StreamExt::merge(
260            discovery_updates,
261            internal_discovery_update_rx,
262        )
263        .filter_map(move |event| {
264            let network = network.clone();
265            async move {
266                match event {
267                    PeerDiscovery::Announce(peer, multiaddresses) => {
268                        debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
269                        if peer != me_peerid {
270                            // decapsulate the `p2p/<peer_id>` to remove duplicities
271                            let mas = multiaddresses
272                                .into_iter()
273                                .map(|ma| strip_p2p_protocol(&ma))
274                                .filter(|v| !v.is_empty())
275                                .collect::<Vec<_>>();
276
277                            if !mas.is_empty() {
278                                if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
279                                    error!(%peer, %error, "failed to add peer to the network");
280                                    None
281                                } else {
282                                    Some(PeerDiscovery::Announce(peer, mas))
283                                }
284                            } else {
285                                None
286                            }
287                        } else {
288                            None
289                        }
290                    }
291                }
292            }
293        });
294
295        info!(
296            public_nodes = public_nodes.len(),
297            "initializing swarm with peers from chain"
298        );
299
300        for node_entry in public_nodes {
301            if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
302                let peer: PeerId = node_entry.public_key.into();
303
304                debug!(%peer, ?multiaddresses, "using initial public node");
305
306                internal_discovery_update_tx
307                    .send(PeerDiscovery::Announce(peer, multiaddresses))
308                    .await
309                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
310            }
311        }
312
313        let mut processes = AbortableList::<HoprTransportProcess>::default();
314
315        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
316            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
317
318        // -- transport medium
319        let mixer_cfg = build_mixer_cfg_from_env();
320
321        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
322
323        let transport_layer = HoprSwarm::new(
324            (&self.packet_key).into(),
325            discovery_updates,
326            self.my_multiaddresses.clone(),
327            self.cfg.transport.prefer_local_addresses,
328        )
329        .await;
330
331        let msg_proto_control =
332            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
333        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
334        let (wire_msg_tx, wire_msg_rx) =
335            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
336
337        let _mixing_process_before_sending_out = spawn(
338            mixing_channel_rx
339                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
340                .map(Ok)
341                .forward(wire_msg_tx)
342                .inspect(|_| {
343                    tracing::warn!(
344                        task = "mixer -> egress process",
345                        "long-running background task finished"
346                    )
347                }),
348        );
349
350        let (transport_events_tx, transport_events_rx) = channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
351
352        let network_clone = self.network.clone();
353        spawn(
354            transport_events_rx
355                .for_each(move |event| {
356                    let network = network_clone.clone();
357                    async move {
358                        match event {
359                            hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
360                                if let Err(error) = network
361                                    .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
362                                    .await
363                                {
364                                    tracing::error!(%peer, %error, "Failed to add incoming connection peer");
365                                }
366                            }
367                            hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
368                                if let Err(error) = network
369                                    .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
370                                    .await
371                                {
372                                    tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
373                                }
374                            }
375                        }
376                    }
377                })
378                .inspect(|_| {
379                    tracing::warn!(
380                        task = "transport events recording",
381                        "long-running background task finished"
382                    )
383                }),
384        );
385
386        processes.insert(
387            HoprTransportProcess::Medium,
388            spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
389                task = %HoprTransportProcess::Medium,
390                "long-running background task finished"
391            ))),
392        );
393
394        let msg_protocol_bidirectional_channel_capacity =
395            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
396                .ok()
397                .and_then(|s| s.trim().parse::<usize>().ok())
398                .filter(|&c| c > 0)
399                .unwrap_or(16_384);
400
401        let (on_incoming_data_tx, on_incoming_data_rx) =
402            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
403
404        debug!(
405            capacity = msg_protocol_bidirectional_channel_capacity,
406            "creating protocol bidirectional channel"
407        );
408        let (tx_from_protocol, rx_from_protocol) =
409            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
410
411        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
412        // sending the external packets to the transport pipeline.
413        let path_planner = self.path_planner.clone();
414        let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
415        let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
416            let path_planner = path_planner.clone();
417            async move {
418                trace!(?unresolved, "resolving routing for packet");
419                match path_planner
420                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
421                    .await
422                {
423                    Ok((resolved, rem_surbs)) => {
424                        // Set the SURB distress/out-of-SURBs flag if applicable.
425                        // These flags are translated into HOPR protocol packet signals and are
426                        // applicable only on the return path.
427                        let mut signals_to_dst = data
428                            .packet_info
429                            .as_ref()
430                            .map(|info| info.signals_to_destination)
431                            .unwrap_or_default();
432
433                        if resolved.is_return() {
434                            signals_to_dst = match rem_surbs {
435                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
436                                    signals_to_dst | PacketSignal::SurbDistress
437                                }
438                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
439                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
440                            };
441                        } else {
442                            // Unset these flags as they make no sense on the forward path.
443                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
444                        }
445
446                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
447                        trace!(?resolved, "resolved routing for packet");
448                        Some((resolved, data))
449                    }
450                    Err(error) => {
451                        error!(%error, "failed to resolve routing");
452                        None
453                    }
454                }
455            }
456            .in_current_span()
457        });
458
459        let channels_dst = self
460            .chain_api
461            .domain_separators()
462            .await
463            .map_err(HoprTransportError::chain)?
464            .channel;
465
466        processes.extend_from(pipeline::run_hopr_packet_pipeline(
467            (self.packet_key.clone(), self.chain_key.clone()),
468            (mixing_channel_tx, wire_msg_rx),
469            (tx_from_protocol, all_resolved_external_msg_rx),
470            HoprPipelineComponents {
471                ticket_events,
472                surb_store: self.path_planner.surb_store.clone(),
473                chain_api: self.chain_api.clone(),
474                db: self.db.clone(),
475            },
476            channels_dst,
477            self.cfg.packet,
478        ));
479
480        // -- network probing
481        debug!(
482            capacity = msg_protocol_bidirectional_channel_capacity,
483            note = "same as protocol bidirectional",
484            "Creating probing channel"
485        );
486
487        let (tx_from_probing, rx_from_probing) =
488            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
489
490        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
491            .ok()
492            .and_then(|s| s.trim().parse::<usize>().ok())
493            .filter(|&c| c > 0)
494            .unwrap_or(128);
495        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
496        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
497
498        let probe = Probe::new(self.cfg.probe);
499        let probing_processes = if let Some(ct) = cover_traffic {
500            probe
501                .continuously_scan(
502                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
503                    manual_ping_rx,
504                    tx_from_probing,
505                    ct,
506                )
507                .await
508        } else {
509            probe
510                .continuously_scan(
511                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
512                    manual_ping_rx,
513                    tx_from_probing,
514                    ImmediateNeighborProber::new(
515                        self.cfg.probe,
516                        network_notifier::ProbeNetworkInteractions::new(self.network.clone()),
517                    ),
518                )
519                .await
520        };
521
522        processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
523
524        // manual ping
525        self.ping
526            .clone()
527            .set(Pinger::new(
528                PingConfig {
529                    timeout: self.cfg.probe.timeout,
530                },
531                manual_ping_tx,
532            ))
533            .expect("must set the ticket aggregation writer only once");
534
535        // -- session management
536        self.smgr
537            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
538            .expect("failed to start session manager")
539            .into_iter()
540            .enumerate()
541            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
542            .for_each(|(k, v)| {
543                processes.insert(k, v);
544            });
545
546        let smgr = self.smgr.clone();
547        processes.insert(
548            HoprTransportProcess::SessionsManagement(0),
549            spawn_as_abortable!(
550                rx_from_probing
551                    .filter_map(move |(pseudonym, data)| {
552                        let smgr = smgr.clone();
553                        async move {
554                            match smgr.dispatch_message(pseudonym, data).await {
555                                Ok(DispatchResult::Processed) => {
556                                    tracing::trace!("message dispatch completed");
557                                    None
558                                }
559                                Ok(DispatchResult::Unrelated(data)) => {
560                                    tracing::trace!("unrelated message dispatch completed");
561                                    Some(data)
562                                }
563                                Err(error) => {
564                                    tracing::error!(%error, "error while dispatching packet in the session manager");
565                                    None
566                                }
567                            }
568                        }
569                    })
570                    .map(Ok)
571                    .forward(on_incoming_data_tx)
572                    .inspect(|_| tracing::warn!(
573                        task = %HoprTransportProcess::SessionsManagement(0),
574                        "long-running background task finished"
575                    ))
576            ),
577        );
578
579        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
580    }
581
582    #[tracing::instrument(level = "debug", skip(self))]
583    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
584        if peer == &self.packet_key.public().into() {
585            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
586        }
587
588        let pinger = self
589            .ping
590            .get()
591            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
592
593        if let Err(error) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
594            error!(%error, "failed to store the peer observation");
595        }
596
597        let latency = (*pinger).ping(*peer).await?;
598
599        let peer_status = self
600            .network
601            .get(peer)
602            .await?
603            .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?;
604
605        Ok((latency, peer_status))
606    }
607
608    #[tracing::instrument(level = "debug", skip(self))]
609    pub async fn new_session(
610        &self,
611        destination: Address,
612        target: SessionTarget,
613        cfg: SessionClientConfig,
614    ) -> errors::Result<HoprSession> {
615        Ok(self.smgr.new_session(destination, target, cfg).await?)
616    }
617
618    #[tracing::instrument(level = "debug", skip(self))]
619    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
620        Ok(self.smgr.ping_session(id).await?)
621    }
622
623    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
624        Ok(self.smgr.get_surb_balancer_config(id).await?)
625    }
626
627    pub async fn update_session_surb_balancing_cfg(
628        &self,
629        id: &SessionId,
630        cfg: SurbBalancerConfig,
631    ) -> errors::Result<()> {
632        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
633    }
634
635    #[tracing::instrument(level = "debug", skip(self))]
636    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
637        self.network
638            .get(&self.packet_key.public().into())
639            .await
640            .unwrap_or_else(|e| {
641                error!(error = %e, "failed to obtain listening multi-addresses");
642                None
643            })
644            .map(|peer| peer.multiaddresses)
645            .unwrap_or_default()
646    }
647
648    #[tracing::instrument(level = "debug", skip(self))]
649    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
650        let mut mas = self
651            .local_multiaddresses()
652            .into_iter()
653            .filter(|ma| {
654                hopr_transport_identity::multiaddrs::is_supported(ma)
655                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
656            })
657            .map(|ma| strip_p2p_protocol(&ma))
658            .filter(|v| !v.is_empty())
659            .collect::<Vec<_>>();
660
661        mas.sort_by(|l, r| {
662            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
663            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
664
665            if !(is_left_dns ^ is_right_dns) {
666                std::cmp::Ordering::Equal
667            } else if is_left_dns {
668                std::cmp::Ordering::Less
669            } else {
670                std::cmp::Ordering::Greater
671            }
672        });
673
674        mas
675    }
676
677    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
678        self.my_multiaddresses.clone()
679    }
680
681    #[tracing::instrument(level = "debug", skip(self))]
682    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
683        self.network
684            .get(peer)
685            .await
686            .unwrap_or(None)
687            .map(|peer| peer.multiaddresses)
688            .unwrap_or(vec![])
689    }
690
691    #[tracing::instrument(level = "debug", skip(self))]
692    pub async fn network_health(&self) -> Health {
693        self.network.health().await
694    }
695
696    #[tracing::instrument(level = "debug", skip(self))]
697    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
698        Ok(self.network.connected_peers().await?)
699    }
700
701    #[tracing::instrument(level = "debug", skip(self))]
702    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
703        Ok(self.network.get(peer).await?)
704    }
705}
706
707fn build_mixer_cfg_from_env() -> MixerConfig {
708    let mixer_cfg = MixerConfig {
709        min_delay: std::time::Duration::from_millis(
710            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
711                .map(|v| {
712                    v.trim()
713                        .parse::<u64>()
714                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
715                })
716                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
717        ),
718        delay_range: std::time::Duration::from_millis(
719            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
720                .map(|v| {
721                    v.trim()
722                        .parse::<u64>()
723                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
724                })
725                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
726        ),
727        capacity: {
728            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
729                .ok()
730                .and_then(|s| s.trim().parse::<usize>().ok())
731                .filter(|&c| c > 0)
732                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
733            debug!(capacity = capacity, "Setting mixer capacity");
734            capacity
735        },
736        ..MixerConfig::default()
737    };
738    debug!(?mixer_cfg, "Mixer configuration");
739
740    mixer_cfg
741}