1use std::{
6 fmt::{Display, Formatter},
7 future::Future,
8 pin::Pin,
9 sync::Arc,
10 time::Duration,
11};
12
13use async_trait::async_trait;
14use futures::{FutureExt, SinkExt, StreamExt, future::Either, pin_mut};
15use hopr_api::db::HoprDbTicketOperations;
16use hopr_async_runtime::prelude::spawn;
17use hopr_chain_types::{actions::Action, chain_events::ChainEventType};
18use hopr_crypto_types::types::Hash;
19use hopr_internal_types::prelude::*;
20use hopr_primitive_types::prelude::*;
21use serde::{Deserialize, Serialize};
22use tracing::{debug, error, info, trace, warn};
23
24use crate::{
25 action_state::{ActionState, IndexerExpectation},
26 errors::{
27 ChainActionsError::{ChannelAlreadyClosed, InvalidState, Timeout, TransactionSubmissionFailed},
28 Result,
29 },
30};
31
32#[cfg(all(feature = "prometheus", not(test)))]
33lazy_static::lazy_static! {
34 static ref METRIC_COUNT_ACTIONS: hopr_metrics::MultiCounter = hopr_metrics::MultiCounter::new(
35 "hopr_chain_actions_count",
36 "Number of different chain actions and their results",
37 &["action", "result"]
38 )
39 .unwrap();
40}
41
42#[cfg_attr(test, mockall::automock)]
46#[async_trait]
47pub trait TransactionExecutor {
48 async fn redeem_ticket(&self, ticket: RedeemableTicket) -> Result<Hash>;
50
51 async fn fund_channel(&self, destination: Address, balance: HoprBalance) -> Result<Hash>;
54
55 async fn initiate_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
57
58 async fn finalize_outgoing_channel_closure(&self, dst: Address) -> Result<Hash>;
60
61 async fn close_incoming_channel(&self, src: Address) -> Result<Hash>;
63
64 async fn withdraw<C: Currency + Send + 'static>(&self, recipient: Address, amount: Balance<C>) -> Result<Hash>;
68
69 async fn announce(&self, data: AnnouncementData) -> Result<Hash>;
71
72 async fn register_safe(&self, safe_address: Address) -> Result<Hash>;
74}
75
76#[derive(Debug, Clone, PartialEq)]
78pub struct ActionConfirmation {
79 pub tx_hash: Hash,
81
82 pub event: Option<ChainEventType>,
84
85 pub action: Action,
87}
88
89impl Display for ActionConfirmation {
90 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91 write!(f, "{} confirmed in tx {}", self.action, self.tx_hash)
92 }
93}
94
95pub type PendingAction = Pin<Box<dyn Future<Output = Result<ActionConfirmation>> + Send>>;
97
98type ActionFinisher = futures::channel::oneshot::Sender<Result<ActionConfirmation>>;
100
101#[derive(Debug, Clone)]
103pub struct ActionSender(futures::channel::mpsc::Sender<(Action, ActionFinisher)>);
104
105impl ActionSender {
106 #[tracing::instrument(level = "debug", skip(self))]
108 pub async fn send(&self, action: Action) -> Result<PendingAction> {
109 let completer = futures::channel::oneshot::channel();
110 let mut sender = self.0.clone();
111
112 sender
113 .send((action, completer.0))
114 .await
115 .map(|_| {
116 completer
117 .1
118 .map(|r| r.unwrap_or(Err(InvalidState("channel cancelled".into()))))
119 .boxed()
120 })
121 .map_err(|_| TransactionSubmissionFailed("ethereum tx queue is closed".into()))
122 }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, smart_default::SmartDefault, Serialize, Deserialize)]
127pub struct ActionQueueConfig {
128 #[default(Duration::from_secs(150))]
132 pub max_action_confirmation_wait: Duration,
133}
134
135#[derive(Debug)]
136struct ExecutionContext<S, TxExec>
137where
138 S: ActionState,
139 TxExec: TransactionExecutor,
140{
141 action_state: Arc<S>,
142 tx_exec: Arc<TxExec>,
143 cfg: ActionQueueConfig,
144}
145
146impl<S, TxExec> Clone for ExecutionContext<S, TxExec>
148where
149 S: ActionState,
150 TxExec: TransactionExecutor,
151{
152 fn clone(&self) -> Self {
153 Self {
154 action_state: self.action_state.clone(),
155 tx_exec: self.tx_exec.clone(),
156 cfg: self.cfg,
157 }
158 }
159}
160
161impl<S, TxExec> ExecutionContext<S, TxExec>
162where
163 S: ActionState,
164 TxExec: TransactionExecutor,
165{
166 #[tracing::instrument(level = "debug", skip(self))]
167 pub async fn execute_action(self, action: Action) -> Result<ActionConfirmation> {
168 let expectation = match action.clone() {
169 Action::RedeemTicket(ticket) => {
170 debug!(%ticket, "redeeming ticket");
171 let ticket_channel_id = ticket.verified_ticket().channel_id;
172 let tx_hash = self.tx_exec.redeem_ticket(ticket).await?;
173 IndexerExpectation::new(
174 tx_hash,
175 move |event| matches!(event, ChainEventType::TicketRedeemed(channel, _) if ticket_channel_id == channel.get_id()),
176 )
177 }
178
179 Action::OpenChannel(address, stake) => {
180 debug!(%address, %stake, "opening channel");
181 let tx_hash = self.tx_exec.fund_channel(address, stake).await?;
182 IndexerExpectation::new(
183 tx_hash,
184 move |event| matches!(event, ChainEventType::ChannelOpened(channel) if channel.destination == address),
185 )
186 }
187
188 Action::FundChannel(channel, amount) => {
189 if channel.status == ChannelStatus::Open {
190 debug!(%channel, "funding channel");
191 let tx_hash = self.tx_exec.fund_channel(channel.destination, amount).await?;
192 IndexerExpectation::new(
193 tx_hash,
194 move |event| matches!(event, ChainEventType::ChannelBalanceIncreased(r_channel, diff) if r_channel.get_id() == channel.get_id() && amount.eq(diff)),
195 )
196 } else {
197 return Err(InvalidState(format!("cannot fund {channel} because it is not opened")));
198 }
199 }
200
201 Action::CloseChannel(channel, direction) => match direction {
202 ChannelDirection::Incoming => match channel.status {
203 ChannelStatus::Open | ChannelStatus::PendingToClose(_) => {
204 debug!(%channel, "closing incoming channel");
205 let tx_hash = self.tx_exec.close_incoming_channel(channel.source).await?;
206 IndexerExpectation::new(
207 tx_hash,
208 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
209 )
210 }
211 ChannelStatus::Closed => {
212 warn!(%channel, "channel already closed");
213 return Err(ChannelAlreadyClosed);
214 }
215 },
216 ChannelDirection::Outgoing => match channel.status {
217 ChannelStatus::Open => {
218 debug!(%channel, "initiating channel closure");
219 let tx_hash = self
220 .tx_exec
221 .initiate_outgoing_channel_closure(channel.destination)
222 .await?;
223 IndexerExpectation::new(
224 tx_hash,
225 move |event| matches!(event, ChainEventType::ChannelClosureInitiated(r_channel) if r_channel.get_id() == channel.get_id()),
226 )
227 }
228 ChannelStatus::PendingToClose(_) => {
229 debug!(%channel, "finalizing channel closure");
230 let tx_hash = self
231 .tx_exec
232 .finalize_outgoing_channel_closure(channel.destination)
233 .await?;
234 IndexerExpectation::new(
235 tx_hash,
236 move |event| matches!(event, ChainEventType::ChannelClosed(r_channel) if r_channel.get_id() == channel.get_id()),
237 )
238 }
239 ChannelStatus::Closed => {
240 warn!(%channel, "channel already closed");
241 return Err(ChannelAlreadyClosed);
242 }
243 },
244 },
245
246 Action::Withdraw(recipient, amount) => {
250 debug!(%recipient, %amount, "withdrawing funds");
251 return Ok(ActionConfirmation {
252 tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
253 event: None,
254 action: action.clone(),
255 });
256 }
257 Action::WithdrawNative(recipient, amount) => {
258 debug!(%recipient, %amount, "withdrawing native funds");
259 return Ok(ActionConfirmation {
260 tx_hash: self.tx_exec.withdraw(recipient, amount).await?,
261 event: None,
262 action: action.clone(),
263 });
264 }
265 Action::Announce(data) => {
266 debug!(mutliaddress = %data.multiaddress(), "announcing node");
267 let tx_hash = self.tx_exec.announce(data.clone()).await?;
268 IndexerExpectation::new(
269 tx_hash,
270 move |event| matches!(event, ChainEventType::Announcement{multiaddresses,..} if multiaddresses.contains(data.multiaddress())),
271 )
272 }
273 Action::RegisterSafe(safe_address) => {
274 debug!(%safe_address, "registering safe");
275 let tx_hash = self.tx_exec.register_safe(safe_address).await?;
276 IndexerExpectation::new(
277 tx_hash,
278 move |event| matches!(event, ChainEventType::NodeSafeRegistered(address) if safe_address.eq(address)),
279 )
280 }
281 };
282
283 let tx_hash = expectation.tx_hash;
284 debug!(?action, %tx_hash, "action submitted via tx, registering expectation");
285
286 let confirmation = self.action_state.register_expectation(expectation).await?.fuse();
288 let timeout = futures_timer::Delay::new(self.cfg.max_action_confirmation_wait).fuse();
289
290 pin_mut!(confirmation, timeout);
291
292 match futures::future::select(confirmation, timeout).await {
293 Either::Left((Ok(chain_event), _)) => Ok(ActionConfirmation {
294 tx_hash: chain_event.tx_hash,
295 event: Some(chain_event.event_type),
296 action,
297 }),
298 Either::Left((Err(_), _)) => {
299 self.action_state.unregister_expectation(tx_hash).await;
300 Err(InvalidState("action expectation was removed before resolving".into()))
301 }
302 Either::Right(_) => {
303 self.action_state.unregister_expectation(tx_hash).await;
304 Err(Timeout)
305 }
306 }
307 }
308}
309
310type QueueReceiver = futures::channel::mpsc::Receiver<(Action, ActionFinisher)>;
311
312#[derive(Debug, Clone)]
318pub struct ActionQueue<Db, S, TxExec>
319where
320 Db: HoprDbTicketOperations + Send + Sync,
321 S: ActionState + Send + Sync,
322 TxExec: TransactionExecutor + Send + Sync,
323{
324 db: Db,
325 queue_send: futures::channel::mpsc::Sender<(Action, ActionFinisher)>,
326 queue_recv: Arc<std::sync::Mutex<Option<QueueReceiver>>>,
327 ctx: ExecutionContext<S, TxExec>,
328}
329
330impl<Db, S, TxExec> ActionQueue<Db, S, TxExec>
331where
332 Db: HoprDbTicketOperations + Clone + Send + Sync + 'static,
333 S: ActionState + Send + Sync + 'static,
334 TxExec: TransactionExecutor + Send + Sync + 'static,
335{
336 pub const ACTION_QUEUE_SIZE: usize = 2048;
338
339 pub fn new(db: Db, action_state: S, tx_exec: TxExec, cfg: ActionQueueConfig) -> Self {
341 let (queue_send, queue_recv) = futures::channel::mpsc::channel(Self::ACTION_QUEUE_SIZE);
342 Self {
343 db,
344 ctx: ExecutionContext {
345 action_state: Arc::new(action_state),
346 tx_exec: Arc::new(tx_exec),
347 cfg,
348 },
349 queue_send,
350 queue_recv: Arc::new(std::sync::Mutex::new(queue_recv.into())),
351 }
352 }
353
354 pub fn new_sender(&self) -> ActionSender {
356 ActionSender(self.queue_send.clone())
357 }
358
359 pub fn action_state(&self) -> Arc<S> {
361 self.ctx.action_state.clone()
362 }
363
364 #[tracing::instrument(level = "debug", skip(self))]
368 pub async fn start(self) {
369 let queue_recv = self
370 .queue_recv
371 .lock()
372 .ok()
373 .and_then(|mut lock| lock.take())
374 .expect("impossible state: queue receiver has been already started");
375
376 pin_mut!(queue_recv);
377 while let Some((act, tx_finisher)) = queue_recv.next().await {
378 futures_timer::Delay::new(Duration::from_millis(100)).await;
380
381 let exec_context = self.ctx.clone();
382 let db_clone = self.db.clone();
383
384 spawn(async move {
386 let act_id = act.to_string();
387 let act_name: &'static str = (&act).into();
388 trace!(act_id, act_name, "executing");
389
390 let tx_result = exec_context.execute_action(act.clone()).await;
391 match &tx_result {
392 Ok(confirmation) => {
393 info!(%confirmation, "successful confirmation");
394
395 #[cfg(all(feature = "prometheus", not(test)))]
396 METRIC_COUNT_ACTIONS.increment(&[act_name, "success"]);
397 }
398 Err(err) => {
399 if let Action::RedeemTicket(ack) = act {
401 error!(rror = %err, "marking the acknowledged ticket as untouched - redeem action failed");
402
403 if let Err(e) = db_clone
404 .update_ticket_states((&ack).into(), AcknowledgedTicketStatus::Untouched)
405 .await
406 {
407 error!(%ack, error = %e, "cannot mark ticket as untouched");
408 }
409 }
410
411 if let Timeout = err {
413 error!(act_id, "timeout while waiting for confirmation");
414
415 #[cfg(all(feature = "prometheus", not(test)))]
416 METRIC_COUNT_ACTIONS.increment(&[act_name, "timeout"]);
417 } else {
418 error!(act_id, error = %err, "ticket action failed");
419
420 #[cfg(all(feature = "prometheus", not(test)))]
421 METRIC_COUNT_ACTIONS.increment(&[act_name, "failure"]);
422 }
423 }
424 }
425
426 let _ = tx_finisher.send(tx_result);
427 });
428 }
429 error!("action queue has finished, it should be running for the node to be able to process chain actions");
430 }
431}