hopr_transport_protocol/msg/
processor.rs1use futures::{future::Either, SinkExt};
2use futures::{pin_mut, Sink};
3use hopr_crypto_packet::errors::PacketError;
4use hopr_db_api::protocol::TransportPacketWithChainData;
5use hopr_transport_identity::PeerId;
6use tracing::error;
7
8use hopr_async_runtime::prelude::sleep;
9use hopr_crypto_packet::errors::{
10 PacketError::{TagReplay, TransportError},
11 Result,
12};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::prelude::HoprDbProtocolOperations;
15use hopr_internal_types::prelude::*;
16use hopr_path::path::{Path, TransportPath};
17use hopr_primitive_types::prelude::*;
18
19use super::packet::OutgoingPacket;
20use crate::bloom;
21
22lazy_static::lazy_static! {
23 pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
25}
26
27#[async_trait::async_trait]
28pub trait PacketWrapping {
29 type Input;
30
31 async fn send(&self, data: ApplicationData, path: TransportPath) -> Result<(PeerId, Box<[u8]>)>;
32}
33
34pub struct SendPkt {
35 pub peer: PeerId,
36 pub data: Box<[u8]>,
37}
38
39pub struct SendAck {
40 pub peer: PeerId,
41 pub ack: Acknowledgement,
42}
43
44pub enum RecvOperation {
45 Receive { data: ApplicationData, ack: SendAck },
46 Forward { msg: SendPkt, ack: SendAck },
47}
48
49#[async_trait::async_trait]
50pub trait PacketUnwrapping {
51 type Packet;
52
53 async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Self::Packet>;
54}
55
56#[derive(Debug, Clone)]
58pub struct PacketProcessor<Db>
59where
60 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
61{
62 db: Db,
63 tbf: bloom::WrappedTagBloomFilter,
64 cfg: PacketInteractionConfig,
65}
66
67#[async_trait::async_trait]
68impl<Db> PacketWrapping for PacketProcessor<Db>
69where
70 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
71{
72 type Input = ApplicationData;
73
74 #[tracing::instrument(level = "trace", skip(self, data))]
75 async fn send(&self, data: ApplicationData, path: TransportPath) -> Result<(PeerId, Box<[u8]>)> {
76 let path: std::result::Result<Vec<OffchainPublicKey>, hopr_primitive_types::errors::GeneralError> =
77 path.hops().iter().map(OffchainPublicKey::try_from).collect();
78
79 let packet = self
80 .db
81 .to_send(
82 data.to_bytes(),
83 self.cfg.chain_keypair.clone(),
84 path?,
85 self.determine_actual_outgoing_win_prob().await,
86 self.determine_actual_outgoing_ticket_price().await?,
87 )
88 .await
89 .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
90
91 let packet: OutgoingPacket = packet
92 .try_into()
93 .map_err(|e: crate::errors::ProtocolError| PacketError::LogicError(e.to_string()))?;
94
95 Ok((packet.next_hop, packet.data))
96 }
97}
98
99#[async_trait::async_trait]
100impl<Db> PacketUnwrapping for PacketProcessor<Db>
101where
102 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
103{
104 type Packet = RecvOperation;
105
106 #[tracing::instrument(level = "trace", skip(self, data))]
107 async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<RecvOperation> {
108 let previous_hop = OffchainPublicKey::try_from(peer)
109 .map_err(|e| PacketError::LogicError(format!("failed to convert '{peer}' into the public key: {e}")))?;
110
111 let packet = self
112 .db
113 .from_recv(
114 data,
115 self.cfg.chain_keypair.clone(),
116 &self.cfg.packet_keypair,
117 previous_hop,
118 self.determine_actual_outgoing_win_prob().await,
119 self.determine_actual_outgoing_ticket_price().await?,
120 )
121 .await
122 .map_err(|e| match e {
123 hopr_db_api::errors::DbError::TicketValidationError(v) => {
124 PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
125 reason: v.1,
126 ticket: Box::new(v.0),
127 })
128 }
129 _ => PacketError::PacketConstructionError(e.to_string()),
130 })?;
131
132 if let TransportPacketWithChainData::Final { packet_tag, .. }
133 | TransportPacketWithChainData::Forwarded { packet_tag, .. } = &packet
134 {
135 if self.is_tag_replay(packet_tag).await {
136 return Err(TagReplay);
137 }
138 };
139
140 Ok(match packet {
141 TransportPacketWithChainData::Final {
142 previous_hop,
143 plain_text,
144 ack,
145 ..
146 } => {
147 let app_data = ApplicationData::from_bytes(plain_text.as_ref())?;
148 RecvOperation::Receive {
149 data: app_data,
150 ack: SendAck {
151 peer: previous_hop.into(),
152 ack,
153 },
154 }
155 }
156 TransportPacketWithChainData::Forwarded {
157 previous_hop,
158 next_hop,
159 data,
160 ack,
161 ..
162 } => RecvOperation::Forward {
163 msg: SendPkt {
164 peer: next_hop.into(),
165 data,
166 },
167 ack: SendAck {
168 peer: previous_hop.into(),
169 ack,
170 },
171 },
172 TransportPacketWithChainData::Outgoing { .. } => {
173 return Err(PacketError::LogicError(
174 "Attempting to process an outgoing packet in the incoming pipeline".into(),
175 ))
176 }
177 })
178 }
179}
180
181impl<Db> PacketProcessor<Db>
182where
183 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
184{
185 pub fn new(db: Db, tbf: bloom::WrappedTagBloomFilter, cfg: PacketInteractionConfig) -> Self {
187 Self { db, tbf, cfg }
188 }
189
190 #[tracing::instrument(level = "trace", name = "check_tag_replay", skip(self, tag))]
191 pub async fn is_tag_replay(&self, tag: &PacketTag) -> bool {
195 self.tbf
196 .with_write_lock(|inner: &mut TagBloomFilter| inner.check_and_set(tag))
197 .await
198 }
199
200 async fn determine_actual_outgoing_ticket_price(&self) -> Result<Balance> {
203 let network_ticket_price =
205 self.db.get_network_ticket_price().await.map_err(|e| {
206 PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
207 })?;
208
209 Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
210 }
211
212 async fn determine_actual_outgoing_win_prob(&self) -> f64 {
213 let network_win_prob = self
215 .db
216 .get_network_winning_probability()
217 .await
218 .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
219 .ok();
220
221 self.cfg
226 .outgoing_ticket_win_prob
227 .or(network_win_prob)
228 .unwrap_or(DEFAULT_OUTGOING_TICKET_WIN_PROB)
229 }
230}
231
232#[derive(Debug)]
239pub struct PacketSendFinalizer {
240 tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
241}
242
243impl PacketSendFinalizer {
244 pub fn finalize(self, result: std::result::Result<(), PacketError>) {
245 if self.tx.send(result).is_err() {
246 error!("Failed to notify the awaiter about the successful packet transmission")
247 }
248 }
249}
250
251impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
252 fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
253 Self { tx: value }
254 }
255}
256
257#[derive(Debug)]
259pub struct PacketSendAwaiter {
260 rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
261}
262
263impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
264 fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
265 Self { rx: value }
266 }
267}
268
269impl PacketSendAwaiter {
270 #[tracing::instrument(level = "trace", skip(self))]
271 pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
272 let timeout = sleep(until_timeout);
273 let rx = self.rx;
274 pin_mut!(rx, timeout);
275 match futures::future::select(rx, timeout).await {
276 Either::Left((Ok(Ok(v)), _)) => Ok(v),
277 Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
278 Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
279 Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
280 }
281 }
282}
283
284pub type SendMsgInput = (ApplicationData, TransportPath, PacketSendFinalizer);
285
286#[derive(Debug, Clone)]
287pub struct MsgSender<T>
288where
289 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
290{
291 tx: T,
292}
293
294impl<T> MsgSender<T>
295where
296 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
297{
298 pub fn new(tx: T) -> Self {
299 Self { tx }
300 }
301
302 #[tracing::instrument(level = "trace", skip(self, data))]
304 pub async fn send_packet(&self, data: ApplicationData, path: TransportPath) -> Result<PacketSendAwaiter> {
305 let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
306
307 self.tx
308 .clone()
309 .send((data, path, tx.into()))
310 .await
311 .map_err(|_| TransportError("Failed to send a message".into()))
312 .map(move |_| {
313 let awaiter: PacketSendAwaiter = rx.into();
314 awaiter
315 })
316 }
317}
318
319#[derive(Clone, Debug)]
321pub struct PacketInteractionConfig {
322 pub packet_keypair: OffchainKeypair,
323 pub chain_keypair: ChainKeypair,
324 pub outgoing_ticket_win_prob: Option<f64>,
325 pub outgoing_ticket_price: Option<Balance>,
326}
327
328impl PacketInteractionConfig {
329 pub fn new(
330 packet_keypair: &OffchainKeypair,
331 chain_keypair: &ChainKeypair,
332 outgoing_ticket_win_prob: Option<f64>,
333 outgoing_ticket_price: Option<Balance>,
334 ) -> Self {
335 Self {
336 packet_keypair: packet_keypair.clone(),
337 chain_keypair: chain_keypair.clone(),
338 outgoing_ticket_win_prob,
339 outgoing_ticket_price,
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use anyhow::Context;
    use async_std::future::timeout;
    use futures::StreamExt;

    use super::*;
    use std::time::Duration;

    /// An outcome reported through the finalizer is observed by the awaiter.
    #[async_std::test]
    pub async fn packet_send_finalizer_is_triggered() {
        let (outcome_tx, outcome_rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();

        let finalizer = PacketSendFinalizer::from(outcome_tx);
        let awaiter = PacketSendAwaiter::from(outcome_rx);

        finalizer.finalize(Ok(()));

        let outcome = awaiter.consume_and_wait(Duration::from_millis(20)).await;

        assert!(outcome.is_ok());
    }

    /// A request sent via `MsgSender` arrives on the queue unchanged, and
    /// finalizing it resolves the awaiter handed back to the caller.
    #[async_std::test]
    pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
        let (queue_tx, mut queue_rx) = futures::channel::mpsc::unbounded::<SendMsgInput>();

        let sender = MsgSender::new(queue_tx);

        let expected_data = ApplicationData::from_bytes(&[0x01, 0x02, 0x03])?;
        let expected_path = TransportPath::direct(PeerId::random());

        let send_result = sender.send_packet(expected_data.clone(), expected_path.clone()).await;
        assert!(send_result.is_ok());

        let next_request = queue_rx.next();
        let (data, path, finalizer) = timeout(Duration::from_millis(20), next_request)
            .await
            .context("Timeout")?
            .context("value should be present")?;

        assert_eq!(data, expected_data);
        assert_eq!(path, expected_path);

        // Finalize from a separate task after a short delay.
        async_std::task::spawn(async move {
            async_std::task::sleep(Duration::from_millis(3)).await;
            finalizer.finalize(Ok(()))
        });

        let awaiter = send_result.context("Awaiter must be present")?;
        let wait_outcome = awaiter.consume_and_wait(Duration::from_millis(10)).await;
        assert!(wait_outcome.is_ok());

        Ok(())
    }
}