Skip to main content

hopr_reference/
lib.rs

1#[cfg(feature = "session-server")]
2pub mod exit;
3
4#[cfg(feature = "session-server")]
5pub mod config;
6
7#[cfg(any(feature = "testing", test))]
8pub mod testing;
9
10use std::sync::Arc;
11
12#[cfg(feature = "runtime-tokio")]
13pub use hopr_lib;
14use hopr_ticket_manager::{HoprTicketManager, RedbStore, RedbTicketQueue};
15#[cfg(feature = "runtime-tokio")]
16use {
17    hopr_chain_connector::{
18        BlockchainConnectorConfig, HoprBlockchainSafeConnector,
19        api::HoprChainApi,
20        blokli_client::{BlokliClient, BlokliClientConfig},
21        create_trustful_hopr_blokli_connector,
22    },
23    hopr_lib::builder::{ChainKeypair, Keypair, OffchainKeypair},
24    hopr_lib::{Hopr, config::HoprLibConfig},
25    hopr_network_graph::{ChannelGraph, SharedChannelGraph},
26    hopr_transport_p2p::{HoprLibp2pNetworkBuilder, HoprNetwork},
27    validator::Validate,
28};
29
30#[cfg(feature = "session-server")]
31use crate::{config::SessionIpForwardingConfig, exit::HoprServerIpForwardingReactor};
32
33/// Shareable [`HoprTicketManager`] with [`RedbStore`] backend.
34pub type SharedTicketManager = Arc<HoprTicketManager<RedbStore, RedbTicketQueue>>;
35
/// Concrete [`Hopr`] node type assembled from the canonical component implementations.
///
/// The type parameters are, in order:
/// - chain API: a shared [`HoprBlockchainSafeConnector`] over a [`BlokliClient`],
/// - channel graph: [`SharedChannelGraph`],
/// - network: [`HoprNetwork`],
/// - ticket manager: [`SharedTicketManager`].
#[cfg(feature = "runtime-tokio")]
pub type ReferenceHopr = Hopr<
    Arc<HoprBlockchainSafeConnector<BlokliClient>>,
    SharedChannelGraph,
    HoprNetwork,
    SharedTicketManager,
>;
40
/// Builds the reference HOPR node: a Blokli-backed chain connector combined with the
/// canonical implementations of every other component.
///
/// The function creates a trustful Blokli chain connector for `blokli_url`, connects it,
/// and then delegates the rest of the assembly to [`build_with_chain`] (without a custom
/// prober configuration).
///
/// # Arguments
/// * `identity` - the chain keypair and the off-chain (packet) keypair of the node.
/// * `config` - node configuration; its `safe_module.module_address` is handed to the connector.
/// * `blokli_url` - URL of the Blokli endpoint, parsed before the client is created.
/// * `server_config` - IP forwarding configuration of the session server
///   (only with the `session-server` feature).
///
/// # Environment
/// `HOPR_TX_TIMEOUT_MULTIPLIER` overrides the connector's `tx_timeout_multiplier`. If the
/// variable is unset, or its value cannot be parsed (a warning is logged in that case), the
/// value from `BlockchainConnectorConfig::default()` is used.
///
/// # Errors
/// Fails if `blokli_url` cannot be parsed, if the connector cannot be created or connected,
/// or if [`build_with_chain`] fails.
#[cfg(feature = "runtime-tokio")]
pub async fn build_reference(
    identity: (&ChainKeypair, &OffchainKeypair),
    config: HoprLibConfig,
    blokli_url: String,
    #[cfg(feature = "session-server")] server_config: SessionIpForwardingConfig,
) -> anyhow::Result<Arc<ReferenceHopr>> {
    use std::time::Duration;

    let (chain_key, packet_key) = identity;

    // Optional environment override; the default is only computed when it is actually needed.
    let tx_timeout_multiplier = match std::env::var("HOPR_TX_TIMEOUT_MULTIPLIER") {
        Ok(raw) => match raw.parse() {
            Ok(multiplier) => multiplier,
            Err(error) => {
                tracing::warn!(%error, "failed to parse HOPR_TX_TIMEOUT_MULTIPLIER");
                BlockchainConnectorConfig::default().tx_timeout_multiplier
            }
        },
        Err(_) => BlockchainConnectorConfig::default().tx_timeout_multiplier,
    };

    let connector_config = BlockchainConnectorConfig {
        connection_sync_timeout: Duration::from_mins(1),
        sync_tolerance: 90,
        tx_timeout_multiplier,
    };

    let blokli_client = BlokliClient::new(
        blokli_url.parse()?,
        BlokliClientConfig {
            timeout: Duration::from_secs(30),
            stream_reconnect_timeout: Duration::from_secs(30),
            subscription_stream_restart_delay: Some(Duration::from_secs(1)),
            ..Default::default()
        },
    );

    // The connector is connected while still exclusively owned, and only then shared.
    let mut connector = create_trustful_hopr_blokli_connector(
        chain_key,
        connector_config,
        blokli_client,
        config.safe_module.module_address,
    )
    .await?;
    connector.connect().await?;
    let connector = Arc::new(connector);

    #[cfg(feature = "session-server")]
    let ip_forwarding_server = HoprServerIpForwardingReactor::new(packet_key.clone(), server_config);

    build_with_chain(
        chain_key,
        packet_key,
        config,
        None,
        connector,
        #[cfg(feature = "session-server")]
        ip_forwarding_server,
    )
    .await
}
94
/// Builds a HOPR node with a custom chain connector using canonical implementations
/// for all other components.
///
/// The build proceeds in these stages:
/// 1. If `probe_cfg` is given, it is validated and its `interval` must be at least
///    `config.protocol.probe.timeout`.
/// 2. A ticket manager and ticket factory are created on top of a temporary [`RedbStore`].
/// 3. The ticket manager is synced from the channels whose destination is this node, and the
///    ticket factory from the channels whose source is this node.
/// 4. A Tokio task is spawned that forwards `Announcement` chain events (subscribed with
///    public-accounts state sync) into a bounded peer-discovery channel consumed by the
///    network layer.
/// 5. The node is assembled via `HoprBuilder` with the channel graph, the libp2p network,
///    cover traffic (full network discovery) and, with the `session-server` feature, the
///    given session server.
///
/// # Arguments
/// * `chain_key` / `packet_key` - the node identity.
/// * `config` - node configuration; `safe_module` addresses and host/transport settings are read from it.
/// * `probe_cfg` - optional prober configuration; `ProberConfig::default()` is used when `None`.
/// * `chain_connector` - the chain API implementation to use.
/// * `server` - session server implementation (only with the `session-server` feature).
///
/// # Errors
/// Returns an error if the prober configuration is invalid, the temporary ticket store cannot
/// be created, channel streaming or syncing fails, the chain event subscription fails, or the
/// final node build fails.
///
/// # Panics
/// The network construction closure panics if the host configuration cannot be converted
/// into a multiaddress or if the network cannot be built. Spawning the forwarding task uses
/// `tokio::spawn`, so this function is expected to run inside a Tokio runtime.
#[cfg(feature = "runtime-tokio")]
pub async fn build_with_chain<
    Chain,
    #[cfg(feature = "session-server")] Srv: hopr_lib::api::node::HoprSessionServer<Session = hopr_lib::IncomingSession, Error: std::fmt::Display>
        + Clone
        + Send
        + 'static,
>(
    chain_key: &ChainKeypair,
    packet_key: &OffchainKeypair,
    config: HoprLibConfig,
    probe_cfg: Option<hopr_ct_full_network::ProberConfig>,
    chain_connector: Chain,
    #[cfg(feature = "session-server")] server: Srv,
) -> anyhow::Result<Arc<Hopr<Chain, SharedChannelGraph, HoprNetwork, SharedTicketManager>>>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
{
    // Reject an explicitly supplied prober configuration early if it is inconsistent.
    if let Some(ref pcfg) = probe_cfg {
        pcfg.validate()
            .map_err(|e| anyhow::anyhow!("invalid ProberConfig: {e}"))?;
        let probe_timeout = config.protocol.probe.timeout;
        anyhow::ensure!(
            pcfg.interval >= probe_timeout,
            "ProberConfig interval ({:?}) must be >= ProbeConfig timeout ({:?})",
            pcfg.interval,
            probe_timeout,
        );
    }

    let backend = RedbStore::new_temp().map_err(hopr_ticket_manager::TicketManagerError::store)?;
    let (ticket_manager, ticket_factory) = HoprTicketManager::new_with_factory(backend);
    let ticket_manager = Arc::new(ticket_manager);
    let ticket_factory = Arc::new(ticket_factory);

    // Sync ticket manager and factory with on-chain state
    {
        use futures::StreamExt;
        use hopr_lib::api::chain::ChannelSelector;

        let me = chain_connector.me();
        let incoming_channels: Vec<_> = chain_connector
            .stream_channels(ChannelSelector::default().with_destination(*me))
            .map_err(|e| anyhow::anyhow!("failed to stream incoming channels: {e}"))?
            .collect()
            .await;
        ticket_manager
            .sync_from_incoming_channels(&incoming_channels)
            .map_err(|e| anyhow::anyhow!("failed to sync ticket manager: {e}"))?;

        let outgoing_channels: Vec<_> = chain_connector
            .stream_channels(ChannelSelector::default().with_source(*me))
            .map_err(|e| anyhow::anyhow!("failed to stream outgoing channels: {e}"))?
            .collect()
            .await;
        ticket_factory
            .sync_from_outgoing_channels(&outgoing_channels)
            .map_err(|e| anyhow::anyhow!("failed to sync ticket factory: {e}"))?;
    }

    // Chain→peer-discovery wiring
    let (peer_discovery_tx, peer_discovery_rx) = futures::channel::mpsc::channel(2048);
    {
        use futures::{SinkExt, StreamExt};
        use hopr_lib::api::chain::StateSyncOptions;
        let chain_events = chain_connector
            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])
            .map_err(|e| anyhow::anyhow!("failed to subscribe to chain events: {e}"))?;
        tokio::spawn(async move {
            // A single long-lived sender is used on purpose: every clone of a
            // `futures::channel::mpsc::Sender` gets its own guaranteed slot, so cloning the
            // sender per event would let the queue grow past the configured bound.
            let mut tx = peer_discovery_tx;
            let mut chain_events = std::pin::pin!(chain_events);

            while let Some(event) = chain_events.next().await {
                // Only announcements carry peer addresses; all other events are ignored.
                let hopr_lib::api::types::chain::chain_events::ChainEvent::Announcement(account) = event else {
                    continue;
                };

                let peer_id: hopr_lib::api::PeerId = account.public_key.into();
                if let Err(error) = tx
                    .send(hopr_transport_p2p::PeerDiscovery::Announce(
                        peer_id,
                        account.get_multiaddrs().to_vec(),
                    ))
                    .await
                {
                    tracing::error!(%peer_id, %error, "failed to send peer discovery event");
                    // With a single sender, `send` fails only once the receiving side is gone,
                    // so no later event could be delivered either.
                    break;
                }
            }
        });
    }

    let prober_cfg = probe_cfg.unwrap_or_default();
    let graph: SharedChannelGraph = Arc::new(ChannelGraph::new(*packet_key.public()));
    // Second handle to the same graph for the cover-traffic component.
    let graph_for_ct = graph.clone();

    // Copied out before `config` is moved into the builder.
    let safe_address = config.safe_module.safe_address;
    let module_address = config.safe_module.module_address;

    let builder = hopr_lib::builder::HoprBuilder::new()
        .with_identity(chain_key, packet_key)
        .with_config(config)
        .with_safe_module(&safe_address, &module_address)
        .with_chain_api(move |_ctx| chain_connector)
        .with_graph(move |_ctx| graph)
        .with_network(move |ctx| {
            Box::pin(async move {
                let multiaddresses = vec![
                    (&ctx.cfg.host)
                        .try_into()
                        .expect("host config must be a valid multiaddress"),
                ];
                let nb = HoprLibp2pNetworkBuilder::new(peer_discovery_rx);
                nb.build(
                    &ctx.packet_key,
                    multiaddresses,
                    "/hopr/mix/1.1.0",
                    ctx.cfg.protocol.transport.prefer_local_addresses,
                )
                .await
                .expect("network must be constructible")
            })
        })
        .with_cover_traffic(move |ctx| {
            hopr_ct_full_network::FullNetworkDiscovery::new(*ctx.packet_key.public(), prober_cfg, graph_for_ct)
        });

    #[cfg(feature = "session-server")]
    let builder = builder.with_session_server(server);

    let node = builder.build_full(ticket_manager, ticket_factory).await?;

    Ok(Arc::new(node))
}