hopr_transport_protocol/
processor.rs

1use futures::{Sink, SinkExt, future::Either, pin_mut};
2use hopr_async_runtime::prelude::sleep;
3pub use hopr_crypto_packet::errors::PacketError;
4use hopr_crypto_packet::errors::{PacketError::TransportError, Result};
5use hopr_crypto_types::prelude::*;
6use hopr_db_api::{
7    prelude::HoprDbProtocolOperations,
8    protocol::{IncomingPacket, OutgoingPacket},
9};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::ResolvedTransportRouting;
12use hopr_primitive_types::prelude::*;
13use hopr_protocol_app::prelude::*;
14use tracing::error;
15
16lazy_static::lazy_static! {
17    /// Fixed price per packet to 0.01 HOPR
18    pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
19}
20
21#[async_trait::async_trait]
22pub trait PacketWrapping {
23    type Input;
24
25    async fn send(&self, data: ApplicationDataOut, routing: ResolvedTransportRouting) -> Result<OutgoingPacket>;
26}
27
28#[async_trait::async_trait]
29pub trait PacketUnwrapping {
30    type Packet;
31
32    async fn recv(&self, peer: OffchainPublicKey, data: Box<[u8]>) -> Result<Self::Packet>;
33}
34
35/// Implements protocol acknowledgement logic for msg packets
36#[derive(Debug, Clone)]
37pub struct PacketProcessor<Db>
38where
39    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
40{
41    db: Db,
42    cfg: PacketInteractionConfig,
43}
44
45#[async_trait::async_trait]
46impl<Db> PacketWrapping for PacketProcessor<Db>
47where
48    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
49{
50    type Input = ApplicationDataOut;
51
52    #[tracing::instrument(level = "trace", skip(self, data), ret(Debug), err)]
53    async fn send(&self, data: ApplicationDataOut, routing: ResolvedTransportRouting) -> Result<OutgoingPacket> {
54        let packet = self
55            .db
56            .to_send(
57                data.data.to_bytes(),
58                routing,
59                self.determine_actual_outgoing_win_prob().await,
60                self.determine_actual_outgoing_ticket_price().await?,
61                data.packet_info.unwrap_or_default().signals_to_destination,
62            )
63            .await
64            .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
65
66        Ok(packet)
67    }
68}
69
70#[async_trait::async_trait]
71impl<Db> PacketUnwrapping for PacketProcessor<Db>
72where
73    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
74{
75    type Packet = IncomingPacket;
76
77    #[tracing::instrument(level = "trace", skip(self, data))]
78    async fn recv(&self, previous_hop: OffchainPublicKey, data: Box<[u8]>) -> Result<Self::Packet> {
79        let packet = self
80            .db
81            .from_recv(
82                data,
83                &self.cfg.packet_keypair,
84                previous_hop,
85                self.determine_actual_outgoing_win_prob().await,
86                self.determine_actual_outgoing_ticket_price().await?,
87            )
88            .await
89            .map_err(|e| match e {
90                hopr_db_api::errors::DbError::TicketValidationError(v) => {
91                    PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
92                        reason: v.1,
93                        ticket: Box::new(v.0),
94                    })
95                }
96                hopr_db_api::errors::DbError::PossibleAdversaryError(e) => PacketError::PacketDecodingError(e),
97                _ => PacketError::PacketConstructionError(e.to_string()),
98            })?;
99
100        Ok(packet)
101    }
102}
103
104impl<Db> PacketProcessor<Db>
105where
106    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
107{
108    /// Creates a new instance given the DB and configuration.
109    pub fn new(db: Db, cfg: PacketInteractionConfig) -> Self {
110        Self { db, cfg }
111    }
112
113    // NOTE: as opposed to the winning probability, the ticket price does not have
114    // a reasonable default and therefore the operation fails
115    async fn determine_actual_outgoing_ticket_price(&self) -> Result<HoprBalance> {
116        // This operation hits the cache unless the new value is fetched for the first time
117        let network_ticket_price =
118            self.db.get_network_ticket_price().await.map_err(|e| {
119                PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
120            })?;
121
122        Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
123    }
124
125    async fn determine_actual_outgoing_win_prob(&self) -> WinningProbability {
126        // This operation hits the cache unless the new value is fetched for the first time
127        let network_win_prob = self
128            .db
129            .get_network_winning_probability()
130            .await
131            .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
132            .ok();
133
134        // If no explicit winning probability is configured, use the network value
135        // or 1 if the network value was not determined.
136        // This code does not take the max from those, as it is the upper layer's responsibility
137        // to ensure the configured value is not smaller than the network value.
138        self.cfg
139            .outgoing_ticket_win_prob
140            .or(network_win_prob)
141            .unwrap_or_default() // Absolute default WinningProbability is 1.0
142    }
143}
144
145/// Packet send finalizer notifying the awaiting future once the send has been acknowledged.
146///
147/// This is a remnant of the original logic that assumed that the p2p transport is invokable
148/// and its result can be directly polled. As the `send_packet` logic is the only part visible
149/// outside the communication loop from the protocol side, it is retained pending a larger
150/// architectural overhaul of the hopr daemon.
151#[derive(Debug)]
152pub struct PacketSendFinalizer {
153    tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
154}
155
156impl PacketSendFinalizer {
157    pub fn finalize(self, result: std::result::Result<(), PacketError>) {
158        if self.tx.send(result).is_err() {
159            tracing::trace!("Failed to notify the awaiter about the successful packet transmission")
160        }
161    }
162}
163
164impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
165    fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
166        Self { tx: value }
167    }
168}
169
170/// Await on future until the confirmation of packet reception is received
171#[derive(Debug)]
172pub struct PacketSendAwaiter {
173    rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
174}
175
176impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
177    fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
178        Self { rx: value }
179    }
180}
181
182impl PacketSendAwaiter {
183    #[tracing::instrument(level = "trace", skip(self))]
184    pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
185        let timeout = sleep(until_timeout);
186        let rx = self.rx;
187        pin_mut!(rx, timeout);
188        match futures::future::select(rx, timeout).await {
189            Either::Left((Ok(Ok(v)), _)) => Ok(v),
190            Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
191            Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
192            Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
193        }
194    }
195}
196
197pub type SendMsgInput = (ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer);
198
199#[derive(Debug, Clone)]
200pub struct MsgSender<T>
201where
202    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
203{
204    tx: T,
205}
206
207impl<T> MsgSender<T>
208where
209    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
210{
211    pub fn new(tx: T) -> Self {
212        Self { tx }
213    }
214
215    /// Pushes a new packet into processing.
216    #[tracing::instrument(level = "trace", skip(self, data))]
217    pub async fn send_packet(
218        &self,
219        data: ApplicationDataOut,
220        routing: ResolvedTransportRouting,
221    ) -> Result<PacketSendAwaiter> {
222        let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
223
224        self.tx
225            .clone()
226            .send((data, routing, tx.into()))
227            .await
228            .map_err(|_| TransportError("Failed to send a message".into()))
229            .map(move |_| {
230                let awaiter: PacketSendAwaiter = rx.into();
231                awaiter
232            })
233    }
234}
235
236/// Configuration parameters for the packet interaction.
237#[derive(Clone, Debug)]
238pub struct PacketInteractionConfig {
239    pub packet_keypair: OffchainKeypair,
240    pub outgoing_ticket_win_prob: Option<WinningProbability>,
241    pub outgoing_ticket_price: Option<HoprBalance>,
242}
243
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::Context;
    use futures::StreamExt;
    use hopr_crypto_random::Randomizable;
    use hopr_internal_types::prelude::HoprPseudonym;
    use hopr_path::ValidatedPath;
    use tokio::time::timeout;

    use super::*;

    /// An awaiter must resolve successfully once its finalizer has reported success.
    #[tokio::test]
    pub async fn packet_send_finalizer_is_triggered() {
        let (notify_tx, notify_rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();

        let finalizer = PacketSendFinalizer::from(notify_tx);
        let awaiter = PacketSendAwaiter::from(notify_rx);

        finalizer.finalize(Ok(()));

        let outcome = awaiter.consume_and_wait(Duration::from_millis(20)).await;

        assert!(outcome.is_ok());
    }

    /// The awaiter returned by `send_packet` must resolve once the finalizer
    /// delivered through the sink is finalized.
    #[tokio::test]
    pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
        let (sink, mut stream) = futures::channel::mpsc::unbounded::<SendMsgInput>();

        let sender = MsgSender::new(sink);

        let expected_data = ApplicationData::try_from([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09].as_ref())?;
        let expected_path = ValidatedPath::direct(
            *OffchainKeypair::random().public(),
            ChainKeypair::random().public().to_address(),
        );

        let routing = ResolvedTransportRouting::Forward {
            pseudonym: HoprPseudonym::random(),
            forward_path: expected_path.clone(),
            return_paths: vec![],
        };

        let send_result = sender
            .send_packet(ApplicationDataOut::with_no_packet_info(expected_data.clone()), routing)
            .await;
        assert!(send_result.is_ok());

        // The message must show up on the receiving side of the sink.
        let (data, path, finalizer) = timeout(Duration::from_millis(20), stream.next())
            .await
            .context("Timeout")?
            .context("value should be present")?;

        assert_eq!(data.data, expected_data);
        assert!(matches!(path, ResolvedTransportRouting::Forward { forward_path,.. } if forward_path == expected_path));

        // Finalize with a small delay, while the awaiter is already waiting.
        tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_millis(3)).await;
            finalizer.finalize(Ok(()))
        });

        let awaiter = send_result.context("Awaiter must be present")?;
        assert!(awaiter.consume_and_wait(Duration::from_millis(10)).await.is_ok());

        Ok(())
    }
}