hopr_transport_p2p/
swarm.rs

1use std::num::NonZeroU8;
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7    Multiaddr,
8    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12    autonat,
13    identity::PublicKey,
14    request_response::{OutboundRequestId, ResponseChannel},
15    swarm::{NetworkInfo, SwarmEvent},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
// Prometheus gauges exported by the p2p transport. They only exist when the `prometheus`
// feature is enabled and the crate is not compiled for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented for every established connection and decremented for every closed one.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_transport_p2p_opened_connection_count",
            "Number of currently open connections"
        )
        .unwrap();

    // Mirrors the most recent AutoNAT status: 0 = Unknown, 1 = Public, 2 = Private.
    static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge =
        hopr_metrics::SimpleGauge::new(
            "hopr_transport_p2p_nat_status",
            "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
        )
        .unwrap();
}
32
33/// Build objects comprising the p2p network.
34///
35/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
36async fn build_p2p_network<T>(
37    me: libp2p::identity::Keypair,
38    indexer_update_input: T,
39    allow_private_addresses: bool,
40) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
41where
42    T: Stream<Item = PeerDiscovery> + Send + 'static,
43{
44    let me_public: PublicKey = me.public();
45
46    // Both features could be enabled during testing; therefore, we only use tokio when it's
47    // exclusively enabled.
48    //
49    // NOTE: Private address filtering is implemented at multiple levels for defense-in-depth:
50    // 1. Discovery behavior filters PeerDiscovery::Allow and PeerDiscovery::Announce events
51    // 2. SwarmEvent::NewExternalAddrOfPeer events are filtered using is_public_address()
52    // 3. Network layer filters addresses in both add() and get() methods (primary protection)
53    // 4. libp2p's global_only transport wrapper could be added here but the above filtering provides equivalent
54    //    protection while maintaining compatibility with the existing code
55    #[cfg(feature = "runtime-tokio")]
56    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
57        .with_tokio()
58        .with_tcp(
59            libp2p::tcp::Config::default().nodelay(true),
60            libp2p::noise::Config::new,
61            // use default yamux configuration to enable auto-tuning
62            // see https://github.com/libp2p/rust-libp2p/pull/4970
63            libp2p::yamux::Config::default,
64        )
65        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
66
67    #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
68    let swarm = swarm.with_quic();
69
70    #[cfg(feature = "runtime-tokio")]
71    let swarm = swarm.with_dns();
72
73    Ok(swarm
74        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
75        .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, indexer_update_input, allow_private_addresses))
76        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
77        .with_swarm_config(|cfg| {
78            cfg.with_dial_concurrency_factor(
79                NonZeroU8::new(
80                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
81                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
82                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
83                )
84                .expect("concurrently dialed peer count must be > 0"),
85            )
86            .with_max_negotiating_inbound_streams(
87                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
88                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
89                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
90            )
91            .with_idle_connection_timeout(
92                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
93                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
94                    .map(std::time::Duration::from_secs)
95                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
96            )
97        })
98        .build())
99}
100
101pub struct HoprSwarm {
102    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
103    pub allow_private_addresses: bool,
104}
105
106impl std::fmt::Debug for HoprSwarm {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("HoprSwarm").finish()
109    }
110}
111
112impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
113    fn from(value: HoprSwarm) -> Self {
114        value.swarm
115    }
116}
117
118impl HoprSwarm {
119    pub async fn new<T>(
120        identity: libp2p::identity::Keypair,
121        indexer_update_input: T,
122        my_multiaddresses: Vec<Multiaddr>,
123        allow_private_addresses: bool,
124    ) -> Self
125    where
126        T: Stream<Item = PeerDiscovery> + Send + 'static,
127    {
128        let mut swarm = build_p2p_network(identity, indexer_update_input, allow_private_addresses)
129            .await
130            .expect("swarm must be constructible");
131
132        for multiaddress in my_multiaddresses.iter() {
133            match resolve_dns_if_any(multiaddress) {
134                Ok(ma) => {
135                    if let Err(e) = swarm.listen_on(ma.clone()) {
136                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
137
138                        match replace_transport_with_unspecified(&ma) {
139                            Ok(ma) => {
140                                if let Err(e) = swarm.listen_on(ma.clone()) {
141                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
142                                } else {
143                                    info!(
144                                        listen_on = ?ma,
145                                        multiaddress = ?multiaddress,
146                                        "Listening for p2p connections"
147                                    );
148                                    swarm.add_external_address(multiaddress.clone());
149                                }
150                            }
151                            Err(e) => {
152                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
153                            }
154                        }
155                    } else {
156                        info!(
157                            listen_on = ?ma,
158                            multiaddress = ?multiaddress,
159                            "Listening for p2p connections"
160                        );
161                        swarm.add_external_address(multiaddress.clone());
162                    }
163                }
164                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
165            }
166        }
167
168        // TODO: perform this check
169        // NOTE: This would be a valid check but is not immediate
170        // assert!(
171        //     swarm.listeners().count() > 0,
172        //     "The node failed to listen on at least one of the specified interfaces"
173        // );
174
175        Self {
176            swarm,
177            allow_private_addresses,
178        }
179    }
180
181    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
182        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
183    }
184
185    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
186    /// running in a neverending loop future.
187    ///
188    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
189    ///
190    /// This future can only be resolved by an unrecoverable error or a panic.
191    pub async fn run<T>(self, events: T)
192    where
193        T: Sink<crate::behavior::discovery::Event> + Send + 'static,
194    {
195        let allow_private_addrs = self.allow_private_addresses;
196        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
197        futures::pin_mut!(events);
198
199        loop {
200            select! {
201                event = swarm.select_next_some() => match event {
202                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
203                        if let Err(_error) = events.send(event).await {
204                            tracing::error!("Failed to send discovery event from the transport layer");
205                        }
206                    }
207                    SwarmEvent::Behaviour(
208                        HoprNetworkBehaviorEvent::Autonat(event)
209                    ) => {
210                            match event {
211                                autonat::Event::StatusChanged { old, new } => {
212                                    info!(?old, ?new, "AutoNAT status changed");
213                                    #[cfg(all(feature = "prometheus", not(test)))]
214                                    {
215                                        let value = match new {
216                                            autonat::NatStatus::Unknown => 0.0,
217                                            autonat::NatStatus::Public(_) => 1.0,
218                                            autonat::NatStatus::Private => 2.0,
219                                        };
220                                        METRIC_TRANSPORT_NAT_STATUS.set(value);
221                                    }
222                                }
223                                autonat::Event::InboundProbe { .. } => {}
224                                autonat::Event::OutboundProbe { .. } => {}
225                            }
226                    }
227                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
228                    SwarmEvent::ConnectionEstablished {
229                        peer_id,
230                        connection_id,
231                        num_established,
232                        established_in,
233                        ..
234                        // concurrent_dial_errors,
235                        // endpoint,
236                    } => {
237                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
238
239                        print_network_info(swarm.network_info(), "connection established");
240
241                        #[cfg(all(feature = "prometheus", not(test)))]
242                        {
243                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
244                        }
245                    }
246                    SwarmEvent::ConnectionClosed {
247                        peer_id,
248                        connection_id,
249                        cause,
250                        num_established,
251                        ..
252                        // endpoint,
253                    } => {
254                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
255
256                        print_network_info(swarm.network_info(), "connection closed");
257
258                        #[cfg(all(feature = "prometheus", not(test)))]
259                        {
260                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
261                        }
262                    }
263                    SwarmEvent::IncomingConnection {
264                        connection_id,
265                        local_addr,
266                        send_back_addr,
267                    } => {
268                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
269                    }
270                    SwarmEvent::IncomingConnectionError {
271                        local_addr,
272                        connection_id,
273                        error,
274                        send_back_addr,
275                        peer_id
276                    } => {
277                        error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
278
279                        print_network_info(swarm.network_info(), "incoming connection error");
280                    }
281                    SwarmEvent::OutgoingConnectionError {
282                        connection_id,
283                        error,
284                        peer_id
285                    } => {
286                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
287
288                        print_network_info(swarm.network_info(), "outgoing connection error");
289                    }
290                    SwarmEvent::NewListenAddr {
291                        listener_id,
292                        address,
293                    } => {
294                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
295                    }
296                    SwarmEvent::ExpiredListenAddr {
297                        listener_id,
298                        address,
299                    } => {
300                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
301                    }
302                    SwarmEvent::ListenerClosed {
303                        listener_id,
304                        addresses,
305                        reason,
306                    } => {
307                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
308                    }
309                    SwarmEvent::ListenerError {
310                        listener_id,
311                        error,
312                    } => {
313                        debug!(%listener_id, transport="libp2p", %error, "listener error")
314                    }
315                    SwarmEvent::Dialing {
316                        peer_id,
317                        connection_id,
318                    } => {
319                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
320                    }
321                    SwarmEvent::NewExternalAddrCandidate {address} => {
322                        debug!(%address, "Detected new external address candidate")
323                    }
324                    SwarmEvent::ExternalAddrConfirmed { address } => {
325                        info!(%address, "Detected external address")
326                    }
327                    SwarmEvent::ExternalAddrExpired {
328                        ..  // address: Multiaddr
329                    } => {}
330                    SwarmEvent::NewExternalAddrOfPeer {
331                        peer_id, address
332                    } => {
333                        // Only store public/routable addresses
334                        if allow_private_addrs || is_public_address(&address) {
335                            swarm.add_peer_address(peer_id, address.clone());
336                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
337                        } else {
338                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
339                        }
340                    },
341                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
342                }
343            }
344        }
345    }
346}
347
348fn print_network_info(network_info: NetworkInfo, event: &str) {
349    let num_peers = network_info.num_peers();
350    let connection_counters = network_info.connection_counters();
351    let num_incoming = connection_counters.num_established_incoming();
352    let num_outgoing = connection_counters.num_established_outgoing();
353    info!(
354        num_peers,
355        num_incoming, num_outgoing, "swarm network status after {event}"
356    );
357}
358
359pub type TicketAggregationRequestType = OutboundRequestId;
360pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;