1pub mod config;
17pub mod constants;
19pub mod errors;
21mod helpers;
22
23#[cfg(feature = "capture")]
24mod capture;
25mod pipeline;
26pub mod socket;
27
28use std::{
29 sync::{Arc, OnceLock},
30 time::Duration,
31};
32
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35 FutureExt, SinkExt, StreamExt,
36 channel::mpsc::{Sender, channel},
37};
38use helpers::PathPlanner;
39use hopr_api::{
40 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
41 db::HoprDbTicketOperations,
42};
43pub use hopr_api::{
44 db::ChannelTicketStatistics,
45 network::{Health, Observable, traits::NetworkView},
46};
47use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
48use hopr_crypto_packet::prelude::PacketSignal;
49pub use hopr_crypto_types::{
50 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
51 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
52};
53use hopr_ct_telemetry::ImmediateNeighborChannelGraph;
54pub use hopr_internal_types::prelude::HoprPseudonym;
55use hopr_internal_types::prelude::*;
56pub use hopr_network_types::prelude::RoutingOptions;
57use hopr_network_types::prelude::{DestinationRouting, *};
58use hopr_primitive_types::prelude::*;
59pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
60use hopr_protocol_hopr::MemorySurbStore;
61use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
62pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
63use hopr_transport_mixer::MixerConfig;
64pub use hopr_transport_network::observation::Observations;
65use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork};
66use hopr_transport_probe::{
67 Probe, TrafficGeneration,
68 ping::{PingConfig, Pinger},
69};
70pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
71pub use hopr_transport_protocol::{PeerDiscovery, TicketEvent};
72pub use hopr_transport_session as session;
73#[cfg(feature = "runtime-tokio")]
74pub use hopr_transport_session::transfer_session;
75pub use hopr_transport_session::{
76 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
77 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
78 errors::{SessionManagerError, TransportSessionError},
79};
80use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
81use tracing::{Instrument, debug, error, info, trace, warn};
82
83pub use crate::config::HoprProtocolConfig;
84use crate::{
85 constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, pipeline::HoprPipelineComponents,
86 socket::HoprSocket,
87};
88
/// Crate-level alias of [`Tag::APPLICATION_TAG_RANGE`], exposed so that users of this
/// crate can refer to the range without going through the `Tag` type.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
91pub use hopr_api as api;
92
93lazy_static::lazy_static! {
95 static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
96}
97
/// Identifies a long-running background process of the transport layer.
///
/// Values of this type are the keys of the `AbortableList` returned by
/// `HoprTransport::run`; the `Display` output (via `strum`) is a human-readable
/// description of the process.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The process driving the underlying transport medium (libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// A sub-process of the HOPR packet pipeline, identified by the wrapped value.
    #[strum(to_string = "HOPR packet pipeline ({0})")]
    Pipeline(hopr_transport_protocol::PacketPipelineProcesses),
    /// A session manager sub-process, identified by its index.
    ///
    /// In `HoprTransport::run`, index `0` is the task dispatching incoming messages to the
    /// session manager and indices `1..` are the handles returned when the manager is started.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A sub-process of the network probing mechanism, identified by the wrapped value.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
    /// Synchronization of outgoing ticket indices.
    #[strum(to_string = "sync of outgoing ticket indices")]
    OutgoingIndexSync,
    /// Packet capture; only available with the `capture` feature.
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
114
/// Path selector type plugged into the `PathPlanner` of `HoprTransport`;
/// currently an alias of `NoPathSelector`.
type CurrentPathSelector = NoPathSelector;
118
/// Interface into the physical transport mechanism: owns the keys, the session manager
/// and the handles needed to start the transport (`run`) and to query it afterwards.
pub struct HoprTransport<Chain, Db> {
    /// Off-chain keypair; used to build the p2p network, handed to the packet pipeline
    /// and used to derive the local peer identity.
    packet_key: OffchainKeypair,
    /// On-chain keypair; handed to the packet pipeline.
    chain_key: ChainKeypair,
    /// Database handle; cloned into the packet pipeline components.
    db: Db,
    /// Chain access used to load public accounts and domain separators.
    chain_api: Chain,
    /// Pinger used for manual pings; populated once by `run`.
    ping: Arc<OnceLock<Pinger>>,
    /// Handle to the p2p network; populated once by `run`.
    network: Arc<OnceLock<HoprNetwork>>,
    /// Resolves the routing of outgoing messages and holds the SURB store.
    path_planner: PathPlanner<MemorySurbStore, Chain, CurrentPathSelector>,
    /// Multiaddresses supplied at construction; passed to the network builder and
    /// returned by `local_multiaddresses` while the network is not yet initialized.
    my_multiaddresses: Vec<Multiaddr>,
    /// Session manager handling session establishment and message dispatch.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
    /// Protocol configuration this transport was created with.
    cfg: HoprProtocolConfig,
}
133
134impl<Chain, Db> HoprTransport<Chain, Db>
135where
136 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
137 Chain: ChainReadChannelOperations
138 + ChainReadAccountOperations
139 + ChainKeyOperations
140 + ChainValues
141 + Clone
142 + Send
143 + Sync
144 + 'static,
145{
146 pub fn new(
147 identity: (&ChainKeypair, &OffchainKeypair),
148 resolver: Chain,
149 db: Db,
150 my_multiaddresses: Vec<Multiaddr>,
151 cfg: HoprProtocolConfig,
152 ) -> Self {
153 Self {
154 packet_key: identity.1.clone(),
155 chain_key: identity.0.clone(),
156 ping: Arc::new(OnceLock::new()),
157 network: Arc::new(OnceLock::new()),
158 path_planner: PathPlanner::new(
159 *identity.0.as_ref(),
160 MemorySurbStore::new(cfg.packet.surb_store),
161 resolver.clone(),
162 CurrentPathSelector::default(),
163 ),
164 my_multiaddresses,
165 smgr: SessionManager::new(SessionManagerConfig {
166 session_tag_range: (16..65535),
168 maximum_sessions: cfg.session.maximum_sessions as usize,
169 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
170 .ok()
171 .and_then(|s| s.parse::<usize>().ok())
172 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
173 .max(ApplicationData::PAYLOAD_SIZE),
174 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
175 .ok()
176 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
177 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
178 .max(Duration::from_millis(100)),
179 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
180 idle_timeout: cfg.session.idle_timeout,
181 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
182 initial_return_session_egress_rate: 10,
183 minimum_surb_buffer_duration: Duration::from_secs(5),
184 maximum_surb_buffer_size: cfg.packet.surb_store.rb_capacity,
185 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
188 }),
189 db,
190 chain_api: resolver,
191 cfg,
192 }
193 }
194
195 pub async fn run<S, T, Ct>(
202 &self,
203 cover_traffic: Ct,
204 discovery_updates: S,
205 ticket_events: T,
206 on_incoming_session: Sender<IncomingSession>,
207 ) -> errors::Result<(
208 HoprSocket<
209 futures::channel::mpsc::Receiver<ApplicationDataIn>,
210 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
211 >,
212 AbortableList<HoprTransportProcess>,
213 )>
214 where
215 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
216 T: futures::Sink<TicketEvent> + Clone + Send + Unpin + 'static,
217 T::Error: std::error::Error,
218 Ct: TrafficGeneration + Send + Sync + 'static,
219 {
220 info!("loading initial peers from the chain");
221 let public_nodes = self
222 .chain_api
223 .stream_accounts(AccountSelector {
224 public_only: true,
225 ..Default::default()
226 })
227 .await
228 .map_err(|e| HoprTransportError::Other(e.into()))?
229 .collect::<Vec<_>>()
230 .await;
231
232 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
235
236 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
237 .ok()
238 .and_then(|s| s.trim().parse::<usize>().ok())
239 .filter(|&c| c > 0)
240 .unwrap_or(2048)
241 .max(minimum_capacity);
242
243 debug!(
244 capacity = internal_discovery_updates_capacity,
245 minimum_required = minimum_capacity,
246 "creating internal discovery updates channel"
247 );
248 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
249 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
250
251 let discovery_updates =
252 futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx);
253
254 info!(
255 public_nodes = public_nodes.len(),
256 "initializing swarm with peers from chain"
257 );
258
259 for node_entry in public_nodes {
260 if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
261 let peer: PeerId = node_entry.public_key.into();
262
263 debug!(%peer, ?multiaddresses, "using initial public node");
264
265 internal_discovery_update_tx
266 .send(PeerDiscovery::Announce(peer, multiaddresses))
267 .await
268 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
269 }
270 }
271
272 let mut processes = AbortableList::<HoprTransportProcess>::default();
273
274 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
275 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
276
277 let allow_private_addresses = self.cfg.transport.prefer_local_addresses;
283 let (transport_network, transport_layer_process) = HoprLibp2pNetworkBuilder::new(
284 (&self.packet_key).into(),
285 discovery_updates.filter_map(move |event| async move {
286 match event {
287 PeerDiscovery::Announce(peer, multiaddrs) => {
288 let multiaddrs = multiaddrs
289 .into_iter()
290 .filter(|ma| {
291 hopr_transport_identity::multiaddrs::is_supported(ma)
292 && (allow_private_addresses || is_public_address(ma))
293 })
294 .collect::<Vec<_>>();
295 if multiaddrs.is_empty() {
296 None
297 } else {
298 Some(PeerDiscovery::Announce(peer, multiaddrs))
299 }
300 }
301 }
302 }),
303 self.my_multiaddresses.clone(),
304 )
305 .await
306 .into_network_with_stream_protocol_process(
307 hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL,
308 allow_private_addresses,
309 );
310 self.network
311 .clone()
312 .set(transport_network.clone())
313 .map_err(|_| HoprTransportError::Api("transport network viewer already set".into()))?;
314
315 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
316 let (wire_msg_tx, wire_msg_rx) =
317 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, transport_network.clone()).await?;
318
319 let (mixing_channel_tx, mixing_channel_rx) =
320 hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(build_mixer_cfg_from_env());
321
322 let _mixing_process_before_sending_out = spawn(
324 mixing_channel_rx
325 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
326 .map(Ok)
327 .forward(wire_msg_tx)
328 .inspect(|_| {
329 tracing::warn!(
330 task = "mixer -> egress process",
331 "long-running background task finished"
332 )
333 }),
334 );
335
336 processes.insert(
337 HoprTransportProcess::Medium,
338 spawn_as_abortable!(transport_layer_process.inspect(|_| tracing::warn!(
339 task = %HoprTransportProcess::Medium,
340 "long-running background task finished"
341 ))),
342 );
343
344 let msg_protocol_bidirectional_channel_capacity =
345 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
346 .ok()
347 .and_then(|s| s.trim().parse::<usize>().ok())
348 .filter(|&c| c > 0)
349 .unwrap_or(16_384);
350
351 let (on_incoming_data_tx, on_incoming_data_rx) =
352 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
353
354 debug!(
355 capacity = msg_protocol_bidirectional_channel_capacity,
356 "creating protocol bidirectional channel"
357 );
358 let (tx_from_protocol, rx_from_protocol) =
359 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
360
361 let path_planner = self.path_planner.clone();
364 let distress_threshold = self.cfg.packet.surb_store.distress_threshold;
365 let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
366 let path_planner = path_planner.clone();
367 async move {
368 trace!(?unresolved, "resolving routing for packet");
369 match path_planner
370 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
371 .await
372 {
373 Ok((resolved, rem_surbs)) => {
374 let mut signals_to_dst = data
378 .packet_info
379 .as_ref()
380 .map(|info| info.signals_to_destination)
381 .unwrap_or_default();
382
383 if resolved.is_return() {
384 signals_to_dst = match rem_surbs {
385 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
386 signals_to_dst | PacketSignal::SurbDistress
387 }
388 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
389 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
390 };
391 } else {
392 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
394 }
395
396 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
397 trace!(?resolved, "resolved routing for packet");
398 Some((resolved, data))
399 }
400 Err(error) => {
401 error!(%error, "failed to resolve routing");
402 None
403 }
404 }
405 }
406 .in_current_span()
407 });
408
409 let channels_dst = self
410 .chain_api
411 .domain_separators()
412 .await
413 .map_err(HoprTransportError::chain)?
414 .channel;
415
416 processes.extend_from(pipeline::run_hopr_packet_pipeline(
417 (self.packet_key.clone(), self.chain_key.clone()),
418 (mixing_channel_tx, wire_msg_rx),
419 (tx_from_protocol, all_resolved_external_msg_rx),
420 HoprPipelineComponents {
421 ticket_events,
422 surb_store: self.path_planner.surb_store.clone(),
423 chain_api: self.chain_api.clone(),
424 db: self.db.clone(),
425 },
426 channels_dst,
427 self.cfg.packet,
428 ));
429
430 debug!(
432 capacity = msg_protocol_bidirectional_channel_capacity,
433 note = "same as protocol bidirectional",
434 "Creating probing channel"
435 );
436
437 let (tx_from_probing, rx_from_probing) =
438 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
439
440 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
441 .ok()
442 .and_then(|s| s.trim().parse::<usize>().ok())
443 .filter(|&c| c > 0)
444 .unwrap_or(128);
445 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
446 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
447
448 let probe = Probe::new(self.cfg.probe);
449
450 let probing_processes = probe
451 .continuously_scan(
452 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
453 manual_ping_rx,
454 tx_from_probing,
455 cover_traffic,
456 ImmediateNeighborChannelGraph::new(transport_network.clone(), self.cfg.probe.recheck_threshold),
457 )
458 .await;
459
460 processes.flat_map_extend_from(probing_processes, HoprTransportProcess::Probing);
461
462 self.ping
464 .clone()
465 .set(Pinger::new(
466 PingConfig {
467 timeout: self.cfg.probe.timeout,
468 },
469 manual_ping_tx,
470 ))
471 .map_err(|_| HoprTransportError::Api("must set the ticket aggregation writer only once".into()))?;
472
473 self.smgr
475 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
476 .map_err(|_| HoprTransportError::Api("failed to start session manager".into()))?
477 .into_iter()
478 .enumerate()
479 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
480 .for_each(|(k, v)| {
481 processes.insert(k, v);
482 });
483
484 let smgr = self.smgr.clone();
485 processes.insert(
486 HoprTransportProcess::SessionsManagement(0),
487 spawn_as_abortable!(
488 rx_from_probing
489 .filter_map(move |(pseudonym, data)| {
490 let smgr = smgr.clone();
491 async move {
492 match smgr.dispatch_message(pseudonym, data).await {
493 Ok(DispatchResult::Processed) => {
494 tracing::trace!("message dispatch completed");
495 None
496 }
497 Ok(DispatchResult::Unrelated(data)) => {
498 tracing::trace!("unrelated message dispatch completed");
499 Some(data)
500 }
501 Err(error) => {
502 tracing::error!(%error, "error while dispatching packet in the session manager");
503 None
504 }
505 }
506 }
507 })
508 .map(Ok)
509 .forward(on_incoming_data_tx)
510 .inspect(|_| tracing::warn!(
511 task = %HoprTransportProcess::SessionsManagement(0),
512 "long-running background task finished"
513 ))
514 ),
515 );
516
517 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
518 }
519
520 #[tracing::instrument(level = "debug", skip(self))]
521 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, Observations)> {
522 if peer == &self.packet_key.public().into() {
523 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
524 }
525
526 let pinger = self
527 .ping
528 .get()
529 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
530
531 let network = self
532 .network
533 .get()
534 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?;
535
536 let latency = (*pinger).ping(*peer).await?;
537
538 let observations = network
539 .observations_for(peer)
540 .ok_or(HoprTransportError::Probe(ProbeError::NonExistingPeer))?;
541
542 Ok((latency, observations))
543 }
544
545 #[tracing::instrument(level = "debug", skip(self))]
546 pub async fn new_session(
547 &self,
548 destination: Address,
549 target: SessionTarget,
550 cfg: SessionClientConfig,
551 ) -> errors::Result<HoprSession> {
552 Ok(self.smgr.new_session(destination, target, cfg).await?)
553 }
554
555 #[tracing::instrument(level = "debug", skip(self))]
556 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
557 Ok(self.smgr.ping_session(id).await?)
558 }
559
560 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
561 Ok(self.smgr.get_surb_balancer_config(id).await?)
562 }
563
564 pub async fn update_session_surb_balancing_cfg(
565 &self,
566 id: &SessionId,
567 cfg: SurbBalancerConfig,
568 ) -> errors::Result<()> {
569 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
570 }
571
572 #[tracing::instrument(level = "debug", skip(self))]
573 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
574 self.network
575 .get()
576 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
577 .map(|network| network.listening_as().into_iter().collect())
578 .unwrap_or_default()
579 }
580
581 #[tracing::instrument(level = "debug", skip(self))]
582 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
583 let mut mas = self
584 .local_multiaddresses()
585 .into_iter()
586 .filter(|ma| {
587 hopr_transport_identity::multiaddrs::is_supported(ma)
588 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
589 })
590 .map(|ma| strip_p2p_protocol(&ma))
591 .filter(|v| !v.is_empty())
592 .collect::<Vec<_>>();
593
594 mas.sort_by(|l, r| {
595 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
596 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
597
598 if !(is_left_dns ^ is_right_dns) {
599 std::cmp::Ordering::Equal
600 } else if is_left_dns {
601 std::cmp::Ordering::Less
602 } else {
603 std::cmp::Ordering::Greater
604 }
605 });
606
607 mas
608 }
609
610 #[tracing::instrument(level = "debug", skip(self))]
611 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
612 self.network
613 .get()
614 .map(|network| network.listening_as().into_iter().collect())
615 .unwrap_or_else(|| {
616 tracing::error!("transport network is not yet initialized, cannot fetch announced multiaddresses");
617 self.my_multiaddresses.clone()
618 })
619 }
620
621 #[tracing::instrument(level = "debug", skip(self))]
622 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
623 match self
624 .network
625 .get()
626 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
627 {
628 Ok(network) => network.multiaddress_of(peer).unwrap_or_default().into_iter().collect(),
629 Err(error) => {
630 tracing::error!(%error, "failed to get observed multiaddresses");
631 return vec![];
632 }
633 }
634 }
635
636 #[tracing::instrument(level = "debug", skip(self))]
637 pub async fn network_health(&self) -> Health {
638 self.network
639 .get()
640 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))
641 .map(|network| network.health())
642 .unwrap_or(Health::Red)
643 }
644
645 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
646 Ok(self
647 .network
648 .get()
649 .ok_or_else(|| {
650 tracing::error!("transport network is not yet initialized");
651 HoprTransportError::Api("transport network is not yet initialized".into())
652 })?
653 .connected_peers()
654 .into_iter()
655 .collect())
656 }
657
658 #[tracing::instrument(level = "debug", skip(self))]
659 pub async fn network_peer_observations(&self, peer: &PeerId) -> errors::Result<Option<Observations>> {
660 Ok(self
661 .network
662 .get()
663 .ok_or_else(|| HoprTransportError::Api("transport network is not yet initialized".into()))?
664 .observations_for(peer))
665 }
666}
667
668fn build_mixer_cfg_from_env() -> MixerConfig {
669 let mixer_cfg = MixerConfig {
670 min_delay: std::time::Duration::from_millis(
671 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
672 .map(|v| {
673 v.trim()
674 .parse::<u64>()
675 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
676 })
677 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
678 ),
679 delay_range: std::time::Duration::from_millis(
680 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
681 .map(|v| {
682 v.trim()
683 .parse::<u64>()
684 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
685 })
686 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
687 ),
688 capacity: {
689 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
690 .ok()
691 .and_then(|s| s.trim().parse::<usize>().ok())
692 .filter(|&c| c > 0)
693 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
694 debug!(capacity = capacity, "Setting mixer capacity");
695 capacity
696 },
697 ..MixerConfig::default()
698 };
699 debug!(?mixer_cfg, "Mixer configuration");
700
701 mixer_cfg
702}