1use futures::{select, Stream, StreamExt};
2use libp2p::swarm::NetworkInfo;
3use libp2p::{request_response::OutboundRequestId, request_response::ResponseChannel, swarm::SwarmEvent};
4use std::num::NonZeroU8;
5use tracing::{debug, error, info, trace, warn};
6
7use hopr_internal_types::prelude::*;
8use hopr_transport_identity::{
9 multiaddrs::{replace_transport_with_unspecified, resolve_dns_if_any},
10 Multiaddr, PeerId,
11};
12use hopr_transport_network::{messaging::ControlMessage, network::NetworkTriggeredEvent, ping::PingQueryReplier};
13use hopr_transport_protocol::{
14 config::ProtocolConfig,
15 ticket_aggregation::processor::{TicketAggregationActions, TicketAggregationFinalizer, TicketAggregationProcessed},
16 PeerDiscovery,
17};
18
19use crate::{constants, errors::Result, HoprNetworkBehavior, HoprNetworkBehaviorEvent, Ping, Pong};
20
21#[cfg(all(feature = "prometheus", not(test)))]
22use hopr_metrics::metrics::SimpleGauge;
23
24#[cfg(all(feature = "prometheus", not(test)))]
25lazy_static::lazy_static! {
26 static ref METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT: SimpleGauge = SimpleGauge::new(
27 "hopr_transport_p2p_opened_connection_count",
28 "Number of currently open connections"
29 ).unwrap();
30}
31
32async fn build_p2p_network<T, U>(
36 me: libp2p::identity::Keypair,
37 network_update_input: futures::channel::mpsc::Receiver<NetworkTriggeredEvent>,
38 indexer_update_input: U,
39 heartbeat_requests: futures::channel::mpsc::UnboundedReceiver<(PeerId, PingQueryReplier)>,
40 ticket_aggregation_interactions: T,
41 protocol_cfg: ProtocolConfig,
42) -> Result<libp2p::Swarm<HoprNetworkBehavior>>
43where
44 T: Stream<Item = crate::behavior::ticket_aggregation::Event> + Send + 'static,
45 U: Stream<Item = PeerDiscovery> + Send + 'static,
46{
47 let me_peerid: PeerId = me.public().into();
48
49 #[cfg(feature = "runtime-async-std")]
50 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
51 .with_async_std()
52 .with_tcp(
53 libp2p::tcp::Config::default().nodelay(true),
54 libp2p::noise::Config::new,
55 libp2p::yamux::Config::default,
58 )
59 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
60 .with_quic()
61 .with_dns();
62
63 #[cfg(all(feature = "runtime-tokio", not(feature = "runtime-async-std")))]
66 let swarm = libp2p::SwarmBuilder::with_existing_identity(me)
67 .with_tokio()
68 .with_tcp(
69 libp2p::tcp::Config::default().nodelay(true),
70 libp2p::noise::Config::new,
71 libp2p::yamux::Config::default,
74 )
75 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
76 .with_quic()
77 .with_dns();
78
79 Ok(swarm
80 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
81 .with_behaviour(|_key| {
82 HoprNetworkBehavior::new(
83 me_peerid,
84 network_update_input,
85 indexer_update_input,
86 heartbeat_requests,
87 ticket_aggregation_interactions,
88 protocol_cfg.heartbeat.timeout,
89 protocol_cfg.ticket_aggregation.timeout,
90 )
91 })
92 .map_err(|e| crate::errors::P2PError::Libp2p(e.to_string()))?
93 .with_swarm_config(|cfg| {
94 cfg.with_dial_concurrency_factor(
95 NonZeroU8::new(
96 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_CONCURRENTLY_DIALED_PEER_COUNT")
97 .map(|v| v.trim().parse::<u8>().unwrap_or(u8::MAX))
98 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_DIALED_PEER_COUNT),
99 )
100 .expect("concurrently dialed peer count must be > 0"),
101 )
102 .with_max_negotiating_inbound_streams(
103 std::env::var("HOPR_INTERNAL_LIBP2P_MAX_NEGOTIATING_INBOUND_STREAM_COUNT")
104 .and_then(|v| v.parse::<usize>().map_err(|_e| std::env::VarError::NotPresent))
105 .unwrap_or(constants::HOPR_SWARM_CONCURRENTLY_NEGOTIATING_INBOUND_PEER_COUNT),
106 )
107 .with_idle_connection_timeout(
108 std::env::var("HOPR_INTERNAL_LIBP2P_SWARM_IDLE_TIMEOUT")
109 .and_then(|v| v.parse::<u64>().map_err(|_e| std::env::VarError::NotPresent))
110 .map(std::time::Duration::from_secs)
111 .unwrap_or(constants::HOPR_SWARM_IDLE_CONNECTION_TIMEOUT),
112 )
113 })
114 .build())
115}
116
117pub type TicketAggregationWriter =
118 TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>;
119pub type TicketAggregationEvent = crate::behavior::ticket_aggregation::Event;
120
121pub struct HoprSwarm {
122 pub(crate) swarm: libp2p::Swarm<HoprNetworkBehavior>,
123}
124
125impl HoprSwarm {
126 pub async fn new<U, T>(
127 identity: libp2p::identity::Keypair,
128 network_update_input: futures::channel::mpsc::Receiver<NetworkTriggeredEvent>,
129 indexer_update_input: U,
130 heartbeat_requests: futures::channel::mpsc::UnboundedReceiver<(PeerId, PingQueryReplier)>,
131 ticket_aggregation_interactions: T,
132 my_multiaddresses: Vec<Multiaddr>,
133 protocol_cfg: ProtocolConfig,
134 ) -> Self
135 where
136 T: Stream<Item = TicketAggregationEvent> + Send + 'static,
137 U: Stream<Item = PeerDiscovery> + Send + 'static,
138 {
139 let mut swarm = build_p2p_network(
140 identity,
141 network_update_input,
142 indexer_update_input,
143 heartbeat_requests,
144 ticket_aggregation_interactions,
145 protocol_cfg,
146 )
147 .await
148 .expect("swarm must be constructible");
149
150 for multiaddress in my_multiaddresses.iter() {
151 match resolve_dns_if_any(multiaddress) {
152 Ok(ma) => {
153 if let Err(e) = swarm.listen_on(ma.clone()) {
154 warn!(%multiaddress, listen_on=%ma, error = %e, "Failed to listen_on, will try to use an unspecified address");
155
156 match replace_transport_with_unspecified(&ma) {
157 Ok(ma) => {
158 if let Err(e) = swarm.listen_on(ma.clone()) {
159 warn!(multiaddress = %ma, error = %e, "Failed to listen_on using the unspecified multiaddress",);
160 } else {
161 info!(
162 listen_on = ?ma,
163 multiaddress = ?multiaddress,
164 "Listening for p2p connections"
165 );
166 swarm.add_external_address(multiaddress.clone());
167 }
168 }
169 Err(e) => {
170 error!(multiaddress = %ma, error = %e, "Failed to transform the multiaddress")
171 }
172 }
173 } else {
174 info!(
175 listen_on = ?ma,
176 multiaddress = ?multiaddress,
177 "Listening for p2p connections"
178 );
179 swarm.add_external_address(multiaddress.clone());
180 }
181 }
182 Err(e) => error!(%multiaddress, error = %e, "Failed to transform the multiaddress"),
183 }
184 }
185
186 Self { swarm }
194 }
195
196 pub fn build_protocol_control(&self, protocol: &'static str) -> crate::HoprStreamProtocolControl {
197 crate::HoprStreamProtocolControl::new(self.swarm.behaviour().streams.new_control(), protocol)
198 }
199
200 pub fn with_processors(self, ticket_aggregation_writer: TicketAggregationWriter) -> HoprSwarmWithProcessors {
202 HoprSwarmWithProcessors {
203 swarm: self,
204 ticket_aggregation_writer,
205 }
206 }
207}
208
209fn print_network_info(network_info: NetworkInfo, event: &str) {
210 let num_peers = network_info.num_peers();
211 let connection_counters = network_info.connection_counters();
212 let num_incoming = connection_counters.num_established_incoming();
213 let num_outgoing = connection_counters.num_established_outgoing();
214 info!(
215 num_peers,
216 num_incoming, num_outgoing, "swarm network status after {event}"
217 );
218}
219
220impl From<HoprSwarm> for libp2p::Swarm<HoprNetworkBehavior> {
221 fn from(value: HoprSwarm) -> Self {
222 value.swarm
223 }
224}
225
226#[derive(Debug)]
229pub enum Inputs {
230 Message((PeerId, Box<[u8]>)),
231 Acknowledgement((PeerId, Acknowledgement)),
232}
233
234impl From<(PeerId, Acknowledgement)> for Inputs {
235 fn from(value: (PeerId, Acknowledgement)) -> Self {
236 Self::Acknowledgement(value)
237 }
238}
239
240impl From<(PeerId, Box<[u8]>)> for Inputs {
241 fn from(value: (PeerId, Box<[u8]>)) -> Self {
242 Self::Message(value)
243 }
244}
245
246use hopr_internal_types::legacy;
247
248pub type TicketAggregationRequestType = OutboundRequestId;
249pub type TicketAggregationResponseType = ResponseChannel<std::result::Result<legacy::Ticket, String>>;
250
251pub struct HoprSwarmWithProcessors {
252 swarm: HoprSwarm,
253 ticket_aggregation_writer: TicketAggregationActions<TicketAggregationResponseType, TicketAggregationRequestType>,
254}
255
256impl std::fmt::Debug for HoprSwarmWithProcessors {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("SwarmEventLoop").finish()
259 }
260}
261
262impl HoprSwarmWithProcessors {
263 pub async fn run(self, version: String) {
270 let mut swarm: libp2p::Swarm<HoprNetworkBehavior> = self.swarm.into();
271
272 let active_pings: moka::future::Cache<libp2p::request_response::OutboundRequestId, PingQueryReplier> =
274 moka::future::CacheBuilder::new(1000)
275 .time_to_live(std::time::Duration::from_secs(40))
276 .build();
277 let active_aggregation_requests: moka::future::Cache<
278 libp2p::request_response::OutboundRequestId,
279 TicketAggregationFinalizer,
280 > = moka::future::CacheBuilder::new(1000)
281 .time_to_live(std::time::Duration::from_secs(40))
282 .build();
283
284 let mut aggregation_writer = self.ticket_aggregation_writer;
285
286 loop {
287 select! {
288 event = swarm.select_next_some() => match event {
289 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::TicketAggregation(event)) => {
290 let _span = tracing::span!(tracing::Level::DEBUG, "swarm protocol", protocol = "/hopr/ticket_aggregation/0.1.0");
291 match event {
292 libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::Message {
293 peer,
294 message,
295 connection_id
296 } => {
297 match message {
298 libp2p::request_response::Message::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::Request {
299 request_id, request, channel
300 } => {
301 trace!(%peer, %request_id, %connection_id, "Received a ticket aggregation request");
302
303 let request = request.into_iter().map(TransferableWinningTicket::from).collect::<Vec<_>>();
304 if let Err(e) = aggregation_writer.receive_aggregation_request(peer, request, channel) {
305 error!(%peer, %request_id, %connection_id, error = %e, "Failed to process a ticket aggregation request");
306 }
307 },
308 libp2p::request_response::Message::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>::Response {
309 request_id, response
310 } => {
311 if let Err(e) = aggregation_writer.receive_ticket(peer, response.map(|t| t.0), request_id) {
312 error!(%peer, %request_id, %connection_id, error = %e, "Failed to receive aggregated ticket");
313 }
314 }
315 }
316 },
317 libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::OutboundFailure {
318 peer, request_id, error, connection_id
319 } => {
320 error!(%peer, %request_id, %connection_id, %error, "Failed to send an aggregation request");
321 },
322 libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::InboundFailure {
323 peer, request_id, error, connection_id
324 } => {
325 warn!(%peer, %request_id, %connection_id, %error, "Failed to receive an aggregated ticket");
326 },
327 libp2p::request_response::Event::<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket,String>>::ResponseSent {..} => {
328 },
330 }
331 }
332 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Heartbeat(event)) => {
333 let _span = tracing::span!(tracing::Level::DEBUG, "swarm protocol", protocol = "/hopr/heartbeat/0.1.0");
334 match event {
335 libp2p::request_response::Event::<Ping,Pong>::Message {
336 peer,
337 message,
338 connection_id
339 } => {
340 match message {
341 libp2p::request_response::Message::<Ping,Pong>::Request {
342 request_id, request, channel
343 } => {
344 trace!(%peer, %request_id, %connection_id, "Received a heartbeat Ping");
345
346 if let Ok(challenge_response) = ControlMessage::generate_pong_response(&request.0)
347 {
348 if swarm.behaviour_mut().heartbeat.send_response(channel, Pong(challenge_response, version.clone())).is_err() {
349 error!(%peer, %request_id, %connection_id, "Failed to reply to a Ping request");
350 };
351 }
352 },
353 libp2p::request_response::Message::<Ping,Pong>::Response {
354 request_id, response
355 } => {
356 if let Some(replier) = active_pings.remove(&request_id).await {
357 active_pings.run_pending_tasks().await; trace!(%peer, %request_id, "Processing manual ping response");
359 replier.notify(response.0, response.1)
360 } else {
361 debug!(%peer, %request_id, "Failed to find heartbeat replier");
362 }
363 }
364 }
365 },
366 libp2p::request_response::Event::<Ping,Pong>::OutboundFailure {
367 peer, request_id, error, connection_id
368 } => {
369 active_pings.invalidate(&request_id).await;
370 if matches!(error, libp2p::request_response::OutboundFailure::DialFailure) {
371 trace!(%peer, %request_id, %connection_id, %error, "Peer is offline");
372 } else {
373 error!(%peer, %request_id, %connection_id, %error, "Failed heartbeat protocol on outbound");
374 }
375 },
376 libp2p::request_response::Event::<Ping,Pong>::InboundFailure {
377 peer, request_id, error, connection_id
378 } => {
379 warn!(%peer, %request_id, %connection_id, "Failed to receive a Pong request: {error}");
380 },
381 libp2p::request_response::Event::<Ping,Pong>::ResponseSent {..} => {
382 },
384 }
385 }
386 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::KeepAlive(_)) => {}
387 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::Discovery(_)) => {}
388 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::TicketAggregationBehavior(event)) => {
389 let _span = tracing::span!(tracing::Level::DEBUG, "swarm behavior", behavior="ticket aggregation");
390
391 match event {
392 TicketAggregationProcessed::Send(peer, acked_tickets, finalizer) => {
393 let ack_tkt_count = acked_tickets.len();
394 let request_id = swarm.behaviour_mut().ticket_aggregation.send_request(&peer, acked_tickets);
395 debug!(%peer, %request_id, "Sending request to aggregate {ack_tkt_count} tickets");
396 active_aggregation_requests.insert(request_id, finalizer).await;
397 },
398 TicketAggregationProcessed::Reply(peer, ticket, response) => {
399 debug!(%peer, "Enqueuing a response'");
400 if swarm.behaviour_mut().ticket_aggregation.send_response(response, ticket.map(legacy::Ticket)).is_err() {
401 error!(%peer, "Failed to enqueue response");
402 }
403 },
404 TicketAggregationProcessed::Receive(peer, _, request) => {
405 match active_aggregation_requests.remove(&request).await {
406 Some(finalizer) => {
407 active_aggregation_requests.run_pending_tasks().await;
408 finalizer.finalize();
409 },
410 None => {
411 warn!(%peer, request_id = %request, "Response already handled")
412 }
413 }
414 }
415 }
416 }
417 SwarmEvent::Behaviour(HoprNetworkBehaviorEvent::HeartbeatGenerator(event)) => {
418 let _span = tracing::span!(tracing::Level::DEBUG, "swarm behavior", behavior="heartbeat generator");
419
420 trace!(event = tracing::field::debug(&event), "Received a heartbeat event");
421 match event {
422 crate::behavior::heartbeat::Event::ToProbe((peer, replier)) => {
423 let req_id = swarm.behaviour_mut().heartbeat.send_request(&peer, Ping(replier.challenge()));
424 active_pings.insert(req_id, replier).await;
425 },
426 }
427 }
428 SwarmEvent::ConnectionEstablished {
429 peer_id,
430 connection_id,
431 num_established,
432 established_in,
433 ..
434 } => {
437 debug!(%peer_id, %connection_id, num_established, established_in_ms = established_in.as_millis(), transport="libp2p", "connection established");
438
439 print_network_info(swarm.network_info(), "connection established");
440
441 #[cfg(all(feature = "prometheus", not(test)))]
442 {
443 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.increment(1.0);
444 }
445 }
446 SwarmEvent::ConnectionClosed {
447 peer_id,
448 connection_id,
449 cause,
450 num_established,
451 ..
452 } => {
454 debug!(%peer_id, %connection_id, num_established, transport="libp2p", "connection closed: {cause:?}");
455
456 print_network_info(swarm.network_info(), "connection closed");
457
458 #[cfg(all(feature = "prometheus", not(test)))]
459 {
460 METRIC_TRANSPORT_P2P_OPEN_CONNECTION_COUNT.decrement(1.0);
461 }
462 }
463 SwarmEvent::IncomingConnection {
464 connection_id,
465 local_addr,
466 send_back_addr,
467 } => {
468 trace!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", "incoming connection");
469 }
470 SwarmEvent::IncomingConnectionError {
471 local_addr,
472 connection_id,
473 error,
474 send_back_addr,
475 } => {
476 error!(%local_addr, %send_back_addr, %connection_id, transport="libp2p", %error, "incoming connection error");
477
478 print_network_info(swarm.network_info(), "incoming connection error");
479 }
480 SwarmEvent::OutgoingConnectionError {
481 connection_id,
482 error,
483 peer_id
484 } => {
485 error!(peer = ?peer_id, %connection_id, transport="libp2p", %error, "outgoing connection error");
486
487 print_network_info(swarm.network_info(), "outgoing connection error");
488 }
489 SwarmEvent::NewListenAddr {
490 listener_id,
491 address,
492 } => {
493 debug!(%listener_id, %address, transport="libp2p", "new listen address")
494 }
495 SwarmEvent::ExpiredListenAddr {
496 listener_id,
497 address,
498 } => {
499 debug!(%listener_id, %address, transport="libp2p", "expired listen address")
500 }
501 SwarmEvent::ListenerClosed {
502 listener_id,
503 addresses,
504 reason,
505 } => {
506 debug!(%listener_id, ?addresses, ?reason, transport="libp2p", "listener closed", )
507 }
508 SwarmEvent::ListenerError {
509 listener_id,
510 error,
511 } => {
512 debug!(%listener_id, transport="libp2p", %error, "listener error")
513 }
514 SwarmEvent::Dialing {
515 peer_id,
516 connection_id,
517 } => {
518 debug!(peer = ?peer_id, %connection_id, transport="libp2p", "dialing")
519 }
520 SwarmEvent::NewExternalAddrCandidate {
521 .. } => {}
523 SwarmEvent::ExternalAddrConfirmed {
524 .. } => {}
526 SwarmEvent::ExternalAddrExpired {
527 .. } => {}
529 SwarmEvent::NewExternalAddrOfPeer {
530 peer_id, address
531 } => {
532 trace!(transport="libp2p", peer = %peer_id, multiaddress = %address, "New peer stored in swarm")
533 },
534 _ => trace!(transport="libp2p", "Unsupported enum option detected")
535 }
536 }
537 }
538 }
539}