1use futures::{Sink, SinkExt, future::Either, pin_mut};
2use hopr_async_runtime::prelude::sleep;
3pub use hopr_crypto_packet::errors::PacketError;
4use hopr_crypto_packet::errors::{PacketError::TransportError, Result};
5use hopr_crypto_types::prelude::*;
6use hopr_db_api::{
7 prelude::HoprDbProtocolOperations,
8 protocol::{IncomingPacket, OutgoingPacket},
9};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::ResolvedTransportRouting;
12use hopr_primitive_types::prelude::*;
13use hopr_protocol_app::prelude::*;
14use tracing::error;
15
16lazy_static::lazy_static! {
17 pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
19}
20
21#[async_trait::async_trait]
22pub trait PacketWrapping {
23 type Input;
24
25 async fn send(&self, data: ApplicationDataOut, routing: ResolvedTransportRouting) -> Result<OutgoingPacket>;
26}
27
28#[async_trait::async_trait]
29pub trait PacketUnwrapping {
30 type Packet;
31
32 async fn recv(&self, peer: OffchainPublicKey, data: Box<[u8]>) -> Result<Self::Packet>;
33}
34
35#[derive(Debug, Clone)]
37pub struct PacketProcessor<Db>
38where
39 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
40{
41 db: Db,
42 cfg: PacketInteractionConfig,
43}
44
45#[async_trait::async_trait]
46impl<Db> PacketWrapping for PacketProcessor<Db>
47where
48 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
49{
50 type Input = ApplicationDataOut;
51
52 #[tracing::instrument(level = "trace", skip(self, data), ret(Debug), err)]
53 async fn send(&self, data: ApplicationDataOut, routing: ResolvedTransportRouting) -> Result<OutgoingPacket> {
54 let packet = self
55 .db
56 .to_send(
57 data.data.to_bytes(),
58 routing,
59 self.determine_actual_outgoing_win_prob().await,
60 self.determine_actual_outgoing_ticket_price().await?,
61 data.packet_info.unwrap_or_default().signals_to_destination,
62 )
63 .await
64 .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
65
66 Ok(packet)
67 }
68}
69
70#[async_trait::async_trait]
71impl<Db> PacketUnwrapping for PacketProcessor<Db>
72where
73 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
74{
75 type Packet = IncomingPacket;
76
77 #[tracing::instrument(level = "trace", skip(self, data))]
78 async fn recv(&self, previous_hop: OffchainPublicKey, data: Box<[u8]>) -> Result<Self::Packet> {
79 let packet = self
80 .db
81 .from_recv(
82 data,
83 &self.cfg.packet_keypair,
84 previous_hop,
85 self.determine_actual_outgoing_win_prob().await,
86 self.determine_actual_outgoing_ticket_price().await?,
87 )
88 .await
89 .map_err(|e| match e {
90 hopr_db_api::errors::DbError::TicketValidationError(v) => {
91 PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
92 reason: v.1,
93 ticket: Box::new(v.0),
94 })
95 }
96 hopr_db_api::errors::DbError::PossibleAdversaryError(e) => PacketError::PacketDecodingError(e),
97 _ => PacketError::PacketConstructionError(e.to_string()),
98 })?;
99
100 Ok(packet)
101 }
102}
103
104impl<Db> PacketProcessor<Db>
105where
106 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
107{
108 pub fn new(db: Db, cfg: PacketInteractionConfig) -> Self {
110 Self { db, cfg }
111 }
112
113 async fn determine_actual_outgoing_ticket_price(&self) -> Result<HoprBalance> {
116 let network_ticket_price =
118 self.db.get_network_ticket_price().await.map_err(|e| {
119 PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
120 })?;
121
122 Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
123 }
124
125 async fn determine_actual_outgoing_win_prob(&self) -> WinningProbability {
126 let network_win_prob = self
128 .db
129 .get_network_winning_probability()
130 .await
131 .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
132 .ok();
133
134 self.cfg
139 .outgoing_ticket_win_prob
140 .or(network_win_prob)
141 .unwrap_or_default() }
143}
144
145#[derive(Debug)]
152pub struct PacketSendFinalizer {
153 tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
154}
155
156impl PacketSendFinalizer {
157 pub fn finalize(self, result: std::result::Result<(), PacketError>) {
158 if self.tx.send(result).is_err() {
159 tracing::trace!("Failed to notify the awaiter about the successful packet transmission")
160 }
161 }
162}
163
164impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
165 fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
166 Self { tx: value }
167 }
168}
169
170#[derive(Debug)]
172pub struct PacketSendAwaiter {
173 rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
174}
175
176impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
177 fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
178 Self { rx: value }
179 }
180}
181
182impl PacketSendAwaiter {
183 #[tracing::instrument(level = "trace", skip(self))]
184 pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
185 let timeout = sleep(until_timeout);
186 let rx = self.rx;
187 pin_mut!(rx, timeout);
188 match futures::future::select(rx, timeout).await {
189 Either::Left((Ok(Ok(v)), _)) => Ok(v),
190 Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
191 Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
192 Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
193 }
194 }
195}
196
197pub type SendMsgInput = (ApplicationDataOut, ResolvedTransportRouting, PacketSendFinalizer);
198
199#[derive(Debug, Clone)]
200pub struct MsgSender<T>
201where
202 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
203{
204 tx: T,
205}
206
207impl<T> MsgSender<T>
208where
209 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
210{
211 pub fn new(tx: T) -> Self {
212 Self { tx }
213 }
214
215 #[tracing::instrument(level = "trace", skip(self, data))]
217 pub async fn send_packet(
218 &self,
219 data: ApplicationDataOut,
220 routing: ResolvedTransportRouting,
221 ) -> Result<PacketSendAwaiter> {
222 let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
223
224 self.tx
225 .clone()
226 .send((data, routing, tx.into()))
227 .await
228 .map_err(|_| TransportError("Failed to send a message".into()))
229 .map(move |_| {
230 let awaiter: PacketSendAwaiter = rx.into();
231 awaiter
232 })
233 }
234}
235
236#[derive(Clone, Debug)]
238pub struct PacketInteractionConfig {
239 pub packet_keypair: OffchainKeypair,
240 pub outgoing_ticket_win_prob: Option<WinningProbability>,
241 pub outgoing_ticket_price: Option<HoprBalance>,
242}
243
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use anyhow::Context;
    use futures::StreamExt;
    use hopr_crypto_random::Randomizable;
    use hopr_internal_types::prelude::HoprPseudonym;
    use hopr_path::ValidatedPath;
    use tokio::time::timeout;

    use super::*;

    /// A result pushed through the finalizer must be observed by the paired awaiter.
    #[tokio::test]
    pub async fn packet_send_finalizer_is_triggered() {
        let (result_tx, result_rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();

        let finalizer = PacketSendFinalizer::from(result_tx);
        let awaiter = PacketSendAwaiter::from(result_rx);

        finalizer.finalize(Ok(()));

        let outcome = awaiter.consume_and_wait(Duration::from_millis(20)).await;

        assert!(outcome.is_ok());
    }

    /// The awaiter returned by `send_packet` must complete once the finalizer that travelled through
    /// the sink has been finalized.
    #[tokio::test]
    pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
        let (queue_tx, mut queue_rx) = futures::channel::mpsc::unbounded::<SendMsgInput>();

        let sender = MsgSender::new(queue_tx);

        let expected_data = ApplicationData::try_from([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09].as_ref())?;
        let expected_path = ValidatedPath::direct(
            *OffchainKeypair::random().public(),
            ChainKeypair::random().public().to_address(),
        );

        let routing = ResolvedTransportRouting::Forward {
            pseudonym: HoprPseudonym::random(),
            forward_path: expected_path.clone(),
            return_paths: vec![],
        };

        let send_result = sender
            .send_packet(
                ApplicationDataOut::with_no_packet_info(expected_data.clone()),
                routing.clone(),
            )
            .await;
        assert!(send_result.is_ok());

        // The message must be readily available on the receiving side of the queue.
        let (data, path, finalizer) = timeout(Duration::from_millis(20), queue_rx.next())
            .await
            .context("Timeout")?
            .context("value should be present")?;

        assert_eq!(data.data, expected_data);
        assert!(matches!(path, ResolvedTransportRouting::Forward { forward_path,.. } if forward_path == expected_path));

        // Finalize from another task after a short delay, while the awaiter is already waiting.
        tokio::task::spawn(async move {
            tokio::time::sleep(Duration::from_millis(3)).await;
            finalizer.finalize(Ok(()))
        });

        let awaiter = send_result.context("Awaiter must be present")?;
        let outcome = awaiter.consume_and_wait(Duration::from_millis(10)).await;
        assert!(outcome.is_ok());

        Ok(())
    }
}