hopr_chain_connector/connector/
events.rs

1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions};
4use hopr_internal_types::channels::ChannelStatusDiscriminants;
5
6use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
7
8impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
9where
10    B: Backend + Send + Sync + 'static,
11{
12    type Error = ConnectorError;
13
14    fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
15        &self,
16        options: I,
17    ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
18        self.check_connection_state()?;
19
20        let options = options.into_iter().collect::<HashSet<_>>();
21
22        let mut state_stream = futures_concurrency::stream::StreamGroup::new();
23        if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
24            let stream = self
25                .build_account_stream(AccountSelector::default().with_public_only(true))?
26                .map(ChainEvent::Announcement);
27            state_stream.insert(stream.boxed());
28        }
29
30        if options.contains(&StateSyncOptions::AllAccounts) {
31            let stream = self
32                .build_account_stream(AccountSelector::default().with_public_only(false))?
33                .map(ChainEvent::Announcement);
34            state_stream.insert(stream.boxed());
35        }
36
37        if options.contains(&StateSyncOptions::OpenedChannels) {
38            let stream = self
39                .build_channel_stream(
40                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
41                )?
42                .map(ChainEvent::ChannelOpened);
43            state_stream.insert(stream.boxed());
44        }
45
46        Ok(state_stream.chain(self.events.1.activate_cloned()))
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use std::time::Duration;
53
54    use futures::StreamExt;
55    use hex_literal::hex;
56    use hopr_api::chain::{
57        ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
58        StateSyncOptions,
59    };
60    use hopr_crypto_types::prelude::*;
61    use hopr_internal_types::prelude::*;
62    use hopr_primitive_types::prelude::*;
63
64    use crate::{
65        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
66        testing::BlokliTestStateBuilder,
67    };
68
69    #[tokio::test]
70    async fn connector_should_stream_new_events() -> anyhow::Result<()> {
71        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
72            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
73        ))?;
74        let account_2 = AccountEntry {
75            public_key: *offchain_key_2.public(),
76            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
77            entry_type: AccountType::NotAnnounced,
78            safe_address: Some([2u8; Address::SIZE].into()),
79            key_id: 1.into(),
80        };
81
82        let blokli_client = BlokliTestStateBuilder::default()
83            .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
84            .with_balances([(
85                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
86                XDaiBalance::new_base(1),
87            )])
88            .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
89            .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
90            .with_deployed_safes([DeployedSafe {
91                address: [3u8; Address::SIZE].into(),
92                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
93                module: MODULE_ADDR.into(),
94                registered_nodes: vec![],
95            }])
96            .with_hopr_network_chain_info("rotsee")
97            .build_dynamic_client(MODULE_ADDR.into())
98            .with_tx_simulation_delay(Duration::from_millis(100));
99
100        let mut connector = create_connector(blokli_client)?;
101        connector.connect().await?;
102
103        let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
104
105        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
106            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
107        ))?;
108        let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
109
110        connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
111
112        connector
113            .announce(&[multiaddress.clone()], &offchain_key_1)
114            .await?
115            .await?;
116
117        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
118
119        let events = jh.await?;
120
121        assert!(
122            matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
123        );
124        assert!(
125            matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
126        );
127
128        Ok(())
129    }
130
131    #[tokio::test]
132    async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
133        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
134            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
135        ))?;
136        let account_1 = AccountEntry {
137            public_key: *offchain_key_1.public(),
138            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
139            entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
140            safe_address: Some([1u8; Address::SIZE].into()),
141            key_id: 1.into(),
142        };
143        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
144            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
145        ))?;
146        let account_2 = AccountEntry {
147            public_key: *offchain_key_2.public(),
148            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
149            entry_type: AccountType::NotAnnounced,
150            safe_address: Some([2u8; Address::SIZE].into()),
151            key_id: 2.into(),
152        };
153
154        let channel_1 = ChannelEntry::new(
155            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
156            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
157            10.into(),
158            1,
159            ChannelStatus::Open,
160            1,
161        );
162
163        let channel_2 = ChannelEntry::new(
164            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
165            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
166            15.into(),
167            2,
168            ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
169            1,
170        );
171
172        let blokli_client = BlokliTestStateBuilder::default()
173            .with_accounts([
174                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
175                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
176            ])
177            .with_channels([channel_1, channel_2])
178            .with_hopr_network_chain_info("rotsee")
179            .build_static_client();
180
181        let mut connector = create_connector(blokli_client)?;
182        connector.connect().await?;
183
184        let accounts = connector
185            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
186            .take(1)
187            .collect::<Vec<_>>()
188            .await;
189        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
190
191        let accounts = connector
192            .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
193            .take(2)
194            .collect::<Vec<_>>()
195            .await;
196        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
197        assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
198
199        let channels = connector
200            .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
201            .take(1)
202            .collect::<Vec<_>>()
203            .await;
204        assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
205
206        Ok(())
207    }
208}