1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::{
4 chain::{ChainReceipt, ChannelSelector},
5 types::{chain::prelude::*, crypto::prelude::Keypair, internal::prelude::*, primitive::prelude::*},
6};
7
8use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
11where
12 B: Backend + Send + Sync + 'static,
13{
14 pub(crate) fn build_channel_stream(
15 &self,
16 selector: ChannelSelector,
17 ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
18 if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
21 return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
22 }
23
24 let mut channels = self
25 .graph
26 .read()
27 .all_edges()
28 .map(|(_, _, e)| e)
29 .copied()
30 .collect::<Vec<_>>();
31
32 channels.sort_unstable();
34
35 let backend = self.backend.clone();
36 Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
37 let backend = backend.clone();
38 let selector = selector.clone();
39 async move {
41 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
42 {
43 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
44 Ok(Err(error)) => {
45 tracing::error!(%error, %channel_id, "backend error when looking up channel");
46 None
47 }
48 Err(error) => {
49 tracing::error!(%error, %channel_id, "join error when looking up channel");
50 None
51 }
52 }
53 }
54 }))
55 }
56}
57
58#[async_trait::async_trait]
59impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
60where
61 B: Backend + Send + Sync + 'static,
62 C: Send + Sync,
63 P: Send + Sync,
64 R: Send + Sync,
65{
66 type Error = ConnectorError;
67
68 fn me(&self) -> &Address {
69 self.chain_key.public().as_ref()
70 }
71
72 async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
73 self.check_connection_state()?;
74
75 let backend = self.backend.clone();
76 let src = *src;
77 let dst = *dst;
78 Ok(self
79 .channel_by_parties
80 .try_get_with(ChannelParties::new(src, dst), async move {
81 tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
82 match hopr_async_runtime::prelude::spawn_blocking(move || {
83 let channel_id = generate_channel_id(&src, &dst);
84 backend.get_channel_by_id(&channel_id)
85 })
86 .await
87 {
88 Ok(Ok(value)) => Ok(value),
89 Ok(Err(e)) => Err(ConnectorError::backend(e)),
90 Err(e) => Err(ConnectorError::backend(e)),
91 }
92 })
93 .await?)
94 }
95
96 async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
97 self.check_connection_state()?;
98
99 let channel_id = *channel_id;
100 let backend = self.backend.clone();
101 Ok(self
102 .channel_by_id
103 .try_get_with_by_ref(&channel_id, async move {
104 tracing::warn!(%channel_id, "cache miss on channel_by_id");
105 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
106 {
107 Ok(Ok(value)) => Ok(value),
108 Ok(Err(e)) => Err(ConnectorError::backend(e)),
109 Err(e) => Err(ConnectorError::backend(e)),
110 }
111 })
112 .await?)
113 }
114
115 async fn stream_channels<'a>(
116 &'a self,
117 selector: ChannelSelector,
118 ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
119 self.check_connection_state()?;
120
121 Ok(self.build_channel_stream(selector)?.boxed())
122 }
123}
124
125#[async_trait::async_trait]
126impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
127where
128 B: Backend + Send + Sync + 'static,
129 C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
130 P: PayloadGenerator + Send + Sync + 'static,
131 P::TxRequest: Send + Sync + 'static,
132{
133 type Error = ConnectorError;
134
135 async fn open_channel<'a>(
136 &'a self,
137 dst: &'a Address,
138 amount: HoprBalance,
139 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
140 self.check_connection_state()?;
141
142 let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
143 tracing::debug!( %dst, %amount, "opening channel");
144
145 Ok(self.send_tx(tx_req, None).await?.boxed())
146 }
147
148 async fn fund_channel<'a>(
149 &'a self,
150 channel_id: &'a ChannelId,
151 amount: HoprBalance,
152 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
153 self.check_connection_state()?;
154
155 use hopr_api::chain::ChainReadChannelOperations;
156
157 let channel = self
158 .channel_by_id(channel_id)
159 .await?
160 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
161 let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
162 tracing::debug!(%channel_id, %amount, "funding channel");
163
164 Ok(self.send_tx(tx_req, None).await?.boxed())
165 }
166
167 async fn close_channel<'a>(
168 &'a self,
169 channel_id: &'a ChannelId,
170 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
171 self.check_connection_state()?;
172
173 use hopr_api::chain::ChainReadChannelOperations;
174
175 let channel = self
176 .channel_by_id(channel_id)
177 .await?
178 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
179
180 let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
181 "cannot close channels that is not own",
182 ))?;
183
184 let tx_req = match channel.status {
185 ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
186 ChannelStatus::Open => {
187 if direction == ChannelDirection::Outgoing {
188 tracing::debug!(%channel_id, "initiating outgoing channel closure");
189 self.payload_generator
190 .initiate_outgoing_channel_closure(channel.destination)?
191 } else {
192 tracing::debug!(%channel_id, "closing incoming channel");
193 self.payload_generator.close_incoming_channel(channel.source)?
194 }
195 }
196 c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
197 if direction == ChannelDirection::Outgoing {
198 tracing::debug!(%channel_id, "finalizing outgoing channel closure");
199 self.payload_generator
200 .finalize_outgoing_channel_closure(channel.destination)?
201 } else {
202 tracing::debug!(%channel_id, "closing incoming channel");
203 self.payload_generator.close_incoming_channel(channel.source)?
204 }
205 }
206 _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
207 };
208
209 Ok(self.send_tx(tx_req, None).await?.boxed())
210 }
211}
212
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hex_literal::hex;
    use hopr_api::{
        chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
        types::crypto::keypairs::{ChainKeypair, OffchainKeypair},
    };

    use super::*;
    use crate::{
        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
        testing::BlokliTestStateBuilder,
    };

    /// Returns the chain keypairs created from `PRIVATE_KEY_1` and `PRIVATE_KEY_2` (in this order).
    fn chain_keys() -> anyhow::Result<(ChainKeypair, ChainKeypair)> {
        Ok((
            ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
            ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
        ))
    }

    /// Returns the two not-announced accounts shared by all tests in this module.
    ///
    /// The first account uses the chain address of `PRIVATE_KEY_1`, the second one the chain
    /// address of `PRIVATE_KEY_2`.
    fn test_accounts() -> anyhow::Result<(AccountEntry, AccountEntry)> {
        let (chain_key_1, chain_key_2) = chain_keys()?;

        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
        ))?;
        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
        ))?;

        let account_1 = AccountEntry {
            public_key: *offchain_key_1.public(),
            chain_addr: chain_key_1.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([1u8; Address::SIZE].into()),
            key_id: 1.into(),
        };
        let account_2 = AccountEntry {
            public_key: *offchain_key_2.public(),
            chain_addr: chain_key_2.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([2u8; Address::SIZE].into()),
            key_id: 2.into(),
        };

        Ok((account_1, account_2))
    }

    /// Builds a channel from `source` to `destination` in the given `status`,
    /// with amount 10, ticket index 1 and epoch 1.
    fn test_channel(
        source: &ChainKeypair,
        destination: &ChainKeypair,
        status: ChannelStatus,
    ) -> anyhow::Result<ChannelEntry> {
        Ok(ChannelEntry::builder()
            .between(source, destination)
            .amount(10)
            .ticket_index(1)
            .status(status)
            .epoch(1)
            .build()?)
    }

    /// Closure time used for channels in the `PendingToClose` state: 10 minutes after the Unix epoch,
    /// i.e. a time that has long elapsed.
    fn closure_time() -> std::time::SystemTime {
        std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)
    }

    #[tokio::test]
    async fn connector_should_get_and_stream_channels() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let (chain_key_1, chain_key_2) = chain_keys()?;

        let channel_1 = test_channel(&chain_key_1, &chain_key_2, ChannelStatus::Open)?;

        // The reverse channel differs in amount and ticket index, so it is built explicitly.
        let channel_2 = ChannelEntry::builder()
            .between(&chain_key_2, &chain_key_1)
            .amount(15)
            .ticket_index(2)
            .status(ChannelStatus::PendingToClose(closure_time()))
            .epoch(1)
            .build()?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1, channel_2])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        // Lookups by ID and by parties must agree for both channels.
        assert_eq!(Some(channel_1), connector.channel_by_id(channel_1.get_id()).await?);
        assert_eq!(
            Some(channel_1),
            connector
                .channel_by_parties(&channel_1.source, &channel_1.destination)
                .await?
        );
        assert_eq!(Some(channel_2), connector.channel_by_id(channel_2.get_id()).await?);
        assert_eq!(
            Some(channel_2),
            connector
                .channel_by_parties(&channel_2.source, &channel_2.destination)
                .await?
        );

        // The default selector yields both channels.
        assert_eq!(
            vec![channel_1, channel_2],
            connector
                .stream_channels(ChannelSelector::default())
                .await?
                .collect::<Vec<_>>()
                .await
        );

        // Filtering by a single state yields only the channel in that state.
        assert_eq!(
            vec![channel_1],
            connector
                .stream_channels(ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]))
                .await?
                .collect::<Vec<_>>()
                .await
        );
        assert_eq!(
            vec![channel_2],
            connector
                .stream_channels(
                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                )
                .await?
                .collect::<Vec<_>>()
                .await
        );

        // The closure time range starts after the closure time of `channel_2`, so nothing matches.
        assert_eq!(
            Vec::<ChannelEntry>::new(),
            connector
                .stream_channels(
                    ChannelSelector::default()
                        .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                        .with_closure_time_range(
                            DateTime::from(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(11))..
                        )
                )
                .await?
                .collect::<Vec<_>>()
                .await
        );

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_open_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_fund_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let (chain_key_1, chain_key_2) = chain_keys()?;

        let channel_1 = test_channel(&chain_key_1, &chain_key_2, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.fund_channel(channel_1.get_id(), 5.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_initiate_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let (chain_key_1, chain_key_2) = chain_keys()?;

        let channel_1 = test_channel(&chain_key_1, &chain_key_2, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        let mut snapshot = (*connector.client().snapshot()).clone();

        // Replace the closure time with a fixed placeholder before comparing against the stored snapshot.
        snapshot
            .channels
            .get_mut(&hex::encode(channel_1.get_id()))
            .unwrap()
            .closure_time = Some(blokli_client::api::types::DateTime("dummy".into()));

        insta::assert_yaml_snapshot!(snapshot);

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_finalize_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let (chain_key_1, chain_key_2) = chain_keys()?;

        let channel_1 = test_channel(
            &chain_key_1,
            &chain_key_2,
            ChannelStatus::PendingToClose(closure_time()),
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_close_incoming_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let (chain_key_1, chain_key_2) = chain_keys()?;

        // The channel goes from the second key to the first one, the reverse of the other tests.
        let channel_1 = test_channel(&chain_key_2, &chain_key_1, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }
}