1use std::{
6 fmt::{Display, Formatter},
7 future::Future,
8 pin::Pin,
9 sync::Arc,
10 time::Duration,
11};
12
13use async_channel::{Receiver, Sender, bounded};
14use async_trait::async_trait;
15use futures::{FutureExt, StreamExt, future::Either, pin_mut};
16use hopr_async_runtime::prelude::spawn;
17use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
18use hopr_crypto_types::types::Hash;
19use hopr_db_sql::{api::tickets::HoprDbTicketOperations, info::HoprDbInfoOperations};
20use hopr_internal_types::prelude::*;
21use hopr_primitive_types::prelude::*;
22use serde::{Deserialize, Serialize};
23use tracing::{debug, error, info, trace, warn};
24
25use crate::{
26 action_state::{ActionState, IndexerExpectation},
27 errors::{
28 ChainActionsError::{ChannelAlreadyClosed, InvalidState, Timeout, TransactionSubmissionFailed},
29 Result,
30 },
31};
32
33#[cfg(all(feature = "prometheus", not(test)))]
34lazy_static::lazy_static! {
35 static ref METRIC_COUNT_ACTIONS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
36 "hopr_chain_actions_count",
37 "Number of different chain actions and their results",
38 &["action", "result"]
39 )
40 .unwrap();
41}
42
43#[cfg_attr(test, mockall::automock)]
47#[async_trait]
48pub trait TransactionExecutor {
49 async fn redeem_ticket(&self, ticket: RedeemableTicket) -> Result<Hash>;
51
52 async fn fund_channel(&self, destination: Address, balance: HoprBalance) -> Result<Hash>;
55
56 async fn initiate_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
58
59 async fn finalize_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
61
62 async fn close_incoming_channel(&self, src: Address) -> Result<Hash>;
64
65 async fn withdraw<C: Currency + Send + 'static>(&self, recipient: Address, amount: Balance<C>) -> Result<Hash>;
69
70 async fn announce(&self, data: AnnouncementData) -> Result<Hash>;
72
73 async fn register_safe(&self, safe_address: Address) -> Result<Hash>;
75}
76
77#[derive(Debug, Clone, PartialEq)]
79pub struct ActionConfirmation {
80 pub tx_hash: Hash,
82
83 pub event: Option<ChainEventType>,
85
86 pub action: Action,
88}
89
90impl Display for ActionConfirmation {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 write!(f, "{} confirmed in tx {}", self.action, self.tx_hash)
93 }
94}
95
96pub type PendingAction = Pin<Box<dyn Future<Output = Result<ActionConfirmation>> + Send>>;
98
99type ActionFinisher = futures::channel::oneshot::Sender<Result<ActionConfirmation>>;
101
102#[derive(Debug, Clone)]
104pub struct ActionSender(Sender<(Action, ActionFinisher)>);
105
106impl ActionSender {
107 #[tracing::instrument(level = "debug", skip(self))]
109 pub async fn send(&self, action: Action) -> Result<PendingAction> {
110 let completer = futures::channel::oneshot::channel();
111 let sender = self.0.clone();
112
113 sender
114 .send((action, completer.0))
115 .await
116 .map(|_| {
117 completer
118 .1
119 .map(|r| r.unwrap_or(Err(InvalidState("channel cancelled".into()))))
120 .boxed()
121 })
122 .map_err(|_| TransactionSubmissionFailed("ethereum tx queue is closed".into()))
123 }
124}
125
126#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
128pub struct ActionQueueConfig {
129 #[default(Duration::from_secs(150))]
133 pub max_action_confirmation_wait: Duration,
134}
135
136#[derive(Debug)]
137struct ExecutionContext<S, TxExec>
138where
139 S: ActionState,
140 TxExec: TransactionExecutor,
141{
142 action_state: Arc<S>,
143 tx_exec: Arc<TxExec>,
144 cfg: ActionQueueConfig,
145}
146
147impl<S, TxExec> Clone for ExecutionContext<S, TxExec>
149where
150 S: ActionState,
151 TxExec: TransactionExecutor,
152{
153 fn clone(&self) -> Self {
154 Self {
155 action_state: self.action_state.clone(),
156 tx_exec: self.tx_exec.clone(),
157 cfg: self.cfg,
158 }
159 }
160}
161
162impl<S, TxExec> ExecutionContext<S, TxExec>
163where
164 S: ActionState,
165 TxExec: TransactionExecutor,
166{
167 #[tracing::instrument(level = "debug", skip(self))]
168 pub async fn execute_action(self, action: Action) -> Result<ActionConfirmation> {
169 let expectation = match action.clone() {
170 Action::RedeemTicket(ticket) => {
171 debug!(%ticket, "redeeming ticket");
172 let ticket_channel_id = ticket.verified_ticket().channel_id;
173 let tx_hash = self.tx_exec.redeem_ticket(ticket).await?;
174 IndexerExpectation::new(
175 tx_hash,
176 move |event| matches!(event, ChainEventType::TicketRedeemed(channel, _) if ticket_channel_id == channel.get_id()),
177 )
178 }
179
180 Action::OpenChannel(address, stake) => {
181 debug!(%address, %stake, "opening channel");
182 let tx_hash = self.tx_exec.fund_channel(address, stake).await?;
183 IndexerExpectation::new(
184 tx_hash,
185 move |event| matches!(event, ChainEventType::ChannelOpened(channel) if channel.destination == address),
186 )
187 }
188
189 Action::FundChannel(channel, amount) => {
190 if channel.status == ChannelStatus::Open {
191 debug!(%channel, "funding channel");
192 let tx_hash = self.tx_exec.fund_channel(channel.destination, amount).await?;
193 IndexerExpectation::new(
194 tx_hash,
195 move |event| matches!(event, ChainEventType::ChannelBalanceIncreased(r_channel, diff) if r_channel.get_id() == channel.get_id() && amount.eq(diff)),
196 )
197 } else {
198 return Err(InvalidState(format!("cannot fund {channel} because it is not opened")));
199 }
200 }
201
202 Action::CloseChannel(channel, direction) => match direction {
203 ChannelDirection::Incoming => match channel.status {
204 ChannelStatus::Open | ChannelStatus::PendingToClose(_) => {
205 debug!(%channel, "closing incoming channel");
206 let tx_hash = self.tx_exec.close_incoming_channel(channel.source).await?;
207 IndexerExpectation::new(
208 tx_hash,
209 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
210 )
211 }
212 ChannelStatus::Closed => {
213 warn!(%channel, "channel already closed");
214 return Err(ChannelAlreadyClosed);
215 }
216 },
217 ChannelDirection::Outgoing => match channel.status {
218 ChannelStatus::Open => {
219 debug!(%channel, "initiating channel closure");
220 let tx_hash = self
221 .tx_exec
222 .initiate_outgoing_channel_closure(channel.destination)
223 .await?;
224 IndexerExpectation::new(
225 tx_hash,
226 move |event| matches!(event, ChainEventType::ChannelClosureInitiated(r_channel) if r_channel.get_id() == channel.get_id()),
227 )
228 }
229 ChannelStatus::PendingToClose(_) => {
230 debug!(%channel, "finalizing channel closure");
231 let tx_hash = self
232 .tx_exec
233 .finalize_outgoing_channel_closure(channel.destination)
234 .await?;
235 IndexerExpectation::new(
236 tx_hash,
237 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
238 )
239 }
240 ChannelStatus::Closed => {
241 warn!(%channel, "channel already closed");
242 return Err(ChannelAlreadyClosed);
243 }
244 },
245 },
246
247 Action::Withdraw(recipient, amount) => {
251 debug!(%recipient, %amount, "withdrawing funds");
252 return Ok(ActionConfirmation {
253 tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
254 event: None,
255 action: action.clone(),
256 });
257 }
258 Action::WithdrawNative(recipient, amount) => {
259 debug!(%recipient, %amount, "withdrawing native funds");
260 return Ok(ActionConfirmation {
261 tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
262 event: None,
263 action: action.clone(),
264 });
265 }
266 Action::Announce(data) => {
267 debug!(mutliaddress = %data.multiaddress(), "announcing node");
268 let tx_hash = self.tx_exec.announce(data.clone()).await?;
269 IndexerExpectation::new(
270 tx_hash,
271 move |event| matches!(event, ChainEventType::Announcement{multiaddresses,..} if multiaddresses.contains(data.multiaddress())),
272 )
273 }
274 Action::RegisterSafe(safe_address) => {
275 debug!(%safe_address, "registering safe");
276 let tx_hash = self.tx_exec.register_safe(safe_address).await?;
277 IndexerExpectation::new(
278 tx_hash,
279 move |event| matches!(event, ChainEventType::NodeSafeRegistered(address) if safe_address.eq(address)),
280 )
281 }
282 };
283
284 let tx_hash = expectation.tx_hash;
285 debug!(?action, %tx_hash, "action submitted via tx, registering expectation");
286
287 let confirmation = self.action_state.register_expectation(expectation).await?.fuse();
289 let timeout = futures_timer::Delay::new(self.cfg.max_action_confirmation_wait).fuse();
290
291 pin_mut!(confirmation, timeout);
292
293 match futures::future::select(confirmation, timeout).await {
294 Either::Left((Ok(chain_event), _)) => Ok(ActionConfirmation {
295 tx_hash: chain_event.tx_hash,
296 event: Some(chain_event.event_type),
297 action,
298 }),
299 Either::Left((Err(_), _)) => {
300 self.action_state.unregister_expectation(tx_hash).await;
301 Err(InvalidState("action expectation was removed before resolving".into()))
302 }
303 Either::Right(_) => {
304 self.action_state.unregister_expectation(tx_hash).await;
305 Err(Timeout)
306 }
307 }
308 }
309}
310
311#[derive(Debug, Clone)]
317pub struct ActionQueue<Db, S, TxExec>
318where
319 Db: HoprDbInfoOperations + HoprDbTicketOperations + Send + Sync,
320 S: ActionState + Send + Sync,
321 TxExec: TransactionExecutor + Send + Sync,
322{
323 db: Db,
324 queue_send: Sender<(Action, ActionFinisher)>,
325 queue_recv: Receiver<(Action, ActionFinisher)>,
326 ctx: ExecutionContext<S, TxExec>,
327}
328
329impl<Db, S, TxExec> ActionQueue<Db, S, TxExec>
330where
331 Db: HoprDbInfoOperations + HoprDbTicketOperations + Clone + Send + Sync + 'static,
332 S: ActionState + Send + Sync + 'static,
333 TxExec: TransactionExecutor + Send + Sync + 'static,
334{
335 pub const ACTION_QUEUE_SIZE: usize = 2048;
337
338 pub fn new(db: Db, action_state: S, tx_exec: TxExec, cfg: ActionQueueConfig) -> Self {
340 let (queue_send, queue_recv) = bounded(Self::ACTION_QUEUE_SIZE);
341 Self {
342 db,
343 ctx: ExecutionContext {
344 action_state: Arc::new(action_state),
345 tx_exec: Arc::new(tx_exec),
346 cfg,
347 },
348 queue_send,
349 queue_recv,
350 }
351 }
352
353 pub fn new_sender(&self) -> ActionSender {
355 ActionSender(self.queue_send.clone())
356 }
357
358 pub fn action_state(&self) -> Arc<S> {
360 self.ctx.action_state.clone()
361 }
362
363 #[tracing::instrument(level = "debug", skip(self))]
367 pub async fn start(self) {
368 let queue_recv = self.queue_recv.clone();
369 pin_mut!(queue_recv);
370 while let Some((act, tx_finisher)) = queue_recv.next().await {
371 futures_timer::Delay::new(Duration::from_millis(100)).await;
373
374 let exec_context = self.ctx.clone();
375 let db_clone = self.db.clone();
376
377 spawn(async move {
379 let act_id = act.to_string();
380 let act_name: &'static str = (&act).into();
381 trace!(act_id, act_name, "executing");
382
383 let tx_result = exec_context.execute_action(act.clone()).await;
384 match &tx_result {
385 Ok(confirmation) => {
386 info!(%confirmation, "successful confirmation");
387
388 #[cfg(all(feature = "prometheus", not(test)))]
389 METRIC_COUNT_ACTIONS.increment(&[act_name, "success"]);
390 }
391 Err(err) => {
392 if let Action::RedeemTicket(ack) = act {
394 error!(rror = %err, "marking the acknowledged ticket as untouched - redeem action failed");
395
396 if let Err(e) = db_clone
397 .update_ticket_states((&ack).into(), AcknowledgedTicketStatus::Untouched)
398 .await
399 {
400 error!(%ack, error = %e, "cannot mark ticket as untouched");
401 }
402 }
403
404 if let Timeout = err {
406 error!(act_id, "timeout while waiting for confirmation");
407
408 #[cfg(all(feature = "prometheus", not(test)))]
409 METRIC_COUNT_ACTIONS.increment(&[act_name, "timeout"]);
410 } else {
411 error!(act_id, error = %err, "ticket action failed");
412
413 #[cfg(all(feature = "prometheus", not(test)))]
414 METRIC_COUNT_ACTIONS.increment(&[act_name, "failure"]);
415 }
416 }
417 }
418
419 let _ = tx_finisher.send(tx_result);
420 });
421 }
422 error!("action queue has finished, it should be running for the node to be able to process chain actions");
423 }
424}