hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27    collections::HashMap,
28    sync::{Arc, OnceLock},
29    time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    FutureExt, SinkExt, StreamExt,
36    channel::mpsc::{Sender, channel},
37};
38use helpers::PathPlanner;
39use hopr_api::{
40    chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
41    db::{HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
42};
43use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
44use hopr_crypto_packet::prelude::PacketSignal;
45pub use hopr_crypto_types::{
46    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
47    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
48};
49pub use hopr_internal_types::prelude::HoprPseudonym;
50use hopr_internal_types::prelude::*;
51use hopr_network_types::prelude::DestinationRouting;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
54use hopr_primitive_types::prelude::*;
55pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
56use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
57pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
58use hopr_transport_mixer::MixerConfig;
59pub use hopr_transport_network::network::{Health, Network};
60use hopr_transport_p2p::HoprSwarm;
61use hopr_transport_probe::{
62    Probe,
63    neighbors::ImmediateNeighborProber,
64    ping::{PingConfig, Pinger},
65};
66pub use hopr_transport_probe::{
67    errors::ProbeError,
68    ping::PingQueryReplier,
69    traits::TrafficGeneration,
70    types::{NeighborTelemetry, Telemetry},
71};
72use hopr_transport_protocol::processor::PacketInteractionConfig;
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74pub use hopr_transport_session as session;
75#[cfg(feature = "runtime-tokio")]
76pub use hopr_transport_session::transfer_session;
77pub use hopr_transport_session::{
78    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
79    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
80    errors::{SessionManagerError, TransportSessionError},
81};
82use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
83#[cfg(feature = "mixer-stream")]
84use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
85use tracing::{Instrument, debug, error, info, trace, warn};
86
87pub use crate::{
88    config::HoprTransportConfig,
89    helpers::{PeerEligibility, TicketStatistics},
90};
91use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
92
93pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
94
95#[cfg(any(
96    all(feature = "mixer-channel", feature = "mixer-stream"),
97    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
98))]
99compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
100
101// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
102lazy_static::lazy_static! {
103    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
104}
105
106#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
107pub enum HoprTransportProcess {
108    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
109    Medium,
110    #[strum(to_string = "HOPR protocol ({0})")]
111    Protocol(hopr_transport_protocol::ProtocolProcesses),
112    #[strum(to_string = "session manager sub-process #{0}")]
113    SessionsManagement(usize),
114    #[strum(to_string = "network probing sub-process: {0}")]
115    Probing(hopr_transport_probe::HoprProbeProcess),
116}
117
118/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
119type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
120
121/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
122/// the transport, as well as off-chain ticket manipulation.
123pub struct HoprTransport<Db, R> {
124    me: OffchainKeypair,
125    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
126    me_address: Address,
127    cfg: HoprTransportConfig,
128    db: Db,
129    resolver: R,
130    ping: Arc<OnceLock<Pinger>>,
131    network: Arc<Network<Db>>,
132    path_planner: PathPlanner<Db, R, CurrentPathSelector>,
133    my_multiaddresses: Vec<Multiaddr>,
134    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
135}
136
137impl<Db, R> HoprTransport<Db, R>
138where
139    Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
140    R: ChainReadChannelOperations
141        + ChainReadAccountOperations
142        + ChainKeyOperations
143        + ChainValues
144        + Clone
145        + Send
146        + Sync
147        + 'static,
148{
149    pub fn new(
150        me: &OffchainKeypair,
151        me_onchain: &ChainKeypair,
152        cfg: HoprTransportConfig,
153        db: Db,
154        resolver: R,
155        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
156        my_multiaddresses: Vec<Multiaddr>,
157    ) -> Self {
158        let me_peerid: PeerId = me.into();
159        let me_chain_addr = me_onchain.public().to_address();
160
161        Self {
162            me: me.clone(),
163            me_peerid,
164            me_address: me_chain_addr,
165            ping: Arc::new(OnceLock::new()),
166            network: Arc::new(Network::new(
167                me_peerid,
168                my_multiaddresses.clone(),
169                cfg.network.clone(),
170                db.clone(),
171            )),
172            path_planner: PathPlanner::new(
173                me_chain_addr,
174                db.clone(),
175                resolver.clone(),
176                CurrentPathSelector::new(
177                    channel_graph.clone(),
178                    DfsPathSelectorConfig {
179                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
180                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
181                        ..Default::default()
182                    },
183                ),
184                channel_graph.clone(),
185            ),
186            my_multiaddresses,
187            smgr: SessionManager::new(SessionManagerConfig {
188                // TODO(v3.1): Use the entire range of tags properly
189                session_tag_range: (16..65535),
190                maximum_sessions: cfg.session.maximum_sessions as usize,
191                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
192                    .ok()
193                    .and_then(|s| s.parse::<usize>().ok())
194                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
195                    .max(ApplicationData::PAYLOAD_SIZE),
196                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
197                    .ok()
198                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
199                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
200                    .max(Duration::from_millis(100)),
201                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
202                idle_timeout: cfg.session.idle_timeout,
203                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
204                initial_return_session_egress_rate: 10,
205                minimum_surb_buffer_duration: Duration::from_secs(5),
206                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
207                // Allow a 10% increase of the target SURB buffer on incoming Sessions
208                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
209                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
210            }),
211            db,
212            resolver,
213            cfg,
214        }
215    }
216
217    /// Execute all processes of the [`crate::HoprTransport`] object.
218    ///
219    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
220    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
221    /// processes and return join handles to the calling function. These processes are not started immediately but
222    /// are waiting for a trigger from this piece of code.
223    #[allow(clippy::too_many_arguments)]
224    pub async fn run<S, Ct>(
225        &self,
226        cover_traffic: Option<Ct>,
227        discovery_updates: S,
228        on_incoming_session: Sender<IncomingSession>,
229    ) -> crate::errors::Result<(
230        HoprSocket<
231            futures::channel::mpsc::Receiver<ApplicationDataIn>,
232            futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
233        >,
234        HashMap<HoprTransportProcess, AbortHandle>,
235    )>
236    where
237        S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
238        Ct: TrafficGeneration + Send + Sync + 'static,
239    {
240        info!("Loading initial peers from the chain");
241        let public_nodes = self
242            .resolver
243            .stream_accounts(AccountSelector {
244                public_only: true,
245                ..Default::default()
246            })
247            .await
248            .map_err(|e| HoprTransportError::Other(e.into()))?
249            .collect::<Vec<_>>()
250            .await;
251
252        // Calculate the minimum capacity based on public nodes (each node can generate 2 messages)
253        // plus 100 as additional buffer
254        let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
255
256        let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
257            .ok()
258            .and_then(|s| s.trim().parse::<usize>().ok())
259            .filter(|&c| c > 0)
260            .unwrap_or(2048)
261            .max(minimum_capacity);
262
263        debug!(
264            capacity = internal_discovery_updates_capacity,
265            minimum_required = minimum_capacity,
266            "Creating internal discovery updates channel"
267        );
268        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
269            futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
270
271        let me_peerid = self.me_peerid;
272        let network = self.network.clone();
273        let discovery_updates = futures_concurrency::stream::StreamExt::merge(
274            discovery_updates,
275            internal_discovery_update_rx,
276        )
277        .filter_map(move |event| {
278            let network = network.clone();
279            async move {
280                match event {
281                    PeerDiscovery::Announce(peer, multiaddresses) => {
282                        debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
283                        if peer != me_peerid {
284                            // decapsulate the `p2p/<peer_id>` to remove duplicities
285                            let mas = multiaddresses
286                                .into_iter()
287                                .map(|ma| strip_p2p_protocol(&ma))
288                                .filter(|v| !v.is_empty())
289                                .collect::<Vec<_>>();
290
291                            if !mas.is_empty() {
292                                if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
293                                    error!(%peer, %error, "failed to add peer to the network");
294                                    None
295                                } else {
296                                    Some(PeerDiscovery::Announce(peer, mas))
297                                }
298                            } else {
299                                None
300                            }
301                        } else {
302                            None
303                        }
304                    }
305                    _ => None,
306                }
307            }
308        });
309
310        info!(
311            public_nodes = public_nodes.len(),
312            "Initializing swarm with peers from chain"
313        );
314
315        for node_entry in public_nodes {
316            if let AccountType::Announced { multiaddr, .. } = node_entry.entry_type {
317                let peer: PeerId = node_entry.public_key.into();
318                let multiaddresses = vec![multiaddr];
319
320                debug!(%peer, ?multiaddresses, "Using initial public node");
321
322                internal_discovery_update_tx
323                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
324                    .await
325                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
326            }
327        }
328
329        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
330
331        let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
332            channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
333
334        // -- transport medium
335        let mixer_cfg = build_mixer_cfg_from_env();
336
337        #[cfg(feature = "mixer-channel")]
338        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
339
340        #[cfg(feature = "mixer-stream")]
341        let (mixing_channel_tx, mixing_channel_rx) = {
342            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
343            let rx = rx.then_concurrent(move |v| {
344                let cfg = mixer_cfg;
345
346                async move {
347                    let random_delay = cfg.random_delay();
348                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
349
350                    #[cfg(all(feature = "prometheus", not(test)))]
351                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
352
353                    sleep(random_delay).await;
354
355                    #[cfg(all(feature = "prometheus", not(test)))]
356                    {
357                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
358
359                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
360                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
361                            (weight * random_delay.as_millis() as f64)
362                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
363                        );
364                    }
365
366                    v
367                }
368            });
369
370            (tx, rx)
371        };
372
373        let transport_layer =
374            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
375
376        let msg_proto_control =
377            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
378        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
379        let (wire_msg_tx, wire_msg_rx) =
380            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
381
382        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
383            mixing_channel_rx
384                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
385                .map(Ok)
386                .forward(wire_msg_tx)
387                .inspect(|_| {
388                    tracing::warn!(
389                        task = "mixer -> egress process",
390                        "long-running background task finished"
391                    )
392                }),
393        );
394
395        let (transport_events_tx, transport_events_rx) =
396            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
397
398        let network_clone = self.network.clone();
399        spawn(
400            transport_events_rx
401                .for_each(move |event| {
402                    let network = network_clone.clone();
403
404                    async move {
405                        match event {
406                            hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
407                                if let Err(error) = network
408                                    .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
409                                    .await
410                                {
411                                    tracing::error!(%peer, %error, "Failed to add incoming connection peer");
412                                }
413                            }
414                            hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
415                                if let Err(error) = network
416                                    .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
417                                    .await
418                                {
419                                    tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
420                                }
421                            }
422                        }
423                    }
424                })
425                .inspect(|_| {
426                    tracing::warn!(
427                        task = "transport events recording",
428                        "long-running background task finished"
429                    )
430                }),
431        );
432
433        processes.insert(
434            HoprTransportProcess::Medium,
435            spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
436                task = %HoprTransportProcess::Medium,
437                "long-running background task finished"
438            ))),
439        );
440
441        // -- msg-ack protocol over the wire transport
442        let packet_cfg = PacketInteractionConfig {
443            packet_keypair: self.me.clone(),
444            outgoing_ticket_win_prob: self
445                .cfg
446                .protocol
447                .outgoing_ticket_winning_prob
448                .map(WinningProbability::try_from)
449                .transpose()?,
450            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
451        };
452
453        let msg_protocol_bidirectional_channel_capacity =
454            std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
455                .ok()
456                .and_then(|s| s.trim().parse::<usize>().ok())
457                .filter(|&c| c > 0)
458                .unwrap_or(16_384);
459
460        let (on_incoming_data_tx, on_incoming_data_rx) =
461            channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
462
463        debug!(
464            capacity = msg_protocol_bidirectional_channel_capacity,
465            "Creating protocol bidirectional channel"
466        );
467        let (tx_from_protocol, rx_from_protocol) =
468            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
469
470        // We have to resolve DestinationRouting -> ResolvedTransportRouting before
471        // sending the external packets to the transport pipeline.
472        let path_planner = self.path_planner.clone();
473        let distress_threshold = self.db.get_surb_config().distress_threshold;
474        let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
475            let path_planner = path_planner.clone();
476            async move {
477                trace!(?unresolved, "resolving routing for packet");
478                match path_planner
479                    .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
480                    .await
481                {
482                    Ok((resolved, rem_surbs)) => {
483                        // Set the SURB distress/out-of-SURBs flag if applicable.
484                        // These flags are translated into HOPR protocol packet signals and are
485                        // applicable only on the return path.
486                        let mut signals_to_dst = data
487                            .packet_info
488                            .as_ref()
489                            .map(|info| info.signals_to_destination)
490                            .unwrap_or_default();
491
492                        if resolved.is_return() {
493                            signals_to_dst = match rem_surbs {
494                                Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
495                                    signals_to_dst | PacketSignal::SurbDistress
496                                }
497                                Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
498                                _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
499                            };
500                        } else {
501                            // Unset these flags as they make no sense on the forward path.
502                            signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
503                        }
504
505                        data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
506                        trace!(?resolved, "resolved routing for packet");
507                        Some((resolved, data))
508                    }
509                    Err(error) => {
510                        error!(%error, "failed to resolve routing");
511                        None
512                    }
513                }
514            }
515            .in_current_span()
516        });
517
518        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
519            packet_cfg,
520            self.db.clone(),
521            self.resolver.clone(),
522            (
523                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
524                    trace!(%peer, "sending message to peer");
525                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
526                }),
527                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
528            ),
529            (tx_from_protocol, all_resolved_external_msg_rx),
530        )
531        .await
532        .into_iter()
533        {
534            processes.insert(HoprTransportProcess::Protocol(k), v);
535        }
536
537        // -- network probing
538        debug!(
539            capacity = msg_protocol_bidirectional_channel_capacity,
540            note = "same as protocol bidirectional",
541            "Creating probing channel"
542        );
543
544        let (tx_from_probing, rx_from_probing) =
545            channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
546
547        let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
548            .ok()
549            .and_then(|s| s.trim().parse::<usize>().ok())
550            .filter(|&c| c > 0)
551            .unwrap_or(128);
552        debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
553        let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
554
555        let probe = Probe::new(self.cfg.probe);
556        let probing_processes = if let Some(ct) = cover_traffic {
557            probe
558                .continuously_scan(
559                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
560                    manual_ping_rx,
561                    tx_from_probing,
562                    ct,
563                )
564                .await
565        } else {
566            probe
567                .continuously_scan(
568                    (unresolved_routing_msg_tx.clone(), rx_from_protocol),
569                    manual_ping_rx,
570                    tx_from_probing,
571                    ImmediateNeighborProber::new(
572                        self.cfg.probe,
573                        network_notifier::ProbeNetworkInteractions::new(
574                            self.network.clone(),
575                            self.resolver.clone(),
576                            self.path_planner.channel_graph(),
577                        ),
578                    ),
579                )
580                .await
581        };
582
583        for (k, v) in probing_processes.into_iter() {
584            processes.insert(HoprTransportProcess::Probing(k), v);
585        }
586
587        // manual ping
588        self.ping
589            .clone()
590            .set(Pinger::new(
591                PingConfig {
592                    timeout: self.cfg.probe.timeout,
593                },
594                manual_ping_tx,
595            ))
596            .expect("must set the ticket aggregation writer only once");
597
598        // -- session management
599        self.smgr
600            .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
601            .expect("failed to start session manager")
602            .into_iter()
603            .enumerate()
604            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
605            .for_each(|(k, v)| {
606                processes.insert(k, v);
607            });
608
609        let smgr = self.smgr.clone();
610        processes.insert(
611            HoprTransportProcess::SessionsManagement(0),
612            spawn_as_abortable!(
613                StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
614                    let smgr = smgr.clone();
615                    async move {
616                        match smgr.dispatch_message(pseudonym, data).await {
617                            Ok(DispatchResult::Processed) => {
618                                trace!("message dispatch completed");
619                                None
620                            }
621                            Ok(DispatchResult::Unrelated(data)) => {
622                                trace!("unrelated message dispatch completed");
623                                Some(data)
624                            }
625                            Err(e) => {
626                                error!(error = %e, "error while processing packet");
627                                None
628                            }
629                        }
630                    }
631                })
632                .map(Ok)
633                .forward(on_incoming_data_tx)
634                .inspect(|_| tracing::warn!(
635                    task = %HoprTransportProcess::SessionsManagement(0),
636                    "long-running background task finished"
637                ))
638            ),
639        );
640
641        Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
642    }
643
644    #[tracing::instrument(level = "debug", skip(self))]
645    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
646        if peer == &self.me_peerid {
647            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
648        }
649
650        let pinger = self
651            .ping
652            .get()
653            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
654
655        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
656            error!(error = %e, "Failed to store the peer observation");
657        }
658
659        let latency = (*pinger).ping(*peer).await?;
660
661        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
662            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
663        ))?;
664
665        Ok((latency, peer_status))
666    }
667
668    #[tracing::instrument(level = "debug", skip(self))]
669    pub async fn new_session(
670        &self,
671        destination: Address,
672        target: SessionTarget,
673        cfg: SessionClientConfig,
674    ) -> errors::Result<HoprSession> {
675        Ok(self.smgr.new_session(destination, target, cfg).await?)
676    }
677
678    #[tracing::instrument(level = "debug", skip(self))]
679    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
680        Ok(self.smgr.ping_session(id).await?)
681    }
682
683    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
684        Ok(self.smgr.get_surb_balancer_config(id).await?)
685    }
686
687    pub async fn update_session_surb_balancing_cfg(
688        &self,
689        id: &SessionId,
690        cfg: SurbBalancerConfig,
691    ) -> errors::Result<()> {
692        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
693    }
694
695    #[tracing::instrument(level = "debug", skip(self))]
696    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
697        self.network
698            .get(&self.me_peerid)
699            .await
700            .unwrap_or_else(|e| {
701                error!(error = %e, "failed to obtain listening multi-addresses");
702                None
703            })
704            .map(|peer| peer.multiaddresses)
705            .unwrap_or_default()
706    }
707
708    #[tracing::instrument(level = "debug", skip(self))]
709    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
710        let mut mas = self
711            .local_multiaddresses()
712            .into_iter()
713            .filter(|ma| {
714                hopr_transport_identity::multiaddrs::is_supported(ma)
715                    && (self.cfg.transport.announce_local_addresses
716                        || !hopr_transport_identity::multiaddrs::is_private(ma))
717            })
718            .map(|ma| strip_p2p_protocol(&ma))
719            .filter(|v| !v.is_empty())
720            .collect::<Vec<_>>();
721
722        mas.sort_by(|l, r| {
723            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
724            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
725
726            if !(is_left_dns ^ is_right_dns) {
727                std::cmp::Ordering::Equal
728            } else if is_left_dns {
729                std::cmp::Ordering::Less
730            } else {
731                std::cmp::Ordering::Greater
732            }
733        });
734
735        mas
736    }
737
738    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
739        self.my_multiaddresses.clone()
740    }
741
742    #[tracing::instrument(level = "debug", skip(self))]
743    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
744        self.network
745            .get(peer)
746            .await
747            .unwrap_or(None)
748            .map(|peer| peer.multiaddresses)
749            .unwrap_or(vec![])
750    }
751
752    #[tracing::instrument(level = "debug", skip(self))]
753    pub async fn network_health(&self) -> Health {
754        self.network.health().await
755    }
756
757    #[tracing::instrument(level = "debug", skip(self))]
758    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
759        Ok(self.network.connected_peers().await?)
760    }
761
762    #[tracing::instrument(level = "debug", skip(self))]
763    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
764        Ok(self.network.get(peer).await?)
765    }
766
767    #[tracing::instrument(level = "debug", skip(self))]
768    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
769        let ticket_stats = self
770            .db
771            .get_ticket_statistics(None)
772            .await
773            .map_err(|e| HoprTransportError::Other(e.into()))?;
774
775        Ok(TicketStatistics {
776            winning_count: ticket_stats.winning_tickets,
777            unredeemed_value: ticket_stats.unredeemed_value,
778            redeemed_value: ticket_stats.redeemed_value,
779            neglected_value: ticket_stats.neglected_value,
780            rejected_value: ticket_stats.rejected_value,
781        })
782    }
783
784    #[tracing::instrument(level = "debug", skip(self))]
785    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
786        if let Some(channel) = self
787            .resolver
788            .channel_by_id(channel_id)
789            .await
790            .map_err(|e| HoprTransportError::Other(e.into()))?
791        {
792            if channel.destination == self.me_address {
793                Ok(Some(
794                    self.db
795                        .stream_tickets(Some((&channel).into()))
796                        .await
797                        .map_err(|e| HoprTransportError::Other(e.into()))?
798                        .collect()
799                        .await,
800                ))
801            } else {
802                Ok(None)
803            }
804        } else {
805            Ok(None)
806        }
807    }
808
809    #[tracing::instrument(level = "debug", skip(self))]
810    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
811        Ok(self
812            .db
813            .stream_tickets(None)
814            .await
815            .map_err(|e| HoprTransportError::Other(e.into()))?
816            .map(|v| v.ticket.leak())
817            .collect()
818            .await)
819    }
820}
821
822fn build_mixer_cfg_from_env() -> MixerConfig {
823    let mixer_cfg = MixerConfig {
824        min_delay: std::time::Duration::from_millis(
825            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
826                .map(|v| {
827                    v.trim()
828                        .parse::<u64>()
829                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
830                })
831                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
832        ),
833        delay_range: std::time::Duration::from_millis(
834            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
835                .map(|v| {
836                    v.trim()
837                        .parse::<u64>()
838                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
839                })
840                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
841        ),
842        capacity: {
843            let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
844                .ok()
845                .and_then(|s| s.trim().parse::<usize>().ok())
846                .filter(|&c| c > 0)
847                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
848            debug!(capacity = capacity, "Setting mixer capacity");
849            capacity
850        },
851        ..MixerConfig::default()
852    };
853    debug!(?mixer_cfg, "Mixer configuration");
854
855    mixer_cfg
856}