1pub mod config;
17pub mod constants;
19pub mod errors;
21pub mod helpers;
22pub mod network_notifier;
23pub mod proxy;
25
26use std::{
27 collections::{HashMap, HashSet},
28 sync::{Arc, OnceLock},
29 time::Duration,
30};
31
32use async_lock::RwLock;
33use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
34use futures::{
35 SinkExt, StreamExt,
36 channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
37};
38use helpers::PathPlanner;
39use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
40use hopr_crypto_packet::prelude::HoprPacket;
41pub use hopr_crypto_types::{
42 keypairs::{ChainKeypair, Keypair, OffchainKeypair},
43 types::{HalfKeyChallenge, Hash, OffchainPublicKey},
44};
45use hopr_db_sql::{
46 HoprDbAllOperations,
47 accounts::ChainOrPacketKey,
48 api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
49};
50pub use hopr_internal_types::prelude::HoprPseudonym;
51use hopr_internal_types::prelude::*;
52pub use hopr_network_types::prelude::RoutingOptions;
53use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
54use hopr_path::{
55 PathAddressResolver,
56 selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
57};
58use hopr_primitive_types::prelude::*;
59pub use hopr_protocol_app::prelude::{ApplicationData, Tag};
60use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
61pub use hopr_transport_identity::{Multiaddr, PeerId};
62use hopr_transport_mixer::MixerConfig;
63pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
64use hopr_transport_p2p::{
65 HoprSwarm,
66 swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
67};
68use hopr_transport_probe::{
69 DbProxy, Probe,
70 ping::{PingConfig, Pinger},
71};
72pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
73pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
74use hopr_transport_protocol::{
75 errors::ProtocolError,
76 processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
77};
78#[cfg(feature = "runtime-tokio")]
79pub use hopr_transport_session::transfer_session;
80pub use hopr_transport_session::{
81 Capabilities as SessionCapabilities, Capability as SessionCapability, IncomingSession, SESSION_MTU, SURB_SIZE,
82 ServiceId, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
83 errors::{SessionManagerError, TransportSessionError},
84};
85use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
86use hopr_transport_ticket_aggregation::{
87 AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
88 TicketAggregatorTrait,
89};
90use rand::seq::SliceRandom;
91#[cfg(feature = "mixer-stream")]
92use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
93use tracing::{debug, error, info, trace, warn};
94
95pub use crate::{
96 config::HoprTransportConfig,
97 helpers::{PeerEligibility, TicketStatistics},
98};
99use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, helpers::run_packet_planner};
100
/// Range of tags available to applications.
///
/// This is a crate-level alias of [`Tag::APPLICATION_TAG_RANGE`].
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;

// Build-time guard: the condition below is true when both mixer features are enabled
// or when neither of them is, so the build fails unless exactly one is selected.
#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");

lazy_static::lazy_static! {
    // Computed as 2 x SESSION_INITIATION_TIMEOUT_BASE x RoutingOptions::MAX_INTERMEDIATE_HOPS.
    // NOTE(review): presumably the worst-case session initiation timeout (both directions,
    // maximum hop count) - confirm against the session manager.
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}
113
/// Identifies a background process started by [`HoprTransport::run`].
///
/// Values of this type are the keys of the map returned by `run`; each key is paired
/// with the `AbortHandle` of the corresponding task. The `strum::Display` derive renders
/// the human-readable descriptions given in the `to_string` attributes.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    /// The task driving the transport medium (the `HoprSwarm` event loop).
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    /// A process reported by `hopr_transport_protocol::run_msg_ack_protocol`.
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    /// A session-management process.
    ///
    /// Index `0` is the task that dispatches incoming data to the session manager;
    /// indices `1..` are the processes returned by starting the session manager.
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    /// A process reported by the network probe (`Probe::continuously_scan`).
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}
125
/// Ticket aggregator that defers to a writer which may not have been initialized yet.
///
/// The writer lives in a shared `OnceLock`; until it is set, aggregation requests made
/// through the [`TicketAggregatorTrait`] implementation fail with a transport error.
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    // Database handle, cloned into each `AwaitingAggregator` created per request.
    db: Db,
    // Shared slot holding the ticket aggregation writer once it becomes available.
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Timeout handed to `AwaitingAggregator::new` for each aggregation request.
    agg_timeout: std::time::Duration,
}
135
136impl<Db> TicketAggregatorProxy<Db>
137where
138 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
139{
140 pub fn new(
141 db: Db,
142 maybe_writer: Arc<
143 OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
144 >,
145 agg_timeout: std::time::Duration,
146 ) -> Self {
147 Self {
148 db,
149 maybe_writer,
150 agg_timeout,
151 }
152 }
153}
154
155#[async_trait::async_trait]
156impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
157where
158 Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
159{
160 async fn aggregate_tickets(
161 &self,
162 channel: &Hash,
163 prerequisites: AggregationPrerequisites,
164 ) -> hopr_transport_ticket_aggregation::Result<()> {
165 if let Some(writer) = self.maybe_writer.clone().get() {
166 AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
167 .aggregate_tickets(channel, prerequisites)
168 .await
169 } else {
170 Err(TicketAggregationError::TransportError(
171 "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
172 ))
173 }
174 }
175}
176
/// Path selector used by the transport's path planner: a DFS selector with randomized edge weighting.
type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;
179
/// Entry point to the HOPR transport layer.
///
/// Construct it with `new`, then call `run` to start the background processes.
/// Operations that depend on those processes (`ping`, `send_message`) return an
/// API error until `run` has initialized the corresponding `OnceLock` slots.
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    // Off-chain (packet) keypair of this node.
    me: OffchainKeypair,
    // Peer ID derived from `me`.
    me_peerid: PeerId,
    // On-chain address derived from the chain keypair passed to `new`.
    me_address: Address,
    // Transport configuration.
    cfg: HoprTransportConfig,
    // Database handle.
    db: T,
    // Manual pinger; set exactly once inside `run`.
    ping: Arc<OnceLock<Pinger>>,
    // Shared view of known peers.
    network: Arc<Network<T>>,
    // Sender used to enqueue outgoing packets; set exactly once inside `run`.
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    // Resolves `DestinationRouting` into concrete transport routing.
    path_planner: PathPlanner<T, CurrentPathSelector>,
    // Multiaddresses this node was configured with.
    my_multiaddresses: Vec<Multiaddr>,
    // Ticket aggregation writer; set exactly once inside `run` and shared with the
    // proxy returned by `ticket_aggregator`.
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    // Session manager handling session establishment and message dispatch.
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationData)>>,
}
200
201impl<T> HoprTransport<T>
202where
203 T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
204{
205 pub fn new(
206 me: &OffchainKeypair,
207 me_onchain: &ChainKeypair,
208 cfg: HoprTransportConfig,
209 db: T,
210 channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
211 my_multiaddresses: Vec<Multiaddr>,
212 ) -> Self {
213 let process_packet_send = Arc::new(OnceLock::new());
214
215 let me_peerid: PeerId = me.into();
216 let me_chain_addr = me_onchain.public().to_address();
217
218 Self {
219 me: me.clone(),
220 me_peerid,
221 me_address: me_chain_addr,
222 ping: Arc::new(OnceLock::new()),
223 network: Arc::new(Network::new(
224 me_peerid,
225 my_multiaddresses.clone(),
226 cfg.network.clone(),
227 db.clone(),
228 )),
229 process_packet_send,
230 path_planner: PathPlanner::new(
231 me_chain_addr,
232 db.clone(),
233 CurrentPathSelector::new(
234 channel_graph.clone(),
235 DfsPathSelectorConfig {
236 node_score_threshold: cfg.network.node_score_auto_path_threshold,
237 max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
238 ..Default::default()
239 },
240 ),
241 channel_graph.clone(),
242 ),
243 my_multiaddresses,
244 process_ticket_aggregate: Arc::new(OnceLock::new()),
245 smgr: SessionManager::new(SessionManagerConfig {
246 session_tag_range: (16..65535),
248 maximum_sessions: cfg.session.maximum_sessions as usize,
249 initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
250 idle_timeout: cfg.session.idle_timeout,
251 balancer_sampling_interval: cfg.session.balancer_sampling_interval,
252 initial_return_session_egress_rate: 10,
253 minimum_surb_buffer_duration: Duration::from_secs(5),
254 maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
255 growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
258 }),
259 db,
260 cfg,
261 }
262 }
263
264 #[allow(clippy::too_many_arguments)]
271 pub async fn run(
272 &self,
273 me_onchain: &ChainKeypair,
274 tbf_path: String,
275 on_incoming_data: UnboundedSender<ApplicationData>,
276 discovery_updates: UnboundedReceiver<PeerDiscovery>,
277 on_incoming_session: UnboundedSender<IncomingSession>,
278 ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
279 let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
280 futures::channel::mpsc::unbounded::<PeerDiscovery>();
281
282 let network_clone = self.network.clone();
283 let db_clone = self.db.clone();
284 let me_peerid = self.me_peerid;
285 let discovery_updates =
286 futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
287 .filter_map(move |event| {
288 let network = network_clone.clone();
289 let db = db_clone.clone();
290 let me = me_peerid;
291
292 async move {
293 match event {
294 PeerDiscovery::Allow(peer_id) => {
295 debug!(peer = %peer_id, "Processing peer discovery event: Allow");
296 if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
297 if !network.has(&peer_id).await {
298 let mas = db
299 .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
300 .await
301 .map(|entry| {
302 entry
303 .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
304 .unwrap_or_default()
305 })
306 .unwrap_or_default();
307
308 if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
309 error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
310 return None;
311 }
312 }
313
314 return Some(PeerDiscovery::Allow(peer_id))
315 } else {
316 error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
317 }
318 }
319 PeerDiscovery::Ban(peer_id) => {
320 if let Err(e) = network.remove(&peer_id).await {
321 error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
322 } else {
323 return Some(PeerDiscovery::Ban(peer_id))
324 }
325 }
326 PeerDiscovery::Announce(peer, multiaddresses) => {
327 debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
328 if peer != me {
329 let mas = multiaddresses
331 .into_iter()
332 .map(|ma| strip_p2p_protocol(&ma))
333 .filter(|v| !v.is_empty())
334 .collect::<Vec<_>>();
335
336 if ! mas.is_empty() {
337 if let Ok(pk) = OffchainPublicKey::try_from(peer) {
338 if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk)).await {
339 let key: Result<Address, _> = key.try_into();
340
341 if let Ok(key) = key {
342 if db
343 .is_allowed_in_network_registry(None, &key)
344 .await
345 .unwrap_or(false)
346 {
347 if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
348 {
349 error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
350 } else {
351 return Some(PeerDiscovery::Announce(peer, mas))
352 }
353 }
354 }
355 } else {
356 error!(%peer, "Failed to announce peer due to convertibility error");
357 }
358 }
359 }
360 }
361 }
362 }
363
364 None
365 }
366 });
367
368 info!("Loading initial peers from the storage");
369
370 let mut addresses: HashSet<Multiaddr> = HashSet::new();
371 let nodes = self.get_public_nodes().await?;
372 for (peer, _address, multiaddresses) in nodes {
373 if self.is_allowed_to_access_network(either::Left(&peer)).await? {
374 debug!(%peer, ?multiaddresses, "Using initial public node");
375 addresses.extend(multiaddresses.clone());
376
377 internal_discovery_update_tx
378 .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
379 .await
380 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
381
382 internal_discovery_update_tx
383 .send(PeerDiscovery::Allow(peer))
384 .await
385 .map_err(|e| HoprTransportError::Api(e.to_string()))?;
386 }
387 }
388
389 let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();
390
391 let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
392 let tkt_agg_writer = ticket_agg_proc.writer();
393
394 let (external_msg_send, external_msg_rx) =
395 mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(
396 MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
397 );
398
399 self.process_packet_send
400 .clone()
401 .set(MsgSender::new(external_msg_send.clone()))
402 .expect("must set the packet processing writer only once");
403
404 self.process_ticket_aggregate
405 .clone()
406 .set(tkt_agg_writer.clone())
407 .expect("must set the ticket aggregation writer only once");
408
409 let mixer_cfg = build_mixer_cfg_from_env();
411
412 #[cfg(feature = "mixer-channel")]
413 let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);
414
415 #[cfg(feature = "mixer-stream")]
416 let (mixing_channel_tx, mixing_channel_rx) = {
417 let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
418 let rx = rx.then_concurrent(move |v| {
419 let cfg = mixer_cfg;
420
421 async move {
422 let random_delay = cfg.random_delay();
423 trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);
424
425 #[cfg(all(feature = "prometheus", not(test)))]
426 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
427
428 sleep(random_delay).await;
429
430 #[cfg(all(feature = "prometheus", not(test)))]
431 {
432 hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);
433
434 let weight = 1.0f64 / cfg.metric_delay_window as f64;
435 hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
436 (weight * random_delay.as_millis() as f64)
437 + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
438 );
439 }
440
441 v
442 }
443 });
444
445 (tx, rx)
446 };
447
448 let mut transport_layer =
449 HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;
450
451 if let Some(port) = self.cfg.protocol.autonat_port {
452 transport_layer.run_nat_server(port);
453 }
454
455 if addresses.is_empty() {
456 warn!("No addresses found in the database, not dialing any NAT servers");
457 } else {
458 info!(num_addresses = addresses.len(), "Found addresses from the database");
459 let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
460 randomized_addresses.shuffle(&mut rand::thread_rng());
461 transport_layer.dial_nat_server(randomized_addresses);
462 }
463
464 let msg_proto_control =
465 transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
466 let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
467 let (wire_msg_tx, wire_msg_rx) =
468 hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;
469
470 let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
471 mixing_channel_rx
472 .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
473 .map(Ok)
474 .forward(wire_msg_tx),
475 );
476
477 let (transport_events_tx, transport_events_rx) =
478 futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(1000);
479
480 let network_clone = self.network.clone();
481 spawn(transport_events_rx.for_each(move |event| {
482 let network = network_clone.clone();
483
484 async move {
485 match event {
486 hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
487 if let Err(error) = network
488 .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
489 .await
490 {
491 tracing::error!(%peer, %error, "Failed to add incoming connection peer");
492 }
493 }
494 hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
495 if let Err(error) = network
496 .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
497 .await
498 {
499 tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
500 }
501 }
502 }
503 }
504 }));
505
506 processes.insert(
507 HoprTransportProcess::Medium,
508 spawn_as_abortable!(transport_layer.run(transport_events_tx)),
509 );
510
511 let packet_cfg = PacketInteractionConfig {
513 packet_keypair: self.me.clone(),
514 outgoing_ticket_win_prob: self
515 .cfg
516 .protocol
517 .outgoing_ticket_winning_prob
518 .map(WinningProbability::try_from)
519 .transpose()?,
520 outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
521 };
522
523 let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationData)>();
524 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
525 packet_cfg,
526 self.db.clone(),
527 Some(tbf_path),
528 (
529 mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
530 trace!(%peer, "sending message to peer");
531 futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
532 }),
533 wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
534 ),
535 (tx_from_protocol, external_msg_rx),
536 )
537 .await
538 .into_iter()
539 {
540 processes.insert(HoprTransportProcess::Protocol(k), v);
541 }
542
543 let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationData)>();
545
546 let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();
547
548 let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
549 for (k, v) in probe
550 .continuously_scan(
551 (external_msg_send, rx_from_protocol),
552 manual_ping_rx,
553 network_notifier::ProbeNetworkInteractions::new(
554 self.network.clone(),
555 self.db.clone(),
556 self.path_planner.channel_graph(),
557 ),
558 DbProxy::new(self.db.clone()),
559 tx_from_probing,
560 )
561 .await
562 .into_iter()
563 {
564 processes.insert(HoprTransportProcess::Probing(k), v);
565 }
566
567 self.ping
569 .clone()
570 .set(Pinger::new(
571 PingConfig {
572 timeout: self.cfg.probe.timeout,
573 },
574 manual_ping_tx,
575 ))
576 .expect("must set the ticket aggregation writer only once");
577
578 let packet_planner = run_packet_planner(
580 self.path_planner.clone(),
581 self.process_packet_send
582 .get()
583 .cloned()
584 .expect("packet sender must be set"),
585 );
586
587 self.smgr
588 .start(packet_planner, on_incoming_session)
589 .expect("failed to start session manager")
590 .into_iter()
591 .enumerate()
592 .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
593 .for_each(|(k, v)| {
594 processes.insert(k, v);
595 });
596
597 let smgr = self.smgr.clone();
598 processes.insert(
599 HoprTransportProcess::SessionsManagement(0),
600 spawn_as_abortable!(async move {
601 let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
602 let smgr = smgr.clone();
603 async move {
604 match smgr.dispatch_message(pseudonym, data).await {
605 Ok(DispatchResult::Processed) => {
606 trace!("message dispatch completed");
607 None
608 }
609 Ok(DispatchResult::Unrelated(data)) => {
610 trace!("unrelated message dispatch completed");
611 Some(data)
612 }
613 Err(e) => {
614 error!(error = %e, "error while processing packet");
615 None
616 }
617 }
618 }
619 })
620 .map(Ok)
621 .forward(on_incoming_data)
622 .await;
623 }),
624 );
625
626 Ok(processes)
627 }
628
629 pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
630 Arc::new(proxy::TicketAggregatorProxy::new(
631 self.db.clone(),
632 self.process_ticket_aggregate.clone(),
633 std::time::Duration::from_secs(15),
634 ))
635 }
636
637 #[tracing::instrument(level = "debug", skip(self))]
638 pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
639 if !self.is_allowed_to_access_network(either::Left(peer)).await? {
640 return Err(HoprTransportError::Api(format!(
641 "ping to '{peer}' not allowed due to network registry"
642 )));
643 }
644
645 if peer == &self.me_peerid {
646 return Err(HoprTransportError::Api("ping to self does not make sense".into()));
647 }
648
649 let pinger = self
650 .ping
651 .get()
652 .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;
653
654 if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
655 error!(error = %e, "Failed to store the peer observation");
656 }
657
658 let latency = (*pinger).ping(*peer).await?;
659
660 let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
661 hopr_transport_probe::errors::ProbeError::NonExistingPeer,
662 ))?;
663
664 Ok((latency, peer_status))
665 }
666
667 #[tracing::instrument(level = "debug", skip(self))]
668 pub async fn new_session(
669 &self,
670 destination: Address,
671 target: SessionTarget,
672 cfg: SessionClientConfig,
673 ) -> errors::Result<Session> {
674 Ok(self.smgr.new_session(destination, target, cfg).await?)
675 }
676
677 #[tracing::instrument(level = "debug", skip(self))]
678 pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
679 Ok(self.smgr.ping_session(id).await?)
680 }
681
682 pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
683 Ok(self.smgr.get_surb_balancer_config(id).await?)
684 }
685
686 pub async fn update_session_surb_balancing_cfg(
687 &self,
688 id: &SessionId,
689 cfg: SurbBalancerConfig,
690 ) -> errors::Result<()> {
691 Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
692 }
693
694 #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
695 pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
696 if let Tag::Reserved(reserved_tag) = tag {
697 return Err(HoprTransportError::Api(format!(
698 "Application tag must not from range: {:?}, but was {reserved_tag:?}",
699 Tag::APPLICATION_TAG_RANGE
700 )));
701 }
702
703 if msg.len() > HoprPacket::PAYLOAD_SIZE {
704 return Err(HoprTransportError::Api(format!(
705 "Message exceeds the maximum allowed size of {} bytes",
706 HoprPacket::PAYLOAD_SIZE
707 )));
708 }
709
710 let app_data = ApplicationData::new_from_owned(tag, msg);
711 let routing = self.path_planner.resolve_routing(app_data.len(), routing).await?.0;
712
713 let sender = self.process_packet_send.get().ok_or_else(|| {
716 HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
717 })?;
718
719 sender
720 .send_packet(app_data, routing)
721 .await
722 .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
723 .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
724 .await
725 .map_err(|e| HoprTransportError::Api(e.to_string()))
726 }
727
728 #[tracing::instrument(level = "debug", skip(self))]
729 pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
730 let entry = self
731 .db
732 .get_channel_by_id(None, channel_id)
733 .await
734 .map_err(hopr_db_sql::api::errors::DbError::from)
735 .map_err(HoprTransportError::from)
736 .and_then(|c| {
737 if let Some(c) = c {
738 Ok(c)
739 } else {
740 Err(ProtocolError::ChannelNotFound.into())
741 }
742 })?;
743
744 if entry.status != ChannelStatus::Open {
745 return Err(ProtocolError::ChannelClosed.into());
746 }
747
748 Err(TicketAggregationError::TransportError(
757 "Ticket aggregation not supported as a session protocol yet".to_string(),
758 )
759 .into())
760 }
761
762 #[tracing::instrument(level = "debug", skip(self))]
763 pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
764 Ok(self
765 .db
766 .get_accounts(None, true)
767 .await
768 .map_err(hopr_db_sql::api::errors::DbError::from)?
769 .into_iter()
770 .map(|entry| {
771 (
772 PeerId::from(entry.public_key),
773 entry.chain_addr,
774 Vec::from_iter(entry.get_multiaddr().into_iter()),
775 )
776 })
777 .collect())
778 }
779
780 pub async fn is_allowed_to_access_network<'a>(
781 &self,
782 address_like: either::Either<&'a PeerId, Address>,
783 ) -> errors::Result<bool>
784 where
785 T: 'a,
786 {
787 let db_clone = self.db.clone();
788 let address_like_noref = address_like.map_left(|peer| *peer);
789
790 Ok(self
791 .db
792 .begin_transaction()
793 .await
794 .map_err(hopr_db_sql::api::errors::DbError::from)?
795 .perform(|tx| {
796 Box::pin(async move {
797 match address_like_noref {
798 either::Left(peer) => {
799 let pk = OffchainPublicKey::try_from(peer)?;
800 if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
801 db_clone.is_allowed_in_network_registry(Some(tx), &address).await
802 } else {
803 Err(hopr_db_sql::errors::DbSqlError::LogicalError(
804 "cannot translate off-chain key".into(),
805 ))
806 }
807 }
808 either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
809 }
810 })
811 })
812 .await
813 .map_err(hopr_db_sql::api::errors::DbError::from)?)
814 }
815
816 #[tracing::instrument(level = "debug", skip(self))]
817 pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
818 self.network
819 .get(&self.me_peerid)
820 .await
821 .unwrap_or_else(|e| {
822 error!(error = %e, "failed to obtain listening multi-addresses");
823 None
824 })
825 .map(|peer| peer.multiaddresses)
826 .unwrap_or_default()
827 }
828
829 #[tracing::instrument(level = "debug", skip(self))]
830 pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
831 let mut mas = self
832 .local_multiaddresses()
833 .into_iter()
834 .filter(|ma| {
835 hopr_transport_identity::multiaddrs::is_supported(ma)
836 && (self.cfg.transport.announce_local_addresses
837 || !hopr_transport_identity::multiaddrs::is_private(ma))
838 })
839 .map(|ma| strip_p2p_protocol(&ma))
840 .filter(|v| !v.is_empty())
841 .collect::<Vec<_>>();
842
843 mas.sort_by(|l, r| {
844 let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
845 let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);
846
847 if !(is_left_dns ^ is_right_dns) {
848 std::cmp::Ordering::Equal
849 } else if is_left_dns {
850 std::cmp::Ordering::Less
851 } else {
852 std::cmp::Ordering::Greater
853 }
854 });
855
856 mas
857 }
858
859 pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
860 self.my_multiaddresses.clone()
861 }
862
863 #[tracing::instrument(level = "debug", skip(self))]
864 pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
865 self.network
866 .get(peer)
867 .await
868 .unwrap_or(None)
869 .map(|peer| peer.multiaddresses)
870 .unwrap_or(vec![])
871 }
872
873 #[tracing::instrument(level = "debug", skip(self))]
874 pub async fn network_health(&self) -> Health {
875 self.network.health().await
876 }
877
878 #[tracing::instrument(level = "debug", skip(self))]
879 pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
880 Ok(self.network.connected_peers().await?)
881 }
882
883 #[tracing::instrument(level = "debug", skip(self))]
884 pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
885 Ok(self.network.get(peer).await?)
886 }
887
888 #[tracing::instrument(level = "debug", skip(self))]
889 pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
890 let ticket_stats = self.db.get_ticket_statistics(None).await?;
891
892 Ok(TicketStatistics {
893 winning_count: ticket_stats.winning_tickets,
894 unredeemed_value: ticket_stats.unredeemed_value,
895 redeemed_value: ticket_stats.redeemed_value,
896 neglected_value: ticket_stats.neglected_value,
897 rejected_value: ticket_stats.rejected_value,
898 })
899 }
900
901 #[tracing::instrument(level = "debug", skip(self))]
902 pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
903 if let Some(channel) = self
904 .db
905 .get_channel_by_id(None, channel_id)
906 .await
907 .map_err(hopr_db_sql::api::errors::DbError::from)?
908 {
909 let own_address: Address = self
910 .db
911 .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
912 .await?
913 .ok_or_else(|| {
914 HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
915 })?
916 .try_into()?;
917
918 if channel.destination == own_address {
919 Ok(Some(self.db.get_tickets((&channel).into()).await?))
920 } else {
921 Ok(None)
922 }
923 } else {
924 Ok(None)
925 }
926 }
927
928 #[tracing::instrument(level = "debug", skip(self))]
929 pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
930 Ok(self
931 .db
932 .get_all_tickets()
933 .await?
934 .into_iter()
935 .map(|v| v.ticket.leak())
936 .collect())
937 }
938}
939
940fn build_mixer_cfg_from_env() -> MixerConfig {
941 let mixer_cfg = MixerConfig {
942 min_delay: std::time::Duration::from_millis(
943 std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
944 .map(|v| {
945 v.trim()
946 .parse::<u64>()
947 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
948 })
949 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
950 ),
951 delay_range: std::time::Duration::from_millis(
952 std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
953 .map(|v| {
954 v.trim()
955 .parse::<u64>()
956 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
957 })
958 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
959 ),
960 capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
961 .map(|v| {
962 v.trim()
963 .parse::<usize>()
964 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
965 })
966 .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
967 ..MixerConfig::default()
968 };
969 debug!(?mixer_cfg, "Mixer configuration");
970
971 mixer_cfg
972}