1use async_lock::RwLock;
17use async_trait::async_trait;
18use futures::{channel, FutureExt, TryFutureExt};
19use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
20use hopr_crypto_types::types::Hash;
21use std::collections::hash_map::Entry;
22use std::collections::HashMap;
23use std::fmt::{Debug, Formatter};
24use std::future::Future;
25use std::pin::Pin;
26use std::sync::Arc;
27use tracing::{debug, error};
28
29use crate::errors::{ChainActionsError, Result};
30
31pub type ExpectationResolver = Pin<Box<dyn Future<Output = Result<SignificantChainEvent>> + Send>>;
34
35#[cfg_attr(test, mockall::automock)]
39#[async_trait]
40pub trait ActionState {
41 async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation>;
44
45 async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver>;
47
48 async fn unregister_expectation(&self, tx_hash: Hash);
50}
51
52pub struct IndexerExpectation {
54 pub tx_hash: Hash,
56 predicate: Box<dyn Fn(&ChainEventType) -> bool + Send + Sync>,
57}
58
59impl Debug for IndexerExpectation {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("IndexerExpectation")
62 .field("tx_hash", &self.tx_hash)
63 .finish_non_exhaustive()
64 }
65}
66
67impl IndexerExpectation {
68 pub fn new<F>(tx_hash: Hash, expectation: F) -> Self
70 where
71 F: Fn(&ChainEventType) -> bool + Send + Sync + 'static,
72 {
73 Self {
74 tx_hash,
75 predicate: Box::new(expectation),
76 }
77 }
78
79 pub fn test(&self, event: &SignificantChainEvent) -> bool {
81 event.tx_hash == self.tx_hash && (self.predicate)(&event.event_type)
82 }
83}
84
85type ExpectationTable = HashMap<Hash, (IndexerExpectation, channel::oneshot::Sender<SignificantChainEvent>)>;
86
87#[derive(Debug, Clone)]
90pub struct IndexerActionTracker {
91 expectations: Arc<RwLock<ExpectationTable>>,
92}
93
94impl Default for IndexerActionTracker {
95 fn default() -> Self {
96 Self {
97 expectations: Arc::new(RwLock::new(HashMap::new())),
98 }
99 }
100}
101
102#[async_trait]
103impl ActionState for IndexerActionTracker {
104 #[tracing::instrument(level = "debug", skip(self))]
105 async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation> {
106 let matched_keys = self
107 .expectations
108 .read()
109 .await
110 .iter()
111 .filter_map(|(k, (e, _))| e.test(event).then_some(*k))
112 .collect::<Vec<_>>();
113
114 debug!(count = matched_keys.len(), ?event, "found expectations to match event",);
115
116 if matched_keys.is_empty() {
117 return Vec::new();
118 }
119 let mut db = self.expectations.write().await;
120 matched_keys
121 .into_iter()
122 .filter_map(|key| {
123 db.remove(&key)
124 .and_then(|(exp, sender)| match sender.send(event.clone()) {
125 Ok(_) => {
126 debug!(%event, "expectation resolved ");
127 Some(exp)
128 }
129 Err(_) => {
130 error!(
131 %event, "failed to resolve actions, because the action confirmation already timed out",
132 );
133 None
134 }
135 })
136 })
137 .collect()
138 }
139
140 #[tracing::instrument(level = "debug", skip(self))]
141 async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver> {
142 match self.expectations.write().await.entry(exp.tx_hash) {
143 Entry::Occupied(_) => {
144 return Err(ChainActionsError::InvalidState(format!(
146 "expectation for tx {} already present",
147 exp.tx_hash
148 )));
149 }
150 Entry::Vacant(e) => {
151 let (tx, rx) = channel::oneshot::channel();
152 e.insert((exp, tx));
153 Ok(rx.map_err(|_| ChainActionsError::ExpectationUnregistered).boxed())
154 }
155 }
156 }
157
158 #[tracing::instrument(level = "debug", skip(self))]
159 async fn unregister_expectation(&self, tx_hash: Hash) {
160 self.expectations.write().await.remove(&tx_hash);
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use crate::action_state::{ActionState, IndexerActionTracker, IndexerExpectation};
167 use crate::errors::ChainActionsError;
168 use anyhow::Context;
169 use async_std::prelude::FutureExt;
170 use hex_literal::hex;
171 use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
172 use hopr_crypto_random::random_bytes;
173 use hopr_crypto_types::types::Hash;
174 use hopr_primitive_types::prelude::*;
175 use std::sync::Arc;
176 use std::time::Duration;
177
178 lazy_static::lazy_static! {
179 static ref RANDY: Address = hex!("60f8492b6fbaf86ac2b064c90283d8978a491a01").into();
181 }
182
183 #[async_std::test]
184 async fn test_expectation_should_resolve() -> anyhow::Result<()> {
185 let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
186 let sample_event = SignificantChainEvent {
187 tx_hash: random_hash,
188 event_type: ChainEventType::NodeSafeRegistered(*RANDY),
189 };
190
191 let exp = Arc::new(IndexerActionTracker::default());
192
193 let sample_event_clone = sample_event.clone();
194 let exp_clone = exp.clone();
195 async_std::task::spawn(async move {
196 let hash = exp_clone
197 .match_and_resolve(&sample_event_clone)
198 .delay(Duration::from_millis(200))
199 .await;
200 assert!(
201 hash.iter().all(|e| e.tx_hash == random_hash),
202 "hash must be present as resolved"
203 );
204 });
205
206 let resolution = exp
207 .register_expectation(IndexerExpectation::new(random_hash, move |e| {
208 matches!(e, ChainEventType::NodeSafeRegistered(_))
209 }))
210 .await?
211 .timeout(Duration::from_secs(5))
212 .await?
213 .context("resolver must not be cancelled")?;
214
215 assert_eq!(sample_event, resolution, "resolving event must be equal");
216
217 Ok(())
218 }
219
220 #[async_std::test]
221 async fn test_expectation_should_error_when_unregistered() -> anyhow::Result<()> {
222 let sample_event = SignificantChainEvent {
223 tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
224 event_type: ChainEventType::NodeSafeRegistered(*RANDY),
225 };
226
227 let exp = Arc::new(IndexerActionTracker::default());
228
229 let sample_event_clone = sample_event.clone();
230 let exp_clone = exp.clone();
231 async_std::task::spawn(async move {
232 exp_clone
233 .unregister_expectation(sample_event_clone.tx_hash)
234 .delay(Duration::from_millis(200))
235 .await;
236 });
237
238 let err = exp
239 .register_expectation(IndexerExpectation::new(sample_event.tx_hash, move |e| {
240 matches!(e, ChainEventType::NodeSafeRegistered(_))
241 }))
242 .await?
243 .timeout(Duration::from_secs(5))
244 .await?
245 .expect_err("should return with error");
246
247 assert!(
248 matches!(err, ChainActionsError::ExpectationUnregistered),
249 "should notify on unregistration"
250 );
251
252 Ok(())
253 }
254
255 #[async_std::test]
256 async fn test_expectation_should_resolve_and_filter() -> anyhow::Result<()> {
257 let tx_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
258 let sample_events = vec![
259 SignificantChainEvent {
260 tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
261 event_type: ChainEventType::NodeSafeRegistered(*RANDY),
262 },
263 SignificantChainEvent {
264 tx_hash,
265 event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
266 },
267 SignificantChainEvent {
268 tx_hash,
269 event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
270 },
271 ];
272
273 let exp = Arc::new(IndexerActionTracker::default());
274
275 let sample_events_clone = sample_events.clone();
276 let exp_clone = exp.clone();
277 async_std::task::spawn(async move {
278 for sample_event in sample_events_clone {
279 exp_clone
280 .match_and_resolve(&sample_event)
281 .delay(Duration::from_millis(200))
282 .await;
283 }
284 });
285
286 let resolution = exp
287 .register_expectation(IndexerExpectation::new(tx_hash, move |e| {
288 matches!(
289 e,
290 ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
291 )
292 }))
293 .await?
294 .timeout(Duration::from_secs(5))
295 .await?
296 .context("resolver must not be cancelled")?;
297
298 assert_eq!(sample_events[2], resolution, "resolving event must be equal");
299
300 Ok(())
301 }
302
303 #[async_std::test]
304 async fn test_expectation_should_resolve_multiple_expectations() -> anyhow::Result<()> {
305 let sample_events = vec![
306 SignificantChainEvent {
307 tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
308 event_type: ChainEventType::NodeSafeRegistered(*RANDY),
309 },
310 SignificantChainEvent {
311 tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
312 event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
313 },
314 SignificantChainEvent {
315 tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
316 event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
317 },
318 ];
319
320 let exp = Arc::new(IndexerActionTracker::default());
321
322 let sample_events_clone = sample_events.clone();
323 let exp_clone = exp.clone();
324 async_std::task::spawn(async move {
325 for sample_event in sample_events_clone {
326 exp_clone
327 .match_and_resolve(&sample_event)
328 .delay(Duration::from_millis(100))
329 .await;
330 }
331 });
332
333 let registered_exps = vec![
334 exp.register_expectation(IndexerExpectation::new(sample_events[2].tx_hash, move |e| {
335 matches!(
336 e,
337 ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
338 )
339 }))
340 .await
341 .context("should register 1")?,
342 exp.register_expectation(IndexerExpectation::new(sample_events[0].tx_hash, move |e| {
343 matches!(e, ChainEventType::NodeSafeRegistered(_))
344 }))
345 .await
346 .context("should register 2")?,
347 ];
348
349 let resolutions = futures::future::try_join_all(registered_exps)
350 .timeout(Duration::from_secs(5))
351 .await?
352 .context("no resolver can cancel")?;
353
354 assert_eq!(sample_events[2], resolutions[0], "resolving event 1 must be equal");
355 assert_eq!(sample_events[0], resolutions[1], "resolving event 2 must be equal");
356
357 Ok(())
358 }
359}