hopr_chain_actions/
action_queue.rs

//! Defines the main FIFO MPSC queue for actions - the [ActionQueue] type.
//!
//! The [ActionQueue] acts as an MPSC queue of [Actions](hopr_chain_types::actions::Action), which are popped
//! from the queue one by one by a runner task and handed over for execution.
5use std::{
6    fmt::{Display, Formatter},
7    future::Future,
8    pin::Pin,
9    sync::Arc,
10    time::Duration,
11};
12
13use async_trait::async_trait;
14use futures::{FutureExt, SinkExt, StreamExt, future::Either, pin_mut};
15use hopr_api::db::HoprDbTicketOperations;
16use hopr_async_runtime::prelude::spawn;
17use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
18use hopr_crypto_types::types::Hash;
19use hopr_internal_types::prelude::*;
20use hopr_primitive_types::prelude::*;
21use serde::{Deserialize, Serialize};
22use tracing::{debug, error, info, trace, warn};
23
24use crate::{
25    action_state::{ActionState, IndexerExpectation},
26    errors::{
27        ChainActionsError::{ChannelAlreadyClosed, InvalidState, Timeout, TransactionSubmissionFailed},
28        Result,
29    },
30};
31
// The metric is compiled in only with the `prometheus` feature and is left out of test builds.
#[cfg(all(feature = "prometheus", not(test)))]
lazy_static::lazy_static! {
    // Counter of processed actions, labelled by the action name and by the result of its execution.
    // NOTE: the `unwrap` panics if the counter cannot be created.
    static ref METRIC_COUNT_ACTIONS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
        "hopr_chain_actions_count",
        "Number of different chain actions and their results",
        &["action", "result"]
    )
    .unwrap();
}
41
/// Implements execution of transactions underlying each `Action`.
///
/// Each operation returns the hash of the submitted transaction on success,
/// or an error (e.g. when the operation times out).
///
/// A mock implementation is generated via `mockall` in test builds.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait TransactionExecutor {
    /// Executes the ticket redemption transaction for the given `ticket`.
    async fn redeem_ticket(&self, ticket: RedeemableTicket) -> Result<Hash>;

    /// Executes the channel funding transaction (or channel opening) to the given `destination`
    /// with the given `balance` as stake.
    ///
    /// Channel funding and channel opening are the same transaction.
    async fn fund_channel(&self, destination: Address, balance: HoprBalance) -> Result<Hash>;

    /// Initiates closure of the outgoing channel to `dst`.
    async fn initiate_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;

    /// Finalizes closure of the outgoing channel to `dst`.
    async fn finalize_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;

    /// Closes the incoming channel from `src`.
    async fn close_incoming_channel(&self, src: Address) -> Result<Hash>;

    /// Performs withdrawal of the given `amount` to the `recipient` address.
    ///
    /// Note that this transaction is typically awaited via polling and is not tracked
    /// by the Indexer.
    async fn withdraw<C: Currency + Send + 'static>(&self, recipient: Address, amount: Balance<C>) -> Result<Hash>;

    /// Announces the node on-chain using the given `AnnouncementData`.
    async fn announce(&self, data: AnnouncementData) -> Result<Hash>;

    /// Registers the Safe at `safe_address` with the node.
    async fn register_safe(&self, safe_address: Address) -> Result<Hash>;
}
75
/// Represents confirmation of the `Action` execution.
///
/// Its `Display` form is `"<action> confirmed in tx <tx_hash>"`.
#[derive(Debug, Clone, PartialEq)]
pub struct ActionConfirmation {
    /// Hash of the transaction that executed this action.
    pub tx_hash: Hash,

    /// Corresponding chain event, if any.
    ///
    /// It is `None` for actions that are not awaited via the Indexer (withdrawals).
    pub event: Option<ChainEventType>,

    /// Action that was executed.
    pub action: Action,
}
88
89impl Display for ActionConfirmation {
90    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91        write!(f, "{} confirmed in tx {}", self.action, self.tx_hash)
92    }
93}
94
/// Future notifying about completion of an action (success or failure).
///
/// It resolves to the [ActionConfirmation] on success, or to an error otherwise.
pub type PendingAction = Pin<Box<dyn Future<Output = Result<ActionConfirmation>> + Send>>;

/// Sending half of the oneshot channel through which the outcome of an action
/// is delivered to the holder of the corresponding [PendingAction].
type ActionFinisher = futures::channel::oneshot::Sender<Result<ActionConfirmation>>;
100
/// Sends a future Ethereum transaction into the `ActionQueue`.
///
/// This is a cloneable wrapper around the sending half of the queue's MPSC channel.
/// Each queued item carries the [Action] and the [ActionFinisher] used to report its outcome.
#[derive(Debug, Clone)]
pub struct ActionSender(futures::channel::mpsc::Sender<(Action, ActionFinisher)>);
104
105impl ActionSender {
106    /// Delivers the future action into the `ActionQueue` for processing.
107    #[tracing::instrument(level = "debug", skip(self))]
108    pub async fn send(&self, action: Action) -> Result<PendingAction> {
109        let completer = futures::channel::oneshot::channel();
110        let mut sender = self.0.clone();
111
112        sender
113            .send((action, completer.0))
114            .await
115            .map(|_| {
116                completer
117                    .1
118                    .map(|r| r.unwrap_or(Err(InvalidState("channel cancelled".into()))))
119                    .boxed()
120            })
121            .map_err(|_| TransactionSubmissionFailed("ethereum tx queue is closed".into()))
122    }
123}
124
/// Configuration for the [ActionQueue]
#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
pub struct ActionQueueConfig {
    /// Maximum time (in seconds) to wait for the action to be confirmed on-chain and indexed
    ///
    /// If the confirmation does not arrive within this time, the action fails with a `Timeout` error.
    ///
    /// Defaults to 150 seconds.
    #[default(Duration::from_secs(150))]
    pub max_action_confirmation_wait: Duration,
}
134
/// Everything needed to execute a single [Action]: the shared action state,
/// the shared transaction executor and the queue configuration.
///
/// A clone of this context is handed to each executed action.
#[derive(Debug)]
struct ExecutionContext<S, TxExec>
where
    S: ActionState,
    TxExec: TransactionExecutor,
{
    // Used to register (and unregister) expectations on the events awaited after a transaction is submitted.
    action_state: Arc<S>,
    // Submits the transactions underlying the actions.
    tx_exec: Arc<TxExec>,
    // Carries the maximum confirmation wait time.
    cfg: ActionQueueConfig,
}
145
146// Needs manual implementation, so we don't need to impose Clone restrictions on the generic args
147impl<S, TxExec> Clone for ExecutionContext<S, TxExec>
148where
149    S: ActionState,
150    TxExec: TransactionExecutor,
151{
152    fn clone(&self) -> Self {
153        Self {
154            action_state: self.action_state.clone(),
155            tx_exec: self.tx_exec.clone(),
156            cfg: self.cfg,
157        }
158    }
159}
160
161impl<S, TxExec> ExecutionContext<S, TxExec>
162where
163    S: ActionState,
164    TxExec: TransactionExecutor,
165{
166    #[tracing::instrument(level = "debug", skip(self))]
167    pub async fn execute_action(self, action: Action) -> Result<ActionConfirmation> {
168        let expectation = match action.clone() {
169            Action::RedeemTicket(ticket) => {
170                debug!(%ticket, "redeeming ticket");
171                let ticket_channel_id = ticket.verified_ticket().channel_id;
172                let tx_hash = self.tx_exec.redeem_ticket(ticket).await?;
173                IndexerExpectation::new(
174                    tx_hash,
175                    move |event| matches!(event, ChainEventType::TicketRedeemed(channel, _) if ticket_channel_id == channel.get_id()),
176                )
177            }
178
179            Action::OpenChannel(address, stake) => {
180                debug!(%address, %stake, "opening channel");
181                let tx_hash = self.tx_exec.fund_channel(address, stake).await?;
182                IndexerExpectation::new(
183                    tx_hash,
184                    move |event| matches!(event, ChainEventType::ChannelOpened(channel) if channel.destination == address),
185                )
186            }
187
188            Action::FundChannel(channel, amount) => {
189                if channel.status == ChannelStatus::Open {
190                    debug!(%channel, "funding channel");
191                    let tx_hash = self.tx_exec.fund_channel(channel.destination, amount).await?;
192                    IndexerExpectation::new(
193                        tx_hash,
194                        move |event| matches!(event, ChainEventType::ChannelBalanceIncreased(r_channel, diff) if r_channel.get_id() == channel.get_id() && amount.eq(diff)),
195                    )
196                } else {
197                    return Err(InvalidState(format!("cannot fund {channel} because it is not opened")));
198                }
199            }
200
201            Action::CloseChannel(channel, direction) => match direction {
202                ChannelDirection::Incoming => match channel.status {
203                    ChannelStatus::Open | ChannelStatus::PendingToClose(_) => {
204                        debug!(%channel, "closing incoming channel");
205                        let tx_hash = self.tx_exec.close_incoming_channel(channel.source).await?;
206                        IndexerExpectation::new(
207                            tx_hash,
208                            move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
209                        )
210                    }
211                    ChannelStatus::Closed => {
212                        warn!(%channel, "channel already closed");
213                        return Err(ChannelAlreadyClosed);
214                    }
215                },
216                ChannelDirection::Outgoing => match channel.status {
217                    ChannelStatus::Open => {
218                        debug!(%channel, "initiating channel closure");
219                        let tx_hash = self
220                            .tx_exec
221                            .initiate_outgoing_channel_closure(channel.destination)
222                            .await?;
223                        IndexerExpectation::new(
224                            tx_hash,
225                            move |event| matches!(event, ChainEventType::ChannelClosureInitiated(r_channel) if r_channel.get_id() == channel.get_id()),
226                        )
227                    }
228                    ChannelStatus::PendingToClose(_) => {
229                        debug!(%channel, "finalizing channel closure");
230                        let tx_hash = self
231                            .tx_exec
232                            .finalize_outgoing_channel_closure(channel.destination)
233                            .await?;
234                        IndexerExpectation::new(
235                            tx_hash,
236                            move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
237                        )
238                    }
239                    ChannelStatus::Closed => {
240                        warn!(%channel, "channel already closed");
241                        return Err(ChannelAlreadyClosed);
242                    }
243                },
244            },
245
246            // Withdrawals are not awaited via the Indexer, but polled for completion,
247            // so no indexer event stream expectation awaiting is needed.
248            // So return once the future completes
249            Action::Withdraw(recipient, amount) => {
250                debug!(%recipient, %amount, "withdrawing funds");
251                return Ok(ActionConfirmation {
252                    tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
253                    event: None,
254                    action: action.clone(),
255                });
256            }
257            Action::WithdrawNative(recipient, amount) => {
258                debug!(%recipient, %amount, "withdrawing native funds");
259                return Ok(ActionConfirmation {
260                    tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
261                    event: None,
262                    action: action.clone(),
263                });
264            }
265            Action::Announce(data) => {
266                debug!(mutliaddress = %data.multiaddress(), "announcing node");
267                let tx_hash = self.tx_exec.announce(data.clone()).await?;
268                IndexerExpectation::new(
269                    tx_hash,
270                    move |event| matches!(event, ChainEventType::Announcement{multiaddresses,..} if multiaddresses.contains(data.multiaddress())),
271                )
272            }
273            Action::RegisterSafe(safe_address) => {
274                debug!(%safe_address, "registering safe");
275                let tx_hash = self.tx_exec.register_safe(safe_address).await?;
276                IndexerExpectation::new(
277                    tx_hash,
278                    move |event| matches!(event, ChainEventType::NodeSafeRegistered(address) if safe_address.eq(address)),
279                )
280            }
281        };
282
283        let tx_hash = expectation.tx_hash;
284        debug!(?action, %tx_hash, "action submitted via tx, registering expectation");
285
286        // Register a new expectation and await it with timeout
287        let confirmation = self.action_state.register_expectation(expectation).await?.fuse();
288        let timeout = futures_timer::Delay::new(self.cfg.max_action_confirmation_wait).fuse();
289
290        pin_mut!(confirmation, timeout);
291
292        match futures::future::select(confirmation, timeout).await {
293            Either::Left((Ok(chain_event), _)) => Ok(ActionConfirmation {
294                tx_hash: chain_event.tx_hash,
295                event: Some(chain_event.event_type),
296                action,
297            }),
298            Either::Left((Err(_), _)) => {
299                self.action_state.unregister_expectation(tx_hash).await;
300                Err(InvalidState("action expectation was removed before resolving".into()))
301            }
302            Either::Right(_) => {
303                self.action_state.unregister_expectation(tx_hash).await;
304                Err(Timeout)
305            }
306        }
307    }
308}
309
/// Receiving half of the action queue's MPSC channel; each item is an [Action]
/// paired with the [ActionFinisher] used to report its outcome.
type QueueReceiver = futures::channel::mpsc::Receiver<(Action, ActionFinisher)>;
311
/// A queue of [Actions](Action) to be executed.
///
/// This queue awaits new Actions to arrive, translates them into Ethereum
/// transactions via [TransactionExecutor] to execute them and await their confirmation
/// by registering their corresponding expectations in [ActionState].
///
/// Clones share the same receiving end of the queue, which can be taken only once.
#[derive(Debug, Clone)]
pub struct ActionQueue<Db, S, TxExec>
where
    Db: HoprDbTicketOperations + Send + Sync,
    S: ActionState + Send + Sync,
    TxExec: TransactionExecutor + Send + Sync,
{
    // Used to reset the state of a ticket whose redemption has failed.
    db: Db,
    // Sending half of the queue, cloned for each new `ActionSender`.
    queue_send: futures::channel::mpsc::Sender<(Action, ActionFinisher)>,
    // Receiving half of the queue; taken out (leaving `None`) once the processing loop starts.
    queue_recv: Arc<std::sync::Mutex<Option<QueueReceiver>>>,
    // Context cloned for the execution of each action.
    ctx: ExecutionContext<S, TxExec>,
}
329
330impl<Db, S, TxExec> ActionQueue<Db, S, TxExec>
331where
332    Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
333    S: ActionState + Send + Sync + 'static,
334    TxExec: TransactionExecutor + Send + Sync + 'static,
335{
336    /// Number of pending transactions in the queue
337    pub const ACTION_QUEUE_SIZE: usize = 2048;
338
339    /// Creates a new instance with the given [TransactionExecutor] and [ActionState] implementations.
340    pub fn new(db: Db, action_state: S, tx_exec: TxExec, cfg: ActionQueueConfig) -> Self {
341        let (queue_send, queue_recv) = futures::channel::mpsc::channel(Self::ACTION_QUEUE_SIZE);
342        Self {
343            db,
344            ctx: ExecutionContext {
345                action_state: Arc::new(action_state),
346                tx_exec: Arc::new(tx_exec),
347                cfg,
348            },
349            queue_send,
350            queue_recv: Arc::new(std::sync::Mutex::new(queue_recv.into())),
351        }
352    }
353
354    /// Creates a new producer of actions for this queue.
355    pub fn new_sender(&self) -> ActionSender {
356        ActionSender(self.queue_send.clone())
357    }
358
359    /// Clones the `ActionState` implementation.
360    pub fn action_state(&self) -> Arc<S> {
361        self.ctx.action_state.clone()
362    }
363
364    /// Consumes self and runs the main queue processing loop until the queue is closed.
365    ///
366    /// The method will panic if the Channel Domain Separator is not yet populated in the DB.
367    #[tracing::instrument(level = "debug", skip(self))]
368    pub async fn start(self) {
369        let queue_recv = self
370            .queue_recv
371            .lock()
372            .ok()
373            .and_then(|mut lock| lock.take())
374            .expect("impossible state: queue receiver has been already started");
375
376        pin_mut!(queue_recv);
377        while let Some((act, tx_finisher)) = queue_recv.next().await {
378            // Some minimum separation to avoid batching txs
379            futures_timer::Delay::new(Duration::from_millis(100)).await;
380
381            let exec_context = self.ctx.clone();
382            let db_clone = self.db.clone();
383
384            // NOTE: the process is "daemonized" and not awaited, so it will run in the background
385            spawn(async move {
386                let act_id = act.to_string();
387                let act_name: &'static str = (&act).into();
388                trace!(act_id, act_name, "executing");
389
390                let tx_result = exec_context.execute_action(act.clone()).await;
391                match &tx_result {
392                    Ok(confirmation) => {
393                        info!(%confirmation, "successful confirmation");
394
395                        #[cfg(all(feature = "prometheus", not(test)))]
396                        METRIC_COUNT_ACTIONS.increment(&[act_name, "success"]);
397                    }
398                    Err(err) => {
399                        // On error in the Ticket redeem action, we also need to reset ack ticket state
400                        if let Action::RedeemTicket(ack) = act {
401                            error!(rror = %err, "marking the acknowledged ticket as untouched - redeem action failed");
402
403                            if let Err(e) = db_clone
404                                .update_ticket_states((&ack).into(), AcknowledgedTicketStatus::Untouched)
405                                .await
406                            {
407                                error!(%ack, error = %e, "cannot mark ticket as untouched");
408                            }
409                        }
410
411                        // Timeouts are accounted in different metric
412                        if let Timeout = err {
413                            error!(act_id, "timeout while waiting for confirmation");
414
415                            #[cfg(all(feature = "prometheus", not(test)))]
416                            METRIC_COUNT_ACTIONS.increment(&[act_name, "timeout"]);
417                        } else {
418                            error!(act_id, error = %err, "ticket action failed");
419
420                            #[cfg(all(feature = "prometheus", not(test)))]
421                            METRIC_COUNT_ACTIONS.increment(&[act_name, "failure"]);
422                        }
423                    }
424                }
425
426                let _ = tx_finisher.send(tx_result);
427            });
428        }
429        error!("action queue has finished, it should be running for the node to be able to process chain actions");
430    }
431}