1mod codec;
53
54pub mod config;
56pub mod errors;
58
59pub mod heartbeat;
62pub mod processor;
64
65pub mod stream;
67
68pub mod timer;
69
70#[cfg(feature = "capture")]
74mod capture;
75
76use std::{collections::HashMap, time::Duration};
77
78use futures::{FutureExt, SinkExt, StreamExt};
79use futures_time::future::FutureExt as FuturesTimeExt;
80use hopr_api::{
81 chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
82 db::{HoprDbProtocolOperations, IncomingPacket},
83};
84use hopr_async_runtime::spawn_as_abortable;
85use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
86use hopr_internal_types::{
87 prelude::{Acknowledgement, HoprPseudonym},
88 protocol::VerifiedAcknowledgement,
89};
90use hopr_network_types::prelude::ResolvedTransportRouting;
91use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
92use hopr_transport_bloom::TagBloomFilter;
93use hopr_transport_identity::{Multiaddr, PeerId};
94use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
95use tracing::Instrument;
96
97use crate::processor::{PacketUnwrapping, PacketWrapping};
98pub use crate::timer::execute_on_tick;
99
100const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
/// Processing of a single incoming packet that takes longer than this is reported with a warning.
const SLOW_OP: std::time::Duration = std::time::Duration::from_millis(150);
/// Maximum time to wait when pushing an item into one of the pipeline queues or the wire sink
/// before the send is abandoned (and the failure is logged).
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
103
104pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
/// Protocol identifier of the current version of the HOPR `msg` protocol.
pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";
106
/// Capacity of the queue holding received acknowledgements that await ticket processing.
pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of received acknowledgements that are processed concurrently.
pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;

/// Capacity of the queue holding acknowledgements that are to be sent out.
pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of outgoing acknowledgements that are processed concurrently.
pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
112
// Prometheus metrics of the packet pipeline. They are compiled out in tests and when the
// `prometheus` feature is disabled.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Incremented with the `type` label set to "sent", "received" or "forwarded".
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Incremented for every incoming packet that is dropped as a replay.
    static ref METRIC_REPLAYED_PACKET_COUNT: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();
}
126
127#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
128pub enum ProtocolProcesses {
129 #[strum(to_string = "HOPR [msg] - ingress")]
130 MsgIn,
131 #[strum(to_string = "HOPR [msg] - egress")]
132 MsgOut,
133 #[strum(to_string = "HOPR [ack] - egress")]
134 AckOut,
135 #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
136 TicketAck,
137 #[strum(to_string = "HOPR [msg] - mixer")]
138 Mixer,
139 #[cfg(feature = "capture")]
140 #[strum(to_string = "packet capture")]
141 Capture,
142}
143#[derive(Debug, Clone)]
145pub enum PeerDiscovery {
146 Allow(PeerId),
147 Ban(PeerId),
148 Announce(PeerId, Vec<Multiaddr>),
149}
150
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// If the packet is shorter than `Ticket::SIZE`, an empty slice is returned instead.
/// Only used to feed the packet capture, hence gated behind the `capture` feature.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_primitive_types::traits::BytesEncodable;

    // `checked_sub` yields `None` exactly when the packet is too short to contain a ticket.
    match raw_packet.len().checked_sub(hopr_internal_types::tickets::Ticket::SIZE) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}
160
161#[allow(clippy::too_many_arguments)]
166pub async fn run_msg_ack_protocol<Db, R>(
167 packet_cfg: processor::PacketInteractionConfig,
168 db: Db,
169 resolver: R,
170 wire_msg: (
171 impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
172 impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
173 ),
174 api: (
175 impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
176 impl futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
177 ),
178) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
179where
180 Db: HoprDbProtocolOperations + Clone + Send + Sync + 'static,
181 R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Clone + Send + Sync + 'static,
182{
183 let me = packet_cfg.packet_keypair.clone();
184
185 #[cfg(feature = "capture")]
186 let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
187
188 let mut processes = HashMap::new();
189
190 #[cfg(all(feature = "prometheus", not(test)))]
191 {
192 lazy_static::initialize(&METRIC_PACKET_COUNT);
194 lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
195 }
196
197 #[cfg(feature = "capture")]
198 let capture = {
199 use std::{path::PathBuf, str::FromStr};
200 let writer: Box<dyn capture::PacketWriter + Send + 'static> =
201 if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
202 if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
203 .map_err(std::io::Error::other)
204 .and_then(capture::UdpPacketDump::new)
205 {
206 tracing::warn!("udp packet capture initialized to {desc}");
207 Box::new(udp_writer)
208 } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
209 tracing::info!("pcap file packet capture initialized to {desc}");
210 Box::new(pcap_writer)
211 } else {
212 tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
213 Box::new(capture::NullWriter)
214 }
215 } else {
216 tracing::warn!("no packet capture specified");
217 Box::new(capture::NullWriter)
218 };
219 let start_capturing_path: Option<PathBuf> = std::env::var("HOPR_CAPTURE_PATH_TRIGGER").ok().map(PathBuf::from);
220 if let Some(ref path) = start_capturing_path {
221 tracing::info!(
222 "To start capturing packets, create it by running 'touch {}'",
223 path.display()
224 );
225 } else {
226 tracing::warn!("The env var 'HOPR_CAPTURE_PATH_TRIGGER' is not set, packet capture won't start");
227 };
228 let (capture, ah) = capture::packet_capture_channel(writer, start_capturing_path);
229 processes.insert(ProtocolProcesses::Capture, ah);
230 capture
231 };
232
233 let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
234
235 let (ticket_ack_tx, ticket_ack_rx) =
236 futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
237
238 let db_clone = db.clone();
239 let resolver_clone = resolver.clone();
240 processes.insert(
241 ProtocolProcesses::TicketAck,
242 spawn_as_abortable!(ticket_ack_rx
243 .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
244 let db = db_clone.clone();
245 let resolver = resolver_clone.clone();
246 async move {
247 if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
248 tracing::trace!(%sender, "received a valid acknowledgement");
249 match db.handle_acknowledgement(verified, &resolver).await {
250 Ok(_) => tracing::trace!(%sender, "successfully processed a known acknowledgement"),
251 Err(error) => tracing::trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
254 }
255 } else {
256 tracing::error!(%sender, "failed to verify signature on acknowledgement");
257 }
258 }
259 })
260 .inspect(|_| tracing::warn!(task = "transport (protocol - ticket acknowledgement)", "long-running background task finished")))
261 );
262
263 let (ack_out_tx, ack_out_rx) =
264 futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
265
266 #[cfg(feature = "capture")]
267 let capture_clone = capture.clone();
268
269 let db_clone = db.clone();
270 let resolver_clone = resolver.clone();
271 let me_clone = me.clone();
272 let msg_to_send_tx = wire_msg.0.clone();
273 processes.insert(
274 ProtocolProcesses::AckOut,
275 spawn_as_abortable!(
276 ack_out_rx
277 .for_each_concurrent(
278 NUM_CONCURRENT_ACK_OUT_PROCESSING,
279 move |(maybe_ack_key, destination)| {
280 let db = db_clone.clone();
281 let resolver = resolver_clone.clone();
282 let me = me_clone.clone();
283 let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
284
285 #[cfg(feature = "capture")]
286 let mut capture = capture_clone.clone();
287 async move {
288 #[cfg(feature = "capture")]
289 let (is_random, me_pub) = (
290 maybe_ack_key.is_none(),
291 *hopr_crypto_types::keypairs::Keypair::public(&me),
292 );
293
294 let ack = hopr_parallelize::cpu::spawn_blocking(move || {
296 maybe_ack_key
297 .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
298 .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
299 })
300 .await;
301
302 #[cfg(feature = "capture")]
303 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
304 me: me_pub,
305 ack,
306 is_random,
307 next_hop: destination,
308 }
309 .into();
310
311 match db
312 .to_send_no_ack(ack.leak().as_ref().into(), destination, &resolver)
313 .await
314 {
315 Ok(ack_packet) => {
316 msg_to_send_tx_clone
317 .send((ack_packet.next_hop.into(), ack_packet.data))
318 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
319 .await
320 .unwrap_or_else(|_| {
321 tracing::error!(
322 "failed to forward an acknowledgement to the transport layer: timeout"
323 );
324 Ok(())
325 })
326 .unwrap_or_else(|_| {
327 tracing::error!(
328 "failed to forward an acknowledgement to the transport layer"
329 );
330 });
331
332 #[cfg(feature = "capture")]
333 let _ = capture.try_send(captured_packet);
334 }
335 Err(error) => tracing::error!(%error, "failed to create ack packet"),
336 }
337 }
338 }
339 )
340 .inspect(|_| tracing::warn!(
341 task = "transport (protocol - ack outgoing)",
342 "long-running background task finished"
343 ))
344 ),
345 );
346
347 let msg_processor_read = processor::PacketProcessor::new(db.clone(), resolver.clone(), packet_cfg);
348 let msg_processor_write = msg_processor_read.clone();
349
350 #[cfg(feature = "capture")]
351 let capture_clone = capture.clone();
352
353 let msg_to_send_tx = wire_msg.0.clone();
354 processes.insert(
355 ProtocolProcesses::MsgOut,
356 spawn_as_abortable!(async move {
357 let _neverending = api
358 .1
359 .then_concurrent(|(routing, data)| {
360 let msg_processor = msg_processor_write.clone();
361
362 #[cfg(feature = "capture")]
363 let (mut capture_clone, data_clone, num_surbs) =
364 (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
365
366 async move {
367 match PacketWrapping::send(&msg_processor, data, routing).await {
368 Ok(v) => {
369 #[cfg(all(feature = "prometheus", not(test)))]
370 {
371 METRIC_PACKET_COUNT.increment(&["sent"]);
372 }
373
374 #[cfg(feature = "capture")]
375 let _ = capture_clone.try_send(
376 capture::PacketBeforeTransit::OutgoingPacket {
377 me: me_pub,
378 next_hop: v.next_hop,
379 num_surbs,
380 is_forwarded: false,
381 data: data_clone.data.to_bytes().into_vec().into(),
382 ack_challenge: v.ack_challenge.as_ref().into(),
383 signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
384 ticket: inspect_ticket_data_in_packet(&v.data).into(),
385 }
386 .into(),
387 );
388
389 Some((v.next_hop.into(), v.data))
390 }
391 Err(error) => {
392 tracing::error!(%error, "packet could not be wrapped for sending");
393 None
394 }
395 }
396 }
397 })
398 .filter_map(futures::future::ready)
399 .inspect(|(peer, _)| tracing::trace!(%peer, "protocol message out"))
400 .map(Ok)
401 .forward(msg_to_send_tx)
402 .instrument(tracing::trace_span!("msg protocol processing - egress"))
403 .inspect(|_| {
404 tracing::warn!(
405 task = "transport (protocol - msg egress)",
406 "long-running background task finished"
407 )
408 })
409 .await;
410 }),
411 );
412
413 let ack_out_tx_clone_1 = ack_out_tx.clone();
414 let ack_out_tx_clone_2 = ack_out_tx.clone();
415
416 #[cfg(feature = "capture")]
417 let capture_clone = capture.clone();
418
419 let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
421 .time_to_idle(Duration::from_secs(600))
422 .max_capacity(100_000)
423 .build();
424
425 processes.insert(
426 ProtocolProcesses::MsgIn,
427 spawn_as_abortable!(async move {
428 let _neverending = wire_msg
429 .1
430 .then_concurrent(move |(peer, data)| {
431 let msg_processor = msg_processor_read.clone();
432 let mut ack_out_tx = ack_out_tx_clone_1.clone();
433 let peer_id_key_cache = peer_id_cache.clone();
434
435 tracing::trace!(%peer, "protocol message in");
436
437 #[cfg(feature = "capture")]
438 let (mut capture_clone, ticket_data_clone) = (
439 capture.clone(),
440 inspect_ticket_data_in_packet(&data).to_vec()
441 );
442
443 async move {
444 let peer_key = match peer_id_key_cache
446 .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
447 .await {
448 Ok(peer) => peer,
449 Err(error) => {
450 tracing::error!(%peer, %error, "dropping packet - cannot convert peer id");
452 return None;
453 }
454 };
455
456 let now = std::time::Instant::now();
457 let res = msg_processor.recv(peer_key, data).await;
458 let elapsed = now.elapsed();
459 if elapsed > SLOW_OP {
460 tracing::warn!(%peer, ?elapsed, "msg_processor.recv took too long");
461 }
462
463 if let Err(error) = &res {
466 tracing::error!(%peer, %error, "failed to process the received packet");
467
468 if error.is_undecodable() {
470 tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
474 } else {
475 ack_out_tx
476 .send((None, peer_key))
477 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
478 .await
479 .unwrap_or_else(|_| {
480 tracing::error!("failed to send ack to the egress queue: timeout");
481 Ok(())
482 })
483 .unwrap_or_else(|_| {
484 tracing::error!("failed to send ack to the egress queue");
485 });
486 }
487 }
488
489 #[cfg(feature = "capture")]
490 if let Ok(packet) = &res {
491 let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
492 me: me_pub,
493 packet,
494 ticket: ticket_data_clone.into(),
495 }.into()
496 );
497 }
498
499 res.ok()
500 }
501 })
502 .filter_map(move |maybe_packet| {
503 let tbf = tbf.clone();
504
505 futures::future::ready(
506 if let Some(packet) = maybe_packet {
507 match packet {
508 IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
509 IncomingPacket::Final { packet_tag, previous_hop,.. } |
510 IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
511 if tbf.lock().check_and_set(&packet_tag) {
514 tracing::warn!(%previous_hop, "replayed packet received");
515
516 #[cfg(all(feature = "prometheus", not(test)))]
517 METRIC_REPLAYED_PACKET_COUNT.increment();
518
519 None
520 } else {
521 Some(packet)
522 }
523 }
524 }
525 } else {
526 tracing::trace!("received empty packet");
527 None
528 }
529 )
530 })
531 .then_concurrent(move |packet| {
532 let mut msg_to_send_tx = wire_msg.0.clone();
533
534 #[cfg(feature = "capture")]
535 let mut capture_clone = capture_clone.clone();
536
537 let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
538 let mut ack_out_tx = ack_out_tx_clone_2.clone();
539 async move {
540
541 match packet {
542 IncomingPacket::Acknowledgement {
543 previous_hop,
544 ack,
545 ..
546 } => {
547 tracing::trace!(%previous_hop, "acknowledging ticket using received ack");
548 ticket_ack_tx_clone
549 .send((ack, previous_hop))
550 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
551 .await
552 .unwrap_or_else(|_| {
553 tracing::error!("failed dispatching received acknowledgement to the ticket ack queue: timeout");
554 Ok(())
555 })
556 .unwrap_or_else(|_| {
557 tracing::error!("failed dispatching received acknowledgement to the ticket ack queue");
558 });
559
560 None
562 },
563 IncomingPacket::Final {
564 previous_hop,
565 sender,
566 plain_text,
567 ack_key,
568 info,
569 ..
570 } => {
571 ack_out_tx
573 .send((Some(ack_key), previous_hop))
574 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
575 .await
576 .unwrap_or_else(|_| {
577 tracing::error!("failed to send ack to the egress queue: timeout");
578 Ok(())
579 })
580 .unwrap_or_else(|_| {
581 tracing::error!("failed to send ack to the egress queue");
582 });
583
584 #[cfg(all(feature = "prometheus", not(test)))]
585 {
586 METRIC_PACKET_COUNT.increment(&["received"]);
587 }
588
589 Some((sender, plain_text, info))
590 }
591 IncomingPacket::Forwarded {
592 previous_hop,
593 next_hop,
594 data,
595 ack_key,
596 ..
597 } => {
598 tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
600
601 #[cfg(feature = "capture")]
602 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
603 me: me_pub,
604 next_hop,
605 num_surbs: 0,
606 is_forwarded: true,
607 data: data.as_ref().into(),
608 ack_challenge: Default::default(),
609 signals: None.into(),
610 ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
611 }.into();
612
613 msg_to_send_tx
614 .send((next_hop.into(), data))
615 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
616 .await
617 .unwrap_or_else(|_| {
618 tracing::error!("failed to forward a packet to the transport layer: timeout");
619 Ok(())
620 })
621 .unwrap_or_else(|_| {
622 tracing::error!("failed to forward a packet to the transport layer");
623 });
624
625 #[cfg(all(feature = "prometheus", not(test)))]
626 {
627 METRIC_PACKET_COUNT.increment(&["forwarded"]);
628 }
629
630 #[cfg(feature = "capture")]
631 let _ = capture_clone.try_send(captured_packet);
632
633 tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
635 ack_out_tx
636 .send((Some(ack_key), previous_hop))
637 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
638 .await
639 .unwrap_or_else(|_| {
640 tracing::error!("failed to send ack to the egress queue: timeout");
641 Ok(())
642 })
643 .unwrap_or_else(|_| {
644 tracing::error!("failed to send ack to the egress queue");
645 });
646
647 None
648 }
649 }
650 }})
651 .filter_map(|maybe_data| futures::future::ready(
652 maybe_data
654 .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
655 .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
656 .ok()
657 .map(|data| (sender, ApplicationDataIn {
658 data,
659 packet_info: IncomingPacketInfo {
660 signals_from_sender: aux_info.packet_signals,
661 num_saved_surbs: aux_info.num_surbs,
662 }
663 })))
664 ))
665 .map(Ok)
666 .forward(api.0)
667 .instrument(tracing::trace_span!("msg protocol processing - ingress"))
668 .inspect(|_| tracing::warn!(task = "transport (protocol - msg ingress)", "long-running background task finished"))
669 .await;
670 }),
671 );
672
673 processes
674}