hopr_chain_api/
lib.rs

1//! Crate containing the API object for chain operations used by the HOPRd node.
2
3pub mod errors;
4pub mod executors;
5
6use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
7
8use alloy::{
9    rpc::{client::ClientBuilder, types::TransactionRequest},
10    transports::{
11        http::{Http, ReqwestTransport},
12        layers::RetryBackoffLayer,
13    },
14};
15use executors::{EthereumTransactionExecutor, RpcEthereumClient, RpcEthereumClientConfig};
16use futures::{
17    FutureExt, Stream, StreamExt,
18    future::{AbortHandle, BoxFuture},
19    stream::BoxStream,
20};
21use hopr_api::{
22    Multiaddr,
23    chain::{
24        AccountSelector, AnnouncementError, ChainEvents, ChainKeyOperations, ChainReadAccountOperations,
25        ChainReadChannelOperations, ChainReceipt, ChainValues, ChainWriteAccountOperations,
26        ChainWriteChannelOperations, ChainWriteTicketOperations, ChannelSelector, DomainSeparators,
27    },
28    db::TicketSelector,
29};
30use hopr_async_runtime::{prelude::sleep, spawn_as_abortable};
31use hopr_chain_actions::{
32    ChainActions,
33    action_queue::{ActionQueue, ActionQueueConfig},
34    action_state::{ActionState, IndexerActionTracker},
35    channels::ChannelActions,
36    errors::ChainActionsError,
37    node::NodeActions,
38    payload::SafePayloadGenerator,
39    redeem::TicketRedeemActions,
40};
41pub use hopr_chain_config as config;
42pub use hopr_chain_indexer::IndexerConfig;
43use hopr_chain_indexer::{block::Indexer, handlers::ContractEventHandlers};
44use hopr_chain_rpc::{
45    HoprRpcOperations,
46    client::DefaultRetryPolicy,
47    rpc::{RpcOperations, RpcOperationsConfig},
48};
49use hopr_chain_types::ContractAddresses;
50pub use hopr_chain_types::chain_events::SignificantChainEvent;
51use hopr_crypto_types::prelude::*;
52use hopr_db_node::HoprNodeDb;
53pub use hopr_db_sql::info::IndexerStateInfo;
54use hopr_db_sql::{
55    HoprIndexerDb, HoprIndexerDbConfig,
56    logs::HoprDbLogOperations,
57    prelude::{
58        HoprDbAccountOperations, HoprDbChannelOperations, HoprDbCorruptedChannelOperations, HoprDbInfoOperations,
59    },
60};
61pub use hopr_internal_types::channels::ChannelEntry;
62use hopr_internal_types::{
63    account::AccountEntry,
64    channels::{ChannelId, CorruptedChannelEntry},
65    prelude::{AcknowledgedTicket, AcknowledgedTicketStatus, ChannelStatus, generate_channel_id},
66    tickets::WinningProbability,
67};
68use hopr_primitive_types::prelude::*;
69use tracing::{debug, error, info, trace, warn};
70
71use crate::errors::{HoprChainError, Result};
72
73#[cfg(feature = "runtime-tokio")]
74pub type DefaultHttpRequestor = hopr_chain_rpc::transport::ReqwestClient;
75
76#[cfg(not(feature = "runtime-tokio"))]
77compile_error!("The `runtime-tokio` feature must be enabled");
78
79/// Waits until the given address is funded.
80///
81/// This is done by querying the RPC provider for balance with backoff until `max_delay` argument.
82pub async fn wait_for_funds<R: ChainReadAccountOperations>(
83    min_balance: XDaiBalance,
84    suggested_balance: XDaiBalance,
85    max_delay: Duration,
86    resolver: &R,
87) -> Result<()> {
88    info!(
89        suggested_minimum_balance = %suggested_balance,
90        "Node about to start, checking for funds",
91    );
92
93    let multiplier = 1.05;
94    let mut current_delay = Duration::from_secs(2).min(max_delay);
95
96    while current_delay <= max_delay {
97        match resolver.node_balance::<XDai>().await {
98            Ok(current_balance) => {
99                info!(balance = %current_balance, "balance status");
100                if current_balance.ge(&min_balance) {
101                    info!("node is funded");
102                    return Ok(());
103                } else {
104                    warn!("still unfunded, trying again soon");
105                }
106            }
107            Err(error) => error!(%error, "failed to fetch balance from the chain"),
108        }
109
110        sleep(current_delay).await;
111        current_delay = current_delay.mul_f64(multiplier);
112    }
113
114    Err(HoprChainError::Api("timeout waiting for funds".into()))
115}
116
117fn build_transport_client(url: &str) -> Result<Http<DefaultHttpRequestor>> {
118    let parsed_url = url::Url::parse(url).unwrap_or_else(|_| panic!("failed to parse URL: {url}"));
119    Ok(ReqwestTransport::new(parsed_url))
120}
121
122#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
123pub enum HoprChainProcess {
124    Indexer,
125    OutgoingOnchainActionQueue,
126}
127
128const ON_CHAIN_SIG_EVENT_QUEUE_SIZE: usize = 10_000;
129
130type ActionQueueType<T> = ActionQueue<
131    T,
132    IndexerActionTracker,
133    EthereumTransactionExecutor<
134        TransactionRequest,
135        RpcEthereumClient<RpcOperations<DefaultHttpRequestor>>,
136        SafePayloadGenerator,
137    >,
138>;
139
140/// Represents all chain interactions exported to be used in the hopr-lib
141///
142/// NOTE: instead of creating a unified interface the [HoprChain] exports
143/// some functionality (e.g. the [ChainActions] as a referentially used)
144/// object. This behavior will be refactored and hidden behind a trait
145/// in the future implementations.
146#[derive(Clone)]
147pub struct HoprChain {
148    me_onchain: ChainKeypair,
149    safe_address: Address,
150    contract_addresses: ContractAddresses,
151    indexer_cfg: IndexerConfig,
152    indexer_events_tx: futures::channel::mpsc::Sender<SignificantChainEvent>,
153    indexer_events_rx: Arc<std::sync::Mutex<Option<futures::channel::mpsc::Receiver<SignificantChainEvent>>>>,
154    db: HoprIndexerDb,
155    node_db: HoprNodeDb,
156    hopr_chain_actions: ChainActions<HoprNodeDb>,
157    action_queue: ActionQueueType<HoprNodeDb>,
158    action_state: Arc<IndexerActionTracker>,
159    rpc_operations: RpcOperations<DefaultHttpRequestor>,
160}
161
162impl HoprChain {
163    #[allow(clippy::too_many_arguments)] // TODO: refactor this function into a reasonable group of components once fully rearchitected
164    pub fn new(
165        me_onchain: ChainKeypair,
166        chain_config: config::ChainNetworkConfig,
167        node_db: HoprNodeDb,
168        data_dir_path: &str,
169        module_address: Address,
170        contract_addresses: ContractAddresses,
171        safe_address: Address,
172        indexer_cfg: IndexerConfig,
173    ) -> Result<Self> {
174        let db = futures::executor::block_on(HoprIndexerDb::new(
175            PathBuf::from_iter([data_dir_path, "index_db"]).as_path(),
176            me_onchain.clone(),
177            HoprIndexerDbConfig {
178                create_if_missing: node_db.config().create_if_missing,
179                force_create: node_db.config().force_create,
180                log_slow_queries: node_db.config().log_slow_queries,
181            },
182        ))?;
183
184        // TODO: extract this from the global config type
185        let mut rpc_http_config = hopr_chain_rpc::HttpPostRequestorConfig::default();
186        if let Some(max_rpc_req) = chain_config.max_requests_per_sec {
187            rpc_http_config.max_requests_per_sec = Some(max_rpc_req); // override the default if set
188        }
189
190        // TODO(#7140): replace this DefaultRetryPolicy with a custom one that computes backoff with the number of
191        // retries
192        let rpc_http_retry_policy = DefaultRetryPolicy::default();
193
194        // TODO: extract this from the global config type
195        let rpc_cfg = RpcOperationsConfig {
196            chain_id: chain_config.chain.chain_id as u64,
197            contract_addrs: contract_addresses,
198            module_address,
199            safe_address,
200            expected_block_time: Duration::from_millis(chain_config.chain.block_time),
201            tx_polling_interval: Duration::from_millis(chain_config.tx_polling_interval),
202            finality: chain_config.confirmations,
203            max_block_range_fetch_size: chain_config.max_block_range,
204            ..Default::default()
205        };
206
207        // TODO: extract this from the global config type
208        let rpc_client_cfg = RpcEthereumClientConfig::default();
209
210        // TODO: extract this from the global config type
211        let action_queue_cfg = ActionQueueConfig::default();
212
213        // --- Configs done ---
214
215        let transport_client = build_transport_client(&chain_config.chain.default_provider)?;
216
217        let rpc_client = ClientBuilder::default()
218            .layer(RetryBackoffLayer::new_with_policy(2, 100, 100, rpc_http_retry_policy))
219            .transport(transport_client.clone(), transport_client.guess_local());
220
221        let requestor = DefaultHttpRequestor::new();
222
223        // Build RPC operations
224        let rpc_operations =
225            RpcOperations::new(rpc_client, requestor, &me_onchain, rpc_cfg, None).expect("failed to initialize RPC");
226
227        // Build the Ethereum Transaction Executor that uses RpcOperations as backend
228        let ethereum_tx_executor = EthereumTransactionExecutor::new(
229            RpcEthereumClient::new(rpc_operations.clone(), rpc_client_cfg),
230            SafePayloadGenerator::new(&me_onchain, contract_addresses, module_address),
231        );
232
233        // Build the Action Queue
234        let action_queue = ActionQueue::new(
235            node_db.clone(),
236            IndexerActionTracker::default(),
237            ethereum_tx_executor,
238            action_queue_cfg,
239        );
240
241        let action_state = action_queue.action_state();
242        let action_sender = action_queue.new_sender();
243
244        // Instantiate Chain Actions
245        let hopr_chain_actions = ChainActions::new(&me_onchain, db.clone(), node_db.clone(), action_sender);
246
247        // The channel can be bounded, since it is used only after the historical on-chain sync has been completed.
248        let (indexer_events_tx, indexer_events_rx) =
249            futures::channel::mpsc::channel::<SignificantChainEvent>(ON_CHAIN_SIG_EVENT_QUEUE_SIZE);
250
251        Ok(Self {
252            me_onchain,
253            safe_address,
254            contract_addresses,
255            indexer_cfg,
256            indexer_events_tx,
257            indexer_events_rx: Arc::new(std::sync::Mutex::new(Some(indexer_events_rx))),
258            db,
259            node_db,
260            hopr_chain_actions,
261            action_queue,
262            action_state,
263            rpc_operations,
264        })
265    }
266
267    /// Execute all processes of the [`HoprChain`] object.
268    ///
269    /// This method will spawn the [`HoprChainProcess::Indexer`] and [`HoprChainProcess::OutgoingOnchainActionQueue`]
270    /// processes and return join handles to the calling function.
271    pub async fn start(&self) -> errors::Result<HashMap<HoprChainProcess, AbortHandle>> {
272        let mut processes: HashMap<HoprChainProcess, AbortHandle> = HashMap::new();
273
274        processes.insert(
275            HoprChainProcess::OutgoingOnchainActionQueue,
276            spawn_as_abortable!(self.action_queue.clone().start().inspect(|_| tracing::warn!(
277                task = "action queue - outgoing",
278                "long-running background task finished"
279            ))),
280        );
281        processes.insert(
282            HoprChainProcess::Indexer,
283            Indexer::new(
284                self.rpc_operations.clone(),
285                ContractEventHandlers::new(
286                    self.contract_addresses,
287                    self.safe_address,
288                    self.me_onchain.clone(),
289                    self.db.clone(),
290                    self.node_db.clone(),
291                    self.rpc_operations.clone(),
292                ),
293                self.db.clone(),
294                self.indexer_cfg.clone(),
295                self.indexer_events_tx.clone(),
296            )
297            .start()
298            .await?,
299        );
300        Ok(processes)
301    }
302
303    pub fn me_onchain(&self) -> Address {
304        self.me_onchain.public().to_address()
305    }
306
307    pub async fn corrupted_channels(&self) -> errors::Result<Vec<CorruptedChannelEntry>> {
308        Ok(self.db.get_all_corrupted_channels(None).await?)
309    }
310
311    fn actions_ref(&self) -> &ChainActions<HoprNodeDb> {
312        &self.hopr_chain_actions
313    }
314
315    fn rpc(&self) -> &RpcOperations<DefaultHttpRequestor> {
316        &self.rpc_operations
317    }
318
319    pub async fn get_indexer_state(&self) -> errors::Result<IndexerStateInfo> {
320        let indexer_state_info = self.db.get_indexer_state_info(None).await?;
321
322        match self.db.get_last_checksummed_log().await? {
323            Some(log) => {
324                let checksum = match log.checksum {
325                    Some(checksum) => Hash::from_hex(checksum.as_str())?,
326                    None => Hash::default(),
327                };
328                Ok(IndexerStateInfo {
329                    latest_log_block_number: log.block_number as u32,
330                    latest_log_checksum: checksum,
331                    ..indexer_state_info
332                })
333            }
334            None => Ok(indexer_state_info),
335        }
336    }
337}
338
339#[async_trait::async_trait]
340impl ChainReadAccountOperations for HoprChain {
341    type Error = HoprChainError;
342
343    async fn node_balance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
344        let bal = if C::is::<XDai>() {
345            self.rpc_operations
346                .get_xdai_balance(self.me_onchain())
347                .await?
348                .to_be_bytes()
349        } else if C::is::<WxHOPR>() {
350            self.rpc_operations
351                .get_hopr_balance(self.me_onchain())
352                .await?
353                .to_be_bytes()
354        } else {
355            return Err(HoprChainError::Api("unsupported currency".into()));
356        };
357
358        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
359    }
360
361    async fn safe_balance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
362        let bal = if C::is::<XDai>() {
363            self.rpc_operations
364                .get_xdai_balance(self.safe_address)
365                .await?
366                .to_be_bytes()
367        } else if C::is::<WxHOPR>() {
368            self.rpc_operations
369                .get_hopr_balance(self.safe_address)
370                .await?
371                .to_be_bytes()
372        } else {
373            return Err(HoprChainError::Api("unsupported currency".into()));
374        };
375
376        Ok(Balance::<C>::from(U256::from_be_bytes(bal)))
377    }
378
379    async fn safe_allowance<C: Currency>(&self) -> std::result::Result<Balance<C>, Self::Error> {
380        let amount = if C::is::<XDai>() {
381            return Err(HoprChainError::Api("unsupported currency".into()));
382        } else {
383            self.rpc_operations
384                .get_hopr_allowance(self.safe_address, self.contract_addresses.channels)
385                .await?
386                .amount()
387        };
388        Ok(Balance::<C>::from(amount))
389    }
390
391    async fn find_account_by_address(
392        &self,
393        address: &Address,
394    ) -> std::result::Result<Option<AccountEntry>, Self::Error> {
395        Ok(self.db.get_account(None, *address).await?)
396    }
397
398    async fn find_account_by_packet_key(
399        &self,
400        packet_key: &OffchainPublicKey,
401    ) -> std::result::Result<Option<AccountEntry>, Self::Error> {
402        Ok(self.db.get_account(None, *packet_key).await?)
403    }
404
405    async fn check_node_safe_module_status(&self) -> std::result::Result<bool, Self::Error> {
406        let safe_module_configuration = self
407            .rpc_operations
408            .check_node_safe_module_status(self.me_onchain())
409            .await?;
410        if !safe_module_configuration.should_pass() {
411            error!(
412                ?safe_module_configuration,
413                "Something is wrong with the safe module configuration",
414            );
415            Ok(false)
416        } else {
417            Ok(true)
418        }
419    }
420
421    async fn can_register_with_safe(&self, safe_address: &Address) -> std::result::Result<bool, Self::Error> {
422        let me = self.me_onchain.public().to_address();
423        let target_address = self.rpc().get_module_target_address().await?;
424        debug!(node_address = %me, %safe_address, %target_address, "can register with safe");
425
426        if &target_address != safe_address {
427            // cannot proceed when the safe address is not the target/owner of the given module
428            return Err(HoprChainError::Api("safe is not the module target".into()));
429        }
430
431        let registered_address = self.rpc().get_safe_from_node_safe_registry(me).await?;
432        info!(%registered_address, "currently registered Safe address in NodeSafeRegistry");
433
434        if registered_address.is_zero() {
435            info!("Node is not associated with a Safe in NodeSafeRegistry yet");
436            Ok(true)
437        } else if &registered_address != safe_address {
438            Err(HoprChainError::Api(
439                "Node is associated with a different Safe in NodeSafeRegistry".into(),
440            ))
441        } else {
442            info!("Node is associated with correct Safe in NodeSafeRegistry");
443            Ok(false)
444        }
445    }
446
447    async fn stream_accounts<'a>(
448        &'a self,
449        selector: AccountSelector,
450    ) -> std::result::Result<BoxStream<'a, AccountEntry>, Self::Error> {
451        Ok(self.db.stream_accounts(selector.public_only).await?)
452    }
453
454    async fn count_accounts(&self, selector: AccountSelector) -> std::result::Result<usize, Self::Error> {
455        Ok(self.db.stream_accounts(selector.public_only).await?.count().await)
456    }
457}
458
459#[async_trait::async_trait]
460impl ChainWriteAccountOperations for HoprChain {
461    type Error = HoprChainError;
462
463    async fn announce(
464        &self,
465        multiaddrs: &[Multiaddr],
466        key: &OffchainKeypair,
467    ) -> std::result::Result<
468        BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>,
469        AnnouncementError<Self::Error>,
470    > {
471        Ok(self
472            .actions_ref()
473            .announce(multiaddrs, key)
474            .await
475            .map_err(|error| match error {
476                hopr_chain_actions::errors::ChainActionsError::AlreadyAnnounced => AnnouncementError::AlreadyAnnounced,
477                e => AnnouncementError::ProcessingError(HoprChainError::ActionsError(e)),
478            })?
479            .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
480            .boxed())
481    }
482
483    async fn withdraw<C: Currency + Send>(
484        &self,
485        balance: Balance<C>,
486        recipient: &Address,
487    ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
488        Ok(self
489            .actions_ref()
490            .withdraw(*recipient, balance)
491            .await?
492            .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
493            .boxed())
494    }
495
496    async fn register_safe(
497        &self,
498        safe_address: &Address,
499    ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
500        Ok(self
501            .actions_ref()
502            .register_safe_by_node(*safe_address)
503            .await?
504            .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
505            .boxed())
506    }
507}
508
509#[async_trait::async_trait]
510impl ChainReadChannelOperations for HoprChain {
511    type Error = HoprChainError;
512
513    fn me(&self) -> &Address {
514        self.me_onchain.public().as_ref()
515    }
516
517    async fn channel_by_parties(
518        &self,
519        src: &Address,
520        dst: &Address,
521    ) -> std::result::Result<Option<ChannelEntry>, Self::Error> {
522        Ok(self.db.get_channel_by_parties(None, src, dst, true).await?)
523    }
524
525    async fn channel_by_id(&self, channel_id: &ChannelId) -> std::result::Result<Option<ChannelEntry>, Self::Error> {
526        Ok(self.db.get_channel_by_id(None, channel_id).await?)
527    }
528
529    async fn stream_channels<'a>(
530        &'a self,
531        selector: ChannelSelector,
532    ) -> std::result::Result<BoxStream<'a, ChannelEntry>, Self::Error> {
533        Ok(self
534            .db
535            .stream_channels(
536                selector.source,
537                selector.destination,
538                &selector.allowed_states,
539                (selector.closure_time_range.0, selector.closure_time_range.1),
540            )
541            .await?)
542    }
543}
544
545#[async_trait::async_trait]
546impl ChainWriteChannelOperations for HoprChain {
547    type Error = HoprChainError;
548
549    async fn open_channel<'a>(
550        &'a self,
551        dst: &'a Address,
552        amount: HoprBalance,
553    ) -> std::result::Result<BoxFuture<'a, std::result::Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error>
554    {
555        let me = self.me_onchain();
556        Ok(self
557            .actions_ref()
558            .open_channel(*dst, amount)
559            .await?
560            .map(move |res| {
561                res.map(|c| (generate_channel_id(&me, dst), c.tx_hash))
562                    .map_err(HoprChainError::from)
563            })
564            .boxed())
565    }
566
567    async fn fund_channel<'a>(
568        &'a self,
569        channel_id: &'a ChannelId,
570        amount: HoprBalance,
571    ) -> std::result::Result<BoxFuture<'a, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
572        Ok(self
573            .actions_ref()
574            .fund_channel(*channel_id, amount)
575            .await?
576            .map(|res| res.map(|c| c.tx_hash).map_err(HoprChainError::from))
577            .boxed())
578    }
579
580    async fn close_channel<'a>(
581        &'a self,
582        channel_id: &'a ChannelId,
583    ) -> std::result::Result<BoxFuture<'a, std::result::Result<(ChannelStatus, ChainReceipt), Self::Error>>, Self::Error>
584    {
585        let channel = self
586            .db
587            .get_channel_by_id(None, channel_id)
588            .await?
589            .ok_or(HoprChainError::Api("channel not found".into()))?;
590
591        Ok(self
592            .actions_ref()
593            .close_channel(channel)
594            .await?
595            .map(|res| {
596                res.and_then(|c| {
597                    let status = match c.event {
598                        Some(hopr_chain_types::chain_events::ChainEventType::ChannelClosed(_)) => ChannelStatus::Closed,
599                        Some(hopr_chain_types::chain_events::ChainEventType::ChannelClosureInitiated(c)) => c.status,
600                        _ => return Err(ChainActionsError::InvalidState("closure must have event type".into())),
601                    };
602
603                    Ok((status, c.tx_hash))
604                })
605                .map_err(HoprChainError::from)
606            })
607            .boxed())
608    }
609}
610
611#[async_trait::async_trait]
612impl ChainKeyOperations for HoprChain {
613    type Error = HoprChainError;
614    type Mapper = hopr_db_sql::CacheKeyMapper;
615
616    async fn chain_key_to_packet_key(
617        &self,
618        chain: &Address,
619    ) -> std::result::Result<Option<OffchainPublicKey>, Self::Error> {
620        match self.db.translate_key(None, *chain).await? {
621            None => Ok(None),
622            Some(key) => Ok(Some(key.try_into()?)),
623        }
624    }
625
626    async fn packet_key_to_chain_key(
627        &self,
628        packet: &OffchainPublicKey,
629    ) -> std::result::Result<Option<Address>, Self::Error> {
630        match self.db.translate_key(None, *packet).await? {
631            None => Ok(None),
632            Some(key) => Ok(Some(key.try_into()?)),
633        }
634    }
635
636    fn key_id_mapper_ref(&self) -> &Self::Mapper {
637        self.db.key_id_mapper_ref()
638    }
639}
640
641#[async_trait::async_trait]
642impl ChainValues for HoprChain {
643    type Error = HoprChainError;
644
645    async fn domain_separators(&self) -> std::result::Result<DomainSeparators, Self::Error> {
646        let indexer_data = self.db.get_indexer_data(None).await?;
647        Ok(DomainSeparators {
648            ledger: indexer_data
649                .ledger_dst
650                .ok_or(HoprChainError::Api("missing ledger dst".into()))?,
651            safe_registry: indexer_data
652                .safe_registry_dst
653                .ok_or(HoprChainError::Api("missing safe registry dst".into()))?,
654            channel: indexer_data
655                .channels_dst
656                .ok_or(HoprChainError::Api("missing channel dst".into()))?,
657        })
658    }
659
660    async fn minimum_incoming_ticket_win_prob(&self) -> std::result::Result<WinningProbability, Self::Error> {
661        let indexer_data = self.db.get_indexer_data(None).await?;
662        Ok(indexer_data.minimum_incoming_ticket_winning_prob)
663    }
664
665    async fn minimum_ticket_price(&self) -> std::result::Result<HoprBalance, Self::Error> {
666        let indexer_data = self.db.get_indexer_data(None).await?;
667        // The default minimum ticket price is 0
668        Ok(indexer_data.ticket_price.unwrap_or_default())
669    }
670
671    async fn channel_closure_notice_period(&self) -> std::result::Result<Duration, Self::Error> {
672        Ok(self.rpc_operations.get_channel_closure_notice_period().await?)
673    }
674}
675
676#[async_trait::async_trait]
677impl ChainWriteTicketOperations for HoprChain {
678    type Error = HoprChainError;
679
680    async fn redeem_ticket(
681        &self,
682        ticket: AcknowledgedTicket,
683    ) -> std::result::Result<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>, Self::Error> {
684        Ok(self
685            .actions_ref()
686            .redeem_ticket(ticket)
687            .await?
688            .map(|r| r.map(|c| c.tx_hash).map_err(HoprChainError::from))
689            .boxed())
690    }
691
692    async fn redeem_tickets_via_selector(
693        &self,
694        selector: TicketSelector,
695    ) -> std::result::Result<Vec<BoxFuture<'_, std::result::Result<ChainReceipt, Self::Error>>>, Self::Error> {
696        Ok(self
697            .actions_ref()
698            .redeem_tickets(selector.with_state(AcknowledgedTicketStatus::Untouched))
699            .await?
700            .into_iter()
701            .map(|r| r.map(|c| c.map(|ac| ac.tx_hash).map_err(HoprChainError::from)).boxed())
702            .collect())
703    }
704}
705
706impl ChainEvents for HoprChain {
707    type Error = HoprChainError;
708
709    fn subscribe(
710        &self,
711    ) -> std::result::Result<impl Stream<Item = SignificantChainEvent> + Send + 'static, Self::Error> {
712        if let Some(stream) = self
713            .indexer_events_rx
714            .lock()
715            .map_err(|_| HoprChainError::Api("failed to lock mutex".into()))?
716            .take()
717        {
718            let indexer_action_tracker = self.action_state.clone();
719            Ok(stream.then(move |event| {
720                let indexer_action_tracker = indexer_action_tracker.clone();
721                async move {
722                    let resolved = indexer_action_tracker.match_and_resolve(&event).await;
723                    if resolved.is_empty() {
724                        trace!(%event, "No indexer expectations resolved for the event");
725                    } else {
726                        debug!(count = resolved.len(), %event, "resolved indexer expectations");
727                    }
728                    event
729                }
730            }))
731        } else {
732            Err(HoprChainError::Api("cannot subscribe more than once".into()))
733        }
734    }
735}