use futures::{SinkExt, StreamExt};
use futures_time::{future::FutureExt as TimeExt, stream::StreamExt as TimeStreamExt};
use hopr_async_runtime::{AbortableList, spawn_as_abortable};
use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_network_types::{
    prelude::*,
    timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
};
use hopr_primitive_types::prelude::Address;
use hopr_protocol_app::prelude::*;
use hopr_protocol_hopr::prelude::*;
use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
use tracing::Instrument;
use validator::{Validate, ValidationError, ValidationErrors};

use crate::PeerId;

const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);

#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_packets_count",
        "Number of processed packets of different types (sent, received, forwarded)",
        &["type"]
    ).unwrap();
}

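/// Identifiers of the long-running background processes that make up the packet pipeline.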
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
pub enum PacketPipelineProcesses {
    #[strum(to_string = "HOPR [msg] - ingress")]
    MsgIn,
    #[strum(to_string = "HOPR [msg] - egress")]
    MsgOut,
    #[strum(to_string = "HOPR [ack] - egress")]
    AckOut,
    #[strum(to_string = "HOPR [ack] - ingress")]
    AckIn,
    #[strum(to_string = "HOPR [msg] - mixer")]
    Mixer,
}

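/// Ticket-related events emitted by the packet pipelines: a ticket that became redeemable
/// after a winning acknowledgement, or a ticket rejected during validation along with its
/// issuer address, if known.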
#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
pub enum TicketEvent {
    WinningTicket(Box<RedeemableTicket>),
    RejectedTicket(Box<Ticket>, Option<Address>),
}

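/// Message egress pipeline: concurrently encodes application data from `app_outgoing` into
/// packets and forwards them to the `wire_outgoing` transport sink, logging and dropping
/// any data that cannot be encoded.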
async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
    app_outgoing: AppOut,
    encoder: std::sync::Arc<E>,
    wire_outgoing: WOut,
) where
    AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
    E: PacketEncoder + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
    WOutErr: std::error::Error,
{
    let res = app_outgoing
        .then_concurrent(|(routing, data)| {
            let encoder = encoder.clone();
            async move {
                match encoder
                    .encode_packet(
                        data.data.to_bytes(),
                        routing,
                        data.packet_info
                            .map(|data| data.signals_to_destination)
                            .unwrap_or_default(),
                    )
                    .await
                {
                    Ok(packet) => {
                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["sent"]);

                        tracing::trace!(peer = packet.next_hop.to_peerid_str(), "protocol message out");
                        Some((packet.next_hop.into(), packet.data))
                    }
                    Err(error) => {
                        tracing::error!(%error, "packet could not be wrapped for sending");
                        None
                    }
                }
            }
        })
        .filter_map(futures::future::ready)
        .map(Ok)
        .forward_to_timeout(wire_outgoing)
        .in_current_span()
        .await;

    if let Err(error) = res {
        tracing::error!(
            task = "transport (protocol - msg egress)",
            %error,
            "long-running background task finished with error"
        );
    } else {
        tracing::warn!(
            task = "transport (protocol - msg egress)",
            "long-running background task finished"
        )
    }
}

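/// Message ingress pipeline: decodes packets arriving on `wire_incoming`, dispatches received
/// acknowledgements to `ack_incoming`, schedules outgoing acknowledgements via `ack_outgoing`,
/// relays forwarded packets back out through `wire_outgoing`, reports rejected tickets to
/// `ticket_events`, and delivers final payloads to `app_incoming`.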
async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
    (wire_outgoing, wire_incoming): (WOut, WIn),
    decoder: std::sync::Arc<D>,
    ticket_proc: std::sync::Arc<T>,
    ticket_events: TEvt,
    (ack_outgoing, ack_incoming): (AckOut, AckIn),
    app_incoming: AppIn,
) where
    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
    D: PacketDecoder + Send + 'static,
    T: UnacknowledgedTicketProcessor + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
    AckIn: futures::Sink<(OffchainPublicKey, Vec<Acknowledgement>)> + Send + Unpin + Clone + 'static,
    AckIn::Error: std::error::Error,
    AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
    AckOut::Error: std::error::Error,
    AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
    AppInErr: std::error::Error,
{
    let ack_outgoing_success = ack_outgoing.clone();
    let ack_outgoing_failure = ack_outgoing;
    let ticket_proc_success = ticket_proc;

    let res = wire_incoming
        .then_concurrent(move |(peer, data)| {
            let decoder = decoder.clone();
            let mut ack_outgoing_failure = ack_outgoing_failure.clone();
            let mut ticket_events_reject = ticket_events.clone();

            tracing::trace!("protocol message in");

            async move {
                match decoder
                    .decode(peer, data)
                    .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
                    .await
                {
                    Ok(Ok(packet)) => {
                        tracing::trace!(?packet, "successfully decoded incoming packet");
                        Some(packet)
                    }
                    Ok(Err(IncomingPacketError::Undecodable(error))) => {
                        tracing::trace!(%error, "not sending ack back on undecodable packet - possible adversarial behavior");
                        None
                    }
                    Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
                        tracing::error!(%error, "failed to process the decoded packet");
                        ack_outgoing_failure
                            .send((sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });
                        None
                    }
                    Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
                        tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
                        if let Err(error) = ticket_events_reject
                            .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
                            .await
                        {
                            tracing::error!(%error, "failed to notify invalid ticket rejection");
                        }
                        ack_outgoing_failure
                            .send((sender, None))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });
                        None
                    }
                    Err(_) => {
                        tracing::error!(
                            "dropped incoming packet: failed to decode packet within {:?}",
                            PACKET_DECODING_TIMEOUT
                        );
                        None
                    }
                }
            }
            .instrument(tracing::debug_span!("incoming_packet_decode", %peer))
        })
        .filter_map(futures::future::ready)
        .then_concurrent(move |packet| {
            let ticket_proc = ticket_proc_success.clone();
            let mut wire_outgoing = wire_outgoing.clone();
            let mut ack_incoming = ack_incoming.clone();
            let mut ack_outgoing_success = ack_outgoing_success.clone();
            async move {
                match packet {
                    IncomingPacket::Acknowledgement(ack) => {
                        let IncomingAcknowledgementPacket { previous_hop, received_acks, .. } = *ack;
                        tracing::trace!(
                            previous_hop = previous_hop.to_peerid_str(),
                            num_acks = received_acks.len(),
                            "incoming acknowledgements"
                        );
                        ack_incoming
                            .send((previous_hop, received_acks))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
                            });

                        None
                    }
                    IncomingPacket::Final(final_packet) => {
                        let IncomingFinalPacket {
                            previous_hop,
                            sender,
                            plain_text,
                            ack_key,
                            info,
                            ..
                        } = *final_packet;
                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "incoming final packet");

                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["received"]);

                        Some((sender, plain_text, info))
                    }
                    IncomingPacket::Forwarded(fwd_packet) => {
                        let IncomingForwardedPacket {
                            previous_hop,
                            next_hop,
                            data,
                            ack_key_prev_hop,
                            ack_challenge,
                            received_ticket,
                            ..
                        } = *fwd_packet;
                        if let Err(error) = ticket_proc
                            .insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket)
                            .await
                        {
                            tracing::error!(
                                previous_hop = previous_hop.to_peerid_str(),
                                next_hop = next_hop.to_peerid_str(),
                                %error,
                                "failed to insert unacknowledged ticket into the ticket processor"
                            );
                            return None;
                        }

                        tracing::trace!(
                            previous_hop = previous_hop.to_peerid_str(),
                            next_hop = next_hop.to_peerid_str(),
                            "forwarding packet to the next hop"
                        );

                        wire_outgoing
                            .send((next_hop.into(), data))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to forward a packet to the transport layer");
                            });

                        #[cfg(all(feature = "prometheus", not(test)))]
                        METRIC_PACKET_COUNT.increment(&["forwarded"]);

                        tracing::trace!(previous_hop = previous_hop.to_peerid_str(), "acknowledging forwarded packet back");
                        ack_outgoing_success
                            .send((previous_hop, Some(ack_key_prev_hop)))
                            .await
                            .unwrap_or_else(|error| {
                                tracing::error!(%error, "failed to send ack to the egress queue");
                            });

                        None
                    }
                }
            }
        })
        .filter_map(|maybe_data| {
            futures::future::ready(maybe_data.and_then(|(sender, data, aux_info)| {
                ApplicationData::try_from(data.as_ref())
                    .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
                    .ok()
                    .map(|data| {
                        (
                            sender,
                            ApplicationDataIn {
                                data,
                                packet_info: IncomingPacketInfo {
                                    signals_from_sender: aux_info.packet_signals,
                                    num_saved_surbs: aux_info.num_surbs,
                                },
                            },
                        )
                    })
            }))
        })
        .map(Ok)
        .forward_to_timeout(app_incoming)
        .in_current_span()
        .await;

    if let Err(error) = res {
        tracing::error!(
            task = "transport (protocol - msg ingress)",
            %error,
            "long-running background task finished with error"
        );
    } else {
        tracing::warn!(
            task = "transport (protocol - msg ingress)",
            "long-running background task finished"
        )
    }
}

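/// Acknowledgement egress pipeline: turns half-keys from `ack_outgoing` into verified
/// acknowledgements on a blocking worker, buffers them over `cfg.ack_buffer_interval`,
/// groups them per destination, and sends them out in batches via `wire_outgoing`.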
async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
    ack_outgoing: AckOut,
    encoder: std::sync::Arc<E>,
    cfg: AcknowledgementPipelineConfig,
    packet_key: OffchainKeypair,
    wire_outgoing: WOut,
) where
    AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
    E: PacketEncoder + Send + 'static,
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
{
    ack_outgoing
        .then(move |(destination, maybe_ack_key)| {
            let packet_key = packet_key.clone();
            async move {
                let ack = hopr_parallelize::cpu::spawn_blocking(move || {
                    maybe_ack_key
                        .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
                        .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key))
                })
                .await;
                (destination, ack)
            }
        })
        .buffer(futures_time::time::Duration::from(cfg.ack_buffer_interval))
        .filter(|acks| futures::future::ready(!acks.is_empty()))
        .flat_map(|buffered_acks| {
            let mut groups = halfbrown::HashMap::<OffchainPublicKey, Vec<VerifiedAcknowledgement>, ahash::RandomState>::with_capacity_and_hasher(
                cfg.ack_grouping_capacity,
                ahash::RandomState::default(),
            );
            for (dst, ack) in buffered_acks {
                groups
                    .entry(dst)
                    .and_modify(|v| v.push(ack))
                    .or_insert_with(|| vec![ack]);
            }
            tracing::trace!(
                num_groups = groups.len(),
                num_acks = groups.values().map(|v| v.len()).sum::<usize>(),
                "acknowledgements grouped"
            );
            futures::stream::iter(groups)
        })
        .for_each_concurrent(
            NUM_CONCURRENT_ACK_OUT_PROCESSING,
            move |(destination, acks)| {
                let encoder = encoder.clone();
                let mut wire_outgoing = wire_outgoing.clone();
                async move {
                    for ack_chunk in acks.chunks(MAX_ACKNOWLEDGEMENTS_BATCH_SIZE) {
                        match encoder.encode_acknowledgements(ack_chunk, &destination).await {
                            Ok(ack_packet) => {
                                wire_outgoing
                                    .feed((ack_packet.next_hop.into(), ack_packet.data))
                                    .await
                                    .unwrap_or_else(|error| {
                                        tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
                                    });
                            }
                            Err(error) => tracing::error!(%error, "failed to create ack packet"),
                        }
                    }
                    if let Err(error) = wire_outgoing.flush().await {
                        tracing::error!(%error, "failed to flush acknowledgements batch to the transport layer");
                    }
                    tracing::trace!("acknowledgements out");
                }
                .instrument(tracing::debug_span!("outgoing_ack_batch", peer = destination.to_peerid_str()))
            },
        )
        .in_current_span()
        .await;

    tracing::warn!(
        task = "transport (protocol - ack outgoing)",
        "long-running background task finished"
    );
}

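/// Acknowledgement ingress pipeline: passes batches of received acknowledgements from
/// `ack_incoming` to the ticket processor and forwards winning-ticket resolutions to
/// `ticket_events`.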
async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
    ack_incoming: AckIn,
    ticket_events: TEvt,
    ticket_proc: std::sync::Arc<T>,
) where
    AckIn: futures::Stream<Item = (OffchainPublicKey, Vec<Acknowledgement>)> + Send + 'static,
    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
{
    ack_incoming
        .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, acks)| {
            let ticket_proc = ticket_proc.clone();
            let mut ticket_evt = ticket_events.clone();
            async move {
                tracing::trace!(num = acks.len(), "received acknowledgements");
                match ticket_proc.acknowledge_tickets(peer, acks).await {
                    Ok(resolutions) if !resolutions.is_empty() => {
                        let resolutions_iter = resolutions.into_iter().filter_map(|resolution| match resolution {
                            ResolvedAcknowledgement::RelayingWin(redeemable_ticket) => {
                                tracing::trace!("received ack for a winning ticket");
                                Some(Ok(TicketEvent::WinningTicket(redeemable_ticket)))
                            }
                            ResolvedAcknowledgement::RelayingLoss(_) => {
                                tracing::trace!("received ack for a losing ticket");
                                None
                            }
                        });

                        if let Err(error) = ticket_evt.send_all(&mut futures::stream::iter(resolutions_iter)).await {
                            tracing::error!(%error, "failed to notify ticket resolutions");
                        }
                    }
                    Ok(_) => {
                        tracing::debug!("acknowledgement batch could not acknowledge any ticket");
                    }
                    Err(TicketAcknowledgementError::UnexpectedAcknowledgement) => {
                        tracing::trace!("received unexpected acknowledgement");
                    }
                    Err(error) => {
                        tracing::error!(%error, "failed to acknowledge ticket");
                    }
                }
            }
            .instrument(tracing::debug_span!("incoming_ack_batch", peer = peer.to_peerid_str()))
        })
        .in_current_span()
        .await;

    tracing::warn!(
        task = "transport (protocol - ticket acknowledgement)",
        "long-running background task finished"
    );
}

fn default_ack_buffer_interval() -> std::time::Duration {
    std::time::Duration::from_millis(200)
}

fn default_ack_grouping_capacity() -> usize {
    5
}

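/// Configuration of the acknowledgement egress pipeline: how long outgoing acknowledgements
/// are buffered before being grouped per destination (`ack_buffer_interval`) and the initial
/// capacity of the per-destination grouping map (`ack_grouping_capacity`).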
#[derive(Debug, Copy, Clone, smart_default::SmartDefault, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct AcknowledgementPipelineConfig {
    #[default(default_ack_buffer_interval())]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_ack_buffer_interval", with = "humantime_serde")
    )]
    pub ack_buffer_interval: std::time::Duration,
    #[default(default_ack_grouping_capacity())]
    #[cfg_attr(feature = "serde", serde(default = "default_ack_grouping_capacity"))]
    pub ack_grouping_capacity: usize,
}

impl Validate for AcknowledgementPipelineConfig {
    fn validate(&self) -> Result<(), ValidationErrors> {
        let mut errors = ValidationErrors::new();
        if self.ack_grouping_capacity == 0 {
            errors.add("ack_grouping_capacity", ValidationError::new("must be greater than 0"));
        }
        if self.ack_buffer_interval < std::time::Duration::from_millis(10) {
            errors.add("ack_buffer_interval", ValidationError::new("must be at least 10 ms"));
        }
        if errors.is_empty() { Ok(()) } else { Err(errors) }
    }
}

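/// Spawns the message and acknowledgement ingress/egress pipelines as abortable background
/// tasks and returns them in an [`AbortableList`] keyed by [`PacketPipelineProcesses`].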
#[tracing::instrument(skip_all, level = "trace", fields(me = packet_key.public().to_peerid_str()))]
pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
    packet_key: OffchainKeypair,
    wire_msg: (WOut, WIn),
    codec: (C, D),
    ticket_proc: T,
    ticket_events: TEvt,
    ack_config: AcknowledgementPipelineConfig,
    api: (AppOut, AppIn),
) -> AbortableList<PacketPipelineProcesses>
where
    WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
    WOut::Error: std::error::Error,
    WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
    C: PacketEncoder + Sync + Send + 'static,
    D: PacketDecoder + Sync + Send + 'static,
    T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
    TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
    TEvt::Error: std::error::Error,
    AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
    AppOut::Error: std::error::Error,
    AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
{
    let mut processes = AbortableList::default();

    #[cfg(all(feature = "prometheus", not(test)))]
    {
        lazy_static::initialize(&METRIC_PACKET_COUNT);
    }

    let (outgoing_ack_tx, outgoing_ack_rx) =
        futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);

    let (incoming_ack_tx, incoming_ack_rx) =
        futures::channel::mpsc::channel::<(OffchainPublicKey, Vec<Acknowledgement>)>(TICKET_ACK_BUFFER_SIZE);

    let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
    let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
    let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
    let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
    let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);

    let encoder = std::sync::Arc::new(codec.0);
    let decoder = std::sync::Arc::new(codec.1);
    let ticket_proc = std::sync::Arc::new(ticket_proc);

    processes.insert(
        PacketPipelineProcesses::MsgOut,
        spawn_as_abortable!(
            start_outgoing_packet_pipeline(app_in, encoder.clone(), wire_out.clone()).in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::MsgIn,
        spawn_as_abortable!(
            start_incoming_packet_pipeline(
                (wire_out.clone(), wire_in),
                decoder,
                ticket_proc.clone(),
                ticket_events.clone(),
                (outgoing_ack_tx, incoming_ack_tx),
                app_out,
            )
            .in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::AckOut,
        spawn_as_abortable!(
            start_outgoing_ack_pipeline(outgoing_ack_rx, encoder, ack_config, packet_key.clone(), wire_out)
                .in_current_span()
        ),
    );

    processes.insert(
        PacketPipelineProcesses::AckIn,
        spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc).in_current_span()),
    );

    processes
}