1pub mod config;
17pub mod constants;
19pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27 collections::HashMap,
28 sync::{Arc, OnceLock},
29 time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35 FutureExt, SinkExt, StreamExt,
36 channel::mpsc::{Sender, channel},
37};
38use helpers::PathPlanner;
39use hopr_api::{
40 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
41 db::{HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
42};
43use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
44use hopr_crypto_packet::prelude::PacketSignal;
45pub use hopr_crypto_types::{
46 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
47 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
48};
49pub use hopr_internal_types::prelude::HoprPseudonym;
50use hopr_internal_types::prelude::*;
51use hopr_network_types::prelude::DestinationRouting;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
54use hopr_primitive_types::prelude::*;
55pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
56use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
57pub use hopr_transport_identity::{Multiaddr, PeerId, Protocol};
58use hopr_transport_mixer::MixerConfig;
59pub use hopr_transport_network::network::{Health, Network};
60use hopr_transport_p2p::HoprSwarm;
61use hopr_transport_probe::{
62 Probe,
63 neighbors::ImmediateNeighborProber,
64 ping::{PingConfig, Pinger},
65};
66pub use hopr_transport_probe::{
67 errors::ProbeError,
68 ping::PingQueryReplier,
69 traits::TrafficGeneration,
70 types::{NeighborTelemetry, Telemetry},
71};
72use hopr_transport_protocol::processor::PacketInteractionConfig;
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74pub use hopr_transport_session as session;
75#[cfg(feature = "runtime-tokio")]
76pub use hopr_transport_session::transfer_session;
77pub use hopr_transport_session::{
78 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
79 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
80 errors::{SessionManagerError, TransportSessionError},
81};
82use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
83#[cfg(feature = "mixer-stream")]
84use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
85use tracing::{Instrument, debug, error, info, trace, warn};
86
87pub use crate::{
88 config::HoprTransportConfig,
89 helpers::{PeerEligibility, TicketStatistics},
90};
91use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
92
93pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
94
95#[cfg(any(
96 all(feature = "mixer-channel", feature = "mixer-stream"),
97 all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
98))]
99compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
100
101lazy_static::lazy_static! {
103 static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
104}
105
106#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
107pub enum HoprTransportProcess {
108 #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
109 Medium,
110 #[strum(to_string = "HOPR protocol ({0})")]
111 Protocol(hopr_transport_protocol::ProtocolProcesses),
112 #[strum(to_string = "session manager sub-process #{0}")]
113 SessionsManagement(usize),
114 #[strum(to_string = "network probing sub-process: {0}")]
115 Probing(hopr_transport_probe::HoprProbeProcess),
116}
117
118type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
120
121pub struct HoprTransport<Db, R> {
124 me: OffchainKeypair,
125 me_peerid: PeerId, me_address: Address,
127 cfg: HoprTransportConfig,
128 db: Db,
129 resolver: R,
130 ping: Arc<OnceLock<Pinger>>,
131 network: Arc<Network<Db>>,
132 path_planner: PathPlanner<Db, R, CurrentPathSelector>,
133 my_multiaddresses: Vec<Multiaddr>,
134 smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
135}
136
137impl<Db, R> HoprTransport<Db, R>
138where
139 Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
140 R: ChainReadChannelOperations
141 + ChainReadAccountOperations
142 + ChainKeyOperations
143 + ChainValues
144 + Clone
145 + Send
146 + Sync
147 + 'static,
148{
149 pub fn new(
150 me: &OffchainKeypair,
151 me_onchain: &ChainKeypair,
152 cfg: HoprTransportConfig,
153 db: Db,
154 resolver: R,
155 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
156 my_multiaddresses: Vec<Multiaddr>,
157 ) -> Self {
158 let me_peerid: PeerId = me.into();
159 let me_chain_addr = me_onchain.public().to_address();
160
161 Self {
162 me: me.clone(),
163 me_peerid,
164 me_address: me_chain_addr,
165 ping: Arc::new(OnceLock::new()),
166 network: Arc::new(Network::new(
167 me_peerid,
168 my_multiaddresses.clone(),
169 cfg.network.clone(),
170 db.clone(),
171 )),
172 path_planner: PathPlanner::new(
173 me_chain_addr,
174 db.clone(),
175 resolver.clone(),
176 CurrentPathSelector::new(
177 channel_graph.clone(),
178 DfsPathSelectorConfig {
179 node_score_threshold: cfg.network.node_score_auto_path_threshold,
180 max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
181 ..Default::default()
182 },
183 ),
184 channel_graph.clone(),
185 ),
186 my_multiaddresses,
187 smgr: SessionManager::new(SessionManagerConfig {
188 session_tag_range: (16..65535),
190 maximum_sessions: cfg.session.maximum_sessions as usize,
191 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
192 .ok()
193 .and_then(|s| s.parse::<usize>().ok())
194 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
195 .max(ApplicationData::PAYLOAD_SIZE),
196 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
197 .ok()
198 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
199 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
200 .max(Duration::from_millis(100)),
201 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
202 idle_timeout: cfg.session.idle_timeout,
203 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
204 initial_return_session_egress_rate: 10,
205 minimum_surb_buffer_duration: Duration::from_secs(5),
206 maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
207 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
210 }),
211 db,
212 resolver,
213 cfg,
214 }
215 }
216
217 #[allow(clippy::too_many_arguments)]
224 pub async fn run<S, Ct>(
225 &self,
226 cover_traffic: Option<Ct>,
227 discovery_updates: S,
228 on_incoming_session: Sender<IncomingSession>,
229 ) -> crate::errors::Result<(
230 HoprSocket<
231 futures::channel::mpsc::Receiver<ApplicationDataIn>,
232 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
233 >,
234 HashMap<HoprTransportProcess, AbortHandle>,
235 )>
236 where
237 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
238 Ct: TrafficGeneration + Send + Sync + 'static,
239 {
240 info!("Loading initial peers from the chain");
241 let public_nodes = self
242 .resolver
243 .stream_accounts(AccountSelector {
244 public_only: true,
245 ..Default::default()
246 })
247 .await
248 .map_err(|e| HoprTransportError::Other(e.into()))?
249 .collect::<Vec<_>>()
250 .await;
251
252 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
255
256 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
257 .ok()
258 .and_then(|s| s.trim().parse::<usize>().ok())
259 .filter(|&c| c > 0)
260 .unwrap_or(2048)
261 .max(minimum_capacity);
262
263 debug!(
264 capacity = internal_discovery_updates_capacity,
265 minimum_required = minimum_capacity,
266 "Creating internal discovery updates channel"
267 );
268 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
269 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
270
271 let me_peerid = self.me_peerid;
272 let network = self.network.clone();
273 let discovery_updates = futures_concurrency::stream::StreamExt::merge(
274 discovery_updates,
275 internal_discovery_update_rx,
276 )
277 .filter_map(move |event| {
278 let network = network.clone();
279 async move {
280 match event {
281 PeerDiscovery::Announce(peer, multiaddresses) => {
282 debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
283 if peer != me_peerid {
284 let mas = multiaddresses
286 .into_iter()
287 .map(|ma| strip_p2p_protocol(&ma))
288 .filter(|v| !v.is_empty())
289 .collect::<Vec<_>>();
290
291 if !mas.is_empty() {
292 if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
293 error!(%peer, %error, "failed to add peer to the network");
294 None
295 } else {
296 Some(PeerDiscovery::Announce(peer, mas))
297 }
298 } else {
299 None
300 }
301 } else {
302 None
303 }
304 }
305 _ => None,
306 }
307 }
308 });
309
310 info!(
311 public_nodes = public_nodes.len(),
312 "Initializing swarm with peers from chain"
313 );
314
315 for node_entry in public_nodes {
316 if let AccountType::Announced { multiaddr, .. } = node_entry.entry_type {
317 let peer: PeerId = node_entry.public_key.into();
318 let multiaddresses = vec![multiaddr];
319
320 debug!(%peer, ?multiaddresses, "Using initial public node");
321
322 internal_discovery_update_tx
323 .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
324 .await
325 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
326 }
327 }
328
329 let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
330
331 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
332 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
333
334 let mixer_cfg = build_mixer_cfg_from_env();
336
337 #[cfg(feature = "mixer-channel")]
338 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
339
340 #[cfg(feature = "mixer-stream")]
341 let (mixing_channel_tx, mixing_channel_rx) = {
342 let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
343 let rx = rx.then_concurrent(move |v| {
344 let cfg = mixer_cfg;
345
346 async move {
347 let random_delay = cfg.random_delay();
348 trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
349
350 #[cfg(all(feature = "prometheus", not(test)))]
351 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
352
353 sleep(random_delay).await;
354
355 #[cfg(all(feature = "prometheus", not(test)))]
356 {
357 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
358
359 let weight = 1.0f64 / cfg.metric_delay_window as f64;
360 hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
361 (weight * random_delay.as_millis() as f64)
362 + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
363 );
364 }
365
366 v
367 }
368 });
369
370 (tx, rx)
371 };
372
373 let transport_layer =
374 HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
375
376 let msg_proto_control =
377 transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
378 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
379 let (wire_msg_tx, wire_msg_rx) =
380 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
381
382 let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
383 mixing_channel_rx
384 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
385 .map(Ok)
386 .forward(wire_msg_tx)
387 .inspect(|_| {
388 tracing::warn!(
389 task = "mixer -> egress process",
390 "long-running background task finished"
391 )
392 }),
393 );
394
395 let (transport_events_tx, transport_events_rx) =
396 futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
397
398 let network_clone = self.network.clone();
399 spawn(
400 transport_events_rx
401 .for_each(move |event| {
402 let network = network_clone.clone();
403
404 async move {
405 match event {
406 hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
407 if let Err(error) = network
408 .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
409 .await
410 {
411 tracing::error!(%peer, %error, "Failed to add incoming connection peer");
412 }
413 }
414 hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
415 if let Err(error) = network
416 .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
417 .await
418 {
419 tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
420 }
421 }
422 }
423 }
424 })
425 .inspect(|_| {
426 tracing::warn!(
427 task = "transport events recording",
428 "long-running background task finished"
429 )
430 }),
431 );
432
433 processes.insert(
434 HoprTransportProcess::Medium,
435 spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
436 task = %HoprTransportProcess::Medium,
437 "long-running background task finished"
438 ))),
439 );
440
441 let packet_cfg = PacketInteractionConfig {
443 packet_keypair: self.me.clone(),
444 outgoing_ticket_win_prob: self
445 .cfg
446 .protocol
447 .outgoing_ticket_winning_prob
448 .map(WinningProbability::try_from)
449 .transpose()?,
450 outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
451 };
452
453 let msg_protocol_bidirectional_channel_capacity =
454 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
455 .ok()
456 .and_then(|s| s.trim().parse::<usize>().ok())
457 .filter(|&c| c > 0)
458 .unwrap_or(16_384);
459
460 let (on_incoming_data_tx, on_incoming_data_rx) =
461 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
462
463 debug!(
464 capacity = msg_protocol_bidirectional_channel_capacity,
465 "Creating protocol bidirectional channel"
466 );
467 let (tx_from_protocol, rx_from_protocol) =
468 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
469
470 let path_planner = self.path_planner.clone();
473 let distress_threshold = self.db.get_surb_config().distress_threshold;
474 let all_resolved_external_msg_rx = unresolved_routing_msg_rx.filter_map(move |(unresolved, mut data)| {
475 let path_planner = path_planner.clone();
476 async move {
477 trace!(?unresolved, "resolving routing for packet");
478 match path_planner
479 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
480 .await
481 {
482 Ok((resolved, rem_surbs)) => {
483 let mut signals_to_dst = data
487 .packet_info
488 .as_ref()
489 .map(|info| info.signals_to_destination)
490 .unwrap_or_default();
491
492 if resolved.is_return() {
493 signals_to_dst = match rem_surbs {
494 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
495 signals_to_dst | PacketSignal::SurbDistress
496 }
497 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
498 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
499 };
500 } else {
501 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
503 }
504
505 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
506 trace!(?resolved, "resolved routing for packet");
507 Some((resolved, data))
508 }
509 Err(error) => {
510 error!(%error, "failed to resolve routing");
511 None
512 }
513 }
514 }
515 .in_current_span()
516 });
517
518 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
519 packet_cfg,
520 self.db.clone(),
521 self.resolver.clone(),
522 (
523 mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
524 trace!(%peer, "sending message to peer");
525 futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
526 }),
527 wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
528 ),
529 (tx_from_protocol, all_resolved_external_msg_rx),
530 )
531 .await
532 .into_iter()
533 {
534 processes.insert(HoprTransportProcess::Protocol(k), v);
535 }
536
537 debug!(
539 capacity = msg_protocol_bidirectional_channel_capacity,
540 note = "same as protocol bidirectional",
541 "Creating probing channel"
542 );
543
544 let (tx_from_probing, rx_from_probing) =
545 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
546
547 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
548 .ok()
549 .and_then(|s| s.trim().parse::<usize>().ok())
550 .filter(|&c| c > 0)
551 .unwrap_or(128);
552 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
553 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
554
555 let probe = Probe::new(self.cfg.probe);
556 let probing_processes = if let Some(ct) = cover_traffic {
557 probe
558 .continuously_scan(
559 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
560 manual_ping_rx,
561 tx_from_probing,
562 ct,
563 )
564 .await
565 } else {
566 probe
567 .continuously_scan(
568 (unresolved_routing_msg_tx.clone(), rx_from_protocol),
569 manual_ping_rx,
570 tx_from_probing,
571 ImmediateNeighborProber::new(
572 self.cfg.probe,
573 network_notifier::ProbeNetworkInteractions::new(
574 self.network.clone(),
575 self.resolver.clone(),
576 self.path_planner.channel_graph(),
577 ),
578 ),
579 )
580 .await
581 };
582
583 for (k, v) in probing_processes.into_iter() {
584 processes.insert(HoprTransportProcess::Probing(k), v);
585 }
586
587 self.ping
589 .clone()
590 .set(Pinger::new(
591 PingConfig {
592 timeout: self.cfg.probe.timeout,
593 },
594 manual_ping_tx,
595 ))
596 .expect("must set the ticket aggregation writer only once");
597
598 self.smgr
600 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
601 .expect("failed to start session manager")
602 .into_iter()
603 .enumerate()
604 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
605 .for_each(|(k, v)| {
606 processes.insert(k, v);
607 });
608
609 let smgr = self.smgr.clone();
610 processes.insert(
611 HoprTransportProcess::SessionsManagement(0),
612 spawn_as_abortable!(
613 StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
614 let smgr = smgr.clone();
615 async move {
616 match smgr.dispatch_message(pseudonym, data).await {
617 Ok(DispatchResult::Processed) => {
618 trace!("message dispatch completed");
619 None
620 }
621 Ok(DispatchResult::Unrelated(data)) => {
622 trace!("unrelated message dispatch completed");
623 Some(data)
624 }
625 Err(e) => {
626 error!(error = %e, "error while processing packet");
627 None
628 }
629 }
630 }
631 })
632 .map(Ok)
633 .forward(on_incoming_data_tx)
634 .inspect(|_| tracing::warn!(
635 task = %HoprTransportProcess::SessionsManagement(0),
636 "long-running background task finished"
637 ))
638 ),
639 );
640
641 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
642 }
643
644 #[tracing::instrument(level = "debug", skip(self))]
645 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
646 if peer == &self.me_peerid {
647 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
648 }
649
650 let pinger = self
651 .ping
652 .get()
653 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
654
655 if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
656 error!(error = %e, "Failed to store the peer observation");
657 }
658
659 let latency = (*pinger).ping(*peer).await?;
660
661 let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
662 hopr_transport_probe::errors::ProbeError::NonExistingPeer,
663 ))?;
664
665 Ok((latency, peer_status))
666 }
667
668 #[tracing::instrument(level = "debug", skip(self))]
669 pub async fn new_session(
670 &self,
671 destination: Address,
672 target: SessionTarget,
673 cfg: SessionClientConfig,
674 ) -> errors::Result<HoprSession> {
675 Ok(self.smgr.new_session(destination, target, cfg).await?)
676 }
677
678 #[tracing::instrument(level = "debug", skip(self))]
679 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
680 Ok(self.smgr.ping_session(id).await?)
681 }
682
683 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
684 Ok(self.smgr.get_surb_balancer_config(id).await?)
685 }
686
687 pub async fn update_session_surb_balancing_cfg(
688 &self,
689 id: &SessionId,
690 cfg: SurbBalancerConfig,
691 ) -> errors::Result<()> {
692 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
693 }
694
695 #[tracing::instrument(level = "debug", skip(self))]
696 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
697 self.network
698 .get(&self.me_peerid)
699 .await
700 .unwrap_or_else(|e| {
701 error!(error = %e, "failed to obtain listening multi-addresses");
702 None
703 })
704 .map(|peer| peer.multiaddresses)
705 .unwrap_or_default()
706 }
707
708 #[tracing::instrument(level = "debug", skip(self))]
709 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
710 let mut mas = self
711 .local_multiaddresses()
712 .into_iter()
713 .filter(|ma| {
714 hopr_transport_identity::multiaddrs::is_supported(ma)
715 && (self.cfg.transport.announce_local_addresses
716 || !hopr_transport_identity::multiaddrs::is_private(ma))
717 })
718 .map(|ma| strip_p2p_protocol(&ma))
719 .filter(|v| !v.is_empty())
720 .collect::<Vec<_>>();
721
722 mas.sort_by(|l, r| {
723 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
724 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
725
726 if !(is_left_dns ^ is_right_dns) {
727 std::cmp::Ordering::Equal
728 } else if is_left_dns {
729 std::cmp::Ordering::Less
730 } else {
731 std::cmp::Ordering::Greater
732 }
733 });
734
735 mas
736 }
737
738 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
739 self.my_multiaddresses.clone()
740 }
741
742 #[tracing::instrument(level = "debug", skip(self))]
743 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
744 self.network
745 .get(peer)
746 .await
747 .unwrap_or(None)
748 .map(|peer| peer.multiaddresses)
749 .unwrap_or(vec![])
750 }
751
752 #[tracing::instrument(level = "debug", skip(self))]
753 pub async fn network_health(&self) -> Health {
754 self.network.health().await
755 }
756
757 #[tracing::instrument(level = "debug", skip(self))]
758 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
759 Ok(self.network.connected_peers().await?)
760 }
761
762 #[tracing::instrument(level = "debug", skip(self))]
763 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
764 Ok(self.network.get(peer).await?)
765 }
766
767 #[tracing::instrument(level = "debug", skip(self))]
768 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
769 let ticket_stats = self
770 .db
771 .get_ticket_statistics(None)
772 .await
773 .map_err(|e| HoprTransportError::Other(e.into()))?;
774
775 Ok(TicketStatistics {
776 winning_count: ticket_stats.winning_tickets,
777 unredeemed_value: ticket_stats.unredeemed_value,
778 redeemed_value: ticket_stats.redeemed_value,
779 neglected_value: ticket_stats.neglected_value,
780 rejected_value: ticket_stats.rejected_value,
781 })
782 }
783
784 #[tracing::instrument(level = "debug", skip(self))]
785 pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
786 if let Some(channel) = self
787 .resolver
788 .channel_by_id(channel_id)
789 .await
790 .map_err(|e| HoprTransportError::Other(e.into()))?
791 {
792 if channel.destination == self.me_address {
793 Ok(Some(
794 self.db
795 .stream_tickets(Some((&channel).into()))
796 .await
797 .map_err(|e| HoprTransportError::Other(e.into()))?
798 .collect()
799 .await,
800 ))
801 } else {
802 Ok(None)
803 }
804 } else {
805 Ok(None)
806 }
807 }
808
809 #[tracing::instrument(level = "debug", skip(self))]
810 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
811 Ok(self
812 .db
813 .stream_tickets(None)
814 .await
815 .map_err(|e| HoprTransportError::Other(e.into()))?
816 .map(|v| v.ticket.leak())
817 .collect()
818 .await)
819 }
820}
821
822fn build_mixer_cfg_from_env() -> MixerConfig {
823 let mixer_cfg = MixerConfig {
824 min_delay: std::time::Duration::from_millis(
825 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
826 .map(|v| {
827 v.trim()
828 .parse::<u64>()
829 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
830 })
831 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
832 ),
833 delay_range: std::time::Duration::from_millis(
834 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
835 .map(|v| {
836 v.trim()
837 .parse::<u64>()
838 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
839 })
840 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
841 ),
842 capacity: {
843 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
844 .ok()
845 .and_then(|s| s.trim().parse::<usize>().ok())
846 .filter(|&c| c > 0)
847 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
848 debug!(capacity = capacity, "Setting mixer capacity");
849 capacity
850 },
851 ..MixerConfig::default()
852 };
853 debug!(?mixer_cfg, "Mixer configuration");
854
855 mixer_cfg
856}