hopr_chain_connector/connector/
events.rs1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions};
4use hopr_internal_types::channels::ChannelStatusDiscriminants;
5
6use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
7
8impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
9where
10 B: Backend + Send + Sync + 'static,
11{
12 type Error = ConnectorError;
13
14 fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
15 &self,
16 options: I,
17 ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
18 self.check_connection_state()?;
19
20 let options = options.into_iter().collect::<HashSet<_>>();
21
22 let mut state_stream = futures_concurrency::stream::StreamGroup::new();
23 if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
24 let stream = self
25 .build_account_stream(AccountSelector::default().with_public_only(true))?
26 .map(ChainEvent::Announcement);
27 state_stream.insert(stream.boxed());
28 }
29
30 if options.contains(&StateSyncOptions::AllAccounts) {
31 let stream = self
32 .build_account_stream(AccountSelector::default().with_public_only(false))?
33 .map(ChainEvent::Announcement);
34 state_stream.insert(stream.boxed());
35 }
36
37 if options.contains(&StateSyncOptions::OpenedChannels) {
38 let stream = self
39 .build_channel_stream(
40 ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
41 )?
42 .map(ChainEvent::ChannelOpened);
43 state_stream.insert(stream.boxed());
44 }
45
46 Ok(state_stream.chain(self.events.1.activate_cloned()))
47 }
48}