hopr_transport_protocol/msg/
processor.rs

1use futures::{future::Either, SinkExt};
2use futures::{pin_mut, Sink};
3use hopr_crypto_packet::errors::PacketError;
4use hopr_db_api::protocol::TransportPacketWithChainData;
5use hopr_transport_identity::PeerId;
6use tracing::error;
7
8use hopr_async_runtime::prelude::sleep;
9use hopr_crypto_packet::errors::{
10    PacketError::{TagReplay, TransportError},
11    Result,
12};
13use hopr_crypto_types::prelude::*;
14use hopr_db_api::prelude::HoprDbProtocolOperations;
15use hopr_internal_types::prelude::*;
16use hopr_path::path::{Path, TransportPath};
17use hopr_primitive_types::prelude::*;
18
19use super::packet::OutgoingPacket;
20use crate::bloom;
21
22lazy_static::lazy_static! {
23    /// Fixed price per packet to 0.01 HOPR
24    pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
25}
26
/// Turns application data into a packet that is ready to be handed to the transport.
#[async_trait::async_trait]
pub trait PacketWrapping {
    /// Type of the data accepted for wrapping.
    ///
    /// NOTE(review): this associated type is not referenced by [`PacketWrapping::send`],
    /// which takes `ApplicationData` directly.
    type Input;

    /// Wraps `data` into a packet that travels along `path`.
    ///
    /// On success, yields a peer ID together with the packet bytes. The
    /// `PacketProcessor` implementation in this file returns the next hop of
    /// the packet and the packet's data.
    async fn send(&self, data: ApplicationData, path: TransportPath) -> Result<(PeerId, Box<[u8]>)>;
}
33
/// Packet bytes paired with the peer they should be sent to.
///
/// Produced by the receive pipeline when a packet has to be forwarded.
pub struct SendPkt {
    /// Peer the packet is sent to (the next hop when forwarding).
    pub peer: PeerId,
    /// Raw packet bytes.
    pub data: Box<[u8]>,
}
38
/// Acknowledgement paired with the peer it is addressed to.
pub struct SendAck {
    /// Peer that receives the acknowledgement (the previous hop of the received packet).
    pub peer: PeerId,
    /// The acknowledgement itself.
    pub ack: Acknowledgement,
}
43
/// Outcome of processing an incoming packet.
///
/// Both variants carry the acknowledgement destined for the previous hop.
pub enum RecvOperation {
    /// The packet terminated at this node; `data` is the decoded application payload.
    Receive { data: ApplicationData, ack: SendAck },
    /// The packet must be relayed; `msg` holds the bytes and the next hop to send them to.
    Forward { msg: SendPkt, ack: SendAck },
}
48
/// Processes raw bytes received from the transport into a higher-level packet operation.
#[async_trait::async_trait]
pub trait PacketUnwrapping {
    /// Result type produced by a successful [`PacketUnwrapping::recv`].
    type Packet;

    /// Processes `data` that was received from `peer`.
    async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Self::Packet>;
}
55
/// Implements protocol acknowledgement logic for msg packets
///
/// The processor delegates packet construction and processing to the database
/// backend and adds tag-replay detection on the receive path.
#[derive(Debug, Clone)]
pub struct PacketProcessor<Db>
where
    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
{
    /// Backend used to build outgoing packets, process incoming ones and to
    /// look up the network ticket price and winning probability.
    db: Db,
    /// Bloom filter of packet tags consulted (and updated) by `is_tag_replay`.
    tbf: bloom::WrappedTagBloomFilter,
    /// Key material and optional outgoing ticket parameter overrides.
    cfg: PacketInteractionConfig,
}
66
67#[async_trait::async_trait]
68impl<Db> PacketWrapping for PacketProcessor<Db>
69where
70    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
71{
72    type Input = ApplicationData;
73
74    #[tracing::instrument(level = "trace", skip(self, data))]
75    async fn send(&self, data: ApplicationData, path: TransportPath) -> Result<(PeerId, Box<[u8]>)> {
76        let path: std::result::Result<Vec<OffchainPublicKey>, hopr_primitive_types::errors::GeneralError> =
77            path.hops().iter().map(OffchainPublicKey::try_from).collect();
78
79        let packet = self
80            .db
81            .to_send(
82                data.to_bytes(),
83                self.cfg.chain_keypair.clone(),
84                path?,
85                self.determine_actual_outgoing_win_prob().await,
86                self.determine_actual_outgoing_ticket_price().await?,
87            )
88            .await
89            .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
90
91        let packet: OutgoingPacket = packet
92            .try_into()
93            .map_err(|e: crate::errors::ProtocolError| PacketError::LogicError(e.to_string()))?;
94
95        Ok((packet.next_hop, packet.data))
96    }
97}
98
99#[async_trait::async_trait]
100impl<Db> PacketUnwrapping for PacketProcessor<Db>
101where
102    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
103{
104    type Packet = RecvOperation;
105
106    #[tracing::instrument(level = "trace", skip(self, data))]
107    async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<RecvOperation> {
108        let previous_hop = OffchainPublicKey::try_from(peer)
109            .map_err(|e| PacketError::LogicError(format!("failed to convert '{peer}' into the public key: {e}")))?;
110
111        let packet = self
112            .db
113            .from_recv(
114                data,
115                self.cfg.chain_keypair.clone(),
116                &self.cfg.packet_keypair,
117                previous_hop,
118                self.determine_actual_outgoing_win_prob().await,
119                self.determine_actual_outgoing_ticket_price().await?,
120            )
121            .await
122            .map_err(|e| match e {
123                hopr_db_api::errors::DbError::TicketValidationError(v) => {
124                    PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
125                        reason: v.1,
126                        ticket: Box::new(v.0),
127                    })
128                }
129                _ => PacketError::PacketConstructionError(e.to_string()),
130            })?;
131
132        if let TransportPacketWithChainData::Final { packet_tag, .. }
133        | TransportPacketWithChainData::Forwarded { packet_tag, .. } = &packet
134        {
135            if self.is_tag_replay(packet_tag).await {
136                return Err(TagReplay);
137            }
138        };
139
140        Ok(match packet {
141            TransportPacketWithChainData::Final {
142                previous_hop,
143                plain_text,
144                ack,
145                ..
146            } => {
147                let app_data = ApplicationData::from_bytes(plain_text.as_ref())?;
148                RecvOperation::Receive {
149                    data: app_data,
150                    ack: SendAck {
151                        peer: previous_hop.into(),
152                        ack,
153                    },
154                }
155            }
156            TransportPacketWithChainData::Forwarded {
157                previous_hop,
158                next_hop,
159                data,
160                ack,
161                ..
162            } => RecvOperation::Forward {
163                msg: SendPkt {
164                    peer: next_hop.into(),
165                    data,
166                },
167                ack: SendAck {
168                    peer: previous_hop.into(),
169                    ack,
170                },
171            },
172            TransportPacketWithChainData::Outgoing { .. } => {
173                return Err(PacketError::LogicError(
174                    "Attempting to process an outgoing packet in the incoming pipeline".into(),
175                ))
176            }
177        })
178    }
179}
180
181impl<Db> PacketProcessor<Db>
182where
183    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
184{
185    /// Creates a new instance given the DB and configuration.
186    pub fn new(db: Db, tbf: bloom::WrappedTagBloomFilter, cfg: PacketInteractionConfig) -> Self {
187        Self { db, tbf, cfg }
188    }
189
190    #[tracing::instrument(level = "trace", name = "check_tag_replay", skip(self, tag))]
191    /// Check whether the packet is replayed using a packet tag.
192    ///
193    /// There is a 0.1% chance that the positive result is not a replay because a Bloom filter is used.
194    pub async fn is_tag_replay(&self, tag: &PacketTag) -> bool {
195        self.tbf
196            .with_write_lock(|inner: &mut TagBloomFilter| inner.check_and_set(tag))
197            .await
198    }
199
200    // NOTE: as opposed to the winning probability, the ticket price does not have
201    // a reasonable default and therefore the operation fails
202    async fn determine_actual_outgoing_ticket_price(&self) -> Result<Balance> {
203        // This operation hits the cache unless the new value is fetched for the first time
204        let network_ticket_price =
205            self.db.get_network_ticket_price().await.map_err(|e| {
206                PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
207            })?;
208
209        Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
210    }
211
212    async fn determine_actual_outgoing_win_prob(&self) -> f64 {
213        // This operation hits the cache unless the new value is fetched for the first time
214        let network_win_prob = self
215            .db
216            .get_network_winning_probability()
217            .await
218            .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
219            .ok();
220
221        // If no explicit winning probability is configured, use the network value
222        // or 1 if the network value was not determined.
223        // This code does not take the max from those, as it is the upper layer's responsibility
224        // to ensure the configured value is not smaller than the network value.
225        self.cfg
226            .outgoing_ticket_win_prob
227            .or(network_win_prob)
228            .unwrap_or(DEFAULT_OUTGOING_TICKET_WIN_PROB)
229    }
230}
231
/// Packet send finalizer notifying the awaiting future once the send has been acknowledged.
///
/// This is a remnant of the original logic that assumed that the p2p transport is invokable
/// and its result can be directly polled. As the `send_packet` logic is the only part visible
/// outside the communication loop from the protocol side, it is retained pending a larger
/// architectural overhaul of the hopr daemon.
#[derive(Debug)]
pub struct PacketSendFinalizer {
    /// Sending half of the oneshot channel over which the send result is delivered.
    tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
}
242
243impl PacketSendFinalizer {
244    pub fn finalize(self, result: std::result::Result<(), PacketError>) {
245        if self.tx.send(result).is_err() {
246            error!("Failed to notify the awaiter about the successful packet transmission")
247        }
248    }
249}
250
251impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
252    fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
253        Self { tx: value }
254    }
255}
256
/// Await on future until the confirmation of packet reception is received
#[derive(Debug)]
pub struct PacketSendAwaiter {
    /// Receiving half of the oneshot channel on which the send result arrives.
    rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
}
262
263impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
264    fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
265        Self { rx: value }
266    }
267}
268
269impl PacketSendAwaiter {
270    #[tracing::instrument(level = "trace", skip(self))]
271    pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
272        let timeout = sleep(until_timeout);
273        let rx = self.rx;
274        pin_mut!(rx, timeout);
275        match futures::future::select(rx, timeout).await {
276            Either::Left((Ok(Ok(v)), _)) => Ok(v),
277            Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
278            Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
279            Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
280        }
281    }
282}
283
/// Item pushed into the sending pipeline by `MsgSender`: the payload, the path
/// it should travel, and the finalizer used to report the send result.
pub type SendMsgInput = (ApplicationData, TransportPath, PacketSendFinalizer);
285
/// Handle for submitting messages into a sink of [`SendMsgInput`] items.
#[derive(Debug, Clone)]
pub struct MsgSender<T>
where
    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
{
    /// Sink the messages are pushed into; cloned for every send.
    tx: T,
}
293
294impl<T> MsgSender<T>
295where
296    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
297{
298    pub fn new(tx: T) -> Self {
299        Self { tx }
300    }
301
302    /// Pushes a new packet into processing.
303    #[tracing::instrument(level = "trace", skip(self, data))]
304    pub async fn send_packet(&self, data: ApplicationData, path: TransportPath) -> Result<PacketSendAwaiter> {
305        let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
306
307        self.tx
308            .clone()
309            .send((data, path, tx.into()))
310            .await
311            .map_err(|_| TransportError("Failed to send a message".into()))
312            .map(move |_| {
313                let awaiter: PacketSendAwaiter = rx.into();
314                awaiter
315            })
316    }
317}
318
/// Configuration parameters for the packet interaction.
#[derive(Clone, Debug)]
pub struct PacketInteractionConfig {
    /// Offchain keypair handed to the database when processing incoming packets.
    pub packet_keypair: OffchainKeypair,
    /// Chain keypair handed to the database when building and processing packets.
    pub chain_keypair: ChainKeypair,
    /// Optional winning probability for outgoing tickets; when set, it is used
    /// instead of the network value.
    pub outgoing_ticket_win_prob: Option<f64>,
    /// Optional price for outgoing tickets; when set, it is used instead of the
    /// network ticket price.
    pub outgoing_ticket_price: Option<Balance>,
}
327
328impl PacketInteractionConfig {
329    pub fn new(
330        packet_keypair: &OffchainKeypair,
331        chain_keypair: &ChainKeypair,
332        outgoing_ticket_win_prob: Option<f64>,
333        outgoing_ticket_price: Option<Balance>,
334    ) -> Self {
335        Self {
336            packet_keypair: packet_keypair.clone(),
337            chain_keypair: chain_keypair.clone(),
338            outgoing_ticket_win_prob,
339            outgoing_ticket_price,
340        }
341    }
342}
343
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::Context;
    use async_std::future::timeout;
    use futures::StreamExt;

    use super::*;

    /// A result reported through the finalizer must reach the awaiter.
    #[async_std::test]
    pub async fn packet_send_finalizer_is_triggered() {
        let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();

        let finalizer = PacketSendFinalizer::from(tx);
        let awaiter = PacketSendAwaiter::from(rx);

        finalizer.finalize(Ok(()));

        let outcome = awaiter.consume_and_wait(Duration::from_millis(20)).await;

        assert!(outcome.is_ok());
    }

    /// A message pushed through `MsgSender` must arrive unchanged, and the
    /// returned awaiter must resolve once the delivered finalizer is triggered.
    #[async_std::test]
    pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<SendMsgInput>();
        let sender = MsgSender::new(tx);

        let expected_data = ApplicationData::from_bytes(&[0x01, 0x02, 0x03])?;
        let expected_path = TransportPath::direct(PeerId::random());

        let send_result = sender.send_packet(expected_data.clone(), expected_path.clone()).await;
        assert!(send_result.is_ok());

        let (data, path, finalizer) = timeout(Duration::from_millis(20), rx.next())
            .await
            .context("Timeout")?
            .context("value should be present")?;

        assert_eq!(data, expected_data);
        assert_eq!(path, expected_path);

        // Report the result slightly later, from another task, while the awaiter is waiting.
        async_std::task::spawn(async move {
            async_std::task::sleep(Duration::from_millis(3)).await;
            finalizer.finalize(Ok(()))
        });

        let awaiter = send_result.context("Awaiter must be present")?;
        let confirmation = awaiter.consume_and_wait(Duration::from_millis(10)).await;
        assert!(confirmation.is_ok());

        Ok(())
    }
}
401}