1pub mod config;
17pub mod constants;
19pub mod errors;
21mod helpers;
22
23#[cfg(feature = "telemetry")]
24pub mod stats;
25
26#[cfg(feature = "capture")]
27mod capture;
28mod pipeline;
29pub mod socket;
30
31use std::{
32 sync::{Arc, OnceLock},
33 time::Duration,
34};
35
36use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
37use futures::{
38 FutureExt, SinkExt, StreamExt,
39 channel::mpsc::{Sender, channel},
40};
41use helpers::PathPlanner;
42use hopr_api::{
43 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
44 db::HoprDbTicketOperations,
45};
46pub use hopr_api::{
47 db::ChannelTicketStatistics,
48 network::{Health, Observable, traits::NetworkView},
49};
50use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
51use hopr_crypto_packet::prelude::PacketSignal;
52pub use hopr_crypto_types::{
53 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
54 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
55};
56use hopr_ct_telemetry::ImmediateNeighborChannelGraph;
57pub use hopr_internal_types::prelude::HoprPseudonym;
58use hopr_internal_types::prelude::*;
59pub use hopr_network_types::prelude::RoutingOptions;
60use hopr_network_types::prelude::{DestinationRouting, *};
61use hopr_primitive_types::prelude::*;
62pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
63use hopr_protocol_hopr::MemorySurbStore;
64use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
65pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
66use hopr_transport_mixer::MixerConfig;
67pub use hopr_transport_network::observation::Observations;
68use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork};
69use hopr_transport_probe::{
70 Probe, TrafficGeneration,
71 ping::{PingConfig, Pinger},
72};
73pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
74pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
75pub use hopr_transport_session as session;
76#[cfg(feature = "runtime-tokio")]
77pub use hopr_transport_session::transfer_session;
78pub use hopr_transport_session::{
79 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
80 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
81 errors::{SessionManagerError, TransportSessionError},
82};
83use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
84#[cfg(feature = "telemetry")]
85pub use hopr_transport_session::{
86 SessionAckMode, SessionLifecycleState, SessionLifetimeSnapshot, SessionStatsSnapshot,
87};
88#[cfg(feature = "telemetry")]
89pub use stats::PeerPacketStats;
90use tracing::{Instrument, debug, error, info, trace, warn};
91
92pub use crate::config::HoprProtocolConfig;
93#[cfg(feature = "telemetry")]
94pub use crate::stats::PeerPacketStatsSnapshot;
95use crate::{
96 constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
97 socket::HoprSocket,
98};
99
/// Range of [`Tag`] values available to applications.
///
/// Re-exported at the crate root as an alias of `Tag::APPLICATION_TAG_RANGE`.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
101
102pub use hopr_api as api;
103
104lazy_static::lazy_static! {
106 static ref SESSION_INITIATION_TIMEOUT_MAX: Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
107}
108
/// Identifies the long-running background processes spawned by the transport.
///
/// Values of this type are used as keys of the `AbortableList` returned from
/// `HoprTransport::run`. The `Display` output (generated by `strum`) is a
/// human-readable description of the process and is used in log messages.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// Process driving the transport medium (the libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// One of the processes of the HOPR packet pipeline.
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    /// Session manager sub-process, identified by its index.
    ///
    /// `HoprTransport::run` uses index `0` for the task dispatching incoming
    /// messages to the session manager and indices `1..` for the processes
    /// returned when the session manager is started.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// One of the network probing sub-processes.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    /// Synchronization of outgoing ticket indices.
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    /// Packet capture; only available with the `capture` feature.
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
125
/// Path selector implementation plugged into the transport's [`PathPlanner`].
type CurrentPathSelector = NoPathSelector;
129
/// Entry point to the HOPR transport layer.
///
/// The object is created with [`HoprTransport::new`]; the network-facing parts
/// (`ping`, `network`) are only populated once [`HoprTransport::run`] has been
/// called, and the accessor methods report an error or a fallback value before
/// that.
pub struct HoprTransport<Chain, Db> {
    /// Off-chain (packet) keypair of this node; also determines its own `PeerId`.
    packet_key: OffchainKeypair,
    /// On-chain keypair of this node.
    chain_key: ChainKeypair,
    /// Database handle handed over to the packet pipeline.
    db: Db,
    /// Chain connector used for account, channel, key and value lookups.
    chain_api: Chain,
    /// Manual pinger; set exactly once inside `run`.
    ping: Arc<OnceLock<Pinger>>,
    /// Handle to the running transport network; set exactly once inside `run`.
    network: Arc<OnceLock<HoprNetwork>>,
    /// Resolves routing of outgoing messages and owns the SURB store.
    path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
    /// Multiaddresses this node was configured with at construction time.
    my_multiaddresses: Vec<Multiaddr>,
    /// Manager of incoming and outgoing sessions.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
    /// Protocol configuration supplied at construction time.
    cfg: HoprProtocolConfig,
}
144
145impl<Chain, Db> HoprTransport<Chain, Db>
146where
147 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
148 Chain: ChainReadChannelOperations
149 + ChainReadAccountOperations
150 + ChainKeyOperations
151 + ChainValues
152 + Clone
153 + Send
154 + Sync
155 + 'static,
156{
157 pub fn new(
158 identity: (&ChainKeypair, &OffchainKeypair),
159 resolver: Chain,
160 db: Db,
161 my_multiaddresses: Vec<Multiaddr>,
162 cfg: HoprProtocolConfig,
163 ) -> Self {
164 Self {
165 packet_key: identity.1.clone(),
166 chain_key: identity.0.clone(),
167 ping: Arc::new(OnceLock::new()),
168 network: Arc::new(OnceLock::new()),
169 path_planner: PathPlanner::new(
170 *identity.0.as_ref(),
171 MemorySurbStore::new(cfg.packet.surb_store),
172 resolver.clone(),
173 CurrentPathSelector::default(),
174 ),
175 my_multiaddresses,
176 smgr: SessionManager::new(SessionManagerConfig {
177 session_tag_range: (16..65535),
179 maximum_sessions: cfg.session.maximum_sessions as usize,
180 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
181 .ok()
182 .and_then(|s| s.parse::<usize>().ok())
183 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
184 .max(ApplicationData::PAYLOAD_SIZE),
185 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
186 .ok()
187 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
188 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
189 .max(Duration::from_millis(100)),
190 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
191 idle_timeout: cfg.session.idle_timeout,
192 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
193 initial_return_session_egress_rate: 10,
194 minimum_surb_buffer_duration: Duration::from_secs(5),
195 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
196 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
199 }),
200 db,
201 chain_api: resolver,
202 cfg,
203 }
204 }
205
206 pub async fn run<S, T, Ct>(
213 &self,
214 cover_traffic: Ct,
215 discovery_updates: S,
216 ticket_events: T,
217 on_incoming_session: Sender<IncomingSession>,
218 ) -> errors::Result<(
219 HoprSocket<
220 futures::channel::mpsc::Receiver<ApplicationDataIn>,
221 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
222 >,
223 AbortableList<HoprTransportProcess>,
224 )>
225 where
226 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
227 T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
228 T::Error: std::error::Error,
229 Ct: TrafficGeneration + Send + Sync + 'static,
230 {
231 info!("loading initial peers from the chain");
232 let public_nodes = self
233 .chain_api
234 .stream_accounts(AccountSelector {
235 public_only: true,
236 ..Default::default()
237 })
238 .await
239 .map_err(|e| HoprTransportError::Other(e.into()))?
240 .collect::<Vec<_>>()
241 .await;
242
243 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
246
247 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
248 .ok()
249 .and_then(|s| s.trim().parse::<usize>().ok())
250 .filter(|&c| c > 0)
251 .unwrap_or(2048)
252 .max(minimum_capacity);
253
254 debug!(
255 capacity = internal_discovery_updates_capacity,
256 minimum_required = minimum_capacity,
257 "creating internal discovery updates channel"
258 );
259 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
260 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
261
262 let discovery_updates =
263 futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx);
264
265 info!(
266 public_nodes = public_nodes.len(),
267 "initializing swarm with peers from chain"
268 );
269
270 for node_entry in public_nodes {
271 if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
272 let peer: PeerId = node_entry.public_key.into();
273
274 debug!(%peer, ?multiaddresses, "using initial public node");
275
276 internal_discovery_update_tx
277 .send(PeerDiscovery::Announce(peer, multiaddresses))
278 .await
279 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
280 }
281 }
282
283 let mut processes = AbortableList::<HoprTransportProcess>::default();
284
285 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
286 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
287
288 let allow_private_addresses = self.cfg.transport.prefer_local_addresses;
294 let (transport_network, transport_layer_process) = HoprLibp2pNetworkBuilder::new(
295 (&self.packet_key).into(),
296 discovery_updates.filter_map(move |event| async move {
297 match event {
298 PeerDiscovery::Announce(peer, multiaddrs) => {
299 let multiaddrs = multiaddrs
300 .into_iter()
301 .filter(|ma| {
302 hopr_transport_identity::multiaddrs::is_supported(ma)
303 && (allow_private_addresses || is_public_address(ma))
304 })
305 .collect::<Vec<_>>();
306 if multiaddrs.is_empty() {
307 None
308 } else {
309 Some(PeerDiscovery::Announce(peer, multiaddrs))
310 }
311 }
312 }
313 }),
314 self.my_multiaddresses.clone(),
315 )
316 .await
317 .into_network_with_stream_protocol_process(
318 hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL,
319 allow_private_addresses,
320 );
321 self.network
322 .clone()
323 .set(transport_network.clone())
324 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
325
326 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
327 let (wire_msg_tx, wire_msg_rx) =
328 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
329
330 let (mixing_channel_tx, mixing_channel_rx) =
331 hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
332
333 let _mixing_process_before_sending_out = spawn(
335 mixing_channel_rx
336 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
337 .map(Ok)
338 .forward(wire_msg_tx)
339 .inspect(|_| {
340 tracing::warn!(
341 task = "mixer -> egress process",
342 "long-running background task finished"
343 )
344 }),
345 );
346
347 processes.insert(
348 HoprTransportProcess::Medium,
349 spawn_as_abortable!(transport_layer_process.inspect(|_| tracing::warn!(
350 task = %HoprTransportProcess::Medium,
351 "long-running background task finished"
352 ))),
353 );
354
355 let msg_protocol_bidirectional_channel_capacity =
356 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
357 .ok()
358 .and_then(|s| s.trim().parse::<usize>().ok())
359 .filter(|&c| c > 0)
360 .unwrap_or(16_384);
361
362 let (on_incoming_data_tx, on_incoming_data_rx) =
363 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
364
365 debug!(
366 capacity = msg_protocol_bidirectional_channel_capacity,
367 "creating protocol bidirectional channel"
368 );
369 let (tx_from_protocol, rx_from_protocol) =
370 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
371
372 let path_planner = self.path_planner.clone();
375 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
376 let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
377 let path_planner = path_planner.clone();
378 async move {
379 trace!(?unresolved, "resolving routing for packet");
380 match path_planner
381 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
382 .await
383 {
384 Ok((resolved, rem_surbs)) => {
385 let mut signals_to_dst = data
389 .packet_info
390 .as_ref()
391 .map(|info| info.signals_to_destination)
392 .unwrap_or_default();
393
394 if resolved.is_return() {
395 signals_to_dst = match rem_surbs {
396 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
397 signals_to_dst | PacketSignal::SurbDistress
398 }
399 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
400 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
401 };
402 } else {
403 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
405 }
406
407 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
408 trace!(?resolved, "resolved routing for packet");
409 Some((resolved, data))
410 }
411 Err(error) => {
412 error!(%error, "failed to resolve routing");
413 None
414 }
415 }
416 }
417 .in_current_span()
418 });
419
420 let channels_dst = self
421 .chain_api
422 .domain_separators()
423 .await
424 .map_err(HoprTransportError::chain)?
425 .channel;
426
427 processes.extend_from(pipeline::run_hopr_packet_pipeline(
428 (self.packet_key.clone(), self.chain_key.clone()),
429 (mixing_channel_tx, wire_msg_rx),
430 (tx_from_protocol, all_resolved_external_msg_rx),
431 HoprPipelineComponents {
432 ticket_events,
433 surb_store: self.path_planner.surb_store.clone(),
434 chain_api: self.chain_api.clone(),
435 db: self.db.clone(),
436 },
437 channels_dst,
438 self.cfg.packet,
439 ));
440
441 debug!(
443 capacity = msg_protocol_bidirectional_channel_capacity,
444 note = "same as protocol bidirectional",
445 "Creating probing channel"
446 );
447
448 let (tx_from_probing, rx_from_probing) =
449 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
450
451 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
452 .ok()
453 .and_then(|s| s.trim().parse::<usize>().ok())
454 .filter(|&c| c > 0)
455 .unwrap_or(128);
456 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
457 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
458
459 let probe = Probe::new(self.cfg.probe);
460
461 let probing_processes = probe
462 .continuously_scan(
463 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
464 manual_ping_rx,
465 tx_from_probing,
466 cover_traffic,
467 ImmediateNeighborChannelGraph::new(transport_network.clone(), self.cfg.probe.recheck_threshold),
468 )
469 .await;
470
471 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
472
473 self.ping
475 .clone()
476 .set(Pinger::new(
477 PingConfig {
478 timeout: self.cfg.probe.timeout,
479 },
480 manual_ping_tx,
481 ))
482 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
483
484 self.smgr
486 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
487 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
488 .into_iter()
489 .enumerate()
490 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
491 .for_each(|(k, v)| {
492 processes.insert(k, v);
493 });
494
495 let smgr = self.smgr.clone();
496 processes.insert(
497 HoprTransportProcess::SessionsManagement(0),
498 spawn_as_abortable!(
499 rx_from_probing
500 .filter_map(move |(pseudonym, data)| {
501 let smgr = smgr.clone();
502 async move {
503 match smgr.dispatch_message(pseudonym, data).await {
504 Ok(DispatchResult::Processed) => {
505 tracing::trace!("message dispatch completed");
506 None
507 }
508 Ok(DispatchResult::Unrelated(data)) => {
509 tracing::trace!("unrelated message dispatch completed");
510 Some(data)
511 }
512 Err(error) => {
513 tracing::error!(%error, "error while dispatching packet in the session manager");
514 None
515 }
516 }
517 }
518 })
519 .map(Ok)
520 .forward(on_incoming_data_tx)
521 .inspect(|_| tracing::warn!(
522 task = %HoprTransportProcess::SessionsManagement(0),
523 "long-running background task finished"
524 ))
525 ),
526 );
527
528 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
529 }
530
531 #[tracing::instrument(level = "debug", skip(self))]
532 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
533 if peer == &self.packet_key.public().into() {
534 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
535 }
536
537 let pinger = self
538 .ping
539 .get()
540 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
541
542 let network = self
543 .network
544 .get()
545 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?;
546
547 let latency = (*pinger).ping(*peer).await?;
548
549 let observations = match network.observations_for(peer) {
550 Some(observations) => observations,
551 None => {
552 futures_time::task::sleep(futures_time::time::Duration::from_millis(50)).await;
554
555 network
556 .observations_for(peer)
557 .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?
558 }
559 };
560
561 Ok((latency, observations))
562 }
563
564 #[tracing::instrument(level = "debug", skip(self))]
565 pub async fn new_session(
566 &self,
567 destination: Address,
568 target: SessionTarget,
569 cfg: SessionClientConfig,
570 ) -> errors::Result<HoprSession> {
571 Ok(self.smgr.new_session(destination, target, cfg).await?)
572 }
573
574 #[tracing::instrument(level = "debug", skip(self))]
575 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
576 Ok(self.smgr.ping_session(id).await?)
577 }
578
579 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
580 Ok(self.smgr.get_surb_balancer_config(id).await?)
581 }
582
583 #[cfg(feature = "telemetry")]
584 pub async fn session_stats(&self, id: &SessionId) -> errors::Result<SessionStatsSnapshot> {
585 Ok(self.smgr.get_session_stats(id).await?)
586 }
587
588 pub async fn update_session_surb_balancing_cfg(
589 &self,
590 id: &SessionId,
591 cfg: SurbBalancerConfig,
592 ) -> errors::Result<()> {
593 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
594 }
595
596 #[tracing::instrument(level = "debug", skip(self))]
597 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
598 self.network
599 .get()
600 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
601 .map(|network| network.listening_as().into_iter().collect())
602 .unwrap_or_default()
603 }
604
605 #[tracing::instrument(level = "debug", skip(self))]
606 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
607 let mut mas = self
608 .local_multiaddresses()
609 .into_iter()
610 .filter(|ma| {
611 hopr_transport_identity::multiaddrs::is_supported(ma)
612 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
613 })
614 .map(|ma| strip_p2p_protocol(&ma))
615 .filter(|v| !v.is_empty())
616 .collect::<Vec<_>>();
617
618 mas.sort_by(|l, r| {
619 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
620 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
621
622 if !(is_left_dns ^ is_right_dns) {
623 std::cmp::Ordering::Equal
624 } else if is_left_dns {
625 std::cmp::Ordering::Less
626 } else {
627 std::cmp::Ordering::Greater
628 }
629 });
630
631 mas
632 }
633
634 #[tracing::instrument(level = "debug", skip(self))]
635 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
636 self.network
637 .get()
638 .map(|network| network.listening_as().into_iter().collect())
639 .unwrap_or_else(|| {
640 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
641 self.my_multiaddresses.clone()
642 })
643 }
644
645 #[tracing::instrument(level = "debug", skip(self))]
646 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
647 match self
648 .network
649 .get()
650 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
651 {
652 Ok(network) => network.multiaddress_of(peer).unwrap_or_default().into_iter().collect(),
653 Err(error) => {
654 tracing::error!(%error, "failed to get observed multiaddresses");
655 return vec![];
656 }
657 }
658 }
659
660 #[tracing::instrument(level = "debug", skip(self))]
661 pub async fn network_health(&self) -> Health {
662 self.network
663 .get()
664 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
665 .map(|network| network.health())
666 .unwrap_or(Health::Red)
667 }
668
669 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
670 Ok(self
671 .network
672 .get()
673 .ok_or_else(|| {
674 tracing::error!("transport network is not yet initialized");
675 HoprTransportError::Api("transport network is not yet initialized".into())
676 })?
677 .connected_peers()
678 .into_iter()
679 .collect())
680 }
681
682 #[tracing::instrument(level = "debug", skip(self))]
683 pub async fn network_peer_observations(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
684 Ok(self
685 .network
686 .get()
687 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?
688 .observations_for(peer))
689 }
690
691 #[cfg(feature = "telemetry")]
693 #[tracing::instrument(level = "debug", skip(self))]
694 pub async fn network_peer_packet_stats(&self, peer: &PeerId) -> errors::Result<Option<PeerPacketStatsSnapshot>> {
695 Err(HoprTransportError::Api("not implemented yet".into()))
696 }
697
698 #[cfg(feature = "telemetry")]
700 #[tracing::instrument(level = "debug", skip(self))]
701 pub async fn network_all_packet_stats(&self) -> errors::Result<Vec<(PeerId, PeerPacketStatsSnapshot)>> {
702 Err(HoprTransportError::Api("not implemented yet".into()))
703 }
704}
705
706fn build_mixer_cfg_from_env() -> MixerConfig {
707 let mixer_cfg = MixerConfig {
708 min_delay: std::time::Duration::from_millis(
709 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
710 .map(|v| {
711 v.trim()
712 .parse::<u64>()
713 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
714 })
715 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
716 ),
717 delay_range: std::time::Duration::from_millis(
718 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
719 .map(|v| {
720 v.trim()
721 .parse::<u64>()
722 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
723 })
724 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
725 ),
726 capacity: {
727 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
728 .ok()
729 .and_then(|s| s.trim().parse::<usize>().ok())
730 .filter(|&c| c > 0)
731 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
732 debug!(capacity = capacity, "Setting mixer capacity");
733 capacity
734 },
735 ..MixerConfig::default()
736 };
737 debug!(?mixer_cfg, "Mixer configuration");
738
739 mixer_cfg
740}