1use std::sync::{Arc, atomic::AtomicU64};
2
3use futures::StreamExt;
4use hopr_api::{
5 chain::ChainReadChannelOperations,
6 db::{HoprDbTicketOperations, TicketSelector},
7};
8use hopr_crypto_types::prelude::*;
9use hopr_internal_types::prelude::*;
10#[cfg(feature = "rayon")]
11use hopr_parallelize::cpu::rayon::prelude::*;
12use hopr_primitive_types::balance::HoprBalance;
13use validator::ValidationError;
14
15use crate::{
16 HoprProtocolError, ResolvedAcknowledgement, TicketAcknowledgementError, TicketTracker,
17 UnacknowledgedTicketProcessor,
18};
19
/// Metric bookkeeping for the unacknowledged ticket caches.
///
/// Two interchangeable backends expose the same set of free functions:
/// `real` (compiled with the `prometheus` feature, outside of test builds) and
/// `noop` (compiled otherwise). Exactly one of them is re-exported, so call sites
/// never need feature gates of their own.
mod metrics {
    #[cfg(any(not(feature = "prometheus"), test))]
    pub use noop::*;
    #[cfg(all(feature = "prometheus", not(test)))]
    pub use real::*;

    /// Backend that records into `hopr_metrics` gauges and counters.
    #[cfg(all(feature = "prometheus", not(test)))]
    mod real {
        use hopr_metrics::{MultiGauge, SimpleCounter, SimpleGauge};

        lazy_static::lazy_static! {
            // Per-peer gauges are opt-in: the environment variable must be set to
            // "1" or to "true" (compared case-insensitively).
            static ref PER_PEER_ENABLED: bool = std::env::var("HOPR_METRICS_UNACK_PER_PEER")
                .is_ok_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));

            static ref UNACK_PEERS: SimpleGauge = SimpleGauge::new(
                "hopr_tickets_unack_peers_total",
                "Number of peers with unacknowledged tickets in cache",
            )
            .unwrap();
            static ref UNACK_TICKETS: SimpleGauge = SimpleGauge::new(
                "hopr_tickets_unack_tickets_total",
                "Total number of unacknowledged tickets across all peer caches",
            )
            .unwrap();
            static ref UNACK_INSERTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_insertions_total",
                "Total number of unacknowledged tickets inserted into cache",
            )
            .unwrap();
            static ref UNACK_LOOKUPS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_lookups_total",
                "Total number of ticket acknowledgement lookups",
            )
            .unwrap();
            static ref UNACK_LOOKUP_MISSES: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_lookup_misses_total",
                "Total number of ticket lookup failures (unknown ticket)",
            )
            .unwrap();
            static ref UNACK_EVICTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_evictions_total",
                "Total number of unacknowledged tickets evicted from cache",
            )
            .unwrap();
            static ref UNACK_PEER_EVICTIONS: SimpleCounter = SimpleCounter::new(
                "hopr_tickets_unack_peer_evictions_total",
                "Total number of peer caches evicted from the outer unacknowledged ticket cache",
            )
            .unwrap();
            static ref UNACK_TICKETS_PER_PEER: MultiGauge = MultiGauge::new(
                "hopr_tickets_unack_tickets_per_peer",
                "Number of unacknowledged tickets per peer in cache (enable with HOPR_METRICS_UNACK_PER_PEER=1)",
                &["peer"],
            )
            .unwrap();
        }

        /// Forces evaluation of all lazily created metrics.
        ///
        /// The per-peer gauge is only created when per-peer metrics are enabled.
        pub fn initialize() {
            lazy_static::initialize(&PER_PEER_ENABLED);
            lazy_static::initialize(&UNACK_PEERS);
            lazy_static::initialize(&UNACK_TICKETS);
            lazy_static::initialize(&UNACK_INSERTIONS);
            lazy_static::initialize(&UNACK_LOOKUPS);
            lazy_static::initialize(&UNACK_LOOKUP_MISSES);
            lazy_static::initialize(&UNACK_EVICTIONS);
            lazy_static::initialize(&UNACK_PEER_EVICTIONS);
            if per_peer_enabled() {
                lazy_static::initialize(&UNACK_TICKETS_PER_PEER);
            }
        }

        /// Tells whether the per-peer gauge was enabled via `HOPR_METRICS_UNACK_PER_PEER`.
        #[inline]
        #[allow(dead_code)]
        pub fn per_peer_enabled() -> bool {
            *PER_PEER_ENABLED
        }

        /// Adds one to the gauge of peers with a ticket cache.
        #[inline]
        pub fn inc_unack_peers() {
            UNACK_PEERS.increment(1.0);
        }

        /// Subtracts one from the gauge of peers with a ticket cache.
        #[inline]
        pub fn dec_unack_peers() {
            UNACK_PEERS.decrement(1.0);
        }

        /// Adds one to the gauge of cached unacknowledged tickets.
        #[inline]
        pub fn inc_unack_tickets() {
            UNACK_TICKETS.increment(1.0);
        }

        /// Subtracts one from the gauge of cached unacknowledged tickets.
        #[inline]
        pub fn dec_unack_tickets() {
            UNACK_TICKETS.decrement(1.0);
        }

        /// Counts one ticket insertion.
        #[inline]
        pub fn inc_insertions() {
            UNACK_INSERTIONS.increment();
        }

        /// Counts one acknowledgement lookup.
        #[inline]
        pub fn inc_lookups() {
            UNACK_LOOKUPS.increment();
        }

        /// Counts one lookup that did not find a ticket.
        #[inline]
        pub fn inc_lookup_misses() {
            UNACK_LOOKUP_MISSES.increment();
        }

        /// Counts one ticket eviction.
        #[inline]
        pub fn inc_evictions() {
            UNACK_EVICTIONS.increment();
        }

        /// Counts one eviction of a whole per-peer cache.
        #[inline]
        pub fn inc_peer_evictions() {
            UNACK_PEER_EVICTIONS.increment();
        }

        /// Adds one to the per-peer ticket gauge of `peer`; does nothing unless per-peer metrics are enabled.
        #[inline]
        pub fn inc_unack_tickets_for_peer(peer: &str) {
            if per_peer_enabled() {
                UNACK_TICKETS_PER_PEER.increment(&[peer], 1.0);
            }
        }

        /// Subtracts one from the per-peer ticket gauge of `peer`; does nothing unless per-peer metrics are enabled.
        #[inline]
        pub fn dec_unack_tickets_for_peer(peer: &str) {
            if per_peer_enabled() {
                UNACK_TICKETS_PER_PEER.decrement(&[peer], 1.0);
            }
        }

        /// Sets the per-peer ticket gauge of `peer` to zero; does nothing unless per-peer metrics are enabled.
        #[inline]
        #[allow(dead_code)]
        pub fn reset_unack_tickets_for_peer(peer: &str) {
            if per_peer_enabled() {
                UNACK_TICKETS_PER_PEER.set(&[peer], 0.0);
            }
        }
    }

    /// Backend with the same signatures as `real` whose functions do nothing.
    #[cfg(any(not(feature = "prometheus"), test))]
    mod noop {
        /// No-op counterpart of the metrics initialization.
        #[inline]
        pub fn initialize() {}

        /// Per-peer metrics are never enabled in the no-op backend.
        #[inline]
        #[allow(dead_code)]
        pub fn per_peer_enabled() -> bool {
            false
        }

        /// No-op.
        #[inline]
        pub fn inc_unack_peers() {}

        /// No-op.
        #[inline]
        pub fn dec_unack_peers() {}

        /// No-op.
        #[inline]
        pub fn inc_unack_tickets() {}

        /// No-op.
        #[inline]
        pub fn dec_unack_tickets() {}

        /// No-op.
        #[inline]
        pub fn inc_insertions() {}

        /// No-op.
        #[inline]
        pub fn inc_lookups() {}

        /// No-op.
        #[inline]
        pub fn inc_lookup_misses() {}

        /// No-op.
        #[inline]
        pub fn inc_evictions() {}

        /// No-op.
        #[inline]
        pub fn inc_peer_evictions() {}

        /// No-op.
        #[inline]
        pub fn inc_unack_tickets_for_peer(_: &str) {}

        /// No-op.
        #[inline]
        pub fn dec_unack_tickets_for_peer(_: &str) {}

        /// No-op.
        #[inline]
        #[allow(dead_code)]
        pub fn reset_unack_tickets_for_peer(_: &str) {}
    }
}
208
209const MIN_UNACK_TICKET_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
210fn validate_unack_ticket_timeout(timeout: &std::time::Duration) -> Result<(), ValidationError> {
211 if timeout < &MIN_UNACK_TICKET_TIMEOUT {
212 Err(ValidationError::new("unack_ticket_timeout too low"))
213 } else {
214 Ok(())
215 }
216}
217
218const MIN_OUTGOING_INDEX_RETENTION: std::time::Duration = std::time::Duration::from_secs(10);
219
220fn validate_outgoing_index_retention(retention: &std::time::Duration) -> Result<(), ValidationError> {
221 if retention < &MIN_OUTGOING_INDEX_RETENTION {
222 Err(ValidationError::new("outgoing_index_cache_retention too low"))
223 } else {
224 Ok(())
225 }
226}
227
/// Default of `outgoing_index_cache_retention`: 10 seconds.
fn default_outgoing_index_retention() -> std::time::Duration {
    use std::time::Duration;

    Duration::from_secs(10)
}
231
/// Default of `unack_ticket_timeout`: 30 seconds.
fn default_unack_ticket_timeout() -> std::time::Duration {
    use std::time::Duration;

    Duration::from_secs(30)
}
235
/// Default of `max_unack_tickets`: ten million tickets.
fn default_max_unack_tickets() -> usize {
    const TEN_MILLION: usize = 10_000_000;

    TEN_MILLION
}
239
/// Always returns `true`; serves as the named default for boolean configuration
/// fields that are enabled unless configured otherwise.
fn just_true() -> bool {
    true
}
243
/// Configuration of the [`HoprTicketProcessor`].
///
/// With the `serde` feature enabled, every field may be omitted on deserialization
/// (the defaults documented on each field apply) and unknown fields are rejected.
/// Durations are (de)serialized through `humantime_serde`.
#[derive(Debug, Clone, Copy, smart_default::SmartDefault, PartialEq, validator::Validate)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Deserialize, serde::Serialize),
    serde(deny_unknown_fields)
)]
pub struct HoprTicketProcessorConfig {
    /// How long unacknowledged tickets are retained.
    ///
    /// Used as the time-to-live of the tickets inside each per-peer cache and as
    /// the time-to-idle of the per-peer caches themselves.
    ///
    /// Defaults to 30 seconds. Validation rejects values below 10 seconds.
    #[default(default_unack_ticket_timeout())]
    #[validate(custom(function = "validate_unack_ticket_timeout"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_unack_ticket_timeout", with = "humantime_serde")
    )]
    pub unack_ticket_timeout: std::time::Duration,
    /// Maximum capacity of the unacknowledged ticket cache created for each peer.
    ///
    /// Defaults to 10,000,000. Validation requires at least 100.
    #[default(default_max_unack_tickets())]
    #[validate(range(min = 100))]
    #[cfg_attr(feature = "serde", serde(default = "default_max_unack_tickets"))]
    pub max_unack_tickets: usize,
    /// Time-to-idle of the cached outgoing ticket indices.
    ///
    /// The task returned by `HoprTicketProcessor::outgoing_index_sync_task` writes
    /// the cached indices to the database at an interval of half this duration.
    ///
    /// Defaults to 10 seconds. Validation rejects values below 10 seconds.
    #[default(default_outgoing_index_retention())]
    #[validate(custom(function = "validate_outgoing_index_retention"))]
    #[cfg_attr(
        feature = "serde",
        serde(default = "default_outgoing_index_retention", with = "humantime_serde")
    )]
    pub outgoing_index_cache_retention: std::time::Duration,
    /// Whether received acknowledgements are verified with
    /// `Acknowledgement::verify_batch` instead of one `verify` call per acknowledgement.
    ///
    /// Defaults to `true`.
    #[default(just_true())]
    #[cfg_attr(feature = "serde", serde(default = "just_true"))]
    pub use_batch_verification: bool,
}
293
/// Ticket processor backed by in-memory caches of unacknowledged tickets and
/// outgoing ticket indices.
///
/// Implements [`UnacknowledgedTicketProcessor`] and [`TicketTracker`].
#[derive(Clone)]
pub struct HoprTicketProcessor<Chain, Db> {
    /// Tickets awaiting acknowledgement: the outer cache is keyed by the peer the
    /// acknowledgement is expected from, each inner cache by the ticket's challenge.
    unacknowledged_tickets:
        moka::future::Cache<OffchainPublicKey, moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>>,
    /// Next outgoing ticket index for each `(channel ID, channel epoch)` pair.
    out_ticket_index: moka::future::Cache<(ChannelId, u32), Arc<AtomicU64>>,
    /// Database used to load and persist outgoing ticket indices and to query ticket values.
    db: Db,
    /// Chain API used to look up the channel an acknowledged ticket belongs to.
    chain_api: Chain,
    /// Chain key pair of this node; used for the channel lookup and when turning
    /// acknowledged tickets into redeemable ones.
    chain_key: ChainKeypair,
    /// Domain separator passed along when turning acknowledged tickets into redeemable ones.
    channels_dst: Hash,
    /// Configuration the processor was created with.
    cfg: HoprTicketProcessorConfig,
}
306
307impl<Chain, Db> HoprTicketProcessor<Chain, Db> {
308 pub fn new(
310 chain_api: Chain,
311 db: Db,
312 chain_key: ChainKeypair,
313 channels_dst: Hash,
314 cfg: HoprTicketProcessorConfig,
315 ) -> Self {
316 metrics::initialize();
317
318 Self {
319 out_ticket_index: moka::future::Cache::builder()
320 .time_to_idle(cfg.outgoing_index_cache_retention)
321 .max_capacity(10_000)
322 .build(),
323 unacknowledged_tickets: moka::future::Cache::builder()
324 .time_to_idle(cfg.unack_ticket_timeout)
325 .max_capacity(100_000)
326 .async_eviction_listener(
327 |_key,
328 value: moka::future::Cache<HalfKeyChallenge, UnacknowledgedTicket>,
329 cause|
330 -> moka::notification::ListenerFuture {
331 Box::pin(async move {
332 if !matches!(cause, moka::notification::RemovalCause::Replaced) {
333 metrics::dec_unack_peers();
334 if matches!(
335 cause,
336 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
337 ) {
338 metrics::inc_peer_evictions();
339 }
340 }
341 value.invalidate_all();
345 value.run_pending_tasks().await;
346 })
347 },
348 )
349 .build(),
350 chain_api,
351 db,
352 chain_key,
353 channels_dst,
354 cfg,
355 }
356 }
357}
358
359impl<Chain, Db> HoprTicketProcessor<Chain, Db>
360where
361 Db: HoprDbTicketOperations + Clone + Send + 'static,
362{
363 pub fn outgoing_index_sync_task(
369 &self,
370 reg: futures::future::AbortRegistration,
371 ) -> impl Future<Output = ()> + use<Db, Chain> {
372 let index_save_stream = futures::stream::Abortable::new(
373 futures_time::stream::interval(futures_time::time::Duration::from(
374 self.cfg.outgoing_index_cache_retention.div_f32(2.0),
375 )),
376 reg,
377 );
378
379 let db = self.db.clone();
380 let out_ticket_index = self.out_ticket_index.clone();
381
382 index_save_stream
383 .for_each(move |_| {
384 let db = db.clone();
385 let out_ticket_index = out_ticket_index.clone();
386 async move {
387 for (channel_key, out_idx) in out_ticket_index.iter() {
390 if let Err(error) = db
391 .update_outgoing_ticket_index(
392 &channel_key.0,
393 channel_key.1,
394 out_idx.load(std::sync::atomic::Ordering::SeqCst),
395 )
396 .await
397 {
398 tracing::error!(%error, channel_id = %channel_key.0, epoch = channel_key.1, "failed to sync outgoing ticket index to db");
399 }
400 }
401 tracing::trace!("synced outgoing ticket indices to db");
402 }
403 })
404 }
405}
406
407#[async_trait::async_trait]
408impl<Chain, Db> UnacknowledgedTicketProcessor for HoprTicketProcessor<Chain, Db>
409where
410 Chain: ChainReadChannelOperations + Send + Sync,
411 Db: Send + Sync,
412{
413 type Error = HoprProtocolError;
414
415 #[tracing::instrument(skip(self, next_hop, challenge, ticket), level = "trace", fields(next_hop = next_hop.to_peerid_str()))]
416 async fn insert_unacknowledged_ticket(
417 &self,
418 next_hop: &OffchainPublicKey,
419 challenge: HalfKeyChallenge,
420 ticket: UnacknowledgedTicket,
421 ) -> Result<(), Self::Error> {
422 tracing::trace!(%ticket, "received unacknowledged ticket");
423
424 let peer_id = next_hop.to_peerid_str();
425 let inner_cache = self
426 .unacknowledged_tickets
427 .get_with_by_ref(next_hop, async {
428 let peer_id_for_eviction = peer_id.clone();
429 metrics::inc_unack_peers();
430 moka::future::Cache::builder()
431 .time_to_live(self.cfg.unack_ticket_timeout)
432 .max_capacity(self.cfg.max_unack_tickets as u64)
433 .eviction_listener(move |_key, _value, cause| {
434 metrics::dec_unack_tickets();
435 metrics::dec_unack_tickets_for_peer(&peer_id_for_eviction);
436
437 if matches!(
439 cause,
440 moka::notification::RemovalCause::Expired | moka::notification::RemovalCause::Size
441 ) {
442 metrics::inc_evictions();
443 }
444 })
445 .build()
446 })
447 .await;
448
449 inner_cache.insert(challenge, ticket).await;
450
451 metrics::inc_insertions();
452 metrics::inc_unack_tickets();
453 metrics::inc_unack_tickets_for_peer(&peer_id);
454
455 Ok(())
456 }
457
458 #[tracing::instrument(skip_all, level = "trace", fields(peer = peer.to_peerid_str()))]
459 async fn acknowledge_tickets(
460 &self,
461 peer: OffchainPublicKey,
462 acks: Vec<Acknowledgement>,
463 ) -> Result<Vec<ResolvedAcknowledgement>, TicketAcknowledgementError<Self::Error>> {
464 if !self.unacknowledged_tickets.contains_key(&peer) {
469 tracing::trace!("not awaiting any acknowledgement from peer");
470 return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
471 }
472 let Some(awaiting_ack_from_peer) = self.unacknowledged_tickets.get(&peer).await else {
473 tracing::trace!("not awaiting any acknowledgement from peer");
474 return Err(TicketAcknowledgementError::UnexpectedAcknowledgement);
475 };
476
477 let use_batch_verify = self.cfg.use_batch_verification;
479 let half_keys_challenges = hopr_parallelize::cpu::spawn_fifo_blocking(
480 move || {
481 if use_batch_verify {
482 let acks = Acknowledgement::verify_batch(acks.into_iter().map(|ack| (peer, ack)));
485
486 #[cfg(feature = "rayon")]
487 let iter = acks.into_par_iter();
488
489 #[cfg(not(feature = "rayon"))]
490 let iter = acks.into_iter();
491
492 iter.map(|verified| {
493 verified.and_then(|verified| {
494 Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?))
495 })
496 })
497 .filter_map(|res| {
498 res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
499 .ok()
500 })
501 .collect::<Vec<_>>()
502 } else {
503 #[cfg(feature = "rayon")]
504 let iter = acks.into_par_iter();
505
506 #[cfg(not(feature = "rayon"))]
507 let iter = acks.into_iter();
508
509 iter.map(|ack| {
510 ack.verify(&peer).and_then(|verified| {
511 Ok((*verified.ack_key_share(), verified.ack_key_share().to_challenge()?))
512 })
513 })
514 .filter_map(|res| {
515 res.inspect_err(|error| tracing::error!(%error, "failed to process acknowledgement"))
516 .ok()
517 })
518 .collect::<Vec<_>>()
519 }
520 },
521 "ack_verify",
522 )
523 .await
524 .map_err(|e| TicketAcknowledgementError::Inner(HoprProtocolError::from(e)))?;
525
526 let mut unack_tickets = Vec::with_capacity(half_keys_challenges.len());
528 for (half_key, challenge) in half_keys_challenges {
529 metrics::inc_lookups();
530
531 let Some(unack_ticket) = awaiting_ack_from_peer.remove(&challenge).await else {
532 tracing::trace!(%challenge, "received acknowledgement for unknown ticket");
533 metrics::inc_lookup_misses();
534 continue;
535 };
536
537 let issuer_channel = match self
538 .chain_api
539 .channel_by_parties(unack_ticket.ticket.verified_issuer(), self.chain_key.as_ref())
540 .await
541 {
542 Ok(Some(channel)) => {
543 if channel.channel_epoch != unack_ticket.verified_ticket().channel_epoch {
544 tracing::error!(%unack_ticket, "received acknowledgement for ticket issued in a different epoch");
545 continue;
546 }
547 channel
548 }
549 Ok(None) => {
550 tracing::error!(%unack_ticket, "received acknowledgement for ticket issued for unknown channel");
551 continue;
552 }
553 Err(error) => {
554 tracing::error!(%error, %unack_ticket, "failed to resolve channel for unacknowledged ticket");
555 continue;
556 }
557 };
558
559 unack_tickets.push((issuer_channel, half_key, unack_ticket));
560 }
561
562 let domain_separator = self.channels_dst;
563 let chain_key = self.chain_key.clone();
564 Ok(hopr_parallelize::cpu::spawn_fifo_blocking(
565 move || {
566 #[cfg(feature = "rayon")]
567 let iter = unack_tickets.into_par_iter();
568
569 #[cfg(not(feature = "rayon"))]
570 let iter = unack_tickets.into_iter();
571
572 iter.filter_map(|(channel, half_key, unack_ticket)| {
573 let Ok(ack_ticket) = unack_ticket.acknowledge(&half_key) else {
579 tracing::error!(%unack_ticket, "failed to acknowledge ticket");
580 return None;
581 };
582
583 match ack_ticket.into_redeemable(&chain_key, &domain_separator) {
586 Ok(redeemable) => {
587 tracing::trace!(%channel, "found winning ticket");
588 Some(ResolvedAcknowledgement::RelayingWin(Box::new(redeemable)))
589 }
590 Err(CoreTypesError::TicketNotWinning) => {
591 tracing::trace!(%channel, "found losing ticket");
592 Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
593 }
594 Err(error) => {
595 tracing::error!(%error, %channel, "error when acknowledging ticket");
596 Some(ResolvedAcknowledgement::RelayingLoss(*channel.get_id()))
597 }
598 }
599 })
600 .collect::<Vec<_>>()
601 },
602 "ticket_into_redeemable",
603 )
604 .await
605 .map_err(|e| TicketAcknowledgementError::Inner(HoprProtocolError::from(e)))?)
606 }
607}
608
609#[async_trait::async_trait]
610impl<Chain, Db> TicketTracker for HoprTicketProcessor<Chain, Db>
611where
612 Chain: Send + Sync,
613 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
614{
615 type Error = Arc<Db::Error>;
616
617 async fn next_outgoing_ticket_index(&self, channel: &ChannelEntry) -> Result<u64, Self::Error> {
618 let channel_id = *channel.get_id();
619 let epoch = channel.channel_epoch;
620 let current_idx = channel.ticket_index;
621 self.out_ticket_index
622 .try_get_with((channel_id, epoch), async {
623 self.db
624 .get_or_create_outgoing_ticket_index(&channel_id, epoch, current_idx)
625 .await
626 .map(|maybe_idx| Arc::new(AtomicU64::new(maybe_idx.unwrap_or_default())))
627 })
628 .await
629 .map(|idx| idx.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
630 }
631
632 async fn incoming_channel_unrealized_balance(
633 &self,
634 channel_id: &ChannelId,
635 epoch: u32,
636 index: u64,
637 ) -> Result<HoprBalance, Self::Error> {
638 self.db
643 .get_tickets_value(TicketSelector::new(*channel_id, epoch).with_index_range(index..))
644 .await
645 .map_err(Into::into)
646 }
647}
648
#[cfg(test)]
mod tests {
    use hopr_crypto_random::Randomizable;

    use super::*;
    use crate::utils::*;

    /// Tickets inserted for a peer must all resolve as wins once that peer
    /// acknowledges them.
    #[tokio::test]
    async fn ticket_processor_should_acknowledge_previously_inserted_tickets() -> anyhow::Result<()> {
        const NUM_TICKETS: usize = 5;

        let blokli_client = create_blokli_client()?;
        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        // The peer that is expected to send the acknowledgements.
        let acknowledger = &PEERS[2].1;

        let mut acks = Vec::with_capacity(NUM_TICKETS);
        for index in 0..NUM_TICKETS {
            let own_share = HalfKey::random();
            let ack_share = HalfKey::random();
            let challenge = Challenge::from_own_share_and_half_key(&own_share.to_challenge()?, &ack_share)?;

            let unack_ticket = TicketBuilder::default()
                .counterparty(&PEERS[1].0)
                .index(index as u64)
                .channel_epoch(1)
                .amount(10_u32)
                .challenge(challenge)
                .build_signed(&PEERS[0].0, &Hash::default())?
                .into_unacknowledged(own_share);

            ticket_processor
                .insert_unacknowledged_ticket(acknowledger.public(), ack_share.to_challenge()?, unack_ticket)
                .await?;

            acks.push(VerifiedAcknowledgement::new(ack_share, acknowledger).leak());
        }

        let resolutions = ticket_processor
            .acknowledge_tickets(*acknowledger.public(), acks)
            .await?;

        assert_eq!(NUM_TICKETS, resolutions.len());
        assert!(
            resolutions
                .iter()
                .all(|res| matches!(res, ResolvedAcknowledgement::RelayingWin(_)))
        );

        Ok(())
    }

    /// Acknowledgements from a peer without any stored ticket must be rejected
    /// as unexpected.
    #[tokio::test]
    async fn ticket_processor_should_reject_acknowledgements_from_unexpected_sender() -> anyhow::Result<()> {
        const NUM_ACKS: usize = 5;

        let blokli_client = create_blokli_client()?;
        let node = create_node(1, &blokli_client).await?;

        let ticket_processor = HoprTicketProcessor::new(
            node.chain_api.clone(),
            node.node_db.clone(),
            node.chain_key.clone(),
            Hash::default(),
            HoprTicketProcessorConfig::default(),
        );

        let sender = &PEERS[2].1;
        let acks: Vec<_> = (0..NUM_ACKS)
            .map(|_| VerifiedAcknowledgement::new(HalfKey::random(), sender).leak())
            .collect();

        let outcome = ticket_processor.acknowledge_tickets(*sender.public(), acks).await;
        assert!(matches!(
            outcome,
            Err(TicketAcknowledgementError::UnexpectedAcknowledgement)
        ));

        Ok(())
    }
}