Skip to main content

hopr_builder/
lib.rs

1#[cfg(feature = "session-server")]
2pub mod exit;
3
4#[cfg(feature = "session-server")]
5pub mod config;
6
7#[cfg(any(feature = "testing", test))]
8pub mod testing;
9
10use std::sync::Arc;
11
12use futures::{FutureExt, StreamExt as _};
13use futures_concurrency::stream::StreamExt;
14use hopr_chain_connector::{
15    BlockchainConnectorConfig, HoprBlockchainSafeConnector,
16    api::{AccountSelector, ChainEvent, HoprChainApi, StateSyncOptions},
17    blokli_client::{BlokliClient, BlokliClientConfig},
18    create_trustful_hopr_blokli_connector,
19};
20#[cfg(feature = "runtime-tokio")]
21pub use hopr_lib;
22#[cfg(feature = "session-server")]
23use hopr_lib::traits::HoprSessionServer;
24#[cfg(feature = "runtime-tokio")]
25use hopr_lib::{
26    ChainKeypair, Hopr, HoprLibError, HoprTransportIO, OffchainKeypair, api::network::NetworkEvent,
27    config::HoprLibConfig,
28};
29use hopr_lib::{Keypair, UnitaryFloatOps};
30use hopr_network_graph::{ChannelGraph, SharedChannelGraph};
31use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork, PeerDiscovery};
32#[cfg(feature = "runtime-tokio")]
33use validator::Validate;
34#[cfg(feature = "runtime-tokio")]
35use {
36    futures::SinkExt,
37    hopr_lib::{
38        api::{PeerId, graph::EdgeCapacityUpdate, graph::NetworkGraphUpdate},
39        {ChannelStatus, NeighborTelemetry, PathTelemetry},
40    },
41};
42
43#[cfg(feature = "session-server")]
44use crate::{config::SessionIpForwardingConfig, exit::HoprServerIpForwardingReactor};
45
46pub type ReferenceHopr = Hopr<Arc<HoprBlockchainSafeConnector<BlokliClient>>, SharedChannelGraph, HoprNetwork>;
47
48#[cfg(feature = "runtime-tokio")]
49pub async fn build_reference(
50    identity: (&ChainKeypair, &OffchainKeypair),
51    config: HoprLibConfig,
52    blokli_url: String,
53    #[cfg(feature = "session-server")] server_config: SessionIpForwardingConfig,
54) -> anyhow::Result<(
55    Arc<ReferenceHopr>,
56    impl Future<Output = std::result::Result<HoprTransportIO, HoprLibError>>,
57)> {
58    let (chain_key, packet_key) = identity;
59
60    let mut chain_connector = create_trustful_hopr_blokli_connector(
61        chain_key,
62        BlockchainConnectorConfig {
63            connection_sync_timeout: std::time::Duration::from_mins(1),
64            sync_tolerance: 90,
65            tx_timeout_multiplier: std::env::var("HOPR_TX_TIMEOUT_MULTIPLIER")
66                .ok()
67                .and_then(|p| {
68                    p.parse()
69                        .inspect_err(|error| tracing::warn!(%error, "failed to parse HOPR_TX_TIMEOUT_MULTIPLIER"))
70                        .ok()
71                })
72                .unwrap_or_else(|| BlockchainConnectorConfig::default().tx_timeout_multiplier),
73        },
74        BlokliClient::new(
75            blokli_url.parse()?,
76            BlokliClientConfig {
77                timeout: std::time::Duration::from_secs(30),
78                stream_reconnect_timeout: std::time::Duration::from_secs(30),
79                subscription_stream_restart_delay: Some(std::time::Duration::from_secs(1)),
80            },
81        ),
82        config.safe_module.module_address,
83    )
84    .await?;
85    chain_connector.connect().await?;
86    let chain_connector = Arc::new(chain_connector);
87
88    #[cfg(feature = "session-server")]
89    let session_server = HoprServerIpForwardingReactor::new(packet_key.clone(), server_config);
90
91    build_with_chain(
92        chain_key,
93        packet_key,
94        config,
95        None,
96        chain_connector,
97        #[cfg(feature = "session-server")]
98        session_server,
99    )
100    .await
101}
102
103#[cfg(feature = "runtime-tokio")]
104pub async fn build_with_chain<
105    Chain,
106    #[cfg(feature = "session-server")] Srv: HoprSessionServer + Clone + Send + 'static,
107>(
108    chain_key: &ChainKeypair,
109    packet_key: &OffchainKeypair,
110    config: HoprLibConfig,
111    probe_cfg: Option<hopr_ct_full_network::ProberConfig>,
112    chain_connector: Chain,
113    #[cfg(feature = "session-server")] server: Srv,
114) -> anyhow::Result<(
115    Arc<Hopr<Chain, SharedChannelGraph, HoprNetwork>>,
116    impl Future<Output = std::result::Result<HoprTransportIO, HoprLibError>>,
117)>
118where
119    Chain: HoprChainApi + Clone + Send + Sync + 'static,
120{
121    if let Some(ref pcfg) = probe_cfg {
122        pcfg.validate()
123            .map_err(|e| anyhow::anyhow!("invalid ProberConfig: {e}"))?;
124
125        let probe_timeout = config.protocol.probe.timeout;
126        anyhow::ensure!(
127            pcfg.interval >= probe_timeout,
128            "ProberConfig interval ({:?}) must be >= ProbeConfig timeout ({:?}) to prevent overlapping probe rounds",
129            pcfg.interval,
130            probe_timeout,
131        );
132    }
133
134    // Calculate the minimum capacity based on accounts (each account can generate 2 messages),
135    // plus 100 as an additional buffer
136    let minimum_capacity = chain_connector
137        .count_accounts(AccountSelector {
138            public_only: true,
139            ..Default::default()
140        })
141        .await
142        .map_err(hopr_lib::HoprLibError::chain)?
143        .saturating_mul(2)
144        .saturating_add(100);
145
146    let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
147        .ok()
148        .and_then(|s| s.trim().parse::<usize>().ok())
149        .filter(|&c| c > 0)
150        .unwrap_or(2048)
151        .max(minimum_capacity);
152
153    tracing::debug!(
154        capacity = chain_discovery_events_capacity,
155        minimum_required = minimum_capacity,
156        "creating chain discovery events channel"
157    );
158    let (indexer_peer_update_tx, indexer_peer_update_rx) =
159        futures::channel::mpsc::channel::<PeerDiscovery>(chain_discovery_events_capacity);
160
161    // create network
162    let network_builder = HoprLibp2pNetworkBuilder::new(indexer_peer_update_rx);
163    // create graph
164    let graph = std::sync::Arc::new(ChannelGraph::new(*packet_key.public()));
165
166    // END = implementation definitions
167
168    // START = process chain and network events into graph updates
169    let chain_events = chain_connector
170        .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])?;
171    let network_events = network_builder.subscribe_network_events();
172    let graph_updater = graph.clone();
173    let chain_reader = chain_connector.clone();
174    let indexer_peer_update_tx = indexer_peer_update_tx.clone();
175
176    let proc =
177        async move {
178            enum Event {
179                Chain(ChainEvent),
180                Network(NetworkEvent),
181            }
182
183            let ticket_price = std::sync::Arc::new(parking_lot::RwLock::new(chain_reader.minimum_ticket_price().await.unwrap_or_default()));
184            let win_probability = std::sync::Arc::new(parking_lot::RwLock::new(chain_reader.minimum_incoming_ticket_win_prob().await.unwrap_or_default()));
185
186            network_events
187                .map(Event::Network)
188                .merge(chain_events.map(Event::Chain))
189                .for_each(|event| {
190                    let mut indexer_peer_update_tx = indexer_peer_update_tx.clone();
191                    let chain_reader = chain_reader.clone();
192                    let graph_updater = graph_updater.clone();
193                    let ticket_price = ticket_price.clone();
194                    let win_probability = win_probability.clone();
195
196                    async move {
197                        match event {
198                            Event::Chain(chain_event) => {
199                                match chain_event {
200                                    ChainEvent::Announcement(account) =>{
201                                        tracing::debug!(account = %account.public_key, "recording graph update for announced account");
202                                        graph_updater.record_node(account.public_key);
203                                        let peer_id: PeerId = account.public_key.into();
204                                        if let Err(error) = indexer_peer_update_tx.send(PeerDiscovery::Announce(peer_id, account.get_multiaddrs().to_vec())).await {
205                                            tracing::error!(peer = %peer_id, %error, reason = "failed to propagate the record", "ignoring announced peer")
206                                        }
207                                    },
208                                    ChainEvent::ChannelOpened(channel) |
209                                    ChainEvent::ChannelClosed(channel) |
210                                    ChainEvent::ChannelBalanceIncreased(channel, _) |
211                                    ChainEvent::ChannelBalanceDecreased(channel, _) => {
212                                        let from = chain_reader.chain_key_to_packet_key(&channel.source).await;
213                                        let to = chain_reader.chain_key_to_packet_key(&channel.destination).await;
214
215                                        match (from, to) {
216                                            (Ok(Some(from)), Ok(Some(to))) => {
217                                                let capacity =  if matches!(channel.status, ChannelStatus::Closed) {
218                                                    None
219                                                } else if let Ok(ticket_value) = ticket_price.read().div_f64(win_probability.read().as_f64()) {
220                                                    Some(
221                                                        channel.balance
222                                                            .amount()
223                                                            .checked_div(ticket_value.amount())
224                                                            .map(|v| v.low_u128())
225                                                            .unwrap_or(u128::MAX)
226                                                    )
227                                                } else {
228                                                    None
229                                                };
230
231                                                tracing::debug!(%channel, ?capacity, "recording graph update for channel capacity change");
232                                                graph_updater.record_edge(hopr_lib::api::graph::MeasurableEdge::<NeighborTelemetry, PathTelemetry>::Capacity(Box::new(EdgeCapacityUpdate{
233                                                    capacity,
234                                                    src: from,
235                                                    dest: to
236                                            })));
237                                            },
238                                            (Ok(_), Ok(_)) => {
239                                                tracing::error!(%channel, "could not find packet keys for the channel endpoints");
240                                            },
241                                            (Err(e), _) | (_, Err(e)) => {
242                                                tracing::error!(%e, %channel, "failed to convert chain keys to packet keys for graph update");
243                                            }
244                                        }
245                                    },
246                                    ChainEvent::ChannelClosureInitiated(_channel) => {},
247                                    ChainEvent::WinningProbabilityIncreased(probability) |
248                                    ChainEvent::WinningProbabilityDecreased(probability) => {
249                                        tracing::debug!(%probability, "recording winning probability change");
250                                        *win_probability.write() = probability;
251                                    }
252                                    ChainEvent::TicketPriceChanged(price) => {
253                                        tracing::debug!(%price, "recording ticket price change");
254                                        *ticket_price.write() = price;
255                                    },
256                                    _ => {}
257                                }
258                            }
259                            Event::Network(network_event) => {
260                                match network_event {
261                                    NetworkEvent::PeerConnected(peer_id) =>
262                                        if let Ok(opk) = hopr_lib::peer_id_to_public_key(&peer_id).await {
263                                            graph_updater.record_edge(hopr_lib::api::graph::MeasurableEdge::<NeighborTelemetry, PathTelemetry>::ConnectionStatus {
264                                                peer: opk,
265                                                connected: true
266                                            });
267                                        } else {
268                                            tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
269                                        },
270                                    NetworkEvent::PeerDisconnected(peer_id) =>
271                                        if let Ok(opk) = hopr_lib::peer_id_to_public_key(&peer_id).await {
272                                            graph_updater.record_edge(hopr_lib::api::graph::MeasurableEdge::<NeighborTelemetry, PathTelemetry>::ConnectionStatus {
273                                                peer: opk,
274                                                connected: false
275                                            });
276                                        } else {
277                                            tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
278                                        },
279                                };
280                            }
281                        }
282                    }
283                })
284                .await;
285        }
286        .inspect(|_| tracing::warn!(task = "Interconnecting chain, graph and network", "long-running background task finished"));
287    let _jh = tokio::spawn(proc);
288    // END = process chain and network events into graph updates
289
290    // create the node
291    let node =
292        Arc::new(hopr_lib::Hopr::new((chain_key, packet_key), chain_connector.clone(), graph.clone(), config).await?);
293
294    let node_for_run = node.clone();
295    let start = async move {
296        node_for_run
297            .run(
298                hopr_ct_full_network::FullNetworkDiscovery::new(
299                    *packet_key.public(),
300                    probe_cfg.unwrap_or_default(),
301                    graph,
302                ),
303                network_builder,
304                #[cfg(feature = "session-server")]
305                server,
306            )
307            .await
308    };
309
310    Ok((node, start))
311}