hopr_chain_actions/
action_queue.rs

//! Defines the main FIFO MPSC queue for actions - the [ActionQueue] type.
//!
//! The [ActionQueue] acts as an MPSC queue of [Actions](hopr_chain_types::actions::Action) which are executed one-by-one
//! as they are popped from the queue by a runner task.
5use async_channel::{bounded, Receiver, Sender};
6use async_trait::async_trait;
7use futures::future::Either;
8use futures::{pin_mut, FutureExt, StreamExt};
9use serde::{Deserialize, Serialize};
10use std::fmt::{Display, Formatter};
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tracing::{debug, error, info, trace, warn};
16
17use hopr_async_runtime::prelude::spawn;
18use hopr_chain_types::actions::Action;
19use hopr_chain_types::chain_events::ChainEventType;
20use hopr_crypto_types::types::Hash;
21use hopr_db_sql::api::tickets::HoprDbTicketOperations;
22use hopr_db_sql::info::HoprDbInfoOperations;
23use hopr_internal_types::prelude::*;
24use hopr_primitive_types::prelude::*;
25
26use crate::action_state::{ActionState, IndexerExpectation};
27use crate::errors::ChainActionsError::{ChannelAlreadyClosed, InvalidState, Timeout, TransactionSubmissionFailed};
28use crate::errors::Result;
29
30#[cfg(all(feature = "prometheus", not(test)))]
31use hopr_metrics::metrics::MultiCounter;
32
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    /// Counts processed chain actions, labelled by the action name and by the result
    /// of its execution.
    static ref METRIC_COUNT_ACTIONS: MultiCounter = {
        let labels = ["action", "result"];
        MultiCounter::new(
            "hopr_chain_actions_count",
            "Number of different chain actions and their results",
            &labels,
        )
        .unwrap()
    };
}
42
43/// Implements execution of transactions underlying each `Action`.
44///
45/// Each operation returns a transaction hash and may time out.
46#[cfg_attr(test, mockall::automock)]
47#[async_trait]
48pub trait TransactionExecutor {
49    /// Executes ticket redemption transaction given a ticket.
50    async fn redeem_ticket(&self, ticket: RedeemableTicket) -> Result<Hash>;
51
52    /// Executes channel funding transaction (or channel opening) to the given `destination` and stake.
53    /// Channel funding and channel opening are both same transactions.
54    async fn fund_channel(&self, destination: Address, balance: Balance) -> Result<Hash>;
55
56    /// Initiates closure of an outgoing channel.
57    async fn initiate_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
58
59    /// Finalizes closure of an outgoing channel.
60    async fn finalize_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
61
62    /// Closes incoming channel.
63    async fn close_incoming_channel(&self, src: Address) -> Result<Hash>;
64
65    /// Performs withdrawal of a certain amount from an address.
66    /// Note that this transaction is typically awaited via polling and is not tracked
67    /// by the Indexer.
68    async fn withdraw(&self, recipient: Address, amount: Balance) -> Result<Hash>;
69
70    /// Announces the node on-chain given the `AnnouncementData`
71    async fn announce(&self, data: AnnouncementData) -> Result<Hash>;
72
73    /// Registers Safe with the node.
74    async fn register_safe(&self, safe_address: Address) -> Result<Hash>;
75}
76
77/// Represents confirmation of the `Action` execution.
78#[derive(Debug, Clone, PartialEq)]
79pub struct ActionConfirmation {
80    /// Hash of the transaction that executed this action
81    pub tx_hash: Hash,
82
83    /// Corresponding chain event if any
84    pub event: Option<ChainEventType>,
85
86    /// Action that was executed
87    pub action: Action,
88}
89
90impl Display for ActionConfirmation {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        write!(f, "{} confirmed in tx {}", self.action, self.tx_hash)
93    }
94}
95
96/// Notifies about completion of a transaction (success or failure).
97pub type PendingAction = Pin<Box<dyn Future<Output = Result<ActionConfirmation>> + Send>>;
98
99/// Future that resolves once the transaction has been confirmed by the Indexer.
100type ActionFinisher = futures::channel::oneshot::Sender<Result<ActionConfirmation>>;
101
102/// Sends a future Ethereum transaction into the `ActionQueue`.
103#[derive(Debug, Clone)]
104pub struct ActionSender(Sender<(Action, ActionFinisher)>);
105
106impl ActionSender {
107    /// Delivers the future action into the `ActionQueue` for processing.
108    #[tracing::instrument(level = "debug", skip(self))]
109    pub async fn send(&self, action: Action) -> Result<PendingAction> {
110        let completer = futures::channel::oneshot::channel();
111        let sender = self.0.clone();
112
113        sender
114            .send((action, completer.0))
115            .await
116            .map(|_| {
117                completer
118                    .1
119                    .map(|r| r.unwrap_or(Err(InvalidState("channel cancelled".into()))))
120                    .boxed()
121            })
122            .map_err(|_| TransactionSubmissionFailed("ethereum tx queue is closed".into()))
123    }
124}
125
126/// Configuration for the [ActionQueue]
127#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
128pub struct ActionQueueConfig {
129    /// Maximum time (in seconds) to wait for the action to be confirmed on-chain and indexed
130    ///
131    /// Defaults to 150 seconds.
132    #[default(Duration::from_secs(150))]
133    pub max_action_confirmation_wait: Duration,
134}
135
136#[derive(Debug)]
137struct ExecutionContext<S, TxExec>
138where
139    S: ActionState,
140    TxExec: TransactionExecutor,
141{
142    action_state: Arc<S>,
143    tx_exec: Arc<TxExec>,
144    cfg: ActionQueueConfig,
145}
146
147// Needs manual implementation, so we don't need to impose Clone restrictions on the generic args
148impl<S, TxExec> Clone for ExecutionContext<S, TxExec>
149where
150    S: ActionState,
151    TxExec: TransactionExecutor,
152{
153    fn clone(&self) -> Self {
154        Self {
155            action_state: self.action_state.clone(),
156            tx_exec: self.tx_exec.clone(),
157            cfg: self.cfg,
158        }
159    }
160}
161
162impl<S, TxExec> ExecutionContext<S, TxExec>
163where
164    S: ActionState,
165    TxExec: TransactionExecutor,
166{
167    #[tracing::instrument(level = "debug", skip(self))]
168    pub async fn execute_action(self, action: Action) -> Result<ActionConfirmation> {
169        let expectation = match action.clone() {
170            Action::RedeemTicket(ticket) => {
171                debug!(%ticket, "redeeming ticket");
172                let ticket_channel_id = ticket.verified_ticket().channel_id;
173                let tx_hash = self.tx_exec.redeem_ticket(ticket).await?;
174                IndexerExpectation::new(
175                    tx_hash,
176                    move |event| matches!(event, ChainEventType::TicketRedeemed(channel, _) if ticket_channel_id == channel.get_id()),
177                )
178            }
179
180            Action::OpenChannel(address, stake) => {
181                debug!(%address, %stake, "opening channel");
182                let tx_hash = self.tx_exec.fund_channel(address, stake).await?;
183                IndexerExpectation::new(
184                    tx_hash,
185                    move |event| matches!(event, ChainEventType::ChannelOpened(channel) if channel.destination == address),
186                )
187            }
188
189            Action::FundChannel(channel, amount) => {
190                if channel.status == ChannelStatus::Open {
191                    debug!(%channel, "funding channel");
192                    let tx_hash = self.tx_exec.fund_channel(channel.destination, amount).await?;
193                    IndexerExpectation::new(
194                        tx_hash,
195                        move |event| matches!(event, ChainEventType::ChannelBalanceIncreased(r_channel, diff) if r_channel.get_id() == channel.get_id() && amount.eq(diff)),
196                    )
197                } else {
198                    return Err(InvalidState(format!("cannot fund {channel} because it is not opened")));
199                }
200            }
201
202            Action::CloseChannel(channel, direction) => match direction {
203                ChannelDirection::Incoming => match channel.status {
204                    ChannelStatus::Open | ChannelStatus::PendingToClose(_) => {
205                        debug!(%channel, "closing incoming channel");
206                        let tx_hash = self.tx_exec.close_incoming_channel(channel.source).await?;
207                        IndexerExpectation::new(
208                            tx_hash,
209                            move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
210                        )
211                    }
212                    ChannelStatus::Closed => {
213                        warn!(%channel, "channel already closed");
214                        return Err(ChannelAlreadyClosed);
215                    }
216                },
217                ChannelDirection::Outgoing => match channel.status {
218                    ChannelStatus::Open => {
219                        debug!(%channel, "initiating channel closure");
220                        let tx_hash = self
221                            .tx_exec
222                            .initiate_outgoing_channel_closure(channel.destination)
223                            .await?;
224                        IndexerExpectation::new(
225                            tx_hash,
226                            move |event| matches!(event, ChainEventType::ChannelClosureInitiated(r_channel) if r_channel.get_id() == channel.get_id()),
227                        )
228                    }
229                    ChannelStatus::PendingToClose(_) => {
230                        debug!(%channel, "finalizing channel closure");
231                        let tx_hash = self
232                            .tx_exec
233                            .finalize_outgoing_channel_closure(channel.destination)
234                            .await?;
235                        IndexerExpectation::new(
236                            tx_hash,
237                            move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
238                        )
239                    }
240                    ChannelStatus::Closed => {
241                        warn!(%channel, "channel already closed");
242                        return Err(ChannelAlreadyClosed);
243                    }
244                },
245            },
246
247            Action::Withdraw(recipient, amount) => {
248                // Withdrawal is not awaited via the Indexer, but polled for completion,
249                // so no indexer event stream expectation awaiting is needed.
250                // So return once the future completes
251                debug!(%recipient, %amount, "withdrawing funds");
252                return Ok(ActionConfirmation {
253                    tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
254                    event: None,
255                    action: action.clone(),
256                });
257            }
258            Action::Announce(data) => {
259                debug!(mutliaddress = %data.multiaddress(), "announcing node");
260                let tx_hash = self.tx_exec.announce(data.clone()).await?;
261                IndexerExpectation::new(
262                    tx_hash,
263                    move |event| matches!(event, ChainEventType::Announcement{multiaddresses,..} if multiaddresses.contains(data.multiaddress())),
264                )
265            }
266            Action::RegisterSafe(safe_address) => {
267                debug!(%safe_address, "registering safe");
268                let tx_hash = self.tx_exec.register_safe(safe_address).await?;
269                IndexerExpectation::new(
270                    tx_hash,
271                    move |event| matches!(event, ChainEventType::NodeSafeRegistered(address) if safe_address.eq(address)),
272                )
273            }
274        };
275
276        let tx_hash = expectation.tx_hash;
277        debug!(?action, %tx_hash, "action submitted via tx, registering expectation");
278
279        // Register new expectation and await it with timeout
280        let confirmation = self.action_state.register_expectation(expectation).await?.fuse();
281        let timeout = futures_timer::Delay::new(self.cfg.max_action_confirmation_wait).fuse();
282
283        pin_mut!(confirmation, timeout);
284
285        match futures::future::select(confirmation, timeout).await {
286            Either::Left((Ok(chain_event), _)) => Ok(ActionConfirmation {
287                tx_hash: chain_event.tx_hash,
288                event: Some(chain_event.event_type),
289                action,
290            }),
291            Either::Left((Err(_), _)) => {
292                self.action_state.unregister_expectation(tx_hash).await;
293                Err(InvalidState("action expectation was removed before resolving".into()))
294            }
295            Either::Right(_) => {
296                self.action_state.unregister_expectation(tx_hash).await;
297                Err(Timeout)
298            }
299        }
300    }
301}
302
303/// A queue of [Actions](Action) to be executed.
304///
305/// This queue awaits new Actions to arrive, translates them into Ethereum
306/// transactions via [TransactionExecutor] to execute them and await their confirmation
307/// by registering their corresponding expectations in [ActionState].
308#[derive(Debug, Clone)]
309pub struct ActionQueue<Db, S, TxExec>
310where
311    Db: HoprDbInfoOperations + HoprDbTicketOperations + Send + Sync,
312    S: ActionState + Send + Sync,
313    TxExec: TransactionExecutor + Send + Sync,
314{
315    db: Db,
316    queue_send: Sender<(Action, ActionFinisher)>,
317    queue_recv: Receiver<(Action, ActionFinisher)>,
318    ctx: ExecutionContext<S, TxExec>,
319}
320
321impl<Db, S, TxExec> ActionQueue<Db, S, TxExec>
322where
323    Db: HoprDbInfoOperations + HoprDbTicketOperations + Clone + Send + Sync + 'static,
324    S: ActionState + Send + Sync + 'static,
325    TxExec: TransactionExecutor + Send + Sync + 'static,
326{
327    /// Number of pending transactions in the queue
328    pub const ACTION_QUEUE_SIZE: usize = 2048;
329
330    /// Creates a new instance with the given [TransactionExecutor] and [ActionState] implementations.
331    pub fn new(db: Db, action_state: S, tx_exec: TxExec, cfg: ActionQueueConfig) -> Self {
332        let (queue_send, queue_recv) = bounded(Self::ACTION_QUEUE_SIZE);
333        Self {
334            db,
335            ctx: ExecutionContext {
336                action_state: Arc::new(action_state),
337                tx_exec: Arc::new(tx_exec),
338                cfg,
339            },
340            queue_send,
341            queue_recv,
342        }
343    }
344
345    /// Creates a new producer of actions for this queue.
346    pub fn new_sender(&self) -> ActionSender {
347        ActionSender(self.queue_send.clone())
348    }
349
350    /// Clones the `ActionState` implementation.
351    pub fn action_state(&self) -> Arc<S> {
352        self.ctx.action_state.clone()
353    }
354
355    /// Consumes self and runs the main queue processing loop until the queue is closed.
356    ///
357    /// The method will panic if the Channel Domain Separator is not yet populated in the DB.
358    #[tracing::instrument(level = "debug", skip(self))]
359    pub async fn start(self) {
360        let queue_recv = self.queue_recv.clone();
361        pin_mut!(queue_recv);
362        while let Some((act, tx_finisher)) = queue_recv.next().await {
363            // Some minimum separation to avoid batching txs
364            futures_timer::Delay::new(Duration::from_millis(100)).await;
365
366            let exec_context = self.ctx.clone();
367            let db_clone = self.db.clone();
368
369            // NOTE: the process is "daemonized" and not awaited, so it will run in the background
370            spawn(async move {
371                let act_id = act.to_string();
372                let act_name: &'static str = (&act).into();
373                trace!(act_id, act_name, "executing");
374
375                let tx_result = exec_context.execute_action(act.clone()).await;
376                match &tx_result {
377                    Ok(confirmation) => {
378                        info!(%confirmation, "successful confirmation");
379
380                        #[cfg(all(feature = "prometheus", not(test)))]
381                        METRIC_COUNT_ACTIONS.increment(&[act_name, "success"]);
382                    }
383                    Err(err) => {
384                        // On error in Ticket redeem action, we also need to reset ack ticket state
385                        if let Action::RedeemTicket(ack) = act {
386                            error!(rror = %err, "marking the acknowledged ticket as untouched - redeem action failed");
387
388                            if let Err(e) = db_clone
389                                .update_ticket_states((&ack).into(), AcknowledgedTicketStatus::Untouched)
390                                .await
391                            {
392                                error!(%ack, error = %e, "cannot mark ticket as untouched");
393                            }
394                        }
395
396                        // Timeouts are accounted in different metric
397                        if let Timeout = err {
398                            error!(act_id, "timeout while waiting for confirmation");
399
400                            #[cfg(all(feature = "prometheus", not(test)))]
401                            METRIC_COUNT_ACTIONS.increment(&[act_name, "timeout"]);
402                        } else {
403                            error!(act_id, error = %err, "ticket action failed");
404
405                            #[cfg(all(feature = "prometheus", not(test)))]
406                            METRIC_COUNT_ACTIONS.increment(&[act_name, "failure"]);
407                        }
408                    }
409                }
410
411                let _ = tx_finisher.send(tx_result);
412            });
413        }
414        error!("action queue has finished, it should be running for the node to be able to process chain actions");
415    }
416}