hopr_protocol_hopr/codec/
decoder.rs1use std::{
2 ops::{Mul, Sub},
3 time::Duration,
4};
5
6use hopr_api::chain::*;
7use hopr_crypto_packet::prelude::*;
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10use hopr_primitive_types::prelude::*;
11
12use crate::{
13 AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
14 IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
15 errors::HoprProtocolError, tbf::TagBloomFilter,
16};
17
18pub struct HoprDecoder<Chain, S, T> {
20 chain_api: Chain,
21 surb_store: std::sync::Arc<S>,
22 tracker: T,
23 packet_key: OffchainKeypair,
24 chain_key: ChainKeypair,
25 channels_dst: Hash,
26 cfg: HoprCodecConfig,
27 tbf: parking_lot::Mutex<TagBloomFilter>,
28 peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
29}
30
31impl<Chain, S, T> HoprDecoder<Chain, S, T>
32where
33 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
34 S: SurbStore + Send + Sync,
35 T: TicketTracker + Send + Sync,
36{
37 pub fn new(
39 (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
40 chain_api: Chain,
41 surb_store: S,
42 tracker: T,
43 channels_dst: Hash,
44 cfg: HoprCodecConfig,
45 ) -> Self {
46 Self {
47 chain_api,
48 surb_store: std::sync::Arc::new(surb_store),
49 packet_key,
50 chain_key,
51 channels_dst,
52 cfg,
53 tracker,
54 tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
55 peer_id_cache: moka::future::Cache::builder()
56 .time_to_idle(Duration::from_secs(600))
57 .max_capacity(100_000)
58 .build(),
59 }
60 }
61
62 async fn validate_and_replace_ticket(
63 &self,
64 mut fwd: HoprForwardedPacket,
65 ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
66 let previous_hop_addr = self
67 .chain_api
68 .packet_key_to_chain_key(&fwd.previous_hop)
69 .await
70 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
71 .ok_or(HoprProtocolError::KeyNotFound)?;
72
73 let next_hop_addr = self
74 .chain_api
75 .packet_key_to_chain_key(&fwd.outgoing.next_hop)
76 .await
77 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
78 .ok_or(HoprProtocolError::KeyNotFound)?;
79
80 let incoming_channel = self
81 .chain_api
82 .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
83 .await
84 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
85 .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?;
86
87 let minimum_ticket_price = self
90 .chain_api
91 .minimum_ticket_price()
92 .await
93 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
94 .mul(U256::from(fwd.path_pos));
95
96 let remaining_balance = incoming_channel.balance.sub(
97 self.tracker
98 .incoming_channel_unrealized_balance(incoming_channel.get_id(), incoming_channel.channel_epoch)
99 .await
100 .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
101 );
102
103 let win_prob = self
108 .chain_api
109 .minimum_incoming_ticket_win_prob()
110 .await
111 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
112 let domain_separator = self.channels_dst;
113
114 let verified_incoming_ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
115 validate_unacknowledged_ticket(
116 fwd.outgoing.ticket,
117 &incoming_channel,
118 minimum_ticket_price,
119 win_prob,
120 remaining_balance,
121 &domain_separator,
122 )
123 })
124 .await?;
125
126 tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
128
129 let ticket_builder = if fwd.path_pos > 1 {
135 let outgoing_channel = self
139 .chain_api
140 .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
141 .await
142 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
143 .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
144
145 let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
146 .chain_api
147 .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
148 .await
149 .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
150
151 let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
155
156 self.tracker
160 .create_multihop_ticket(
161 &outgoing_channel,
162 fwd.path_pos,
163 outgoing_ticket_win_prob,
164 outgoing_ticket_price,
165 )
166 .await
167 .map_err(|e| match e {
168 TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
169 e => HoprProtocolError::TicketTrackerError(e.into()),
170 })?
171 } else {
172 TicketBuilder::zero_hop().counterparty(next_hop_addr)
173 };
174
175 let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
177 let me_on_chain = self.chain_key.clone();
178 fwd.outgoing.ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
179 ticket_builder.build_signed(&me_on_chain, &domain_separator)
180 })
181 .await?
182 .leak();
183
184 let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
185 Ok((fwd, unack_ticket))
186 }
187}
188
189#[async_trait::async_trait]
190impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
191where
192 Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
193 S: SurbStore + Send + Sync + 'static,
194 T: TicketTracker + Send + Sync,
195{
196 type Error = HoprProtocolError;
197
198 async fn decode(
199 &self,
200 sender: PeerId,
201 data: Box<[u8]>,
202 ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
203 let previous_hop = match self
205 .peer_id_cache
206 .try_get_with_by_ref(
207 &sender,
208 hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&sender)),
209 )
210 .await
211 {
212 Ok(peer) => peer,
213 Err(error) => {
214 tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
216 return Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender));
217 }
218 };
219
220 let offchain_keypair = self.packet_key.clone();
221 let surb_store = self.surb_store.clone();
222 let mapper = self.chain_api.key_id_mapper_ref().clone();
223
224 let packet = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
227 HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
228 surb_store.find_reply_opener(p)
229 })
230 })
231 .await
232 .map_err(|e| IncomingPacketError::Undecodable(e.into()))?;
233
234 if let Some(tag) = packet.packet_tag() {
237 if self.tbf.lock().check_and_set(tag) {
240 return Err(IncomingPacketError::ProcessingError(
241 previous_hop,
242 HoprProtocolError::Replay,
243 ));
244 }
245 }
246
247 match packet {
248 HoprPacket::Final(incoming) => {
249 let info = AuxiliaryPacketInfo {
251 packet_signals: incoming.signals,
252 num_surbs: incoming.surbs.len(),
253 };
254
255 if !incoming.surbs.is_empty() {
257 self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
258 tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
259 }
260
261 Ok(match incoming.ack_key {
262 None => {
263 IncomingPacket::Acknowledgement(
265 IncomingAcknowledgementPacket {
266 packet_tag: incoming.packet_tag,
267 previous_hop: incoming.previous_hop,
268 received_ack: incoming
269 .plain_text
270 .as_ref()
271 .try_into()
272 .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
273 }
274 .into(),
275 )
276 }
277 Some(ack_key) => IncomingPacket::Final(
278 IncomingFinalPacket {
279 packet_tag: incoming.packet_tag,
280 previous_hop: incoming.previous_hop,
281 sender: incoming.sender,
282 plain_text: incoming.plain_text,
283 ack_key,
284 info,
285 }
286 .into(),
287 ),
288 })
289 }
290 HoprPacket::Forwarded(fwd) => {
291 let (fwd, verified_unack_ticket) =
293 self.validate_and_replace_ticket(*fwd)
294 .await
295 .map_err(|error| match error {
296 HoprProtocolError::TicketValidationError(e) => {
298 IncomingPacketError::InvalidTicket(previous_hop, e)
299 }
300 e => IncomingPacketError::ProcessingError(previous_hop, e),
301 })?;
302
303 let mut payload = Vec::with_capacity(HoprPacket::SIZE);
304 payload.extend_from_slice(fwd.outgoing.packet.as_ref());
305 payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
306
307 Ok(IncomingPacket::Forwarded(
308 IncomingForwardedPacket {
309 packet_tag: fwd.packet_tag,
310 previous_hop: fwd.previous_hop,
311 next_hop: fwd.outgoing.next_hop,
312 data: payload.into_boxed_slice(),
313 ack_challenge: fwd.outgoing.ack_challenge,
314 received_ticket: verified_unack_ticket,
315 ack_key_prev_hop: fwd.ack_key,
316 }
317 .into(),
318 ))
319 }
320 HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(
321 previous_hop,
322 HoprProtocolError::InvalidState("cannot be outgoing packet"),
323 )),
324 }
325 }
326}