1pub mod config;
17pub mod constants;
19pub mod errors;
21
22mod multiaddrs;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30 sync::{Arc, OnceLock},
31 time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36 FutureExt, StreamExt,
37 channel::mpsc::{Sender, channel},
38 stream::select_with_strategy,
39};
40pub use hopr_api::{
41 Multiaddr, PeerId,
42 network::{Health, traits::NetworkView},
43 types::{
44 crypto::{
45 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
46 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
47 },
48 internal::{prelude::HoprPseudonym, routing::RoutingOptions},
49 },
50};
51use hopr_api::{
52 chain::{ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
53 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
54 graph::{NetworkGraphUpdate, NetworkGraphView, traits::EdgeObservableRead},
55 network::{BoxedProcessFn, NetworkStreamControl},
56 types::primitive::prelude::*,
57};
58use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
59use hopr_crypto_packet::prelude::PacketSignal;
60use hopr_network_types::prelude::*;
61pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
62use hopr_protocol_hopr::MemorySurbStore;
63use hopr_transport_mixer::MixerConfig;
64use hopr_transport_path::{BackgroundPathCacheRefreshable, HoprGraphPathSelector, PathPlanner};
65pub use hopr_transport_probe::{NeighborTelemetry, PathTelemetry, errors::ProbeError, ping::PingQueryReplier};
66use hopr_transport_probe::{
67 Probe,
68 ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_protocol::PeerProtocolCounterRegistry;
71pub use hopr_transport_session as session;
72#[cfg(feature = "runtime-tokio")]
73pub use hopr_transport_session::transfer_session;
74pub use hopr_transport_session::{
75 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
76 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
77 errors::{SessionManagerError, TransportSessionError},
78};
79use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
80#[cfg(feature = "telemetry")]
81pub use hopr_transport_session::{SessionAckMode, SessionLifecycleState};
82pub use hopr_transport_tag_allocator::TagAllocatorConfig;
83pub use multiaddr::Protocol;
84use tracing::{Instrument, debug, error, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88 constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, multiaddrs::strip_p2p_protocol,
89 pipeline::HoprPipelineComponents, socket::HoprSocket,
90};
91
92pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94pub use hopr_api as api;
95use hopr_api::{
96 chain::{ChainReadTicketOperations, ChainWriteTicketOperations},
97 tickets::TicketFactory,
98 types::internal::routing::DestinationRouting,
99};
100
101lazy_static::lazy_static! {
103 static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
104
105 static ref PEER_ID_CACHE: moka::sync::Cache<PeerId, OffchainPublicKey> = moka::sync::Cache::builder()
106 .time_to_idle(Duration::from_mins(15))
107 .max_capacity(10_000)
108 .build();
109
110 static ref RANDOM_DATA: [u8; 400] = hopr_api::types::crypto_random::random_bytes();
111}
112
113pub fn peer_id_to_public_key(peer_id: &PeerId) -> crate::errors::Result<OffchainPublicKey> {
118 PEER_ID_CACHE
119 .try_get_with_by_ref(peer_id, move || {
120 OffchainPublicKey::from_peerid(peer_id).map_err(|e| e.into())
121 })
122 .map_err(|e: Arc<HoprTransportError>| {
123 crate::errors::HoprTransportError::Other(anyhow::anyhow!(
124 "failed to convert peer_id ({:?}) to an offchain public key: {e}",
125 peer_id
126 ))
127 })
128}
129
130#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
131pub enum HoprTransportProcess {
132 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
133 Medium,
134 #[strum(to_string = "HOPR packet pipeline ({0})")]
135 Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
136 #[strum(to_string = "session manager sub-process #{0}")]
137 SessionsManagement(usize),
138 #[strum(to_string = "network probing sub-process: {0}")]
139 Probing(hopr_transport_probe::HoprProbeProcess),
140 #[cfg(feature = "runtime-tokio")]
141 #[strum(to_string = "path cache refresh")]
142 PathRefresh,
143 #[strum(to_string = "sync of outgoing ticket indices")]
144 OutgoingIndexSync,
145 #[strum(to_string = "periodic protocol counter flush")]
146 CounterFlush,
147 #[cfg(feature = "capture")]
148 #[strum(to_string = "packet capture")]
149 Capture,
150}
151
152type HoprSessionManager = SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>;
154
155#[derive(Debug, Clone)]
160pub struct HoprSessionConfigurator {
161 id: SessionId,
162 smgr: std::sync::Weak<HoprSessionManager>,
164}
165
166impl HoprSessionConfigurator {
167 pub fn id(&self) -> &SessionId {
169 &self.id
170 }
171
172 pub async fn ping(&self) -> errors::Result<()> {
180 Ok(self
181 .smgr
182 .upgrade()
183 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
184 .ping_session(&self.id)
185 .await?)
186 }
187
188 pub async fn get_surb_balancer_config(&self) -> errors::Result<Option<SurbBalancerConfig>> {
194 Ok(self
195 .smgr
196 .upgrade()
197 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
198 .get_surb_balancer_config(&self.id)
199 .await?)
200 }
201
202 pub async fn update_surb_balancer_config(&self, config: SurbBalancerConfig) -> errors::Result<()> {
207 Ok(self
208 .smgr
209 .upgrade()
210 .ok_or(HoprTransportError::Other(anyhow::anyhow!("session manager is dropped")))?
211 .update_surb_balancer_config(&self.id, config)
212 .await?)
213 }
214}
215
216pub struct HoprTransport<Chain, Graph, Net> {
219 packet_key: OffchainKeypair,
220 chain_key: ChainKeypair,
221 chain_api: Chain,
222 ping: Arc<OnceLock<Pinger>>,
223 network: Arc<OnceLock<Net>>,
224 graph: Graph,
225 path_planner: PathPlanner<MemorySurbStore, Chain, HoprGraphPathSelector<Graph>>,
226 my_multiaddresses: Vec<Multiaddr>,
227 smgr: Arc<HoprSessionManager>,
228 session_telemetry_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
229 probing_tag_allocator: Arc<dyn hopr_transport_tag_allocator::TagAllocator + Send + Sync>,
230 counters: PeerProtocolCounterRegistry,
231 cfg: HoprProtocolConfig,
232}
233
234impl<Chain, Graph, Net> HoprTransport<Chain, Graph, Net>
235where
236 Chain: ChainReadChannelOperations
237 + ChainReadAccountOperations
238 + ChainWriteTicketOperations
239 + ChainKeyOperations
240 + ChainReadTicketOperations
241 + ChainValues
242 + Clone
243 + Send
244 + Sync
245 + 'static,
246 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
247 + NetworkGraphUpdate
248 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
249 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
250 + Clone
251 + Send
252 + Sync
253 + 'static,
254 <Graph as NetworkGraphView>::Observed: hopr_api::graph::traits::EdgeObservableRead + Send,
255 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
256 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
257 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
258 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
259{
260 pub fn new(
261 identity: (&ChainKeypair, &OffchainKeypair),
262 resolver: Chain,
263 graph: Graph,
264 my_multiaddresses: Vec<Multiaddr>,
265 cfg: HoprProtocolConfig,
266 ) -> errors::Result<Self> {
267 let me_offchain = *identity.1.public();
268 let planner_config = cfg.path_planner;
269 let selector = HoprGraphPathSelector::new(
270 me_offchain,
271 graph.clone(),
272 planner_config.max_cached_paths,
273 planner_config.edge_penalty,
274 planner_config.min_ack_rate,
275 );
276
277 let tag_allocators = hopr_transport_tag_allocator::create_allocators_from_config(&cfg.session.tag_allocator)?;
278
279 let mut session_tag_allocator = None;
280 let mut session_telemetry_tag_allocator = None;
281 let mut probing_tag_allocator = None;
282 for (usage, alloc) in tag_allocators {
283 match usage {
284 hopr_transport_tag_allocator::Usage::Session => session_tag_allocator = Some(alloc),
285 hopr_transport_tag_allocator::Usage::SessionTerminalTelemetry => {
286 session_telemetry_tag_allocator = Some(alloc)
287 }
288 hopr_transport_tag_allocator::Usage::ProvingTelemetry => probing_tag_allocator = Some(alloc),
289 }
290 }
291 let session_tag_allocator = session_tag_allocator
292 .ok_or_else(|| errors::HoprTransportError::Api("session tag allocator missing".into()))?;
293 let session_telemetry_tag_allocator = session_telemetry_tag_allocator
294 .ok_or_else(|| errors::HoprTransportError::Api("session telemetry tag allocator missing".into()))?;
295 let probing_tag_allocator = probing_tag_allocator
296 .ok_or_else(|| errors::HoprTransportError::Api("probing tag allocator missing".into()))?;
297
298 Ok(Self {
299 packet_key: identity.1.clone(),
300 chain_key: identity.0.clone(),
301 ping: Arc::new(OnceLock::new()),
302 network: Arc::new(OnceLock::new()),
303 graph,
304 path_planner: PathPlanner::new(
305 me_offchain,
306 MemorySurbStore::new(cfg.packet.surb_store),
307 resolver.clone(),
308 selector,
309 planner_config,
310 ),
311 my_multiaddresses,
312 smgr: SessionManager::new(
313 SessionManagerConfig {
314 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
315 .ok()
316 .and_then(|s| s.parse::<usize>().ok())
317 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
318 .max(ApplicationData::PAYLOAD_SIZE),
319 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
320 .ok()
321 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
322 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
323 .max(Duration::from_millis(100)),
324 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
325 idle_timeout: cfg.session.idle_timeout,
326 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
327 initial_return_session_egress_rate: 10,
328 minimum_surb_buffer_duration: cfg.session.balancer_minimum_surb_buffer_duration,
329 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
330 surb_balance_notify_period: None,
331 surb_target_notify: true,
332 },
333 session_tag_allocator,
334 )
335 .into(),
336 chain_api: resolver,
337 session_telemetry_tag_allocator,
338 probing_tag_allocator,
339 counters: PeerProtocolCounterRegistry::default(),
340 cfg,
341 })
342 }
343
344 pub async fn run<T, TFact, Ct>(
351 &self,
352 cover_traffic: Ct,
353 network: Net,
354 network_process: BoxedProcessFn,
355 ticket_events: T,
356 ticket_factory: TFact,
357 on_incoming_session: Sender<IncomingSession>,
358 ) -> errors::Result<(
359 HoprSocket<
360 futures::channel::mpsc::Receiver<ApplicationDataIn>,
361 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
362 >,
363 AbortableList<HoprTransportProcess>,
364 )>
365 where
366 T: futures::Sink<hopr_api::node::TicketEvent> + Clone + Send + Unpin + 'static,
367 T::Error: std::error::Error + Clone + Send,
368 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
369 TFact: TicketFactory + Clone + Send + Sync + 'static,
370 {
371 let mut processes = AbortableList::<HoprTransportProcess>::default();
372
373 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
374 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
375
376 let transport_network = network;
379 let transport_layer_process = network_process;
380
381 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
382 let (wire_msg_tx, wire_msg_rx) =
383 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
384
385 let (mixing_channel_tx, mixing_channel_rx) =
386 hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
387
388 let _mixing_process_before_sending_out = spawn(
390 mixing_channel_rx
391 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
392 .map(Ok)
393 .forward(wire_msg_tx)
394 .inspect(|_| {
395 tracing::warn!(
396 task = "mixer -> egress process",
397 "long-running background task finished"
398 )
399 }),
400 );
401
402 #[cfg(feature = "runtime-tokio")]
404 processes.insert(
405 HoprTransportProcess::PathRefresh,
406 spawn_as_abortable!(self.path_planner.run_background_refresh()),
407 );
408
409 processes.insert(
410 HoprTransportProcess::Medium,
411 spawn_as_abortable!(transport_layer_process().inspect(|_| tracing::warn!(
412 task = %HoprTransportProcess::Medium,
413 "long-running background task finished"
414 ))),
415 );
416
417 let msg_protocol_bidirectional_channel_capacity =
418 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
419 .ok()
420 .and_then(|s| s.trim().parse::<usize>().ok())
421 .filter(|&c| c > 0)
422 .unwrap_or(16_384);
423
424 let (on_incoming_data_tx, on_incoming_data_rx) =
425 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
426
427 debug!(
428 capacity = msg_protocol_bidirectional_channel_capacity,
429 "creating protocol bidirectional channel"
430 );
431 let (tx_from_protocol, rx_from_protocol) =
432 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
433
434 let cover_traffic_allocated_tag = self
438 .session_telemetry_tag_allocator
439 .allocate()
440 .ok_or_else(|| HoprTransportError::Api("failed to allocate cover traffic tag".into()))?;
441 let cover_traffic_tag: Tag = cover_traffic_allocated_tag.value().into();
442
443 let rx_from_protocol = rx_from_protocol.filter_map(move |(pseudonym, data)| {
446 let _keep_alive = &cover_traffic_allocated_tag;
447 async move { (data.data.application_tag != cover_traffic_tag).then_some((pseudonym, data)) }
448 });
449
450 let cover_traffic_stream = CoverTrafficGeneration::build(&cover_traffic).filter_map(move |routing| {
452 let start =
453 hopr_api::types::crypto_random::random_integer(0, Some((RANDOM_DATA.len() - 100) as u64)) as usize;
454 let data = &RANDOM_DATA[start..start + 100];
455
456 futures::future::ready(if let Ok(data) = ApplicationData::new(cover_traffic_tag, data) {
457 Some((routing, ApplicationDataOut::with_no_packet_info(data)))
458 } else {
459 tracing::error!("failed to construct cover traffic packet");
460 None
461 })
462 });
463
464 let merged_unresolved_output_data =
466 select_with_strategy(unresolved_routing_msg_rx, cover_traffic_stream, |_: &mut ()| {
467 futures::stream::PollNext::Left
468 });
469
470 let path_planner = self.path_planner.clone();
475 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
476 let all_resolved_external_msg_rx = merged_unresolved_output_data.filter_map(move |(unresolved, mut data)| {
477 let path_planner = path_planner.clone();
478 async move {
479 trace!(?unresolved, "resolving routing for packet");
480 match path_planner
481 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
482 .await
483 {
484 Ok((resolved, rem_surbs)) => {
485 let mut signals_to_dst = data
489 .packet_info
490 .as_ref()
491 .map(|info| info.signals_to_destination)
492 .unwrap_or_default();
493
494 if resolved.is_return() {
495 signals_to_dst = match rem_surbs {
496 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
497 signals_to_dst | PacketSignal::SurbDistress
498 }
499 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
500 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
501 };
502 } else {
503 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
505 }
506
507 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
508 trace!(?resolved, "resolved routing for packet");
509 Some((resolved, data))
510 }
511 Err(error) => {
512 error!(%error, "failed to resolve routing");
513 None
514 }
515 }
516 }
517 .in_current_span()
518 });
519
520 let channels_dst = self
521 .chain_api
522 .domain_separators()
523 .await
524 .map_err(HoprTransportError::chain)?
525 .channel;
526
527 processes.extend_from(pipeline::run_hopr_packet_pipeline(
528 (self.packet_key.clone(), self.chain_key.clone()),
529 (mixing_channel_tx, wire_msg_rx),
530 (tx_from_protocol, all_resolved_external_msg_rx),
531 HoprPipelineComponents {
532 surb_store: self.path_planner.surb_store.clone(),
533 chain_api: self.chain_api.clone(),
534 counters: self.counters.clone(),
535 ticket_factory,
536 ticket_events,
537 },
538 channels_dst,
539 self.cfg.packet,
540 ));
541
542 let flush_counters = self.counters.clone();
544 let flush_graph = self.graph.clone();
545 let flush_me = *self.packet_key.public();
546 let flush_interval = self.cfg.counter_flush_interval;
547 processes.insert(
548 HoprTransportProcess::CounterFlush,
549 spawn_as_abortable!(async move {
550 use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
551
552 futures_time::stream::interval(futures_time::time::Duration::from(flush_interval))
553 .for_each(|_| {
554 for (peer, num_packets, num_acks) in flush_counters.drain() {
555 tracing::trace!(
556 %peer,
557 num_packets,
558 num_acks,
559 "flushing protocol conformance counters"
560 );
561 flush_graph.upsert_edge(&flush_me, &peer, |obs| {
562 obs.record(EdgeWeightType::ImmediateProtocolConformance { num_packets, num_acks });
563 });
564 }
565 futures::future::ready(())
566 })
567 .await;
568 }),
569 );
570
571 debug!(
573 capacity = msg_protocol_bidirectional_channel_capacity,
574 note = "same as protocol bidirectional",
575 "Creating probing channel"
576 );
577
578 let (tx_from_probing, rx_from_probing) =
579 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
580
581 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
582 .ok()
583 .and_then(|s| s.trim().parse::<usize>().ok())
584 .filter(|&c| c > 0)
585 .unwrap_or(128);
586 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
587 let (manual_ping_tx, manual_ping_rx) =
588 channel::<(OffchainPublicKey, PingQueryReplier)>(manual_ping_channel_capacity);
589
590 let probe = Probe::new(self.cfg.probe, self.probing_tag_allocator.clone());
591
592 let probing_processes = probe
593 .continuously_scan(
594 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
595 manual_ping_rx,
596 tx_from_probing,
597 cover_traffic,
598 self.graph.clone(),
599 )
600 .await;
601
602 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
603
604 self.ping
606 .clone()
607 .set(Pinger::new(
608 PingConfig {
609 timeout: self.cfg.probe.timeout,
610 },
611 manual_ping_tx,
612 ))
613 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
614
615 self.smgr
617 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
618 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
619 .into_iter()
620 .enumerate()
621 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
622 .for_each(|(k, v)| {
623 processes.insert(k, v);
624 });
625
626 let smgr = self.smgr.clone();
627 processes.insert(
628 HoprTransportProcess::SessionsManagement(0),
629 spawn_as_abortable!(
630 rx_from_probing
631 .filter_map(move |(pseudonym, data)| {
632 let smgr = smgr.clone();
633 async move {
634 match smgr.dispatch_message(pseudonym, data).await {
635 Ok(DispatchResult::Processed) => {
636 tracing::trace!("message dispatch completed");
637 None
638 }
639 Ok(DispatchResult::Unrelated(data)) => {
640 tracing::trace!("unrelated message dispatch completed");
641 Some(data)
642 }
643 Err(error) => {
644 tracing::error!(%error, "error while dispatching packet in the session manager");
645 None
646 }
647 }
648 }
649 })
650 .map(Ok)
651 .forward(on_incoming_data_tx)
652 .inspect(|_| tracing::warn!(
653 task = %HoprTransportProcess::SessionsManagement(0),
654 "long-running background task finished"
655 ))
656 ),
657 );
658
659 self.network
661 .clone()
662 .set(transport_network)
663 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
664
665 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
666 }
667
668 #[tracing::instrument(level = "debug", skip(self))]
669 pub async fn ping(
670 &self,
671 peer: &OffchainPublicKey,
672 ) -> errors::Result<(std::time::Duration, <Graph as NetworkGraphView>::Observed)> {
673 let me: &OffchainPublicKey = self.packet_key.public();
674 if peer == me {
675 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
676 }
677
678 let pinger = self
679 .ping
680 .get()
681 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
682
683 let latency = (*pinger).ping(peer).await?;
684
685 if let Some(observations) = self.graph.edge(me, peer) {
686 Ok((latency, observations))
687 } else {
688 Err(HoprTransportError::Api(format!(
689 "no observations available for peer {peer}",
690 )))
691 }
692 }
693
694 #[tracing::instrument(level = "debug", skip(self))]
695 pub async fn new_session(
696 &self,
697 destination: Address,
698 target: SessionTarget,
699 cfg: SessionClientConfig,
700 ) -> errors::Result<(HoprSession, HoprSessionConfigurator)> {
701 let session = self.smgr.new_session(destination, target, cfg).await?;
702 let id = *session.id();
703 Ok((
704 session,
705 HoprSessionConfigurator {
706 id,
707 smgr: Arc::downgrade(&self.smgr),
708 },
709 ))
710 }
711
712 #[tracing::instrument(level = "debug", skip(self))]
713 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
714 self.network
715 .get()
716 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
717 .map(|network| network.listening_as().into_iter().collect())
718 .unwrap_or_default()
719 }
720
721 #[tracing::instrument(level = "debug", skip(self))]
722 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
723 let mut mas = self
724 .local_multiaddresses()
725 .into_iter()
726 .filter(|ma| {
727 crate::multiaddrs::is_supported(ma)
728 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
729 })
730 .map(|ma| strip_p2p_protocol(&ma))
731 .filter(|v| !v.is_empty())
732 .collect::<Vec<_>>();
733
734 mas.sort_by(|l, r| {
735 let is_left_dns = crate::multiaddrs::is_dns(l);
736 let is_right_dns = crate::multiaddrs::is_dns(r);
737
738 if !(is_left_dns ^ is_right_dns) {
739 std::cmp::Ordering::Equal
740 } else if is_left_dns {
741 std::cmp::Ordering::Less
742 } else {
743 std::cmp::Ordering::Greater
744 }
745 });
746
747 mas
748 }
749
750 pub fn graph(&self) -> &Graph {
752 &self.graph
753 }
754
755 #[tracing::instrument(level = "debug", skip(self))]
756 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
757 self.network
758 .get()
759 .map(|network| network.listening_as().into_iter().collect())
760 .unwrap_or_else(|| {
761 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
762 self.my_multiaddresses.clone()
763 })
764 }
765
766 #[tracing::instrument(level = "debug", skip(self))]
767 pub async fn network_observed_multiaddresses(&self, peer: &OffchainPublicKey) -> Vec<Multiaddr> {
768 match self
769 .network
770 .get()
771 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
772 {
773 Ok(network) => network
774 .multiaddress_of(&peer.into())
775 .unwrap_or_default()
776 .into_iter()
777 .collect(),
778 Err(error) => {
779 tracing::error!(%error, "failed to get observed multiaddresses");
780 return vec![];
781 }
782 }
783 }
784
785 #[tracing::instrument(level = "debug", skip(self))]
786 pub async fn network_health(&self) -> Health {
787 self.network
788 .get()
789 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
790 .map(|network| network.health())
791 .unwrap_or(Health::Red)
792 }
793
794 pub async fn network_connected_peers(&self) -> errors::Result<Vec<OffchainPublicKey>> {
795 Ok(futures::stream::iter(
796 self.network
797 .get()
798 .ok_or_else(|| {
799 tracing::error!("transport network is not yet initialized");
800 HoprTransportError::Api("transport network is not yet initialized".into())
801 })?
802 .connected_peers(),
803 )
804 .filter_map(|peer_id| async move {
805 match peer_id_to_public_key(&peer_id) {
806 Ok(key) => Some(key),
807 Err(error) => {
808 tracing::warn!(%peer_id, %error, "failed to convert PeerId to OffchainPublicKey");
809 None
810 }
811 }
812 })
813 .collect()
814 .await)
815 }
816
817 #[tracing::instrument(level = "debug", skip(self))]
818 pub fn network_peer_observations(&self, peer: &OffchainPublicKey) -> Option<<Graph as NetworkGraphView>::Observed> {
819 self.graph.edge(self.packet_key.public(), peer)
820 }
821
822 #[tracing::instrument(level = "debug", skip(self))]
824 pub async fn all_network_peers(
825 &self,
826 minimum_score: f64,
827 ) -> errors::Result<Vec<(OffchainPublicKey, <Graph as NetworkGraphView>::Observed)>> {
828 let me = self.packet_key.public();
829 Ok(self
830 .network_connected_peers()
831 .await?
832 .into_iter()
833 .filter_map(|peer| {
834 let observation = self.graph.edge(me, &peer);
835 if let Some(info) = observation {
836 if info.score() >= minimum_score {
837 Some((peer, info))
838 } else {
839 None
840 }
841 } else {
842 None
843 }
844 })
845 .collect::<Vec<_>>())
846 }
847}
848
849fn build_mixer_cfg_from_env() -> MixerConfig {
850 let mixer_cfg = MixerConfig {
851 min_delay: std::time::Duration::from_millis(
852 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
853 .map(|v| {
854 v.trim()
855 .parse::<u64>()
856 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
857 })
858 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
859 ),
860 delay_range: std::time::Duration::from_millis(
861 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
862 .map(|v| {
863 v.trim()
864 .parse::<u64>()
865 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
866 })
867 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
868 ),
869 capacity: {
870 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
871 .ok()
872 .and_then(|s| s.trim().parse::<usize>().ok())
873 .filter(|&c| c > 0)
874 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
875 debug!(capacity = capacity, "Setting mixer capacity");
876 capacity
877 },
878 ..MixerConfig::default()
879 };
880 debug!(?mixer_cfg, "Mixer configuration");
881
882 mixer_cfg
883}
884
885impl<Chain, Graph, Net> NetworkView for HoprTransport<Chain, Graph, Net>
890where
891 Net: NetworkView + Send + Sync + 'static,
892{
893 fn listening_as(&self) -> std::collections::HashSet<Multiaddr> {
894 self.network.get().map(|n| n.listening_as()).unwrap_or_default()
895 }
896
897 fn multiaddress_of(&self, peer: &PeerId) -> Option<std::collections::HashSet<Multiaddr>> {
898 self.network.get()?.multiaddress_of(peer)
899 }
900
901 fn discovered_peers(&self) -> std::collections::HashSet<PeerId> {
902 self.network.get().map(|n| n.discovered_peers()).unwrap_or_default()
903 }
904
905 fn connected_peers(&self) -> std::collections::HashSet<PeerId> {
906 self.network.get().map(|n| n.connected_peers()).unwrap_or_default()
907 }
908
909 fn is_connected(&self, peer: &PeerId) -> bool {
910 self.network.get().map(|n| n.is_connected(peer)).unwrap_or(false)
911 }
912
913 fn health(&self) -> Health {
914 self.network.get().map(|n| n.health()).unwrap_or(Health::Red)
915 }
916
917 fn subscribe_network_events(
918 &self,
919 ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
920 match self.network.get() {
921 Some(n) => futures::future::Either::Left(n.subscribe_network_events()),
922 None => futures::future::Either::Right(futures::stream::empty()),
923 }
924 }
925}
926
927#[async_trait::async_trait]
932impl<Chain, Graph, Net> hopr_api::node::TransportOperations for HoprTransport<Chain, Graph, Net>
933where
934 Chain: ChainReadChannelOperations
935 + ChainReadAccountOperations
936 + hopr_api::chain::ChainWriteTicketOperations
937 + ChainKeyOperations
938 + hopr_api::chain::ChainReadTicketOperations
939 + ChainValues
940 + Clone
941 + Send
942 + Sync
943 + 'static,
944 Graph: NetworkGraphView<NodeId = OffchainPublicKey>
945 + NetworkGraphUpdate
946 + hopr_api::graph::NetworkGraphWrite<NodeId = OffchainPublicKey>
947 + hopr_api::graph::NetworkGraphTraverse<NodeId = OffchainPublicKey>
948 + Clone
949 + Send
950 + Sync
951 + 'static,
952 <Graph as NetworkGraphView>::Observed: EdgeObservableRead + Send,
953 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed: EdgeObservableRead + Send + 'static,
954 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
955 Net: NetworkView + NetworkStreamControl + Clone + Send + Sync + 'static,
956{
957 type Error = errors::HoprTransportError;
958 type Observable = <Graph as NetworkGraphView>::Observed;
959
960 async fn ping(&self, key: &OffchainPublicKey) -> Result<(Duration, Self::Observable), Self::Error> {
961 self.ping(key).await
962 }
963
964 async fn observed_multiaddresses(&self, key: &OffchainPublicKey) -> Vec<Multiaddr> {
965 self.network_observed_multiaddresses(key).await
966 }
967}