1mod chain_wiring;
30
31use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
32
33use futures::{FutureExt, StreamExt, channel::mpsc::channel};
34pub use hopr_api::types::crypto::{
35 keypairs::Keypair,
36 prelude::{ChainKeypair, OffchainKeypair},
37};
38use hopr_api::{
39 chain::{AnnouncementError, HoprChainApi, SafeRegistrationError, StateSyncOptions},
40 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
41 graph::HoprGraphApi,
42 network::{BoxedProcessFn, NetworkStreamControl, NetworkView},
43 node::{AtomicHoprState, HoprState, NodeOnchainIdentity, TicketEvent},
44 tickets::{TicketFactory, TicketManagement},
45 types::{chain::chain_events::ChainEvent, internal::prelude::ChannelDirection, primitive::prelude::Address},
46};
47use hopr_transport::{HoprTransport, IncomingSession};
48use hopr_utils::{
49 network_types::addr::is_public_address,
50 runtime::{AbortableList, prelude::spawn},
51};
52use validator::Validate;
53
54use crate::{
55 Hopr, HoprLibError, HoprLibProcess, MIN_NATIVE_BALANCE, NODE_READY_TIMEOUT, SUGGESTED_NATIVE_BALANCE,
56 config::HoprLibConfig, constants,
57};
58
59#[cfg(all(feature = "telemetry", not(test)))]
60lazy_static::lazy_static! {
61 static ref METRIC_PROCESS_START_TIME: hopr_api::types::telemetry::SimpleGauge = hopr_api::types::telemetry::SimpleGauge::new(
62 "hopr_start_time",
63 "The unix timestamp in seconds at which the process was started"
64 ).unwrap();
65 static ref METRIC_HOPR_LIB_VERSION: hopr_api::types::telemetry::MultiGauge = hopr_api::types::telemetry::MultiGauge::new(
66 "hopr_lib_version",
67 "Executed version of hopr-lib",
68 &["version"]
69 ).unwrap();
70 static ref METRIC_HOPR_NODE_INFO: hopr_api::types::telemetry::MultiGauge = hopr_api::types::telemetry::MultiGauge::new(
71 "hopr_node_addresses",
72 "Node on-chain and off-chain addresses",
73 &["peerid", "address", "safe_address", "module_address"]
74 ).unwrap();
75}
76
77const PEER_DISCOVERY_CHANNEL_CAPACITY: usize = 2048;
78
79type PeerDiscoveryRx =
80 Arc<parking_lot::Mutex<Option<futures::channel::mpsc::Receiver<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>>>>;
81
82type Factory<T> = Box<dyn FnOnce(&BuildCtx) -> T + Send>;
84type AsyncFactory<T> = Box<dyn FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = T> + Send>> + Send>;
85
86#[derive(Clone)]
88pub struct BuildCtx {
89 pub chain_key: ChainKeypair,
91 pub packet_key: OffchainKeypair,
93 pub cfg: HoprLibConfig,
95 peer_discovery_rx: PeerDiscoveryRx,
96}
97
98impl BuildCtx {
99 pub fn take_peer_discovery_rx(
101 &self,
102 ) -> Option<futures::channel::mpsc::Receiver<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>> {
103 self.peer_discovery_rx.lock().take()
104 }
105}
106
107#[derive(Default)]
113pub struct HoprBuilder;
114
115impl HoprBuilder {
116 pub fn with_identity(self, chain_key: &ChainKeypair, offchain_key: &OffchainKeypair) -> HoprBuilderWithIdentity {
118 HoprBuilderWithIdentity {
119 chain_key: chain_key.clone(),
120 packet_key: offchain_key.clone(),
121 }
122 }
123}
124
125pub struct HoprBuilderWithIdentity {
127 chain_key: ChainKeypair,
128 packet_key: OffchainKeypair,
129}
130
131impl HoprBuilderWithIdentity {
132 pub fn with_config(self, cfg: HoprLibConfig) -> HoprBuilderConfigured {
134 let (peer_discovery_tx, peer_discovery_rx) = futures::channel::mpsc::channel(PEER_DISCOVERY_CHANNEL_CAPACITY);
135 HoprBuilderConfigured {
136 ctx: BuildCtx {
137 chain_key: self.chain_key,
138 packet_key: self.packet_key,
139 cfg,
140 peer_discovery_rx: Arc::new(parking_lot::Mutex::new(Some(peer_discovery_rx))),
141 },
142 safe_and_module: None,
143 chain_factory: None,
144 graph_factory: None,
145 network_factory: None,
146 ct_factory: None,
147 peer_discovery_tx,
148 }
149 }
150}
151
152pub struct HoprBuilderConfigured<Chain = (), Graph = (), Net = (), Ct = ()> {
164 ctx: BuildCtx,
165 safe_and_module: Option<(Address, Address)>,
166 chain_factory: Option<Factory<Chain>>,
167 graph_factory: Option<Factory<Graph>>,
168 network_factory: Option<AsyncFactory<Result<(Net, BoxedProcessFn), HoprLibError>>>,
169 ct_factory: Option<Factory<Ct>>,
170 peer_discovery_tx: futures::channel::mpsc::Sender<(hopr_api::PeerId, Vec<hopr_api::Multiaddr>)>,
171}
172
173impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct> {
174 pub fn with_safe_module(mut self, safe: &Address, module: &Address) -> Self {
176 self.safe_and_module = Some((*safe, *module));
177 self
178 }
179
180 pub fn with_chain_api<NewChain>(
182 self,
183 f: impl FnOnce(&BuildCtx) -> NewChain + Send + 'static,
184 ) -> HoprBuilderConfigured<NewChain, Graph, Net, Ct> {
185 HoprBuilderConfigured {
186 ctx: self.ctx,
187 safe_and_module: self.safe_and_module,
188 chain_factory: Some(Box::new(f)),
189 graph_factory: self.graph_factory,
190 network_factory: self.network_factory,
191 ct_factory: self.ct_factory,
192 peer_discovery_tx: self.peer_discovery_tx,
193 }
194 }
195
196 pub fn with_graph<NewGraph>(
198 self,
199 f: impl FnOnce(&BuildCtx) -> NewGraph + Send + 'static,
200 ) -> HoprBuilderConfigured<Chain, NewGraph, Net, Ct> {
201 HoprBuilderConfigured {
202 ctx: self.ctx,
203 safe_and_module: self.safe_and_module,
204 chain_factory: self.chain_factory,
205 graph_factory: Some(Box::new(f)),
206 network_factory: self.network_factory,
207 ct_factory: self.ct_factory,
208 peer_discovery_tx: self.peer_discovery_tx,
209 }
210 }
211
212 pub fn with_network<NewNet>(
218 self,
219 f: impl FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = Result<(NewNet, BoxedProcessFn), HoprLibError>> + Send>>
220 + Send
221 + 'static,
222 ) -> HoprBuilderConfigured<Chain, Graph, NewNet, Ct> {
223 HoprBuilderConfigured {
224 ctx: self.ctx,
225 safe_and_module: self.safe_and_module,
226 chain_factory: self.chain_factory,
227 graph_factory: self.graph_factory,
228 network_factory: Some(Box::new(f)),
229 ct_factory: self.ct_factory,
230 peer_discovery_tx: self.peer_discovery_tx,
231 }
232 }
233
234 pub fn with_cover_traffic<NewCt>(
236 self,
237 f: impl FnOnce(&BuildCtx) -> NewCt + Send + 'static,
238 ) -> HoprBuilderConfigured<Chain, Graph, Net, NewCt> {
239 HoprBuilderConfigured {
240 ctx: self.ctx,
241 safe_and_module: self.safe_and_module,
242 chain_factory: self.chain_factory,
243 graph_factory: self.graph_factory,
244 network_factory: self.network_factory,
245 ct_factory: Some(Box::new(f)),
246 peer_discovery_tx: self.peer_discovery_tx,
247 }
248 }
249
250 #[cfg(feature = "session-server")]
255 pub fn with_session_server(
256 self,
257 server: impl hopr_api::node::HoprSessionServer<Session = IncomingSession, Error: std::fmt::Display>
258 + Clone
259 + Send
260 + 'static,
261 ) -> HoprBuilderWithSession<Chain, Graph, Net, Ct> {
262 let incoming_session_capacity = self.ctx.cfg.incoming_session_capacity.max(1);
263
264 let (session_tx, session_rx) = channel::<IncomingSession>(incoming_session_capacity);
265
266 tracing::debug!(capacity = incoming_session_capacity, "spawning session server");
267 let session_diag = hopr_utils::runtime::diagnostics::ConcurrentDiagnostics::new(
268 "session_server_for_each_concurrent",
269 module_path!(),
270 file!(),
271 line!(),
272 );
273 let handle = hopr_utils::spawn_as_abortable_named!(
274 "hopr_lib_session_server",
275 session_rx
276 .for_each_concurrent(None, move |session| {
277 let server = server.clone();
278 let session_diag = session_diag.clone();
279 session_diag.wrap(async move {
280 let session_id = *session.session.id();
281 match server.process(session).await {
282 Ok(()) => tracing::debug!(?session_id, "session processed successfully"),
283 Err(error) => {
284 tracing::error!(?session_id, %error, "session processing failed")
285 }
286 }
287 })
288 })
289 .inspect(|_| tracing::warn!(
290 task = %HoprLibProcess::SessionServer,
291 "long-running background task finished"
292 ))
293 );
294
295 HoprBuilderWithSession {
296 inner: self,
297 session_tx,
298 session_handle: handle,
299 }
300 }
301}
302
303#[cfg(feature = "session-server")]
311pub struct HoprBuilderWithSession<Chain = (), Graph = (), Net = (), Ct = ()> {
312 inner: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
313 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
314 session_handle: hopr_utils::runtime::AbortHandle,
315}
316
317struct PreHopr<Chain, Graph, Net, Ct> {
322 chain_id: ChainKeypair,
323 transport_id: OffchainKeypair,
324 cfg: HoprLibConfig,
325 state: Arc<AtomicHoprState>,
326 transport_api: HoprTransport<Chain, Graph, Net>,
327 chain_api: Chain,
328
329 ticket_event_subscribers: (
330 async_broadcast::Sender<TicketEvent>,
331 async_broadcast::InactiveReceiver<TicketEvent>,
332 ),
333 processes: AbortableList<HoprLibProcess>,
334 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
335 cover_traffic: Ct,
336 network: Net,
337 network_process: BoxedProcessFn,
338}
339
340async fn drain_incoming_data<S: futures::Stream + Unpin>(mut reader: S) {
347 let mut received: u64 = 0;
348 let mut last_report = std::time::Instant::now();
349 while reader.next().await.is_some() {
350 received += 1;
351 if last_report.elapsed().as_secs() >= 60 {
352 tracing::info!(
353 received,
354 "incoming-data drain: unrelated packets discarded in last ~1 min"
355 );
356 received = 0;
357 last_report = std::time::Instant::now();
358 }
359 }
360}
361
362async fn pre_build_inner<Chain, Graph, Net, Ct>(
363 configured: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
364 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
365 mut processes: AbortableList<HoprLibProcess>,
366) -> Result<PreHopr<Chain, Graph, Net, Ct>, HoprLibError>
367where
368 Chain: HoprChainApi + Clone + Send + Sync + 'static,
369 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
370 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
371 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
372 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
373 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
374 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
375{
376 let peer_discovery_tx = Some(configured.peer_discovery_tx);
377 let ctx = configured.ctx;
378 ctx.cfg.validate()?;
379
380 let chain_api = (configured
381 .chain_factory
382 .ok_or(HoprLibError::BuilderError("missing chain factory"))?)(&ctx);
383 let graph = (configured
384 .graph_factory
385 .ok_or(HoprLibError::BuilderError("missing graph factory"))?)(&ctx);
386 let (network, network_process) =
387 (configured
388 .network_factory
389 .ok_or(HoprLibError::BuilderError("missing network factory"))?)(ctx.clone())
390 .await?;
391 let cover_traffic = (configured
392 .ct_factory
393 .ok_or(HoprLibError::BuilderError("missing cover traffic factory"))?)(&ctx);
394
395 let (chain_id, transport_id) = (ctx.chain_key.clone(), ctx.packet_key.clone());
396
397 let transport_api = HoprTransport::new(
398 (&chain_id, &transport_id),
399 chain_api.clone(),
400 graph.clone(),
401 vec![(&ctx.cfg.host).try_into().map_err(HoprLibError::TransportError)?],
402 ctx.cfg.protocol.clone(),
403 )
404 .map_err(HoprLibError::TransportError)?;
405
406 #[cfg(all(feature = "telemetry", not(test)))]
407 {
408 use hopr_api::types::primitive::traits::AsUnixTimestamp;
409 METRIC_PROCESS_START_TIME.set(std::time::SystemTime::now().as_unix_timestamp().as_secs_f64());
410 METRIC_HOPR_LIB_VERSION.set(
411 &[const_format::formatcp!("{}", constants::APP_VERSION)],
412 const_format::formatcp!(
413 "{}.{}",
414 env!("CARGO_PKG_VERSION_MAJOR"),
415 env!("CARGO_PKG_VERSION_MINOR")
416 )
417 .parse()
418 .unwrap_or(0.0),
419 );
420 }
421
422 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
423 new_tickets_tx.set_await_active(false);
424 new_tickets_tx.set_overflow(true);
425
426 let me_onchain = chain_id.public().to_address();
427
428 #[cfg(feature = "testing")]
429 tracing::warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
430
431 tracing::info!(
432 address = %me_onchain,
433 minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
434 "node is not started, please fund this node",
435 );
436
437 crate::helpers::wait_for_funds(
438 *MIN_NATIVE_BALANCE,
439 *SUGGESTED_NATIVE_BALANCE,
440 Duration::from_secs(200),
441 me_onchain,
442 &chain_api,
443 )
444 .await?;
445
446 tracing::info!("starting HOPR node...");
447 let balance: hopr_api::types::primitive::prelude::XDaiBalance =
448 chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
449 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
450
451 tracing::info!(address = %me_onchain, %balance, %minimum_balance, "node information");
452
453 if balance.le(&minimum_balance) {
454 return Err(HoprLibError::InsufficientFunds(
455 "cannot start the node without a sufficiently funded wallet".into(),
456 ));
457 }
458
459 #[cfg(debug_assertions)]
460 let skip_protocol_checks = ctx.cfg.disable_protocol_checks;
461 #[cfg(not(debug_assertions))]
462 let skip_protocol_checks = false;
463
464 let network_min_ticket_price = chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)?;
465 let configured_ticket_price = ctx.cfg.protocol.packet.codec.outgoing_ticket_price;
466 if !skip_protocol_checks && configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
467 return Err(HoprLibError::GeneralError(format!(
468 "configured outgoing ticket price < network minimum: {configured_ticket_price:?} < \
469 {network_min_ticket_price}"
470 )));
471 }
472
473 let network_min_win_prob = chain_api
474 .minimum_incoming_ticket_win_prob()
475 .await
476 .map_err(HoprLibError::chain)?;
477 let configured_win_prob = ctx.cfg.protocol.packet.codec.outgoing_win_prob;
478
479 if !skip_protocol_checks && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt()) {
480 return Err(HoprLibError::GeneralError(format!(
481 "configured outgoing win probability < network minimum: {configured_win_prob:?} < {network_min_win_prob}"
482 )));
483 }
484
485 tracing::info!(
486 peer_id = %transport_id.public().to_peerid_str(),
487 address = %me_onchain,
488 version = constants::APP_VERSION,
489 "Node information"
490 );
491
492 let safe_addr = ctx.cfg.safe_module.safe_address;
493 if me_onchain == safe_addr {
494 return Err(HoprLibError::GeneralError(
495 "cannot use self as staking safe address".into(),
496 ));
497 }
498
499 tracing::info!(%safe_addr, "registering safe with this node");
500 match chain_api.register_safe(&safe_addr).await {
501 Ok(awaiter) => {
502 awaiter.await.map_err(|error| {
503 tracing::error!(%safe_addr, %error, "safe registration failed");
504 HoprLibError::chain(error)
505 })?;
506 tracing::info!(%safe_addr, "safe successfully registered with this node");
507 }
508 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) => {
509 if registered_safe == safe_addr {
510 tracing::info!(%safe_addr, "this safe is already registered with this node");
511 } else {
512 tracing::error!(%safe_addr, %registered_safe, "node registered with different safe");
513 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
514 }
515 }
516 Err(error) => {
517 tracing::error!(%safe_addr, %error, "safe registration failed");
518 return Err(HoprLibError::chain(error));
519 }
520 }
521
522 let multiaddresses_to_announce = if ctx.cfg.publish {
523 transport_api.announceable_multiaddresses()
524 } else {
525 Vec::new()
526 };
527
528 multiaddresses_to_announce
529 .iter()
530 .filter(|a| !is_public_address(a))
531 .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
532
533 let chain_api_clone = chain_api.clone();
534 let me_offchain = *transport_id.public();
535 let node_ready = spawn(async move {
536 chain_api_clone
537 .await_key_binding(&me_offchain, NODE_READY_TIMEOUT)
538 .await
539 });
540
541 tracing::info!(?multiaddresses_to_announce, "announcing node on chain");
542 match chain_api.announce(&multiaddresses_to_announce, &transport_id).await {
543 Ok(awaiter) => {
544 awaiter.await.map_err(|error| {
545 tracing::error!(?multiaddresses_to_announce, %error, "node announcement failed");
546 HoprLibError::chain(error)
547 })?;
548 tracing::info!(?multiaddresses_to_announce, "node announced successfully");
549 }
550 Err(AnnouncementError::AlreadyAnnounced) => {
551 tracing::info!("node already announced on chain");
552 }
553 Err(error) => {
554 tracing::error!(%error, "failed to transmit node announcement");
555 return Err(HoprLibError::chain(error));
556 }
557 }
558
559 let this_node_account = node_ready
560 .await
561 .map_err(HoprLibError::other)?
562 .map_err(HoprLibError::chain)?;
563 if this_node_account.chain_addr != me_onchain || this_node_account.safe_address.is_none_or(|a| a != safe_addr) {
564 tracing::error!(%this_node_account, "account key-binding mismatch");
565 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
566 }
567
568 tracing::info!(%this_node_account, "node account is ready");
569
570 {
572 let network_events = network.subscribe_network_events();
573 let graph_updater = graph.clone();
574 spawn(async move {
575 network_events
576 .for_each(|event| {
577 let graph_updater = graph_updater.clone();
578 async move {
579 let (peer_id, connected) = match event {
580 hopr_api::network::NetworkEvent::PeerConnected(p) => (p, true),
581 hopr_api::network::NetworkEvent::PeerDisconnected(p) => (p, false),
582 };
583 if let Ok(opk) = hopr_api::OffchainPublicKey::from_peerid(&peer_id) {
584 graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
585 hopr_transport::NeighborTelemetry,
586 hopr_transport::PathTelemetry,
587 >::ConnectionStatus {
588 peer: opk,
589 connected,
590 });
591 } else {
592 tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
593 }
594 }
595 })
596 .await;
597 });
598 }
599
600 {
602 let chain_events = chain_api
603 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])
604 .map_err(HoprLibError::chain)?;
605
606 let graph_updater = graph.clone();
607 let chain_reader = chain_api.clone();
608
609 let own_chain_addr = me_onchain;
610 let own_packet_key = *transport_id.public();
611
612 let ticket_price = Arc::new(parking_lot::RwLock::new(
613 chain_reader.minimum_ticket_price().await.unwrap_or_default(),
614 ));
615 let win_probability = Arc::new(parking_lot::RwLock::new(
616 chain_reader
617 .minimum_incoming_ticket_win_prob()
618 .await
619 .unwrap_or_default(),
620 ));
621
622 let proc = chain_wiring::process_chain_events(
623 chain_reader,
624 graph_updater,
625 chain_events,
626 own_chain_addr,
627 own_packet_key,
628 ticket_price,
629 win_probability,
630 peer_discovery_tx,
631 )
632 .inspect(|_| {
633 tracing::warn!(
634 task = "chain-to-graph event wiring",
635 "long-running background task finished"
636 )
637 });
638 processes.insert(HoprLibProcess::ChannelEvents, hopr_utils::spawn_as_abortable!(proc));
639 }
640
641 Ok(PreHopr {
642 chain_id,
643 transport_id,
644 cfg: ctx.cfg,
645 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
646 transport_api,
647 chain_api,
648 ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
649 processes,
650 session_tx,
651 cover_traffic,
652 network,
653 network_process,
654 })
655}
656
657macro_rules! impl_build_methods {
662 () => {
663 pub async fn build_edge<TFact>(
665 self,
666 ticket_factory: TFact,
667 ) -> Result<Hopr<Chain, Graph, Net, ()>, HoprLibError>
668 where
669 TFact: TicketFactory + Clone + Send + Sync + 'static,
670 {
671 let (configured, session_tx, processes) = self.into_parts();
672 let pre = pre_build_inner(configured, session_tx, processes).await?;
673
674 tracing::info!("starting transport for edge (entry) node");
675 let (socket, transport_processes) = pre
676 .transport_api
677 .run_entry(
678 pre.cover_traffic,
679 pre.network,
680 pre.network_process,
681 ticket_factory,
682 )
683 .await?;
684
685 spawn(drain_incoming_data(socket.reader()));
687
688 let mut processes = pre.processes;
689 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
690
691 let hopr = Hopr {
692 chain_id: NodeOnchainIdentity {
693 node_address: pre.chain_id.public().to_address(),
694 safe_address: pre.cfg.safe_module.safe_address,
695 module_address: pre.cfg.safe_module.module_address,
696 },
697 cfg: pre.cfg,
698 state: pre.state.clone(),
699 ticket_event_subscribers: pre.ticket_event_subscribers,
700 transport_id: pre.transport_id,
701 transport_api: pre.transport_api,
702 chain_api: pre.chain_api,
703 processes,
704 ticket_manager: (),
705 };
706
707 hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
708 tracing::info!(
709 id = %hopr.transport_id.public().to_peerid_str(),
710 version = constants::APP_VERSION,
711 "EDGE NODE STARTED AND RUNNING"
712 );
713
714 Ok(hopr)
715 }
716
717 pub async fn build_full<TMgr, TFact>(
719 self,
720 ticket_manager: TMgr,
721 ticket_factory: TFact,
722 ) -> Result<Hopr<Chain, Graph, Net, TMgr>, HoprLibError>
723 where
724 TMgr: TicketManagement + Clone + Send + Sync + 'static,
725 TFact: TicketFactory + Clone + Send + Sync + 'static,
726 {
727 let (configured, session_tx, processes) = self.into_parts();
728 let pre = pre_build_inner(configured, session_tx, processes).await?;
729 let mut processes = pre.processes;
730
731 tracing::info!("starting ticket events processor");
732 let (tickets_tx, tickets_rx) = channel(8192);
733 let (tickets_rx_stream, tickets_handle) = futures::stream::abortable(tickets_rx);
734 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
735 let new_ticket_tx = pre.ticket_event_subscribers.0.clone();
736 let tmgr_clone = ticket_manager.clone();
737 spawn(
738 hopr_utils::runtime::diagnostics::instrument(
739 tickets_rx_stream
740 .for_each(move |event| {
741 if let TicketEvent::WinningTicket(ticket) = &event
742 && let Err(error) = tmgr_clone.insert_incoming_ticket(**ticket)
743 {
744 tracing::error!(%error, "failed to insert incoming ticket");
745 }
746 if let Err(error) = new_ticket_tx.try_broadcast(event) {
747 tracing::error!(%error, "failed to broadcast ticket event");
748 }
749 futures::future::ready(())
750 })
751 .inspect(|_| {
752 tracing::warn!(task = %HoprLibProcess::TicketEvents, "long-running background task finished")
753 }),
754 "hopr_lib_ticket_events",
755 module_path!(),
756 file!(),
757 line!(),
758 ),
759 );
760
761 {
762 let chain_for_neglect = pre.chain_api.clone();
763 let tmgr_for_neglect = ticket_manager.clone();
764 let events = pre.chain_api.subscribe().map_err(HoprLibError::chain)?;
765 let (neglect_handle, neglect_reg) = hopr_utils::runtime::AbortHandle::new_pair();
766 let neglect_task = futures::stream::Abortable::new(
767 events.filter_map(move |event| {
768 futures::future::ready(match event {
769 ChainEvent::ChannelClosed(ch) => Some(ch),
770 _ => None,
771 })
772 }),
773 neglect_reg,
774 )
775 .for_each(move |closed_channel| {
776 let chain = chain_for_neglect.clone();
777 let tmgr = tmgr_for_neglect.clone();
778 async move {
779 match closed_channel.direction(chain.me()) {
780 Some(ChannelDirection::Incoming) => {
781 match tmgr.neglect_tickets(closed_channel.get_id(), None) {
782 Ok(neglected) if !neglected.is_empty() => {
783 tracing::warn!(
784 num_neglected = neglected.len(),
785 %closed_channel,
786 "tickets on incoming closed channel were neglected"
787 );
788 }
789 Ok(_) => {}
790 Err(error) => {
791 tracing::error!(
792 %error, %closed_channel,
793 "failed to neglect tickets on closed channel"
794 );
795 }
796 }
797 }
798 Some(ChannelDirection::Outgoing) => {}
799 _ => {}
800 }
801 }
802 })
803 .inspect(|_| {
804 tracing::warn!(
805 task = %HoprLibProcess::ChannelClosureNeglect,
806 "channel closure ticket neglect task finished"
807 )
808 });
809 spawn(hopr_utils::runtime::diagnostics::instrument(
810 neglect_task,
811 "hopr_lib_channel_closure_neglect",
812 module_path!(),
813 file!(),
814 line!(),
815 ));
816 processes.insert(HoprLibProcess::ChannelClosureNeglect, neglect_handle);
817 }
818
819 tracing::info!("starting transport for full (relay) node");
820 let (socket, transport_processes) = pre
821 .transport_api
822 .run_relay(
823 pre.cover_traffic,
824 pre.network,
825 pre.network_process,
826 tickets_tx,
827 ticket_factory,
828 pre.session_tx,
829 )
830 .await?;
831 spawn(drain_incoming_data(socket.reader()));
833 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
834
835 let hopr = Hopr {
836 chain_id: NodeOnchainIdentity {
837 node_address: pre.chain_id.public().to_address(),
838 safe_address: pre.cfg.safe_module.safe_address,
839 module_address: pre.cfg.safe_module.module_address,
840 },
841 cfg: pre.cfg,
842 state: pre.state.clone(),
843 ticket_event_subscribers: pre.ticket_event_subscribers,
844 transport_id: pre.transport_id,
845 transport_api: pre.transport_api,
846 chain_api: pre.chain_api,
847 processes,
848 ticket_manager,
849 };
850
851 hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
852
853 tracing::info!(
854 id = %hopr.transport_id.public().to_peerid_str(),
855 version = constants::APP_VERSION,
856 "FULL NODE STARTED AND RUNNING"
857 );
858
859 #[cfg(all(feature = "telemetry", not(test)))]
860 METRIC_HOPR_NODE_INFO.set(
861 &[
862 &hopr.transport_id.public().to_peerid_str(),
863 &hopr.chain_id.node_address.to_string(),
864 &hopr.chain_id.safe_address.to_string(),
865 &hopr.chain_id.module_address.to_string(),
866 ],
867 1.0,
868 );
869
870 Ok(hopr)
871 }
872 };
873}
874
875#[cfg(feature = "session-server")]
877impl<Chain, Graph, Net, Ct> HoprBuilderWithSession<Chain, Graph, Net, Ct>
878where
879 Chain: HoprChainApi + Clone + Send + Sync + 'static,
880 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
881 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
882 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
883 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
884 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
885 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
886{
887 impl_build_methods!();
888
889 fn into_parts(
890 self,
891 ) -> (
892 HoprBuilderConfigured<Chain, Graph, Net, Ct>,
893 futures::channel::mpsc::Sender<IncomingSession>,
894 AbortableList<HoprLibProcess>,
895 ) {
896 let mut processes = AbortableList::<HoprLibProcess>::default();
897 processes.insert(HoprLibProcess::SessionServer, self.session_handle);
898 (self.inner, self.session_tx, processes)
899 }
900}
901
902#[cfg(not(feature = "session-server"))]
904impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct>
905where
906 Chain: HoprChainApi + Clone + Send + Sync + 'static,
907 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
908 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
909 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
910 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
911 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
912 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
913{
914 impl_build_methods!();
915
916 fn into_parts(
917 self,
918 ) -> (
919 HoprBuilderConfigured<Chain, Graph, Net, Ct>,
920 futures::channel::mpsc::Sender<IncomingSession>,
921 AbortableList<HoprLibProcess>,
922 ) {
923 let (tx, _rx) = channel::<IncomingSession>(1);
924 let processes = AbortableList::<HoprLibProcess>::default();
925 (self, tx, processes)
926 }
927}