hopr_transport/
network_notifier.rs

1use std::sync::Arc;
2
3use async_lock::RwLock;
4use async_trait::async_trait;
5use tracing::{debug, error};
6
7use hopr_crypto_types::types::OffchainPublicKey;
8use hopr_db_sql::api::resolver::HoprDbResolverOperations;
9use hopr_path::channel_graph::ChannelGraph;
10use hopr_transport_network::{
11    network::{Network, NetworkTriggeredEvent},
12    ping::PingExternalAPI,
13    HoprDbPeersOperations, PeerId,
14};
15
16/// Implementor of the ping external API.
17///
18/// Ping requires functionality from external components in order to obtain
19/// the triggers for its functionality. This class implements the basic API by
20/// aggregating all necessary ping resources without leaking them into the
21/// `Ping` object and keeping both the adaptor and the ping object OCP and SRP
22/// compliant.
23#[derive(Debug, Clone)]
24pub struct PingExternalInteractions<T>
25where
26    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
27{
28    network: Arc<Network<T>>,
29    resolver: T,
30    channel_graph: Arc<RwLock<ChannelGraph>>,
31    /// Implementation of the network interface allowing emitting events
32    /// based on the [hopr_transport_network::network::Network] events into the p2p swarm.
33    emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
34}
35
36impl<T> PingExternalInteractions<T>
37where
38    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
39{
40    pub fn new(
41        network: Arc<Network<T>>,
42        resolver: T,
43        channel_graph: Arc<RwLock<ChannelGraph>>,
44        emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
45    ) -> Self {
46        Self {
47            network,
48            resolver,
49            channel_graph,
50            emitter,
51        }
52    }
53}
54
55#[async_trait]
56impl<T> PingExternalAPI for PingExternalInteractions<T>
57where
58    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
59{
60    #[tracing::instrument(level = "info", skip(self))]
61    async fn on_finished_ping(
62        &self,
63        peer: &PeerId,
64        result: &hopr_transport_network::errors::Result<std::time::Duration>,
65        version: String,
66    ) {
67        let result = match &result {
68            Ok(duration) => Ok(*duration),
69            Err(_) => Err(()),
70        };
71
72        // Update the channel graph
73        if let Ok(pk) = OffchainPublicKey::try_from(peer) {
74            let maybe_chain_key = self.resolver.resolve_chain_key(&pk).await;
75            if let Ok(Some(chain_key)) = maybe_chain_key {
76                let mut g = self.channel_graph.write().await;
77                g.update_node_score(&chain_key, result.into());
78                debug!(%chain_key, ?result, "update node score for peer");
79            } else {
80                error!(%peer, "could not resolve chain key ");
81            }
82        } else {
83            error!(%peer, "encountered invalid peer id:");
84        }
85
86        match self
87            .network
88            .update(peer, result, result.is_ok().then_some(version))
89            .await
90        {
91            Ok(Some(updated)) => match updated {
92                NetworkTriggeredEvent::CloseConnection(peer) => {
93                    if let Err(e) = self
94                        .emitter
95                        .clone()
96                        .try_send(NetworkTriggeredEvent::CloseConnection(peer))
97                    {
98                        error!(error = %e, "Failed to emit a network event 'close connection'")
99                    }
100                }
101                NetworkTriggeredEvent::UpdateQuality(peer, quality) => {
102                    debug!("'{peer}' changed quality to '{quality}'");
103                }
104            },
105            Ok(None) => debug!("No update necessary"),
106            Err(e) => error!(error = %e, "Encountered error on on updating the collected ping data"),
107        }
108    }
109}