hopr_chain_connector/connector/
channels.rs

1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::chain::{ChainReceipt, ChannelSelector};
4use hopr_chain_types::prelude::*;
5use hopr_crypto_types::prelude::Keypair;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8
9use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
10
11impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
12where
13    B: Backend + Send + Sync + 'static,
14{
15    pub(crate) fn build_channel_stream(
16        &self,
17        selector: ChannelSelector,
18    ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
19        // Note: Since the graph does not contain Closed channels, they cannot
20        // be selected if requested solely via the ChannelSelector.
21        if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
22            return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
23        }
24
25        let mut channels = self
26            .graph
27            .read()
28            .all_edges()
29            .map(|(_, _, e)| e)
30            .copied()
31            .collect::<Vec<_>>();
32
33        // Ensure the returned channels are always perfectly ordered by their id.
34        channels.sort_unstable();
35
36        let backend = self.backend.clone();
37        Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
38            let backend = backend.clone();
39            let selector = selector.clone();
40            // This avoids the cache on purpose so it does not get spammed
41            async move {
42                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
43                {
44                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
45                    Ok(Err(error)) => {
46                        tracing::error!(%error, %channel_id, "backend error when looking up channel");
47                        None
48                    }
49                    Err(error) => {
50                        tracing::error!(%error, %channel_id, "join error when looking up channel");
51                        None
52                    }
53                }
54            }
55        }))
56    }
57}
58
59#[async_trait::async_trait]
60impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
61where
62    B: Backend + Send + Sync + 'static,
63    C: Send + Sync,
64    P: Send + Sync,
65    R: Send + Sync,
66{
67    type Error = ConnectorError;
68
69    fn me(&self) -> &Address {
70        self.chain_key.public().as_ref()
71    }
72
73    async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
74        self.check_connection_state()?;
75
76        let backend = self.backend.clone();
77        let src = *src;
78        let dst = *dst;
79        Ok(self
80            .channel_by_parties
81            .try_get_with(ChannelParties::new(src, dst), async move {
82                tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
83                match hopr_async_runtime::prelude::spawn_blocking(move || {
84                    let channel_id = generate_channel_id(&src, &dst);
85                    backend.get_channel_by_id(&channel_id)
86                })
87                .await
88                {
89                    Ok(Ok(value)) => Ok(value),
90                    Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
91                    Err(e) => Err(ConnectorError::BackendError(e.into())),
92                }
93            })
94            .await?)
95    }
96
97    async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
98        self.check_connection_state()?;
99
100        let channel_id = *channel_id;
101        let backend = self.backend.clone();
102        Ok(self
103            .channel_by_id
104            .try_get_with_by_ref(&channel_id, async move {
105                tracing::warn!(%channel_id, "cache miss on channel_by_id");
106                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
107                {
108                    Ok(Ok(value)) => Ok(value),
109                    Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
110                    Err(e) => Err(ConnectorError::BackendError(e.into())),
111                }
112            })
113            .await?)
114    }
115
116    async fn stream_channels<'a>(
117        &'a self,
118        selector: ChannelSelector,
119    ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
120        self.check_connection_state()?;
121
122        Ok(self.build_channel_stream(selector)?.boxed())
123    }
124}
125
126#[async_trait::async_trait]
127impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
128where
129    B: Backend + Send + Sync + 'static,
130    C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
131    P: PayloadGenerator + Send + Sync + 'static,
132    P::TxRequest: Send + Sync + 'static,
133{
134    type Error = ConnectorError;
135
136    async fn open_channel<'a>(
137        &'a self,
138        dst: &'a Address,
139        amount: HoprBalance,
140    ) -> Result<BoxFuture<'a, Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error> {
141        self.check_connection_state()?;
142
143        let id = generate_channel_id(self.chain_key.public().as_ref(), dst);
144        let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
145        tracing::debug!(channel_id = %id, %dst, %amount, "opening channel");
146
147        Ok(self
148            .send_tx(tx_req)
149            .await?
150            .and_then(move |tx_hash| futures::future::ok((id, tx_hash)))
151            .boxed())
152    }
153
154    async fn fund_channel<'a>(
155        &'a self,
156        channel_id: &'a ChannelId,
157        amount: HoprBalance,
158    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
159        self.check_connection_state()?;
160
161        use hopr_api::chain::ChainReadChannelOperations;
162
163        let channel = self
164            .channel_by_id(channel_id)
165            .await?
166            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
167        let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
168        tracing::debug!(%channel_id, %amount, "funding channel");
169
170        Ok(self.send_tx(tx_req).await?.boxed())
171    }
172
173    async fn close_channel<'a>(
174        &'a self,
175        channel_id: &'a ChannelId,
176    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
177        self.check_connection_state()?;
178
179        use hopr_api::chain::ChainReadChannelOperations;
180
181        let channel = self
182            .channel_by_id(channel_id)
183            .await?
184            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
185
186        let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
187            "cannot close channels that is not own",
188        ))?;
189
190        let tx_req = match channel.status {
191            ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
192            ChannelStatus::Open => {
193                if direction == ChannelDirection::Outgoing {
194                    tracing::debug!(%channel_id, "initiating outgoing channel closure");
195                    self.payload_generator
196                        .initiate_outgoing_channel_closure(channel.destination)?
197                } else {
198                    tracing::debug!(%channel_id, "closing incoming channel");
199                    self.payload_generator.close_incoming_channel(channel.source)?
200                }
201            }
202            c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
203                if direction == ChannelDirection::Outgoing {
204                    tracing::debug!(%channel_id, "finalizing outgoing channel closure");
205                    self.payload_generator
206                        .finalize_outgoing_channel_closure(channel.destination)?
207                } else {
208                    tracing::debug!(%channel_id, "closing incoming channel");
209                    self.payload_generator.close_incoming_channel(channel.source)?
210                }
211            }
212            _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
213        };
214
215        Ok(self.send_tx(tx_req).await?.boxed())
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use std::time::Duration;
222
223    use hex_literal::hex;
224    use hopr_api::chain::{ChainReadChannelOperations, ChainWriteChannelOperations};
225    use hopr_crypto_types::keypairs::{ChainKeypair, OffchainKeypair};
226
227    use super::*;
228    use crate::{
229        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
230        testing::BlokliTestStateBuilder,
231    };
232
233    #[tokio::test]
234    async fn connector_should_get_and_stream_channels() -> anyhow::Result<()> {
235        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
236            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
237        ))?;
238        let account_1 = AccountEntry {
239            public_key: *offchain_key_1.public(),
240            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
241            entry_type: AccountType::NotAnnounced,
242            safe_address: Some([1u8; Address::SIZE].into()),
243            key_id: 1.into(),
244        };
245        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
246            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
247        ))?;
248        let account_2 = AccountEntry {
249            public_key: *offchain_key_2.public(),
250            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
251            entry_type: AccountType::NotAnnounced,
252            safe_address: Some([2u8; Address::SIZE].into()),
253            key_id: 2.into(),
254        };
255
256        let channel_1 = ChannelEntry::new(
257            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
258            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
259            10.into(),
260            1,
261            ChannelStatus::Open,
262            1,
263        );
264
265        let channel_2 = ChannelEntry::new(
266            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
267            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
268            15.into(),
269            2,
270            ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
271            1,
272        );
273
274        let blokli_client = BlokliTestStateBuilder::default()
275            .with_accounts([
276                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
277                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
278            ])
279            .with_channels([channel_1, channel_2])
280            .with_hopr_network_chain_info("rotsee")
281            .build_dynamic_client(MODULE_ADDR.into());
282
283        let mut connector = create_connector(blokli_client)?;
284        connector.connect().await?;
285
286        assert_eq!(Some(channel_1), connector.channel_by_id(channel_1.get_id()).await?);
287        assert_eq!(
288            Some(channel_1),
289            connector
290                .channel_by_parties(&channel_1.source, &channel_1.destination)
291                .await?
292        );
293        assert_eq!(Some(channel_2), connector.channel_by_id(channel_2.get_id()).await?);
294        assert_eq!(
295            Some(channel_2),
296            connector
297                .channel_by_parties(&channel_2.source, &channel_2.destination)
298                .await?
299        );
300
301        assert_eq!(
302            vec![channel_1, channel_2],
303            connector
304                .stream_channels(ChannelSelector::default())
305                .await?
306                .collect::<Vec<_>>()
307                .await
308        );
309
310        assert_eq!(
311            vec![channel_1],
312            connector
313                .stream_channels(ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]))
314                .await?
315                .collect::<Vec<_>>()
316                .await
317        );
318        assert_eq!(
319            vec![channel_2],
320            connector
321                .stream_channels(
322                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
323                )
324                .await?
325                .collect::<Vec<_>>()
326                .await
327        );
328        assert_eq!(
329            Vec::<ChannelEntry>::new(),
330            connector
331                .stream_channels(
332                    ChannelSelector::default()
333                        .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
334                        .with_closure_time_range(
335                            DateTime::from(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(11))..
336                        )
337                )
338                .await?
339                .collect::<Vec<_>>()
340                .await
341        );
342
343        Ok(())
344    }
345
346    #[tokio::test]
347    async fn connector_should_open_channel() -> anyhow::Result<()> {
348        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
349            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
350        ))?;
351        let account_1 = AccountEntry {
352            public_key: *offchain_key_1.public(),
353            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
354            entry_type: AccountType::NotAnnounced,
355            safe_address: Some([1u8; Address::SIZE].into()),
356            key_id: 1.into(),
357        };
358        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
359            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
360        ))?;
361        let account_2 = AccountEntry {
362            public_key: *offchain_key_2.public(),
363            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
364            entry_type: AccountType::NotAnnounced,
365            safe_address: Some([2u8; Address::SIZE].into()),
366            key_id: 2.into(),
367        };
368
369        let blokli_client = BlokliTestStateBuilder::default()
370            .with_accounts([
371                (account_1.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
372                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
373            ])
374            .with_hopr_network_chain_info("rotsee")
375            .build_dynamic_client(MODULE_ADDR.into());
376
377        let mut connector = create_connector(blokli_client)?;
378        connector.connect().await?;
379
380        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;
381
382        insta::assert_yaml_snapshot!(*connector.client().snapshot());
383
384        Ok(())
385    }
386
387    #[tokio::test]
388    async fn connector_should_fund_channel() -> anyhow::Result<()> {
389        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
390            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
391        ))?;
392        let account_1 = AccountEntry {
393            public_key: *offchain_key_1.public(),
394            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
395            entry_type: AccountType::NotAnnounced,
396            safe_address: Some([1u8; Address::SIZE].into()),
397            key_id: 1.into(),
398        };
399        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
400            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
401        ))?;
402        let account_2 = AccountEntry {
403            public_key: *offchain_key_2.public(),
404            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
405            entry_type: AccountType::NotAnnounced,
406            safe_address: Some([2u8; Address::SIZE].into()),
407            key_id: 2.into(),
408        };
409
410        let channel_1 = ChannelEntry::new(
411            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
412            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
413            10.into(),
414            1,
415            ChannelStatus::Open,
416            1,
417        );
418
419        let blokli_client = BlokliTestStateBuilder::default()
420            .with_accounts([
421                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
422                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
423            ])
424            .with_channels([channel_1])
425            .with_hopr_network_chain_info("rotsee")
426            .build_dynamic_client(MODULE_ADDR.into());
427
428        let mut connector = create_connector(blokli_client)?;
429        connector.connect().await?;
430
431        connector.fund_channel(&channel_1.get_id(), 5.into()).await?.await?;
432
433        insta::assert_yaml_snapshot!(*connector.client().snapshot());
434
435        Ok(())
436    }
437
438    #[tokio::test]
439    async fn connector_should_initiate_channel_closure() -> anyhow::Result<()> {
440        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
441            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
442        ))?;
443        let account_1 = AccountEntry {
444            public_key: *offchain_key_1.public(),
445            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
446            entry_type: AccountType::NotAnnounced,
447            safe_address: Some([1u8; Address::SIZE].into()),
448            key_id: 1.into(),
449        };
450        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
451            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
452        ))?;
453        let account_2 = AccountEntry {
454            public_key: *offchain_key_2.public(),
455            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
456            entry_type: AccountType::NotAnnounced,
457            safe_address: Some([2u8; Address::SIZE].into()),
458            key_id: 2.into(),
459        };
460
461        let channel_1 = ChannelEntry::new(
462            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
463            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
464            10.into(),
465            1,
466            ChannelStatus::Open,
467            1,
468        );
469
470        let blokli_client = BlokliTestStateBuilder::default()
471            .with_accounts([
472                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
473                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
474            ])
475            .with_channels([channel_1])
476            .with_hopr_network_chain_info("rotsee")
477            .build_dynamic_client(MODULE_ADDR.into());
478
479        let mut connector = create_connector(blokli_client)?;
480        connector.connect().await?;
481
482        connector.close_channel(&channel_1.get_id()).await?.await?;
483
484        let mut snapshot = (*connector.client().snapshot()).clone();
485
486        // Replace the closure time value to make the snapshot deterministic
487        snapshot
488            .channels
489            .get_mut(&hex::encode(channel_1.get_id()))
490            .unwrap()
491            .closure_time = Some(blokli_client::api::types::DateTime { 0: "dummy".into() });
492
493        insta::assert_yaml_snapshot!(snapshot);
494
495        Ok(())
496    }
497
498    #[tokio::test]
499    async fn connector_should_finalize_channel_closure() -> anyhow::Result<()> {
500        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
501            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
502        ))?;
503        let account_1 = AccountEntry {
504            public_key: *offchain_key_1.public(),
505            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
506            entry_type: AccountType::NotAnnounced,
507            safe_address: Some([1u8; Address::SIZE].into()),
508            key_id: 1.into(),
509        };
510        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
511            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
512        ))?;
513        let account_2 = AccountEntry {
514            public_key: *offchain_key_2.public(),
515            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
516            entry_type: AccountType::NotAnnounced,
517            safe_address: Some([2u8; Address::SIZE].into()),
518            key_id: 2.into(),
519        };
520
521        let channel_1 = ChannelEntry::new(
522            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
523            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
524            10.into(),
525            1,
526            ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
527            1,
528        );
529
530        let blokli_client = BlokliTestStateBuilder::default()
531            .with_accounts([
532                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
533                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
534            ])
535            .with_channels([channel_1])
536            .with_hopr_network_chain_info("rotsee")
537            .build_dynamic_client(MODULE_ADDR.into());
538
539        let mut connector = create_connector(blokli_client)?;
540        connector.connect().await?;
541
542        connector.close_channel(&channel_1.get_id()).await?.await?;
543
544        insta::assert_yaml_snapshot!(*connector.client().snapshot());
545
546        Ok(())
547    }
548
549    #[tokio::test]
550    async fn connector_should_close_incoming_channel() -> anyhow::Result<()> {
551        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
552            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
553        ))?;
554        let account_1 = AccountEntry {
555            public_key: *offchain_key_1.public(),
556            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
557            entry_type: AccountType::NotAnnounced,
558            safe_address: Some([1u8; Address::SIZE].into()),
559            key_id: 1.into(),
560        };
561        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
562            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
563        ))?;
564        let account_2 = AccountEntry {
565            public_key: *offchain_key_2.public(),
566            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
567            entry_type: AccountType::NotAnnounced,
568            safe_address: Some([2u8; Address::SIZE].into()),
569            key_id: 2.into(),
570        };
571
572        let channel_1 = ChannelEntry::new(
573            ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
574            ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
575            10.into(),
576            1,
577            ChannelStatus::Open,
578            1,
579        );
580
581        let blokli_client = BlokliTestStateBuilder::default()
582            .with_accounts([
583                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
584                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
585            ])
586            .with_channels([channel_1])
587            .with_hopr_network_chain_info("rotsee")
588            .build_dynamic_client(MODULE_ADDR.into());
589
590        let mut connector = create_connector(blokli_client)?;
591        connector.connect().await?;
592
593        connector.close_channel(&channel_1.get_id()).await?.await?;
594
595        insta::assert_yaml_snapshot!(*connector.client().snapshot());
596
597        Ok(())
598    }
599}