hopr_transport_protocol/
processor.rs

1use futures::{Sink, SinkExt, future::Either, pin_mut};
2use hopr_async_runtime::prelude::sleep;
3pub use hopr_crypto_packet::errors::PacketError;
4use hopr_crypto_packet::errors::{PacketError::TransportError, Result};
5use hopr_crypto_types::prelude::*;
6use hopr_db_api::{
7    prelude::HoprDbProtocolOperations,
8    protocol::{IncomingPacket, OutgoingPacket},
9};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::ResolvedTransportRouting;
12use hopr_primitive_types::prelude::*;
13use hopr_transport_identity::PeerId;
14use hopr_transport_packet::prelude::ApplicationData;
15use tracing::error;
16
17lazy_static::lazy_static! {
18    /// Fixed price per packet to 0.01 HOPR
19    pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
20}
21
22#[async_trait::async_trait]
23pub trait PacketWrapping {
24    type Input;
25
26    async fn send(&self, data: ApplicationData, routing: ResolvedTransportRouting) -> Result<OutgoingPacket>;
27}
28
29#[async_trait::async_trait]
30pub trait PacketUnwrapping {
31    type Packet;
32
33    async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Option<Self::Packet>>;
34}
35
36/// Implements protocol acknowledgement logic for msg packets
37#[derive(Debug, Clone)]
38pub struct PacketProcessor<Db>
39where
40    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
41{
42    db: Db,
43    cfg: PacketInteractionConfig,
44}
45
46#[async_trait::async_trait]
47impl<Db> PacketWrapping for PacketProcessor<Db>
48where
49    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
50{
51    type Input = ApplicationData;
52
53    #[tracing::instrument(level = "trace", skip(self, data))]
54    async fn send(&self, data: ApplicationData, routing: ResolvedTransportRouting) -> Result<OutgoingPacket> {
55        let packet = self
56            .db
57            .to_send(
58                data.to_bytes(),
59                routing,
60                self.determine_actual_outgoing_win_prob().await,
61                self.determine_actual_outgoing_ticket_price().await?,
62            )
63            .await
64            .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
65
66        Ok(packet)
67    }
68}
69
70#[async_trait::async_trait]
71impl<Db> PacketUnwrapping for PacketProcessor<Db>
72where
73    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
74{
75    type Packet = IncomingPacket;
76
77    #[tracing::instrument(level = "trace", skip(self, data))]
78    async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Option<Self::Packet>> {
79        let previous_hop = OffchainPublicKey::try_from(peer)
80            .map_err(|e| PacketError::LogicError(format!("failed to convert '{peer}' into the public key: {e}")))?;
81
82        self.db
83            .from_recv(
84                data,
85                &self.cfg.packet_keypair,
86                previous_hop,
87                self.determine_actual_outgoing_win_prob().await,
88                self.determine_actual_outgoing_ticket_price().await?,
89            )
90            .await
91            .map_err(|e| match e {
92                hopr_db_api::errors::DbError::TicketValidationError(v) => {
93                    PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
94                        reason: v.1,
95                        ticket: Box::new(v.0),
96                    })
97                }
98                _ => PacketError::PacketConstructionError(e.to_string()),
99            })
100    }
101}
102
103impl<Db> PacketProcessor<Db>
104where
105    Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
106{
107    /// Creates a new instance given the DB and configuration.
108    pub fn new(db: Db, cfg: PacketInteractionConfig) -> Self {
109        Self { db, cfg }
110    }
111
112    // NOTE: as opposed to the winning probability, the ticket price does not have
113    // a reasonable default and therefore the operation fails
114    async fn determine_actual_outgoing_ticket_price(&self) -> Result<HoprBalance> {
115        // This operation hits the cache unless the new value is fetched for the first time
116        let network_ticket_price =
117            self.db.get_network_ticket_price().await.map_err(|e| {
118                PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
119            })?;
120
121        Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
122    }
123
124    async fn determine_actual_outgoing_win_prob(&self) -> WinningProbability {
125        // This operation hits the cache unless the new value is fetched for the first time
126        let network_win_prob = self
127            .db
128            .get_network_winning_probability()
129            .await
130            .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
131            .ok();
132
133        // If no explicit winning probability is configured, use the network value
134        // or 1 if the network value was not determined.
135        // This code does not take the max from those, as it is the upper layer's responsibility
136        // to ensure the configured value is not smaller than the network value.
137        self.cfg
138            .outgoing_ticket_win_prob
139            .or(network_win_prob)
140            .unwrap_or_default() // Absolute default WinningProbability is 1.0
141    }
142}
143
144/// Packet send finalizer notifying the awaiting future once the send has been acknowledged.
145///
146/// This is a remnant of the original logic that assumed that the p2p transport is invokable
147/// and its result can be directly polled. As the `send_packet` logic is the only part visible
148/// outside the communication loop from the protocol side, it is retained pending a larger
149/// architectural overhaul of the hopr daemon.
150#[derive(Debug)]
151pub struct PacketSendFinalizer {
152    tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
153}
154
155impl PacketSendFinalizer {
156    pub fn finalize(self, result: std::result::Result<(), PacketError>) {
157        if self.tx.send(result).is_err() {
158            error!("Failed to notify the awaiter about the successful packet transmission")
159        }
160    }
161}
162
163impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
164    fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
165        Self { tx: value }
166    }
167}
168
169/// Await on future until the confirmation of packet reception is received
170#[derive(Debug)]
171pub struct PacketSendAwaiter {
172    rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
173}
174
175impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
176    fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
177        Self { rx: value }
178    }
179}
180
181impl PacketSendAwaiter {
182    #[tracing::instrument(level = "trace", skip(self))]
183    pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
184        let timeout = sleep(until_timeout);
185        let rx = self.rx;
186        pin_mut!(rx, timeout);
187        match futures::future::select(rx, timeout).await {
188            Either::Left((Ok(Ok(v)), _)) => Ok(v),
189            Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
190            Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
191            Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
192        }
193    }
194}
195
196pub type SendMsgInput = (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer);
197
198#[derive(Debug, Clone)]
199pub struct MsgSender<T>
200where
201    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
202{
203    tx: T,
204}
205
206impl<T> MsgSender<T>
207where
208    T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
209{
210    pub fn new(tx: T) -> Self {
211        Self { tx }
212    }
213
214    /// Pushes a new packet into processing.
215    #[tracing::instrument(level = "trace", skip(self, data))]
216    pub async fn send_packet(
217        &self,
218        data: ApplicationData,
219        routing: ResolvedTransportRouting,
220    ) -> Result<PacketSendAwaiter> {
221        let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
222
223        self.tx
224            .clone()
225            .send((data, routing, tx.into()))
226            .await
227            .map_err(|_| TransportError("Failed to send a message".into()))
228            .map(move |_| {
229                let awaiter: PacketSendAwaiter = rx.into();
230                awaiter
231            })
232    }
233}
234
235/// Configuration parameters for the packet interaction.
236#[derive(Clone, Debug)]
237pub struct PacketInteractionConfig {
238    pub packet_keypair: OffchainKeypair,
239    pub outgoing_ticket_win_prob: Option<WinningProbability>,
240    pub outgoing_ticket_price: Option<HoprBalance>,
241}
242
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::Context;
    use futures::StreamExt;
    use hopr_crypto_random::Randomizable;
    use hopr_internal_types::prelude::HoprPseudonym;
    use hopr_path::ValidatedPath;
    use tokio::time::timeout;

    use super::*;

    /// A finalized send must be observable through the paired awaiter.
    #[tokio::test]
    pub async fn packet_send_finalizer_is_triggered() {
        let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();

        let finalizer = PacketSendFinalizer::from(tx);
        let awaiter = PacketSendAwaiter::from(rx);

        finalizer.finalize(Ok(()));

        let outcome = awaiter.consume_and_wait(Duration::from_millis(20)).await;

        assert!(outcome.is_ok());
    }

    /// The awaiter returned by `send_packet` must resolve once the finalizer
    /// delivered through the sink is finalized.
    #[tokio::test]
    pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
        let (sink, mut stream) = futures::channel::mpsc::unbounded::<SendMsgInput>();

        let sender = MsgSender::new(sink);

        let expected_data = ApplicationData::from_bytes(&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09])?;
        let expected_path = ValidatedPath::direct(
            *OffchainKeypair::random().public(),
            ChainKeypair::random().public().to_address(),
        );

        let routing = ResolvedTransportRouting::Forward {
            pseudonym: HoprPseudonym::random(),
            forward_path: expected_path.clone(),
            return_paths: vec![],
        };

        let send_result = sender.send_packet(expected_data.clone(), routing.clone()).await;
        assert!(send_result.is_ok());

        // The pushed message must show up on the receiving side of the sink.
        let (data, path, finalizer) = timeout(Duration::from_millis(20), stream.next())
            .await
            .context("Timeout")?
            .context("value should be present")?;

        assert_eq!(data, expected_data);
        assert!(matches!(path, ResolvedTransportRouting::Forward { forward_path,.. } if forward_path == expected_path));

        // Finalize with a small delay, while the awaiter is already waiting.
        tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_millis(3)).await;
            finalizer.finalize(Ok(()))
        });

        let awaiter = send_result.context("Awaiter must be present")?;
        let confirmation = awaiter.consume_and_wait(Duration::from_millis(10)).await;
        assert!(confirmation.is_ok());

        Ok(())
    }
}