1use futures::{SinkExt, StreamExt};
2use futures_time::future::FutureExt as TimeExt;
3use hopr_async_runtime::{AbortableList, spawn_as_abortable};
4use hopr_crypto_types::prelude::*;
5use hopr_internal_types::prelude::*;
6use hopr_network_types::{
7 prelude::*,
8 timeout::{SinkTimeoutError, TimeoutSinkExt, TimeoutStreamExt},
9};
10use hopr_primitive_types::prelude::Address;
11use hopr_protocol_app::prelude::*;
12use hopr_protocol_hopr::prelude::*;
13use rust_stream_ext_concurrent::then_concurrent::StreamThenConcurrentExt;
14use tracing::Instrument;
15
16use crate::PeerId;
17
/// Timeout applied to the wire, application, acknowledgement and ticket-event sinks that
/// `run_packet_pipeline` wraps using `with_timeout`.
const QUEUE_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
/// Time limit for decoding a single incoming packet; packets exceeding it are dropped.
const PACKET_DECODING_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(150);

/// Buffer size of the channel that carries received acknowledgements to the ticket
/// acknowledgement task.
const TICKET_ACK_BUFFER_SIZE: usize = 1_000_000;
/// Concurrency limit handed to `for_each_concurrent` when processing received acknowledgements.
const NUM_CONCURRENT_TICKET_ACK_PROCESSING: usize = 10;
/// Buffer size of the channel that carries acknowledgements waiting to be sent out.
const ACK_OUT_BUFFER_SIZE: usize = 1_000_000;
/// Concurrency limit handed to `for_each_concurrent` when sending out acknowledgements.
const NUM_CONCURRENT_ACK_OUT_PROCESSING: usize = 10;
24
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Multi-counter of processed packets, labelled by `type`.
    ///
    /// This file increments it with the label values `sent`, `received` and `forwarded`.
    static ref METRIC_PACKET_COUNT: hopr_metrics::MultiCounter = {
        let name = "hopr_packets_count";
        let description = "Number of processed packets of different types (sent, received, forwarded)";
        hopr_metrics::MultiCounter::new(name, description, &["type"]).unwrap()
    };
}
33
34#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, strum::Display)]
35pub enum PacketPipelineProcesses {
36 #[strum(to_string = "HOPR [msg] - ingress")]
37 MsgIn,
38 #[strum(to_string = "HOPR [msg] - egress")]
39 MsgOut,
40 #[strum(to_string = "HOPR [ack] - egress")]
41 AckOut,
42 #[strum(to_string = "HOPR [ack] - ingress")]
43 AckIn,
44 #[strum(to_string = "HOPR [msg] - mixer")]
45 Mixer,
46}
47
48#[derive(Debug, Clone, strum::EnumIs, strum::EnumTryAs)]
50pub enum TicketEvent {
51 WinningTicket(Box<RedeemableTicket>),
53 RejectedTicket(Box<Ticket>, Option<Address>),
55}
56
57async fn start_outgoing_packet_pipeline<AppOut, E, WOut, WOutErr>(
59 app_outgoing: AppOut,
60 encoder: std::sync::Arc<E>,
61 wire_outgoing: WOut,
62) where
63 AppOut: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
64 E: PacketEncoder + Send + 'static,
65 WOut: futures::Sink<(PeerId, Box<[u8]>), Error = SinkTimeoutError<WOutErr>> + Clone + Unpin + Send + 'static,
66 WOutErr: std::error::Error,
67{
68 let res = app_outgoing
69 .then_concurrent(|(routing, data)| {
70 let encoder = encoder.clone();
71 async move {
72 match encoder
73 .encode_packet(
74 data.data.to_bytes(),
75 routing,
76 data.packet_info
77 .map(|data| data.signals_to_destination)
78 .unwrap_or_default(),
79 )
80 .await
81 {
82 Ok(packet) => {
83 #[cfg(all(feature = "prometheus", not(test)))]
84 METRIC_PACKET_COUNT.increment(&["sent"]);
85
86 tracing::trace!(peer = %packet.next_hop, "protocol message out");
87 Some((packet.next_hop.into(), packet.data))
88 }
89 Err(error) => {
90 tracing::error!(%error, "packet could not be wrapped for sending");
91 None
92 }
93 }
94 }
95 })
96 .filter_map(futures::future::ready)
97 .map(Ok)
98 .forward_to_timeout(wire_outgoing)
99 .instrument(tracing::trace_span!("msg protocol processing - egress"))
100 .await;
101
102 if let Err(error) = res {
103 tracing::error!(
104 task = "transport (protocol - msg egress)",
105 %error,
106 "long-running background task finished with error"
107 );
108 } else {
109 tracing::warn!(
110 task = "transport (protocol - msg egress)",
111 "long-running background task finished"
112 )
113 }
114}
115
116#[allow(clippy::too_many_arguments)] async fn start_incoming_packet_pipeline<WIn, WOut, D, T, TEvt, AckIn, AckOut, AppIn, AppInErr>(
124 wire_incoming: WIn,
125 decoder: std::sync::Arc<D>,
126 ticket_proc: std::sync::Arc<T>,
127 ticket_events: TEvt,
128 ack_outgoing: AckOut,
129 wire_outgoing: WOut,
130 ack_incoming: AckIn,
131 app_incoming: AppIn,
132) where
133 WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
134 WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
135 WOut::Error: std::error::Error,
136 D: PacketDecoder + Send + 'static,
137 T: UnacknowledgedTicketProcessor + Send + 'static,
138 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
139 TEvt::Error: std::error::Error,
140 AckIn: futures::Sink<(OffchainPublicKey, Acknowledgement)> + Send + Unpin + Clone + 'static,
141 AckIn::Error: std::error::Error,
142 AckOut: futures::Sink<(OffchainPublicKey, Option<HalfKey>)> + Send + Unpin + Clone + 'static,
143 AckOut::Error: std::error::Error,
144 AppIn: futures::Sink<(HoprPseudonym, ApplicationDataIn), Error = SinkTimeoutError<AppInErr>> + Send + 'static,
145 AppInErr: std::error::Error,
146{
147 let ack_outgoing_success = ack_outgoing.clone();
148 let ack_outgoing_failure = ack_outgoing;
149 let ticket_proc_success = ticket_proc;
150
151 let res = wire_incoming
152 .then_concurrent(move |(peer, data)| {
153 let decoder = decoder.clone();
154 let mut ack_outgoing_failure = ack_outgoing_failure.clone();
155 let mut ticket_events_reject = ticket_events.clone();
156
157 tracing::trace!(%peer, "protocol message in");
158
159 async move {
160 match decoder.decode(peer, data)
161 .timeout(futures_time::time::Duration::from(PACKET_DECODING_TIMEOUT))
162 .await {
163 Ok(Ok(packet)) => {
164 tracing::trace!(%peer, ?packet, "successfully decoded incoming packet");
165 Some(packet)
166 },
167 Ok(Err(IncomingPacketError::Undecodable(error))) => {
168 tracing::trace!(%peer, %error, "not sending ack back on undecodable packet - possible adversarial behavior");
172 None
173 },
174 Ok(Err(IncomingPacketError::ProcessingError(sender, error))) => {
175 tracing::error!(%peer, %error, "failed to process the decoded packet");
176 ack_outgoing_failure
178 .send((sender, None))
179 .await
180 .unwrap_or_else(|error| {
181 tracing::error!(%error, "failed to send ack to the egress queue");
182 });
183 None
184 },
185 Ok(Err(IncomingPacketError::InvalidTicket(sender, error))) => {
186 tracing::error!(%peer, %error, "failed to validate ticket on the received packet");
187 if let Err(error) = ticket_events_reject
188 .send(TicketEvent::RejectedTicket(error.ticket, error.issuer))
189 .await {
190 tracing::error!(%error, "failed to notify invalid ticket rejection");
191 }
192 ack_outgoing_failure
194 .send((sender, None))
195 .await
196 .unwrap_or_else(|error| {
197 tracing::error!(%error, "failed to send ack to the egress queue");
198 });
199 None
200 }
201 Err(_) => {
202 tracing::error!(%peer, "dropped incoming packet: failed to decode packet within {:?}", PACKET_DECODING_TIMEOUT);
204 None
205 }
206 }
207 }
208 })
209 .filter_map(futures::future::ready)
210 .then_concurrent(move |packet| {
211 let ticket_proc = ticket_proc_success.clone();
212 let mut wire_outgoing = wire_outgoing.clone();
213 let mut ack_incoming = ack_incoming.clone();
214 let mut ack_outgoing_success = ack_outgoing_success.clone();
215 async move {
216 match packet {
217 IncomingPacket::Acknowledgement(ack) => {
218 let IncomingAcknowledgementPacket { previous_hop, received_ack: ack, .. } = *ack;
219 tracing::trace!(%previous_hop , "acknowledging ticket using received ack");
220 ack_incoming
221 .send((previous_hop, ack))
222 .await
223 .unwrap_or_else(|error| {
224 tracing::error!(%error, "failed dispatching received acknowledgement to the ticket ack queue");
225 });
226
227 None
229 },
230 IncomingPacket::Final(final_packet) => {
231 let IncomingFinalPacket {
232 previous_hop,
233 sender,
234 plain_text,
235 ack_key,
236 info,
237 ..
238 } = *final_packet;
239 tracing::trace!(%previous_hop, "incoming final packet");
240
241 ack_outgoing_success
243 .send((previous_hop, Some(ack_key)))
244 .await
245 .unwrap_or_else(|error| {
246 tracing::error!(%error, "failed to send ack to the egress queue");
247 });
248
249 #[cfg(all(feature = "prometheus", not(test)))]
250 METRIC_PACKET_COUNT.increment(&["received"]);
251
252 Some((sender, plain_text, info))
253 }
254 IncomingPacket::Forwarded(fwd_packet) => {
255 let IncomingForwardedPacket {
256 previous_hop,
257 next_hop,
258 data,
259 ack_key_prev_hop,
260 ack_challenge,
261 received_ticket,
262 ..
263 } = *fwd_packet;
264 if let Err(error) = ticket_proc.insert_unacknowledged_ticket(&next_hop, ack_challenge, received_ticket).await {
265 tracing::error!(%previous_hop, %next_hop, %error, "failed to insert unacknowledged ticket into the ticket processor");
266 return None;
267 }
268
269 tracing::trace!(%previous_hop, %next_hop, "forwarding packet to the next hop");
271
272 wire_outgoing
273 .send((next_hop.into(), data))
274 .await
275 .unwrap_or_else(|error| {
276 tracing::error!(%error, "failed to forward a packet to the transport layer");
277 });
278
279 #[cfg(all(feature = "prometheus", not(test)))]
280 METRIC_PACKET_COUNT.increment(&["forwarded"]);
281
282 tracing::trace!(%previous_hop, "acknowledging forwarded packet back");
284 ack_outgoing_success
285 .send((previous_hop, Some(ack_key_prev_hop)))
286 .await
287 .unwrap_or_else(|error| {
288 tracing::error!(%error, "failed to send ack to the egress queue");
289 });
290
291 None
292 }
293 }
294 }})
295 .filter_map(|maybe_data| futures::future::ready(
296 maybe_data
298 .and_then(|(sender, data, aux_info)| ApplicationData::try_from(data.as_ref())
299 .inspect_err(|error| tracing::error!(%sender, %error, "failed to decode application data"))
300 .ok()
301 .map(|data| (sender, ApplicationDataIn {
302 data,
303 packet_info: IncomingPacketInfo {
304 signals_from_sender: aux_info.packet_signals,
305 num_saved_surbs: aux_info.num_surbs,
306 }
307 })))
308 ))
309 .map(Ok)
310 .inspect(|data| tracing::trace!(?data, "application data in"))
311 .forward_to_timeout(app_incoming)
312 .instrument(tracing::trace_span!("msg protocol processing - ingress"))
313 .await;
314
315 if let Err(error) = res {
316 tracing::error!(
317 task = "transport (protocol - msg ingress)",
318 %error,
319 "long-running background task finished with error"
320 );
321 } else {
322 tracing::warn!(
323 task = "transport (protocol - msg ingress)",
324 "long-running background task finished"
325 )
326 }
327}
328
329async fn start_outgoing_ack_pipeline<AckOut, E, WOut>(
330 ack_outgoing: AckOut,
331 encoder: std::sync::Arc<E>,
332 packet_key: OffchainKeypair,
333 wire_outgoing: WOut,
334) where
335 AckOut: futures::Stream<Item = (OffchainPublicKey, Option<HalfKey>)> + Send + 'static,
336 E: PacketEncoder + Send + 'static,
337 WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
338 WOut::Error: std::error::Error,
339{
340 ack_outgoing
341 .for_each_concurrent(
342 NUM_CONCURRENT_ACK_OUT_PROCESSING,
343 move |(destination, maybe_ack_key)| {
344 let packet_key = packet_key.clone();
345 let encoder = encoder.clone();
346 let mut wire_outgoing = wire_outgoing.clone();
347
348 async move {
349 let ack = hopr_parallelize::cpu::spawn_blocking(move || {
351 maybe_ack_key
352 .map(|ack_key| VerifiedAcknowledgement::new(ack_key, &packet_key))
353 .unwrap_or_else(|| VerifiedAcknowledgement::random(&packet_key))
354 })
355 .await;
356
357 match encoder.encode_acknowledgement(ack, &destination).await {
358 Ok(ack_packet) => {
359 tracing::trace!(%destination, "acknowledgement out");
360 wire_outgoing
361 .send((ack_packet.next_hop.into(), ack_packet.data))
362 .await
363 .unwrap_or_else(|error| {
364 tracing::error!(%error, "failed to forward an acknowledgement to the transport layer");
365 });
366 }
367 Err(error) => tracing::error!(%error, "failed to create ack packet"),
368 }
369 }
370 }
371 ).await;
372
373 tracing::warn!(
374 task = "transport (protocol - ack outgoing)",
375 "long-running background task finished"
376 );
377}
378
379async fn start_incoming_ack_pipeline<AckIn, T, TEvt>(
380 ack_incoming: AckIn,
381 ticket_events: TEvt,
382 ticket_proc: std::sync::Arc<T>,
383) where
384 AckIn: futures::Stream<Item = (OffchainPublicKey, Acknowledgement)> + Send + 'static,
385 T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
386 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
387 TEvt::Error: std::error::Error,
388{
389 ack_incoming
390 .for_each_concurrent(NUM_CONCURRENT_TICKET_ACK_PROCESSING, move |(peer, ack)| {
391 let ticket_proc = ticket_proc.clone();
392 let mut ticket_evt = ticket_events.clone();
393 async move {
394 tracing::trace!(%peer, "received acknowledgement");
395 match ticket_proc.acknowledge_ticket(peer, ack).await {
396 Ok(Some(ResolvedAcknowledgement::RelayingWin(redeemable_ticket))) => {
397 tracing::trace!(%peer, "received ack for a winning ticket");
398 ticket_evt
399 .send(TicketEvent::WinningTicket(redeemable_ticket))
400 .await
401 .unwrap_or_else(|error| {
402 tracing::error!(%error, "failed to notify winning ticket");
403 });
404 }
405 Ok(Some(ResolvedAcknowledgement::RelayingLoss(_))) => {
406 tracing::trace!(%peer, "received ack for a losing ticket");
408 }
409 Ok(None) => {
410 tracing::trace!(%peer, "received unexpected acknowledgement");
413 }
414 Err(error) => {
415 tracing::error!(%error, "failed to acknowledge ticket");
416 }
417 }
418 }
419 })
420 .await;
421
422 tracing::warn!(
423 task = "transport (protocol - ticket acknowledgement)",
424 "long-running background task finished"
425 );
426}
427
428pub fn run_packet_pipeline<WIn, WOut, C, D, T, TEvt, AppOut, AppIn>(
433 packet_key: OffchainKeypair,
434 wire_msg: (WOut, WIn),
435 codec: (C, D),
436 ticket_proc: T,
437 ticket_events: TEvt,
438 api: (AppOut, AppIn),
439) -> AbortableList<PacketPipelineProcesses>
440where
441 WOut: futures::Sink<(PeerId, Box<[u8]>)> + Clone + Unpin + Send + 'static,
442 WOut::Error: std::error::Error,
443 WIn: futures::Stream<Item = (PeerId, Box<[u8]>)> + Send + 'static,
444 C: PacketEncoder + Sync + Send + 'static,
445 D: PacketDecoder + Sync + Send + 'static,
446 T: UnacknowledgedTicketProcessor + Sync + Send + 'static,
447 TEvt: futures::Sink<TicketEvent> + Clone + Unpin + Send + 'static,
448 TEvt::Error: std::error::Error,
449 AppOut: futures::Sink<(HoprPseudonym, ApplicationDataIn)> + Send + 'static,
450 AppOut::Error: std::error::Error,
451 AppIn: futures::Stream<Item = (ResolvedTransportRouting, ApplicationDataOut)> + Send + 'static,
452{
453 let mut processes = AbortableList::default();
454
455 #[cfg(all(feature = "prometheus", not(test)))]
456 {
457 lazy_static::initialize(&METRIC_PACKET_COUNT);
459 }
460
461 let (outgoing_ack_tx, outgoing_ack_rx) =
462 futures::channel::mpsc::channel::<(OffchainPublicKey, Option<HalfKey>)>(ACK_OUT_BUFFER_SIZE);
463
464 let (incoming_ack_tx, incoming_ack_rx) =
465 futures::channel::mpsc::channel::<(OffchainPublicKey, Acknowledgement)>(TICKET_ACK_BUFFER_SIZE);
466
467 let (wire_out, wire_in) = (wire_msg.0.with_timeout(QUEUE_SEND_TIMEOUT), wire_msg.1);
470 let (app_out, app_in) = (api.0.with_timeout(QUEUE_SEND_TIMEOUT), api.1);
471 let incoming_ack_tx = incoming_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
472 let outgoing_ack_tx = outgoing_ack_tx.with_timeout(QUEUE_SEND_TIMEOUT);
473 let ticket_events = ticket_events.with_timeout(QUEUE_SEND_TIMEOUT);
474
475 let encoder = std::sync::Arc::new(codec.0);
476 let decoder = std::sync::Arc::new(codec.1);
477 let ticket_proc = std::sync::Arc::new(ticket_proc);
478
479 processes.insert(
480 PacketPipelineProcesses::MsgOut,
481 spawn_as_abortable!(start_outgoing_packet_pipeline(
482 app_in,
483 encoder.clone(),
484 wire_out.clone(),
485 )),
486 );
487
488 processes.insert(
489 PacketPipelineProcesses::MsgIn,
490 spawn_as_abortable!(start_incoming_packet_pipeline(
491 wire_in,
492 decoder,
493 ticket_proc.clone(),
494 ticket_events.clone(),
495 outgoing_ack_tx,
496 wire_out.clone(),
497 incoming_ack_tx,
498 app_out,
499 )),
500 );
501
502 processes.insert(
503 PacketPipelineProcesses::AckOut,
504 spawn_as_abortable!(start_outgoing_ack_pipeline(
505 outgoing_ack_rx,
506 encoder,
507 packet_key.clone(),
508 wire_out,
509 )),
510 );
511
512 processes.insert(
513 PacketPipelineProcesses::AckIn,
514 spawn_as_abortable!(start_incoming_ack_pipeline(incoming_ack_rx, ticket_events, ticket_proc)),
515 );
516
517 processes
518}