hopr_chain_connector/connector/
events.rs1use ahash::HashSet;
2use futures::StreamExt;
3use hopr_api::chain::{AccountSelector, ChainEvent, ChannelSelector, StateSyncOptions};
4use hopr_internal_types::channels::ChannelStatusDiscriminants;
5
6use crate::{Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
7
8impl<B, C, P, R> hopr_api::chain::ChainEvents for HoprBlockchainConnector<C, B, P, R>
9where
10 B: Backend + Send + Sync + 'static,
11{
12 type Error = ConnectorError;
13
14 fn subscribe_with_state_sync<I: IntoIterator<Item = StateSyncOptions>>(
15 &self,
16 options: I,
17 ) -> Result<impl futures::Stream<Item = ChainEvent> + Send + 'static, Self::Error> {
18 self.check_connection_state()?;
19
20 let options = options.into_iter().collect::<HashSet<_>>();
21
22 let mut state_stream = futures_concurrency::stream::StreamGroup::new();
23 if options.contains(&StateSyncOptions::PublicAccounts) && !options.contains(&StateSyncOptions::AllAccounts) {
24 let stream = self
25 .build_account_stream(AccountSelector::default().with_public_only(true))?
26 .map(ChainEvent::Announcement);
27 state_stream.insert(stream.boxed());
28 }
29
30 if options.contains(&StateSyncOptions::AllAccounts) {
31 let stream = self
32 .build_account_stream(AccountSelector::default().with_public_only(false))?
33 .map(ChainEvent::Announcement);
34 state_stream.insert(stream.boxed());
35 }
36
37 if options.contains(&StateSyncOptions::OpenedChannels) {
38 let stream = self
39 .build_channel_stream(
40 ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]),
41 )?
42 .map(ChainEvent::ChannelOpened);
43 state_stream.insert(stream.boxed());
44 }
45
46 Ok(state_stream.chain(self.events.1.activate_cloned()))
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use std::time::Duration;
53
54 use futures::StreamExt;
55 use hex_literal::hex;
56 use hopr_api::chain::{
57 ChainEvent, ChainEvents, ChainWriteAccountOperations, ChainWriteChannelOperations, DeployedSafe,
58 StateSyncOptions,
59 };
60 use hopr_crypto_types::prelude::*;
61 use hopr_internal_types::prelude::*;
62 use hopr_primitive_types::prelude::*;
63
64 use crate::{
65 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
66 testing::BlokliTestStateBuilder,
67 };
68
69 #[tokio::test]
70 async fn connector_should_stream_new_events() -> anyhow::Result<()> {
71 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
72 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
73 ))?;
74 let account_2 = AccountEntry {
75 public_key: *offchain_key_2.public(),
76 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
77 entry_type: AccountType::NotAnnounced,
78 safe_address: Some([2u8; Address::SIZE].into()),
79 key_id: 1.into(),
80 };
81
82 let blokli_client = BlokliTestStateBuilder::default()
83 .with_accounts([(account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1))])
84 .with_balances([(
85 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
86 XDaiBalance::new_base(1),
87 )])
88 .with_balances([([3u8; Address::SIZE].into(), HoprBalance::new_base(100))])
89 .with_safe_allowances([([3u8; Address::SIZE].into(), HoprBalance::new_base(10000))])
90 .with_deployed_safes([DeployedSafe {
91 address: [3u8; Address::SIZE].into(),
92 owner: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
93 module: MODULE_ADDR.into(),
94 registered_nodes: vec![],
95 }])
96 .with_hopr_network_chain_info("rotsee")
97 .build_dynamic_client(MODULE_ADDR.into())
98 .with_tx_simulation_delay(Duration::from_millis(100));
99
100 let mut connector = create_connector(blokli_client)?;
101 connector.connect().await?;
102
103 let jh = tokio::task::spawn(connector.subscribe()?.take(2).collect::<Vec<_>>());
104
105 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
106 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
107 ))?;
108 let multiaddress: Multiaddr = "/ip4/127.0.0.1/tcp/1234".parse()?;
109
110 connector.register_safe(&[3u8; Address::SIZE].into()).await?.await?;
111
112 connector
113 .announce(&[multiaddress.clone()], &offchain_key_1)
114 .await?
115 .await?;
116
117 connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
118
119 let events = jh.await?;
120
121 assert!(
122 matches!(&events[0], ChainEvent::Announcement(acc) if &acc.public_key == offchain_key_1.public() && acc.entry_type == AccountType::Announced(vec![multiaddress]))
123 );
124 assert!(
125 matches!(&events[1], ChainEvent::ChannelOpened(channel) if channel.get_id() == &generate_channel_id(&ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(), &account_2.chain_addr))
126 );
127
128 Ok(())
129 }
130
131 #[tokio::test]
132 async fn connector_should_stream_existing_state() -> anyhow::Result<()> {
133 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
134 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
135 ))?;
136 let account_1 = AccountEntry {
137 public_key: *offchain_key_1.public(),
138 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
139 entry_type: AccountType::Announced(vec!["/ip4/1.2.3.4/tcp/1234".parse()?]),
140 safe_address: Some([1u8; Address::SIZE].into()),
141 key_id: 1.into(),
142 };
143 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
144 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
145 ))?;
146 let account_2 = AccountEntry {
147 public_key: *offchain_key_2.public(),
148 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
149 entry_type: AccountType::NotAnnounced,
150 safe_address: Some([2u8; Address::SIZE].into()),
151 key_id: 2.into(),
152 };
153
154 let channel_1 = ChannelEntry::new(
155 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
156 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
157 10.into(),
158 1,
159 ChannelStatus::Open,
160 1,
161 );
162
163 let channel_2 = ChannelEntry::new(
164 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
165 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
166 15.into(),
167 2,
168 ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
169 1,
170 );
171
172 let blokli_client = BlokliTestStateBuilder::default()
173 .with_accounts([
174 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
175 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
176 ])
177 .with_channels([channel_1, channel_2])
178 .with_hopr_network_chain_info("rotsee")
179 .build_static_client();
180
181 let mut connector = create_connector(blokli_client)?;
182 connector.connect().await?;
183
184 let accounts = connector
185 .subscribe_with_state_sync([StateSyncOptions::PublicAccounts])?
186 .take(1)
187 .collect::<Vec<_>>()
188 .await;
189 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
190
191 let accounts = connector
192 .subscribe_with_state_sync([StateSyncOptions::AllAccounts])?
193 .take(2)
194 .collect::<Vec<_>>()
195 .await;
196 assert!(matches!(&accounts[0], ChainEvent::Announcement(acc) if acc == &account_1));
197 assert!(matches!(&accounts[1], ChainEvent::Announcement(acc) if acc == &account_2));
198
199 let channels = connector
200 .subscribe_with_state_sync([StateSyncOptions::OpenedChannels])?
201 .take(1)
202 .collect::<Vec<_>>()
203 .await;
204 assert!(matches!(&channels[0], ChainEvent::ChannelOpened(ch) if ch == &channel_1));
205
206 Ok(())
207 }
208}