1pub mod errors;
4pub mod executors;
5
6use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
7
8use alloy::{
9 rpc::{client::ClientBuilder, types::TransactionRequest},
10 transports::{
11 http::{Http, ReqwestTransport},
12 layers::RetryBackoffLayer,
13 },
14};
15use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
16use futures::{
17 FutureExt, Stream, StreamExt,
18 future::{AbortHandle, BoxFuture},
19 stream::BoxStream,
20};
21use hopr_api::{
22 Multiaddr,
23 chain::{
24 AccountSelector, AnnouncementError, ChainEvents, ChainKeyOperations, ChainReadAccountOperations,
25 ChainReadChannelOperations, ChainReceipt, ChainValues, ChainWriteAccountOperations,
26 ChainWriteChannelOperations, ChainWriteTicketOperations, ChannelSelector, DomainSeparators,
27 },
28 db::TicketSelector,
29};
30use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
31use hopr_chain_actions::{
32 ChainActions,
33 action_queue::{ActionQueue, ActionQueueConfig},
34 action_state::{ActionState, IndexerActionTracker},
35 channels::ChannelActions,
36 errors::ChainActionsError,
37 node::NodeActions,
38 payload::SafePayloadGenerator,
39 redeem::TicketRedeemActions,
40};
41pub use hopr_chain_config as config;
42pub use hopr_chain_indexer::IndexerConfig;
43use hopr_chain_indexer::{block::Indexer, handlers::ContractEventHandlers};
44use hopr_chain_rpc::{
45 HoprRpcOperations,
46 client::DefaultRetryPolicy,
47 rpc::{RpcOperations, RpcOperationsConfig},
48};
49use hopr_chain_types::ContractAddresses;
50pub use hopr_chain_types::chain_events::SignificantChainEvent;
51use hopr_crypto_types::prelude::*;
52use hopr_db_node::HoprNodeDb;
53pub use hopr_db_sql::info::IndexerStateInfo;
54use hopr_db_sql::{
55 HoprIndexerDb, HoprIndexerDbConfig,
56 logs::HoprDbLogOperations,
57 prelude::{
58 HoprDbAccountOperations, HoprDbChannelOperations, HoprDbCorruptedChannelOperations, HoprDbInfoOperations,
59 },
60};
61pub use hopr_internal_types::channels::ChannelEntry;
62use hopr_internal_types::{
63 account::AccountEntry,
64 channels::{ChannelId, CorruptedChannelEntry},
65 prelude::{AcknowledgedTicket, AcknowledgedTicketStatus, ChannelStatus, generate_channel_id},
66 tickets::WinningProbability,
67};
68use hopr_primitive_types::prelude::*;
69use tracing::{debug, error, info, trace, warn};
70
71use crate::errors::{HoprChainError, Result};
72
73#[cfg(feature = "runtime-tokio")]
74pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
75
76#[cfg(not(feature = "runtime-tokio"))]
77compile_error!("The `runtime-tokio` feature must be enabled");
78
79pub async fn wait_for_funds<R: ChainReadAccountOperations>(
83 min_balance: XDaiBalance,
84 suggested_balance: XDaiBalance,
85 max_delay: Duration,
86 resolver: &R,
87) -> Result<()> {
88 info!(
89 suggested_minimum_balance = %suggested_balance,
90 "Node about to start, checking for funds",
91 );
92
93 let multiplier = 1.05;
94 let mut current_delay = Duration::from_secs(2).min(max_delay);
95
96 while current_delay <= max_delay {
97 match resolver.node_balance::<XDai>().await {
98 Ok(current_balance) => {
99 info!(balance = %current_balance, "balance status");
100 if current_balance.ge(&min_balance) {
101 info!("node is funded");
102 return Ok(());
103 } else {
104 warn!("still unfunded, trying again soon");
105 }
106 }
107 Err(error) => error!(%error, "failed to fetch balance from the chain"),
108 }
109
110 sleep(current_delay).await;
111 current_delay = current_delay.mul_f64(multiplier);
112 }
113
114 Err(HoprChainError::Api("timeout waiting for funds".into()))
115}
116
117fn build_transport_client(url: &str) -> Result<Http<DefaultHttpRequestor>> {
118 let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {url}"));
119 Ok(ReqwestTransport::new(parsed_url))
120}
121
122#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
123pub enum HoprChainProcess {
124 Indexer,
125 OutgoingOnchainActionQueue,
126}
127
128const ON_CHAIN_SIG_EVENT_QUEUE_SIZE: usize = 10_000;
129
130type ActionQueueType<T> = ActionQueue<
131 T,
132 IndexerActionTracker,
133 EthereumTransactionExecutor<
134 TransactionRequest,
135 RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
136 SafePayloadGenerator,
137 >,
138>;
139
140#[derive(Clone)]
147pub struct HoprChain {
148 me_onchain: ChainKeypair,
149 safe_address: Address,
150 contract_addresses: ContractAddresses,
151 indexer_cfg: IndexerConfig,
152 indexer_events_tx: futures::channel::mpsc::Sender<SignificantChainEvent>,
153 indexer_events_rx: Arc<std::sync::Mutex<Option<futures::channel::mpsc::Receiver<SignificantChainEvent>>>>,
154 db: HoprIndexerDb,
155 node_db: HoprNodeDb,
156 hopr_chain_actions: ChainActions<HoprNodeDb>,
157 action_queue: ActionQueueType<HoprNodeDb>,
158 action_state: Arc<IndexerActionTracker>,
159 rpc_operations: RpcOperations<DefaultHttpRequestor>,
160}
161
162impl HoprChain {
163 #[allow(clippy::too_many_arguments)] pub fn new(
165 me_onchain: ChainKeypair,
166 chain_config: config::ChainNetworkConfig,
167 node_db: HoprNodeDb,
168 data_dir_path: &str,
169 module_address: Address,
170 contract_addresses: ContractAddresses,
171 safe_address: Address,
172 indexer_cfg: IndexerConfig,
173 ) -> Result<Self> {
174 let db = futures::executor::block_on(HoprIndexerDb::new(
175 PathBuf::from_iter([data_dir_path, "index_db"]).as_path(),
176 me_onchain.clone(),
177 HoprIndexerDbConfig {
178 create_if_missing: node_db.config().create_if_missing,
179 force_create: node_db.config().force_create,
180 log_slow_queries: node_db.config().log_slow_queries,
181 },
182 ))?;
183
184 let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
186 if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
187 rpc_http_config.max_requests_per_sec = Some(max_rpc_req); }
189
190 let rpc_http_retry_policy = DefaultRetryPolicy::default();
193
194 let rpc_cfg = RpcOperationsConfig {
196 chain_id: chain_config.chain.chain_id as u64,
197 contract_addrs: contract_addresses,
198 module_address,
199 safe_address,
200 expected_block_time: Duration::from_millis(chain_config.chain.block_time),
201 tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
202 finality: chain_config.confirmations,
203 max_block_range_fetch_size: chain_config.max_block_range,
204 ..Default::default()
205 };
206
207 let rpc_client_cfg = RpcEthereumClientConfig::default();
209
210 let action_queue_cfg = ActionQueueConfig::default();
212
213 let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
216
217 let rpc_client = ClientBuilder::default()
218 .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
219 .transport(transport_client.clone(), transport_client.guess_local());
220
221 let requestor = DefaultHttpRequestor::new();
222
223 let rpc_operations =
225 RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
226
227 let ethereum_tx_executor = EthereumTransactionExecutor::new(
229 RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
230 SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
231 );
232
233 let action_queue = ActionQueue::new(
235 node_db.clone(),
236 IndexerActionTracker::default(),
237 ethereum_tx_executor,
238 action_queue_cfg,
239 );
240
241 let action_state = action_queue.action_state();
242 let action_sender = action_queue.new_sender();
243
244 let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), node_db.clone(), action_sender);
246
247 let (indexer_events_tx, indexer_events_rx) =
249 futures::channel::mpsc::channel::<SignificantChainEvent>(ON_CHAIN_SIG_EVENT_QUEUE_SIZE);
250
251 Ok(Self {
252 me_onchain,
253 safe_address,
254 contract_addresses,
255 indexer_cfg,
256 indexer_events_tx,
257 indexer_events_rx: Arc::new(std::sync::Mutex::new(Some(indexer_events_rx))),
258 db,
259 node_db,
260 hopr_chain_actions,
261 action_queue,
262 action_state,
263 rpc_operations,
264 })
265 }
266
267 pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
272 let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
273
274 processes.insert(
275 HoprChainProcess::OutgoingOnchainActionQueue,
276 spawn_as_abortable!(self.action_queue.clone().start().inspect(|_| tracing::warn!(
277 task = "action queue - outgoing",
278 "long-running background task finished"
279 ))),
280 );
281 processes.insert(
282 HoprChainProcess::Indexer,
283 Indexer::new(
284 self.rpc_operations.clone(),
285 ContractEventHandlers::new(
286 self.contract_addresses,
287 self.safe_address,
288 self.me_onchain.clone(),
289 self.db.clone(),
290 self.node_db.clone(),
291 self.rpc_operations.clone(),
292 ),
293 self.db.clone(),
294 self.indexer_cfg.clone(),
295 self.indexer_events_tx.clone(),
296 )
297 .start()
298 .await?,
299 );
300 Ok(processes)
301 }
302
303 pub fn me_onchain(&self) -> Address {
304 self.me_onchain.public().to_address()
305 }
306
307 pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
308 Ok(self.db.get_all_corrupted_channels(None).await?)
309 }
310
311 fn actions_ref(&self) -> &ChainActions<HoprNodeDb> {
312 &self.hopr_chain_actions
313 }
314
315 fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
316 &self.rpc_operations
317 }
318
319 pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
320 let indexer_state_info = self.db.get_indexer_state_info(None).await?;
321
322 match self.db.get_last_checksummed_log().await? {
323 Some(log) => {
324 let checksum = match log.checksum {
325 Some(checksum) => Hash::from_hex(checksum.as_str())?,
326 None => Hash::default(),
327 };
328 Ok(IndexerStateInfo {
329 latest_log_block_number: log.block_number as u32,
330 latest_log_checksum: checksum,
331 ..indexer_state_info
332 })
333 }
334 None => Ok(indexer_state_info),
335 }
336 }
337}
338
339#[async_trait::async_trait]
340impl ChainReadAccountOperations for HoprChain {
341 type Error = HoprChainError;
342
343 async fn node_balance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
344 let bal = if C::is::<XDai>() {
345 self.rpc_operations
346 .get_xdai_balance(self.me_onchain())
347 .await?
348 .to_be_bytes()
349 } else if C::is::<WxHOPR>() {
350 self.rpc_operations
351 .get_hopr_balance(self.me_onchain())
352 .await?
353 .to_be_bytes()
354 } else {
355 return Err(HoprChainError::Api("unsupported currency".into()));
356 };
357
358 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
359 }
360
361 async fn safe_balance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
362 let bal = if C::is::<XDai>() {
363 self.rpc_operations
364 .get_xdai_balance(self.safe_address)
365 .await?
366 .to_be_bytes()
367 } else if C::is::<WxHOPR>() {
368 self.rpc_operations
369 .get_hopr_balance(self.safe_address)
370 .await?
371 .to_be_bytes()
372 } else {
373 return Err(HoprChainError::Api("unsupported currency".into()));
374 };
375
376 Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
377 }
378
379 async fn safe_allowance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
380 let amount = if C::is::<XDai>() {
381 return Err(HoprChainError::Api("unsupported currency".into()));
382 } else {
383 self.rpc_operations
384 .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
385 .await?
386 .amount()
387 };
388 Ok(Balance::<C>::from(amount))
389 }
390
391 async fn find_account_by_address(
392 &self,
393 address: &Address,
394 ) -> std::result::Result<Option<AccountEntry>, Self::Error> {
395 Ok(self.db.get_account(None, *address).await?)
396 }
397
398 async fn find_account_by_packet_key(
399 &self,
400 packet_key: &OffchainPublicKey,
401 ) -> std::result::Result<Option<AccountEntry>, Self::Error> {
402 Ok(self.db.get_account(None, *packet_key).await?)
403 }
404
405 async fn check_node_safe_module_status(&self) -> std::result::Result<bool, Self::Error> {
406 let safe_module_configuration = self
407 .rpc_operations
408 .check_node_safe_module_status(self.me_onchain())
409 .await?;
410 if !safe_module_configuration.should_pass() {
411 error!(
412 ?safe_module_configuration,
413 "Something is wrong with the safe module configuration",
414 );
415 Ok(false)
416 } else {
417 Ok(true)
418 }
419 }
420
421 async fn can_register_with_safe(&self, safe_address: &Address) -> std::result::Result<bool, Self::Error> {
422 let me = self.me_onchain.public().to_address();
423 let target_address = self.rpc().get_module_target_address().await?;
424 debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
425
426 if &target_address != safe_address {
427 return Err(HoprChainError::Api("safe is not the module target".into()));
429 }
430
431 let registered_address = self.rpc().get_safe_from_node_safe_registry(me).await?;
432 info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
433
434 if registered_address.is_zero() {
435 info!("Node is not associated with a Safe in NodeSafeRegistry yet");
436 Ok(true)
437 } else if ®istered_address != safe_address {
438 Err(HoprChainError::Api(
439 "Node is associated with a different Safe in NodeSafeRegistry".into(),
440 ))
441 } else {
442 info!("Node is associated with correct Safe in NodeSafeRegistry");
443 Ok(false)
444 }
445 }
446
447 async fn stream_accounts<'a>(
448 &'a self,
449 selector: AccountSelector,
450 ) -> std::result::Result<BoxStream<'a, AccountEntry>, Self::Error> {
451 Ok(self.db.stream_accounts(selector.public_only).await?)
452 }
453
454 async fn count_accounts(&self, selector: AccountSelector) -> std::result::Result<usize, Self::Error> {
455 Ok(self.db.stream_accounts(selector.public_only).await?.count().await)
456 }
457}
458
459#[async_trait::async_trait]
460impl ChainWriteAccountOperations for HoprChain {
461 type Error = HoprChainError;
462
463 async fn announce(
464 &self,
465 multiaddrs: &[Multiaddr],
466 key: &OffchainKeypair,
467 ) -> std::result::Result<
468 BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>,
469 AnnouncementError<Self::Error>,
470 > {
471 Ok(self
472 .actions_ref()
473 .announce(multiaddrs, key)
474 .await
475 .map_err(|error| match error {
476 hopr_chain_actions::errors::ChainActionsError::AlreadyAnnounced => AnnouncementError::AlreadyAnnounced,
477 e => AnnouncementError::ProcessingError(HoprChainError::ActionsError(e)),
478 })?
479 .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
480 .boxed())
481 }
482
483 async fn withdraw<C: Currency + Send>(
484 &self,
485 balance: Balance<C>,
486 recipient: &Address,
487 ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
488 Ok(self
489 .actions_ref()
490 .withdraw(*recipient, balance)
491 .await?
492 .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
493 .boxed())
494 }
495
496 async fn register_safe(
497 &self,
498 safe_address: &Address,
499 ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
500 Ok(self
501 .actions_ref()
502 .register_safe_by_node(*safe_address)
503 .await?
504 .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
505 .boxed())
506 }
507}
508
509#[async_trait::async_trait]
510impl ChainReadChannelOperations for HoprChain {
511 type Error = HoprChainError;
512
513 fn me(&self) -> &Address {
514 self.me_onchain.public().as_ref()
515 }
516
517 async fn channel_by_parties(
518 &self,
519 src: &Address,
520 dst: &Address,
521 ) -> std::result::Result<Option<ChannelEntry>, Self::Error> {
522 Ok(self.db.get_channel_by_parties(None, src, dst, true).await?)
523 }
524
525 async fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, Self::Error> {
526 Ok(self.db.get_channel_by_id(None, channel_id).await?)
527 }
528
529 async fn stream_channels<'a>(
530 &'a self,
531 selector: ChannelSelector,
532 ) -> std::result::Result<BoxStream<'a, ChannelEntry>, Self::Error> {
533 Ok(self
534 .db
535 .stream_channels(
536 selector.source,
537 selector.destination,
538 &selector.allowed_states,
539 (selector.closure_time_range.0, selector.closure_time_range.1),
540 )
541 .await?)
542 }
543}
544
545#[async_trait::async_trait]
546impl ChainWriteChannelOperations for HoprChain {
547 type Error = HoprChainError;
548
549 async fn open_channel<'a>(
550 &'a self,
551 dst: &'a Address,
552 amount: HoprBalance,
553 ) -> std::result::Result<BoxFuture<'a, std::result::Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error>
554 {
555 let me = self.me_onchain();
556 Ok(self
557 .actions_ref()
558 .open_channel(*dst, amount)
559 .await?
560 .map(move |res| {
561 res.map(|c| (generate_channel_id(&me, dst), c.tx_hash))
562 .map_err(HoprChainError::from)
563 })
564 .boxed())
565 }
566
567 async fn fund_channel<'a>(
568 &'a self,
569 channel_id: &'a ChannelId,
570 amount: HoprBalance,
571 ) -> std::result::Result<BoxFuture<'a, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
572 Ok(self
573 .actions_ref()
574 .fund_channel(*channel_id, amount)
575 .await?
576 .map(|res| res.map(|c| c.tx_hash).map_err(HoprChainError::from))
577 .boxed())
578 }
579
580 async fn close_channel<'a>(
581 &'a self,
582 channel_id: &'a ChannelId,
583 ) -> std::result::Result<BoxFuture<'a, std::result::Result<(ChannelStatus, ChainReceipt), Self::Error>>, Self::Error>
584 {
585 let channel = self
586 .db
587 .get_channel_by_id(None, channel_id)
588 .await?
589 .ok_or(HoprChainError::Api("channel not found".into()))?;
590
591 Ok(self
592 .actions_ref()
593 .close_channel(channel)
594 .await?
595 .map(|res| {
596 res.and_then(|c| {
597 let status = match c.event {
598 Some(hopr_chain_types::chain_events::ChainEventType::ChannelClosed(_)) => ChannelStatus::Closed,
599 Some(hopr_chain_types::chain_events::ChainEventType::ChannelClosureInitiated(c)) => c.status,
600 _ => return Err(ChainActionsError::InvalidState("closure must have event type".into())),
601 };
602
603 Ok((status, c.tx_hash))
604 })
605 .map_err(HoprChainError::from)
606 })
607 .boxed())
608 }
609}
610
611#[async_trait::async_trait]
612impl ChainKeyOperations for HoprChain {
613 type Error = HoprChainError;
614 type Mapper = hopr_db_sql::CacheKeyMapper;
615
616 async fn chain_key_to_packet_key(
617 &self,
618 chain: &Address,
619 ) -> std::result::Result<Option<OffchainPublicKey>, Self::Error> {
620 match self.db.translate_key(None, *chain).await? {
621 None => Ok(None),
622 Some(key) => Ok(Some(key.try_into()?)),
623 }
624 }
625
626 async fn packet_key_to_chain_key(
627 &self,
628 packet: &OffchainPublicKey,
629 ) -> std::result::Result<Option<Address>, Self::Error> {
630 match self.db.translate_key(None, *packet).await? {
631 None => Ok(None),
632 Some(key) => Ok(Some(key.try_into()?)),
633 }
634 }
635
636 fn key_id_mapper_ref(&self) -> &Self::Mapper {
637 self.db.key_id_mapper_ref()
638 }
639}
640
641#[async_trait::async_trait]
642impl ChainValues for HoprChain {
643 type Error = HoprChainError;
644
645 async fn domain_separators(&self) -> std::result::Result<DomainSeparators, Self::Error> {
646 let indexer_data = self.db.get_indexer_data(None).await?;
647 Ok(DomainSeparators {
648 ledger: indexer_data
649 .ledger_dst
650 .ok_or(HoprChainError::Api("missing ledger dst".into()))?,
651 safe_registry: indexer_data
652 .safe_registry_dst
653 .ok_or(HoprChainError::Api("missing safe registry dst".into()))?,
654 channel: indexer_data
655 .channels_dst
656 .ok_or(HoprChainError::Api("missing channel dst".into()))?,
657 })
658 }
659
660 async fn minimum_incoming_ticket_win_prob(&self) -> std::result::Result<WinningProbability, Self::Error> {
661 let indexer_data = self.db.get_indexer_data(None).await?;
662 Ok(indexer_data.minimum_incoming_ticket_winning_prob)
663 }
664
665 async fn minimum_ticket_price(&self) -> std::result::Result<HoprBalance, Self::Error> {
666 let indexer_data = self.db.get_indexer_data(None).await?;
667 Ok(indexer_data.ticket_price.unwrap_or_default())
669 }
670
671 async fn channel_closure_notice_period(&self) -> std::result::Result<Duration, Self::Error> {
672 Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
673 }
674}
675
676#[async_trait::async_trait]
677impl ChainWriteTicketOperations for HoprChain {
678 type Error = HoprChainError;
679
680 async fn redeem_ticket(
681 &self,
682 ticket: AcknowledgedTicket,
683 ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
684 Ok(self
685 .actions_ref()
686 .redeem_ticket(ticket)
687 .await?
688 .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
689 .boxed())
690 }
691
692 async fn redeem_tickets_via_selector(
693 &self,
694 selector: TicketSelector,
695 ) -> std::result::Result<Vec<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>>, Self::Error> {
696 Ok(self
697 .actions_ref()
698 .redeem_tickets(selector.with_state(AcknowledgedTicketStatus::Untouched))
699 .await?
700 .into_iter()
701 .map(|r| r.map(|c| c.map(|ac| ac.tx_hash).map_err(HoprChainError::from)).boxed())
702 .collect())
703 }
704}
705
706impl ChainEvents for HoprChain {
707 type Error = HoprChainError;
708
709 fn subscribe(
710 &self,
711 ) -> std::result::Result<impl Stream<Item = SignificantChainEvent> + Send + 'static, Self::Error> {
712 if let Some(stream) = self
713 .indexer_events_rx
714 .lock()
715 .map_err(|_| HoprChainError::Api("failed to lock mutex".into()))?
716 .take()
717 {
718 let indexer_action_tracker = self.action_state.clone();
719 Ok(stream.then(move |event| {
720 let indexer_action_tracker = indexer_action_tracker.clone();
721 async move {
722 let resolved = indexer_action_tracker.match_and_resolve(&event).await;
723 if resolved.is_empty() {
724 trace!(%event, "No indexer expectations resolved for the event");
725 } else {
726 debug!(count = resolved.len(), %event, "resolved indexer expectations");
727 }
728 event
729 }
730 }))
731 } else {
732 Err(HoprChainError::Api("cannot subscribe more than once".into()))
733 }
734 }
735}