hopr_chain_actions/
action_state.rs

//! This module adds the ability to track the results of actions via expectations.
//!
//! It contains the types necessary to track the on-chain state of
//! [Actions](hopr_chain_types::actions::Action).
//! Once an [Action](hopr_chain_types::actions::Action) is submitted to the chain, an [IndexerExpectation]
//! can be created and registered in an object implementing the [ActionState] trait.
//! The expectation typically consists of a required transaction hash and a predicate on [ChainEventType]
//! that must match some chain event log in a block containing the given transaction hash.
//!
//! ### Example
//! Suppose the [RegisterSafe(`0x0123..ef`)](hopr_chain_types::actions::Action) action has been submitted via
//! [ActionQueue](crate::action_queue::ActionQueue) in a transaction with hash `0xabcd..00`.
//! The [IndexerExpectation] is then such that whatever block contains the TX hash `0xabcd..00` must also contain
//! a log that matches the [NodeSafeRegistered(`0x0123..ef`)](ChainEventType) event type.
//! If such an event is never encountered by the Indexer, the safe registration action naturally times out.
16use std::{
17    collections::HashMap,
18    fmt::{Debug, Formatter},
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22};
23
24use async_lock::RwLock;
25use async_trait::async_trait;
26use futures::{FutureExt, TryFutureExt, channel};
27use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
28use hopr_crypto_types::types::Hash;
29use tracing::{debug, error, trace};
30
31use crate::errors::{ChainActionsError, Result};
32
33/// Future that resolves once an expectation is matched by some [SignificantChainEvent].
34/// Also allows mocking in tests.
35pub type ExpectationResolver = Pin<Box<dyn Future<Output = Result<SignificantChainEvent>> + Send>>;
36
37/// Allows tracking state of an [Action](hopr_chain_types::actions::Action) via registering
38/// [IndexerExpectations](IndexerExpectation) on [SignificantChainEvents](SignificantChainEvent) coming from the Indexer
39/// and resolving them as they are matched. Once expectations are matched, they are automatically unregistered.
40#[cfg_attr(test, mockall::automock)]
41#[async_trait]
42pub trait ActionState {
43    /// Tries to match the given event against the registered expectations.
44    /// Each matched expectation is resolved, unregistered and returned.
45    async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation>;
46
47    /// Registers new [IndexerExpectation].
48    async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver>;
49
50    /// Manually unregisters `IndexerExpectation` given its TX hash.
51    async fn unregister_expectation(&self, tx_hash: Hash);
52}
53
54/// Expectation on a chain event within a TX indexed by the Indexer.
55pub struct IndexerExpectation {
56    /// Required TX hash
57    pub tx_hash: Hash,
58    predicate: Box<dyn Fn(&ChainEventType) -> bool + Send + Sync>,
59}
60
61impl Debug for IndexerExpectation {
62    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("IndexerExpectation")
64            .field("tx_hash", &self.tx_hash)
65            .finish_non_exhaustive()
66    }
67}
68
69impl IndexerExpectation {
70    /// Constructs new expectation given the required TX hash and chain event matcher in that TX.
71    pub fn new<F>(tx_hash: Hash, expectation: F) -> Self
72    where
73        F: Fn(&ChainEventType) -> bool + Send + Sync + 'static,
74    {
75        Self {
76            tx_hash,
77            predicate: Box::new(expectation),
78        }
79    }
80
81    /// Evaluates if the given event satisfies this expectation.
82    pub fn test(&self, event: &SignificantChainEvent) -> bool {
83        event.tx_hash == self.tx_hash && (self.predicate)(&event.event_type)
84    }
85}
86
87type ExpectationTable = HashMap<Hash, (IndexerExpectation, channel::oneshot::Sender<SignificantChainEvent>)>;
88
89/// Implements [action state](ActionState) tracking using a non-persistent in-memory hash table of
90/// assumed [IndexerExpectations](IndexerExpectation).
91#[derive(Debug, Clone)]
92pub struct IndexerActionTracker {
93    expectations: Arc<RwLock<ExpectationTable>>,
94}
95
96impl Default for IndexerActionTracker {
97    fn default() -> Self {
98        Self {
99            expectations: Arc::new(RwLock::new(HashMap::new())),
100        }
101    }
102}
103
104#[async_trait]
105impl ActionState for IndexerActionTracker {
106    #[tracing::instrument(level = "debug", skip(self))]
107    async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation> {
108        let matched_keys = self
109            .expectations
110            .read_arc()
111            .await
112            .iter()
113            .filter_map(|(k, (e, _))| e.test(event).then_some(*k))
114            .collect::<Vec<_>>();
115
116        if matched_keys.is_empty() {
117            trace!(%event, "no expectations matched for event");
118            return Vec::new();
119        }
120
121        debug!(count = matched_keys.len(), %event, "found expectations to match event",);
122
123        let mut db_write_locked = self.expectations.write_arc().await;
124
125        matched_keys
126            .into_iter()
127            .filter_map(|key| {
128                db_write_locked
129                    .remove(&key)
130                    .and_then(|(exp, sender)| match sender.send(event.clone()) {
131                        Ok(_) => {
132                            debug!(%event, tx_hash = %key, "expectation resolved");
133                            Some(exp)
134                        }
135                        Err(_) => {
136                            error!(
137                                %event, "failed to resolve actions, because the action confirmation already timed out",
138                            );
139                            None
140                        }
141                    })
142            })
143            .collect()
144    }
145
146    #[tracing::instrument(level = "debug", skip(self))]
147    async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver> {
148        let mut db = self.expectations.write_arc().await;
149        if let std::collections::hash_map::Entry::Vacant(e) = db.entry(exp.tx_hash) {
150            let (tx, rx) = channel::oneshot::channel();
151            e.insert((exp, tx));
152            Ok(rx.map_err(|_| ChainActionsError::ExpectationUnregistered).boxed())
153        } else {
154            // TODO: currently cannot register multiple expectations for the same TX hash
155            Err(ChainActionsError::InvalidState(format!(
156                "expectation for tx {} already present",
157                exp.tx_hash
158            )))
159        }
160    }
161
162    #[tracing::instrument(level = "debug", skip(self))]
163    async fn unregister_expectation(&self, tx_hash: Hash) {
164        self.expectations.write_arc().await.remove(&tx_hash);
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use std::{sync::Arc, time::Duration};
171
172    use anyhow::Context;
173    use hex_literal::hex;
174    use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
175    use hopr_crypto_random::random_bytes;
176    use hopr_crypto_types::types::Hash;
177    use hopr_primitive_types::prelude::*;
178    use tokio::time::timeout;
179
180    use crate::{
181        action_state::{ActionState, IndexerActionTracker, IndexerExpectation},
182        errors::ChainActionsError,
183    };
184
185    lazy_static::lazy_static! {
186        // some random address
187        static ref RANDY: Address = hex!("60f8492b6fbaf86ac2b064c90283d8978a491a01").into();
188    }
189
190    #[tokio::test]
191    async fn test_expectation_should_resolve() -> anyhow::Result<()> {
192        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
193        let sample_event = SignificantChainEvent {
194            tx_hash: random_hash,
195            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
196        };
197
198        let exp = Arc::new(IndexerActionTracker::default());
199
200        let sample_event_clone = sample_event.clone();
201        let exp_clone = exp.clone();
202        tokio::task::spawn(async move {
203            tokio::time::sleep(Duration::from_millis(200)).await; // delay
204            let hash = exp_clone.match_and_resolve(&sample_event_clone).await;
205            assert!(
206                hash.iter().all(|e| e.tx_hash == random_hash),
207                "hash must be present as resolved"
208            );
209        });
210
211        let resolution = timeout(
212            Duration::from_secs(5),
213            exp.register_expectation(IndexerExpectation::new(random_hash, move |e| {
214                matches!(e, ChainEventType::NodeSafeRegistered(_))
215            }))
216            .await?,
217        )
218        .await?
219        .context("resolver must not be cancelled")?;
220
221        assert_eq!(sample_event, resolution, "resolving event must be equal");
222
223        Ok(())
224    }
225
226    #[tokio::test]
227    async fn test_expectation_should_error_when_unregistered() -> anyhow::Result<()> {
228        let sample_event = SignificantChainEvent {
229            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
230            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
231        };
232
233        let exp = Arc::new(IndexerActionTracker::default());
234
235        let sample_event_clone = sample_event.clone();
236        let exp_clone = exp.clone();
237        tokio::task::spawn(async move {
238            tokio::time::sleep(Duration::from_millis(200)).await; // delay
239            exp_clone.unregister_expectation(sample_event_clone.tx_hash).await;
240        });
241
242        let err = timeout(
243            Duration::from_secs(5),
244            exp.register_expectation(IndexerExpectation::new(sample_event.tx_hash, move |e| {
245                matches!(e, ChainEventType::NodeSafeRegistered(_))
246            }))
247            .await?,
248        )
249        .await?
250        .expect_err("should return with error");
251
252        assert!(
253            matches!(err, ChainActionsError::ExpectationUnregistered),
254            "should notify on unregistration"
255        );
256
257        Ok(())
258    }
259
260    #[tokio::test]
261    async fn test_expectation_should_resolve_and_filter() -> anyhow::Result<()> {
262        let tx_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
263        let sample_events = vec![
264            SignificantChainEvent {
265                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
266                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
267            },
268            SignificantChainEvent {
269                tx_hash,
270                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
271            },
272            SignificantChainEvent {
273                tx_hash,
274                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
275            },
276        ];
277
278        let exp = Arc::new(IndexerActionTracker::default());
279
280        let sample_events_clone = sample_events.clone();
281        let exp_clone = exp.clone();
282        tokio::task::spawn(async move {
283            for sample_event in sample_events_clone {
284                tokio::time::sleep(Duration::from_millis(200)).await; // delay
285                exp_clone.match_and_resolve(&sample_event).await;
286            }
287        });
288
289        let resolution = timeout(
290            Duration::from_secs(5),
291            exp.register_expectation(IndexerExpectation::new(tx_hash, move |e| {
292                matches!(
293                    e,
294                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
295                )
296            }))
297            .await?,
298        )
299        .await?
300        .context("resolver must not be cancelled")?;
301
302        assert_eq!(sample_events[2], resolution, "resolving event must be equal");
303
304        Ok(())
305    }
306
307    #[tokio::test]
308    async fn test_expectation_should_resolve_multiple_expectations() -> anyhow::Result<()> {
309        let sample_events = vec![
310            SignificantChainEvent {
311                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
312                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
313            },
314            SignificantChainEvent {
315                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
316                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
317            },
318            SignificantChainEvent {
319                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
320                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
321            },
322        ];
323
324        let exp = Arc::new(IndexerActionTracker::default());
325
326        let sample_events_clone = sample_events.clone();
327        let exp_clone = exp.clone();
328        tokio::task::spawn(async move {
329            for sample_event in sample_events_clone {
330                tokio::time::sleep(Duration::from_millis(100)).await; // delay
331                exp_clone.match_and_resolve(&sample_event).await;
332            }
333        });
334
335        let registered_exps = vec![
336            exp.register_expectation(IndexerExpectation::new(sample_events[2].tx_hash, move |e| {
337                matches!(
338                    e,
339                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
340                )
341            }))
342            .await
343            .context("should register 1")?,
344            exp.register_expectation(IndexerExpectation::new(sample_events[0].tx_hash, move |e| {
345                matches!(e, ChainEventType::NodeSafeRegistered(_))
346            }))
347            .await
348            .context("should register 2")?,
349        ];
350
351        let resolutions = timeout(Duration::from_secs(5), futures::future::try_join_all(registered_exps))
352            .await?
353            .context("no resolver can cancel")?;
354
355        assert_eq!(sample_events[2], resolutions[0], "resolving event 1 must be equal");
356        assert_eq!(sample_events[0], resolutions[1], "resolving event 2 must be equal");
357
358        Ok(())
359    }
360}