1use futures::{Sink, SinkExt, future::Either, pin_mut};
2use hopr_async_runtime::prelude::sleep;
3pub use hopr_crypto_packet::errors::PacketError;
4use hopr_crypto_packet::errors::{PacketError::TransportError, Result};
5use hopr_crypto_types::prelude::*;
6use hopr_db_api::{
7 prelude::HoprDbProtocolOperations,
8 protocol::{IncomingPacket, OutgoingPacket},
9};
10use hopr_internal_types::prelude::*;
11use hopr_network_types::prelude::ResolvedTransportRouting;
12use hopr_primitive_types::prelude::*;
13use hopr_transport_identity::PeerId;
14use hopr_transport_packet::prelude::ApplicationData;
15use tracing::error;
16
17lazy_static::lazy_static! {
18 pub static ref DEFAULT_PRICE_PER_PACKET: U256 = 10000000000000000u128.into();
20}
21
22#[async_trait::async_trait]
23pub trait PacketWrapping {
24 type Input;
25
26 async fn send(&self, data: ApplicationData, routing: ResolvedTransportRouting) -> Result<OutgoingPacket>;
27}
28
29#[async_trait::async_trait]
30pub trait PacketUnwrapping {
31 type Packet;
32
33 async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Option<Self::Packet>>;
34}
35
36#[derive(Debug, Clone)]
38pub struct PacketProcessor<Db>
39where
40 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
41{
42 db: Db,
43 cfg: PacketInteractionConfig,
44}
45
46#[async_trait::async_trait]
47impl<Db> PacketWrapping for PacketProcessor<Db>
48where
49 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
50{
51 type Input = ApplicationData;
52
53 #[tracing::instrument(level = "trace", skip(self, data))]
54 async fn send(&self, data: ApplicationData, routing: ResolvedTransportRouting) -> Result<OutgoingPacket> {
55 let packet = self
56 .db
57 .to_send(
58 data.to_bytes(),
59 routing,
60 self.determine_actual_outgoing_win_prob().await,
61 self.determine_actual_outgoing_ticket_price().await?,
62 )
63 .await
64 .map_err(|e| PacketError::PacketConstructionError(e.to_string()))?;
65
66 Ok(packet)
67 }
68}
69
70#[async_trait::async_trait]
71impl<Db> PacketUnwrapping for PacketProcessor<Db>
72where
73 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
74{
75 type Packet = IncomingPacket;
76
77 #[tracing::instrument(level = "trace", skip(self, data))]
78 async fn recv(&self, peer: &PeerId, data: Box<[u8]>) -> Result<Option<Self::Packet>> {
79 let previous_hop = OffchainPublicKey::try_from(peer)
80 .map_err(|e| PacketError::LogicError(format!("failed to convert '{peer}' into the public key: {e}")))?;
81
82 self.db
83 .from_recv(
84 data,
85 &self.cfg.packet_keypair,
86 previous_hop,
87 self.determine_actual_outgoing_win_prob().await,
88 self.determine_actual_outgoing_ticket_price().await?,
89 )
90 .await
91 .map_err(|e| match e {
92 hopr_db_api::errors::DbError::TicketValidationError(v) => {
93 PacketError::TicketValidation(hopr_crypto_packet::errors::TicketValidationError {
94 reason: v.1,
95 ticket: Box::new(v.0),
96 })
97 }
98 _ => PacketError::PacketConstructionError(e.to_string()),
99 })
100 }
101}
102
103impl<Db> PacketProcessor<Db>
104where
105 Db: HoprDbProtocolOperations + Send + Sync + std::fmt::Debug + Clone,
106{
107 pub fn new(db: Db, cfg: PacketInteractionConfig) -> Self {
109 Self { db, cfg }
110 }
111
112 async fn determine_actual_outgoing_ticket_price(&self) -> Result<HoprBalance> {
115 let network_ticket_price =
117 self.db.get_network_ticket_price().await.map_err(|e| {
118 PacketError::LogicError(format!("failed to determine current network ticket price: {e}"))
119 })?;
120
121 Ok(self.cfg.outgoing_ticket_price.unwrap_or(network_ticket_price))
122 }
123
124 async fn determine_actual_outgoing_win_prob(&self) -> WinningProbability {
125 let network_win_prob = self
127 .db
128 .get_network_winning_probability()
129 .await
130 .inspect_err(|error| error!(%error, "failed to determine current network winning probability"))
131 .ok();
132
133 self.cfg
138 .outgoing_ticket_win_prob
139 .or(network_win_prob)
140 .unwrap_or_default() }
142}
143
144#[derive(Debug)]
151pub struct PacketSendFinalizer {
152 tx: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>,
153}
154
155impl PacketSendFinalizer {
156 pub fn finalize(self, result: std::result::Result<(), PacketError>) {
157 if self.tx.send(result).is_err() {
158 error!("Failed to notify the awaiter about the successful packet transmission")
159 }
160 }
161}
162
163impl From<futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>> for PacketSendFinalizer {
164 fn from(value: futures::channel::oneshot::Sender<std::result::Result<(), PacketError>>) -> Self {
165 Self { tx: value }
166 }
167}
168
169#[derive(Debug)]
171pub struct PacketSendAwaiter {
172 rx: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>,
173}
174
175impl From<futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>> for PacketSendAwaiter {
176 fn from(value: futures::channel::oneshot::Receiver<std::result::Result<(), PacketError>>) -> Self {
177 Self { rx: value }
178 }
179}
180
181impl PacketSendAwaiter {
182 #[tracing::instrument(level = "trace", skip(self))]
183 pub async fn consume_and_wait(self, until_timeout: std::time::Duration) -> Result<()> {
184 let timeout = sleep(until_timeout);
185 let rx = self.rx;
186 pin_mut!(rx, timeout);
187 match futures::future::select(rx, timeout).await {
188 Either::Left((Ok(Ok(v)), _)) => Ok(v),
189 Either::Left((Ok(Err(e)), _)) => Err(TransportError(e.to_string())),
190 Either::Left((Err(_), _)) => Err(TransportError("Canceled".to_owned())),
191 Either::Right(_) => Err(TransportError("Timed out on sending a packet".to_owned())),
192 }
193 }
194}
195
196pub type SendMsgInput = (ApplicationData, ResolvedTransportRouting, PacketSendFinalizer);
197
198#[derive(Debug, Clone)]
199pub struct MsgSender<T>
200where
201 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
202{
203 tx: T,
204}
205
206impl<T> MsgSender<T>
207where
208 T: Sink<SendMsgInput> + Send + Sync + Clone + 'static + std::marker::Unpin,
209{
210 pub fn new(tx: T) -> Self {
211 Self { tx }
212 }
213
214 #[tracing::instrument(level = "trace", skip(self, data))]
216 pub async fn send_packet(
217 &self,
218 data: ApplicationData,
219 routing: ResolvedTransportRouting,
220 ) -> Result<PacketSendAwaiter> {
221 let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
222
223 self.tx
224 .clone()
225 .send((data, routing, tx.into()))
226 .await
227 .map_err(|_| TransportError("Failed to send a message".into()))
228 .map(move |_| {
229 let awaiter: PacketSendAwaiter = rx.into();
230 awaiter
231 })
232 }
233}
234
235#[derive(Clone, Debug)]
237pub struct PacketInteractionConfig {
238 pub packet_keypair: OffchainKeypair,
239 pub outgoing_ticket_win_prob: Option<WinningProbability>,
240 pub outgoing_ticket_price: Option<HoprBalance>,
241}
242
243#[cfg(test)]
244mod tests {
245 use std::time::Duration;
246
247 use anyhow::Context;
248 use futures::StreamExt;
249 use hopr_crypto_random::Randomizable;
250 use hopr_internal_types::prelude::HoprPseudonym;
251 use hopr_path::ValidatedPath;
252 use tokio::time::timeout;
253
254 use super::*;
255
256 #[tokio::test]
257 pub async fn packet_send_finalizer_is_triggered() {
258 let (tx, rx) = futures::channel::oneshot::channel::<std::result::Result<(), PacketError>>();
259
260 let finalizer: PacketSendFinalizer = tx.into();
261 let awaiter: PacketSendAwaiter = rx.into();
262
263 finalizer.finalize(Ok(()));
264
265 let result = awaiter.consume_and_wait(Duration::from_millis(20)).await;
266
267 assert!(result.is_ok());
268 }
269
270 #[tokio::test]
271 pub async fn message_sender_operation_reacts_on_finalizer_closure() -> anyhow::Result<()> {
272 let (tx, mut rx) = futures::channel::mpsc::unbounded::<SendMsgInput>();
273
274 let sender = MsgSender::new(tx);
275
276 let expected_data = ApplicationData::from_bytes(&[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09])?;
277 let expected_path = ValidatedPath::direct(
278 *OffchainKeypair::random().public(),
279 ChainKeypair::random().public().to_address(),
280 );
281
282 let routing = ResolvedTransportRouting::Forward {
283 pseudonym: HoprPseudonym::random(),
284 forward_path: expected_path.clone(),
285 return_paths: vec![],
286 };
287
288 let result = sender.send_packet(expected_data.clone(), routing.clone()).await;
289 assert!(result.is_ok());
290
291 let received = rx.next();
292 let (data, path, finalizer) = timeout(Duration::from_millis(20), received)
293 .await
294 .context("Timeout")?
295 .context("value should be present")?;
296
297 assert_eq!(data, expected_data);
298 assert!(matches!(path, ResolvedTransportRouting::Forward { forward_path,.. } if forward_path == expected_path));
299
300 tokio::task::spawn(async move {
301 tokio::time::sleep(Duration::from_millis(3)).await;
302 finalizer.finalize(Ok(()))
303 });
304
305 assert!(
306 result
307 .context("Awaiter must be present")?
308 .consume_and_wait(Duration::from_millis(10))
309 .await
310 .is_ok()
311 );
312
313 Ok(())
314 }
315}