hopr_protocol_hopr/codec/
encoder.rs1use bytes::{BufMut, BytesMut};
2use hopr_api::{
3 chain::*,
4 types::{
5 crypto::{crypto_traits::Randomizable, prelude::*},
6 internal::prelude::*,
7 primitive::prelude::*,
8 },
9};
10use hopr_crypto_packet::prelude::*;
11
12use crate::{HoprCodecConfig, OutgoingPacket, PacketEncoder, SurbStore, errors::HoprProtocolError};
13
14pub const MAX_ACKNOWLEDGEMENTS_BATCH_SIZE: usize =
19 (HoprPacket::PAYLOAD_SIZE - size_of::<u16>()) / Acknowledgement::SIZE;
20
21pub struct HoprEncoder<Chain, S, T> {
23 chain_api: Chain,
24 surb_store: S,
25 ticket_factory: T,
26 chain_key: ChainKeypair,
27 channels_dst: Hash,
28 cfg: HoprCodecConfig,
29}
30
31impl<Chain, S, T> HoprEncoder<Chain, S, T> {
32 pub fn new(
34 chain_key: ChainKeypair,
35 chain_api: Chain,
36 surb_store: S,
37 ticket_factory: T,
38 channels_dst: Hash,
39 cfg: HoprCodecConfig,
40 ) -> Self {
41 Self {
42 chain_api,
43 surb_store,
44 ticket_factory,
45 chain_key,
46 channels_dst,
47 cfg,
48 }
49 }
50}
51
52impl<Chain, S, T> HoprEncoder<Chain, S, T>
53where
54 Chain: ChainKeyOperations + ChainReadChannelOperations + ChainReadTicketOperations + ChainValues + Sync,
55 S: SurbStore,
56 T: hopr_api::tickets::TicketFactory + Sync,
57{
58 fn encode_packet_internal<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
59 &self,
60 next_peer: OffchainPublicKey,
61 data: D,
62 num_hops: usize,
63 signals: Sig,
64 routing: PacketRouting<ValidatedPath>,
65 pseudonym: HoprPseudonym,
66 ) -> Result<OutgoingPacket, HoprProtocolError> {
67 let next_peer = self
68 .chain_api
69 .packet_key_to_chain_key(&next_peer)
70 .map_err(HoprProtocolError::resolver)?
71 .ok_or(HoprProtocolError::KeyNotFound)?;
72
73 let next_ticket = if num_hops > 1 {
75 let channel = self
76 .chain_api
77 .channel_by_parties(self.chain_key.as_ref(), &next_peer)
78 .map_err(HoprProtocolError::resolver)?
79 .ok_or_else(|| HoprProtocolError::ChannelNotFound(*self.chain_key.as_ref(), next_peer))?;
80
81 let (outgoing_ticket_win_prob, outgoing_ticket_price) = self
82 .chain_api
83 .outgoing_ticket_values(self.cfg.outgoing_win_prob, self.cfg.outgoing_ticket_price)
84 .map_err(HoprProtocolError::resolver)?;
85
86 self.ticket_factory
87 .new_multihop_ticket(
88 &channel,
89 (num_hops as u8).try_into().expect("cannot fail due to num_hops > 1"),
90 outgoing_ticket_win_prob,
91 outgoing_ticket_price,
92 )
93 .map_err(HoprProtocolError::ticket_factory)?
94 } else {
95 TicketBuilder::zero_hop().counterparty(next_peer)
96 };
97
98 let (packet, openers) = HoprPacket::into_outgoing(
100 data.as_ref(),
101 &pseudonym,
102 routing,
103 &self.chain_key,
104 next_ticket,
105 self.chain_api.key_id_mapper_ref(),
106 &self.channels_dst,
107 signals,
108 )?;
109
110 openers.into_iter().for_each(|(surb_id, opener)| {
113 self.surb_store
114 .insert_reply_opener(HoprSenderId::from_pseudonym_and_id(&pseudonym, surb_id), opener);
115 });
116
117 let out = packet.try_as_outgoing().ok_or(HoprProtocolError::InvalidState(
118 "cannot send out packet that is not outgoing",
119 ))?;
120
121 let mut transport_payload = BytesMut::with_capacity(HoprPacket::SIZE);
122 transport_payload.put_slice(out.packet.as_ref());
123 transport_payload.put_slice(&out.ticket.into_encoded());
124
125 Ok(OutgoingPacket {
126 next_hop: out.next_hop,
127 ack_challenge: out.ack_challenge,
128 data: transport_payload.freeze(),
129 })
130 }
131}
132
133impl<Chain, S, T> PacketEncoder for HoprEncoder<Chain, S, T>
134where
135 Chain: ChainKeyOperations + ChainReadChannelOperations + ChainReadTicketOperations + ChainValues + Send + Sync,
136 S: SurbStore + Send + Sync,
137 T: hopr_api::tickets::TicketFactory + Send + Sync,
138{
139 type Error = HoprProtocolError;
140
141 #[tracing::instrument(skip_all, level = "trace")]
142 fn encode_packet<D: AsRef<[u8]> + Send + 'static, Sig: Into<PacketSignals> + Send + 'static>(
143 &self,
144 data: D,
145 routing: ResolvedTransportRouting<HoprSurb>,
146 signals: Sig,
147 ) -> Result<OutgoingPacket, Self::Error> {
148 let (next_peer, num_hops, pseudonym, routing) = match routing {
150 ResolvedTransportRouting::Forward {
151 pseudonym,
152 forward_path,
153 return_paths,
154 } => (
155 forward_path[0],
156 forward_path.num_hops(),
157 pseudonym,
158 PacketRouting::ForwardPath {
159 forward_path,
160 return_paths,
161 },
162 ),
163 ResolvedTransportRouting::Return(sender_id, surb) => {
164 let next = self
165 .chain_api
166 .key_id_mapper_ref()
167 .map_id_to_public(&surb.first_relayer)
168 .ok_or(HoprProtocolError::KeyNotFound)?;
169
170 (
171 next,
172 surb.additional_data_receiver.proof_of_relay_values().chain_length() as usize,
173 sender_id.pseudonym(),
174 PacketRouting::Surb(sender_id.surb_id(), surb),
175 )
176 }
177 };
178
179 tracing::trace!(len = data.as_ref().len(), "encoding packet");
180 self.encode_packet_internal(next_peer, data, num_hops, signals, routing, pseudonym)
181 }
182
183 #[tracing::instrument(skip_all, level = "trace", fields(destination = destination.to_peerid_str()))]
184 fn encode_acknowledgements(
185 &self,
186 acks: &[VerifiedAcknowledgement],
187 destination: &OffchainPublicKey,
188 ) -> Result<OutgoingPacket, Self::Error> {
189 tracing::trace!(num_acks = acks.len(), "encoding acknowledgements");
190
191 let mut all_acks = Vec::<u8>::with_capacity(size_of::<u16>() + acks.len() * Acknowledgement::SIZE);
192 all_acks.extend((acks.len() as u16).to_be_bytes());
193 acks.iter().for_each(|ack| all_acks.extend(ack.leak().as_ref()));
194
195 self.encode_packet_internal(
196 *destination,
197 all_acks,
198 0,
199 None,
200 PacketRouting::NoAck(*destination),
201 HoprPseudonym::random(),
202 )
203 }
204}