hopr_db_sql/
logs.rs

1use async_trait::async_trait;
2use futures::{StreamExt, stream};
3use hopr_crypto_types::prelude::Hash;
4use hopr_db_api::{
5    errors::{DbError, Result},
6    logs::HoprDbLogOperations,
7};
8use hopr_db_entity::{
9    errors::DbEntityError,
10    log, log_status, log_topic_info,
11    prelude::{Log, LogStatus, LogTopicInfo},
12};
13use hopr_primitive_types::prelude::*;
14use sea_orm::{
15    ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, FromQueryResult, IntoActiveModel, PaginatorTrait, QueryFilter,
16    QueryOrder, QuerySelect,
17    entity::Set,
18    query::QueryTrait,
19    sea_query::{Expr, OnConflict, Value},
20};
21use tracing::{error, trace, warn};
22
23use crate::{HoprDbGeneralModelOperations, TargetDb, db::HoprDb, errors::DbSqlError};
24
25#[derive(FromQueryResult)]
26struct BlockNumber {
27    block_number: Vec<u8>,
28}
29
30#[async_trait]
31impl HoprDbLogOperations for HoprDb {
32    async fn store_log<'a>(&'a self, log: SerializableLog) -> Result<()> {
33        match self.store_logs([log].to_vec()).await {
34            Ok(results) => {
35                if let Some(result) = results.into_iter().next() {
36                    result
37                } else {
38                    panic!("when inserting a log into the db, the result should be a single item")
39                }
40            }
41            Err(e) => Err(e),
42        }
43    }
44
45    async fn store_logs(&self, logs: Vec<SerializableLog>) -> Result<Vec<Result<()>>> {
46        self.nest_transaction_in_db(None, TargetDb::Logs)
47            .await?
48            .perform(|tx| {
49                Box::pin(async move {
50                    let results = stream::iter(logs).then(|log| async {
51                        let log_id = log.to_string();
52                        let model = log::ActiveModel::from(log.clone());
53                        let status_model = log_status::ActiveModel::from(log);
54                        let log_status_query = LogStatus::insert(status_model).on_conflict(
55                            OnConflict::columns([
56                                log_status::Column::LogIndex,
57                                log_status::Column::TransactionIndex,
58                                log_status::Column::BlockNumber,
59                            ])
60                            .do_nothing()
61                            .to_owned(),
62                        );
63                        let log_query = Log::insert(model).on_conflict(
64                            OnConflict::columns([
65                                log::Column::LogIndex,
66                                log::Column::TransactionIndex,
67                                log::Column::BlockNumber,
68                            ])
69                            .do_nothing()
70                            .to_owned(),
71                        );
72
73                        match log_status_query.exec(tx.as_ref()).await {
74                            Ok(_) => match log_query.exec(tx.as_ref()).await {
75                                Ok(_) => Ok(()),
76                                Err(DbErr::RecordNotInserted) => {
77                                    warn!(log_id, "log already in the DB");
78                                    Err(DbError::General(format!("log already exists in the DB: {log_id}")))
79                                }
80                                Err(e) => {
81                                    error!("Failed to insert log into db: {:?}", e);
82                                    Err(DbError::General(e.to_string()))
83                                }
84                            },
85                            Err(DbErr::RecordNotInserted) => {
86                                warn!(log_id, "log already in the DB");
87                                Err(DbError::General(format!(
88                                    "log status already exists in the DB: {log_id}"
89                                )))
90                            }
91                            Err(e) => {
92                                error!(%log_id, "Failed to insert log status into db: {:?}", e);
93                                Err(DbError::General(e.to_string()))
94                            }
95                        }
96                    });
97                    Ok(results.collect::<Vec<_>>().await)
98                })
99            })
100            .await
101    }
102
103    async fn get_log(&self, block_number: u64, tx_index: u64, log_index: u64) -> Result<SerializableLog> {
104        let bn_enc = block_number.to_be_bytes().to_vec();
105        let tx_index_enc = tx_index.to_be_bytes().to_vec();
106        let log_index_enc = log_index.to_be_bytes().to_vec();
107
108        let query = Log::find()
109            .filter(log::Column::BlockNumber.eq(bn_enc))
110            .filter(log::Column::TransactionIndex.eq(tx_index_enc))
111            .filter(log::Column::LogIndex.eq(log_index_enc))
112            .find_also_related(LogStatus);
113
114        match query.all(self.conn(TargetDb::Logs)).await {
115            Ok(mut res) => {
116                if let Some((log, log_status)) = res.pop() {
117                    if let Some(status) = log_status {
118                        create_log(log, status).map_err(DbError::from)
119                    } else {
120                        Err(DbError::MissingLogStatus)
121                    }
122                } else {
123                    Err(DbError::MissingLog)
124                }
125            }
126            Err(e) => Err(DbError::from(DbSqlError::from(e))),
127        }
128    }
129
130    async fn get_logs<'a>(
131        &'a self,
132        block_number: Option<u64>,
133        block_offset: Option<u64>,
134    ) -> Result<Vec<SerializableLog>> {
135        let min_block_number = block_number.unwrap_or(0);
136        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
137
138        let query = Log::find()
139            .find_also_related(LogStatus)
140            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
141            .apply_if(max_block_number, |q, v| {
142                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
143            })
144            .order_by_asc(log::Column::BlockNumber)
145            .order_by_asc(log::Column::TransactionIndex)
146            .order_by_asc(log::Column::LogIndex);
147
148        match query.all(self.conn(TargetDb::Logs)).await {
149            Ok(logs) => Ok(logs
150                .into_iter()
151                .map(|(log, status)| {
152                    if let Some(status) = status {
153                        create_log(log, status).unwrap()
154                    } else {
155                        error!("Missing log status for log in db: {:?}", log);
156                        SerializableLog::try_from(log).unwrap()
157                    }
158                })
159                .collect()),
160            Err(e) => {
161                error!("Failed to get logs from db: {:?}", e);
162                Err(DbError::from(DbSqlError::from(e)))
163            }
164        }
165    }
166
167    async fn get_logs_count(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<u64> {
168        let min_block_number = block_number.unwrap_or(0);
169        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
170
171        Log::find()
172            .select_only()
173            .column(log::Column::BlockNumber)
174            .column(log::Column::TransactionIndex)
175            .column(log::Column::LogIndex)
176            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
177            .apply_if(max_block_number, |q, v| {
178                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
179            })
180            .count(self.conn(TargetDb::Logs))
181            .await
182            .map_err(|e| DbSqlError::from(e).into())
183    }
184
185    async fn get_logs_block_numbers<'a>(
186        &'a self,
187        block_number: Option<u64>,
188        block_offset: Option<u64>,
189        processed: Option<bool>,
190    ) -> Result<Vec<u64>> {
191        let min_block_number = block_number.unwrap_or(0);
192        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
193
194        LogStatus::find()
195            .select_only()
196            .column(log_status::Column::BlockNumber)
197            .distinct()
198            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
199            .apply_if(max_block_number, |q, v| {
200                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
201            })
202            .apply_if(processed, |q, v| q.filter(log_status::Column::Processed.eq(v)))
203            .order_by_asc(log_status::Column::BlockNumber)
204            .into_model::<BlockNumber>()
205            .all(self.conn(TargetDb::Logs))
206            .await
207            .map(|res| {
208                res.into_iter()
209                    .map(|b| U256::from_be_bytes(b.block_number).as_u64())
210                    .collect()
211            })
212            .map_err(|e| {
213                error!("Failed to get logs block numbers from db: {:?}", e);
214                DbError::from(DbSqlError::from(e))
215            })
216    }
217
218    async fn set_logs_processed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
219        let min_block_number = block_number.unwrap_or(0);
220        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
221        let now = Utc::now();
222
223        let query = LogStatus::update_many()
224            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(true))))
225            .col_expr(
226                log_status::Column::ProcessedAt,
227                Expr::value(Value::ChronoDateTimeUtc(Some(now.into()))),
228            )
229            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
230            .apply_if(max_block_number, |q, v| {
231                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
232            });
233
234        match query.exec(self.conn(TargetDb::Logs)).await {
235            Ok(_) => Ok(()),
236            Err(e) => Err(DbError::from(DbSqlError::from(e))),
237        }
238    }
239
240    async fn set_log_processed<'a>(&'a self, mut log: SerializableLog) -> Result<()> {
241        log.processed = Some(true);
242        log.processed_at = Some(Utc::now());
243        let log_status = log_status::ActiveModel::from(log);
244
245        let db_tx = self.nest_transaction_in_db(None, TargetDb::Logs).await?;
246
247        db_tx
248            .perform(|tx| {
249                Box::pin(async move {
250                    match LogStatus::update(log_status).exec(tx.as_ref()).await {
251                        Ok(_) => Ok(()),
252                        Err(e) => {
253                            error!("Failed to update log status in db");
254                            Err(DbError::from(DbSqlError::from(e)))
255                        }
256                    }
257                })
258            })
259            .await
260    }
261
262    async fn set_logs_unprocessed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
263        let min_block_number = block_number.unwrap_or(0);
264        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
265
266        let query = LogStatus::update_many()
267            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(false))))
268            .col_expr(
269                log_status::Column::ProcessedAt,
270                Expr::value(Value::ChronoDateTimeUtc(None)),
271            )
272            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
273            .apply_if(max_block_number, |q, v| {
274                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
275            });
276
277        match query.exec(self.conn(TargetDb::Logs)).await {
278            Ok(_) => Ok(()),
279            Err(e) => Err(DbError::from(DbSqlError::from(e))),
280        }
281    }
282
283    async fn get_last_checksummed_log(&self) -> Result<Option<SerializableLog>> {
284        let query = LogStatus::find()
285            .filter(log_status::Column::Checksum.is_not_null())
286            .order_by_desc(log_status::Column::BlockNumber)
287            .order_by_desc(log_status::Column::TransactionIndex)
288            .order_by_desc(log_status::Column::LogIndex)
289            .find_also_related(Log);
290
291        match query.one(self.conn(TargetDb::Logs)).await {
292            Ok(Some((status, Some(log)))) => {
293                if let Ok(slog) = create_log(log, status) {
294                    Ok(Some(slog))
295                } else {
296                    Ok(None)
297                }
298            }
299            Ok(_) => Ok(None),
300            Err(e) => Err(DbError::from(DbSqlError::from(e))),
301        }
302    }
303
304    async fn update_logs_checksums(&self) -> Result<Hash> {
305        self.nest_transaction_in_db(None, TargetDb::Logs)
306            .await?
307            .perform(|tx| {
308                Box::pin(async move {
309                    let mut last_checksum = LogStatus::find()
310                        .filter(log_status::Column::Checksum.is_not_null())
311                        .order_by_desc(log_status::Column::BlockNumber)
312                        .order_by_desc(log_status::Column::TransactionIndex)
313                        .order_by_desc(log_status::Column::LogIndex)
314                        .one(tx.as_ref())
315                        .await
316                        .map_err(|e| DbError::from(DbSqlError::from(e)))?
317                        .and_then(|m| m.checksum)
318                        .and_then(|c| Hash::try_from(c.as_slice()).ok())
319                        .unwrap_or_default();
320
321                    let query = LogStatus::find()
322                        .filter(log_status::Column::Checksum.is_null())
323                        .order_by_asc(log_status::Column::BlockNumber)
324                        .order_by_asc(log_status::Column::TransactionIndex)
325                        .order_by_asc(log_status::Column::LogIndex)
326                        .find_also_related(Log);
327
328                    match query.all(tx.as_ref()).await {
329                        Ok(entries) => {
330                            let mut entries = entries.into_iter();
331                            while let Some((status, Some(log_entry))) = entries.next() {
332                                let slog = create_log(log_entry.clone(), status.clone())?;
333                                // we compute the hash of a single log as a combination of the block
334                                // hash, TX hash, and the log index
335                                let log_hash = Hash::create(&[
336                                    log_entry.block_hash.as_slice(),
337                                    log_entry.transaction_hash.as_slice(),
338                                    log_entry.log_index.as_slice(),
339                                ]);
340
341                                let next_checksum = Hash::create(&[last_checksum.as_ref(), log_hash.as_ref()]);
342
343                                let mut updated_status = status.into_active_model();
344                                updated_status.checksum = Set(Some(next_checksum.as_ref().to_vec()));
345
346                                match updated_status.update(tx.as_ref()).await {
347                                    Ok(_) => {
348                                        last_checksum = next_checksum;
349                                        trace!(log = %slog, checksum = %next_checksum, "Generated log checksum");
350                                    }
351                                    Err(error) => {
352                                        error!(%error, "Failed to update log status checksum in db");
353                                        break;
354                                    }
355                                }
356                            }
357                            Ok(last_checksum)
358                        }
359                        Err(e) => Err(DbError::from(DbSqlError::from(e))),
360                    }
361                })
362            })
363            .await
364    }
365
366    async fn ensure_logs_origin(&self, contract_address_topics: Vec<(Address, Hash)>) -> Result<()> {
367        if contract_address_topics.is_empty() {
368            return Err(DbError::LogicalError(
369                "contract address topics must not be empty".into(),
370            ));
371        }
372
373        self.nest_transaction_in_db(None, TargetDb::Logs)
374            .await?
375            .perform(|tx| {
376                Box::pin(async move {
377                    // keep selected columns to a minimum to reduce copy overhead in db
378                    let log_count = Log::find()
379                        .select_only()
380                        .column(log::Column::BlockNumber)
381                        .column(log::Column::TransactionIndex)
382                        .column(log::Column::LogIndex)
383                        .count(tx.as_ref())
384                        .await
385                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
386                    let log_topic_count = LogTopicInfo::find()
387                        .count(tx.as_ref())
388                        .await
389                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
390
391                    if log_count == 0 && log_topic_count == 0 {
392                        // Prime the DB with the values
393                        LogTopicInfo::insert_many(contract_address_topics.into_iter().map(|(addr, topic)| {
394                            log_topic_info::ActiveModel {
395                                address: Set(addr.to_string()),
396                                topic: Set(topic.to_string()),
397                                ..Default::default()
398                            }
399                        }))
400                        .exec(tx.as_ref())
401                        .await
402                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
403                    } else {
404                        // Check that all contract addresses and topics are in the DB
405                        for (addr, topic) in contract_address_topics {
406                            let log_topic_count = LogTopicInfo::find()
407                                .filter(log_topic_info::Column::Address.eq(addr.to_string()))
408                                .filter(log_topic_info::Column::Topic.eq(topic.to_string()))
409                                .count(tx.as_ref())
410                                .await
411                                .map_err(|e| DbError::from(DbSqlError::from(e)))?;
412                            if log_topic_count != 1 {
413                                return Err(DbError::InconsistentLogs);
414                            }
415                        }
416                    }
417                    Ok(())
418                })
419            })
420            .await
421    }
422}
423
424fn create_log(raw_log: log::Model, status: log_status::Model) -> crate::errors::Result<SerializableLog> {
425    let log = SerializableLog::try_from(raw_log).map_err(DbSqlError::from)?;
426
427    let checksum = if let Some(c) = status.checksum {
428        let h: std::result::Result<[u8; 32], _> = c.try_into();
429
430        if let Ok(hash) = h {
431            Some(Hash::from(hash).to_hex())
432        } else {
433            return Err(DbSqlError::from(DbEntityError::ConversionError(
434                "Invalid log checksum".into(),
435            )));
436        }
437    } else {
438        None
439    };
440
441    let log = if let Some(raw_ts) = status.processed_at {
442        let ts = DateTime::<Utc>::from_naive_utc_and_offset(raw_ts, Utc);
443        SerializableLog {
444            processed: Some(status.processed),
445            processed_at: Some(ts),
446            checksum,
447            ..log
448        }
449    } else {
450        SerializableLog {
451            processed: Some(status.processed),
452            processed_at: None,
453            checksum,
454            ..log
455        }
456    };
457
458    Ok(log)
459}
460
461#[cfg(test)]
462mod tests {
463    use hopr_crypto_types::prelude::{ChainKeypair, Hash, Keypair};
464
465    use super::*;
466
467    #[tokio::test]
468    async fn test_store_single_log() {
469        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
470
471        let log = SerializableLog {
472            address: Address::new(b"my address 123456789"),
473            topics: [Hash::create(&[b"my topic"]).into()].into(),
474            data: [1, 2, 3, 4].into(),
475            tx_index: 1u64,
476            block_number: 1u64,
477            block_hash: Hash::create(&[b"my block hash"]).into(),
478            tx_hash: Hash::create(&[b"my tx hash"]).into(),
479            log_index: 1u64,
480            removed: false,
481            processed: Some(false),
482            ..Default::default()
483        };
484
485        db.store_log(log.clone()).await.unwrap();
486
487        let logs = db.get_logs(None, None).await.unwrap();
488
489        assert_eq!(logs.len(), 1);
490        assert_eq!(logs[0], log);
491    }
492
493    #[tokio::test]
494    async fn test_store_multiple_logs() {
495        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
496
497        let log_1 = SerializableLog {
498            address: Address::new(b"my address 123456789"),
499            topics: [Hash::create(&[b"my topic"]).into()].into(),
500            data: [1, 2, 3, 4].into(),
501            tx_index: 1u64,
502            block_number: 1u64,
503            block_hash: Hash::create(&[b"my block hash"]).into(),
504            tx_hash: Hash::create(&[b"my tx hash"]).into(),
505            log_index: 1u64,
506            removed: false,
507            processed: Some(false),
508            ..Default::default()
509        };
510
511        let log_2 = SerializableLog {
512            address: Address::new(b"my address 223456789"),
513            topics: [Hash::create(&[b"my topic 2"]).into()].into(),
514            data: [1, 2, 3, 4, 5].into(),
515            tx_index: 2u64,
516            block_number: 2u64,
517            block_hash: Hash::create(&[b"my block hash 2"]).into(),
518            tx_hash: Hash::create(&[b"my tx hash 2"]).into(),
519            log_index: 2u64,
520            removed: false,
521            processed: Some(true),
522            ..Default::default()
523        };
524
525        db.store_log(log_1.clone()).await.unwrap();
526        db.store_log(log_2.clone()).await.unwrap();
527
528        let logs = db.get_logs(None, None).await.unwrap();
529
530        assert_eq!(logs.len(), 2);
531        assert_eq!(logs[0], log_1);
532        assert_eq!(logs[1], log_2);
533
534        let log_2_retrieved = db
535            .get_log(log_2.block_number, log_2.tx_index, log_2.log_index)
536            .await
537            .unwrap();
538
539        assert_eq!(log_2, log_2_retrieved);
540    }
541
542    #[tokio::test]
543    async fn test_store_duplicate_log() {
544        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
545
546        let log = SerializableLog {
547            address: Address::new(b"my address 123456789"),
548            topics: [Hash::create(&[b"my topic"]).into()].into(),
549            data: [1, 2, 3, 4].into(),
550            tx_index: 1u64,
551            block_number: 1u64,
552            block_hash: Hash::create(&[b"my block hash"]).into(),
553            tx_hash: Hash::create(&[b"my tx hash"]).into(),
554            log_index: 1u64,
555            removed: false,
556            ..Default::default()
557        };
558
559        db.store_log(log.clone()).await.unwrap();
560
561        db.store_log(log.clone())
562            .await
563            .expect_err("Expected error due to duplicate log insertion");
564
565        let logs = db.get_logs(None, None).await.unwrap();
566
567        assert_eq!(logs.len(), 1);
568    }
569
570    #[tokio::test]
571    async fn test_set_log_processed() {
572        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
573
574        let log = SerializableLog {
575            address: Address::new(b"my address 123456789"),
576            topics: [Hash::create(&[b"my topic"]).into()].into(),
577            data: [1, 2, 3, 4].into(),
578            tx_index: 1u64,
579            block_number: 1u64,
580            block_hash: Hash::create(&[b"my block hash"]).into(),
581            tx_hash: Hash::create(&[b"my tx hash"]).into(),
582            log_index: 1u64,
583            removed: false,
584            ..Default::default()
585        };
586
587        db.store_log(log.clone()).await.unwrap();
588
589        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
590
591        assert_eq!(log_db.processed, Some(false));
592        assert_eq!(log_db.processed_at, None);
593
594        db.set_log_processed(log.clone()).await.unwrap();
595
596        let log_db_updated = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
597
598        assert_eq!(log_db_updated.processed, Some(true));
599        assert!(log_db_updated.processed_at.is_some());
600    }
601
602    #[tokio::test]
603    async fn test_list_logs_ordered() {
604        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
605
606        let logs_per_tx = 3;
607        let tx_per_block = 3;
608        let blocks = 10;
609        let start_block = 32183412;
610        let base_log = SerializableLog {
611            address: Address::new(b"my address 123456789"),
612            topics: [Hash::create(&[b"my topic"]).into()].into(),
613            data: [1, 2, 3, 4].into(),
614            tx_index: 0,
615            block_number: 0,
616            block_hash: Hash::create(&[b"my block hash"]).into(),
617            tx_hash: Hash::create(&[b"my tx hash"]).into(),
618            log_index: 0,
619            removed: false,
620            ..Default::default()
621        };
622
623        for block_offset in 0..blocks {
624            for tx_index in 0..tx_per_block {
625                for log_index in 0..logs_per_tx {
626                    let log = SerializableLog {
627                        tx_index,
628                        block_number: start_block + block_offset,
629                        log_index,
630                        ..base_log.clone()
631                    };
632                    db.store_log(log).await.unwrap()
633                }
634            }
635        }
636
637        let block_fetch_interval = 3;
638        let mut next_block = start_block;
639
640        while next_block <= start_block + blocks {
641            let ordered_logs = db.get_logs(Some(next_block), Some(block_fetch_interval)).await.unwrap();
642
643            assert!(!ordered_logs.is_empty());
644
645            ordered_logs.iter().reduce(|prev_log, curr_log| {
646                assert!(prev_log.block_number >= next_block);
647                assert!(prev_log.block_number <= (next_block + block_fetch_interval));
648                assert!(curr_log.block_number >= next_block);
649                assert!(curr_log.block_number <= (next_block + block_fetch_interval));
650                if prev_log.block_number == curr_log.block_number {
651                    if prev_log.tx_index == curr_log.tx_index {
652                        assert!(prev_log.log_index < curr_log.log_index);
653                    } else {
654                        assert!(prev_log.tx_index < curr_log.tx_index);
655                    }
656                } else {
657                    assert!(prev_log.block_number < curr_log.block_number);
658                }
659                curr_log
660            });
661            next_block += block_fetch_interval;
662        }
663    }
664
665    #[tokio::test]
666    async fn test_get_nonexistent_log() {
667        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
668
669        let result = db.get_log(999, 999, 999).await;
670
671        assert!(result.is_err());
672    }
673
674    #[tokio::test]
675    async fn test_get_logs_with_block_offset() {
676        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
677
678        let log_1 = SerializableLog {
679            address: Address::new(b"my address 123456789"),
680            topics: [Hash::create(&[b"topic1"]).into()].into(),
681            data: [1, 2, 3, 4].into(),
682            tx_index: 1,
683            block_number: 1,
684            block_hash: Hash::create(&[b"block_hash1"]).into(),
685            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
686            log_index: 1,
687            removed: false,
688            processed: Some(false),
689            ..Default::default()
690        };
691
692        let log_2 = SerializableLog {
693            address: Address::new(b"my address 223456789"),
694            topics: [Hash::create(&[b"topic2"]).into()].into(),
695            data: [1, 2, 3, 4].into(),
696            tx_index: 2,
697            block_number: 2,
698            block_hash: Hash::create(&[b"block_hash2"]).into(),
699            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
700            log_index: 2,
701            removed: false,
702            processed: Some(false),
703            ..Default::default()
704        };
705
706        db.store_logs(vec![log_1.clone(), log_2.clone()])
707            .await
708            .unwrap()
709            .into_iter()
710            .for_each(|r| assert!(r.is_ok()));
711
712        let logs = db.get_logs(Some(1), Some(0)).await.unwrap();
713
714        assert_eq!(logs.len(), 1);
715        assert_eq!(logs[0], log_1);
716    }
717
718    #[tokio::test]
719    async fn test_set_logs_unprocessed() {
720        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
721
722        let log = SerializableLog {
723            address: Address::new(b"my address 123456789"),
724            topics: [Hash::create(&[b"topic"]).into()].into(),
725            data: [1, 2, 3, 4].into(),
726            tx_index: 1,
727            block_number: 1,
728            block_hash: Hash::create(&[b"block_hash"]).into(),
729            tx_hash: Hash::create(&[b"tx_hash"]).into(),
730            log_index: 1,
731            removed: false,
732            processed: Some(true),
733            processed_at: Some(Utc::now()),
734            ..Default::default()
735        };
736
737        db.store_log(log.clone()).await.unwrap();
738
739        db.set_logs_unprocessed(Some(1), Some(0)).await.unwrap();
740
741        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();
742
743        assert_eq!(log_db.processed, Some(false));
744        assert!(log_db.processed_at.is_none());
745    }
746
747    #[tokio::test]
748    async fn test_get_logs_block_numbers() {
749        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
750
751        let log_1 = SerializableLog {
752            address: Address::new(b"my address 123456789"),
753            topics: [Hash::create(&[b"topic1"]).into()].into(),
754            data: [1, 2, 3, 4].into(),
755            tx_index: 1,
756            block_number: 1,
757            block_hash: Hash::create(&[b"block_hash1"]).into(),
758            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
759            log_index: 1,
760            removed: false,
761            processed: Some(true),
762            ..Default::default()
763        };
764
765        let log_2 = SerializableLog {
766            address: Address::new(b"my address 223456789"),
767            topics: [Hash::create(&[b"topic2"]).into()].into(),
768            data: [1, 2, 3, 4].into(),
769            tx_index: 2,
770            block_number: 2,
771            block_hash: Hash::create(&[b"block_hash2"]).into(),
772            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
773            log_index: 2,
774            removed: false,
775            processed: Some(false),
776            ..Default::default()
777        };
778
779        let log_3 = SerializableLog {
780            address: Address::new(b"my address 323456789"),
781            topics: [Hash::create(&[b"topic3"]).into()].into(),
782            data: [1, 2, 3, 4].into(),
783            tx_index: 3,
784            block_number: 3,
785            block_hash: Hash::create(&[b"block_hash3"]).into(),
786            tx_hash: Hash::create(&[b"tx_hash3"]).into(),
787            log_index: 3,
788            removed: false,
789            processed: Some(false),
790            ..Default::default()
791        };
792
793        db.store_logs(vec![log_1.clone(), log_2.clone(), log_3.clone()])
794            .await
795            .unwrap()
796            .into_iter()
797            .for_each(|r| assert!(r.is_ok()));
798
799        let block_numbers_all = db.get_logs_block_numbers(None, None, None).await.unwrap();
800        assert_eq!(block_numbers_all.len(), 3);
801        assert_eq!(block_numbers_all, [1, 2, 3]);
802
803        let block_numbers_first_only = db.get_logs_block_numbers(Some(1), Some(0), None).await.unwrap();
804        assert_eq!(block_numbers_first_only.len(), 1);
805        assert_eq!(block_numbers_first_only[0], 1);
806
807        let block_numbers_last_only = db.get_logs_block_numbers(Some(3), Some(0), None).await.unwrap();
808        assert_eq!(block_numbers_last_only.len(), 1);
809        assert_eq!(block_numbers_last_only[0], 3);
810
811        let block_numbers_processed = db.get_logs_block_numbers(None, None, Some(true)).await.unwrap();
812        assert_eq!(block_numbers_processed.len(), 1);
813        assert_eq!(block_numbers_processed[0], 1);
814
815        let block_numbers_unprocessed_second = db.get_logs_block_numbers(Some(2), Some(0), Some(false)).await.unwrap();
816        assert_eq!(block_numbers_unprocessed_second.len(), 1);
817        assert_eq!(block_numbers_unprocessed_second[0], 2);
818    }
819
820    #[tokio::test]
821    async fn test_update_logs_checksums() {
822        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();
823
824        // insert first log and update checksum
825        let log_1 = SerializableLog {
826            address: Address::new(b"my address 123456789"),
827            topics: [Hash::create(&[b"topic"]).into()].into(),
828            data: [1, 2, 3, 4].into(),
829            tx_index: 1,
830            block_number: 1,
831            block_hash: Hash::create(&[b"block_hash"]).into(),
832            tx_hash: Hash::create(&[b"tx_hash"]).into(),
833            log_index: 1,
834            removed: false,
835            ..Default::default()
836        };
837
838        db.store_log(log_1.clone()).await.unwrap();
839
840        assert!(db.get_last_checksummed_log().await.unwrap().is_none());
841
842        db.update_logs_checksums().await.unwrap();
843
844        let updated_log_1 = db.get_last_checksummed_log().await.unwrap().unwrap();
845        assert!(updated_log_1.checksum.is_some());
846
847        // insert two more logs and update checksums
848        let log_2 = SerializableLog {
849            block_number: 2,
850            ..log_1.clone()
851        };
852        let log_3 = SerializableLog {
853            block_number: 3,
854            ..log_1.clone()
855        };
856
857        db.store_logs(vec![log_2.clone(), log_3.clone()])
858            .await
859            .unwrap()
860            .into_iter()
861            .for_each(|r| assert!(r.is_ok()));
862
863        // ensure the first log is still the last updated
864        assert_eq!(
865            updated_log_1.clone().checksum.unwrap(),
866            db.get_last_checksummed_log().await.unwrap().unwrap().checksum.unwrap()
867        );
868
869        db.update_logs_checksums().await.unwrap();
870
871        let updated_log_3 = db.get_last_checksummed_log().await.unwrap().unwrap();
872
873        db.get_logs(None, None).await.unwrap().into_iter().for_each(|log| {
874            assert!(log.checksum.is_some());
875        });
876
877        // ensure the first log is not the last updated anymore
878        assert_ne!(
879            updated_log_1.clone().checksum.unwrap(),
880            updated_log_3.clone().checksum.unwrap(),
881        );
882        assert_ne!(updated_log_1, updated_log_3);
883    }
884
885    #[tokio::test]
886    async fn test_should_not_allow_inconsistent_logs_in_the_db() -> anyhow::Result<()> {
887        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
888        let addr_1 = Address::new(b"my address 123456789");
889        let addr_2 = Address::new(b"my 2nd address 12345");
890        let topic_1 = Hash::create(&[b"my topic 1"]);
891        let topic_2 = Hash::create(&[b"my topic 2"]);
892
893        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;
894
895        db.ensure_logs_origin(vec![(addr_1, topic_2)])
896            .await
897            .expect_err("expected error due to inconsistent logs in the db");
898
899        db.ensure_logs_origin(vec![(addr_2, topic_1)])
900            .await
901            .expect_err("expected error due to inconsistent logs in the db");
902
903        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;
904
905        Ok(())
906    }
907}