hopr_transport_p2p/
lib.rs

1//! # P2P
2//!
3//! The underlying technology for managing the peer-to-peer networking used by this package is the [`rust-libp2p`](https://github.com/libp2p/rust-libp2p) library ([documentation](https://docs.libp2p.io/)).
4//!
5//! ## Modularity
6//!
7//! `rust-libp2p` is highly modular, allowing expected behavior to be reimplemented through custom implementations of its API traits.
8//!
9//! This way it is possible to experiment with and combine different components of the library in order to construct a specific targeted use case.
10//!
11//! ## `rust-libp2p` connectivity
12//!
13//! As per the [official documentation](https://connectivity.libp2p.io/), the connectivity types in the library are divided into the `standalone` (implementation of network over host) and `browser` (implementation of network over browser).
14//!
15//! Nodes that are not located behind a blocking firewall or NAT are designated as **public nodes** and can utilize the `TCP` or `QUIC` connectivity, with the recommendation to use QUIC if possible.
16//!
17//! Browser-based solutions are almost always located behind a private network or a blocking firewall; to open a connection towards the standalone nodes they utilize either the `WebSocket` approach (by hijacking the `TCP` connection) or the (not yet fully specified) `WebTransport` (by hijacking the `QUIC` connection).
18//!
19
20/// Constants exported by the crate.
21pub mod constants;
22
23/// Errors generated by the crate.
24pub mod errors;
25
26/// Raw swarm definition for the HOPR network.
27pub mod swarm;
28
29/// P2P behavior definitions for the transport-level interactions not related to the HOPR protocol.
30mod behavior;
31
32use futures::{AsyncRead, AsyncWrite, Stream};
33use libp2p::{swarm::NetworkBehaviour, StreamProtocol};
34use serde::{Deserialize, Serialize};
35use std::fmt::Debug;
36
37use hopr_internal_types::legacy;
38use hopr_transport_identity::PeerId;
39use hopr_transport_network::messaging::ControlMessage;
40use hopr_transport_network::network::NetworkTriggeredEvent;
41use hopr_transport_network::ping::PingQueryReplier;
42use hopr_transport_protocol::PeerDiscovery;
43
44use crate::constants::{HOPR_HEARTBEAT_PROTOCOL_V_0_1_0, HOPR_TICKET_AGGREGATION_PROTOCOL_V_0_1_0};
45
/// Fixed timeout of 10 seconds.
///
/// NOTE(review): judging by the name this bounds the wait for a message
/// acknowledgement; the constant is not used in this file — confirm against callers.
pub const MSG_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
47
48/// `Ping` protocol base type for the ping operation
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
50pub struct Ping(pub ControlMessage);
51
52/// `Pong` protocol base type for the pong operation
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct Pong(pub ControlMessage, pub String);
55
56// Control object for the streams over the HOPR protocols
57#[derive(Clone)]
58pub struct HoprStreamProtocolControl {
59    control: libp2p_stream::Control,
60    protocol: StreamProtocol,
61}
62
63impl HoprStreamProtocolControl {
64    pub fn new(control: libp2p_stream::Control, protocol: &'static str) -> Self {
65        Self {
66            control,
67            protocol: StreamProtocol::new(protocol),
68        }
69    }
70}
71
72impl std::fmt::Debug for HoprStreamProtocolControl {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("HoprStreamProtocolControl")
75            .field("protocol", &self.protocol)
76            .finish()
77    }
78}
79
80#[async_trait::async_trait]
81impl hopr_transport_protocol::stream::BidirectionalStreamControl for HoprStreamProtocolControl {
82    fn accept(
83        mut self,
84    ) -> Result<impl Stream<Item = (PeerId, impl AsyncRead + AsyncWrite + Send)> + Send, impl std::error::Error> {
85        self.control.accept(self.protocol)
86    }
87
88    async fn open(mut self, peer: PeerId) -> Result<impl AsyncRead + AsyncWrite + Send, impl std::error::Error> {
89        self.control.open_stream(peer, self.protocol).await
90    }
91}
92
93/// Network Behavior definition for aggregated HOPR network functionality.
94///
95/// Individual network behaviors from the libp2p perspectives are aggregated
96/// under this type in order to create an aggregated network behavior capable
97/// of generating events for all component behaviors.
98#[derive(NetworkBehaviour)]
99#[behaviour(to_swarm = "HoprNetworkBehaviorEvent")]
100pub struct HoprNetworkBehavior {
101    streams: libp2p_stream::Behaviour,
102    heartbeat_generator: behavior::heartbeat::Behaviour,
103    ticket_aggregation_behavior: behavior::ticket_aggregation::Behaviour,
104    pub heartbeat: libp2p::request_response::cbor::Behaviour<Ping, Pong>,
105    pub ticket_aggregation: libp2p::request_response::cbor::Behaviour<
106        Vec<legacy::AcknowledgedTicket>,
107        std::result::Result<legacy::Ticket, String>,
108    >,
109    // WARNING: the order of struct members is important, `discovery` must be the last member,
110    // because the request_response components remove the peer from its peer store after a failed
111    // dial operation and the discovery mechanism is responsible for populating all peer stores.
112    discovery: behavior::discovery::Behaviour,
113}
114
115impl Debug for HoprNetworkBehavior {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct("HoprNetworkBehavior").finish()
118    }
119}
120
121impl HoprNetworkBehavior {
122    #[allow(clippy::too_many_arguments)]
123    pub fn new<T, U, V, W>(
124        me: PeerId,
125        network_events: T,
126        onchain_events: U,
127        heartbeat_requests: V,
128        ticket_aggregation_processed_events: W,
129        hb_timeout: std::time::Duration,
130        ticket_aggregation_timeout: std::time::Duration,
131    ) -> Self
132    where
133        T: Stream<Item = NetworkTriggeredEvent> + Send + 'static,
134        U: Stream<Item = PeerDiscovery> + Send + 'static,
135        V: Stream<Item = (PeerId, PingQueryReplier)> + Send + 'static,
136        W: Stream<Item = behavior::ticket_aggregation::Event> + Send + 'static,
137    {
138        Self {
139            streams: libp2p_stream::Behaviour::new(),
140            discovery: behavior::discovery::Behaviour::new(me, network_events, onchain_events),
141            heartbeat_generator: behavior::heartbeat::Behaviour::new(heartbeat_requests),
142            ticket_aggregation_behavior: behavior::ticket_aggregation::Behaviour::new(
143                ticket_aggregation_processed_events,
144            ),
145            heartbeat: libp2p::request_response::cbor::Behaviour::<Ping, Pong>::new(
146                [(
147                    StreamProtocol::new(HOPR_HEARTBEAT_PROTOCOL_V_0_1_0),
148                    libp2p::request_response::ProtocolSupport::Full,
149                )],
150                libp2p::request_response::Config::default().with_request_timeout(hb_timeout),
151            ),
152            ticket_aggregation: libp2p::request_response::cbor::Behaviour::<
153                Vec<legacy::AcknowledgedTicket>,
154                std::result::Result<legacy::Ticket, String>,
155            >::new(
156                [(
157                    StreamProtocol::new(HOPR_TICKET_AGGREGATION_PROTOCOL_V_0_1_0),
158                    libp2p::request_response::ProtocolSupport::Full,
159                )],
160                libp2p::request_response::Config::default().with_request_timeout(ticket_aggregation_timeout),
161            ),
162        }
163    }
164}
165
166/// Aggregated network behavior event inheriting the component behaviors' events.
167///
168/// Necessary to allow the libp2p handler to properly distribute the events for
169/// processing in the business logic loop.
170#[derive(Debug)]
171pub enum HoprNetworkBehaviorEvent {
172    Discovery(behavior::discovery::Event),
173    HeartbeatGenerator(behavior::heartbeat::Event),
174    TicketAggregationBehavior(behavior::ticket_aggregation::Event),
175    Heartbeat(libp2p::request_response::Event<Ping, Pong>),
176    TicketAggregation(
177        libp2p::request_response::Event<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>,
178    ),
179    KeepAlive(void::Void),
180}
181
182// Unexpected libp2p_stream event
183impl From<()> for HoprNetworkBehaviorEvent {
184    fn from(_: ()) -> Self {
185        panic!("Unexpected event: ()")
186    }
187}
188
189impl From<void::Void> for HoprNetworkBehaviorEvent {
190    fn from(event: void::Void) -> Self {
191        Self::KeepAlive(event)
192    }
193}
194
195impl From<behavior::discovery::Event> for HoprNetworkBehaviorEvent {
196    fn from(event: behavior::discovery::Event) -> Self {
197        Self::Discovery(event)
198    }
199}
200
201impl From<behavior::heartbeat::Event> for HoprNetworkBehaviorEvent {
202    fn from(event: behavior::heartbeat::Event) -> Self {
203        Self::HeartbeatGenerator(event)
204    }
205}
206
207impl From<behavior::ticket_aggregation::Event> for HoprNetworkBehaviorEvent {
208    fn from(event: behavior::ticket_aggregation::Event) -> Self {
209        Self::TicketAggregationBehavior(event)
210    }
211}
212
213impl From<libp2p::request_response::Event<Ping, Pong>> for HoprNetworkBehaviorEvent {
214    fn from(event: libp2p::request_response::Event<Ping, Pong>) -> Self {
215        Self::Heartbeat(event)
216    }
217}
218
219impl From<libp2p::request_response::Event<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>>
220    for HoprNetworkBehaviorEvent
221{
222    fn from(
223        event: libp2p::request_response::Event<
224            Vec<legacy::AcknowledgedTicket>,
225            std::result::Result<legacy::Ticket, String>,
226        >,
227    ) -> Self {
228        Self::TicketAggregation(event)
229    }
230}
231
232pub use swarm::HoprSwarm;