1use async_channel::{bounded, Receiver, Sender};
6use async_trait::async_trait;
7use futures::future::Either;
8use futures::{pin_mut, FutureExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use std::fmt::{Display, Formatter};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{debug, error, info, trace, warn};
16
17use hopr_async_runtime::prelude::spawn;
18use hopr_chain_types::actions::Action;
19use hopr_chain_types::chain_events::ChainEventType;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::api::tickets::HoprDbTicketOperations;
22use hopr_db_sql::info::HoprDbInfoOperations;
23use hopr_internal_types::prelude::*;
24use hopr_primitive_types::prelude::*;
25
26use crate::action_state::{ActionState, IndexerExpectation};
27use crate::errors::ChainActionsError::{ChannelAlreadyClosed, InvalidState, Timeout, TransactionSubmissionFailed};
28use crate::errors::Result;
29
30#[cfg(all(feature = "prometheus", not(test)))]
31use hopr_metrics::metrics::MultiCounter;
32
// Multi-label counter of executed chain actions, keyed by the `action` name and its `result`.
// Within this file the `result` label takes the values "success", "timeout" and "failure".
// Only compiled when the `prometheus` feature is enabled and outside of tests.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    static ref METRIC_COUNT_ACTIONS: MultiCounter = MultiCounter::new(
        "hopr_chain_actions_count",
        "Number of different chain actions and their results",
        &["action", "result"]
    )
    .unwrap();
}
42
/// Abstraction over the component that submits the transaction behind each [`Action`].
///
/// Every method resolves to a [`Hash`], which the action queue treats as the hash of the
/// submitted transaction and uses to track the corresponding indexer expectation.
/// A mock implementation is generated via `mockall` in test builds.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait TransactionExecutor {
    /// Submits the redemption of the given `ticket`; used for `Action::RedeemTicket`.
    async fn redeem_ticket(&self, ticket: RedeemableTicket) -> Result<Hash>;

    /// Funds the channel towards `destination` with `balance`.
    ///
    /// The action queue calls this for both `Action::OpenChannel` and `Action::FundChannel`.
    async fn fund_channel(&self, destination: Address, balance: Balance) -> Result<Hash>;

    /// Initiates closure of the outgoing channel towards `dst`; used when that channel is `Open`.
    async fn initiate_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;

    /// Finalizes closure of the outgoing channel towards `dst`; used when that channel is
    /// already `PendingToClose`.
    async fn finalize_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;

    /// Closes the incoming channel originating from `src`.
    async fn close_incoming_channel(&self, src: Address) -> Result<Hash>;

    /// Withdraws `amount` to `recipient`; used for `Action::Withdraw`.
    ///
    /// NOTE(review): the action queue does not await any chain event for this call and reports
    /// the returned hash directly as the confirmation.
    async fn withdraw(&self, recipient: Address, amount: Balance) -> Result<Hash>;

    /// Submits the node announcement described by `data`; used for `Action::Announce`.
    async fn announce(&self, data: AnnouncementData) -> Result<Hash>;

    /// Registers `safe_address` for this node; used for `Action::RegisterSafe`.
    async fn register_safe(&self, safe_address: Address) -> Result<Hash>;
}
76
/// Outcome of a successfully executed [`Action`].
#[derive(Debug, Clone, PartialEq)]
pub struct ActionConfirmation {
    /// Hash of the transaction associated with the action.
    pub tx_hash: Hash,

    /// Chain event that confirmed the action.
    ///
    /// `None` when no event was awaited (in this file that is the case for withdrawals).
    pub event: Option<ChainEventType>,

    /// The action that has been executed.
    pub action: Action,
}
89
90impl Display for ActionConfirmation {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 write!(f, "{} confirmed in tx {}", self.action, self.tx_hash)
93 }
94}
95
/// Boxed, sendable future that resolves to the final outcome of an enqueued [`Action`]:
/// either its [`ActionConfirmation`] or the error it failed with.
pub type PendingAction = Pin<Box<dyn Future<Output = Result<ActionConfirmation>> + Send>>;
98
/// One-shot sender through which the queue reports the result of an action back to the
/// [`PendingAction`] held by the submitter.
type ActionFinisher = futures::channel::oneshot::Sender<Result<ActionConfirmation>>;
101
/// Cloneable handle used to enqueue [`Action`]s for execution.
///
/// Wraps the sending half of the channel carrying each action together with the
/// one-shot sender used to report its result.
#[derive(Debug, Clone)]
pub struct ActionSender(Sender<(Action, ActionFinisher)>);
105
106impl ActionSender {
107 #[tracing::instrument(level = "debug", skip(self))]
109 pub async fn send(&self, action: Action) -> Result<PendingAction> {
110 let completer = futures::channel::oneshot::channel();
111 let sender = self.0.clone();
112
113 sender
114 .send((action, completer.0))
115 .await
116 .map(|_| {
117 completer
118 .1
119 .map(|r| r.unwrap_or(Err(InvalidState("channel cancelled".into()))))
120 .boxed()
121 })
122 .map_err(|_| TransactionSubmissionFailed("ethereum tx queue is closed".into()))
123 }
124}
125
/// Configuration of the action queue.
#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
pub struct ActionQueueConfig {
    /// Maximum time to wait for an action's indexer expectation to resolve once it has been
    /// registered. When it elapses, the action fails with a timeout error.
    ///
    /// Defaults to 150 seconds.
    #[default(Duration::from_secs(150))]
    pub max_action_confirmation_wait: Duration,
}
135
/// Shared state needed to execute a single action: the expectation registry, the
/// transaction executor and the queue configuration.
///
/// The first two members are reference-counted, so a context can be cloned for each action.
#[derive(Debug)]
struct ExecutionContext<S, TxExec>
where
    S: ActionState,
    TxExec: TransactionExecutor,
{
    // Registry where indexer expectations are registered and unregistered.
    action_state: Arc<S>,
    // Component that submits the transactions.
    tx_exec: Arc<TxExec>,
    // Supplies the confirmation timeout.
    cfg: ActionQueueConfig,
}
146
147impl<S, TxExec> Clone for ExecutionContext<S, TxExec>
149where
150 S: ActionState,
151 TxExec: TransactionExecutor,
152{
153 fn clone(&self) -> Self {
154 Self {
155 action_state: self.action_state.clone(),
156 tx_exec: self.tx_exec.clone(),
157 cfg: self.cfg,
158 }
159 }
160}
161
162impl<S, TxExec> ExecutionContext<S, TxExec>
163where
164 S: ActionState,
165 TxExec: TransactionExecutor,
166{
167 #[tracing::instrument(level = "debug", skip(self))]
168 pub async fn execute_action(self, action: Action) -> Result<ActionConfirmation> {
169 let expectation = match action.clone() {
170 Action::RedeemTicket(ticket) => {
171 debug!(%ticket, "redeeming ticket");
172 let ticket_channel_id = ticket.verified_ticket().channel_id;
173 let tx_hash = self.tx_exec.redeem_ticket(ticket).await?;
174 IndexerExpectation::new(
175 tx_hash,
176 move |event| matches!(event, ChainEventType::TicketRedeemed(channel, _) if ticket_channel_id == channel.get_id()),
177 )
178 }
179
180 Action::OpenChannel(address, stake) => {
181 debug!(%address, %stake, "opening channel");
182 let tx_hash = self.tx_exec.fund_channel(address, stake).await?;
183 IndexerExpectation::new(
184 tx_hash,
185 move |event| matches!(event, ChainEventType::ChannelOpened(channel) if channel.destination == address),
186 )
187 }
188
189 Action::FundChannel(channel, amount) => {
190 if channel.status == ChannelStatus::Open {
191 debug!(%channel, "funding channel");
192 let tx_hash = self.tx_exec.fund_channel(channel.destination, amount).await?;
193 IndexerExpectation::new(
194 tx_hash,
195 move |event| matches!(event, ChainEventType::ChannelBalanceIncreased(r_channel, diff) if r_channel.get_id() == channel.get_id() && amount.eq(diff)),
196 )
197 } else {
198 return Err(InvalidState(format!("cannot fund {channel} because it is not opened")));
199 }
200 }
201
202 Action::CloseChannel(channel, direction) => match direction {
203 ChannelDirection::Incoming => match channel.status {
204 ChannelStatus::Open | ChannelStatus::PendingToClose(_) => {
205 debug!(%channel, "closing incoming channel");
206 let tx_hash = self.tx_exec.close_incoming_channel(channel.source).await?;
207 IndexerExpectation::new(
208 tx_hash,
209 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
210 )
211 }
212 ChannelStatus::Closed => {
213 warn!(%channel, "channel already closed");
214 return Err(ChannelAlreadyClosed);
215 }
216 },
217 ChannelDirection::Outgoing => match channel.status {
218 ChannelStatus::Open => {
219 debug!(%channel, "initiating channel closure");
220 let tx_hash = self
221 .tx_exec
222 .initiate_outgoing_channel_closure(channel.destination)
223 .await?;
224 IndexerExpectation::new(
225 tx_hash,
226 move |event| matches!(event, ChainEventType::ChannelClosureInitiated(r_channel) if r_channel.get_id() == channel.get_id()),
227 )
228 }
229 ChannelStatus::PendingToClose(_) => {
230 debug!(%channel, "finalizing channel closure");
231 let tx_hash = self
232 .tx_exec
233 .finalize_outgoing_channel_closure(channel.destination)
234 .await?;
235 IndexerExpectation::new(
236 tx_hash,
237 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
238 )
239 }
240 ChannelStatus::Closed => {
241 warn!(%channel, "channel already closed");
242 return Err(ChannelAlreadyClosed);
243 }
244 },
245 },
246
247 Action::Withdraw(recipient, amount) => {
248 debug!(%recipient, %amount, "withdrawing funds");
252 return Ok(ActionConfirmation {
253 tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
254 event: None,
255 action: action.clone(),
256 });
257 }
258 Action::Announce(data) => {
259 debug!(mutliaddress = %data.multiaddress(), "announcing node");
260 let tx_hash = self.tx_exec.announce(data.clone()).await?;
261 IndexerExpectation::new(
262 tx_hash,
263 move |event| matches!(event, ChainEventType::Announcement{multiaddresses,..} if multiaddresses.contains(data.multiaddress())),
264 )
265 }
266 Action::RegisterSafe(safe_address) => {
267 debug!(%safe_address, "registering safe");
268 let tx_hash = self.tx_exec.register_safe(safe_address).await?;
269 IndexerExpectation::new(
270 tx_hash,
271 move |event| matches!(event, ChainEventType::NodeSafeRegistered(address) if safe_address.eq(address)),
272 )
273 }
274 };
275
276 let tx_hash = expectation.tx_hash;
277 debug!(?action, %tx_hash, "action submitted via tx, registering expectation");
278
279 let confirmation = self.action_state.register_expectation(expectation).await?.fuse();
281 let timeout = futures_timer::Delay::new(self.cfg.max_action_confirmation_wait).fuse();
282
283 pin_mut!(confirmation, timeout);
284
285 match futures::future::select(confirmation, timeout).await {
286 Either::Left((Ok(chain_event), _)) => Ok(ActionConfirmation {
287 tx_hash: chain_event.tx_hash,
288 event: Some(chain_event.event_type),
289 action,
290 }),
291 Either::Left((Err(_), _)) => {
292 self.action_state.unregister_expectation(tx_hash).await;
293 Err(InvalidState("action expectation was removed before resolving".into()))
294 }
295 Either::Right(_) => {
296 self.action_state.unregister_expectation(tx_hash).await;
297 Err(Timeout)
298 }
299 }
300 }
301}
302
/// Queue of [`Action`]s awaiting execution.
///
/// Actions are submitted through [`ActionSender`] handles and picked up by the processing
/// loop, which executes each of them in a separately spawned task.
#[derive(Debug, Clone)]
pub struct ActionQueue<Db, S, TxExec>
where
    Db: HoprDbInfoOperations + HoprDbTicketOperations + Send + Sync,
    S: ActionState + Send + Sync,
    TxExec: TransactionExecutor + Send + Sync,
{
    // Database handle; used to reset the ticket state after a failed redemption.
    db: Db,
    // Sending half of the action channel, cloned into every `ActionSender`.
    queue_send: Sender<(Action, ActionFinisher)>,
    // Receiving half of the action channel, drained by the processing loop.
    queue_recv: Receiver<(Action, ActionFinisher)>,
    // Context cloned into every spawned action execution.
    ctx: ExecutionContext<S, TxExec>,
}
320
321impl<Db, S, TxExec> ActionQueue<Db, S, TxExec>
322where
323 Db: HoprDbInfoOperations + HoprDbTicketOperations + Clone + Send + Sync + 'static,
324 S: ActionState + Send + Sync + 'static,
325 TxExec: TransactionExecutor + Send + Sync + 'static,
326{
327 pub const ACTION_QUEUE_SIZE: usize = 2048;
329
330 pub fn new(db: Db, action_state: S, tx_exec: TxExec, cfg: ActionQueueConfig) -> Self {
332 let (queue_send, queue_recv) = bounded(Self::ACTION_QUEUE_SIZE);
333 Self {
334 db,
335 ctx: ExecutionContext {
336 action_state: Arc::new(action_state),
337 tx_exec: Arc::new(tx_exec),
338 cfg,
339 },
340 queue_send,
341 queue_recv,
342 }
343 }
344
345 pub fn new_sender(&self) -> ActionSender {
347 ActionSender(self.queue_send.clone())
348 }
349
350 pub fn action_state(&self) -> Arc<S> {
352 self.ctx.action_state.clone()
353 }
354
355 #[tracing::instrument(level = "debug", skip(self))]
359 pub async fn start(self) {
360 let queue_recv = self.queue_recv.clone();
361 pin_mut!(queue_recv);
362 while let Some((act, tx_finisher)) = queue_recv.next().await {
363 futures_timer::Delay::new(Duration::from_millis(100)).await;
365
366 let exec_context = self.ctx.clone();
367 let db_clone = self.db.clone();
368
369 spawn(async move {
371 let act_id = act.to_string();
372 let act_name: &'static str = (&act).into();
373 trace!(act_id, act_name, "executing");
374
375 let tx_result = exec_context.execute_action(act.clone()).await;
376 match &tx_result {
377 Ok(confirmation) => {
378 info!(%confirmation, "successful confirmation");
379
380 #[cfg(all(feature = "prometheus", not(test)))]
381 METRIC_COUNT_ACTIONS.increment(&[act_name, "success"]);
382 }
383 Err(err) => {
384 if let Action::RedeemTicket(ack) = act {
386 error!(rror = %err, "marking the acknowledged ticket as untouched - redeem action failed");
387
388 if let Err(e) = db_clone
389 .update_ticket_states((&ack).into(), AcknowledgedTicketStatus::Untouched)
390 .await
391 {
392 error!(%ack, error = %e, "cannot mark ticket as untouched");
393 }
394 }
395
396 if let Timeout = err {
398 error!(act_id, "timeout while waiting for confirmation");
399
400 #[cfg(all(feature = "prometheus", not(test)))]
401 METRIC_COUNT_ACTIONS.increment(&[act_name, "timeout"]);
402 } else {
403 error!(act_id, error = %err, "ticket action failed");
404
405 #[cfg(all(feature = "prometheus", not(test)))]
406 METRIC_COUNT_ACTIONS.increment(&[act_name, "failure"]);
407 }
408 }
409 }
410
411 let _ = tx_finisher.send(tx_result);
412 });
413 }
414 error!("action queue has finished, it should be running for the node to be able to process chain actions");
415 }
416}