hopr_transport_p2p/
lib.rs1pub mod constants;
26
27pub mod errors;
29
30pub mod peer_store;
32
33pub mod swarm;
35
36mod behavior;
38
39use std::{collections::HashSet, sync::Arc};
40
41use dashmap::DashSet;
42use futures::{AsyncRead, AsyncWrite};
43pub use hopr_api::network::Health;
44use hopr_api::network::{NetworkView, traits::NetworkStreamControl};
45use libp2p::{Multiaddr, PeerId};
46
47mod utils;
48
49pub use crate::{
50 behavior::{HoprNetworkBehavior, HoprNetworkBehaviorEvent},
51 swarm::HoprLibp2pNetworkBuilder,
52};
53
54#[derive(Debug, Clone)]
56pub enum PeerDiscovery {
57 Announce(PeerId, Vec<Multiaddr>),
58}
59
60#[derive(Clone)]
61pub struct HoprNetwork {
62 tracker: Arc<DashSet<PeerId>>,
63 store: Arc<crate::peer_store::NetworkPeerStore>,
64 control: libp2p_stream::Control,
65 protocol: libp2p::StreamProtocol,
66 event_rx: async_broadcast::InactiveReceiver<hopr_api::network::NetworkEvent>,
67}
68
69impl std::fmt::Debug for HoprNetwork {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("HoprNetwork")
72 .field("tracker", &self.tracker)
73 .field("store", &self.store)
74 .field("protocol", &self.protocol)
75 .finish()
76 }
77}
78
79impl NetworkView for HoprNetwork {
80 fn listening_as(&self) -> HashSet<Multiaddr> {
81 self.store.get(self.store.me()).unwrap_or_else(|| {
82 tracing::error!("failed to get own peer info from the peer store");
83 std::collections::HashSet::new()
84 })
85 }
86
87 #[inline]
88 fn discovered_peers(&self) -> HashSet<PeerId> {
89 self.store.iter_keys().collect()
90 }
91
92 #[inline]
93 fn connected_peers(&self) -> HashSet<PeerId> {
94 self.tracker.iter().map(|r| *r).collect()
95 }
96
97 fn is_connected(&self, peer: &PeerId) -> bool {
98 self.tracker.contains(peer)
99 }
100
101 #[inline]
102 fn multiaddress_of(&self, peer: &PeerId) -> Option<HashSet<Multiaddr>> {
103 self.store.get(peer)
104 }
105
106 fn health(&self) -> Health {
107 match self.tracker.len() {
108 0 => Health::Red,
109 1 => Health::Orange,
110 2..4 => Health::Yellow,
111 _ => Health::Green,
112 }
113 }
114
115 fn subscribe_network_events(
116 &self,
117 ) -> impl futures::Stream<Item = hopr_api::network::NetworkEvent> + Send + 'static {
118 self.event_rx.clone().activate()
119 }
120}
121
122#[async_trait::async_trait]
123impl NetworkStreamControl for HoprNetwork {
124 fn accept(
125 mut self,
126 ) -> Result<impl futures::Stream<Item = (PeerId, impl AsyncRead + AsyncWrite + Send)> + Send, impl std::error::Error>
127 {
128 self.control.accept(self.protocol)
129 }
130
131 async fn open(mut self, peer: PeerId) -> Result<impl AsyncRead + AsyncWrite + Send, impl std::error::Error> {
132 self.control.open_stream(peer, self.protocol).await
133 }
134}