1use async_trait::async_trait;
2use hopr_crypto_packet::chain::ChainPacketComponents;
3use hopr_crypto_packet::validation::validate_unacknowledged_ticket;
4use hopr_crypto_types::prelude::*;
5use hopr_db_api::errors::Result;
6use hopr_db_api::protocol::{
7 AckResult, HoprDbProtocolOperations, ResolvedAcknowledgement, TransportPacketWithChainData,
8};
9use hopr_db_api::resolver::HoprDbResolverOperations;
10use hopr_internal_types::prelude::*;
11use hopr_primitive_types::prelude::*;
12use std::ops::{Mul, Sub};
13use tracing::{instrument, trace, warn};
14
15use crate::channels::HoprDbChannelOperations;
16use crate::db::HoprDb;
17use crate::errors::DbSqlError;
18use crate::info::HoprDbInfoOperations;
19use crate::prelude::HoprDbTicketOperations;
20use crate::{HoprDbGeneralModelOperations, OptTx};
21
22use hopr_parallelize::cpu::spawn_fifo_blocking;
23
// Metrics in this block exist only when the `prometheus` feature is enabled
// and the crate is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Histogram of the winning probabilities carried by incoming tickets.
    // It is fed from `from_recv` with `ticket.win_prob()` of each forwarded packet.
    static ref METRIC_INCOMING_WIN_PROB: hopr_metrics::SimpleHistogram =
        hopr_metrics::SimpleHistogram::new(
            "hopr_tickets_incoming_win_probability",
            "Observes the winning probabilities on incoming tickets",
            // Values passed to the histogram constructor (presumably the bucket bounds — confirm
            // against `hopr_metrics::SimpleHistogram::new`).
            vec![0.0, 0.0001, 0.001, 0.01, 0.05, 0.1, 0.15, 0.25, 0.3, 0.5],
        ).unwrap();
}
33
34#[async_trait]
35impl HoprDbProtocolOperations for HoprDb {
36 #[instrument(level = "trace", skip(self, ack, me), ret)]
37 async fn handle_acknowledgement(&self, ack: Acknowledgement, me: &ChainKeypair) -> Result<AckResult> {
38 let myself = self.clone();
39 let me_ckp = me.clone();
40
41 let result = self
42 .begin_transaction()
43 .await?
44 .perform(|tx| {
45 Box::pin(async move {
46 match myself
47 .caches
48 .unacked_tickets
49 .remove(&ack.ack_challenge())
50 .await
51 .ok_or_else(|| {
52 DbSqlError::AcknowledgementValidationError(format!(
53 "received unexpected acknowledgement for half key challenge {} - half key {}",
54 ack.ack_challenge().to_hex(),
55 ack.ack_key_share.to_hex()
56 ))
57 })? {
58 PendingAcknowledgement::WaitingAsSender => {
59 trace!("received acknowledgement as sender: first relayer has processed the packet");
60
61 Ok(ResolvedAcknowledgement::Sending(ack.ack_challenge()))
62 }
63
64 PendingAcknowledgement::WaitingAsRelayer(unacknowledged) => {
65 if myself
66 .get_channel_by_parties(
67 Some(tx),
68 unacknowledged.ticket.verified_issuer(),
69 &myself.me_onchain,
70 true,
71 )
72 .await?
73 .is_some_and(|c| {
74 c.channel_epoch.as_u32() != unacknowledged.verified_ticket().channel_epoch
75 })
76 {
77 return Err(DbSqlError::LogicalError(format!(
78 "no channel found for address '{}'",
79 unacknowledged.ticket.verified_issuer()
80 )));
81 }
82
83 let domain_separator = myself
84 .get_indexer_data(Some(tx))
85 .await?
86 .channels_dst
87 .ok_or_else(|| DbSqlError::LogicalError("domain separator missing".into()))?;
88
89 hopr_parallelize::cpu::spawn_blocking(move || {
90 let ack_ticket = unacknowledged.acknowledge(&ack.ack_key_share)?;
95
96 if ack_ticket.is_winning(&me_ckp, &domain_separator) {
97 trace!("Found a winning ticket");
98 Ok(ResolvedAcknowledgement::RelayingWin(ack_ticket))
99 } else {
100 trace!("Found a losing ticket");
101 Ok(ResolvedAcknowledgement::RelayingLoss(
102 ack_ticket.ticket.verified_ticket().channel_id,
103 ))
104 }
105 })
106 .await
107 }
108 }
109 })
110 })
111 .await?;
112
113 match &result {
114 ResolvedAcknowledgement::RelayingWin(ack_ticket) => {
115 self.ticket_manager.insert_ticket(ack_ticket.clone()).await?;
116
117 #[cfg(all(feature = "prometheus", not(test)))]
118 {
119 let verified_ticket = ack_ticket.ticket.verified_ticket();
120 let channel = verified_ticket.channel_id.to_string();
121 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS.set(
122 &[&channel, "unredeemed"],
123 self.ticket_manager
124 .unrealized_value(hopr_db_api::tickets::TicketSelector::new(
125 verified_ticket.channel_id,
126 verified_ticket.channel_epoch,
127 ))
128 .await?
129 .amount()
130 .as_u128() as f64,
131 );
132 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
133 .increment(&[&channel, "winning_count"], 1.0f64);
134 }
135 }
136 ResolvedAcknowledgement::RelayingLoss(_channel) => {
137 #[cfg(all(feature = "prometheus", not(test)))]
138 {
139 crate::tickets::METRIC_HOPR_TICKETS_INCOMING_STATISTICS
140 .increment(&[&_channel.to_string(), "losing_count"], 1.0f64);
141 }
142 }
143 _ => {}
144 };
145
146 Ok(result.into())
147 }
148
149 async fn get_network_winning_probability(&self) -> Result<f64> {
150 Ok(self
151 .get_indexer_data(None)
152 .await
153 .map(|data| data.minimum_incoming_ticket_winning_prob)?)
154 }
155
156 async fn get_network_ticket_price(&self) -> Result<Balance> {
157 Ok(self.get_indexer_data(None).await.and_then(|data| {
158 data.ticket_price
159 .ok_or(DbSqlError::LogicalError("missing ticket price".into()))
160 })?)
161 }
162
163 #[tracing::instrument(level = "trace", skip(self, data, me, path))]
164 async fn to_send(
165 &self,
166 data: Box<[u8]>,
167 me: ChainKeypair,
168 path: Vec<OffchainPublicKey>,
169 outgoing_ticket_win_prob: f64,
170 outgoing_ticket_price: Balance,
171 ) -> Result<TransportPacketWithChainData> {
172 let myself = self.clone();
173 let next_peer = myself.resolve_chain_key(&path[0]).await?.ok_or_else(|| {
174 DbSqlError::LogicalError(format!(
175 "failed to find chain key for packet key {} on previous hop",
176 path[0].to_peerid_str()
177 ))
178 })?;
179
180 let components =
181 self.begin_transaction()
182 .await?
183 .perform(|tx| {
184 Box::pin(async move {
185 let domain_separator =
186 myself.get_indexer_data(Some(tx)).await?.channels_dst.ok_or_else(|| {
187 DbSqlError::LogicalError("failed to fetch the domain separator".into())
188 })?;
189
190 let next_ticket = if path.len() == 1 {
192 TicketBuilder::zero_hop().direction(&myself.me_onchain, &next_peer)
193 } else {
194 myself
195 .create_multihop_ticket(
196 Some(tx),
197 me.public().to_address(),
198 next_peer,
199 path.len() as u8,
200 outgoing_ticket_win_prob,
201 outgoing_ticket_price,
202 )
203 .await?
204 };
205
206 spawn_fifo_blocking(move || {
207 ChainPacketComponents::into_outgoing(&data, &path, &me, next_ticket, &domain_separator)
208 .map_err(|e| {
209 DbSqlError::LogicalError(format!(
210 "failed to construct chain components for a packet: {e}"
211 ))
212 })
213 })
214 .await
215 })
216 })
217 .await?;
218
219 match components {
220 ChainPacketComponents::Final { .. } | ChainPacketComponents::Forwarded { .. } => {
221 Err(DbSqlError::LogicalError("Must contain an outgoing packet type".into()).into())
222 }
223 ChainPacketComponents::Outgoing {
224 packet,
225 ticket,
226 next_hop,
227 ack_challenge,
228 } => {
229 self.caches
230 .unacked_tickets
231 .insert(ack_challenge, PendingAcknowledgement::WaitingAsSender)
232 .await;
233
234 let payload = spawn_fifo_blocking(move || {
235 let mut payload = Vec::with_capacity(ChainPacketComponents::SIZE);
236 payload.extend_from_slice(packet.as_ref());
237
238 let ticket_bytes: [u8; Ticket::SIZE] = ticket.into();
239 payload.extend_from_slice(ticket_bytes.as_ref());
240
241 payload.into_boxed_slice()
242 })
243 .await;
244
245 Ok(TransportPacketWithChainData::Outgoing {
246 next_hop,
247 ack_challenge,
248 data: payload,
249 })
250 }
251 }
252 }
253
254 #[tracing::instrument(level = "trace", skip(self, data, me, pkt_keypair, sender), fields(sender = %sender))]
255 async fn from_recv(
256 &self,
257 data: Box<[u8]>,
258 me: ChainKeypair,
259 pkt_keypair: &OffchainKeypair,
260 sender: OffchainPublicKey,
261 outgoing_ticket_win_prob: f64,
262 outgoing_ticket_price: Balance,
263 ) -> Result<TransportPacketWithChainData> {
264 let offchain_keypair = pkt_keypair.clone();
265
266 let packet = spawn_fifo_blocking(move || {
267 ChainPacketComponents::from_incoming(&data, &offchain_keypair, sender)
268 .map_err(|e| DbSqlError::LogicalError(format!("failed to construct an incoming packet: {e}")))
269 })
270 .await?;
271
272 match packet {
273 ChainPacketComponents::Final {
274 packet_tag,
275 ack_key,
276 previous_hop,
277 plain_text,
278 ..
279 } => {
280 let offchain_keypair = pkt_keypair.clone();
281 let ack = spawn_fifo_blocking(move || Acknowledgement::new(ack_key, &offchain_keypair)).await;
282
283 Ok(TransportPacketWithChainData::Final {
284 packet_tag,
285 previous_hop,
286 plain_text,
287 ack,
288 })
289 }
290 ChainPacketComponents::Forwarded {
291 packet,
292 ticket,
293 ack_challenge,
294 packet_tag,
295 ack_key,
296 previous_hop,
297 own_key,
298 next_hop,
299 next_challenge,
300 path_pos,
301 } => {
302 let myself = self.clone();
303 let previous_hop_addr = myself.resolve_chain_key(&previous_hop).await?.ok_or_else(|| {
304 DbSqlError::LogicalError(format!(
305 "failed to find channel key for packet key {} on previous hop",
306 previous_hop.to_peerid_str()
307 ))
308 })?;
309
310 let next_hop_addr = myself.resolve_chain_key(&next_hop).await?.ok_or_else(|| {
311 DbSqlError::LogicalError(format!(
312 "failed to find channel key for packet key {} on next hop",
313 next_hop.to_peerid_str()
314 ))
315 })?;
316
317 let verified_ticket = match self
318 .begin_transaction()
319 .await?
320 .perform(|tx| {
321 Box::pin(async move {
322 let chain_data = myself.get_indexer_data(Some(tx)).await?;
323
324 let channel = myself
325 .get_channel_by_parties(Some(tx), &previous_hop_addr, &myself.me_onchain, true)
326 .await?
327 .ok_or_else(|| {
328 DbSqlError::LogicalError(format!(
329 "no channel found for previous hop address '{previous_hop_addr}'"
330 ))
331 })?;
332
333 let remaining_balance = channel
334 .balance
335 .sub(myself.ticket_manager.unrealized_value((&channel).into()).await?);
336
337 let domain_separator = chain_data.channels_dst.ok_or_else(|| {
338 DbSqlError::LogicalError("failed to fetch the domain separator".into())
339 })?;
340
341 let minimum_ticket_price = chain_data
344 .ticket_price
345 .ok_or_else(|| DbSqlError::LogicalError("failed to fetch the ticket price".into()))?
346 .mul(U256::from(path_pos));
347
348 #[cfg(all(feature = "prometheus", not(test)))]
349 METRIC_INCOMING_WIN_PROB.observe(ticket.win_prob());
350
351 let ticket = spawn_fifo_blocking(move || {
356 validate_unacknowledged_ticket(
357 ticket,
358 &channel,
359 minimum_ticket_price,
360 chain_data.minimum_incoming_ticket_winning_prob,
361 remaining_balance,
362 &domain_separator,
363 )
364 })
365 .await?;
366
367 myself.increment_outgoing_ticket_index(channel.get_id()).await?;
368
369 myself
370 .caches
371 .unacked_tickets
372 .insert(
373 ack_challenge,
374 PendingAcknowledgement::WaitingAsRelayer(
375 ticket.clone().into_unacknowledged(own_key),
376 ),
377 )
378 .await;
379
380 let ticket_builder = if path_pos == 1 {
387 TicketBuilder::zero_hop().direction(&myself.me_onchain, &next_hop_addr)
388 } else {
389 myself
393 .create_multihop_ticket(
394 Some(tx),
395 myself.me_onchain,
396 next_hop_addr,
397 path_pos,
398 outgoing_ticket_win_prob.max(ticket.win_prob()),
399 outgoing_ticket_price,
400 )
401 .await?
402 };
403
404 let ticket = spawn_fifo_blocking(move || {
406 ticket_builder
407 .challenge(next_challenge.to_ethereum_challenge())
408 .build_signed(&me, &domain_separator)
409 })
410 .await?;
411
412 Ok(ticket)
414 })
415 })
416 .await
417 {
418 Ok(ticket) => Ok(ticket),
419 Err(DbSqlError::TicketValidationError(boxed_error)) => {
420 let (rejected_ticket, error) = *boxed_error;
421 let rejected_value = rejected_ticket.amount;
422 warn!(?rejected_ticket, %rejected_value, erorr = ?error, "failure to validate during forwarding");
423
424 self.mark_unsaved_ticket_rejected(&rejected_ticket).await.map_err(|e| {
425 DbSqlError::TicketValidationError(Box::new((
426 rejected_ticket.clone(),
427 format!("during validation error '{error}' update another error occurred: {e}"),
428 )))
429 })?;
430
431 Err(DbSqlError::TicketValidationError(Box::new((rejected_ticket, error))))
432 }
433 Err(e) => Err(e),
434 }?;
435
436 let offchain_keypair = pkt_keypair.clone();
437 let (ack, payload) = spawn_fifo_blocking(move || {
438 let ack = Acknowledgement::new(ack_key, &offchain_keypair);
439
440 let mut payload = Vec::with_capacity(ChainPacketComponents::SIZE);
441 payload.extend_from_slice(packet.as_ref());
442
443 let ticket_bytes = verified_ticket.leak().into_encoded();
444 payload.extend_from_slice(ticket_bytes.as_ref());
445
446 (ack, payload.into_boxed_slice())
447 })
448 .await;
449
450 Ok(TransportPacketWithChainData::Forwarded {
451 packet_tag,
452 previous_hop,
453 next_hop,
454 data: payload,
455 ack,
456 })
457 }
458 ChainPacketComponents::Outgoing { .. } => {
459 Err(DbSqlError::LogicalError("Cannot receive an outgoing packet".into()).into())
460 }
461 }
462 }
463}
464
465impl HoprDb {
466 async fn create_multihop_ticket<'a>(
467 &'a self,
468 tx: OptTx<'a>,
469 me_onchain: Address,
470 destination: Address,
471 current_path_pos: u8,
472 winning_prob: f64,
473 ticket_price: Balance,
474 ) -> crate::errors::Result<TicketBuilder> {
475 let channel = self
476 .get_channel_by_parties(tx, &me_onchain, &destination, true)
477 .await?
478 .ok_or(DbSqlError::LogicalError(format!(
479 "channel to '{destination}' not found",
480 )))?;
481
482 let amount = Balance::new(
484 ticket_price
485 .amount()
486 .mul(U256::from(current_path_pos - 1))
487 .div_f64(winning_prob)
488 .map_err(|e| {
489 DbSqlError::LogicalError(format!(
490 "winning probability outside of the allowed interval (0.0, 1.0]: {e}"
491 ))
492 })?,
493 BalanceType::HOPR,
494 );
495
496 if channel.balance.lt(&amount) {
497 return Err(DbSqlError::LogicalError(format!(
498 "out of funds: {} with counterparty {destination} has balance {} < {amount}",
499 channel.get_id(),
500 channel.balance
501 )));
502 }
503
504 let ticket_builder = TicketBuilder::default()
505 .direction(&me_onchain, &destination)
506 .balance(amount)
507 .index(self.increment_outgoing_ticket_index(channel.get_id()).await?)
508 .index_offset(1) .win_prob(winning_prob)
510 .channel_epoch(channel.channel_epoch.as_u32());
511
512 Ok(ticket_builder)
513 }
514}