Skip to main content

hopr_lib/
builder.rs

1//! Type-state builder for constructing a [`Hopr`] node.
2//!
3//! The builder guides construction through a series of mandatory phases:
4//!
5//! 1. **Identity** — `HoprBuilder` → `HoprBuilder::with_identity`
6//! 2. **Configuration** — `HoprBuilderWithIdentity::with_config`
7//! 3. **Component factories** — chain API, graph, network, and cover-traffic
8//! 4. **Session server** (when the `session-server` feature is enabled) — `HoprBuilderConfigured::with_session_server`
9//! 5. **Build** — `build_edge` for an entry/exit node or `build_full` for a relay node with ticket management.
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use hopr_lib::{config::HoprLibConfig, builder::{HoprBuilder, ChainKeypair, OffchainKeypair, Keypair}};
15//!
16//! let chain_key = ChainKeypair::random();
17//! let offchain_key = OffchainKeypair::random();
18//! let config = HoprLibConfig::default();
19//!
20//! let builder = HoprBuilder
21//!     .with_identity(&chain_key, &offchain_key)
22//!     .with_config(config)
23//!     .with_chain_api(|_ctx| { /* ... */ todo!() })
24//!     .with_graph(|_ctx| { /* ... */ todo!() })
25//!     .with_network(|_ctx| Box::pin(async { /* ... */ Ok(todo!()) }))
26//!     .with_cover_traffic(|_ctx| { /* ... */ todo!() });
27//! ```
28
29mod chain_wiring;
30
31use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
32
33use futures::{FutureExt, StreamExt, channel::mpsc::channel};
34pub use hopr_api::types::crypto::{
35    keypairs::Keypair,
36    prelude::{ChainKeypair, OffchainKeypair},
37};
38use hopr_api::{
39    chain::{AnnouncementError, HoprChainApi, SafeRegistrationError, StateSyncOptions},
40    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
41    graph::HoprGraphApi,
42    network::{BoxedProcessFn, NetworkStreamControl, NetworkView},
43    node::{AtomicHoprState, HoprState, NodeOnchainIdentity, TicketEvent},
44    tickets::{TicketFactory, TicketManagement},
45    types::{chain::chain_events::ChainEvent, internal::prelude::ChannelDirection, primitive::prelude::Address},
46};
47use hopr_transport::{HoprTransport, IncomingSession};
48use hopr_utils::{
49    network_types::addr::is_public_address,
50    runtime::{AbortableList, prelude::spawn},
51};
52use validator::Validate;
53
54use crate::{
55    Hopr, HoprLibError, HoprLibProcess, MIN_NATIVE_BALANCE, NODE_READY_TIMEOUT, SUGGESTED_NATIVE_BALANCE,
56    config::HoprLibConfig, constants,
57};
58
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Gauge holding the unix timestamp (in seconds) at which the process was started.
    static ref METRIC_PROCESS_START_TIME: hopr_api::types::telemetry::SimpleGauge =
        hopr_api::types::telemetry::SimpleGauge::new(
            "hopr_start_time",
            "The unix timestamp in seconds at which the process was started"
        ).unwrap();

    /// Gauge describing the executed hopr-lib version, labelled by `version`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_api::types::telemetry::MultiGauge =
        hopr_api::types::telemetry::MultiGauge::new(
            "hopr_lib_version",
            "Executed version of hopr-lib",
            &["version"]
        ).unwrap();

    /// Gauge describing the node addresses, labelled by peer id, node address, safe address and module address.
    static ref METRIC_HOPR_NODE_INFO: hopr_api::types::telemetry::MultiGauge =
        hopr_api::types::telemetry::MultiGauge::new(
            "hopr_node_addresses",
            "Node on-chain and off-chain addresses",
            &["peerid", "address", "safe_address", "module_address"]
        ).unwrap();
}
76
77const PEER_DISCOVERY_CHANNEL_CAPACITY: usize = 2048;
78
79type PeerDiscoveryRx =
80    Arc<parking_lot::Mutex<Option<futures::channel::mpsc::Receiver<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>>>>;
81
82/// Type-erased factory closure producing `T` from a [`BuildCtx`] reference.
83type Factory<T> = Box<dyn FnOnce(&BuildCtx) -> T + Send>;
84type AsyncFactory<T> = Box<dyn FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = T> + Send>> + Send>;
85
86/// Context available to factory closures during the build step.
87#[derive(Clone)]
88pub struct BuildCtx {
89    /// Node's on-chain keypair.
90    pub chain_key: ChainKeypair,
91    /// Node's off-chain (packet) keypair.
92    pub packet_key: OffchainKeypair,
93    /// Node configuration.
94    pub cfg: HoprLibConfig,
95    peer_discovery_rx: PeerDiscoveryRx,
96}
97
98impl BuildCtx {
99    /// Take the peer-discovery receiver. Returns `Some` on the first call, `None` afterwards.
100    pub fn take_peer_discovery_rx(
101        &self,
102    ) -> Option<futures::channel::mpsc::Receiver<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>> {
103        self.peer_discovery_rx.lock().take()
104    }
105}
106
107// ---------------------------------------------------------------------------
108// Type-state builder phases
109// ---------------------------------------------------------------------------
110
111/// Initial builder — forces `with_identity` first.
112#[derive(Default)]
113pub struct HoprBuilder;
114
115impl HoprBuilder {
116    /// Sets the node's on-chain and off-chain identity.
117    pub fn with_identity(self, chain_key: &ChainKeypair, offchain_key: &OffchainKeypair) -> HoprBuilderWithIdentity {
118        HoprBuilderWithIdentity {
119            chain_key: chain_key.clone(),
120            packet_key: offchain_key.clone(),
121        }
122    }
123}
124
125/// Builder with identity set — forces `with_config` next.
126pub struct HoprBuilderWithIdentity {
127    chain_key: ChainKeypair,
128    packet_key: OffchainKeypair,
129}
130
131impl HoprBuilderWithIdentity {
132    /// Sets the node configuration and produces the configured builder.
133    pub fn with_config(self, cfg: HoprLibConfig) -> HoprBuilderConfigured {
134        let (peer_discovery_tx, peer_discovery_rx) = futures::channel::mpsc::channel(PEER_DISCOVERY_CHANNEL_CAPACITY);
135        HoprBuilderConfigured {
136            ctx: BuildCtx {
137                chain_key: self.chain_key,
138                packet_key: self.packet_key,
139                cfg,
140                peer_discovery_rx: Arc::new(parking_lot::Mutex::new(Some(peer_discovery_rx))),
141            },
142            safe_and_module: None,
143            chain_factory: None,
144            graph_factory: None,
145            network_factory: None,
146            ct_factory: None,
147            peer_discovery_tx,
148        }
149    }
150}
151
152// ---------------------------------------------------------------------------
153// HoprBuilderConfigured — stores factories, no session yet
154// ---------------------------------------------------------------------------
155
156/// Configured builder accepting factory closures for components.
157///
158/// When the `session-server` feature is enabled, `with_session_server`
159/// must be called before building — it returns a `HoprBuilderWithSession` which
160/// has the `build_edge` / `build_full` methods.
161///
162/// When the feature is disabled, `build_edge` / `build_full` are available directly.
163pub struct HoprBuilderConfigured<Chain = (), Graph = (), Net = (), Ct = ()> {
164    ctx: BuildCtx,
165    safe_and_module: Option<(Address, Address)>,
166    chain_factory: Option<Factory<Chain>>,
167    graph_factory: Option<Factory<Graph>>,
168    network_factory: Option<AsyncFactory<Result<(Net, BoxedProcessFn), HoprLibError>>>,
169    ct_factory: Option<Factory<Ct>>,
170    peer_discovery_tx: futures::channel::mpsc::Sender<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>,
171}
172
173impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct> {
174    /// Sets the node Safe and module addresses.
175    pub fn with_safe_module(mut self, safe: &Address, module: &Address) -> Self {
176        self.safe_and_module = Some((*safe, *module));
177        self
178    }
179
180    /// Sets the chain API factory.
181    pub fn with_chain_api<NewChain>(
182        self,
183        f: impl FnOnce(&BuildCtx) -> NewChain + Send + 'static,
184    ) -> HoprBuilderConfigured<NewChain, Graph, Net, Ct> {
185        HoprBuilderConfigured {
186            ctx: self.ctx,
187            safe_and_module: self.safe_and_module,
188            chain_factory: Some(Box::new(f)),
189            graph_factory: self.graph_factory,
190            network_factory: self.network_factory,
191            ct_factory: self.ct_factory,
192            peer_discovery_tx: self.peer_discovery_tx,
193        }
194    }
195
196    /// Sets the graph factory.
197    pub fn with_graph<NewGraph>(
198        self,
199        f: impl FnOnce(&BuildCtx) -> NewGraph + Send + 'static,
200    ) -> HoprBuilderConfigured<Chain, NewGraph, Net, Ct> {
201        HoprBuilderConfigured {
202            ctx: self.ctx,
203            safe_and_module: self.safe_and_module,
204            chain_factory: self.chain_factory,
205            graph_factory: Some(Box::new(f)),
206            network_factory: self.network_factory,
207            ct_factory: self.ct_factory,
208            peer_discovery_tx: self.peer_discovery_tx,
209        }
210    }
211
212    /// Sets the network factory. Must resolve to `Ok((Net, BoxedProcessFn))`.
213    ///
214    /// The factory receives [`BuildCtx`] by value and returns a boxed future,
215    /// allowing async network construction without blocking the executor.
216    /// Failures are propagated as [`HoprLibError`] during the build step.
217    pub fn with_network<NewNet>(
218        self,
219        f: impl FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = Result<(NewNet, BoxedProcessFn), HoprLibError>> + Send>>
220        + Send
221        + 'static,
222    ) -> HoprBuilderConfigured<Chain, Graph, NewNet, Ct> {
223        HoprBuilderConfigured {
224            ctx: self.ctx,
225            safe_and_module: self.safe_and_module,
226            chain_factory: self.chain_factory,
227            graph_factory: self.graph_factory,
228            network_factory: Some(Box::new(f)),
229            ct_factory: self.ct_factory,
230            peer_discovery_tx: self.peer_discovery_tx,
231        }
232    }
233
234    /// Sets the cover traffic factory.
235    pub fn with_cover_traffic<NewCt>(
236        self,
237        f: impl FnOnce(&BuildCtx) -> NewCt + Send + 'static,
238    ) -> HoprBuilderConfigured<Chain, Graph, Net, NewCt> {
239        HoprBuilderConfigured {
240            ctx: self.ctx,
241            safe_and_module: self.safe_and_module,
242            chain_factory: self.chain_factory,
243            graph_factory: self.graph_factory,
244            network_factory: self.network_factory,
245            ct_factory: Some(Box::new(f)),
246            peer_discovery_tx: self.peer_discovery_tx,
247        }
248    }
249
250    /// Attaches a session server for handling incoming sessions.
251    ///
252    /// Eagerly spawns the server task and returns a [`HoprBuilderWithSession`]
253    /// that has the `build_edge` / `build_full` methods.
254    #[cfg(feature = "session-server")]
255    pub fn with_session_server(
256        self,
257        server: impl hopr_api::node::HoprSessionServer<Session = IncomingSession, Error: std::fmt::Display>
258        + Clone
259        + Send
260        + 'static,
261    ) -> HoprBuilderWithSession<Chain, Graph, Net, Ct> {
262        let incoming_session_capacity = self.ctx.cfg.incoming_session_capacity.max(1);
263
264        let (session_tx, session_rx) = channel::<IncomingSession>(incoming_session_capacity);
265
266        tracing::debug!(capacity = incoming_session_capacity, "spawning session server");
267        let session_diag = hopr_utils::runtime::diagnostics::ConcurrentDiagnostics::new(
268            "session_server_for_each_concurrent",
269            module_path!(),
270            file!(),
271            line!(),
272        );
273        let handle = hopr_utils::spawn_as_abortable_named!(
274            "hopr_lib_session_server",
275            session_rx
276                .for_each_concurrent(None, move |session| {
277                    let server = server.clone();
278                    let session_diag = session_diag.clone();
279                    session_diag.wrap(async move {
280                        let session_id = *session.session.id();
281                        match server.process(session).await {
282                            Ok(()) => tracing::debug!(?session_id, "session processed successfully"),
283                            Err(error) => {
284                                tracing::error!(?session_id, %error, "session processing failed")
285                            }
286                        }
287                    })
288                })
289                .inspect(|_| tracing::warn!(
290                    task = %HoprLibProcess::SessionServer,
291                    "long-running background task finished"
292                ))
293        );
294
295        HoprBuilderWithSession {
296            inner: self,
297            session_tx,
298            session_handle: handle,
299        }
300    }
301}
302
303// ---------------------------------------------------------------------------
304// HoprBuilderWithSession — session server attached, ready to build
305// ---------------------------------------------------------------------------
306
/// Builder phase with a session server attached; provides `build_edge` / `build_full`.
///
/// This type is compiled only when the `session-server` feature is enabled.
#[cfg(feature = "session-server")]
pub struct HoprBuilderWithSession<Chain = (), Graph = (), Net = (), Ct = ()> {
    /// The configured builder this phase was created from.
    inner: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
    /// Sending half of the channel feeding the spawned session server task.
    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
    /// Abort handle of the spawned session server task.
    session_handle: hopr_utils::runtime::AbortHandle,
}
316
317// ---------------------------------------------------------------------------
318// Intermediate pre-build state
319// ---------------------------------------------------------------------------
320
321struct PreHopr<Chain, Graph, Net, Ct> {
322    chain_id: ChainKeypair,
323    transport_id: OffchainKeypair,
324    cfg: HoprLibConfig,
325    state: Arc<AtomicHoprState>,
326    transport_api: HoprTransport<Chain, Graph, Net>,
327    chain_api: Chain,
328
329    ticket_event_subscribers: (
330        async_broadcast::Sender<TicketEvent>,
331        async_broadcast::InactiveReceiver<TicketEvent>,
332    ),
333    processes: AbortableList<HoprLibProcess>,
334    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
335    cover_traffic: Ct,
336    network: Net,
337    network_process: BoxedProcessFn,
338}
339
340// ---------------------------------------------------------------------------
341// Shared pre_build logic
342// ---------------------------------------------------------------------------
343
344/// Drains a HoprSocket reader, discarding all packets and logging throughput every ~60 seconds.
345/// Runs until the stream ends (sender side dropped).
346async fn drain_incoming_data<S: futures::Stream + Unpin>(mut reader: S) {
347    let mut received: u64 = 0;
348    let mut last_report = std::time::Instant::now();
349    while reader.next().await.is_some() {
350        received += 1;
351        if last_report.elapsed().as_secs() >= 60 {
352            tracing::info!(
353                received,
354                "incoming-data drain: unrelated packets discarded in last ~1 min"
355            );
356            received = 0;
357            last_report = std::time::Instant::now();
358        }
359    }
360}
361
362async fn pre_build_inner<Chain, Graph, Net, Ct>(
363    configured: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
364    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
365    mut processes: AbortableList<HoprLibProcess>,
366) -> Result<PreHopr<Chain, Graph, Net, Ct>, HoprLibError>
367where
368    Chain: HoprChainApi + Clone + Send + Sync + 'static,
369    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
370    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
371        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
372    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
373    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
374    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
375{
376    let peer_discovery_tx = Some(configured.peer_discovery_tx);
377    let ctx = configured.ctx;
378    ctx.cfg.validate()?;
379
380    let chain_api = (configured
381        .chain_factory
382        .ok_or(HoprLibError::BuilderError("missing chain factory"))?)(&ctx);
383    let graph = (configured
384        .graph_factory
385        .ok_or(HoprLibError::BuilderError("missing graph factory"))?)(&ctx);
386    let (network, network_process) =
387        (configured
388            .network_factory
389            .ok_or(HoprLibError::BuilderError("missing network factory"))?)(ctx.clone())
390        .await?;
391    let cover_traffic = (configured
392        .ct_factory
393        .ok_or(HoprLibError::BuilderError("missing cover traffic factory"))?)(&ctx);
394
395    let (chain_id, transport_id) = (ctx.chain_key.clone(), ctx.packet_key.clone());
396
397    let transport_api = HoprTransport::new(
398        (&chain_id, &transport_id),
399        chain_api.clone(),
400        graph.clone(),
401        vec![(&ctx.cfg.host).try_into().map_err(HoprLibError::TransportError)?],
402        ctx.cfg.protocol.clone(),
403    )
404    .map_err(HoprLibError::TransportError)?;
405
406    #[cfg(all(feature = "telemetry", not(test)))]
407    {
408        use hopr_api::types::primitive::traits::AsUnixTimestamp;
409        METRIC_PROCESS_START_TIME.set(std::time::SystemTime::now().as_unix_timestamp().as_secs_f64());
410        METRIC_HOPR_LIB_VERSION.set(
411            &[const_format::formatcp!("{}", constants::APP_VERSION)],
412            const_format::formatcp!(
413                "{}.{}",
414                env!("CARGO_PKG_VERSION_MAJOR"),
415                env!("CARGO_PKG_VERSION_MINOR")
416            )
417            .parse()
418            .unwrap_or(0.0),
419        );
420    }
421
422    let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
423    new_tickets_tx.set_await_active(false);
424    new_tickets_tx.set_overflow(true);
425
426    let me_onchain = chain_id.public().to_address();
427
428    #[cfg(feature = "testing")]
429    tracing::warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
430
431    tracing::info!(
432        address = %me_onchain,
433        minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
434        "node is not started, please fund this node",
435    );
436
437    crate::helpers::wait_for_funds(
438        *MIN_NATIVE_BALANCE,
439        *SUGGESTED_NATIVE_BALANCE,
440        Duration::from_secs(200),
441        me_onchain,
442        &chain_api,
443    )
444    .await?;
445
446    tracing::info!("starting HOPR node...");
447    let balance: hopr_api::types::primitive::prelude::XDaiBalance =
448        chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
449    let minimum_balance = *constants::MIN_NATIVE_BALANCE;
450
451    tracing::info!(address = %me_onchain, %balance, %minimum_balance, "node information");
452
453    if balance.le(&minimum_balance) {
454        return Err(HoprLibError::InsufficientFunds(
455            "cannot start the node without a sufficiently funded wallet".into(),
456        ));
457    }
458
459    #[cfg(debug_assertions)]
460    let skip_protocol_checks = ctx.cfg.disable_protocol_checks;
461    #[cfg(not(debug_assertions))]
462    let skip_protocol_checks = false;
463
464    let network_min_ticket_price = chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)?;
465    let configured_ticket_price = ctx.cfg.protocol.packet.codec.outgoing_ticket_price;
466    if !skip_protocol_checks && configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
467        return Err(HoprLibError::GeneralError(format!(
468            "configured outgoing ticket price < network minimum: {configured_ticket_price:?} < \
469             {network_min_ticket_price}"
470        )));
471    }
472
473    let network_min_win_prob = chain_api
474        .minimum_incoming_ticket_win_prob()
475        .await
476        .map_err(HoprLibError::chain)?;
477    let configured_win_prob = ctx.cfg.protocol.packet.codec.outgoing_win_prob;
478
479    if !skip_protocol_checks && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt()) {
480        return Err(HoprLibError::GeneralError(format!(
481            "configured outgoing win probability < network minimum: {configured_win_prob:?} < {network_min_win_prob}"
482        )));
483    }
484
485    tracing::info!(
486        peer_id = %transport_id.public().to_peerid_str(),
487        address = %me_onchain,
488        version = constants::APP_VERSION,
489        "Node information"
490    );
491
492    let safe_addr = ctx.cfg.safe_module.safe_address;
493    if me_onchain == safe_addr {
494        return Err(HoprLibError::GeneralError(
495            "cannot use self as staking safe address".into(),
496        ));
497    }
498
499    tracing::info!(%safe_addr, "registering safe with this node");
500    match chain_api.register_safe(&safe_addr).await {
501        Ok(awaiter) => {
502            awaiter.await.map_err(|error| {
503                tracing::error!(%safe_addr, %error, "safe registration failed");
504                HoprLibError::chain(error)
505            })?;
506            tracing::info!(%safe_addr, "safe successfully registered with this node");
507        }
508        Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) => {
509            if registered_safe == safe_addr {
510                tracing::info!(%safe_addr, "this safe is already registered with this node");
511            } else {
512                tracing::error!(%safe_addr, %registered_safe, "node registered with different safe");
513                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
514            }
515        }
516        Err(error) => {
517            tracing::error!(%safe_addr, %error, "safe registration failed");
518            return Err(HoprLibError::chain(error));
519        }
520    }
521
522    let multiaddresses_to_announce = if ctx.cfg.publish {
523        transport_api.announceable_multiaddresses()
524    } else {
525        Vec::new()
526    };
527
528    multiaddresses_to_announce
529        .iter()
530        .filter(|a| !is_public_address(a))
531        .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
532
533    let chain_api_clone = chain_api.clone();
534    let me_offchain = *transport_id.public();
535    let node_ready = spawn(async move {
536        chain_api_clone
537            .await_key_binding(&me_offchain, NODE_READY_TIMEOUT)
538            .await
539    });
540
541    tracing::info!(?multiaddresses_to_announce, "announcing node on chain");
542    match chain_api.announce(&multiaddresses_to_announce, &transport_id).await {
543        Ok(awaiter) => {
544            awaiter.await.map_err(|error| {
545                tracing::error!(?multiaddresses_to_announce, %error, "node announcement failed");
546                HoprLibError::chain(error)
547            })?;
548            tracing::info!(?multiaddresses_to_announce, "node announced successfully");
549        }
550        Err(AnnouncementError::AlreadyAnnounced) => {
551            tracing::info!("node already announced on chain");
552        }
553        Err(error) => {
554            tracing::error!(%error, "failed to transmit node announcement");
555            return Err(HoprLibError::chain(error));
556        }
557    }
558
559    let this_node_account = node_ready
560        .await
561        .map_err(HoprLibError::other)?
562        .map_err(HoprLibError::chain)?;
563    if this_node_account.chain_addr != me_onchain || this_node_account.safe_address.is_none_or(|a| a != safe_addr) {
564        tracing::error!(%this_node_account, "account key-binding mismatch");
565        return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
566    }
567
568    tracing::info!(%this_node_account, "node account is ready");
569
570    // Network → graph event wiring (subscribe before transport starts)
571    {
572        let network_events = network.subscribe_network_events();
573        let graph_updater = graph.clone();
574        spawn(async move {
575            network_events
576                .for_each(|event| {
577                    let graph_updater = graph_updater.clone();
578                    async move {
579                        let (peer_id, connected) = match event {
580                            hopr_api::network::NetworkEvent::PeerConnected(p) => (p, true),
581                            hopr_api::network::NetworkEvent::PeerDisconnected(p) => (p, false),
582                        };
583                        if let Ok(opk) = hopr_api::OffchainPublicKey::from_peerid(&peer_id) {
584                            graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
585                                hopr_transport::NeighborTelemetry,
586                                hopr_transport::PathTelemetry,
587                            >::ConnectionStatus {
588                                peer: opk,
589                                connected,
590                            });
591                        } else {
592                            tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
593                        }
594                    }
595                })
596                .await;
597        });
598    }
599
600    // Chain → graph event wiring
601    {
602        let chain_events = chain_api
603            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])
604            .map_err(HoprLibError::chain)?;
605
606        let graph_updater = graph.clone();
607        let chain_reader = chain_api.clone();
608
609        let own_chain_addr = me_onchain;
610        let own_packet_key = *transport_id.public();
611
612        let ticket_price = Arc::new(parking_lot::RwLock::new(
613            chain_reader.minimum_ticket_price().await.unwrap_or_default(),
614        ));
615        let win_probability = Arc::new(parking_lot::RwLock::new(
616            chain_reader
617                .minimum_incoming_ticket_win_prob()
618                .await
619                .unwrap_or_default(),
620        ));
621
622        let proc = chain_wiring::process_chain_events(
623            chain_reader,
624            graph_updater,
625            chain_events,
626            own_chain_addr,
627            own_packet_key,
628            ticket_price,
629            win_probability,
630            peer_discovery_tx,
631        )
632        .inspect(|_| {
633            tracing::warn!(
634                task = "chain-to-graph event wiring",
635                "long-running background task finished"
636            )
637        });
638        processes.insert(HoprLibProcess::ChannelEvents, hopr_utils::spawn_as_abortable!(proc));
639    }
640
641    Ok(PreHopr {
642        chain_id,
643        transport_id,
644        cfg: ctx.cfg,
645        state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
646        transport_api,
647        chain_api,
648        ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
649        processes,
650        session_tx,
651        cover_traffic,
652        network,
653        network_process,
654    })
655}
656
657// ---------------------------------------------------------------------------
658// Build methods — shared via macro to avoid duplicating edge/full logic
659// ---------------------------------------------------------------------------
660
661macro_rules! impl_build_methods {
662    () => {
663        /// Builds an entry [`Hopr`] node.
664        pub async fn build_edge<TFact>(
665            self,
666            ticket_factory: TFact,
667        ) -> Result<Hopr<Chain, Graph, Net, ()>, HoprLibError>
668        where
669            TFact: TicketFactory + Clone + Send + Sync + 'static,
670        {
671            let (configured, session_tx, processes) = self.into_parts();
672            let pre = pre_build_inner(configured, session_tx, processes).await?;
673
674            tracing::info!("starting transport for edge (entry) node");
675            let (socket, transport_processes) = pre
676                .transport_api
677                .run_entry(
678                    pre.cover_traffic,
679                    pre.network,
680                    pre.network_process,
681                    ticket_factory,
682                )
683                .await?;
684
685            // Drain unrelated packets to avoid missing blackhole
686            spawn(drain_incoming_data(socket.reader()));
687
688            let mut processes = pre.processes;
689            processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
690
691            let hopr = Hopr {
692                chain_id: NodeOnchainIdentity {
693                    node_address: pre.chain_id.public().to_address(),
694                    safe_address: pre.cfg.safe_module.safe_address,
695                    module_address: pre.cfg.safe_module.module_address,
696                },
697                cfg: pre.cfg,
698                state: pre.state.clone(),
699                ticket_event_subscribers: pre.ticket_event_subscribers,
700                transport_id: pre.transport_id,
701                transport_api: pre.transport_api,
702                chain_api: pre.chain_api,
703                processes,
704                ticket_manager: (),
705            };
706
707            hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
708            tracing::info!(
709                id = %hopr.transport_id.public().to_peerid_str(),
710                version = constants::APP_VERSION,
711                "EDGE NODE STARTED AND RUNNING"
712            );
713
714            Ok(hopr)
715        }
716
717        /// Builds a full (relay) [`Hopr`] node.
718        pub async fn build_full<TMgr, TFact>(
719            self,
720            ticket_manager: TMgr,
721            ticket_factory: TFact,
722        ) -> Result<Hopr<Chain, Graph, Net, TMgr>, HoprLibError>
723        where
724            TMgr: TicketManagement + Clone + Send + Sync + 'static,
725            TFact: TicketFactory + Clone + Send + Sync + 'static,
726        {
727            let (configured, session_tx, processes) = self.into_parts();
728            let pre = pre_build_inner(configured, session_tx, processes).await?;
729            let mut processes = pre.processes;
730
731            tracing::info!("starting ticket events processor");
732            let (tickets_tx, tickets_rx) = channel(8192);
733            let (tickets_rx_stream, tickets_handle) = futures::stream::abortable(tickets_rx);
734            processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
735            let new_ticket_tx = pre.ticket_event_subscribers.0.clone();
736            let tmgr_clone = ticket_manager.clone();
737            spawn(
738                hopr_utils::runtime::diagnostics::instrument(
739                    tickets_rx_stream
740                    .for_each(move |event| {
741                        if let TicketEvent::WinningTicket(ticket) = &event
742                            && let Err(error) = tmgr_clone.insert_incoming_ticket(**ticket)
743                        {
744                            tracing::error!(%error, "failed to insert incoming ticket");
745                        }
746                        if let Err(error) = new_ticket_tx.try_broadcast(event) {
747                            tracing::error!(%error, "failed to broadcast ticket event");
748                        }
749                        futures::future::ready(())
750                    })
751                    .inspect(|_| {
752                        tracing::warn!(task = %HoprLibProcess::TicketEvents, "long-running background task finished")
753                    }),
754                    "hopr_lib_ticket_events",
755                    module_path!(),
756                    file!(),
757                    line!(),
758                ),
759            );
760
761            {
762                let chain_for_neglect = pre.chain_api.clone();
763                let tmgr_for_neglect = ticket_manager.clone();
764                let events = pre.chain_api.subscribe().map_err(HoprLibError::chain)?;
765                let (neglect_handle, neglect_reg) = hopr_utils::runtime::AbortHandle::new_pair();
766                let neglect_task = futures::stream::Abortable::new(
767                    events.filter_map(move |event| {
768                        futures::future::ready(match event {
769                            ChainEvent::ChannelClosed(ch) => Some(ch),
770                            _ => None,
771                        })
772                    }),
773                    neglect_reg,
774                )
775                .for_each(move |closed_channel| {
776                    let chain = chain_for_neglect.clone();
777                    let tmgr = tmgr_for_neglect.clone();
778                    async move {
779                        match closed_channel.direction(chain.me()) {
780                            Some(ChannelDirection::Incoming) => {
781                                match tmgr.neglect_tickets(closed_channel.get_id(), None) {
782                                    Ok(neglected) if !neglected.is_empty() => {
783                                        tracing::warn!(
784                                            num_neglected = neglected.len(),
785                                            %closed_channel,
786                                            "tickets on incoming closed channel were neglected"
787                                        );
788                                    }
789                                    Ok(_) => {}
790                                    Err(error) => {
791                                        tracing::error!(
792                                            %error, %closed_channel,
793                                            "failed to neglect tickets on closed channel"
794                                        );
795                                    }
796                                }
797                            }
798                            Some(ChannelDirection::Outgoing) => {}
799                            _ => {}
800                        }
801                    }
802                })
803                .inspect(|_| {
804                    tracing::warn!(
805                        task = %HoprLibProcess::ChannelClosureNeglect,
806                        "channel closure ticket neglect task finished"
807                    )
808                });
809                spawn(hopr_utils::runtime::diagnostics::instrument(
810                    neglect_task,
811                    "hopr_lib_channel_closure_neglect",
812                    module_path!(),
813                    file!(),
814                    line!(),
815                ));
816                processes.insert(HoprLibProcess::ChannelClosureNeglect, neglect_handle);
817            }
818
819            tracing::info!("starting transport for full (relay) node");
820            let (socket, transport_processes) = pre
821                .transport_api
822                .run_relay(
823                    pre.cover_traffic,
824                    pre.network,
825                    pre.network_process,
826                    tickets_tx,
827                    ticket_factory,
828                    pre.session_tx,
829                )
830                .await?;
831            // Drain unrelated packets to avoid missing blackhole
832            spawn(drain_incoming_data(socket.reader()));
833            processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
834
835            let hopr = Hopr {
836                chain_id: NodeOnchainIdentity {
837                    node_address: pre.chain_id.public().to_address(),
838                    safe_address: pre.cfg.safe_module.safe_address,
839                    module_address: pre.cfg.safe_module.module_address,
840                },
841                cfg: pre.cfg,
842                state: pre.state.clone(),
843                ticket_event_subscribers: pre.ticket_event_subscribers,
844                transport_id: pre.transport_id,
845                transport_api: pre.transport_api,
846                chain_api: pre.chain_api,
847                processes,
848                ticket_manager,
849            };
850
851            hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
852
853            tracing::info!(
854                id = %hopr.transport_id.public().to_peerid_str(),
855                version = constants::APP_VERSION,
856                "FULL NODE STARTED AND RUNNING"
857            );
858
859            #[cfg(all(feature = "telemetry", not(test)))]
860            METRIC_HOPR_NODE_INFO.set(
861                &[
862                    &hopr.transport_id.public().to_peerid_str(),
863                    &hopr.chain_id.node_address.to_string(),
864                    &hopr.chain_id.safe_address.to_string(),
865                    &hopr.chain_id.module_address.to_string(),
866                ],
867                1.0,
868            );
869
870            Ok(hopr)
871        }
872    };
873}
874
// With the `session-server` feature enabled, the build methods are available only on `HoprBuilderWithSession`.
#[cfg(feature = "session-server")]
impl<Chain, Graph, Net, Ct> HoprBuilderWithSession<Chain, Graph, Net, Ct>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
{
    impl_build_methods!();

    /// Breaks the builder up into the pieces consumed by the build methods.
    ///
    /// Returns the configured inner builder, the sender for incoming sessions, and a process list
    /// that already holds the session handle registered as [`HoprLibProcess::SessionServer`].
    fn into_parts(
        self,
    ) -> (
        HoprBuilderConfigured<Chain, Graph, Net, Ct>,
        futures::channel::mpsc::Sender<IncomingSession>,
        AbortableList<HoprLibProcess>,
    ) {
        let Self {
            inner,
            session_tx,
            session_handle,
            ..
        } = self;

        let mut processes: AbortableList<HoprLibProcess> = Default::default();
        processes.insert(HoprLibProcess::SessionServer, session_handle);

        (inner, session_tx, processes)
    }
}
901
902// When session-server is OFF: build methods directly on HoprBuilderConfigured
903#[cfg(not(feature = "session-server"))]
904impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct>
905where
906    Chain: HoprChainApi + Clone + Send + Sync + 'static,
907    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
908    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
909        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
910    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
911    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
912    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
913{
914    impl_build_methods!();
915
916    fn into_parts(
917        self,
918    ) -> (
919        HoprBuilderConfigured<Chain, Graph, Net, Ct>,
920        futures::channel::mpsc::Sender<IncomingSession>,
921        AbortableList<HoprLibProcess>,
922    ) {
923        let (tx, _rx) = channel::<IncomingSession>(1);
924        let processes = AbortableList::<HoprLibProcess>::default();
925        (self, tx, processes)
926    }
927}