hopr_transport/
network_notifier.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::sync::Arc;

use async_lock::RwLock;
use async_trait::async_trait;
use tracing::{debug, error};

use hopr_crypto_types::types::OffchainPublicKey;
use hopr_db_sql::api::resolver::HoprDbResolverOperations;
use hopr_path::channel_graph::ChannelGraph;
use hopr_transport_network::{
    network::{Network, NetworkTriggeredEvent},
    ping::PingExternalAPI,
    HoprDbPeersOperations, PeerId,
};

/// Implementor of the ping external API.
///
/// Ping requires functionality from external components in order to obtain
/// the triggers for its functionality. This struct implements the basic API by
/// aggregating all necessary ping resources without leaking them into the
/// `Ping` object, keeping both the adaptor and the ping object compliant with
/// the open/closed principle (OCP) and the single-responsibility principle (SRP).
///
/// The type parameter `T` is the database handle; it is stored directly as the
/// `resolver` and also parametrizes the shared [`Network`].
#[derive(Debug, Clone)]
pub struct PingExternalInteractions<T>
where
    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
{
    /// Shared network object that receives the outcome of every finished ping.
    network: Arc<Network<T>>,
    /// Resolver used to look up the chain key belonging to a peer's offchain public key.
    resolver: T,
    /// Shared channel graph whose node quality is updated when the network reports
    /// a quality change for a peer.
    channel_graph: Arc<RwLock<ChannelGraph>>,
    /// Implementation of the network interface allowing emitting events
    /// based on the [hopr_transport_network::network::Network] events into the p2p swarm.
    emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
}

impl<T> PingExternalInteractions<T>
where
    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
{
    pub fn new(
        network: Arc<Network<T>>,
        resolver: T,
        channel_graph: Arc<RwLock<ChannelGraph>>,
        emitter: futures::channel::mpsc::Sender<NetworkTriggeredEvent>,
    ) -> Self {
        Self {
            network,
            resolver,
            channel_graph,
            emitter,
        }
    }
}

#[async_trait]
impl<T> PingExternalAPI for PingExternalInteractions<T>
where
    T: HoprDbPeersOperations + HoprDbResolverOperations + Sync + Send + Clone + std::fmt::Debug,
{
    #[tracing::instrument(level = "info", skip(self))]
    async fn on_finished_ping(
        &self,
        peer: &PeerId,
        result: &hopr_transport_network::errors::Result<std::time::Duration>,
        version: String,
    ) {
        let result = match &result {
            Ok(duration) => Ok(*duration),
            Err(_) => Err(()),
        };

        match self
            .network
            .update(peer, result, result.is_ok().then_some(version))
            .await
        {
            Ok(Some(updated)) => match updated {
                NetworkTriggeredEvent::CloseConnection(peer) => {
                    if let Err(e) = self
                        .emitter
                        .clone()
                        .try_send(NetworkTriggeredEvent::CloseConnection(peer))
                    {
                        error!(error = %e, "Failed to emit a network event 'close connection'")
                    }
                }
                NetworkTriggeredEvent::UpdateQuality(peer, quality) => {
                    debug!("'{peer}' changed quality to '{quality}'");
                    if let Ok(pk) = OffchainPublicKey::try_from(peer) {
                        let maybe_chain_key = self.resolver.resolve_chain_key(&pk).await;
                        if let Ok(Some(chain_key)) = maybe_chain_key {
                            let mut g = self.channel_graph.write().await;
                            g.update_node_quality(&chain_key, quality);
                            debug!("update node {chain_key} with quality {quality}");
                        } else {
                            error!(%peer, "could not resolve chain key ");
                        }
                    } else {
                        error!(%peer, "encountered invalid peer id:");
                    }
                }
            },
            Ok(None) => debug!("No update necessary"),
            Err(e) => error!(error = %e, "Encountered error on on updating the collected ping data"),
        }
    }
}