1pub mod config;
17pub mod constants;
19pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23
24pub mod socket;
25
26use std::{
27 collections::{HashMap, HashSet},
28 sync::{Arc, OnceLock},
29 time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35 FutureExt, SinkExt, StreamExt,
36 channel::mpsc::{Sender, channel},
37};
38use futures_concurrency::stream::StreamExt as ConcurrentStreamExt;
39use helpers::PathPlanner;
40use hopr_api::{
41 chain::{AccountSelector, ChainKeyOperations, ChainReadAccountOperations, ChainReadChannelOperations, ChainValues},
42 db::{HoprDbPeersOperations, HoprDbProtocolOperations, HoprDbTicketOperations, PeerOrigin, PeerStatus},
43};
44use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
45use hopr_crypto_packet::prelude::PacketSignal;
46pub use hopr_crypto_types::{
47 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
48 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting};
55use hopr_primitive_types::prelude::*;
56pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
57use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
58pub use hopr_transport_identity::{Multiaddr, PeerId};
59use hopr_transport_mixer::MixerConfig;
60pub use hopr_transport_network::network::{Health, Network};
61use hopr_transport_p2p::HoprSwarm;
62use hopr_transport_probe::{
63 DbProxy, Probe,
64 ping::{PingConfig, Pinger},
65};
66pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
67use hopr_transport_protocol::processor::PacketInteractionConfig;
68pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
69pub use hopr_transport_session as session;
70#[cfg(feature = "runtime-tokio")]
71pub use hopr_transport_session::transfer_session;
72pub use hopr_transport_session::{
73 Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
74 SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
75 errors::{SessionManagerError, TransportSessionError},
76};
77use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
78use rand::seq::SliceRandom;
79#[cfg(feature = "mixer-stream")]
80use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
81use tracing::{Instrument, debug, error, info, trace, warn};
82
83pub use crate::{
84 config::HoprTransportConfig,
85 helpers::{PeerEligibility, TicketStatistics},
86};
87use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, socket::HoprSocket};
88
/// Range of tags available to applications.
///
/// This is a crate-level alias of [`Tag::APPLICATION_TAG_RANGE`], exposed so that users of this
/// crate do not have to reach into the protocol crate for it.
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;
90
// Build-time guard: the mixer implementation is selected via Cargo features, and the build is
// rejected when both `mixer-channel` and `mixer-stream` are enabled, or when neither is.
#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");
96
lazy_static::lazy_static! {
    /// Upper bound for the session initiation timeout.
    ///
    /// Computed lazily on first access as
    /// `2 * SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS`.
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
101
/// Identifies a long-running background process spawned by the transport.
///
/// Values of this type are used as keys of the process map returned from `HoprTransport::run`,
/// each mapped to the `AbortHandle` of the corresponding task. The `Display` text of every
/// variant is given by its `strum` attribute.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The component driving the transport medium (the libp2p swarm).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// A sub-process of the HOPR protocol, identified by the wrapped protocol process kind.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// A session manager sub-process, distinguished by its index.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A network probing sub-process, identified by the wrapped probe process kind.
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
113
/// Path selector used by the transport's path planner: a DFS-based selector parameterized with
/// the [`RandomizedEdgeWeighting`] edge weighting.
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
116
/// Top-level handle of the HOPR transport layer.
///
/// `Db` is the database backend and `R` is the chain resolver; the trait bounds required of both
/// are declared on the `impl` block rather than on the struct itself.
pub struct HoprTransport<Db, R> {
    /// Off-chain keypair of this node.
    me: OffchainKeypair,
    /// Peer ID derived from `me` once at construction time.
    me_peerid: PeerId,
    /// Chain address derived from the on-chain keypair at construction time.
    me_address: Address,
    /// Transport configuration.
    cfg: HoprTransportConfig,
    /// Database handle.
    db: Db,
    /// Chain resolver handle.
    resolver: R,
    /// Pinger used for manual pings; empty until `run` has populated it.
    ping: Arc<OnceLock<Pinger>>,
    /// Shared view of the known network peers.
    network: Arc<Network<Db>>,
    /// Resolves routing of outgoing messages into concrete paths.
    path_planner: PathPlanner<Db, R, CurrentPathSelector>,
    /// Multiaddresses this node was configured with.
    my_multiaddresses: Vec<Multiaddr>,
    /// Session manager; sends outgoing data as `(DestinationRouting, ApplicationDataOut)` pairs
    /// and reports newly established incoming sessions via a `Sender<IncomingSession>`.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>, Sender<IncomingSession>>,
}
132
133impl<Db, R> HoprTransport<Db, R>
134where
135 Db: HoprDbTicketOperations + HoprDbPeersOperations + HoprDbProtocolOperations + Clone + Send + Sync + 'static,
136 R: ChainReadChannelOperations
137 + ChainReadAccountOperations
138 + ChainKeyOperations
139 + ChainValues
140 + Clone
141 + Send
142 + Sync
143 + 'static,
144{
145 pub fn new(
146 me: &OffchainKeypair,
147 me_onchain: &ChainKeypair,
148 cfg: HoprTransportConfig,
149 db: Db,
150 resolver: R,
151 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
152 my_multiaddresses: Vec<Multiaddr>,
153 ) -> Self {
154 let me_peerid: PeerId = me.into();
155 let me_chain_addr = me_onchain.public().to_address();
156
157 Self {
158 me: me.clone(),
159 me_peerid,
160 me_address: me_chain_addr,
161 ping: Arc::new(OnceLock::new()),
162 network: Arc::new(Network::new(
163 me_peerid,
164 my_multiaddresses.clone(),
165 cfg.network.clone(),
166 db.clone(),
167 )),
168 path_planner: PathPlanner::new(
169 me_chain_addr,
170 db.clone(),
171 resolver.clone(),
172 CurrentPathSelector::new(
173 channel_graph.clone(),
174 DfsPathSelectorConfig {
175 node_score_threshold: cfg.network.node_score_auto_path_threshold,
176 max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
177 ..Default::default()
178 },
179 ),
180 channel_graph.clone(),
181 ),
182 my_multiaddresses,
183 smgr: SessionManager::new(SessionManagerConfig {
184 session_tag_range: (16..65535),
186 maximum_sessions: cfg.session.maximum_sessions as usize,
187 frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
188 .ok()
189 .and_then(|s| s.parse::<usize>().ok())
190 .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
191 .max(ApplicationData::PAYLOAD_SIZE),
192 max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
193 .ok()
194 .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
195 .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
196 .max(Duration::from_millis(100)),
197 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
198 idle_timeout: cfg.session.idle_timeout,
199 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
200 initial_return_session_egress_rate: 10,
201 minimum_surb_buffer_duration: Duration::from_secs(5),
202 maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
203 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
206 }),
207 db,
208 resolver,
209 cfg,
210 }
211 }
212
213 #[allow(clippy::too_many_arguments)]
220 pub async fn run<S>(
221 &self,
222 discovery_updates: S,
223 on_incoming_session: Sender<IncomingSession>,
224 ) -> crate::errors::Result<(
225 HoprSocket<
226 futures::channel::mpsc::Receiver<ApplicationDataIn>,
227 futures::channel::mpsc::Sender<(DestinationRouting, ApplicationDataOut)>,
228 >,
229 HashMap<HoprTransportProcess, AbortHandle>,
230 )>
231 where
232 S: futures::Stream<Item = PeerDiscovery> + Send + 'static,
233 {
234 info!("Loading initial peers from the chain");
235 let public_nodes = self
236 .resolver
237 .stream_accounts(AccountSelector {
238 public_only: true,
239 ..Default::default()
240 })
241 .await
242 .map_err(|e| HoprTransportError::Other(e.into()))?
243 .collect::<Vec<_>>()
244 .await;
245
246 let minimum_capacity = public_nodes.len().saturating_mul(2).saturating_add(100);
249
250 let internal_discovery_updates_capacity = std::env::var("HOPR_INTERNAL_DISCOVERY_UPDATES_CAPACITY")
251 .ok()
252 .and_then(|s| s.trim().parse::<usize>().ok())
253 .filter(|&c| c > 0)
254 .unwrap_or(2048)
255 .max(minimum_capacity);
256
257 debug!(
258 capacity = internal_discovery_updates_capacity,
259 minimum_required = minimum_capacity,
260 "Creating internal discovery updates channel"
261 );
262 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
263 futures::channel::mpsc::channel::<PeerDiscovery>(internal_discovery_updates_capacity);
264
265 let me_peerid = self.me_peerid;
266 let network = self.network.clone();
267 let discovery_updates = futures_concurrency::stream::StreamExt::merge(
268 discovery_updates,
269 internal_discovery_update_rx,
270 )
271 .filter_map(move |event| {
272 let network = network.clone();
273 async move {
274 match event {
275 PeerDiscovery::Announce(peer, multiaddresses) => {
276 debug!(%peer, ?multiaddresses, "processing peer discovery event: Announce");
277 if peer != me_peerid {
278 let mas = multiaddresses
280 .into_iter()
281 .map(|ma| strip_p2p_protocol(&ma))
282 .filter(|v| !v.is_empty())
283 .collect::<Vec<_>>();
284
285 if !mas.is_empty() {
286 if let Err(error) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await {
287 error!(%peer, %error, "failed to add peer to the network");
288 None
289 } else {
290 Some(PeerDiscovery::Announce(peer, mas))
291 }
292 } else {
293 None
294 }
295 } else {
296 None
297 }
298 }
299 _ => None,
300 }
301 }
302 });
303
304 info!(
305 public_nodes = public_nodes.len(),
306 "Initializing swarm with peers from chain"
307 );
308
309 let mut addresses: HashSet<Multiaddr> = HashSet::new();
310 for node_entry in public_nodes {
311 if let AccountType::Announced { multiaddr, .. } = node_entry.entry_type {
312 let peer: PeerId = node_entry.public_key.into();
313 let multiaddresses = vec![multiaddr];
314
315 debug!(%peer, ?multiaddresses, "Using initial public node");
316 addresses.extend(multiaddresses.clone());
317
318 internal_discovery_update_tx
319 .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
320 .await
321 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
322 }
323 }
324
325 let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
326
327 let (unresolved_routing_msg_tx, unresolved_routing_msg_rx) =
328 channel::<(DestinationRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
329
330 let (resolved_routing_msg_tx, resolved_routing_msg_rx) =
331 channel::<(ResolvedTransportRouting, ApplicationDataOut)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
332
333 let mixer_cfg = build_mixer_cfg_from_env();
335
336 #[cfg(feature = "mixer-channel")]
337 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
338
339 #[cfg(feature = "mixer-stream")]
340 let (mixing_channel_tx, mixing_channel_rx) = {
341 let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
342 let rx = rx.then_concurrent(move |v| {
343 let cfg = mixer_cfg;
344
345 async move {
346 let random_delay = cfg.random_delay();
347 trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
348
349 #[cfg(all(feature = "prometheus", not(test)))]
350 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
351
352 sleep(random_delay).await;
353
354 #[cfg(all(feature = "prometheus", not(test)))]
355 {
356 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
357
358 let weight = 1.0f64 / cfg.metric_delay_window as f64;
359 hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
360 (weight * random_delay.as_millis() as f64)
361 + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
362 );
363 }
364
365 v
366 }
367 });
368
369 (tx, rx)
370 };
371
372 let mut transport_layer =
373 HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
374
375 if let Some(port) = self.cfg.protocol.autonat_port {
376 transport_layer.run_nat_server(port);
377 }
378
379 if addresses.is_empty() {
380 warn!("No addresses found in the database, not dialing any NAT servers");
381 } else {
382 info!(num_addresses = addresses.len(), "Found addresses from the database");
383 let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
384 randomized_addresses.shuffle(&mut rand::thread_rng());
385 transport_layer.dial_nat_server(randomized_addresses);
386 }
387
388 let msg_proto_control =
389 transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
390 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
391 let (wire_msg_tx, wire_msg_rx) =
392 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
393
394 let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
395 mixing_channel_rx
396 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
397 .map(Ok)
398 .forward(wire_msg_tx)
399 .inspect(|_| {
400 tracing::warn!(
401 task = "mixer -> egress process",
402 "long-running background task finished"
403 )
404 }),
405 );
406
407 let (transport_events_tx, transport_events_rx) =
408 futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(2048);
409
410 let network_clone = self.network.clone();
411 spawn(
412 transport_events_rx
413 .for_each(move |event| {
414 let network = network_clone.clone();
415
416 async move {
417 match event {
418 hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
419 if let Err(error) = network
420 .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
421 .await
422 {
423 tracing::error!(%peer, %error, "Failed to add incoming connection peer");
424 }
425 }
426 hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
427 if let Err(error) = network
428 .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
429 .await
430 {
431 tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
432 }
433 }
434 }
435 }
436 })
437 .inspect(|_| {
438 tracing::warn!(
439 task = "transport events recording",
440 "long-running background task finished"
441 )
442 }),
443 );
444
445 processes.insert(
446 HoprTransportProcess::Medium,
447 spawn_as_abortable!(transport_layer.run(transport_events_tx).inspect(|_| tracing::warn!(
448 task = %HoprTransportProcess::Medium,
449 "long-running background task finished"
450 ))),
451 );
452
453 let packet_cfg = PacketInteractionConfig {
455 packet_keypair: self.me.clone(),
456 outgoing_ticket_win_prob: self
457 .cfg
458 .protocol
459 .outgoing_ticket_winning_prob
460 .map(WinningProbability::try_from)
461 .transpose()?,
462 outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
463 };
464
465 let msg_protocol_bidirectional_channel_capacity =
466 std::env::var("HOPR_INTERNAL_PROTOCOL_BIDIRECTIONAL_CHANNEL_CAPACITY")
467 .ok()
468 .and_then(|s| s.trim().parse::<usize>().ok())
469 .filter(|&c| c > 0)
470 .unwrap_or(16_384);
471
472 let (on_incoming_data_tx, on_incoming_data_rx) =
473 channel::<ApplicationDataIn>(msg_protocol_bidirectional_channel_capacity);
474
475 debug!(
476 capacity = msg_protocol_bidirectional_channel_capacity,
477 "Creating protocol bidirectional channel"
478 );
479 let (tx_from_protocol, rx_from_protocol) =
480 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
481
482 let path_planner = self.path_planner.clone();
485 let distress_threshold = self.db.get_surb_config().distress_threshold;
486 let all_resolved_external_msg_rx = unresolved_routing_msg_rx
487 .filter_map(move |(unresolved, mut data)| {
488 let path_planner = path_planner.clone();
489 async move {
490 trace!(?unresolved, "resolving routing for packet");
491 match path_planner
492 .resolve_routing(data.data.total_len(), data.estimate_surbs_with_msg(), unresolved)
493 .await
494 {
495 Ok((resolved, rem_surbs)) => {
496 let mut signals_to_dst = data
500 .packet_info
501 .as_ref()
502 .map(|info| info.signals_to_destination)
503 .unwrap_or_default();
504
505 if resolved.is_return() {
506 signals_to_dst = match rem_surbs {
507 Some(rem) if (1..distress_threshold.max(2)).contains(&rem) => {
508 signals_to_dst | PacketSignal::SurbDistress
509 }
510 Some(0) => signals_to_dst | PacketSignal::OutOfSurbs,
511 _ => signals_to_dst - (PacketSignal::OutOfSurbs | PacketSignal::SurbDistress),
512 };
513 } else {
514 signals_to_dst -= PacketSignal::SurbDistress | PacketSignal::OutOfSurbs;
516 }
517
518 data.packet_info.get_or_insert_default().signals_to_destination = signals_to_dst;
519 trace!(?resolved, "resolved routing for packet");
520 Some((resolved, data))
521 }
522 Err(error) => {
523 error!(%error, "failed to resolve routing");
524 None
525 }
526 }
527 }
528 .in_current_span()
529 })
530 .merge(resolved_routing_msg_rx);
531
532 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
533 packet_cfg,
534 self.db.clone(),
535 self.resolver.clone(),
536 (
537 mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
538 trace!(%peer, "sending message to peer");
539 futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
540 }),
541 wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
542 ),
543 (tx_from_protocol, all_resolved_external_msg_rx),
544 )
545 .await
546 .into_iter()
547 {
548 processes.insert(HoprTransportProcess::Protocol(k), v);
549 }
550
551 debug!(
553 capacity = msg_protocol_bidirectional_channel_capacity,
554 note = "same as protocol bidirectional",
555 "Creating probing channel"
556 );
557 let (tx_from_probing, rx_from_probing) =
558 channel::<(HoprPseudonym, ApplicationDataIn)>(msg_protocol_bidirectional_channel_capacity);
559
560 let manual_ping_channel_capacity = std::env::var("HOPR_INTERNAL_MANUAL_PING_CHANNEL_CAPACITY")
561 .ok()
562 .and_then(|s| s.trim().parse::<usize>().ok())
563 .filter(|&c| c > 0)
564 .unwrap_or(128);
565 debug!(capacity = manual_ping_channel_capacity, "Creating manual ping channel");
566 let (manual_ping_tx, manual_ping_rx) = channel::<(PeerId, PingQueryReplier)>(manual_ping_channel_capacity);
567
568 let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
571 for (k, v) in probe
572 .continuously_scan(
573 (resolved_routing_msg_tx, rx_from_protocol),
574 manual_ping_rx,
575 network_notifier::ProbeNetworkInteractions::new(
576 self.network.clone(),
577 self.resolver.clone(),
578 self.path_planner.channel_graph(),
579 ),
580 DbProxy::new(self.db.clone(), self.resolver.clone()),
581 tx_from_probing,
582 )
583 .await
584 .into_iter()
585 {
586 processes.insert(HoprTransportProcess::Probing(k), v);
587 }
588
589 self.ping
591 .clone()
592 .set(Pinger::new(
593 PingConfig {
594 timeout: self.cfg.probe.timeout,
595 },
596 manual_ping_tx,
597 ))
598 .expect("must set the ticket aggregation writer only once");
599
600 self.smgr
602 .start(unresolved_routing_msg_tx.clone(), on_incoming_session)
603 .expect("failed to start session manager")
604 .into_iter()
605 .enumerate()
606 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
607 .for_each(|(k, v)| {
608 processes.insert(k, v);
609 });
610
611 let smgr = self.smgr.clone();
612 processes.insert(
613 HoprTransportProcess::SessionsManagement(0),
614 spawn_as_abortable!(
615 StreamExt::filter_map(rx_from_probing, move |(pseudonym, data)| {
616 let smgr = smgr.clone();
617 async move {
618 match smgr.dispatch_message(pseudonym, data).await {
619 Ok(DispatchResult::Processed) => {
620 trace!("message dispatch completed");
621 None
622 }
623 Ok(DispatchResult::Unrelated(data)) => {
624 trace!("unrelated message dispatch completed");
625 Some(data)
626 }
627 Err(e) => {
628 error!(error = %e, "error while processing packet");
629 None
630 }
631 }
632 }
633 })
634 .map(Ok)
635 .forward(on_incoming_data_tx)
636 .inspect(|_| tracing::warn!(
637 task = %HoprTransportProcess::SessionsManagement(0),
638 "long-running background task finished"
639 ))
640 ),
641 );
642
643 Ok(((on_incoming_data_rx, unresolved_routing_msg_tx).into(), processes))
644 }
645
646 #[tracing::instrument(level = "debug", skip(self))]
647 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
648 if peer == &self.me_peerid {
649 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
650 }
651
652 let pinger = self
653 .ping
654 .get()
655 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
656
657 if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
658 error!(error = %e, "Failed to store the peer observation");
659 }
660
661 let latency = (*pinger).ping(*peer).await?;
662
663 let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
664 hopr_transport_probe::errors::ProbeError::NonExistingPeer,
665 ))?;
666
667 Ok((latency, peer_status))
668 }
669
670 #[tracing::instrument(level = "debug", skip(self))]
671 pub async fn new_session(
672 &self,
673 destination: Address,
674 target: SessionTarget,
675 cfg: SessionClientConfig,
676 ) -> errors::Result<HoprSession> {
677 Ok(self.smgr.new_session(destination, target, cfg).await?)
678 }
679
680 #[tracing::instrument(level = "debug", skip(self))]
681 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
682 Ok(self.smgr.ping_session(id).await?)
683 }
684
685 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
686 Ok(self.smgr.get_surb_balancer_config(id).await?)
687 }
688
689 pub async fn update_session_surb_balancing_cfg(
690 &self,
691 id: &SessionId,
692 cfg: SurbBalancerConfig,
693 ) -> errors::Result<()> {
694 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
695 }
696
697 #[tracing::instrument(level = "debug", skip(self))]
698 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
699 self.network
700 .get(&self.me_peerid)
701 .await
702 .unwrap_or_else(|e| {
703 error!(error = %e, "failed to obtain listening multi-addresses");
704 None
705 })
706 .map(|peer| peer.multiaddresses)
707 .unwrap_or_default()
708 }
709
710 #[tracing::instrument(level = "debug", skip(self))]
711 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
712 let mut mas = self
713 .local_multiaddresses()
714 .into_iter()
715 .filter(|ma| {
716 hopr_transport_identity::multiaddrs::is_supported(ma)
717 && (self.cfg.transport.announce_local_addresses
718 || !hopr_transport_identity::multiaddrs::is_private(ma))
719 })
720 .map(|ma| strip_p2p_protocol(&ma))
721 .filter(|v| !v.is_empty())
722 .collect::<Vec<_>>();
723
724 mas.sort_by(|l, r| {
725 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
726 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
727
728 if !(is_left_dns ^ is_right_dns) {
729 std::cmp::Ordering::Equal
730 } else if is_left_dns {
731 std::cmp::Ordering::Less
732 } else {
733 std::cmp::Ordering::Greater
734 }
735 });
736
737 mas
738 }
739
740 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
741 self.my_multiaddresses.clone()
742 }
743
744 #[tracing::instrument(level = "debug", skip(self))]
745 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
746 self.network
747 .get(peer)
748 .await
749 .unwrap_or(None)
750 .map(|peer| peer.multiaddresses)
751 .unwrap_or(vec![])
752 }
753
754 #[tracing::instrument(level = "debug", skip(self))]
755 pub async fn network_health(&self) -> Health {
756 self.network.health().await
757 }
758
759 #[tracing::instrument(level = "debug", skip(self))]
760 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
761 Ok(self.network.connected_peers().await?)
762 }
763
764 #[tracing::instrument(level = "debug", skip(self))]
765 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
766 Ok(self.network.get(peer).await?)
767 }
768
769 #[tracing::instrument(level = "debug", skip(self))]
770 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
771 let ticket_stats = self
772 .db
773 .get_ticket_statistics(None)
774 .await
775 .map_err(|e| HoprTransportError::Other(e.into()))?;
776
777 Ok(TicketStatistics {
778 winning_count: ticket_stats.winning_tickets,
779 unredeemed_value: ticket_stats.unredeemed_value,
780 redeemed_value: ticket_stats.redeemed_value,
781 neglected_value: ticket_stats.neglected_value,
782 rejected_value: ticket_stats.rejected_value,
783 })
784 }
785
786 #[tracing::instrument(level = "debug", skip(self))]
787 pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
788 if let Some(channel) = self
789 .resolver
790 .channel_by_id(channel_id)
791 .await
792 .map_err(|e| HoprTransportError::Other(e.into()))?
793 {
794 if channel.destination == self.me_address {
795 Ok(Some(
796 self.db
797 .stream_tickets(Some((&channel).into()))
798 .await
799 .map_err(|e| HoprTransportError::Other(e.into()))?
800 .collect()
801 .await,
802 ))
803 } else {
804 Ok(None)
805 }
806 } else {
807 Ok(None)
808 }
809 }
810
811 #[tracing::instrument(level = "debug", skip(self))]
812 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
813 Ok(self
814 .db
815 .stream_tickets(None)
816 .await
817 .map_err(|e| HoprTransportError::Other(e.into()))?
818 .map(|v| v.ticket.leak())
819 .collect()
820 .await)
821 }
822}
823
824fn build_mixer_cfg_from_env() -> MixerConfig {
825 let mixer_cfg = MixerConfig {
826 min_delay: std::time::Duration::from_millis(
827 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
828 .map(|v| {
829 v.trim()
830 .parse::<u64>()
831 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
832 })
833 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
834 ),
835 delay_range: std::time::Duration::from_millis(
836 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
837 .map(|v| {
838 v.trim()
839 .parse::<u64>()
840 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
841 })
842 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
843 ),
844 capacity: {
845 let capacity = std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
846 .ok()
847 .and_then(|s| s.trim().parse::<usize>().ok())
848 .filter(|&c| c > 0)
849 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY);
850 debug!(capacity = capacity, "Setting mixer capacity");
851 capacity
852 },
853 ..MixerConfig::default()
854 };
855 debug!(?mixer_cfg, "Mixer configuration");
856
857 mixer_cfg
858}