Skip to main content

hopr_chain_connector/connector/
events.rs

1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::{
4    chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions},
5    types::internal::channels::ChannelStatusDiscriminants,
6};
7
8use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
11where
12    B: Backend + Send + Sync + 'static,
13{
14    type Error = ConnectorError;
15
16    fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
17        &self,
18        options: I,
19    ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
20        self.check_connection_state()?;
21
22        let options = options.into_iter().collect::<HashSet<_>>();
23
24        let mut state_stream = futures_concurrency::stream::StreamGroup::new();
25        if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
26            let stream = self
27                .build_account_stream(AccountSelector::default().with_public_only(true))?
28                .map(ChainEvent::Announcement);
29            state_stream.insert(stream.boxed());
30        }
31
32        if options.contains(&StateSyncOptions::AllAccounts) {
33            let stream = self
34                .build_account_stream(AccountSelector::default().with_public_only(false))?
35                .map(ChainEvent::Announcement);
36            state_stream.insert(stream.boxed());
37        }
38
39        if options.contains(&StateSyncOptions::OpenedChannels) {
40            let stream = self
41                .build_channel_stream(
42                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
43                )?
44                .map(ChainEvent::ChannelOpened);
45            state_stream.insert(stream.boxed());
46        }
47
48        Ok(state_stream.chain(self.events.1.activate_cloned()))
49    }
50}
51
52#[cfg(test)]
53mod tests {
54    use std::time::Duration;
55
56    use futures::StreamExt;
57    use hex_literal::hex;
58    use hopr_api::{
59        chain::{
60            ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
61            StateSyncOptions,
62        },
63        types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
64    };
65
66    use crate::{
67        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
68        testing::BlokliTestStateBuilder,
69    };
70
71    #[tokio::test]
72    async fn connector_should_stream_new_events() -> anyhow::Result<()> {
73        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
74            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
75        ))?;
76        let account_2 = AccountEntry {
77            public_key: *offchain_key_2.public(),
78            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
79            entry_type: AccountType::NotAnnounced,
80            safe_address: Some([2u8; Address::SIZE].into()),
81            key_id: 1.into(),
82        };
83
84        let blokli_client = BlokliTestStateBuilder::default()
85            .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
86            .with_balances([(
87                ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
88                XDaiBalance::new_base(1),
89            )])
90            .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
91            .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
92            .with_deployed_safes([DeployedSafe {
93                address: [3u8; Address::SIZE].into(),
94                owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
95                module: MODULE_ADDR.into(),
96                registered_nodes: vec![],
97            }])
98            .with_hopr_network_chain_info("rotsee")
99            .build_dynamic_client(MODULE_ADDR.into())
100            .with_tx_simulation_delay(Duration::from_millis(100));
101
102        let mut connector = create_connector(blokli_client)?;
103        connector.connect().await?;
104
105        let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
106
107        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
108            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
109        ))?;
110        let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
111
112        connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
113
114        connector
115            .announce(std::slice::from_ref(&multiaddress), &offchain_key_1)
116            .await?
117            .await?;
118
119        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
120
121        let events = jh.await?;
122
123        assert!(
124            matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
125        );
126        assert!(
127            matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
128        );
129
130        Ok(())
131    }
132
133    #[tokio::test]
134    async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
135        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
136            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
137        ))?;
138        let account_1 = AccountEntry {
139            public_key: *offchain_key_1.public(),
140            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
141            entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
142            safe_address: Some([1u8; Address::SIZE].into()),
143            key_id: 1.into(),
144        };
145        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
146            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
147        ))?;
148        let account_2 = AccountEntry {
149            public_key: *offchain_key_2.public(),
150            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
151            entry_type: AccountType::NotAnnounced,
152            safe_address: Some([2u8; Address::SIZE].into()),
153            key_id: 2.into(),
154        };
155
156        let channel_1 = ChannelEntry::builder()
157            .between(
158                &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
159                &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
160            )
161            .amount(10)
162            .ticket_index(1)
163            .status(ChannelStatus::Open)
164            .epoch(1)
165            .build()?;
166
167        let channel_2 = ChannelEntry::builder()
168            .between(
169                &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
170                &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
171            )
172            .amount(15)
173            .ticket_index(2)
174            .status(ChannelStatus::PendingToClose(
175                std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
176            ))
177            .epoch(1)
178            .build()?;
179
180        let blokli_client = BlokliTestStateBuilder::default()
181            .with_accounts([
182                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
183                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
184            ])
185            .with_channels([channel_1, channel_2])
186            .with_hopr_network_chain_info("rotsee")
187            .build_static_client();
188
189        let mut connector = create_connector(blokli_client)?;
190        connector.connect().await?;
191
192        let accounts = connector
193            .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
194            .take(1)
195            .collect::<Vec<_>>()
196            .await;
197        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
198
199        let accounts = connector
200            .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
201            .take(2)
202            .collect::<Vec<_>>()
203            .await;
204        assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
205        assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
206
207        let channels = connector
208            .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
209            .take(1)
210            .collect::<Vec<_>>()
211            .await;
212        assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
213
214        Ok(())
215    }
216}