pub mod config;
pub mod constants;
pub mod errors;
pub mod helpers;
pub mod network_notifier;
pub mod proxy;

use std::{
    collections::{HashMap, HashSet},
    sync::{Arc, OnceLock},
};

use async_lock::RwLock;
use constants::MAXIMUM_MSG_OUTGOING_BUFFER_SIZE;
use futures::{
    SinkExt, StreamExt,
    channel::mpsc::{self, Sender, UnboundedReceiver, UnboundedSender, unbounded},
};
use helpers::PathPlanner;
use hopr_async_runtime::{AbortHandle, spawn_as_abortable};
use hopr_crypto_packet::prelude::HoprPacket;
pub use hopr_crypto_types::{
    keypairs::{ChainKeypair, Keypair, OffchainKeypair},
    types::{HalfKeyChallenge, Hash, OffchainPublicKey},
};
use hopr_db_sql::{
    HoprDbAllOperations,
    accounts::ChainOrPacketKey,
    api::tickets::{AggregationPrerequisites, HoprDbTicketOperations},
};
pub use hopr_internal_types::prelude::HoprPseudonym;
use hopr_internal_types::prelude::*;
pub use hopr_network_types::prelude::RoutingOptions;
use hopr_network_types::prelude::{DestinationRouting, ResolvedTransportRouting};
use hopr_path::{
    PathAddressResolver,
    selectors::dfs::{DfsPathSelector, DfsPathSelectorConfig, RandomizedEdgeWeighting},
};
use hopr_primitive_types::prelude::*;
use hopr_transport_identity::multiaddrs::strip_p2p_protocol;
pub use hopr_transport_identity::{Multiaddr, PeerId};
use hopr_transport_mixer::MixerConfig;
pub use hopr_transport_network::network::{Health, Network, PeerOrigin, PeerStatus};
use hopr_transport_p2p::{
    HoprSwarm,
    swarm::{TicketAggregationRequestType, TicketAggregationResponseType},
};
pub use hopr_transport_packet::prelude::{ApplicationData, Tag};
use hopr_transport_probe::{
    DbProxy, Probe,
    ping::{PingConfig, Pinger},
};
pub use hopr_transport_probe::{errors::ProbeError, ping::PingQueryReplier};
pub use hopr_transport_protocol::{PeerDiscovery, execute_on_tick};
use hopr_transport_protocol::{
    errors::ProtocolError,
    processor::{MsgSender, PacketInteractionConfig, PacketSendFinalizer, SendMsgInput},
};
#[cfg(feature = "runtime-tokio")]
pub use hopr_transport_session::transfer_session;
pub use hopr_transport_session::{
    Capabilities as SessionCapabilities, Capability as SessionCapability, IncomingSession, SESSION_PAYLOAD_SIZE,
    ServiceId, Session, SessionClientConfig, SessionId, SessionTarget, SurbBalancerConfig,
    errors::TransportSessionError, traits::SendMsg,
};
use hopr_transport_session::{DispatchResult, SessionManager, SessionManagerConfig};
use hopr_transport_ticket_aggregation::{
    AwaitingAggregator, TicketAggregationActions, TicketAggregationError, TicketAggregationInteraction,
    TicketAggregatorTrait,
};
use rand::seq::SliceRandom;
#[cfg(feature = "mixer-stream")]
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::{debug, error, info, trace, warn};

pub use crate::{
    config::HoprTransportConfig,
    helpers::{PeerEligibility, TicketStatistics},
};
use crate::{constants::SESSION_INITIATION_TIMEOUT_BASE, errors::HoprTransportError};

pub const APPLICATION_TAG_RANGE: std::ops::Range<Tag> = Tag::APPLICATION_TAG_RANGE;

#[cfg(any(
    all(feature = "mixer-channel", feature = "mixer-stream"),
    all(not(feature = "mixer-channel"), not(feature = "mixer-stream"))
))]
compile_error!("Exactly one of the 'mixer-channel' or 'mixer-stream' features must be specified");

lazy_static::lazy_static! {
    static ref SESSION_INITIATION_TIMEOUT_MAX: std::time::Duration = 2 * constants::SESSION_INITIATION_TIMEOUT_BASE * RoutingOptions::MAX_INTERMEDIATE_HOPS as u32;
}

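/// Named background processes spawned by the transport layer; used as keys for the abort
/// handles returned by [`HoprTransport::run`].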
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, strum::Display)]
pub enum HoprTransportProcess {
    #[strum(to_string = "component responsible for the transport medium (libp2p swarm)")]
    Medium,
    #[strum(to_string = "HOPR protocol ({0})")]
    Protocol(hopr_transport_protocol::ProtocolProcesses),
    #[strum(to_string = "session manager sub-process #{0}")]
    SessionsManagement(usize),
    #[strum(to_string = "network probing sub-process: {0}")]
    Probing(hopr_transport_probe::HoprProbeProcess),
}

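/// Proxy for ticket aggregation that defers to an [`AwaitingAggregator`] once the ticket
/// aggregation writer has been initialized.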
#[derive(Debug, Clone)]
pub struct TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    db: Db,
    maybe_writer: Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    agg_timeout: std::time::Duration,
}

impl<Db> TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    pub fn new(
        db: Db,
        maybe_writer: Arc<
            OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>,
        >,
        agg_timeout: std::time::Duration,
    ) -> Self {
        Self {
            db,
            maybe_writer,
            agg_timeout,
        }
    }
}

#[async_trait::async_trait]
impl<Db> TicketAggregatorTrait for TicketAggregatorProxy<Db>
where
    Db: HoprDbTicketOperations + Send + Sync + Clone + std::fmt::Debug,
{
    async fn aggregate_tickets(
        &self,
        channel: &Hash,
        prerequisites: AggregationPrerequisites,
    ) -> hopr_transport_ticket_aggregation::Result<()> {
        if let Some(writer) = self.maybe_writer.clone().get() {
            AwaitingAggregator::new(self.db.clone(), writer.clone(), self.agg_timeout)
                .aggregate_tickets(channel, prerequisites)
                .await
        } else {
            Err(TicketAggregationError::TransportError(
                "Ticket aggregation writer not available, the object was not yet initialized".to_string(),
            ))
        }
    }
}

type CurrentPathSelector = DfsPathSelector<RandomizedEdgeWeighting>;

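/// Interface into the physical transport mechanism, aggregating the libp2p swarm, the HOPR packet
/// protocol, network probing, ticket aggregation and session management for the upper layers.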
pub struct HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    me: OffchainKeypair,
    me_peerid: PeerId,
    me_address: Address,
    cfg: HoprTransportConfig,
    db: T,
    ping: Arc<OnceLock<Pinger>>,
    network: Arc<Network<T>>,
    process_packet_send: Arc<OnceLock<MsgSender<Sender<SendMsgInput>>>>,
    path_planner: PathPlanner<T, CurrentPathSelector>,
    my_multiaddresses: Vec<Multiaddr>,
    process_ticket_aggregate:
        Arc<OnceLock<TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>>>,
    smgr: SessionManager<helpers::MessageSender<T, CurrentPathSelector>>,
}

impl<T> HoprTransport<T>
where
    T: HoprDbAllOperations + PathAddressResolver + std::fmt::Debug + Clone + Send + Sync + 'static,
{
    pub fn new(
        me: &OffchainKeypair,
        me_onchain: &ChainKeypair,
        cfg: HoprTransportConfig,
        db: T,
        channel_graph: Arc<RwLock<hopr_path::channel_graph::ChannelGraph>>,
        my_multiaddresses: Vec<Multiaddr>,
    ) -> Self {
        let process_packet_send = Arc::new(OnceLock::new());

        let me_peerid: PeerId = me.into();
        let me_chain_addr = me_onchain.public().to_address();

        Self {
            me: me.clone(),
            me_peerid,
            me_address: me_chain_addr,
            ping: Arc::new(OnceLock::new()),
            network: Arc::new(Network::new(
                me_peerid,
                my_multiaddresses.clone(),
                cfg.network.clone(),
                db.clone(),
            )),
            process_packet_send,
            path_planner: PathPlanner::new(
                me_chain_addr,
                db.clone(),
                CurrentPathSelector::new(
                    channel_graph.clone(),
                    DfsPathSelectorConfig {
                        node_score_threshold: cfg.network.node_score_auto_path_threshold,
                        max_first_hop_latency: cfg.network.max_first_hop_latency_threshold,
                        ..Default::default()
                    },
                ),
                channel_graph.clone(),
            ),
            db,
            my_multiaddresses,
            process_ticket_aggregate: Arc::new(OnceLock::new()),
            smgr: SessionManager::new(SessionManagerConfig {
                session_tag_range: (16..65535),
                initiation_timeout_base: SESSION_INITIATION_TIMEOUT_BASE,
                idle_timeout: cfg.session.idle_timeout,
                balancer_sampling_interval: cfg.session.balancer_sampling_interval,
            }),
            cfg,
        }
    }

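    /// Spawns all transport-related background processes (libp2p swarm, HOPR protocol,
    /// network probing and session management) and returns their abort handles keyed by
    /// [`HoprTransportProcess`].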
    #[allow(clippy::too_many_arguments)]
    pub async fn run(
        &self,
        me_onchain: &ChainKeypair,
        tbf_path: String,
        on_incoming_data: UnboundedSender<ApplicationData>,
        discovery_updates: UnboundedReceiver<PeerDiscovery>,
        on_incoming_session: UnboundedSender<IncomingSession>,
    ) -> crate::errors::Result<HashMap<HoprTransportProcess, AbortHandle>> {
        let (mut internal_discovery_update_tx, internal_discovery_update_rx) =
            futures::channel::mpsc::unbounded::<PeerDiscovery>();

        let network_clone = self.network.clone();
        let db_clone = self.db.clone();
        let me_peerid = self.me_peerid;
        let discovery_updates =
            futures_concurrency::stream::StreamExt::merge(discovery_updates, internal_discovery_update_rx)
                .filter_map(move |event| {
                    let network = network_clone.clone();
                    let db = db_clone.clone();
                    let me = me_peerid;

                    async move {
                        match event {
                            PeerDiscovery::Allow(peer_id) => {
                                debug!(peer = %peer_id, "Processing peer discovery event: Allow");
                                if let Ok(pk) = OffchainPublicKey::try_from(peer_id) {
                                    if !network.has(&peer_id).await {
                                        let mas = db
                                            .get_account(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
                                            .await
                                            .map(|entry| {
                                                entry
                                                    .map(|v| Vec::from_iter(v.get_multiaddr().into_iter()))
                                                    .unwrap_or_default()
                                            })
                                            .unwrap_or_default();

                                        if let Err(e) = network.add(&peer_id, PeerOrigin::NetworkRegistry, mas).await {
                                            error!(peer = %peer_id, error = %e, "Failed to allow locally (already allowed on-chain)");
                                            return None;
                                        }
                                    }

                                    return Some(PeerDiscovery::Allow(peer_id));
                                } else {
                                    error!(peer = %peer_id, "Failed to allow locally (already allowed on-chain): peer id not convertible to off-chain public key");
                                }
                            }
                            PeerDiscovery::Ban(peer_id) => {
                                if let Err(e) = network.remove(&peer_id).await {
                                    error!(peer = %peer_id, error = %e, "Failed to ban locally (already banned on-chain)");
                                } else {
                                    return Some(PeerDiscovery::Ban(peer_id));
                                }
                            }
                            PeerDiscovery::Announce(peer, multiaddresses) => {
                                debug!(peer = %peer, ?multiaddresses, "Processing peer discovery event: Announce");
                                if peer != me {
                                    let mas = multiaddresses
                                        .into_iter()
                                        .map(|ma| strip_p2p_protocol(&ma))
                                        .filter(|v| !v.is_empty())
                                        .collect::<Vec<_>>();

                                    if !mas.is_empty() {
                                        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
                                            if let Ok(Some(key)) = db
                                                .translate_key(None, hopr_db_sql::accounts::ChainOrPacketKey::PacketKey(pk))
                                                .await
                                            {
                                                let key: Result<Address, _> = key.try_into();

                                                if let Ok(key) = key {
                                                    if db
                                                        .is_allowed_in_network_registry(None, &key)
                                                        .await
                                                        .unwrap_or(false)
                                                    {
                                                        if let Err(e) =
                                                            network.add(&peer, PeerOrigin::NetworkRegistry, mas.clone()).await
                                                        {
                                                            error!(%peer, error = %e, "failed to record peer from the NetworkRegistry");
                                                        } else {
                                                            return Some(PeerDiscovery::Announce(peer, mas));
                                                        }
                                                    }
                                                }
                                            } else {
                                                error!(%peer, "Failed to announce peer due to convertibility error");
                                            }
                                        }
                                    }
                                }
                            }
                        }

                        None
                    }
                });

        info!("Loading initial peers from the storage");

        let mut addresses: HashSet<Multiaddr> = HashSet::new();
        let nodes = self.get_public_nodes().await?;
        for (peer, _address, multiaddresses) in nodes {
            if self.is_allowed_to_access_network(either::Left(&peer)).await? {
                debug!(%peer, ?multiaddresses, "Using initial public node");
                addresses.extend(multiaddresses.clone());

                internal_discovery_update_tx
                    .send(PeerDiscovery::Announce(peer, multiaddresses.clone()))
                    .await
                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;

                internal_discovery_update_tx
                    .send(PeerDiscovery::Allow(peer))
                    .await
                    .map_err(|e| HoprTransportError::Api(e.to_string()))?;
            }
        }

        let mut processes: HashMap<HoprTransportProcess, AbortHandle> = HashMap::new();

        let ticket_agg_proc = TicketAggregationInteraction::new(self.db.clone(), me_onchain);
        let tkt_agg_writer = ticket_agg_proc.writer();

        let (external_msg_send, external_msg_rx) =
            mpsc::channel::<(ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>(
                MAXIMUM_MSG_OUTGOING_BUFFER_SIZE,
            );

        self.process_packet_send
            .clone()
            .set(MsgSender::new(external_msg_send.clone()))
            .expect("must set the packet processing writer only once");

        self.process_ticket_aggregate
            .clone()
            .set(tkt_agg_writer.clone())
            .expect("must set the ticket aggregation writer only once");

        let mixer_cfg = build_mixer_cfg_from_env();

        #[cfg(feature = "mixer-channel")]
        let (mixing_channel_tx, mixing_channel_rx) = hopr_transport_mixer::channel::<(PeerId, Box<[u8]>)>(mixer_cfg);

        #[cfg(feature = "mixer-stream")]
        let (mixing_channel_tx, mixing_channel_rx) = {
            let (tx, rx) = futures::channel::mpsc::channel::<(PeerId, Box<[u8]>)>(MAXIMUM_MSG_OUTGOING_BUFFER_SIZE);
            let rx = rx.then_concurrent(move |v| {
                let cfg = mixer_cfg;

                async move {
                    let random_delay = cfg.random_delay();
                    trace!(delay_in_ms = random_delay.as_millis(), "Created random mixer delay",);

                    // A message entering the mixer increases the queue size; it is decremented
                    // again once its delay has elapsed (see below).
                    #[cfg(all(feature = "prometheus", not(test)))]
                    hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.increment(1.0f64);

                    // NOTE: assumes the runtime-agnostic `sleep` from `hopr_async_runtime` is the intended timer.
                    hopr_async_runtime::prelude::sleep(random_delay).await;

                    #[cfg(all(feature = "prometheus", not(test)))]
                    {
                        hopr_transport_mixer::channel::METRIC_QUEUE_SIZE.decrement(1.0f64);

                        let weight = 1.0f64 / cfg.metric_delay_window as f64;
                        hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.set(
                            (weight * random_delay.as_millis() as f64)
                                + ((1.0f64 - weight) * hopr_transport_mixer::channel::METRIC_MIXER_AVERAGE_DELAY.get()),
                        );
                    }

                    v
                }
            });

            (tx, rx)
        };

        let mut transport_layer =
            HoprSwarm::new((&self.me).into(), discovery_updates, self.my_multiaddresses.clone()).await;

        if let Some(port) = self.cfg.protocol.autonat_port {
            transport_layer.run_nat_server(port);
        }

        if addresses.is_empty() {
            warn!("No addresses found in the database, not dialing any NAT servers");
        } else {
            info!(num_addresses = addresses.len(), "Found addresses from the database");
            let mut randomized_addresses: Vec<_> = addresses.into_iter().collect();
            randomized_addresses.shuffle(&mut rand::thread_rng());
            transport_layer.dial_nat_server(randomized_addresses);
        }

        let msg_proto_control =
            transport_layer.build_protocol_control(hopr_transport_protocol::CURRENT_HOPR_MSG_PROTOCOL);
        let msg_codec = hopr_transport_protocol::HoprBinaryCodec {};
        let (wire_msg_tx, wire_msg_rx) =
            hopr_transport_protocol::stream::process_stream_protocol(msg_codec, msg_proto_control).await?;

        let _mixing_process_before_sending_out =
            hopr_async_runtime::prelude::spawn(mixing_channel_rx.map(Ok).forward(wire_msg_tx));

        processes.insert(HoprTransportProcess::Medium, spawn_as_abortable(transport_layer.run()));
467
468 let packet_cfg = PacketInteractionConfig {
470 packet_keypair: self.me.clone(),
471 outgoing_ticket_win_prob: self
472 .cfg
473 .protocol
474 .outgoing_ticket_winning_prob
475 .map(WinningProbability::try_from)
476 .transpose()?,
477 outgoing_ticket_price: self.cfg.protocol.outgoing_ticket_price,
478 };
479
480 let (tx_from_protocol, rx_from_protocol) = unbounded::<(HoprPseudonym, ApplicationData)>();
481 for (k, v) in hopr_transport_protocol::run_msg_ack_protocol(
482 packet_cfg,
483 self.db.clone(),
484 Some(tbf_path),
485 (mixing_channel_tx, wire_msg_rx),
486 (tx_from_protocol, external_msg_rx),
487 )
488 .await
489 .into_iter()
490 {
491 processes.insert(HoprTransportProcess::Protocol(k), v);
492 }
493
494 let (tx_from_probing, rx_from_probing) = unbounded::<(HoprPseudonym, ApplicationData)>();
496
497 let (manual_ping_tx, manual_ping_rx) = unbounded::<(PeerId, PingQueryReplier)>();
498
499 let probe = Probe::new((*self.me.public(), self.me_address), self.cfg.probe);
500 for (k, v) in probe
501 .continuously_scan(
502 (external_msg_send, rx_from_protocol),
503 manual_ping_rx,
504 network_notifier::ProbeNetworkInteractions::new(
505 self.network.clone(),
506 self.db.clone(),
507 self.path_planner.channel_graph(),
508 ),
509 DbProxy::new(self.db.clone()),
510 tx_from_probing,
511 )
512 .await
513 .into_iter()
514 {
515 processes.insert(HoprTransportProcess::Probing(k), v);
516 }
517
518 self.ping
520 .clone()
521 .set(Pinger::new(
522 PingConfig {
523 timeout: self.cfg.probe.timeout,
524 },
525 manual_ping_tx,
526 ))
            .expect("must set the pinger only once");

        let msg_sender = helpers::MessageSender::new(self.process_packet_send.clone(), self.path_planner.clone());
        self.smgr
            .start(msg_sender, on_incoming_session)
            .expect("failed to start session manager")
            .into_iter()
            .enumerate()
            .map(|(i, jh)| (HoprTransportProcess::SessionsManagement(i + 1), jh))
            .for_each(|(k, v)| {
                processes.insert(k, v);
            });

        let smgr = self.smgr.clone();
        processes.insert(
            HoprTransportProcess::SessionsManagement(0),
            spawn_as_abortable(async move {
                let _the_process_should_not_end = StreamExt::filter_map(rx_from_probing, |(pseudonym, data)| {
                    let smgr = smgr.clone();
                    async move {
                        match smgr.dispatch_message(pseudonym, data).await {
                            Ok(DispatchResult::Processed) => {
                                trace!("message dispatch completed");
                                None
                            }
                            Ok(DispatchResult::Unrelated(data)) => {
                                trace!("unrelated message dispatch completed");
                                Some(data)
                            }
                            Err(e) => {
                                error!(error = %e, "error while processing packet");
                                None
                            }
                        }
                    }
                })
                .map(Ok)
                .forward(on_incoming_data)
                .await;
            }),
        );

        Ok(processes)
    }

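    /// Returns a ticket aggregator backed by the aggregation writer that is initialized in
    /// [`HoprTransport::run`].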
    pub fn ticket_aggregator(&self) -> Arc<dyn TicketAggregatorTrait + Send + Sync + 'static> {
        Arc::new(proxy::TicketAggregatorProxy::new(
            self.db.clone(),
            self.process_ticket_aggregate.clone(),
            std::time::Duration::from_secs(15),
        ))
    }

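    /// Manually pings the given peer and returns the measured latency together with the peer's
    /// observed status.
    ///
    /// A minimal usage sketch (assuming an already running transport instance `transport` and a
    /// known `peer: PeerId`; these names are illustrative):
    /// ```ignore
    /// let (latency, status) = transport.ping(&peer).await?;
    /// println!("latency: {latency:?}, status: {status:?}");
    /// ```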
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn ping(&self, peer: &PeerId) -> errors::Result<(std::time::Duration, PeerStatus)> {
        if !self.is_allowed_to_access_network(either::Left(peer)).await? {
            return Err(HoprTransportError::Api(format!(
                "ping to '{peer}' not allowed due to network registry"
            )));
        }

        if peer == &self.me_peerid {
            return Err(HoprTransportError::Api("ping to self does not make sense".into()));
        }

        let pinger = self
            .ping
            .get()
            .ok_or_else(|| HoprTransportError::Api("ping processing is not yet initialized".into()))?;

        if let Err(e) = self.network.add(peer, PeerOrigin::ManualPing, vec![]).await {
            error!(error = %e, "Failed to store the peer observation");
        }

        let latency = (*pinger).ping(*peer).await?;

        let peer_status = self.network.get(peer).await?.ok_or(HoprTransportError::Probe(
            hopr_transport_probe::errors::ProbeError::NonExistingPeer,
        ))?;

        Ok((latency, peer_status))
    }

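    /// Opens a new session to the given on-chain `destination` using the provided session
    /// target and client configuration.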
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn new_session(
        &self,
        destination: Address,
        target: SessionTarget,
        cfg: SessionClientConfig,
    ) -> errors::Result<Session> {
        Ok(self.smgr.new_session(destination, target, cfg).await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn probe_session(&self, id: &SessionId) -> errors::Result<()> {
        Ok(self.smgr.ping_session(id).await?)
    }

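    /// Sends a single message carrying the given application `tag` over the provided routing.
    ///
    /// A minimal usage sketch (assuming a running transport instance `transport`, a prepared
    /// `routing: DestinationRouting` and a non-reserved application `tag: Tag`; these names are
    /// illustrative):
    /// ```ignore
    /// let msg: Box<[u8]> = b"hello".to_vec().into_boxed_slice();
    /// transport.send_message(msg, routing, tag).await?;
    /// ```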
    #[tracing::instrument(level = "info", skip(self, msg), fields(uuid = uuid::Uuid::new_v4().to_string()))]
    pub async fn send_message(&self, msg: Box<[u8]>, routing: DestinationRouting, tag: Tag) -> errors::Result<()> {
        if let Tag::Reserved(reserved_tag) = tag {
            return Err(HoprTransportError::Api(format!(
                "Application tag must be from the range {:?}, but the reserved tag {reserved_tag:?} was used",
                Tag::APPLICATION_TAG_RANGE
            )));
        }

        if msg.len() > HoprPacket::PAYLOAD_SIZE {
            return Err(HoprTransportError::Api(format!(
                "Message exceeds the maximum allowed size of {} bytes",
                HoprPacket::PAYLOAD_SIZE
            )));
        }

        let app_data = ApplicationData::new_from_owned(tag, msg);
        let routing = self.path_planner.resolve_routing(app_data.len(), routing).await?;

        let sender = self.process_packet_send.get().ok_or_else(|| {
            HoprTransportError::Api("send msg: failed because message processing is not yet initialized".into())
        })?;

        sender
            .send_packet(app_data, routing)
            .await
            .map_err(|e| HoprTransportError::Api(format!("send msg failed to enqueue msg: {e}")))?
            .consume_and_wait(crate::constants::PACKET_QUEUE_TIMEOUT_MILLISECONDS)
            .await
            .map_err(|e| HoprTransportError::Api(e.to_string()))
    }

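    /// Verifies that the given channel exists and is open, then returns an error, since ticket
    /// aggregation is not yet supported as a session protocol.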
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn aggregate_tickets(&self, channel_id: &Hash) -> errors::Result<()> {
        let entry = self
            .db
            .get_channel_by_id(None, channel_id)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)
            .map_err(HoprTransportError::from)
            .and_then(|c| {
                if let Some(c) = c {
                    Ok(c)
                } else {
                    Err(ProtocolError::ChannelNotFound.into())
                }
            })?;

        if entry.status != ChannelStatus::Open {
            return Err(ProtocolError::ChannelClosed.into());
        }

        Err(TicketAggregationError::TransportError(
            "Ticket aggregation not supported as a session protocol yet".to_string(),
        )
        .into())
    }

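    /// Lists all announced public nodes known to the database, together with their chain
    /// addresses and multiaddresses.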
    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn get_public_nodes(&self) -> errors::Result<Vec<(PeerId, Address, Vec<Multiaddr>)>> {
        Ok(self
            .db
            .get_accounts(None, true)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .into_iter()
            .map(|entry| {
                (
                    PeerId::from(entry.public_key),
                    entry.chain_addr,
                    Vec::from_iter(entry.get_multiaddr().into_iter()),
                )
            })
            .collect())
    }

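    /// Checks whether the given peer id or chain address is allowed in the network registry.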
    pub async fn is_allowed_to_access_network<'a>(
        &self,
        address_like: either::Either<&'a PeerId, Address>,
    ) -> errors::Result<bool>
    where
        T: 'a,
    {
        let db_clone = self.db.clone();
        let address_like_noref = address_like.map_left(|peer| *peer);

        Ok(self
            .db
            .begin_transaction()
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
            .perform(|tx| {
                Box::pin(async move {
                    match address_like_noref {
                        either::Left(peer) => {
                            let pk = OffchainPublicKey::try_from(peer)?;
                            if let Some(address) = db_clone.translate_key(Some(tx), pk).await? {
                                db_clone.is_allowed_in_network_registry(Some(tx), &address).await
                            } else {
                                Err(hopr_db_sql::errors::DbSqlError::LogicalError(
                                    "cannot translate off-chain key".into(),
                                ))
                            }
                        }
                        either::Right(address) => db_clone.is_allowed_in_network_registry(Some(tx), &address).await,
                    }
                })
            })
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn listening_multiaddresses(&self) -> Vec<Multiaddr> {
        self.network
            .get(&self.me_peerid)
            .await
            .unwrap_or_else(|e| {
                error!(error = %e, "failed to obtain listening multi-addresses");
                None
            })
            .map(|peer| peer.multiaddresses)
            .unwrap_or_default()
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub fn announceable_multiaddresses(&self) -> Vec<Multiaddr> {
        let mut mas = self
            .local_multiaddresses()
            .into_iter()
            .filter(|ma| {
                hopr_transport_identity::multiaddrs::is_supported(ma)
                    && (self.cfg.transport.announce_local_addresses
                        || !hopr_transport_identity::multiaddrs::is_private(ma))
            })
            .map(|ma| strip_p2p_protocol(&ma))
            .filter(|v| !v.is_empty())
            .collect::<Vec<_>>();

        mas.sort_by(|l, r| {
            let is_left_dns = hopr_transport_identity::multiaddrs::is_dns(l);
            let is_right_dns = hopr_transport_identity::multiaddrs::is_dns(r);

            if !(is_left_dns ^ is_right_dns) {
                std::cmp::Ordering::Equal
            } else if is_left_dns {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Greater
            }
        });

        mas
    }

    pub fn local_multiaddresses(&self) -> Vec<Multiaddr> {
        self.my_multiaddresses.clone()
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_observed_multiaddresses(&self, peer: &PeerId) -> Vec<Multiaddr> {
        self.network
            .get(peer)
            .await
            .unwrap_or(None)
            .map(|peer| peer.multiaddresses)
            .unwrap_or(vec![])
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_health(&self) -> Health {
        self.network.health().await
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_connected_peers(&self) -> errors::Result<Vec<PeerId>> {
        Ok(self.network.connected_peers().await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn network_peer_info(&self, peer: &PeerId) -> errors::Result<Option<PeerStatus>> {
        Ok(self.network.get(peer).await?)
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn ticket_statistics(&self) -> errors::Result<TicketStatistics> {
        let ticket_stats = self.db.get_ticket_statistics(None).await?;

        Ok(TicketStatistics {
            winning_count: ticket_stats.winning_tickets,
            unredeemed_value: ticket_stats.unredeemed_value,
            redeemed_value: ticket_stats.redeemed_value,
            neglected_value: ticket_stats.neglected_value,
            rejected_value: ticket_stats.rejected_value,
        })
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn tickets_in_channel(&self, channel_id: &Hash) -> errors::Result<Option<Vec<AcknowledgedTicket>>> {
        if let Some(channel) = self
            .db
            .get_channel_by_id(None, channel_id)
            .await
            .map_err(hopr_db_sql::api::errors::DbError::from)?
        {
            let own_address: Address = self
                .db
                .translate_key(None, ChainOrPacketKey::PacketKey(*self.me.public()))
                .await?
                .ok_or_else(|| {
                    HoprTransportError::Api("Failed to translate the off-chain key to on-chain address".into())
                })?
                .try_into()?;

            if channel.destination == own_address {
                Ok(Some(self.db.get_tickets((&channel).into()).await?))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    #[tracing::instrument(level = "debug", skip(self))]
    pub async fn all_tickets(&self) -> errors::Result<Vec<Ticket>> {
        Ok(self
            .db
            .get_all_tickets()
            .await?
            .into_iter()
            .map(|v| v.ticket.leak())
            .collect())
    }
}

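/// Builds the [`MixerConfig`] from the `HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS`,
/// `HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS` and `HOPR_INTERNAL_MIXER_CAPACITY` environment
/// variables, falling back to the crate defaults when a variable is unset or unparsable.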
fn build_mixer_cfg_from_env() -> MixerConfig {
    let mixer_cfg = MixerConfig {
        min_delay: std::time::Duration::from_millis(
            std::env::var("HOPR_INTERNAL_MIXER_MINIMUM_DELAY_IN_MS")
                .map(|v| {
                    v.trim()
                        .parse::<u64>()
                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS)
                })
                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_MINIMUM_DEFAULT_DELAY_IN_MS),
        ),
        delay_range: std::time::Duration::from_millis(
            std::env::var("HOPR_INTERNAL_MIXER_DELAY_RANGE_IN_MS")
                .map(|v| {
                    v.trim()
                        .parse::<u64>()
                        .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS)
                })
                .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_DEFAULT_DELAY_RANGE_IN_MS),
        ),
        capacity: std::env::var("HOPR_INTERNAL_MIXER_CAPACITY")
            .map(|v| {
                v.trim()
                    .parse::<usize>()
                    .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY)
            })
            .unwrap_or(hopr_transport_mixer::config::HOPR_MIXER_CAPACITY),
        ..MixerConfig::default()
    };
    debug!(?mixer_cfg, "Mixer configuration");

    mixer_cfg
}