1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::chain::{ChainReceipt, ChannelSelector};
4use hopr_chain_types::prelude::*;
5use hopr_crypto_types::prelude::Keypair;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8
9use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
10
11impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
12where
13 B: Backend + Send + Sync + 'static,
14{
15 pub(crate) fn build_channel_stream(
16 &self,
17 selector: ChannelSelector,
18 ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
19 if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
22 return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
23 }
24
25 let mut channels = self
26 .graph
27 .read()
28 .all_edges()
29 .map(|(_, _, e)| e)
30 .copied()
31 .collect::<Vec<_>>();
32
33 channels.sort_unstable();
35
36 let backend = self.backend.clone();
37 Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
38 let backend = backend.clone();
39 let selector = selector.clone();
40 async move {
42 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
43 {
44 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
45 Ok(Err(error)) => {
46 tracing::error!(%error, %channel_id, "backend error when looking up channel");
47 None
48 }
49 Err(error) => {
50 tracing::error!(%error, %channel_id, "join error when looking up channel");
51 None
52 }
53 }
54 }
55 }))
56 }
57}
58
59#[async_trait::async_trait]
60impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
61where
62 B: Backend + Send + Sync + 'static,
63 C: Send + Sync,
64 P: Send + Sync,
65 R: Send + Sync,
66{
67 type Error = ConnectorError;
68
69 fn me(&self) -> &Address {
70 self.chain_key.public().as_ref()
71 }
72
73 async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
74 self.check_connection_state()?;
75
76 let backend = self.backend.clone();
77 let src = *src;
78 let dst = *dst;
79 Ok(self
80 .channel_by_parties
81 .try_get_with(ChannelParties::new(src, dst), async move {
82 tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
83 match hopr_async_runtime::prelude::spawn_blocking(move || {
84 let channel_id = generate_channel_id(&src, &dst);
85 backend.get_channel_by_id(&channel_id)
86 })
87 .await
88 {
89 Ok(Ok(value)) => Ok(value),
90 Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
91 Err(e) => Err(ConnectorError::BackendError(e.into())),
92 }
93 })
94 .await?)
95 }
96
97 async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
98 self.check_connection_state()?;
99
100 let channel_id = *channel_id;
101 let backend = self.backend.clone();
102 Ok(self
103 .channel_by_id
104 .try_get_with_by_ref(&channel_id, async move {
105 tracing::warn!(%channel_id, "cache miss on channel_by_id");
106 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
107 {
108 Ok(Ok(value)) => Ok(value),
109 Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
110 Err(e) => Err(ConnectorError::BackendError(e.into())),
111 }
112 })
113 .await?)
114 }
115
116 async fn stream_channels<'a>(
117 &'a self,
118 selector: ChannelSelector,
119 ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
120 self.check_connection_state()?;
121
122 Ok(self.build_channel_stream(selector)?.boxed())
123 }
124}
125
126#[async_trait::async_trait]
127impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
128where
129 B: Backend + Send + Sync + 'static,
130 C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
131 P: PayloadGenerator + Send + Sync + 'static,
132 P::TxRequest: Send + Sync + 'static,
133{
134 type Error = ConnectorError;
135
136 async fn open_channel<'a>(
137 &'a self,
138 dst: &'a Address,
139 amount: HoprBalance,
140 ) -> Result<BoxFuture<'a, Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error> {
141 self.check_connection_state()?;
142
143 let id = generate_channel_id(self.chain_key.public().as_ref(), dst);
144 let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
145 tracing::debug!(channel_id = %id, %dst, %amount, "opening channel");
146
147 Ok(self
148 .send_tx(tx_req)
149 .await?
150 .and_then(move |tx_hash| futures::future::ok((id, tx_hash)))
151 .boxed())
152 }
153
154 async fn fund_channel<'a>(
155 &'a self,
156 channel_id: &'a ChannelId,
157 amount: HoprBalance,
158 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
159 self.check_connection_state()?;
160
161 use hopr_api::chain::ChainReadChannelOperations;
162
163 let channel = self
164 .channel_by_id(channel_id)
165 .await?
166 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
167 let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
168 tracing::debug!(%channel_id, %amount, "funding channel");
169
170 Ok(self.send_tx(tx_req).await?.boxed())
171 }
172
173 async fn close_channel<'a>(
174 &'a self,
175 channel_id: &'a ChannelId,
176 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
177 self.check_connection_state()?;
178
179 use hopr_api::chain::ChainReadChannelOperations;
180
181 let channel = self
182 .channel_by_id(channel_id)
183 .await?
184 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
185
186 let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
187 "cannot close channels that is not own",
188 ))?;
189
190 let tx_req = match channel.status {
191 ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
192 ChannelStatus::Open => {
193 if direction == ChannelDirection::Outgoing {
194 tracing::debug!(%channel_id, "initiating outgoing channel closure");
195 self.payload_generator
196 .initiate_outgoing_channel_closure(channel.destination)?
197 } else {
198 tracing::debug!(%channel_id, "closing incoming channel");
199 self.payload_generator.close_incoming_channel(channel.source)?
200 }
201 }
202 c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
203 if direction == ChannelDirection::Outgoing {
204 tracing::debug!(%channel_id, "finalizing outgoing channel closure");
205 self.payload_generator
206 .finalize_outgoing_channel_closure(channel.destination)?
207 } else {
208 tracing::debug!(%channel_id, "closing incoming channel");
209 self.payload_generator.close_incoming_channel(channel.source)?
210 }
211 }
212 _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
213 };
214
215 Ok(self.send_tx(tx_req).await?.boxed())
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use std::time::Duration;
222
223 use hex_literal::hex;
224 use hopr_api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations};
225 use hopr_crypto_types::keypairs::{ChainKeypair, OffchainKeypair};
226
227 use super::*;
228 use crate::{
229 connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
230 testing::BlokliTestStateBuilder,
231 };
232
233 #[tokio::test]
234 async fn connector_should_get_and_stream_channels() -> anyhow::Result<()> {
235 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
236 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
237 ))?;
238 let account_1 = AccountEntry {
239 public_key: *offchain_key_1.public(),
240 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
241 entry_type: AccountType::NotAnnounced,
242 safe_address: Some([1u8; Address::SIZE].into()),
243 key_id: 1.into(),
244 };
245 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
246 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
247 ))?;
248 let account_2 = AccountEntry {
249 public_key: *offchain_key_2.public(),
250 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
251 entry_type: AccountType::NotAnnounced,
252 safe_address: Some([2u8; Address::SIZE].into()),
253 key_id: 2.into(),
254 };
255
256 let channel_1 = ChannelEntry::new(
257 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
258 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
259 10.into(),
260 1,
261 ChannelStatus::Open,
262 1,
263 );
264
265 let channel_2 = ChannelEntry::new(
266 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
267 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
268 15.into(),
269 2,
270 ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
271 1,
272 );
273
274 let blokli_client = BlokliTestStateBuilder::default()
275 .with_accounts([
276 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
277 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
278 ])
279 .with_channels([channel_1, channel_2])
280 .with_hopr_network_chain_info("rotsee")
281 .build_dynamic_client(MODULE_ADDR.into());
282
283 let mut connector = create_connector(blokli_client)?;
284 connector.connect().await?;
285
286 assert_eq!(Some(channel_1), connector.channel_by_id(channel_1.get_id()).await?);
287 assert_eq!(
288 Some(channel_1),
289 connector
290 .channel_by_parties(&channel_1.source, &channel_1.destination)
291 .await?
292 );
293 assert_eq!(Some(channel_2), connector.channel_by_id(channel_2.get_id()).await?);
294 assert_eq!(
295 Some(channel_2),
296 connector
297 .channel_by_parties(&channel_2.source, &channel_2.destination)
298 .await?
299 );
300
301 assert_eq!(
302 vec![channel_1, channel_2],
303 connector
304 .stream_channels(ChannelSelector::default())
305 .await?
306 .collect::<Vec<_>>()
307 .await
308 );
309
310 assert_eq!(
311 vec![channel_1],
312 connector
313 .stream_channels(ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]))
314 .await?
315 .collect::<Vec<_>>()
316 .await
317 );
318 assert_eq!(
319 vec![channel_2],
320 connector
321 .stream_channels(
322 ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
323 )
324 .await?
325 .collect::<Vec<_>>()
326 .await
327 );
328 assert_eq!(
329 Vec::<ChannelEntry>::new(),
330 connector
331 .stream_channels(
332 ChannelSelector::default()
333 .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
334 .with_closure_time_range(
335 DateTime::from(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(11))..
336 )
337 )
338 .await?
339 .collect::<Vec<_>>()
340 .await
341 );
342
343 Ok(())
344 }
345
346 #[tokio::test]
347 async fn connector_should_open_channel() -> anyhow::Result<()> {
348 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
349 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
350 ))?;
351 let account_1 = AccountEntry {
352 public_key: *offchain_key_1.public(),
353 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
354 entry_type: AccountType::NotAnnounced,
355 safe_address: Some([1u8; Address::SIZE].into()),
356 key_id: 1.into(),
357 };
358 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
359 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
360 ))?;
361 let account_2 = AccountEntry {
362 public_key: *offchain_key_2.public(),
363 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
364 entry_type: AccountType::NotAnnounced,
365 safe_address: Some([2u8; Address::SIZE].into()),
366 key_id: 2.into(),
367 };
368
369 let blokli_client = BlokliTestStateBuilder::default()
370 .with_accounts([
371 (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
372 (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
373 ])
374 .with_hopr_network_chain_info("rotsee")
375 .build_dynamic_client(MODULE_ADDR.into());
376
377 let mut connector = create_connector(blokli_client)?;
378 connector.connect().await?;
379
380 connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
381
382 insta::assert_yaml_snapshot!(*connector.client().snapshot());
383
384 Ok(())
385 }
386
387 #[tokio::test]
388 async fn connector_should_fund_channel() -> anyhow::Result<()> {
389 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
390 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
391 ))?;
392 let account_1 = AccountEntry {
393 public_key: *offchain_key_1.public(),
394 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
395 entry_type: AccountType::NotAnnounced,
396 safe_address: Some([1u8; Address::SIZE].into()),
397 key_id: 1.into(),
398 };
399 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
400 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
401 ))?;
402 let account_2 = AccountEntry {
403 public_key: *offchain_key_2.public(),
404 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
405 entry_type: AccountType::NotAnnounced,
406 safe_address: Some([2u8; Address::SIZE].into()),
407 key_id: 2.into(),
408 };
409
410 let channel_1 = ChannelEntry::new(
411 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
412 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
413 10.into(),
414 1,
415 ChannelStatus::Open,
416 1,
417 );
418
419 let blokli_client = BlokliTestStateBuilder::default()
420 .with_accounts([
421 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
422 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
423 ])
424 .with_channels([channel_1])
425 .with_hopr_network_chain_info("rotsee")
426 .build_dynamic_client(MODULE_ADDR.into());
427
428 let mut connector = create_connector(blokli_client)?;
429 connector.connect().await?;
430
431 connector.fund_channel(&channel_1.get_id(), 5.into()).await?.await?;
432
433 insta::assert_yaml_snapshot!(*connector.client().snapshot());
434
435 Ok(())
436 }
437
438 #[tokio::test]
439 async fn connector_should_initiate_channel_closure() -> anyhow::Result<()> {
440 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
441 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
442 ))?;
443 let account_1 = AccountEntry {
444 public_key: *offchain_key_1.public(),
445 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
446 entry_type: AccountType::NotAnnounced,
447 safe_address: Some([1u8; Address::SIZE].into()),
448 key_id: 1.into(),
449 };
450 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
451 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
452 ))?;
453 let account_2 = AccountEntry {
454 public_key: *offchain_key_2.public(),
455 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
456 entry_type: AccountType::NotAnnounced,
457 safe_address: Some([2u8; Address::SIZE].into()),
458 key_id: 2.into(),
459 };
460
461 let channel_1 = ChannelEntry::new(
462 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
463 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
464 10.into(),
465 1,
466 ChannelStatus::Open,
467 1,
468 );
469
470 let blokli_client = BlokliTestStateBuilder::default()
471 .with_accounts([
472 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
473 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
474 ])
475 .with_channels([channel_1])
476 .with_hopr_network_chain_info("rotsee")
477 .build_dynamic_client(MODULE_ADDR.into());
478
479 let mut connector = create_connector(blokli_client)?;
480 connector.connect().await?;
481
482 connector.close_channel(&channel_1.get_id()).await?.await?;
483
484 let mut snapshot = (*connector.client().snapshot()).clone();
485
486 snapshot
488 .channels
489 .get_mut(&hex::encode(channel_1.get_id()))
490 .unwrap()
491 .closure_time = Some(blokli_client::api::types::DateTime { 0: "dummy".into() });
492
493 insta::assert_yaml_snapshot!(snapshot);
494
495 Ok(())
496 }
497
498 #[tokio::test]
499 async fn connector_should_finalize_channel_closure() -> anyhow::Result<()> {
500 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
501 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
502 ))?;
503 let account_1 = AccountEntry {
504 public_key: *offchain_key_1.public(),
505 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
506 entry_type: AccountType::NotAnnounced,
507 safe_address: Some([1u8; Address::SIZE].into()),
508 key_id: 1.into(),
509 };
510 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
511 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
512 ))?;
513 let account_2 = AccountEntry {
514 public_key: *offchain_key_2.public(),
515 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
516 entry_type: AccountType::NotAnnounced,
517 safe_address: Some([2u8; Address::SIZE].into()),
518 key_id: 2.into(),
519 };
520
521 let channel_1 = ChannelEntry::new(
522 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
523 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
524 10.into(),
525 1,
526 ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
527 1,
528 );
529
530 let blokli_client = BlokliTestStateBuilder::default()
531 .with_accounts([
532 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
533 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
534 ])
535 .with_channels([channel_1])
536 .with_hopr_network_chain_info("rotsee")
537 .build_dynamic_client(MODULE_ADDR.into());
538
539 let mut connector = create_connector(blokli_client)?;
540 connector.connect().await?;
541
542 connector.close_channel(&channel_1.get_id()).await?.await?;
543
544 insta::assert_yaml_snapshot!(*connector.client().snapshot());
545
546 Ok(())
547 }
548
549 #[tokio::test]
550 async fn connector_should_close_incoming_channel() -> anyhow::Result<()> {
551 let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
552 "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
553 ))?;
554 let account_1 = AccountEntry {
555 public_key: *offchain_key_1.public(),
556 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
557 entry_type: AccountType::NotAnnounced,
558 safe_address: Some([1u8; Address::SIZE].into()),
559 key_id: 1.into(),
560 };
561 let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
562 "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
563 ))?;
564 let account_2 = AccountEntry {
565 public_key: *offchain_key_2.public(),
566 chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
567 entry_type: AccountType::NotAnnounced,
568 safe_address: Some([2u8; Address::SIZE].into()),
569 key_id: 2.into(),
570 };
571
572 let channel_1 = ChannelEntry::new(
573 ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
574 ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
575 10.into(),
576 1,
577 ChannelStatus::Open,
578 1,
579 );
580
581 let blokli_client = BlokliTestStateBuilder::default()
582 .with_accounts([
583 (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
584 (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
585 ])
586 .with_channels([channel_1])
587 .with_hopr_network_chain_info("rotsee")
588 .build_dynamic_client(MODULE_ADDR.into());
589
590 let mut connector = create_connector(blokli_client)?;
591 connector.connect().await?;
592
593 connector.close_channel(&channel_1.get_id()).await?.await?;
594
595 insta::assert_yaml_snapshot!(*connector.client().snapshot());
596
597 Ok(())
598 }
599}