hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27    sync::{Arc, OnceLock},
28    time::Duration,
29};
30
31use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
32use futures::{
33    FutureExt, SinkExt, StreamExt,
34    channel::mpsc::{Sender, channel},
35};
36use helpers::PathPlanner;
37use hopr_api::{
38    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
39    db::{
40        HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus, TicketSelector,
41    },
42};
43use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
44use hopr_crypto_packet::prelude::PacketSignal;
45pub use hopr_crypto_types::{
46    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
47    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
48};
49pub use hopr_internal_types::prelude::HoprPseudonym;
50use hopr_internal_types::prelude::*;
51pub use hopr_network_types::prelude::RoutingOptions;
52use hopr_network_types::prelude::{DestinationRouting, *};
53use hopr_primitive_types::prelude::*;
54pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
55use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
56pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
57use hopr_transport_mixer::MixerConfig;
58pub use hopr_transport_network::network::{Health, Network};
59use hopr_transport_p2p::HoprSwarm;
60use hopr_transport_probe::{
61    Probe,
62    neighbors::ImmediateNeighborProber,
63    ping::{PingConfig, Pinger},
64};
65pub use hopr_transport_probe::{
66    errors::ProbeError,
67    ping::PingQueryReplier,
68    traits::TrafficGeneration,
69    types::{NeighborTelemetry, Telemetry},
70};
71pub use hopr_transport_protocol::PeerDiscovery;
72use hopr_transport_protocol::processor::PacketInteractionConfig;
73pub use hopr_transport_session as session;
74#[cfg(feature = "runtime-tokio")]
75pub use hopr_transport_session::transfer_session;
76pub use hopr_transport_session::{
77    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
78    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
79    errors::{SessionManagerError, TransportSessionError},
80};
81use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
82#[cfg(feature = "mixer-stream")]
83use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
84use tracing::{Instrument, debug, error, info, trace, warn};
85
86pub use crate::{config::HoprTransportConfig, helpers::TicketStatistics};
87use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
88
89pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
91#[cfg(any(
92    all(feature = "mixer-channel", feature = "mixer-stream"),
93    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
94))]
95compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
96
97// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
98lazy_static::lazy_static! {
99    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
100}
101
102#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
103pub enum HoprTransportProcess {
104    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
105    Medium,
106    #[strum(to_string = "HOPR protocol ({0})")]
107    Protocol(hopr_transport_protocol::ProtocolProcesses),
108    #[strum(to_string = "session manager sub-process #{0}")]
109    SessionsManagement(usize),
110    #[strum(to_string = "network probing sub-process: {0}")]
111    Probing(hopr_transport_probe::HoprProbeProcess),
112}
113
114// TODO (4.1): implement path selector based on probing
115/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
116type CurrentPathSelector = NoPathSelector;
117
118/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
119/// the transport, as well as off-chain ticket manipulation.
120pub struct HoprTransport<Db, R> {
121    me: OffchainKeypair,
122    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
123    me_address: Address,
124    cfg: HoprTransportConfig,
125    db: Db,
126    resolver: R,
127    ping: Arc<OnceLock<Pinger>>,
128    network: Arc<Network<Db>>,
129    path_planner: PathPlanner<Db, R, CurrentPathSelector>,
130    my_multiaddresses: Vec<Multiaddr>,
131    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
132}
133
134impl<Db, R> HoprTransport<Db, R>
135where
136    Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
137    R: ChainReadChannelOperations
138        + ChainReadAccountOperations
139        + ChainKeyOperations
140        + ChainValues
141        + Clone
142        + Send
143        + Sync
144        + 'static,
145{
146    pub fn new(
147        me: &OffchainKeypair,
148        me_onchain: &ChainKeypair,
149        cfg: HoprTransportConfig,
150        db: Db,
151        resolver: R,
152        my_multiaddresses: Vec<Multiaddr>,
153    ) -> Self {
154        let me_peerid: PeerId = me.into();
155        let me_chain_addr = me_onchain.public().to_address();
156
157        Self {
158            me: me.clone(),
159            me_peerid,
160            me_address: me_chain_addr,
161            ping: Arc::new(OnceLock::new()),
162            network: Arc::new(Network::new(
163                me_peerid,
164                my_multiaddresses.clone(),
165                cfg.network.clone(),
166                db.clone(),
167            )),
168            path_planner: PathPlanner::new(
169                me_chain_addr,
170                db.clone(),
171                resolver.clone(),
172                CurrentPathSelector::default(),
173            ),
174            my_multiaddresses,
175            smgr: SessionManager::new(SessionManagerConfig {
176                // TODO(v3.1): Use the entire range of tags properly
177                session_tag_range: (16..65535),
178                maximum_sessions: cfg.session.maximum_sessions as usize,
179                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
180                    .ok()
181                    .and_then(|s| s.parse::<usize>().ok())
182                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
183                    .max(ApplicationData::PAYLOAD_SIZE),
184                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
185                    .ok()
186                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
187                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
188                    .max(Duration::from_millis(100)),
189                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
190                idle_timeout: cfg.session.idle_timeout,
191                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
192                initial_return_session_egress_rate: 10,
193                minimum_surb_buffer_duration: Duration::from_secs(5),
194                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
195                // Allow a 10% increase of the target SURB buffer on incoming Sessions
196                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
197                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
198            }),
199            db,
200            resolver,
201            cfg,
202        }
203    }
204
205    /// Execute all processes of the [`HoprTransport`] object.
206    ///
207    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
208    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
209    /// processes and return join handles to the calling function. These processes are not started immediately but
210    /// are waiting for a trigger from this piece of code.
211    #[allow(clippy::too_many_arguments)]
212    pub async fn run<S, Ct>(
213        &self,
214        cover_traffic: Option<Ct>,
215        discovery_updates: S,
216        on_incoming_session: Sender<IncomingSession>,
217    ) -> errors::Result<(
218        HoprSocket<
219            futures::channel::mpsc::Receiver<ApplicationDataIn>,
220            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
221        >,
222        AbortableList<HoprTransportProcess>,
223    )>
224    where
225        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
226        Ct: TrafficGeneration + Send + Sync + 'static,
227    {
228        info!("Loading initial peers from the chain");
229        let public_nodes = self
230            .resolver
231            .stream_accounts(AccountSelector {
232                public_only: true,
233                ..Default::default()
234            })
235            .await
236            .map_err(|e| HoprTransportError::Other(e.into()))?
237            .collect::<Vec<_>>()
238            .await;
239
240        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
241        // plus 100 as additional buffer
242        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
243
244        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
245            .ok()
246            .and_then(|s| s.trim().parse::<usize>().ok())
247            .filter(|&c| c > 0)
248            .unwrap_or(2048)
249            .max(minimum_capacity);
250
251        debug!(
252            capacity = internal_discovery_updates_capacity,
253            minimum_required = minimum_capacity,
254            "Creating internal discovery updates channel"
255        );
256        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
257            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
258
259        let me_peerid = self.me_peerid;
260        let network = self.network.clone();
261        let discovery_updates = futures_concurrency::stream::StreamExt::merge(
262            discovery_updates,
263            internal_discovery_update_rx,
264        )
265        .filter_map(move |event| {
266            let network = network.clone();
267            async move {
268                match event {
269                    PeerDiscovery::Announce(peer, multiaddresses) => {
270                        debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
271                        if peer != me_peerid {
272                            // decapsulate the `p2p/<peer_id>` to remove duplicities
273                            let mas = multiaddresses
274                                .into_iter()
275                                .map(|ma| strip_p2p_protocol(&ma))
276                                .filter(|v| !v.is_empty())
277                                .collect::<Vec<_>>();
278
279                            if !mas.is_empty() {
280                                if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
281                                    error!(%peer, %error, "failed to add peer to the network");
282                                    None
283                                } else {
284                                    Some(PeerDiscovery::Announce(peer, mas))
285                                }
286                            } else {
287                                None
288                            }
289                        } else {
290                            None
291                        }
292                    }
293                    _ => None,
294                }
295            }
296        });
297
298        info!(
299            public_nodes = public_nodes.len(),
300            "Initializing swarm with peers from chain"
301        );
302
303        for node_entry in public_nodes {
304            if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
305                let peer: PeerId = node_entry.public_key.into();
306
307                debug!(%peer, ?multiaddresses, "Using initial public node");
308
309                internal_discovery_update_tx
310                    .send(PeerDiscovery::Announce(peer, multiaddresses))
311                    .await
312                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
313            }
314        }
315
316        let mut processes = AbortableList::<HoprTransportProcess>::default();
317
318        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
319            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
320
321        // -- transport medium
322        let mixer_cfg = build_mixer_cfg_from_env();
323
324        #[cfg(feature = "mixer-channel")]
325        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
326
327        #[cfg(feature = "mixer-stream")]
328        let (mixing_channel_tx, mixing_channel_rx) = {
329            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
330            let rx = rx.then_concurrent(move |v| {
331                let cfg = mixer_cfg;
332
333                async move {
334                    let random_delay = cfg.random_delay();
335                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
336
337                    #[cfg(all(feature = "prometheus", not(test)))]
338                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
339
340                    sleep(random_delay).await;
341
342                    #[cfg(all(feature = "prometheus", not(test)))]
343                    {
344                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
345
346                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
347                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
348                            (weight * random_delay.as_millis() as f64)
349                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
350                        );
351                    }
352
353                    v
354                }
355            });
356
357            (tx, rx)
358        };
359
360        let transport_layer = HoprSwarm::new(
361            (&self.me).into(),
362            discovery_updates,
363            self.my_multiaddresses.clone(),
364            self.cfg.transport.prefer_local_addresses,
365        )
366        .await;
367
368        let msg_proto_control =
369            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
370        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
371        let (wire_msg_tx, wire_msg_rx) =
372            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
373
374        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
375            mixing_channel_rx
376                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
377                .map(Ok)
378                .forward(wire_msg_tx)
379                .inspect(|_| {
380                    tracing::warn!(
381                        task = "mixer -> egress process",
382                        "long-running background task finished"
383                    )
384                }),
385        );
386
387        let (transport_events_tx, transport_events_rx) =
388            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
389
390        let network_clone = self.network.clone();
391        spawn(
392            transport_events_rx
393                .for_each(move |event| {
394                    let network = network_clone.clone();
395
396                    async move {
397                        match event {
398                            hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
399                                if let Err(error) = network
400                                    .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
401                                    .await
402                                {
403                                    tracing::error!(%peer, %error, "Failed to add incoming connection peer");
404                                }
405                            }
406                            hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
407                                if let Err(error) = network
408                                    .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
409                                    .await
410                                {
411                                    tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
412                                }
413                            }
414                        }
415                    }
416                })
417                .inspect(|_| {
418                    tracing::warn!(
419                        task = "transport events recording",
420                        "long-running background task finished"
421                    )
422                }),
423        );
424
425        processes.insert(
426            HoprTransportProcess::Medium,
427            spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
428                task = %HoprTransportProcess::Medium,
429                "long-running background task finished"
430            ))),
431        );
432
433        // -- msg-ack protocol over the wire transport
434        let packet_cfg = PacketInteractionConfig {
435            packet_keypair: self.me.clone(),
436            outgoing_ticket_win_prob: self
437                .cfg
438                .protocol
439                .outgoing_ticket_winning_prob
440                .map(WinningProbability::try_from)
441                .transpose()?,
442            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
443        };
444
445        let msg_protocol_bidirectional_channel_capacity =
446            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
447                .ok()
448                .and_then(|s| s.trim().parse::<usize>().ok())
449                .filter(|&c| c > 0)
450                .unwrap_or(16_384);
451
452        let (on_incoming_data_tx, on_incoming_data_rx) =
453            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
454
455        debug!(
456            capacity = msg_protocol_bidirectional_channel_capacity,
457            "Creating protocol bidirectional channel"
458        );
459        let (tx_from_protocol, rx_from_protocol) =
460            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
461
462        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
463        // sending the external packets to the transport pipeline.
464        let path_planner = self.path_planner.clone();
465        let distress_threshold = self.db.get_surb_config().distress_threshold;
466        let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
467            let path_planner = path_planner.clone();
468            async move {
469                trace!(?unresolved, "resolving routing for packet");
470                match path_planner
471                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
472                    .await
473                {
474                    Ok((resolved, rem_surbs)) => {
475                        // Set the SURB distress/out-of-SURBs flag if applicable.
476                        // These flags are translated into HOPR protocol packet signals and are
477                        // applicable only on the return path.
478                        let mut signals_to_dst = data
479                            .packet_info
480                            .as_ref()
481                            .map(|info| info.signals_to_destination)
482                            .unwrap_or_default();
483
484                        if resolved.is_return() {
485                            signals_to_dst = match rem_surbs {
486                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
487                                    signals_to_dst | PacketSignal::SurbDistress
488                                }
489                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
490                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
491                            };
492                        } else {
493                            // Unset these flags as they make no sense on the forward path.
494                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
495                        }
496
497                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
498                        trace!(?resolved, "resolved routing for packet");
499                        Some((resolved, data))
500                    }
501                    Err(error) => {
502                        error!(%error, "failed to resolve routing");
503                        None
504                    }
505                }
506            }
507            .in_current_span()
508        });
509
510        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
511            packet_cfg,
512            self.db.clone(),
513            self.resolver.clone(),
514            (
515                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
516                    trace!(%peer, "sending message to peer");
517                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
518                }),
519                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
520            ),
521            (tx_from_protocol, all_resolved_external_msg_rx),
522        )
523        .await
524        .into_iter()
525        {
526            processes.insert(HoprTransportProcess::Protocol(k), v);
527        }
528
529        // -- network probing
530        debug!(
531            capacity = msg_protocol_bidirectional_channel_capacity,
532            note = "same as protocol bidirectional",
533            "Creating probing channel"
534        );
535
536        let (tx_from_probing, rx_from_probing) =
537            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
538
539        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
540            .ok()
541            .and_then(|s| s.trim().parse::<usize>().ok())
542            .filter(|&c| c > 0)
543            .unwrap_or(128);
544        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
545        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
546
547        let probe = Probe::new(self.cfg.probe);
548        let probing_processes = if let Some(ct) = cover_traffic {
549            probe
550                .continuously_scan(
551                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
552                    manual_ping_rx,
553                    tx_from_probing,
554                    ct,
555                )
556                .await
557        } else {
558            probe
559                .continuously_scan(
560                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
561                    manual_ping_rx,
562                    tx_from_probing,
563                    ImmediateNeighborProber::new(
564                        self.cfg.probe,
565                        network_notifier::ProbeNetworkInteractions::new(self.network.clone()),
566                    ),
567                )
568                .await
569        };
570
571        for (k, v) in probing_processes.into_iter() {
572            processes.insert(HoprTransportProcess::Probing(k), v);
573        }
574
575        // manual ping
576        self.ping
577            .clone()
578            .set(Pinger::new(
579                PingConfig {
580                    timeout: self.cfg.probe.timeout,
581                },
582                manual_ping_tx,
583            ))
584            .expect("must set the ticket aggregation writer only once");
585
586        // -- session management
587        self.smgr
588            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
589            .expect("failed to start session manager")
590            .into_iter()
591            .enumerate()
592            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
593            .for_each(|(k, v)| {
594                processes.insert(k, v);
595            });
596
597        let smgr = self.smgr.clone();
598        processes.insert(
599            HoprTransportProcess::SessionsManagement(0),
600            spawn_as_abortable!(
601                StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
602                    let smgr = smgr.clone();
603                    async move {
604                        match smgr.dispatch_message(pseudonym, data).await {
605                            Ok(DispatchResult::Processed) => {
606                                trace!("message dispatch completed");
607                                None
608                            }
609                            Ok(DispatchResult::Unrelated(data)) => {
610                                trace!("unrelated message dispatch completed");
611                                Some(data)
612                            }
613                            Err(e) => {
614                                error!(error = %e, "error while processing packet");
615                                None
616                            }
617                        }
618                    }
619                })
620                .map(Ok)
621                .forward(on_incoming_data_tx)
622                .inspect(|_| tracing::warn!(
623                    task = %HoprTransportProcess::SessionsManagement(0),
624                    "long-running background task finished"
625                ))
626            ),
627        );
628
629        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
630    }
631
632    #[tracing::instrument(level = "debug", skip(self))]
633    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
634        if peer == &self.me_peerid {
635            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
636        }
637
638        let pinger = self
639            .ping
640            .get()
641            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
642
643        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
644            error!(error = %e, "Failed to store the peer observation");
645        }
646
647        let latency = (*pinger).ping(*peer).await?;
648
649        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
650            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
651        ))?;
652
653        Ok((latency, peer_status))
654    }
655
656    #[tracing::instrument(level = "debug", skip(self))]
657    pub async fn new_session(
658        &self,
659        destination: Address,
660        target: SessionTarget,
661        cfg: SessionClientConfig,
662    ) -> errors::Result<HoprSession> {
663        Ok(self.smgr.new_session(destination, target, cfg).await?)
664    }
665
666    #[tracing::instrument(level = "debug", skip(self))]
667    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
668        Ok(self.smgr.ping_session(id).await?)
669    }
670
671    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
672        Ok(self.smgr.get_surb_balancer_config(id).await?)
673    }
674
675    pub async fn update_session_surb_balancing_cfg(
676        &self,
677        id: &SessionId,
678        cfg: SurbBalancerConfig,
679    ) -> errors::Result<()> {
680        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
681    }
682
683    #[tracing::instrument(level = "debug", skip(self))]
684    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
685        self.network
686            .get(&self.me_peerid)
687            .await
688            .unwrap_or_else(|e| {
689                error!(error = %e, "failed to obtain listening multi-addresses");
690                None
691            })
692            .map(|peer| peer.multiaddresses)
693            .unwrap_or_default()
694    }
695
696    #[tracing::instrument(level = "debug", skip(self))]
697    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
698        let mut mas = self
699            .local_multiaddresses()
700            .into_iter()
701            .filter(|ma| {
702                hopr_transport_identity::multiaddrs::is_supported(ma)
703                    && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
704            })
705            .map(|ma| strip_p2p_protocol(&ma))
706            .filter(|v| !v.is_empty())
707            .collect::<Vec<_>>();
708
709        mas.sort_by(|l, r| {
710            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
711            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
712
713            if !(is_left_dns ^ is_right_dns) {
714                std::cmp::Ordering::Equal
715            } else if is_left_dns {
716                std::cmp::Ordering::Less
717            } else {
718                std::cmp::Ordering::Greater
719            }
720        });
721
722        mas
723    }
724
725    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
726        self.my_multiaddresses.clone()
727    }
728
729    #[tracing::instrument(level = "debug", skip(self))]
730    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
731        self.network
732            .get(peer)
733            .await
734            .unwrap_or(None)
735            .map(|peer| peer.multiaddresses)
736            .unwrap_or(vec![])
737    }
738
739    #[tracing::instrument(level = "debug", skip(self))]
740    pub async fn network_health(&self) -> Health {
741        self.network.health().await
742    }
743
744    #[tracing::instrument(level = "debug", skip(self))]
745    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
746        Ok(self.network.connected_peers().await?)
747    }
748
749    #[tracing::instrument(level = "debug", skip(self))]
750    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
751        Ok(self.network.get(peer).await?)
752    }
753
754    #[tracing::instrument(level = "debug", skip(self))]
755    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
756        let ticket_stats = self
757            .db
758            .get_ticket_statistics(None)
759            .await
760            .map_err(|e| HoprTransportError::Other(e.into()))?;
761
762        Ok(TicketStatistics {
763            winning_count: ticket_stats.winning_tickets,
764            unredeemed_value: ticket_stats.unredeemed_value,
765            redeemed_value: ticket_stats.redeemed_value(),
766            neglected_value: ticket_stats.neglected_value(),
767            rejected_value: ticket_stats.rejected_value(),
768        })
769    }
770
771    #[tracing::instrument(level = "debug", skip(self))]
772    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<RedeemableTicket>>> {
773        if let Some(channel) = self
774            .resolver
775            .channel_by_id(channel_id)
776            .await
777            .map_err(|e| HoprTransportError::Other(e.into()))?
778        {
779            if channel.destination == self.me_address {
780                Ok(Some(
781                    self.db
782                        .stream_tickets([&channel])
783                        .await
784                        .map_err(|e| HoprTransportError::Other(e.into()))?
785                        .collect()
786                        .await,
787                ))
788            } else {
789                Ok(None)
790            }
791        } else {
792            Ok(None)
793        }
794    }
795
796    #[tracing::instrument(level = "debug", skip(self))]
797    pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
798        Ok(self
799            .db
800            .stream_tickets(None::<TicketSelector>)
801            .await
802            .map_err(|e| HoprTransportError::Other(e.into()))?
803            .map(|v| v.ticket)
804            .collect()
805            .await)
806    }
807}
808
809fn build_mixer_cfg_from_env() -> MixerConfig {
810    let mixer_cfg = MixerConfig {
811        min_delay: std::time::Duration::from_millis(
812            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
813                .map(|v| {
814                    v.trim()
815                        .parse::<u64>()
816                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
817                })
818                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
819        ),
820        delay_range: std::time::Duration::from_millis(
821            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
822                .map(|v| {
823                    v.trim()
824                        .parse::<u64>()
825                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
826                })
827                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
828        ),
829        capacity: {
830            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
831                .ok()
832                .and_then(|s| s.trim().parse::<usize>().ok())
833                .filter(|&c| c > 0)
834                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
835            debug!(capacity = capacity, "Setting mixer capacity");
836            capacity
837        },
838        ..MixerConfig::default()
839    };
840    debug!(?mixer_cfg, "Mixer configuration");
841
842    mixer_cfg
843}