hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
12//! 4. algorithms associated with the transport layer operational management
13//! 5. interface specifications to allow modular behavioral extensions
14
15/// Configuration of the [crate::HoprTransport].
16pub mod config;
17/// Constants used and exposed by the crate.
18pub mod constants;
19/// Errors used by the crate.
20pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23/// Objects used and possibly exported by the crate for re-use for transport functionality
24pub mod proxy;
25
26use std::{
27    collections::{HashMap, HashSet},
28    sync::{Arc, OnceLock},
29    time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    FutureExt, SinkExt, StreamExt,
36    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
37};
38use helpers::PathPlanner;
39use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
40use hopr_crypto_packet::prelude::HoprPacket;
41pub use hopr_crypto_types::{
42    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
43    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
44};
45use hopr_db_sql::{
46    HoprDbAllOperations,
47    accounts::ChainOrPacketKey,
48    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::{
55    PathAddressResolver,
56    selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
57};
58use hopr_primitive_types::prelude::*;
59pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
60use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
61pub use hopr_transport_identity::{Multiaddr, PeerId};
62use hopr_transport_mixer::MixerConfig;
63pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
64use hopr_transport_p2p::{
65    HoprSwarm,
66    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
67};
68use hopr_transport_probe::{
69    DbProxy, Probe,
70    ping::{PingConfig, Pinger},
71};
72pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74use hopr_transport_protocol::{
75    errors::ProtocolError,
76    processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
77};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport_session::transfer_session;
80pub use hopr_transport_session::{
81    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
82    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
83    errors::{SessionManagerError, TransportSessionError},
84};
85use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
86use hopr_transport_ticket_aggregation::{
87    AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
88    TicketAggregatorTrait,
89};
90use rand::seq::SliceRandom;
91#[cfg(feature = "mixer-stream")]
92use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
93use tracing::{debug, error, info, trace, warn};
94
95pub use crate::{
96    config::HoprTransportConfig,
97    helpers::{PeerEligibility, TicketStatistics},
98};
99use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, helpers::run_packet_planner};
100
/// Re-export of [`Tag::APPLICATION_TAG_RANGE`] so that users of this crate do not need to
/// reach into the application protocol crate for it.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
102
// Build-time guard: the build fails when both mixer features are enabled at once,
// or when neither of them is enabled.
#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
108
// Needs lazy-static, since Duration multiplication by a constant is not yet a const-operation.
lazy_static::lazy_static! {
    /// Computed as `2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
113
/// Identifies a long-running process spawned by [`HoprTransport::run`].
///
/// Values of this type are the keys of the map of abort handles returned from
/// [`HoprTransport::run`]. The `Display` implementation (derived via `strum`) yields
/// a human-readable description of the process.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the transport medium itself.
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One of the processes of the HOPR message/acknowledgement protocol.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// One of the session management sub-processes, distinguished by its index.
    ///
    /// [`HoprTransport::run`] registers its own message dispatching task under index `0`
    /// and the tasks started by the session manager under indices starting from `1`.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// One of the network probing sub-processes.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
125
/// Implementation of [`TicketAggregatorTrait`] that resolves the ticket aggregation writer lazily.
///
/// The writer lives in a shared [`OnceLock`] that may still be empty when the proxy is created;
/// it is looked up on every aggregation request.
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    // Database handle, cloned into each aggregation attempt.
    db: Db,
    // Shared slot for the ticket aggregation writer; empty until somebody sets it.
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Timeout handed over to each `AwaitingAggregator`.
    agg_timeout: std::time::Duration,
}
135
impl<Db> TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Creates a new proxy from its parts.
    ///
    /// * `db` - database handle that is cloned into every aggregation attempt
    /// * `maybe_writer` - shared slot holding the ticket aggregation writer once it has been set
    /// * `agg_timeout` - timeout passed to the [`AwaitingAggregator`] created for each request
    pub fn new(
        db: Db,
        maybe_writer: Arc<
            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
        >,
        agg_timeout: std::time::Duration,
    ) -> Self {
        Self {
            db,
            maybe_writer,
            agg_timeout,
        }
    }
}
154
155#[async_trait::async_trait]
156impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
157where
158    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
159{
160    async fn aggregate_tickets(
161        &self,
162        channel: &Hash,
163        prerequisites: AggregationPrerequisites,
164    ) -> hopr_transport_ticket_aggregation::Result<()> {
165        if let Some(writer) = self.maybe_writer.clone().get() {
166            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
167                .aggregate_tickets(channel, prerequisites)
168                .await
169        } else {
170            Err(TicketAggregationError::TransportError(
171                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
172            ))
173        }
174    }
175}
176
/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
///
/// This is the selector type parameter of the [`PathPlanner`] held by [`HoprTransport`].
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
179
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport, as well as off-chain ticket manipulation.
///
/// Several members are wrapped in `Arc<OnceLock<_>>`: they are created empty in
/// [`HoprTransport::new`] and filled in by [`HoprTransport::run`].
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    // Off-chain keypair of this node.
    me: OffchainKeypair,
    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
    // On-chain address derived from the chain keypair given to the constructor.
    me_address: Address,
    // Transport configuration as passed to the constructor.
    cfg: HoprTransportConfig,
    // Database handle; clones of it are handed to the individual components.
    db: T,
    // Manual pinger, set by `run`.
    ping: Arc<OnceLock<Pinger>>,
    // Registry of known peers.
    network: Arc<Network<T>>,
    // Sender of outgoing messages into the packet processing pipeline, set by `run`.
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    // Resolves destination routing into concrete paths.
    path_planner: PathPlanner<T, CurrentPathSelector>,
    // Multiaddresses of this node as passed to the constructor.
    my_multiaddresses: Vec<Multiaddr>,
    // Writer of the ticket aggregation interaction, set by `run`.
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Session manager.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>,
}
200
201impl<T> HoprTransport<T>
202where
203    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
204{
205    pub fn new(
206        me: &OffchainKeypair,
207        me_onchain: &ChainKeypair,
208        cfg: HoprTransportConfig,
209        db: T,
210        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
211        my_multiaddresses: Vec<Multiaddr>,
212    ) -> Self {
213        let process_packet_send = Arc::new(OnceLock::new());
214
215        let me_peerid: PeerId = me.into();
216        let me_chain_addr = me_onchain.public().to_address();
217
218        Self {
219            me: me.clone(),
220            me_peerid,
221            me_address: me_chain_addr,
222            ping: Arc::new(OnceLock::new()),
223            network: Arc::new(Network::new(
224                me_peerid,
225                my_multiaddresses.clone(),
226                cfg.network.clone(),
227                db.clone(),
228            )),
229            process_packet_send,
230            path_planner: PathPlanner::new(
231                me_chain_addr,
232                db.clone(),
233                CurrentPathSelector::new(
234                    channel_graph.clone(),
235                    DfsPathSelectorConfig {
236                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
237                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
238                        ..Default::default()
239                    },
240                ),
241                channel_graph.clone(),
242            ),
243            my_multiaddresses,
244            process_ticket_aggregate: Arc::new(OnceLock::new()),
245            smgr: SessionManager::new(SessionManagerConfig {
246                // TODO(v3.1): Use the entire range of tags properly
247                session_tag_range: (16..65535),
248                maximum_sessions: cfg.session.maximum_sessions as usize,
249                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
250                    .ok()
251                    .and_then(|s| s.parse::<usize>().ok())
252                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
253                    .max(ApplicationData::PAYLOAD_SIZE),
254                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
255                    .ok()
256                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
257                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
258                    .max(Duration::from_millis(100)),
259                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
260                idle_timeout: cfg.session.idle_timeout,
261                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
262                initial_return_session_egress_rate: 10,
263                minimum_surb_buffer_duration: Duration::from_secs(5),
264                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
265                // Allow a 10% increase of the target SURB buffer on incoming Sessions
266                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
267                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
268            }),
269            db,
270            cfg,
271        }
272    }
273
274    /// Execute all processes of the [`crate::HoprTransport`] object.
275    ///
276    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
277    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
278    /// processes and return join handles to the calling function. These processes are not started immediately but
279    /// are waiting for a trigger from this piece of code.
280    #[allow(clippy::too_many_arguments)]
281    pub async fn run(
282        &self,
283        me_onchain: &ChainKeypair,
284        on_incoming_data: UnboundedSender<ApplicationDataIn>,
285        discovery_updates: UnboundedReceiver<PeerDiscovery>,
286        on_incoming_session: UnboundedSender<IncomingSession>,
287    ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
288        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
289            futures::channel::mpsc::unbounded::<PeerDiscovery>();
290
291        let network_clone = self.network.clone();
292        let db_clone = self.db.clone();
293        let me_peerid = self.me_peerid;
294        let discovery_updates =
295            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
296                .filter_map(move |event| {
297                    let network = network_clone.clone();
298                    let db = db_clone.clone();
299                    let me = me_peerid;
300
301                    async move {
302                        match event {
303                            PeerDiscovery::Allow(peer_id) => {
304                                debug!(peer = %peer_id, "Processing peer discovery event: Allow");
305
306                                // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
307                                if let Ok(pubkey) = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer_id))
308                                    .await {
309                                    if !network.has(&peer_id).await {
310                                        let mas = db
311                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pubkey))
312                                            .await
313                                            .map(|entry| {
314                                                entry
315                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
316                                                    .unwrap_or_default()
317                                            })
318                                            .unwrap_or_default();
319
320                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
321                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
322                                            return None;
323                                        }
324                                    }
325
326                                    return Some(PeerDiscovery::Allow(peer_id))
327                                } else {
328                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
329                                }
330                            }
331                            PeerDiscovery::Ban(peer_id) => {
332                                if let Err(e) = network.remove(&peer_id).await {
333                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
334                                } else {
335                                    return Some(PeerDiscovery::Ban(peer_id))
336                                }
337                            }
338                            PeerDiscovery::Announce(peer, multiaddresses) => {
339                                debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
340                                if peer != me {
341                                    // decapsulate the `p2p/<peer_id>` to remove duplicities
342                                    let mas = multiaddresses
343                                        .into_iter()
344                                        .map(|ma| strip_p2p_protocol(&ma))
345                                        .filter(|v| !v.is_empty())
346                                        .collect::<Vec<_>>();
347
348                                    if ! mas.is_empty() {
349                                        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
350                                        if let Ok(pubkey) = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
351                                            .await {
352                                            if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pubkey)).await {
353                                                let key: Result<Address, _> = key.try_into();
354
355                                                if let Ok(key) = key {
356                                                    if db
357                                                        .is_allowed_in_network_registry(None, &key)
358                                                        .await
359                                                        .unwrap_or(false)
360                                                    {
361                                                        if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
362                                                        {
363                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
364                                                        } else {
365                                                            return Some(PeerDiscovery::Announce(peer, mas))
366                                                        }
367                                                    }
368                                                }
369                                            } else {
370                                                error!(%peer, "Failed to announce peer due to convertibility error");
371                                            }
372                                        }
373                                    }
374                                }
375                            }
376                        }
377
378                        None
379                    }
380                });
381
382        info!("Loading initial peers from the storage");
383
384        let mut addresses: HashSet<Multiaddr> = HashSet::new();
385        let nodes = self.get_public_nodes().await?;
386        for (peer, _address, multiaddresses) in nodes {
387            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
388                debug!(%peer, ?multiaddresses, "Using initial public node");
389                addresses.extend(multiaddresses.clone());
390
391                internal_discovery_update_tx
392                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
393                    .await
394                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
395
396                internal_discovery_update_tx
397                    .send(PeerDiscovery::Allow(peer))
398                    .await
399                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
400            }
401        }
402
403        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
404
405        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
406        let tkt_agg_writer = ticket_agg_proc.writer();
407
408        let (external_msg_send, external_msg_rx) =
409            mpsc::channel::<(ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer)>(
410                MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
411            );
412
413        self.process_packet_send
414            .clone()
415            .set(MsgSender::new(external_msg_send.clone()))
416            .expect("must set the packet processing writer only once");
417
418        self.process_ticket_aggregate
419            .clone()
420            .set(tkt_agg_writer.clone())
421            .expect("must set the ticket aggregation writer only once");
422
423        // -- transport medium
424        let mixer_cfg = build_mixer_cfg_from_env();
425
426        #[cfg(feature = "mixer-channel")]
427        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
428
429        #[cfg(feature = "mixer-stream")]
430        let (mixing_channel_tx, mixing_channel_rx) = {
431            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
432            let rx = rx.then_concurrent(move |v| {
433                let cfg = mixer_cfg;
434
435                async move {
436                    let random_delay = cfg.random_delay();
437                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
438
439                    #[cfg(all(feature = "prometheus", not(test)))]
440                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
441
442                    sleep(random_delay).await;
443
444                    #[cfg(all(feature = "prometheus", not(test)))]
445                    {
446                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
447
448                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
449                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
450                            (weight * random_delay.as_millis() as f64)
451                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
452                        );
453                    }
454
455                    v
456                }
457            });
458
459            (tx, rx)
460        };
461
462        let mut transport_layer =
463            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
464
465        if let Some(port) = self.cfg.protocol.autonat_port {
466            transport_layer.run_nat_server(port);
467        }
468
469        if addresses.is_empty() {
470            warn!("No addresses found in the database, not dialing any NAT servers");
471        } else {
472            info!(num_addresses = addresses.len(), "Found addresses from the database");
473            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
474            randomized_addresses.shuffle(&mut rand::thread_rng());
475            transport_layer.dial_nat_server(randomized_addresses);
476        }
477
478        let msg_proto_control =
479            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
480        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
481        let (wire_msg_tx, wire_msg_rx) =
482            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
483
484        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
485            mixing_channel_rx
486                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
487                .map(Ok)
488                .forward(wire_msg_tx),
489        );
490
491        let (transport_events_tx, transport_events_rx) =
492            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(1000);
493
494        let network_clone = self.network.clone();
495        spawn(transport_events_rx.for_each(move |event| {
496            let network = network_clone.clone();
497
498            async move {
499                match event {
500                    hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
501                        if let Err(error) = network
502                            .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
503                            .await
504                        {
505                            tracing::error!(%peer, %error, "Failed to add incoming connection peer");
506                        }
507                    }
508                    hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
509                        if let Err(error) = network
510                            .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
511                            .await
512                        {
513                            tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
514                        }
515                    }
516                }
517            }
518            .inspect(|_| {
519                tracing::info!(
520                    task = "transport event notifier",
521                    "long-running background task finished"
522                )
523            })
524        }));
525
526        processes.insert(
527            HoprTransportProcess::Medium,
528            spawn_as_abortable!(transport_layer.run(transport_events_tx)),
529        );
530
531        // -- msg-ack protocol over the wire transport
532        let packet_cfg = PacketInteractionConfig {
533            packet_keypair: self.me.clone(),
534            outgoing_ticket_win_prob: self
535                .cfg
536                .protocol
537                .outgoing_ticket_winning_prob
538                .map(WinningProbability::try_from)
539                .transpose()?,
540            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
541        };
542
543        let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationDataIn)>();
544        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
545            packet_cfg,
546            self.db.clone(),
547            (
548                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
549                    trace!(%peer, "sending message to peer");
550                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
551                }),
552                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
553            ),
554            (tx_from_protocol, external_msg_rx),
555        )
556        .await
557        .into_iter()
558        {
559            processes.insert(HoprTransportProcess::Protocol(k), v);
560        }
561
562        // -- network probing
563        let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationDataIn)>();
564
565        let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();
566
567        let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
568        for (k, v) in probe
569            .continuously_scan(
570                (external_msg_send, rx_from_protocol),
571                manual_ping_rx,
572                network_notifier::ProbeNetworkInteractions::new(
573                    self.network.clone(),
574                    self.db.clone(),
575                    self.path_planner.channel_graph(),
576                ),
577                DbProxy::new(self.db.clone()),
578                tx_from_probing,
579            )
580            .await
581            .into_iter()
582        {
583            processes.insert(HoprTransportProcess::Probing(k), v);
584        }
585
586        // manual ping
587        self.ping
588            .clone()
589            .set(Pinger::new(
590                PingConfig {
591                    timeout: self.cfg.probe.timeout,
592                },
593                manual_ping_tx,
594            ))
595            .expect("must set the ticket aggregation writer only once");
596
597        // -- session management
598        let packet_planner = run_packet_planner(
599            self.path_planner.clone(),
600            self.process_packet_send
601                .get()
602                .cloned()
603                .expect("packet sender must be set"),
604        );
605
606        self.smgr
607            .start(packet_planner, on_incoming_session)
608            .expect("failed to start session manager")
609            .into_iter()
610            .enumerate()
611            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
612            .for_each(|(k, v)| {
613                processes.insert(k, v);
614            });
615
616        let smgr = self.smgr.clone();
617        processes.insert(
618            HoprTransportProcess::SessionsManagement(0),
619            spawn_as_abortable!(async move {
620                let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
621                    let smgr = smgr.clone();
622                    async move {
623                        match smgr.dispatch_message(pseudonym, data).await {
624                            Ok(DispatchResult::Processed) => {
625                                trace!("message dispatch completed");
626                                None
627                            }
628                            Ok(DispatchResult::Unrelated(data)) => {
629                                trace!("unrelated message dispatch completed");
630                                Some(data)
631                            }
632                            Err(e) => {
633                                error!(error = %e, "error while processing packet");
634                                None
635                            }
636                        }
637                    }
638                })
639                .map(Ok)
640                .forward(on_incoming_data)
641                .await;
642            }),
643        );
644
645        Ok(processes)
646    }
647
648    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
649        Arc::new(proxy::TicketAggregatorProxy::new(
650            self.db.clone(),
651            self.process_ticket_aggregate.clone(),
652            std::time::Duration::from_secs(15),
653        ))
654    }
655
656    #[tracing::instrument(level = "debug", skip(self))]
657    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
658        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
659            return Err(HoprTransportError::Api(format!(
660                "ping to '{peer}' not allowed due to network registry"
661            )));
662        }
663
664        if peer == &self.me_peerid {
665            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
666        }
667
668        let pinger = self
669            .ping
670            .get()
671            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
672
673        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
674            error!(error = %e, "Failed to store the peer observation");
675        }
676
677        let latency = (*pinger).ping(*peer).await?;
678
679        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
680            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
681        ))?;
682
683        Ok((latency, peer_status))
684    }
685
686    #[tracing::instrument(level = "debug", skip(self))]
687    pub async fn new_session(
688        &self,
689        destination: Address,
690        target: SessionTarget,
691        cfg: SessionClientConfig,
692    ) -> errors::Result<HoprSession> {
693        Ok(self.smgr.new_session(destination, target, cfg).await?)
694    }
695
696    #[tracing::instrument(level = "debug", skip(self))]
697    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
698        Ok(self.smgr.ping_session(id).await?)
699    }
700
701    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
702        Ok(self.smgr.get_surb_balancer_config(id).await?)
703    }
704
705    pub async fn update_session_surb_balancing_cfg(
706        &self,
707        id: &SessionId,
708        cfg: SurbBalancerConfig,
709    ) -> errors::Result<()> {
710        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
711    }
712
713    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
714    pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
715        if let Tag::Reserved(reserved_tag) = tag {
716            return Err(HoprTransportError::Api(format!(
717                "Application tag must not from range: {:?}, but was {reserved_tag:?}",
718                Tag::APPLICATION_TAG_RANGE
719            )));
720        }
721
722        if msg.len() > HoprPacket::PAYLOAD_SIZE {
723            return Err(HoprTransportError::Api(format!(
724                "Message exceeds the maximum allowed size of {} bytes",
725                HoprPacket::PAYLOAD_SIZE
726            )));
727        }
728
729        let app_data = ApplicationData::new(tag, msg.into_vec())?;
730        let routing = self
731            .path_planner
732            .resolve_routing(app_data.total_len(), usize::MAX, routing)
733            .await?
734            .0;
735
736        // Here we do not use msg_sender directly,
737        // since it internally follows Session-oriented logic
738        let sender = self.process_packet_send.get().ok_or_else(|| {
739            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
740        })?;
741
742        sender
743            .send_packet(ApplicationDataOut::with_no_packet_info(app_data), routing)
744            .await
745            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
746            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
747            .await
748            .map_err(|e| HoprTransportError::Api(e.to_string()))
749    }
750
751    #[tracing::instrument(level = "debug", skip(self))]
752    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
753        let entry = self
754            .db
755            .get_channel_by_id(None, channel_id)
756            .await
757            .map_err(hopr_db_sql::api::errors::DbError::from)
758            .map_err(HoprTransportError::from)
759            .and_then(|c| {
760                if let Some(c) = c {
761                    Ok(c)
762                } else {
763                    Err(ProtocolError::ChannelNotFound.into())
764                }
765            })?;
766
767        if entry.status != ChannelStatus::Open {
768            return Err(ProtocolError::ChannelClosed.into());
769        }
770
771        // Ok(Arc::new(proxy::TicketAggregatorProxy::new(
772        //     self.db.clone(),
773        //     self.process_ticket_aggregate.clone(),
774        //     std::time::Duration::from_secs(15),
775        // ))
776        // .aggregate_tickets(&entry.get_id(), Default::default())
777        // .await?)
778
779        Err(TicketAggregationError::TransportError(
780            "Ticket aggregation not supported as a session protocol yet".to_string(),
781        )
782        .into())
783    }
784
785    #[tracing::instrument(level = "debug", skip(self))]
786    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
787        Ok(self
788            .db
789            .get_accounts(None, true)
790            .await
791            .map_err(hopr_db_sql::api::errors::DbError::from)?
792            .into_iter()
793            .map(|entry| {
794                (
795                    PeerId::from(entry.public_key),
796                    entry.chain_addr,
797                    Vec::from_iter(entry.get_multiaddr().into_iter()),
798                )
799            })
800            .collect())
801    }
802
    /// Checks whether the given peer or chain address is allowed in the network registry.
    ///
    /// - `either::Left(peer)`: the peer id is converted to an [`OffchainPublicKey`], the key is
    ///   translated through the database and the result is checked against the network registry.
    /// - `either::Right(address)`: the address is checked against the network registry directly.
    ///
    /// All database operations run inside a single database transaction.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be started or performed, if the peer id conversion fails,
    /// or, with the "cannot translate off-chain key" logical error, if the database holds no
    /// translation for the peer's key.
    pub async fn is_allowed_to_access_network<'a>(
        &self,
        address_like: either::Either<&'a PeerId, Address>,
    ) -> errors::Result<bool>
    where
        T: 'a,
    {
        let db_clone = self.db.clone();
        // Copy the peer id out of the reference so that an owned value is moved into the
        // `async move` block below.
        let address_like_noref = address_like.map_left(|peer| *peer);

        Ok(self
            .db
            .begin_transaction()
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .perform(|tx| {
                Box::pin(async move {
                    match address_like_noref {
                        either::Left(peer) => {
                            // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
                            let pubkey =
                                hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
                                    .await
                                    .map_err(|e| hopr_db_sql::api::errors::DbError::General(e.to_string()))?;
                            // Only a key with a known translation can be checked in the registry.
                            if let Some(address) = db_clone.translate_key(Some(tx), pubkey).await? {
                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
                            } else {
                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
                                    "cannot translate off-chain key".into(),
                                ))
                            }
                        }
                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
                    }
                })
            })
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?)
    }
842
843    #[tracing::instrument(level = "debug", skip(self))]
844    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
845        self.network
846            .get(&self.me_peerid)
847            .await
848            .unwrap_or_else(|e| {
849                error!(error = %e, "failed to obtain listening multi-addresses");
850                None
851            })
852            .map(|peer| peer.multiaddresses)
853            .unwrap_or_default()
854    }
855
856    #[tracing::instrument(level = "debug", skip(self))]
857    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
858        let mut mas = self
859            .local_multiaddresses()
860            .into_iter()
861            .filter(|ma| {
862                hopr_transport_identity::multiaddrs::is_supported(ma)
863                    && (self.cfg.transport.announce_local_addresses
864                        || !hopr_transport_identity::multiaddrs::is_private(ma))
865            })
866            .map(|ma| strip_p2p_protocol(&ma))
867            .filter(|v| !v.is_empty())
868            .collect::<Vec<_>>();
869
870        mas.sort_by(|l, r| {
871            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
872            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
873
874            if !(is_left_dns ^ is_right_dns) {
875                std::cmp::Ordering::Equal
876            } else if is_left_dns {
877                std::cmp::Ordering::Less
878            } else {
879                std::cmp::Ordering::Greater
880            }
881        });
882
883        mas
884    }
885
    /// Returns a copy of the multiaddresses held in the `my_multiaddresses` field of this instance.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.my_multiaddresses.clone()
    }
889
890    #[tracing::instrument(level = "debug", skip(self))]
891    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
892        self.network
893            .get(peer)
894            .await
895            .unwrap_or(None)
896            .map(|peer| peer.multiaddresses)
897            .unwrap_or(vec![])
898    }
899
    /// Returns the current [`Health`] as reported by `self.network`.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_health(&self) -> Health {
        self.network.health().await
    }
904
905    #[tracing::instrument(level = "debug", skip(self))]
906    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
907        Ok(self.network.connected_peers().await?)
908    }
909
910    #[tracing::instrument(level = "debug", skip(self))]
911    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
912        Ok(self.network.get(peer).await?)
913    }
914
915    #[tracing::instrument(level = "debug", skip(self))]
916    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
917        let ticket_stats = self.db.get_ticket_statistics(None).await?;
918
919        Ok(TicketStatistics {
920            winning_count: ticket_stats.winning_tickets,
921            unredeemed_value: ticket_stats.unredeemed_value,
922            redeemed_value: ticket_stats.redeemed_value,
923            neglected_value: ticket_stats.neglected_value,
924            rejected_value: ticket_stats.rejected_value,
925        })
926    }
927
928    #[tracing::instrument(level = "debug", skip(self))]
929    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
930        if let Some(channel) = self
931            .db
932            .get_channel_by_id(None, channel_id)
933            .await
934            .map_err(hopr_db_sql::api::errors::DbError::from)?
935        {
936            let own_address: Address = self
937                .db
938                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
939                .await?
940                .ok_or_else(|| {
941                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
942                })?
943                .try_into()?;
944
945            if channel.destination == own_address {
946                Ok(Some(self.db.get_tickets((&channel).into()).await?))
947            } else {
948                Ok(None)
949            }
950        } else {
951            Ok(None)
952        }
953    }
954
955    #[tracing::instrument(level = "debug", skip(self))]
956    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
957        Ok(self
958            .db
959            .get_all_tickets()
960            .await?
961            .into_iter()
962            .map(|v| v.ticket.leak())
963            .collect())
964    }
965}
966
967fn build_mixer_cfg_from_env() -> MixerConfig {
968    let mixer_cfg = MixerConfig {
969        min_delay: std::time::Duration::from_millis(
970            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
971                .map(|v| {
972                    v.trim()
973                        .parse::<u64>()
974                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
975                })
976                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
977        ),
978        delay_range: std::time::Duration::from_millis(
979            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
980                .map(|v| {
981                    v.trim()
982                        .parse::<u64>()
983                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
984                })
985                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
986        ),
987        capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
988            .map(|v| {
989                v.trim()
990                    .parse::<usize>()
991                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
992            })
993            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
994        ..MixerConfig::default()
995    };
996    debug!(?mixer_cfg, "Mixer configuration");
997
998    mixer_cfg
999}