hopr_db_sql/logs.rs

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{stream, StreamExt};
use sea_orm::entity::Set;
use sea_orm::query::QueryTrait;
use sea_orm::sea_query::{Expr, OnConflict, Value};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, EntityTrait, FromQueryResult, IntoActiveModel, PaginatorTrait, QueryFilter,
    QueryOrder, QuerySelect,
};
use tracing::{error, trace};

use hopr_crypto_types::prelude::Hash;
use hopr_db_api::errors::{DbError, Result};
use hopr_db_api::logs::HoprDbLogOperations;
use hopr_db_entity::errors::DbEntityError;
use hopr_db_entity::prelude::{Log, LogStatus, LogTopicInfo};
use hopr_db_entity::{log, log_status, log_topic_info};
use hopr_primitive_types::prelude::*;

use crate::db::HoprDb;
use crate::errors::DbSqlError;
use crate::{HoprDbGeneralModelOperations, TargetDb};

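/// Projection for queries that select only the big-endian encoded `block_number` column.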
#[derive(FromQueryResult)]
struct BlockNumber {
    block_number: Vec<u8>,
}

#[async_trait]
impl HoprDbLogOperations for HoprDb {
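    /// Stores a single log entry; a thin wrapper around [`Self::store_logs`].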
    async fn store_log<'a>(&'a self, log: SerializableLog) -> Result<()> {
        self.store_logs(vec![log])
            .await?
            .into_iter()
            .next()
            .expect("when inserting a log into the db, the result should be a single item")
    }

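    /// Inserts the given logs and their status rows within a single transaction on the
    /// logs database. Each log yields its own `Result`; duplicates (same log index,
    /// transaction index, and block number) surface as per-log errors.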
    async fn store_logs(&self, logs: Vec<SerializableLog>) -> Result<Vec<Result<()>>> {
        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    let results = stream::iter(logs).then(|log| async {
                        let model = log::ActiveModel::from(log.clone());
                        let status_model = log_status::ActiveModel::from(log);
                        let log_status_query = LogStatus::insert(status_model).on_conflict(
                            OnConflict::columns([
                                log_status::Column::LogIndex,
                                log_status::Column::TransactionIndex,
                                log_status::Column::BlockNumber,
                            ])
                            .do_nothing()
                            .to_owned(),
                        );
                        let log_query = Log::insert(model).on_conflict(
                            OnConflict::columns([
                                log::Column::LogIndex,
                                log::Column::TransactionIndex,
                                log::Column::BlockNumber,
                            ])
                            .do_nothing()
                            .to_owned(),
                        );

                        match log_status_query.exec(tx.as_ref()).await {
                            Ok(_) => match log_query.exec(tx.as_ref()).await {
                                Ok(_) => Ok(()),
                                Err(e) => {
                                    error!("Failed to insert log into db: {:?}", e);
                                    Err(DbError::General(e.to_string()))
                                }
                            },
                            Err(e) => {
                                error!("Failed to insert log status into db: {:?}", e);
                                Err(DbError::General(e.to_string()))
                            }
                        }
                    });
                    Ok(results.collect::<Vec<_>>().await)
                })
            })
            .await
    }

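    /// Retrieves a single log and its status by block number, transaction index, and
    /// log index (stored as big-endian bytes).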
    async fn get_log(&self, block_number: u64, tx_index: u64, log_index: u64) -> Result<SerializableLog> {
        let bn_enc = block_number.to_be_bytes().to_vec();
        let tx_index_enc = tx_index.to_be_bytes().to_vec();
        let log_index_enc = log_index.to_be_bytes().to_vec();

        let query = Log::find()
            .filter(log::Column::BlockNumber.eq(bn_enc))
            .filter(log::Column::TransactionIndex.eq(tx_index_enc))
            .filter(log::Column::LogIndex.eq(log_index_enc))
            .find_also_related(LogStatus);

        match query.all(self.conn(TargetDb::Logs)).await {
            Ok(mut res) => {
                if let Some((log, log_status)) = res.pop() {
                    if let Some(status) = log_status {
                        create_log(log, status).map_err(DbError::from)
                    } else {
                        Err(DbError::MissingLogStatus)
                    }
                } else {
                    Err(DbError::MissingLog)
                }
            }
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

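    /// Returns logs ordered by block number, transaction index, and log index.
    /// The `block_offset` range is inclusive, so `Some(0)` selects a single block.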
    async fn get_logs<'a>(
        &'a self,
        block_number: Option<u64>,
        block_offset: Option<u64>,
    ) -> Result<Vec<SerializableLog>> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        let query = Log::find()
            .find_also_related(LogStatus)
            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .order_by_asc(log::Column::BlockNumber)
            .order_by_asc(log::Column::TransactionIndex)
            .order_by_asc(log::Column::LogIndex);

        match query.all(self.conn(TargetDb::Logs)).await {
            Ok(logs) => Ok(logs
                .into_iter()
                .map(|(log, status)| {
                    if let Some(status) = status {
                        create_log(log, status).unwrap()
                    } else {
                        error!("Missing log status for log in db: {:?}", log);
                        SerializableLog::try_from(log).unwrap()
                    }
                })
                .collect()),
            Err(e) => {
                error!("Failed to get logs from db: {:?}", e);
                Err(DbError::from(DbSqlError::from(e)))
            }
        }
    }

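    /// Counts the logs within the given (inclusive) block range.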
    async fn get_logs_count(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<u64> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        Log::find()
            .select_only()
            .column(log::Column::BlockNumber)
            .column(log::Column::TransactionIndex)
            .column(log::Column::LogIndex)
            .filter(log::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .count(self.conn(TargetDb::Logs))
            .await
            .map_err(|e| DbSqlError::from(e).into())
    }

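    /// Returns the distinct block numbers that contain logs in the given range,
    /// optionally filtered by their processed flag.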
    async fn get_logs_block_numbers<'a>(
        &'a self,
        block_number: Option<u64>,
        block_offset: Option<u64>,
        processed: Option<bool>,
    ) -> Result<Vec<u64>> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        LogStatus::find()
            .select_only()
            .column(log_status::Column::BlockNumber)
            .distinct()
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            })
            .apply_if(processed, |q, v| q.filter(log_status::Column::Processed.eq(v)))
            .order_by_asc(log_status::Column::BlockNumber)
            .into_model::<BlockNumber>()
            .all(self.conn(TargetDb::Logs))
            .await
            .map(|res| {
                res.into_iter()
                    .map(|b| U256::from_be_bytes(b.block_number).as_u64())
                    .collect()
            })
            .map_err(|e| {
                error!("Failed to get logs block numbers from db: {:?}", e);
                DbError::from(DbSqlError::from(e))
            })
    }

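    /// Marks all log statuses in the given block range as processed, stamping them
    /// with the current time.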
    async fn set_logs_processed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);
        let now = Utc::now();

        let query = LogStatus::update_many()
            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(true))))
            .col_expr(
                log_status::Column::ProcessedAt,
                Expr::value(Value::ChronoDateTimeUtc(Some(now.into()))),
            )
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            });

        match query.exec(self.conn(TargetDb::Logs)).await {
            Ok(_) => Ok(()),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

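    /// Marks a single log as processed by updating its status row.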
    async fn set_log_processed<'a>(&'a self, mut log: SerializableLog) -> Result<()> {
        log.processed = Some(true);
        log.processed_at = Some(Utc::now());
        let log_status = log_status::ActiveModel::from(log);

        let db_tx = self.nest_transaction_in_db(None, TargetDb::Logs).await?;

        db_tx
            .perform(|tx| {
                Box::pin(async move {
                    match LogStatus::update(log_status).exec(tx.as_ref()).await {
                        Ok(_) => Ok(()),
                        Err(e) => {
                            error!("Failed to update log status in db: {:?}", e);
                            Err(DbError::from(DbSqlError::from(e)))
                        }
                    }
                })
            })
            .await
    }

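    /// Clears the processed flag and timestamp for all log statuses in the given
    /// block range.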
    async fn set_logs_unprocessed(&self, block_number: Option<u64>, block_offset: Option<u64>) -> Result<()> {
        let min_block_number = block_number.unwrap_or(0);
        let max_block_number = block_offset.map(|v| min_block_number + v + 1);

        let query = LogStatus::update_many()
            .col_expr(log_status::Column::Processed, Expr::value(Value::Bool(Some(false))))
            .col_expr(
                log_status::Column::ProcessedAt,
                Expr::value(Value::ChronoDateTimeUtc(None)),
            )
            .filter(log_status::Column::BlockNumber.gte(min_block_number.to_be_bytes().to_vec()))
            .apply_if(max_block_number, |q, v| {
                q.filter(log_status::Column::BlockNumber.lt(v.to_be_bytes().to_vec()))
            });

        match query.exec(self.conn(TargetDb::Logs)).await {
            Ok(_) => Ok(()),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

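    /// Returns the last log (by block, transaction, and log index) that already
    /// carries a checksum, or `None` if no log has been checksummed yet.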
    async fn get_last_checksummed_log(&self) -> Result<Option<SerializableLog>> {
        let query = LogStatus::find()
            .filter(log_status::Column::Checksum.is_not_null())
            .order_by_desc(log_status::Column::BlockNumber)
            .order_by_desc(log_status::Column::TransactionIndex)
            .order_by_desc(log_status::Column::LogIndex)
            .find_also_related(Log);

        match query.one(self.conn(TargetDb::Logs)).await {
            Ok(Some((status, Some(log)))) => {
                if let Ok(slog) = create_log(log, status) {
                    Ok(Some(slog))
                } else {
                    Ok(None)
                }
            }
            Ok(_) => Ok(None),
            Err(e) => Err(DbError::from(DbSqlError::from(e))),
        }
    }

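    /// Extends the rolling checksum chain over all logs that lack a checksum, in
    /// ascending order, returning the latest checksum.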
    async fn update_logs_checksums(&self) -> Result<Hash> {
        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    let mut last_checksum = LogStatus::find()
                        .filter(log_status::Column::Checksum.is_not_null())
                        .order_by_desc(log_status::Column::BlockNumber)
                        .order_by_desc(log_status::Column::TransactionIndex)
                        .order_by_desc(log_status::Column::LogIndex)
                        .one(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?
                        .and_then(|m| m.checksum)
                        .and_then(|c| Hash::try_from(c.as_slice()).ok())
                        .unwrap_or_default();

                    let query = LogStatus::find()
                        .filter(log_status::Column::Checksum.is_null())
                        .order_by_asc(log_status::Column::BlockNumber)
                        .order_by_asc(log_status::Column::TransactionIndex)
                        .order_by_asc(log_status::Column::LogIndex)
                        .find_also_related(Log);

                    match query.all(tx.as_ref()).await {
                        Ok(entries) => {
                            let mut entries = entries.into_iter();
                            while let Some((status, Some(log_entry))) = entries.next() {
                                let slog = create_log(log_entry.clone(), status.clone())?;
                                // we compute the hash of a single log as a combination of the block
                                // hash, TX hash, and the log index
                                let log_hash = Hash::create(&[
                                    log_entry.block_hash.as_slice(),
                                    log_entry.transaction_hash.as_slice(),
                                    log_entry.log_index.as_slice(),
                                ]);

                                let next_checksum = Hash::create(&[last_checksum.as_ref(), log_hash.as_ref()]);

                                let mut updated_status = status.into_active_model();
                                updated_status.checksum = Set(Some(next_checksum.as_ref().to_vec()));

                                match updated_status.update(tx.as_ref()).await {
                                    Ok(_) => {
                                        last_checksum = next_checksum;
                                        trace!(log = %slog, checksum = %next_checksum, "Generated log checksum");
                                    }
                                    Err(error) => {
                                        error!(%error, "Failed to update log status checksum in db");
                                        break;
                                    }
                                }
                            }
                            Ok(last_checksum)
                        }
                        Err(e) => Err(DbError::from(DbSqlError::from(e))),
                    }
                })
            })
            .await
    }

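    /// Ensures the stored logs originate from the given contract addresses and topics,
    /// priming an empty database with them on first use and failing with
    /// [`DbError::InconsistentLogs`] on any mismatch.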
    async fn ensure_logs_origin(&self, contract_address_topics: Vec<(Address, Hash)>) -> Result<()> {
        if contract_address_topics.is_empty() {
            return Err(DbError::LogicalError(
                "contract address topics must not be empty".into(),
            ));
        }

        self.nest_transaction_in_db(None, TargetDb::Logs)
            .await?
            .perform(|tx| {
                Box::pin(async move {
                    // keep selected columns to a minimum to reduce copy overhead in db
                    let log_count = Log::find()
                        .select_only()
                        .column(log::Column::BlockNumber)
                        .column(log::Column::TransactionIndex)
                        .column(log::Column::LogIndex)
                        .count(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                    let log_topic_count = LogTopicInfo::find()
                        .count(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;

                    if log_count == 0 && log_topic_count == 0 {
                        // Prime the DB with the values
                        LogTopicInfo::insert_many(contract_address_topics.into_iter().map(|(addr, topic)| {
                            log_topic_info::ActiveModel {
                                address: Set(addr.to_string()),
                                topic: Set(topic.to_string()),
                                ..Default::default()
                            }
                        }))
                        .exec(tx.as_ref())
                        .await
                        .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                    } else {
                        // Check that all contract addresses and topics are in the DB
                        for (addr, topic) in contract_address_topics {
                            let log_topic_count = LogTopicInfo::find()
                                .filter(log_topic_info::Column::Address.eq(addr.to_string()))
                                .filter(log_topic_info::Column::Topic.eq(topic.to_string()))
                                .count(tx.as_ref())
                                .await
                                .map_err(|e| DbError::from(DbSqlError::from(e)))?;
                            if log_topic_count != 1 {
                                return Err(DbError::InconsistentLogs);
                            }
                        }
                    }
                    Ok(())
                })
            })
            .await
    }
}

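/// Combines a raw `log` row with its `log_status` row into a [`SerializableLog`],
/// validating the stored checksum and converting the processed-at timestamp.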
fn create_log(raw_log: log::Model, status: log_status::Model) -> crate::errors::Result<SerializableLog> {
    let log = SerializableLog::try_from(raw_log).map_err(DbSqlError::from)?;

    let checksum = if let Some(c) = status.checksum {
        let h: std::result::Result<[u8; 32], _> = c.try_into();

        if let Ok(hash) = h {
            Some(Hash::from(hash).to_hex())
        } else {
            return Err(DbSqlError::from(DbEntityError::ConversionError(
                "Invalid log checksum".into(),
            )));
        }
    } else {
        None
    };

    let processed_at = status
        .processed_at
        .map(|raw_ts| DateTime::<Utc>::from_naive_utc_and_offset(raw_ts, Utc));

    Ok(SerializableLog {
        processed: Some(status.processed),
        processed_at,
        checksum,
        ..log
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    use hopr_crypto_types::prelude::{ChainKeypair, Hash, Keypair};

    #[async_std::test]
    async fn test_store_single_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0], log);
    }

    #[async_std::test]
    async fn test_store_multiple_logs() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"my topic 2"]).into()].into(),
            data: [1, 2, 3, 4, 5].into(),
            tx_index: 2u64,
            block_number: 2u64,
            block_hash: Hash::create(&[b"my block hash 2"]).into(),
            tx_hash: Hash::create(&[b"my tx hash 2"]).into(),
            log_index: 2u64,
            removed: false,
            processed: Some(true),
            ..Default::default()
        };

        db.store_log(log_1.clone()).await.unwrap();
        db.store_log(log_2.clone()).await.unwrap();

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 2);
        assert_eq!(logs[0], log_1);
        assert_eq!(logs[1], log_2);

        let log_2_retrieved = db
            .get_log(log_2.block_number, log_2.tx_index, log_2.log_index)
            .await
            .unwrap();

        assert_eq!(log_2, log_2_retrieved);
    }

    #[async_std::test]
    async fn test_store_duplicate_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        db.store_log(log.clone())
            .await
            .expect_err("Expected error due to duplicate log insertion");

        let logs = db.get_logs(None, None).await.unwrap();

        assert_eq!(logs.len(), 1);
    }

    #[async_std::test]
    async fn test_set_log_processed() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1u64,
            block_number: 1u64,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 1u64,
            removed: false,
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db.processed, Some(false));
        assert_eq!(log_db.processed_at, None);

        db.set_log_processed(log.clone()).await.unwrap();

        let log_db_updated = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db_updated.processed, Some(true));
        assert!(log_db_updated.processed_at.is_some());
    }

    #[async_std::test]
    async fn test_list_logs_ordered() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let logs_per_tx = 3;
        let tx_per_block = 3;
        let blocks = 10;
        let start_block = 32183412;
        let base_log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"my topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 0,
            block_number: 0,
            block_hash: Hash::create(&[b"my block hash"]).into(),
            tx_hash: Hash::create(&[b"my tx hash"]).into(),
            log_index: 0,
            removed: false,
            ..Default::default()
        };

        for block_offset in 0..blocks {
            for tx_index in 0..tx_per_block {
                for log_index in 0..logs_per_tx {
                    let log = SerializableLog {
                        tx_index,
                        block_number: start_block + block_offset,
                        log_index,
                        ..base_log.clone()
                    };
                    db.store_log(log).await.unwrap()
                }
            }
        }

        let block_fetch_interval = 3;
        let mut next_block = start_block;

        while next_block <= start_block + blocks {
            let ordered_logs = db.get_logs(Some(next_block), Some(block_fetch_interval)).await.unwrap();

            assert!(!ordered_logs.is_empty());

            ordered_logs.iter().reduce(|prev_log, curr_log| {
                assert!(prev_log.block_number >= next_block);
                assert!(prev_log.block_number <= (next_block + block_fetch_interval));
                assert!(curr_log.block_number >= next_block);
                assert!(curr_log.block_number <= (next_block + block_fetch_interval));
                if prev_log.block_number == curr_log.block_number {
                    if prev_log.tx_index == curr_log.tx_index {
                        assert!(prev_log.log_index < curr_log.log_index);
                    } else {
                        assert!(prev_log.tx_index < curr_log.tx_index);
                    }
                } else {
                    assert!(prev_log.block_number < curr_log.block_number);
                }
                curr_log
            });
            next_block += block_fetch_interval;
        }
    }

    #[async_std::test]
    async fn test_get_nonexistent_log() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let result = db.get_log(999, 999, 999).await;

        assert!(result.is_err());
    }

    #[async_std::test]
    async fn test_get_logs_with_block_offset() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic1"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash1"]).into(),
            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"topic2"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 2,
            block_number: 2,
            block_hash: Hash::create(&[b"block_hash2"]).into(),
            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
            log_index: 2,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_logs(vec![log_1.clone(), log_2.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        let logs = db.get_logs(Some(1), Some(0)).await.unwrap();

        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0], log_1);
    }

    #[async_std::test]
    async fn test_set_logs_unprocessed() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash"]).into(),
            tx_hash: Hash::create(&[b"tx_hash"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(true),
            processed_at: Some(Utc::now()),
            ..Default::default()
        };

        db.store_log(log.clone()).await.unwrap();

        db.set_logs_unprocessed(Some(1), Some(0)).await.unwrap();

        let log_db = db.get_log(log.block_number, log.tx_index, log.log_index).await.unwrap();

        assert_eq!(log_db.processed, Some(false));
        assert!(log_db.processed_at.is_none());
    }

    #[async_std::test]
    async fn test_get_logs_block_numbers() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic1"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash1"]).into(),
            tx_hash: Hash::create(&[b"tx_hash1"]).into(),
            log_index: 1,
            removed: false,
            processed: Some(true),
            ..Default::default()
        };

        let log_2 = SerializableLog {
            address: Address::new(b"my address 223456789"),
            topics: [Hash::create(&[b"topic2"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 2,
            block_number: 2,
            block_hash: Hash::create(&[b"block_hash2"]).into(),
            tx_hash: Hash::create(&[b"tx_hash2"]).into(),
            log_index: 2,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        let log_3 = SerializableLog {
            address: Address::new(b"my address 323456789"),
            topics: [Hash::create(&[b"topic3"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 3,
            block_number: 3,
            block_hash: Hash::create(&[b"block_hash3"]).into(),
            tx_hash: Hash::create(&[b"tx_hash3"]).into(),
            log_index: 3,
            removed: false,
            processed: Some(false),
            ..Default::default()
        };

        db.store_logs(vec![log_1.clone(), log_2.clone(), log_3.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        let block_numbers_all = db.get_logs_block_numbers(None, None, None).await.unwrap();
        assert_eq!(block_numbers_all.len(), 3);
        assert_eq!(block_numbers_all, [1, 2, 3]);

        let block_numbers_first_only = db.get_logs_block_numbers(Some(1), Some(0), None).await.unwrap();
        assert_eq!(block_numbers_first_only.len(), 1);
        assert_eq!(block_numbers_first_only[0], 1);

        let block_numbers_last_only = db.get_logs_block_numbers(Some(3), Some(0), None).await.unwrap();
        assert_eq!(block_numbers_last_only.len(), 1);
        assert_eq!(block_numbers_last_only[0], 3);

        let block_numbers_processed = db.get_logs_block_numbers(None, None, Some(true)).await.unwrap();
        assert_eq!(block_numbers_processed.len(), 1);
        assert_eq!(block_numbers_processed[0], 1);

        let block_numbers_unprocessed_second = db.get_logs_block_numbers(Some(2), Some(0), Some(false)).await.unwrap();
        assert_eq!(block_numbers_unprocessed_second.len(), 1);
        assert_eq!(block_numbers_unprocessed_second[0], 2);
    }

    #[async_std::test]
    async fn test_update_logs_checksums() {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await.unwrap();

        // insert first log and update checksum
        let log_1 = SerializableLog {
            address: Address::new(b"my address 123456789"),
            topics: [Hash::create(&[b"topic"]).into()].into(),
            data: [1, 2, 3, 4].into(),
            tx_index: 1,
            block_number: 1,
            block_hash: Hash::create(&[b"block_hash"]).into(),
            tx_hash: Hash::create(&[b"tx_hash"]).into(),
            log_index: 1,
            removed: false,
            ..Default::default()
        };

        db.store_log(log_1.clone()).await.unwrap();

        assert!(db.get_last_checksummed_log().await.unwrap().is_none());

        db.update_logs_checksums().await.unwrap();

        let updated_log_1 = db.get_last_checksummed_log().await.unwrap().unwrap();
        assert!(updated_log_1.checksum.is_some());

        // insert two more logs and update checksums
        let log_2 = SerializableLog {
            block_number: 2,
            ..log_1.clone()
        };
        let log_3 = SerializableLog {
            block_number: 3,
            ..log_1.clone()
        };

        db.store_logs(vec![log_2.clone(), log_3.clone()])
            .await
            .unwrap()
            .into_iter()
            .for_each(|r| assert!(r.is_ok()));

        // ensure the first log is still the last updated
        assert_eq!(
            updated_log_1.clone().checksum.unwrap(),
            db.get_last_checksummed_log().await.unwrap().unwrap().checksum.unwrap()
        );

        db.update_logs_checksums().await.unwrap();

        let updated_log_3 = db.get_last_checksummed_log().await.unwrap().unwrap();

        db.get_logs(None, None).await.unwrap().into_iter().for_each(|log| {
            assert!(log.checksum.is_some());
        });

        // ensure the first log is not the last updated anymore
        assert_ne!(
            updated_log_1.clone().checksum.unwrap(),
            updated_log_3.clone().checksum.unwrap(),
        );
        assert_ne!(updated_log_1, updated_log_3);
    }

    #[async_std::test]
    async fn test_should_not_allow_inconsistent_logs_in_the_db() -> anyhow::Result<()> {
        let db = HoprDb::new_in_memory(ChainKeypair::random()).await?;
        let addr_1 = Address::new(b"my address 123456789");
        let addr_2 = Address::new(b"my 2nd address 12345");
        let topic_1 = Hash::create(&[b"my topic 1"]);
        let topic_2 = Hash::create(&[b"my topic 2"]);

        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;

        db.ensure_logs_origin(vec![(addr_1, topic_2)])
            .await
            .expect_err("expected error due to inconsistent logs in the db");

        db.ensure_logs_origin(vec![(addr_2, topic_1)])
            .await
            .expect_err("expected error due to inconsistent logs in the db");

        db.ensure_logs_origin(vec![(addr_1, topic_1)]).await?;

        Ok(())
    }
}