hopr_transport/
network_notifier.rs1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use hopr_api::{chain::ChainKeyOperations, db::HoprDbPeersOperations};
6use hopr_crypto_types::types::OffchainPublicKey;
7use hopr_path::channel_graph::ChannelGraph;
8use hopr_transport_network::network::{Network, UpdateFailure};
9use hopr_transport_probe::traits::{PeerDiscoveryFetch, ProbeStatusUpdate};
10use tracing::{debug, error};
11
// Probe metrics; compiled only with the `prometheus` feature and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Histogram of the total time needed to ping a single node, in seconds.
    static ref METRIC_TIME_TO_PING: hopr_metrics::SimpleHistogram = hopr_metrics::SimpleHistogram::new(
        "hopr_ping_time_sec",
        "Measures total time it takes to ping a single node (seconds)",
        vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0],
    )
    .unwrap();

    /// Counter of finished pings, partitioned by the `success` label.
    static ref METRIC_PROBE_COUNT: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_probe_count",
        "Total number of pings by result",
        &["success"],
    )
    .unwrap();
}
26
27#[derive(Debug, Clone)]
35pub struct ProbeNetworkInteractions<Db, R> {
36 network: Arc<Network<Db>>,
37 resolver: R,
38 channel_graph: Arc<RwLock<ChannelGraph>>,
39}
40
41impl<Db, R> ProbeNetworkInteractions<Db, R>
42where
43 Db: HoprDbPeersOperations + Sync + Send + Clone,
44 R: ChainKeyOperations + Sync + Send + Clone,
45{
46 pub fn new(network: Arc<Network<Db>>, resolver: R, channel_graph: Arc<RwLock<ChannelGraph>>) -> Self {
47 Self {
48 network,
49 resolver,
50 channel_graph,
51 }
52 }
53}
54
55#[async_trait]
56impl<Db, R> PeerDiscoveryFetch for ProbeNetworkInteractions<Db, R>
57where
58 Db: HoprDbPeersOperations + Sync + Send + Clone,
59 R: ChainKeyOperations + Sync + Send + Clone,
60{
61 #[tracing::instrument(level = "trace", skip(self))]
65 async fn get_peers(&self, from_timestamp: std::time::SystemTime) -> Vec<hopr_transport_network::PeerId> {
66 self.network
67 .find_peers_to_ping(from_timestamp)
68 .await
69 .unwrap_or_else(|error| {
70 tracing::error!(%error, "failed to generate peers for the heartbeat procedure");
71 vec![]
72 })
73 }
74}
75
76#[async_trait]
77impl<Db, R> ProbeStatusUpdate for ProbeNetworkInteractions<Db, R>
78where
79 Db: HoprDbPeersOperations + Sync + Send + Clone,
80 R: ChainKeyOperations + Sync + Send + Clone,
81{
82 #[tracing::instrument(level = "debug", skip(self))]
83 async fn on_finished(
84 &self,
85 peer: &hopr_transport_network::PeerId,
86 result: &hopr_transport_probe::errors::Result<std::time::Duration>,
87 ) {
88 let result = match &result {
89 Ok(duration) => {
90 #[cfg(all(feature = "prometheus", not(test)))]
91 {
92 METRIC_TIME_TO_PING.observe((duration.as_millis() as f64) / 1000.0); METRIC_PROBE_COUNT.increment(&["true"]);
94 }
95
96 Ok(*duration)
97 }
98 Err(error) => {
99 #[cfg(all(feature = "prometheus", not(test)))]
100 METRIC_PROBE_COUNT.increment(&["false"]);
101
102 tracing::trace!(%error, "Encountered timeout on peer ping");
103 Err(UpdateFailure::Timeout)
104 }
105 };
106
107 let peer = *peer;
109 if let Ok(pubkey) = hopr_parallelize::cpu::spawn_blocking(move || OffchainPublicKey::from_peerid(&peer)).await {
111 let maybe_chain_key = self.resolver.packet_key_to_chain_key(&pubkey).await;
112 if let Ok(Some(chain_key)) = maybe_chain_key {
113 let mut g = self.channel_graph.write_arc().await;
114 g.update_node_score(&chain_key, result.into());
115 debug!(%chain_key, ?result, "update node score for peer");
116 } else {
117 error!("could not resolve chain key");
118 }
119 } else {
120 error!("encountered invalid peer id");
121 }
122
123 if let Err(error) = self.network.update(&peer, result).await {
124 error!(%error, "Encountered error on on updating the collected ping data")
125 }
126 }
127}