1pub mod config;
17pub mod constants;
19pub mod errors;
21pub mod path;
23pub mod protocol;
25
26mod multiaddrs;
27
28#[cfg(feature = "capture")]
29mod capture;
30mod pipeline;
31pub mod socket;
32
33use std::{
34 sync::{Arc, OnceLock},
35 time::Duration,
36};
37
38use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
39use futures::{
40 FutureExt, SinkExt, StreamExt,
41 channel::mpsc::{Sender, channel},
42 stream::select_with_strategy,
43};
44pub use hopr_api::{
45 Multiaddr, PeerId,
46 network::{Health, traits::NetworkView},
47 types::{
48 crypto::{
49 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
50 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
51 },
52 internal::{prelude::HoprPseudonym, routing::RoutingOptions},
53 },
54};
55use hopr_api::{
56 chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
57 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
58 graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
59 network::{BoxedProcessFn, NetworkStreamControl},
60 types::primitive::prelude::*,
61};
62use hopr_crypto_packet::prelude::PacketSignal;
63pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
64use hopr_protocol_hopr::MemorySurbStore;
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67 Probe,
68 ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_session as session;
71#[cfg(feature = "runtime-tokio")]
72pub use hopr_transport_session::transfer_session;
73pub use hopr_transport_session::{
74 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
75 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
76 errors::{SessionManagerError, TransportSessionError},
77};
78use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
79#[cfg(feature = "telemetry")]
80pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
81pub use hopr_transport_tag_allocator::TagAllocatorConfig;
82use hopr_utils::{network_types::prelude::*, runtime::AbortableList};
83pub use multiaddr::Protocol;
84use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
85use tracing::{Instrument, debug, error, trace, warn};
86
87#[cfg(feature = "runtime-tokio")]
88use crate::path::BackgroundPathCacheRefreshable;
89pub use crate::{config::HoprProtocolConfig, protocol::PeerProtocolCounterRegistry};
90use crate::{
91 constants::SESSION_INITIATION_TIMEOUT_BASE,
92 errors::HoprTransportError,
93 multiaddrs::strip_p2p_protocol,
94 path::{HoprGraphPathSelector, PathPlanner},
95 pipeline::HoprPacketPipelineBuilder,
96 socket::HoprSocket,
97};
98
99pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
100
101pub use hopr_api as api;
102use hopr_api::{
103 chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
104 tickets::TicketFactory,
105 types::internal::routing::DestinationRouting,
106};
107
108lazy_static::lazy_static! {
110 static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
111
112 static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
113 .time_to_idle(Duration::from_mins(15))
114 .max_capacity(10_000)
115 .build();
116
117 static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
118}
119
120pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
125 PEER_ID_CACHE
126 .try_get_with_by_ref(peer_id, move || {
127 OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
128 })
129 .map_err(|e: Arc<HoprTransportError>| {
130 crate::errors::HoprTransportError::Other(anyhow::anyhow!(
131 "failed to convert peer_id ({:?}) to an offchain public key: {e}",
132 peer_id
133 ))
134 })
135}
136
137#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
138pub enum HoprTransportProcess {
139 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
140 Medium,
141 #[strum(to_string = "HOPR packet pipeline ({0})")]
142 Pipeline(protocol::PacketPipelineProcesses),
143 #[strum(to_string = "session manager sub-process #{0}")]
144 SessionsManagement(usize),
145 #[strum(to_string = "network probing sub-process: {0}")]
146 Probing(hopr_transport_probe::HoprProbeProcess),
147 #[cfg(feature = "runtime-tokio")]
148 #[strum(to_string = "path cache refresh")]
149 PathRefresh,
150 #[strum(to_string = "sync of outgoing ticket indices")]
151 OutgoingIndexSync,
152 #[strum(to_string = "periodic protocol counter flush")]
153 CounterFlush,
154 #[strum(to_string = "mixer→wire forwarder")]
155 MixerForwarder,
156 #[cfg(feature = "capture")]
157 #[strum(to_string = "packet capture")]
158 Capture,
159}
160
161type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>;
163
164#[derive(Debug, Clone)]
169pub struct HoprSessionConfigurator {
170 id: SessionId,
171 smgr: std::sync::Weak<HoprSessionManager>,
173}
174
175impl HoprSessionConfigurator {
176 pub fn id(&self) -> &SessionId {
178 &self.id
179 }
180
181 pub async fn ping(&self) -> errors::Result<()> {
189 Ok(self
190 .smgr
191 .upgrade()
192 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
193 .ping_session(&self.id)
194 .await?)
195 }
196
197 pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
203 Ok(self
204 .smgr
205 .upgrade()
206 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
207 .get_surb_balancer_config(&self.id)
208 .await?)
209 }
210
211 pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
216 Ok(self
217 .smgr
218 .upgrade()
219 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
220 .update_surb_balancer_config(&self.id, config)
221 .await?)
222 }
223
224 pub async fn close(&self) -> bool {
231 match self.smgr.upgrade() {
232 Some(smgr) => smgr.close_session(&self.id).await,
233 None => false,
234 }
235 }
236}
237
238pub struct HoprTransport<Chain, Graph, Net> {
241 packet_key: OffchainKeypair,
242 chain_key: ChainKeypair,
243 chain_api: Chain,
244 ping: Arc<OnceLock<Pinger>>,
245 network: Arc<OnceLock<Net>>,
246 graph: Graph,
247 path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
248 my_multiaddresses: Vec<Multiaddr>,
249 smgr: Arc<HoprSessionManager>,
250 session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
251 probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
252 counters: PeerProtocolCounterRegistry,
253 cfg: HoprProtocolConfig,
254}
255
256impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
257where
258 Chain: ChainReadChannelOperations
259 + ChainReadAccountOperations
260 + ChainWriteTicketOperations
261 + ChainKeyOperations
262 + ChainReadTicketOperations
263 + ChainValues
264 + Clone
265 + Send
266 + Sync
267 + 'static,
268 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
269 + NetworkGraphUpdate
270 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
271 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
272 + Clone
273 + Send
274 + Sync
275 + 'static,
276 <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
277 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
278 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
279 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
280 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
281{
282 pub fn new(
283 identity: (&ChainKeypair, &OffchainKeypair),
284 resolver: Chain,
285 graph: Graph,
286 my_multiaddresses: Vec<Multiaddr>,
287 cfg: HoprProtocolConfig,
288 ) -> errors::Result<Self> {
289 let me_offchain = *identity.1.public();
290 let planner_config = cfg.path_planner;
291 let selector = HoprGraphPathSelector::new(
292 me_offchain,
293 graph.clone(),
294 planner_config.max_cached_paths,
295 planner_config.edge_penalty,
296 planner_config.min_ack_rate,
297 planner_config.min_paths_anonymity_floor,
298 );
299
300 let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
301
302 let mut session_tag_allocator = None;
303 let mut session_telemetry_tag_allocator = None;
304 let mut probing_tag_allocator = None;
305 for (usage, alloc) in tag_allocators {
306 match usage {
307 hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
308 hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
309 session_telemetry_tag_allocator = Some(alloc)
310 }
311 hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
312 }
313 }
314 let session_telemetry_tag_allocator = session_telemetry_tag_allocator
315 .ok_or_else(|| HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
316 let probing_tag_allocator =
317 probing_tag_allocator.ok_or_else(|| HoprTransportError::Api("probing tag allocator missing".into()))?;
318
319 Ok(Self {
320 packet_key: identity.1.clone(),
321 chain_key: identity.0.clone(),
322 ping: Arc::new(OnceLock::new()),
323 network: Arc::new(OnceLock::new()),
324 graph,
325 path_planner: PathPlanner::new(
326 me_offchain,
327 MemorySurbStore::new(cfg.packet.surb_store),
328 resolver.clone(),
329 selector,
330 planner_config,
331 ),
332 my_multiaddresses,
333 smgr: Arc::new(SessionManager::new(SessionManagerConfig {
334 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
335 .ok()
336 .and_then(|s| s.parse::<usize>().ok())
337 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
338 .max(ApplicationData::PAYLOAD_SIZE),
339 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
340 .ok()
341 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
342 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
343 .max(Duration::from_millis(100)),
344 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
345 idle_timeout: cfg.session.idle_timeout,
346 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
347 initial_return_session_egress_rate: 10,
348 minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
349 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
350 surb_balance_notify_period: None,
351 surb_target_notify: true,
352 maximum_sessions: cfg.session.maximum_managed_sessions,
353 })),
354 chain_api: resolver,
355 session_telemetry_tag_allocator,
356 probing_tag_allocator,
357 counters: PeerProtocolCounterRegistry::default(),
358 cfg,
359 })
360 }
361
362 pub async fn run_relay<T, TFact, Ct>(
368 &self,
369 cover_traffic: Ct,
370 network: Net,
371 network_process: BoxedProcessFn,
372 ticket_events: T,
373 ticket_factory: TFact,
374 on_incoming_session: Sender<IncomingSession>,
375 ) -> errors::Result<(
376 HoprSocket<
377 futures::stream::BoxStream<'static, ApplicationDataIn>,
378 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
379 >,
380 AbortableList<HoprTransportProcess>,
381 )>
382 where
383 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
384 T::Error: std::error::Error + Clone + Send,
385 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
386 TFact: TicketFactory + Clone + Send + Sync + 'static,
387 {
388 self.run_inner(
389 protocol::NodeType::Relay,
390 cover_traffic,
391 network,
392 network_process,
393 ticket_events,
394 ticket_factory,
395 Some(on_incoming_session),
396 )
397 .await
398 }
399
400 pub async fn run_exit<TFact, Ct>(
405 &self,
406 cover_traffic: Ct,
407 network: Net,
408 network_process: BoxedProcessFn,
409 ticket_factory: TFact,
410 on_incoming_session: Sender<IncomingSession>,
411 ) -> errors::Result<(
412 HoprSocket<
413 futures::stream::BoxStream<'static, ApplicationDataIn>,
414 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
415 >,
416 AbortableList<HoprTransportProcess>,
417 )>
418 where
419 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
420 TFact: TicketFactory + Clone + Send + Sync + 'static,
421 {
422 self.run_inner(
423 protocol::NodeType::Exit,
424 cover_traffic,
425 network,
426 network_process,
427 futures::sink::drain(),
428 ticket_factory,
429 Some(on_incoming_session),
430 )
431 .await
432 }
433
434 pub async fn run_entry<TFact, Ct>(
440 &self,
441 cover_traffic: Ct,
442 network: Net,
443 network_process: BoxedProcessFn,
444 ticket_factory: TFact,
445 ) -> errors::Result<(
446 HoprSocket<
447 futures::stream::BoxStream<'static, ApplicationDataIn>,
448 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
449 >,
450 AbortableList<HoprTransportProcess>,
451 )>
452 where
453 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
454 TFact: TicketFactory + Clone + Send + Sync + 'static,
455 {
456 self.run_inner(
457 protocol::NodeType::Entry,
458 cover_traffic,
459 network,
460 network_process,
461 futures::sink::drain(),
462 ticket_factory,
463 None,
464 )
465 .await
466 }
467
468 #[allow(clippy::too_many_arguments)]
475 async fn run_inner<T, TFact, Ct>(
476 &self,
477 role: protocol::NodeType,
478 cover_traffic: Ct,
479 network: Net,
480 network_process: BoxedProcessFn,
481 ticket_events: T,
482 ticket_factory: TFact,
483 on_incoming_session: Option<Sender<IncomingSession>>,
484 ) -> errors::Result<(
485 HoprSocket<
486 futures::stream::BoxStream<'static, ApplicationDataIn>,
487 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
488 >,
489 AbortableList<HoprTransportProcess>,
490 )>
491 where
492 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
493 T::Error: std::error::Error + Clone + Send,
494 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
495 TFact: TicketFactory + Clone + Send + Sync + 'static,
496 {
497 let mut processes = AbortableList::<HoprTransportProcess>::default();
498
499 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
500 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
501
502 let transport_network = network;
505 let transport_layer_process = network_process;
506
507 let msg_codec = crate::protocol::HoprBinaryCodec {};
508 let (wire_msg_tx, wire_msg_rx) =
509 protocol::stream::process_stream_protocol(msg_codec, transport_network.clone(), self.cfg.stream).await?;
510
511 let mut mixer_cfg = self.cfg.mixer;
516 mixer_cfg.metric_delay_window = u64::try_from(5 * mixer_cfg.delay_range.as_millis())
517 .unwrap_or(u64::MAX)
518 .max(1);
519 let (mixing_channel_tx, mix_rx) = hopr_transport_mixer::channel(mixer_cfg);
520 processes.insert(
521 HoprTransportProcess::MixerForwarder,
522 hopr_utils::spawn_as_abortable!(async move {
523 mix_rx
524 .fold(wire_msg_tx, |mut sink, item| async move {
525 if sink.send(item).await.is_err() {
526 tracing::error!(
527 task = %HoprTransportProcess::MixerForwarder,
528 "wire sink dropped — discarding mixed packet"
529 );
530 }
531 sink
532 })
533 .await;
534 tracing::warn!(
535 task = %HoprTransportProcess::MixerForwarder,
536 "long-running background task finished"
537 );
538 }),
539 );
540
541 #[cfg(feature = "runtime-tokio")]
543 processes.insert(
544 HoprTransportProcess::PathRefresh,
545 hopr_utils::spawn_as_abortable!(self.path_planner.run_background_refresh()),
546 );
547
548 processes.insert(
549 HoprTransportProcess::Medium,
550 hopr_utils::spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
551 task = %HoprTransportProcess::Medium,
552 "long-running background task finished"
553 ))),
554 );
555
556 let msg_protocol_bidirectional_channel_capacity =
557 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
558 .ok()
559 .and_then(|s| s.trim().parse::<usize>().ok())
560 .filter(|&c| c > 0)
561 .unwrap_or(16_384);
562
563 debug!(
564 capacity = msg_protocol_bidirectional_channel_capacity,
565 "creating protocol bidirectional channel"
566 );
567 let (tx_from_protocol, rx_from_protocol) =
568 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
569
570 let cover_traffic_allocated_tag = self
574 .session_telemetry_tag_allocator
575 .allocate()
576 .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
577 let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
578
579 let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
582 let _keep_alive = &cover_traffic_allocated_tag;
583 async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
584 });
585
586 let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
588 let start =
589 hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
590 let data = &RANDOM_DATA[start..start + 100];
591
592 futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
593 Some((routing, ApplicationDataOut::with_no_packet_info(data)))
594 } else {
595 tracing::error!("failed to construct cover traffic packet");
596 None
597 })
598 });
599
600 let merged_unresolved_output_data =
602 select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
603 futures::stream::PollNext::Left
604 });
605
606 let path_planner = self.path_planner.clone();
613 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
614 let routing_concurrency = {
615 let avail = std::thread::available_parallelism()
616 .ok()
617 .map(|n| n.get())
618 .unwrap_or(1)
619 .max(1)
620 * 8;
621 self.cfg
622 .packet
623 .pipeline
624 .output_concurrency
625 .filter(|&n| n > 0)
626 .unwrap_or(avail)
627 };
628 let all_resolved_external_msg_rx = merged_unresolved_output_data
629 .then_concurrent(
630 move |(unresolved, mut data)| {
631 let path_planner = path_planner.clone();
632 async move {
633 trace!(?unresolved, "resolving routing for packet");
634 match path_planner
635 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
636 .await
637 {
638 Ok((resolved, rem_surbs)) => {
639 let mut signals_to_dst = data
643 .packet_info
644 .as_ref()
645 .map(|info| info.signals_to_destination)
646 .unwrap_or_default();
647
648 if resolved.is_return() {
649 signals_to_dst = match rem_surbs {
650 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
651 signals_to_dst | PacketSignal::SurbDistress
652 }
653 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
654 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
655 };
656 } else {
657 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
659 }
660
661 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
662 trace!(?resolved, "resolved routing for packet");
663 Some((resolved, data))
664 }
665 Err(error) => {
666 error!(%error, "failed to resolve routing");
667 None
668 }
669 }
670 }
671 .in_current_span()
672 },
673 routing_concurrency,
674 )
675 .filter_map(futures::future::ready);
676
677 let channels_dst = self
678 .chain_api
679 .domain_separators()
680 .await
681 .map_err(HoprTransportError::chain)?
682 .channel;
683
684 let pipeline_builder = HoprPacketPipelineBuilder::new()
685 .identity((&self.chain_key, &self.packet_key))
686 .transport((mixing_channel_tx, wire_msg_rx))
687 .api((tx_from_protocol, all_resolved_external_msg_rx))
688 .surb_store(self.path_planner.surb_store.clone())
689 .chain_api(self.chain_api.clone())
690 .ticket_factory(ticket_factory)
691 .channels_dst(channels_dst)
692 .with_counters(self.counters.clone())
693 .with_config(self.cfg.packet);
694
695 let pipeline_processes = match role {
696 protocol::NodeType::Relay => pipeline_builder.with_ticket_events(ticket_events).build_for_relay(),
697 protocol::NodeType::Exit => pipeline_builder.build_for_exit(),
698 protocol::NodeType::Entry => pipeline_builder.build_for_entry(),
699 };
700 processes.extend_from(pipeline_processes);
701
702 let flush_counters = self.counters.clone();
704 let flush_graph = self.graph.clone();
705 let flush_me = *self.packet_key.public();
706 let flush_interval = self.cfg.counter_flush_interval;
707 processes.insert(
708 HoprTransportProcess::CounterFlush,
709 hopr_utils::spawn_as_abortable!(async move {
710 use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
711
712 futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
713 .for_each(|_| {
714 for (peer, num_packets, num_acks) in flush_counters.drain() {
715 tracing::trace!(
716 %peer,
717 num_packets,
718 num_acks,
719 "flushing protocol conformance counters"
720 );
721 flush_graph.upsert_edge(&flush_me, &peer, |obs| {
722 obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
723 });
724 }
725 futures::future::ready(())
726 })
727 .await;
728 }),
729 );
730
731 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
733 .ok()
734 .and_then(|s| s.trim().parse::<usize>().ok())
735 .filter(|&c| c > 0)
736 .unwrap_or(128);
737 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
738 let (manual_ping_tx, manual_ping_rx) =
739 channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
740
741 let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
742
743 let (probing_processes, probe_classifier) = probe
744 .continuously_scan(
745 unresolved_routing_msg_tx.clone(),
746 manual_ping_rx,
747 cover_traffic,
748 self.graph.clone(),
749 )
750 .await;
751
752 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
753
754 self.ping
756 .clone()
757 .set(Pinger::new(
758 PingConfig {
759 timeout: self.cfg.probe.timeout,
760 },
761 manual_ping_tx,
762 ))
763 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
764
765 let smgr_start_res = if role != protocol::NodeType::Entry {
767 self.smgr.start(
769 unresolved_routing_msg_tx.clone(),
770 on_incoming_session.ok_or_else(|| {
771 HoprTransportError::Api("on_incoming_session channel is required for relay/exit nodes".into())
772 })?,
773 )
774 } else {
775 self.smgr
777 .start(unresolved_routing_msg_tx.clone(), futures::sink::drain())
778 };
779
780 smgr_start_res
781 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
782 .into_iter()
783 .enumerate()
784 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
785 .for_each(|(k, v)| {
786 processes.insert(k, v);
787 });
788
789 let (on_incoming_data_tx, on_incoming_data_rx) =
798 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
799 let smgr = self.smgr.clone();
800 let unresolved_routing_msg_tx_for_task = unresolved_routing_msg_tx.clone();
801 processes.insert(
802 HoprTransportProcess::SessionsManagement(0),
803 hopr_utils::spawn_as_abortable!(async move {
804 probe_classifier
805 .filter_stream(unresolved_routing_msg_tx_for_task, rx_from_protocol)
806 .filter_map(move |(pseudonym, data)| {
807 let smgr = smgr.clone();
808 async move {
809 match smgr.dispatch_message(pseudonym, data).await {
810 Ok(DispatchResult::Processed) => {
811 tracing::trace!("message dispatch completed");
812 None
813 }
814 Ok(DispatchResult::Unrelated(data)) => {
815 tracing::trace!("unrelated message dispatch completed");
816 Some(data)
817 }
818 Err(error) => {
819 tracing::error!(%error, "error while dispatching packet in the session manager");
820 None
821 }
822 }
823 }
824 })
825 .fold(on_incoming_data_tx, |mut tx, data| async move {
826 if tx.send(data).await.is_err() {
827 tracing::error!(
828 task = %HoprTransportProcess::SessionsManagement(0),
829 "incoming-data channel disconnected — dropping unrelated packet; \
830 HoprSocket must be consumed or drained by the caller"
831 );
832 }
833 tx
834 })
835 .await;
836 tracing::warn!(
837 task = %HoprTransportProcess::SessionsManagement(0),
838 "long-running background task finished"
839 );
840 }),
841 );
842
843 self.network
845 .clone()
846 .set(transport_network)
847 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
848
849 Ok((
850 (on_incoming_data_rx.boxed(), unresolved_routing_msg_tx).into(),
851 processes,
852 ))
853 }
854
855 #[tracing::instrument(level = "debug", skip(self))]
856 pub async fn ping(
857 &self,
858 peer: &OffchainPublicKey,
859 ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
860 let me: &OffchainPublicKey = self.packet_key.public();
861 if peer == me {
862 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
863 }
864
865 let pinger = self
866 .ping
867 .get()
868 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
869
870 let latency = (*pinger).ping(peer).await?;
871
872 if let Some(observations) = self.graph.edge(me, peer) {
873 Ok((latency, observations))
874 } else {
875 Err(HoprTransportError::Api(format!(
876 "no observations available for peer {peer}",
877 )))
878 }
879 }
880
881 #[tracing::instrument(level = "debug", skip(self))]
882 pub async fn new_session(
883 &self,
884 destination: Address,
885 target: SessionTarget,
886 cfg: SessionClientConfig,
887 ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
888 let session = self.smgr.new_session(destination, target, cfg).await?;
889 let id = *session.id();
890 Ok((
891 session,
892 HoprSessionConfigurator {
893 id,
894 smgr: Arc::downgrade(&self.smgr),
895 },
896 ))
897 }
898
899 #[tracing::instrument(level = "debug", skip(self))]
900 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
901 self.network
902 .get()
903 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
904 .map(|network| network.listening_as().into_iter().collect())
905 .unwrap_or_default()
906 }
907
908 #[tracing::instrument(level = "debug", skip(self))]
909 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
910 let mut mas = self
911 .local_multiaddresses()
912 .into_iter()
913 .filter(|ma| {
914 crate::multiaddrs::is_supported(ma)
915 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
916 })
917 .map(|ma| strip_p2p_protocol(&ma))
918 .filter(|v| !v.is_empty())
919 .collect::<Vec<_>>();
920
921 mas.sort_by(|l, r| {
922 let is_left_dns = crate::multiaddrs::is_dns(l);
923 let is_right_dns = crate::multiaddrs::is_dns(r);
924
925 if !(is_left_dns ^ is_right_dns) {
926 std::cmp::Ordering::Equal
927 } else if is_left_dns {
928 std::cmp::Ordering::Less
929 } else {
930 std::cmp::Ordering::Greater
931 }
932 });
933
934 mas
935 }
936
937 pub fn graph(&self) -> &Graph {
939 &self.graph
940 }
941
942 #[tracing::instrument(level = "debug", skip(self))]
943 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
944 self.network
945 .get()
946 .map(|network| network.listening_as().into_iter().collect())
947 .unwrap_or_else(|| {
948 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
949 self.my_multiaddresses.clone()
950 })
951 }
952
953 #[tracing::instrument(level = "debug", skip(self))]
954 pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
955 match self
956 .network
957 .get()
958 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
959 {
960 Ok(network) => network
961 .multiaddress_of(&peer.into())
962 .unwrap_or_default()
963 .into_iter()
964 .collect(),
965 Err(error) => {
966 tracing::error!(%error, "failed to get observed multiaddresses");
967 return vec![];
968 }
969 }
970 }
971
972 #[tracing::instrument(level = "debug", skip(self))]
973 pub async fn network_health(&self) -> Health {
974 self.network
975 .get()
976 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
977 .map(|network| network.health())
978 .unwrap_or(Health::Red)
979 }
980
981 pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
982 Ok(futures::stream::iter(
983 self.network
984 .get()
985 .ok_or_else(|| {
986 tracing::error!("transport network is not yet initialized");
987 HoprTransportError::Api("transport network is not yet initialized".into())
988 })?
989 .connected_peers(),
990 )
991 .filter_map(|peer_id| async move {
992 match peer_id_to_public_key(&peer_id) {
993 Ok(key) => Some(key),
994 Err(error) => {
995 tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
996 None
997 }
998 }
999 })
1000 .collect()
1001 .await)
1002 }
1003
1004 #[tracing::instrument(level = "debug", skip(self))]
1005 pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
1006 self.graph.edge(self.packet_key.public(), peer)
1007 }
1008
1009 #[tracing::instrument(level = "debug", skip(self))]
1011 pub async fn all_network_peers(
1012 &self,
1013 minimum_score: f64,
1014 ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
1015 let me = self.packet_key.public();
1016 Ok(self
1017 .network_connected_peers()
1018 .await?
1019 .into_iter()
1020 .filter_map(|peer| {
1021 let observation = self.graph.edge(me, &peer);
1022 if let Some(info) = observation {
1023 if info.score() >= minimum_score {
1024 Some((peer, info))
1025 } else {
1026 None
1027 }
1028 } else {
1029 None
1030 }
1031 })
1032 .collect::<Vec<_>>())
1033 }
1034}
1035
1036impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
1041where
1042 Net: NetworkView + Send + Sync + 'static,
1043{
1044 fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
1045 self.network.get().map(|n| n.listening_as()).unwrap_or_default()
1046 }
1047
1048 fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
1049 self.network.get()?.multiaddress_of(peer)
1050 }
1051
1052 fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
1053 self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
1054 }
1055
1056 fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
1057 self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
1058 }
1059
1060 fn is_connected(&self, peer: &PeerId) -> bool {
1061 self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
1062 }
1063
1064 fn health(&self) -> Health {
1065 self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
1066 }
1067
1068 fn subscribe_network_events(
1069 &self,
1070 ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
1071 match self.network.get() {
1072 Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
1073 None => futures::future::Either::Right(futures::stream::empty()),
1074 }
1075 }
1076}
1077
1078#[async_trait::async_trait]
1083impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
1084where
1085 Chain: ChainReadChannelOperations
1086 + ChainReadAccountOperations
1087 + hopr_api::chain::ChainWriteTicketOperations
1088 + ChainKeyOperations
1089 + hopr_api::chain::ChainReadTicketOperations
1090 + ChainValues
1091 + Clone
1092 + Send
1093 + Sync
1094 + 'static,
1095 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
1096 + NetworkGraphUpdate
1097 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1098 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1099 + Clone
1100 + Send
1101 + Sync
1102 + 'static,
1103 <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
1104 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
1105 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1106 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
1107{
1108 type Error = errors::HoprTransportError;
1109 type Observable = <Graph as NetworkGraphView>::Observed;
1110
1111 async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
1112 self.ping(key).await
1113 }
1114
1115 async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
1116 self.network_observed_multiaddresses(key).await
1117 }
1118}