hopr_chain_indexer/snapshot/download.rs
1//! Secure snapshot downloading with HTTP/HTTPS and local file support.
2//!
3//! Provides robust download capabilities for snapshot archives with:
4//! - **URL Support**: HTTP/HTTPS with retry logic, local file:// URLs
5//! - **Safety**: Size limits, timeout protection, disk space validation
6//! - **Reliability**: Exponential backoff, progress tracking, error recovery
7//! - **Cross-platform**: Uses sysinfo for disk space checking
8
9use std::{
10 fs,
11 fs::File,
12 path::Path,
13 sync::{
14 Arc,
15 atomic::{AtomicU64, Ordering},
16 },
17 time::Duration,
18};
19
20use async_lock::Mutex;
21use backon::{FuturesTimerSleeper, Retryable};
22use futures_util::{AsyncWriteExt, TryStreamExt, io::AllowStdIo};
23use reqwest::Client;
24use smart_default::SmartDefault;
25use sysinfo::Disks;
26use tracing::{debug, error, info};
27
28use crate::{
29 constants::{
30 LOGS_SNAPSHOT_DOWNLOADER_MAX_RETRIES, LOGS_SNAPSHOT_DOWNLOADER_MAX_SIZE, LOGS_SNAPSHOT_DOWNLOADER_TIMEOUT,
31 },
32 snapshot::error::{SnapshotError, SnapshotResult},
33};
34
/// Configuration for snapshot downloads with safety limits.
///
/// Controls download behavior including size limits, timeouts, and retry attempts
/// to ensure safe and reliable snapshot downloads.
///
/// Default values are sourced from the crate-level `LOGS_SNAPSHOT_DOWNLOADER_*`
/// constants via `smart_default`'s `_code` expressions.
#[derive(Debug, Clone, SmartDefault)]
pub struct DownloadConfig {
    /// Maximum allowed file size in bytes
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_MAX_SIZE")]
    pub max_size: u64,
    /// HTTP request timeout duration
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_TIMEOUT")]
    pub timeout: Duration,
    /// Maximum number of retry attempts for failed downloads
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_MAX_RETRIES")]
    pub max_retries: u32,
}
51
52/// Downloads snapshot archives from HTTP/HTTPS and file:// URLs.
53///
54/// Provides secure, reliable downloading with automatic retry logic for network sources
55/// and direct file copying for local sources.
56///
57/// # Features
58///
59/// - **HTTP/HTTPS**: Automatic retry with exponential backoff, progress tracking
60/// - **Local Files**: Direct copying from file:// URLs with validation
61/// - **Safety**: Size limits, disk space checks, timeout protection
62/// - **Monitoring**: Progress reporting and detailed error messages
63///
64/// # Examples
65///
66/// ```no_run
67/// use std::path::Path;
68///
69/// use hopr_chain_indexer::snapshot::download::SnapshotDownloader;
70///
71/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
72/// let downloader = SnapshotDownloader::new()?;
73///
74/// // Download from HTTPS
75/// downloader
76/// .download_snapshot(
77/// "https://snapshots.hoprnet.org/logs.tar.xz",
78/// Path::new("/tmp/snapshot.tar.xz"),
79/// )
80/// .await?;
81///
82/// // Copy from local file
83/// downloader
84/// .download_snapshot("file:///backups/snapshot.tar.xz", Path::new("/tmp/snapshot.tar.xz"))
85/// .await?;
86/// # Ok(())
87/// # }
88/// ```
pub struct SnapshotDownloader {
    /// Reused reqwest HTTP client, configured with the timeout and user agent
    /// from [`DownloadConfig`] at construction time.
    client: Client,
    /// Safety limits (max size, timeout, retry count) applied to every download.
    config: DownloadConfig,
}
93
94impl SnapshotDownloader {
95 /// Creates a new snapshot downloader with default configuration
96 pub fn new() -> SnapshotResult<Self> {
97 Self::with_config(DownloadConfig::default())
98 }
99
100 /// Creates a new snapshot downloader with custom configuration
101 pub fn with_config(config: DownloadConfig) -> SnapshotResult<Self> {
102 Ok(Self {
103 client: Client::builder()
104 .timeout(config.timeout)
105 .user_agent("curl/8.14.1") // acts like curl for compatibility
106 .build()
107 .map_err(SnapshotError::Network)?,
108 config,
109 })
110 }
111
112 /// Downloads a snapshot from the given URL to the target path.
113 ///
114 /// Supports HTTP/HTTPS URLs with retry logic and file:// URLs for local files.
115 ///
116 /// # Arguments
117 ///
118 /// * `url` - Source URL (http://, https://, or file:// scheme)
119 /// * `target_path` - Destination file path
120 ///
121 /// # Errors
122 ///
123 /// Returns [`SnapshotError`] for network failures, file system errors, or validation failures.
124 pub async fn download_snapshot(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
125 self.download_snapshot_with_retry(url, target_path, self.config.max_retries)
126 .await
127 }
128
129 /// Downloads a snapshot with configurable retry logic.
130 ///
131 /// Implements exponential backoff between retry attempts for HTTP/HTTPS URLs.
132 /// Local file:// URLs are handled without retry logic. Certain errors
133 /// (like 4xx HTTP status codes or insufficient disk space) will not be retried.
134 ///
135 /// # Arguments
136 ///
137 /// * `url` - The HTTP/HTTPS or file:// URL to download/copy from
138 /// * `target_path` - Local path where the downloaded file will be saved
139 /// * `max_retries` - Maximum number of retry attempts (ignored for file:// URLs)
140 ///
141 /// # Errors
142 ///
143 /// Returns `SnapshotError` for various failure conditions including:
144 /// - Network errors (with retry)
145 /// - HTTP errors (4xx without retry, 5xx with retry)
146 /// - Insufficient disk space (without retry)
147 /// - File size exceeding limits (without retry)
148 pub async fn download_snapshot_with_retry(
149 &self,
150 url: &str,
151 target_path: &Path,
152 max_retries: u32,
153 ) -> SnapshotResult<()> {
154 let backoff = backon::ExponentialBuilder::default().with_max_times(max_retries as usize);
155
156 (|| async { self.download_snapshot_once(url, target_path).await })
157 .retry(backoff)
158 .sleep(FuturesTimerSleeper)
159 .when(|err| {
160 !matches!(
161 err,
162 SnapshotError::TooLarge { .. }
163 | SnapshotError::InsufficientSpace { .. }
164 | SnapshotError::HttpStatus { status: 400..=499 },
165 )
166 })
167 .notify(|error, _dur| {
168 error!(%error, "Download attempt failed");
169 })
170 .await
171 }
172
173 /// Performs a single download attempt
174 async fn download_snapshot_once(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
175 info!(%url, "Downloading logs snapshot file");
176
177 // Check available disk space
178 let parent_dir = target_path
179 .parent()
180 .ok_or_else(|| SnapshotError::InvalidData("Target path has no parent directory".to_string()))?;
181 self.check_disk_space(parent_dir).await?;
182
183 // Handle file:// URLs for local file access
184 if url.starts_with("file://") {
185 return self.copy_local_file(url, target_path).await;
186 }
187
188 // Send GET request
189 let response = self.client.get(url).send().await?;
190
191 // Check response status
192 if !response.status().is_success() {
193 return Err(SnapshotError::HttpStatus {
194 status: response.status().as_u16(),
195 });
196 }
197
198 // Check content length
199 if let Some(content_length) = response.content_length() {
200 if content_length > self.config.max_size {
201 return Err(SnapshotError::TooLarge {
202 size: content_length,
203 max_size: self.config.max_size,
204 });
205 }
206 }
207
208 // Create parent directory if it doesn't exist
209 if let Some(parent) = target_path.parent() {
210 fs::create_dir_all(parent)?;
211 }
212
213 // Fail if content length is not available
214 let total_bytes = response
215 .content_length()
216 .ok_or_else(SnapshotError::ContentLengthMissing)?;
217
218 // Create file writer using futures-io
219 let file = File::create(target_path)?;
220 let file_writer = Arc::new(Mutex::new(AllowStdIo::new(file)));
221
222 // Use AtomicU64 for thread-safe progress tracking
223 let downloaded = Arc::new(AtomicU64::new(0));
224
225 let stream = response.bytes_stream();
226
227 // Process each chunk with progress tracking and size checking
228 stream
229 .map_err(SnapshotError::Network)
230 .try_for_each(|chunk| {
231 let downloaded = downloaded.clone();
232 let file_writer = file_writer.clone();
233 let max_size = self.config.max_size;
234
235 async move {
236 let received_bytes =
237 downloaded.fetch_add(chunk.len() as u64, Ordering::Relaxed) + chunk.len() as u64;
238
239 // Check size limit and abort if exceeded
240 if received_bytes > max_size {
241 return Err(SnapshotError::TooLarge {
242 size: received_bytes,
243 max_size,
244 });
245 }
246
247 // Progress reporting, only per 1MB or at the end
248 let progress = (received_bytes as f64 / total_bytes as f64) * 100.0;
249 if received_bytes % (1024 * 1024) == 0 || received_bytes == total_bytes {
250 debug!(
251 progress = format!("{:.1}%", progress),
252 %received_bytes, %total_bytes, "Logs snapshot download progress"
253 );
254 }
255
256 // Write chunk to file using AsyncWriteExt
257 {
258 let mut writer = file_writer.lock().await;
259 writer.write_all(&chunk).await.map_err(SnapshotError::Io)?;
260 writer.flush().await.map_err(SnapshotError::Io)?;
261 }
262
263 Ok(())
264 }
265 })
266 .await?;
267
268 let downloaded_bytes = downloaded.load(Ordering::Relaxed);
269 info!(%downloaded_bytes, to = %target_path.display(), "Logs snapshot file downloaded");
270
271 Ok(())
272 }
273
274 /// Copies a local file from a file:// URL to the target path.
275 ///
276 /// Validates file existence, checks size limits, and copies the file to the target location.
277 ///
278 /// # Arguments
279 ///
280 /// * `url` - File URL in format `file:///absolute/path/to/file`
281 /// * `target_path` - Destination file path
282 ///
283 /// # Errors
284 ///
285 /// * [`SnapshotError::InvalidData`] - Invalid file:// URL format
286 /// * [`SnapshotError::Io`] - File not found or permission errors
287 /// * [`SnapshotError::TooLarge`] - File exceeds size limit
288 async fn copy_local_file(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
289 // Parse the file path from the URL
290 let file_path = url
291 .strip_prefix("file://")
292 .ok_or_else(|| SnapshotError::InvalidData("Invalid file:// URL format".to_string()))?;
293
294 let source_path = Path::new(file_path);
295
296 // Validate path to prevent directory traversal
297 let canonical_path = source_path.canonicalize().map_err(SnapshotError::Io)?;
298
299 // Check if source file exists
300 if !canonical_path.exists() {
301 return Err(SnapshotError::Io(std::io::Error::new(
302 std::io::ErrorKind::NotFound,
303 format!("Local file not found: {file_path}"),
304 )));
305 }
306
307 // Check file size
308 let metadata = fs::metadata(canonical_path.clone())?;
309 if metadata.len() > self.config.max_size {
310 return Err(SnapshotError::TooLarge {
311 size: metadata.len(),
312 max_size: self.config.max_size,
313 });
314 }
315
316 // Create parent directory if it doesn't exist
317 if let Some(parent) = target_path.parent() {
318 fs::create_dir_all(parent)?;
319 }
320
321 // Copy the file using futures-io
322 let copied_bytes = fs::copy(canonical_path.clone(), target_path)? as u64;
323 info!(
324 %copied_bytes, from = %canonical_path.display(), to = %target_path.display(),
325 "Copied local snapshot file",
326 );
327
328 Ok(())
329 }
330
331 /// Checks if there's sufficient disk space available for download and extraction.
332 ///
333 /// Validates that the target directory has at least 3x the maximum download size
334 /// available to account for:
335 /// 1. The downloaded archive
336 /// 2. Extracted files
337 /// 3. Safety margin for system operations
338 ///
339 /// # Arguments
340 ///
341 /// * `dir` - Directory to check for available space
342 ///
343 /// # Errors
344 ///
345 /// Returns `SnapshotError::InsufficientSpace` if available space is below requirements
346 pub async fn check_disk_space(&self, dir: &Path) -> SnapshotResult<()> {
347 // Create directory if it doesn't exist
348 fs::create_dir_all(dir)?;
349
350 // Check if directory exists and is accessible
351 let metadata = fs::metadata(dir)?;
352 if !metadata.is_dir() {
353 return Err(SnapshotError::Io(std::io::Error::new(
354 std::io::ErrorKind::NotFound,
355 "Target directory does not exist",
356 )));
357 }
358
359 // Get available disk space
360 let available_bytes = get_available_disk_space(dir)?;
361
362 // We need at least 3x the max size to account for:
363 // 1. Downloaded archive
364 // 2. Extracted files
365 // 3. Safety margin for system operations
366 let required_bytes = self.config.max_size * 3;
367
368 if available_bytes < required_bytes {
369 return Err(SnapshotError::InsufficientSpace {
370 required: required_bytes / (1024 * 1024),
371 available: available_bytes / (1024 * 1024),
372 });
373 }
374
375 Ok(())
376 }
377}
378
379/// Gets available disk space in bytes for the given directory using cross-platform sysinfo.
380///
381/// This function uses the sysinfo crate to provide platform-independent disk space checking.
382/// It finds the disk/mount point that contains the specified directory and returns the
383/// available space on that disk.
384///
385/// # Arguments
386///
387/// * `dir` - Directory path to check (will be canonicalized)
388///
389/// # Returns
390///
391/// Available space in bytes on the disk containing the directory
392///
393/// # Errors
394///
395/// - `SnapshotError::Io` if the path cannot be canonicalized
396/// - `SnapshotError::InvalidData` if no disks are found on the system
397fn get_available_disk_space(dir: &Path) -> SnapshotResult<u64> {
398 // Find the disk that contains the given directory
399 let target_path = dir.canonicalize().map_err(SnapshotError::Io)?;
400
401 // Find the disk with the longest matching mount point
402 let disks = Disks::new_with_refreshed_list();
403
404 // Filter out disks with non matching mount points
405 let mut usable_disks = disks
406 .iter()
407 .filter(|d| target_path.starts_with(d.mount_point()))
408 .collect::<Vec<_>>();
409
410 // Sort disks by mount point length (longest first)
411 usable_disks.sort_by(|a, b| {
412 b.mount_point()
413 .as_os_str()
414 .len()
415 .cmp(&(a.mount_point().as_os_str().len()))
416 });
417
418 // If no usable disks found, return an error
419 usable_disks.first().map_or_else(
420 || {
421 Err(SnapshotError::InvalidData(format!(
422 "Could not determine disk space for path: {dir:?}"
423 )))
424 },
425 |disk| Ok(disk.available_space()),
426 )
427}
428
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// `check_disk_space` succeeds for an existing temp directory and
    /// auto-creates missing nested directories before checking space.
    #[tokio::test]
    async fn test_disk_space_validation() {
        let temp_dir = TempDir::new().unwrap();
        let downloader = SnapshotDownloader::new().expect("Failed to create SnapshotDownloader");

        // Test with available disk space (this should pass)
        let result = downloader.check_disk_space(temp_dir.path()).await;
        assert!(result.is_ok());

        // Test with invalid directory path
        let invalid_path = temp_dir.path().join("nonexistent/nested/path");
        let result = downloader.check_disk_space(&invalid_path).await;
        // Should create the directory and succeed
        assert!(result.is_ok());
    }

    /// Error-path smoke tests for `download_snapshot`.
    ///
    /// NOTE(review): the download target is a *directory* (`temp_dir.path()`),
    /// so these assertions only verify that *some* error is returned, not
    /// which one.
    #[tokio::test]
    async fn test_enhanced_error_messages() {
        let temp_dir = TempDir::new().unwrap();
        let downloader = SnapshotDownloader::new().expect("Failed to create SnapshotDownloader");

        // Test invalid URL error
        let result = downloader.download_snapshot("invalid://url", temp_dir.path()).await;
        assert!(result.is_err());

        // Test file not found error
        // NOTE(review): hits the live httpbin.org service — network-dependent
        // and potentially flaky in CI; consider a local mock server.
        let result = downloader
            .download_snapshot("https://httpbin.org/status/404", temp_dir.path())
            .await;
        assert!(result.is_err());
    }
}
466}