1pub mod config;
18pub mod constants;
20pub mod errors;
22pub mod helpers;
23pub mod network_notifier;
24pub mod proxy;
26
27use async_lock::RwLock;
28use futures::{
29 channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
30 future::{select, Either},
31 pin_mut, FutureExt, SinkExt, StreamExt,
32};
33use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
34use hopr_transport_mixer::MixerConfig;
35use std::time::Duration;
36use std::{
37 collections::HashMap,
38 sync::{Arc, OnceLock},
39};
40use tracing::{debug, error, info, trace, warn};
41
42use hopr_async_runtime::prelude::{sleep, spawn, JoinHandle};
43use hopr_db_sql::{
44 accounts::ChainOrPacketKey,
45 api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
46 HoprDbAllOperations,
47};
48use hopr_internal_types::prelude::*;
49use hopr_path::path::TransportPath;
50use hopr_platform::time::native::current_time;
51use hopr_primitive_types::prelude::*;
52use hopr_transport_network::{
53 heartbeat::Heartbeat,
54 ping::{PingConfig, PingQueryReplier, Pinger, Pinging},
55};
56use hopr_transport_p2p::{
57 swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
58 HoprSwarm,
59};
60use hopr_transport_protocol::{
61 errors::ProtocolError,
62 msg::processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
63 ticket_aggregation::processor::{
64 AwaitingAggregator, TicketAggregationActions, TicketAggregationInteraction, TicketAggregatorTrait,
65 },
66};
67use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
68
69use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
70
71#[cfg(feature = "mixer-stream")]
72use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
73
74use crate::helpers::PathPlanner;
75
76use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
77#[cfg(feature = "runtime-tokio")]
78pub use hopr_transport_session::types::transfer_session;
79pub use {
80 hopr_crypto_types::{
81 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
82 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
83 },
84 hopr_internal_types::protocol::ApplicationData,
85 hopr_network_types::prelude::RoutingOptions,
86 hopr_transport_identity::{Multiaddr, PeerId},
87 hopr_transport_network::network::{Health, Network, NetworkTriggeredEvent, PeerOrigin, PeerStatus},
88 hopr_transport_protocol::{execute_on_tick, PeerDiscovery},
89 hopr_transport_session::types::{ServiceId, SessionTarget},
90 hopr_transport_session::{
91 errors::TransportSessionError, traits::SendMsg, Capability as SessionCapability, IncomingSession, Session,
92 SessionClientConfig, SessionId, SESSION_USABLE_MTU_SIZE,
93 },
94};
95
96use crate::{
97 constants::{
98 RESERVED_SESSION_TAG_UPPER_LIMIT, RESERVED_SUBPROTOCOL_TAG_UPPER_LIMIT, SESSION_INITIATION_TIMEOUT_BASE,
99 },
100 errors::HoprTransportError,
101};
102
103pub use crate::{
104 config::HoprTransportConfig,
105 helpers::{PeerEligibility, TicketStatistics},
106};
107
108#[cfg(any(
109 all(feature = "mixer-channel", feature = "mixer-stream"),
110 all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
111))]
112compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
113
114lazy_static::lazy_static! {
116 static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
117}
118
/// Identifiers of the long-running background processes spawned by the transport.
///
/// Used as the key of the process-handle map returned by `HoprTransport::run`;
/// the `strum::Display` strings provide human-readable process names for logging.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    // The libp2p swarm event loop.
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    // One of the msg/ack protocol sub-processes (inner value names which one).
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    // Session-manager sub-processes; index 0 is the inbound dispatch loop.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    // The periodic peer-probing (heartbeat) loop.
    #[strum(to_string = "protocol [HOPR [heartbeat]]")]
    Heartbeat,
}
130
/// Late-binding proxy for ticket aggregation.
///
/// The actual `TicketAggregationActions` writer is only created inside
/// `HoprTransport::run`; this proxy holds the shared `OnceLock` slot so callers
/// can be handed an aggregator before the transport has been started.
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    // Ticket database handle.
    db: Db,
    // Empty until `run` stores the aggregation writer; hence "maybe".
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Maximum time to wait for an aggregation round to complete.
    agg_timeout: std::time::Duration,
}
140
141impl<Db> TicketAggregatorProxy<Db>
142where
143 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
144{
145 pub fn new(
146 db: Db,
147 maybe_writer: Arc<
148 OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
149 >,
150 agg_timeout: std::time::Duration,
151 ) -> Self {
152 Self {
153 db,
154 maybe_writer,
155 agg_timeout,
156 }
157 }
158}
159
160#[async_trait::async_trait]
161impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
162where
163 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
164{
165 async fn aggregate_tickets(
166 &self,
167 channel: &Hash,
168 prerequisites: AggregationPrerequisites,
169 ) -> hopr_transport_protocol::errors::Result<()> {
170 if let Some(writer) = self.maybe_writer.clone().get() {
171 AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
172 .aggregate_tickets(channel, prerequisites)
173 .await
174 } else {
175 Err(ProtocolError::TransportError(
176 "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
177 ))
178 }
179 }
180}
181
/// Path selector currently in use: DFS-based selection with randomized edge weighting.
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
184
/// Central transport object of a HOPR node.
///
/// Owns (or shares) the network view, path planner, session manager and the
/// late-initialized handles (`OnceLock`s) that are populated by [`HoprTransport::run`].
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    // Off-chain (packet-key) identity of this node.
    me: OffchainKeypair,
    // PeerId derived from `me`, plus the full transport configuration.
    // NOTE(review): two fields share this line — formatting artifact, kept as-is.
    me_peerid: PeerId, cfg: HoprTransportConfig,
    // Database handle (accounts, tickets, network registry).
    db: T,
    // Empty until `run` installs the pinger.
    ping: Arc<OnceLock<Pinger<network_notifier::PingExternalInteractions<T>>>>,
    // Shared view of known peers and their observed state.
    network: Arc<Network<T>>,
    // Empty until `run` installs the outgoing-packet sender.
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    // Resolves routing options into concrete transport paths.
    path_planner: PathPlanner<T, CurrentPathSelector>,
    // Multiaddresses this node was configured to listen on/announce.
    my_multiaddresses: Vec<Multiaddr>,
    // Empty until `run` installs the ticket-aggregation writer.
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Session lifecycle management (initiation, dispatch, idle timeout).
    smgr: SessionManager<helpers::MessageSender<T, CurrentPathSelector>>,
}
204
205impl<T> HoprTransport<T>
206where
207 T: HoprDbAllOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
208{
    /// Builds the transport object without starting any background processes.
    ///
    /// Only wires together configuration, database and the shared channel graph;
    /// all `OnceLock` slots stay empty until [`HoprTransport::run`] is invoked.
    ///
    /// * `me` - off-chain (packet) keypair of this node
    /// * `me_onchain` - on-chain keypair, used here only to derive our chain address
    /// * `channel_graph` - shared channel graph used by the path selector
    pub fn new(
        me: &OffchainKeypair,
        me_onchain: &ChainKeypair,
        cfg: HoprTransportConfig,
        db: T,
        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
        my_multiaddresses: Vec<Multiaddr>,
    ) -> Self {
        let process_packet_send = Arc::new(OnceLock::new());

        let me_peerid: PeerId = me.into();

        Self {
            me: me.clone(),
            me_peerid,
            ping: Arc::new(OnceLock::new()),
            network: Arc::new(Network::new(
                me_peerid,
                my_multiaddresses.clone(),
                cfg.network.clone(),
                db.clone(),
            )),
            process_packet_send,
            path_planner: PathPlanner::new(
                db.clone(),
                CurrentPathSelector::new(
                    channel_graph.clone(),
                    DfsPathSelectorConfig {
                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
                        ..Default::default()
                    },
                ),
                channel_graph.clone(),
                me_onchain.public().to_address(),
            ),
            db,
            my_multiaddresses,
            process_ticket_aggregate: Arc::new(OnceLock::new()),
            smgr: SessionManager::new(
                me_peerid,
                SessionManagerConfig {
                    // Tags below RESERVED_SUBPROTOCOL_TAG_UPPER_LIMIT are kept for subprotocols.
                    session_tag_range: RESERVED_SUBPROTOCOL_TAG_UPPER_LIMIT..RESERVED_SESSION_TAG_UPPER_LIMIT,
                    initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
                    idle_timeout: cfg.session.idle_timeout,
                },
            ),
            cfg,
        }
    }
259
260 #[allow(clippy::too_many_arguments)]
267 pub async fn run(
268 &self,
269 me_onchain: &ChainKeypair,
270 version: String,
271 tbf_path: String,
272 on_incoming_data: UnboundedSender<ApplicationData>,
273 discovery_updates: UnboundedReceiver<PeerDiscovery>,
274 on_incoming_session: UnboundedSender<IncomingSession>,
275 ) -> crate::errors::Result<HashMap<HoprTransportProcess, JoinHandle<()>>> {
276 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
277 futures::channel::mpsc::unbounded::<PeerDiscovery>();
278
279 let network_clone = self.network.clone();
280 let db_clone = self.db.clone();
281 let me_peerid = self.me_peerid;
282 let discovery_updates =
283 futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
284 .filter_map(move |event| {
285 let network = network_clone.clone();
286 let db = db_clone.clone();
287 let me = me_peerid;
288
289 async move {
290 match event {
291 PeerDiscovery::Allow(peer_id) => {
292 if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
293 if !network.has(&peer_id).await {
294 let mas = db
295 .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
296 .await
297 .map(|entry| {
298 entry
299 .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
300 .unwrap_or_default()
301 })
302 .unwrap_or_default();
303
304 if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
305 error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
306 return None;
307 }
308 }
309
310 return Some(PeerDiscovery::Allow(peer_id))
311 } else {
312 error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
313 }
314 }
315 PeerDiscovery::Ban(peer_id) => {
316 if let Err(e) = network.remove(&peer_id).await {
317 error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
318 } else {
319 return Some(PeerDiscovery::Ban(peer_id))
320 }
321 }
322 PeerDiscovery::Announce(peer, multiaddresses) => {
323 if peer != me {
324 let mas = multiaddresses
326 .into_iter()
327 .map(|ma| strip_p2p_protocol(&ma))
328 .filter(|v| !v.is_empty())
329 .collect::<Vec<_>>();
330
331 if ! mas.is_empty() {
332 if let Ok(pk) = OffchainPublicKey::try_from(peer) {
333 if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk)).await {
334 let key: Result<Address, _> = key.try_into();
335
336 if let Ok(key) = key {
337 if db
338 .is_allowed_in_network_registry(None, &key)
339 .await
340 .unwrap_or(false)
341 {
342 if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
343 {
344 error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
345 } else {
346 return Some(PeerDiscovery::Announce(peer, mas))
347 }
348 }
349 }
350 } else {
351 error!(%peer, "Failed to announce peer due to convertibility error");
352 }
353 }
354 }
355 }
356 }
357 }
358
359 None
360 }
361 });
362
363 info!("Loading initial peers from the storage");
364
365 let nodes = self.get_public_nodes().await?;
366 for (peer, _address, multiaddresses) in nodes {
367 if self.is_allowed_to_access_network(either::Left(&peer)).await? {
368 debug!(%peer, ?multiaddresses, "Using initial public node");
369
370 internal_discovery_update_tx
371 .send(PeerDiscovery::Allow(peer))
372 .await
373 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
374
375 internal_discovery_update_tx
376 .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
377 .await
378 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
379 }
380 }
381
382 let mut processes: HashMap<HoprTransportProcess, JoinHandle<()>> = HashMap::new();
383
384 let (network_events_tx, network_events_rx) =
386 mpsc::channel::<NetworkTriggeredEvent>(constants::MAXIMUM_NETWORK_UPDATE_EVENT_QUEUE_SIZE);
387
388 let (ping_tx, ping_rx) = mpsc::unbounded::<(PeerId, PingQueryReplier)>();
390
391 let ping_cfg = PingConfig {
392 timeout: self.cfg.protocol.heartbeat.timeout,
393 max_parallel_pings: self.cfg.heartbeat.max_parallel_probes,
394 };
395
396 let ping: Pinger<network_notifier::PingExternalInteractions<T>> = Pinger::new(
397 ping_cfg,
398 ping_tx.clone(),
399 network_notifier::PingExternalInteractions::new(
400 self.network.clone(),
401 self.db.clone(),
402 self.path_planner.channel_graph(),
403 network_events_tx,
404 ),
405 );
406
407 self.ping
408 .clone()
409 .set(ping)
410 .expect("must set the ping executor only once");
411
412 let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
413 let tkt_agg_writer = ticket_agg_proc.writer();
414
415 let (external_msg_send, external_msg_rx) =
416 mpsc::channel::<(ApplicationData, TransportPath, PacketSendFinalizer)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
417
418 self.process_packet_send
419 .clone()
420 .set(MsgSender::new(external_msg_send))
421 .expect("must set the packet processing writer only once");
422
423 self.process_ticket_aggregate
424 .clone()
425 .set(tkt_agg_writer.clone())
426 .expect("must set the ticket aggregation writer only once");
427
428 let mut heartbeat = Heartbeat::new(
430 self.cfg.heartbeat,
431 self.ping
432 .get()
433 .expect("Ping should be initialized at this point")
434 .clone(),
435 hopr_transport_network::heartbeat::HeartbeatExternalInteractions::new(self.network.clone()),
436 Box::new(|dur| Box::pin(sleep(dur))),
437 );
438
439 let mixer_cfg = MixerConfig {
441 min_delay: std::time::Duration::from_millis(
442 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
443 .map(|v| {
444 v.trim()
445 .parse::<u64>()
446 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
447 })
448 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
449 ),
450 delay_range: std::time::Duration::from_millis(
451 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
452 .map(|v| {
453 v.trim()
454 .parse::<u64>()
455 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
456 })
457 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
458 ),
459 capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
460 .map(|v| {
461 v.trim()
462 .parse::<usize>()
463 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
464 })
465 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
466 ..MixerConfig::default()
467 };
468 #[cfg(feature = "mixer-channel")]
469 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
470
471 #[cfg(feature = "mixer-stream")]
472 let (mixing_channel_tx, mixing_channel_rx) = {
473 let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
474 let rx = rx.then_concurrent(move |v| {
475 let cfg = mixer_cfg;
476
477 async move {
478 let random_delay = cfg.random_delay();
479 trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
480
481 #[cfg(all(feature = "prometheus", not(test)))]
482 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
483
484 sleep(random_delay).await;
485
486 #[cfg(all(feature = "prometheus", not(test)))]
487 {
488 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
489
490 let weight = 1.0f64 / cfg.metric_delay_window as f64;
491 hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
492 (weight * random_delay.as_millis() as f64)
493 + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
494 );
495 }
496
497 v
498 }
499 });
500
501 (tx, rx)
502 };
503
504 let transport_layer = HoprSwarm::new(
505 (&self.me).into(),
506 network_events_rx,
507 discovery_updates,
508 ping_rx,
509 ticket_agg_proc,
510 self.my_multiaddresses.clone(),
511 self.cfg.protocol,
512 )
513 .await;
514
515 let msg_proto_control =
516 transport_layer.build_protocol_control(hopr_transport_protocol::msg::CURRENT_HOPR_MSG_PROTOCOL);
517 let msg_codec = hopr_transport_protocol::msg::MsgCodec;
518 let (wire_msg_tx, wire_msg_rx) =
519 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
520
521 let _mixing_process_before_sending_out =
522 hopr_async_runtime::prelude::spawn(mixing_channel_rx.map(Ok).forward(wire_msg_tx));
523
524 let ack_proto_control =
525 transport_layer.build_protocol_control(hopr_transport_protocol::ack::CURRENT_HOPR_ACK_PROTOCOL);
526 let ack_codec = hopr_transport_protocol::ack::AckCodec::new();
527 let (wire_ack_tx, wire_ack_rx) =
528 hopr_transport_protocol::stream::process_stream_protocol(ack_codec, ack_proto_control).await?;
529
530 let transport_layer = transport_layer.with_processors(tkt_agg_writer);
531
532 processes.insert(HoprTransportProcess::Medium, spawn(transport_layer.run(version)));
533
534 let packet_cfg = PacketInteractionConfig::new(
536 &self.me,
537 me_onchain,
538 self.cfg.protocol.outgoing_ticket_winning_prob,
539 self.cfg.protocol.outgoing_ticket_price,
540 );
541
542 let (tx_from_protocol, rx_from_protocol) = mpsc::unbounded::<ApplicationData>();
543 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
544 packet_cfg,
545 self.db.clone(),
546 Some(tbf_path),
547 (wire_ack_tx, wire_ack_rx),
548 (mixing_channel_tx, wire_msg_rx),
549 (tx_from_protocol, external_msg_rx),
550 )
551 .await
552 .into_iter()
553 {
554 processes.insert(HoprTransportProcess::Protocol(k), v);
555 }
556
557 let msg_sender = helpers::MessageSender::new(self.process_packet_send.clone(), self.path_planner.clone());
558
559 self.smgr
560 .start(msg_sender, on_incoming_session)
561 .expect("failed to start session manager")
562 .into_iter()
563 .enumerate()
564 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
565 .for_each(|(k, v)| {
566 processes.insert(k, v);
567 });
568
569 let smgr = self.smgr.clone();
570 processes.insert(
571 HoprTransportProcess::SessionsManagement(0),
572 spawn(async move {
573 let _the_process_should_not_end = StreamExt::filter_map(rx_from_protocol, |data| {
574 let smgr = smgr.clone();
575 async move {
576 match smgr.dispatch_message(data).await {
577 Ok(DispatchResult::Processed) => {
578 trace!("message dispatch completed");
579 None
580 }
581 Ok(DispatchResult::Unrelated(data)) => {
582 trace!("unrelated message dispatch completed");
583 Some(data)
584 }
585 Err(e) => {
586 error!(error = %e, "error while processing packet");
587 None
588 }
589 }
590 }
591 })
592 .map(Ok)
593 .forward(on_incoming_data)
594 .await;
595 }),
596 );
597
598 let half_the_hearbeat_interval = self.cfg.heartbeat.interval / 4;
600 processes.insert(
601 HoprTransportProcess::Heartbeat,
602 spawn(async move {
603 hopr_async_runtime::prelude::sleep(half_the_hearbeat_interval).await;
605 heartbeat.heartbeat_loop().await
606 }),
607 );
608
609 Ok(processes)
610 }
611
612 pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
613 Arc::new(proxy::TicketAggregatorProxy::new(
614 self.db.clone(),
615 self.process_ticket_aggregate.clone(),
616 self.cfg.protocol.ticket_aggregation.timeout,
617 ))
618 }
619
    /// Manually pings a single peer and returns the measured round-trip estimate
    /// together with the peer's status from the network view.
    ///
    /// Fails if the peer is not allowed by the network registry, is our own id,
    /// the pinger is not yet initialized, or the ping times out / errors.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
            return Err(HoprTransportError::Api(format!(
                "ping to '{peer}' not allowed due to network registry"
            )));
        }

        if peer == &self.me_peerid {
            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
        }

        let pinger = self
            .ping
            .get()
            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;

        // Hard-coded 30 s cap on the manual ping, independent of the heartbeat timeout.
        let timeout = sleep(Duration::from_secs(30)).fuse();
        let ping = (*pinger).ping(vec![*peer]);

        pin_mut!(timeout, ping);

        // Best-effort: make sure the peer exists in the network view before pinging.
        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
            error!(error = %e, "Failed to store the peer observation");
        }

        let start = current_time().as_unix_timestamp();

        // Race the ping stream against the timeout.
        match select(timeout, ping.next().fuse()).await {
            Either::Left(_) => {
                warn!(%peer, "Manual ping to peer timed out");
                return Err(ProtocolError::Timeout.into());
            }
            Either::Right((v, _)) => {
                match v
                    .into_iter()
                    .map(|r| r.map_err(HoprTransportError::NetworkError))
                    .collect::<errors::Result<Vec<Duration>>>()
                {
                    Ok(d) => info!(%peer, rtt = ?d, "Manual ping succeeded"),
                    Err(e) => {
                        error!(%peer, error = %e, "Manual ping failed");
                        return Err(e);
                    }
                }
            }
        };

        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::NetworkError(
            errors::NetworkingError::NonExistingPeer,
        ))?;

        // The returned duration is derived from the peer's `last_seen` timestamp
        // relative to `start`, not from the pinger's own RTT report.
        Ok((
            peer_status.last_seen.as_unix_timestamp().saturating_sub(start),
            peer_status,
        ))
    }
677
678 #[tracing::instrument(level = "debug", skip(self))]
679 pub async fn new_session(&self, cfg: SessionClientConfig) -> errors::Result<Session> {
680 Ok(self.smgr.new_session(cfg).await?)
681 }
682
683 #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
684 pub async fn send_message(
685 &self,
686 msg: Box<[u8]>,
687 destination: PeerId,
688 options: RoutingOptions,
689 application_tag: Option<u16>,
690 ) -> errors::Result<()> {
691 if let Some(application_tag) = application_tag {
693 if application_tag < RESERVED_SESSION_TAG_UPPER_LIMIT {
694 return Err(HoprTransportError::Api(format!(
695 "Application tag must not be lower than {RESERVED_SESSION_TAG_UPPER_LIMIT}"
696 )));
697 }
698 }
699
700 if msg.len() > PAYLOAD_SIZE {
701 return Err(HoprTransportError::Api(format!(
702 "Message exceeds the maximum allowed size of {PAYLOAD_SIZE} bytes"
703 )));
704 }
705
706 let app_data = ApplicationData::new_from_owned(application_tag, msg)?;
707
708 let path = self.path_planner.resolve_path(destination, options).await?;
711 let sender = self.process_packet_send.get().ok_or_else(|| {
712 HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
713 })?;
714
715 sender
716 .send_packet(app_data, path)
717 .await
718 .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
719 .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
720 .await
721 .map_err(|e| HoprTransportError::Api(e.to_string()))
722 }
723
724 #[tracing::instrument(level = "debug", skip(self))]
725 pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
726 let entry = self
727 .db
728 .get_channel_by_id(None, channel_id)
729 .await
730 .map_err(hopr_db_sql::api::errors::DbError::from)
731 .map_err(HoprTransportError::from)
732 .and_then(|c| {
733 if let Some(c) = c {
734 Ok(c)
735 } else {
736 Err(ProtocolError::ChannelNotFound.into())
737 }
738 })?;
739
740 if entry.status != ChannelStatus::Open {
741 return Err(ProtocolError::ChannelClosed.into());
742 }
743
744 Ok(Arc::new(proxy::TicketAggregatorProxy::new(
745 self.db.clone(),
746 self.process_ticket_aggregate.clone(),
747 self.cfg.protocol.ticket_aggregation.timeout,
748 ))
749 .aggregate_tickets(&entry.get_id(), Default::default())
750 .await?)
751 }
752
753 #[tracing::instrument(level = "debug", skip(self))]
754 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
755 Ok(self
756 .db
757 .get_accounts(None, true)
758 .await
759 .map_err(hopr_db_sql::api::errors::DbError::from)?
760 .into_iter()
761 .map(|entry| {
762 (
763 PeerId::from(entry.public_key),
764 entry.chain_addr,
765 Vec::from_iter(entry.get_multiaddr().into_iter()),
766 )
767 })
768 .collect())
769 }
770
    /// Checks the on-chain network registry for whether a peer (given either by
    /// `PeerId` or by chain `Address`) is allowed to participate in the network.
    ///
    /// A `PeerId` is first translated to its chain address inside a single DB
    /// transaction; translation failure is reported as a logical DB error.
    pub async fn is_allowed_to_access_network<'a>(
        &self,
        address_like: either::Either<&'a PeerId, Address>,
    ) -> errors::Result<bool>
    where
        T: 'a,
    {
        let db_clone = self.db.clone();
        // Copy the PeerId out of the reference so the closure below can be 'static.
        let address_like_noref = address_like.map_left(|peer| *peer);

        Ok(self
            .db
            .begin_transaction()
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .perform(|tx| {
                Box::pin(async move {
                    match address_like_noref {
                        either::Left(peer) => {
                            let pk = OffchainPublicKey::try_from(peer)?;
                            if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
                            } else {
                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
                                    "cannot translate off-chain key".into(),
                                ))
                            }
                        }
                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
                    }
                })
            })
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?)
    }
806
807 #[tracing::instrument(level = "debug", skip(self))]
808 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
809 self.network
810 .get(&self.me_peerid)
811 .await
812 .unwrap_or_else(|e| {
813 error!(error = %e, "failed to obtain listening multi-addresses");
814 None
815 })
816 .map(|peer| peer.multiaddresses)
817 .unwrap_or_default()
818 }
819
820 #[tracing::instrument(level = "debug", skip(self))]
821 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
822 let mut mas = self
823 .local_multiaddresses()
824 .into_iter()
825 .filter(|ma| {
826 hopr_transport_identity::multiaddrs::is_supported(ma)
827 && (self.cfg.transport.announce_local_addresses
828 || !hopr_transport_identity::multiaddrs::is_private(ma))
829 })
830 .map(|ma| strip_p2p_protocol(&ma))
831 .filter(|v| !v.is_empty())
832 .collect::<Vec<_>>();
833
834 mas.sort_by(|l, r| {
835 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
836 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
837
838 if !(is_left_dns ^ is_right_dns) {
839 std::cmp::Ordering::Equal
840 } else if is_left_dns {
841 std::cmp::Ordering::Less
842 } else {
843 std::cmp::Ordering::Greater
844 }
845 });
846
847 mas
848 }
849
    /// Returns a copy of the multiaddresses this node was configured with locally.
    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.my_multiaddresses.clone()
    }
853
854 #[tracing::instrument(level = "debug", skip(self))]
855 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
856 self.network
857 .get(peer)
858 .await
859 .unwrap_or(None)
860 .map(|peer| peer.multiaddresses)
861 .unwrap_or(vec![])
862 }
863
    /// Returns the current overall health indicator of the network view.
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_health(&self) -> Health {
        self.network.health().await
    }
868
869 #[tracing::instrument(level = "debug", skip(self))]
870 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
871 Ok(self.network.connected_peers().await?)
872 }
873
874 #[tracing::instrument(level = "debug", skip(self))]
875 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
876 Ok(self.network.get(peer).await?)
877 }
878
879 #[tracing::instrument(level = "debug", skip(self))]
880 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
881 let ticket_stats = self.db.get_ticket_statistics(None).await?;
882
883 Ok(TicketStatistics {
884 winning_count: ticket_stats.winning_tickets,
885 unredeemed_value: ticket_stats.unredeemed_value,
886 redeemed_value: ticket_stats.redeemed_value,
887 neglected_value: ticket_stats.neglected_value,
888 rejected_value: ticket_stats.rejected_value,
889 })
890 }
891
892 #[tracing::instrument(level = "debug", skip(self))]
893 pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
894 if let Some(channel) = self
895 .db
896 .get_channel_by_id(None, channel_id)
897 .await
898 .map_err(hopr_db_sql::api::errors::DbError::from)?
899 {
900 let own_address: Address = self
901 .db
902 .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
903 .await?
904 .ok_or_else(|| {
905 HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
906 })?
907 .try_into()?;
908
909 if channel.destination == own_address {
910 Ok(Some(self.db.get_tickets((&channel).into()).await?))
911 } else {
912 Ok(None)
913 }
914 } else {
915 Ok(None)
916 }
917 }
918
919 #[tracing::instrument(level = "debug", skip(self))]
920 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
921 Ok(self
922 .db
923 .get_all_tickets()
924 .await?
925 .into_iter()
926 .map(|v| v.ticket.leak())
927 .collect())
928 }
929}