hopr_chain_indexer/snapshot/
validate.rs1use std::{fs, path::Path};
7
8use sqlx::{
9 Connection,
10 sqlite::{SqliteConnectOptions, SqliteConnection},
11};
12use tracing::{info, warn};
13
14use crate::snapshot::error::{SnapshotError, SnapshotResult};
15
/// Summary of a snapshot database, produced by a successful validation run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotInfo {
    /// Number of rows in the `log` table.
    pub log_count: u64,
    /// Highest `block_number` found in the `log` table, or `None` when no block number is
    /// available.
    pub latest_block: Option<u64>,
    /// Number of tables listed in `sqlite_master` for the database.
    pub tables: usize,
    /// Value reported by `SELECT sqlite_version()` on the validating connection.
    pub sqlite_version: String,
    /// Size of the database file as reported by the file system metadata.
    pub db_size: u64,
}
33
/// Validates SQLite snapshot databases against a list of required tables.
///
/// Note: the derived [`Default`] implementation produces an *empty* list of required tables,
/// so a default-constructed validator does not enforce the presence of any table. Use
/// `SnapshotValidator::new()` to get the standard set of required tables.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SnapshotValidator {
    /// Names of the tables that must exist for a snapshot to be considered valid.
    expected_tables: Vec<String>,
}
43
44impl SnapshotValidator {
45 pub fn new() -> Self {
55 Self {
56 expected_tables: vec![
57 "log".to_string(),
58 "log_status".to_string(),
59 "log_topic_info".to_string(),
60 "seaql_migrations".to_string(),
61 ],
62 }
63 }
64
65 pub async fn validate_snapshot(&self, db_path: &Path) -> SnapshotResult<SnapshotInfo> {
87 info!(db = %db_path.display(), "Validating logs snapshot database");
88
89 let snapshot_info = Self::validate_sqlite_db(db_path, &self.expected_tables).await?;
90 info!(?snapshot_info, "Logs snapshot validation successful");
91
92 Ok(snapshot_info)
93 }
94
95 async fn validate_sqlite_db(db_path: &Path, expected_tables: &[String]) -> SnapshotResult<SnapshotInfo> {
97 if !db_path.exists() {
99 return Err(SnapshotError::Validation("Database file does not exist".to_string()));
100 }
101
102 let metadata = fs::metadata(db_path)?;
104 let db_size = metadata.len();
105
106 let options = SqliteConnectOptions::new().filename(db_path).read_only(true);
108
109 let mut conn = SqliteConnection::connect_with(&options)
110 .await
111 .map_err(|e| SnapshotError::Validation(format!("Cannot open database: {e}")))?;
112
113 let integrity_check: String = sqlx::query_scalar("PRAGMA integrity_check")
115 .fetch_one(&mut conn)
116 .await
117 .map_err(|e| SnapshotError::Validation(format!("Integrity check failed: {e}")))?;
118
119 if integrity_check != "ok" {
120 return Err(SnapshotError::Validation(format!(
121 "Database integrity check failed: {integrity_check}",
122 )));
123 }
124
125 let sqlite_version: String = sqlx::query_scalar("SELECT sqlite_version()")
127 .fetch_one(&mut conn)
128 .await
129 .map_err(|e| SnapshotError::Validation(format!("Cannot get SQLite version: {e}")))?;
130
131 let tables: Vec<String> = sqlx::query_scalar::<_, String>("SELECT name FROM sqlite_master WHERE type='table'")
133 .fetch_all(&mut conn)
134 .await
135 .map_err(|e| SnapshotError::Validation(format!("Cannot query tables: {e}")))?;
136
137 let missing_tables = expected_tables
139 .iter()
140 .filter(|t| !tables.contains(t))
141 .map(|t| t.to_string())
142 .collect::<Vec<_>>();
143 if !missing_tables.is_empty() {
144 return Err(SnapshotError::Validation(format!(
145 "Missing required table(s): {}. Available tables: [{}]",
146 missing_tables.join(", "),
147 tables.join(", ")
148 )));
149 }
150
151 let log_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM log")
153 .fetch_one(&mut conn)
154 .await
155 .map_err(|e| SnapshotError::Validation(format!("Cannot count logs: {e}")))?;
156
157 let latest_block: Option<i64> = match sqlx::query_scalar::<_, Vec<u8>>("SELECT MAX(block_number) FROM log")
159 .fetch_optional(&mut conn)
160 .await
161 .map_err(|e| SnapshotError::Validation(format!("Cannot get latest block: {e}")))?
162 {
163 Some(blob) if blob.len() == 8 => {
164 let bytes: [u8; 8] = blob
166 .try_into()
167 .map_err(|_| SnapshotError::Validation("Invalid block_number blob length".to_string()))?;
168 Some(i64::from_be_bytes(bytes))
169 }
170 Some(_) => {
171 return Err(SnapshotError::Validation(
172 "Invalid block_number blob format".to_string(),
173 ));
174 }
175 None => None,
176 };
177
178 if log_count == 0 {
180 warn!("Snapshot database contains no logs");
181 }
182
183 Ok(SnapshotInfo {
184 log_count: log_count as u64,
185 latest_block: latest_block.map(|b| b as u64),
186 tables: tables.len(),
187 sqlite_version,
188 db_size,
189 })
190 }
191
192 pub async fn check_data_consistency(&self, db_path: &Path) -> SnapshotResult<()> {
194 let options = SqliteConnectOptions::new().filename(db_path).read_only(true);
195
196 let mut conn = SqliteConnection::connect_with(&options)
197 .await
198 .map_err(|e| SnapshotError::Validation(format!("Cannot open database: {e}")))?;
199
200 let invalid_logs: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM log WHERE block_number IS NULL")
202 .fetch_one(&mut conn)
203 .await
204 .map_err(|e| SnapshotError::Validation(format!("Cannot check log consistency: {e}")))?;
205
206 if invalid_logs > 0 {
207 return Err(SnapshotError::Validation(format!(
208 "Found {invalid_logs} logs with invalid block numbers",
209 )));
210 }
211
212 Ok(())
213 }
214}
215
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::snapshot::test_utils::create_test_sqlite_db;

    /// A database created by `create_test_sqlite_db` validates and reports two logs,
    /// latest block 2 and four tables.
    #[tokio::test]
    async fn test_validate_ok() {
        let workdir = TempDir::new().unwrap();
        let snapshot_db = workdir.path().join("hopr_logs.db");
        create_test_sqlite_db(&snapshot_db).await.unwrap();

        let outcome = SnapshotValidator::new().validate_snapshot(&snapshot_db).await;
        assert!(outcome.is_ok(), "Validation should succeed");

        let SnapshotInfo {
            log_count,
            latest_block,
            tables,
            ..
        } = outcome.unwrap();
        assert_eq!(log_count, 2);
        assert_eq!(latest_block, Some(2));
        assert_eq!(tables, 4);
    }

    /// Validating a path that was never created must fail.
    #[tokio::test]
    async fn test_missing_file() {
        let workdir = TempDir::new().unwrap();
        let absent_db = workdir.path().join("nonexistent.db");

        let outcome = SnapshotValidator::new().validate_snapshot(&absent_db).await;

        assert!(outcome.is_err(), "Validation should fail for missing file");
    }

    /// An existing database file passes validation and reports its log count.
    #[tokio::test]
    async fn test_sqlite_file_existence_check() {
        let workdir = TempDir::new().unwrap();
        let snapshot_db = workdir.path().join("hopr_logs.db");
        create_test_sqlite_db(&snapshot_db).await.unwrap();

        let outcome = SnapshotValidator::new().validate_snapshot(&snapshot_db).await;

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap().log_count, 2);
    }
}