Skip to main content

hopr_chain_connector/connector/
channels.rs

1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::{
4    chain::{ChainReceipt, ChannelSelector},
5    types::{chain::prelude::*, crypto::prelude::Keypair, internal::prelude::*, primitive::prelude::*},
6};
7
8use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
9
10impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
11where
12    B: Backend + Send + Sync + 'static,
13{
14    pub(crate) fn build_channel_stream(
15        &self,
16        selector: ChannelSelector,
17    ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
18        // Note: Since the graph does not contain Closed channels, they cannot
19        // be selected if requested solely via the ChannelSelector.
20        if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
21            return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
22        }
23
24        let mut channels = self
25            .graph
26            .read()
27            .all_edges()
28            .map(|(_, _, e)| e)
29            .copied()
30            .collect::<Vec<_>>();
31
32        // Ensure the returned channels are always perfectly ordered by their id.
33        channels.sort_unstable();
34
35        let backend = self.backend.clone();
36        Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
37            let backend = backend.clone();
38            let selector = selector.clone();
39            // This avoids the cache on purpose so it does not get spammed
40            async move {
41                match hopr_utils::runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
42                {
43                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
44                    Ok(Err(error)) => {
45                        tracing::error!(%error, %channel_id, "backend error when looking up channel");
46                        None
47                    }
48                    Err(error) => {
49                        tracing::error!(%error, %channel_id, "join error when looking up channel");
50                        None
51                    }
52                }
53            }
54        }))
55    }
56}
57
58async fn channel_by_id_async<B: Backend + Send + Sync + 'static>(
59    channel_by_id: moka::sync::Cache<ChannelId, Option<ChannelEntry>, ahash::RandomState>,
60    backend: std::sync::Arc<B>,
61    channel_id: ChannelId,
62) -> Result<Option<ChannelEntry>, ConnectorError> {
63    Ok(hopr_utils::runtime::prelude::spawn_blocking(move || {
64        channel_by_id.try_get_with_by_ref(&channel_id, || {
65            tracing::warn!(%channel_id, "cache miss on channel_by_id");
66            backend.get_channel_by_id(&channel_id).map_err(ConnectorError::backend)
67        })
68    })
69    .await
70    .map_err(ConnectorError::backend)??)
71}
72
73#[async_trait::async_trait]
74impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
75where
76    B: Backend + Send + Sync + 'static,
77    C: Send + Sync,
78    P: Send + Sync,
79    R: Send + Sync,
80{
81    type Error = ConnectorError;
82
83    fn me(&self) -> &Address {
84        self.chain_key.public().as_ref()
85    }
86
87    fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
88        self.check_connection_state()?;
89
90        let src = *src;
91        let dst = *dst;
92        Ok(self
93            .channel_by_parties
94            .try_get_with(ChannelParties::new(src, dst), || {
95                tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
96                let channel_id = generate_channel_id(&src, &dst);
97                self.backend
98                    .get_channel_by_id(&channel_id)
99                    .map_err(ConnectorError::backend)
100            })?)
101    }
102
103    fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
104        self.check_connection_state()?;
105
106        Ok(self.channel_by_id.try_get_with_by_ref(channel_id, || {
107            tracing::warn!(%channel_id, "cache miss on channel_by_id");
108            self.backend
109                .get_channel_by_id(channel_id)
110                .map_err(ConnectorError::backend)
111        })?)
112    }
113
114    fn stream_channels(&self, selector: ChannelSelector) -> Result<BoxStream<'_, ChannelEntry>, Self::Error> {
115        self.check_connection_state()?;
116
117        Ok(self.build_channel_stream(selector)?.boxed())
118    }
119}
120
121#[async_trait::async_trait]
122impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
123where
124    B: Backend + Send + Sync + 'static,
125    C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
126    P: PayloadGenerator + Send + Sync + 'static,
127    P::TxRequest: Send + Sync + 'static,
128{
129    type Error = ConnectorError;
130
131    async fn open_channel<'a>(
132        &'a self,
133        dst: &'a Address,
134        amount: HoprBalance,
135    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
136        self.check_connection_state()?;
137
138        let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
139        tracing::debug!( %dst, %amount, "opening channel");
140
141        Ok(self.send_tx(tx_req, None).await?.boxed())
142    }
143
144    async fn fund_channel<'a>(
145        &'a self,
146        channel_id: &'a ChannelId,
147        amount: HoprBalance,
148    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
149        self.check_connection_state()?;
150
151        let channel = channel_by_id_async(self.channel_by_id.clone(), self.backend.clone(), *channel_id)
152            .await?
153            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
154
155        let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
156        tracing::debug!(%channel_id, %amount, "funding channel");
157
158        Ok(self.send_tx(tx_req, None).await?.boxed())
159    }
160
161    async fn close_channel<'a>(
162        &'a self,
163        channel_id: &'a ChannelId,
164    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
165        self.check_connection_state()?;
166
167        use hopr_api::chain::ChainReadChannelOperations;
168
169        let channel = channel_by_id_async(self.channel_by_id.clone(), self.backend.clone(), *channel_id)
170            .await?
171            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
172
173        let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
174            "cannot close channels that is not own",
175        ))?;
176
177        let tx_req = match channel.status {
178            ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
179            ChannelStatus::Open => {
180                if direction == ChannelDirection::Outgoing {
181                    tracing::debug!(%channel_id, "initiating outgoing channel closure");
182                    self.payload_generator
183                        .initiate_outgoing_channel_closure(channel.destination)?
184                } else {
185                    tracing::debug!(%channel_id, "closing incoming channel");
186                    self.payload_generator.close_incoming_channel(channel.source)?
187                }
188            }
189            c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
190                if direction == ChannelDirection::Outgoing {
191                    tracing::debug!(%channel_id, "finalizing outgoing channel closure");
192                    self.payload_generator
193                        .finalize_outgoing_channel_closure(channel.destination)?
194                } else {
195                    tracing::debug!(%channel_id, "closing incoming channel");
196                    self.payload_generator.close_incoming_channel(channel.source)?
197                }
198            }
199            _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
200        };
201
202        Ok(self.send_tx(tx_req, None).await?.boxed())
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use hex_literal::hex;
    use hopr_api::{
        chain::{ChainReadChannelOperations, ChainWriteChannelOperations},
        types::crypto::keypairs::{ChainKeypair, OffchainKeypair},
    };

    use super::*;
    use crate::{
        connector::tests::{MODULE_ADDR, PRIVATE_KEY_1, PRIVATE_KEY_2, create_connector},
        testing::BlokliTestStateBuilder,
    };

    /// Secret used for both the chain key and the offchain key of a third party,
    /// i.e. one that is neither `PRIVATE_KEY_1` nor `PRIVATE_KEY_2`.
    const PRIVATE_KEY_3: [u8; 32] = hex!("0000000000000000000000000000000000000000000000000000000000000003");

    /// Chain keypair derived from `PRIVATE_KEY_1`.
    fn chain_key_1() -> anyhow::Result<ChainKeypair> {
        Ok(ChainKeypair::from_secret(&PRIVATE_KEY_1)?)
    }

    /// Chain keypair derived from `PRIVATE_KEY_2`.
    fn chain_key_2() -> anyhow::Result<ChainKeypair> {
        Ok(ChainKeypair::from_secret(&PRIVATE_KEY_2)?)
    }

    /// Builds the two not-announced accounts shared by most tests:
    /// the first one uses `PRIVATE_KEY_1`, the second one `PRIVATE_KEY_2`.
    fn test_accounts() -> anyhow::Result<(AccountEntry, AccountEntry)> {
        let offchain_key_1 = OffchainKeypair::from_secret(&hex!(
            "60741b83b99e36aa0c1331578156e16b8e21166d01834abb6c64b103f885734d"
        ))?;
        let account_1 = AccountEntry {
            public_key: *offchain_key_1.public(),
            chain_addr: chain_key_1()?.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([1u8; Address::SIZE].into()),
            key_id: 1.into(),
        };

        let offchain_key_2 = OffchainKeypair::from_secret(&hex!(
            "71bf1f42ebbfcd89c3e197a3fd7cda79b92499e509b6fefa0fe44d02821d146a"
        ))?;
        let account_2 = AccountEntry {
            public_key: *offchain_key_2.public(),
            chain_addr: chain_key_2()?.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([2u8; Address::SIZE].into()),
            key_id: 2.into(),
        };

        Ok((account_1, account_2))
    }

    /// Builds a channel from `source` to `destination` in the given `status`,
    /// with amount 10, ticket index 1 and epoch 1.
    fn test_channel(
        source: &ChainKeypair,
        destination: &ChainKeypair,
        status: ChannelStatus,
    ) -> anyhow::Result<ChannelEntry> {
        Ok(ChannelEntry::builder()
            .between(source, destination)
            .amount(10)
            .ticket_index(1)
            .status(status)
            .epoch(1)
            .build()?)
    }

    /// Asserts that `result` is an error whose message contains `expected`.
    ///
    /// `operation` describes what was attempted and only shows up in failure messages.
    fn assert_err_contains<T, E: std::fmt::Display>(result: Result<T, E>, expected: &str, operation: &str) {
        match result {
            Ok(_) => panic!("Expected error when {operation}"),
            Err(error) => {
                let err_msg = error.to_string();
                assert!(
                    err_msg.contains(expected),
                    "Expected error containing '{expected}' when {operation}, got: {err_msg}"
                );
            }
        }
    }

    #[tokio::test]
    async fn connector_should_get_and_stream_channels() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;

        let channel_1 = test_channel(&chain_key_1()?, &chain_key_2()?, ChannelStatus::Open)?;

        // The reverse channel deliberately differs in amount and ticket index.
        let channel_2 = ChannelEntry::builder()
            .between(&chain_key_2()?, &chain_key_1()?)
            .amount(15)
            .ticket_index(2)
            .status(ChannelStatus::PendingToClose(
                std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10),
            ))
            .epoch(1)
            .build()?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1, channel_2])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        assert_eq!(Some(channel_1), connector.channel_by_id(channel_1.get_id())?);
        assert_eq!(
            Some(channel_1),
            connector.channel_by_parties(&channel_1.source, &channel_1.destination)?
        );
        assert_eq!(Some(channel_2), connector.channel_by_id(channel_2.get_id())?);
        assert_eq!(
            Some(channel_2),
            connector.channel_by_parties(&channel_2.source, &channel_2.destination)?
        );

        assert_eq!(
            vec![channel_1, channel_2],
            connector
                .stream_channels(ChannelSelector::default())?
                .collect::<Vec<_>>()
                .await
        );

        assert_eq!(
            vec![channel_1],
            connector
                .stream_channels(ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::Open]))?
                .collect::<Vec<_>>()
                .await
        );
        assert_eq!(
            vec![channel_2],
            connector
                .stream_channels(
                    ChannelSelector::default().with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                )?
                .collect::<Vec<_>>()
                .await
        );
        assert_eq!(
            Vec::<ChannelEntry>::new(),
            connector
                .stream_channels(
                    ChannelSelector::default()
                        .with_allowed_states(&[ChannelStatusDiscriminants::PendingToClose])
                        .with_closure_time_range(
                            DateTime::from(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(11))..
                        )
                )?
                .collect::<Vec<_>>()
                .await
        );

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_open_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        // Copy the destination out before the accounts are handed over to the builder.
        let destination = account_2.chain_addr;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.open_channel(&destination, 10.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_fund_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let channel_1 = test_channel(&chain_key_1()?, &chain_key_2()?, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.fund_channel(channel_1.get_id(), 5.into()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_initiate_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let channel_1 = test_channel(&chain_key_1()?, &chain_key_2()?, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        let mut snapshot = (*connector.client().snapshot()).clone();

        // Replace the closure time value to make the snapshot deterministic
        snapshot
            .channels
            .get_mut(&hex::encode(channel_1.get_id()))
            .unwrap()
            .closure_time = Some(blokli_client::api::types::DateTime("dummy".into()));

        insta::assert_yaml_snapshot!(snapshot);

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_finalize_channel_closure() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        // The closure time lies far in the past, so it has already elapsed.
        let channel_1 = test_channel(
            &chain_key_1()?,
            &chain_key_2()?,
            ChannelStatus::PendingToClose(std::time::SystemTime::UNIX_EPOCH + Duration::from_mins(10)),
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_close_incoming_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        // The channel goes from key 2 to key 1, i.e. the reverse of the other tests.
        let channel_1 = test_channel(&chain_key_2()?, &chain_key_1()?, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        connector.close_channel(channel_1.get_id()).await?.await?;

        insta::assert_yaml_snapshot!(*connector.client().snapshot());

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_close_non_existing_channel() -> anyhow::Result<()> {
        let blokli_client = BlokliTestStateBuilder::default()
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        let channel_id = ChannelId::from([1u8; ChannelId::SIZE]);
        let result = connector.close_channel(&channel_id).await;

        assert_err_contains(result, "does not exist", "closing non-existing channel");

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_close_already_closed_channel() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        let channel_1 = test_channel(&chain_key_1()?, &chain_key_2()?, ChannelStatus::Closed)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        let result = connector.close_channel(channel_1.get_id()).await;

        assert_err_contains(result, "is closed", "closing already closed channel");

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_close_unowned_channel() -> anyhow::Result<()> {
        let (_, account_2) = test_accounts()?;

        let offchain_key_3 = OffchainKeypair::from_secret(&PRIVATE_KEY_3)?;
        let chain_key_3 = ChainKeypair::from_secret(&PRIVATE_KEY_3)?;
        let account_3 = AccountEntry {
            public_key: *offchain_key_3.public(),
            chain_addr: chain_key_3.public().to_address(),
            entry_type: AccountType::NotAnnounced,
            safe_address: Some([3u8; Address::SIZE].into()),
            key_id: 3.into(),
        };

        // node is using PRIVATE_KEY_1, so it owns account_1
        // this channel is between PRIVATE_KEY_2 and PRIVATE_KEY_3
        let channel_1 = test_channel(&chain_key_2()?, &chain_key_3, ChannelStatus::Open)?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_3, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        let result = connector.close_channel(channel_1.get_id()).await;

        assert_err_contains(
            result,
            "cannot close channels that is not own",
            "closing unowned channel",
        );

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_fund_non_existing_channel() -> anyhow::Result<()> {
        let blokli_client = BlokliTestStateBuilder::default()
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        let channel_id = ChannelId::from([1u8; ChannelId::SIZE]);
        let result = connector.fund_channel(&channel_id, 10.into()).await;

        assert_err_contains(result, "does not exist", "funding non-existing channel");

        Ok(())
    }

    #[tokio::test]
    async fn connector_should_not_finalize_channel_closure_before_time() -> anyhow::Result<()> {
        let (account_1, account_2) = test_accounts()?;
        // The closure time lies one hour in the future, so it cannot have elapsed yet.
        let channel_1 = test_channel(
            &chain_key_1()?,
            &chain_key_2()?,
            ChannelStatus::PendingToClose(std::time::SystemTime::now() + Duration::from_secs(3600)),
        )?;

        let blokli_client = BlokliTestStateBuilder::default()
            .with_accounts([
                (account_1, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
                (account_2, HoprBalance::new_base(100), XDaiBalance::new_base(1)),
            ])
            .with_channels([channel_1])
            .with_hopr_network_chain_info("rotsee")
            .build_dynamic_client(MODULE_ADDR.into());

        let mut connector = create_connector(blokli_client)?;
        connector.connect().await?;

        let result = connector.close_channel(channel_1.get_id()).await;

        assert_err_contains(
            result,
            "channel closure time has not elapsed",
            "closing channel before time elapsed",
        );

        Ok(())
    }
}