hopr_transport_p2p/
swarm.rs

1use std::{net::Ipv4Addr, num::NonZeroU8};
2
3use futures::{Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5#[cfg(all(feature = "prometheus", not(test)))]
6use hopr_metrics::metrics::SimpleGauge;
7use hopr_transport_identity::{
8    Multiaddr, PeerId,
9    multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10};
11use hopr_transport_protocol::PeerDiscovery;
12use libp2p::{
13    autonat,
14    multiaddr::Protocol,
15    request_response::{OutboundRequestId, ResponseChannel},
16    swarm::{NetworkInfo, SwarmEvent, dial_opts::DialOpts},
17};
18use tracing::{debug, error, info, trace, warn};
19
20use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
21
22#[cfg(all(feature = "prometheus", not(test)))]
23lazy_static::lazy_static! {
24    static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = SimpleGauge::new(
25        "hopr_transport_p2p_opened_connection_count",
26        "Number of currently open connections"
27    ).unwrap();
28}
29
30/// Build objects comprising the p2p network.
31///
32/// Returns a built [libp2p::Swarm] object implementing the HoprNetworkBehavior functionality.
33async fn build_p2p_network<T>(
34    me: libp2p::identity::Keypair,
35    indexer_update_input: T,
36) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
37where
38    T: Stream<Item = PeerDiscovery> + Send + 'static,
39{
40    let me_peerid: PeerId = me.public().into();
41
42    // Both features could be enabled during testing, therefore we only use tokio when its
43    // exclusively enabled.
44    #[cfg(feature = "runtime-tokio")]
45    let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
46        .with_tokio()
47        .with_tcp(
48            libp2p::tcp::Config::default().nodelay(true),
49            libp2p::noise::Config::new,
50            // use default yamux configuration to enable auto-tuning
51            // see https://github.com/libp2p/rust-libp2p/pull/4970
52            libp2p::yamux::Config::default,
53        )
54        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
55        .with_quic()
56        .with_dns();
57
58    Ok(swarm
59        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60        .with_behaviour(|_key| HoprNetworkBehavior::new(me_peerid, indexer_update_input))
61        .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
62        .with_swarm_config(|cfg| {
63            cfg.with_dial_concurrency_factor(
64                NonZeroU8::new(
65                    std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
66                        .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
67                        .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
68                )
69                .expect("concurrently dialed peer count must be > 0"),
70            )
71            .with_max_negotiating_inbound_streams(
72                std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
73                    .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
74                    .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
75            )
76            .with_idle_connection_timeout(
77                std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
78                    .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
79                    .map(std::time::Duration::from_secs)
80                    .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
81            )
82        })
83        .build())
84}
85
86pub struct HoprSwarm {
87    pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
88}
89
90impl std::fmt::Debug for HoprSwarm {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("HoprSwarm").finish()
93    }
94}
95
96impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
97    fn from(value: HoprSwarm) -> Self {
98        value.swarm
99    }
100}
101
102impl HoprSwarm {
103    pub async fn new<T>(
104        identity: libp2p::identity::Keypair,
105        indexer_update_input: T,
106        my_multiaddresses: Vec<Multiaddr>,
107    ) -> Self
108    where
109        T: Stream<Item = PeerDiscovery> + Send + 'static,
110    {
111        let mut swarm = build_p2p_network(identity, indexer_update_input)
112            .await
113            .expect("swarm must be constructible");
114
115        for multiaddress in my_multiaddresses.iter() {
116            match resolve_dns_if_any(multiaddress) {
117                Ok(ma) => {
118                    if let Err(e) = swarm.listen_on(ma.clone()) {
119                        warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
120
121                        match replace_transport_with_unspecified(&ma) {
122                            Ok(ma) => {
123                                if let Err(e) = swarm.listen_on(ma.clone()) {
124                                    warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
125                                } else {
126                                    info!(
127                                        listen_on = ?ma,
128                                        multiaddress = ?multiaddress,
129                                        "Listening for p2p connections"
130                                    );
131                                    swarm.add_external_address(multiaddress.clone());
132                                }
133                            }
134                            Err(e) => {
135                                error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
136                            }
137                        }
138                    } else {
139                        info!(
140                            listen_on = ?ma,
141                            multiaddress = ?multiaddress,
142                            "Listening for p2p connections"
143                        );
144                        swarm.add_external_address(multiaddress.clone());
145                    }
146                }
147                Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
148            }
149        }
150
151        // TODO: perform this check
152        // NOTE: This would be a valid check but is not immediate
153        // assert!(
154        //     swarm.listeners().count() > 0,
155        //     "The node failed to listen on at least one of the specified interfaces"
156        // );
157
158        Self { swarm }
159    }
160
161    pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
162        crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
163    }
164
165    /// Main p2p loop that instantiates a new libp2p::Swarm instance and sets up listening and reacting pipelines
166    /// running in a neverending loop future.
167    ///
168    /// The function represents the entirety of the business logic of the hopr daemon related to core operations.
169    ///
170    /// This future can only be resolved by an unrecoverable error or a panic.
171    pub async fn run(self) {
172        let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
173
174        loop {
175            select! {
176                event = swarm.select_next_some() => match event {
177                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
178                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatClient(autonat::v2::client::Event {
179                        server,
180                        tested_addr,
181                        bytes_sent,
182                        result,
183                    })) => {
184                        match result {
185                            Ok(_) => {
186                                debug!(%server, %tested_addr, %bytes_sent, "Autonat server successfully tested");
187                            }
188                            Err(e) => {
189                                warn!(%server, %tested_addr, %bytes_sent, %e, "Autonat server test failed");
190                            }
191                        }
192                    }
193                    SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::AutonatServer(event)) => {
194                        warn!(?event, "Autonat server event");
195                    }
196                    SwarmEvent::ConnectionEstablished {
197                        peer_id,
198                        connection_id,
199                        num_established,
200                        established_in,
201                        ..
202                        // concurrent_dial_errors,
203                        // endpoint,
204                    } => {
205                        debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
206
207                        print_network_info(swarm.network_info(), "connection established");
208
209                        #[cfg(all(feature = "prometheus", not(test)))]
210                        {
211                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
212                        }
213                    }
214                    SwarmEvent::ConnectionClosed {
215                        peer_id,
216                        connection_id,
217                        cause,
218                        num_established,
219                        ..
220                        // endpoint,
221                    } => {
222                        debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
223
224                        print_network_info(swarm.network_info(), "connection closed");
225
226                        #[cfg(all(feature = "prometheus", not(test)))]
227                        {
228                            METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
229                        }
230                    }
231                    SwarmEvent::IncomingConnection {
232                        connection_id,
233                        local_addr,
234                        send_back_addr,
235                    } => {
236                        trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p",  "incoming connection");
237                    }
238                    SwarmEvent::IncomingConnectionError {
239                        local_addr,
240                        connection_id,
241                        error,
242                        send_back_addr,
243                    } => {
244                        error!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
245
246                        print_network_info(swarm.network_info(), "incoming connection error");
247                    }
248                    SwarmEvent::OutgoingConnectionError {
249                        connection_id,
250                        error,
251                        peer_id
252                    } => {
253                        error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
254
255                        print_network_info(swarm.network_info(), "outgoing connection error");
256                    }
257                    SwarmEvent::NewListenAddr {
258                        listener_id,
259                        address,
260                    } => {
261                        debug!(%listener_id, %address, transport="libp2p", "new listen address")
262                    }
263                    SwarmEvent::ExpiredListenAddr {
264                        listener_id,
265                        address,
266                    } => {
267                        debug!(%listener_id, %address, transport="libp2p", "expired listen address")
268                    }
269                    SwarmEvent::ListenerClosed {
270                        listener_id,
271                        addresses,
272                        reason,
273                    } => {
274                        debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
275                    }
276                    SwarmEvent::ListenerError {
277                        listener_id,
278                        error,
279                    } => {
280                        debug!(%listener_id, transport="libp2p", %error, "listener error")
281                    }
282                    SwarmEvent::Dialing {
283                        peer_id,
284                        connection_id,
285                    } => {
286                        debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
287                    }
288                    SwarmEvent::NewExternalAddrCandidate {
289                        ..  // address: Multiaddr
290                    } => {}
291                    SwarmEvent::ExternalAddrConfirmed { address } => {
292                        info!(%address, "Detected external address")
293                    }
294                    SwarmEvent::ExternalAddrExpired {
295                        ..  // address: Multiaddr
296                    } => {}
297                    SwarmEvent::NewExternalAddrOfPeer {
298                        peer_id, address
299                    } => {
300                        swarm.add_peer_address(peer_id, address.clone());
301                        trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
302                    },
303                    _ => trace!(transport="libp2p", "Unsupported enum option detected")
304                }
305            }
306        }
307    }
308
309    pub fn run_nat_server(&mut self, port: u16) {
310        info!(listen_on = port, "Starting NAT server");
311
312        match self.swarm.listen_on(
313            Multiaddr::empty()
314                .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
315                .with(Protocol::Tcp(port)),
316        ) {
317            Ok(_) => {
318                info!("NAT server started");
319            }
320            Err(e) => {
321                warn!(error = %e, "Failed to listen on NAT server");
322            }
323        }
324    }
325
326    pub fn dial_nat_server(&mut self, addresses: Vec<Multiaddr>) {
327        // let dial_opts = DialOpts::peer_id(PeerId::random())
328        //     .addresses(addresses)
329        //     .extend_addresses_through_behaviour()
330        //     .build();
331        info!(
332            num_addresses = addresses.len(),
333            "Dialing NAT servers with multiple candidate addresses"
334        );
335
336        for addr in addresses {
337            let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
338            if let Err(e) = self.swarm.dial(dial_opts) {
339                warn!(%addr, %e, "Failed to dial NAT server address");
340            } else {
341                info!(%addr, "Dialed NAT server address");
342                break;
343            }
344        }
345    }
346}
347
348fn print_network_info(network_info: NetworkInfo, event: &str) {
349    let num_peers = network_info.num_peers();
350    let connection_counters = network_info.connection_counters();
351    let num_incoming = connection_counters.num_established_incoming();
352    let num_outgoing = connection_counters.num_established_outgoing();
353    info!(
354        num_peers,
355        num_incoming, num_outgoing, "swarm network status after {event}"
356    );
357}
358
359pub type TicketAggregationRequestType = OutboundRequestId;
360pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;