hopr_chain_connector/connector/
channels.rs

1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::chain::{ChainReceipt, ChannelSelector};
4use hopr_chain_types::prelude::*;
5use hopr_crypto_types::prelude::Keypair;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8
9use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
10
11impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
12where
13    B: Backend + Send + Sync + 'static,
14{
15    pub(crate) fn build_channel_stream(
16        &self,
17        selector: ChannelSelector,
18    ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
19        // Note: Since the graph does not contain Closed channels, they cannot
20        // be selected if requested solely via the ChannelSelector.
21        if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
22            return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
23        }
24
25        let channels = self
26            .graph
27            .read()
28            .all_edges()
29            .map(|(_, _, e)| e)
30            .copied()
31            .collect::<Vec<_>>();
32
33        let backend = self.backend.clone();
34        Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
35            let backend = backend.clone();
36            let selector = selector.clone();
37            // This avoids the cache on purpose so it does not get spammed
38            async move {
39                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
40                {
41                    Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
42                    Ok(Err(error)) => {
43                        tracing::error!(%error, %channel_id, "backend error when looking up channel");
44                        None
45                    }
46                    Err(error) => {
47                        tracing::error!(%error, %channel_id, "join error when looking up channel");
48                        None
49                    }
50                }
51            }
52        }))
53    }
54}
55
56#[async_trait::async_trait]
57impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
58where
59    B: Backend + Send + Sync + 'static,
60    C: Send + Sync,
61    P: Send + Sync,
62    R: Send + Sync,
63{
64    type Error = ConnectorError;
65
66    fn me(&self) -> &Address {
67        self.chain_key.public().as_ref()
68    }
69
70    async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
71        self.check_connection_state()?;
72
73        let backend = self.backend.clone();
74        let src = *src;
75        let dst = *dst;
76        Ok(self
77            .channel_by_parties
78            .try_get_with(ChannelParties::new(src, dst), async move {
79                tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
80                match hopr_async_runtime::prelude::spawn_blocking(move || {
81                    let channel_id = generate_channel_id(&src, &dst);
82                    backend.get_channel_by_id(&channel_id)
83                })
84                .await
85                {
86                    Ok(Ok(value)) => Ok(value),
87                    Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
88                    Err(e) => Err(ConnectorError::BackendError(e.into())),
89                }
90            })
91            .await?)
92    }
93
94    async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
95        self.check_connection_state()?;
96
97        let channel_id = *channel_id;
98        let backend = self.backend.clone();
99        Ok(self
100            .channel_by_id
101            .try_get_with_by_ref(&channel_id, async move {
102                tracing::warn!(%channel_id, "cache miss on channel_by_id");
103                match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
104                {
105                    Ok(Ok(value)) => Ok(value),
106                    Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
107                    Err(e) => Err(ConnectorError::BackendError(e.into())),
108                }
109            })
110            .await?)
111    }
112
113    async fn stream_channels<'a>(
114        &'a self,
115        selector: ChannelSelector,
116    ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
117        self.check_connection_state()?;
118
119        Ok(self.build_channel_stream(selector)?.boxed())
120    }
121}
122
123#[async_trait::async_trait]
124impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
125where
126    B: Backend + Send + Sync + 'static,
127    C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
128    P: PayloadGenerator + Send + Sync + 'static,
129    P::TxRequest: Send + Sync + 'static,
130{
131    type Error = ConnectorError;
132
133    async fn open_channel<'a>(
134        &'a self,
135        dst: &'a Address,
136        amount: HoprBalance,
137    ) -> Result<BoxFuture<'a, Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error> {
138        self.check_connection_state()?;
139
140        let id = generate_channel_id(self.chain_key.public().as_ref(), dst);
141        let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
142        tracing::debug!(channel_id = %id, %dst, %amount, "opening channel");
143
144        Ok(self
145            .send_tx(tx_req)
146            .await?
147            .and_then(move |tx_hash| futures::future::ok((id, tx_hash)))
148            .boxed())
149    }
150
151    async fn fund_channel<'a>(
152        &'a self,
153        channel_id: &'a ChannelId,
154        amount: HoprBalance,
155    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
156        self.check_connection_state()?;
157
158        use hopr_api::chain::ChainReadChannelOperations;
159
160        let channel = self
161            .channel_by_id(channel_id)
162            .await?
163            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
164        let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
165        tracing::debug!(%channel_id, %amount, "funding channel");
166
167        Ok(self.send_tx(tx_req).await?.boxed())
168    }
169
170    async fn close_channel<'a>(
171        &'a self,
172        channel_id: &'a ChannelId,
173    ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
174        self.check_connection_state()?;
175
176        use hopr_api::chain::ChainReadChannelOperations;
177
178        let channel = self
179            .channel_by_id(channel_id)
180            .await?
181            .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
182
183        let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
184            "cannot close channels that is not own",
185        ))?;
186
187        let tx_req = match channel.status {
188            ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
189            ChannelStatus::Open => {
190                if direction == ChannelDirection::Outgoing {
191                    tracing::debug!(%channel_id, "initiating outgoing channel closure");
192                    self.payload_generator
193                        .initiate_outgoing_channel_closure(channel.destination)?
194                } else {
195                    tracing::debug!(%channel_id, "closing incoming channel");
196                    self.payload_generator.close_incoming_channel(channel.source)?
197                }
198            }
199            c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
200                if direction == ChannelDirection::Outgoing {
201                    tracing::debug!(%channel_id, "finalizing outgoing channel closure");
202                    self.payload_generator
203                        .finalize_outgoing_channel_closure(channel.destination)?
204                } else {
205                    tracing::debug!(%channel_id, "closing incoming channel");
206                    self.payload_generator.close_incoming_channel(channel.source)?
207                }
208            }
209            _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
210        };
211
212        Ok(self.send_tx(tx_req).await?.boxed())
213    }
214}