hopr_chain_indexer/snapshot/
validate.rs

1//! SQLite database validation for snapshot integrity verification.
2//!
3//! Validates extracted snapshot databases to ensure they contain expected
4//! tables, data, and are not corrupted before installation.
5
6use std::{fs, path::Path};
7
8use sqlx::{
9    Connection,
10    sqlite::{SqliteConnectOptions, SqliteConnection},
11};
12use tracing::{info, warn};
13
14use crate::snapshot::error::{SnapshotError, SnapshotResult};
15
16/// Metadata about a validated snapshot database.
17///
18/// Contains information gathered during validation that describes
19/// the contents and state of the snapshot database.
20#[derive(Debug, Clone)]
21pub struct SnapshotInfo {
22    /// Total number of log entries in the snapshot
23    pub log_count: u64,
24    /// Highest block number found in the snapshot (if any)
25    pub latest_block: Option<u64>,
26    /// Number of database tables found
27    pub tables: usize,
28    /// SQLite version used to create the database
29    pub sqlite_version: String,
30    /// Database file size in bytes
31    pub db_size: u64,
32}
33
34/// Validates SQLite snapshot databases for integrity and expected content.
35///
36/// Performs comprehensive validation including database connectivity,
37/// schema verification, and data integrity checks.
38#[derive(Default, PartialEq, Eq)]
39pub struct SnapshotValidator {
40    /// Required tables that must exist in valid snapshot databases
41    expected_tables: Vec<String>,
42}
43
44impl SnapshotValidator {
45    /// Creates a new validator with predefined expected tables.
46    ///
47    /// Expected tables include:
48    /// - `log` - Blockchain log entries
49    /// - `log_status` - Log processing status
50    /// - `log_topic_info` - Log topic information
51    /// - `seaql_migrations` - Database migration tracking
52    ///
53    /// Note: `sqlite_sequence` is not required as it's automatically managed by SQLite
54    pub fn new() -> Self {
55        Self {
56            expected_tables: vec![
57                "log".to_string(),
58                "log_status".to_string(),
59                "log_topic_info".to_string(),
60                "seaql_migrations".to_string(),
61            ],
62        }
63    }
64
65    /// Validates a snapshot database for integrity and expected content.
66    ///
67    /// Performs comprehensive validation:
68    /// 1. File existence and accessibility
69    /// 2. Database connectivity
70    /// 3. Schema validation (expected tables)
71    /// 4. Data integrity checks
72    /// 5. Content statistics gathering
73    ///
74    /// # Arguments
75    ///
76    /// * `db_path` - Path to the SQLite database file to validate
77    ///
78    /// # Returns
79    ///
80    /// [`SnapshotInfo`] containing validation results and database metadata
81    ///
82    /// # Errors
83    ///
84    /// * [`SnapshotError::Validation`] - Database corruption or schema issues
85    /// * [`SnapshotError::Io`] - File access errors
86    pub async fn validate_snapshot(&self, db_path: &Path) -> SnapshotResult<SnapshotInfo> {
87        info!(db = %db_path.display(), "Validating logs snapshot database");
88
89        let snapshot_info = Self::validate_sqlite_db(db_path, &self.expected_tables).await?;
90        info!(?snapshot_info, "Logs snapshot validation successful");
91
92        Ok(snapshot_info)
93    }
94
95    /// Validates the SQLite database
96    async fn validate_sqlite_db(db_path: &Path, expected_tables: &[String]) -> SnapshotResult<SnapshotInfo> {
97        // Check if file exists
98        if !db_path.exists() {
99            return Err(SnapshotError::Validation("Database file does not exist".to_string()));
100        }
101
102        // Get file size
103        let metadata = fs::metadata(db_path)?;
104        let db_size = metadata.len();
105
106        // Open database in read-only mode using sqlx
107        let options = SqliteConnectOptions::new().filename(db_path).read_only(true);
108
109        let mut conn = SqliteConnection::connect_with(&options)
110            .await
111            .map_err(|e| SnapshotError::Validation(format!("Cannot open database: {e}")))?;
112
113        // Check database integrity
114        let integrity_check: String = sqlx::query_scalar("PRAGMA integrity_check")
115            .fetch_one(&mut conn)
116            .await
117            .map_err(|e| SnapshotError::Validation(format!("Integrity check failed: {e}")))?;
118
119        if integrity_check != "ok" {
120            return Err(SnapshotError::Validation(format!(
121                "Database integrity check failed: {integrity_check}",
122            )));
123        }
124
125        // Get SQLite version
126        let sqlite_version: String = sqlx::query_scalar("SELECT sqlite_version()")
127            .fetch_one(&mut conn)
128            .await
129            .map_err(|e| SnapshotError::Validation(format!("Cannot get SQLite version: {e}")))?;
130
131        // Check for expected tables
132        let tables: Vec<String> = sqlx::query_scalar::<_, String>("SELECT name FROM sqlite_master WHERE type='table'")
133            .fetch_all(&mut conn)
134            .await
135            .map_err(|e| SnapshotError::Validation(format!("Cannot query tables: {e}")))?;
136
137        // Verify expected tables exist with strict validation
138        let missing_tables = expected_tables
139            .iter()
140            .filter(|t| !tables.contains(t))
141            .map(|t| t.to_string())
142            .collect::<Vec<_>>();
143        if !missing_tables.is_empty() {
144            return Err(SnapshotError::Validation(format!(
145                "Missing required table(s): {}. Available tables: [{}]",
146                missing_tables.join(", "),
147                tables.join(", ")
148            )));
149        }
150
151        // Get snapshot metadata using the log table (block_number stored as 8-byte big-endian blob)
152        let log_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM log")
153            .fetch_one(&mut conn)
154            .await
155            .map_err(|e| SnapshotError::Validation(format!("Cannot count logs: {e}")))?;
156
157        // Get latest block number from log table (handling blob format)
158        let latest_block: Option<i64> = match sqlx::query_scalar::<_, Vec<u8>>("SELECT MAX(block_number) FROM log")
159            .fetch_optional(&mut conn)
160            .await
161            .map_err(|e| SnapshotError::Validation(format!("Cannot get latest block: {e}")))?
162        {
163            Some(blob) if blob.len() == 8 => {
164                // Convert 8-byte big-endian blob to i64
165                let bytes: [u8; 8] = blob
166                    .try_into()
167                    .map_err(|_| SnapshotError::Validation("Invalid block_number blob length".to_string()))?;
168                Some(i64::from_be_bytes(bytes))
169            }
170            Some(_) => {
171                return Err(SnapshotError::Validation(
172                    "Invalid block_number blob format".to_string(),
173                ));
174            }
175            None => None,
176        };
177
178        // Validate that we have some data
179        if log_count == 0 {
180            warn!("Snapshot database contains no logs");
181        }
182
183        Ok(SnapshotInfo {
184            log_count: log_count as u64,
185            latest_block: latest_block.map(|b| b as u64),
186            tables: tables.len(),
187            sqlite_version,
188            db_size,
189        })
190    }
191
192    /// Checks data consistency
193    pub async fn check_data_consistency(&self, db_path: &Path) -> SnapshotResult<()> {
194        let options = SqliteConnectOptions::new().filename(db_path).read_only(true);
195
196        let mut conn = SqliteConnection::connect_with(&options)
197            .await
198            .map_err(|e| SnapshotError::Validation(format!("Cannot open database: {e}")))?;
199
200        // Check for NULL block_numbers (blob columns can't be negative in the same way)
201        let invalid_logs: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM log WHERE block_number IS NULL")
202            .fetch_one(&mut conn)
203            .await
204            .map_err(|e| SnapshotError::Validation(format!("Cannot check log consistency: {e}")))?;
205
206        if invalid_logs > 0 {
207            return Err(SnapshotError::Validation(format!(
208                "Found {invalid_logs} logs with invalid block numbers",
209            )));
210        }
211
212        Ok(())
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use tempfile::TempDir;
219
220    use super::*;
221    use crate::snapshot::test_utils::create_test_sqlite_db;
222
223    #[tokio::test]
224    async fn test_validate_ok() {
225        let temp_dir = TempDir::new().unwrap();
226        let validator = SnapshotValidator::new();
227
228        // Create test database
229        let db_path = temp_dir.path().join("hopr_logs.db");
230        create_test_sqlite_db(&db_path).await.unwrap();
231
232        // Validate the database
233        let result = validator.validate_snapshot(&db_path).await;
234
235        assert!(result.is_ok(), "Validation should succeed");
236        let info = result.unwrap();
237        assert_eq!(info.log_count, 2);
238        assert_eq!(info.latest_block, Some(2));
239        assert_eq!(info.tables, 4);
240    }
241
242    #[tokio::test]
243    async fn test_missing_file() {
244        let temp_dir = TempDir::new().unwrap();
245        let validator = SnapshotValidator::new();
246
247        // Try to validate non-existent file
248        let db_path = temp_dir.path().join("nonexistent.db");
249        let result = validator.validate_snapshot(&db_path).await;
250
251        assert!(result.is_err(), "Validation should fail for missing file");
252    }
253
254    #[tokio::test]
255    async fn test_sqlite_file_existence_check() {
256        let temp_dir = TempDir::new().unwrap();
257        let db_path = temp_dir.path().join("hopr_logs.db");
258
259        // Create a test database
260        create_test_sqlite_db(&db_path).await.unwrap();
261
262        let validator = SnapshotValidator::new();
263        let result = validator.validate_snapshot(&db_path).await;
264
265        assert!(result.is_ok());
266        let info = result.unwrap();
267        assert_eq!(info.log_count, 2);
268    }
269}