Skip to main content

hopr_chain_connector/connector/
events.rs

1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::{
4    chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions},
5    types::internal::channels::ChannelStatusDiscriminants,
6};
7
8use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
11where
12    B: Backend + Send + Sync + 'static,
13{
14    type Error = ConnectorError;
15
16    fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
17        &self,
18        options: I,
19    ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
20        self.check_connection_state()?;
21
22        let options = options.into_iter().collect::<HashSet<_>>();
23
24        let mut state_stream = futures_concurrency::stream::StreamGroup::new();
25        if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
26            let stream = self
27                .build_account_stream(AccountSelector::default().with_public_only(true))?
28                .map(ChainEvent::Announcement);
29            state_stream.insert(stream.boxed());
30        }
31
32        if options.contains(&StateSyncOptions::AllAccounts) {
33            let stream = self
34                .build_account_stream(AccountSelector::default().with_public_only(false))?
35                .map(ChainEvent::Announcement);
36            state_stream.insert(stream.boxed());
37        }
38
39        if options.contains(&StateSyncOptions::OpenedChannels) {
40            let stream = self
41                .build_channel_stream(
42                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
43                )?
44                .map(ChainEvent::ChannelOpened);
45            state_stream.insert(stream.boxed());
46        }
47
48        Ok(state_stream.chain(self.events.1.activate_cloned()))
49    }
50}
51
52#[cfg(test)]
53mod tests {
54    use std::time::Duration;
55
56    use futures::StreamExt;
57    use hex_literal::hex;
58    use hopr_api::{
59        chain::{
60            ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
61            StateSyncOptions,
62        },
63        types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
64    };
65
66    use crate::{
67        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
68        testing::BlokliTestStateBuilder,
69    };
70
71    #[tokio::test]
72    async fn connector_should_stream_new_events() -> anyhow::Result<()> {
73        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
74            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
75        ))?;
76        let account_2 = AccountEntry {
77            public_key: *offchain_key_2.public(),
78            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
79            entry_type: AccountType::NotAnnounced,
80            safe_address: Some([2u8; Address::SIZE].into()),
81            key_id: 1.into(),
82        };
83        let deployer_addr = ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address();
84
85        let blokli_client = BlokliTestStateBuilder::default()
86            .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
87            .with_balances([(
88                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
89                XDaiBalance::new_base(1),
90            )])
91            .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
92            .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
93            .with_deployed_safes([DeployedSafe {
94                address: [3u8; Address::SIZE].into(),
95                owners: vec![deployer_addr],
96                module: MODULE_ADDR.into(),
97                registered_nodes: vec![],
98                deployer: deployer_addr,
99            }])
100            .with_hopr_network_chain_info("rotsee")
101            .build_dynamic_client(MODULE_ADDR.into())
102            .with_tx_simulation_delay(Duration::from_millis(100));
103
104        let mut connector = create_connector(blokli_client)?;
105        connector.connect().await?;
106
107        let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
108
109        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
110            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
111        ))?;
112        let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
113
114        connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
115
116        connector
117            .announce(std::slice::from_ref(&multiaddress), &offchain_key_1)
118            .await?
119            .await?;
120
121        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
122
123        let events = jh.await?;
124
125        assert!(
126            matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
127        );
128        assert!(
129            matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
130        );
131
132        Ok(())
133    }
134
135    #[tokio::test]
136    async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
137        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
138            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
139        ))?;
140        let account_1 = AccountEntry {
141            public_key: *offchain_key_1.public(),
142            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
143            entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
144            safe_address: Some([1u8; Address::SIZE].into()),
145            key_id: 1.into(),
146        };
147        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
148            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
149        ))?;
150        let account_2 = AccountEntry {
151            public_key: *offchain_key_2.public(),
152            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
153            entry_type: AccountType::NotAnnounced,
154            safe_address: Some([2u8; Address::SIZE].into()),
155            key_id: 2.into(),
156        };
157
158        let channel_1 = ChannelEntry::builder()
159            .between(
160                &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
161                &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
162            )
163            .amount(10)
164            .ticket_index(1)
165            .status(ChannelStatus::Open)
166            .epoch(1)
167            .build()?;
168
169        let channel_2 = ChannelEntry::builder()
170            .between(
171                &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
172                &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
173            )
174            .amount(15)
175            .ticket_index(2)
176            .status(ChannelStatus::PendingToClose(
177                std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
178            ))
179            .epoch(1)
180            .build()?;
181
182        let blokli_client = BlokliTestStateBuilder::default()
183            .with_accounts([
184                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
185                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
186            ])
187            .with_channels([channel_1, channel_2])
188            .with_hopr_network_chain_info("rotsee")
189            .build_static_client();
190
191        let mut connector = create_connector(blokli_client)?;
192        connector.connect().await?;
193
194        let accounts = connector
195            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
196            .take(1)
197            .collect::<Vec<_>>()
198            .await;
199        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
200
201        let accounts = connector
202            .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
203            .take(2)
204            .collect::<Vec<_>>()
205            .await;
206        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
207        assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
208
209        let channels = connector
210            .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
211            .take(1)
212            .collect::<Vec<_>>()
213            .await;
214        assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
215
216        Ok(())
217    }
218}