hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! It also implements the Session negotiation via Start sub-protocol.
6//! See the [`hopr_transport_session::initiation`] module for details on Start sub-protocol.
7//!
8//! As such, the transport layer components should be only those that are directly necessary to:
9//!
10//! 1. send and receive a packet, acknowledgement or ticket aggregation request
11//! 2. send and receive a network telemetry request
12//! 3. automate transport level processes
13//! 4. algorithms associated with the transport layer operational management
14//! 5. interface specifications to allow modular behavioral extensions
15
16/// Configuration of the [crate::HoprTransport].
17pub mod config;
18/// Constants used and exposed by the crate.
19pub mod constants;
20/// Errors used by the crate.
21pub mod errors;
22pub mod helpers;
23pub mod network_notifier;
24/// Objects used and possibly exported by the crate for re-use for transport functionality
25pub mod proxy;
26
27use std::{
28    collections::{HashMap, HashSet},
29    sync::{Arc, OnceLock},
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35    SinkExt, StreamExt,
36    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
37};
38use helpers::PathPlanner;
39use hopr_async_runtime::{AbortHandle, spawn_as_abortable};
40use hopr_crypto_packet::prelude::HoprPacket;
41pub use hopr_crypto_types::{
42    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
43    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
44};
45use hopr_db_sql::{
46    HoprDbAllOperations,
47    accounts::ChainOrPacketKey,
48    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::{
55    PathAddressResolver,
56    selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
57};
58use hopr_primitive_types::prelude::*;
59use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
60pub use hopr_transport_identity::{Multiaddr, PeerId};
61use hopr_transport_mixer::MixerConfig;
62pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
63use hopr_transport_p2p::{
64    HoprSwarm,
65    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
66};
67pub use hopr_transport_packet::prelude::{ApplicationData, Tag};
68use hopr_transport_probe::{
69    DbProxy, Probe,
70    ping::{PingConfig, Pinger},
71};
72pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74use hopr_transport_protocol::{
75    errors::ProtocolError,
76    processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
77};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport_session::transfer_session;
80pub use hopr_transport_session::{
81    Capabilities as SessionCapabilities, Capability as SessionCapability, IncomingSession, SESSION_PAYLOAD_SIZE,
82    ServiceId, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
83    errors::TransportSessionError, traits::SendMsg,
84};
85use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
86use hopr_transport_ticket_aggregation::{
87    AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
88    TicketAggregatorTrait,
89};
90use rand::seq::SliceRandom;
91#[cfg(feature = "mixer-stream")]
92use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
93use tracing::{debug, error, info, trace, warn};
94
95pub use crate::{
96    config::HoprTransportConfig,
97    helpers::{PeerEligibility, TicketStatistics},
98};
99use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError};
100
/// Range of tags usable by applications, re-exported from [`Tag::APPLICATION_TAG_RANGE`]
/// so that users of this crate do not need to reach into the packet crate for it.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
102
// Build-time guard: fail compilation when both mixer features are enabled at once,
// or when neither of them is enabled.
#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
108
// Needs lazy-static, since multiplying a `Duration` by a constant is not yet a const operation.
lazy_static::lazy_static! {
    /// Upper bound of the Session initiation timeout: twice the base timeout multiplied by
    /// the maximum number of intermediate hops.
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
113
/// Identifies the individual background processes spawned by [`HoprTransport::run`].
///
/// The values are used as keys of the process map returned from `run`. The `Display`
/// implementation derived via `strum` yields a human-readable description of each process.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the transport medium (the libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One of the HOPR protocol processes, identified by the wrapped value.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// A session manager sub-process, identified by its index.
    ///
    /// `run` uses index 0 for the process dispatching incoming data and indices starting
    /// at 1 for the processes returned by the session manager itself.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// One of the network probing processes, identified by the wrapped value.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
125
/// Ticket aggregator that forwards aggregation requests to a writer which may only
/// become available later.
///
/// The writer lives in a shared [`OnceLock`]; until it has been set, every call to
/// `aggregate_tickets` fails with a transport error.
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Database handle passed on to each aggregation attempt.
    db: Db,
    /// Lazily initialized writer into the ticket aggregation interaction.
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    /// Timeout passed on to each aggregation attempt.
    agg_timeout: std::time::Duration,
}
135
impl<Db> TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    /// Creates a new proxy from its parts.
    ///
    /// * `db` - database handle used for each aggregation attempt
    /// * `maybe_writer` - shared cell holding the aggregation writer; it may still be empty
    ///   at the time of construction
    /// * `agg_timeout` - timeout used for each aggregation attempt
    pub fn new(
        db: Db,
        maybe_writer: Arc<
            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
        >,
        agg_timeout: std::time::Duration,
    ) -> Self {
        Self {
            db,
            maybe_writer,
            agg_timeout,
        }
    }
}
154
155#[async_trait::async_trait]
156impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
157where
158    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
159{
160    async fn aggregate_tickets(
161        &self,
162        channel: &Hash,
163        prerequisites: AggregationPrerequisites,
164    ) -> hopr_transport_ticket_aggregation::Result<()> {
165        if let Some(writer) = self.maybe_writer.clone().get() {
166            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
167                .aggregate_tickets(channel, prerequisites)
168                .await
169        } else {
170            Err(TicketAggregationError::TransportError(
171                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
172            ))
173        }
174    }
175}
176
/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector):
/// the selector from `hopr_path::selectors::dfs` parameterized with [`RandomizedEdgeWeighting`].
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
179
/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
/// the transport, as well as off-chain ticket manipulation.
///
/// An instance is created with [`HoprTransport::new`]; the background processes and the
/// lazily initialized members (`ping`, `process_packet_send`, `process_ticket_aggregate`)
/// are set up by [`HoprTransport::run`].
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
    /// On-chain address of this node, derived from the chain keypair given to `new`.
    me_address: Address,
    /// Transport configuration.
    cfg: HoprTransportConfig,
    /// Database handle shared with the individual components.
    db: T,
    /// Manual pinger; empty until `run` sets it.
    ping: Arc<OnceLock<Pinger>>,
    /// Record of observed network peers.
    network: Arc<Network<T>>,
    /// Sender of outgoing messages into the packet processing; empty until `run` sets it.
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    /// Resolver of message routing into concrete paths.
    path_planner: PathPlanner<T, CurrentPathSelector>,
    /// Multiaddresses of this node as given to `new`.
    my_multiaddresses: Vec<Multiaddr>,
    /// Writer into the ticket aggregation interaction; empty until `run` sets it.
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    /// Manager of Sessions.
    smgr: SessionManager<helpers::MessageSender<T, CurrentPathSelector>>,
}
200
201impl<T> HoprTransport<T>
202where
203    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
204{
205    pub fn new(
206        me: &OffchainKeypair,
207        me_onchain: &ChainKeypair,
208        cfg: HoprTransportConfig,
209        db: T,
210        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
211        my_multiaddresses: Vec<Multiaddr>,
212    ) -> Self {
213        let process_packet_send = Arc::new(OnceLock::new());
214
215        let me_peerid: PeerId = me.into();
216        let me_chain_addr = me_onchain.public().to_address();
217
218        Self {
219            me: me.clone(),
220            me_peerid,
221            me_address: me_chain_addr,
222            ping: Arc::new(OnceLock::new()),
223            network: Arc::new(Network::new(
224                me_peerid,
225                my_multiaddresses.clone(),
226                cfg.network.clone(),
227                db.clone(),
228            )),
229            process_packet_send,
230            path_planner: PathPlanner::new(
231                me_chain_addr,
232                db.clone(),
233                CurrentPathSelector::new(
234                    channel_graph.clone(),
235                    DfsPathSelectorConfig {
236                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
237                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
238                        ..Default::default()
239                    },
240                ),
241                channel_graph.clone(),
242            ),
243            db,
244            my_multiaddresses,
245            process_ticket_aggregate: Arc::new(OnceLock::new()),
246            smgr: SessionManager::new(SessionManagerConfig {
247                // TODO(v3.1): Use the entire range of tags properly
248                session_tag_range: (16..65535),
249                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
250                idle_timeout: cfg.session.idle_timeout,
251                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
252            }),
253            cfg,
254        }
255    }
256
257    /// Execute all processes of the [`crate::HoprTransport`] object.
258    ///
259    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`],
260    /// [`crate::HoprTransportProcess::BloomFilterSave`], [`crate::HoprTransportProcess::Swarm`] and session-related
261    /// processes and return join handles to the calling function. These processes are not started immediately but
262    /// are waiting for a trigger from this piece of code.
263    #[allow(clippy::too_many_arguments)]
264    pub async fn run(
265        &self,
266        me_onchain: &ChainKeypair,
267        tbf_path: String,
268        on_incoming_data: UnboundedSender<ApplicationData>,
269        discovery_updates: UnboundedReceiver<PeerDiscovery>,
270        on_incoming_session: UnboundedSender<IncomingSession>,
271    ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
272        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
273            futures::channel::mpsc::unbounded::<PeerDiscovery>();
274
275        let network_clone = self.network.clone();
276        let db_clone = self.db.clone();
277        let me_peerid = self.me_peerid;
278        let discovery_updates =
279            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
280                .filter_map(move |event| {
281                    let network = network_clone.clone();
282                    let db = db_clone.clone();
283                    let me = me_peerid;
284
285                    async move {
286                        match event {
287                            PeerDiscovery::Allow(peer_id) => {
288                                debug!(peer = %peer_id, "Processing peer discovery event: Allow");
289                                if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
290                                    if !network.has(&peer_id).await {
291                                        let mas = db
292                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
293                                            .await
294                                            .map(|entry| {
295                                                entry
296                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
297                                                    .unwrap_or_default()
298                                            })
299                                            .unwrap_or_default();
300
301                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
302                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
303                                            return None;
304                                        }
305                                    }
306
307                                    return Some(PeerDiscovery::Allow(peer_id))
308                                } else {
309                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
310                                }
311                            }
312                            PeerDiscovery::Ban(peer_id) => {
313                                if let Err(e) = network.remove(&peer_id).await {
314                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
315                                } else {
316                                    return Some(PeerDiscovery::Ban(peer_id))
317                                }
318                            }
319                            PeerDiscovery::Announce(peer, multiaddresses) => {
320                                debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
321                                if peer != me {
322                                    // decapsulate the `p2p/<peer_id>` to remove duplicities
323                                    let mas = multiaddresses
324                                        .into_iter()
325                                        .map(|ma| strip_p2p_protocol(&ma))
326                                        .filter(|v| !v.is_empty())
327                                        .collect::<Vec<_>>();
328
329                                    if ! mas.is_empty() {
330                                        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
331                                            if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk)).await {
332                                                let key: Result<Address, _> = key.try_into();
333
334                                                if let Ok(key) = key {
335                                                    if db
336                                                        .is_allowed_in_network_registry(None, &key)
337                                                        .await
338                                                        .unwrap_or(false)
339                                                    {
340                                                        if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
341                                                        {
342                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
343                                                        } else {
344                                                            return Some(PeerDiscovery::Announce(peer, mas))
345                                                        }
346                                                    }
347                                                }
348                                            } else {
349                                                error!(%peer, "Failed to announce peer due to convertibility error");
350                                            }
351                                        }
352                                    }
353                                }
354                            }
355                        }
356
357                        None
358                    }
359                });
360
361        info!("Loading initial peers from the storage");
362
363        let mut addresses: HashSet<Multiaddr> = HashSet::new();
364        let nodes = self.get_public_nodes().await?;
365        for (peer, _address, multiaddresses) in nodes {
366            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
367                debug!(%peer, ?multiaddresses, "Using initial public node");
368                addresses.extend(multiaddresses.clone());
369
370                internal_discovery_update_tx
371                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
372                    .await
373                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
374
375                internal_discovery_update_tx
376                    .send(PeerDiscovery::Allow(peer))
377                    .await
378                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
379            }
380        }
381
382        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
383
384        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
385        let tkt_agg_writer = ticket_agg_proc.writer();
386
387        let (external_msg_send, external_msg_rx) =
388            mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(
389                MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
390            );
391
392        self.process_packet_send
393            .clone()
394            .set(MsgSender::new(external_msg_send.clone()))
395            .expect("must set the packet processing writer only once");
396
397        self.process_ticket_aggregate
398            .clone()
399            .set(tkt_agg_writer.clone())
400            .expect("must set the ticket aggregation writer only once");
401
402        // -- transport medium
403        let mixer_cfg = build_mixer_cfg_from_env();
404
405        #[cfg(feature = "mixer-channel")]
406        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
407
408        #[cfg(feature = "mixer-stream")]
409        let (mixing_channel_tx, mixing_channel_rx) = {
410            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
411            let rx = rx.then_concurrent(move |v| {
412                let cfg = mixer_cfg;
413
414                async move {
415                    let random_delay = cfg.random_delay();
416                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
417
418                    #[cfg(all(feature = "prometheus", not(test)))]
419                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
420
421                    sleep(random_delay).await;
422
423                    #[cfg(all(feature = "prometheus", not(test)))]
424                    {
425                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
426
427                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
428                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
429                            (weight * random_delay.as_millis() as f64)
430                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
431                        );
432                    }
433
434                    v
435                }
436            });
437
438            (tx, rx)
439        };
440
441        let mut transport_layer =
442            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
443
444        if let Some(port) = self.cfg.protocol.autonat_port {
445            transport_layer.run_nat_server(port);
446        }
447
448        if addresses.is_empty() {
449            warn!("No addresses found in the database, not dialing any NAT servers");
450        } else {
451            info!(num_addresses = addresses.len(), "Found addresses from the database");
452            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
453            randomized_addresses.shuffle(&mut rand::thread_rng());
454            transport_layer.dial_nat_server(randomized_addresses);
455        }
456
457        let msg_proto_control =
458            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
459        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
460        let (wire_msg_tx, wire_msg_rx) =
461            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
462
463        let _mixing_process_before_sending_out =
464            hopr_async_runtime::prelude::spawn(mixing_channel_rx.map(Ok).forward(wire_msg_tx));
465
466        processes.insert(HoprTransportProcess::Medium, spawn_as_abortable(transport_layer.run()));
467
468        // -- msg-ack protocol over the wire transport
469        let packet_cfg = PacketInteractionConfig {
470            packet_keypair: self.me.clone(),
471            outgoing_ticket_win_prob: self
472                .cfg
473                .protocol
474                .outgoing_ticket_winning_prob
475                .map(WinningProbability::try_from)
476                .transpose()?,
477            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
478        };
479
480        let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationData)>();
481        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
482            packet_cfg,
483            self.db.clone(),
484            Some(tbf_path),
485            (mixing_channel_tx, wire_msg_rx),
486            (tx_from_protocol, external_msg_rx),
487        )
488        .await
489        .into_iter()
490        {
491            processes.insert(HoprTransportProcess::Protocol(k), v);
492        }
493
494        // -- network probing
495        let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationData)>();
496
497        let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();
498
499        let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
500        for (k, v) in probe
501            .continuously_scan(
502                (external_msg_send, rx_from_protocol),
503                manual_ping_rx,
504                network_notifier::ProbeNetworkInteractions::new(
505                    self.network.clone(),
506                    self.db.clone(),
507                    self.path_planner.channel_graph(),
508                ),
509                DbProxy::new(self.db.clone()),
510                tx_from_probing,
511            )
512            .await
513            .into_iter()
514        {
515            processes.insert(HoprTransportProcess::Probing(k), v);
516        }
517
518        // manual ping
519        self.ping
520            .clone()
521            .set(Pinger::new(
522                PingConfig {
523                    timeout: self.cfg.probe.timeout,
524                },
525                manual_ping_tx,
526            ))
527            .expect("must set the ticket aggregation writer only once");
528
529        // -- session management
530        let msg_sender = helpers::MessageSender::new(self.process_packet_send.clone(), self.path_planner.clone());
531        self.smgr
532            .start(msg_sender, on_incoming_session)
533            .expect("failed to start session manager")
534            .into_iter()
535            .enumerate()
536            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
537            .for_each(|(k, v)| {
538                processes.insert(k, v);
539            });
540
541        let smgr = self.smgr.clone();
542        processes.insert(
543            HoprTransportProcess::SessionsManagement(0),
544            spawn_as_abortable(async move {
545                let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
546                    let smgr = smgr.clone();
547                    async move {
548                        match smgr.dispatch_message(pseudonym, data).await {
549                            Ok(DispatchResult::Processed) => {
550                                trace!("message dispatch completed");
551                                None
552                            }
553                            Ok(DispatchResult::Unrelated(data)) => {
554                                trace!("unrelated message dispatch completed");
555                                Some(data)
556                            }
557                            Err(e) => {
558                                error!(error = %e, "error while processing packet");
559                                None
560                            }
561                        }
562                    }
563                })
564                .map(Ok)
565                .forward(on_incoming_data)
566                .await;
567            }),
568        );
569
570        Ok(processes)
571    }
572
573    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
574        Arc::new(proxy::TicketAggregatorProxy::new(
575            self.db.clone(),
576            self.process_ticket_aggregate.clone(),
577            std::time::Duration::from_secs(15),
578        ))
579    }
580
581    #[tracing::instrument(level = "debug", skip(self))]
582    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
583        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
584            return Err(HoprTransportError::Api(format!(
585                "ping to '{peer}' not allowed due to network registry"
586            )));
587        }
588
589        if peer == &self.me_peerid {
590            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
591        }
592
593        let pinger = self
594            .ping
595            .get()
596            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
597
598        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
599            error!(error = %e, "Failed to store the peer observation");
600        }
601
602        let latency = (*pinger).ping(*peer).await?;
603
604        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
605            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
606        ))?;
607
608        Ok((latency, peer_status))
609    }
610
611    #[tracing::instrument(level = "debug", skip(self))]
612    pub async fn new_session(
613        &self,
614        destination: Address,
615        target: SessionTarget,
616        cfg: SessionClientConfig,
617    ) -> errors::Result<Session> {
618        Ok(self.smgr.new_session(destination, target, cfg).await?)
619    }
620
621    #[tracing::instrument(level = "debug", skip(self))]
622    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
623        Ok(self.smgr.ping_session(id).await?)
624    }
625
626    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
627    pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
628        if let Tag::Reserved(reserved_tag) = tag {
629            return Err(HoprTransportError::Api(format!(
630                "Application tag must not from range: {:?}, but was {reserved_tag:?}",
631                Tag::APPLICATION_TAG_RANGE
632            )));
633        }
634
635        if msg.len() > HoprPacket::PAYLOAD_SIZE {
636            return Err(HoprTransportError::Api(format!(
637                "Message exceeds the maximum allowed size of {} bytes",
638                HoprPacket::PAYLOAD_SIZE
639            )));
640        }
641
642        let app_data = ApplicationData::new_from_owned(tag, msg);
643        let routing = self.path_planner.resolve_routing(app_data.len(), routing).await?;
644
645        // Here we do not use msg_sender directly,
646        // since it internally follows Session-oriented logic
647        let sender = self.process_packet_send.get().ok_or_else(|| {
648            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
649        })?;
650
651        sender
652            .send_packet(app_data, routing)
653            .await
654            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
655            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
656            .await
657            .map_err(|e| HoprTransportError::Api(e.to_string()))
658    }
659
660    #[tracing::instrument(level = "debug", skip(self))]
661    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
662        let entry = self
663            .db
664            .get_channel_by_id(None, channel_id)
665            .await
666            .map_err(hopr_db_sql::api::errors::DbError::from)
667            .map_err(HoprTransportError::from)
668            .and_then(|c| {
669                if let Some(c) = c {
670                    Ok(c)
671                } else {
672                    Err(ProtocolError::ChannelNotFound.into())
673                }
674            })?;
675
676        if entry.status != ChannelStatus::Open {
677            return Err(ProtocolError::ChannelClosed.into());
678        }
679
680        // Ok(Arc::new(proxy::TicketAggregatorProxy::new(
681        //     self.db.clone(),
682        //     self.process_ticket_aggregate.clone(),
683        //     std::time::Duration::from_secs(15),
684        // ))
685        // .aggregate_tickets(&entry.get_id(), Default::default())
686        // .await?)
687
688        Err(TicketAggregationError::TransportError(
689            "Ticket aggregation not supported as a session protocol yet".to_string(),
690        )
691        .into())
692    }
693
694    #[tracing::instrument(level = "debug", skip(self))]
695    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
696        Ok(self
697            .db
698            .get_accounts(None, true)
699            .await
700            .map_err(hopr_db_sql::api::errors::DbError::from)?
701            .into_iter()
702            .map(|entry| {
703                (
704                    PeerId::from(entry.public_key),
705                    entry.chain_addr,
706                    Vec::from_iter(entry.get_multiaddr().into_iter()),
707                )
708            })
709            .collect())
710    }
711
712    pub async fn is_allowed_to_access_network<'a>(
713        &self,
714        address_like: either::Either<&'a PeerId, Address>,
715    ) -> errors::Result<bool>
716    where
717        T: 'a,
718    {
719        let db_clone = self.db.clone();
720        let address_like_noref = address_like.map_left(|peer| *peer);
721
722        Ok(self
723            .db
724            .begin_transaction()
725            .await
726            .map_err(hopr_db_sql::api::errors::DbError::from)?
727            .perform(|tx| {
728                Box::pin(async move {
729                    match address_like_noref {
730                        either::Left(peer) => {
731                            let pk = OffchainPublicKey::try_from(peer)?;
732                            if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
733                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
734                            } else {
735                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
736                                    "cannot translate off-chain key".into(),
737                                ))
738                            }
739                        }
740                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
741                    }
742                })
743            })
744            .await
745            .map_err(hopr_db_sql::api::errors::DbError::from)?)
746    }
747
748    #[tracing::instrument(level = "debug", skip(self))]
749    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
750        self.network
751            .get(&self.me_peerid)
752            .await
753            .unwrap_or_else(|e| {
754                error!(error = %e, "failed to obtain listening multi-addresses");
755                None
756            })
757            .map(|peer| peer.multiaddresses)
758            .unwrap_or_default()
759    }
760
761    #[tracing::instrument(level = "debug", skip(self))]
762    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
763        let mut mas = self
764            .local_multiaddresses()
765            .into_iter()
766            .filter(|ma| {
767                hopr_transport_identity::multiaddrs::is_supported(ma)
768                    && (self.cfg.transport.announce_local_addresses
769                        || !hopr_transport_identity::multiaddrs::is_private(ma))
770            })
771            .map(|ma| strip_p2p_protocol(&ma))
772            .filter(|v| !v.is_empty())
773            .collect::<Vec<_>>();
774
775        mas.sort_by(|l, r| {
776            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
777            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
778
779            if !(is_left_dns ^ is_right_dns) {
780                std::cmp::Ordering::Equal
781            } else if is_left_dns {
782                std::cmp::Ordering::Less
783            } else {
784                std::cmp::Ordering::Greater
785            }
786        });
787
788        mas
789    }
790
791    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
792        self.my_multiaddresses.clone()
793    }
794
795    #[tracing::instrument(level = "debug", skip(self))]
796    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
797        self.network
798            .get(peer)
799            .await
800            .unwrap_or(None)
801            .map(|peer| peer.multiaddresses)
802            .unwrap_or(vec![])
803    }
804
805    #[tracing::instrument(level = "debug", skip(self))]
806    pub async fn network_health(&self) -> Health {
807        self.network.health().await
808    }
809
810    #[tracing::instrument(level = "debug", skip(self))]
811    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
812        Ok(self.network.connected_peers().await?)
813    }
814
815    #[tracing::instrument(level = "debug", skip(self))]
816    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
817        Ok(self.network.get(peer).await?)
818    }
819
820    #[tracing::instrument(level = "debug", skip(self))]
821    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
822        let ticket_stats = self.db.get_ticket_statistics(None).await?;
823
824        Ok(TicketStatistics {
825            winning_count: ticket_stats.winning_tickets,
826            unredeemed_value: ticket_stats.unredeemed_value,
827            redeemed_value: ticket_stats.redeemed_value,
828            neglected_value: ticket_stats.neglected_value,
829            rejected_value: ticket_stats.rejected_value,
830        })
831    }
832
833    #[tracing::instrument(level = "debug", skip(self))]
834    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
835        if let Some(channel) = self
836            .db
837            .get_channel_by_id(None, channel_id)
838            .await
839            .map_err(hopr_db_sql::api::errors::DbError::from)?
840        {
841            let own_address: Address = self
842                .db
843                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
844                .await?
845                .ok_or_else(|| {
846                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
847                })?
848                .try_into()?;
849
850            if channel.destination == own_address {
851                Ok(Some(self.db.get_tickets((&channel).into()).await?))
852            } else {
853                Ok(None)
854            }
855        } else {
856            Ok(None)
857        }
858    }
859
860    #[tracing::instrument(level = "debug", skip(self))]
861    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
862        Ok(self
863            .db
864            .get_all_tickets()
865            .await?
866            .into_iter()
867            .map(|v| v.ticket.leak())
868            .collect())
869    }
870}
871
872fn build_mixer_cfg_from_env() -> MixerConfig {
873    let mixer_cfg = MixerConfig {
874        min_delay: std::time::Duration::from_millis(
875            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
876                .map(|v| {
877                    v.trim()
878                        .parse::<u64>()
879                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
880                })
881                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
882        ),
883        delay_range: std::time::Duration::from_millis(
884            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
885                .map(|v| {
886                    v.trim()
887                        .parse::<u64>()
888                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
889                })
890                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
891        ),
892        capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
893            .map(|v| {
894                v.trim()
895                    .parse::<usize>()
896                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
897            })
898            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
899        ..MixerConfig::default()
900    };
901    debug!(?mixer_cfg, "Mixer configuration");
902
903    mixer_cfg
904}