hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! See also the `hopr_protocol_start` crate for details on Start sub-protocol which initiates a Session.
6//!
7//! As such, the transport layer components should be only those that are directly necessary to:
8//!
9//! 1. send and receive a packet, acknowledgement or ticket aggregation request
10//! 2. send and receive a network telemetry request
11//! 3. automate transport level processes
//! 4. run algorithms associated with the operational management of the transport layer
//! 5. specify interfaces that allow modular behavioral extensions
14
/// Configuration of the [crate::HoprTransport].
pub mod config;
/// Constants used and exposed by the crate.
pub mod constants;
/// Errors used by the crate.
pub mod errors;
/// Helper types and functions used by this crate, e.g. `PathPlanner`, `PeerEligibility`,
/// `TicketStatistics` and `run_packet_planner`.
pub mod helpers;
/// Contains `ProbeNetworkInteractions`, which [crate::HoprTransport] hands over to the network probe.
pub mod network_notifier;
/// Objects used and possibly exported by the crate for re-use for transport functionality
pub mod proxy;
25
26use std::{
27    collections::{HashMap, HashSet},
28    sync::{Arc, OnceLock},
29    time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    SinkExt, StreamExt,
36    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
37};
38use helpers::PathPlanner;
39use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
40use hopr_crypto_packet::prelude::HoprPacket;
41pub use hopr_crypto_types::{
42    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
43    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
44};
45use hopr_db_sql::{
46    HoprDbAllOperations,
47    accounts::ChainOrPacketKey,
48    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::{
55    PathAddressResolver,
56    selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
57};
58use hopr_primitive_types::prelude::*;
59pub use hopr_protocol_app::prelude::{ApplicationData, Tag};
60use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
61pub use hopr_transport_identity::{Multiaddr, PeerId};
62use hopr_transport_mixer::MixerConfig;
63pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
64use hopr_transport_p2p::{
65    HoprSwarm,
66    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
67};
68use hopr_transport_probe::{
69    DbProxy, Probe,
70    ping::{PingConfig, Pinger},
71};
72pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74use hopr_transport_protocol::{
75    errors::ProtocolError,
76    processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
77};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport_session::transfer_session;
80pub use hopr_transport_session::{
81    Capabilities as SessionCapabilities, Capability as SessionCapability, IncomingSession, SESSION_MTU, SURB_SIZE,
82    ServiceId, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
83    errors::{SessionManagerError, TransportSessionError},
84};
85use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
86use hopr_transport_ticket_aggregation::{
87    AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
88    TicketAggregatorTrait,
89};
90use rand::seq::SliceRandom;
91#[cfg(feature = "mixer-stream")]
92use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
93use tracing::{debug, error, info, trace, warn};
94
95pub use crate::{
96    config::HoprTransportConfig,
97    helpers::{PeerEligibility, TicketStatistics},
98};
99use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, helpers::run_packet_planner};
100
/// Range of [`Tag`] values usable by applications; an alias of [`Tag::APPLICATION_TAG_RANGE`]
/// exported at the crate root.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
102
// Fail the build when both mixer features are enabled at once, or when neither of them is.
#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
108
// Needs lazy-static, since multiplying a Duration by a constant is not yet a const operation.
// The value is twice the base Session initiation timeout multiplied by the maximum number
// of intermediate hops.
lazy_static::lazy_static! {
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
113
/// Identifies the background processes spawned by [`HoprTransport::run`].
///
/// Values of this type are the keys of the map of abort handles returned from
/// [`HoprTransport::run`]. The `Display` output is defined by the `strum` attributes.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the transport medium (libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One of the processes of the HOPR message/acknowledgement protocol.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// A session management sub-process identified by its index.
    ///
    /// Index 0 is the process dispatching incoming data to the session manager,
    /// indices from 1 upwards are the processes started by the session manager itself.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// One of the network probing processes.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
125
/// Ticket aggregator that forwards aggregation requests to a ticket aggregation writer
/// which may not be available yet at the time this object is created.
///
/// The writer lives in a shared [`OnceLock`]; until it is set, aggregation requests fail.
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Database handle, cloned for each aggregation request.
    db: Db,
    /// Lazily initialized ticket aggregation writer shared with its owner.
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    /// Timeout passed to the `AwaitingAggregator` created for each aggregation request.
    agg_timeout: std::time::Duration,
}
135
136impl<Db> TicketAggregatorProxy<Db>
137where
138    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
139{
140    pub fn new(
141        db: Db,
142        maybe_writer: Arc<
143            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
144        >,
145        agg_timeout: std::time::Duration,
146    ) -> Self {
147        Self {
148            db,
149            maybe_writer,
150            agg_timeout,
151        }
152    }
153}
154
155#[async_trait::async_trait]
156impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
157where
158    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
159{
160    async fn aggregate_tickets(
161        &self,
162        channel: &Hash,
163        prerequisites: AggregationPrerequisites,
164    ) -> hopr_transport_ticket_aggregation::Result<()> {
165        if let Some(writer) = self.maybe_writer.clone().get() {
166            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
167                .aggregate_tickets(channel, prerequisites)
168                .await
169        } else {
170            Err(TicketAggregationError::TransportError(
171                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
172            ))
173        }
174    }
175}
176
/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
///
/// Changing this alias switches the path selector used by the path planner of [`HoprTransport`].
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
179
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport, as well as off-chain ticket manipulation.
///
/// Several members are wrapped in [`OnceLock`], because they only become available
/// once [`HoprTransport::run`] has been called.
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
    /// On-chain address of this node, derived from the chain keypair given to the constructor.
    me_address: Address,
    /// Transport configuration.
    cfg: HoprTransportConfig,
    /// Database handle.
    db: T,
    /// Pinger used for manual pings, set by [`HoprTransport::run`].
    ping: Arc<OnceLock<Pinger>>,
    /// Registry of peers known to the transport.
    network: Arc<Network<T>>,
    /// Sender of outgoing packets into the packet processing pipeline, set by [`HoprTransport::run`].
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    /// Planner used to resolve the routing of outgoing messages.
    path_planner: PathPlanner<T, CurrentPathSelector>,
    /// Multiaddresses of this node.
    my_multiaddresses: Vec<Multiaddr>,
    /// Ticket aggregation writer, set by [`HoprTransport::run`].
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    /// Session manager.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationData)>>,
}
200
201impl<T> HoprTransport<T>
202where
203    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
204{
205    pub fn new(
206        me: &OffchainKeypair,
207        me_onchain: &ChainKeypair,
208        cfg: HoprTransportConfig,
209        db: T,
210        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
211        my_multiaddresses: Vec<Multiaddr>,
212    ) -> Self {
213        let process_packet_send = Arc::new(OnceLock::new());
214
215        let me_peerid: PeerId = me.into();
216        let me_chain_addr = me_onchain.public().to_address();
217
218        Self {
219            me: me.clone(),
220            me_peerid,
221            me_address: me_chain_addr,
222            ping: Arc::new(OnceLock::new()),
223            network: Arc::new(Network::new(
224                me_peerid,
225                my_multiaddresses.clone(),
226                cfg.network.clone(),
227                db.clone(),
228            )),
229            process_packet_send,
230            path_planner: PathPlanner::new(
231                me_chain_addr,
232                db.clone(),
233                CurrentPathSelector::new(
234                    channel_graph.clone(),
235                    DfsPathSelectorConfig {
236                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
237                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
238                        ..Default::default()
239                    },
240                ),
241                channel_graph.clone(),
242            ),
243            my_multiaddresses,
244            process_ticket_aggregate: Arc::new(OnceLock::new()),
245            smgr: SessionManager::new(SessionManagerConfig {
246                // TODO(v3.1): Use the entire range of tags properly
247                session_tag_range: (16..65535),
248                maximum_sessions: cfg.session.maximum_sessions as usize,
249                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
250                idle_timeout: cfg.session.idle_timeout,
251                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
252                initial_return_session_egress_rate: 10,
253                minimum_surb_buffer_duration: Duration::from_secs(5),
254                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
255                // Allow a 10% increase of the target SURB buffer on incoming Sessions
256                // if the SURB buffer level has surpassed it by at least 10% in the last 2 minutes.
257                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
258            }),
259            db,
260            cfg,
261        }
262    }
263
264    /// Execute all processes of the [`crate::HoprTransport`] object.
265    ///
266    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
267    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
268    /// processes and return join handles to the calling function. These processes are not started immediately but
269    /// are waiting for a trigger from this piece of code.
270    #[allow(clippy::too_many_arguments)]
271    pub async fn run(
272        &self,
273        me_onchain: &ChainKeypair,
274        tbf_path: String,
275        on_incoming_data: UnboundedSender<ApplicationData>,
276        discovery_updates: UnboundedReceiver<PeerDiscovery>,
277        on_incoming_session: UnboundedSender<IncomingSession>,
278    ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
279        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
280            futures::channel::mpsc::unbounded::<PeerDiscovery>();
281
282        let network_clone = self.network.clone();
283        let db_clone = self.db.clone();
284        let me_peerid = self.me_peerid;
285        let discovery_updates =
286            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
287                .filter_map(move |event| {
288                    let network = network_clone.clone();
289                    let db = db_clone.clone();
290                    let me = me_peerid;
291
292                    async move {
293                        match event {
294                            PeerDiscovery::Allow(peer_id) => {
295                                debug!(peer = %peer_id, "Processing peer discovery event: Allow");
296                                if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
297                                    if !network.has(&peer_id).await {
298                                        let mas = db
299                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
300                                            .await
301                                            .map(|entry| {
302                                                entry
303                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
304                                                    .unwrap_or_default()
305                                            })
306                                            .unwrap_or_default();
307
308                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
309                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
310                                            return None;
311                                        }
312                                    }
313
314                                    return Some(PeerDiscovery::Allow(peer_id))
315                                } else {
316                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
317                                }
318                            }
319                            PeerDiscovery::Ban(peer_id) => {
320                                if let Err(e) = network.remove(&peer_id).await {
321                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
322                                } else {
323                                    return Some(PeerDiscovery::Ban(peer_id))
324                                }
325                            }
326                            PeerDiscovery::Announce(peer, multiaddresses) => {
327                                debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
328                                if peer != me {
329                                    // decapsulate the `p2p/<peer_id>` to remove duplicities
330                                    let mas = multiaddresses
331                                        .into_iter()
332                                        .map(|ma| strip_p2p_protocol(&ma))
333                                        .filter(|v| !v.is_empty())
334                                        .collect::<Vec<_>>();
335
336                                    if ! mas.is_empty() {
337                                        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
338                                            if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk)).await {
339                                                let key: Result<Address, _> = key.try_into();
340
341                                                if let Ok(key) = key {
342                                                    if db
343                                                        .is_allowed_in_network_registry(None, &key)
344                                                        .await
345                                                        .unwrap_or(false)
346                                                    {
347                                                        if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
348                                                        {
349                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
350                                                        } else {
351                                                            return Some(PeerDiscovery::Announce(peer, mas))
352                                                        }
353                                                    }
354                                                }
355                                            } else {
356                                                error!(%peer, "Failed to announce peer due to convertibility error");
357                                            }
358                                        }
359                                    }
360                                }
361                            }
362                        }
363
364                        None
365                    }
366                });
367
368        info!("Loading initial peers from the storage");
369
370        let mut addresses: HashSet<Multiaddr> = HashSet::new();
371        let nodes = self.get_public_nodes().await?;
372        for (peer, _address, multiaddresses) in nodes {
373            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
374                debug!(%peer, ?multiaddresses, "Using initial public node");
375                addresses.extend(multiaddresses.clone());
376
377                internal_discovery_update_tx
378                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
379                    .await
380                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
381
382                internal_discovery_update_tx
383                    .send(PeerDiscovery::Allow(peer))
384                    .await
385                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
386            }
387        }
388
389        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
390
391        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
392        let tkt_agg_writer = ticket_agg_proc.writer();
393
394        let (external_msg_send, external_msg_rx) =
395            mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(
396                MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
397            );
398
399        self.process_packet_send
400            .clone()
401            .set(MsgSender::new(external_msg_send.clone()))
402            .expect("must set the packet processing writer only once");
403
404        self.process_ticket_aggregate
405            .clone()
406            .set(tkt_agg_writer.clone())
407            .expect("must set the ticket aggregation writer only once");
408
409        // -- transport medium
410        let mixer_cfg = build_mixer_cfg_from_env();
411
412        #[cfg(feature = "mixer-channel")]
413        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
414
415        #[cfg(feature = "mixer-stream")]
416        let (mixing_channel_tx, mixing_channel_rx) = {
417            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
418            let rx = rx.then_concurrent(move |v| {
419                let cfg = mixer_cfg;
420
421                async move {
422                    let random_delay = cfg.random_delay();
423                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
424
425                    #[cfg(all(feature = "prometheus", not(test)))]
426                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
427
428                    sleep(random_delay).await;
429
430                    #[cfg(all(feature = "prometheus", not(test)))]
431                    {
432                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
433
434                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
435                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
436                            (weight * random_delay.as_millis() as f64)
437                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
438                        );
439                    }
440
441                    v
442                }
443            });
444
445            (tx, rx)
446        };
447
448        let mut transport_layer =
449            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
450
451        if let Some(port) = self.cfg.protocol.autonat_port {
452            transport_layer.run_nat_server(port);
453        }
454
455        if addresses.is_empty() {
456            warn!("No addresses found in the database, not dialing any NAT servers");
457        } else {
458            info!(num_addresses = addresses.len(), "Found addresses from the database");
459            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
460            randomized_addresses.shuffle(&mut rand::thread_rng());
461            transport_layer.dial_nat_server(randomized_addresses);
462        }
463
464        let msg_proto_control =
465            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
466        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
467        let (wire_msg_tx, wire_msg_rx) =
468            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
469
470        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
471            mixing_channel_rx
472                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
473                .map(Ok)
474                .forward(wire_msg_tx),
475        );
476
477        let (transport_events_tx, transport_events_rx) =
478            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(1000);
479
480        let network_clone = self.network.clone();
481        spawn(transport_events_rx.for_each(move |event| {
482            let network = network_clone.clone();
483
484            async move {
485                match event {
486                    hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
487                        if let Err(error) = network
488                            .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
489                            .await
490                        {
491                            tracing::error!(%peer, %error, "Failed to add incoming connection peer");
492                        }
493                    }
494                    hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
495                        if let Err(error) = network
496                            .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
497                            .await
498                        {
499                            tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
500                        }
501                    }
502                }
503            }
504        }));
505
506        processes.insert(
507            HoprTransportProcess::Medium,
508            spawn_as_abortable!(transport_layer.run(transport_events_tx)),
509        );
510
511        // -- msg-ack protocol over the wire transport
512        let packet_cfg = PacketInteractionConfig {
513            packet_keypair: self.me.clone(),
514            outgoing_ticket_win_prob: self
515                .cfg
516                .protocol
517                .outgoing_ticket_winning_prob
518                .map(WinningProbability::try_from)
519                .transpose()?,
520            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
521        };
522
523        let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationData)>();
524        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
525            packet_cfg,
526            self.db.clone(),
527            Some(tbf_path),
528            (
529                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
530                    trace!(%peer, "sending message to peer");
531                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
532                }),
533                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
534            ),
535            (tx_from_protocol, external_msg_rx),
536        )
537        .await
538        .into_iter()
539        {
540            processes.insert(HoprTransportProcess::Protocol(k), v);
541        }
542
543        // -- network probing
544        let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationData)>();
545
546        let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();
547
548        let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
549        for (k, v) in probe
550            .continuously_scan(
551                (external_msg_send, rx_from_protocol),
552                manual_ping_rx,
553                network_notifier::ProbeNetworkInteractions::new(
554                    self.network.clone(),
555                    self.db.clone(),
556                    self.path_planner.channel_graph(),
557                ),
558                DbProxy::new(self.db.clone()),
559                tx_from_probing,
560            )
561            .await
562            .into_iter()
563        {
564            processes.insert(HoprTransportProcess::Probing(k), v);
565        }
566
567        // manual ping
568        self.ping
569            .clone()
570            .set(Pinger::new(
571                PingConfig {
572                    timeout: self.cfg.probe.timeout,
573                },
574                manual_ping_tx,
575            ))
576            .expect("must set the ticket aggregation writer only once");
577
578        // -- session management
579        let packet_planner = run_packet_planner(
580            self.path_planner.clone(),
581            self.process_packet_send
582                .get()
583                .cloned()
584                .expect("packet sender must be set"),
585        );
586
587        self.smgr
588            .start(packet_planner, on_incoming_session)
589            .expect("failed to start session manager")
590            .into_iter()
591            .enumerate()
592            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
593            .for_each(|(k, v)| {
594                processes.insert(k, v);
595            });
596
597        let smgr = self.smgr.clone();
598        processes.insert(
599            HoprTransportProcess::SessionsManagement(0),
600            spawn_as_abortable!(async move {
601                let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
602                    let smgr = smgr.clone();
603                    async move {
604                        match smgr.dispatch_message(pseudonym, data).await {
605                            Ok(DispatchResult::Processed) => {
606                                trace!("message dispatch completed");
607                                None
608                            }
609                            Ok(DispatchResult::Unrelated(data)) => {
610                                trace!("unrelated message dispatch completed");
611                                Some(data)
612                            }
613                            Err(e) => {
614                                error!(error = %e, "error while processing packet");
615                                None
616                            }
617                        }
618                    }
619                })
620                .map(Ok)
621                .forward(on_incoming_data)
622                .await;
623            }),
624        );
625
626        Ok(processes)
627    }
628
629    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
630        Arc::new(proxy::TicketAggregatorProxy::new(
631            self.db.clone(),
632            self.process_ticket_aggregate.clone(),
633            std::time::Duration::from_secs(15),
634        ))
635    }
636
637    #[tracing::instrument(level = "debug", skip(self))]
638    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
639        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
640            return Err(HoprTransportError::Api(format!(
641                "ping to '{peer}' not allowed due to network registry"
642            )));
643        }
644
645        if peer == &self.me_peerid {
646            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
647        }
648
649        let pinger = self
650            .ping
651            .get()
652            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
653
654        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
655            error!(error = %e, "Failed to store the peer observation");
656        }
657
658        let latency = (*pinger).ping(*peer).await?;
659
660        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
661            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
662        ))?;
663
664        Ok((latency, peer_status))
665    }
666
667    #[tracing::instrument(level = "debug", skip(self))]
668    pub async fn new_session(
669        &self,
670        destination: Address,
671        target: SessionTarget,
672        cfg: SessionClientConfig,
673    ) -> errors::Result<Session> {
674        Ok(self.smgr.new_session(destination, target, cfg).await?)
675    }
676
677    #[tracing::instrument(level = "debug", skip(self))]
678    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
679        Ok(self.smgr.ping_session(id).await?)
680    }
681
682    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
683        Ok(self.smgr.get_surb_balancer_config(id).await?)
684    }
685
686    pub async fn update_session_surb_balancing_cfg(
687        &self,
688        id: &SessionId,
689        cfg: SurbBalancerConfig,
690    ) -> errors::Result<()> {
691        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
692    }
693
694    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
695    pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
696        if let Tag::Reserved(reserved_tag) = tag {
697            return Err(HoprTransportError::Api(format!(
698                "Application tag must not from range: {:?}, but was {reserved_tag:?}",
699                Tag::APPLICATION_TAG_RANGE
700            )));
701        }
702
703        if msg.len() > HoprPacket::PAYLOAD_SIZE {
704            return Err(HoprTransportError::Api(format!(
705                "Message exceeds the maximum allowed size of {} bytes",
706                HoprPacket::PAYLOAD_SIZE
707            )));
708        }
709
710        let app_data = ApplicationData::new_from_owned(tag, msg);
711        let routing = self.path_planner.resolve_routing(app_data.len(), routing).await?.0;
712
713        // Here we do not use msg_sender directly,
714        // since it internally follows Session-oriented logic
715        let sender = self.process_packet_send.get().ok_or_else(|| {
716            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
717        })?;
718
719        sender
720            .send_packet(app_data, routing)
721            .await
722            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
723            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
724            .await
725            .map_err(|e| HoprTransportError::Api(e.to_string()))
726    }
727
728    #[tracing::instrument(level = "debug", skip(self))]
729    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
730        let entry = self
731            .db
732            .get_channel_by_id(None, channel_id)
733            .await
734            .map_err(hopr_db_sql::api::errors::DbError::from)
735            .map_err(HoprTransportError::from)
736            .and_then(|c| {
737                if let Some(c) = c {
738                    Ok(c)
739                } else {
740                    Err(ProtocolError::ChannelNotFound.into())
741                }
742            })?;
743
744        if entry.status != ChannelStatus::Open {
745            return Err(ProtocolError::ChannelClosed.into());
746        }
747
748        // Ok(Arc::new(proxy::TicketAggregatorProxy::new(
749        //     self.db.clone(),
750        //     self.process_ticket_aggregate.clone(),
751        //     std::time::Duration::from_secs(15),
752        // ))
753        // .aggregate_tickets(&entry.get_id(), Default::default())
754        // .await?)
755
756        Err(TicketAggregationError::TransportError(
757            "Ticket aggregation not supported as a session protocol yet".to_string(),
758        )
759        .into())
760    }
761
762    #[tracing::instrument(level = "debug", skip(self))]
763    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
764        Ok(self
765            .db
766            .get_accounts(None, true)
767            .await
768            .map_err(hopr_db_sql::api::errors::DbError::from)?
769            .into_iter()
770            .map(|entry| {
771                (
772                    PeerId::from(entry.public_key),
773                    entry.chain_addr,
774                    Vec::from_iter(entry.get_multiaddr().into_iter()),
775                )
776            })
777            .collect())
778    }
779
780    pub async fn is_allowed_to_access_network<'a>(
781        &self,
782        address_like: either::Either<&'a PeerId, Address>,
783    ) -> errors::Result<bool>
784    where
785        T: 'a,
786    {
787        let db_clone = self.db.clone();
788        let address_like_noref = address_like.map_left(|peer| *peer);
789
790        Ok(self
791            .db
792            .begin_transaction()
793            .await
794            .map_err(hopr_db_sql::api::errors::DbError::from)?
795            .perform(|tx| {
796                Box::pin(async move {
797                    match address_like_noref {
798                        either::Left(peer) => {
799                            let pk = OffchainPublicKey::try_from(peer)?;
800                            if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
801                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
802                            } else {
803                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
804                                    "cannot translate off-chain key".into(),
805                                ))
806                            }
807                        }
808                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
809                    }
810                })
811            })
812            .await
813            .map_err(hopr_db_sql::api::errors::DbError::from)?)
814    }
815
816    #[tracing::instrument(level = "debug", skip(self))]
817    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
818        self.network
819            .get(&self.me_peerid)
820            .await
821            .unwrap_or_else(|e| {
822                error!(error = %e, "failed to obtain listening multi-addresses");
823                None
824            })
825            .map(|peer| peer.multiaddresses)
826            .unwrap_or_default()
827    }
828
829    #[tracing::instrument(level = "debug", skip(self))]
830    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
831        let mut mas = self
832            .local_multiaddresses()
833            .into_iter()
834            .filter(|ma| {
835                hopr_transport_identity::multiaddrs::is_supported(ma)
836                    && (self.cfg.transport.announce_local_addresses
837                        || !hopr_transport_identity::multiaddrs::is_private(ma))
838            })
839            .map(|ma| strip_p2p_protocol(&ma))
840            .filter(|v| !v.is_empty())
841            .collect::<Vec<_>>();
842
843        mas.sort_by(|l, r| {
844            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
845            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
846
847            if !(is_left_dns ^ is_right_dns) {
848                std::cmp::Ordering::Equal
849            } else if is_left_dns {
850                std::cmp::Ordering::Less
851            } else {
852                std::cmp::Ordering::Greater
853            }
854        });
855
856        mas
857    }
858
859    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
860        self.my_multiaddresses.clone()
861    }
862
863    #[tracing::instrument(level = "debug", skip(self))]
864    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
865        self.network
866            .get(peer)
867            .await
868            .unwrap_or(None)
869            .map(|peer| peer.multiaddresses)
870            .unwrap_or(vec![])
871    }
872
873    #[tracing::instrument(level = "debug", skip(self))]
874    pub async fn network_health(&self) -> Health {
875        self.network.health().await
876    }
877
878    #[tracing::instrument(level = "debug", skip(self))]
879    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
880        Ok(self.network.connected_peers().await?)
881    }
882
883    #[tracing::instrument(level = "debug", skip(self))]
884    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
885        Ok(self.network.get(peer).await?)
886    }
887
888    #[tracing::instrument(level = "debug", skip(self))]
889    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
890        let ticket_stats = self.db.get_ticket_statistics(None).await?;
891
892        Ok(TicketStatistics {
893            winning_count: ticket_stats.winning_tickets,
894            unredeemed_value: ticket_stats.unredeemed_value,
895            redeemed_value: ticket_stats.redeemed_value,
896            neglected_value: ticket_stats.neglected_value,
897            rejected_value: ticket_stats.rejected_value,
898        })
899    }
900
901    #[tracing::instrument(level = "debug", skip(self))]
902    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
903        if let Some(channel) = self
904            .db
905            .get_channel_by_id(None, channel_id)
906            .await
907            .map_err(hopr_db_sql::api::errors::DbError::from)?
908        {
909            let own_address: Address = self
910                .db
911                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
912                .await?
913                .ok_or_else(|| {
914                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
915                })?
916                .try_into()?;
917
918            if channel.destination == own_address {
919                Ok(Some(self.db.get_tickets((&channel).into()).await?))
920            } else {
921                Ok(None)
922            }
923        } else {
924            Ok(None)
925        }
926    }
927
928    #[tracing::instrument(level = "debug", skip(self))]
929    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
930        Ok(self
931            .db
932            .get_all_tickets()
933            .await?
934            .into_iter()
935            .map(|v| v.ticket.leak())
936            .collect())
937    }
938}
939
940fn build_mixer_cfg_from_env() -> MixerConfig {
941    let mixer_cfg = MixerConfig {
942        min_delay: std::time::Duration::from_millis(
943            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
944                .map(|v| {
945                    v.trim()
946                        .parse::<u64>()
947                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
948                })
949                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
950        ),
951        delay_range: std::time::Duration::from_millis(
952            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
953                .map(|v| {
954                    v.trim()
955                        .parse::<u64>()
956                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
957                })
958                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
959        ),
960        capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
961            .map(|v| {
962                v.trim()
963                    .parse::<usize>()
964                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
965            })
966            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
967        ..MixerConfig::default()
968    };
969    debug!(?mixer_cfg, "Mixer configuration");
970
971    mixer_cfg
972}