hopr_chain_connector/connector/
channels.rs1use blokli_client::api::{BlokliQueryClient, BlokliTransactionClient};
2use futures::{FutureExt, StreamExt, TryFutureExt, future::BoxFuture, stream::BoxStream};
3use hopr_api::chain::{ChainReceipt, ChannelSelector};
4use hopr_chain_types::prelude::*;
5use hopr_crypto_types::prelude::Keypair;
6use hopr_internal_types::prelude::*;
7use hopr_primitive_types::prelude::*;
8
9use crate::{backend::Backend, connector::HoprBlockchainConnector, errors::ConnectorError};
10
11impl<B, C, P, R> HoprBlockchainConnector<C, B, P, R>
12where
13 B: Backend + Send + Sync + 'static,
14{
15 pub(crate) fn build_channel_stream(
16 &self,
17 selector: ChannelSelector,
18 ) -> Result<impl futures::Stream<Item = ChannelEntry> + Send + 'static, ConnectorError> {
19 if selector.allowed_states == [ChannelStatusDiscriminants::Closed] {
22 return Err(ConnectorError::InvalidArguments("cannot stream closed channels only"));
23 }
24
25 let channels = self
26 .graph
27 .read()
28 .all_edges()
29 .map(|(_, _, e)| e)
30 .copied()
31 .collect::<Vec<_>>();
32
33 let backend = self.backend.clone();
34 Ok(futures::stream::iter(channels).filter_map(move |channel_id| {
35 let backend = backend.clone();
36 let selector = selector.clone();
37 async move {
39 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
40 {
41 Ok(Ok(value)) => value.filter(|c| selector.satisfies(c)),
42 Ok(Err(error)) => {
43 tracing::error!(%error, %channel_id, "backend error when looking up channel");
44 None
45 }
46 Err(error) => {
47 tracing::error!(%error, %channel_id, "join error when looking up channel");
48 None
49 }
50 }
51 }
52 }))
53 }
54}
55
56#[async_trait::async_trait]
57impl<B, C, P, R> hopr_api::chain::ChainReadChannelOperations for HoprBlockchainConnector<C, B, P, R>
58where
59 B: Backend + Send + Sync + 'static,
60 C: Send + Sync,
61 P: Send + Sync,
62 R: Send + Sync,
63{
64 type Error = ConnectorError;
65
66 fn me(&self) -> &Address {
67 self.chain_key.public().as_ref()
68 }
69
70 async fn channel_by_parties(&self, src: &Address, dst: &Address) -> Result<Option<ChannelEntry>, Self::Error> {
71 self.check_connection_state()?;
72
73 let backend = self.backend.clone();
74 let src = *src;
75 let dst = *dst;
76 Ok(self
77 .channel_by_parties
78 .try_get_with(ChannelParties::new(src, dst), async move {
79 tracing::warn!(%src, %dst, "cache miss on channel_by_parties");
80 match hopr_async_runtime::prelude::spawn_blocking(move || {
81 let channel_id = generate_channel_id(&src, &dst);
82 backend.get_channel_by_id(&channel_id)
83 })
84 .await
85 {
86 Ok(Ok(value)) => Ok(value),
87 Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
88 Err(e) => Err(ConnectorError::BackendError(e.into())),
89 }
90 })
91 .await?)
92 }
93
94 async fn channel_by_id(&self, channel_id: &ChannelId) -> Result<Option<ChannelEntry>, Self::Error> {
95 self.check_connection_state()?;
96
97 let channel_id = *channel_id;
98 let backend = self.backend.clone();
99 Ok(self
100 .channel_by_id
101 .try_get_with_by_ref(&channel_id, async move {
102 tracing::warn!(%channel_id, "cache miss on channel_by_id");
103 match hopr_async_runtime::prelude::spawn_blocking(move || backend.get_channel_by_id(&channel_id)).await
104 {
105 Ok(Ok(value)) => Ok(value),
106 Ok(Err(e)) => Err(ConnectorError::BackendError(e.into())),
107 Err(e) => Err(ConnectorError::BackendError(e.into())),
108 }
109 })
110 .await?)
111 }
112
113 async fn stream_channels<'a>(
114 &'a self,
115 selector: ChannelSelector,
116 ) -> Result<BoxStream<'a, ChannelEntry>, Self::Error> {
117 self.check_connection_state()?;
118
119 Ok(self.build_channel_stream(selector)?.boxed())
120 }
121}
122
123#[async_trait::async_trait]
124impl<B, C, P> hopr_api::chain::ChainWriteChannelOperations for HoprBlockchainConnector<C, B, P, P::TxRequest>
125where
126 B: Backend + Send + Sync + 'static,
127 C: BlokliQueryClient + BlokliTransactionClient + Send + Sync + 'static,
128 P: PayloadGenerator + Send + Sync + 'static,
129 P::TxRequest: Send + Sync + 'static,
130{
131 type Error = ConnectorError;
132
133 async fn open_channel<'a>(
134 &'a self,
135 dst: &'a Address,
136 amount: HoprBalance,
137 ) -> Result<BoxFuture<'a, Result<(ChannelId, ChainReceipt), Self::Error>>, Self::Error> {
138 self.check_connection_state()?;
139
140 let id = generate_channel_id(self.chain_key.public().as_ref(), dst);
141 let tx_req = self.payload_generator.fund_channel(*dst, amount)?;
142 tracing::debug!(channel_id = %id, %dst, %amount, "opening channel");
143
144 Ok(self
145 .send_tx(tx_req)
146 .await?
147 .and_then(move |tx_hash| futures::future::ok((id, tx_hash)))
148 .boxed())
149 }
150
151 async fn fund_channel<'a>(
152 &'a self,
153 channel_id: &'a ChannelId,
154 amount: HoprBalance,
155 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
156 self.check_connection_state()?;
157
158 use hopr_api::chain::ChainReadChannelOperations;
159
160 let channel = self
161 .channel_by_id(channel_id)
162 .await?
163 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
164 let tx_req = self.payload_generator.fund_channel(channel.destination, amount)?;
165 tracing::debug!(%channel_id, %amount, "funding channel");
166
167 Ok(self.send_tx(tx_req).await?.boxed())
168 }
169
170 async fn close_channel<'a>(
171 &'a self,
172 channel_id: &'a ChannelId,
173 ) -> Result<BoxFuture<'a, Result<ChainReceipt, Self::Error>>, Self::Error> {
174 self.check_connection_state()?;
175
176 use hopr_api::chain::ChainReadChannelOperations;
177
178 let channel = self
179 .channel_by_id(channel_id)
180 .await?
181 .ok_or_else(|| ConnectorError::ChannelDoesNotExist(*channel_id))?;
182
183 let direction = channel.direction(self.me()).ok_or(ConnectorError::InvalidArguments(
184 "cannot close channels that is not own",
185 ))?;
186
187 let tx_req = match channel.status {
188 ChannelStatus::Closed => return Err(ConnectorError::ChannelClosed(*channel_id)),
189 ChannelStatus::Open => {
190 if direction == ChannelDirection::Outgoing {
191 tracing::debug!(%channel_id, "initiating outgoing channel closure");
192 self.payload_generator
193 .initiate_outgoing_channel_closure(channel.destination)?
194 } else {
195 tracing::debug!(%channel_id, "closing incoming channel");
196 self.payload_generator.close_incoming_channel(channel.source)?
197 }
198 }
199 c if c.closure_time_elapsed(&std::time::SystemTime::now()) => {
200 if direction == ChannelDirection::Outgoing {
201 tracing::debug!(%channel_id, "finalizing outgoing channel closure");
202 self.payload_generator
203 .finalize_outgoing_channel_closure(channel.destination)?
204 } else {
205 tracing::debug!(%channel_id, "closing incoming channel");
206 self.payload_generator.close_incoming_channel(channel.source)?
207 }
208 }
209 _ => return Err(ConnectorError::InvalidState("channel closure time has not elapsed")),
210 };
211
212 Ok(self.send_tx(tx_req).await?.boxed())
213 }
214}