/// Server-side session handling; this crate root imports `HoprServerIpForwardingReactor` from it.
///
/// Only compiled when the `session-server` feature is enabled.
#[cfg(feature = "session-server")]
pub mod exit;

/// Configuration types; this crate root imports `SessionIpForwardingConfig` from it.
///
/// Only compiled when the `session-server` feature is enabled.
#[cfg(feature = "session-server")]
pub mod config;

/// Compiled only when the `testing` feature is enabled or when building tests.
// NOTE(review): presumably contains test helpers/fixtures — confirm against the module contents.
#[cfg(any(feature = "testing", test))]
pub mod testing;
9
10use std::sync::Arc;
11
12use futures::{FutureExt, StreamExt as _};
13use futures_concurrency::stream::StreamExt;
14use hopr_chain_connector::{
15 BlockchainConnectorConfig, HoprBlockchainSafeConnector,
16 api::{AccountSelector, ChainEvent, HoprChainApi, StateSyncOptions},
17 blokli_client::{BlokliClient, BlokliClientConfig},
18 create_trustful_hopr_blokli_connector,
19};
20#[cfg(feature = "runtime-tokio")]
21pub use hopr_lib;
22#[cfg(feature = "session-server")]
23use hopr_lib::traits::HoprSessionServer;
24#[cfg(feature = "runtime-tokio")]
25use hopr_lib::{
26 ChainKeypair, Hopr, HoprLibError, HoprTransportIO, OffchainKeypair, api::network::NetworkEvent,
27 config::HoprLibConfig,
28};
29use hopr_lib::{Keypair, UnitaryFloatOps};
30use hopr_network_graph::{ChannelGraph, SharedChannelGraph};
31use hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork, PeerDiscovery};
32#[cfg(feature = "runtime-tokio")]
33use validator::Validate;
34#[cfg(feature = "runtime-tokio")]
35use {
36 futures::SinkExt,
37 hopr_lib::{
38 api::{PeerId, graph::EdgeCapacityUpdate, graph::NetworkGraphUpdate},
39 {ChannelStatus, NeighborTelemetry, PathTelemetry},
40 },
41};
42
43#[cfg(feature = "session-server")]
44use crate::{config::SessionIpForwardingConfig, exit::HoprServerIpForwardingReactor};
45
/// The concrete `Hopr` node type produced by `build_reference`.
///
/// It is a `Hopr` parameterized with an `Arc`-shared `HoprBlockchainSafeConnector` over a
/// `BlokliClient` as the chain connector, together with `SharedChannelGraph` and `HoprNetwork`.
pub type ReferenceHopr = Hopr<Arc<HoprBlockchainSafeConnector<BlokliClient>>, SharedChannelGraph, HoprNetwork>;
47
/// Assembles the reference HOPR node (`ReferenceHopr`) on top of a Blokli-backed chain connector.
///
/// The function creates a `BlokliClient` for `blokli_url`, wraps it into a trustful
/// Blokli chain connector bound to `config.safe_module.module_address`, connects the
/// connector and then delegates the rest of the construction to `build_with_chain`
/// (without a custom prober configuration).
///
/// # Arguments
///
/// * `identity` - the `(chain key, packet key)` pair of the node.
/// * `config` - the node configuration handed over to `build_with_chain`.
/// * `blokli_url` - textual URL of the Blokli endpoint; it is parsed here.
/// * `server_config` - (only with the `session-server` feature) configuration used to create the
///   `HoprServerIpForwardingReactor` session server.
///
/// # Environment
///
/// `HOPR_TX_TIMEOUT_MULTIPLIER` overrides the connector's `tx_timeout_multiplier`. When the
/// variable is unset, the default of `BlockchainConnectorConfig` is used; when it cannot be
/// parsed, a warning is logged and the default is used as well.
///
/// # Errors
///
/// Fails when `blokli_url` cannot be parsed, when the connector cannot be created or connected,
/// or when `build_with_chain` fails.
#[cfg(feature = "runtime-tokio")]
pub async fn build_reference(
    identity: (&ChainKeypair, &OffchainKeypair),
    config: HoprLibConfig,
    blokli_url: String,
    #[cfg(feature = "session-server")] server_config: SessionIpForwardingConfig,
) -> anyhow::Result<(
    Arc<ReferenceHopr>,
    impl Future<Output = std::result::Result<HoprTransportIO, HoprLibError>>,
)> {
    use std::time::Duration;

    let chain_key = identity.0;
    let packet_key = identity.1;

    // Optional override of the transaction timeout multiplier taken from the environment.
    // An unparsable value is reported and otherwise ignored.
    let multiplier_override = std::env::var("HOPR_TX_TIMEOUT_MULTIPLIER")
        .ok()
        .and_then(|raw| match raw.parse() {
            Ok(multiplier) => Some(multiplier),
            Err(error) => {
                tracing::warn!(%error, "failed to parse HOPR_TX_TIMEOUT_MULTIPLIER");
                None
            }
        });

    let connector_config = BlockchainConnectorConfig {
        connection_sync_timeout: Duration::from_mins(1),
        sync_tolerance: 90,
        tx_timeout_multiplier: match multiplier_override {
            Some(multiplier) => multiplier,
            None => BlockchainConnectorConfig::default().tx_timeout_multiplier,
        },
    };

    let blokli_client = BlokliClient::new(
        blokli_url.parse()?,
        BlokliClientConfig {
            timeout: Duration::from_secs(30),
            stream_reconnect_timeout: Duration::from_secs(30),
            subscription_stream_restart_delay: Some(Duration::from_secs(1)),
        },
    );

    let mut connector = create_trustful_hopr_blokli_connector(
        chain_key,
        connector_config,
        blokli_client,
        config.safe_module.module_address,
    )
    .await?;

    // The connector must be connected before it is shared behind an `Arc`.
    connector.connect().await?;
    let shared_connector = Arc::new(connector);

    #[cfg(feature = "session-server")]
    let ip_forwarding_server = HoprServerIpForwardingReactor::new(packet_key.clone(), server_config);

    let built = build_with_chain(
        chain_key,
        packet_key,
        config,
        None,
        shared_connector,
        #[cfg(feature = "session-server")]
        ip_forwarding_server,
    )
    .await;

    built
}
102
/// Builds a `Hopr` node on top of an already prepared chain connector.
///
/// Besides constructing the node, this function wires the chain, the channel graph and the
/// network together by spawning a background tokio task which:
///
/// * records announced accounts as graph nodes and forwards them as `PeerDiscovery::Announce`
///   to the libp2p network builder,
/// * records channel capacity changes as graph edge updates,
/// * tracks ticket price and winning probability changes (used for the capacity computation),
/// * records peer connect/disconnect network events as graph edge updates.
///
/// # Arguments
///
/// * `chain_key`, `packet_key` - identity of the node.
/// * `config` - node configuration passed to `Hopr::new`.
/// * `probe_cfg` - optional prober configuration; the default is used when `None`.
/// * `chain_connector` - the chain connector; it is cloned for the background task and the node.
/// * `server` - (only with the `session-server` feature) the session server passed to `run`.
///
/// # Environment
///
/// `HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY` sets the capacity of the chain discovery
/// channel (default `2048`; non-numeric or zero values are ignored). The effective capacity is
/// never lower than twice the number of public accounts plus 100.
///
/// # Returns
///
/// The shared node and a future which runs the node once awaited.
///
/// # Errors
///
/// Fails when `probe_cfg` is invalid or its interval is shorter than the probe timeout from
/// `config`, when the public accounts cannot be counted, when the chain event subscription
/// fails, or when `Hopr::new` fails. In the last case the background task is aborted.
#[cfg(feature = "runtime-tokio")]
pub async fn build_with_chain<
    Chain,
    #[cfg(feature = "session-server")] Srv: HoprSessionServer + Clone + Send + 'static,
>(
    chain_key: &ChainKeypair,
    packet_key: &OffchainKeypair,
    config: HoprLibConfig,
    probe_cfg: Option<hopr_ct_full_network::ProberConfig>,
    chain_connector: Chain,
    #[cfg(feature = "session-server")] server: Srv,
) -> anyhow::Result<(
    Arc<Hopr<Chain, SharedChannelGraph, HoprNetwork>>,
    impl Future<Output = std::result::Result<HoprTransportIO, HoprLibError>>,
)>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
{
    if let Some(ref pcfg) = probe_cfg {
        pcfg.validate()
            .map_err(|e| anyhow::anyhow!("invalid ProberConfig: {e}"))?;

        let probe_timeout = config.protocol.probe.timeout;
        anyhow::ensure!(
            pcfg.interval >= probe_timeout,
            "ProberConfig interval ({:?}) must be >= ProbeConfig timeout ({:?}) to prevent overlapping probe rounds",
            pcfg.interval,
            probe_timeout,
        );
    }

    // Lower bound of the discovery channel capacity derived from the number of public accounts.
    let minimum_capacity = chain_connector
        .count_accounts(AccountSelector {
            public_only: true,
            ..Default::default()
        })
        .await
        .map_err(hopr_lib::HoprLibError::chain)?
        .saturating_mul(2)
        .saturating_add(100);

    let chain_discovery_events_capacity = std::env::var("HOPR_INTERNAL_CHAIN_DISCOVERY_CHANNEL_CAPACITY")
        .ok()
        .and_then(|s| s.trim().parse::<usize>().ok())
        .filter(|&c| c > 0)
        .unwrap_or(2048)
        .max(minimum_capacity);

    tracing::debug!(
        capacity = chain_discovery_events_capacity,
        minimum_required = minimum_capacity,
        "creating chain discovery events channel"
    );
    let (indexer_peer_update_tx, indexer_peer_update_rx) =
        futures::channel::mpsc::channel::<PeerDiscovery>(chain_discovery_events_capacity);

    let network_builder = HoprLibp2pNetworkBuilder::new(indexer_peer_update_rx);
    let graph = Arc::new(ChannelGraph::new(*packet_key.public()));

    let chain_events = chain_connector
        .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])?;
    let network_events = network_builder.subscribe_network_events();
    let graph_updater = graph.clone();
    let chain_reader = chain_connector.clone();

    // `indexer_peer_update_tx` is moved into the task directly; it is not needed anywhere else.
    let proc = async move {
        enum Event {
            Chain(ChainEvent),
            Network(NetworkEvent),
        }

        // Initial values used for the channel capacity computation. A failure to fetch them is
        // reported (instead of being silently ignored) and the default value is used.
        let initial_ticket_price = chain_reader
            .minimum_ticket_price()
            .await
            .inspect_err(|error| {
                tracing::warn!(%error, "failed to fetch the minimum ticket price, using the default value")
            })
            .unwrap_or_default();
        let initial_win_probability = chain_reader
            .minimum_incoming_ticket_win_prob()
            .await
            .inspect_err(|error| {
                tracing::warn!(%error, "failed to fetch the minimum winning probability, using the default value")
            })
            .unwrap_or_default();

        let ticket_price = Arc::new(parking_lot::RwLock::new(initial_ticket_price));
        let win_probability = Arc::new(parking_lot::RwLock::new(initial_win_probability));

        network_events
            .map(Event::Network)
            .merge(chain_events.map(Event::Chain))
            .for_each(|event| {
                let mut indexer_peer_update_tx = indexer_peer_update_tx.clone();
                let chain_reader = chain_reader.clone();
                let graph_updater = graph_updater.clone();
                let ticket_price = ticket_price.clone();
                let win_probability = win_probability.clone();

                async move {
                    match event {
                        Event::Chain(chain_event) => match chain_event {
                            ChainEvent::Announcement(account) => {
                                tracing::debug!(account = %account.public_key, "recording graph update for announced account");
                                graph_updater.record_node(account.public_key);
                                let peer_id: PeerId = account.public_key.into();
                                if let Err(error) = indexer_peer_update_tx
                                    .send(PeerDiscovery::Announce(peer_id, account.get_multiaddrs().to_vec()))
                                    .await
                                {
                                    tracing::error!(peer = %peer_id, %error, reason = "failed to propagate the record", "ignoring announced peer")
                                }
                            }
                            ChainEvent::ChannelOpened(channel)
                            | ChainEvent::ChannelClosed(channel)
                            | ChainEvent::ChannelBalanceIncreased(channel, _)
                            | ChainEvent::ChannelBalanceDecreased(channel, _) => {
                                let from = chain_reader.chain_key_to_packet_key(&channel.source).await;
                                let to = chain_reader.chain_key_to_packet_key(&channel.destination).await;

                                match (from, to) {
                                    (Ok(Some(from)), Ok(Some(to))) => {
                                        // Capacity = channel balance / (ticket price / winning probability).
                                        // Closed channels and a failed ticket value computation yield `None`.
                                        let capacity = if matches!(channel.status, ChannelStatus::Closed) {
                                            None
                                        } else if let Ok(ticket_value) =
                                            ticket_price.read().div_f64(win_probability.read().as_f64())
                                        {
                                            // NOTE(review): `low_u128` looks like it keeps only the low
                                            // 128 bits of the quotient — confirm it cannot exceed `u128::MAX`.
                                            Some(
                                                channel
                                                    .balance
                                                    .amount()
                                                    .checked_div(ticket_value.amount())
                                                    .map(|v| v.low_u128())
                                                    .unwrap_or(u128::MAX),
                                            )
                                        } else {
                                            None
                                        };

                                        tracing::debug!(%channel, ?capacity, "recording graph update for channel capacity change");
                                        graph_updater.record_edge(
                                            hopr_lib::api::graph::MeasurableEdge::<NeighborTelemetry, PathTelemetry>::Capacity(
                                                Box::new(EdgeCapacityUpdate {
                                                    capacity,
                                                    src: from,
                                                    dest: to,
                                                }),
                                            ),
                                        );
                                    }
                                    (Ok(_), Ok(_)) => {
                                        tracing::error!(%channel, "could not find packet keys for the channel endpoints");
                                    }
                                    (Err(e), _) | (_, Err(e)) => {
                                        tracing::error!(%e, %channel, "failed to convert chain keys to packet keys for graph update");
                                    }
                                }
                            }
                            ChainEvent::ChannelClosureInitiated(_channel) => {}
                            ChainEvent::WinningProbabilityIncreased(probability)
                            | ChainEvent::WinningProbabilityDecreased(probability) => {
                                tracing::debug!(%probability, "recording winning probability change");
                                *win_probability.write() = probability;
                            }
                            ChainEvent::TicketPriceChanged(price) => {
                                tracing::debug!(%price, "recording ticket price change");
                                *ticket_price.write() = price;
                            }
                            _ => {}
                        },
                        Event::Network(network_event) => {
                            // Both network events result in the same graph update, differing only
                            // in the `connected` flag.
                            let (peer_id, connected) = match network_event {
                                NetworkEvent::PeerConnected(peer_id) => (peer_id, true),
                                NetworkEvent::PeerDisconnected(peer_id) => (peer_id, false),
                            };

                            if let Ok(opk) = hopr_lib::peer_id_to_public_key(&peer_id).await {
                                graph_updater.record_edge(
                                    hopr_lib::api::graph::MeasurableEdge::<NeighborTelemetry, PathTelemetry>::ConnectionStatus {
                                        peer: opk,
                                        connected,
                                    },
                                );
                            } else {
                                tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
                            }
                        }
                    }
                }
            })
            .await;
    }
    .inspect(|_| {
        tracing::warn!(
            task = "Interconnecting chain, graph and network",
            "long-running background task finished"
        )
    });

    // The task is started before the node is created; its handle is kept only to be able to
    // stop the task if the node construction fails.
    let background_task = tokio::spawn(proc);

    let node = match hopr_lib::Hopr::new((chain_key, packet_key), chain_connector.clone(), graph.clone(), config).await
    {
        Ok(node) => Arc::new(node),
        Err(error) => {
            // Do not leave the background task running when there is no node to serve.
            background_task.abort();
            return Err(anyhow::Error::from(error));
        }
    };

    let node_for_run = node.clone();
    let start = async move {
        node_for_run
            .run(
                hopr_ct_full_network::FullNetworkDiscovery::new(
                    *packet_key.public(),
                    probe_cfg.unwrap_or_default(),
                    graph,
                ),
                network_builder,
                #[cfg(feature = "session-server")]
                server,
            )
            .await
    };

    Ok((node, start))
}