hopr_transport/
network_notifier.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use hopr_crypto_types::types::OffchainPublicKey;
6use hopr_db_sql::api::resolver::HoprDbResolverOperations;
7#[cfg(all(feature = "prometheus", not(test)))]
8use hopr_metrics::metrics::{MultiCounter, SimpleHistogram};
9use hopr_path::channel_graph::ChannelGraph;
10use hopr_transport_network::{HoprDbPeersOperations, network::Network};
11use hopr_transport_probe::traits::{PeerDiscoveryFetch, ProbeStatusUpdate};
12use tracing::{debug, error};
13
14#[cfg(all(feature = "prometheus", not(test)))]
15lazy_static::lazy_static! {
16    static ref METRIC_TIME_TO_PING: SimpleHistogram =
17        SimpleHistogram::new(
18            "hopr_ping_time_sec",
19            "Measures total time it takes to ping a single node (seconds)",
20            vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
21        ).unwrap();
22    static ref METRIC_PROBE_COUNT: MultiCounter = MultiCounter::new(
23            "hopr_probe_count",
24            "Total number of pings by result",
25            &["success"]
26        ).unwrap();
27}
28
29/// Implementor of the ping external API.
30///
31/// Ping requires functionality from external components in order to obtain
32/// the triggers for its functionality. This class implements the basic API by
33/// aggregating all necessary ping resources without leaking them into the
34/// `Ping` object and keeping both the adaptor and the ping object OCP and SRP
35/// compliant.
36#[derive(Debug, Clone)]
37pub struct ProbeNetworkInteractions<T>
38where
39    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
40{
41    network: Arc<Network<T>>,
42    resolver: T,
43    channel_graph: Arc<RwLock<ChannelGraph>>,
44}
45
46impl<T> ProbeNetworkInteractions<T>
47where
48    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
49{
50    pub fn new(network: Arc<Network<T>>, resolver: T, channel_graph: Arc<RwLock<ChannelGraph>>) -> Self {
51        Self {
52            network,
53            resolver,
54            channel_graph,
55        }
56    }
57}
58
59#[async_trait]
60impl<T> PeerDiscoveryFetch for ProbeNetworkInteractions<T>
61where
62    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
63{
64    /// Get all peers considered by the `Network` to be pingable.
65    ///
66    /// After a duration of non-pinging based specified by the configurable threshold.
67    #[tracing::instrument(level = "trace", skip(self))]
68    async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<hopr_transport_network::PeerId> {
69        self.network
70            .find_peers_to_ping(from_timestamp)
71            .await
72            .unwrap_or_else(|e| {
73                tracing::error!(error = %e, "Failed to generate peers for the heartbeat procedure");
74                vec![]
75            })
76    }
77}
78
79#[async_trait]
80impl<T> ProbeStatusUpdate for ProbeNetworkInteractions<T>
81where
82    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
83{
84    #[tracing::instrument(level = "debug", skip(self))]
85    async fn on_finished(
86        &self,
87        peer: &hopr_transport_network::PeerId,
88        result: &hopr_transport_probe::errors::Result<std::time::Duration>,
89    ) {
90        let result = match &result {
91            Ok(duration) => {
92                #[cfg(all(feature = "prometheus", not(test)))]
93                {
94                    METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); // precision for seconds
95                    METRIC_PROBE_COUNT.increment(&["true"]);
96                }
97
98                Ok(*duration)
99            }
100            Err(_) => {
101                #[cfg(all(feature = "prometheus", not(test)))]
102                METRIC_PROBE_COUNT.increment(&["false"]);
103
104                Err(())
105            }
106        };
107
108        // Update the channel graph
109        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
110            let maybe_chain_key = self.resolver.resolve_chain_key(&pk).await;
111            if let Ok(Some(chain_key)) = maybe_chain_key {
112                let mut g = self.channel_graph.write_arc().await;
113                g.update_node_score(&chain_key, result.into());
114                debug!(%chain_key, ?result, "update node score for peer");
115            } else {
116                error!("could not resolve chain key");
117            }
118        } else {
119            error!("encountered invalid peer id");
120        }
121
122        if let Err(error) = self.network.update(peer, result, None).await {
123            error!(%error, "Encountered error on on updating the collected ping data")
124        }
125    }
126}