Skip to main content

hopr_chain_connector/connector/
channels.rs

1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::{
4    chain::{ChainReceipt, ChannelSelector},
5    types::{chain::prelude::*, crypto::prelude::Keypair, internal::prelude::*, primitive::prelude::*},
6};
7
8use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
11where
12    B: Backend + Send + Sync + 'static,
13{
14    pub(crate) fn build_channel_stream(
15        &self,
16        selector: ChannelSelector,
17    ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
18        // Note: Since the graph does not contain Closed channels, they cannot
19        // be selected if requested solely via the ChannelSelector.
20        if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
21            return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
22        }
23
24        let mut channels = self
25            .graph
26            .read()
27            .all_edges()
28            .map(|(_, _, e)| e)
29            .copied()
30            .collect::<Vec<_>>();
31
32        // Ensure the returned channels are always perfectly ordered by their id.
33        channels.sort_unstable();
34
35        let backend = self.backend.clone();
36        Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
37            let backend = backend.clone();
38            let selector = selector.clone();
39            // This avoids the cache on purpose so it does not get spammed
40            async move {
41                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
42                {
43                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
44                    Ok(Err(error)) => {
45                        tracing::error!(%error, %channel_id, "backend error when looking up channel");
46                        None
47                    }
48                    Err(error) => {
49                        tracing::error!(%error, %channel_id, "join error when looking up channel");
50                        None
51                    }
52                }
53            }
54        }))
55    }
56}
57
58#[async_trait::async_trait]
59impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
60where
61    B: Backend + Send + Sync + 'static,
62    C: Send + Sync,
63    P: Send + Sync,
64    R: Send + Sync,
65{
66    type Error = ConnectorError;
67
68    fn me(&self) -> &Address {
69        self.chain_key.public().as_ref()
70    }
71
72    async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
73        self.check_connection_state()?;
74
75        let backend = self.backend.clone();
76        let src = *src;
77        let dst = *dst;
78        Ok(self
79            .channel_by_parties
80            .try_get_with(ChannelParties::new(src, dst), async move {
81                tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
82                match hopr_async_runtime::prelude::spawn_blocking(move || {
83                    let channel_id = generate_channel_id(&src, &dst);
84                    backend.get_channel_by_id(&channel_id)
85                })
86                .await
87                {
88                    Ok(Ok(value)) => Ok(value),
89                    Ok(Err(e)) => Err(ConnectorError::backend(e)),
90                    Err(e) => Err(ConnectorError::backend(e)),
91                }
92            })
93            .await?)
94    }
95
96    async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
97        self.check_connection_state()?;
98
99        let channel_id = *channel_id;
100        let backend = self.backend.clone();
101        Ok(self
102            .channel_by_id
103            .try_get_with_by_ref(&channel_id, async move {
104                tracing::warn!(%channel_id, "cache miss on channel_by_id");
105                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
106                {
107                    Ok(Ok(value)) => Ok(value),
108                    Ok(Err(e)) => Err(ConnectorError::backend(e)),
109                    Err(e) => Err(ConnectorError::backend(e)),
110                }
111            })
112            .await?)
113    }
114
115    async fn stream_channels<'a>(
116        &'a self,
117        selector: ChannelSelector,
118    ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
119        self.check_connection_state()?;
120
121        Ok(self.build_channel_stream(selector)?.boxed())
122    }
123}
124
125#[async_trait::async_trait]
126impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
127where
128    B: Backend + Send + Sync + 'static,
129    C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
130    P: PayloadGenerator + Send + Sync + 'static,
131    P::TxRequest: Send + Sync + 'static,
132{
133    type Error = ConnectorError;
134
135    async fn open_channel<'a>(
136        &'a self,
137        dst: &'a Address,
138        amount: HoprBalance,
139    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
140        self.check_connection_state()?;
141
142        let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
143        tracing::debug!( %dst, %amount, "opening channel");
144
145        Ok(self.send_tx(tx_req, None).await?.boxed())
146    }
147
148    async fn fund_channel<'a>(
149        &'a self,
150        channel_id: &'a ChannelId,
151        amount: HoprBalance,
152    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
153        self.check_connection_state()?;
154
155        use hopr_api::chain::ChainReadChannelOperations;
156
157        let channel = self
158            .channel_by_id(channel_id)
159            .await?
160            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
161        let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
162        tracing::debug!(%channel_id, %amount, "funding channel");
163
164        Ok(self.send_tx(tx_req, None).await?.boxed())
165    }
166
167    async fn close_channel<'a>(
168        &'a self,
169        channel_id: &'a ChannelId,
170    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
171        self.check_connection_state()?;
172
173        use hopr_api::chain::ChainReadChannelOperations;
174
175        let channel = self
176            .channel_by_id(channel_id)
177            .await?
178            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
179
180        let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
181            "cannot close channels that is not own",
182        ))?;
183
184        let tx_req = match channel.status {
185            ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
186            ChannelStatus::Open => {
187                if direction == ChannelDirection::Outgoing {
188                    tracing::debug!(%channel_id, "initiating outgoing channel closure");
189                    self.payload_generator
190                        .initiate_outgoing_channel_closure(channel.destination)?
191                } else {
192                    tracing::debug!(%channel_id, "closing incoming channel");
193                    self.payload_generator.close_incoming_channel(channel.source)?
194                }
195            }
196            c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
197                if direction == ChannelDirection::Outgoing {
198                    tracing::debug!(%channel_id, "finalizing outgoing channel closure");
199                    self.payload_generator
200                        .finalize_outgoing_channel_closure(channel.destination)?
201                } else {
202                    tracing::debug!(%channel_id, "closing incoming channel");
203                    self.payload_generator.close_incoming_channel(channel.source)?
204                }
205            }
206            _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
207        };
208
209        Ok(self.send_tx(tx_req, None).await?.boxed())
210    }
211}
212
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hex_literal::hex;
    use hopr_api::{
        chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
        types::crypto::keypairs::{ChainKeypair, OffchainKeypair},
    };

    use super::*;
    use crate::{
        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
        testing::BlokliTestStateBuilder,
    };

    /// Builds the two not-announced accounts that every test in this module registers
    /// with the test client. Account 1 is keyed by `PRIVATE_KEY_1`, account 2 by
    /// `PRIVATE_KEY_2`.
    fn test_accounts() -> anyhow::Result<(AccountEntry, AccountEntry)> {
        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
        ))?;
        let account_1 = AccountEntry {
            public_key: *offchain_key_1.public(),
            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_1)?.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([1u8; Address::SIZE].into()),
            key_id: 1.into(),
        };

        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
        ))?;
        let account_2 = AccountEntry {
            public_key: *offchain_key_2.public(),
            chain_addr: ChainKeypair::from_secret(&PRIVATE_KEY_2)?.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([2u8; Address::SIZE].into()),
            key_id: 2.into(),
        };

        Ok((account_1, account_2))
    }

    /// Builds a channel from `source` to `destination` in the given `status`, using the
    /// amount (10), ticket index (1) and epoch (1) shared by most tests in this module.
    fn test_channel(
        source: &ChainKeypair,
        destination: &ChainKeypair,
        status: ChannelStatus,
    ) -> anyhow::Result<ChannelEntry> {
        Ok(ChannelEntry::builder()
            .between(source, destination)
            .amount(10)
            .ticket_index(1)
            .status(status)
            .epoch(1)
            .build()?)
    }

    #[tokio::test]
    async fn connector_should_get_and_stream_channels() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let chain_key_1 = ChainKeypair::from_secret(&PRIVATE_KEY_1)?;
        let chain_key_2 = ChainKeypair::from_secret(&PRIVATE_KEY_2)?;

        let channel_1 = test_channel(&chain_key_1, &chain_key_2, ChannelStatus::Open)?;

        // The reverse channel differs in amount and ticket index, so it is built inline.
        let channel_2 = ChannelEntry::builder()
            .between(&chain_key_2, &chain_key_1)
            .amount(15)
            .ticket_index(2)
            .status(ChannelStatus::PendingToClose(
                std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
            ))
            .epoch(1)
            .build()?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1, channel_2])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        // Point lookups, both by id and by parties.
        assert_eq!(Some(channel_1), connector.channel_by_id(channel_1.get_id()).await?);
        assert_eq!(
            Some(channel_1),
            connector
                .channel_by_parties(&channel_1.source, &channel_1.destination)
                .await?
        );
        assert_eq!(Some(channel_2), connector.channel_by_id(channel_2.get_id()).await?);
        assert_eq!(
            Some(channel_2),
            connector
                .channel_by_parties(&channel_2.source, &channel_2.destination)
                .await?
        );

        // Unfiltered stream yields both channels.
        assert_eq!(
            vec![channel_1, channel_2],
            connector
                .stream_channels(ChannelSelector::default())
                .await?
                .collect::<Vec<_>>()
                .await
        );

        // Filtering by status.
        assert_eq!(
            vec![channel_1],
            connector
                .stream_channels(ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]))
                .await?
                .collect::<Vec<_>>()
                .await
        );
        assert_eq!(
            vec![channel_2],
            connector
                .stream_channels(
                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                )
                .await?
                .collect::<Vec<_>>()
                .await
        );

        // A closure time range starting after channel_2's closure time matches nothing.
        assert_eq!(
            Vec::<ChannelEntry>::new(),
            connector
                .stream_channels(
                    ChannelSelector::default()
                        .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                        .with_closure_time_range(
                            DateTime::from(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(11))..
                        )
                )
                .await?
                .collect::<Vec<_>>()
                .await
        );

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_open_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                // Cloned because the destination address is still needed below.
                (account_2.clone(), HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.open_channel(&account_2.chain_addr, 10.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_fund_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        let channel_1 = test_channel(
            &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
            &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
            ChannelStatus::Open,
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.fund_channel(channel_1.get_id(), 5.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_initiate_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        let channel_1 = test_channel(
            &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
            &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
            ChannelStatus::Open,
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        let mut snapshot = (*connector.client().snapshot()).clone();

        // Replace the closure time value to make the snapshot deterministic
        snapshot
            .channels
            .get_mut(&hex::encode(channel_1.get_id()))
            .unwrap()
            .closure_time = Some(blokli_client::api::types::DateTime("dummy".into()));

        insta::assert_yaml_snapshot!(snapshot);

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_finalize_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        // Closure time lies far in the past, so the closure can be finalized right away.
        let channel_1 = test_channel(
            &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
            &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
            ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_close_incoming_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        // Source and destination are swapped relative to the other tests (key 2 -> key 1).
        let channel_1 = test_channel(
            &ChainKeypair::from_secret(&PRIVATE_KEY_2)?,
            &ChainKeypair::from_secret(&PRIVATE_KEY_1)?,
            ChannelStatus::Open,
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }
}