hopr_transport/
network_notifier.rs1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use tracing::{debug, error};
6
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_db_sql::api::resolver::HoprDbResolverOperations;
9use hopr_path::channel_graph::ChannelGraph;
10use hopr_transport_network::{
11 network::{Network, NetworkTriggeredEvent},
12 ping::PingExternalAPI,
13 HoprDbPeersOperations, PeerId,
14};
15
16#[derive(Debug, Clone)]
24pub struct PingExternalInteractions<T>
25where
26 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
27{
28 network: Arc<Network<T>>,
29 resolver: T,
30 channel_graph: Arc<RwLock<ChannelGraph>>,
31 emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
34}
35
36impl<T> PingExternalInteractions<T>
37where
38 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
39{
40 pub fn new(
41 network: Arc<Network<T>>,
42 resolver: T,
43 channel_graph: Arc<RwLock<ChannelGraph>>,
44 emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
45 ) -> Self {
46 Self {
47 network,
48 resolver,
49 channel_graph,
50 emitter,
51 }
52 }
53}
54
55#[async_trait]
56impl<T> PingExternalAPI for PingExternalInteractions<T>
57where
58 T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
59{
60 #[tracing::instrument(level = "info", skip(self))]
61 async fn on_finished_ping(
62 &self,
63 peer: &PeerId,
64 result: &hopr_transport_network::errors::Result<std::time::Duration>,
65 version: String,
66 ) {
67 let result = match &result {
68 Ok(duration) => Ok(*duration),
69 Err(_) => Err(()),
70 };
71
72 if let Ok(pk) = OffchainPublicKey::try_from(peer) {
74 let maybe_chain_key = self.resolver.resolve_chain_key(&pk).await;
75 if let Ok(Some(chain_key)) = maybe_chain_key {
76 let mut g = self.channel_graph.write().await;
77 g.update_node_score(&chain_key, result.into());
78 debug!(%chain_key, ?result, "update node score for peer");
79 } else {
80 error!(%peer, "could not resolve chain key ");
81 }
82 } else {
83 error!(%peer, "encountered invalid peer id:");
84 }
85
86 match self
87 .network
88 .update(peer, result, result.is_ok().then_some(version))
89 .await
90 {
91 Ok(Some(updated)) => match updated {
92 NetworkTriggeredEvent::CloseConnection(peer) => {
93 if let Err(e) = self
94 .emitter
95 .clone()
96 .try_send(NetworkTriggeredEvent::CloseConnection(peer))
97 {
98 error!(error = %e, "Failed to emit a network event 'close connection'")
99 }
100 }
101 NetworkTriggeredEvent::UpdateQuality(peer, quality) => {
102 debug!("'{peer}' changed quality to '{quality}'");
103 }
104 },
105 Ok(None) => debug!("No update necessary"),
106 Err(e) => error!(error = %e, "Encountered error on on updating the collected ping data"),
107 }
108 }
109}