Skip to main content

hopr_ct_full_network/
discovery.rs

1use futures::{StreamExt, stream::BoxStream};
2use hopr_api::{
3    ct::{CoverTrafficGeneration, ProbeRouting, ProbingTrafficGeneration},
4    graph::{
5        NetworkGraphTraverse, NetworkGraphView,
6        traits::{EdgeNetworkObservableRead, EdgeObservableRead},
7    },
8    types::{
9        crypto::types::OffchainPublicKey,
10        crypto_random::Randomizable,
11        internal::{
12            NodeId,
13            protocol::HoprPseudonym,
14            routing::{DestinationRouting, PathId, RoutingOptions},
15        },
16    },
17};
18use hopr_statistics::WeightedCollection;
19
20use crate::{ProberConfig, priority::immediate_probe_priority};
21
22pub struct FullNetworkDiscovery<U> {
23    me: OffchainPublicKey,
24    cfg: ProberConfig,
25    graph: U,
26}
27
28impl<U> FullNetworkDiscovery<U> {
29    pub fn new(me: OffchainPublicKey, cfg: ProberConfig, graph: U) -> Self {
30        Self { me, cfg, graph }
31    }
32}
33
34/// Strip the leading source and trailing destination from a loopback path.
35///
36/// `simple_loopback_to_self` returns `[me, intermediates..., me]`.  The caller
37/// adds `me` as the destination and the planner prepends `me` as source, so
38/// only the interior intermediate nodes are needed.
39fn strip_loopback_endpoints(mut path: Vec<OffchainPublicKey>, me: &OffchainPublicKey) -> Vec<OffchainPublicKey> {
40    if path.last() == Some(me) {
41        path.pop();
42    }
43    if path.first() == Some(me) {
44        path.remove(0);
45    }
46    path
47}
48
49/// Build a `DestinationRouting::Forward` with a random pseudonym and an explicit intermediate path.
50fn loopback_routing(me: NodeId, path: Vec<OffchainPublicKey>) -> Option<DestinationRouting> {
51    let path: Vec<NodeId> = path.into_iter().map(NodeId::from).collect();
52    hopr_api::network::BoundedVec::try_from(path)
53        .ok()
54        .map(|path| DestinationRouting::Forward {
55            destination: Box::new(me),
56            pseudonym: Some(HoprPseudonym::random()),
57            forward_options: RoutingOptions::IntermediatePath(path),
58            return_options: None,
59        })
60}
61
62/// Stream that cycles through 1-, 2-, and 3-hop loopback paths with weighted shuffle.
63///
64/// Shared by both cover traffic and intermediate probing — the caller wraps each
65/// emitted item into the appropriate outer type.
66fn loopback_path_stream<U>(cfg: ProberConfig, graph: U) -> impl futures::Stream<Item = (Vec<OffchainPublicKey>, PathId)>
67where
68    U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
69{
70    // 2, 3, 4 edges → 1-, 2-, 3-hops in the HOPR protocol
71    futures_time::stream::interval(futures_time::time::Duration::from(cfg.interval))
72        .flat_map(|_| futures::stream::iter([2usize, 3, 4]))
73        .filter_map(move |edge_count| futures::future::ready(std::num::NonZeroUsize::new(edge_count)))
74        .flat_map(move |edge_count| {
75            let paths = graph.simple_loopback_to_self(edge_count.get(), Some(100));
76
77            let count = paths.len();
78            tracing::debug!(edge_count = edge_count.get(), count, "loopback path candidates");
79            let weighted: Vec<_> = paths
80                .into_iter()
81                .map(|(path, path_id)| ((path, path_id), cfg.base_priority))
82                .collect();
83
84            futures::stream::iter(WeightedCollection::new(weighted).into_shuffled())
85        })
86}
87
88impl<U> CoverTrafficGeneration for FullNetworkDiscovery<U>
89where
90    U: NetworkGraphTraverse<NodeId = OffchainPublicKey> + Clone + Send + Sync + 'static,
91{
92    fn build(&self) -> BoxStream<'static, DestinationRouting> {
93        cfg_if::cfg_if! {
94            if #[cfg(feature = "noise")] {
95                let me = self.me;
96                let me_node: NodeId = me.into();
97
98                loopback_path_stream(self.cfg, self.graph.clone())
99                    .filter_map(move |(path, _)| {
100                        let intermediates = strip_loopback_endpoints(path, &me);
101                        futures::future::ready(loopback_routing(me_node, intermediates))
102                    })
103                    .boxed()
104            } else {
105                Box::pin(futures::stream::empty())
106            }
107        }
108    }
109}
110
111impl<U> ProbingTrafficGeneration for FullNetworkDiscovery<U>
112where
113    U: NetworkGraphView<NodeId = OffchainPublicKey, Observed = hopr_network_graph::Observations>
114        + NetworkGraphTraverse<NodeId = OffchainPublicKey>
115        + Clone
116        + Send
117        + Sync
118        + 'static,
119{
120    fn build(&self) -> BoxStream<'static, ProbeRouting> {
121        let cfg = self.cfg;
122        let me = self.me;
123
124        let immediates = immediate_probe_stream(me, cfg, self.graph.clone());
125
126        let me_node: NodeId = me.into();
127        let intermediates = loopback_path_stream(cfg, self.graph.clone()).filter_map(move |(path, path_id)| {
128            let intermediates = strip_loopback_endpoints(path, &me);
129            let routing = loopback_routing(me_node, intermediates).map(|r| ProbeRouting::Looping((r, path_id)));
130            futures::future::ready(routing)
131        });
132
133        futures::stream::select(immediates, intermediates).boxed()
134    }
135}
136
137/// Cached weighted-shuffle batch with a creation timestamp for TTL checks.
138struct ShuffleCache {
139    probes: Vec<ProbeRouting>,
140    created_at: std::time::Instant,
141}
142
143/// Stream of neighbor probes emitted in bursts per tick.
144///
145/// Each tick emits the entire cached batch. The cache is recomputed when it is
146/// empty (all probes drained) or when `cfg.shuffle_ttl` has expired, whichever
147/// comes first. This avoids re-traversing the graph every tick while still
148/// emitting all peers in each burst.
149///
150/// When `cfg.probe_connected_only` is `true`, only peers with a
151/// `Connected(true)` edge from `me` are included. This assumes that a
152/// background discovery mechanism (e.g. libp2p identify/kademlia) has
153/// already established connections and recorded them in the graph.
154fn immediate_probe_stream<U>(
155    me: OffchainPublicKey,
156    cfg: ProberConfig,
157    graph: U,
158) -> impl futures::Stream<Item = ProbeRouting>
159where
160    U: NetworkGraphView<NodeId = OffchainPublicKey, Observed = hopr_network_graph::Observations>
161        + Clone
162        + Send
163        + Sync
164        + 'static,
165{
166    let cache: Option<ShuffleCache> = None;
167
168    futures::stream::unfold(
169        (
170            cache,
171            futures_time::stream::interval(futures_time::time::Duration::from(cfg.interval)),
172        ),
173        move |(mut cache, mut ticker)| {
174            let graph = graph.clone();
175
176            async move {
177                use futures::StreamExt as _;
178                ticker.next().await?;
179
180                // Reuse cached shuffle if still within TTL.
181                let needs_refresh = cache
182                    .as_ref()
183                    .is_none_or(|c| c.probes.is_empty() || c.created_at.elapsed() >= cfg.shuffle_ttl);
184
185                if needs_refresh {
186                    let now = std::time::SystemTime::now()
187                        .duration_since(std::time::UNIX_EPOCH)
188                        .unwrap_or_default();
189
190                    let weighted: Vec<_> = graph
191                        .nodes()
192                        .filter(|peer| futures::future::ready(peer != &me))
193                        .filter_map(|peer| {
194                            let obs = graph.edge(&me, &peer);
195                            if cfg.probe_connected_only {
196                                let connected = obs
197                                    .as_ref()
198                                    .and_then(|o| o.immediate_qos())
199                                    .map(|imm| imm.is_connected())
200                                    .unwrap_or(false);
201                                if !connected {
202                                    return futures::future::ready(None);
203                                }
204                            }
205                            let priority = match obs {
206                                Some(obs) => immediate_probe_priority(obs.score(), obs.last_update(), now, &cfg),
207                                None => immediate_probe_priority(0.0, std::time::Duration::ZERO, now, &cfg),
208                            };
209                            futures::future::ready(Some((peer, priority)))
210                        })
211                        .collect()
212                        .await;
213
214                    let peer_count = weighted.len();
215                    let zero_hop = RoutingOptions::Hops(0.try_into().expect("0 is a valid u8"));
216                    let probes: Vec<_> = WeightedCollection::new(weighted)
217                        .into_shuffled()
218                        .into_iter()
219                        .map(|peer| {
220                            ProbeRouting::Neighbor(DestinationRouting::Forward {
221                                destination: Box::new(peer.into()),
222                                pseudonym: Some(HoprPseudonym::random()),
223                                forward_options: zero_hop.clone(),
224                                return_options: Some(zero_hop.clone()),
225                            })
226                        })
227                        .collect();
228
229                    tracing::debug!(peer_count, probes = probes.len(), "computed new neighbor probe shuffle");
230                    cache = Some(ShuffleCache {
231                        probes,
232                        created_at: std::time::Instant::now(),
233                    });
234                }
235
236                let batch = cache.as_ref().map(|c| c.probes.clone()).unwrap_or_default();
237                tracing::debug!(probes = batch.len(), "emitting neighbor probe batch");
238
239                Some((futures::stream::iter(batch), (cache, ticker)))
240            }
241        },
242    )
243    .flatten()
244}
245
246#[cfg(test)]
247mod tests {
248    use std::{collections::HashSet, sync::Arc};
249
250    use futures::{StreamExt, pin_mut};
251    use hopr_api::{
252        OffchainKeypair,
253        ct::{ProbeRouting, ProbingTrafficGeneration},
254        graph::{NetworkGraphUpdate, NetworkGraphWrite},
255        types::{crypto::keypairs::Keypair, internal::NodeId},
256    };
257    use hopr_network_graph::ChannelGraph;
258    use tokio::time::timeout;
259
260    use super::*;
261
262    const TINY_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20);
263
264    fn fast_cfg() -> ProberConfig {
265        ProberConfig {
266            interval: std::time::Duration::from_millis(1),
267            shuffle_ttl: std::time::Duration::ZERO,
268            probe_connected_only: false,
269            ..Default::default()
270        }
271    }
272
273    fn random_key() -> OffchainPublicKey {
274        *OffchainKeypair::random().public()
275    }
276
277    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
278    struct Node {
279        pub id: OffchainPublicKey,
280    }
281
282    impl From<Node> for OffchainPublicKey {
283        fn from(node: Node) -> Self {
284            node.id
285        }
286    }
287
288    lazy_static::lazy_static! {
289        static ref RANDOM_PEERS: HashSet<Node> = (1..10).map(|_| {
290            Node {
291                id: OffchainPublicKey::from_privkey(&hopr_api::types::crypto_random::random_bytes::<32>()).unwrap(),
292            }
293        }).collect::<HashSet<_>>();
294    }
295
296    #[tokio::test]
297    async fn peers_should_not_be_passed_if_none_are_present() -> anyhow::Result<()> {
298        let me = random_key();
299        let prober = FullNetworkDiscovery::new(me, Default::default(), Arc::new(ChannelGraph::new(me)));
300        let stream = ProbingTrafficGeneration::build(&prober);
301        pin_mut!(stream);
302
303        assert!(timeout(TINY_TIMEOUT, stream.next()).await.is_err());
304        Ok(())
305    }
306
307    #[tokio::test]
308    async fn peers_should_have_randomized_order() -> anyhow::Result<()> {
309        let me = random_key();
310        let graph = Arc::new(ChannelGraph::new(me));
311        for node in RANDOM_PEERS.iter() {
312            graph.record_node(node.clone());
313        }
314
315        let peer_count = RANDOM_PEERS.len();
316        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
317        let stream = ProbingTrafficGeneration::build(&prober);
318        pin_mut!(stream);
319
320        let extract_peer = |routing: ProbeRouting| -> OffchainPublicKey {
321            match routing {
322                ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => {
323                    if let NodeId::Offchain(peer_key) = destination.as_ref() {
324                        *peer_key
325                    } else {
326                        panic!("expected offchain destination");
327                    }
328                }
329                _ => panic!("expected Neighbor Forward routing"),
330            }
331        };
332
333        // Collect two full rounds from the stream.
334        let both_rounds: Vec<OffchainPublicKey> = timeout(
335            TINY_TIMEOUT * 40,
336            stream.take(peer_count * 2).map(extract_peer).collect::<Vec<_>>(),
337        )
338        .await?;
339
340        let round_1 = &both_rounds[..peer_count];
341        let round_2 = &both_rounds[peer_count..];
342
343        // Both rounds should cover the same set of peers.
344        let set_1: HashSet<_> = round_1.iter().collect();
345        let set_2: HashSet<_> = round_2.iter().collect();
346        assert_eq!(set_1, set_2, "both rounds should cover the same peers");
347
348        // The two rounds should (almost certainly) differ in order.
349        assert_ne!(round_1, round_2, "two rounds should differ in order (probabilistic)");
350        Ok(())
351    }
352
353    #[tokio::test]
354    async fn peers_should_be_generated_in_multiple_rounds() -> anyhow::Result<()> {
355        let me = random_key();
356        let graph = Arc::new(ChannelGraph::new(me));
357        graph.record_node(RANDOM_PEERS.iter().next().unwrap().clone());
358
359        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
360        let stream = ProbingTrafficGeneration::build(&prober);
361        pin_mut!(stream);
362
363        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
364        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_some());
365        Ok(())
366    }
367
368    #[cfg(not(feature = "noise"))]
369    #[tokio::test]
370    async fn cover_traffic_should_produce_empty_stream() -> anyhow::Result<()> {
371        let me = random_key();
372        let prober = FullNetworkDiscovery::new(me, fast_cfg(), Arc::new(ChannelGraph::new(me)));
373        let stream = CoverTrafficGeneration::build(&prober);
374        pin_mut!(stream);
375
376        assert!(timeout(TINY_TIMEOUT, stream.next()).await?.is_none());
377        Ok(())
378    }
379
380    #[tokio::test]
381    async fn only_neighbor_probes_emitted_when_no_looping_paths_exist() -> anyhow::Result<()> {
382        let me = random_key();
383        let graph = Arc::new(ChannelGraph::new(me));
384
385        let a = random_key();
386        let b = random_key();
387        graph.record_node(a);
388        graph.record_node(b);
389        graph.add_edge(&me, &a)?;
390        graph.add_edge(&a, &b)?;
391
392        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
393        let stream = ProbingTrafficGeneration::build(&prober);
394        pin_mut!(stream);
395
396        let items: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 50, stream.take(10).collect::<Vec<_>>()).await?;
397
398        assert!(!items.is_empty(), "should produce neighbor probes");
399        assert!(
400            items.iter().all(|r| matches!(r, ProbeRouting::Neighbor(_))),
401            "all probes should be Neighbor when no looping paths exist"
402        );
403        Ok(())
404    }
405
406    #[tokio::test]
407    async fn neighbor_probes_should_cover_all_known_nodes_across_rounds() -> anyhow::Result<()> {
408        let me = random_key();
409        let graph = Arc::new(ChannelGraph::new(me));
410
411        let peer_a = random_key();
412        let peer_b = random_key();
413        graph.record_node(peer_a);
414        graph.record_node(peer_b);
415
416        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
417        let stream = ProbingTrafficGeneration::build(&prober);
418        pin_mut!(stream);
419
420        let destinations: Vec<NodeId> = timeout(
421            TINY_TIMEOUT * 50,
422            neighbor_destinations(stream).take(4).collect::<Vec<_>>(),
423        )
424        .await?;
425
426        let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
427        let expected: HashSet<NodeId> = [peer_a, peer_b].into_iter().map(NodeId::from).collect();
428
429        assert_eq!(unique, expected, "probes should cover all known graph peers");
430        assert_eq!(destinations.len(), 4, "should have probes across multiple rounds");
431        Ok(())
432    }
433
434    #[tokio::test]
435    async fn single_tick_should_emit_all_peers_in_burst() -> anyhow::Result<()> {
436        let me = random_key();
437        let graph = Arc::new(ChannelGraph::new(me));
438
439        let peer_count = RANDOM_PEERS.len();
440        for node in RANDOM_PEERS.iter() {
441            graph.record_node(node.clone());
442        }
443
444        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
445        let stream = ProbingTrafficGeneration::build(&prober);
446        pin_mut!(stream);
447
448        // Collect exactly peer_count items — should all arrive from a single tick burst.
449        let burst: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 20, stream.take(peer_count).collect::<Vec<_>>()).await?;
450
451        assert_eq!(
452            burst.len(),
453            peer_count,
454            "a single tick should emit all {peer_count} peers"
455        );
456        assert!(
457            burst.iter().all(|r| matches!(r, ProbeRouting::Neighbor(_))),
458            "all burst items should be Neighbor probes"
459        );
460
461        Ok(())
462    }
463
464    /// Extract `NodeId` destinations from a `ProbeRouting::Neighbor` stream.
465    fn neighbor_destinations(stream: impl futures::Stream<Item = ProbeRouting>) -> impl futures::Stream<Item = NodeId> {
466        stream.filter_map(|r| {
467            futures::future::ready(match r {
468                ProbeRouting::Neighbor(DestinationRouting::Forward { destination, .. }) => Some(*destination),
469                _ => None,
470            })
471        })
472    }
473
474    /// Helper: mark an edge as fully connected with capacity, so it passes all cost checks.
475    fn mark_edge_ready(graph: &ChannelGraph, src: &OffchainPublicKey, dst: &OffchainPublicKey) {
476        use hopr_api::graph::traits::{EdgeObservableWrite, EdgeWeightType};
477        graph.upsert_edge(src, dst, |obs| {
478            obs.record(EdgeWeightType::Connected(true));
479            obs.record(EdgeWeightType::Immediate(Ok(std::time::Duration::from_millis(50))));
480            obs.record(EdgeWeightType::Capacity(Some(1000)));
481        });
482    }
483
484    #[tokio::test]
485    async fn loopback_probes_should_be_emitted_for_two_edge_path() -> anyhow::Result<()> {
486        // Topology: me → a → b, b → me (connected neighbor)
487        // Loopback: me → a → b → me
488        let me = random_key();
489        let a = random_key();
490        let b = random_key();
491        let graph = Arc::new(ChannelGraph::new(me));
492        graph.add_node(a);
493        graph.add_node(b);
494
495        // Forward edges
496        graph.add_edge(&me, &a)?;
497        graph.add_edge(&a, &b)?;
498        mark_edge_ready(&graph, &me, &a);
499        mark_edge_ready(&graph, &a, &b);
500
501        // Return edge: b is a connected neighbor of me
502        graph.add_edge(&b, &me)?;
503        mark_edge_ready(&graph, &b, &me);
504
505        // Also need me → b edge so simple_loopback_to_self finds b as a connected neighbor
506        graph.add_edge(&me, &b)?;
507        mark_edge_ready(&graph, &me, &b);
508
509        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
510        let stream = ProbingTrafficGeneration::build(&prober);
511        pin_mut!(stream);
512
513        // Collect enough items to get both neighbor and loopback probes
514        let items: Vec<ProbeRouting> = timeout(TINY_TIMEOUT * 100, stream.take(20).collect::<Vec<_>>()).await?;
515
516        let looping_count = items.iter().filter(|r| matches!(r, ProbeRouting::Looping(_))).count();
517        let neighbor_count = items.iter().filter(|r| matches!(r, ProbeRouting::Neighbor(_))).count();
518
519        assert!(neighbor_count > 0, "should have neighbor probes");
520        assert!(
521            looping_count > 0,
522            "should have loopback probes (was {looping_count} out of {} total)",
523            items.len()
524        );
525
526        // Verify loopback routing has IntermediatePath
527        for item in &items {
528            if let ProbeRouting::Looping((
529                DestinationRouting::Forward {
530                    destination,
531                    forward_options,
532                    ..
533                },
534                _,
535            )) = item
536            {
537                assert_eq!(
538                    destination.as_ref(),
539                    &NodeId::Offchain(me),
540                    "loopback destination should be me"
541                );
542                assert!(
543                    matches!(forward_options, RoutingOptions::IntermediatePath(_)),
544                    "loopback should use IntermediatePath routing"
545                );
546            }
547        }
548
549        Ok(())
550    }
551
552    #[tokio::test]
553    async fn probe_connected_only_should_skip_unconnected_peers() -> anyhow::Result<()> {
554        let me = random_key();
555        let graph = Arc::new(ChannelGraph::new(me));
556
557        let connected_peer = random_key();
558        let unconnected_peer = random_key();
559        graph.record_node(connected_peer);
560        graph.record_node(unconnected_peer);
561
562        // Only mark one peer as connected.
563        mark_edge_ready(&graph, &me, &connected_peer);
564
565        let cfg = ProberConfig {
566            probe_connected_only: true,
567            ..fast_cfg()
568        };
569        let prober = FullNetworkDiscovery::new(me, cfg, graph);
570        let stream = ProbingTrafficGeneration::build(&prober);
571        pin_mut!(stream);
572
573        let destinations: Vec<NodeId> = timeout(
574            TINY_TIMEOUT * 50,
575            neighbor_destinations(stream).take(3).collect::<Vec<_>>(),
576        )
577        .await?;
578
579        let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
580        assert_eq!(unique.len(), 1, "only one peer should be probed");
581        assert!(
582            unique.contains(&NodeId::from(connected_peer)),
583            "only the connected peer should be probed"
584        );
585        Ok(())
586    }
587
588    #[tokio::test]
589    async fn probe_connected_only_disabled_should_probe_all_peers() -> anyhow::Result<()> {
590        let me = random_key();
591        let graph = Arc::new(ChannelGraph::new(me));
592
593        let connected_peer = random_key();
594        let unconnected_peer = random_key();
595        graph.record_node(connected_peer);
596        graph.record_node(unconnected_peer);
597
598        mark_edge_ready(&graph, &me, &connected_peer);
599
600        // Default: probe_connected_only = false
601        let prober = FullNetworkDiscovery::new(me, fast_cfg(), graph);
602        let stream = ProbingTrafficGeneration::build(&prober);
603        pin_mut!(stream);
604
605        let destinations: Vec<NodeId> = timeout(
606            TINY_TIMEOUT * 50,
607            neighbor_destinations(stream).take(4).collect::<Vec<_>>(),
608        )
609        .await?;
610
611        let unique: HashSet<NodeId> = destinations.iter().cloned().collect();
612        let expected: HashSet<NodeId> = [connected_peer, unconnected_peer]
613            .into_iter()
614            .map(NodeId::from)
615            .collect();
616        assert_eq!(
617            unique, expected,
618            "both peers should be probed when probe_connected_only is false"
619        );
620        Ok(())
621    }
622
623    #[tokio::test]
624    async fn loopback_routing_should_reject_full_path_with_me() -> anyhow::Result<()> {
625        // Verify that passing the full path [me, a, b, me] to loopback_routing
626        // exceeds BoundedVec capacity and returns None.
627        let me = random_key();
628        let a = random_key();
629        let b = random_key();
630        let me_node = NodeId::Offchain(me);
631
632        // Full path as returned by simple_loopback_to_self: [me, a, b, me]
633        let full_path = vec![me, a, b, me];
634        assert!(
635            loopback_routing(me_node, full_path).is_none(),
636            "full path [me, a, b, me] should exceed BoundedVec<3> and return None"
637        );
638
639        // Stripped path (intermediates only): [a, b]
640        let stripped_path = vec![a, b];
641        assert!(
642            loopback_routing(me_node, stripped_path).is_some(),
643            "stripped path [a, b] should fit BoundedVec<3> and return Some"
644        );
645
646        Ok(())
647    }
648}