1pub mod config;
17pub mod constants;
19pub mod errors;
21mod helpers;
22pub mod network_notifier;
23
24#[cfg(feature = "capture")]
25mod capture;
26mod pipeline;
27pub mod socket;
28
29use std::{
30 sync::{Arc, OnceLock},
31 time::Duration,
32};
33
34use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
35use futures::{
36 FutureExt, SinkExt, StreamExt,
37 channel::mpsc::{Sender, channel},
38};
39use helpers::PathPlanner;
40pub use hopr_api::db::ChannelTicketStatistics;
41use hopr_api::{
42 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
43 db::{HoprDbPeersOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
44};
45use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
46use hopr_crypto_packet::prelude::PacketSignal;
47pub use hopr_crypto_types::{
48 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
49 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
50};
51pub use hopr_internal_types::prelude::HoprPseudonym;
52use hopr_internal_types::prelude::*;
53pub use hopr_network_types::prelude::RoutingOptions;
54use hopr_network_types::prelude::{DestinationRouting, *};
55use hopr_primitive_types::prelude::*;
56pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
57use hopr_protocol_hopr::MemorySurbStore;
58use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
59pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
60use hopr_transport_mixer::MixerConfig;
61pub use hopr_transport_network::network::{Health, Network};
62use hopr_transport_p2p::HoprSwarm;
63use hopr_transport_probe::{
64 Probe,
65 neighbors::ImmediateNeighborProber,
66 ping::{PingConfig, Pinger},
67};
68pub use hopr_transport_probe::{
69 errors::ProbeError,
70 ping::PingQueryReplier,
71 traits::TrafficGeneration,
72 types::{NeighborTelemetry, Telemetry},
73};
74pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
75pub use hopr_transport_session as session;
76#[cfg(feature = "runtime-tokio")]
77pub use hopr_transport_session::transfer_session;
78pub use hopr_transport_session::{
79 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
80 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
81 errors::{SessionManagerError, TransportSessionError},
82};
83use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
84use tracing::{Instrument, debug, error, info, trace, warn};
85
86pub use crate::config::HoprProtocolConfig;
87use crate::{
88 constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
89 socket::HoprSocket,
90};
91
92pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
93
94lazy_static::lazy_static! {
96 static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
97}
98
99#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
100pub enum HoprTransportProcess {
101 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
102 Medium,
103 #[strum(to_string = "HOPR packet pipeline ({0})")]
104 Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
105 #[strum(to_string = "session manager sub-process #{0}")]
106 SessionsManagement(usize),
107 #[strum(to_string = "network probing sub-process: {0}")]
108 Probing(hopr_transport_probe::HoprProbeProcess),
109 #[strum(to_string = "sync of outgoing ticket indices")]
110 OutgoingIndexSync,
111 #[cfg(feature = "capture")]
112 #[strum(to_string = "packet capture")]
113 Capture,
114}
115
116type CurrentPathSelector = NoPathSelector;
119
120pub struct HoprTransport<Chain, Db> {
123 packet_key: OffchainKeypair,
124 chain_key: ChainKeypair,
125 db: Db,
126 chain_api: Chain,
127 ping: Arc<OnceLock<Pinger>>,
128 network: Arc<Network<Db>>,
129 path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
130 my_multiaddresses: Vec<Multiaddr>,
131 smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
132 cfg: HoprProtocolConfig,
133}
134
135impl<Chain, Db> HoprTransport<Chain, Db>
136where
137 Db: HoprDbTicketOperations + HoprDbPeersOperations + Clone + Send + Sync + 'static,
138 Chain: ChainReadChannelOperations
139 + ChainReadAccountOperations
140 + ChainKeyOperations
141 + ChainValues
142 + Clone
143 + Send
144 + Sync
145 + 'static,
146{
147 pub fn new(
148 identity: (&ChainKeypair, &OffchainKeypair),
149 resolver: Chain,
150 db: Db,
151 my_multiaddresses: Vec<Multiaddr>,
152 cfg: HoprProtocolConfig,
153 ) -> Self {
154 Self {
155 packet_key: identity.1.clone(),
156 chain_key: identity.0.clone(),
157 ping: Arc::new(OnceLock::new()),
158 network: Arc::new(Network::new(
159 identity.1.into(),
160 my_multiaddresses.clone(),
161 cfg.network,
162 db.clone(),
163 )),
164 path_planner: PathPlanner::new(
165 *identity.0.as_ref(),
166 MemorySurbStore::new(cfg.packet.surb_store),
167 resolver.clone(),
168 CurrentPathSelector::default(),
169 ),
170 my_multiaddresses,
171 smgr: SessionManager::new(SessionManagerConfig {
172 session_tag_range: (16..65535),
174 maximum_sessions: cfg.session.maximum_sessions as usize,
175 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
176 .ok()
177 .and_then(|s| s.parse::<usize>().ok())
178 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
179 .max(ApplicationData::PAYLOAD_SIZE),
180 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
181 .ok()
182 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
183 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
184 .max(Duration::from_millis(100)),
185 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
186 idle_timeout: cfg.session.idle_timeout,
187 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
188 initial_return_session_egress_rate: 10,
189 minimum_surb_buffer_duration: Duration::from_secs(5),
190 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
191 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
194 }),
195 db,
196 chain_api: resolver,
197 cfg,
198 }
199 }
200
201 pub async fn run<S, T, Ct>(
208 &self,
209 cover_traffic: Option<Ct>,
210 discovery_updates: S,
211 ticket_events: T,
212 on_incoming_session: Sender<IncomingSession>,
213 ) -> errors::Result<(
214 HoprSocket<
215 futures::channel::mpsc::Receiver<ApplicationDataIn>,
216 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
217 >,
218 AbortableList<HoprTransportProcess>,
219 )>
220 where
221 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
222 T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
223 T::Error: std::error::Error,
224 Ct: TrafficGeneration + Send + Sync + 'static,
225 {
226 info!("loading initial peers from the chain");
227 let public_nodes = self
228 .chain_api
229 .stream_accounts(AccountSelector {
230 public_only: true,
231 ..Default::default()
232 })
233 .await
234 .map_err(|e| HoprTransportError::Other(e.into()))?
235 .collect::<Vec<_>>()
236 .await;
237
238 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
241
242 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
243 .ok()
244 .and_then(|s| s.trim().parse::<usize>().ok())
245 .filter(|&c| c > 0)
246 .unwrap_or(2048)
247 .max(minimum_capacity);
248
249 debug!(
250 capacity = internal_discovery_updates_capacity,
251 minimum_required = minimum_capacity,
252 "creating internal discovery updates channel"
253 );
254 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
255 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
256
257 let me_peerid: PeerId = self.packet_key.public().into();
258 let network = self.network.clone();
259 let discovery_updates = futures_concurrency::stream::StreamExt::merge(
260 discovery_updates,
261 internal_discovery_update_rx,
262 )
263 .filter_map(move |event| {
264 let network = network.clone();
265 async move {
266 match event {
267 PeerDiscovery::Announce(peer, multiaddresses) => {
268 debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
269 if peer != me_peerid {
270 let mas = multiaddresses
272 .into_iter()
273 .map(|ma| strip_p2p_protocol(&ma))
274 .filter(|v| !v.is_empty())
275 .collect::<Vec<_>>();
276
277 if !mas.is_empty() {
278 if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
279 error!(%peer, %error, "failed to add peer to the network");
280 None
281 } else {
282 Some(PeerDiscovery::Announce(peer, mas))
283 }
284 } else {
285 None
286 }
287 } else {
288 None
289 }
290 }
291 }
292 }
293 });
294
295 info!(
296 public_nodes = public_nodes.len(),
297 "initializing swarm with peers from chain"
298 );
299
300 for node_entry in public_nodes {
301 if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
302 let peer: PeerId = node_entry.public_key.into();
303
304 debug!(%peer, ?multiaddresses, "using initial public node");
305
306 internal_discovery_update_tx
307 .send(PeerDiscovery::Announce(peer, multiaddresses))
308 .await
309 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
310 }
311 }
312
313 let mut processes = AbortableList::<HoprTransportProcess>::default();
314
315 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
316 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
317
318 let mixer_cfg = build_mixer_cfg_from_env();
320
321 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
322
323 let transport_layer = HoprSwarm::new(
324 (&self.packet_key).into(),
325 discovery_updates,
326 self.my_multiaddresses.clone(),
327 self.cfg.transport.prefer_local_addresses,
328 )
329 .await;
330
331 let msg_proto_control =
332 transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
333 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
334 let (wire_msg_tx, wire_msg_rx) =
335 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
336
337 let _mixing_process_before_sending_out = spawn(
338 mixing_channel_rx
339 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
340 .map(Ok)
341 .forward(wire_msg_tx)
342 .inspect(|_| {
343 tracing::warn!(
344 task = "mixer -> egress process",
345 "long-running background task finished"
346 )
347 }),
348 );
349
350 let (transport_events_tx, transport_events_rx) = channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
351
352 let network_clone = self.network.clone();
353 spawn(
354 transport_events_rx
355 .for_each(move |event| {
356 let network = network_clone.clone();
357 async move {
358 match event {
359 hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
360 if let Err(error) = network
361 .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
362 .await
363 {
364 tracing::error!(%peer, %error, "Failed to add incoming connection peer");
365 }
366 }
367 hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
368 if let Err(error) = network
369 .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
370 .await
371 {
372 tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
373 }
374 }
375 }
376 }
377 })
378 .inspect(|_| {
379 tracing::warn!(
380 task = "transport events recording",
381 "long-running background task finished"
382 )
383 }),
384 );
385
386 processes.insert(
387 HoprTransportProcess::Medium,
388 spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
389 task = %HoprTransportProcess::Medium,
390 "long-running background task finished"
391 ))),
392 );
393
394 let msg_protocol_bidirectional_channel_capacity =
395 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
396 .ok()
397 .and_then(|s| s.trim().parse::<usize>().ok())
398 .filter(|&c| c > 0)
399 .unwrap_or(16_384);
400
401 let (on_incoming_data_tx, on_incoming_data_rx) =
402 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
403
404 debug!(
405 capacity = msg_protocol_bidirectional_channel_capacity,
406 "creating protocol bidirectional channel"
407 );
408 let (tx_from_protocol, rx_from_protocol) =
409 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
410
411 let path_planner = self.path_planner.clone();
414 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
415 let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
416 let path_planner = path_planner.clone();
417 async move {
418 trace!(?unresolved, "resolving routing for packet");
419 match path_planner
420 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
421 .await
422 {
423 Ok((resolved, rem_surbs)) => {
424 let mut signals_to_dst = data
428 .packet_info
429 .as_ref()
430 .map(|info| info.signals_to_destination)
431 .unwrap_or_default();
432
433 if resolved.is_return() {
434 signals_to_dst = match rem_surbs {
435 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
436 signals_to_dst | PacketSignal::SurbDistress
437 }
438 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
439 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
440 };
441 } else {
442 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
444 }
445
446 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
447 trace!(?resolved, "resolved routing for packet");
448 Some((resolved, data))
449 }
450 Err(error) => {
451 error!(%error, "failed to resolve routing");
452 None
453 }
454 }
455 }
456 .in_current_span()
457 });
458
459 let channels_dst = self
460 .chain_api
461 .domain_separators()
462 .await
463 .map_err(HoprTransportError::chain)?
464 .channel;
465
466 processes.extend_from(pipeline::run_hopr_packet_pipeline(
467 (self.packet_key.clone(), self.chain_key.clone()),
468 (mixing_channel_tx, wire_msg_rx),
469 (tx_from_protocol, all_resolved_external_msg_rx),
470 HoprPipelineComponents {
471 ticket_events,
472 surb_store: self.path_planner.surb_store.clone(),
473 chain_api: self.chain_api.clone(),
474 db: self.db.clone(),
475 },
476 channels_dst,
477 self.cfg.packet,
478 ));
479
480 debug!(
482 capacity = msg_protocol_bidirectional_channel_capacity,
483 note = "same as protocol bidirectional",
484 "Creating probing channel"
485 );
486
487 let (tx_from_probing, rx_from_probing) =
488 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
489
490 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
491 .ok()
492 .and_then(|s| s.trim().parse::<usize>().ok())
493 .filter(|&c| c > 0)
494 .unwrap_or(128);
495 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
496 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
497
498 let probe = Probe::new(self.cfg.probe);
499 let probing_processes = if let Some(ct) = cover_traffic {
500 probe
501 .continuously_scan(
502 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
503 manual_ping_rx,
504 tx_from_probing,
505 ct,
506 )
507 .await
508 } else {
509 probe
510 .continuously_scan(
511 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
512 manual_ping_rx,
513 tx_from_probing,
514 ImmediateNeighborProber::new(
515 self.cfg.probe,
516 network_notifier::ProbeNetworkInteractions::new(self.network.clone()),
517 ),
518 )
519 .await
520 };
521
522 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
523
524 self.ping
526 .clone()
527 .set(Pinger::new(
528 PingConfig {
529 timeout: self.cfg.probe.timeout,
530 },
531 manual_ping_tx,
532 ))
533 .expect("must set the ticket aggregation writer only once");
534
535 self.smgr
537 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
538 .expect("failed to start session manager")
539 .into_iter()
540 .enumerate()
541 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
542 .for_each(|(k, v)| {
543 processes.insert(k, v);
544 });
545
546 let smgr = self.smgr.clone();
547 processes.insert(
548 HoprTransportProcess::SessionsManagement(0),
549 spawn_as_abortable!(
550 rx_from_probing
551 .filter_map(move |(pseudonym, data)| {
552 let smgr = smgr.clone();
553 async move {
554 match smgr.dispatch_message(pseudonym, data).await {
555 Ok(DispatchResult::Processed) => {
556 tracing::trace!("message dispatch completed");
557 None
558 }
559 Ok(DispatchResult::Unrelated(data)) => {
560 tracing::trace!("unrelated message dispatch completed");
561 Some(data)
562 }
563 Err(error) => {
564 tracing::error!(%error, "error while dispatching packet in the session manager");
565 None
566 }
567 }
568 }
569 })
570 .map(Ok)
571 .forward(on_incoming_data_tx)
572 .inspect(|_| tracing::warn!(
573 task = %HoprTransportProcess::SessionsManagement(0),
574 "long-running background task finished"
575 ))
576 ),
577 );
578
579 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
580 }
581
582 #[tracing::instrument(level = "debug", skip(self))]
583 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
584 if peer == &self.packet_key.public().into() {
585 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
586 }
587
588 let pinger = self
589 .ping
590 .get()
591 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
592
593 if let Err(error) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
594 error!(%error, "failed to store the peer observation");
595 }
596
597 let latency = (*pinger).ping(*peer).await?;
598
599 let peer_status = self
600 .network
601 .get(peer)
602 .await?
603 .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?;
604
605 Ok((latency, peer_status))
606 }
607
608 #[tracing::instrument(level = "debug", skip(self))]
609 pub async fn new_session(
610 &self,
611 destination: Address,
612 target: SessionTarget,
613 cfg: SessionClientConfig,
614 ) -> errors::Result<HoprSession> {
615 Ok(self.smgr.new_session(destination, target, cfg).await?)
616 }
617
618 #[tracing::instrument(level = "debug", skip(self))]
619 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
620 Ok(self.smgr.ping_session(id).await?)
621 }
622
623 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
624 Ok(self.smgr.get_surb_balancer_config(id).await?)
625 }
626
627 pub async fn update_session_surb_balancing_cfg(
628 &self,
629 id: &SessionId,
630 cfg: SurbBalancerConfig,
631 ) -> errors::Result<()> {
632 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
633 }
634
635 #[tracing::instrument(level = "debug", skip(self))]
636 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
637 self.network
638 .get(&self.packet_key.public().into())
639 .await
640 .unwrap_or_else(|e| {
641 error!(error = %e, "failed to obtain listening multi-addresses");
642 None
643 })
644 .map(|peer| peer.multiaddresses)
645 .unwrap_or_default()
646 }
647
648 #[tracing::instrument(level = "debug", skip(self))]
649 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
650 let mut mas = self
651 .local_multiaddresses()
652 .into_iter()
653 .filter(|ma| {
654 hopr_transport_identity::multiaddrs::is_supported(ma)
655 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
656 })
657 .map(|ma| strip_p2p_protocol(&ma))
658 .filter(|v| !v.is_empty())
659 .collect::<Vec<_>>();
660
661 mas.sort_by(|l, r| {
662 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
663 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
664
665 if !(is_left_dns ^ is_right_dns) {
666 std::cmp::Ordering::Equal
667 } else if is_left_dns {
668 std::cmp::Ordering::Less
669 } else {
670 std::cmp::Ordering::Greater
671 }
672 });
673
674 mas
675 }
676
677 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
678 self.my_multiaddresses.clone()
679 }
680
681 #[tracing::instrument(level = "debug", skip(self))]
682 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
683 self.network
684 .get(peer)
685 .await
686 .unwrap_or(None)
687 .map(|peer| peer.multiaddresses)
688 .unwrap_or(vec![])
689 }
690
691 #[tracing::instrument(level = "debug", skip(self))]
692 pub async fn network_health(&self) -> Health {
693 self.network.health().await
694 }
695
696 #[tracing::instrument(level = "debug", skip(self))]
697 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
698 Ok(self.network.connected_peers().await?)
699 }
700
701 #[tracing::instrument(level = "debug", skip(self))]
702 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
703 Ok(self.network.get(peer).await?)
704 }
705}
706
707fn build_mixer_cfg_from_env() -> MixerConfig {
708 let mixer_cfg = MixerConfig {
709 min_delay: std::time::Duration::from_millis(
710 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
711 .map(|v| {
712 v.trim()
713 .parse::<u64>()
714 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
715 })
716 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
717 ),
718 delay_range: std::time::Duration::from_millis(
719 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
720 .map(|v| {
721 v.trim()
722 .parse::<u64>()
723 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
724 })
725 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
726 ),
727 capacity: {
728 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
729 .ok()
730 .and_then(|s| s.trim().parse::<usize>().ok())
731 .filter(|&c| c > 0)
732 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
733 debug!(capacity = capacity, "Setting mixer capacity");
734 capacity
735 },
736 ..MixerConfig::default()
737 };
738 debug!(?mixer_cfg, "Mixer configuration");
739
740 mixer_cfg
741}