1use std::{
17 collections::HashMap,
18 fmt::{Debug, Formatter},
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22};
23
24use async_lock::RwLock;
25use async_trait::async_trait;
26use futures::{FutureExt, TryFutureExt, channel};
27use hopr_chain_types::chain_events::{ChainEventType, SignificantChainEvent};
28use hopr_crypto_types::types::Hash;
29use tracing::{debug, error, trace};
30
31use crate::errors::{ChainActionsError, Result};
32
33pub type ExpectationResolver = Pin<Box<dyn Future<Output = Result<SignificantChainEvent>> + Send>>;
36
37#[cfg_attr(test, mockall::automock)]
41#[async_trait]
42pub trait ActionState {
43 async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation>;
46
47 async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver>;
49
50 async fn unregister_expectation(&self, tx_hash: Hash);
52}
53
54pub struct IndexerExpectation {
56 pub tx_hash: Hash,
58 predicate: Box<dyn Fn(&ChainEventType) -> bool + Send + Sync>,
59}
60
61impl Debug for IndexerExpectation {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 f.debug_struct("IndexerExpectation")
64 .field("tx_hash", &self.tx_hash)
65 .finish_non_exhaustive()
66 }
67}
68
69impl IndexerExpectation {
70 pub fn new<F>(tx_hash: Hash, expectation: F) -> Self
72 where
73 F: Fn(&ChainEventType) -> bool + Send + Sync + 'static,
74 {
75 Self {
76 tx_hash,
77 predicate: Box::new(expectation),
78 }
79 }
80
81 pub fn test(&self, event: &SignificantChainEvent) -> bool {
83 event.tx_hash == self.tx_hash && (self.predicate)(&event.event_type)
84 }
85}
86
87type ExpectationTable = HashMap<Hash, (IndexerExpectation, channel::oneshot::Sender<SignificantChainEvent>)>;
88
89#[derive(Debug, Clone)]
92pub struct IndexerActionTracker {
93 expectations: Arc<RwLock<ExpectationTable>>,
94}
95
96impl Default for IndexerActionTracker {
97 fn default() -> Self {
98 Self {
99 expectations: Arc::new(RwLock::new(HashMap::new())),
100 }
101 }
102}
103
104#[async_trait]
105impl ActionState for IndexerActionTracker {
106 #[tracing::instrument(level = "debug", skip(self))]
107 async fn match_and_resolve(&self, event: &SignificantChainEvent) -> Vec<IndexerExpectation> {
108 let matched_keys = self
109 .expectations
110 .read_arc()
111 .await
112 .iter()
113 .filter_map(|(k, (e, _))| e.test(event).then_some(*k))
114 .collect::<Vec<_>>();
115
116 if matched_keys.is_empty() {
117 trace!(%event, "no expectations matched for event");
118 return Vec::new();
119 }
120
121 debug!(count = matched_keys.len(), %event, "found expectations to match event",);
122
123 let mut db_write_locked = self.expectations.write_arc().await;
124
125 matched_keys
126 .into_iter()
127 .filter_map(|key| {
128 db_write_locked
129 .remove(&key)
130 .and_then(|(exp, sender)| match sender.send(event.clone()) {
131 Ok(_) => {
132 debug!(%event, tx_hash = %key, "expectation resolved");
133 Some(exp)
134 }
135 Err(_) => {
136 error!(
137 %event, "failed to resolve actions, because the action confirmation already timed out",
138 );
139 None
140 }
141 })
142 })
143 .collect()
144 }
145
146 #[tracing::instrument(level = "debug", skip(self))]
147 async fn register_expectation(&self, exp: IndexerExpectation) -> Result<ExpectationResolver> {
148 let mut db = self.expectations.write_arc().await;
149 if let std::collections::hash_map::Entry::Vacant(e) = db.entry(exp.tx_hash) {
150 let (tx, rx) = channel::oneshot::channel();
151 e.insert((exp, tx));
152 Ok(rx.map_err(|_| ChainActionsError::ExpectationUnregistered).boxed())
153 } else {
154 Err(ChainActionsError::InvalidState(format!(
156 "expectation for tx {} already present",
157 exp.tx_hash
158 )))
159 }
160 }
161
162 #[tracing::instrument(level = "debug", skip(self))]
163 async fn unregister_expectation(&self, tx_hash: Hash) {
164 self.expectations.write_arc().await.remove(&tx_hash);
165 }
166}
167
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use anyhow::Context;
    use hex_literal::hex;
    use hopr_chain_types::chain_events::{ChainEventType, NetworkRegistryStatus, SignificantChainEvent};
    use hopr_crypto_random::random_bytes;
    use hopr_crypto_types::types::Hash;
    use hopr_primitive_types::prelude::*;
    use tokio::time::timeout;

    use crate::{
        action_state::{ActionState, IndexerActionTracker, IndexerExpectation},
        errors::ChainActionsError,
    };

    lazy_static::lazy_static! {
        static ref RANDY: Address = hex!("60f8492b6fbaf86ac2b064c90283d8978a491a01").into();
    }

    /// A registered expectation resolves with the event that matches it.
    #[tokio::test]
    async fn test_expectation_should_resolve() -> anyhow::Result<()> {
        let random_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
        let sample_event = SignificantChainEvent {
            tx_hash: random_hash,
            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
        };

        let exp = Arc::new(IndexerActionTracker::default());

        let sample_event_clone = sample_event.clone();
        let exp_clone = exp.clone();
        // The handle is kept, so the outcome of the task can be verified below:
        // a panic inside a detached task would never fail the test.
        let resolving_task = tokio::task::spawn(async move {
            // Give the test body time to register the expectation first.
            tokio::time::sleep(Duration::from_millis(200)).await;
            exp_clone.match_and_resolve(&sample_event_clone).await
        });

        let resolution = timeout(
            Duration::from_secs(5),
            exp.register_expectation(IndexerExpectation::new(random_hash, move |e| {
                matches!(e, ChainEventType::NodeSafeRegistered(_))
            }))
            .await?,
        )
        .await?
        .context("resolver must not be cancelled")?;

        assert_eq!(sample_event, resolution, "resolving event must be equal");

        let resolved = resolving_task.await.context("resolving task must not panic")?;
        assert_eq!(1, resolved.len(), "exactly one expectation must be resolved");
        assert!(
            resolved.iter().all(|e| e.tx_hash == random_hash),
            "hash must be present as resolved"
        );

        Ok(())
    }

    /// Unregistering a pending expectation makes its resolver fail.
    #[tokio::test]
    async fn test_expectation_should_error_when_unregistered() -> anyhow::Result<()> {
        let sample_event = SignificantChainEvent {
            tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
            event_type: ChainEventType::NodeSafeRegistered(*RANDY),
        };

        let exp = Arc::new(IndexerActionTracker::default());

        let sample_event_clone = sample_event.clone();
        let exp_clone = exp.clone();
        let unregistering_task = tokio::task::spawn(async move {
            // Give the test body time to register the expectation first.
            tokio::time::sleep(Duration::from_millis(200)).await;
            exp_clone.unregister_expectation(sample_event_clone.tx_hash).await;
        });

        let err = timeout(
            Duration::from_secs(5),
            exp.register_expectation(IndexerExpectation::new(sample_event.tx_hash, move |e| {
                matches!(e, ChainEventType::NodeSafeRegistered(_))
            }))
            .await?,
        )
        .await?
        .expect_err("should return with error");

        assert!(
            matches!(err, ChainActionsError::ExpectationUnregistered),
            "should notify on unregistration"
        );

        unregistering_task
            .await
            .context("unregistering task must not panic")?;

        Ok(())
    }

    /// Only the event satisfying both the transaction hash and the predicate resolves the expectation.
    #[tokio::test]
    async fn test_expectation_should_resolve_and_filter() -> anyhow::Result<()> {
        let tx_hash = Hash::from(random_bytes::<{ Hash::SIZE }>());
        let sample_events = vec![
            SignificantChainEvent {
                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
            },
            SignificantChainEvent {
                tx_hash,
                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
            },
            SignificantChainEvent {
                tx_hash,
                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
            },
        ];

        let exp = Arc::new(IndexerActionTracker::default());

        let sample_events_clone = sample_events.clone();
        let exp_clone = exp.clone();
        let feeding_task = tokio::task::spawn(async move {
            for sample_event in sample_events_clone {
                // Space the events out, so the expectation is registered before the first one.
                tokio::time::sleep(Duration::from_millis(200)).await;
                exp_clone.match_and_resolve(&sample_event).await;
            }
        });

        let resolution = timeout(
            Duration::from_secs(5),
            exp.register_expectation(IndexerExpectation::new(tx_hash, move |e| {
                matches!(
                    e,
                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
                )
            }))
            .await?,
        )
        .await?
        .context("resolver must not be cancelled")?;

        assert_eq!(sample_events[2], resolution, "resolving event must be equal");

        feeding_task.await.context("event feeding task must not panic")?;

        Ok(())
    }

    /// Several expectations registered at once are each resolved by their own event.
    #[tokio::test]
    async fn test_expectation_should_resolve_multiple_expectations() -> anyhow::Result<()> {
        let sample_events = vec![
            SignificantChainEvent {
                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
                event_type: ChainEventType::NodeSafeRegistered(*RANDY),
            },
            SignificantChainEvent {
                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Denied),
            },
            SignificantChainEvent {
                tx_hash: Hash::from(random_bytes::<{ Hash::SIZE }>()),
                event_type: ChainEventType::NetworkRegistryUpdate(*RANDY, NetworkRegistryStatus::Allowed),
            },
        ];

        let exp = Arc::new(IndexerActionTracker::default());

        let sample_events_clone = sample_events.clone();
        let exp_clone = exp.clone();
        let feeding_task = tokio::task::spawn(async move {
            for sample_event in sample_events_clone {
                // Space the events out, so the expectations are registered before the first one.
                tokio::time::sleep(Duration::from_millis(100)).await;
                exp_clone.match_and_resolve(&sample_event).await;
            }
        });

        let registered_exps = vec![
            exp.register_expectation(IndexerExpectation::new(sample_events[2].tx_hash, move |e| {
                matches!(
                    e,
                    ChainEventType::NetworkRegistryUpdate(_, NetworkRegistryStatus::Allowed)
                )
            }))
            .await
            .context("should register 1")?,
            exp.register_expectation(IndexerExpectation::new(sample_events[0].tx_hash, move |e| {
                matches!(e, ChainEventType::NodeSafeRegistered(_))
            }))
            .await
            .context("should register 2")?,
        ];

        let resolutions = timeout(Duration::from_secs(5), futures::future::try_join_all(registered_exps))
            .await?
            .context("no resolver can cancel")?;

        assert_eq!(sample_events[2], resolutions[0], "resolving event 1 must be equal");
        assert_eq!(sample_events[0], resolutions[1], "resolving event 2 must be equal");

        feeding_task.await.context("event feeding task must not panic")?;

        Ok(())
    }
}