1mod codec;
53
54pub mod config;
56pub mod errors;
58
59pub mod heartbeat;
62pub mod processor;
64
65pub mod stream;
67
68pub mod timer;
69
70#[cfg(feature = "capture")]
74mod capture;
75
76use std::{collections::HashMap, time::Duration};
77
78use futures::{SinkExt, StreamExt};
79use hopr_async_runtime::spawn_as_abortable;
80use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
81use hopr_db_api::protocol::{HoprDbProtocolOperations, IncomingPacket};
82use hopr_internal_types::{
83 prelude::{Acknowledgement, HoprPseudonym},
84 protocol::VerifiedAcknowledgement,
85};
86use hopr_network_types::prelude::ResolvedTransportRouting;
87use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
88use hopr_transport_bloom::TagBloomFilter;
89use hopr_transport_identity::{Multiaddr, PeerId};
90use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
91use tracing::{Instrument, error, trace, warn};
92
93use crate::processor::{PacketSendFinalizer, PacketUnwrapping, PacketWrapping};
94pub use crate::{processor::DEFAULT_PRICE_PER_PACKET, timer::execute_on_tick};
95
96const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
97const SLOW_OP_MS: u128 = 150;
98
99pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
100pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
101
102pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
103pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
104
105pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
106pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
107
108#[cfg(all(feature = "prometheus", not(test)))]
109use hopr_metrics::metrics::{MultiCounter, SimpleCounter};
110
// Prometheus metrics of the packet pipeline; compiled only with the `prometheus`
// feature and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Processed packets, labelled by `type`.
    static ref METRIC_PACKET_COUNT: MultiCounter = MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    )
    .unwrap();

    /// Processed packets, labelled by `peer` and `direction`.
    static ref METRIC_PACKET_COUNT_PER_PEER: MultiCounter = MultiCounter::new(
        "hopr_packets_per_peer_count",
        "Number of processed packets to/from distinct peers",
        &["peer", "direction"]
    )
    .unwrap();

    /// Packets whose tag was already present in the replay filter.
    static ref METRIC_REPLAYED_PACKET_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    )
    .unwrap();

    /// Incoming packets rejected because of a ticket validation error.
    static ref METRIC_REJECTED_TICKETS_COUNT: SimpleCounter = SimpleCounter::new(
        "hopr_rejected_tickets_count",
        "Number of rejected tickets",
    )
    .unwrap();
}
131
132#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
133pub enum ProtocolProcesses {
134 #[strum(to_string = "HOPR [msg] - ingress")]
135 MsgIn,
136 #[strum(to_string = "HOPR [msg] - egress")]
137 MsgOut,
138 #[strum(to_string = "HOPR [ack] - egress")]
139 AckOut,
140 #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
141 TicketAck,
142 #[strum(to_string = "HOPR [msg] - mixer")]
143 Mixer,
144 #[cfg(feature = "capture")]
145 #[strum(to_string = "packet capture")]
146 Capture,
147}
148#[derive(Debug, Clone)]
150pub enum PeerDiscovery {
151 Allow(PeerId),
152 Ban(PeerId),
153 Announce(PeerId, Vec<Multiaddr>),
154}
155
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// If the packet is shorter than `Ticket::SIZE`, an empty slice is returned instead.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_internal_types::tickets::Ticket;
    use hopr_primitive_types::traits::BytesEncodable;

    // `checked_sub` yields `None` exactly when the packet is shorter than a ticket.
    match raw_packet.len().checked_sub(Ticket::SIZE) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}
165
166#[allow(clippy::too_many_arguments)]
171pub async fn run_msg_ack_protocol<Db>(
172 packet_cfg: processor::PacketInteractionConfig,
173 db: Db,
174 wire_msg: (
175 impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + Sync + 'static,
176 impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + Sync + 'static,
177 ),
178 api: (
179 impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + Sync + 'static,
180 impl futures::Stream<Item = (ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer)>
181 + Send
182 + Sync
183 + 'static,
184 ),
185) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
186where
187 Db: HoprDbProtocolOperations + std::fmt::Debug + Clone + Send + Sync + 'static,
188{
189 let me = packet_cfg.packet_keypair.clone();
190
191 #[cfg(feature = "capture")]
192 let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
193
194 let mut processes = HashMap::new();
195
196 #[cfg(all(feature = "prometheus", not(test)))]
197 {
198 lazy_static::initialize(&METRIC_PACKET_COUNT);
200 lazy_static::initialize(&METRIC_PACKET_COUNT_PER_PEER);
201 lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
202 lazy_static::initialize(&METRIC_REJECTED_TICKETS_COUNT);
203 }
204
205 #[cfg(feature = "capture")]
206 let capture = {
207 use std::str::FromStr;
208 let writer: Box<dyn capture::PacketWriter + Send + 'static> =
209 if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
210 if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
211 .map_err(std::io::Error::other)
212 .and_then(capture::UdpPacketDump::new)
213 {
214 warn!("udp packet capture initialized to {desc}");
215 Box::new(udp_writer)
216 } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
217 warn!("pcap file packet capture initialized to {desc}");
218 Box::new(pcap_writer)
219 } else {
220 error!(desc, "failed to create packet capture: invalid socket address or file");
221 Box::new(capture::NullWriter)
222 }
223 } else {
224 warn!("no packet capture specified");
225 Box::new(capture::NullWriter)
226 };
227 let (capture, ah) = capture::packet_capture_channel(writer);
228 processes.insert(ProtocolProcesses::Capture, ah);
229 capture
230 };
231
232 let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
233
234 let (ticket_ack_tx, ticket_ack_rx) =
235 futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
236
237 let db_clone = db.clone();
238 processes.insert(
239 ProtocolProcesses::TicketAck,
240 spawn_as_abortable!(ticket_ack_rx
241 .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
242 let db = db_clone.clone();
243 async move {
244 if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
245 trace!(%sender, "received a valid acknowledgement");
246 match db.handle_acknowledgement(verified).await {
247 Ok(_) => trace!(%sender, "successfully processed a known acknowledgement"),
248 Err(error) => trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
251 }
252 } else {
253 error!(%sender, "failed to verify signature on acknowledgement");
254 }
255 }
256 }))
257 );
258
259 let (ack_out_tx, ack_out_rx) =
260 futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
261
262 #[cfg(feature = "capture")]
263 let capture_clone = capture.clone();
264
265 let db_clone = db.clone();
266 let me_clone = me.clone();
267 let msg_to_send_tx = wire_msg.0.clone();
268 processes.insert(
269 ProtocolProcesses::AckOut,
270 spawn_as_abortable!(ack_out_rx.for_each_concurrent(
271 NUM_CONCURRENT_ACK_OUT_PROCESSING,
272 move |(maybe_ack_key, destination)| {
273 let db = db_clone.clone();
274 let me = me_clone.clone();
275 let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
276
277 #[cfg(feature = "capture")]
278 let mut capture = capture_clone.clone();
279 async move {
280 #[cfg(feature = "capture")]
281 let (is_random, me_pub) = (
282 maybe_ack_key.is_none(),
283 *hopr_crypto_types::keypairs::Keypair::public(&me),
284 );
285
286 let ack = hopr_parallelize::cpu::spawn_blocking(move || {
288 maybe_ack_key
289 .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
290 .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
291 })
292 .await;
293
294 #[cfg(feature = "capture")]
295 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
296 me: me_pub,
297 ack,
298 is_random,
299 next_hop: destination,
300 }
301 .into();
302
303 match db.to_send_no_ack(ack.leak().as_ref().into(), destination).await {
304 Ok(ack_packet) => {
305 let now = std::time::Instant::now();
306 if msg_to_send_tx_clone
307 .send((ack_packet.next_hop.into(), ack_packet.data))
308 .await
309 .is_err()
310 {
311 error!("failed to forward an acknowledgement to the transport layer");
312 }
313 let elapsed = now.elapsed();
314 if elapsed.as_millis() > SLOW_OP_MS {
315 warn!(?elapsed, " msg_to_send_tx.send on ack took too long");
316 }
317
318 #[cfg(feature = "capture")]
319 let _ = capture.try_send(captured_packet);
320 }
321 Err(error) => tracing::error!(%error, "failed to create ack packet"),
322 }
323 }
324 }
325 )),
326 );
327
328 let msg_processor_read = processor::PacketProcessor::new(db.clone(), packet_cfg);
329 let msg_processor_write = msg_processor_read.clone();
330
331 #[cfg(feature = "capture")]
332 let capture_clone = capture.clone();
333
334 let msg_to_send_tx = wire_msg.0.clone();
335 processes.insert(
336 ProtocolProcesses::MsgOut,
337 spawn_as_abortable!(async move {
338 let _neverending = api
339 .1
340 .then_concurrent(|(data, routing, finalizer)| {
341 let msg_processor = msg_processor_write.clone();
342
343 #[cfg(feature = "capture")]
344 let (mut capture_clone, data_clone, num_surbs) =
345 (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
346
347 async move {
348 match PacketWrapping::send(&msg_processor, data, routing).await {
349 Ok(v) => {
350 #[cfg(all(feature = "prometheus", not(test)))]
351 {
352 METRIC_PACKET_COUNT_PER_PEER.increment(&[&v.next_hop.to_string(), "out"]);
353 METRIC_PACKET_COUNT.increment(&["sent"]);
354 }
355 finalizer.finalize(Ok(()));
356
357 #[cfg(feature = "capture")]
358 let _ = capture_clone.try_send(
359 capture::PacketBeforeTransit::OutgoingPacket {
360 me: me_pub,
361 next_hop: v.next_hop,
362 num_surbs,
363 is_forwarded: false,
364 data: data_clone.data.to_bytes().into_vec().into(),
365 ack_challenge: v.ack_challenge.as_ref().into(),
366 signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
367 ticket: inspect_ticket_data_in_packet(&v.data).into(),
368 }
369 .into(),
370 );
371
372 Some((v.next_hop.into(), v.data))
373 }
374 Err(e) => {
375 finalizer.finalize(Err(e));
376 None
377 }
378 }
379 }
380 })
381 .filter_map(futures::future::ready)
382 .inspect(|(peer, _)| trace!(%peer, "protocol message out"))
383 .map(Ok)
384 .forward(msg_to_send_tx)
385 .instrument(tracing::trace_span!("msg protocol processing - outgoing"))
386 .await;
387 }),
388 );
389
390 let ack_out_tx_clone_1 = ack_out_tx.clone();
391 let ack_out_tx_clone_2 = ack_out_tx.clone();
392
393 #[cfg(feature = "capture")]
394 let capture_clone = capture.clone();
395
396 let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
398 .time_to_idle(Duration::from_secs(600))
399 .max_capacity(100_000)
400 .build();
401
402 processes.insert(
403 ProtocolProcesses::MsgIn,
404 spawn_as_abortable!(async move {
405 let _neverending = wire_msg
406 .1
407 .then_concurrent(move |(peer, data)| {
408 let msg_processor = msg_processor_read.clone();
409 let mut ack_out_tx = ack_out_tx_clone_1.clone();
410 let peer_id_key_cache = peer_id_cache.clone();
411
412 trace!(%peer, "protocol message in");
413
414 #[cfg(feature = "capture")]
415 let (mut capture_clone, ticket_data_clone) = (
416 capture.clone(),
417 inspect_ticket_data_in_packet(&data).to_vec()
418 );
419
420 async move {
421 let peer_key = match peer_id_key_cache
423 .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
424 .await {
425 Ok(peer) => peer,
426 Err(error) => {
427 error!(%peer, %error, "dropping packet - cannot convert peer id");
429 return None;
430 }
431 };
432
433 let now = std::time::Instant::now();
434 let res = msg_processor.recv(peer_key, data).await.map_err(move |e| (peer, e));
435 let elapsed = now.elapsed();
436 if elapsed.as_millis() > SLOW_OP_MS {
437 warn!(%peer, ?elapsed, "msg_processor.recv took too long");
438 }
439
440 if let Err((peer, error)) = &res {
443 #[cfg(all(feature = "prometheus", not(test)))]
444 if let hopr_crypto_packet::errors::PacketError::TicketValidation(_) = error {
445 METRIC_REJECTED_TICKETS_COUNT.increment();
446 }
447
448 error!(%peer, %error, "failed to process the received message");
449
450 if let hopr_crypto_packet::errors::PacketError::PacketDecodingError(_) | hopr_crypto_packet::errors::PacketError::SphinxError(_) = error {
452 tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
456 } else {
457 let now = std::time::Instant::now();
458
459 if let Err(error) = ack_out_tx.send((None, peer_key)).await {
460 tracing::error!(%error, "failed to send ack to the egress queue");
461 }
462 let elapsed = now.elapsed();
463 if elapsed.as_millis() > SLOW_OP_MS {
464 warn!(%peer, ?elapsed, "ack_out.send on failed packet took too long");
465 }
466 }
467 }
468
469 #[cfg(feature = "capture")]
470 if let Ok(packet) = &res {
471 let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
472 me: me_pub,
473 packet,
474 ticket: ticket_data_clone.into(),
475 }.into()
476 );
477 }
478
479 res.ok()
480 }
481 })
482 .filter_map(move |maybe_packet| {
483 let tbf = tbf.clone();
484
485 futures::future::ready(
486 if let Some(packet) = maybe_packet {
487 match packet {
488 IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
489 IncomingPacket::Final { packet_tag, previous_hop,.. } |
490 IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
491 if tbf.lock().check_and_set(&packet_tag) {
494 warn!(%previous_hop, "replayed packet received");
495
496 #[cfg(all(feature = "prometheus", not(test)))]
497 METRIC_REPLAYED_PACKET_COUNT.increment();
498
499 None
500 } else {
501 Some(packet)
502 }
503 }
504 }
505 } else {
506 trace!("received empty packet");
507 None
508 }
509 )
510 })
511 .then_concurrent(move |packet| {
512 let mut msg_to_send_tx = wire_msg.0.clone();
513
514 #[cfg(feature = "capture")]
515 let mut capture_clone = capture_clone.clone();
516
517 let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
518 let mut ack_out_tx = ack_out_tx_clone_2.clone();
519 async move {
520
521 match packet {
522 IncomingPacket::Acknowledgement {
523 previous_hop,
524 ack,
525 ..
526 } => {
527 trace!(%previous_hop, "acknowledging ticket using received ack");
528 let now = std::time::Instant::now();
529 if let Err(error) = ticket_ack_tx_clone.send((ack, previous_hop)).await {
530 tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
531 }
532 let elapsed = now.elapsed();
533 if elapsed.as_millis() > SLOW_OP_MS {
534 warn!(?elapsed," ack_tx.send took too long");
535 }
536 let elapsed = now.elapsed();
537 if elapsed.as_millis() > SLOW_OP_MS {
538 warn!(%previous_hop, ?elapsed, "ack_processor.handle_acknowledgement took too long");
539 }
540
541 None
543 },
544 IncomingPacket::Final {
545 previous_hop,
546 sender,
547 plain_text,
548 ack_key,
549 info,
550 ..
551 } => {
552 trace!(%previous_hop, "acknowledging final packet back");
554 let now = std::time::Instant::now();
555 if let Err(error) = ack_out_tx.send((Some(ack_key), previous_hop)).await {
556 tracing::error!(%error, "failed to send ack to the egress queue");
557 }
558 let elapsed = now.elapsed();
559 if elapsed.as_millis() > SLOW_OP_MS {
560 warn!(%previous_hop, ?elapsed, "ack_out.send on final packet took too long");
561 }
562
563 #[cfg(all(feature = "prometheus", not(test)))]
564 {
565 METRIC_PACKET_COUNT_PER_PEER.increment(&[&previous_hop.to_string(), "in"]);
566 METRIC_PACKET_COUNT.increment(&["received"]);
567 }
568
569 Some((sender, plain_text, info))
570 }
571 IncomingPacket::Forwarded {
572 previous_hop,
573 next_hop,
574 data,
575 ack_key,
576 ..
577 } => {
578 trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
580
581 #[cfg(feature = "capture")]
582 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
583 me: me_pub,
584 next_hop,
585 num_surbs: 0,
586 is_forwarded: true,
587 data: data.as_ref().into(),
588 ack_challenge: Default::default(),
589 signals: None.into(),
590 ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
591 }.into();
592
593 msg_to_send_tx
594 .send((next_hop.into(), data))
595 .await
596 .unwrap_or_else(|_| {
597 error!("failed to forward a packet to the transport layer");
598 });
599
600 #[cfg(all(feature = "prometheus", not(test)))]
601 {
602 METRIC_PACKET_COUNT.increment(&["forwarded"]);
603 }
604
605 #[cfg(feature = "capture")]
606 let _ = capture_clone.try_send(captured_packet);
607
608 trace!(%previous_hop, "acknowledging forwarded packet back");
610 let now = std::time::Instant::now();
611 if let Err(error) = ack_out_tx.send((Some(ack_key), previous_hop)).await {
612 tracing::error!(%error, "failed to send ack to the egress queue");
613 }
614 let elapsed = now.elapsed();
615 if elapsed.as_millis() > SLOW_OP_MS {
616 warn!(%previous_hop, ?elapsed, "ack_out.send on forwarded packet took too long");
617 }
618
619 None
620 }
621 }
622 }})
623 .filter_map(|maybe_data| futures::future::ready(
624 maybe_data
626 .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
627 .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
628 .ok()
629 .map(|data| (sender, ApplicationDataIn {
630 data,
631 packet_info: IncomingPacketInfo {
632 signals_from_sender: aux_info.packet_signals,
633 num_saved_surbs: aux_info.num_surbs,
634 }
635 })))
636 ))
637 .map(Ok)
638 .forward(api.0)
639 .instrument(tracing::trace_span!("msg protocol processing - incoming"))
640 .await;
641 }),
642 );
643
644 processes
645}