1mod codec;
53
54pub mod config;
56pub mod errors;
58
59pub mod heartbeat;
62pub mod processor;
64
65pub mod stream;
67
68#[cfg(feature = "capture")]
72mod capture;
73
74use std::{collections::HashMap, time::Duration};
75
76use futures::{FutureExt, SinkExt, StreamExt};
77use futures_time::future::FutureExt as FuturesTimeExt;
78use hopr_api::{
79 chain::{ChainKeyOperations, ChainReadChannelOperations, ChainValues},
80 db::{HoprDbProtocolOperations, IncomingPacket},
81};
82use hopr_async_runtime::spawn_as_abortable;
83use hopr_crypto_types::types::{HalfKey, OffchainPublicKey};
84use hopr_internal_types::{
85 prelude::{Acknowledgement, HoprPseudonym},
86 protocol::VerifiedAcknowledgement,
87};
88use hopr_network_types::prelude::ResolvedTransportRouting;
89use hopr_protocol_app::prelude::{ApplicationData, ApplicationDataIn, ApplicationDataOut, IncomingPacketInfo};
90use hopr_transport_bloom::TagBloomFilter;
91use hopr_transport_identity::{Multiaddr, PeerId};
92use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
93use tracing::Instrument;
94
95use crate::processor::{PacketUnwrapping, PacketWrapping};
96
/// Size of a single HOPR packet, taken from `HoprPacket::SIZE`.
const HOPR_PACKET_SIZE: usize = hopr_crypto_packet::prelude::HoprPacket::SIZE;
/// Threshold above which a single `msg_processor.recv` call is reported as slow (warning log).
const SLOW_OP: std::time::Duration = std::time::Duration::from_millis(150);
/// Maximum time a task waits when pushing an item into one of the queues/sinks
/// before giving up and logging a timeout error.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);

/// Fixed-length codec instantiated with [`HOPR_PACKET_SIZE`].
pub type HoprBinaryCodec = crate::codec::FixedLengthCodec<HOPR_PACKET_SIZE>;
/// Protocol identifier string of the HOPR message protocol.
pub const CURRENT_HOPR_MSG_PROTOCOL: &str = "/hopr/mix/1.0.0";

/// Capacity of the channel feeding received acknowledgements into the ticket acknowledgement task.
pub const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of received acknowledgements processed concurrently.
pub const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;

/// Capacity of the channel feeding outgoing acknowledgement requests into the ack egress task.
pub const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
/// Maximum number of outgoing acknowledgements processed concurrently.
pub const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
109
// Metrics are compiled in only with the `prometheus` feature and outside of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Per-type packet counter; this file increments it with the labels
    // "sent", "received" and "forwarded".
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
    // Incremented whenever the replay filter reports an already-seen packet tag.
    static ref METRIC_REPLAYED_PACKET_COUNT: hopr_metrics::SimpleCounter = hopr_metrics::SimpleCounter::new(
        "hopr_replayed_packet_count",
        "The total count of replayed packets during the packet processing pipeline run",
    ).unwrap();
}
123
/// Identifiers of the long-running background processes of the protocol.
///
/// Used as keys of the map returned by [`run_msg_ack_protocol`]; the `Display`
/// representation (via `strum`) is a human-readable process name.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum ProtocolProcesses {
    /// Processing of packets arriving from the wire.
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    /// Wrapping of outgoing application data into packets sent to the wire.
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    /// Creation and sending of outgoing acknowledgements.
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    /// Verification and handling of received acknowledgements.
    #[strum(to_string = "HOPR [ack] - ingress - ticket acknowledgement")]
    TicketAck,
    /// Mixer process (not spawned in this file).
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
    /// Packet capture process; exists only with the `capture` feature.
    #[cfg(feature = "capture")]
    #[strum(to_string = "packet capture")]
    Capture,
}
/// Peer discovery events.
///
/// NOTE(review): this type is not used within this file; the variant descriptions
/// below are derived from the variant names only - confirm against the consumers.
#[derive(Debug, Clone)]
pub enum PeerDiscovery {
    /// Presumably: the given peer is allowed.
    Allow(PeerId),
    /// Presumably: the given peer is banned.
    Ban(PeerId),
    /// Presumably: the given peer was announced with the listed multiaddresses.
    Announce(PeerId, Vec<Multiaddr>),
}
147
/// Returns the trailing `Ticket::SIZE` bytes of `raw_packet`.
///
/// If the packet is shorter than `Ticket::SIZE`, an empty slice is returned instead.
#[cfg(feature = "capture")]
fn inspect_ticket_data_in_packet(raw_packet: &[u8]) -> &[u8] {
    use hopr_primitive_types::traits::BytesEncodable;

    let ticket_size = hopr_internal_types::tickets::Ticket::SIZE;

    // `checked_sub` yields `None` exactly when the packet is too short to hold a ticket.
    match raw_packet.len().checked_sub(ticket_size) {
        Some(ticket_start) => &raw_packet[ticket_start..],
        None => &[],
    }
}
157
158#[allow(clippy::too_many_arguments)]
163pub async fn run_msg_ack_protocol<Db, R>(
164 packet_cfg: processor::PacketInteractionConfig,
165 db: Db,
166 resolver: R,
167 wire_msg: (
168 impl futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
169 impl futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
170 ),
171 api: (
172 impl futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
173 impl futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
174 ),
175) -> HashMap<ProtocolProcesses, hopr_async_runtime::AbortHandle>
176where
177 Db: HoprDbProtocolOperations + Clone + Send + Sync + 'static,
178 R: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Clone + Send + Sync + 'static,
179{
180 let me = packet_cfg.packet_keypair.clone();
181
182 #[cfg(feature = "capture")]
183 let me_pub = *hopr_crypto_types::keypairs::Keypair::public(&me);
184
185 let mut processes = HashMap::new();
186
187 #[cfg(all(feature = "prometheus", not(test)))]
188 {
189 lazy_static::initialize(&METRIC_PACKET_COUNT);
191 lazy_static::initialize(&METRIC_REPLAYED_PACKET_COUNT);
192 }
193
194 #[cfg(feature = "capture")]
195 let capture = {
196 use std::{path::PathBuf, str::FromStr};
197 let writer: Box<dyn capture::PacketWriter + Send + 'static> =
198 if let Ok(desc) = std::env::var("HOPR_CAPTURE_PACKETS") {
199 if let Ok(udp_writer) = std::net::SocketAddr::from_str(&desc)
200 .map_err(std::io::Error::other)
201 .and_then(capture::UdpPacketDump::new)
202 {
203 tracing::warn!("udp packet capture initialized to {desc}");
204 Box::new(udp_writer)
205 } else if let Ok(pcap_writer) = std::fs::File::create(&desc).and_then(capture::PcapPacketWriter::new) {
206 tracing::info!("pcap file packet capture initialized to {desc}");
207 Box::new(pcap_writer)
208 } else {
209 tracing::error!(desc, "failed to create packet capture: invalid socket address or file");
210 Box::new(capture::NullWriter)
211 }
212 } else {
213 tracing::warn!("no packet capture specified");
214 Box::new(capture::NullWriter)
215 };
216 let start_capturing_path: Option<PathBuf> = std::env::var("HOPR_CAPTURE_PATH_TRIGGER").ok().map(PathBuf::from);
217 if let Some(ref path) = start_capturing_path {
218 tracing::info!(
219 "To start capturing packets, create it by running 'touch {}'",
220 path.display()
221 );
222 } else {
223 tracing::warn!("The env var 'HOPR_CAPTURE_PATH_TRIGGER' is not set, packet capture won't start");
224 };
225 let (capture, ah) = capture::packet_capture_channel(writer, start_capturing_path);
226 processes.insert(ProtocolProcesses::Capture, ah);
227 capture
228 };
229
230 let tbf = std::sync::Arc::new(parking_lot::Mutex::new(TagBloomFilter::default()));
231
232 let (ticket_ack_tx, ticket_ack_rx) =
233 futures::channel::mpsc::channel::<(Acknowledgement, OffchainPublicKey)>(TICKET_ACK_BUFFER_SIZE);
234
235 let db_clone = db.clone();
236 let resolver_clone = resolver.clone();
237 processes.insert(
238 ProtocolProcesses::TicketAck,
239 spawn_as_abortable!(ticket_ack_rx
240 .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(ack, sender)| {
241 let db = db_clone.clone();
242 let resolver = resolver_clone.clone();
243 async move {
244 if let Ok(verified) = hopr_parallelize::cpu::spawn_blocking(move || ack.verify(&sender)).await {
245 tracing::trace!(%sender, "received a valid acknowledgement");
246 match db.handle_acknowledgement(verified, &resolver).await {
247 Ok(_) => tracing::trace!(%sender, "successfully processed a known acknowledgement"),
248 Err(error) => tracing::trace!(%sender, %error, "valid acknowledgement is unknown or error occurred while processing it"),
251 }
252 } else {
253 tracing::error!(%sender, "failed to verify signature on acknowledgement");
254 }
255 }
256 })
257 .inspect(|_| tracing::warn!(task = "transport (protocol - ticket acknowledgement)", "long-running background task finished")))
258 );
259
260 let (ack_out_tx, ack_out_rx) =
261 futures::channel::mpsc::channel::<(Option<HalfKey>, OffchainPublicKey)>(ACK_OUT_BUFFER_SIZE);
262
263 #[cfg(feature = "capture")]
264 let capture_clone = capture.clone();
265
266 let db_clone = db.clone();
267 let resolver_clone = resolver.clone();
268 let me_clone = me.clone();
269 let msg_to_send_tx = wire_msg.0.clone();
270 processes.insert(
271 ProtocolProcesses::AckOut,
272 spawn_as_abortable!(
273 ack_out_rx
274 .for_each_concurrent(
275 NUM_CONCURRENT_ACK_OUT_PROCESSING,
276 move |(maybe_ack_key, destination)| {
277 let db = db_clone.clone();
278 let resolver = resolver_clone.clone();
279 let me = me_clone.clone();
280 let mut msg_to_send_tx_clone = msg_to_send_tx.clone();
281
282 #[cfg(feature = "capture")]
283 let mut capture = capture_clone.clone();
284 async move {
285 #[cfg(feature = "capture")]
286 let (is_random, me_pub) = (
287 maybe_ack_key.is_none(),
288 *hopr_crypto_types::keypairs::Keypair::public(&me),
289 );
290
291 let ack = hopr_parallelize::cpu::spawn_blocking(move || {
293 maybe_ack_key
294 .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &me))
295 .unwrap_or_else(|| VerifiedAcknowledgement::random(&me))
296 })
297 .await;
298
299 #[cfg(feature = "capture")]
300 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingAck {
301 me: me_pub,
302 ack,
303 is_random,
304 next_hop: destination,
305 }
306 .into();
307
308 match db
309 .to_send_no_ack(ack.leak().as_ref().into(), destination, &resolver)
310 .await
311 {
312 Ok(ack_packet) => {
313 msg_to_send_tx_clone
314 .send((ack_packet.next_hop.into(), ack_packet.data))
315 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
316 .await
317 .unwrap_or_else(|_| {
318 tracing::error!(
319 "failed to forward an acknowledgement to the transport layer: timeout"
320 );
321 Ok(())
322 })
323 .unwrap_or_else(|_| {
324 tracing::error!(
325 "failed to forward an acknowledgement to the transport layer"
326 );
327 });
328
329 #[cfg(feature = "capture")]
330 let _ = capture.try_send(captured_packet);
331 }
332 Err(error) => tracing::error!(%error, "failed to create ack packet"),
333 }
334 }
335 }
336 )
337 .inspect(|_| tracing::warn!(
338 task = "transport (protocol - ack outgoing)",
339 "long-running background task finished"
340 ))
341 ),
342 );
343
344 let msg_processor_read = processor::PacketProcessor::new(db.clone(), resolver.clone(), packet_cfg);
345 let msg_processor_write = msg_processor_read.clone();
346
347 #[cfg(feature = "capture")]
348 let capture_clone = capture.clone();
349
350 let msg_to_send_tx = wire_msg.0.clone();
351 processes.insert(
352 ProtocolProcesses::MsgOut,
353 spawn_as_abortable!(async move {
354 let _neverending = api
355 .1
356 .then_concurrent(|(routing, data)| {
357 let msg_processor = msg_processor_write.clone();
358
359 #[cfg(feature = "capture")]
360 let (mut capture_clone, data_clone, num_surbs) =
361 (capture_clone.clone(), data.clone(), routing.count_return_paths() as u8);
362
363 async move {
364 match PacketWrapping::send(&msg_processor, data, routing).await {
365 Ok(v) => {
366 #[cfg(all(feature = "prometheus", not(test)))]
367 {
368 METRIC_PACKET_COUNT.increment(&["sent"]);
369 }
370
371 #[cfg(feature = "capture")]
372 let _ = capture_clone.try_send(
373 capture::PacketBeforeTransit::OutgoingPacket {
374 me: me_pub,
375 next_hop: v.next_hop,
376 num_surbs,
377 is_forwarded: false,
378 data: data_clone.data.to_bytes().into_vec().into(),
379 ack_challenge: v.ack_challenge.as_ref().into(),
380 signals: data_clone.packet_info.unwrap_or_default().signals_to_destination,
381 ticket: inspect_ticket_data_in_packet(&v.data).into(),
382 }
383 .into(),
384 );
385
386 Some((v.next_hop.into(), v.data))
387 }
388 Err(error) => {
389 tracing::error!(%error, "packet could not be wrapped for sending");
390 None
391 }
392 }
393 }
394 })
395 .filter_map(futures::future::ready)
396 .inspect(|(peer, _)| tracing::trace!(%peer, "protocol message out"))
397 .map(Ok)
398 .forward(msg_to_send_tx)
399 .instrument(tracing::trace_span!("msg protocol processing - egress"))
400 .inspect(|_| {
401 tracing::warn!(
402 task = "transport (protocol - msg egress)",
403 "long-running background task finished"
404 )
405 })
406 .await;
407 }),
408 );
409
410 let ack_out_tx_clone_1 = ack_out_tx.clone();
411 let ack_out_tx_clone_2 = ack_out_tx.clone();
412
413 #[cfg(feature = "capture")]
414 let capture_clone = capture.clone();
415
416 let peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey> = moka::future::Cache::builder()
418 .time_to_idle(Duration::from_secs(600))
419 .max_capacity(100_000)
420 .build();
421
422 processes.insert(
423 ProtocolProcesses::MsgIn,
424 spawn_as_abortable!(async move {
425 let _neverending = wire_msg
426 .1
427 .then_concurrent(move |(peer, data)| {
428 let msg_processor = msg_processor_read.clone();
429 let mut ack_out_tx = ack_out_tx_clone_1.clone();
430 let peer_id_key_cache = peer_id_cache.clone();
431
432 tracing::trace!(%peer, "protocol message in");
433
434 #[cfg(feature = "capture")]
435 let (mut capture_clone, ticket_data_clone) = (
436 capture.clone(),
437 inspect_ticket_data_in_packet(&data).to_vec()
438 );
439
440 async move {
441 let peer_key = match peer_id_key_cache
443 .try_get_with_by_ref(&peer, hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&peer)))
444 .await {
445 Ok(peer) => peer,
446 Err(error) => {
447 tracing::error!(%peer, %error, "dropping packet - cannot convert peer id");
449 return None;
450 }
451 };
452
453 let now = std::time::Instant::now();
454 let res = msg_processor.recv(peer_key, data).await;
455 let elapsed = now.elapsed();
456 if elapsed > SLOW_OP {
457 tracing::warn!(%peer, ?elapsed, "msg_processor.recv took too long");
458 }
459
460 if let Err(error) = &res {
463 tracing::error!(%peer, %error, "failed to process the received packet");
464
465 if error.is_undecodable() {
467 tracing::trace!(%peer, "not sending ack back on undecodable packet - possible adversarial behavior");
471 } else {
472 ack_out_tx
473 .send((None, peer_key))
474 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
475 .await
476 .unwrap_or_else(|_| {
477 tracing::error!("failed to send ack to the egress queue: timeout");
478 Ok(())
479 })
480 .unwrap_or_else(|_| {
481 tracing::error!("failed to send ack to the egress queue");
482 });
483 }
484 }
485
486 #[cfg(feature = "capture")]
487 if let Ok(packet) = &res {
488 let _ = capture_clone.try_send(capture::PacketBeforeTransit::IncomingPacket {
489 me: me_pub,
490 packet,
491 ticket: ticket_data_clone.into(),
492 }.into()
493 );
494 }
495
496 res.ok()
497 }
498 })
499 .filter_map(move |maybe_packet| {
500 let tbf = tbf.clone();
501
502 futures::future::ready(
503 if let Some(packet) = maybe_packet {
504 match packet {
505 IncomingPacket::Acknowledgement { packet_tag, previous_hop, .. } |
506 IncomingPacket::Final { packet_tag, previous_hop,.. } |
507 IncomingPacket::Forwarded { packet_tag, previous_hop, .. } => {
508 if tbf.lock().check_and_set(&packet_tag) {
511 tracing::warn!(%previous_hop, "replayed packet received");
512
513 #[cfg(all(feature = "prometheus", not(test)))]
514 METRIC_REPLAYED_PACKET_COUNT.increment();
515
516 None
517 } else {
518 Some(packet)
519 }
520 }
521 }
522 } else {
523 tracing::trace!("received empty packet");
524 None
525 }
526 )
527 })
528 .then_concurrent(move |packet| {
529 let mut msg_to_send_tx = wire_msg.0.clone();
530
531 #[cfg(feature = "capture")]
532 let mut capture_clone = capture_clone.clone();
533
534 let mut ticket_ack_tx_clone = ticket_ack_tx.clone();
535 let mut ack_out_tx = ack_out_tx_clone_2.clone();
536 async move {
537
538 match packet {
539 IncomingPacket::Acknowledgement {
540 previous_hop,
541 ack,
542 ..
543 } => {
544 tracing::trace!(%previous_hop, "acknowledging ticket using received ack");
545 ticket_ack_tx_clone
546 .send((ack, previous_hop))
547 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
548 .await
549 .unwrap_or_else(|_| {
550 tracing::error!("failed dispatching received acknowledgement to the ticket ack queue: timeout");
551 Ok(())
552 })
553 .unwrap_or_else(|_| {
554 tracing::error!("failed dispatching received acknowledgement to the ticket ack queue");
555 });
556
557 None
559 },
560 IncomingPacket::Final {
561 previous_hop,
562 sender,
563 plain_text,
564 ack_key,
565 info,
566 ..
567 } => {
568 ack_out_tx
570 .send((Some(ack_key), previous_hop))
571 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
572 .await
573 .unwrap_or_else(|_| {
574 tracing::error!("failed to send ack to the egress queue: timeout");
575 Ok(())
576 })
577 .unwrap_or_else(|_| {
578 tracing::error!("failed to send ack to the egress queue");
579 });
580
581 #[cfg(all(feature = "prometheus", not(test)))]
582 {
583 METRIC_PACKET_COUNT.increment(&["received"]);
584 }
585
586 Some((sender, plain_text, info))
587 }
588 IncomingPacket::Forwarded {
589 previous_hop,
590 next_hop,
591 data,
592 ack_key,
593 ..
594 } => {
595 tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
597
598 #[cfg(feature = "capture")]
599 let captured_packet: capture::CapturedPacket = capture::PacketBeforeTransit::OutgoingPacket {
600 me: me_pub,
601 next_hop,
602 num_surbs: 0,
603 is_forwarded: true,
604 data: data.as_ref().into(),
605 ack_challenge: Default::default(),
606 signals: None.into(),
607 ticket: inspect_ticket_data_in_packet(data.as_ref()).into()
608 }.into();
609
610 msg_to_send_tx
611 .send((next_hop.into(), data))
612 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
613 .await
614 .unwrap_or_else(|_| {
615 tracing::error!("failed to forward a packet to the transport layer: timeout");
616 Ok(())
617 })
618 .unwrap_or_else(|_| {
619 tracing::error!("failed to forward a packet to the transport layer");
620 });
621
622 #[cfg(all(feature = "prometheus", not(test)))]
623 {
624 METRIC_PACKET_COUNT.increment(&["forwarded"]);
625 }
626
627 #[cfg(feature = "capture")]
628 let _ = capture_clone.try_send(captured_packet);
629
630 tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
632 ack_out_tx
633 .send((Some(ack_key), previous_hop))
634 .timeout(futures_time::time::Duration::from(QUEUE_SEND_TIMEOUT))
635 .await
636 .unwrap_or_else(|_| {
637 tracing::error!("failed to send ack to the egress queue: timeout");
638 Ok(())
639 })
640 .unwrap_or_else(|_| {
641 tracing::error!("failed to send ack to the egress queue");
642 });
643
644 None
645 }
646 }
647 }})
648 .filter_map(|maybe_data| futures::future::ready(
649 maybe_data
651 .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
652 .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
653 .ok()
654 .map(|data| (sender, ApplicationDataIn {
655 data,
656 packet_info: IncomingPacketInfo {
657 signals_from_sender: aux_info.packet_signals,
658 num_saved_surbs: aux_info.num_surbs,
659 }
660 })))
661 ))
662 .map(Ok)
663 .forward(api.0)
664 .instrument(tracing::trace_span!("msg protocol processing - ingress"))
665 .inspect(|_| tracing::warn!(task = "transport (protocol - msg ingress)", "long-running background task finished"))
666 .await;
667 }),
668 );
669
670 processes
671}