hopr_transport/
network_notifier.rs1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use hopr_crypto_types::types::OffchainPublicKey;
6use hopr_db_sql::api::resolver::HoprDbResolverOperations;
7#[cfg(all(feature = "prometheus", not(test)))]
8use hopr_metrics::metrics::{MultiCounter, SimpleHistogram};
9use hopr_path::channel_graph::ChannelGraph;
10use hopr_transport_network::{HoprDbPeersOperations, network::Network};
11use hopr_transport_probe::traits::{PeerDiscoveryFetch, ProbeStatusUpdate};
12use tracing::{debug, error};
13
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counter of finished probes, partitioned by the `success` label.
    static ref METRIC_PROBE_COUNT: MultiCounter =
        MultiCounter::new("hopr_probe_count", "Total number of pings by result", &["success"]).unwrap();

    /// Histogram of the time needed to ping a single node, in seconds.
    static ref METRIC_TIME_TO_PING: SimpleHistogram = SimpleHistogram::new(
        "hopr_ping_time_sec",
        "Measures total time it takes to ping a single node (seconds)",
        vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
    )
    .unwrap();
}
28
29#[derive(Debug, Clone)]
37pub struct ProbeNetworkInteractions<T>
38where
39 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
40{
41 network: Arc<Network<T>>,
42 resolver: T,
43 channel_graph: Arc<RwLock<ChannelGraph>>,
44}
45
46impl<T> ProbeNetworkInteractions<T>
47where
48 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
49{
50 pub fn new(network: Arc<Network<T>>, resolver: T, channel_graph: Arc<RwLock<ChannelGraph>>) -> Self {
51 Self {
52 network,
53 resolver,
54 channel_graph,
55 }
56 }
57}
58
59#[async_trait]
60impl<T> PeerDiscoveryFetch for ProbeNetworkInteractions<T>
61where
62 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
63{
64 #[tracing::instrument(level = "trace", skip(self))]
68 async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<hopr_transport_network::PeerId> {
69 self.network
70 .find_peers_to_ping(from_timestamp)
71 .await
72 .unwrap_or_else(|e| {
73 tracing::error!(error = %e, "Failed to generate peers for the heartbeat procedure");
74 vec![]
75 })
76 }
77}
78
79#[async_trait]
80impl<T> ProbeStatusUpdate for ProbeNetworkInteractions<T>
81where
82 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
83{
84 #[tracing::instrument(level = "debug", skip(self))]
85 async fn on_finished(
86 &self,
87 peer: &hopr_transport_network::PeerId,
88 result: &hopr_transport_probe::errors::Result<std::time::Duration>,
89 ) {
90 let result = match &result {
91 Ok(duration) => {
92 #[cfg(all(feature = "prometheus", not(test)))]
93 {
94 METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); METRIC_PROBE_COUNT.increment(&["true"]);
96 }
97
98 Ok(*duration)
99 }
100 Err(_) => {
101 #[cfg(all(feature = "prometheus", not(test)))]
102 METRIC_PROBE_COUNT.increment(&["false"]);
103
104 Err(())
105 }
106 };
107
108 if let Ok(pk) = OffchainPublicKey::try_from(peer) {
110 let maybe_chain_key = self.resolver.resolve_chain_key(&pk).await;
111 if let Ok(Some(chain_key)) = maybe_chain_key {
112 let mut g = self.channel_graph.write_arc().await;
113 g.update_node_score(&chain_key, result.into());
114 debug!(%chain_key, ?result, "update node score for peer");
115 } else {
116 error!("could not resolve chain key");
117 }
118 } else {
119 error!("encountered invalid peer id");
120 }
121
122 if let Err(error) = self.network.update(peer, result, None).await {
123 error!(%error, "Encountered error on on updating the collected ping data")
124 }
125 }
126}