pub mod config;
pub mod constants;
pub mod errors;
pub mod helpers;
pub mod network_notifier;
pub mod proxy;

use std::{
    collections::{HashMap, HashSet},
    sync::{Arc, OnceLock},
    time::Duration,
};

use async_lock::RwLock;
use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
use futures::{
    FutureExt, SinkExt, StreamExt,
    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
};
use helpers::PathPlanner;
use hopr_async_runtime::{AbortHandle, prelude::spawn, spawn_as_abortable};
use hopr_crypto_packet::prelude::HoprPacket;
pub use hopr_crypto_types::{
    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
};
use hopr_db_sql::{
    HoprDbAllOperations,
    accounts::ChainOrPacketKey,
    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
};
pub use hopr_internal_types::prelude::HoprPseudonym;
use hopr_internal_types::prelude::*;
pub use hopr_network_types::prelude::RoutingOptions;
use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
use hopr_path::{
    PathAddressResolver,
    selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
};
use hopr_primitive_types::prelude::*;
pub use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, Tag};
use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
pub use hopr_transport_identity::{Multiaddr, PeerId};
use hopr_transport_mixer::MixerConfig;
pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
use hopr_transport_p2p::{
    HoprSwarm,
    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
};
use hopr_transport_probe::{
    DbProxy, Probe,
    ping::{PingConfig, Pinger},
};
pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
use hopr_transport_protocol::{
    errors::ProtocolError,
    processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
};
#[cfg(feature = "runtime-tokio")]
pub use hopr_transport_session::transfer_session;
pub use hopr_transport_session::{
    Capabilities as SessionCapabilities, Capability as SessionCapability, HoprSession, IncomingSession, SESSION_MTU,
    SURB_SIZE, ServiceId, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
    errors::{SessionManagerError, TransportSessionError},
};
use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
use hopr_transport_ticket_aggregation::{
    AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
    TicketAggregatorTrait,
};
use rand::seq::SliceRandom;
#[cfg(feature = "mixer-stream")]
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::{debug, error, info, trace, warn};

pub use crate::{
    config::HoprTransportConfig,
    helpers::{PeerEligibility, TicketStatistics},
};
use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError, helpers::run_packet_planner};

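/// Range of application tags available to external callers (re-export of [`Tag::APPLICATION_TAG_RANGE`]).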
pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;

#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");

lazy_static::lazy_static! {
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}

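/// Named long-running background processes spawned by [`HoprTransport::run`], keyed in the
/// returned map of abort handles.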
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}

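/// Proxy implementation of [`TicketAggregatorTrait`] that delegates to an [`AwaitingAggregator`]
/// once the ticket aggregation writer has been initialized, and fails with a transport error
/// before that point.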
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    db: Db,
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    agg_timeout: std::time::Duration,
}

impl<Db> TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    pub fn new(
        db: Db,
        maybe_writer: Arc<
            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
        >,
        agg_timeout: std::time::Duration,
    ) -> Self {
        Self {
            db,
            maybe_writer,
            agg_timeout,
        }
    }
}

#[async_trait::async_trait]
impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    async fn aggregate_tickets(
        &self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> hopr_transport_ticket_aggregation::Result<()> {
        if let Some(writer) = self.maybe_writer.clone().get() {
            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
                .aggregate_tickets(channel, prerequisites)
                .await
        } else {
            Err(TicketAggregationError::TransportError(
                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
            ))
        }
    }
}

type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;

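/// The main transport object, combining the libp2p swarm, the HOPR protocol pipeline, network
/// state, path planning, probing and session management behind a single API.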
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    me: OffchainKeypair,
    me_peerid: PeerId,
    me_address: Address,
    cfg: HoprTransportConfig,
    db: T,
    ping: Arc<OnceLock<Pinger>>,
    network: Arc<Network<T>>,
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    path_planner: PathPlanner<T, CurrentPathSelector>,
    my_multiaddresses: Vec<Multiaddr>,
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    smgr: SessionManager<Sender<(DestinationRouting, ApplicationDataOut)>>,
}

impl<T> HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
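    /// Creates a new transport instance from the node's keys, configuration, database handle,
    /// channel graph and the multiaddresses it listens on. No background tasks are spawned here;
    /// call [`HoprTransport::run`] to start them.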
    pub fn new(
        me: &OffchainKeypair,
        me_onchain: &ChainKeypair,
        cfg: HoprTransportConfig,
        db: T,
        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
        my_multiaddresses: Vec<Multiaddr>,
    ) -> Self {
        let process_packet_send = Arc::new(OnceLock::new());

        let me_peerid: PeerId = me.into();
        let me_chain_addr = me_onchain.public().to_address();

        Self {
            me: me.clone(),
            me_peerid,
            me_address: me_chain_addr,
            ping: Arc::new(OnceLock::new()),
            network: Arc::new(Network::new(
                me_peerid,
                my_multiaddresses.clone(),
                cfg.network.clone(),
                db.clone(),
            )),
            process_packet_send,
            path_planner: PathPlanner::new(
                me_chain_addr,
                db.clone(),
                CurrentPathSelector::new(
                    channel_graph.clone(),
                    DfsPathSelectorConfig {
                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
                        ..Default::default()
                    },
                ),
                channel_graph.clone(),
            ),
            my_multiaddresses,
            process_ticket_aggregate: Arc::new(OnceLock::new()),
            smgr: SessionManager::new(SessionManagerConfig {
                session_tag_range: (16..65535),
                maximum_sessions: cfg.session.maximum_sessions as usize,
                frame_mtu: std::env::var("HOPR_SESSION_FRAME_SIZE")
                    .ok()
                    .and_then(|s| s.parse::<usize>().ok())
                    .unwrap_or_else(|| SessionManagerConfig::default().frame_mtu)
                    .max(ApplicationData::PAYLOAD_SIZE),
                max_frame_timeout: std::env::var("HOPR_SESSION_FRAME_TIMEOUT_MS")
                    .ok()
                    .and_then(|s| s.parse::<u64>().ok().map(Duration::from_millis))
                    .unwrap_or_else(|| SessionManagerConfig::default().max_frame_timeout)
                    .max(Duration::from_millis(100)),
                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
                idle_timeout: cfg.session.idle_timeout,
                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
                initial_return_session_egress_rate: 10,
                minimum_surb_buffer_duration: Duration::from_secs(5),
                maximum_surb_buffer_size: db.get_surb_config().rb_capacity,
                growable_target_surb_buffer: Some((Duration::from_secs(120), 0.10)),
            }),
            db,
            cfg,
        }
    }

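    /// Spawns all background processes of the transport layer (libp2p swarm, HOPR protocol,
    /// probing and session management) and wires them to the given channels for incoming data,
    /// peer discovery updates and incoming sessions.
    ///
    /// Returns a map of abort handles, one per [`HoprTransportProcess`].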
    #[allow(clippy::too_many_arguments)]
    pub async fn run(
        &self,
        me_onchain: &ChainKeypair,
        on_incoming_data: UnboundedSender<ApplicationDataIn>,
        discovery_updates: UnboundedReceiver<PeerDiscovery>,
        on_incoming_session: UnboundedSender<IncomingSession>,
    ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
            futures::channel::mpsc::unbounded::<PeerDiscovery>();

        let network_clone = self.network.clone();
        let db_clone = self.db.clone();
        let me_peerid = self.me_peerid;
        let discovery_updates =
            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
                .filter_map(move |event| {
                    let network = network_clone.clone();
                    let db = db_clone.clone();
                    let me = me_peerid;

                    async move {
                        match event {
                            PeerDiscovery::Allow(peer_id) => {
                                debug!(peer = %peer_id, "Processing peer discovery event: Allow");

                                if let Ok(pubkey) =
                                    hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer_id)).await
                                {
                                    if !network.has(&peer_id).await {
                                        let mas = db
                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pubkey))
                                            .await
                                            .map(|entry| {
                                                entry
                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
                                                    .unwrap_or_default()
                                            })
                                            .unwrap_or_default();

                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
                                            return None;
                                        }
                                    }

                                    return Some(PeerDiscovery::Allow(peer_id))
                                } else {
                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key")
                                }
                            }
                            PeerDiscovery::Ban(peer_id) => {
                                if let Err(e) = network.remove(&peer_id).await {
                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)")
                                } else {
                                    return Some(PeerDiscovery::Ban(peer_id))
                                }
                            }
                            PeerDiscovery::Announce(peer, multiaddresses) => {
                                debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
                                if peer != me {
                                    let mas = multiaddresses
                                        .into_iter()
                                        .map(|ma| strip_p2p_protocol(&ma))
                                        .filter(|v| !v.is_empty())
                                        .collect::<Vec<_>>();

                                    if !mas.is_empty() {
                                        if let Ok(pubkey) =
                                            hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await
                                        {
                                            if let Ok(Some(key)) = db.translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pubkey)).await {
                                                let key: Result<Address, _> = key.try_into();

                                                if let Ok(key) = key {
                                                    if db
                                                        .is_allowed_in_network_registry(None, &key)
                                                        .await
                                                        .unwrap_or(false)
                                                    {
                                                        if let Err(e) = network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
                                                        {
                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
                                                        } else {
                                                            return Some(PeerDiscovery::Announce(peer, mas))
                                                        }
                                                    }
                                                }
                                            } else {
                                                error!(%peer, "Failed to announce peer due to convertibility error");
                                            }
                                        }
                                    }
                                }
                            }
                        }

                        None
                    }
                });

        info!("Loading initial peers from the storage");

        let mut addresses: HashSet<Multiaddr> = HashSet::new();
        let nodes = self.get_public_nodes().await?;
        for (peer, _address, multiaddresses) in nodes {
            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
                debug!(%peer, ?multiaddresses, "Using initial public node");
                addresses.extend(multiaddresses.clone());

                internal_discovery_update_tx
                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
                    .await
                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;

                internal_discovery_update_tx
                    .send(PeerDiscovery::Allow(peer))
                    .await
                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
            }
        }

        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();

        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
        let tkt_agg_writer = ticket_agg_proc.writer();

        let (external_msg_send, external_msg_rx) =
            mpsc::channel::<(ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer)>(
                MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
            );

        self.process_packet_send
            .clone()
            .set(MsgSender::new(external_msg_send.clone()))
            .expect("must set the packet processing writer only once");

        self.process_ticket_aggregate
            .clone()
            .set(tkt_agg_writer.clone())
            .expect("must set the ticket aggregation writer only once");

        let mixer_cfg = build_mixer_cfg_from_env();

        #[cfg(feature = "mixer-channel")]
        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);

        #[cfg(feature = "mixer-stream")]
        let (mixing_channel_tx, mixing_channel_rx) = {
            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
            let rx = rx.then_concurrent(move |v| {
                let cfg = mixer_cfg;

                async move {
                    let random_delay = cfg.random_delay();
                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay");

                    // The message enters the mixer delay stage here...
                    #[cfg(all(feature = "prometheus", not(test)))]
                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.increment(1.0f64);

                    sleep(random_delay).await;

                    // ...and leaves it here, which also updates the moving average of the delay.
                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);

                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
                            (weight * random_delay.as_millis() as f64)
                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
                        );
                    }

                    v
                }
            });

            (tx, rx)
        };

        let mut transport_layer =
            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;

        if let Some(port) = self.cfg.protocol.autonat_port {
            transport_layer.run_nat_server(port);
        }

        if addresses.is_empty() {
            warn!("No addresses found in the database, not dialing any NAT servers");
        } else {
            info!(num_addresses = addresses.len(), "Found addresses from the database");
            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
            randomized_addresses.shuffle(&mut rand::thread_rng());
            transport_layer.dial_nat_server(randomized_addresses);
        }

        let msg_proto_control =
            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
        let (wire_msg_tx, wire_msg_rx) =
            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;

        let _mixing_process_before_sending_out = hopr_async_runtime::prelude::spawn(
            mixing_channel_rx
                .inspect(|(peer, _)| tracing::trace!(%peer, "moving message from mixer to p2p stream"))
                .map(Ok)
                .forward(wire_msg_tx),
        );

        let (transport_events_tx, transport_events_rx) =
            futures::channel::mpsc::channel::<hopr_transport_p2p::DiscoveryEvent>(1000);

        let network_clone = self.network.clone();
        spawn(transport_events_rx.for_each(move |event| {
            let network = network_clone.clone();

            async move {
                match event {
                    hopr_transport_p2p::DiscoveryEvent::IncomingConnection(peer, multiaddr) => {
                        if let Err(error) = network
                            .add(&peer, PeerOrigin::IncomingConnection, vec![multiaddr])
                            .await
                        {
                            tracing::error!(%peer, %error, "Failed to add incoming connection peer");
                        }
                    }
                    hopr_transport_p2p::DiscoveryEvent::FailedDial(peer) => {
                        if let Err(error) = network
                            .update(&peer, Err(hopr_transport_network::network::UpdateFailure::DialFailure))
                            .await
                        {
                            tracing::error!(%peer, %error, "Failed to update peer status after failed dial");
                        }
                    }
                }
            }
            .inspect(|_| {
                tracing::info!(
                    task = "transport event notifier",
                    "long-running background task finished"
                )
            })
        }));

        processes.insert(
            HoprTransportProcess::Medium,
            spawn_as_abortable!(transport_layer.run(transport_events_tx)),
        );

        let packet_cfg = PacketInteractionConfig {
            packet_keypair: self.me.clone(),
            outgoing_ticket_win_prob: self
                .cfg
                .protocol
                .outgoing_ticket_winning_prob
                .map(WinningProbability::try_from)
                .transpose()?,
            outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
        };

        let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationDataIn)>();
        for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
            packet_cfg,
            self.db.clone(),
            (
                mixing_channel_tx.with(|(peer, msg): (PeerId, Box<[u8]>)| {
                    trace!(%peer, "sending message to peer");
                    futures::future::ok::<_, hopr_transport_mixer::channel::SenderError>((peer, msg))
                }),
                wire_msg_rx.inspect(|(peer, _)| trace!(%peer, "received message from peer")),
            ),
            (tx_from_protocol, external_msg_rx),
        )
        .await
        .into_iter()
        {
            processes.insert(HoprTransportProcess::Protocol(k), v);
        }

        let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationDataIn)>();

        let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();

        let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
        for (k, v) in probe
            .continuously_scan(
                (external_msg_send, rx_from_protocol),
                manual_ping_rx,
                network_notifier::ProbeNetworkInteractions::new(
                    self.network.clone(),
                    self.db.clone(),
                    self.path_planner.channel_graph(),
                ),
                DbProxy::new(self.db.clone()),
                tx_from_probing,
            )
            .await
            .into_iter()
        {
            processes.insert(HoprTransportProcess::Probing(k), v);
        }

        self.ping
            .clone()
            .set(Pinger::new(
                PingConfig {
                    timeout: self.cfg.probe.timeout,
                },
                manual_ping_tx,
            ))
            .expect("must set the pinger only once");

        let packet_planner = run_packet_planner(
            self.path_planner.clone(),
            self.process_packet_send
                .get()
                .cloned()
                .expect("packet sender must be set"),
        );

        self.smgr
            .start(packet_planner, on_incoming_session)
            .expect("failed to start session manager")
            .into_iter()
            .enumerate()
            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
            .for_each(|(k, v)| {
                processes.insert(k, v);
            });

        let smgr = self.smgr.clone();
        processes.insert(
            HoprTransportProcess::SessionsManagement(0),
            spawn_as_abortable!(async move {
                let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
                    let smgr = smgr.clone();
                    async move {
                        match smgr.dispatch_message(pseudonym, data).await {
                            Ok(DispatchResult::Processed) => {
                                trace!("message dispatch completed");
                                None
                            }
                            Ok(DispatchResult::Unrelated(data)) => {
                                trace!("unrelated message dispatch completed");
                                Some(data)
                            }
                            Err(e) => {
                                error!(error = %e, "error while processing packet");
                                None
                            }
                        }
                    }
                })
                .map(Ok)
                .forward(on_incoming_data)
                .await;
            }),
        );

        Ok(processes)
    }

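    /// Returns a ticket aggregator proxy backed by the database and the (lazily initialized)
    /// ticket aggregation writer.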
    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
        Arc::new(proxy::TicketAggregatorProxy::new(
            self.db.clone(),
            self.process_ticket_aggregate.clone(),
            std::time::Duration::from_secs(15),
        ))
    }

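    /// Manually pings the given peer, provided it is allowed by the network registry and is not
    /// this node itself. Returns the measured latency together with the peer's recorded status.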
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
            return Err(HoprTransportError::Api(format!(
                "ping to '{peer}' not allowed due to network registry"
            )));
        }

        if peer == &self.me_peerid {
            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
        }

        let pinger = self
            .ping
            .get()
            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;

        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
            error!(error = %e, "Failed to store the peer observation");
        }

        let latency = (*pinger).ping(*peer).await?;

        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
        ))?;

        Ok((latency, peer_status))
    }

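    /// Opens a new outgoing session to the given destination address with the desired target and
    /// client configuration, delegating to the session manager.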
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn new_session(
        &self,
        destination: Address,
        target: SessionTarget,
        cfg: SessionClientConfig,
    ) -> errors::Result<HoprSession> {
        Ok(self.smgr.new_session(destination, target, cfg).await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
        Ok(self.smgr.ping_session(id).await?)
    }

    pub async fn session_surb_balancing_cfg(&self, id: &SessionId) -> errors::Result<Option<SurbBalancerConfig>> {
        Ok(self.smgr.get_surb_balancer_config(id).await?)
    }

    pub async fn update_session_surb_balancing_cfg(
        &self,
        id: &SessionId,
        cfg: SurbBalancerConfig,
    ) -> errors::Result<()> {
        Ok(self.smgr.update_surb_balancer_config(id, cfg).await?)
    }

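    /// Sends a single message with the given application tag over the specified routing.
    ///
    /// The tag must not come from the reserved range and the payload must fit into a single
    /// HOPR packet ([`HoprPacket::PAYLOAD_SIZE`]).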
    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
    pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
        if let Tag::Reserved(reserved_tag) = tag {
            return Err(HoprTransportError::Api(format!(
                "Application tag must be from the range {:?}, but {reserved_tag:?} was given",
                Tag::APPLICATION_TAG_RANGE
            )));
        }

        if msg.len() > HoprPacket::PAYLOAD_SIZE {
            return Err(HoprTransportError::Api(format!(
                "Message exceeds the maximum allowed size of {} bytes",
                HoprPacket::PAYLOAD_SIZE
            )));
        }

        let app_data = ApplicationData::new(tag, msg.into_vec())?;
        let routing = self
            .path_planner
            .resolve_routing(app_data.total_len(), usize::MAX, routing)
            .await?
            .0;

        let sender = self.process_packet_send.get().ok_or_else(|| {
            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
        })?;

        sender
            .send_packet(ApplicationDataOut::with_no_packet_info(app_data), routing)
            .await
            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
            .await
            .map_err(|e| HoprTransportError::Api(e.to_string()))
    }

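    /// Attempts to aggregate tickets in the given channel. The channel must exist and be open;
    /// the aggregation itself currently always fails, as it is not yet supported as a session
    /// protocol.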
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
        let entry = self
            .db
            .get_channel_by_id(None, channel_id)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)
            .map_err(HoprTransportError::from)
            .and_then(|c| {
                if let Some(c) = c {
                    Ok(c)
                } else {
                    Err(ProtocolError::ChannelNotFound.into())
                }
            })?;

        if entry.status != ChannelStatus::Open {
            return Err(ProtocolError::ChannelClosed.into());
        }

        Err(TicketAggregationError::TransportError(
            "Ticket aggregation not supported as a session protocol yet".to_string(),
        )
        .into())
    }

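    /// Lists all publicly announced nodes known to the database as `(peer id, chain address,
    /// multiaddresses)` tuples.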
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
        Ok(self
            .db
            .get_accounts(None, true)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .into_iter()
            .map(|entry| {
                (
                    PeerId::from(entry.public_key),
                    entry.chain_addr,
                    Vec::from_iter(entry.get_multiaddr().into_iter()),
                )
            })
            .collect())
    }

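    /// Checks the network registry to determine whether the given peer id or chain address is
    /// allowed to access the network.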
    pub async fn is_allowed_to_access_network<'a>(
        &self,
        address_like: either::Either<&'a PeerId, Address>,
    ) -> errors::Result<bool>
    where
        T: 'a,
    {
        let db_clone = self.db.clone();
        let address_like_noref = address_like.map_left(|peer| *peer);

        Ok(self
            .db
            .begin_transaction()
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .perform(|tx| {
                Box::pin(async move {
                    match address_like_noref {
                        either::Left(peer) => {
                            let pubkey =
                                hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer))
                                    .await
                                    .map_err(|e| hopr_db_sql::api::errors::DbError::General(e.to_string()))?;
                            if let Some(address) = db_clone.translate_key(Some(tx), pubkey).await? {
                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
                            } else {
                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
                                    "cannot translate off-chain key".into(),
                                ))
                            }
                        }
                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
                    }
                })
            })
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.network
            .get(&self.me_peerid)
            .await
            .unwrap_or_else(|e| {
                error!(error = %e, "failed to obtain listening multi-addresses");
                None
            })
            .map(|peer| peer.multiaddresses)
            .unwrap_or_default()
    }

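    /// Returns the multiaddresses this node should announce on-chain: supported, optionally
    /// non-private (depending on configuration), stripped of the `/p2p` suffix and sorted with
    /// DNS-based addresses first.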
    #[tracing::instrument(level = "debug", skip(self))]
    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
        let mut mas = self
            .local_multiaddresses()
            .into_iter()
            .filter(|ma| {
                hopr_transport_identity::multiaddrs::is_supported(ma)
                    && (self.cfg.transport.announce_local_addresses
                        || !hopr_transport_identity::multiaddrs::is_private(ma))
            })
            .map(|ma| strip_p2p_protocol(&ma))
            .filter(|v| !v.is_empty())
            .collect::<Vec<_>>();

        mas.sort_by(|l, r| {
            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);

            if !(is_left_dns ^ is_right_dns) {
                std::cmp::Ordering::Equal
            } else if is_left_dns {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Greater
            }
        });

        mas
    }

    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.my_multiaddresses.clone()
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.network
            .get(peer)
            .await
            .unwrap_or(None)
            .map(|peer| peer.multiaddresses)
            .unwrap_or(vec![])
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_health(&self) -> Health {
        self.network.health().await
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
        Ok(self.network.connected_peers().await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
        Ok(self.network.get(peer).await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
        let ticket_stats = self.db.get_ticket_statistics(None).await?;

        Ok(TicketStatistics {
            winning_count: ticket_stats.winning_tickets,
            unredeemed_value: ticket_stats.unredeemed_value,
            redeemed_value: ticket_stats.redeemed_value,
            neglected_value: ticket_stats.neglected_value,
            rejected_value: ticket_stats.rejected_value,
        })
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
        if let Some(channel) = self
            .db
            .get_channel_by_id(None, channel_id)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
        {
            let own_address: Address = self
                .db
                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
                .await?
                .ok_or_else(|| {
                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
                })?
                .try_into()?;

            if channel.destination == own_address {
                Ok(Some(self.db.get_tickets((&channel).into()).await?))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
        Ok(self
            .db
            .get_all_tickets()
            .await?
            .into_iter()
            .map(|v| v.ticket.leak())
            .collect())
    }
}

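/// Builds the [`MixerConfig`] from the `HOPR_INTERNAL_MIXER_*` environment variables, falling
/// back to the crate defaults when a variable is unset or fails to parse.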
fn build_mixer_cfg_from_env() -> MixerConfig {
    let mixer_cfg = MixerConfig {
        min_delay: std::time::Duration::from_millis(
            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
                .map(|v| {
                    v.trim()
                        .parse::<u64>()
                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
                })
                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
        ),
        delay_range: std::time::Duration::from_millis(
            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
                .map(|v| {
                    v.trim()
                        .parse::<u64>()
                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
                })
                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
        ),
        capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
            .map(|v| {
                v.trim()
                    .parse::<usize>()
                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
            })
            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
        ..MixerConfig::default()
    };
    debug!(?mixer_cfg, "Mixer configuration");

    mixer_cfg
}