use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{StreamExt, stream};
use hopr_crypto_types::prelude::Hash;
use hopr_db_api::{
    errors::{DbError, Result},
    logs::HoprDbLogOperations,
};
use hopr_db_entity::{
    errors::DbEntityError,
    log, log_status, log_topic_info,
    prelude::{Log, LogStatus, LogTopicInfo},
};
use hopr_primitive_types::prelude::*;
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, FromQueryResult, IntoActiveModel, PaginatorTrait, QueryFilter,
    QueryOrder, QuerySelect,
    entity::Set,
    query::QueryTrait,
    sea_query::{Expr, OnConflict, Value},
};
use tracing::{error, trace, warn};

use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb, errors::DbSqlError};

/// Raw query result row carrying a single big-endian encoded block number.
#[derive(FromQueryResult)]
struct BlockNumber {
    block_number: Vec<u8>,
}

#[async_trait]
impl HoprDbLogOperations for HoprDb {
    async fn store_log<'a>(&'a self, log: SerializableLog) -> Result<()> {
        self.store_logs(vec![log])
            .await?
            .into_iter()
            .next()
            .expect("store_logs should return exactly one result per input log")
    }

    async fn store_logs(&self, logs: Vec<SerializableLog>) -> Result<Vec<Result<()>>> {
        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
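                    // Each log is inserted together with its log_status row. Conflicts
                    // on the composite key (log_index, tx_index, block_number) are
                    // reported as per-log errors instead of aborting the whole batch.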
                    let results = stream::iter(logs).then(|log| async {
                        let log_id = log.to_string();
                        let model = log::ActiveModel::from(log.clone());
                        let status_model = log_status::ActiveModel::from(log);
                        let log_status_query = LogStatus::insert(status_model).on_conflict(
                            OnConflict::columns([
                                log_status::Column::LogIndex,
                                log_status::Column::TransactionIndex,
                                log_status::Column::BlockNumber,
                            ])
                            .do_nothing()
                            .to_owned(),
                        );
                        let log_query = Log::insert(model).on_conflict(
                            OnConflict::columns([
                                log::Column::LogIndex,
                                log::Column::TransactionIndex,
                                log::Column::BlockNumber,
                            ])
                            .do_nothing()
                            .to_owned(),
                        );

                        match log_status_query.exec(tx.as_ref()).await {
                            Ok(_) => match log_query.exec(tx.as_ref()).await {
                                Ok(_) => Ok(()),
                                Err(DbErr::RecordNotInserted) => {
                                    warn!(%log_id, "log already in the DB");
                                    Err(DbError::General(format!("log already exists in the DB: {log_id}")))
                                }
                                Err(e) => {
                                    error!(%log_id, "Failed to insert log into db: {:?}", e);
                                    Err(DbError::General(e.to_string()))
                                }
                            },
                            Err(DbErr::RecordNotInserted) => {
                                warn!(%log_id, "log status already in the DB");
                                Err(DbError::General(format!(
                                    "log status already exists in the DB: {log_id}"
                                )))
                            }
                            Err(e) => {
                                error!(%log_id, "Failed to insert log status into db: {:?}", e);
                                Err(DbError::General(e.to_string()))
                            }
                        }
                    });
                    Ok(results.collect::<Vec<_>>().await)
                })
            })
            .await
    }

    async fn get_log(&self, block_number: u64, tx_index: u64, log_index: u64) -> Result<SerializableLog> {
        // Indices are stored as big-endian byte vectors so that byte-wise
        // comparison in SQL matches numeric ordering.
        let bn_enc = block_number.to_be_bytes().to_vec();
        let tx_index_enc = tx_index.to_be_bytes().to_vec();
        let log_index_enc = log_index.to_be_bytes().to_vec();

        let query = Log::find()
            .filter(log::Column::BlockNumber.eq(bn_enc))
            .filter(log::Column::TransactionIndex.eq(tx_index_enc))
            .filter(log::Column::LogIndex.eq(log_index_enc))
            .find_also_related(LogStatus);

        match query.one(self.conn(TargetDb::Logs)).await {
            Ok(Some((log, Some(status)))) => create_log(log, status).map_err(DbError::from),
            Ok(Some((_, None))) => Err(DbError::MissingLogStatus),
            Ok(None) => Err(DbError::MissingLog),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

    async fn get_logs<'a>(
        &'a self,
        block_number: Option<u64>,
        block_offset: Option<u64>,
    ) -> Result<Vec<SerializableLog>> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

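        // `block_offset` is inclusive: block_number = N with block_offset = 0 selects
        // exactly block N, hence the exclusive upper bound N + offset + 1.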
        let query = Log::find()
            .find_also_related(LogStatus)
            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .order_by_asc(log::Column::BlockNumber)
            .order_by_asc(log::Column::TransactionIndex)
            .order_by_asc(log::Column::LogIndex);

        match query.all(self.conn(TargetDb::Logs)).await {
            Ok(logs) => logs
                .into_iter()
                .map(|(log, status)| {
                    if let Some(status) = status {
                        create_log(log, status).map_err(DbError::from)
                    } else {
                        error!("Missing log status for log in db: {:?}", log);
                        SerializableLog::try_from(log).map_err(|e| DbError::from(DbSqlError::from(e)))
                    }
                })
                .collect(),
            Err(e) => {
                error!("Failed to get logs from db: {:?}", e);
                Err(DbError::from(DbSqlError::from(e)))
            }
        }
    }

    async fn get_logs_count(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<u64> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        Log::find()
            .select_only()
            .column(log::Column::BlockNumber)
            .column(log::Column::TransactionIndex)
            .column(log::Column::LogIndex)
            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .count(self.conn(TargetDb::Logs))
            .await
            .map_err(|e| DbSqlError::from(e).into())
    }

    async fn get_logs_block_numbers<'a>(
        &'a self,
        block_number: Option<u64>,
        block_offset: Option<u64>,
        processed: Option<bool>,
    ) -> Result<Vec<u64>> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        LogStatus::find()
            .select_only()
            .column(log_status::Column::BlockNumber)
            .distinct()
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .apply_if(processed, |q, v| q.filter(log_status::Column::Processed.eq(v)))
            .order_by_asc(log_status::Column::BlockNumber)
            .into_model::<BlockNumber>()
            .all(self.conn(TargetDb::Logs))
            .await
            .map(|res| {
                res.into_iter()
                    .map(|b| U256::from_be_bytes(b.block_number).as_u64())
                    .collect()
            })
            .map_err(|e| {
                error!("Failed to get logs block numbers from db: {:?}", e);
                DbError::from(DbSqlError::from(e))
            })
    }

    async fn set_logs_processed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
        let now = Utc::now();

        let query = LogStatus::update_many()
            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(true))))
            .col_expr(
                log_status::Column::ProcessedAt,
                Expr::value(Value::ChronoDateTimeUtc(Some(now.into()))),
            )
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            });

        match query.exec(self.conn(TargetDb::Logs)).await {
            Ok(_) => Ok(()),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

    async fn set_log_processed<'a>(&'a self, mut log: SerializableLog) -> Result<()> {
        log.processed = Some(true);
        log.processed_at = Some(Utc::now());
        let log_status = log_status::ActiveModel::from(log);

        let db_tx = self.nest_transaction_in_db(None, TargetDb::Logs).await?;

        db_tx
            .perform(|tx| {
                Box::pin(async move {
                    match LogStatus::update(log_status).exec(tx.as_ref()).await {
                        Ok(_) => Ok(()),
                        Err(error) => {
                            error!(%error, "Failed to update log status in db");
                            Err(DbError::from(DbSqlError::from(error)))
                        }
                    }
                })
            })
            .await
    }

    async fn set_logs_unprocessed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        let query = LogStatus::update_many()
            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(false))))
            .col_expr(
                log_status::Column::ProcessedAt,
                Expr::value(Value::ChronoDateTimeUtc(None)),
            )
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            });

        match query.exec(self.conn(TargetDb::Logs)).await {
            Ok(_) => Ok(()),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

    async fn get_last_checksummed_log(&self) -> Result<Option<SerializableLog>> {
        let query = LogStatus::find()
            .filter(log_status::Column::Checksum.is_not_null())
            .order_by_desc(log_status::Column::BlockNumber)
            .order_by_desc(log_status::Column::TransactionIndex)
            .order_by_desc(log_status::Column::LogIndex)
            .find_also_related(Log);

        match query.one(self.conn(TargetDb::Logs)).await {
            Ok(Some((status, Some(log)))) => Ok(create_log(log, status).ok()),
            Ok(_) => Ok(None),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

    async fn update_logs_checksums(&self) -> Result<Hash> {
        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
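                    // Checksums form a rolling hash chain: each entry's checksum is
                    // H(prev_checksum || H(block_hash || tx_hash || log_index)). Resume
                    // the chain from the most recently checksummed entry, if any.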
                    let mut last_checksum = LogStatus::find()
                        .filter(log_status::Column::Checksum.is_not_null())
                        .order_by_desc(log_status::Column::BlockNumber)
                        .order_by_desc(log_status::Column::TransactionIndex)
                        .order_by_desc(log_status::Column::LogIndex)
                        .one(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?
                        .and_then(|m| m.checksum)
                        .and_then(|c| Hash::try_from(c.as_slice()).ok())
                        .unwrap_or_default();

                    let query = LogStatus::find()
                        .filter(log_status::Column::Checksum.is_null())
                        .order_by_asc(log_status::Column::BlockNumber)
                        .order_by_asc(log_status::Column::TransactionIndex)
                        .order_by_asc(log_status::Column::LogIndex)
                        .find_also_related(Log);

                    match query.all(tx.as_ref()).await {
                        Ok(entries) => {
                            let mut entries = entries.into_iter();
                            // Note: iteration stops at the first status row without a
                            // related log row, leaving later entries unchecksummed.
                            while let Some((status, Some(log_entry))) = entries.next() {
                                let slog = create_log(log_entry.clone(), status.clone())?;
                                let log_hash = Hash::create(&[
                                    log_entry.block_hash.as_slice(),
                                    log_entry.transaction_hash.as_slice(),
                                    log_entry.log_index.as_slice(),
                                ]);

                                let next_checksum = Hash::create(&[last_checksum.as_ref(), log_hash.as_ref()]);

                                let mut updated_status = status.into_active_model();
                                updated_status.checksum = Set(Some(next_checksum.as_ref().to_vec()));

                                match updated_status.update(tx.as_ref()).await {
                                    Ok(_) => {
                                        last_checksum = next_checksum;
                                        trace!(log = %slog, checksum = %next_checksum, "Generated log checksum");
                                    }
                                    Err(error) => {
                                        error!(%error, "Failed to update log status checksum in db");
                                        break;
                                    }
                                }
                            }
                            Ok(last_checksum)
                        }
                        Err(e) => Err(DbError::from(DbSqlError::from(e))),
                    }
                })
            })
            .await
    }

    async fn ensure_logs_origin(&self, contract_address_topics: Vec<(Address, Hash)>) -> Result<()> {
        if contract_address_topics.is_empty() {
            return Err(DbError::LogicalError(
                "contract address topics must not be empty".into(),
            ));
        }

        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
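                    // A fresh logs DB (no logs, no topics) is seeded with the expected
                    // contract address and topic pairs; a non-empty DB must already
                    // contain exactly these pairs, otherwise its logs are considered
                    // to originate from a different configuration.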
                    let log_count = Log::find()
                        .select_only()
                        .column(log::Column::BlockNumber)
                        .column(log::Column::TransactionIndex)
                        .column(log::Column::LogIndex)
                        .count(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                    let log_topic_count = LogTopicInfo::find()
                        .count(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;

                    if log_count == 0 && log_topic_count == 0 {
                        LogTopicInfo::insert_many(contract_address_topics.into_iter().map(|(addr, topic)| {
                            log_topic_info::ActiveModel {
                                address: Set(addr.to_string()),
                                topic: Set(topic.to_string()),
                                ..Default::default()
                            }
                        }))
                        .exec(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                    } else {
                        for (addr, topic) in contract_address_topics {
                            let log_topic_count = LogTopicInfo::find()
                                .filter(log_topic_info::Column::Address.eq(addr.to_string()))
                                .filter(log_topic_info::Column::Topic.eq(topic.to_string()))
                                .count(tx.as_ref())
                                .await
                                .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                            if log_topic_count != 1 {
                                return Err(DbError::InconsistentLogs);
                            }
                        }
                    }
                    Ok(())
                })
            })
            .await
    }
}

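/// Merges a raw `log` row with its `log_status` row into a [`SerializableLog`],
/// attaching the processing state and hex-encoded checksum when present.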
fn create_log(raw_log: log::Model, status: log_status::Model) -> crate::errors::Result<SerializableLog> {
    let log = SerializableLog::try_from(raw_log).map_err(DbSqlError::from)?;

    let checksum = status
        .checksum
        .map(|c| {
            <[u8; 32]>::try_from(c.as_slice())
                .map(|h| Hash::from(h).to_hex())
                .map_err(|_| DbSqlError::from(DbEntityError::ConversionError("Invalid log checksum".into())))
        })
        .transpose()?;

    Ok(SerializableLog {
        processed: Some(status.processed),
        processed_at: status
            .processed_at
            .map(|ts| DateTime::<Utc>::from_naive_utc_and_offset(ts, Utc)),
        checksum,
        ..log
    })
}

#[cfg(test)]
mod tests {
    use hopr_crypto_types::prelude::{ChainKeypair, Hash, Keypair};

    use super::*;

    #[tokio::test]
    async fn test_store_single_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0], log);
    }

    #[tokio::test]
    async fn test_store_multiple_logs() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"my topic 2"]).into()].into(),
            data: [1, 2, 3, 4, 5].into(),
            tx_index: 2u64,
            block_number: 2u64,
            block_hash: Hash::create(&[b"my block hash 2"]).into(),
            tx_hash: Hash::create(&[b"my tx hash 2"]).into(),
            log_index: 2u64,
            removed: false,
            processed: Some(true),
            ..Default::default()
        };

        db.store_log(log_1.clone()).await.unwrap();
        db.store_log(log_2.clone()).await.unwrap();

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0], log_1);
        assert_eq!(logs[1], log_2);

        let log_2_retrieved = db
            .get_log(log_2.block_number, log_2.tx_index, log_2.log_index)
            .await
            .unwrap();

        assert_eq!(log_2, log_2_retrieved);
    }

    #[tokio::test]
    async fn test_store_duplicate_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        db.store_log(log.clone())
            .await
            .expect_err("Expected error due to duplicate log insertion");

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 1);
    }

    #[tokio::test]
    async fn test_set_log_processed() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db.processed, Some(false));
        assert_eq!(log_db.processed_at, None);

        db.set_log_processed(log.clone()).await.unwrap();

        let log_db_updated = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db_updated.processed, Some(true));
        assert!(log_db_updated.processed_at.is_some());
    }

    #[tokio::test]
    async fn test_list_logs_ordered() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let logs_per_tx = 3;
        let tx_per_block = 3;
        let blocks = 10;
        let start_block = 32183412;
        let base_log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 0,
            block_number: 0,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 0,
            removed: false,
            ..Default::default()
        };

        for block_offset in 0..blocks {
            for tx_index in 0..tx_per_block {
                for log_index in 0..logs_per_tx {
                    let log = SerializableLog {
                        tx_index,
                        block_number: start_block + block_offset,
                        log_index,
                        ..base_log.clone()
                    };
                    db.store_log(log).await.unwrap();
                }
            }
        }

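        // Walk the fetched logs pairwise and assert ascending ordering on
        // (block_number, tx_index, log_index) within each window.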
        let block_fetch_interval = 3;
        let mut next_block = start_block;

        while next_block <= start_block + blocks {
            let ordered_logs = db.get_logs(Some(next_block), Some(block_fetch_interval)).await.unwrap();

            assert!(!ordered_logs.is_empty());

            ordered_logs.iter().reduce(|prev_log, curr_log| {
                assert!(prev_log.block_number >= next_block);
                assert!(prev_log.block_number <= (next_block + block_fetch_interval));
                assert!(curr_log.block_number >= next_block);
                assert!(curr_log.block_number <= (next_block + block_fetch_interval));
                if prev_log.block_number == curr_log.block_number {
                    if prev_log.tx_index == curr_log.tx_index {
                        assert!(prev_log.log_index < curr_log.log_index);
                    } else {
                        assert!(prev_log.tx_index < curr_log.tx_index);
                    }
                } else {
                    assert!(prev_log.block_number < curr_log.block_number);
                }
                curr_log
            });
            next_block += block_fetch_interval;
        }
    }

    #[tokio::test]
    async fn test_get_nonexistent_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let result = db.get_log(999, 999, 999).await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_get_logs_with_block_offset() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic1"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash1"]).into(),
            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"topic2"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 2,
            block_number: 2,
            block_hash: Hash::create(&[b"block_hash2"]).into(),
            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
            log_index: 2,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_logs(vec![log_1.clone(), log_2.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        let logs = db.get_logs(Some(1), Some(0)).await.unwrap();

        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0], log_1);
    }

    #[tokio::test]
    async fn test_set_logs_unprocessed() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash"]).into(),
            tx_hash: Hash::create(&[b"tx_hash"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(true),
            processed_at: Some(Utc::now()),
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        db.set_logs_unprocessed(Some(1), Some(0)).await.unwrap();

        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db.processed, Some(false));
        assert!(log_db.processed_at.is_none());
    }

    #[tokio::test]
    async fn test_get_logs_block_numbers() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic1"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash1"]).into(),
            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(true),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"topic2"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 2,
            block_number: 2,
            block_hash: Hash::create(&[b"block_hash2"]).into(),
            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
            log_index: 2,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_3 = SerializableLog {
            address: Address::new(b"my address 323456789"),
            topics: [Hash::create(&[b"topic3"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 3,
            block_number: 3,
            block_hash: Hash::create(&[b"block_hash3"]).into(),
            tx_hash: Hash::create(&[b"tx_hash3"]).into(),
            log_index: 3,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_logs(vec![log_1.clone(), log_2.clone(), log_3.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        let block_numbers_all = db.get_logs_block_numbers(None, None, None).await.unwrap();
        assert_eq!(block_numbers_all.len(), 3);
        assert_eq!(block_numbers_all, [1, 2, 3]);

        let block_numbers_first_only = db.get_logs_block_numbers(Some(1), Some(0), None).await.unwrap();
        assert_eq!(block_numbers_first_only.len(), 1);
        assert_eq!(block_numbers_first_only[0], 1);

        let block_numbers_last_only = db.get_logs_block_numbers(Some(3), Some(0), None).await.unwrap();
        assert_eq!(block_numbers_last_only.len(), 1);
        assert_eq!(block_numbers_last_only[0], 3);

        let block_numbers_processed = db.get_logs_block_numbers(None, None, Some(true)).await.unwrap();
        assert_eq!(block_numbers_processed.len(), 1);
        assert_eq!(block_numbers_processed[0], 1);

        let block_numbers_unprocessed_second = db.get_logs_block_numbers(Some(2), Some(0), Some(false)).await.unwrap();
        assert_eq!(block_numbers_unprocessed_second.len(), 1);
        assert_eq!(block_numbers_unprocessed_second[0], 2);
    }

    #[tokio::test]
    async fn test_update_logs_checksums() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash"]).into(),
            tx_hash: Hash::create(&[b"tx_hash"]).into(),
            log_index: 1,
            removed: false,
            ..Default::default()
        };

        db.store_log(log_1.clone()).await.unwrap();

        assert!(db.get_last_checksummed_log().await.unwrap().is_none());

        db.update_logs_checksums().await.unwrap();

        let updated_log_1 = db.get_last_checksummed_log().await.unwrap().unwrap();
        assert!(updated_log_1.checksum.is_some());

        let log_2 = SerializableLog {
            block_number: 2,
            ..log_1.clone()
        };
        let log_3 = SerializableLog {
            block_number: 3,
            ..log_1.clone()
        };

        db.store_logs(vec![log_2.clone(), log_3.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        // Newly stored logs must not advance the checksum chain until
        // update_logs_checksums() runs again.
        assert_eq!(
            updated_log_1.clone().checksum.unwrap(),
            db.get_last_checksummed_log().await.unwrap().unwrap().checksum.unwrap()
        );

        db.update_logs_checksums().await.unwrap();

        let updated_log_3 = db.get_last_checksummed_log().await.unwrap().unwrap();

        db.get_logs(None, None).await.unwrap().into_iter().for_each(|log| {
            assert!(log.checksum.is_some());
        });

        assert_ne!(
            updated_log_1.clone().checksum.unwrap(),
            updated_log_3.clone().checksum.unwrap(),
        );
        assert_ne!(updated_log_1, updated_log_3);
    }

    #[tokio::test]
    async fn test_should_not_allow_inconsistent_logs_in_the_db() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
        let addr_1 = Address::new(b"my address 123456789");
        let addr_2 = Address::new(b"my 2nd address 12345");
        let topic_1 = Hash::create(&[b"my topic 1"]);
        let topic_2 = Hash::create(&[b"my topic 2"]);

        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;

        db.ensure_logs_origin(vec![(addr_1, topic_2)])
            .await
            .expect_err("expected error due to inconsistent logs in the db");

        db.ensure_logs_origin(vec![(addr_2, topic_1)])
            .await
            .expect_err("expected error due to inconsistent logs in the db");

        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;

        Ok(())
    }
}
907}