1use std::{convert::identity, future::Future, pin::Pin, sync::Arc, time::Duration};
30
31use futures::{FutureExt, StreamExt, channel::mpsc::channel};
32pub use hopr_api::types::crypto::{
33 keypairs::Keypair,
34 prelude::{ChainKeypair, OffchainKeypair},
35};
36use hopr_api::{
37 chain::{AnnouncementError, HoprChainApi, SafeRegistrationError, StateSyncOptions},
38 ct::{CoverTrafficGeneration, ProbingTrafficGeneration},
39 graph::{EdgeCapacityUpdate, HoprGraphApi},
40 network::{BoxedProcessFn, NetworkStreamControl, NetworkView},
41 node::{AtomicHoprState, HoprState, NodeOnchainIdentity, TicketEvent},
42 tickets::{TicketFactory, TicketManagement},
43 types::{
44 chain::chain_events::ChainEvent,
45 internal::prelude::{ChannelDirection, ChannelStatus},
46 primitive::prelude::{Address, UnitaryFloatOps},
47 },
48};
49use hopr_async_runtime::{AbortableList, prelude::spawn};
50use hopr_network_types::addr::is_public_address;
51use hopr_transport::{HoprTransport, IncomingSession};
52use validator::Validate;
53
54use crate::{
55 Hopr, HoprLibError, HoprLibProcess, MIN_NATIVE_BALANCE, NODE_READY_TIMEOUT, SUGGESTED_NATIVE_BALANCE,
56 config::HoprLibConfig, constants,
57};
58
59#[cfg(all(feature = "telemetry", not(test)))]
60lazy_static::lazy_static! {
61 static ref METRIC_PROCESS_START_TIME: hopr_metrics::SimpleGauge = hopr_metrics::SimpleGauge::new(
62 "hopr_start_time",
63 "The unix timestamp in seconds at which the process was started"
64 ).unwrap();
65 static ref METRIC_HOPR_LIB_VERSION: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
66 "hopr_lib_version",
67 "Executed version of hopr-lib",
68 &["version"]
69 ).unwrap();
70 static ref METRIC_HOPR_NODE_INFO: hopr_metrics::MultiGauge = hopr_metrics::MultiGauge::new(
71 "hopr_node_addresses",
72 "Node on-chain and off-chain addresses",
73 &["peerid", "address", "safe_address", "module_address"]
74 ).unwrap();
75}
76
77type Factory<T> = Box<dyn FnOnce(&BuildCtx) -> T + Send>;
79type AsyncFactory<T> = Box<dyn FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = T> + Send>> + Send>;
80
81#[derive(Clone)]
83pub struct BuildCtx {
84 pub chain_key: ChainKeypair,
86 pub packet_key: OffchainKeypair,
88 pub cfg: HoprLibConfig,
90}
91
92#[derive(Default)]
98pub struct HoprBuilder;
99
100impl HoprBuilder {
101 pub fn with_identity(self, chain_key: &ChainKeypair, offchain_key: &OffchainKeypair) -> HoprBuilderWithIdentity {
103 HoprBuilderWithIdentity {
104 chain_key: chain_key.clone(),
105 packet_key: offchain_key.clone(),
106 }
107 }
108}
109
110pub struct HoprBuilderWithIdentity {
112 chain_key: ChainKeypair,
113 packet_key: OffchainKeypair,
114}
115
116impl HoprBuilderWithIdentity {
117 pub fn with_config(self, cfg: HoprLibConfig) -> HoprBuilderConfigured {
119 HoprBuilderConfigured {
120 ctx: BuildCtx {
121 chain_key: self.chain_key,
122 packet_key: self.packet_key,
123 cfg,
124 },
125 safe_and_module: None,
126 chain_factory: None,
127 graph_factory: None,
128 network_factory: None,
129 ct_factory: None,
130 }
131 }
132}
133
134pub struct HoprBuilderConfigured<Chain = (), Graph = (), Net = (), Ct = ()> {
146 ctx: BuildCtx,
147 safe_and_module: Option<(Address, Address)>,
148 chain_factory: Option<Factory<Chain>>,
149 graph_factory: Option<Factory<Graph>>,
150 network_factory: Option<AsyncFactory<(Net, BoxedProcessFn)>>,
151 ct_factory: Option<Factory<Ct>>,
152}
153
154impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct> {
155 pub fn with_safe_module(mut self, safe: &Address, module: &Address) -> Self {
157 self.safe_and_module = Some((*safe, *module));
158 self
159 }
160
161 pub fn with_chain_api<NewChain>(
163 self,
164 f: impl FnOnce(&BuildCtx) -> NewChain + Send + 'static,
165 ) -> HoprBuilderConfigured<NewChain, Graph, Net, Ct> {
166 HoprBuilderConfigured {
167 ctx: self.ctx,
168 safe_and_module: self.safe_and_module,
169 chain_factory: Some(Box::new(f)),
170 graph_factory: self.graph_factory,
171 network_factory: self.network_factory,
172 ct_factory: self.ct_factory,
173 }
174 }
175
176 pub fn with_graph<NewGraph>(
178 self,
179 f: impl FnOnce(&BuildCtx) -> NewGraph + Send + 'static,
180 ) -> HoprBuilderConfigured<Chain, NewGraph, Net, Ct> {
181 HoprBuilderConfigured {
182 ctx: self.ctx,
183 safe_and_module: self.safe_and_module,
184 chain_factory: self.chain_factory,
185 graph_factory: Some(Box::new(f)),
186 network_factory: self.network_factory,
187 ct_factory: self.ct_factory,
188 }
189 }
190
191 pub fn with_network<NewNet>(
196 self,
197 f: impl FnOnce(BuildCtx) -> Pin<Box<dyn Future<Output = (NewNet, BoxedProcessFn)> + Send>> + Send + 'static,
198 ) -> HoprBuilderConfigured<Chain, Graph, NewNet, Ct> {
199 HoprBuilderConfigured {
200 ctx: self.ctx,
201 safe_and_module: self.safe_and_module,
202 chain_factory: self.chain_factory,
203 graph_factory: self.graph_factory,
204 network_factory: Some(Box::new(f)),
205 ct_factory: self.ct_factory,
206 }
207 }
208
209 pub fn with_cover_traffic<NewCt>(
211 self,
212 f: impl FnOnce(&BuildCtx) -> NewCt + Send + 'static,
213 ) -> HoprBuilderConfigured<Chain, Graph, Net, NewCt> {
214 HoprBuilderConfigured {
215 ctx: self.ctx,
216 safe_and_module: self.safe_and_module,
217 chain_factory: self.chain_factory,
218 graph_factory: self.graph_factory,
219 network_factory: self.network_factory,
220 ct_factory: Some(Box::new(f)),
221 }
222 }
223
224 #[cfg(feature = "session-server")]
229 pub fn with_session_server(
230 self,
231 server: impl hopr_api::node::HoprSessionServer<Session = IncomingSession, Error: std::fmt::Display>
232 + Clone
233 + Send
234 + 'static,
235 ) -> HoprBuilderWithSession<Chain, Graph, Net, Ct> {
236 let incoming_session_capacity = std::env::var("HOPR_INTERNAL_SESSION_INCOMING_CAPACITY")
237 .ok()
238 .and_then(|s| s.trim().parse::<usize>().ok())
239 .filter(|&c| c > 0)
240 .unwrap_or(256);
241
242 let (session_tx, session_rx) = channel::<IncomingSession>(incoming_session_capacity);
243
244 tracing::debug!(capacity = incoming_session_capacity, "spawning session server");
245 let handle = hopr_async_runtime::spawn_as_abortable!(
246 session_rx
247 .for_each_concurrent(None, move |session| {
248 let server = server.clone();
249 async move {
250 let session_id = *session.session.id();
251 match server.process(session).await {
252 Ok(()) => tracing::debug!(?session_id, "session processed successfully"),
253 Err(error) => {
254 tracing::error!(?session_id, %error, "session processing failed")
255 }
256 }
257 }
258 })
259 .inspect(|_| tracing::warn!(
260 task = %HoprLibProcess::SessionServer,
261 "long-running background task finished"
262 ))
263 );
264
265 HoprBuilderWithSession {
266 inner: self,
267 session_tx,
268 session_handle: handle,
269 }
270 }
271}
272
273#[cfg(feature = "session-server")]
281pub struct HoprBuilderWithSession<Chain = (), Graph = (), Net = (), Ct = ()> {
282 inner: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
283 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
284 session_handle: hopr_async_runtime::AbortHandle,
285}
286
287struct PreHopr<Chain, Graph, Net, Ct> {
292 chain_id: ChainKeypair,
293 transport_id: OffchainKeypair,
294 cfg: HoprLibConfig,
295 state: Arc<AtomicHoprState>,
296 transport_api: HoprTransport<Chain, Graph, Net>,
297 chain_api: Chain,
298
299 ticket_event_subscribers: (
300 async_broadcast::Sender<TicketEvent>,
301 async_broadcast::InactiveReceiver<TicketEvent>,
302 ),
303 processes: AbortableList<HoprLibProcess>,
304 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
305 cover_traffic: Ct,
306 network: Net,
307 network_process: BoxedProcessFn,
308}
309
310async fn pre_build_inner<Chain, Graph, Net, Ct>(
315 configured: HoprBuilderConfigured<Chain, Graph, Net, Ct>,
316 session_tx: futures::channel::mpsc::Sender<IncomingSession>,
317 mut processes: AbortableList<HoprLibProcess>,
318) -> Result<PreHopr<Chain, Graph, Net, Ct>, HoprLibError>
319where
320 Chain: HoprChainApi + Clone + Send + Sync + 'static,
321 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
322 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
323 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
324 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
325 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
326 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
327{
328 let ctx = configured.ctx;
329 ctx.cfg.validate()?;
330
331 let chain_api = (configured
332 .chain_factory
333 .ok_or(HoprLibError::BuilderError("missing chain factory"))?)(&ctx);
334 let graph = (configured
335 .graph_factory
336 .ok_or(HoprLibError::BuilderError("missing graph factory"))?)(&ctx);
337 let (network, network_process) =
338 (configured
339 .network_factory
340 .ok_or(HoprLibError::BuilderError("missing network factory"))?)(ctx.clone())
341 .await;
342 let cover_traffic = (configured
343 .ct_factory
344 .ok_or(HoprLibError::BuilderError("missing cover traffic factory"))?)(&ctx);
345
346 let (chain_id, transport_id) = (ctx.chain_key.clone(), ctx.packet_key.clone());
347
348 let transport_api = HoprTransport::new(
349 (&chain_id, &transport_id),
350 chain_api.clone(),
351 graph.clone(),
352 vec![(&ctx.cfg.host).try_into().map_err(HoprLibError::TransportError)?],
353 ctx.cfg.protocol.clone(),
354 )
355 .map_err(HoprLibError::TransportError)?;
356
357 #[cfg(all(feature = "telemetry", not(test)))]
358 {
359 use hopr_api::types::primitive::traits::AsUnixTimestamp;
360 METRIC_PROCESS_START_TIME.set(hopr_platform::time::current_time().as_unix_timestamp().as_secs_f64());
361 METRIC_HOPR_LIB_VERSION.set(
362 &[const_format::formatcp!("{}", constants::APP_VERSION)],
363 const_format::formatcp!(
364 "{}.{}",
365 env!("CARGO_PKG_VERSION_MAJOR"),
366 env!("CARGO_PKG_VERSION_MINOR")
367 )
368 .parse()
369 .unwrap_or(0.0),
370 );
371 }
372
373 let (mut new_tickets_tx, new_tickets_rx) = async_broadcast::broadcast(2048);
374 new_tickets_tx.set_await_active(false);
375 new_tickets_tx.set_overflow(true);
376
377 let me_onchain = chain_id.public().to_address();
378
379 #[cfg(feature = "testing")]
380 tracing::warn!("!! FOR TESTING ONLY !! Node is running with some safety checks disabled!");
381
382 tracing::info!(
383 address = %me_onchain,
384 minimum_balance = %*SUGGESTED_NATIVE_BALANCE,
385 "node is not started, please fund this node",
386 );
387
388 crate::helpers::wait_for_funds(
389 *MIN_NATIVE_BALANCE,
390 *SUGGESTED_NATIVE_BALANCE,
391 Duration::from_secs(200),
392 me_onchain,
393 &chain_api,
394 )
395 .await?;
396
397 tracing::info!("starting HOPR node...");
398 let balance: hopr_api::types::primitive::prelude::XDaiBalance =
399 chain_api.balance(me_onchain).await.map_err(HoprLibError::chain)?;
400 let minimum_balance = *constants::MIN_NATIVE_BALANCE;
401
402 tracing::info!(address = %me_onchain, %balance, %minimum_balance, "node information");
403
404 if balance.le(&minimum_balance) {
405 return Err(HoprLibError::InsufficientFunds(
406 "cannot start the node without a sufficiently funded wallet".into(),
407 ));
408 }
409
410 let network_min_ticket_price = chain_api.minimum_ticket_price().await.map_err(HoprLibError::chain)?;
411 let configured_ticket_price = ctx.cfg.protocol.packet.codec.outgoing_ticket_price;
412 if configured_ticket_price.is_some_and(|c| c < network_min_ticket_price) {
413 return Err(HoprLibError::GeneralError(format!(
414 "configured outgoing ticket price < network minimum: {configured_ticket_price:?} < \
415 {network_min_ticket_price}"
416 )));
417 }
418
419 let network_min_win_prob = chain_api
420 .minimum_incoming_ticket_win_prob()
421 .await
422 .map_err(HoprLibError::chain)?;
423 let configured_win_prob = ctx.cfg.protocol.packet.codec.outgoing_win_prob;
424 if !std::env::var("HOPR_TEST_DISABLE_CHECKS").is_ok_and(|v| v.to_lowercase() == "true")
425 && configured_win_prob.is_some_and(|c| c.approx_cmp(&network_min_win_prob).is_lt())
426 {
427 return Err(HoprLibError::GeneralError(format!(
428 "configured outgoing win probability < network minimum: {configured_win_prob:?} < {network_min_win_prob}"
429 )));
430 }
431
432 tracing::info!(
433 peer_id = %transport_id.public().to_peerid_str(),
434 address = %me_onchain,
435 version = constants::APP_VERSION,
436 "Node information"
437 );
438
439 let safe_addr = ctx.cfg.safe_module.safe_address;
440 if me_onchain == safe_addr {
441 return Err(HoprLibError::GeneralError(
442 "cannot use self as staking safe address".into(),
443 ));
444 }
445
446 tracing::info!(%safe_addr, "registering safe with this node");
447 match chain_api.register_safe(&safe_addr).await {
448 Ok(awaiter) => {
449 awaiter.await.map_err(|error| {
450 tracing::error!(%safe_addr, %error, "safe registration failed");
451 HoprLibError::chain(error)
452 })?;
453 tracing::info!(%safe_addr, "safe successfully registered with this node");
454 }
455 Err(SafeRegistrationError::AlreadyRegistered(registered_safe)) => {
456 if registered_safe == safe_addr {
457 tracing::info!(%safe_addr, "this safe is already registered with this node");
458 } else {
459 tracing::error!(%safe_addr, %registered_safe, "node registered with different safe");
460 return Err(HoprLibError::GeneralError("node registered with different safe".into()));
461 }
462 }
463 Err(error) => {
464 tracing::error!(%safe_addr, %error, "safe registration failed");
465 return Err(HoprLibError::chain(error));
466 }
467 }
468
469 let multiaddresses_to_announce = if ctx.cfg.publish {
470 transport_api.announceable_multiaddresses()
471 } else {
472 Vec::new()
473 };
474
475 multiaddresses_to_announce
476 .iter()
477 .filter(|a| !is_public_address(a))
478 .for_each(|multi_addr| tracing::warn!(?multi_addr, "announcing private multiaddress"));
479
480 let chain_api_clone = chain_api.clone();
481 let me_offchain = *transport_id.public();
482 let node_ready = spawn(async move {
483 chain_api_clone
484 .await_key_binding(&me_offchain, NODE_READY_TIMEOUT)
485 .await
486 });
487
488 tracing::info!(?multiaddresses_to_announce, "announcing node on chain");
489 match chain_api.announce(&multiaddresses_to_announce, &transport_id).await {
490 Ok(awaiter) => {
491 awaiter.await.map_err(|error| {
492 tracing::error!(?multiaddresses_to_announce, %error, "node announcement failed");
493 HoprLibError::chain(error)
494 })?;
495 tracing::info!(?multiaddresses_to_announce, "node announced successfully");
496 }
497 Err(AnnouncementError::AlreadyAnnounced) => {
498 tracing::info!("node already announced on chain");
499 }
500 Err(error) => {
501 tracing::error!(%error, "failed to transmit node announcement");
502 return Err(HoprLibError::chain(error));
503 }
504 }
505
506 let this_node_account = node_ready
507 .await
508 .map_err(HoprLibError::other)?
509 .map_err(HoprLibError::chain)?;
510 if this_node_account.chain_addr != me_onchain || this_node_account.safe_address.is_none_or(|a| a != safe_addr) {
511 tracing::error!(%this_node_account, "account key-binding mismatch");
512 return Err(HoprLibError::GeneralError("account key-binding mismatch".into()));
513 }
514
515 tracing::info!(%this_node_account, "node account is ready");
516
517 {
519 let network_events = network.subscribe_network_events();
520 let graph_updater = graph.clone();
521 spawn(async move {
522 network_events
523 .for_each(|event| {
524 let graph_updater = graph_updater.clone();
525 async move {
526 let (peer_id, connected) = match event {
527 hopr_api::network::NetworkEvent::PeerConnected(p) => (p, true),
528 hopr_api::network::NetworkEvent::PeerDisconnected(p) => (p, false),
529 };
530 if let Ok(opk) = hopr_api::OffchainPublicKey::from_peerid(&peer_id) {
531 graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
532 hopr_transport::NeighborTelemetry,
533 hopr_transport::PathTelemetry,
534 >::ConnectionStatus {
535 peer: opk,
536 connected,
537 });
538 } else {
539 tracing::error!(%peer_id, "failed to convert peer ID to public key for graph update");
540 }
541 }
542 })
543 .await;
544 });
545 }
546
547 {
549 let chain_events = chain_api
550 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts, StateSyncOptions::OpenedChannels])
551 .map_err(HoprLibError::chain)?;
552
553 let graph_updater = graph.clone();
554 let chain_reader = chain_api.clone();
555
556 let ticket_price = Arc::new(parking_lot::RwLock::new(
557 chain_reader.minimum_ticket_price().await.unwrap_or_default(),
558 ));
559 let win_probability = Arc::new(parking_lot::RwLock::new(
560 chain_reader
561 .minimum_incoming_ticket_win_prob()
562 .await
563 .unwrap_or_default(),
564 ));
565
566 let proc = async move {
567 chain_events
568 .for_each(|chain_event| {
569 let chain_reader = chain_reader.clone();
570 let graph_updater = graph_updater.clone();
571 let ticket_price = ticket_price.clone();
572 let win_probability = win_probability.clone();
573
574 async move {
575 match chain_event {
576 ChainEvent::Announcement(account) => {
577 tracing::debug!(
578 account = %account.public_key,
579 "recording graph node for announced account"
580 );
581 graph_updater.record_node(account.public_key);
582 }
583 ChainEvent::ChannelOpened(channel)
584 | ChainEvent::ChannelClosed(channel)
585 | ChainEvent::ChannelBalanceIncreased(channel, _)
586 | ChainEvent::ChannelBalanceDecreased(channel, _) => {
587 let keys = hopr_async_runtime::prelude::spawn_blocking(move || {
588 chain_reader
589 .chain_key_to_packet_key(&channel.source)
590 .and_then(|src| {
591 Ok(src.zip(chain_reader.chain_key_to_packet_key(&channel.destination)?))
592 })
593 .map_err(anyhow::Error::from)
594 })
595 .await
596 .map_err(anyhow::Error::from)
597 .and_then(identity);
598
599 match keys {
600 Ok(Some((from, to))) => {
601 let capacity = if matches!(channel.status, ChannelStatus::Closed) {
602 None
603 } else if let Ok(ticket_value) =
604 ticket_price.read().div_f64(win_probability.read().as_f64())
605 {
606 Some(
607 channel
608 .balance
609 .amount()
610 .checked_div(ticket_value.amount())
611 .map(|v| v.low_u128())
612 .unwrap_or(u128::MAX),
613 )
614 } else {
615 None
616 };
617
618 tracing::debug!(
619 %channel, ?capacity,
620 "recording graph edge for channel capacity"
621 );
622 graph_updater.record_edge(hopr_api::graph::MeasurableEdge::<
623 hopr_transport::NeighborTelemetry,
624 hopr_transport::PathTelemetry,
625 >::Capacity(
626 Box::new(EdgeCapacityUpdate {
627 capacity,
628 src: from,
629 dest: to,
630 }),
631 ));
632 }
633 Ok(None) => {
634 tracing::error!(
635 %channel,
636 "could not find packet keys for channel endpoints"
637 );
638 }
639 Err(error) => {
640 tracing::error!(
641 %error, %channel,
642 "failed to convert chain keys to packet keys"
643 );
644 }
645 }
646 }
647 ChainEvent::ChannelClosureInitiated(_) => {}
648 ChainEvent::WinningProbabilityIncreased(prob)
649 | ChainEvent::WinningProbabilityDecreased(prob) => {
650 tracing::debug!(%prob, "recording winning probability change");
651 *win_probability.write() = prob;
652 }
653 ChainEvent::TicketPriceChanged(price) => {
654 tracing::debug!(%price, "recording ticket price change");
655 *ticket_price.write() = price;
656 }
657 _ => {}
658 }
659 }
660 })
661 .await;
662 }
663 .inspect(|_| {
664 tracing::warn!(
665 task = "chain-to-graph event wiring",
666 "long-running background task finished"
667 )
668 });
669 processes.insert(
670 HoprLibProcess::ChannelEvents,
671 hopr_async_runtime::spawn_as_abortable!(proc),
672 );
673 }
674
675 Ok(PreHopr {
676 chain_id,
677 transport_id,
678 cfg: ctx.cfg,
679 state: Arc::new(AtomicHoprState::new(HoprState::Uninitialized)),
680 transport_api,
681 chain_api,
682 ticket_event_subscribers: (new_tickets_tx, new_tickets_rx.deactivate()),
683 processes,
684 session_tx,
685 cover_traffic,
686 network,
687 network_process,
688 })
689}
690
691macro_rules! impl_build_methods {
696 () => {
697 pub async fn build_edge<TFact>(
699 self,
700 ticket_factory: TFact,
701 ) -> Result<Hopr<Chain, Graph, Net, ()>, HoprLibError>
702 where
703 TFact: TicketFactory + Clone + Send + Sync + 'static,
704 {
705 let (configured, session_tx, processes) = self.into_parts();
706 let pre = pre_build_inner(configured, session_tx, processes).await?;
707
708 tracing::info!("starting transport for edge node");
709 let (_, transport_processes) = pre
710 .transport_api
711 .run(
712 pre.cover_traffic,
713 pre.network,
714 pre.network_process,
715 futures::sink::drain(),
716 ticket_factory,
717 pre.session_tx,
718 )
719 .await?;
720
721 let mut processes = pre.processes;
722 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
723
724 let hopr = Hopr {
725 chain_id: NodeOnchainIdentity {
726 node_address: pre.chain_id.public().to_address(),
727 safe_address: pre.cfg.safe_module.safe_address,
728 module_address: pre.cfg.safe_module.module_address,
729 },
730 cfg: pre.cfg,
731 state: pre.state.clone(),
732 ticket_event_subscribers: pre.ticket_event_subscribers,
733 transport_id: pre.transport_id,
734 transport_api: pre.transport_api,
735 chain_api: pre.chain_api,
736 processes,
737 ticket_manager: (),
738 };
739
740 hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
741 tracing::info!(
742 id = %hopr.transport_id.public().to_peerid_str(),
743 version = constants::APP_VERSION,
744 "EDGE NODE STARTED AND RUNNING"
745 );
746
747 Ok(hopr)
748 }
749
750 pub async fn build_full<TMgr, TFact>(
752 self,
753 ticket_manager: TMgr,
754 ticket_factory: TFact,
755 ) -> Result<Hopr<Chain, Graph, Net, TMgr>, HoprLibError>
756 where
757 TMgr: TicketManagement + Clone + Send + Sync + 'static,
758 TFact: TicketFactory + Clone + Send + Sync + 'static,
759 {
760 let (configured, session_tx, processes) = self.into_parts();
761 let pre = pre_build_inner(configured, session_tx, processes).await?;
762 let mut processes = pre.processes;
763
764 tracing::info!("starting ticket events processor");
765 let (tickets_tx, tickets_rx) = channel(8192);
766 let (tickets_rx_stream, tickets_handle) = futures::stream::abortable(tickets_rx);
767 processes.insert(HoprLibProcess::TicketEvents, tickets_handle);
768 let new_ticket_tx = pre.ticket_event_subscribers.0.clone();
769 let tmgr_clone = ticket_manager.clone();
770 spawn(
771 tickets_rx_stream
772 .for_each(move |event| {
773 if let TicketEvent::WinningTicket(ticket) = &event
774 && let Err(error) = tmgr_clone.insert_incoming_ticket(**ticket)
775 {
776 tracing::error!(%error, "failed to insert incoming ticket");
777 }
778 if let Err(error) = new_ticket_tx.try_broadcast(event) {
779 tracing::error!(%error, "failed to broadcast ticket event");
780 }
781 futures::future::ready(())
782 })
783 .inspect(|_| {
784 tracing::warn!(task = %HoprLibProcess::TicketEvents, "long-running background task finished")
785 }),
786 );
787
788 {
789 let chain_for_neglect = pre.chain_api.clone();
790 let tmgr_for_neglect = ticket_manager.clone();
791 let events = pre.chain_api.subscribe().map_err(HoprLibError::chain)?;
792 let (neglect_handle, neglect_reg) = hopr_async_runtime::AbortHandle::new_pair();
793 spawn(
794 futures::stream::Abortable::new(
795 events.filter_map(move |event| {
796 futures::future::ready(match event {
797 ChainEvent::ChannelClosed(ch) => Some(ch),
798 _ => None,
799 })
800 }),
801 neglect_reg,
802 )
803 .for_each(move |closed_channel| {
804 let chain = chain_for_neglect.clone();
805 let tmgr = tmgr_for_neglect.clone();
806 async move {
807 match closed_channel.direction(chain.me()) {
808 Some(ChannelDirection::Incoming) => {
809 match tmgr.neglect_tickets(closed_channel.get_id(), None) {
810 Ok(neglected) if !neglected.is_empty() => {
811 tracing::warn!(
812 num_neglected = neglected.len(),
813 %closed_channel,
814 "tickets on incoming closed channel were neglected"
815 );
816 }
817 Ok(_) => {}
818 Err(error) => {
819 tracing::error!(
820 %error, %closed_channel,
821 "failed to neglect tickets on closed channel"
822 );
823 }
824 }
825 }
826 Some(ChannelDirection::Outgoing) => {}
827 _ => {}
828 }
829 }
830 })
831 .inspect(|_| {
832 tracing::warn!(
833 task = %HoprLibProcess::ChannelClosureNeglect,
834 "channel closure ticket neglect task finished"
835 )
836 }),
837 );
838 processes.insert(HoprLibProcess::ChannelClosureNeglect, neglect_handle);
839 }
840
841 tracing::info!("starting transport for full node");
842 let (_, transport_processes) = pre
843 .transport_api
844 .run(
845 pre.cover_traffic,
846 pre.network,
847 pre.network_process,
848 tickets_tx,
849 ticket_factory,
850 pre.session_tx,
851 )
852 .await?;
853 processes.flat_map_extend_from(transport_processes, HoprLibProcess::Transport);
854
855 let hopr = Hopr {
856 chain_id: NodeOnchainIdentity {
857 node_address: pre.chain_id.public().to_address(),
858 safe_address: pre.cfg.safe_module.safe_address,
859 module_address: pre.cfg.safe_module.module_address,
860 },
861 cfg: pre.cfg,
862 state: pre.state.clone(),
863 ticket_event_subscribers: pre.ticket_event_subscribers,
864 transport_id: pre.transport_id,
865 transport_api: pre.transport_api,
866 chain_api: pre.chain_api,
867 processes,
868 ticket_manager,
869 };
870
871 hopr.state.store(HoprState::Running, std::sync::atomic::Ordering::Relaxed);
872
873 tracing::info!(
874 id = %hopr.transport_id.public().to_peerid_str(),
875 version = constants::APP_VERSION,
876 "FULL NODE STARTED AND RUNNING"
877 );
878
879 #[cfg(all(feature = "telemetry", not(test)))]
880 METRIC_HOPR_NODE_INFO.set(
881 &[
882 &hopr.transport_id.public().to_peerid_str(),
883 &hopr.chain_id.node_address.to_string(),
884 &hopr.chain_id.safe_address.to_string(),
885 &hopr.chain_id.module_address.to_string(),
886 ],
887 1.0,
888 );
889
890 Ok(hopr)
891 }
892 };
893}
894
895#[cfg(feature = "session-server")]
897impl<Chain, Graph, Net, Ct> HoprBuilderWithSession<Chain, Graph, Net, Ct>
898where
899 Chain: HoprChainApi + Clone + Send + Sync + 'static,
900 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
901 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
902 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
903 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
904 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
905 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
906{
907 impl_build_methods!();
908
909 fn into_parts(
910 self,
911 ) -> (
912 HoprBuilderConfigured<Chain, Graph, Net, Ct>,
913 futures::channel::mpsc::Sender<IncomingSession>,
914 AbortableList<HoprLibProcess>,
915 ) {
916 let mut processes = AbortableList::<HoprLibProcess>::default();
917 processes.insert(HoprLibProcess::SessionServer, self.session_handle);
918 (self.inner, self.session_tx, processes)
919 }
920}
921
922#[cfg(not(feature = "session-server"))]
924impl<Chain, Graph, Net, Ct> HoprBuilderConfigured<Chain, Graph, Net, Ct>
925where
926 Chain: HoprChainApi + Clone + Send + Sync + 'static,
927 Graph: HoprGraphApi<HoprNodeId = hopr_api::OffchainPublicKey> + Clone + Send + Sync + 'static,
928 <Graph as hopr_api::graph::NetworkGraphTraverse>::Observed:
929 hopr_api::graph::traits::EdgeObservableRead + Send + 'static,
930 <Graph as hopr_api::graph::NetworkGraphWrite>::Observed: hopr_api::graph::traits::EdgeObservableWrite + Send,
931 Net: NetworkView + NetworkStreamControl + Send + Sync + Clone + 'static,
932 Ct: ProbingTrafficGeneration + CoverTrafficGeneration + Send + Sync + 'static,
933{
934 impl_build_methods!();
935
936 fn into_parts(
937 self,
938 ) -> (
939 HoprBuilderConfigured<Chain, Graph, Net, Ct>,
940 futures::channel::mpsc::Sender<IncomingSession>,
941 AbortableList<HoprLibProcess>,
942 ) {
943 let (tx, _rx) = channel::<IncomingSession>(1);
944 let processes = AbortableList::<HoprLibProcess>::default();
945 (self, tx, processes)
946 }
947}