1use std::num::NonZeroU8;
2
3use futures::{Stream, StreamExt};
4use hopr_network_types::prelude::is_public_address;
5use hopr_transport_identity::{
6 Multiaddr,
7 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
8};
9use hopr_transport_protocol::PeerDiscovery;
10use libp2p::{
11 PeerId, autonat,
12 identity::PublicKey,
13 swarm::{NetworkInfo, SwarmEvent},
14};
15use tracing::{debug, error, info, trace, warn};
16
17use crate::{HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
18
// Prometheus gauges published by the swarm event loop. They only exist when the
// `prometheus` feature is enabled and the crate is not compiled for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Number of open libp2p connections: incremented on every established
    /// connection and decremented on every closed one.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_transport_p2p_active_connection_count",
            "Number of currently active connections",
        )
        .unwrap();

    /// Last NAT status reported by AutoNAT (0 = Unknown, 1 = Public, 2 = Private).
    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_transport_p2p_nat_status",
            "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private",
        )
        .unwrap();

    /// Network health value, refreshed after connection-level swarm events.
    static ref METRIC_NETWORK_HEALTH: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_network_health",
            "Connectivity health indicator",
        )
        .unwrap();
}
32
33pub struct InactiveNetwork {
34 swarm: libp2p::Swarm<HoprNetworkBehavior>,
35}
36
37impl InactiveNetwork {
41 #[cfg(feature = "runtime-tokio")]
42 pub async fn build<T>(me: libp2p::identity::Keypair, external_discovery_events: T) -> Result<Self>
43 where
44 T: Stream<Item = PeerDiscovery> + Send + 'static,
45 {
46 let me_public: PublicKey = me.public();
47
48 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
49 .with_tokio()
50 .with_tcp(
51 libp2p::tcp::Config::default().nodelay(true),
52 libp2p::noise::Config::new,
53 libp2p::yamux::Config::default,
56 )
57 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
58
59 #[cfg(feature = "transport-quic")]
60 let swarm = swarm.with_quic();
61
62 let swarm = swarm.with_dns();
63
64 Ok(Self {
65 swarm: swarm
66 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
67 .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
68 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
69 .with_swarm_config(|cfg| {
70 cfg.with_dial_concurrency_factor(
71 NonZeroU8::new({
72 let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
73 .ok()
74 .and_then(|v| v.trim().parse::<u8>().ok())
75 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
76 v.max(1)
77 })
78 .expect("clamped to >= 1, will never fail"),
79 )
80 .with_max_negotiating_inbound_streams(
81 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
82 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
83 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
84 )
85 .with_idle_connection_timeout(
86 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
87 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
88 .map(std::time::Duration::from_secs)
89 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
90 )
91 })
92 .build(),
93 })
94 }
95
96 pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
97 for multiaddress in multiaddresses.iter() {
98 match resolve_dns_if_any(multiaddress) {
99 Ok(ma) => {
100 if let Err(e) = self.swarm.listen_on(ma.clone()) {
101 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
102
103 match replace_transport_with_unspecified(&ma) {
104 Ok(ma) => {
105 if let Err(e) = self.swarm.listen_on(ma.clone()) {
106 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
107 } else {
108 info!(
109 listen_on = ?ma,
110 multiaddress = ?multiaddress,
111 "Listening for p2p connections"
112 );
113 self.swarm.add_external_address(multiaddress.clone());
114 }
115 }
116 Err(e) => {
117 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
118 }
119 }
120 } else {
121 info!(
122 listen_on = ?ma,
123 multiaddress = ?multiaddress,
124 "Listening for p2p connections"
125 );
126 self.swarm.add_external_address(multiaddress.clone());
127 }
128 }
129 Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
130 }
131 }
132
133 Ok(InactiveConfiguredNetwork { swarm: self.swarm })
134 }
135}
136
137pub struct InactiveConfiguredNetwork {
138 swarm: libp2p::Swarm<HoprNetworkBehavior>,
139}
140
141pub struct HoprLibp2pNetworkBuilder {
148 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
149 me: PeerId,
150 my_addresses: Vec<Multiaddr>,
151}
152
153impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("HoprSwarm").finish()
156 }
157}
158
159impl From<HoprLibp2pNetworkBuilder> for libp2p::Swarm<HoprNetworkBehavior> {
160 fn from(value: HoprLibp2pNetworkBuilder) -> Self {
161 value.swarm
162 }
163}
164
165impl HoprLibp2pNetworkBuilder {
166 pub async fn new<T>(
167 identity: libp2p::identity::Keypair,
168 external_discovery_events: T,
169 my_multiaddresses: Vec<Multiaddr>,
170 ) -> Self
171 where
172 T: Stream<Item = PeerDiscovery> + Send + 'static,
173 {
174 #[cfg(all(feature = "prometheus", not(test)))]
175 {
176 METRIC_NETWORK_HEALTH.set(0.0);
177 }
178
179 let me = identity.public().to_peer_id();
180 let swarm = InactiveNetwork::build(identity, external_discovery_events)
181 .await
182 .expect("swarm must be constructible");
183
184 let swarm = swarm
185 .with_listen_on(my_multiaddresses.clone())
186 .expect("swarm must be configurable");
187
188 Self {
189 swarm: swarm.swarm,
190 me,
191 my_addresses: my_multiaddresses,
192 }
193 }
194
195 pub fn into_network_with_stream_protocol_process(
196 self,
197 protocol: &'static str,
198 allow_private_addresses: bool,
199 ) -> (HoprNetwork, impl std::future::Future<Output = ()>) {
200 let tracker = hopr_transport_network::track::NetworkPeerTracker::new();
201 let store =
202 hopr_transport_network::store::NetworkPeerStore::new(self.me, self.my_addresses.into_iter().collect());
203
204 let network = HoprNetwork {
205 tracker: tracker.clone(),
206 store: store.clone(),
207 control: self.swarm.behaviour().streams.new_control(),
208 protocol: libp2p::StreamProtocol::new(protocol),
209 };
210
211 #[cfg(all(feature = "prometheus", not(test)))]
212 let network_inner = network.clone();
213 let mut swarm = self.swarm;
214 let process = async move {
215 while let Some(event) = swarm.next().await {
216 match event {
217 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
218 SwarmEvent::Behaviour(
219 HoprNetworkBehaviorEvent::Autonat(event)
220 ) => {
221 match event {
222 autonat::Event::StatusChanged { old, new } => {
223 info!(?old, ?new, "AutoNAT status changed");
224 #[cfg(all(feature = "prometheus", not(test)))]
225 {
226 let value = match new {
227 autonat::NatStatus::Unknown => 0.0,
228 autonat::NatStatus::Public(_) => 1.0,
229 autonat::NatStatus::Private => 2.0,
230 };
231 METRIC_TRANSPORT_NAT_STATUS.set(value);
232 }
233 }
234 autonat::Event::InboundProbe { .. } => {}
235 autonat::Event::OutboundProbe { .. } => {}
236 }
237 }
238 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
239 SwarmEvent::ConnectionEstablished {
240 peer_id,
241 connection_id,
242 num_established,
243 established_in,
244 endpoint,
245 ..
246 } => {
248 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
249
250 if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
251 match endpoint {
252 libp2p::core::ConnectedPoint::Dialer { address, .. } => {
253 if allow_private_addresses || is_public_address(&address) {
254 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
255 error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
256 }
257 tracker.add(peer_id);
258 } else {
259 debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
260 }
261 },
262 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
263 if allow_private_addresses || is_public_address(&send_back_addr) {
264 if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
265 error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
266 }
267 tracker.add(peer_id);
268 } else {
269 debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
270 }
271 }
272 }
273 } else {
274 trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
275 }
276
277 print_network_info(swarm.network_info(), "connection established");
278
279 #[cfg(all(feature = "prometheus", not(test)))]
280 {
281 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
282 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
283 }
284 }
285 SwarmEvent::ConnectionClosed {
286 peer_id,
287 connection_id,
288 cause,
289 num_established,
290 ..
291 } => {
293 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
294
295 if num_established == 0 {
296 tracker.remove(&peer_id);
297 }
298
299 print_network_info(swarm.network_info(), "connection closed");
300
301 #[cfg(all(feature = "prometheus", not(test)))]
302 {
303 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
304 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
305 }
306 }
307 SwarmEvent::IncomingConnection {
308 connection_id,
309 local_addr,
310 send_back_addr,
311 } => {
312 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
313 }
314 SwarmEvent::IncomingConnectionError {
315 local_addr,
316 connection_id,
317 error,
318 send_back_addr,
319 peer_id
320 } => {
321 debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
322 }
323 SwarmEvent::OutgoingConnectionError {
324 connection_id,
325 error,
326 peer_id
327 } => {
328 debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
329
330 if let Some(peer_id) = peer_id
331 && !swarm.is_connected(&peer_id) {
332 if let Err(error) = store.remove(&peer_id) {
333 error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
334 }
335 tracker.remove(&peer_id);
336 }
337
338 #[cfg(all(feature = "prometheus", not(test)))]
339 {
340 METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
341 }
342 }
343 SwarmEvent::NewListenAddr {
344 listener_id,
345 address,
346 } => {
347 debug!(%listener_id, %address, transport="libp2p", "new listen address")
348 }
349 SwarmEvent::ExpiredListenAddr {
350 listener_id,
351 address,
352 } => {
353 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
354 }
355 SwarmEvent::ListenerClosed {
356 listener_id,
357 addresses,
358 reason,
359 } => {
360 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
361 }
362 SwarmEvent::ListenerError {
363 listener_id,
364 error,
365 } => {
366 debug!(%listener_id, transport="libp2p", %error, "listener error")
367 }
368 SwarmEvent::Dialing {
369 peer_id,
370 connection_id,
371 } => {
372 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
373 }
374 SwarmEvent::NewExternalAddrCandidate {address} => {
375 debug!(%address, "Detected new external address candidate")
376 }
377 SwarmEvent::ExternalAddrConfirmed { address } => {
378 info!(%address, "Detected external address")
379 }
380 SwarmEvent::ExternalAddrExpired {
381 .. } => {}
383 SwarmEvent::NewExternalAddrOfPeer {
384 peer_id, address
385 } => {
386 if allow_private_addresses || is_public_address(&address) {
388 swarm.add_peer_address(peer_id, address.clone());
389 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
390 } else {
391 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
392 }
393 },
394 _ => trace!(transport="libp2p", "Unsupported enum option detected")
395 }
396 }
397 };
398
399 (network, process)
400 }
401}
402
403fn print_network_info(network_info: NetworkInfo, event: &str) {
404 let num_peers = network_info.num_peers();
405 let connection_counters = network_info.connection_counters();
406 let num_incoming = connection_counters.num_established_incoming();
407 let num_outgoing = connection_counters.num_established_outgoing();
408 info!(
409 num_peers,
410 num_incoming, num_outgoing, "swarm network status after {event}"
411 );
412}