hopr_chain_connector/connector/
events.rs1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::{
4 chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions},
5 types::internal::channels::ChannelStatusDiscriminants,
6};
7
8use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
11where
12 B: Backend + Send + Sync + 'static,
13{
14 type Error = ConnectorError;
15
16 fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
17 &self,
18 options: I,
19 ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
20 self.check_connection_state()?;
21
22 let options = options.into_iter().collect::<HashSet<_>>();
23
24 let mut state_stream = futures_concurrency::stream::StreamGroup::new();
25 if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
26 let stream = self
27 .build_account_stream(AccountSelector::default().with_public_only(true))?
28 .map(ChainEvent::Announcement);
29 state_stream.insert(stream.boxed());
30 }
31
32 if options.contains(&StateSyncOptions::AllAccounts) {
33 let stream = self
34 .build_account_stream(AccountSelector::default().with_public_only(false))?
35 .map(ChainEvent::Announcement);
36 state_stream.insert(stream.boxed());
37 }
38
39 if options.contains(&StateSyncOptions::OpenedChannels) {
40 let stream = self
41 .build_channel_stream(
42 ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
43 )?
44 .map(ChainEvent::ChannelOpened);
45 state_stream.insert(stream.boxed());
46 }
47
48 Ok(state_stream.chain(self.events.1.activate_cloned()))
49 }
50}
51
52#[cfg(test)]
53mod tests {
54 use std::time::Duration;
55
56 use futures::StreamExt;
57 use hex_literal::hex;
58 use hopr_api::{
59 chain::{
60 ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
61 StateSyncOptions,
62 },
63 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
64 };
65
66 use crate::{
67 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
68 testing::BlokliTestStateBuilder,
69 };
70
71 #[tokio::test]
72 async fn connector_should_stream_new_events() -> anyhow::Result<()> {
73 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
74 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
75 ))?;
76 let account_2 = AccountEntry {
77 public_key: *offchain_key_2.public(),
78 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
79 entry_type: AccountType::NotAnnounced,
80 safe_address: Some([2u8; Address::SIZE].into()),
81 key_id: 1.into(),
82 };
83 let deployer_addr = ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address();
84
85 let blokli_client = BlokliTestStateBuilder::default()
86 .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
87 .with_balances([(
88 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
89 XDaiBalance::new_base(1),
90 )])
91 .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
92 .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
93 .with_deployed_safes([DeployedSafe {
94 address: [3u8; Address::SIZE].into(),
95 owners: vec![deployer_addr],
96 module: MODULE_ADDR.into(),
97 registered_nodes: vec![],
98 deployer: deployer_addr,
99 }])
100 .with_hopr_network_chain_info("rotsee")
101 .build_dynamic_client(MODULE_ADDR.into())
102 .with_tx_simulation_delay(Duration::from_millis(100));
103
104 let mut connector = create_connector(blokli_client)?;
105 connector.connect().await?;
106
107 let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
108
109 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
110 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
111 ))?;
112 let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
113
114 connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
115
116 connector
117 .announce(std::slice::from_ref(&multiaddress), &offchain_key_1)
118 .await?
119 .await?;
120
121 connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
122
123 let events = jh.await?;
124
125 assert!(
126 matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
127 );
128 assert!(
129 matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
130 );
131
132 Ok(())
133 }
134
135 #[tokio::test]
136 async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
137 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
138 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
139 ))?;
140 let account_1 = AccountEntry {
141 public_key: *offchain_key_1.public(),
142 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
143 entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
144 safe_address: Some([1u8; Address::SIZE].into()),
145 key_id: 1.into(),
146 };
147 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
148 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
149 ))?;
150 let account_2 = AccountEntry {
151 public_key: *offchain_key_2.public(),
152 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
153 entry_type: AccountType::NotAnnounced,
154 safe_address: Some([2u8; Address::SIZE].into()),
155 key_id: 2.into(),
156 };
157
158 let channel_1 = ChannelEntry::builder()
159 .between(
160 &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
161 &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
162 )
163 .amount(10)
164 .ticket_index(1)
165 .status(ChannelStatus::Open)
166 .epoch(1)
167 .build()?;
168
169 let channel_2 = ChannelEntry::builder()
170 .between(
171 &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
172 &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
173 )
174 .amount(15)
175 .ticket_index(2)
176 .status(ChannelStatus::PendingToClose(
177 std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
178 ))
179 .epoch(1)
180 .build()?;
181
182 let blokli_client = BlokliTestStateBuilder::default()
183 .with_accounts([
184 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
185 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
186 ])
187 .with_channels([channel_1, channel_2])
188 .with_hopr_network_chain_info("rotsee")
189 .build_static_client();
190
191 let mut connector = create_connector(blokli_client)?;
192 connector.connect().await?;
193
194 let accounts = connector
195 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
196 .take(1)
197 .collect::<Vec<_>>()
198 .await;
199 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
200
201 let accounts = connector
202 .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
203 .take(2)
204 .collect::<Vec<_>>()
205 .await;
206 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
207 assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
208
209 let channels = connector
210 .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
211 .take(1)
212 .collect::<Vec<_>>()
213 .await;
214 assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
215
216 Ok(())
217 }
218}