1pub mod config;
48pub mod errors;
50
51pub mod bloom;
53pub mod ack;
56pub mod heartbeat;
58pub mod msg;
60pub mod ticket_aggregation;
62
63pub mod stream;
65
66pub mod timer;
67use hopr_transport_identity::Multiaddr;
68pub use timer::execute_on_tick;
69
70use futures::{SinkExt, StreamExt};
71use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
72use std::collections::HashMap;
73use tracing::error;
74
75use hopr_async_runtime::prelude::spawn;
76use hopr_db_api::protocol::HoprDbProtocolOperations;
77use hopr_internal_types::protocol::{Acknowledgement, ApplicationData};
78use hopr_path::path::TransportPath;
79use hopr_transport_identity::PeerId;
80
81pub use msg::processor::DEFAULT_PRICE_PER_PACKET;
82use msg::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
83
84#[cfg(all(feature = "prometheus", not(test)))]
85use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
86
87#[cfg(all(feature = "prometheus", not(test)))]
88lazy_static::lazy_static! {
89 static ref METRIC_RECEIVED_ACKS: MultiCounter = MultiCounter::new(
91 "hopr_received_ack_count",
92 "Number of received acknowledgements",
93 &["valid"]
94 )
95 .unwrap();
96 static ref METRIC_SENT_ACKS: SimpleCounter =
97 SimpleCounter::new("hopr_sent_acks_count", "Number of sent message acknowledgements").unwrap();
98 static ref METRIC_TICKETS_COUNT: MultiCounter =
99 MultiCounter::new("hopr_tickets_count", "Number of winning tickets", &["type"]).unwrap();
100 static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
102 "hopr_packets_count",
103 "Number of processed packets of different types (sent, received, forwarded)",
104 &["type"]
105 ).unwrap();
106 static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
107 "hopr_packets_per_peer_count",
108 "Number of processed packets to/from distinct peers",
109 &["peer", "direction"]
110 ).unwrap();
111 static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
112 "hopr_replayed_packet_count",
113 "The total count of replayed packets during the packet processing pipeline run",
114 ).unwrap();
115 static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter =
116 SimpleCounter::new("hopr_rejected_tickets_count", "Number of rejected tickets").unwrap();
117}
118
119#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
120pub enum ProtocolProcesses {
121 #[strum(to_string = "HOPR [ack] - ingress")]
122 AckIn,
123 #[strum(to_string = "HOPR [ack] - egress")]
124 AckOut,
125 #[strum(to_string = "HOPR [msg] - ingress")]
126 MsgIn,
127 #[strum(to_string = "HOPR [msg] - egress")]
128 MsgOut,
129 #[strum(to_string = "HOPR [msg] - mixer")]
130 Mixer,
131 #[strum(to_string = "bloom filter persistence (periodic)")]
132 BloomPersist,
133}
134#[derive(Debug, Clone)]
136pub enum PeerDiscovery {
137 Allow(PeerId),
138 Ban(PeerId),
139 Announce(PeerId, Vec<Multiaddr>),
140}
141
142#[allow(clippy::too_many_arguments)]
147pub async fn run_msg_ack_protocol<Db>(
148 packet_cfg: msg::processor::PacketInteractionConfig,
149 db: Db,
150 bloom_filter_persistent_path: Option<String>,
151 wire_ack: (
152 impl futures::Sink<(PeerId, Acknowledgement)> + Send + Sync + 'static,
153 impl futures::Stream<Item = (PeerId, Acknowledgement)> + Send + Sync + 'static,
154 ),
155 wire_msg: (
156 impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
157 impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
158 ),
159 api: (
160 impl futures::Sink<ApplicationData> + Send + Sync + 'static,
161 impl futures::Stream<Item = (ApplicationData, TransportPath, PacketSendFinalizer)> + Send + Sync + 'static,
162 ),
163) -> HashMap<ProtocolProcesses, hopr_async_runtime::prelude::JoinHandle<()>>
164where
165 Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
166{
167 let me = packet_cfg.packet_keypair.clone();
168 let me_onchain = &packet_cfg.chain_keypair.clone();
169
170 let mut processes = HashMap::new();
171
172 #[cfg(all(feature = "prometheus", not(test)))]
173 {
174 lazy_static::initialize(&METRIC_RECEIVED_ACKS);
176 lazy_static::initialize(&METRIC_SENT_ACKS);
177 lazy_static::initialize(&METRIC_TICKETS_COUNT);
178 lazy_static::initialize(&METRIC_PACKET_COUNT);
179 lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
180 lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
181 lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
182 }
183
184 let tbf = if let Some(bloom_filter_persistent_path) = bloom_filter_persistent_path {
185 let tbf = bloom::WrappedTagBloomFilter::new(bloom_filter_persistent_path);
186 let tbf_2 = tbf.clone();
187 processes.insert(
188 ProtocolProcesses::BloomPersist,
189 spawn(Box::pin(execute_on_tick(
190 std::time::Duration::from_secs(90),
191 move || {
192 let tbf_clone = tbf_2.clone();
193
194 async move { tbf_clone.save().await }
195 },
196 "persisting the bloom filter to disk".into(),
197 ))),
198 );
199 tbf
200 } else {
201 bloom::WrappedTagBloomFilter::new("no_tbf".into())
202 };
203
204 let ack_processor_read = ack::processor::AcknowledgementProcessor::new(db.clone(), me_onchain);
205 let ack_processor_write = ack_processor_read.clone();
206 let msg_processor_read = msg::processor::PacketProcessor::new(db.clone(), tbf, packet_cfg);
207 let msg_processor_write = msg_processor_read.clone();
208
209 processes.insert(
210 ProtocolProcesses::AckIn,
211 spawn(async move {
212 let _neverending = wire_ack
213 .1
214 .for_each_concurrent(None, move |(peer, ack)| {
215 let ack_processor = ack_processor_read.clone();
216
217 async move {
218 let _ack_result = ack_processor.recv(&peer, ack).await;
219 #[cfg(all(feature = "prometheus", not(test)))]
220 match &_ack_result {
221 Ok(hopr_db_api::prelude::AckResult::Sender(_)) => {
222 METRIC_RECEIVED_ACKS.increment(&["true"]);
223 }
224 Ok(hopr_db_api::prelude::AckResult::RelayerWinning(_)) => {
225 METRIC_RECEIVED_ACKS.increment(&["true"]);
226 METRIC_TICKETS_COUNT.increment(&["winning"]);
227 }
228 Ok(hopr_db_api::prelude::AckResult::RelayerLosing) => {
229 METRIC_RECEIVED_ACKS.increment(&["true"]);
230 METRIC_TICKETS_COUNT.increment(&["losing"]);
231 }
232 Err(_) => {
233 METRIC_RECEIVED_ACKS.increment(&["false"]);
234 }
235 }
236 }
237 })
238 .await;
239 }),
240 );
241
242 let (internal_ack_send, internal_ack_rx) = futures::channel::mpsc::unbounded::<(PeerId, Acknowledgement)>();
243
244 processes.insert(
245 ProtocolProcesses::AckOut,
246 spawn(async move {
247 let _neverending = internal_ack_rx
248 .then_concurrent(move |(peer, ack)| {
249 let ack_processor = ack_processor_write.clone();
250
251 #[cfg(all(feature = "prometheus", not(test)))]
252 METRIC_SENT_ACKS.increment();
253
254 async move { (peer, ack_processor.send(&peer, ack).await) }
255 })
256 .map(Ok)
257 .forward(wire_ack.0)
258 .await;
259 }),
260 );
261
262 let msg_to_send_tx = wire_msg.0.clone();
263 processes.insert(
264 ProtocolProcesses::MsgOut,
265 spawn(async move {
266 let _neverending = api
267 .1
268 .then_concurrent(|(data, path, finalizer)| {
269 let msg_processor = msg_processor_write.clone();
270
271 async move {
272 match PacketWrapping::send(&msg_processor, data, path).await {
273 Ok(v) => {
274 #[cfg(all(feature = "prometheus", not(test)))]
275 {
276 METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &v.0.to_string()]);
277 METRIC_PACKET_COUNT.increment(&["sent"]);
278 }
279 finalizer.finalize(Ok(()));
280 Some(v)
281 }
282 Err(e) => {
283 finalizer.finalize(Err(e));
284 None
285 }
286 }
287 }
288 })
289 .filter_map(|v| async move { v })
290 .map(Ok)
291 .forward(msg_to_send_tx)
292 .await;
293 }),
294 );
295
296 let me = me.clone();
297 processes.insert(
298 ProtocolProcesses::MsgIn,
299 spawn(async move {
300 let _neverending = wire_msg
301 .1
302 .then_concurrent(move |(peer, data)| {
303 let msg_processor = msg_processor_read.clone();
304
305 async move { msg_processor.recv(&peer, data).await.map_err(|e| (peer, e)) }
306 })
307 .filter_map(move |v| {
308 let mut internal_ack_send = internal_ack_send.clone();
309 let mut msg_to_send_tx = wire_msg.0.clone();
310 let me = me.clone();
311
312 async move {
313 match v {
314 Ok(v) => match v {
315 msg::processor::RecvOperation::Receive { data, ack } => {
316 #[cfg(all(feature = "prometheus", not(test)))]
317 {
318 METRIC_PACKET_COUNT_PER_PEER.increment(&["in", &ack.peer.to_string()]);
319 METRIC_PACKET_COUNT.increment(&["received"]);
320 }
321 internal_ack_send.send((ack.peer, ack.ack)).await.unwrap_or_else(|e| {
322 error!(error = %e, "Failed to forward an acknowledgement to the transport layer");
323 });
324 Some(data)
325 }
326 msg::processor::RecvOperation::Forward { msg, ack } => {
327 #[cfg(all(feature = "prometheus", not(test)))]
328 {
329 METRIC_PACKET_COUNT_PER_PEER.increment(&["in", &ack.peer.to_string()]);
330 METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &msg.peer.to_string()]);
331 METRIC_PACKET_COUNT.increment(&["forwarded"]);
332 }
333
334 msg_to_send_tx.send((msg.peer, msg.data)).await.unwrap_or_else(|_e| {
335 error!("Failed to forward a message to the transport layer");
336 });
337 internal_ack_send.send((ack.peer, ack.ack)).await.unwrap_or_else(|e| {
338 error!(error = %e, "Failed to forward an acknowledgement to the transport layer");
339 });
340 None
341 }
342 },
343 Err((peer, e)) => {
344 #[cfg(all(feature = "prometheus", not(test)))]
345 match e {
346 hopr_crypto_packet::errors::PacketError::TagReplay => {
347 METRIC_REPLAYED_PACKET_COUNT.increment();
348 },
349 hopr_crypto_packet::errors::PacketError::TicketValidation(_) => {
350 METRIC_REJECTED_TICKETS_COUNT.increment();
351 },
352 _ => {}
353 }
354
355 error!(peer = %peer, error = %e, "Failed to process the received message");
356 internal_ack_send
358 .send((
359 peer,
360 Acknowledgement::random(&me),
361 ))
362 .await
363 .unwrap_or_else(|e| {
364 error!(error = %e, "Failed to forward an acknowledgement for a failed packet recv to the transport layer");
365 });
366
367 None
368 }
369 }
370 }
371 })
372 .map(Ok)
373 .forward(api.0)
374 .await;
375 }),
376 );
377
378 processes
379}