hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27    collections::{HashMap, HashSet},
28    sync::{Arc, OnceLock},
29    time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    FutureExt, SinkExt, StreamExt,
36    channel::mpsc::{Sender, channel},
37};
38use futures_concurrency::stream::StreamExt as ConcurrentStreamExt;
39use helpers::PathPlanner;
40use hopr_api::{
41    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
42    db::{HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
43};
44use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
45use hopr_crypto_packet::prelude::PacketSignal;
46pub use hopr_crypto_types::{
47    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
48    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
55use hopr_primitive_types::prelude::*;
56pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
57use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
58pub use hopr_transport_identity::{Multiaddr, PeerId};
59use hopr_transport_mixer::MixerConfig;
60pub use hopr_transport_network::network::{Health, Network};
61use hopr_transport_p2p::HoprSwarm;
62use hopr_transport_probe::{
63    DbProxy, Probe,
64    ping::{PingConfig, Pinger},
65};
66pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
67use hopr_transport_protocol::processor::PacketInteractionConfig;
68pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
69pub use hopr_transport_session as session;
70#[cfg(feature = "runtime-tokio")]
71pub use hopr_transport_session::transfer_session;
72pub use hopr_transport_session::{
73    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
74    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
75    errors::{SessionManagerError, TransportSessionError},
76};
77use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
78use rand::seq::SliceRandom;
79#[cfg(feature = "mixer-stream")]
80use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
81use tracing::{Instrument, debug, error, info, trace, warn};
82
83pub use crate::{
84    config::HoprTransportConfig,
85    helpers::{PeerEligibility, TicketStatistics},
86};
87use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
88
/// Range of [`Tag`] values available to applications.
///
/// Re-exposes [`Tag::APPLICATION_TAG_RANGE`] at the crate root.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
91#[cfg(any(
92    all(feature = "mixer-channel", feature = "mixer-stream"),
93    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
94))]
95compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
96
// `lazy_static` is needed because multiplying a `Duration` by a constant is not yet a const operation.
lazy_static::lazy_static! {
    // Twice the base Session initiation timeout, multiplied by the maximum number of intermediate hops.
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
101
/// Identifies a long-running background process spawned by [`HoprTransport::run`].
///
/// Values of this type are the keys of the process map returned from [`HoprTransport::run`];
/// the `Display` implementation (via `strum`) yields a human-readable description of the process.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the transport medium (the libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// A process of the HOPR packet protocol, as identified by the wrapped protocol process kind.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// A session management sub-process.
    ///
    /// [`HoprTransport::run`] uses index `0` for the process dispatching incoming data to the
    /// session manager and indices starting at `1` for the processes started by the session manager itself.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A network probing sub-process, as identified by the wrapped probe process kind.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
113
114/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
115type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
116
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport, as well as off-chain ticket manipulation.
///
/// `Db` is the database backend and `R` the chain resolver; the trait bounds required from them
/// are declared on the `impl` block.
pub struct HoprTransport<Db, R> {
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
    /// On-chain address of this node, derived from the chain keypair in `new`.
    me_address: Address,
    /// Transport configuration.
    cfg: HoprTransportConfig,
    /// Database handle.
    db: Db,
    /// Chain resolver handle.
    resolver: R,
    /// Manual pinger; empty until it is set in `run`.
    ping: Arc<OnceLock<Pinger>>,
    /// Network peer store shared with the background processes.
    network: Arc<Network<Db>>,
    /// Planner resolving `DestinationRouting` into resolved transport routing.
    path_planner: PathPlanner<Db, R, CurrentPathSelector>,
    /// Multiaddresses of this node as passed to `new`.
    my_multiaddresses: Vec<Multiaddr>,
    /// Session manager, started in `run`.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
}
132
133impl<Db, R> HoprTransport<Db, R>
134where
135    Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
136    R: ChainReadChannelOperations
137        + ChainReadAccountOperations
138        + ChainKeyOperations
139        + ChainValues
140        + Clone
141        + Send
142        + Sync
143        + 'static,
144{
145    pub fn new(
146        me: &OffchainKeypair,
147        me_onchain: &ChainKeypair,
148        cfg: HoprTransportConfig,
149        db: Db,
150        resolver: R,
151        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
152        my_multiaddresses: Vec<Multiaddr>,
153    ) -> Self {
154        let me_peerid: PeerId = me.into();
155        let me_chain_addr = me_onchain.public().to_address();
156
157        Self {
158            me: me.clone(),
159            me_peerid,
160            me_address: me_chain_addr,
161            ping: Arc::new(OnceLock::new()),
162            network: Arc::new(Network::new(
163                me_peerid,
164                my_multiaddresses.clone(),
165                cfg.network.clone(),
166                db.clone(),
167            )),
168            path_planner: PathPlanner::new(
169                me_chain_addr,
170                db.clone(),
171                resolver.clone(),
172                CurrentPathSelector::new(
173                    channel_graph.clone(),
174                    DfsPathSelectorConfig {
175                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
176                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
177                        ..Default::default()
178                    },
179                ),
180                channel_graph.clone(),
181            ),
182            my_multiaddresses,
183            smgr: SessionManager::new(SessionManagerConfig {
184                // TODO(v3.1): Use the entire range of tags properly
185                session_tag_range: (16..65535),
186                maximum_sessions: cfg.session.maximum_sessions as usize,
187                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
188                    .ok()
189                    .and_then(|s| s.parse::<usize>().ok())
190                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
191                    .max(ApplicationData::PAYLOAD_SIZE),
192                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
193                    .ok()
194                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
195                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
196                    .max(Duration::from_millis(100)),
197                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
198                idle_timeout: cfg.session.idle_timeout,
199                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
200                initial_return_session_egress_rate: 10,
201                minimum_surb_buffer_duration: Duration::from_secs(5),
202                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
203                // Allow a 10% increase of the target SURB buffer on incoming Sessions
204                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
205                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
206            }),
207            db,
208            resolver,
209            cfg,
210        }
211    }
212
213    /// Execute all processes of the [`crate::HoprTransport`] object.
214    ///
215    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
216    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
217    /// processes and return join handles to the calling function. These processes are not started immediately but
218    /// are waiting for a trigger from this piece of code.
219    #[allow(clippy::too_many_arguments)]
220    pub async fn run<S>(
221        &self,
222        discovery_updates: S,
223        on_incoming_session: Sender<IncomingSession>,
224    ) -> crate::errors::Result<(
225        HoprSocket<
226            futures::channel::mpsc::Receiver<ApplicationDataIn>,
227            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
228        >,
229        HashMap<HoprTransportProcess, AbortHandle>,
230    )>
231    where
232        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
233    {
234        info!("Loading initial peers from the chain");
235        let public_nodes = self
236            .resolver
237            .stream_accounts(AccountSelector {
238                public_only: true,
239                ..Default::default()
240            })
241            .await
242            .map_err(|e| HoprTransportError::Other(e.into()))?
243            .collect::<Vec<_>>()
244            .await;
245
246        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
247        // plus 100 as additional buffer
248        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
249
250        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
251            .ok()
252            .and_then(|s| s.trim().parse::<usize>().ok())
253            .filter(|&c| c > 0)
254            .unwrap_or(2048)
255            .max(minimum_capacity);
256
257        debug!(
258            capacity = internal_discovery_updates_capacity,
259            minimum_required = minimum_capacity,
260            "Creating internal discovery updates channel"
261        );
262        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
263            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
264
265        let me_peerid = self.me_peerid;
266        let network = self.network.clone();
267        let discovery_updates = futures_concurrency::stream::StreamExt::merge(
268            discovery_updates,
269            internal_discovery_update_rx,
270        )
271        .filter_map(move |event| {
272            let network = network.clone();
273            async move {
274                match event {
275                    PeerDiscovery::Announce(peer, multiaddresses) => {
276                        debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
277                        if peer != me_peerid {
278                            // decapsulate the `p2p/<peer_id>` to remove duplicities
279                            let mas = multiaddresses
280                                .into_iter()
281                                .map(|ma| strip_p2p_protocol(&ma))
282                                .filter(|v| !v.is_empty())
283                                .collect::<Vec<_>>();
284
285                            if !mas.is_empty() {
286                                if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
287                                    error!(%peer, %error, "failed to add peer to the network");
288                                    None
289                                } else {
290                                    Some(PeerDiscovery::Announce(peer, mas))
291                                }
292                            } else {
293                                None
294                            }
295                        } else {
296                            None
297                        }
298                    }
299                    _ => None,
300                }
301            }
302        });
303
304        info!(
305            public_nodes = public_nodes.len(),
306            "Initializing swarm with peers from chain"
307        );
308
309        let mut addresses: HashSet<Multiaddr> = HashSet::new();
310        for node_entry in public_nodes {
311            if let AccountType::Announced { multiaddr, .. } = node_entry.entry_type {
312                let peer: PeerId = node_entry.public_key.into();
313                let multiaddresses = vec![multiaddr];
314
315                debug!(%peer, ?multiaddresses, "Using initial public node");
316                addresses.extend(multiaddresses.clone());
317
318                internal_discovery_update_tx
319                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
320                    .await
321                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
322            }
323        }
324
325        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
326
327        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
328            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
329
330        let (resolved_routing_msg_tx, resolved_routing_msg_rx) =
331            channel::<(ResolvedTransportRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
332
333        // -- transport medium
334        let mixer_cfg = build_mixer_cfg_from_env();
335
336        #[cfg(feature = "mixer-channel")]
337        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
338
339        #[cfg(feature = "mixer-stream")]
340        let (mixing_channel_tx, mixing_channel_rx) = {
341            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
342            let rx = rx.then_concurrent(move |v| {
343                let cfg = mixer_cfg;
344
345                async move {
346                    let random_delay = cfg.random_delay();
347                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
348
349                    #[cfg(all(feature = "prometheus", not(test)))]
350                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
351
352                    sleep(random_delay).await;
353
354                    #[cfg(all(feature = "prometheus", not(test)))]
355                    {
356                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
357
358                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
359                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
360                            (weight * random_delay.as_millis() as f64)
361                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
362                        );
363                    }
364
365                    v
366                }
367            });
368
369            (tx, rx)
370        };
371
372        let mut transport_layer =
373            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
374
375        if let Some(port) = self.cfg.protocol.autonat_port {
376            transport_layer.run_nat_server(port);
377        }
378
379        if addresses.is_empty() {
380            warn!("No addresses found in the database, not dialing any NAT servers");
381        } else {
382            info!(num_addresses = addresses.len(), "Found addresses from the database");
383            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
384            randomized_addresses.shuffle(&mut rand::thread_rng());
385            transport_layer.dial_nat_server(randomized_addresses);
386        }
387
388        let msg_proto_control =
389            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
390        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
391        let (wire_msg_tx, wire_msg_rx) =
392            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
393
394        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
395            mixing_channel_rx
396                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
397                .map(Ok)
398                .forward(wire_msg_tx)
399                .inspect(|_| {
400                    tracing::warn!(
401                        task = "mixer -> egress process",
402                        "long-running background task finished"
403                    )
404                }),
405        );
406
407        let (transport_events_tx, transport_events_rx) =
408            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
409
410        let network_clone = self.network.clone();
411        spawn(
412            transport_events_rx
413                .for_each(move |event| {
414                    let network = network_clone.clone();
415
416                    async move {
417                        match event {
418                            hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
419                                if let Err(error) = network
420                                    .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
421                                    .await
422                                {
423                                    tracing::error!(%peer, %error, "Failed to add incoming connection peer");
424                                }
425                            }
426                            hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
427                                if let Err(error) = network
428                                    .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
429                                    .await
430                                {
431                                    tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
432                                }
433                            }
434                        }
435                    }
436                })
437                .inspect(|_| {
438                    tracing::warn!(
439                        task = "transport events recording",
440                        "long-running background task finished"
441                    )
442                }),
443        );
444
445        processes.insert(
446            HoprTransportProcess::Medium,
447            spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
448                task = %HoprTransportProcess::Medium,
449                "long-running background task finished"
450            ))),
451        );
452
453        // -- msg-ack protocol over the wire transport
454        let packet_cfg = PacketInteractionConfig {
455            packet_keypair: self.me.clone(),
456            outgoing_ticket_win_prob: self
457                .cfg
458                .protocol
459                .outgoing_ticket_winning_prob
460                .map(WinningProbability::try_from)
461                .transpose()?,
462            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
463        };
464
465        let msg_protocol_bidirectional_channel_capacity =
466            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
467                .ok()
468                .and_then(|s| s.trim().parse::<usize>().ok())
469                .filter(|&c| c > 0)
470                .unwrap_or(16_384);
471
472        let (on_incoming_data_tx, on_incoming_data_rx) =
473            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
474
475        debug!(
476            capacity = msg_protocol_bidirectional_channel_capacity,
477            "Creating protocol bidirectional channel"
478        );
479        let (tx_from_protocol, rx_from_protocol) =
480            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
481
482        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
483        // sending the external packets to the transport pipeline.
484        let path_planner = self.path_planner.clone();
485        let distress_threshold = self.db.get_surb_config().distress_threshold;
486        let all_resolved_external_msg_rx = unresolved_routing_msg_rx
487            .filter_map(move |(unresolved, mut data)| {
488                let path_planner = path_planner.clone();
489                async move {
490                    trace!(?unresolved, "resolving routing for packet");
491                    match path_planner
492                        .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
493                        .await
494                    {
495                        Ok((resolved, rem_surbs)) => {
496                            // Set the SURB distress/out-of-SURBs flag if applicable.
497                            // These flags are translated into HOPR protocol packet signals and are
498                            // applicable only on the return path.
499                            let mut signals_to_dst = data
500                                .packet_info
501                                .as_ref()
502                                .map(|info| info.signals_to_destination)
503                                .unwrap_or_default();
504
505                            if resolved.is_return() {
506                                signals_to_dst = match rem_surbs {
507                                    Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
508                                        signals_to_dst | PacketSignal::SurbDistress
509                                    }
510                                    Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
511                                    _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
512                                };
513                            } else {
514                                // Unset these flags as they make no sense on the forward path.
515                                signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
516                            }
517
518                            data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
519                            trace!(?resolved, "resolved routing for packet");
520                            Some((resolved, data))
521                        }
522                        Err(error) => {
523                            error!(%error, "failed to resolve routing");
524                            None
525                        }
526                    }
527                }
528                .in_current_span()
529            })
530            .merge(resolved_routing_msg_rx);
531
532        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
533            packet_cfg,
534            self.db.clone(),
535            self.resolver.clone(),
536            (
537                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
538                    trace!(%peer, "sending message to peer");
539                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
540                }),
541                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
542            ),
543            (tx_from_protocol, all_resolved_external_msg_rx),
544        )
545        .await
546        .into_iter()
547        {
548            processes.insert(HoprTransportProcess::Protocol(k), v);
549        }
550
551        // -- network probing
552        debug!(
553            capacity = msg_protocol_bidirectional_channel_capacity,
554            note = "same as protocol bidirectional",
555            "Creating probing channel"
556        );
557        let (tx_from_probing, rx_from_probing) =
558            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
559
560        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
561            .ok()
562            .and_then(|s| s.trim().parse::<usize>().ok())
563            .filter(|&c| c > 0)
564            .unwrap_or(128);
565        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
566        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
567
568        // TODO (Tibor): make Probing accept Sender<(DestinationRouting, ApplicationDataOut)> and remove the
569        // resolved_routing_msg_tx channel completely
570        let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
571        for (k, v) in probe
572            .continuously_scan(
573                (resolved_routing_msg_tx, rx_from_protocol),
574                manual_ping_rx,
575                network_notifier::ProbeNetworkInteractions::new(
576                    self.network.clone(),
577                    self.resolver.clone(),
578                    self.path_planner.channel_graph(),
579                ),
580                DbProxy::new(self.db.clone(), self.resolver.clone()),
581                tx_from_probing,
582            )
583            .await
584            .into_iter()
585        {
586            processes.insert(HoprTransportProcess::Probing(k), v);
587        }
588
589        // manual ping
590        self.ping
591            .clone()
592            .set(Pinger::new(
593                PingConfig {
594                    timeout: self.cfg.probe.timeout,
595                },
596                manual_ping_tx,
597            ))
598            .expect("must set the ticket aggregation writer only once");
599
600        // -- session management
601        self.smgr
602            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
603            .expect("failed to start session manager")
604            .into_iter()
605            .enumerate()
606            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
607            .for_each(|(k, v)| {
608                processes.insert(k, v);
609            });
610
611        let smgr = self.smgr.clone();
612        processes.insert(
613            HoprTransportProcess::SessionsManagement(0),
614            spawn_as_abortable!(
615                StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
616                    let smgr = smgr.clone();
617                    async move {
618                        match smgr.dispatch_message(pseudonym, data).await {
619                            Ok(DispatchResult::Processed) => {
620                                trace!("message dispatch completed");
621                                None
622                            }
623                            Ok(DispatchResult::Unrelated(data)) => {
624                                trace!("unrelated message dispatch completed");
625                                Some(data)
626                            }
627                            Err(e) => {
628                                error!(error = %e, "error while processing packet");
629                                None
630                            }
631                        }
632                    }
633                })
634                .map(Ok)
635                .forward(on_incoming_data_tx)
636                .inspect(|_| tracing::warn!(
637                    task = %HoprTransportProcess::SessionsManagement(0),
638                    "long-running background task finished"
639                ))
640            ),
641        );
642
643        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
644    }
645
646    #[tracing::instrument(level = "debug", skip(self))]
647    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
648        if peer == &self.me_peerid {
649            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
650        }
651
652        let pinger = self
653            .ping
654            .get()
655            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
656
657        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
658            error!(error = %e, "Failed to store the peer observation");
659        }
660
661        let latency = (*pinger).ping(*peer).await?;
662
663        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
664            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
665        ))?;
666
667        Ok((latency, peer_status))
668    }
669
670    #[tracing::instrument(level = "debug", skip(self))]
671    pub async fn new_session(
672        &self,
673        destination: Address,
674        target: SessionTarget,
675        cfg: SessionClientConfig,
676    ) -> errors::Result<HoprSession> {
677        Ok(self.smgr.new_session(destination, target, cfg).await?)
678    }
679
680    #[tracing::instrument(level = "debug", skip(self))]
681    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
682        Ok(self.smgr.ping_session(id).await?)
683    }
684
685    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
686        Ok(self.smgr.get_surb_balancer_config(id).await?)
687    }
688
689    pub async fn update_session_surb_balancing_cfg(
690        &self,
691        id: &SessionId,
692        cfg: SurbBalancerConfig,
693    ) -> errors::Result<()> {
694        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
695    }
696
697    #[tracing::instrument(level = "debug", skip(self))]
698    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
699        self.network
700            .get(&self.me_peerid)
701            .await
702            .unwrap_or_else(|e| {
703                error!(error = %e, "failed to obtain listening multi-addresses");
704                None
705            })
706            .map(|peer| peer.multiaddresses)
707            .unwrap_or_default()
708    }
709
710    #[tracing::instrument(level = "debug", skip(self))]
711    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
712        let mut mas = self
713            .local_multiaddresses()
714            .into_iter()
715            .filter(|ma| {
716                hopr_transport_identity::multiaddrs::is_supported(ma)
717                    && (self.cfg.transport.announce_local_addresses
718                        || !hopr_transport_identity::multiaddrs::is_private(ma))
719            })
720            .map(|ma| strip_p2p_protocol(&ma))
721            .filter(|v| !v.is_empty())
722            .collect::<Vec<_>>();
723
724        mas.sort_by(|l, r| {
725            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
726            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
727
728            if !(is_left_dns ^ is_right_dns) {
729                std::cmp::Ordering::Equal
730            } else if is_left_dns {
731                std::cmp::Ordering::Less
732            } else {
733                std::cmp::Ordering::Greater
734            }
735        });
736
737        mas
738    }
739
740    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
741        self.my_multiaddresses.clone()
742    }
743
744    #[tracing::instrument(level = "debug", skip(self))]
745    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
746        self.network
747            .get(peer)
748            .await
749            .unwrap_or(None)
750            .map(|peer| peer.multiaddresses)
751            .unwrap_or(vec![])
752    }
753
754    #[tracing::instrument(level = "debug", skip(self))]
755    pub async fn network_health(&self) -> Health {
756        self.network.health().await
757    }
758
759    #[tracing::instrument(level = "debug", skip(self))]
760    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
761        Ok(self.network.connected_peers().await?)
762    }
763
764    #[tracing::instrument(level = "debug", skip(self))]
765    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
766        Ok(self.network.get(peer).await?)
767    }
768
769    #[tracing::instrument(level = "debug", skip(self))]
770    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
771        let ticket_stats = self
772            .db
773            .get_ticket_statistics(None)
774            .await
775            .map_err(|e| HoprTransportError::Other(e.into()))?;
776
777        Ok(TicketStatistics {
778            winning_count: ticket_stats.winning_tickets,
779            unredeemed_value: ticket_stats.unredeemed_value,
780            redeemed_value: ticket_stats.redeemed_value,
781            neglected_value: ticket_stats.neglected_value,
782            rejected_value: ticket_stats.rejected_value,
783        })
784    }
785
786    #[tracing::instrument(level = "debug", skip(self))]
787    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
788        if let Some(channel) = self
789            .resolver
790            .channel_by_id(channel_id)
791            .await
792            .map_err(|e| HoprTransportError::Other(e.into()))?
793        {
794            if channel.destination == self.me_address {
795                Ok(Some(
796                    self.db
797                        .stream_tickets(Some((&channel).into()))
798                        .await
799                        .map_err(|e| HoprTransportError::Other(e.into()))?
800                        .collect()
801                        .await,
802                ))
803            } else {
804                Ok(None)
805            }
806        } else {
807            Ok(None)
808        }
809    }
810
811    #[tracing::instrument(level = "debug", skip(self))]
812    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
813        Ok(self
814            .db
815            .stream_tickets(None)
816            .await
817            .map_err(|e| HoprTransportError::Other(e.into()))?
818            .map(|v| v.ticket.leak())
819            .collect()
820            .await)
821    }
822}
823
824fn build_mixer_cfg_from_env() -> MixerConfig {
825    let mixer_cfg = MixerConfig {
826        min_delay: std::time::Duration::from_millis(
827            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
828                .map(|v| {
829                    v.trim()
830                        .parse::<u64>()
831                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
832                })
833                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
834        ),
835        delay_range: std::time::Duration::from_millis(
836            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
837                .map(|v| {
838                    v.trim()
839                        .parse::<u64>()
840                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
841                })
842                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
843        ),
844        capacity: {
845            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
846                .ok()
847                .and_then(|s| s.trim().parse::<usize>().ok())
848                .filter(|&c| c > 0)
849                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
850            debug!(capacity = capacity, "Setting mixer capacity");
851            capacity
852        },
853        ..MixerConfig::default()
854    };
855    debug!(?mixer_cfg, "Mixer configuration");
856
857    mixer_cfg
858}