Skip to main content

hopr_lib/
builder.rs

1//! Type-state builder for constructing a [`Hopr`] node.
2//!
3//! The builder guides construction through a series of mandatory phases:
4//!
5//! 1. **Identity** — `HoprBuilder` → `HoprBuilder::with_identity`
6//! 2. **Configuration** — `HoprBuilderWithIdentity::with_config`
7//! 3. **Component factories** — chain API, graph, network, and cover-traffic
8//! 4. **Session server** (when the `session-server` feature is enabled) — `HoprBuilderConfigured::with_session_server`
9//! 5. **Build** — `build_edge` for an entry/exit node or `build_full` for a relay node with ticket management.
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use hopr_lib::{config::HoprLibConfig, builder::{HoprBuilder, ChainKeypair, OffchainKeypair, Keypair}};
15//!
16//! let chain_key = ChainKeypair::random();
17//! let offchain_key = OffchainKeypair::random();
18//! let config = HoprLibConfig::default();
19//!
20//! let builder = HoprBuilder
21//!     .with_identity(&chain_key, &offchain_key)
22//!     .with_config(config)
23//!     .with_chain_api(|_ctx| { /* ... */ todo!() })
24//!     .with_graph(|_ctx| { /* ... */ todo!() })
25//!     .with_network(|_ctx| Box::pin(async { /* ... */ todo!() }))
26//!     .with_cover_traffic(|_ctx| { /* ... */ todo!() });
27//! ```
28
29use std::{convert::identity, future::Future, pin::Pin, sync::Arc, time::Duration};
30
31use futures::{FutureExt, StreamExt, channel::mpsc::channel};
32pub use hopr_api::types::crypto::{
33    keypairs::Keypair,
34    prelude::{ChainKeypair, OffchainKeypair},
35};
36use hopr_api::{
37    chain::{AnnouncementError, HoprChainApi, SafeRegistrationError, StateSyncOptions},
38    ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
39    graph::{EdgeCapacityUpdate, HoprGraphApi},
40    network::{BoxedProcessFn, NetworkStreamControl, NetworkView},
41    node::{AtomicHoprState, HoprState, NodeOnchainIdentity, TicketEvent},
42    tickets::{TicketFactory, TicketManagement},
43    types::{
44        chain::chain_events::ChainEvent,
45        internal::prelude::{ChannelDirection, ChannelStatus},
46        primitive::prelude::{Address, UnitaryFloatOps},
47    },
48};
49use hopr_async_runtime::{AbortableList, prelude::spawn};
50use hopr_network_types::addr::is_public_address;
51use hopr_transport::{HoprTransport, IncomingSession};
52use validator::Validate;
53
54use crate::{
55    Hopr, HoprLibError, HoprLibProcess, MIN_NATIVE_BALANCE, NODE_READY_TIMEOUT, SUGGESTED_NATIVE_BALANCE,
56    config::HoprLibConfig, constants,
57};
58
#[cfg(all(feature = "telemetry", not(test)))]
lazy_static::lazy_static! {
    /// Gauge registered as `hopr_start_time`: unix timestamp (seconds) of the process start.
    static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
        "hopr_start_time",
        "The unix timestamp in seconds at which the process was started",
    )
    .unwrap();

    /// Multi-gauge registered as `hopr_lib_version`, labelled by `version`.
    static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_lib_version",
        "Executed version of hopr-lib",
        &["version"],
    )
    .unwrap();

    /// Multi-gauge registered as `hopr_node_addresses`, labelled by the node's
    /// peer id, on-chain address, Safe address and module address.
    static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
        "hopr_node_addresses",
        "Node on-chain and off-chain addresses",
        &["peerid", "address", "safe_address", "module_address"],
    )
    .unwrap();
}
76
77/// Type-erased factory closure producing `T` from a [`BuildCtx`] reference.
78type Factory<T> = Box<dyn FnOnce(&BuildCtx) -> T + Send>;
79type AsyncFactory<T> = Box<dyn FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = T> + Send>> + Send>;
80
81/// Context available to factory closures during the build step.
82#[derive(Clone)]
83pub struct BuildCtx {
84    /// Node's on-chain keypair.
85    pub chain_key: ChainKeypair,
86    /// Node's off-chain (packet) keypair.
87    pub packet_key: OffchainKeypair,
88    /// Node configuration.
89    pub cfg: HoprLibConfig,
90}
91
92// ---------------------------------------------------------------------------
93// Type-state builder phases
94// ---------------------------------------------------------------------------
95
96/// Initial builder — forces `with_identity` first.
97#[derive(Default)]
98pub struct HoprBuilder;
99
100impl HoprBuilder {
101    /// Sets the node's on-chain and off-chain identity.
102    pub fn with_identity(self, chain_key: &ChainKeypair, offchain_key: &OffchainKeypair) -> HoprBuilderWithIdentity {
103        HoprBuilderWithIdentity {
104            chain_key: chain_key.clone(),
105            packet_key: offchain_key.clone(),
106        }
107    }
108}
109
110/// Builder with identity set — forces `with_config` next.
111pub struct HoprBuilderWithIdentity {
112    chain_key: ChainKeypair,
113    packet_key: OffchainKeypair,
114}
115
116impl HoprBuilderWithIdentity {
117    /// Sets the node configuration and produces the configured builder.
118    pub fn with_config(self, cfg: HoprLibConfig) -> HoprBuilderConfigured {
119        HoprBuilderConfigured {
120            ctx: BuildCtx {
121                chain_key: self.chain_key,
122                packet_key: self.packet_key,
123                cfg,
124            },
125            safe_and_module: None,
126            chain_factory: None,
127            graph_factory: None,
128            network_factory: None,
129            ct_factory: None,
130        }
131    }
132}
133
134// ---------------------------------------------------------------------------
135// HoprBuilderConfigured — stores factories, no session yet
136// ---------------------------------------------------------------------------
137
138/// Configured builder accepting factory closures for components.
139///
140/// When the `session-server` feature is enabled, [`with_session_server`](HoprBuilderConfigured::with_session_server)
141/// must be called before building — it returns a [`HoprBuilderWithSession`] which
142/// has the `build_edge` / `build_full` methods.
143///
144/// When the feature is disabled, `build_edge` / `build_full` are available directly.
145pub struct HoprBuilderConfigured<Chain = (), Graph = (), Net = (), Ct = ()> {
146    ctx: BuildCtx,
147    safe_and_module: Option<(Address, Address)>,
148    chain_factory: Option<Factory<Chain>>,
149    graph_factory: Option<Factory<Graph>>,
150    network_factory: Option<AsyncFactory<(Net, BoxedProcessFn)>>,
151    ct_factory: Option<Factory<Ct>>,
152}
153
154impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct> {
155    /// Sets the node Safe and module addresses.
156    pub fn with_safe_module(mut self, safe: &Address, module: &Address) -> Self {
157        self.safe_and_module = Some((*safe, *module));
158        self
159    }
160
161    /// Sets the chain API factory.
162    pub fn with_chain_api<NewChain>(
163        self,
164        f: impl FnOnce(&BuildCtx) -> NewChain + Send + 'static,
165    ) -> HoprBuilderConfigured<NewChain, Graph, Net, Ct> {
166        HoprBuilderConfigured {
167            ctx: self.ctx,
168            safe_and_module: self.safe_and_module,
169            chain_factory: Some(Box::new(f)),
170            graph_factory: self.graph_factory,
171            network_factory: self.network_factory,
172            ct_factory: self.ct_factory,
173        }
174    }
175
176    /// Sets the graph factory.
177    pub fn with_graph<NewGraph>(
178        self,
179        f: impl FnOnce(&BuildCtx) -> NewGraph + Send + 'static,
180    ) -> HoprBuilderConfigured<Chain, NewGraph, Net, Ct> {
181        HoprBuilderConfigured {
182            ctx: self.ctx,
183            safe_and_module: self.safe_and_module,
184            chain_factory: self.chain_factory,
185            graph_factory: Some(Box::new(f)),
186            network_factory: self.network_factory,
187            ct_factory: self.ct_factory,
188        }
189    }
190
191    /// Sets the network factory. Must return `(Net, BoxedProcessFn)`.
192    ///
193    /// The factory receives [`BuildCtx`] by value and returns a boxed future,
194    /// allowing async network construction without blocking the executor.
195    pub fn with_network<NewNet>(
196        self,
197        f: impl FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = (NewNet, BoxedProcessFn)> + Send>> + Send + 'static,
198    ) -> HoprBuilderConfigured<Chain, Graph, NewNet, Ct> {
199        HoprBuilderConfigured {
200            ctx: self.ctx,
201            safe_and_module: self.safe_and_module,
202            chain_factory: self.chain_factory,
203            graph_factory: self.graph_factory,
204            network_factory: Some(Box::new(f)),
205            ct_factory: self.ct_factory,
206        }
207    }
208
209    /// Sets the cover traffic factory.
210    pub fn with_cover_traffic<NewCt>(
211        self,
212        f: impl FnOnce(&BuildCtx) -> NewCt + Send + 'static,
213    ) -> HoprBuilderConfigured<Chain, Graph, Net, NewCt> {
214        HoprBuilderConfigured {
215            ctx: self.ctx,
216            safe_and_module: self.safe_and_module,
217            chain_factory: self.chain_factory,
218            graph_factory: self.graph_factory,
219            network_factory: self.network_factory,
220            ct_factory: Some(Box::new(f)),
221        }
222    }
223
224    /// Attaches a session server for handling incoming sessions.
225    ///
226    /// Eagerly spawns the server task and returns a [`HoprBuilderWithSession`]
227    /// that has the `build_edge` / `build_full` methods.
228    #[cfg(feature = "session-server")]
229    pub fn with_session_server(
230        self,
231        server: impl hopr_api::node::HoprSessionServer<Session = IncomingSession, Error: std::fmt::Display>
232        + Clone
233        + Send
234        + 'static,
235    ) -> HoprBuilderWithSession<Chain, Graph, Net, Ct> {
236        let incoming_session_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
237            .ok()
238            .and_then(|s| s.trim().parse::<usize>().ok())
239            .filter(|&c| c > 0)
240            .unwrap_or(256);
241
242        let (session_tx, session_rx) = channel::<IncomingSession>(incoming_session_capacity);
243
244        tracing::debug!(capacity = incoming_session_capacity, "spawning session server");
245        let handle = hopr_async_runtime::spawn_as_abortable!(
246            session_rx
247                .for_each_concurrent(None, move |session| {
248                    let server = server.clone();
249                    async move {
250                        let session_id = *session.session.id();
251                        match server.process(session).await {
252                            Ok(()) => tracing::debug!(?session_id, "session processed successfully"),
253                            Err(error) => {
254                                tracing::error!(?session_id, %error, "session processing failed")
255                            }
256                        }
257                    }
258                })
259                .inspect(|_| tracing::warn!(
260                    task = %HoprLibProcess::SessionServer,
261                    "long-running background task finished"
262                ))
263        );
264
265        HoprBuilderWithSession {
266            inner: self,
267            session_tx,
268            session_handle: handle,
269        }
270    }
271}
272
273// ---------------------------------------------------------------------------
274// HoprBuilderWithSession — session server attached, ready to build
275// ---------------------------------------------------------------------------
276
/// Builder with a session server attached. Has `build_edge` / `build_full`.
///
/// Only exists when the `session-server` feature is enabled.
///
/// By the time this value exists, `with_session_server` has already spawned
/// the session server task, so it should not be discarded without building.
#[cfg(feature = "session-server")]
#[must_use = "the session server task is already running; build the node with `build_edge` or `build_full`"]
pub struct HoprBuilderWithSession<Chain = (), Graph = (), Net = (), Ct = ()> {
    /// The configured builder holding the context and component factories.
    inner: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
    /// Sending half of the channel feeding the session server task.
    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
    /// Abort handle of the spawned session server task.
    session_handle: hopr_async_runtime::AbortHandle,
}
286
287// ---------------------------------------------------------------------------
288// Intermediate pre-build state
289// ---------------------------------------------------------------------------
290
291struct PreHopr<Chain, Graph, Net, Ct> {
292    chain_id: ChainKeypair,
293    transport_id: OffchainKeypair,
294    cfg: HoprLibConfig,
295    state: Arc<AtomicHoprState>,
296    transport_api: HoprTransport<Chain, Graph, Net>,
297    chain_api: Chain,
298
299    ticket_event_subscribers: (
300        async_broadcast::Sender<TicketEvent>,
301        async_broadcast::InactiveReceiver<TicketEvent>,
302    ),
303    processes: AbortableList<HoprLibProcess>,
304    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
305    cover_traffic: Ct,
306    network: Net,
307    network_process: BoxedProcessFn,
308}
309
310// ---------------------------------------------------------------------------
311// Shared pre_build logic
312// ---------------------------------------------------------------------------
313
314async fn pre_build_inner<Chain, Graph, Net, Ct>(
315    configured: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
316    session_tx: futures::channel::mpsc::Sender<IncomingSession>,
317    mut processes: AbortableList<HoprLibProcess>,
318) -> Result<PreHopr<Chain, Graph, Net, Ct>, HoprLibError>
319where
320    Chain: HoprChainApi + Clone + Send + Sync + 'static,
321    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
322    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
323        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
324    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
325    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
326    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
327{
328    let ctx = configured.ctx;
329    ctx.cfg.validate()?;
330
331    let chain_api = (configured
332        .chain_factory
333        .ok_or(HoprLibError::BuilderError("missing chain factory"))?)(&ctx);
334    let graph = (configured
335        .graph_factory
336        .ok_or(HoprLibError::BuilderError("missing graph factory"))?)(&ctx);
337    let (network, network_process) =
338        (configured
339            .network_factory
340            .ok_or(HoprLibError::BuilderError("missing network factory"))?)(ctx.clone())
341        .await;
342    let cover_traffic = (configured
343        .ct_factory
344        .ok_or(HoprLibError::BuilderError("missing cover traffic factory"))?)(&ctx);
345
346    let (chain_id, transport_id) = (ctx.chain_key.clone(), ctx.packet_key.clone());
347
348    let transport_api = HoprTransport::new(
349        (&chain_id, &transport_id),
350        chain_api.clone(),
351        graph.clone(),
352        vec![(&ctx.cfg.host).try_into().map_err(HoprLibError::TransportError)?],
353        ctx.cfg.protocol.clone(),
354    )
355    .map_err(HoprLibError::TransportError)?;
356
357    #[cfg(all(feature = "telemetry", not(test)))]
358    {
359        use hopr_api::types::primitive::traits::AsUnixTimestamp;
360        METRIC_PROCESS_START_TIME.set(hopr_platform::time::current_time().as_unix_timestamp().as_secs_f64());
361        METRIC_HOPR_LIB_VERSION.set(
362            &[const_format::formatcp!("{}", constants::APP_VERSION)],
363            const_format::formatcp!(
364                "{}.{}",
365                env!("CARGO_PKG_VERSION_MAJOR"),
366                env!("CARGO_PKG_VERSION_MINOR")
367            )
368            .parse()
369            .unwrap_or(0.0),
370        );
371    }
372
373    let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
374    new_tickets_tx.set_await_active(false);
375    new_tickets_tx.set_overflow(true);
376
377    let me_onchain = chain_id.public().to_address();
378
379    #[cfg(feature = "testing")]
380    tracing::warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
381
382    tracing::info!(
383        address = %me_onchain,
384        minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
385        "node is not started, please fund this node",
386    );
387
388    crate::helpers::wait_for_funds(
389        *MIN_NATIVE_BALANCE,
390        *SUGGESTED_NATIVE_BALANCE,
391        Duration::from_secs(200),
392        me_onchain,
393        &chain_api,
394    )
395    .await?;
396
397    tracing::info!("starting HOPR node...");
398    let balance: hopr_api::types::primitive::prelude::XDaiBalance =
399        chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
400    let minimum_balance = *constants::MIN_NATIVE_BALANCE;
401
402    tracing::info!(address = %me_onchain, %balance, %minimum_balance, "node information");
403
404    if balance.le(&minimum_balance) {
405        return Err(HoprLibError::InsufficientFunds(
406            "cannot start the node without a sufficiently funded wallet".into(),
407        ));
408    }
409
410    let network_min_ticket_price = chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)?;
411    let configured_ticket_price = ctx.cfg.protocol.packet.codec.outgoing_ticket_price;
412    if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
413        return Err(HoprLibError::GeneralError(format!(
414            "configured outgoing ticket price < network minimum: {configured_ticket_price:?} < \
415             {network_min_ticket_price}"
416        )));
417    }
418
419    let network_min_win_prob = chain_api
420        .minimum_incoming_ticket_win_prob()
421        .await
422        .map_err(HoprLibError::chain)?;
423    let configured_win_prob = ctx.cfg.protocol.packet.codec.outgoing_win_prob;
424    if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
425        && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
426    {
427        return Err(HoprLibError::GeneralError(format!(
428            "configured outgoing win probability < network minimum: {configured_win_prob:?} < {network_min_win_prob}"
429        )));
430    }
431
432    tracing::info!(
433        peer_id = %transport_id.public().to_peerid_str(),
434        address = %me_onchain,
435        version = constants::APP_VERSION,
436        "Node information"
437    );
438
439    let safe_addr = ctx.cfg.safe_module.safe_address;
440    if me_onchain == safe_addr {
441        return Err(HoprLibError::GeneralError(
442            "cannot use self as staking safe address".into(),
443        ));
444    }
445
446    tracing::info!(%safe_addr, "registering safe with this node");
447    match chain_api.register_safe(&safe_addr).await {
448        Ok(awaiter) => {
449            awaiter.await.map_err(|error| {
450                tracing::error!(%safe_addr, %error, "safe registration failed");
451                HoprLibError::chain(error)
452            })?;
453            tracing::info!(%safe_addr, "safe successfully registered with this node");
454        }
455        Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) => {
456            if registered_safe == safe_addr {
457                tracing::info!(%safe_addr, "this safe is already registered with this node");
458            } else {
459                tracing::error!(%safe_addr, %registered_safe, "node registered with different safe");
460                return Err(HoprLibError::GeneralError("node registered with different safe".into()));
461            }
462        }
463        Err(error) => {
464            tracing::error!(%safe_addr, %error, "safe registration failed");
465            return Err(HoprLibError::chain(error));
466        }
467    }
468
469    let multiaddresses_to_announce = if ctx.cfg.publish {
470        transport_api.announceable_multiaddresses()
471    } else {
472        Vec::new()
473    };
474
475    multiaddresses_to_announce
476        .iter()
477        .filter(|a| !is_public_address(a))
478        .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
479
480    let chain_api_clone = chain_api.clone();
481    let me_offchain = *transport_id.public();
482    let node_ready = spawn(async move {
483        chain_api_clone
484            .await_key_binding(&me_offchain, NODE_READY_TIMEOUT)
485            .await
486    });
487
488    tracing::info!(?multiaddresses_to_announce, "announcing node on chain");
489    match chain_api.announce(&multiaddresses_to_announce, &transport_id).await {
490        Ok(awaiter) => {
491            awaiter.await.map_err(|error| {
492                tracing::error!(?multiaddresses_to_announce, %error, "node announcement failed");
493                HoprLibError::chain(error)
494            })?;
495            tracing::info!(?multiaddresses_to_announce, "node announced successfully");
496        }
497        Err(AnnouncementError::AlreadyAnnounced) => {
498            tracing::info!("node already announced on chain");
499        }
500        Err(error) => {
501            tracing::error!(%error, "failed to transmit node announcement");
502            return Err(HoprLibError::chain(error));
503        }
504    }
505
506    let this_node_account = node_ready
507        .await
508        .map_err(HoprLibError::other)?
509        .map_err(HoprLibError::chain)?;
510    if this_node_account.chain_addr != me_onchain || this_node_account.safe_address.is_none_or(|a| a != safe_addr) {
511        tracing::error!(%this_node_account, "account key-binding mismatch");
512        return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
513    }
514
515    tracing::info!(%this_node_account, "node account is ready");
516
517    // Network → graph event wiring (subscribe before transport starts)
518    {
519        let network_events = network.subscribe_network_events();
520        let graph_updater = graph.clone();
521        spawn(async move {
522            network_events
523                .for_each(|event| {
524                    let graph_updater = graph_updater.clone();
525                    async move {
526                        let (peer_id, connected) = match event {
527                            hopr_api::network::NetworkEvent::PeerConnected(p) => (p, true),
528                            hopr_api::network::NetworkEvent::PeerDisconnected(p) => (p, false),
529                        };
530                        if let Ok(opk) = hopr_api::OffchainPublicKey::from_peerid(&peer_id) {
531                            graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
532                                hopr_transport::NeighborTelemetry,
533                                hopr_transport::PathTelemetry,
534                            >::ConnectionStatus {
535                                peer: opk,
536                                connected,
537                            });
538                        } else {
539                            tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
540                        }
541                    }
542                })
543                .await;
544        });
545    }
546
547    // Chain → graph event wiring
548    {
549        let chain_events = chain_api
550            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])
551            .map_err(HoprLibError::chain)?;
552
553        let graph_updater = graph.clone();
554        let chain_reader = chain_api.clone();
555
556        let ticket_price = Arc::new(parking_lot::RwLock::new(
557            chain_reader.minimum_ticket_price().await.unwrap_or_default(),
558        ));
559        let win_probability = Arc::new(parking_lot::RwLock::new(
560            chain_reader
561                .minimum_incoming_ticket_win_prob()
562                .await
563                .unwrap_or_default(),
564        ));
565
566        let proc = async move {
567            chain_events
568                .for_each(|chain_event| {
569                    let chain_reader = chain_reader.clone();
570                    let graph_updater = graph_updater.clone();
571                    let ticket_price = ticket_price.clone();
572                    let win_probability = win_probability.clone();
573
574                    async move {
575                        match chain_event {
576                            ChainEvent::Announcement(account) => {
577                                tracing::debug!(
578                                    account = %account.public_key,
579                                    "recording graph node for announced account"
580                                );
581                                graph_updater.record_node(account.public_key);
582                            }
583                            ChainEvent::ChannelOpened(channel)
584                            | ChainEvent::ChannelClosed(channel)
585                            | ChainEvent::ChannelBalanceIncreased(channel, _)
586                            | ChainEvent::ChannelBalanceDecreased(channel, _) => {
587                                let keys = hopr_async_runtime::prelude::spawn_blocking(move || {
588                                    chain_reader
589                                        .chain_key_to_packet_key(&channel.source)
590                                        .and_then(|src| {
591                                            Ok(src.zip(chain_reader.chain_key_to_packet_key(&channel.destination)?))
592                                        })
593                                        .map_err(anyhow::Error::from)
594                                })
595                                .await
596                                .map_err(anyhow::Error::from)
597                                .and_then(identity);
598
599                                match keys {
600                                    Ok(Some((from, to))) => {
601                                        let capacity = if matches!(channel.status, ChannelStatus::Closed) {
602                                            None
603                                        } else if let Ok(ticket_value) =
604                                            ticket_price.read().div_f64(win_probability.read().as_f64())
605                                        {
606                                            Some(
607                                                channel
608                                                    .balance
609                                                    .amount()
610                                                    .checked_div(ticket_value.amount())
611                                                    .map(|v| v.low_u128())
612                                                    .unwrap_or(u128::MAX),
613                                            )
614                                        } else {
615                                            None
616                                        };
617
618                                        tracing::debug!(
619                                            %channel, ?capacity,
620                                            "recording graph edge for channel capacity"
621                                        );
622                                        graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
623                                            hopr_transport::NeighborTelemetry,
624                                            hopr_transport::PathTelemetry,
625                                        >::Capacity(
626                                            Box::new(EdgeCapacityUpdate {
627                                                capacity,
628                                                src: from,
629                                                dest: to,
630                                            }),
631                                        ));
632                                    }
633                                    Ok(None) => {
634                                        tracing::error!(
635                                            %channel,
636                                            "could not find packet keys for channel endpoints"
637                                        );
638                                    }
639                                    Err(error) => {
640                                        tracing::error!(
641                                            %error, %channel,
642                                            "failed to convert chain keys to packet keys"
643                                        );
644                                    }
645                                }
646                            }
647                            ChainEvent::ChannelClosureInitiated(_) => {}
648                            ChainEvent::WinningProbabilityIncreased(prob)
649                            | ChainEvent::WinningProbabilityDecreased(prob) => {
650                                tracing::debug!(%prob, "recording winning probability change");
651                                *win_probability.write() = prob;
652                            }
653                            ChainEvent::TicketPriceChanged(price) => {
654                                tracing::debug!(%price, "recording ticket price change");
655                                *ticket_price.write() = price;
656                            }
657                            _ => {}
658                        }
659                    }
660                })
661                .await;
662        }
663        .inspect(|_| {
664            tracing::warn!(
665                task = "chain-to-graph event wiring",
666                "long-running background task finished"
667            )
668        });
669        processes.insert(
670            HoprLibProcess::ChannelEvents,
671            hopr_async_runtime::spawn_as_abortable!(proc),
672        );
673    }
674
675    Ok(PreHopr {
676        chain_id,
677        transport_id,
678        cfg: ctx.cfg,
679        state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
680        transport_api,
681        chain_api,
682        ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
683        processes,
684        session_tx,
685        cover_traffic,
686        network,
687        network_process,
688    })
689}
690
691// ---------------------------------------------------------------------------
692// Build methods — shared via macro to avoid duplicating edge/full logic
693// ---------------------------------------------------------------------------
694
macro_rules! impl_build_methods {
    () => {
        /// Builds an edge (entry/exit) [`Hopr`] node.
        ///
        /// Runs the shared pre-build phase, starts the transport and assembles a node
        /// whose ticket-manager slot is the unit type. Ticket events coming out of the
        /// transport are fed into a draining sink, i.e. they are discarded.
        ///
        /// # Errors
        ///
        /// Returns a [`HoprLibError`] when the pre-build phase or the transport start-up fails.
        pub async fn build_edge<TFact>(
            self,
            ticket_factory: TFact,
        ) -> Result<Hopr<Chain, Graph, Net, ()>, HoprLibError>
        where
            TFact: TicketFactory + Clone + Send + Sync + 'static,
        {
            let (configured, session_tx, initial_processes) = self.into_parts();
            let pre = pre_build_inner(configured, session_tx, initial_processes).await?;
            let mut processes = pre.processes;

            tracing::info!("starting transport for edge node");
            // Only the second element of the returned pair (the transport's own
            // background processes) is needed; the first one is dropped right away.
            let (_, transport_processes) = pre
                .transport_api
                .run(
                    pre.cover_traffic,
                    pre.network,
                    pre.network_process,
                    futures::sink::drain(),
                    ticket_factory,
                    pre.session_tx,
                )
                .await?;
            processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

            // Assemble the on-chain identity first: it reads from `pre.cfg`,
            // which is moved into the node right below.
            let chain_id = NodeOnchainIdentity {
                node_address: pre.chain_id.public().to_address(),
                safe_address: pre.cfg.safe_module.safe_address,
                module_address: pre.cfg.safe_module.module_address,
            };

            let hopr = Hopr {
                chain_id,
                cfg: pre.cfg,
                state: Arc::clone(&pre.state),
                ticket_event_subscribers: pre.ticket_event_subscribers,
                transport_id: pre.transport_id,
                transport_api: pre.transport_api,
                chain_api: pre.chain_api,
                processes,
                ticket_manager: (),
            };

            // Everything is up: flip the node state before announcing it in the log.
            hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
            tracing::info!(
                id = %hopr.transport_id.public().to_peerid_str(),
                version = constants::APP_VERSION,
                "EDGE NODE STARTED AND RUNNING"
            );

            Ok(hopr)
        }

        /// Builds a full (relay) [`Hopr`] node.
        ///
        /// On top of what [`Self::build_edge`] does, this spawns two abortable background tasks
        /// before the transport is started:
        ///
        /// * a ticket events processor that hands every winning ticket to `ticket_manager` and
        ///   re-broadcasts each event to the node's ticket event subscribers;
        /// * a chain event watcher that neglects the tickets of every closed incoming channel.
        ///
        /// # Errors
        ///
        /// Returns a [`HoprLibError`] when the pre-build phase fails, when subscribing to chain
        /// events fails, or when the transport start-up fails.
        pub async fn build_full<TMgr, TFact>(
            self,
            ticket_manager: TMgr,
            ticket_factory: TFact,
        ) -> Result<Hopr<Chain, Graph, Net, TMgr>, HoprLibError>
        where
            TMgr: TicketManagement + Clone + Send + Sync + 'static,
            TFact: TicketFactory + Clone + Send + Sync + 'static,
        {
            /// Buffer size of the channel carrying ticket events out of the transport.
            const TICKET_EVENTS_BUFFER: usize = 8192;

            let (configured, session_tx, initial_processes) = self.into_parts();
            let pre = pre_build_inner(configured, session_tx, initial_processes).await?;
            let mut processes = pre.processes;

            tracing::info!("starting ticket events processor");
            let (tickets_tx, tickets_rx) = channel(TICKET_EVENTS_BUFFER);
            let (ticket_events, ticket_events_abort) = futures::stream::abortable(tickets_rx);
            processes.insert(HoprLibProcess::TicketEvents, ticket_events_abort);

            let subscribers_tx = pre.ticket_event_subscribers.0.clone();
            let incoming_ticket_store = ticket_manager.clone();
            let ticket_events_task = ticket_events
                .for_each(move |event| {
                    // Winning tickets are stored first; the event is forwarded to the
                    // subscribers regardless of whether storing succeeded.
                    if let TicketEvent::WinningTicket(winning) = &event
                        && let Err(error) = incoming_ticket_store.insert_incoming_ticket(**winning)
                    {
                        tracing::error!(%error, "failed to insert incoming ticket");
                    }
                    if let Err(error) = subscribers_tx.try_broadcast(event) {
                        tracing::error!(%error, "failed to broadcast ticket event");
                    }
                    futures::future::ready(())
                })
                .inspect(|_| {
                    tracing::warn!(task = %HoprLibProcess::TicketEvents, "long-running background task finished")
                });
            spawn(ticket_events_task);

            {
                let chain_for_neglect = pre.chain_api.clone();
                let tmgr_for_neglect = ticket_manager.clone();
                let chain_events = pre.chain_api.subscribe().map_err(HoprLibError::chain)?;
                let (neglect_abort, neglect_registration) = hopr_async_runtime::AbortHandle::new_pair();

                // Narrow the chain event stream down to channel closures only.
                let closed_channels = futures::stream::Abortable::new(
                    chain_events.filter_map(|event| {
                        futures::future::ready(if let ChainEvent::ChannelClosed(channel) = event {
                            Some(channel)
                        } else {
                            None
                        })
                    }),
                    neglect_registration,
                );

                let neglect_task = closed_channels
                    .for_each(move |closed_channel| {
                        let chain = chain_for_neglect.clone();
                        let tmgr = tmgr_for_neglect.clone();
                        async move {
                            // Closures of outgoing channels, or of channels this node is
                            // not a party of, require no action here.
                            if !matches!(
                                closed_channel.direction(chain.me()),
                                Some(ChannelDirection::Incoming)
                            ) {
                                return;
                            }

                            match tmgr.neglect_tickets(closed_channel.get_id(), None) {
                                Err(error) => {
                                    tracing::error!(
                                        %error, %closed_channel,
                                        "failed to neglect tickets on closed channel"
                                    );
                                }
                                Ok(neglected) if neglected.is_empty() => {}
                                Ok(neglected) => {
                                    tracing::warn!(
                                        num_neglected = neglected.len(),
                                        %closed_channel,
                                        "tickets on incoming closed channel were neglected"
                                    );
                                }
                            }
                        }
                    })
                    .inspect(|_| {
                        tracing::warn!(
                            task = %HoprLibProcess::ChannelClosureNeglect,
                            "channel closure ticket neglect task finished"
                        )
                    });
                spawn(neglect_task);
                processes.insert(HoprLibProcess::ChannelClosureNeglect, neglect_abort);
            }

            tracing::info!("starting transport for full node");
            // The sending half of the ticket events channel goes to the transport;
            // the receiving half is consumed by the processor spawned above.
            let (_, transport_processes) = pre
                .transport_api
                .run(
                    pre.cover_traffic,
                    pre.network,
                    pre.network_process,
                    tickets_tx,
                    ticket_factory,
                    pre.session_tx,
                )
                .await?;
            processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);

            // Assemble the on-chain identity first: it reads from `pre.cfg`,
            // which is moved into the node right below.
            let chain_id = NodeOnchainIdentity {
                node_address: pre.chain_id.public().to_address(),
                safe_address: pre.cfg.safe_module.safe_address,
                module_address: pre.cfg.safe_module.module_address,
            };

            let hopr = Hopr {
                chain_id,
                cfg: pre.cfg,
                state: Arc::clone(&pre.state),
                ticket_event_subscribers: pre.ticket_event_subscribers,
                transport_id: pre.transport_id,
                transport_api: pre.transport_api,
                chain_api: pre.chain_api,
                processes,
                ticket_manager,
            };

            // Everything is up: flip the node state before announcing it in the log.
            hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);

            tracing::info!(
                id = %hopr.transport_id.public().to_peerid_str(),
                version = constants::APP_VERSION,
                "FULL NODE STARTED AND RUNNING"
            );

            // Publish the node identity as a metric (telemetry builds only, never in tests).
            #[cfg(all(feature = "telemetry", not(test)))]
            METRIC_HOPR_NODE_INFO.set(
                &[
                    &hopr.transport_id.public().to_peerid_str(),
                    &hopr.chain_id.node_address.to_string(),
                    &hopr.chain_id.safe_address.to_string(),
                    &hopr.chain_id.module_address.to_string(),
                ],
                1.0,
            );

            Ok(hopr)
        }
    };
}
894
// With the `session-server` feature enabled, the build methods are available
// only on `HoprBuilderWithSession`.
#[cfg(feature = "session-server")]
impl<Chain, Graph, Net, Ct> HoprBuilderWithSession<Chain, Graph, Net, Ct>
where
    Chain: HoprChainApi + Clone + Send + Sync + 'static,
    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
{
    /// Breaks the builder up into the pieces consumed by the build methods.
    ///
    /// Returns the wrapped configured builder, the sender for incoming sessions and
    /// a process list that already contains the session server's abort handle
    /// registered under [`HoprLibProcess::SessionServer`].
    fn into_parts(
        self,
    ) -> (
        HoprBuilderConfigured<Chain, Graph, Net, Ct>,
        futures::channel::mpsc::Sender<IncomingSession>,
        AbortableList<HoprLibProcess>,
    ) {
        let Self {
            inner,
            session_tx,
            session_handle,
            ..
        } = self;

        let mut processes: AbortableList<HoprLibProcess> = Default::default();
        processes.insert(HoprLibProcess::SessionServer, session_handle);

        (inner, session_tx, processes)
    }

    impl_build_methods!();
}
921
922// When session-server is OFF: build methods directly on HoprBuilderConfigured
923#[cfg(not(feature = "session-server"))]
924impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct>
925where
926    Chain: HoprChainApi + Clone + Send + Sync + 'static,
927    Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
928    <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
929        hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
930    <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
931    Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
932    Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
933{
934    impl_build_methods!();
935
936    fn into_parts(
937        self,
938    ) -> (
939        HoprBuilderConfigured<Chain, Graph, Net, Ct>,
940        futures::channel::mpsc::Sender<IncomingSession>,
941        AbortableList<HoprLibProcess>,
942    ) {
943        let (tx, _rx) = channel::<IncomingSession>(1);
944        let processes = AbortableList::<HoprLibProcess>::default();
945        (self, tx, processes)
946    }
947}