1pub mod config;
17pub mod constants;
19pub mod errors;
21
22mod multiaddrs;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30 sync::{Arc, OnceLock},
31 time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36 FutureExt, StreamExt,
37 channel::mpsc::{Sender, channel},
38 stream::select_with_strategy,
39};
40pub use hopr_api::{
41 Multiaddr, PeerId,
42 network::{Health, traits::NetworkView},
43 types::{
44 crypto::{
45 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
46 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
47 },
48 internal::{prelude::HoprPseudonym, routing::RoutingOptions},
49 },
50};
51use hopr_api::{
52 chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
53 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
54 graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
55 network::{BoxedProcessFn, NetworkStreamControl},
56 types::primitive::prelude::*,
57};
58use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
59use hopr_crypto_packet::prelude::PacketSignal;
60use hopr_network_types::prelude::*;
61pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
62use hopr_protocol_hopr::MemorySurbStore;
63use hopr_transport_mixer::MixerConfig;
64use hopr_transport_path::{BackgroundPathCacheRefreshable, HoprGraphPathSelector, PathPlanner};
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67 Probe,
68 ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_protocol::PeerProtocolCounterRegistry;
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77 errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83pub use multiaddr::Protocol;
84use tracing::{Instrument, debug, error, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88 constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, multiaddrs::strip_p2p_protocol,
89 pipeline::HoprPipelineComponents, socket::HoprSocket,
90};
91
92pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94pub use hopr_api as api;
95use hopr_api::{
96 chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
97 tickets::TicketFactory,
98 types::internal::routing::DestinationRouting,
99};
100
101lazy_static::lazy_static! {
103 static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
104
105 static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
106 .time_to_idle(Duration::from_mins(15))
107 .max_capacity(10_000)
108 .build();
109
110 static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
111}
112
113pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
118 PEER_ID_CACHE
119 .try_get_with_by_ref(peer_id, move || {
120 OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
121 })
122 .map_err(|e: Arc<HoprTransportError>| {
123 crate::errors::HoprTransportError::Other(anyhow::anyhow!(
124 "failed to convert peer_id ({:?}) to an offchain public key: {e}",
125 peer_id
126 ))
127 })
128}
129
130#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
131pub enum HoprTransportProcess {
132 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
133 Medium,
134 #[strum(to_string = "HOPR packet pipeline ({0})")]
135 Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
136 #[strum(to_string = "session manager sub-process #{0}")]
137 SessionsManagement(usize),
138 #[strum(to_string = "network probing sub-process: {0}")]
139 Probing(hopr_transport_probe::HoprProbeProcess),
140 #[cfg(feature = "runtime-tokio")]
141 #[strum(to_string = "path cache refresh")]
142 PathRefresh,
143 #[strum(to_string = "sync of outgoing ticket indices")]
144 OutgoingIndexSync,
145 #[strum(to_string = "periodic protocol counter flush")]
146 CounterFlush,
147 #[cfg(feature = "capture")]
148 #[strum(to_string = "packet capture")]
149 Capture,
150}
151
152type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>;
154
155#[derive(Debug, Clone)]
160pub struct HoprSessionConfigurator {
161 id: SessionId,
162 smgr: std::sync::Weak<HoprSessionManager>,
164}
165
166impl HoprSessionConfigurator {
167 pub fn id(&self) -> &SessionId {
169 &self.id
170 }
171
172 pub async fn ping(&self) -> errors::Result<()> {
180 Ok(self
181 .smgr
182 .upgrade()
183 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
184 .ping_session(&self.id)
185 .await?)
186 }
187
188 pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
194 Ok(self
195 .smgr
196 .upgrade()
197 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
198 .get_surb_balancer_config(&self.id)
199 .await?)
200 }
201
202 pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
207 Ok(self
208 .smgr
209 .upgrade()
210 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
211 .update_surb_balancer_config(&self.id, config)
212 .await?)
213 }
214}
215
216pub struct HoprTransport<Chain, Graph, Net> {
219 packet_key: OffchainKeypair,
220 chain_key: ChainKeypair,
221 chain_api: Chain,
222 ping: Arc<OnceLock<Pinger>>,
223 network: Arc<OnceLock<Net>>,
224 graph: Graph,
225 path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
226 my_multiaddresses: Vec<Multiaddr>,
227 smgr: Arc<HoprSessionManager>,
228 session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
229 probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
230 counters: PeerProtocolCounterRegistry,
231 cfg: HoprProtocolConfig,
232}
233
234impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
235where
236 Chain: ChainReadChannelOperations
237 + ChainReadAccountOperations
238 + ChainWriteTicketOperations
239 + ChainKeyOperations
240 + ChainReadTicketOperations
241 + ChainValues
242 + Clone
243 + Send
244 + Sync
245 + 'static,
246 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
247 + NetworkGraphUpdate
248 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
249 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
250 + Clone
251 + Send
252 + Sync
253 + 'static,
254 <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
255 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
256 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
257 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
258 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
259{
260 pub fn new(
261 identity: (&ChainKeypair, &OffchainKeypair),
262 resolver: Chain,
263 graph: Graph,
264 my_multiaddresses: Vec<Multiaddr>,
265 cfg: HoprProtocolConfig,
266 ) -> errors::Result<Self> {
267 let me_offchain = *identity.1.public();
268 let planner_config = cfg.path_planner;
269 let selector = HoprGraphPathSelector::new(me_offchain, graph.clone(), planner_config.max_cached_paths);
270
271 let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
272
273 let mut session_tag_allocator = None;
274 let mut session_telemetry_tag_allocator = None;
275 let mut probing_tag_allocator = None;
276 for (usage, alloc) in tag_allocators {
277 match usage {
278 hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
279 hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
280 session_telemetry_tag_allocator = Some(alloc)
281 }
282 hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
283 }
284 }
285 let session_tag_allocator = session_tag_allocator
286 .ok_or_else(|| errors::HoprTransportError::Api("session tag allocator missing".into()))?;
287 let session_telemetry_tag_allocator = session_telemetry_tag_allocator
288 .ok_or_else(|| errors::HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
289 let probing_tag_allocator = probing_tag_allocator
290 .ok_or_else(|| errors::HoprTransportError::Api("probing tag allocator missing".into()))?;
291
292 Ok(Self {
293 packet_key: identity.1.clone(),
294 chain_key: identity.0.clone(),
295 ping: Arc::new(OnceLock::new()),
296 network: Arc::new(OnceLock::new()),
297 graph,
298 path_planner: PathPlanner::new(
299 me_offchain,
300 MemorySurbStore::new(cfg.packet.surb_store),
301 resolver.clone(),
302 selector,
303 planner_config,
304 ),
305 my_multiaddresses,
306 smgr: SessionManager::new(
307 SessionManagerConfig {
308 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
309 .ok()
310 .and_then(|s| s.parse::<usize>().ok())
311 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
312 .max(ApplicationData::PAYLOAD_SIZE),
313 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
314 .ok()
315 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
316 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
317 .max(Duration::from_millis(100)),
318 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
319 idle_timeout: cfg.session.idle_timeout,
320 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
321 initial_return_session_egress_rate: 10,
322 minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
323 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
324 surb_balance_notify_period: None,
325 surb_target_notify: true,
326 },
327 session_tag_allocator,
328 )
329 .into(),
330 chain_api: resolver,
331 session_telemetry_tag_allocator,
332 probing_tag_allocator,
333 counters: PeerProtocolCounterRegistry::default(),
334 cfg,
335 })
336 }
337
338 pub async fn run<T, TFact, Ct>(
345 &self,
346 cover_traffic: Ct,
347 network: Net,
348 network_process: BoxedProcessFn,
349 ticket_events: T,
350 ticket_factory: TFact,
351 on_incoming_session: Sender<IncomingSession>,
352 ) -> errors::Result<(
353 HoprSocket<
354 futures::channel::mpsc::Receiver<ApplicationDataIn>,
355 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
356 >,
357 AbortableList<HoprTransportProcess>,
358 )>
359 where
360 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
361 T::Error: std::error::Error + Clone + Send,
362 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
363 TFact: TicketFactory + Clone + Send + Sync + 'static,
364 {
365 let mut processes = AbortableList::<HoprTransportProcess>::default();
366
367 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
368 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
369
370 let transport_network = network;
373 let transport_layer_process = network_process;
374
375 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
376 let (wire_msg_tx, wire_msg_rx) =
377 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
378
379 let (mixing_channel_tx, mixing_channel_rx) =
380 hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
381
382 let _mixing_process_before_sending_out = spawn(
384 mixing_channel_rx
385 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
386 .map(Ok)
387 .forward(wire_msg_tx)
388 .inspect(|_| {
389 tracing::warn!(
390 task = "mixer -> egress process",
391 "long-running background task finished"
392 )
393 }),
394 );
395
396 #[cfg(feature = "runtime-tokio")]
398 processes.insert(
399 HoprTransportProcess::PathRefresh,
400 spawn_as_abortable!(self.path_planner.run_background_refresh()),
401 );
402
403 processes.insert(
404 HoprTransportProcess::Medium,
405 spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
406 task = %HoprTransportProcess::Medium,
407 "long-running background task finished"
408 ))),
409 );
410
411 let msg_protocol_bidirectional_channel_capacity =
412 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
413 .ok()
414 .and_then(|s| s.trim().parse::<usize>().ok())
415 .filter(|&c| c > 0)
416 .unwrap_or(16_384);
417
418 let (on_incoming_data_tx, on_incoming_data_rx) =
419 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
420
421 debug!(
422 capacity = msg_protocol_bidirectional_channel_capacity,
423 "creating protocol bidirectional channel"
424 );
425 let (tx_from_protocol, rx_from_protocol) =
426 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
427
428 let cover_traffic_allocated_tag = self
432 .session_telemetry_tag_allocator
433 .allocate()
434 .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
435 let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
436
437 let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
440 let _keep_alive = &cover_traffic_allocated_tag;
441 async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
442 });
443
444 let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
446 let start =
447 hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
448 let data = &RANDOM_DATA[start..start + 100];
449
450 futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
451 Some((routing, ApplicationDataOut::with_no_packet_info(data)))
452 } else {
453 tracing::error!("failed to construct cover traffic packet");
454 None
455 })
456 });
457
458 let merged_unresolved_output_data =
460 select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
461 futures::stream::PollNext::Left
462 });
463
464 let path_planner = self.path_planner.clone();
469 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
470 let all_resolved_external_msg_rx = merged_unresolved_output_data.filter_map(move |(unresolved, mut data)| {
471 let path_planner = path_planner.clone();
472 async move {
473 trace!(?unresolved, "resolving routing for packet");
474 match path_planner
475 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
476 .await
477 {
478 Ok((resolved, rem_surbs)) => {
479 let mut signals_to_dst = data
483 .packet_info
484 .as_ref()
485 .map(|info| info.signals_to_destination)
486 .unwrap_or_default();
487
488 if resolved.is_return() {
489 signals_to_dst = match rem_surbs {
490 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
491 signals_to_dst | PacketSignal::SurbDistress
492 }
493 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
494 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
495 };
496 } else {
497 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
499 }
500
501 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
502 trace!(?resolved, "resolved routing for packet");
503 Some((resolved, data))
504 }
505 Err(error) => {
506 error!(%error, "failed to resolve routing");
507 None
508 }
509 }
510 }
511 .in_current_span()
512 });
513
514 let channels_dst = self
515 .chain_api
516 .domain_separators()
517 .await
518 .map_err(HoprTransportError::chain)?
519 .channel;
520
521 processes.extend_from(pipeline::run_hopr_packet_pipeline(
522 (self.packet_key.clone(), self.chain_key.clone()),
523 (mixing_channel_tx, wire_msg_rx),
524 (tx_from_protocol, all_resolved_external_msg_rx),
525 HoprPipelineComponents {
526 surb_store: self.path_planner.surb_store.clone(),
527 chain_api: self.chain_api.clone(),
528 counters: self.counters.clone(),
529 ticket_factory,
530 ticket_events,
531 },
532 channels_dst,
533 self.cfg.packet,
534 ));
535
536 let flush_counters = self.counters.clone();
538 let flush_graph = self.graph.clone();
539 let flush_me = *self.packet_key.public();
540 let flush_interval = self.cfg.counter_flush_interval;
541 processes.insert(
542 HoprTransportProcess::CounterFlush,
543 spawn_as_abortable!(async move {
544 use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
545
546 futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
547 .for_each(|_| {
548 for (peer, num_packets, num_acks) in flush_counters.drain() {
549 tracing::trace!(
550 %peer,
551 num_packets,
552 num_acks,
553 "flushing protocol conformance counters"
554 );
555 flush_graph.upsert_edge(&flush_me, &peer, |obs| {
556 obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
557 });
558 }
559 futures::future::ready(())
560 })
561 .await;
562 }),
563 );
564
565 debug!(
567 capacity = msg_protocol_bidirectional_channel_capacity,
568 note = "same as protocol bidirectional",
569 "Creating probing channel"
570 );
571
572 let (tx_from_probing, rx_from_probing) =
573 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
574
575 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
576 .ok()
577 .and_then(|s| s.trim().parse::<usize>().ok())
578 .filter(|&c| c > 0)
579 .unwrap_or(128);
580 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
581 let (manual_ping_tx, manual_ping_rx) =
582 channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
583
584 let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
585
586 let probing_processes = probe
587 .continuously_scan(
588 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
589 manual_ping_rx,
590 tx_from_probing,
591 cover_traffic,
592 self.graph.clone(),
593 )
594 .await;
595
596 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
597
598 self.ping
600 .clone()
601 .set(Pinger::new(
602 PingConfig {
603 timeout: self.cfg.probe.timeout,
604 },
605 manual_ping_tx,
606 ))
607 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
608
609 self.smgr
611 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
612 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
613 .into_iter()
614 .enumerate()
615 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
616 .for_each(|(k, v)| {
617 processes.insert(k, v);
618 });
619
620 let smgr = self.smgr.clone();
621 processes.insert(
622 HoprTransportProcess::SessionsManagement(0),
623 spawn_as_abortable!(
624 rx_from_probing
625 .filter_map(move |(pseudonym, data)| {
626 let smgr = smgr.clone();
627 async move {
628 match smgr.dispatch_message(pseudonym, data).await {
629 Ok(DispatchResult::Processed) => {
630 tracing::trace!("message dispatch completed");
631 None
632 }
633 Ok(DispatchResult::Unrelated(data)) => {
634 tracing::trace!("unrelated message dispatch completed");
635 Some(data)
636 }
637 Err(error) => {
638 tracing::error!(%error, "error while dispatching packet in the session manager");
639 None
640 }
641 }
642 }
643 })
644 .map(Ok)
645 .forward(on_incoming_data_tx)
646 .inspect(|_| tracing::warn!(
647 task = %HoprTransportProcess::SessionsManagement(0),
648 "long-running background task finished"
649 ))
650 ),
651 );
652
653 self.network
655 .clone()
656 .set(transport_network)
657 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
658
659 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
660 }
661
662 #[tracing::instrument(level = "debug", skip(self))]
663 pub async fn ping(
664 &self,
665 peer: &OffchainPublicKey,
666 ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
667 let me: &OffchainPublicKey = self.packet_key.public();
668 if peer == me {
669 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
670 }
671
672 let pinger = self
673 .ping
674 .get()
675 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
676
677 let latency = (*pinger).ping(peer).await?;
678
679 if let Some(observations) = self.graph.edge(me, peer) {
680 Ok((latency, observations))
681 } else {
682 Err(HoprTransportError::Api(format!(
683 "no observations available for peer {peer}",
684 )))
685 }
686 }
687
688 #[tracing::instrument(level = "debug", skip(self))]
689 pub async fn new_session(
690 &self,
691 destination: Address,
692 target: SessionTarget,
693 cfg: SessionClientConfig,
694 ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
695 let session = self.smgr.new_session(destination, target, cfg).await?;
696 let id = *session.id();
697 Ok((
698 session,
699 HoprSessionConfigurator {
700 id,
701 smgr: Arc::downgrade(&self.smgr),
702 },
703 ))
704 }
705
706 #[tracing::instrument(level = "debug", skip(self))]
707 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
708 self.network
709 .get()
710 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
711 .map(|network| network.listening_as().into_iter().collect())
712 .unwrap_or_default()
713 }
714
715 #[tracing::instrument(level = "debug", skip(self))]
716 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
717 let mut mas = self
718 .local_multiaddresses()
719 .into_iter()
720 .filter(|ma| {
721 crate::multiaddrs::is_supported(ma)
722 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
723 })
724 .map(|ma| strip_p2p_protocol(&ma))
725 .filter(|v| !v.is_empty())
726 .collect::<Vec<_>>();
727
728 mas.sort_by(|l, r| {
729 let is_left_dns = crate::multiaddrs::is_dns(l);
730 let is_right_dns = crate::multiaddrs::is_dns(r);
731
732 if !(is_left_dns ^ is_right_dns) {
733 std::cmp::Ordering::Equal
734 } else if is_left_dns {
735 std::cmp::Ordering::Less
736 } else {
737 std::cmp::Ordering::Greater
738 }
739 });
740
741 mas
742 }
743
744 pub fn graph(&self) -> &Graph {
746 &self.graph
747 }
748
749 #[tracing::instrument(level = "debug", skip(self))]
750 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
751 self.network
752 .get()
753 .map(|network| network.listening_as().into_iter().collect())
754 .unwrap_or_else(|| {
755 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
756 self.my_multiaddresses.clone()
757 })
758 }
759
760 #[tracing::instrument(level = "debug", skip(self))]
761 pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
762 match self
763 .network
764 .get()
765 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
766 {
767 Ok(network) => network
768 .multiaddress_of(&peer.into())
769 .unwrap_or_default()
770 .into_iter()
771 .collect(),
772 Err(error) => {
773 tracing::error!(%error, "failed to get observed multiaddresses");
774 return vec![];
775 }
776 }
777 }
778
779 #[tracing::instrument(level = "debug", skip(self))]
780 pub async fn network_health(&self) -> Health {
781 self.network
782 .get()
783 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
784 .map(|network| network.health())
785 .unwrap_or(Health::Red)
786 }
787
788 pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
789 Ok(futures::stream::iter(
790 self.network
791 .get()
792 .ok_or_else(|| {
793 tracing::error!("transport network is not yet initialized");
794 HoprTransportError::Api("transport network is not yet initialized".into())
795 })?
796 .connected_peers(),
797 )
798 .filter_map(|peer_id| async move {
799 match peer_id_to_public_key(&peer_id) {
800 Ok(key) => Some(key),
801 Err(error) => {
802 tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
803 None
804 }
805 }
806 })
807 .collect()
808 .await)
809 }
810
811 #[tracing::instrument(level = "debug", skip(self))]
812 pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
813 self.graph.edge(self.packet_key.public(), peer)
814 }
815
816 #[tracing::instrument(level = "debug", skip(self))]
818 pub async fn all_network_peers(
819 &self,
820 minimum_score: f64,
821 ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
822 let me = self.packet_key.public();
823 Ok(self
824 .network_connected_peers()
825 .await?
826 .into_iter()
827 .filter_map(|peer| {
828 let observation = self.graph.edge(me, &peer);
829 if let Some(info) = observation {
830 if info.score() >= minimum_score {
831 Some((peer, info))
832 } else {
833 None
834 }
835 } else {
836 None
837 }
838 })
839 .collect::<Vec<_>>())
840 }
841}
842
843fn build_mixer_cfg_from_env() -> MixerConfig {
844 let mixer_cfg = MixerConfig {
845 min_delay: std::time::Duration::from_millis(
846 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
847 .map(|v| {
848 v.trim()
849 .parse::<u64>()
850 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
851 })
852 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
853 ),
854 delay_range: std::time::Duration::from_millis(
855 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
856 .map(|v| {
857 v.trim()
858 .parse::<u64>()
859 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
860 })
861 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
862 ),
863 capacity: {
864 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
865 .ok()
866 .and_then(|s| s.trim().parse::<usize>().ok())
867 .filter(|&c| c > 0)
868 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
869 debug!(capacity = capacity, "Setting mixer capacity");
870 capacity
871 },
872 ..MixerConfig::default()
873 };
874 debug!(?mixer_cfg, "Mixer configuration");
875
876 mixer_cfg
877}
878
879impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
884where
885 Net: NetworkView + Send + Sync + 'static,
886{
887 fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
888 self.network.get().map(|n| n.listening_as()).unwrap_or_default()
889 }
890
891 fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
892 self.network.get()?.multiaddress_of(peer)
893 }
894
895 fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
896 self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
897 }
898
899 fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
900 self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
901 }
902
903 fn is_connected(&self, peer: &PeerId) -> bool {
904 self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
905 }
906
907 fn health(&self) -> Health {
908 self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
909 }
910
911 fn subscribe_network_events(
912 &self,
913 ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
914 match self.network.get() {
915 Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
916 None => futures::future::Either::Right(futures::stream::empty()),
917 }
918 }
919}
920
921#[async_trait::async_trait]
926impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
927where
928 Chain: ChainReadChannelOperations
929 + ChainReadAccountOperations
930 + hopr_api::chain::ChainWriteTicketOperations
931 + ChainKeyOperations
932 + hopr_api::chain::ChainReadTicketOperations
933 + ChainValues
934 + Clone
935 + Send
936 + Sync
937 + 'static,
938 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
939 + NetworkGraphUpdate
940 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
941 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
942 + Clone
943 + Send
944 + Sync
945 + 'static,
946 <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
947 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
948 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
949 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
950{
951 type Error = errors::HoprTransportError;
952 type Observable = <Graph as NetworkGraphView>::Observed;
953
954 async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
955 self.ping(key).await
956 }
957
958 async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
959 self.network_observed_multiaddresses(key).await
960 }
961}