Skip to main content

hopr_transport_p2p/
swarm.rs

1#[cfg(feature = "runtime-tokio")]
2use std::num::NonZeroU8;
3use std::sync::Arc;
4
5use dashmap::DashSet;
6use futures::{FutureExt, Stream, StreamExt, stream::BoxStream};
7use hopr_api::{Multiaddr, OffchainKeypair, network::BoxedProcessFn};
8use hopr_utils::network_types::prelude::is_public_address;
9use libp2p::{
10    autonat,
11    identity::PublicKey,
12    swarm::{NetworkInfo, SwarmEvent},
13};
14use tracing::{debug, error, info, trace, warn};
15
16use crate::{
17    HoprNetwork, HoprNetworkBehavior, HoprNetworkBehaviorEvent, PeerDiscovery, constants,
18    errors::Result,
19    utils::{replace_transport_with_unspecified, resolve_dns_if_any},
20};
21
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Gauge adjusted by the swarm event loop: +1 per established connection, -1 per closed connection.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new(
            "hopr_transport_p2p_active_connection_count",
            "Number of currently active p2p connections as observed from libp2p events",
        )
        .unwrap();

    /// Gauge mirroring the most recent AutoNAT status change (0=Unknown, 1=Public, 2=Private).
    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new(
            "hopr_transport_p2p_nat_status",
            "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private",
        )
        .unwrap();

    /// Gauge holding the network health value; reset to 0 when the network is built.
    static ref METRIC_NETWORK_HEALTH: hopr_types::telemetry::SimpleGauge =
        hopr_types::telemetry::SimpleGauge::new(
            "hopr_network_health",
            "Connectivity health indicator",
        )
        .unwrap();
}
35
36pub struct InactiveNetwork {
37    swarm: libp2p::Swarm<HoprNetworkBehavior>,
38}
39
/// Returns the DNS resolver settings handed to the swarm builder on Android and iOS:
/// the Cloudflare resolver configuration paired with default resolver options.
#[cfg(any(target_os = "android", target_os = "ios"))]
fn swarm_dns_config() -> (libp2p::dns::ResolverConfig, libp2p::dns::ResolverOpts) {
    use libp2p::dns::{ResolverConfig, ResolverOpts};

    let resolver_config = ResolverConfig::cloudflare();
    let resolver_opts = ResolverOpts::default();

    (resolver_config, resolver_opts)
}
47
48/// Build objects comprising an inactive p2p network.
49///
50/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
51impl InactiveNetwork {
52    #[cfg(feature = "runtime-tokio")]
53    pub async fn build(
54        me: libp2p::identity::Keypair,
55        external_discovery_events: BoxStream<'static, PeerDiscovery>,
56    ) -> Result<Self> {
57        let me_public: PublicKey = me.public();
58
59        let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
60            .with_tokio()
61            .with_tcp(
62                libp2p::tcp::Config::default().nodelay(true),
63                libp2p::noise::Config::new,
64                // use default yamux configuration to enable auto-tuning
65                // see https://github.com/libp2p/rust-libp2p/pull/4970
66                libp2p::yamux::Config::default,
67            )
68            .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
69
70        #[cfg(feature = "transport-quic")]
71        let swarm = swarm.with_quic();
72
73        #[cfg(any(target_os = "android", target_os = "ios"))]
74        let swarm = {
75            let (dns_resolver_config, dns_resolver_opts) = swarm_dns_config();
76            swarm.with_dns_config(dns_resolver_config, dns_resolver_opts)
77        };
78
79        #[cfg(not(any(target_os = "android", target_os = "ios")))]
80        let swarm = swarm
81            .with_dns()
82            .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
83
84        Ok(Self {
85            swarm: swarm
86                .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, external_discovery_events))
87                .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
88                .with_swarm_config(|cfg| {
89                    cfg.with_dial_concurrency_factor(
90                        NonZeroU8::new({
91                            let v = std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
92                                .ok()
93                                .and_then(|v| v.trim().parse::<u8>().ok())
94                                .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT);
95                            v.max(1)
96                        })
97                        .expect("clamped to >= 1, will never fail"),
98                    )
99                    .with_max_negotiating_inbound_streams(
100                        std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
101                            .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
102                            .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
103                    )
104                    .with_idle_connection_timeout(
105                        std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
106                            .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
107                            .map(std::time::Duration::from_secs)
108                            .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
109                    )
110                })
111                .build(),
112        })
113    }
114
115    #[cfg(not(feature = "runtime-tokio"))]
116    pub async fn build<T>(_me: libp2p::identity::Keypair, _external_discovery_events: T) -> Result<Self>
117    where
118        T: Stream<Item = PeerDiscovery> + Send + 'static,
119    {
120        Err(crate::errors::P2PError::Libp2p(
121            "InactiveNetwork::build requires the runtime-tokio feature".to_string(),
122        ))
123    }
124
125    pub fn with_listen_on(mut self, multiaddresses: Vec<Multiaddr>) -> Result<InactiveConfiguredNetwork> {
126        for multiaddress in multiaddresses.iter() {
127            match resolve_dns_if_any(multiaddress) {
128                Ok(ma) => {
129                    if let Err(e) = self.swarm.listen_on(ma.clone()) {
130                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
131
132                        match replace_transport_with_unspecified(&ma) {
133                            Ok(ma) => {
134                                if let Err(e) = self.swarm.listen_on(ma.clone()) {
135                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
136                                } else {
137                                    info!(
138                                        listen_on = ?ma,
139                                        multiaddress = ?multiaddress,
140                                        "Listening for p2p connections"
141                                    );
142                                    self.swarm.add_external_address(multiaddress.clone());
143                                }
144                            }
145                            Err(e) => {
146                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
147                            }
148                        }
149                    } else {
150                        info!(
151                            listen_on = ?ma,
152                            multiaddress = ?multiaddress,
153                            "Listening for p2p connections"
154                        );
155                        self.swarm.add_external_address(multiaddress.clone());
156                    }
157                }
158                Err(error) => error!(%multiaddress, %error, "Failed to transform the multiaddress"),
159            }
160        }
161
162        Ok(InactiveConfiguredNetwork { swarm: self.swarm })
163    }
164}
165
166pub struct InactiveConfiguredNetwork {
167    swarm: libp2p::Swarm<HoprNetworkBehavior>,
168}
169
170/// Factory for constructing the libp2p network and its background process.
171///
172/// Accepts a peer discovery stream and produces a [`HoprNetwork`] + background
173/// process function. The network supports event subscription via
174/// [`NetworkView::subscribe_network_events`](hopr_api::network::NetworkView::subscribe_network_events).
175pub struct HoprLibp2pNetworkBuilder {
176    bootstrap: std::pin::Pin<Box<dyn Stream<Item = PeerDiscovery> + Send + Sync>>,
177}
178
179impl HoprLibp2pNetworkBuilder {
180    pub fn new<T>(bootstrap: T) -> Self
181    where
182        T: Stream<Item = PeerDiscovery> + Send + Sync + 'static,
183    {
184        Self {
185            bootstrap: Box::pin(bootstrap),
186        }
187    }
188}
189
190impl std::fmt::Debug for HoprLibp2pNetworkBuilder {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        f.debug_struct("HoprSwarmBuilder").finish_non_exhaustive()
193    }
194}
195
196impl HoprLibp2pNetworkBuilder {
197    /// Build the network and return it along with its background process.
198    ///
199    /// The returned [`HoprNetwork`] supports event subscription — subscribe
200    /// before starting the process to avoid missing events.
201    pub async fn build(
202        self,
203        identity: &OffchainKeypair,
204        my_multiaddresses: Vec<Multiaddr>,
205        protocol: &'static str,
206        allow_private_addresses: bool,
207    ) -> std::result::Result<(HoprNetwork, BoxedProcessFn), impl std::error::Error> {
208        #[cfg(all(feature = "telemetry", not(test)))]
209        {
210            METRIC_NETWORK_HEALTH.set(0.0);
211        }
212
213        let identity_p2p: libp2p::identity::Keypair = identity.into();
214        let me = identity_p2p.public().to_peer_id();
215        let swarm = InactiveNetwork::build(identity_p2p, self.bootstrap)
216            .await
217            .expect("swarm must be constructible");
218
219        let swarm = swarm
220            .with_listen_on(my_multiaddresses.clone())
221            .expect("swarm must be configurable");
222
223        let swarm = swarm.swarm;
224        let store = crate::peer_store::NetworkPeerStore::new(me, my_multiaddresses.into_iter().collect());
225        let tracker: Arc<DashSet<libp2p::PeerId>> = Default::default();
226
227        let (notifier, event_rx) = async_broadcast::broadcast(1000);
228
229        let network = HoprNetwork {
230            tracker: tracker.clone(),
231            store: Arc::new(store.clone()),
232            control: swarm.behaviour().streams.new_control(),
233            protocol: libp2p::StreamProtocol::new(protocol),
234            event_rx: event_rx.deactivate(),
235        };
236
237        #[cfg(all(feature = "telemetry", not(test)))]
238        let network_inner = network.clone();
239        let mut swarm = swarm;
240        let process = async move {
241            while let Some(event) = swarm.next().await {
242                match event {
243                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
244                    SwarmEvent::Behaviour(
245                        HoprNetworkBehaviorEvent::Autonat(event)
246                    ) => {
247                        match event {
248                            autonat::Event::StatusChanged { old, new } => {
249                                info!(?old, ?new, "AutoNAT status changed");
250                                #[cfg(all(feature = "telemetry", not(test)))]
251                                {
252                                    let value = match new {
253                                        autonat::NatStatus::Unknown => 0.0,
254                                        autonat::NatStatus::Public(_) => 1.0,
255                                        autonat::NatStatus::Private => 2.0,
256                                    };
257                                    METRIC_TRANSPORT_NAT_STATUS.set(value);
258                                }
259                            }
260                            autonat::Event::InboundProbe { .. } => {}
261                            autonat::Event::OutboundProbe { .. } => {}
262                        }
263                    }
264                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
265                    SwarmEvent::ConnectionEstablished {
266                        peer_id,
267                        connection_id,
268                        num_established,
269                        established_in,
270                        endpoint,
271                        ..
272                        // concurrent_dial_errors,
273                    } => {
274                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
275
276                        if num_established == std::num::NonZero::<u32>::new(1).expect("must be a non-zero value") {
277                            match endpoint {
278                                libp2p::core::ConnectedPoint::Dialer { address, .. } => {
279                                    if allow_private_addresses || is_public_address(&address) {
280                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([address])) {
281                                            error!(peer = %peer_id, %error, direction = "outgoing", "failed to add connected peer to the peer store");
282                                        }
283                                    } else {
284                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address encountered")
285                                    }
286                                    tracker.insert(peer_id);
287                                    if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
288                                        error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
289                                    }
290                                },
291                                libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => {
292                                    if allow_private_addresses || is_public_address(&send_back_addr) {
293                                        if let Err(error) = store.add(peer_id, std::collections::HashSet::from([send_back_addr])) {
294                                            error!(peer = %peer_id, %error, direction = "incoming", "failed to add connected peer to the peer store");
295                                        }
296                                    } else {
297                                        debug!(transport="libp2p", peer = %peer_id, multiaddress = %send_back_addr, "Private/local peer address ignored")
298                                    }
299                                    tracker.insert(peer_id);
300                                    if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerConnected(peer_id)) {
301                                        error!(peer = %peer_id, %error, "failed to broadcast peer connected event");
302                                    }
303                                }
304                            }
305                        } else {
306                            trace!(transport="libp2p", peer = %peer_id, num_established, "Additional connection established")
307                        }
308
309                        print_network_info(swarm.network_info(), "connection established");
310
311                        #[cfg(all(feature = "telemetry", not(test)))]
312                        {
313                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
314                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
315                        }
316                    }
317                    SwarmEvent::ConnectionClosed {
318                        peer_id,
319                        connection_id,
320                        cause,
321                        num_established,
322                        ..
323                        // endpoint,
324                    } => {
325                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
326
327                        if num_established == 0 {
328                            tracker.remove(&peer_id);
329                            if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
330                                error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
331                            }
332                        }
333
334                        print_network_info(swarm.network_info(), "connection closed");
335
336                        #[cfg(all(feature = "telemetry", not(test)))]
337                        {
338                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
339                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
340                        }
341                    }
342                    SwarmEvent::IncomingConnection {
343                        connection_id,
344                        local_addr,
345                        send_back_addr,
346                    } => {
347                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
348                    }
349                    SwarmEvent::IncomingConnectionError {
350                        local_addr,
351                        connection_id,
352                        error,
353                        send_back_addr,
354                        peer_id
355                    } => {
356                        debug!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
357                    }
358                    SwarmEvent::OutgoingConnectionError {
359                        connection_id,
360                        error,
361                        peer_id
362                    } => {
363                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
364
365                        if let Some(peer_id) = peer_id
366                            && !swarm.is_connected(&peer_id) {
367                                if let Err(error) = store.remove(&peer_id) {
368                                    error!(peer = %peer_id, %error, "failed to remove undialable peer from the peer store");
369                                }
370                                tracker.remove(&peer_id);
371                                if let Err(error) = notifier.try_broadcast(hopr_api::network::NetworkEvent::PeerDisconnected(peer_id)) {
372                                    error!(peer = %peer_id, %error, "failed to broadcast peer disconnected event");
373                                }
374                            }
375
376                        #[cfg(all(feature = "telemetry", not(test)))]
377                        {
378                            METRIC_NETWORK_HEALTH.set((hopr_api::network::NetworkView::health(&network_inner) as i32).into());
379                        }
380                    }
381                    SwarmEvent::NewListenAddr {
382                        listener_id,
383                        address,
384                    } => {
385                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
386                    }
387                    SwarmEvent::ExpiredListenAddr {
388                        listener_id,
389                        address,
390                    } => {
391                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
392                    }
393                    SwarmEvent::ListenerClosed {
394                        listener_id,
395                        addresses,
396                        reason,
397                    } => {
398                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
399                    }
400                    SwarmEvent::ListenerError {
401                        listener_id,
402                        error,
403                    } => {
404                        debug!(%listener_id, transport="libp2p", %error, "listener error")
405                    }
406                    SwarmEvent::Dialing {
407                        peer_id,
408                        connection_id,
409                    } => {
410                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
411                    }
412                    SwarmEvent::NewExternalAddrCandidate {address} => {
413                        debug!(%address, "Detected new external address candidate")
414                    }
415                    SwarmEvent::ExternalAddrConfirmed { address } => {
416                        info!(%address, "Detected external address")
417                    }
418                    SwarmEvent::ExternalAddrExpired {
419                        ..  // address: Multiaddr
420                    } => {}
421                    SwarmEvent::NewExternalAddrOfPeer {
422                        peer_id, address
423                    } => {
424                        // Only store public/routable addresses
425                        if allow_private_addresses || is_public_address(&address) {
426                            swarm.add_peer_address(peer_id, address.clone());
427                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
428                        } else {
429                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
430                        }
431                    },
432                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
433                }
434            }
435        }.boxed();
436
437        Ok::<_, std::io::Error>((network, Box::new(move || process)))
438    }
439}
440
441fn print_network_info(network_info: NetworkInfo, event: &str) {
442    let num_peers = network_info.num_peers();
443    let connection_counters = network_info.connection_counters();
444    let num_incoming = connection_counters.num_established_incoming();
445    let num_outgoing = connection_counters.num_established_outgoing();
446    info!(
447        num_peers,
448        num_incoming, num_outgoing, "swarm network status after {event}"
449    );
450}
451
#[cfg(test)]
mod tests {
    /// On Android and iOS the swarm must use the Cloudflare resolver with default options.
    #[test]
    #[cfg(any(target_os = "android", target_os = "ios"))]
    fn uses_cloudflare_dns_resolver_config() {
        use libp2p::dns::{ResolverConfig, ResolverOpts};

        let (config, opts) = super::swarm_dns_config();

        assert_eq!(config, ResolverConfig::cloudflare());
        assert_eq!(opts, ResolverOpts::default());
    }
}
461}