1mod codec;
53
54pub mod config;
56pub mod errors;
58
59pub mod heartbeat;
62pub mod processor;
64
65pub mod stream;
67
68pub mod timer;
69use std::collections::HashMap;
70
71use futures::{SinkExt, StreamExt};
72use hopr_async_runtime::spawn_as_abortable;
73use hopr_crypto_types::types::OffchainPublicKey;
74use hopr_db_api::protocol::{HoprDbProtocolOperations, IncomingPacket};
75use hopr_internal_types::{prelude::HoprPseudonym, protocol::Acknowledgement};
76use hopr_network_types::prelude::ResolvedTransportRouting;
77use hopr_transport_bloom::persistent::WrappedTagBloomFilter;
78use hopr_transport_identity::{Multiaddr, PeerId};
79use hopr_transport_packet::prelude::ApplicationData;
80use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
81use tracing::{error, trace, warn};
82
83use crate::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
84pub use crate::{processor::DEFAULT_PRICE_PER_PACKET, timer::execute_on_tick};
85
86const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
87const SLOW_OP_MS: u128 = 150;
88
89pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
90pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
91
92#[cfg(all(feature = "prometheus", not(test)))]
93use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
94
95#[cfg(all(feature = "prometheus", not(test)))]
96lazy_static::lazy_static! {
97 static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
99 "hopr_packets_count",
100 "Number of processed packets of different types (sent, received, forwarded)",
101 &["type"]
102 ).unwrap();
103 static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
104 "hopr_packets_per_peer_count",
105 "Number of processed packets to/from distinct peers",
106 &["peer", "direction"]
107 ).unwrap();
108 static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
109 "hopr_replayed_packet_count",
110 "The total count of replayed packets during the packet processing pipeline run",
111 ).unwrap();
112 static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter =
113 SimpleCounter::new("hopr_rejected_tickets_count", "Number of rejected tickets").unwrap();
114}
115
116#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
117pub enum ProtocolProcesses {
118 #[strum(to_string = "HOPR [msg] - ingress")]
119 MsgIn,
120 #[strum(to_string = "HOPR [msg] - egress")]
121 MsgOut,
122 #[strum(to_string = "HOPR [msg] - mixer")]
123 Mixer,
124 #[strum(to_string = "bloom filter persistence (periodic)")]
125 BloomPersist,
126}
127#[derive(Debug, Clone)]
129pub enum PeerDiscovery {
130 Allow(PeerId),
131 Ban(PeerId),
132 Announce(PeerId, Vec<Multiaddr>),
133}
134
135#[allow(clippy::too_many_arguments)]
140pub async fn run_msg_ack_protocol<Db>(
141 packet_cfg: processor::PacketInteractionConfig,
142 db: Db,
143 bloom_filter_persistent_path: Option<String>,
144 wire_msg: (
145 impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
146 impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
147 ),
148 api: (
149 impl futures::Sink<(HoprPseudonym, ApplicationData)> + Send + Sync + 'static,
150 impl futures::Stream<Item = (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer)>
151 + Send
152 + Sync
153 + 'static,
154 ),
155) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
156where
157 Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
158{
159 let me = packet_cfg.packet_keypair.clone();
160
161 let mut processes = HashMap::new();
162
163 #[cfg(all(feature = "prometheus", not(test)))]
164 {
165 lazy_static::initialize(&METRIC_PACKET_COUNT);
170 lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
171 lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
172 lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
173 }
174
175 let tbf = if let Some(bloom_filter_persistent_path) = bloom_filter_persistent_path {
176 let tbf = WrappedTagBloomFilter::new(bloom_filter_persistent_path);
177 let tbf_2 = tbf.clone();
178 processes.insert(
179 ProtocolProcesses::BloomPersist,
180 spawn_as_abortable(Box::pin(execute_on_tick(
181 std::time::Duration::from_secs(90),
182 move || {
183 let tbf_clone = tbf_2.clone();
184
185 async move { tbf_clone.save().await }
186 },
187 "persisting the bloom filter to disk".into(),
188 ))),
189 );
190 tbf
191 } else {
192 WrappedTagBloomFilter::new("no_tbf".into())
193 };
194
195 let msg_processor_read = processor::PacketProcessor::new(db.clone(), packet_cfg);
196 let msg_processor_write = msg_processor_read.clone();
197
198 let msg_to_send_tx = wire_msg.0.clone();
199 processes.insert(
200 ProtocolProcesses::MsgOut,
201 spawn_as_abortable(async move {
202 let _neverending = api
203 .1
204 .then_concurrent(|(data, routing, finalizer)| {
205 let msg_processor = msg_processor_write.clone();
206
207 async move {
208 match PacketWrapping::send(&msg_processor, data, routing).await {
209 Ok(v) => {
210 let v: (PeerId, Box<[u8]>) = (v.next_hop.into(), v.data);
211 #[cfg(all(feature = "prometheus", not(test)))]
212 {
213 METRIC_PACKET_COUNT_PER_PEER.increment(&["out", &v.0.to_string()]);
214 METRIC_PACKET_COUNT.increment(&["sent"]);
215 }
216 finalizer.finalize(Ok(()));
217 Some(v)
218 }
219 Err(e) => {
220 finalizer.finalize(Err(e));
221 None
222 }
223 }
224 }
225 })
226 .filter_map(|v| async move { v })
227 .map(Ok)
228 .forward(msg_to_send_tx)
229 .await;
230 }),
231 );
232
233 let msg_to_send_tx = wire_msg.0.clone();
234 let db_for_recv = db.clone();
235 let me_for_recv = me.clone();
236 processes.insert(
237 ProtocolProcesses::MsgIn,
238 spawn_as_abortable(async move {
239 let _neverending = wire_msg
240 .1
241 .then_concurrent(move |(peer, data)| {
242 let msg_processor = msg_processor_read.clone();
243 let db = db_for_recv.clone();
244 let mut msg_to_send_tx = msg_to_send_tx.clone();
245 let me = me.clone();
246
247 async move {
248 let now = std::time::Instant::now();
249 let res = msg_processor.recv(&peer, data).await.map_err(move |e| (peer, e));
250 let elapsed = now.elapsed();
251 if elapsed.as_millis() > SLOW_OP_MS {
252 tracing::warn!("msg_processor.recv took {}ms", elapsed.as_millis());
253 }
254 if let Err((peer, e)) = &res {
255 #[cfg(all(feature = "prometheus", not(test)))]
256 if let hopr_crypto_packet::errors::PacketError::TicketValidation(_) = e {
257 METRIC_REJECTED_TICKETS_COUNT.increment();
258 }
259
260 error!(peer = %peer, error = %e, "Failed to process the received message");
261
262 let peer: OffchainPublicKey = match peer.try_into() {
263 Ok(p) => p,
264 Err(error) => {
265 tracing::warn!(%peer, %error, "Dropping packet – cannot convert peer id");
266 return None;
267 }
268 };
269
270 let ack = Acknowledgement::random(&me);
272
273 match db
274 .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), peer)
275 .await {
276 Ok(ack_packet) => {
277 let now = std::time::Instant::now();
278 msg_to_send_tx
279 .send((
280 ack_packet.next_hop.into(),
281 ack_packet.data,
282 ))
283 .await
284 .unwrap_or_else(|_e| {
285 error!("Failed to forward an acknowledgement for a failed packet recv to the transport layer");
286 });
287 let elapsed = now.elapsed();
288 if elapsed.as_millis() > SLOW_OP_MS {
289 tracing::warn!("msg_to_send_tx.send took {}ms", elapsed.as_millis());
290 }
291 },
292 Err(error) => tracing::error!(%error, "Failed to create random ack packet for a failed receive"),
293 }
294 }
295
296 res.ok().flatten()
297 }
298 })
299 .filter_map(move |maybe_packet| {
300 let tbf = tbf.clone();
301
302 async move {
303 if let Some(packet) = maybe_packet {
304 match packet {
305 IncomingPacket::Final { packet_tag, previous_hop,.. }
306 | IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
307 if tbf.is_tag_replay(&packet_tag).await {
308 warn!("replayed packet received from {previous_hop}");
309 #[cfg(all(feature = "prometheus", not(test)))]
310 METRIC_REPLAYED_PACKET_COUNT.increment();
311
312 None
313 } else {
314 Some(packet)
315 }
316 }
317 }
318 } else {
319 trace!("received empty packet");
320 None
321 }
322 }
323 })
324 .then_concurrent(move |packet| {
325 let mut msg_to_send_tx = wire_msg.0.clone();
326 let db = db.clone();
327 let me = me_for_recv.clone();
328
329 async move {
330
331 match packet {
332 IncomingPacket::Final {
333 previous_hop,
334 sender,
335 plain_text,
336 ack_key,
337 ..
338 } => {
339 trace!("acknowledging final packet to {previous_hop}");
340 let ack = Acknowledgement::new(ack_key, &me);
341 if let Ok(ack_packet) = db
342 .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), previous_hop)
343 .await
344 .inspect_err(|error| tracing::error!(error = %error, "Failed to create ack packet for a received message"))
345 {
346 msg_to_send_tx
347 .send((
348 ack_packet.next_hop.into(),
349 ack_packet.data,
350 ))
351 .await
352 .unwrap_or_else(|_e| {
353 error!("Failed to send an acknowledgement for a received packet to the transport layer");
354 });
355 }
356
357 Some((sender, plain_text))
358 }
359 IncomingPacket::Forwarded {
360 previous_hop,
361 next_hop,
362 data,
363 ack,
364 ..
365 } => {
366 trace!("acknowledging forwarded packet {previous_hop}->{next_hop}");
367 msg_to_send_tx
368 .send((
369 next_hop.into(),
370 data,
371 ))
372 .await
373 .unwrap_or_else(|_e| {
374 error!("Failed to forward a packet to the transport layer");
375 });
376
377 if let Ok(ack_packet) = db
378 .to_send_no_ack(ack.as_ref().to_vec().into_boxed_slice(), previous_hop)
379 .await
380 .inspect_err(|error| tracing::error!(error = %error, "Failed to create ack packet for a relayed message"))
381 {
382 msg_to_send_tx
383 .send((
384 ack_packet.next_hop.into(),
385 ack_packet.data,
386 ))
387 .await
388 .unwrap_or_else(|_e| {
389 error!("Failed to send an acknowledgement for a relayed packet to the transport layer");
390 });
391 }
392 None
393 }
394 }
395 }})
396 .filter_map(|maybe_data| async move {
397 if let Some((sender, data)) = maybe_data {
398 ApplicationData::from_bytes(data.as_ref())
399 .inspect_err(|error| tracing::error!(error = %error, "Failed to decode application data"))
400 .ok()
401 .map(|data| (sender, data))
402 } else {
403 None
404 }
405 })
406 .map(Ok)
407 .forward(api.0)
408 .await;
409 }),
410 );
411
412 processes
413}