// hopr_protocol_hopr/codec/decoder.rs

use std::{
    ops::{Mul, Sub},
    sync::Arc,
    time::Duration,
};

use hopr_api::chain::*;
use hopr_crypto_packet::prelude::*;
use hopr_crypto_types::prelude::*;
use hopr_internal_types::prelude::*;
use hopr_primitive_types::prelude::*;

use crate::{
    AuxiliaryPacketInfo, HoprCodecConfig, IncomingAcknowledgementPacket, IncomingFinalPacket, IncomingForwardedPacket,
    IncomingPacket, IncomingPacketError, PacketDecoder, SurbStore, TicketCreationError, TicketTracker,
    errors::HoprProtocolError, tbf::TagBloomFilter,
};
17
18/// Default [decoder](PacketDecoder) implementation for HOPR packets.
19pub struct HoprDecoder<Chain, S, T> {
20    chain_api: Chain,
21    surb_store: std::sync::Arc<S>,
22    tracker: T,
23    packet_key: OffchainKeypair,
24    chain_key: ChainKeypair,
25    channels_dst: Hash,
26    cfg: HoprCodecConfig,
27    tbf: parking_lot::Mutex<TagBloomFilter>,
28    peer_id_cache: moka::future::Cache<PeerId, OffchainPublicKey>,
29}
30
31impl<Chain, S, T> HoprDecoder<Chain, S, T>
32where
33    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
34    S: SurbStore + Send + Sync,
35    T: TicketTracker + Send + Sync,
36{
37    /// Creates a new instance of the decoder.
38    pub fn new(
39        (packet_key, chain_key): (OffchainKeypair, ChainKeypair),
40        chain_api: Chain,
41        surb_store: S,
42        tracker: T,
43        channels_dst: Hash,
44        cfg: HoprCodecConfig,
45    ) -> Self {
46        Self {
47            chain_api,
48            surb_store: std::sync::Arc::new(surb_store),
49            packet_key,
50            chain_key,
51            channels_dst,
52            cfg,
53            tracker,
54            tbf: parking_lot::Mutex::new(TagBloomFilter::default()),
55            peer_id_cache: moka::future::Cache::builder()
56                .time_to_idle(Duration::from_secs(600))
57                .max_capacity(100_000)
58                .build(),
59        }
60    }
61
62    async fn validate_and_replace_ticket(
63        &self,
64        mut fwd: HoprForwardedPacket,
65    ) -> Result<(HoprForwardedPacket, UnacknowledgedTicket), HoprProtocolError> {
66        let previous_hop_addr = self
67            .chain_api
68            .packet_key_to_chain_key(&fwd.previous_hop)
69            .await
70            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
71            .ok_or(HoprProtocolError::KeyNotFound)?;
72
73        let next_hop_addr = self
74            .chain_api
75            .packet_key_to_chain_key(&fwd.outgoing.next_hop)
76            .await
77            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
78            .ok_or(HoprProtocolError::KeyNotFound)?;
79
80        let incoming_channel = self
81            .chain_api
82            .channel_by_parties(&previous_hop_addr, self.chain_key.as_ref())
83            .await
84            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
85            .ok_or_else(|| HoprProtocolError::ChannelNotFound(previous_hop_addr, *self.chain_key.as_ref()))?;
86
87        // The ticket price from the oracle times my node's position on the
88        // path is the acceptable minimum
89        let minimum_ticket_price = self
90            .chain_api
91            .minimum_ticket_price()
92            .await
93            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
94            .mul(U256::from(fwd.path_pos));
95
96        let remaining_balance = incoming_channel.balance.sub(
97            self.tracker
98                .incoming_channel_unrealized_balance(incoming_channel.get_id(), incoming_channel.channel_epoch)
99                .await
100                .map_err(|e| HoprProtocolError::TicketTrackerError(e.into()))?,
101        );
102
103        // Here also the signature on the ticket gets validated,
104        // so afterward we are sure the source of the `channel`
105        // (which is equal to `previous_hop_addr`) has issued this
106        // ticket.
107        let win_prob = self
108            .chain_api
109            .minimum_incoming_ticket_win_prob()
110            .await
111            .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
112        let domain_separator = self.channels_dst;
113
114        let verified_incoming_ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
115            validate_unacknowledged_ticket(
116                fwd.outgoing.ticket,
117                &incoming_channel,
118                minimum_ticket_price,
119                win_prob,
120                remaining_balance,
121                &domain_separator,
122            )
123        })
124        .await?;
125
126        // The ticket is now validated:
127        tracing::trace!(%verified_incoming_ticket, "successfully verified incoming ticket");
128
129        // NOTE: that the path position according to the ticket value
130        // may no longer match the path position from the packet header,
131        // because the ticket issuer may set the price of the ticket higher.
132
133        // Create the new ticket for the new packet
134        let ticket_builder = if fwd.path_pos > 1 {
135            // There must be a channel to the next node if it's not the final hop.
136            // If the channel does not exist, the ticket we extracted before cannot be saved,
137            // as there would be no way to acknowledge it without the channel.
138            let outgoing_channel = self
139                .chain_api
140                .channel_by_parties(self.chain_key.as_ref(), &next_hop_addr)
141                .await
142                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?
143                .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_hop_addr))?;
144
145            let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
146                .chain_api
147                .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
148                .await
149                .map_err(|e| HoprProtocolError::ResolverError(e.into()))?;
150
151            // We currently take the maximum of the win prob from the incoming ticket
152            // and the one configured on this node.
153            // Therefore, the winning probability can only increase along the path.
154            let outgoing_ticket_win_prob = outgoing_ticket_win_prob.max(&verified_incoming_ticket.win_prob());
155
156            // The following operation fails if there's not enough balance on the channel to the next hop.
157            // Again, in this case, we cannot save the ticket we previously extracted because there is no way it gets
158            // acknowledged without enough balance.
159            self.tracker
160                .create_multihop_ticket(
161                    &outgoing_channel,
162                    fwd.path_pos,
163                    outgoing_ticket_win_prob,
164                    outgoing_ticket_price,
165                )
166                .await
167                .map_err(|e| match e {
168                    TicketCreationError::OutOfFunds(id, a) => HoprProtocolError::OutOfFunds(id, a),
169                    e => HoprProtocolError::TicketTrackerError(e.into()),
170                })?
171        } else {
172            TicketBuilder::zero_hop().counterparty(next_hop_addr)
173        };
174
175        // Finally, replace the ticket in the outgoing packet with a new one
176        let ticket_builder = ticket_builder.eth_challenge(fwd.next_challenge);
177        let me_on_chain = self.chain_key.clone();
178        fwd.outgoing.ticket = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
179            ticket_builder.build_signed(&me_on_chain, &domain_separator)
180        })
181        .await?
182        .leak();
183
184        let unack_ticket = verified_incoming_ticket.into_unacknowledged(fwd.own_key);
185        Ok((fwd, unack_ticket))
186    }
187}
188
189#[async_trait::async_trait]
190impl<Chain, S, T> PacketDecoder for HoprDecoder<Chain, S, T>
191where
192    Chain: ChainReadChannelOperations + ChainKeyOperations + ChainValues + Send + Sync,
193    S: SurbStore + Send + Sync + 'static,
194    T: TicketTracker + Send + Sync,
195{
196    type Error = HoprProtocolError;
197
198    async fn decode(
199        &self,
200        sender: PeerId,
201        data: Box<[u8]>,
202    ) -> Result<IncomingPacket, IncomingPacketError<Self::Error>> {
203        // Try to retrieve the peer's public key from the cache or compute it if it does not exist yet
204        let previous_hop = match self
205            .peer_id_cache
206            .try_get_with_by_ref(
207                &sender,
208                hopr_parallelize::cpu::spawn_fifo_blocking(move || OffchainPublicKey::from_peerid(&sender)),
209            )
210            .await
211        {
212            Ok(peer) => peer,
213            Err(error) => {
214                // There absolutely nothing we can do when the peer id is unparseable (e.g., non-ed25519 based)
215                tracing::error!(%sender, %error, "dropping packet - cannot convert peer id");
216                return Err(IncomingPacketError::Undecodable(HoprProtocolError::InvalidSender));
217            }
218        };
219
220        let offchain_keypair = self.packet_key.clone();
221        let surb_store = self.surb_store.clone();
222        let mapper = self.chain_api.key_id_mapper_ref().clone();
223
224        // If the following operation fails, it means that the packet is not a valid Hopr packet,
225        // and as such should not be acknowledged later.
226        let packet = hopr_parallelize::cpu::spawn_fifo_blocking(move || {
227            HoprPacket::from_incoming(&data, &offchain_keypair, previous_hop, &mapper, |p| {
228                surb_store.find_reply_opener(p)
229            })
230        })
231        .await
232        .map_err(|e| IncomingPacketError::Undecodable(e.into()))?;
233
234        // This is checked on both Final and Forwarded packets,
235        // Outgoing packets are not allowed to pass and are later reported as invalid state.
236        if let Some(tag) = packet.packet_tag() {
237            // This operation has run-time of ~10 nanoseconds,
238            // and therefore does not need to be invoked via spawn_blocking
239            if self.tbf.lock().check_and_set(tag) {
240                return Err(IncomingPacketError::ProcessingError(
241                    previous_hop,
242                    HoprProtocolError::Replay,
243                ));
244            }
245        }
246
247        match packet {
248            HoprPacket::Final(incoming) => {
249                // Extract additional information from the packet that will be passed upwards
250                let info = AuxiliaryPacketInfo {
251                    packet_signals: incoming.signals,
252                    num_surbs: incoming.surbs.len(),
253                };
254
255                // Store all incoming SURBs if any
256                if !incoming.surbs.is_empty() {
257                    self.surb_store.insert_surbs(incoming.sender, incoming.surbs).await;
258                    tracing::trace!(pseudonym = %incoming.sender, num_surbs = info.num_surbs, "stored incoming surbs for pseudonym");
259                }
260
261                Ok(match incoming.ack_key {
262                    None => {
263                        // The contained payload represents an Acknowledgement
264                        IncomingPacket::Acknowledgement(
265                            IncomingAcknowledgementPacket {
266                                packet_tag: incoming.packet_tag,
267                                previous_hop: incoming.previous_hop,
268                                received_ack: incoming
269                                    .plain_text
270                                    .as_ref()
271                                    .try_into()
272                                    .map_err(|e: GeneralError| IncomingPacketError::Undecodable(e.into()))?,
273                            }
274                            .into(),
275                        )
276                    }
277                    Some(ack_key) => IncomingPacket::Final(
278                        IncomingFinalPacket {
279                            packet_tag: incoming.packet_tag,
280                            previous_hop: incoming.previous_hop,
281                            sender: incoming.sender,
282                            plain_text: incoming.plain_text,
283                            ack_key,
284                            info,
285                        }
286                        .into(),
287                    ),
288                })
289            }
290            HoprPacket::Forwarded(fwd) => {
291                // Transform the ticket so it can be sent to the next hop
292                let (fwd, verified_unack_ticket) =
293                    self.validate_and_replace_ticket(*fwd)
294                        .await
295                        .map_err(|error| match error {
296                            // Distinguish ticket validation errors so that they can get extra treatment later
297                            HoprProtocolError::TicketValidationError(e) => {
298                                IncomingPacketError::InvalidTicket(previous_hop, e)
299                            }
300                            e => IncomingPacketError::ProcessingError(previous_hop, e),
301                        })?;
302
303                let mut payload = Vec::with_capacity(HoprPacket::SIZE);
304                payload.extend_from_slice(fwd.outgoing.packet.as_ref());
305                payload.extend_from_slice(&fwd.outgoing.ticket.into_encoded());
306
307                Ok(IncomingPacket::Forwarded(
308                    IncomingForwardedPacket {
309                        packet_tag: fwd.packet_tag,
310                        previous_hop: fwd.previous_hop,
311                        next_hop: fwd.outgoing.next_hop,
312                        data: payload.into_boxed_slice(),
313                        ack_challenge: fwd.outgoing.ack_challenge,
314                        received_ticket: verified_unack_ticket,
315                        ack_key_prev_hop: fwd.ack_key,
316                    }
317                    .into(),
318                ))
319            }
320            HoprPacket::Outgoing(_) => Err(IncomingPacketError::ProcessingError(
321                previous_hop,
322                HoprProtocolError::InvalidState("cannot be outgoing packet"),
323            )),
324        }
325    }
326}