1use async_trait::async_trait;
2use futures::{stream, StreamExt};
3use sea_orm::entity::Set;
4use sea_orm::query::QueryTrait;
5use sea_orm::sea_query::{Expr, OnConflict, Value};
6use sea_orm::{
7 ActiveModelTrait, ColumnTrait, EntityTrait, FromQueryResult, IntoActiveModel, PaginatorTrait, QueryFilter,
8 QueryOrder, QuerySelect,
9};
10use tracing::{error, trace};
11
12use hopr_crypto_types::prelude::Hash;
13use hopr_db_api::errors::{DbError, Result};
14use hopr_db_api::logs::HoprDbLogOperations;
15use hopr_db_entity::errors::DbEntityError;
16use hopr_db_entity::prelude::{Log, LogStatus, LogTopicInfo};
17use hopr_db_entity::{log, log_status, log_topic_info};
18use hopr_primitive_types::prelude::*;
19
20use crate::db::HoprDb;
21use crate::errors::DbSqlError;
22use crate::{HoprDbGeneralModelOperations, TargetDb};
23
24#[derive(FromQueryResult)]
25struct BlockNumber {
26 block_number: Vec<u8>,
27}
28
29#[async_trait]
30impl HoprDbLogOperations for HoprDb {
31 async fn store_log<'a>(&'a self, log: SerializableLog) -> Result<()> {
32 match self.store_logs([log].to_vec()).await {
33 Ok(results) => {
34 if let Some(result) = results.into_iter().next() {
35 result
36 } else {
37 panic!("when inserting a log into the db, the result should be a single item")
38 }
39 }
40 Err(e) => Err(e),
41 }
42 }
43
44 async fn store_logs(&self, logs: Vec<SerializableLog>) -> Result<Vec<Result<()>>> {
45 self.nest_transaction_in_db(None, TargetDb::Logs)
46 .await?
47 .perform(|tx| {
48 Box::pin(async move {
49 let results = stream::iter(logs).then(|log| async {
50 let model = log::ActiveModel::from(log.clone());
51 let status_model = log_status::ActiveModel::from(log);
52 let log_status_query = LogStatus::insert(status_model).on_conflict(
53 OnConflict::columns([
54 log_status::Column::LogIndex,
55 log_status::Column::TransactionIndex,
56 log_status::Column::BlockNumber,
57 ])
58 .do_nothing()
59 .to_owned(),
60 );
61 let log_query = Log::insert(model).on_conflict(
62 OnConflict::columns([
63 log::Column::LogIndex,
64 log::Column::TransactionIndex,
65 log::Column::BlockNumber,
66 ])
67 .do_nothing()
68 .to_owned(),
69 );
70
71 match log_status_query.exec(tx.as_ref()).await {
72 Ok(_) => match log_query.exec(tx.as_ref()).await {
73 Ok(_) => Ok(()),
74 Err(e) => {
75 error!("Failed to insert log into db: {:?}", e);
76 Err(DbError::General(e.to_string()))
77 }
78 },
79 Err(e) => {
80 error!("Failed to insert log status into db: {:?}", e);
81 Err(DbError::General(e.to_string()))
82 }
83 }
84 });
85 Ok(results.collect::<Vec<_>>().await)
86 })
87 })
88 .await
89 }
90
91 async fn get_log(&self, block_number: u64, tx_index: u64, log_index: u64) -> Result<SerializableLog> {
92 let bn_enc = block_number.to_be_bytes().to_vec();
93 let tx_index_enc = tx_index.to_be_bytes().to_vec();
94 let log_index_enc = log_index.to_be_bytes().to_vec();
95
96 let query = Log::find()
97 .filter(log::Column::BlockNumber.eq(bn_enc))
98 .filter(log::Column::TransactionIndex.eq(tx_index_enc))
99 .filter(log::Column::LogIndex.eq(log_index_enc))
100 .find_also_related(LogStatus);
101
102 match query.all(self.conn(TargetDb::Logs)).await {
103 Ok(mut res) => {
104 if let Some((log, log_status)) = res.pop() {
105 if let Some(status) = log_status {
106 create_log(log, status).map_err(DbError::from)
107 } else {
108 Err(DbError::MissingLogStatus)
109 }
110 } else {
111 Err(DbError::MissingLog)
112 }
113 }
114 Err(e) => Err(DbError::from(DbSqlError::from(e))),
115 }
116 }
117
118 async fn get_logs<'a>(
119 &'a self,
120 block_number: Option<u64>,
121 block_offset: Option<u64>,
122 ) -> Result<Vec<SerializableLog>> {
123 let min_block_number = block_number.unwrap_or(0);
124 let max_block_number = block_offset.map(|v| min_block_number + v + 1);
125
126 let query = Log::find()
127 .find_also_related(LogStatus)
128 .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
129 .apply_if(max_block_number, |q, v| {
130 q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
131 })
132 .order_by_asc(log::Column::BlockNumber)
133 .order_by_asc(log::Column::TransactionIndex)
134 .order_by_asc(log::Column::LogIndex);
135
136 match query.all(self.conn(TargetDb::Logs)).await {
137 Ok(logs) => Ok(logs
138 .into_iter()
139 .map(|(log, status)| {
140 if let Some(status) = status {
141 create_log(log, status).unwrap()
142 } else {
143 error!("Missing log status for log in db: {:?}", log);
144 SerializableLog::try_from(log).unwrap()
145 }
146 })
147 .collect()),
148 Err(e) => {
149 error!("Failed to get logs from db: {:?}", e);
150 Err(DbError::from(DbSqlError::from(e)))
151 }
152 }
153 }
154
155 async fn get_logs_count(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<u64> {
156 let min_block_number = block_number.unwrap_or(0);
157 let max_block_number = block_offset.map(|v| min_block_number + v + 1);
158
159 Log::find()
160 .select_only()
161 .column(log::Column::BlockNumber)
162 .column(log::Column::TransactionIndex)
163 .column(log::Column::LogIndex)
164 .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
165 .apply_if(max_block_number, |q, v| {
166 q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
167 })
168 .count(self.conn(TargetDb::Logs))
169 .await
170 .map_err(|e| DbSqlError::from(e).into())
171 }
172
173 async fn get_logs_block_numbers<'a>(
174 &'a self,
175 block_number: Option<u64>,
176 block_offset: Option<u64>,
177 processed: Option<bool>,
178 ) -> Result<Vec<u64>> {
179 let min_block_number = block_number.unwrap_or(0);
180 let max_block_number = block_offset.map(|v| min_block_number + v + 1);
181
182 LogStatus::find()
183 .select_only()
184 .column(log_status::Column::BlockNumber)
185 .distinct()
186 .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
187 .apply_if(max_block_number, |q, v| {
188 q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
189 })
190 .apply_if(processed, |q, v| q.filter(log_status::Column::Processed.eq(v)))
191 .order_by_asc(log_status::Column::BlockNumber)
192 .into_model::<BlockNumber>()
193 .all(self.conn(TargetDb::Logs))
194 .await
195 .map(|res| {
196 res.into_iter()
197 .map(|b| U256::from_be_bytes(b.block_number).as_u64())
198 .collect()
199 })
200 .map_err(|e| {
201 error!("Failed to get logs block numbers from db: {:?}", e);
202 DbError::from(DbSqlError::from(e))
203 })
204 }
205
206 async fn set_logs_processed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
207 let min_block_number = block_number.unwrap_or(0);
208 let max_block_number = block_offset.map(|v| min_block_number + v + 1);
209 let now = Utc::now();
210
211 let query = LogStatus::update_many()
212 .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(true))))
213 .col_expr(
214 log_status::Column::ProcessedAt,
215 Expr::value(Value::ChronoDateTimeUtc(Some(now.into()))),
216 )
217 .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
218 .apply_if(max_block_number, |q, v| {
219 q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
220 });
221
222 match query.exec(self.conn(TargetDb::Logs)).await {
223 Ok(_) => Ok(()),
224 Err(e) => Err(DbError::from(DbSqlError::from(e))),
225 }
226 }
227
228 async fn set_log_processed<'a>(&'a self, mut log: SerializableLog) -> Result<()> {
229 log.processed = Some(true);
230 log.processed_at = Some(Utc::now());
231 let log_status = log_status::ActiveModel::from(log);
232
233 let db_tx = self.nest_transaction_in_db(None, TargetDb::Logs).await?;
234
235 db_tx
236 .perform(|tx| {
237 Box::pin(async move {
238 match LogStatus::update(log_status).exec(tx.as_ref()).await {
239 Ok(_) => Ok(()),
240 Err(e) => {
241 error!("Failed to update log status in db");
242 Err(DbError::from(DbSqlError::from(e)))
243 }
244 }
245 })
246 })
247 .await
248 }
249
250 async fn set_logs_unprocessed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
251 let min_block_number = block_number.unwrap_or(0);
252 let max_block_number = block_offset.map(|v| min_block_number + v + 1);
253
254 let query = LogStatus::update_many()
255 .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(false))))
256 .col_expr(
257 log_status::Column::ProcessedAt,
258 Expr::value(Value::ChronoDateTimeUtc(None)),
259 )
260 .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
261 .apply_if(max_block_number, |q, v| {
262 q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
263 });
264
265 match query.exec(self.conn(TargetDb::Logs)).await {
266 Ok(_) => Ok(()),
267 Err(e) => Err(DbError::from(DbSqlError::from(e))),
268 }
269 }
270
271 async fn get_last_checksummed_log(&self) -> Result<Option<SerializableLog>> {
272 let query = LogStatus::find()
273 .filter(log_status::Column::Checksum.is_not_null())
274 .order_by_desc(log_status::Column::BlockNumber)
275 .order_by_desc(log_status::Column::TransactionIndex)
276 .order_by_desc(log_status::Column::LogIndex)
277 .find_also_related(Log);
278
279 match query.one(self.conn(TargetDb::Logs)).await {
280 Ok(Some((status, Some(log)))) => {
281 if let Ok(slog) = create_log(log, status) {
282 Ok(Some(slog))
283 } else {
284 Ok(None)
285 }
286 }
287 Ok(_) => Ok(None),
288 Err(e) => Err(DbError::from(DbSqlError::from(e))),
289 }
290 }
291
292 async fn update_logs_checksums(&self) -> Result<Hash> {
293 self.nest_transaction_in_db(None, TargetDb::Logs)
294 .await?
295 .perform(|tx| {
296 Box::pin(async move {
297 let mut last_checksum = LogStatus::find()
298 .filter(log_status::Column::Checksum.is_not_null())
299 .order_by_desc(log_status::Column::BlockNumber)
300 .order_by_desc(log_status::Column::TransactionIndex)
301 .order_by_desc(log_status::Column::LogIndex)
302 .one(tx.as_ref())
303 .await
304 .map_err(|e| DbError::from(DbSqlError::from(e)))?
305 .and_then(|m| m.checksum)
306 .and_then(|c| Hash::try_from(c.as_slice()).ok())
307 .unwrap_or_default();
308
309 let query = LogStatus::find()
310 .filter(log_status::Column::Checksum.is_null())
311 .order_by_asc(log_status::Column::BlockNumber)
312 .order_by_asc(log_status::Column::TransactionIndex)
313 .order_by_asc(log_status::Column::LogIndex)
314 .find_also_related(Log);
315
316 match query.all(tx.as_ref()).await {
317 Ok(entries) => {
318 let mut entries = entries.into_iter();
319 while let Some((status, Some(log_entry))) = entries.next() {
320 let slog = create_log(log_entry.clone(), status.clone())?;
321 let log_hash = Hash::create(&[
324 log_entry.block_hash.as_slice(),
325 log_entry.transaction_hash.as_slice(),
326 log_entry.log_index.as_slice(),
327 ]);
328
329 let next_checksum = Hash::create(&[last_checksum.as_ref(), log_hash.as_ref()]);
330
331 let mut updated_status = status.into_active_model();
332 updated_status.checksum = Set(Some(next_checksum.as_ref().to_vec()));
333
334 match updated_status.update(tx.as_ref()).await {
335 Ok(_) => {
336 last_checksum = next_checksum;
337 trace!(log = %slog, checksum = %next_checksum, "Generated log checksum");
338 }
339 Err(error) => {
340 error!(%error, "Failed to update log status checksum in db");
341 break;
342 }
343 }
344 }
345 Ok(last_checksum)
346 }
347 Err(e) => Err(DbError::from(DbSqlError::from(e))),
348 }
349 })
350 })
351 .await
352 }
353
354 async fn ensure_logs_origin(&self, contract_address_topics: Vec<(Address, Hash)>) -> Result<()> {
355 if contract_address_topics.is_empty() {
356 return Err(DbError::LogicalError(
357 "contract address topics must not be empty".into(),
358 ));
359 }
360
361 self.nest_transaction_in_db(None, TargetDb::Logs)
362 .await?
363 .perform(|tx| {
364 Box::pin(async move {
365 let log_count = Log::find()
367 .select_only()
368 .column(log::Column::BlockNumber)
369 .column(log::Column::TransactionIndex)
370 .column(log::Column::LogIndex)
371 .count(tx.as_ref())
372 .await
373 .map_err(|e| DbError::from(DbSqlError::from(e)))?;
374 let log_topic_count = LogTopicInfo::find()
375 .count(tx.as_ref())
376 .await
377 .map_err(|e| DbError::from(DbSqlError::from(e)))?;
378
379 if log_count == 0 && log_topic_count == 0 {
380 LogTopicInfo::insert_many(contract_address_topics.into_iter().map(|(addr, topic)| {
382 log_topic_info::ActiveModel {
383 address: Set(addr.to_string()),
384 topic: Set(topic.to_string()),
385 ..Default::default()
386 }
387 }))
388 .exec(tx.as_ref())
389 .await
390 .map_err(|e| DbError::from(DbSqlError::from(e)))?;
391 } else {
392 for (addr, topic) in contract_address_topics {
394 let log_topic_count = LogTopicInfo::find()
395 .filter(log_topic_info::Column::Address.eq(addr.to_string()))
396 .filter(log_topic_info::Column::Topic.eq(topic.to_string()))
397 .count(tx.as_ref())
398 .await
399 .map_err(|e| DbError::from(DbSqlError::from(e)))?;
400 if log_topic_count != 1 {
401 return Err(DbError::InconsistentLogs);
402 }
403 }
404 }
405 Ok(())
406 })
407 })
408 .await
409 }
410}
411
412fn create_log(raw_log: log::Model, status: log_status::Model) -> crate::errors::Result<SerializableLog> {
413 let log = SerializableLog::try_from(raw_log).map_err(DbSqlError::from)?;
414
415 let checksum = if let Some(c) = status.checksum {
416 let h: std::result::Result<[u8; 32], _> = c.try_into();
417
418 if let Ok(hash) = h {
419 Some(Hash::from(hash).to_hex())
420 } else {
421 return Err(DbSqlError::from(DbEntityError::ConversionError(
422 "Invalid log checksum".into(),
423 )));
424 }
425 } else {
426 None
427 };
428
429 let log = if let Some(raw_ts) = status.processed_at {
430 let ts = DateTime::<Utc>::from_naive_utc_and_offset(raw_ts, Utc);
431 SerializableLog {
432 processed: Some(status.processed),
433 processed_at: Some(ts),
434 checksum,
435 ..log
436 }
437 } else {
438 SerializableLog {
439 processed: Some(status.processed),
440 processed_at: None,
441 checksum,
442 ..log
443 }
444 };
445
446 Ok(log)
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452
453 use hopr_crypto_types::prelude::{ChainKeypair, Hash, Keypair};
454
455 #[async_std::test]
456 async fn test_store_single_log() {
457 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
458
459 let log = SerializableLog {
460 address: Address::new(b"my address 123456789"),
461 topics: [Hash::create(&[b"my topic"]).into()].into(),
462 data: [1, 2, 3, 4].into(),
463 tx_index: 1u64,
464 block_number: 1u64,
465 block_hash: Hash::create(&[b"my block hash"]).into(),
466 tx_hash: Hash::create(&[b"my tx hash"]).into(),
467 log_index: 1u64,
468 removed: false,
469 processed: Some(false),
470 ..Default::default()
471 };
472
473 db.store_log(log.clone()).await.unwrap();
474
475 let logs = db.get_logs(None, None).await.unwrap();
476
477 assert_eq!(logs.len(), 1);
478 assert_eq!(logs[0], log);
479 }
480
481 #[async_std::test]
482 async fn test_store_multiple_logs() {
483 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
484
485 let log_1 = SerializableLog {
486 address: Address::new(b"my address 123456789"),
487 topics: [Hash::create(&[b"my topic"]).into()].into(),
488 data: [1, 2, 3, 4].into(),
489 tx_index: 1u64,
490 block_number: 1u64,
491 block_hash: Hash::create(&[b"my block hash"]).into(),
492 tx_hash: Hash::create(&[b"my tx hash"]).into(),
493 log_index: 1u64,
494 removed: false,
495 processed: Some(false),
496 ..Default::default()
497 };
498
499 let log_2 = SerializableLog {
500 address: Address::new(b"my address 223456789"),
501 topics: [Hash::create(&[b"my topic 2"]).into()].into(),
502 data: [1, 2, 3, 4, 5].into(),
503 tx_index: 2u64,
504 block_number: 2u64,
505 block_hash: Hash::create(&[b"my block hash 2"]).into(),
506 tx_hash: Hash::create(&[b"my tx hash 2"]).into(),
507 log_index: 2u64,
508 removed: false,
509 processed: Some(true),
510 ..Default::default()
511 };
512
513 db.store_log(log_1.clone()).await.unwrap();
514 db.store_log(log_2.clone()).await.unwrap();
515
516 let logs = db.get_logs(None, None).await.unwrap();
517
518 assert_eq!(logs.len(), 2);
519 assert_eq!(logs[0], log_1);
520 assert_eq!(logs[1], log_2);
521
522 let log_2_retrieved = db
523 .get_log(log_2.block_number, log_2.tx_index, log_2.log_index)
524 .await
525 .unwrap();
526
527 assert_eq!(log_2, log_2_retrieved);
528 }
529
530 #[async_std::test]
531 async fn test_store_duplicate_log() {
532 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
533
534 let log = SerializableLog {
535 address: Address::new(b"my address 123456789"),
536 topics: [Hash::create(&[b"my topic"]).into()].into(),
537 data: [1, 2, 3, 4].into(),
538 tx_index: 1u64,
539 block_number: 1u64,
540 block_hash: Hash::create(&[b"my block hash"]).into(),
541 tx_hash: Hash::create(&[b"my tx hash"]).into(),
542 log_index: 1u64,
543 removed: false,
544 ..Default::default()
545 };
546
547 db.store_log(log.clone()).await.unwrap();
548
549 db.store_log(log.clone())
550 .await
551 .expect_err("Expected error due to duplicate log insertion");
552
553 let logs = db.get_logs(None, None).await.unwrap();
554
555 assert_eq!(logs.len(), 1);
556 }
557
558 #[async_std::test]
559 async fn test_set_log_processed() {
560 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
561
562 let log = SerializableLog {
563 address: Address::new(b"my address 123456789"),
564 topics: [Hash::create(&[b"my topic"]).into()].into(),
565 data: [1, 2, 3, 4].into(),
566 tx_index: 1u64,
567 block_number: 1u64,
568 block_hash: Hash::create(&[b"my block hash"]).into(),
569 tx_hash: Hash::create(&[b"my tx hash"]).into(),
570 log_index: 1u64,
571 removed: false,
572 ..Default::default()
573 };
574
575 db.store_log(log.clone()).await.unwrap();
576
577 let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
578
579 assert_eq!(log_db.processed, Some(false));
580 assert_eq!(log_db.processed_at, None);
581
582 db.set_log_processed(log.clone()).await.unwrap();
583
584 let log_db_updated = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
585
586 assert_eq!(log_db_updated.processed, Some(true));
587 assert!(log_db_updated.processed_at.is_some());
588 }
589
590 #[async_std::test]
591 async fn test_list_logs_ordered() {
592 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
593
594 let logs_per_tx = 3;
595 let tx_per_block = 3;
596 let blocks = 10;
597 let start_block = 32183412;
598 let base_log = SerializableLog {
599 address: Address::new(b"my address 123456789"),
600 topics: [Hash::create(&[b"my topic"]).into()].into(),
601 data: [1, 2, 3, 4].into(),
602 tx_index: 0,
603 block_number: 0,
604 block_hash: Hash::create(&[b"my block hash"]).into(),
605 tx_hash: Hash::create(&[b"my tx hash"]).into(),
606 log_index: 0,
607 removed: false,
608 ..Default::default()
609 };
610
611 for block_offset in 0..blocks {
612 for tx_index in 0..tx_per_block {
613 for log_index in 0..logs_per_tx {
614 let log = SerializableLog {
615 tx_index,
616 block_number: start_block + block_offset,
617 log_index,
618 ..base_log.clone()
619 };
620 db.store_log(log).await.unwrap()
621 }
622 }
623 }
624
625 let block_fetch_interval = 3;
626 let mut next_block = start_block;
627
628 while next_block <= start_block + blocks {
629 let ordered_logs = db.get_logs(Some(next_block), Some(block_fetch_interval)).await.unwrap();
630
631 assert!(!ordered_logs.is_empty());
632
633 ordered_logs.iter().reduce(|prev_log, curr_log| {
634 assert!(prev_log.block_number >= next_block);
635 assert!(prev_log.block_number <= (next_block + block_fetch_interval));
636 assert!(curr_log.block_number >= next_block);
637 assert!(curr_log.block_number <= (next_block + block_fetch_interval));
638 if prev_log.block_number == curr_log.block_number {
639 if prev_log.tx_index == curr_log.tx_index {
640 assert!(prev_log.log_index < curr_log.log_index);
641 } else {
642 assert!(prev_log.tx_index < curr_log.tx_index);
643 }
644 } else {
645 assert!(prev_log.block_number < curr_log.block_number);
646 }
647 curr_log
648 });
649 next_block += block_fetch_interval;
650 }
651 }
652
653 #[async_std::test]
654 async fn test_get_nonexistent_log() {
655 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
656
657 let result = db.get_log(999, 999, 999).await;
658
659 assert!(result.is_err());
660 }
661
662 #[async_std::test]
663 async fn test_get_logs_with_block_offset() {
664 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
665
666 let log_1 = SerializableLog {
667 address: Address::new(b"my address 123456789"),
668 topics: [Hash::create(&[b"topic1"]).into()].into(),
669 data: [1, 2, 3, 4].into(),
670 tx_index: 1,
671 block_number: 1,
672 block_hash: Hash::create(&[b"block_hash1"]).into(),
673 tx_hash: Hash::create(&[b"tx_hash1"]).into(),
674 log_index: 1,
675 removed: false,
676 processed: Some(false),
677 ..Default::default()
678 };
679
680 let log_2 = SerializableLog {
681 address: Address::new(b"my address 223456789"),
682 topics: [Hash::create(&[b"topic2"]).into()].into(),
683 data: [1, 2, 3, 4].into(),
684 tx_index: 2,
685 block_number: 2,
686 block_hash: Hash::create(&[b"block_hash2"]).into(),
687 tx_hash: Hash::create(&[b"tx_hash2"]).into(),
688 log_index: 2,
689 removed: false,
690 processed: Some(false),
691 ..Default::default()
692 };
693
694 db.store_logs(vec![log_1.clone(), log_2.clone()])
695 .await
696 .unwrap()
697 .into_iter()
698 .for_each(|r| assert!(r.is_ok()));
699
700 let logs = db.get_logs(Some(1), Some(0)).await.unwrap();
701
702 assert_eq!(logs.len(), 1);
703 assert_eq!(logs[0], log_1);
704 }
705
706 #[async_std::test]
707 async fn test_set_logs_unprocessed() {
708 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
709
710 let log = SerializableLog {
711 address: Address::new(b"my address 123456789"),
712 topics: [Hash::create(&[b"topic"]).into()].into(),
713 data: [1, 2, 3, 4].into(),
714 tx_index: 1,
715 block_number: 1,
716 block_hash: Hash::create(&[b"block_hash"]).into(),
717 tx_hash: Hash::create(&[b"tx_hash"]).into(),
718 log_index: 1,
719 removed: false,
720 processed: Some(true),
721 processed_at: Some(Utc::now()),
722 ..Default::default()
723 };
724
725 db.store_log(log.clone()).await.unwrap();
726
727 db.set_logs_unprocessed(Some(1), Some(0)).await.unwrap();
728
729 let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
730
731 assert_eq!(log_db.processed, Some(false));
732 assert!(log_db.processed_at.is_none());
733 }
734
735 #[async_std::test]
736 async fn test_get_logs_block_numbers() {
737 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
738
739 let log_1 = SerializableLog {
740 address: Address::new(b"my address 123456789"),
741 topics: [Hash::create(&[b"topic1"]).into()].into(),
742 data: [1, 2, 3, 4].into(),
743 tx_index: 1,
744 block_number: 1,
745 block_hash: Hash::create(&[b"block_hash1"]).into(),
746 tx_hash: Hash::create(&[b"tx_hash1"]).into(),
747 log_index: 1,
748 removed: false,
749 processed: Some(true),
750 ..Default::default()
751 };
752
753 let log_2 = SerializableLog {
754 address: Address::new(b"my address 223456789"),
755 topics: [Hash::create(&[b"topic2"]).into()].into(),
756 data: [1, 2, 3, 4].into(),
757 tx_index: 2,
758 block_number: 2,
759 block_hash: Hash::create(&[b"block_hash2"]).into(),
760 tx_hash: Hash::create(&[b"tx_hash2"]).into(),
761 log_index: 2,
762 removed: false,
763 processed: Some(false),
764 ..Default::default()
765 };
766
767 let log_3 = SerializableLog {
768 address: Address::new(b"my address 323456789"),
769 topics: [Hash::create(&[b"topic3"]).into()].into(),
770 data: [1, 2, 3, 4].into(),
771 tx_index: 3,
772 block_number: 3,
773 block_hash: Hash::create(&[b"block_hash3"]).into(),
774 tx_hash: Hash::create(&[b"tx_hash3"]).into(),
775 log_index: 3,
776 removed: false,
777 processed: Some(false),
778 ..Default::default()
779 };
780
781 db.store_logs(vec![log_1.clone(), log_2.clone(), log_3.clone()])
782 .await
783 .unwrap()
784 .into_iter()
785 .for_each(|r| assert!(r.is_ok()));
786
787 let block_numbers_all = db.get_logs_block_numbers(None, None, None).await.unwrap();
788 assert_eq!(block_numbers_all.len(), 3);
789 assert_eq!(block_numbers_all, [1, 2, 3]);
790
791 let block_numbers_first_only = db.get_logs_block_numbers(Some(1), Some(0), None).await.unwrap();
792 assert_eq!(block_numbers_first_only.len(), 1);
793 assert_eq!(block_numbers_first_only[0], 1);
794
795 let block_numbers_last_only = db.get_logs_block_numbers(Some(3), Some(0), None).await.unwrap();
796 assert_eq!(block_numbers_last_only.len(), 1);
797 assert_eq!(block_numbers_last_only[0], 3);
798
799 let block_numbers_processed = db.get_logs_block_numbers(None, None, Some(true)).await.unwrap();
800 assert_eq!(block_numbers_processed.len(), 1);
801 assert_eq!(block_numbers_processed[0], 1);
802
803 let block_numbers_unprocessed_second = db.get_logs_block_numbers(Some(2), Some(0), Some(false)).await.unwrap();
804 assert_eq!(block_numbers_unprocessed_second.len(), 1);
805 assert_eq!(block_numbers_unprocessed_second[0], 2);
806 }
807
808 #[async_std::test]
809 async fn test_update_logs_checksums() {
810 let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
811
812 let log_1 = SerializableLog {
814 address: Address::new(b"my address 123456789"),
815 topics: [Hash::create(&[b"topic"]).into()].into(),
816 data: [1, 2, 3, 4].into(),
817 tx_index: 1,
818 block_number: 1,
819 block_hash: Hash::create(&[b"block_hash"]).into(),
820 tx_hash: Hash::create(&[b"tx_hash"]).into(),
821 log_index: 1,
822 removed: false,
823 ..Default::default()
824 };
825
826 db.store_log(log_1.clone()).await.unwrap();
827
828 assert!(db.get_last_checksummed_log().await.unwrap().is_none());
829
830 db.update_logs_checksums().await.unwrap();
831
832 let updated_log_1 = db.get_last_checksummed_log().await.unwrap().unwrap();
833 assert!(updated_log_1.checksum.is_some());
834
835 let log_2 = SerializableLog {
837 block_number: 2,
838 ..log_1.clone()
839 };
840 let log_3 = SerializableLog {
841 block_number: 3,
842 ..log_1.clone()
843 };
844
845 db.store_logs(vec![log_2.clone(), log_3.clone()])
846 .await
847 .unwrap()
848 .into_iter()
849 .for_each(|r| assert!(r.is_ok()));
850
851 assert_eq!(
853 updated_log_1.clone().checksum.unwrap(),
854 db.get_last_checksummed_log().await.unwrap().unwrap().checksum.unwrap()
855 );
856
857 db.update_logs_checksums().await.unwrap();
858
859 let updated_log_3 = db.get_last_checksummed_log().await.unwrap().unwrap();
860
861 db.get_logs(None, None).await.unwrap().into_iter().for_each(|log| {
862 assert!(log.checksum.is_some());
863 });
864
865 assert_ne!(
867 updated_log_1.clone().checksum.unwrap(),
868 updated_log_3.clone().checksum.unwrap(),
869 );
870 assert_ne!(updated_log_1, updated_log_3);
871 }
872
873 #[async_std::test]
874 async fn test_should_not_allow_inconsistent_logs_in_the_db() -> anyhow::Result<()> {
875 let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
876 let addr_1 = Address::new(b"my address 123456789");
877 let addr_2 = Address::new(b"my 2nd address 12345");
878 let topic_1 = Hash::create(&[b"my topic 1"]);
879 let topic_2 = Hash::create(&[b"my topic 2"]);
880
881 db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;
882
883 db.ensure_logs_origin(vec![(addr_1, topic_2)])
884 .await
885 .expect_err("expected error due to inconsistent logs in the db");
886
887 db.ensure_logs_origin(vec![(addr_2, topic_1)])
888 .await
889 .expect_err("expected error due to inconsistent logs in the db");
890
891 db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;
892
893 Ok(())
894 }
895}