hopr_chain_actions/
action_state.rs

1//! This module adds functionality of tracking the action results via expectations.
2//!
3//! It contains implementation of types necessary to perform tracking the
4//! on-chain state of [Actions](hopr_chain_types::actions::Action).
5//! Once an [Action](hopr_chain_types::actions::Action) is submitted to the chain, an [IndexerExpectation]
6//! can be created and registered in an object implementing the [ActionState] trait.
7//! The expectation typically consists of a required transaction hash and a predicate of [ChainEventType]
8//! that must match on any chain event log in a block containing the given transaction hash.
9//!
10//! ### Example
11//! Once the [RegisterSafe(`0x0123..ef`)](hopr_chain_types::actions::Action) action that has been submitted via [ActionQueue](crate::action_queue::ActionQueue)
12//! in a transaction with hash `0xabcd...00`.
13//! The [IndexerExpectation] is such that whatever block that will contain the TX hash `0xabcd..00` must also contain
14//! a log that matches [NodeSafeRegistered(`0x0123..ef`)](ChainEventType) event type.
15//! If such event is never encountered by the Indexer, the safe registration action naturally times out.
16use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{channel, FutureExt, TryFutureExt};
19use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
20use hopr_crypto_types::types::Hash;
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::fmt::{Debug, Formatter};
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tracing::{debug, error};
28
29use crate::errors::{ChainActionsError, Result};
30
31/// Future that resolves once an expectation is matched by some [SignificantChainEvent].
32/// Also allows mocking in tests.
33pub type ExpectationResolver = Pin<Box<dyn Future<Output = Result<SignificantChainEvent>> + Send>>;
34
35/// Allows tracking state of an [Action](hopr_chain_types::actions::Action) via registering [IndexerExpectations](IndexerExpectation) on
36/// [SignificantChainEvents](SignificantChainEvent) coming from the Indexer and resolving them as they are
37/// matched. Once expectations are matched, they are automatically unregistered.
38#[cfg_attr(test, mockall::automock)]
39#[async_trait]
40pub trait ActionState {
41    /// Tries to match the given event against the registered expectations.
42    /// Each matched expectation is resolved, unregistered and returned.
43    async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation>;
44
45    /// Registers new [IndexerExpectation].
46    async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver>;
47
48    /// Manually unregisters `IndexerExpectation` given its TX hash.
49    async fn unregister_expectation(&self, tx_hash: Hash);
50}
51
52/// Expectation on a chain event within a TX indexed by the Indexer.
53pub struct IndexerExpectation {
54    /// Required TX hash
55    pub tx_hash: Hash,
56    predicate: Box<dyn Fn(&ChainEventType) -> bool + Send + Sync>,
57}
58
59impl Debug for IndexerExpectation {
60    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("IndexerExpectation")
62            .field("tx_hash", &self.tx_hash)
63            .finish_non_exhaustive()
64    }
65}
66
67impl IndexerExpectation {
68    /// Constructs new expectation given the required TX hash and chain event matcher in that TX.
69    pub fn new<F>(tx_hash: Hash, expectation: F) -> Self
70    where
71        F: Fn(&ChainEventType) -> bool + Send + Sync + 'static,
72    {
73        Self {
74            tx_hash,
75            predicate: Box::new(expectation),
76        }
77    }
78
79    /// Evaluates if the given event satisfies this expectation.
80    pub fn test(&self, event: &SignificantChainEvent) -> bool {
81        event.tx_hash == self.tx_hash && (self.predicate)(&event.event_type)
82    }
83}
84
85type ExpectationTable = HashMap<Hash, (IndexerExpectation, channel::oneshot::Sender<SignificantChainEvent>)>;
86
87/// Implements [action state](ActionState) tracking using a non-persistent in-memory hash table of
88/// assumed [IndexerExpectations](IndexerExpectation).
89#[derive(Debug, Clone)]
90pub struct IndexerActionTracker {
91    expectations: Arc<RwLock<ExpectationTable>>,
92}
93
94impl Default for IndexerActionTracker {
95    fn default() -> Self {
96        Self {
97            expectations: Arc::new(RwLock::new(HashMap::new())),
98        }
99    }
100}
101
102#[async_trait]
103impl ActionState for IndexerActionTracker {
104    #[tracing::instrument(level = "debug", skip(self))]
105    async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation> {
106        let matched_keys = self
107            .expectations
108            .read()
109            .await
110            .iter()
111            .filter_map(|(k, (e, _))| e.test(event).then_some(*k))
112            .collect::<Vec<_>>();
113
114        debug!(count = matched_keys.len(), ?event, "found expectations to match event",);
115
116        if matched_keys.is_empty() {
117            return Vec::new();
118        }
119        let mut db = self.expectations.write().await;
120        matched_keys
121            .into_iter()
122            .filter_map(|key| {
123                db.remove(&key)
124                    .and_then(|(exp, sender)| match sender.send(event.clone()) {
125                        Ok(_) => {
126                            debug!(%event, "expectation resolved ");
127                            Some(exp)
128                        }
129                        Err(_) => {
130                            error!(
131                                %event, "failed to resolve actions, because the action confirmation already timed out",
132                            );
133                            None
134                        }
135                    })
136            })
137            .collect()
138    }
139
140    #[tracing::instrument(level = "debug", skip(self))]
141    async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver> {
142        match self.expectations.write().await.entry(exp.tx_hash) {
143            Entry::Occupied(_) => {
144                // TODO: currently cannot register multiple expectations for the same TX hash
145                return Err(ChainActionsError::InvalidState(format!(
146                    "expectation for tx {} already present",
147                    exp.tx_hash
148                )));
149            }
150            Entry::Vacant(e) => {
151                let (tx, rx) = channel::oneshot::channel();
152                e.insert((exp, tx));
153                Ok(rx.map_err(|_| ChainActionsError::ExpectationUnregistered).boxed())
154            }
155        }
156    }
157
158    #[tracing::instrument(level = "debug", skip(self))]
159    async fn unregister_expectation(&self, tx_hash: Hash) {
160        self.expectations.write().await.remove(&tx_hash);
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use crate::action_state::{ActionState, IndexerActionTracker, IndexerExpectation};
167    use crate::errors::ChainActionsError;
168    use anyhow::Context;
169    use async_std::prelude::FutureExt;
170    use hex_literal::hex;
171    use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
172    use hopr_crypto_random::random_bytes;
173    use hopr_crypto_types::types::Hash;
174    use hopr_primitive_types::prelude::*;
175    use std::sync::Arc;
176    use std::time::Duration;
177
178    lazy_static::lazy_static! {
179        // some random address
180        static ref RANDY: Address = hex!("60f8492b6fbaf86ac2b064c90283d8978a491a01").into();
181    }
182
183    #[async_std::test]
184    async fn test_expectation_should_resolve() -> anyhow::Result<()> {
185        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
186        let sample_event = SignificantChainEvent {
187            tx_hash: random_hash,
188            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
189        };
190
191        let exp = Arc::new(IndexerActionTracker::default());
192
193        let sample_event_clone = sample_event.clone();
194        let exp_clone = exp.clone();
195        async_std::task::spawn(async move {
196            let hash = exp_clone
197                .match_and_resolve(&sample_event_clone)
198                .delay(Duration::from_millis(200))
199                .await;
200            assert!(
201                hash.iter().all(|e| e.tx_hash == random_hash),
202                "hash must be present as resolved"
203            );
204        });
205
206        let resolution = exp
207            .register_expectation(IndexerExpectation::new(random_hash, move |e| {
208                matches!(e, ChainEventType::NodeSafeRegistered(_))
209            }))
210            .await?
211            .timeout(Duration::from_secs(5))
212            .await?
213            .context("resolver must not be cancelled")?;
214
215        assert_eq!(sample_event, resolution, "resolving event must be equal");
216
217        Ok(())
218    }
219
220    #[async_std::test]
221    async fn test_expectation_should_error_when_unregistered() -> anyhow::Result<()> {
222        let sample_event = SignificantChainEvent {
223            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
224            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
225        };
226
227        let exp = Arc::new(IndexerActionTracker::default());
228
229        let sample_event_clone = sample_event.clone();
230        let exp_clone = exp.clone();
231        async_std::task::spawn(async move {
232            exp_clone
233                .unregister_expectation(sample_event_clone.tx_hash)
234                .delay(Duration::from_millis(200))
235                .await;
236        });
237
238        let err = exp
239            .register_expectation(IndexerExpectation::new(sample_event.tx_hash, move |e| {
240                matches!(e, ChainEventType::NodeSafeRegistered(_))
241            }))
242            .await?
243            .timeout(Duration::from_secs(5))
244            .await?
245            .expect_err("should return with error");
246
247        assert!(
248            matches!(err, ChainActionsError::ExpectationUnregistered),
249            "should notify on unregistration"
250        );
251
252        Ok(())
253    }
254
255    #[async_std::test]
256    async fn test_expectation_should_resolve_and_filter() -> anyhow::Result<()> {
257        let tx_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
258        let sample_events = vec![
259            SignificantChainEvent {
260                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
261                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
262            },
263            SignificantChainEvent {
264                tx_hash,
265                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
266            },
267            SignificantChainEvent {
268                tx_hash,
269                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
270            },
271        ];
272
273        let exp = Arc::new(IndexerActionTracker::default());
274
275        let sample_events_clone = sample_events.clone();
276        let exp_clone = exp.clone();
277        async_std::task::spawn(async move {
278            for sample_event in sample_events_clone {
279                exp_clone
280                    .match_and_resolve(&sample_event)
281                    .delay(Duration::from_millis(200))
282                    .await;
283            }
284        });
285
286        let resolution = exp
287            .register_expectation(IndexerExpectation::new(tx_hash, move |e| {
288                matches!(
289                    e,
290                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
291                )
292            }))
293            .await?
294            .timeout(Duration::from_secs(5))
295            .await?
296            .context("resolver must not be cancelled")?;
297
298        assert_eq!(sample_events[2], resolution, "resolving event must be equal");
299
300        Ok(())
301    }
302
303    #[async_std::test]
304    async fn test_expectation_should_resolve_multiple_expectations() -> anyhow::Result<()> {
305        let sample_events = vec![
306            SignificantChainEvent {
307                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
308                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
309            },
310            SignificantChainEvent {
311                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
312                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
313            },
314            SignificantChainEvent {
315                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
316                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
317            },
318        ];
319
320        let exp = Arc::new(IndexerActionTracker::default());
321
322        let sample_events_clone = sample_events.clone();
323        let exp_clone = exp.clone();
324        async_std::task::spawn(async move {
325            for sample_event in sample_events_clone {
326                exp_clone
327                    .match_and_resolve(&sample_event)
328                    .delay(Duration::from_millis(100))
329                    .await;
330            }
331        });
332
333        let registered_exps = vec![
334            exp.register_expectation(IndexerExpectation::new(sample_events[2].tx_hash, move |e| {
335                matches!(
336                    e,
337                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
338                )
339            }))
340            .await
341            .context("should register 1")?,
342            exp.register_expectation(IndexerExpectation::new(sample_events[0].tx_hash, move |e| {
343                matches!(e, ChainEventType::NodeSafeRegistered(_))
344            }))
345            .await
346            .context("should register 2")?,
347        ];
348
349        let resolutions = futures::future::try_join_all(registered_exps)
350            .timeout(Duration::from_secs(5))
351            .await?
352            .context("no resolver can cancel")?;
353
354        assert_eq!(sample_events[2], resolutions[0], "resolving event 1 must be equal");
355        assert_eq!(sample_events[0], resolutions[1], "resolving event 2 must be equal");
356
357        Ok(())
358    }
359}