hopr_chain_connector/connector/
events.rs1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::{
4 chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions},
5 types::internal::channels::ChannelStatusDiscriminants,
6};
7
8use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
11where
12 B: Backend + Send + Sync + 'static,
13{
14 type Error = ConnectorError;
15
16 fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
17 &self,
18 options: I,
19 ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
20 self.check_connection_state()?;
21
22 let options = options.into_iter().collect::<HashSet<_>>();
23
24 let mut state_stream = futures_concurrency::stream::StreamGroup::new();
25 if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
26 let stream = self
27 .build_account_stream(AccountSelector::default().with_public_only(true))?
28 .map(ChainEvent::Announcement);
29 state_stream.insert(stream.boxed());
30 }
31
32 if options.contains(&StateSyncOptions::AllAccounts) {
33 let stream = self
34 .build_account_stream(AccountSelector::default().with_public_only(false))?
35 .map(ChainEvent::Announcement);
36 state_stream.insert(stream.boxed());
37 }
38
39 if options.contains(&StateSyncOptions::OpenedChannels) {
40 let stream = self
41 .build_channel_stream(
42 ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
43 )?
44 .map(ChainEvent::ChannelOpened);
45 state_stream.insert(stream.boxed());
46 }
47
48 Ok(state_stream.chain(self.events.1.activate_cloned()))
49 }
50}
51
52#[cfg(test)]
53mod tests {
54 use std::time::Duration;
55
56 use futures::StreamExt;
57 use hex_literal::hex;
58 use hopr_api::{
59 chain::{
60 ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
61 StateSyncOptions,
62 },
63 types::{crypto::prelude::*, internal::prelude::*, primitive::prelude::*},
64 };
65
66 use crate::{
67 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
68 testing::BlokliTestStateBuilder,
69 };
70
71 #[tokio::test]
72 async fn connector_should_stream_new_events() -> anyhow::Result<()> {
73 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
74 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
75 ))?;
76 let account_2 = AccountEntry {
77 public_key: *offchain_key_2.public(),
78 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
79 entry_type: AccountType::NotAnnounced,
80 safe_address: Some([2u8; Address::SIZE].into()),
81 key_id: 1.into(),
82 };
83
84 let blokli_client = BlokliTestStateBuilder::default()
85 .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
86 .with_balances([(
87 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
88 XDaiBalance::new_base(1),
89 )])
90 .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
91 .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
92 .with_deployed_safes([DeployedSafe {
93 address: [3u8; Address::SIZE].into(),
94 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
95 module: MODULE_ADDR.into(),
96 registered_nodes: vec![],
97 }])
98 .with_hopr_network_chain_info("rotsee")
99 .build_dynamic_client(MODULE_ADDR.into())
100 .with_tx_simulation_delay(Duration::from_millis(100));
101
102 let mut connector = create_connector(blokli_client)?;
103 connector.connect().await?;
104
105 let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
106
107 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
108 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
109 ))?;
110 let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
111
112 connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
113
114 connector
115 .announce(std::slice::from_ref(&multiaddress), &offchain_key_1)
116 .await?
117 .await?;
118
119 connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
120
121 let events = jh.await?;
122
123 assert!(
124 matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
125 );
126 assert!(
127 matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
128 );
129
130 Ok(())
131 }
132
133 #[tokio::test]
134 async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
135 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
136 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
137 ))?;
138 let account_1 = AccountEntry {
139 public_key: *offchain_key_1.public(),
140 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
141 entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
142 safe_address: Some([1u8; Address::SIZE].into()),
143 key_id: 1.into(),
144 };
145 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
146 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
147 ))?;
148 let account_2 = AccountEntry {
149 public_key: *offchain_key_2.public(),
150 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
151 entry_type: AccountType::NotAnnounced,
152 safe_address: Some([2u8; Address::SIZE].into()),
153 key_id: 2.into(),
154 };
155
156 let channel_1 = ChannelEntry::builder()
157 .between(
158 &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
159 &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
160 )
161 .amount(10)
162 .ticket_index(1)
163 .status(ChannelStatus::Open)
164 .epoch(1)
165 .build()?;
166
167 let channel_2 = ChannelEntry::builder()
168 .between(
169 &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
170 &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
171 )
172 .amount(15)
173 .ticket_index(2)
174 .status(ChannelStatus::PendingToClose(
175 std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
176 ))
177 .epoch(1)
178 .build()?;
179
180 let blokli_client = BlokliTestStateBuilder::default()
181 .with_accounts([
182 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
183 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
184 ])
185 .with_channels([channel_1, channel_2])
186 .with_hopr_network_chain_info("rotsee")
187 .build_static_client();
188
189 let mut connector = create_connector(blokli_client)?;
190 connector.connect().await?;
191
192 let accounts = connector
193 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
194 .take(1)
195 .collect::<Vec<_>>()
196 .await;
197 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
198
199 let accounts = connector
200 .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
201 .take(2)
202 .collect::<Vec<_>>()
203 .await;
204 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
205 assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
206
207 let channels = connector
208 .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
209 .take(1)
210 .collect::<Vec<_>>()
211 .await;
212 assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
213
214 Ok(())
215 }
216}