hopr_transport/
network_notifier.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use hopr_api::{chain::ChainKeyOperations, db::HoprDbPeersOperations};
6use hopr_crypto_types::types::OffchainPublicKey;
7use hopr_path::channel_graph::ChannelGraph;
8use hopr_transport_network::network::{Network, UpdateFailure};
9use hopr_transport_probe::traits::{PeerDiscoveryFetch, ProbeStatusUpdate};
10use tracing::{debug, error};
11
// Probe metrics are only compiled in when the `prometheus` feature is enabled
// and the crate is not being built for tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram of the time needed to ping a single node, observed in seconds.
    static ref METRIC_TIME_TO_PING: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_ping_time_sec",
        "Measures total time it takes to ping a single node (seconds)",
        vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
    )
    .unwrap();

    /// Counter of finished probes, partitioned by the `success` label.
    static ref METRIC_PROBE_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_probe_count",
        "Total number of pings by result",
        &["success"],
    )
    .unwrap();
}
26
27/// Implementor of the ping external API.
28///
29/// Ping requires functionality from external components in order to obtain
30/// the triggers for its functionality. This class implements the basic API by
31/// aggregating all necessary ping resources without leaking them into the
32/// `Ping` object and keeping both the adaptor and the ping object OCP and SRP
33/// compliant.
34#[derive(Debug, Clone)]
35pub struct ProbeNetworkInteractions<Db, R> {
36    network: Arc<Network<Db>>,
37    resolver: R,
38    channel_graph: Arc<RwLock<ChannelGraph>>,
39}
40
41impl<Db, R> ProbeNetworkInteractions<Db, R>
42where
43    Db: HoprDbPeersOperations + Sync + Send + Clone,
44    R: ChainKeyOperations + Sync + Send + Clone,
45{
46    pub fn new(network: Arc<Network<Db>>, resolver: R, channel_graph: Arc<RwLock<ChannelGraph>>) -> Self {
47        Self {
48            network,
49            resolver,
50            channel_graph,
51        }
52    }
53}
54
55#[async_trait]
56impl<Db, R> PeerDiscoveryFetch for ProbeNetworkInteractions<Db, R>
57where
58    Db: HoprDbPeersOperations + Sync + Send + Clone,
59    R: ChainKeyOperations + Sync + Send + Clone,
60{
61    /// Get all peers considered by the `Network` to be pingable.
62    ///
63    /// After a duration of non-pinging based specified by the configurable threshold.
64    #[tracing::instrument(level = "trace", skip(self))]
65    async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<hopr_transport_network::PeerId> {
66        self.network
67            .find_peers_to_ping(from_timestamp)
68            .await
69            .unwrap_or_else(|error| {
70                tracing::error!(%error, "failed to generate peers for the heartbeat procedure");
71                vec![]
72            })
73    }
74}
75
76#[async_trait]
77impl<Db, R> ProbeStatusUpdate for ProbeNetworkInteractions<Db, R>
78where
79    Db: HoprDbPeersOperations + Sync + Send + Clone,
80    R: ChainKeyOperations + Sync + Send + Clone,
81{
82    #[tracing::instrument(level = "debug", skip(self))]
83    async fn on_finished(
84        &self,
85        peer: &hopr_transport_network::PeerId,
86        result: &hopr_transport_probe::errors::Result<std::time::Duration>,
87    ) {
88        let result = match &result {
89            Ok(duration) => {
90                #[cfg(all(feature = "prometheus", not(test)))]
91                {
92                    METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); // precision for seconds
93                    METRIC_PROBE_COUNT.increment(&["true"]);
94                }
95
96                Ok(*duration)
97            }
98            Err(error) => {
99                #[cfg(all(feature = "prometheus", not(test)))]
100                METRIC_PROBE_COUNT.increment(&["false"]);
101
102                tracing::trace!(%error, "Encountered timeout on peer ping");
103                Err(UpdateFailure::Timeout)
104            }
105        };
106
107        // Update the channel graph
108        let peer = *peer;
109        // PeerId -> OffchainPublicKey is a CPU-intensive blocking operation
110        if let Ok(pubkey) = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await {
111            let maybe_chain_key = self.resolver.packet_key_to_chain_key(&pubkey).await;
112            if let Ok(Some(chain_key)) = maybe_chain_key {
113                let mut g = self.channel_graph.write_arc().await;
114                g.update_node_score(&chain_key, result.into());
115                debug!(%chain_key, ?result, "update node score for peer");
116            } else {
117                error!("could not resolve chain key");
118            }
119        } else {
120            error!("encountered invalid peer id");
121        }
122
123        if let Err(error) = self.network.update(&peer, result).await {
124            error!(%error, "Encountered error on on updating the collected ping data")
125        }
126    }
127}