hopr_transport/
lib.rs

1//! The crate aggregates and composes individual transport level objects and functionality
2//! into a unified [`crate::HoprTransport`] object with the goal of isolating the transport layer
3//! and defining a fully specified transport API.
4//!
5//! It also implements the Session negotiation via Start sub-protocol.
6//! See the [`hopr_transport_session::initiation`] module for details on Start sub-protocol.
7//!
8//! As such, the transport layer components should be only those that are directly necessary to:
9//!
10//! 1. send and receive a packet, acknowledgement or ticket aggregation request
11//! 2. send and receive a network telemetry request
12//! 3. automate transport level processes
13//! 4. algorithms associated with the transport layer operational management
14//! 5. interface specifications to allow modular behavioral extensions
15
16/// Configuration of the [crate::HoprTransport].
17pub mod config;
18/// Constants used and exposed by the crate.
19pub mod constants;
20/// Errors used by the crate.
21pub mod errors;
22pub mod helpers;
23pub mod network_notifier;
/// Objects used, and possibly exported, by the crate for re-use in transport functionality.
25pub mod proxy;
26
27use async_lock::RwLock;
28use futures::{
29    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
30    future::{select, Either},
31    pin_mut, FutureExt, SinkExt, StreamExt,
32};
33use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
34use hopr_transport_mixer::MixerConfig;
35use std::time::Duration;
36use std::{
37    collections::HashMap,
38    sync::{Arc, OnceLock},
39};
40use tracing::{debug, error, info, trace, warn};
41
42use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
43use hopr_db_sql::{
44    accounts::ChainOrPacketKey,
45    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
46    HoprDbAllOperations,
47};
48use hopr_internal_types::prelude::*;
49use hopr_path::path::TransportPath;
50use hopr_platform::time::native::current_time;
51use hopr_primitive_types::prelude::*;
52use hopr_transport_network::{
53    heartbeat::Heartbeat,
54    ping::{PingConfig, PingQueryReplier, Pinger, Pinging},
55};
56use hopr_transport_p2p::{
57    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
58    HoprSwarm,
59};
60use hopr_transport_protocol::{
61    errors::ProtocolError,
62    msg::processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
63    ticket_aggregation::processor::{
64        AwaitingAggregator, TicketAggregationActions, TicketAggregationInteraction, TicketAggregatorTrait,
65    },
66};
67use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
68
69use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
70
71#[cfg(feature = "mixer-stream")]
72use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
73
74use crate::helpers::PathPlanner;
75
76use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
77#[cfg(feature = "runtime-tokio")]
78pub use hopr_transport_session::types::transfer_session;
79pub use {
80    hopr_crypto_types::{
81        keypairs::{ChainKeypair, Keypair, OffchainKeypair},
82        types::{HalfKeyChallenge, Hash, OffchainPublicKey},
83    },
84    hopr_internal_types::protocol::ApplicationData,
85    hopr_network_types::prelude::RoutingOptions,
86    hopr_transport_identity::{Multiaddr, PeerId},
87    hopr_transport_network::network::{Health, Network, NetworkTriggeredEvent, PeerOrigin, PeerStatus},
88    hopr_transport_protocol::{execute_on_tick, PeerDiscovery},
89    hopr_transport_session::types::{ServiceId, SessionTarget},
90    hopr_transport_session::{
91        errors::TransportSessionError, traits::SendMsg, Capability as SessionCapability, IncomingSession, Session,
92        SessionClientConfig, SessionId, SESSION_USABLE_MTU_SIZE,
93    },
94};
95
96use crate::{
97    constants::{
98        RESERVED_SESSION_TAG_UPPER_LIMIT, RESERVED_SUBPROTOCOL_TAG_UPPER_LIMIT, SESSION_INITIATION_TIMEOUT_BASE,
99    },
100    errors::HoprTransportError,
101};
102
103pub use crate::{
104    config::HoprTransportConfig,
105    helpers::{PeerEligibility, TicketStatistics},
106};
107
108#[cfg(any(
109    all(feature = "mixer-channel", feature = "mixer-stream"),
110    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
111))]
112compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
113
114// Needs lazy-static, since Duration multiplication by a constant is yet not a const-operation.
115lazy_static::lazy_static! {
116    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
117}
118
119#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
120pub enum HoprTransportProcess {
121    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
122    Medium,
123    #[strum(to_string = "HOPR protocol ({0})")]
124    Protocol(hopr_transport_protocol::ProtocolProcesses),
125    #[strum(to_string = "session manager sub-process #{0}")]
126    SessionsManagement(usize),
127    #[strum(to_string = "protocol [HOPR [heartbeat]]")]
128    Heartbeat,
129}
130
131#[derive(Debug, Clone)]
132pub struct TicketAggregatorProxy<Db>
133where
134    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
135{
136    db: Db,
137    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
138    agg_timeout: std::time::Duration,
139}
140
141impl<Db> TicketAggregatorProxy<Db>
142where
143    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
144{
145    pub fn new(
146        db: Db,
147        maybe_writer: Arc<
148            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
149        >,
150        agg_timeout: std::time::Duration,
151    ) -> Self {
152        Self {
153            db,
154            maybe_writer,
155            agg_timeout,
156        }
157    }
158}
159
160#[async_trait::async_trait]
161impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
162where
163    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
164{
165    async fn aggregate_tickets(
166        &self,
167        channel: &Hash,
168        prerequisites: AggregationPrerequisites,
169    ) -> hopr_transport_protocol::errors::Result<()> {
170        if let Some(writer) = self.maybe_writer.clone().get() {
171            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
172                .aggregate_tickets(channel, prerequisites)
173                .await
174        } else {
175            Err(ProtocolError::TransportError(
176                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
177            ))
178        }
179    }
180}
181
182/// Currently used implementation of [`PathSelector`](hopr_path::selectors::PathSelector).
183type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
184
185/// Interface into the physical transport mechanism allowing all off-chain HOPR-related tasks on
186/// the transport, as well as off-chain ticket manipulation.
187pub struct HoprTransport<T>
188where
189    T: HoprDbAllOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
190{
191    me: OffchainKeypair,
192    me_peerid: PeerId, // Cache to avoid an expensive conversion: OffchainPublicKey -> PeerId
193    cfg: HoprTransportConfig,
194    db: T,
195    ping: Arc<OnceLock<Pinger<network_notifier::PingExternalInteractions<T>>>>,
196    network: Arc<Network<T>>,
197    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
198    path_planner: PathPlanner<T, CurrentPathSelector>,
199    my_multiaddresses: Vec<Multiaddr>,
200    process_ticket_aggregate:
201        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
202    smgr: SessionManager<helpers::MessageSender<T, CurrentPathSelector>>,
203}
204
205impl<T> HoprTransport<T>
206where
207    T: HoprDbAllOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
208{
209    pub fn new(
210        me: &OffchainKeypair,
211        me_onchain: &ChainKeypair,
212        cfg: HoprTransportConfig,
213        db: T,
214        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
215        my_multiaddresses: Vec<Multiaddr>,
216    ) -> Self {
217        let process_packet_send = Arc::new(OnceLock::new());
218
219        let me_peerid: PeerId = me.into();
220
221        Self {
222            me: me.clone(),
223            me_peerid,
224            ping: Arc::new(OnceLock::new()),
225            network: Arc::new(Network::new(
226                me_peerid,
227                my_multiaddresses.clone(),
228                cfg.network.clone(),
229                db.clone(),
230            )),
231            process_packet_send,
232            path_planner: PathPlanner::new(
233                db.clone(),
234                CurrentPathSelector::new(
235                    channel_graph.clone(),
236                    DfsPathSelectorConfig {
237                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
238                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
239                        ..Default::default()
240                    },
241                ),
242                channel_graph.clone(),
243                me_onchain.public().to_address(),
244            ),
245            db,
246            my_multiaddresses,
247            process_ticket_aggregate: Arc::new(OnceLock::new()),
248            smgr: SessionManager::new(
249                me_peerid,
250                SessionManagerConfig {
251                    session_tag_range: RESERVED_SUBPROTOCOL_TAG_UPPER_LIMIT..RESERVED_SESSION_TAG_UPPER_LIMIT,
252                    initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
253                    idle_timeout: cfg.session.idle_timeout,
254                },
255            ),
256            cfg,
257        }
258    }
259
260    /// Execute all processes of the [`crate::HoprTransport`] object.
261    ///
262    /// This method will spawn the [`crate::HoprTransportProcess::Heartbeat`], [`crate::HoprTransportProcess::BloomFilterSave`],
263    /// [`crate::HoprTransportProcess::Swarm`] and session-related processes and return
264    /// join handles to the calling function. These processes are not started immediately but are
265    /// waiting for a trigger from this piece of code.
266    #[allow(clippy::too_many_arguments)]
267    pub async fn run(
268        &self,
269        me_onchain: &ChainKeypair,
270        version: String,
271        tbf_path: String,
272        on_incoming_data: UnboundedSender<ApplicationData>,
273        discovery_updates: UnboundedReceiver<PeerDiscovery>,
274        on_incoming_session: UnboundedSender<IncomingSession>,
275    ) -> crate::errors::Result<HashMap<HoprTransportProcess, JoinHandle<()>>> {
276        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
277            futures::channel::mpsc::unbounded::<PeerDiscovery>();
278
279        let network_clone = self.network.clone();
280        let db_clone = self.db.clone();
281        let me_peerid = self.me_peerid;
282        let discovery_updates =
283            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
284                .filter_map(move |event| {
285                    let network = network_clone.clone();
286                    let db = db_clone.clone();
287                    let me = me_peerid;
288
289                    async move {
290                        match event {
291                            PeerDiscovery::Allow(peer_id) => {
292                                if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
293                                    if !network.has(&peer_id).await {
294                                        let mas = db
295                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
296                                            .await
297                                            .map(|entry| {
298                                                entry
299                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
300                                                    .unwrap_or_default()
301                                            })
302                                            .unwrap_or_default();
303
304                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
305                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
306                                            return None;
307                                        }
308                                    }
309
310                                    return Some(PeerDiscovery::Allow(peer_id))
311                                } else {
312                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
313                                }
314                            }
315                            PeerDiscovery::Ban(peer_id) => {
316                                if let Err(e) = network.remove(&peer_id).await {
317                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
318                                } else {
319                                    return Some(PeerDiscovery::Ban(peer_id))
320                                }
321                            }
322                            PeerDiscovery::Announce(peer, multiaddresses) => {
323                                if peer != me {
324                                    // decapsulate the `p2p/<peer_id>` to remove duplicities
325                                    let mas = multiaddresses
326                                        .into_iter()
327                                        .map(|ma| strip_p2p_protocol(&ma))
328                                        .filter(|v| !v.is_empty())
329                                        .collect::<Vec<_>>();
330
331                                    if ! mas.is_empty() {
332                                        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
333                                            if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk)).await {
334                                                let key: Result<Address, _> = key.try_into();
335
336                                                if let Ok(key) = key {
337                                                    if db
338                                                        .is_allowed_in_network_registry(None, &key)
339                                                        .await
340                                                        .unwrap_or(false)
341                                                    {
342                                                        if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
343                                                        {
344                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
345                                                        } else {
346                                                            return Some(PeerDiscovery::Announce(peer, mas))
347                                                        }
348                                                    }
349                                                }
350                                            } else {
351                                                error!(%peer, "Failed to announce peer due to convertibility error");
352                                            }
353                                        }
354                                    }
355                                }
356                            }
357                        }
358
359                        None
360                    }
361                });
362
363        info!("Loading initial peers from the storage");
364
365        let nodes = self.get_public_nodes().await?;
366        for (peer, _address, multiaddresses) in nodes {
367            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
368                debug!(%peer, ?multiaddresses, "Using initial public node");
369
370                internal_discovery_update_tx
371                    .send(PeerDiscovery::Allow(peer))
372                    .await
373                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
374
375                internal_discovery_update_tx
376                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
377                    .await
378                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
379            }
380        }
381
382        let mut processes: HashMap<HoprTransportProcess, JoinHandle<()>> = HashMap::new();
383
384        // network event processing channel
385        let (network_events_tx, network_events_rx) =
386            mpsc::channel::<NetworkTriggeredEvent>(constants::MAXIMUM_NETWORK_UPDATE_EVENT_QUEUE_SIZE);
387
388        // manual ping
389        let (ping_tx, ping_rx) = mpsc::unbounded::<(PeerId, PingQueryReplier)>();
390
391        let ping_cfg = PingConfig {
392            timeout: self.cfg.protocol.heartbeat.timeout,
393            max_parallel_pings: self.cfg.heartbeat.max_parallel_probes,
394        };
395
396        let ping: Pinger<network_notifier::PingExternalInteractions<T>> = Pinger::new(
397            ping_cfg,
398            ping_tx.clone(),
399            network_notifier::PingExternalInteractions::new(
400                self.network.clone(),
401                self.db.clone(),
402                self.path_planner.channel_graph(),
403                network_events_tx,
404            ),
405        );
406
407        self.ping
408            .clone()
409            .set(ping)
410            .expect("must set the ping executor only once");
411
412        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
413        let tkt_agg_writer = ticket_agg_proc.writer();
414
415        let (external_msg_send, external_msg_rx) =
416            mpsc::channel::<(ApplicationData, TransportPath, PacketSendFinalizer)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
417
418        self.process_packet_send
419            .clone()
420            .set(MsgSender::new(external_msg_send))
421            .expect("must set the packet processing writer only once");
422
423        self.process_ticket_aggregate
424            .clone()
425            .set(tkt_agg_writer.clone())
426            .expect("must set the ticket aggregation writer only once");
427
428        // heartbeat
429        let mut heartbeat = Heartbeat::new(
430            self.cfg.heartbeat,
431            self.ping
432                .get()
433                .expect("Ping should be initialized at this point")
434                .clone(),
435            hopr_transport_network::heartbeat::HeartbeatExternalInteractions::new(self.network.clone()),
436            Box::new(|dur| Box::pin(sleep(dur))),
437        );
438
439        // initiate the transport layer
440        let mixer_cfg = MixerConfig {
441            min_delay: std::time::Duration::from_millis(
442                std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
443                    .map(|v| {
444                        v.trim()
445                            .parse::<u64>()
446                            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
447                    })
448                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
449            ),
450            delay_range: std::time::Duration::from_millis(
451                std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
452                    .map(|v| {
453                        v.trim()
454                            .parse::<u64>()
455                            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
456                    })
457                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
458            ),
459            capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
460                .map(|v| {
461                    v.trim()
462                        .parse::<usize>()
463                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
464                })
465                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
466            ..MixerConfig::default()
467        };
468        #[cfg(feature = "mixer-channel")]
469        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
470
471        #[cfg(feature = "mixer-stream")]
472        let (mixing_channel_tx, mixing_channel_rx) = {
473            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
474            let rx = rx.then_concurrent(move |v| {
475                let cfg = mixer_cfg;
476
477                async move {
478                    let random_delay = cfg.random_delay();
479                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
480
481                    #[cfg(all(feature = "prometheus", not(test)))]
482                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
483
484                    sleep(random_delay).await;
485
486                    #[cfg(all(feature = "prometheus", not(test)))]
487                    {
488                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
489
490                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
491                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
492                            (weight * random_delay.as_millis() as f64)
493                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
494                        );
495                    }
496
497                    v
498                }
499            });
500
501            (tx, rx)
502        };
503
504        let transport_layer = HoprSwarm::new(
505            (&self.me).into(),
506            network_events_rx,
507            discovery_updates,
508            ping_rx,
509            ticket_agg_proc,
510            self.my_multiaddresses.clone(),
511            self.cfg.protocol,
512        )
513        .await;
514
515        let msg_proto_control =
516            transport_layer.build_protocol_control(hopr_transport_protocol::msg::CURRENT_HOPR_MSG_PROTOCOL);
517        let msg_codec = hopr_transport_protocol::msg::MsgCodec;
518        let (wire_msg_tx, wire_msg_rx) =
519            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
520
521        let _mixing_process_before_sending_out =
522            hopr_async_runtime::prelude::spawn(mixing_channel_rx.map(Ok).forward(wire_msg_tx));
523
524        let ack_proto_control =
525            transport_layer.build_protocol_control(hopr_transport_protocol::ack::CURRENT_HOPR_ACK_PROTOCOL);
526        let ack_codec = hopr_transport_protocol::ack::AckCodec::new();
527        let (wire_ack_tx, wire_ack_rx) =
528            hopr_transport_protocol::stream::process_stream_protocol(ack_codec, ack_proto_control).await?;
529
530        let transport_layer = transport_layer.with_processors(tkt_agg_writer);
531
532        processes.insert(HoprTransportProcess::Medium, spawn(transport_layer.run(version)));
533
534        // initiate the msg-ack protocol stack over the wire transport
535        let packet_cfg = PacketInteractionConfig::new(
536            &self.me,
537            me_onchain,
538            self.cfg.protocol.outgoing_ticket_winning_prob,
539            self.cfg.protocol.outgoing_ticket_price,
540        );
541
542        let (tx_from_protocol, rx_from_protocol) = mpsc::unbounded::<ApplicationData>();
543        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
544            packet_cfg,
545            self.db.clone(),
546            Some(tbf_path),
547            (wire_ack_tx, wire_ack_rx),
548            (mixing_channel_tx, wire_msg_rx),
549            (tx_from_protocol, external_msg_rx),
550        )
551        .await
552        .into_iter()
553        {
554            processes.insert(HoprTransportProcess::Protocol(k), v);
555        }
556
557        let msg_sender = helpers::MessageSender::new(self.process_packet_send.clone(), self.path_planner.clone());
558
559        self.smgr
560            .start(msg_sender, on_incoming_session)
561            .expect("failed to start session manager")
562            .into_iter()
563            .enumerate()
564            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
565            .for_each(|(k, v)| {
566                processes.insert(k, v);
567            });
568
569        let smgr = self.smgr.clone();
570        processes.insert(
571            HoprTransportProcess::SessionsManagement(0),
572            spawn(async move {
573                let _the_process_should_not_end = StreamExt::filter_map(rx_from_protocol, |data| {
574                    let smgr = smgr.clone();
575                    async move {
576                        match smgr.dispatch_message(data).await {
577                            Ok(DispatchResult::Processed) => {
578                                trace!("message dispatch completed");
579                                None
580                            }
581                            Ok(DispatchResult::Unrelated(data)) => {
582                                trace!("unrelated message dispatch completed");
583                                Some(data)
584                            }
585                            Err(e) => {
586                                error!(error = %e, "error while processing packet");
587                                None
588                            }
589                        }
590                    }
591                })
592                .map(Ok)
593                .forward(on_incoming_data)
594                .await;
595            }),
596        );
597
598        // initiate the network telemetry
599        let half_the_hearbeat_interval = self.cfg.heartbeat.interval / 4;
600        processes.insert(
601            HoprTransportProcess::Heartbeat,
602            spawn(async move {
603                // present to make sure that the heartbeat does not start immediately
604                hopr_async_runtime::prelude::sleep(half_the_hearbeat_interval).await;
605                heartbeat.heartbeat_loop().await
606            }),
607        );
608
609        Ok(processes)
610    }
611
612    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
613        Arc::new(proxy::TicketAggregatorProxy::new(
614            self.db.clone(),
615            self.process_ticket_aggregate.clone(),
616            self.cfg.protocol.ticket_aggregation.timeout,
617        ))
618    }
619
620    #[tracing::instrument(level = "debug", skip(self))]
621    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
622        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
623            return Err(HoprTransportError::Api(format!(
624                "ping to '{peer}' not allowed due to network registry"
625            )));
626        }
627
628        if peer == &self.me_peerid {
629            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
630        }
631
632        let pinger = self
633            .ping
634            .get()
635            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
636
637        let timeout = sleep(Duration::from_secs(30)).fuse();
638        let ping = (*pinger).ping(vec![*peer]);
639
640        pin_mut!(timeout, ping);
641
642        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
643            error!(error = %e, "Failed to store the peer observation");
644        }
645
646        let start = current_time().as_unix_timestamp();
647
648        match select(timeout, ping.next().fuse()).await {
649            Either::Left(_) => {
650                warn!(%peer, "Manual ping to peer timed out");
651                return Err(ProtocolError::Timeout.into());
652            }
653            Either::Right((v, _)) => {
654                match v
655                    .into_iter()
656                    .map(|r| r.map_err(HoprTransportError::NetworkError))
657                    .collect::<errors::Result<Vec<Duration>>>()
658                {
659                    Ok(d) => info!(%peer, rtt = ?d, "Manual ping succeeded"),
660                    Err(e) => {
661                        error!(%peer, error = %e, "Manual ping failed");
662                        return Err(e);
663                    }
664                }
665            }
666        };
667
668        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::NetworkError(
669            errors::NetworkingError::NonExistingPeer,
670        ))?;
671
672        Ok((
673            peer_status.last_seen.as_unix_timestamp().saturating_sub(start),
674            peer_status,
675        ))
676    }
677
678    #[tracing::instrument(level = "debug", skip(self))]
679    pub async fn new_session(&self, cfg: SessionClientConfig) -> errors::Result<Session> {
680        Ok(self.smgr.new_session(cfg).await?)
681    }
682
683    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
684    pub async fn send_message(
685        &self,
686        msg: Box<[u8]>,
687        destination: PeerId,
688        options: RoutingOptions,
689        application_tag: Option<u16>,
690    ) -> errors::Result<()> {
691        // The send_message logic will be entirely removed in 3.0
692        if let Some(application_tag) = application_tag {
693            if application_tag < RESERVED_SESSION_TAG_UPPER_LIMIT {
694                return Err(HoprTransportError::Api(format!(
695                    "Application tag must not be lower than {RESERVED_SESSION_TAG_UPPER_LIMIT}"
696                )));
697            }
698        }
699
700        if msg.len() > PAYLOAD_SIZE {
701            return Err(HoprTransportError::Api(format!(
702                "Message exceeds the maximum allowed size of {PAYLOAD_SIZE} bytes"
703            )));
704        }
705
706        let app_data = ApplicationData::new_from_owned(application_tag, msg)?;
707
708        // Here we do not use msg_sender directly,
709        // since it internally follows Session-oriented logic
710        let path = self.path_planner.resolve_path(destination, options).await?;
711        let sender = self.process_packet_send.get().ok_or_else(|| {
712            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
713        })?;
714
715        sender
716            .send_packet(app_data, path)
717            .await
718            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
719            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
720            .await
721            .map_err(|e| HoprTransportError::Api(e.to_string()))
722    }
723
724    #[tracing::instrument(level = "debug", skip(self))]
725    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
726        let entry = self
727            .db
728            .get_channel_by_id(None, channel_id)
729            .await
730            .map_err(hopr_db_sql::api::errors::DbError::from)
731            .map_err(HoprTransportError::from)
732            .and_then(|c| {
733                if let Some(c) = c {
734                    Ok(c)
735                } else {
736                    Err(ProtocolError::ChannelNotFound.into())
737                }
738            })?;
739
740        if entry.status != ChannelStatus::Open {
741            return Err(ProtocolError::ChannelClosed.into());
742        }
743
744        Ok(Arc::new(proxy::TicketAggregatorProxy::new(
745            self.db.clone(),
746            self.process_ticket_aggregate.clone(),
747            self.cfg.protocol.ticket_aggregation.timeout,
748        ))
749        .aggregate_tickets(&entry.get_id(), Default::default())
750        .await?)
751    }
752
753    #[tracing::instrument(level = "debug", skip(self))]
754    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
755        Ok(self
756            .db
757            .get_accounts(None, true)
758            .await
759            .map_err(hopr_db_sql::api::errors::DbError::from)?
760            .into_iter()
761            .map(|entry| {
762                (
763                    PeerId::from(entry.public_key),
764                    entry.chain_addr,
765                    Vec::from_iter(entry.get_multiaddr().into_iter()),
766                )
767            })
768            .collect())
769    }
770
771    pub async fn is_allowed_to_access_network<'a>(
772        &self,
773        address_like: either::Either<&'a PeerId, Address>,
774    ) -> errors::Result<bool>
775    where
776        T: 'a,
777    {
778        let db_clone = self.db.clone();
779        let address_like_noref = address_like.map_left(|peer| *peer);
780
781        Ok(self
782            .db
783            .begin_transaction()
784            .await
785            .map_err(hopr_db_sql::api::errors::DbError::from)?
786            .perform(|tx| {
787                Box::pin(async move {
788                    match address_like_noref {
789                        either::Left(peer) => {
790                            let pk = OffchainPublicKey::try_from(peer)?;
791                            if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
792                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
793                            } else {
794                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
795                                    "cannot translate off-chain key".into(),
796                                ))
797                            }
798                        }
799                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
800                    }
801                })
802            })
803            .await
804            .map_err(hopr_db_sql::api::errors::DbError::from)?)
805    }
806
807    #[tracing::instrument(level = "debug", skip(self))]
808    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
809        self.network
810            .get(&self.me_peerid)
811            .await
812            .unwrap_or_else(|e| {
813                error!(error = %e, "failed to obtain listening multi-addresses");
814                None
815            })
816            .map(|peer| peer.multiaddresses)
817            .unwrap_or_default()
818    }
819
820    #[tracing::instrument(level = "debug", skip(self))]
821    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
822        let mut mas = self
823            .local_multiaddresses()
824            .into_iter()
825            .filter(|ma| {
826                hopr_transport_identity::multiaddrs::is_supported(ma)
827                    && (self.cfg.transport.announce_local_addresses
828                        || !hopr_transport_identity::multiaddrs::is_private(ma))
829            })
830            .map(|ma| strip_p2p_protocol(&ma))
831            .filter(|v| !v.is_empty())
832            .collect::<Vec<_>>();
833
834        mas.sort_by(|l, r| {
835            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
836            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
837
838            if !(is_left_dns ^ is_right_dns) {
839                std::cmp::Ordering::Equal
840            } else if is_left_dns {
841                std::cmp::Ordering::Less
842            } else {
843                std::cmp::Ordering::Greater
844            }
845        });
846
847        mas
848    }
849
850    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
851        self.my_multiaddresses.clone()
852    }
853
854    #[tracing::instrument(level = "debug", skip(self))]
855    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
856        self.network
857            .get(peer)
858            .await
859            .unwrap_or(None)
860            .map(|peer| peer.multiaddresses)
861            .unwrap_or(vec![])
862    }
863
864    #[tracing::instrument(level = "debug", skip(self))]
865    pub async fn network_health(&self) -> Health {
866        self.network.health().await
867    }
868
869    #[tracing::instrument(level = "debug", skip(self))]
870    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
871        Ok(self.network.connected_peers().await?)
872    }
873
874    #[tracing::instrument(level = "debug", skip(self))]
875    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
876        Ok(self.network.get(peer).await?)
877    }
878
879    #[tracing::instrument(level = "debug", skip(self))]
880    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
881        let ticket_stats = self.db.get_ticket_statistics(None).await?;
882
883        Ok(TicketStatistics {
884            winning_count: ticket_stats.winning_tickets,
885            unredeemed_value: ticket_stats.unredeemed_value,
886            redeemed_value: ticket_stats.redeemed_value,
887            neglected_value: ticket_stats.neglected_value,
888            rejected_value: ticket_stats.rejected_value,
889        })
890    }
891
892    #[tracing::instrument(level = "debug", skip(self))]
893    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
894        if let Some(channel) = self
895            .db
896            .get_channel_by_id(None, channel_id)
897            .await
898            .map_err(hopr_db_sql::api::errors::DbError::from)?
899        {
900            let own_address: Address = self
901                .db
902                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
903                .await?
904                .ok_or_else(|| {
905                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
906                })?
907                .try_into()?;
908
909            if channel.destination == own_address {
910                Ok(Some(self.db.get_tickets((&channel).into()).await?))
911            } else {
912                Ok(None)
913            }
914        } else {
915            Ok(None)
916        }
917    }
918
919    #[tracing::instrument(level = "debug", skip(self))]
920    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
921        Ok(self
922            .db
923            .get_all_tickets()
924            .await?
925            .into_iter()
926            .map(|v| v.ticket.leak())
927            .collect())
928    }
929}