hopr_chain_connector/connector/
events.rs

1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions};
4use hopr_internal_types::channels::ChannelStatusDiscriminants;
5
6use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
7
8impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
9where
10    B: Backend + Send + Sync + 'static,
11{
12    type Error = ConnectorError;
13
14    fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
15        &self,
16        options: I,
17    ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
18        self.check_connection_state()?;
19
20        let options = options.into_iter().collect::<HashSet<_>>();
21
22        let mut state_stream = futures_concurrency::stream::StreamGroup::new();
23        if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
24            let stream = self
25                .build_account_stream(AccountSelector::default().with_public_only(true))?
26                .map(ChainEvent::Announcement);
27            state_stream.insert(stream.boxed());
28        }
29
30        if options.contains(&StateSyncOptions::AllAccounts) {
31            let stream = self
32                .build_account_stream(AccountSelector::default().with_public_only(false))?
33                .map(ChainEvent::Announcement);
34            state_stream.insert(stream.boxed());
35        }
36
37        if options.contains(&StateSyncOptions::OpenedChannels) {
38            let stream = self
39                .build_channel_stream(
40                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
41                )?
42                .map(ChainEvent::ChannelOpened);
43            state_stream.insert(stream.boxed());
44        }
45
46        Ok(state_stream.chain(self.events.1.activate_cloned()))
47    }
48}