1pub mod constants;
22
23pub mod errors;
25
26pub mod swarm;
28
29mod behavior;
31
32use futures::{AsyncRead, AsyncWrite, Stream};
33use libp2p::{swarm::NetworkBehaviour, StreamProtocol};
34use serde::{Deserialize, Serialize};
35use std::fmt::Debug;
36
37use hopr_internal_types::legacy;
38use hopr_transport_identity::PeerId;
39use hopr_transport_network::messaging::ControlMessage;
40use hopr_transport_network::network::NetworkTriggeredEvent;
41use hopr_transport_network::ping::PingQueryReplier;
42use hopr_transport_protocol::PeerDiscovery;
43
44use crate::constants::{HOPR_HEARTBEAT_PROTOCOL_V_0_1_0, HOPR_TICKET_AGGREGATION_PROTOCOL_V_0_1_0};
45
46pub const MSG_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
50pub struct Ping(pub ControlMessage);
51
52#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct Pong(pub ControlMessage, pub String);
55
56#[derive(Clone)]
58pub struct HoprStreamProtocolControl {
59 control: libp2p_stream::Control,
60 protocol: StreamProtocol,
61}
62
63impl HoprStreamProtocolControl {
64 pub fn new(control: libp2p_stream::Control, protocol: &'static str) -> Self {
65 Self {
66 control,
67 protocol: StreamProtocol::new(protocol),
68 }
69 }
70}
71
72impl std::fmt::Debug for HoprStreamProtocolControl {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("HoprStreamProtocolControl")
75 .field("protocol", &self.protocol)
76 .finish()
77 }
78}
79
80#[async_trait::async_trait]
81impl hopr_transport_protocol::stream::BidirectionalStreamControl for HoprStreamProtocolControl {
82 fn accept(
83 mut self,
84 ) -> Result<impl Stream<Item = (PeerId, impl AsyncRead + AsyncWrite + Send)> + Send, impl std::error::Error> {
85 self.control.accept(self.protocol)
86 }
87
88 async fn open(mut self, peer: PeerId) -> Result<impl AsyncRead + AsyncWrite + Send, impl std::error::Error> {
89 self.control.open_stream(peer, self.protocol).await
90 }
91}
92
93#[derive(NetworkBehaviour)]
99#[behaviour(to_swarm = "HoprNetworkBehaviorEvent")]
100pub struct HoprNetworkBehavior {
101 streams: libp2p_stream::Behaviour,
102 heartbeat_generator: behavior::heartbeat::Behaviour,
103 ticket_aggregation_behavior: behavior::ticket_aggregation::Behaviour,
104 pub heartbeat: libp2p::request_response::cbor::Behaviour<Ping, Pong>,
105 pub ticket_aggregation: libp2p::request_response::cbor::Behaviour<
106 Vec<legacy::AcknowledgedTicket>,
107 std::result::Result<legacy::Ticket, String>,
108 >,
109 discovery: behavior::discovery::Behaviour,
113}
114
115impl Debug for HoprNetworkBehavior {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct("HoprNetworkBehavior").finish()
118 }
119}
120
121impl HoprNetworkBehavior {
122 #[allow(clippy::too_many_arguments)]
123 pub fn new<T, U, V, W>(
124 me: PeerId,
125 network_events: T,
126 onchain_events: U,
127 heartbeat_requests: V,
128 ticket_aggregation_processed_events: W,
129 hb_timeout: std::time::Duration,
130 ticket_aggregation_timeout: std::time::Duration,
131 ) -> Self
132 where
133 T: Stream<Item = NetworkTriggeredEvent> + Send + 'static,
134 U: Stream<Item = PeerDiscovery> + Send + 'static,
135 V: Stream<Item = (PeerId, PingQueryReplier)> + Send + 'static,
136 W: Stream<Item = behavior::ticket_aggregation::Event> + Send + 'static,
137 {
138 Self {
139 streams: libp2p_stream::Behaviour::new(),
140 discovery: behavior::discovery::Behaviour::new(me, network_events, onchain_events),
141 heartbeat_generator: behavior::heartbeat::Behaviour::new(heartbeat_requests),
142 ticket_aggregation_behavior: behavior::ticket_aggregation::Behaviour::new(
143 ticket_aggregation_processed_events,
144 ),
145 heartbeat: libp2p::request_response::cbor::Behaviour::<Ping, Pong>::new(
146 [(
147 StreamProtocol::new(HOPR_HEARTBEAT_PROTOCOL_V_0_1_0),
148 libp2p::request_response::ProtocolSupport::Full,
149 )],
150 libp2p::request_response::Config::default().with_request_timeout(hb_timeout),
151 ),
152 ticket_aggregation: libp2p::request_response::cbor::Behaviour::<
153 Vec<legacy::AcknowledgedTicket>,
154 std::result::Result<legacy::Ticket, String>,
155 >::new(
156 [(
157 StreamProtocol::new(HOPR_TICKET_AGGREGATION_PROTOCOL_V_0_1_0),
158 libp2p::request_response::ProtocolSupport::Full,
159 )],
160 libp2p::request_response::Config::default().with_request_timeout(ticket_aggregation_timeout),
161 ),
162 }
163 }
164}
165
166#[derive(Debug)]
171pub enum HoprNetworkBehaviorEvent {
172 Discovery(behavior::discovery::Event),
173 HeartbeatGenerator(behavior::heartbeat::Event),
174 TicketAggregationBehavior(behavior::ticket_aggregation::Event),
175 Heartbeat(libp2p::request_response::Event<Ping, Pong>),
176 TicketAggregation(
177 libp2p::request_response::Event<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>,
178 ),
179 KeepAlive(void::Void),
180}
181
182impl From<()> for HoprNetworkBehaviorEvent {
184 fn from(_: ()) -> Self {
185 panic!("Unexpected event: ()")
186 }
187}
188
189impl From<void::Void> for HoprNetworkBehaviorEvent {
190 fn from(event: void::Void) -> Self {
191 Self::KeepAlive(event)
192 }
193}
194
195impl From<behavior::discovery::Event> for HoprNetworkBehaviorEvent {
196 fn from(event: behavior::discovery::Event) -> Self {
197 Self::Discovery(event)
198 }
199}
200
201impl From<behavior::heartbeat::Event> for HoprNetworkBehaviorEvent {
202 fn from(event: behavior::heartbeat::Event) -> Self {
203 Self::HeartbeatGenerator(event)
204 }
205}
206
207impl From<behavior::ticket_aggregation::Event> for HoprNetworkBehaviorEvent {
208 fn from(event: behavior::ticket_aggregation::Event) -> Self {
209 Self::TicketAggregationBehavior(event)
210 }
211}
212
213impl From<libp2p::request_response::Event<Ping, Pong>> for HoprNetworkBehaviorEvent {
214 fn from(event: libp2p::request_response::Event<Ping, Pong>) -> Self {
215 Self::Heartbeat(event)
216 }
217}
218
219impl From<libp2p::request_response::Event<Vec<legacy::AcknowledgedTicket>, std::result::Result<legacy::Ticket, String>>>
220 for HoprNetworkBehaviorEvent
221{
222 fn from(
223 event: libp2p::request_response::Event<
224 Vec<legacy::AcknowledgedTicket>,
225 std::result::Result<legacy::Ticket, String>,
226 >,
227 ) -> Self {
228 Self::TicketAggregation(event)
229 }
230}
231
232pub use swarm::HoprSwarm;