1pub mod config;
17pub mod constants;
19pub mod errors;
21pub mod path;
23pub mod protocol;
25
26mod multiaddrs;
27
28#[cfg(feature = "capture")]
29mod capture;
30mod pipeline;
31pub mod socket;
32
33use std::{
34 sync::{Arc, OnceLock},
35 time::Duration,
36};
37
38use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
39use futures::{
40 FutureExt, StreamExt,
41 channel::mpsc::{Sender, channel},
42 stream::select_with_strategy,
43};
44pub use hopr_api::{
45 Multiaddr, PeerId,
46 network::{Health, traits::NetworkView},
47 types::{
48 crypto::{
49 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
50 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
51 },
52 internal::{prelude::HoprPseudonym, routing::RoutingOptions},
53 },
54};
55use hopr_api::{
56 chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
57 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
58 graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
59 network::{BoxedProcessFn, NetworkStreamControl},
60 types::primitive::prelude::*,
61};
62use hopr_crypto_packet::prelude::PacketSignal;
63pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
64use hopr_protocol_hopr::MemorySurbStore;
65use hopr_transport_mixer::MixerConfig;
66pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
67use hopr_transport_probe::{
68 Probe,
69 ping::{PingConfig, Pinger},
70};
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77 errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83use hopr_utils::{network_types::prelude::*, runtime::AbortableList};
84pub use multiaddr::Protocol;
85use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
86use tracing::{Instrument, debug, error, trace, warn};
87
88#[cfg(feature = "runtime-tokio")]
89use crate::path::BackgroundPathCacheRefreshable;
90pub use crate::{config::HoprProtocolConfig, protocol::PeerProtocolCounterRegistry};
91use crate::{
92 constants::SESSION_INITIATION_TIMEOUT_BASE,
93 errors::HoprTransportError,
94 multiaddrs::strip_p2p_protocol,
95 path::{HoprGraphPathSelector, PathPlanner},
96 pipeline::HoprPacketPipelineBuilder,
97 socket::HoprSocket,
98};
99
100pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
101
102pub use hopr_api as api;
103use hopr_api::{
104 chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
105 tickets::TicketFactory,
106 types::internal::routing::DestinationRouting,
107};
108
109lazy_static::lazy_static! {
111 static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
112
113 static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
114 .time_to_idle(Duration::from_mins(15))
115 .max_capacity(10_000)
116 .build();
117
118 static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
119}
120
121pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
126 PEER_ID_CACHE
127 .try_get_with_by_ref(peer_id, move || {
128 OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
129 })
130 .map_err(|e: Arc<HoprTransportError>| {
131 crate::errors::HoprTransportError::Other(anyhow::anyhow!(
132 "failed to convert peer_id ({:?}) to an offchain public key: {e}",
133 peer_id
134 ))
135 })
136}
137
138#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
139pub enum HoprTransportProcess {
140 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
141 Medium,
142 #[strum(to_string = "HOPR packet pipeline ({0})")]
143 Pipeline(protocol::PacketPipelineProcesses),
144 #[strum(to_string = "session manager sub-process #{0}")]
145 SessionsManagement(usize),
146 #[strum(to_string = "network probing sub-process: {0}")]
147 Probing(hopr_transport_probe::HoprProbeProcess),
148 #[cfg(feature = "runtime-tokio")]
149 #[strum(to_string = "path cache refresh")]
150 PathRefresh,
151 #[strum(to_string = "sync of outgoing ticket indices")]
152 OutgoingIndexSync,
153 #[strum(to_string = "periodic protocol counter flush")]
154 CounterFlush,
155 #[cfg(feature = "capture")]
156 #[strum(to_string = "packet capture")]
157 Capture,
158}
159
160type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>;
162
163#[derive(Debug, Clone)]
168pub struct HoprSessionConfigurator {
169 id: SessionId,
170 smgr: std::sync::Weak<HoprSessionManager>,
172}
173
174impl HoprSessionConfigurator {
175 pub fn id(&self) -> &SessionId {
177 &self.id
178 }
179
180 pub async fn ping(&self) -> errors::Result<()> {
188 Ok(self
189 .smgr
190 .upgrade()
191 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
192 .ping_session(&self.id)
193 .await?)
194 }
195
196 pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
202 Ok(self
203 .smgr
204 .upgrade()
205 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
206 .get_surb_balancer_config(&self.id)
207 .await?)
208 }
209
210 pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
215 Ok(self
216 .smgr
217 .upgrade()
218 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
219 .update_surb_balancer_config(&self.id, config)
220 .await?)
221 }
222
223 pub async fn close(&self) -> bool {
230 match self.smgr.upgrade() {
231 Some(smgr) => smgr.close_session(&self.id).await,
232 None => false,
233 }
234 }
235}
236
237pub struct HoprTransport<Chain, Graph, Net> {
240 packet_key: OffchainKeypair,
241 chain_key: ChainKeypair,
242 chain_api: Chain,
243 ping: Arc<OnceLock<Pinger>>,
244 network: Arc<OnceLock<Net>>,
245 graph: Graph,
246 path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
247 my_multiaddresses: Vec<Multiaddr>,
248 smgr: Arc<HoprSessionManager>,
249 session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
250 probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
251 counters: PeerProtocolCounterRegistry,
252 cfg: HoprProtocolConfig,
253}
254
255impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
256where
257 Chain: ChainReadChannelOperations
258 + ChainReadAccountOperations
259 + ChainWriteTicketOperations
260 + ChainKeyOperations
261 + ChainReadTicketOperations
262 + ChainValues
263 + Clone
264 + Send
265 + Sync
266 + 'static,
267 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
268 + NetworkGraphUpdate
269 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
270 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
271 + Clone
272 + Send
273 + Sync
274 + 'static,
275 <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
276 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
277 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
278 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
279 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
280{
281 pub fn new(
282 identity: (&ChainKeypair, &OffchainKeypair),
283 resolver: Chain,
284 graph: Graph,
285 my_multiaddresses: Vec<Multiaddr>,
286 cfg: HoprProtocolConfig,
287 ) -> errors::Result<Self> {
288 let me_offchain = *identity.1.public();
289 let planner_config = cfg.path_planner;
290 let selector = HoprGraphPathSelector::new(
291 me_offchain,
292 graph.clone(),
293 planner_config.max_cached_paths,
294 planner_config.edge_penalty,
295 planner_config.min_ack_rate,
296 );
297
298 let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
299
300 let mut session_tag_allocator = None;
301 let mut session_telemetry_tag_allocator = None;
302 let mut probing_tag_allocator = None;
303 for (usage, alloc) in tag_allocators {
304 match usage {
305 hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
306 hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
307 session_telemetry_tag_allocator = Some(alloc)
308 }
309 hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
310 }
311 }
312 let session_tag_allocator =
313 session_tag_allocator.ok_or_else(|| HoprTransportError::Api("session tag allocator missing".into()))?;
314 let session_telemetry_tag_allocator = session_telemetry_tag_allocator
315 .ok_or_else(|| HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
316 let probing_tag_allocator =
317 probing_tag_allocator.ok_or_else(|| HoprTransportError::Api("probing tag allocator missing".into()))?;
318
319 Ok(Self {
320 packet_key: identity.1.clone(),
321 chain_key: identity.0.clone(),
322 ping: Arc::new(OnceLock::new()),
323 network: Arc::new(OnceLock::new()),
324 graph,
325 path_planner: PathPlanner::new(
326 me_offchain,
327 MemorySurbStore::new(cfg.packet.surb_store),
328 resolver.clone(),
329 selector,
330 planner_config,
331 ),
332 my_multiaddresses,
333 smgr: Arc::new(SessionManager::new(
334 SessionManagerConfig {
335 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
336 .ok()
337 .and_then(|s| s.parse::<usize>().ok())
338 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
339 .max(ApplicationData::PAYLOAD_SIZE),
340 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
341 .ok()
342 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
343 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
344 .max(Duration::from_millis(100)),
345 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
346 idle_timeout: cfg.session.idle_timeout,
347 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
348 initial_return_session_egress_rate: 10,
349 minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
350 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
351 surb_balance_notify_period: None,
352 surb_target_notify: true,
353 },
354 session_tag_allocator,
355 )),
356 chain_api: resolver,
357 session_telemetry_tag_allocator,
358 probing_tag_allocator,
359 counters: PeerProtocolCounterRegistry::default(),
360 cfg,
361 })
362 }
363
364 pub async fn run_relay<T, TFact, Ct>(
370 &self,
371 cover_traffic: Ct,
372 network: Net,
373 network_process: BoxedProcessFn,
374 ticket_events: T,
375 ticket_factory: TFact,
376 on_incoming_session: Sender<IncomingSession>,
377 ) -> errors::Result<(
378 HoprSocket<
379 futures::stream::BoxStream<'static, ApplicationDataIn>,
380 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
381 >,
382 AbortableList<HoprTransportProcess>,
383 )>
384 where
385 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
386 T::Error: std::error::Error + Clone + Send,
387 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
388 TFact: TicketFactory + Clone + Send + Sync + 'static,
389 {
390 self.run_inner(
391 protocol::NodeType::Relay,
392 cover_traffic,
393 network,
394 network_process,
395 ticket_events,
396 ticket_factory,
397 Some(on_incoming_session),
398 )
399 .await
400 }
401
402 pub async fn run_exit<TFact, Ct>(
407 &self,
408 cover_traffic: Ct,
409 network: Net,
410 network_process: BoxedProcessFn,
411 ticket_factory: TFact,
412 on_incoming_session: Sender<IncomingSession>,
413 ) -> errors::Result<(
414 HoprSocket<
415 futures::stream::BoxStream<'static, ApplicationDataIn>,
416 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
417 >,
418 AbortableList<HoprTransportProcess>,
419 )>
420 where
421 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
422 TFact: TicketFactory + Clone + Send + Sync + 'static,
423 {
424 self.run_inner(
425 protocol::NodeType::Exit,
426 cover_traffic,
427 network,
428 network_process,
429 futures::sink::drain(),
430 ticket_factory,
431 Some(on_incoming_session),
432 )
433 .await
434 }
435
436 pub async fn run_entry<TFact, Ct>(
442 &self,
443 cover_traffic: Ct,
444 network: Net,
445 network_process: BoxedProcessFn,
446 ticket_factory: TFact,
447 ) -> errors::Result<(
448 HoprSocket<
449 futures::stream::BoxStream<'static, ApplicationDataIn>,
450 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
451 >,
452 AbortableList<HoprTransportProcess>,
453 )>
454 where
455 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
456 TFact: TicketFactory + Clone + Send + Sync + 'static,
457 {
458 self.run_inner(
459 protocol::NodeType::Entry,
460 cover_traffic,
461 network,
462 network_process,
463 futures::sink::drain(),
464 ticket_factory,
465 None,
466 )
467 .await
468 }
469
470 #[allow(clippy::too_many_arguments)]
477 async fn run_inner<T, TFact, Ct>(
478 &self,
479 role: protocol::NodeType,
480 cover_traffic: Ct,
481 network: Net,
482 network_process: BoxedProcessFn,
483 ticket_events: T,
484 ticket_factory: TFact,
485 on_incoming_session: Option<Sender<IncomingSession>>,
486 ) -> errors::Result<(
487 HoprSocket<
488 futures::stream::BoxStream<'static, ApplicationDataIn>,
489 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
490 >,
491 AbortableList<HoprTransportProcess>,
492 )>
493 where
494 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
495 T::Error: std::error::Error + Clone + Send,
496 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
497 TFact: TicketFactory + Clone + Send + Sync + 'static,
498 {
499 let mut processes = AbortableList::<HoprTransportProcess>::default();
500
501 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
502 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
503
504 let transport_network = network;
507 let transport_layer_process = network_process;
508
509 let msg_codec = crate::protocol::HoprBinaryCodec {};
510 let (wire_msg_tx, wire_msg_rx) =
511 protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
512
513 let mixing_channel_tx = hopr_transport_mixer::MixerSink::new(wire_msg_tx, build_mixer_cfg_from_env());
514
515 #[cfg(feature = "runtime-tokio")]
517 processes.insert(
518 HoprTransportProcess::PathRefresh,
519 hopr_utils::spawn_as_abortable!(self.path_planner.run_background_refresh()),
520 );
521
522 processes.insert(
523 HoprTransportProcess::Medium,
524 hopr_utils::spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
525 task = %HoprTransportProcess::Medium,
526 "long-running background task finished"
527 ))),
528 );
529
530 let msg_protocol_bidirectional_channel_capacity =
531 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
532 .ok()
533 .and_then(|s| s.trim().parse::<usize>().ok())
534 .filter(|&c| c > 0)
535 .unwrap_or(16_384);
536
537 debug!(
538 capacity = msg_protocol_bidirectional_channel_capacity,
539 "creating protocol bidirectional channel"
540 );
541 let (tx_from_protocol, rx_from_protocol) =
542 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
543
544 let cover_traffic_allocated_tag = self
548 .session_telemetry_tag_allocator
549 .allocate()
550 .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
551 let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
552
553 let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
556 let _keep_alive = &cover_traffic_allocated_tag;
557 async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
558 });
559
560 let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
562 let start =
563 hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
564 let data = &RANDOM_DATA[start..start + 100];
565
566 futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
567 Some((routing, ApplicationDataOut::with_no_packet_info(data)))
568 } else {
569 tracing::error!("failed to construct cover traffic packet");
570 None
571 })
572 });
573
574 let merged_unresolved_output_data =
576 select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
577 futures::stream::PollNext::Left
578 });
579
580 let path_planner = self.path_planner.clone();
587 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
588 let routing_concurrency = {
589 let avail = std::thread::available_parallelism()
590 .ok()
591 .map(|n| n.get())
592 .unwrap_or(1)
593 .max(1)
594 * 8;
595 self.cfg
596 .packet
597 .pipeline
598 .output_concurrency
599 .filter(|&n| n > 0)
600 .unwrap_or(avail)
601 };
602 let all_resolved_external_msg_rx = merged_unresolved_output_data
603 .then_concurrent(
604 move |(unresolved, mut data)| {
605 let path_planner = path_planner.clone();
606 async move {
607 trace!(?unresolved, "resolving routing for packet");
608 match path_planner
609 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
610 .await
611 {
612 Ok((resolved, rem_surbs)) => {
613 let mut signals_to_dst = data
617 .packet_info
618 .as_ref()
619 .map(|info| info.signals_to_destination)
620 .unwrap_or_default();
621
622 if resolved.is_return() {
623 signals_to_dst = match rem_surbs {
624 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
625 signals_to_dst | PacketSignal::SurbDistress
626 }
627 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
628 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
629 };
630 } else {
631 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
633 }
634
635 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
636 trace!(?resolved, "resolved routing for packet");
637 Some((resolved, data))
638 }
639 Err(error) => {
640 error!(%error, "failed to resolve routing");
641 None
642 }
643 }
644 }
645 .in_current_span()
646 },
647 routing_concurrency,
648 )
649 .filter_map(futures::future::ready);
650
651 let channels_dst = self
652 .chain_api
653 .domain_separators()
654 .await
655 .map_err(HoprTransportError::chain)?
656 .channel;
657
658 let pipeline_builder = HoprPacketPipelineBuilder::new()
659 .identity((&self.chain_key, &self.packet_key))
660 .transport((mixing_channel_tx, wire_msg_rx))
661 .api((tx_from_protocol, all_resolved_external_msg_rx))
662 .surb_store(self.path_planner.surb_store.clone())
663 .chain_api(self.chain_api.clone())
664 .ticket_factory(ticket_factory)
665 .channels_dst(channels_dst)
666 .with_counters(self.counters.clone())
667 .with_config(self.cfg.packet);
668
669 let pipeline_processes = match role {
670 protocol::NodeType::Relay => pipeline_builder.with_ticket_events(ticket_events).build_for_relay(),
671 protocol::NodeType::Exit => pipeline_builder.build_for_exit(),
672 protocol::NodeType::Entry => pipeline_builder.build_for_entry(),
673 };
674 processes.extend_from(pipeline_processes);
675
676 let flush_counters = self.counters.clone();
678 let flush_graph = self.graph.clone();
679 let flush_me = *self.packet_key.public();
680 let flush_interval = self.cfg.counter_flush_interval;
681 processes.insert(
682 HoprTransportProcess::CounterFlush,
683 hopr_utils::spawn_as_abortable!(async move {
684 use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
685
686 futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
687 .for_each(|_| {
688 for (peer, num_packets, num_acks) in flush_counters.drain() {
689 tracing::trace!(
690 %peer,
691 num_packets,
692 num_acks,
693 "flushing protocol conformance counters"
694 );
695 flush_graph.upsert_edge(&flush_me, &peer, |obs| {
696 obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
697 });
698 }
699 futures::future::ready(())
700 })
701 .await;
702 }),
703 );
704
705 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
707 .ok()
708 .and_then(|s| s.trim().parse::<usize>().ok())
709 .filter(|&c| c > 0)
710 .unwrap_or(128);
711 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
712 let (manual_ping_tx, manual_ping_rx) =
713 channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
714
715 let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
716
717 let (probing_processes, probe_classifier) = probe
718 .continuously_scan(
719 unresolved_routing_msg_tx.clone(),
720 manual_ping_rx,
721 cover_traffic,
722 self.graph.clone(),
723 )
724 .await;
725
726 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
727
728 self.ping
730 .clone()
731 .set(Pinger::new(
732 PingConfig {
733 timeout: self.cfg.probe.timeout,
734 },
735 manual_ping_tx,
736 ))
737 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
738
739 let smgr_start_res = if role != protocol::NodeType::Entry {
741 self.smgr.start(
743 unresolved_routing_msg_tx.clone(),
744 on_incoming_session.ok_or_else(|| {
745 HoprTransportError::Api("on_incoming_session channel is required for relay/exit nodes".into())
746 })?,
747 )
748 } else {
749 self.smgr
751 .start(unresolved_routing_msg_tx.clone(), futures::sink::drain())
752 };
753
754 smgr_start_res
755 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
756 .into_iter()
757 .enumerate()
758 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
759 .for_each(|(k, v)| {
760 processes.insert(k, v);
761 });
762
763 let (on_incoming_data_tx, on_incoming_data_rx) =
767 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
768 let smgr = self.smgr.clone();
769 processes.insert(
770 HoprTransportProcess::SessionsManagement(0),
771 hopr_utils::spawn_as_abortable!(
772 probe_classifier
773 .filter_stream(unresolved_routing_msg_tx.clone(), rx_from_protocol)
774 .filter_map(move |(pseudonym, data)| {
775 let smgr = smgr.clone();
776 async move {
777 match smgr.dispatch_message(pseudonym, data).await {
778 Ok(DispatchResult::Processed) => {
779 tracing::trace!("message dispatch completed");
780 None
781 }
782 Ok(DispatchResult::Unrelated(data)) => {
783 tracing::trace!("unrelated message dispatch completed");
784 Some(data)
785 }
786 Err(error) => {
787 tracing::error!(%error, "error while dispatching packet in the session manager");
788 None
789 }
790 }
791 }
792 })
793 .map(Ok)
794 .forward(on_incoming_data_tx)
795 .inspect(|_| tracing::warn!(
796 task = %HoprTransportProcess::SessionsManagement(0),
797 "long-running background task finished"
798 ))
799 ),
800 );
801
802 self.network
804 .clone()
805 .set(transport_network)
806 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
807
808 Ok((
809 (on_incoming_data_rx.boxed(), unresolved_routing_msg_tx).into(),
810 processes,
811 ))
812 }
813
814 #[tracing::instrument(level = "debug", skip(self))]
815 pub async fn ping(
816 &self,
817 peer: &OffchainPublicKey,
818 ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
819 let me: &OffchainPublicKey = self.packet_key.public();
820 if peer == me {
821 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
822 }
823
824 let pinger = self
825 .ping
826 .get()
827 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
828
829 let latency = (*pinger).ping(peer).await?;
830
831 if let Some(observations) = self.graph.edge(me, peer) {
832 Ok((latency, observations))
833 } else {
834 Err(HoprTransportError::Api(format!(
835 "no observations available for peer {peer}",
836 )))
837 }
838 }
839
840 #[tracing::instrument(level = "debug", skip(self))]
841 pub async fn new_session(
842 &self,
843 destination: Address,
844 target: SessionTarget,
845 cfg: SessionClientConfig,
846 ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
847 let session = self.smgr.new_session(destination, target, cfg).await?;
848 let id = *session.id();
849 Ok((
850 session,
851 HoprSessionConfigurator {
852 id,
853 smgr: Arc::downgrade(&self.smgr),
854 },
855 ))
856 }
857
858 #[tracing::instrument(level = "debug", skip(self))]
859 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
860 self.network
861 .get()
862 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
863 .map(|network| network.listening_as().into_iter().collect())
864 .unwrap_or_default()
865 }
866
867 #[tracing::instrument(level = "debug", skip(self))]
868 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
869 let mut mas = self
870 .local_multiaddresses()
871 .into_iter()
872 .filter(|ma| {
873 crate::multiaddrs::is_supported(ma)
874 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
875 })
876 .map(|ma| strip_p2p_protocol(&ma))
877 .filter(|v| !v.is_empty())
878 .collect::<Vec<_>>();
879
880 mas.sort_by(|l, r| {
881 let is_left_dns = crate::multiaddrs::is_dns(l);
882 let is_right_dns = crate::multiaddrs::is_dns(r);
883
884 if !(is_left_dns ^ is_right_dns) {
885 std::cmp::Ordering::Equal
886 } else if is_left_dns {
887 std::cmp::Ordering::Less
888 } else {
889 std::cmp::Ordering::Greater
890 }
891 });
892
893 mas
894 }
895
896 pub fn graph(&self) -> &Graph {
898 &self.graph
899 }
900
901 #[tracing::instrument(level = "debug", skip(self))]
902 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
903 self.network
904 .get()
905 .map(|network| network.listening_as().into_iter().collect())
906 .unwrap_or_else(|| {
907 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
908 self.my_multiaddresses.clone()
909 })
910 }
911
912 #[tracing::instrument(level = "debug", skip(self))]
913 pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
914 match self
915 .network
916 .get()
917 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
918 {
919 Ok(network) => network
920 .multiaddress_of(&peer.into())
921 .unwrap_or_default()
922 .into_iter()
923 .collect(),
924 Err(error) => {
925 tracing::error!(%error, "failed to get observed multiaddresses");
926 return vec![];
927 }
928 }
929 }
930
931 #[tracing::instrument(level = "debug", skip(self))]
932 pub async fn network_health(&self) -> Health {
933 self.network
934 .get()
935 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
936 .map(|network| network.health())
937 .unwrap_or(Health::Red)
938 }
939
940 pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
941 Ok(futures::stream::iter(
942 self.network
943 .get()
944 .ok_or_else(|| {
945 tracing::error!("transport network is not yet initialized");
946 HoprTransportError::Api("transport network is not yet initialized".into())
947 })?
948 .connected_peers(),
949 )
950 .filter_map(|peer_id| async move {
951 match peer_id_to_public_key(&peer_id) {
952 Ok(key) => Some(key),
953 Err(error) => {
954 tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
955 None
956 }
957 }
958 })
959 .collect()
960 .await)
961 }
962
963 #[tracing::instrument(level = "debug", skip(self))]
964 pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
965 self.graph.edge(self.packet_key.public(), peer)
966 }
967
968 #[tracing::instrument(level = "debug", skip(self))]
970 pub async fn all_network_peers(
971 &self,
972 minimum_score: f64,
973 ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
974 let me = self.packet_key.public();
975 Ok(self
976 .network_connected_peers()
977 .await?
978 .into_iter()
979 .filter_map(|peer| {
980 let observation = self.graph.edge(me, &peer);
981 if let Some(info) = observation {
982 if info.score() >= minimum_score {
983 Some((peer, info))
984 } else {
985 None
986 }
987 } else {
988 None
989 }
990 })
991 .collect::<Vec<_>>())
992 }
993}
994
995fn build_mixer_cfg_from_env() -> MixerConfig {
996 let mixer_cfg = MixerConfig {
997 min_delay: std::time::Duration::from_millis(
998 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
999 .map(|v| {
1000 v.trim()
1001 .parse::<u64>()
1002 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
1003 })
1004 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
1005 ),
1006 delay_range: std::time::Duration::from_millis(
1007 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
1008 .map(|v| {
1009 v.trim()
1010 .parse::<u64>()
1011 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
1012 })
1013 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
1014 ),
1015 capacity: {
1016 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
1017 .ok()
1018 .and_then(|s| s.trim().parse::<usize>().ok())
1019 .filter(|&c| c > 0)
1020 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
1021 debug!(capacity = capacity, "Setting mixer capacity");
1022 capacity
1023 },
1024 ..MixerConfig::default()
1025 };
1026 debug!(?mixer_cfg, "Mixer configuration");
1027
1028 mixer_cfg
1029}
1030
1031impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
1036where
1037 Net: NetworkView + Send + Sync + 'static,
1038{
1039 fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
1040 self.network.get().map(|n| n.listening_as()).unwrap_or_default()
1041 }
1042
1043 fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
1044 self.network.get()?.multiaddress_of(peer)
1045 }
1046
1047 fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
1048 self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
1049 }
1050
1051 fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
1052 self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
1053 }
1054
1055 fn is_connected(&self, peer: &PeerId) -> bool {
1056 self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
1057 }
1058
1059 fn health(&self) -> Health {
1060 self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
1061 }
1062
1063 fn subscribe_network_events(
1064 &self,
1065 ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
1066 match self.network.get() {
1067 Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
1068 None => futures::future::Either::Right(futures::stream::empty()),
1069 }
1070 }
1071}
1072
1073#[async_trait::async_trait]
1078impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
1079where
1080 Chain: ChainReadChannelOperations
1081 + ChainReadAccountOperations
1082 + hopr_api::chain::ChainWriteTicketOperations
1083 + ChainKeyOperations
1084 + hopr_api::chain::ChainReadTicketOperations
1085 + ChainValues
1086 + Clone
1087 + Send
1088 + Sync
1089 + 'static,
1090 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
1091 + NetworkGraphUpdate
1092 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
1093 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
1094 + Clone
1095 + Send
1096 + Sync
1097 + 'static,
1098 <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
1099 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
1100 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
1101 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
1102{
1103 type Error = errors::HoprTransportError;
1104 type Observable = <Graph as NetworkGraphView>::Observed;
1105
1106 async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
1107 self.ping(key).await
1108 }
1109
1110 async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
1111 self.network_observed_multiaddresses(key).await
1112 }
1113}