hopr_transport_p2p/
swarm.rs

1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7    Multiaddr, PeerId,
8    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12    autonat,
13    multiaddr::Protocol,
14    request_response::{OutboundRequestId, ResponseChannel},
15    swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Gauge of currently open libp2p connections.
    ///
    /// Incremented on `SwarmEvent::ConnectionEstablished` and decremented on
    /// `SwarmEvent::ConnectionClosed` by the swarm event loop in `HoprSwarm::run`.
    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_transport_p2p_opened_connection_count",
        "Number of currently open connections"
    )
    .expect("metric hopr_transport_p2p_opened_connection_count must be constructible");
}
28
29/// Build objects comprising the p2p network.
30///
31/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
32async fn build_p2p_network<T>(
33    me: libp2p::identity::Keypair,
34    indexer_update_input: T,
35) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
36where
37    T: Stream<Item = PeerDiscovery> + Send + 'static,
38{
39    let me_peerid: PeerId = me.public().into();
40
41    // Both features could be enabled during testing, therefore we only use tokio when its
42    // exclusively enabled.
43    //
44    // NOTE: Private address filtering is implemented at multiple levels for defense-in-depth:
45    // 1. Discovery behavior filters PeerDiscovery::Allow and PeerDiscovery::Announce events
46    // 2. SwarmEvent::NewExternalAddrOfPeer events are filtered using is_public_address()
47    // 3. Network layer filters addresses in both add() and get() methods (primary protection)
48    // 4. libp2p's global_only transport wrapper could be added here but the above filtering provides equivalent
49    //    protection while maintaining compatibility with the existing code
50    #[cfg(feature = "runtime-tokio")]
51    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
52        .with_tokio()
53        .with_tcp(
54            libp2p::tcp::Config::default().nodelay(true),
55            libp2p::noise::Config::new,
56            // use default yamux configuration to enable auto-tuning
57            // see https://github.com/libp2p/rust-libp2p/pull/4970
58            libp2p::yamux::Config::default,
59        )
60        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
61
62    #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
63    let swarm = swarm.with_quic();
64
65    #[cfg(feature = "runtime-tokio")]
66    let swarm = swarm.with_dns();
67
68    Ok(swarm
69        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
70        .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
71        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
72        .with_swarm_config(|cfg| {
73            cfg.with_dial_concurrency_factor(
74                NonZeroU8::new(
75                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
76                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
77                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
78                )
79                .expect("concurrently dialed peer count must be > 0"),
80            )
81            .with_max_negotiating_inbound_streams(
82                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
83                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
84                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
85            )
86            .with_idle_connection_timeout(
87                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
88                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
89                    .map(std::time::Duration::from_secs)
90                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
91            )
92        })
93        .build())
94}
95
96pub struct HoprSwarm {
97    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
98}
99
100impl std::fmt::Debug for HoprSwarm {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("HoprSwarm").finish()
103    }
104}
105
106impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
107    fn from(value: HoprSwarm) -> Self {
108        value.swarm
109    }
110}
111
112impl HoprSwarm {
113    pub async fn new<T>(
114        identity: libp2p::identity::Keypair,
115        indexer_update_input: T,
116        my_multiaddresses: Vec<Multiaddr>,
117    ) -> Self
118    where
119        T: Stream<Item = PeerDiscovery> + Send + 'static,
120    {
121        let mut swarm = build_p2p_network(identity, indexer_update_input)
122            .await
123            .expect("swarm must be constructible");
124
125        for multiaddress in my_multiaddresses.iter() {
126            match resolve_dns_if_any(multiaddress) {
127                Ok(ma) => {
128                    if let Err(e) = swarm.listen_on(ma.clone()) {
129                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
130
131                        match replace_transport_with_unspecified(&ma) {
132                            Ok(ma) => {
133                                if let Err(e) = swarm.listen_on(ma.clone()) {
134                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
135                                } else {
136                                    info!(
137                                        listen_on = ?ma,
138                                        multiaddress = ?multiaddress,
139                                        "Listening for p2p connections"
140                                    );
141                                    swarm.add_external_address(multiaddress.clone());
142                                }
143                            }
144                            Err(e) => {
145                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
146                            }
147                        }
148                    } else {
149                        info!(
150                            listen_on = ?ma,
151                            multiaddress = ?multiaddress,
152                            "Listening for p2p connections"
153                        );
154                        swarm.add_external_address(multiaddress.clone());
155                    }
156                }
157                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
158            }
159        }
160
161        // TODO: perform this check
162        // NOTE: This would be a valid check but is not immediate
163        // assert!(
164        //     swarm.listeners().count() > 0,
165        //     "The node failed to listen on at least one of the specified interfaces"
166        // );
167
168        Self { swarm }
169    }
170
171    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
172        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
173    }
174
175    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
176    /// running in a neverending loop future.
177    ///
178    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
179    ///
180    /// This future can only be resolved by an unrecoverable error or a panic.
181    pub async fn run<T>(self, events: T)
182    where
183        T: Sink<crate::behavior::discovery::Event> + Send + 'static,
184    {
185        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
186        futures::pin_mut!(events);
187
188        loop {
189            select! {
190                event = swarm.select_next_some() => match event {
191                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
192                        if let Err(_error) = events.send(event).await {
193                            tracing::error!("Failed to send discovery event from the transport layer");
194                        }
195                    }
196                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
197                        server,
198                        tested_addr,
199                        bytes_sent,
200                        result,
201                    })) => {
202                        match result {
203                            Ok(_) => {
204                                debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
205                            }
206                            Err(error) => {
207                                warn!(%server, %tested_addr, %bytes_sent, %error, "Autonat server test failed");
208                            }
209                        }
210                    }
211                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
212                        warn!(?event, "Autonat server event");
213                    }
214                    SwarmEvent::ConnectionEstablished {
215                        peer_id,
216                        connection_id,
217                        num_established,
218                        established_in,
219                        ..
220                        // concurrent_dial_errors,
221                        // endpoint,
222                    } => {
223                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
224
225                        print_network_info(swarm.network_info(), "connection established");
226
227                        #[cfg(all(feature = "prometheus", not(test)))]
228                        {
229                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
230                        }
231                    }
232                    SwarmEvent::ConnectionClosed {
233                        peer_id,
234                        connection_id,
235                        cause,
236                        num_established,
237                        ..
238                        // endpoint,
239                    } => {
240                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
241
242                        print_network_info(swarm.network_info(), "connection closed");
243
244                        #[cfg(all(feature = "prometheus", not(test)))]
245                        {
246                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
247                        }
248                    }
249                    SwarmEvent::IncomingConnection {
250                        connection_id,
251                        local_addr,
252                        send_back_addr,
253                    } => {
254                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
255                    }
256                    SwarmEvent::IncomingConnectionError {
257                        local_addr,
258                        connection_id,
259                        error,
260                        send_back_addr,
261                        peer_id
262                    } => {
263                        error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
264
265                        print_network_info(swarm.network_info(), "incoming connection error");
266                    }
267                    SwarmEvent::OutgoingConnectionError {
268                        connection_id,
269                        error,
270                        peer_id
271                    } => {
272                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
273
274                        print_network_info(swarm.network_info(), "outgoing connection error");
275                    }
276                    SwarmEvent::NewListenAddr {
277                        listener_id,
278                        address,
279                    } => {
280                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
281                    }
282                    SwarmEvent::ExpiredListenAddr {
283                        listener_id,
284                        address,
285                    } => {
286                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
287                    }
288                    SwarmEvent::ListenerClosed {
289                        listener_id,
290                        addresses,
291                        reason,
292                    } => {
293                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
294                    }
295                    SwarmEvent::ListenerError {
296                        listener_id,
297                        error,
298                    } => {
299                        debug!(%listener_id, transport="libp2p", %error, "listener error")
300                    }
301                    SwarmEvent::Dialing {
302                        peer_id,
303                        connection_id,
304                    } => {
305                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
306                    }
307                    SwarmEvent::NewExternalAddrCandidate {
308                        ..  // address: Multiaddr
309                    } => {}
310                    SwarmEvent::ExternalAddrConfirmed { address } => {
311                        info!(%address, "Detected external address")
312                    }
313                    SwarmEvent::ExternalAddrExpired {
314                        ..  // address: Multiaddr
315                    } => {}
316                    SwarmEvent::NewExternalAddrOfPeer {
317                        peer_id, address
318                    } => {
319                        // Only store public/routable addresses
320                        if is_public_address(&address) {
321                            swarm.add_peer_address(peer_id, address.clone());
322                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
323                        } else {
324                            trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
325                        }
326                    },
327                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
328                }
329            }
330        }
331    }
332
333    pub fn run_nat_server(&mut self, port: u16) {
334        info!(listen_on = port, "Starting NAT server");
335
336        match self.swarm.listen_on(
337            Multiaddr::empty()
338                .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
339                .with(Protocol::Tcp(port)),
340        ) {
341            Ok(_) => {
342                info!("NAT server started");
343            }
344            Err(e) => {
345                warn!(error = %e, "Failed to listen on NAT server");
346            }
347        }
348    }
349
350    pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
351        // let dial_opts = DialOpts::peer_id(PeerId::random())
352        //     .addresses(addresses)
353        //     .extend_addresses_through_behaviour()
354        //     .build();
355        info!(
356            num_addresses = addresses.len(),
357            "Dialing NAT servers with multiple candidate addresses"
358        );
359
360        for addr in addresses {
361            let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
362            if let Err(e) = self.swarm.dial(dial_opts) {
363                warn!(%addr, %e, "Failed to dial NAT server address");
364            } else {
365                info!(%addr, "Dialed NAT server address");
366                break;
367            }
368        }
369    }
370}
371
372fn print_network_info(network_info: NetworkInfo, event: &str) {
373    let num_peers = network_info.num_peers();
374    let connection_counters = network_info.connection_counters();
375    let num_incoming = connection_counters.num_established_incoming();
376    let num_outgoing = connection_counters.num_established_outgoing();
377    info!(
378        num_peers,
379        num_incoming, num_outgoing, "swarm network status after {event}"
380    );
381}
382
383pub type TicketAggregationRequestType = OutboundRequestId;
384pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;