1use std::num::NonZeroU8;
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7 Multiaddr,
8 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12 autonat,
13 identity::PublicKey,
14 request_response::{OutboundRequestId, ResponseChannel},
15 swarm::{NetworkInfo, SwarmEvent},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
24 "hopr_transport_p2p_opened_connection_count",
25 "Number of currently open connections"
26 ).unwrap();
27 static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
28 "hopr_transport_p2p_nat_status",
29 "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
30 ).unwrap();
31}
32
33async fn build_p2p_network<T>(
37 me: libp2p::identity::Keypair,
38 indexer_update_input: T,
39) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
40where
41 T: Stream<Item = PeerDiscovery> + Send + 'static,
42{
43 let me_public: PublicKey = me.public();
44
45 #[cfg(feature = "runtime-tokio")]
55 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
56 .with_tokio()
57 .with_tcp(
58 libp2p::tcp::Config::default().nodelay(true),
59 libp2p::noise::Config::new,
60 libp2p::yamux::Config::default,
63 )
64 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
65
66 #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
67 let swarm = swarm.with_quic();
68
69 #[cfg(feature = "runtime-tokio")]
70 let swarm = swarm.with_dns();
71
72 Ok(swarm
73 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
74 .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, indexer_update_input))
75 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
76 .with_swarm_config(|cfg| {
77 cfg.with_dial_concurrency_factor(
78 NonZeroU8::new(
79 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
80 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
81 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
82 )
83 .expect("concurrently dialed peer count must be > 0"),
84 )
85 .with_max_negotiating_inbound_streams(
86 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
87 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
88 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
89 )
90 .with_idle_connection_timeout(
91 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
92 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
93 .map(std::time::Duration::from_secs)
94 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
95 )
96 })
97 .build())
98}
99
100pub struct HoprSwarm {
101 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
102}
103
104impl std::fmt::Debug for HoprSwarm {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("HoprSwarm").finish()
107 }
108}
109
110impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
111 fn from(value: HoprSwarm) -> Self {
112 value.swarm
113 }
114}
115
116impl HoprSwarm {
117 pub async fn new<T>(
118 identity: libp2p::identity::Keypair,
119 indexer_update_input: T,
120 my_multiaddresses: Vec<Multiaddr>,
121 ) -> Self
122 where
123 T: Stream<Item = PeerDiscovery> + Send + 'static,
124 {
125 let mut swarm = build_p2p_network(identity, indexer_update_input)
126 .await
127 .expect("swarm must be constructible");
128
129 for multiaddress in my_multiaddresses.iter() {
130 match resolve_dns_if_any(multiaddress) {
131 Ok(ma) => {
132 if let Err(e) = swarm.listen_on(ma.clone()) {
133 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
134
135 match replace_transport_with_unspecified(&ma) {
136 Ok(ma) => {
137 if let Err(e) = swarm.listen_on(ma.clone()) {
138 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
139 } else {
140 info!(
141 listen_on = ?ma,
142 multiaddress = ?multiaddress,
143 "Listening for p2p connections"
144 );
145 swarm.add_external_address(multiaddress.clone());
146 }
147 }
148 Err(e) => {
149 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
150 }
151 }
152 } else {
153 info!(
154 listen_on = ?ma,
155 multiaddress = ?multiaddress,
156 "Listening for p2p connections"
157 );
158 swarm.add_external_address(multiaddress.clone());
159 }
160 }
161 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
162 }
163 }
164
165 Self { swarm }
173 }
174
175 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
176 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
177 }
178
179 pub async fn run<T>(self, events: T)
186 where
187 T: Sink<crate::behavior::discovery::Event> + Send + 'static,
188 {
189 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
190 futures::pin_mut!(events);
191
192 loop {
193 select! {
194 event = swarm.select_next_some() => match event {
195 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
196 if let Err(_error) = events.send(event).await {
197 tracing::error!("Failed to send discovery event from the transport layer");
198 }
199 }
200 SwarmEvent::Behaviour(
201 HoprNetworkBehaviorEvent::Autonat(event)
202 ) => {
203 match event {
204 autonat::Event::StatusChanged { old, new } => {
205 info!(?old, ?new, "AutoNAT status changed");
206 #[cfg(all(feature = "prometheus", not(test)))]
207 {
208 let value = match new {
209 autonat::NatStatus::Unknown => 0.0,
210 autonat::NatStatus::Public(_) => 1.0,
211 autonat::NatStatus::Private => 2.0,
212 };
213 METRIC_TRANSPORT_NAT_STATUS.set(value);
214 }
215 }
216 autonat::Event::InboundProbe { .. } => {}
217 autonat::Event::OutboundProbe { .. } => {}
218 }
219 }
220 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
221 SwarmEvent::ConnectionEstablished {
222 peer_id,
223 connection_id,
224 num_established,
225 established_in,
226 ..
227 } => {
230 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
231
232 print_network_info(swarm.network_info(), "connection established");
233
234 #[cfg(all(feature = "prometheus", not(test)))]
235 {
236 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
237 }
238 }
239 SwarmEvent::ConnectionClosed {
240 peer_id,
241 connection_id,
242 cause,
243 num_established,
244 ..
245 } => {
247 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
248
249 print_network_info(swarm.network_info(), "connection closed");
250
251 #[cfg(all(feature = "prometheus", not(test)))]
252 {
253 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
254 }
255 }
256 SwarmEvent::IncomingConnection {
257 connection_id,
258 local_addr,
259 send_back_addr,
260 } => {
261 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
262 }
263 SwarmEvent::IncomingConnectionError {
264 local_addr,
265 connection_id,
266 error,
267 send_back_addr,
268 peer_id
269 } => {
270 error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
271
272 print_network_info(swarm.network_info(), "incoming connection error");
273 }
274 SwarmEvent::OutgoingConnectionError {
275 connection_id,
276 error,
277 peer_id
278 } => {
279 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
280
281 print_network_info(swarm.network_info(), "outgoing connection error");
282 }
283 SwarmEvent::NewListenAddr {
284 listener_id,
285 address,
286 } => {
287 debug!(%listener_id, %address, transport="libp2p", "new listen address")
288 }
289 SwarmEvent::ExpiredListenAddr {
290 listener_id,
291 address,
292 } => {
293 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
294 }
295 SwarmEvent::ListenerClosed {
296 listener_id,
297 addresses,
298 reason,
299 } => {
300 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
301 }
302 SwarmEvent::ListenerError {
303 listener_id,
304 error,
305 } => {
306 debug!(%listener_id, transport="libp2p", %error, "listener error")
307 }
308 SwarmEvent::Dialing {
309 peer_id,
310 connection_id,
311 } => {
312 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
313 }
314 SwarmEvent::NewExternalAddrCandidate {address} => {
315 debug!(%address, "Detected new external address candidate")
316 }
317 SwarmEvent::ExternalAddrConfirmed { address } => {
318 info!(%address, "Detected external address")
319 }
320 SwarmEvent::ExternalAddrExpired {
321 .. } => {}
323 SwarmEvent::NewExternalAddrOfPeer {
324 peer_id, address
325 } => {
326 if is_public_address(&address) {
328 swarm.add_peer_address(peer_id, address.clone());
329 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
330 } else {
331 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
332 }
333 },
334 _ => trace!(transport="libp2p", "Unsupported enum option detected")
335 }
336 }
337 }
338 }
339}
340
341fn print_network_info(network_info: NetworkInfo, event: &str) {
342 let num_peers = network_info.num_peers();
343 let connection_counters = network_info.connection_counters();
344 let num_incoming = connection_counters.num_established_incoming();
345 let num_outgoing = connection_counters.num_established_outgoing();
346 info!(
347 num_peers,
348 num_incoming, num_outgoing, "swarm network status after {event}"
349 );
350}
351
352pub type TicketAggregationRequestType = OutboundRequestId;
353pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;