1use std::num::NonZeroU8;
2
3use futures::{Sink, SinkExt, Stream, StreamExt, select};
4use hopr_internal_types::prelude::*;
5use hopr_network_types::prelude::is_public_address;
6use hopr_transport_identity::{
7 Multiaddr,
8 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
9};
10use hopr_transport_protocol::PeerDiscovery;
11use libp2p::{
12 autonat,
13 identity::PublicKey,
14 request_response::{OutboundRequestId, ResponseChannel},
15 swarm::{NetworkInfo, SwarmEvent},
16};
17use tracing::{debug, error, info, trace, warn};
18
19use crate::{HoprNetworkBehavior, HoprNetworkBehaviorEvent, constants, errors::Result};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22lazy_static::lazy_static! {
23 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
24 "hopr_transport_p2p_opened_connection_count",
25 "Number of currently open connections"
26 ).unwrap();
27 static ref METRIC_TRANSPORT_NAT_STATUS: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
28 "hopr_transport_p2p_nat_status",
29 "Current NAT status as reported by libp2p autonat. 0=Unknown, 1=Public, 2=Private"
30 ).unwrap();
31}
32
33async fn build_p2p_network<T>(
37 me: libp2p::identity::Keypair,
38 indexer_update_input: T,
39 allow_private_addresses: bool,
40) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
41where
42 T: Stream<Item = PeerDiscovery> + Send + 'static,
43{
44 let me_public: PublicKey = me.public();
45
46 #[cfg(feature = "runtime-tokio")]
56 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
57 .with_tokio()
58 .with_tcp(
59 libp2p::tcp::Config::default().nodelay(true),
60 libp2p::noise::Config::new,
61 libp2p::yamux::Config::default,
64 )
65 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?;
66
67 #[cfg(all(feature = "transport-quic", feature = "runtime-tokio"))]
68 let swarm = swarm.with_quic();
69
70 #[cfg(feature = "runtime-tokio")]
71 let swarm = swarm.with_dns();
72
73 Ok(swarm
74 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
75 .with_behaviour(|_key| HoprNetworkBehavior::new(me_public, indexer_update_input, allow_private_addresses))
76 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
77 .with_swarm_config(|cfg| {
78 cfg.with_dial_concurrency_factor(
79 NonZeroU8::new(
80 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
81 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
82 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
83 )
84 .expect("concurrently dialed peer count must be > 0"),
85 )
86 .with_max_negotiating_inbound_streams(
87 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
88 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
89 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
90 )
91 .with_idle_connection_timeout(
92 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
93 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
94 .map(std::time::Duration::from_secs)
95 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
96 )
97 })
98 .build())
99}
100
101pub struct HoprSwarm {
102 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
103 pub allow_private_addresses: bool,
104}
105
106impl std::fmt::Debug for HoprSwarm {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("HoprSwarm").finish()
109 }
110}
111
112impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
113 fn from(value: HoprSwarm) -> Self {
114 value.swarm
115 }
116}
117
118impl HoprSwarm {
119 pub async fn new<T>(
120 identity: libp2p::identity::Keypair,
121 indexer_update_input: T,
122 my_multiaddresses: Vec<Multiaddr>,
123 allow_private_addresses: bool,
124 ) -> Self
125 where
126 T: Stream<Item = PeerDiscovery> + Send + 'static,
127 {
128 let mut swarm = build_p2p_network(identity, indexer_update_input, allow_private_addresses)
129 .await
130 .expect("swarm must be constructible");
131
132 for multiaddress in my_multiaddresses.iter() {
133 match resolve_dns_if_any(multiaddress) {
134 Ok(ma) => {
135 if let Err(e) = swarm.listen_on(ma.clone()) {
136 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
137
138 match replace_transport_with_unspecified(&ma) {
139 Ok(ma) => {
140 if let Err(e) = swarm.listen_on(ma.clone()) {
141 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
142 } else {
143 info!(
144 listen_on = ?ma,
145 multiaddress = ?multiaddress,
146 "Listening for p2p connections"
147 );
148 swarm.add_external_address(multiaddress.clone());
149 }
150 }
151 Err(e) => {
152 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
153 }
154 }
155 } else {
156 info!(
157 listen_on = ?ma,
158 multiaddress = ?multiaddress,
159 "Listening for p2p connections"
160 );
161 swarm.add_external_address(multiaddress.clone());
162 }
163 }
164 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
165 }
166 }
167
168 Self {
176 swarm,
177 allow_private_addresses,
178 }
179 }
180
181 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
182 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
183 }
184
185 pub async fn run<T>(self, events: T)
192 where
193 T: Sink<crate::behavior::discovery::Event> + Send + 'static,
194 {
195 let allow_private_addrs = self.allow_private_addresses;
196 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.into();
197 futures::pin_mut!(events);
198
199 loop {
200 select! {
201 event = swarm.select_next_some() => match event {
202 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(event)) => {
203 if let Err(_error) = events.send(event).await {
204 tracing::error!("Failed to send discovery event from the transport layer");
205 }
206 }
207 SwarmEvent::Behaviour(
208 HoprNetworkBehaviorEvent::Autonat(event)
209 ) => {
210 match event {
211 autonat::Event::StatusChanged { old, new } => {
212 info!(?old, ?new, "AutoNAT status changed");
213 #[cfg(all(feature = "prometheus", not(test)))]
214 {
215 let value = match new {
216 autonat::NatStatus::Unknown => 0.0,
217 autonat::NatStatus::Public(_) => 1.0,
218 autonat::NatStatus::Private => 2.0,
219 };
220 METRIC_TRANSPORT_NAT_STATUS.set(value);
221 }
222 }
223 autonat::Event::InboundProbe { .. } => {}
224 autonat::Event::OutboundProbe { .. } => {}
225 }
226 }
227 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Identify(_event)) => {}
228 SwarmEvent::ConnectionEstablished {
229 peer_id,
230 connection_id,
231 num_established,
232 established_in,
233 ..
234 } => {
237 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
238
239 print_network_info(swarm.network_info(), "connection established");
240
241 #[cfg(all(feature = "prometheus", not(test)))]
242 {
243 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
244 }
245 }
246 SwarmEvent::ConnectionClosed {
247 peer_id,
248 connection_id,
249 cause,
250 num_established,
251 ..
252 } => {
254 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
255
256 print_network_info(swarm.network_info(), "connection closed");
257
258 #[cfg(all(feature = "prometheus", not(test)))]
259 {
260 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
261 }
262 }
263 SwarmEvent::IncomingConnection {
264 connection_id,
265 local_addr,
266 send_back_addr,
267 } => {
268 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
269 }
270 SwarmEvent::IncomingConnectionError {
271 local_addr,
272 connection_id,
273 error,
274 send_back_addr,
275 peer_id
276 } => {
277 error!(?peer_id, %local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
278
279 print_network_info(swarm.network_info(), "incoming connection error");
280 }
281 SwarmEvent::OutgoingConnectionError {
282 connection_id,
283 error,
284 peer_id
285 } => {
286 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
287
288 print_network_info(swarm.network_info(), "outgoing connection error");
289 }
290 SwarmEvent::NewListenAddr {
291 listener_id,
292 address,
293 } => {
294 debug!(%listener_id, %address, transport="libp2p", "new listen address")
295 }
296 SwarmEvent::ExpiredListenAddr {
297 listener_id,
298 address,
299 } => {
300 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
301 }
302 SwarmEvent::ListenerClosed {
303 listener_id,
304 addresses,
305 reason,
306 } => {
307 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
308 }
309 SwarmEvent::ListenerError {
310 listener_id,
311 error,
312 } => {
313 debug!(%listener_id, transport="libp2p", %error, "listener error")
314 }
315 SwarmEvent::Dialing {
316 peer_id,
317 connection_id,
318 } => {
319 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
320 }
321 SwarmEvent::NewExternalAddrCandidate {address} => {
322 debug!(%address, "Detected new external address candidate")
323 }
324 SwarmEvent::ExternalAddrConfirmed { address } => {
325 info!(%address, "Detected external address")
326 }
327 SwarmEvent::ExternalAddrExpired {
328 .. } => {}
330 SwarmEvent::NewExternalAddrOfPeer {
331 peer_id, address
332 } => {
333 if allow_private_addrs || is_public_address(&address) {
335 swarm.add_peer_address(peer_id, address.clone());
336 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Public peer address stored in swarm")
337 } else {
338 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "Private/local peer address ignored")
339 }
340 },
341 _ => trace!(transport="libp2p", "Unsupported enum option detected")
342 }
343 }
344 }
345 }
346}
347
348fn print_network_info(network_info: NetworkInfo, event: &str) {
349 let num_peers = network_info.num_peers();
350 let connection_counters = network_info.connection_counters();
351 let num_incoming = connection_counters.num_established_incoming();
352 let num_outgoing = connection_counters.num_established_outgoing();
353 info!(
354 num_peers,
355 num_incoming, num_outgoing, "swarm network status after {event}"
356 );
357}
358
359pub type TicketAggregationRequestType = OutboundRequestId;
360pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<Ticket, String>>;