1pub mod config;
17pub mod constants;
19pub mod errors;
21mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27 sync::{Arc, OnceLock},
28 time::Duration,
29};
30
31use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
32use futures::{
33 FutureExt, SinkExt, StreamExt,
34 channel::mpsc::{Sender, channel},
35};
36use helpers::PathPlanner;
37use hopr_api::{
38 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
39 db::{
40 HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus, TicketSelector,
41 },
42};
43use hopr_async_runtime::{AbortableList, prelude::spawn, spawn_as_abortable};
44use hopr_crypto_packet::prelude::PacketSignal;
45pub use hopr_crypto_types::{
46 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
47 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
48};
49pub use hopr_internal_types::prelude::HoprPseudonym;
50use hopr_internal_types::prelude::*;
51pub use hopr_network_types::prelude::RoutingOptions;
52use hopr_network_types::prelude::{DestinationRouting, *};
53use hopr_primitive_types::prelude::*;
54pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
55use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
56pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
57use hopr_transport_mixer::MixerConfig;
58pub use hopr_transport_network::network::{Health, Network};
59use hopr_transport_p2p::HoprSwarm;
60use hopr_transport_probe::{
61 Probe,
62 neighbors::ImmediateNeighborProber,
63 ping::{PingConfig, Pinger},
64};
65pub use hopr_transport_probe::{
66 errors::ProbeError,
67 ping::PingQueryReplier,
68 traits::TrafficGeneration,
69 types::{NeighborTelemetry, Telemetry},
70};
71pub use hopr_transport_protocol::PeerDiscovery;
72use hopr_transport_protocol::processor::PacketInteractionConfig;
73pub use hopr_transport_session as session;
74#[cfg(feature = "runtime-tokio")]
75pub use hopr_transport_session::transfer_session;
76pub use hopr_transport_session::{
77 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
78 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
79 errors::{SessionManagerError, TransportSessionError},
80};
81use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
82#[cfg(feature = "mixer-stream")]
83use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
84use tracing::{Instrument, debug, error, info, trace, warn};
85
86pub use crate::{config::HoprTransportConfig, helpers::TicketStatistics};
87use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
88
89pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
91#[cfg(any(
92 all(feature = "mixer-channel", feature = "mixer-stream"),
93 all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
94))]
95compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
96
97lazy_static::lazy_static! {
99 static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
100}
101
102#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
103pub enum HoprTransportProcess {
104 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
105 Medium,
106 #[strum(to_string = "HOPR protocol ({0})")]
107 Protocol(hopr_transport_protocol::ProtocolProcesses),
108 #[strum(to_string = "session manager sub-process #{0}")]
109 SessionsManagement(usize),
110 #[strum(to_string = "network probing sub-process: {0}")]
111 Probing(hopr_transport_probe::HoprProbeProcess),
112}
113
114type CurrentPathSelector = NoPathSelector;
117
118pub struct HoprTransport<Db, R> {
121 me: OffchainKeypair,
122 me_peerid: PeerId, me_address: Address,
124 cfg: HoprTransportConfig,
125 db: Db,
126 resolver: R,
127 ping: Arc<OnceLock<Pinger>>,
128 network: Arc<Network<Db>>,
129 path_planner: PathPlanner<Db, R, CurrentPathSelector>,
130 my_multiaddresses: Vec<Multiaddr>,
131 smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
132}
133
134impl<Db, R> HoprTransport<Db, R>
135where
136 Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
137 R: ChainReadChannelOperations
138 + ChainReadAccountOperations
139 + ChainKeyOperations
140 + ChainValues
141 + Clone
142 + Send
143 + Sync
144 + 'static,
145{
146 pub fn new(
147 me: &OffchainKeypair,
148 me_onchain: &ChainKeypair,
149 cfg: HoprTransportConfig,
150 db: Db,
151 resolver: R,
152 my_multiaddresses: Vec<Multiaddr>,
153 ) -> Self {
154 let me_peerid: PeerId = me.into();
155 let me_chain_addr = me_onchain.public().to_address();
156
157 Self {
158 me: me.clone(),
159 me_peerid,
160 me_address: me_chain_addr,
161 ping: Arc::new(OnceLock::new()),
162 network: Arc::new(Network::new(
163 me_peerid,
164 my_multiaddresses.clone(),
165 cfg.network.clone(),
166 db.clone(),
167 )),
168 path_planner: PathPlanner::new(
169 me_chain_addr,
170 db.clone(),
171 resolver.clone(),
172 CurrentPathSelector::default(),
173 ),
174 my_multiaddresses,
175 smgr: SessionManager::new(SessionManagerConfig {
176 session_tag_range: (16..65535),
178 maximum_sessions: cfg.session.maximum_sessions as usize,
179 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
180 .ok()
181 .and_then(|s| s.parse::<usize>().ok())
182 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
183 .max(ApplicationData::PAYLOAD_SIZE),
184 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
185 .ok()
186 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
187 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
188 .max(Duration::from_millis(100)),
189 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
190 idle_timeout: cfg.session.idle_timeout,
191 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
192 initial_return_session_egress_rate: 10,
193 minimum_surb_buffer_duration: Duration::from_secs(5),
194 maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
195 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
198 }),
199 db,
200 resolver,
201 cfg,
202 }
203 }
204
205 #[allow(clippy::too_many_arguments)]
212 pub async fn run<S, Ct>(
213 &self,
214 cover_traffic: Option<Ct>,
215 discovery_updates: S,
216 on_incoming_session: Sender<IncomingSession>,
217 ) -> errors::Result<(
218 HoprSocket<
219 futures::channel::mpsc::Receiver<ApplicationDataIn>,
220 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
221 >,
222 AbortableList<HoprTransportProcess>,
223 )>
224 where
225 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
226 Ct: TrafficGeneration + Send + Sync + 'static,
227 {
228 info!("Loading initial peers from the chain");
229 let public_nodes = self
230 .resolver
231 .stream_accounts(AccountSelector {
232 public_only: true,
233 ..Default::default()
234 })
235 .await
236 .map_err(|e| HoprTransportError::Other(e.into()))?
237 .collect::<Vec<_>>()
238 .await;
239
240 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
243
244 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
245 .ok()
246 .and_then(|s| s.trim().parse::<usize>().ok())
247 .filter(|&c| c > 0)
248 .unwrap_or(2048)
249 .max(minimum_capacity);
250
251 debug!(
252 capacity = internal_discovery_updates_capacity,
253 minimum_required = minimum_capacity,
254 "Creating internal discovery updates channel"
255 );
256 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
257 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
258
259 let me_peerid = self.me_peerid;
260 let network = self.network.clone();
261 let discovery_updates = futures_concurrency::stream::StreamExt::merge(
262 discovery_updates,
263 internal_discovery_update_rx,
264 )
265 .filter_map(move |event| {
266 let network = network.clone();
267 async move {
268 match event {
269 PeerDiscovery::Announce(peer, multiaddresses) => {
270 debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
271 if peer != me_peerid {
272 let mas = multiaddresses
274 .into_iter()
275 .map(|ma| strip_p2p_protocol(&ma))
276 .filter(|v| !v.is_empty())
277 .collect::<Vec<_>>();
278
279 if !mas.is_empty() {
280 if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
281 error!(%peer, %error, "failed to add peer to the network");
282 None
283 } else {
284 Some(PeerDiscovery::Announce(peer, mas))
285 }
286 } else {
287 None
288 }
289 } else {
290 None
291 }
292 }
293 _ => None,
294 }
295 }
296 });
297
298 info!(
299 public_nodes = public_nodes.len(),
300 "Initializing swarm with peers from chain"
301 );
302
303 for node_entry in public_nodes {
304 if let AccountType::Announced(multiaddresses) = node_entry.entry_type {
305 let peer: PeerId = node_entry.public_key.into();
306
307 debug!(%peer, ?multiaddresses, "Using initial public node");
308
309 internal_discovery_update_tx
310 .send(PeerDiscovery::Announce(peer, multiaddresses))
311 .await
312 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
313 }
314 }
315
316 let mut processes = AbortableList::<HoprTransportProcess>::default();
317
318 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
319 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
320
321 let mixer_cfg = build_mixer_cfg_from_env();
323
324 #[cfg(feature = "mixer-channel")]
325 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
326
327 #[cfg(feature = "mixer-stream")]
328 let (mixing_channel_tx, mixing_channel_rx) = {
329 let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
330 let rx = rx.then_concurrent(move |v| {
331 let cfg = mixer_cfg;
332
333 async move {
334 let random_delay = cfg.random_delay();
335 trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
336
337 #[cfg(all(feature = "prometheus", not(test)))]
338 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
339
340 sleep(random_delay).await;
341
342 #[cfg(all(feature = "prometheus", not(test)))]
343 {
344 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
345
346 let weight = 1.0f64 / cfg.metric_delay_window as f64;
347 hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
348 (weight * random_delay.as_millis() as f64)
349 + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
350 );
351 }
352
353 v
354 }
355 });
356
357 (tx, rx)
358 };
359
360 let transport_layer = HoprSwarm::new(
361 (&self.me).into(),
362 discovery_updates,
363 self.my_multiaddresses.clone(),
364 self.cfg.transport.prefer_local_addresses,
365 )
366 .await;
367
368 let msg_proto_control =
369 transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
370 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
371 let (wire_msg_tx, wire_msg_rx) =
372 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
373
374 let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
375 mixing_channel_rx
376 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
377 .map(Ok)
378 .forward(wire_msg_tx)
379 .inspect(|_| {
380 tracing::warn!(
381 task = "mixer -> egress process",
382 "long-running background task finished"
383 )
384 }),
385 );
386
387 let (transport_events_tx, transport_events_rx) =
388 futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
389
390 let network_clone = self.network.clone();
391 spawn(
392 transport_events_rx
393 .for_each(move |event| {
394 let network = network_clone.clone();
395
396 async move {
397 match event {
398 hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
399 if let Err(error) = network
400 .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
401 .await
402 {
403 tracing::error!(%peer, %error, "Failed to add incoming connection peer");
404 }
405 }
406 hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
407 if let Err(error) = network
408 .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
409 .await
410 {
411 tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
412 }
413 }
414 }
415 }
416 })
417 .inspect(|_| {
418 tracing::warn!(
419 task = "transport events recording",
420 "long-running background task finished"
421 )
422 }),
423 );
424
425 processes.insert(
426 HoprTransportProcess::Medium,
427 spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
428 task = %HoprTransportProcess::Medium,
429 "long-running background task finished"
430 ))),
431 );
432
433 let packet_cfg = PacketInteractionConfig {
435 packet_keypair: self.me.clone(),
436 outgoing_ticket_win_prob: self
437 .cfg
438 .protocol
439 .outgoing_ticket_winning_prob
440 .map(WinningProbability::try_from)
441 .transpose()?,
442 outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
443 };
444
445 let msg_protocol_bidirectional_channel_capacity =
446 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
447 .ok()
448 .and_then(|s| s.trim().parse::<usize>().ok())
449 .filter(|&c| c > 0)
450 .unwrap_or(16_384);
451
452 let (on_incoming_data_tx, on_incoming_data_rx) =
453 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
454
455 debug!(
456 capacity = msg_protocol_bidirectional_channel_capacity,
457 "Creating protocol bidirectional channel"
458 );
459 let (tx_from_protocol, rx_from_protocol) =
460 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
461
462 let path_planner = self.path_planner.clone();
465 let distress_threshold = self.db.get_surb_config().distress_threshold;
466 let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
467 let path_planner = path_planner.clone();
468 async move {
469 trace!(?unresolved, "resolving routing for packet");
470 match path_planner
471 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
472 .await
473 {
474 Ok((resolved, rem_surbs)) => {
475 let mut signals_to_dst = data
479 .packet_info
480 .as_ref()
481 .map(|info| info.signals_to_destination)
482 .unwrap_or_default();
483
484 if resolved.is_return() {
485 signals_to_dst = match rem_surbs {
486 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
487 signals_to_dst | PacketSignal::SurbDistress
488 }
489 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
490 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
491 };
492 } else {
493 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
495 }
496
497 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
498 trace!(?resolved, "resolved routing for packet");
499 Some((resolved, data))
500 }
501 Err(error) => {
502 error!(%error, "failed to resolve routing");
503 None
504 }
505 }
506 }
507 .in_current_span()
508 });
509
510 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
511 packet_cfg,
512 self.db.clone(),
513 self.resolver.clone(),
514 (
515 mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
516 trace!(%peer, "sending message to peer");
517 futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
518 }),
519 wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
520 ),
521 (tx_from_protocol, all_resolved_external_msg_rx),
522 )
523 .await
524 .into_iter()
525 {
526 processes.insert(HoprTransportProcess::Protocol(k), v);
527 }
528
529 debug!(
531 capacity = msg_protocol_bidirectional_channel_capacity,
532 note = "same as protocol bidirectional",
533 "Creating probing channel"
534 );
535
536 let (tx_from_probing, rx_from_probing) =
537 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
538
539 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
540 .ok()
541 .and_then(|s| s.trim().parse::<usize>().ok())
542 .filter(|&c| c > 0)
543 .unwrap_or(128);
544 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
545 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
546
547 let probe = Probe::new(self.cfg.probe);
548 let probing_processes = if let Some(ct) = cover_traffic {
549 probe
550 .continuously_scan(
551 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
552 manual_ping_rx,
553 tx_from_probing,
554 ct,
555 )
556 .await
557 } else {
558 probe
559 .continuously_scan(
560 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
561 manual_ping_rx,
562 tx_from_probing,
563 ImmediateNeighborProber::new(
564 self.cfg.probe,
565 network_notifier::ProbeNetworkInteractions::new(self.network.clone()),
566 ),
567 )
568 .await
569 };
570
571 for (k, v) in probing_processes.into_iter() {
572 processes.insert(HoprTransportProcess::Probing(k), v);
573 }
574
575 self.ping
577 .clone()
578 .set(Pinger::new(
579 PingConfig {
580 timeout: self.cfg.probe.timeout,
581 },
582 manual_ping_tx,
583 ))
584 .expect("must set the ticket aggregation writer only once");
585
586 self.smgr
588 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
589 .expect("failed to start session manager")
590 .into_iter()
591 .enumerate()
592 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
593 .for_each(|(k, v)| {
594 processes.insert(k, v);
595 });
596
597 let smgr = self.smgr.clone();
598 processes.insert(
599 HoprTransportProcess::SessionsManagement(0),
600 spawn_as_abortable!(
601 StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
602 let smgr = smgr.clone();
603 async move {
604 match smgr.dispatch_message(pseudonym, data).await {
605 Ok(DispatchResult::Processed) => {
606 trace!("message dispatch completed");
607 None
608 }
609 Ok(DispatchResult::Unrelated(data)) => {
610 trace!("unrelated message dispatch completed");
611 Some(data)
612 }
613 Err(e) => {
614 error!(error = %e, "error while processing packet");
615 None
616 }
617 }
618 }
619 })
620 .map(Ok)
621 .forward(on_incoming_data_tx)
622 .inspect(|_| tracing::warn!(
623 task = %HoprTransportProcess::SessionsManagement(0),
624 "long-running background task finished"
625 ))
626 ),
627 );
628
629 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
630 }
631
632 #[tracing::instrument(level = "debug", skip(self))]
633 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
634 if peer == &self.me_peerid {
635 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
636 }
637
638 let pinger = self
639 .ping
640 .get()
641 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
642
643 if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
644 error!(error = %e, "Failed to store the peer observation");
645 }
646
647 let latency = (*pinger).ping(*peer).await?;
648
649 let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
650 hopr_transport_probe::errors::ProbeError::NonExistingPeer,
651 ))?;
652
653 Ok((latency, peer_status))
654 }
655
656 #[tracing::instrument(level = "debug", skip(self))]
657 pub async fn new_session(
658 &self,
659 destination: Address,
660 target: SessionTarget,
661 cfg: SessionClientConfig,
662 ) -> errors::Result<HoprSession> {
663 Ok(self.smgr.new_session(destination, target, cfg).await?)
664 }
665
666 #[tracing::instrument(level = "debug", skip(self))]
667 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
668 Ok(self.smgr.ping_session(id).await?)
669 }
670
671 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
672 Ok(self.smgr.get_surb_balancer_config(id).await?)
673 }
674
675 pub async fn update_session_surb_balancing_cfg(
676 &self,
677 id: &SessionId,
678 cfg: SurbBalancerConfig,
679 ) -> errors::Result<()> {
680 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
681 }
682
683 #[tracing::instrument(level = "debug", skip(self))]
684 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
685 self.network
686 .get(&self.me_peerid)
687 .await
688 .unwrap_or_else(|e| {
689 error!(error = %e, "failed to obtain listening multi-addresses");
690 None
691 })
692 .map(|peer| peer.multiaddresses)
693 .unwrap_or_default()
694 }
695
696 #[tracing::instrument(level = "debug", skip(self))]
697 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
698 let mut mas = self
699 .local_multiaddresses()
700 .into_iter()
701 .filter(|ma| {
702 hopr_transport_identity::multiaddrs::is_supported(ma)
703 && (self.cfg.transport.announce_local_addresses || is_public_address(ma))
704 })
705 .map(|ma| strip_p2p_protocol(&ma))
706 .filter(|v| !v.is_empty())
707 .collect::<Vec<_>>();
708
709 mas.sort_by(|l, r| {
710 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
711 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
712
713 if !(is_left_dns ^ is_right_dns) {
714 std::cmp::Ordering::Equal
715 } else if is_left_dns {
716 std::cmp::Ordering::Less
717 } else {
718 std::cmp::Ordering::Greater
719 }
720 });
721
722 mas
723 }
724
725 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
726 self.my_multiaddresses.clone()
727 }
728
729 #[tracing::instrument(level = "debug", skip(self))]
730 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
731 self.network
732 .get(peer)
733 .await
734 .unwrap_or(None)
735 .map(|peer| peer.multiaddresses)
736 .unwrap_or(vec![])
737 }
738
739 #[tracing::instrument(level = "debug", skip(self))]
740 pub async fn network_health(&self) -> Health {
741 self.network.health().await
742 }
743
744 #[tracing::instrument(level = "debug", skip(self))]
745 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
746 Ok(self.network.connected_peers().await?)
747 }
748
749 #[tracing::instrument(level = "debug", skip(self))]
750 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
751 Ok(self.network.get(peer).await?)
752 }
753
754 #[tracing::instrument(level = "debug", skip(self))]
755 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
756 let ticket_stats = self
757 .db
758 .get_ticket_statistics(None)
759 .await
760 .map_err(|e| HoprTransportError::Other(e.into()))?;
761
762 Ok(TicketStatistics {
763 winning_count: ticket_stats.winning_tickets,
764 unredeemed_value: ticket_stats.unredeemed_value,
765 redeemed_value: ticket_stats.redeemed_value(),
766 neglected_value: ticket_stats.neglected_value(),
767 rejected_value: ticket_stats.rejected_value(),
768 })
769 }
770
771 #[tracing::instrument(level = "debug", skip(self))]
772 pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<RedeemableTicket>>> {
773 if let Some(channel) = self
774 .resolver
775 .channel_by_id(channel_id)
776 .await
777 .map_err(|e| HoprTransportError::Other(e.into()))?
778 {
779 if channel.destination == self.me_address {
780 Ok(Some(
781 self.db
782 .stream_tickets([&channel])
783 .await
784 .map_err(|e| HoprTransportError::Other(e.into()))?
785 .collect()
786 .await,
787 ))
788 } else {
789 Ok(None)
790 }
791 } else {
792 Ok(None)
793 }
794 }
795
796 #[tracing::instrument(level = "debug", skip(self))]
797 pub async fn all_tickets(&self) -> errors::Result<Vec<VerifiedTicket>> {
798 Ok(self
799 .db
800 .stream_tickets(None::<TicketSelector>)
801 .await
802 .map_err(|e| HoprTransportError::Other(e.into()))?
803 .map(|v| v.ticket)
804 .collect()
805 .await)
806 }
807}
808
809fn build_mixer_cfg_from_env() -> MixerConfig {
810 let mixer_cfg = MixerConfig {
811 min_delay: std::time::Duration::from_millis(
812 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
813 .map(|v| {
814 v.trim()
815 .parse::<u64>()
816 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
817 })
818 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
819 ),
820 delay_range: std::time::Duration::from_millis(
821 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
822 .map(|v| {
823 v.trim()
824 .parse::<u64>()
825 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
826 })
827 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
828 ),
829 capacity: {
830 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
831 .ok()
832 .and_then(|s| s.trim().parse::<usize>().ok())
833 .filter(|&c| c > 0)
834 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
835 debug!(capacity = capacity, "Setting mixer capacity");
836 capacity
837 },
838 ..MixerConfig::default()
839 };
840 debug!(?mixer_cfg, "Mixer configuration");
841
842 mixer_cfg
843}