hopr_chain_indexer/snapshot/
download.rs

1//! Secure snapshot downloading with HTTP/HTTPS and local file support.
2//!
3//! Provides robust download capabilities for snapshot archives with:
4//! - **URL Support**: HTTP/HTTPS with retry logic, local file:// URLs
5//! - **Safety**: Size limits, timeout protection, disk space validation
6//! - **Reliability**: Exponential backoff, progress tracking, error recovery
7//! - **Cross-platform**: Uses sysinfo for disk space checking
8
9use std::{
10    fs,
11    fs::File,
12    path::Path,
13    sync::{
14        Arc,
15        atomic::{AtomicU64, Ordering},
16    },
17    time::Duration,
18};
19
20use async_lock::Mutex;
21use backon::{FuturesTimerSleeper, Retryable};
22use futures_util::{AsyncWriteExt, TryStreamExt, io::AllowStdIo};
23use reqwest::Client;
24use smart_default::SmartDefault;
25use sysinfo::Disks;
26use tracing::{debug, error, info};
27
28use crate::{
29    constants::{
30        LOGS_SNAPSHOT_DOWNLOADER_MAX_RETRIES, LOGS_SNAPSHOT_DOWNLOADER_MAX_SIZE, LOGS_SNAPSHOT_DOWNLOADER_TIMEOUT,
31    },
32    snapshot::error::{SnapshotError, SnapshotResult},
33};
34
/// Configuration for snapshot downloads with safety limits.
///
/// Controls download behavior including size limits, timeouts, and retry attempts
/// to ensure safe and reliable snapshot downloads.
///
/// The `Default` implementation is derived through `SmartDefault` and fills each
/// field from the matching `LOGS_SNAPSHOT_DOWNLOADER_*` constant.
#[derive(Debug, Clone, SmartDefault)]
pub struct DownloadConfig {
    /// Maximum allowed file size in bytes.
    ///
    /// Checked against the advertised `Content-Length`, against the running total of
    /// bytes received while streaming, and against the size of local `file://` sources.
    /// The disk space check requires three times this value to be available.
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_MAX_SIZE")]
    pub max_size: u64,
    /// Timeout passed to the HTTP client builder when the downloader is constructed.
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_TIMEOUT")]
    pub timeout: Duration,
    /// Maximum number of retry attempts used by `SnapshotDownloader::download_snapshot`.
    #[default(_code = "LOGS_SNAPSHOT_DOWNLOADER_MAX_RETRIES")]
    pub max_retries: u32,
}
51
/// Downloads snapshot archives from HTTP/HTTPS and file:// URLs.
///
/// Provides secure, reliable downloading with automatic retry logic for network sources
/// and direct file copying for local sources.
///
/// # Features
///
/// - **HTTP/HTTPS**: Automatic retry with exponential backoff, progress tracking. Responses that do not advertise a
///   `Content-Length` are rejected.
/// - **Local Files**: Direct copying from file:// URLs with validation
/// - **Safety**: Size limits, disk space checks (three times the configured maximum size must be available), timeout
///   protection
/// - **Monitoring**: Progress reporting and detailed error messages
///
/// # Examples
///
/// ```no_run
/// use std::path::Path;
///
/// use hopr_chain_indexer::snapshot::download::SnapshotDownloader;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let downloader = SnapshotDownloader::new()?;
///
/// // Download from HTTPS
/// downloader
///     .download_snapshot(
///         "https://snapshots.hoprnet.org/logs.tar.xz",
///         Path::new("/tmp/snapshot.tar.xz"),
///     )
///     .await?;
///
/// // Copy from local file
/// downloader
///     .download_snapshot("file:///backups/snapshot.tar.xz", Path::new("/tmp/snapshot.tar.xz"))
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct SnapshotDownloader {
    /// HTTP client built once in `with_config` (carries the timeout and user agent).
    client: Client,
    /// Limits applied to every download performed by this instance.
    config: DownloadConfig,
}
93
94impl SnapshotDownloader {
95    /// Creates a new snapshot downloader with default configuration
96    pub fn new() -> SnapshotResult<Self> {
97        Self::with_config(DownloadConfig::default())
98    }
99
100    /// Creates a new snapshot downloader with custom configuration
101    pub fn with_config(config: DownloadConfig) -> SnapshotResult<Self> {
102        Ok(Self {
103            client: Client::builder()
104                .timeout(config.timeout)
105                .user_agent("curl/8.14.1") // acts like curl for compatibility
106                .build()
107                .map_err(SnapshotError::Network)?,
108            config,
109        })
110    }
111
112    /// Downloads a snapshot from the given URL to the target path.
113    ///
114    /// Supports HTTP/HTTPS URLs with retry logic and file:// URLs for local files.
115    ///
116    /// # Arguments
117    ///
118    /// * `url` - Source URL (http://, https://, or file:// scheme)
119    /// * `target_path` - Destination file path
120    ///
121    /// # Errors
122    ///
123    /// Returns [`SnapshotError`] for network failures, file system errors, or validation failures.
124    pub async fn download_snapshot(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
125        self.download_snapshot_with_retry(url, target_path, self.config.max_retries)
126            .await
127    }
128
129    /// Downloads a snapshot with configurable retry logic.
130    ///
131    /// Implements exponential backoff between retry attempts for HTTP/HTTPS URLs.
132    /// Local file:// URLs are handled without retry logic. Certain errors
133    /// (like 4xx HTTP status codes or insufficient disk space) will not be retried.
134    ///
135    /// # Arguments
136    ///
137    /// * `url` - The HTTP/HTTPS or file:// URL to download/copy from
138    /// * `target_path` - Local path where the downloaded file will be saved
139    /// * `max_retries` - Maximum number of retry attempts (ignored for file:// URLs)
140    ///
141    /// # Errors
142    ///
143    /// Returns `SnapshotError` for various failure conditions including:
144    /// - Network errors (with retry)
145    /// - HTTP errors (4xx without retry, 5xx with retry)
146    /// - Insufficient disk space (without retry)
147    /// - File size exceeding limits (without retry)
148    pub async fn download_snapshot_with_retry(
149        &self,
150        url: &str,
151        target_path: &Path,
152        max_retries: u32,
153    ) -> SnapshotResult<()> {
154        let backoff = backon::ExponentialBuilder::default().with_max_times(max_retries as usize);
155
156        (|| async { self.download_snapshot_once(url, target_path).await })
157            .retry(backoff)
158            .sleep(FuturesTimerSleeper)
159            .when(|err| {
160                !matches!(
161                    err,
162                    SnapshotError::TooLarge { .. }
163                        | SnapshotError::InsufficientSpace { .. }
164                        | SnapshotError::HttpStatus { status: 400..=499 },
165                )
166            })
167            .notify(|error, _dur| {
168                error!(%error, "Download attempt failed");
169            })
170            .await
171    }
172
173    /// Performs a single download attempt
174    async fn download_snapshot_once(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
175        info!(%url, "Downloading logs snapshot file");
176
177        // Check available disk space
178        let parent_dir = target_path
179            .parent()
180            .ok_or_else(|| SnapshotError::InvalidData("Target path has no parent directory".to_string()))?;
181        self.check_disk_space(parent_dir).await?;
182
183        // Handle file:// URLs for local file access
184        if url.starts_with("file://") {
185            return self.copy_local_file(url, target_path).await;
186        }
187
188        // Send GET request
189        let response = self.client.get(url).send().await?;
190
191        // Check response status
192        if !response.status().is_success() {
193            return Err(SnapshotError::HttpStatus {
194                status: response.status().as_u16(),
195            });
196        }
197
198        // Check content length
199        if let Some(content_length) = response.content_length() {
200            if content_length > self.config.max_size {
201                return Err(SnapshotError::TooLarge {
202                    size: content_length,
203                    max_size: self.config.max_size,
204                });
205            }
206        }
207
208        // Create parent directory if it doesn't exist
209        if let Some(parent) = target_path.parent() {
210            fs::create_dir_all(parent)?;
211        }
212
213        // Fail if content length is not available
214        let total_bytes = response
215            .content_length()
216            .ok_or_else(SnapshotError::ContentLengthMissing)?;
217
218        // Create file writer using futures-io
219        let file = File::create(target_path)?;
220        let file_writer = Arc::new(Mutex::new(AllowStdIo::new(file)));
221
222        // Use AtomicU64 for thread-safe progress tracking
223        let downloaded = Arc::new(AtomicU64::new(0));
224
225        let stream = response.bytes_stream();
226
227        // Process each chunk with progress tracking and size checking
228        stream
229            .map_err(SnapshotError::Network)
230            .try_for_each(|chunk| {
231                let downloaded = downloaded.clone();
232                let file_writer = file_writer.clone();
233                let max_size = self.config.max_size;
234
235                async move {
236                    let received_bytes =
237                        downloaded.fetch_add(chunk.len() as u64, Ordering::Relaxed) + chunk.len() as u64;
238
239                    // Check size limit and abort if exceeded
240                    if received_bytes > max_size {
241                        return Err(SnapshotError::TooLarge {
242                            size: received_bytes,
243                            max_size,
244                        });
245                    }
246
247                    // Progress reporting, only per 1MB or at the end
248                    let progress = (received_bytes as f64 / total_bytes as f64) * 100.0;
249                    if received_bytes % (1024 * 1024) == 0 || received_bytes == total_bytes {
250                        debug!(
251                            progress = format!("{:.1}%", progress),
252                            %received_bytes, %total_bytes, "Logs snapshot download progress"
253                        );
254                    }
255
256                    // Write chunk to file using AsyncWriteExt
257                    {
258                        let mut writer = file_writer.lock().await;
259                        writer.write_all(&chunk).await.map_err(SnapshotError::Io)?;
260                        writer.flush().await.map_err(SnapshotError::Io)?;
261                    }
262
263                    Ok(())
264                }
265            })
266            .await?;
267
268        let downloaded_bytes = downloaded.load(Ordering::Relaxed);
269        info!(%downloaded_bytes, to = %target_path.display(), "Logs snapshot file downloaded");
270
271        Ok(())
272    }
273
274    /// Copies a local file from a file:// URL to the target path.
275    ///
276    /// Validates file existence, checks size limits, and copies the file to the target location.
277    ///
278    /// # Arguments
279    ///
280    /// * `url` - File URL in format `file:///absolute/path/to/file`
281    /// * `target_path` - Destination file path
282    ///
283    /// # Errors
284    ///
285    /// * [`SnapshotError::InvalidData`] - Invalid file:// URL format
286    /// * [`SnapshotError::Io`] - File not found or permission errors
287    /// * [`SnapshotError::TooLarge`] - File exceeds size limit
288    async fn copy_local_file(&self, url: &str, target_path: &Path) -> SnapshotResult<()> {
289        // Parse the file path from the URL
290        let file_path = url
291            .strip_prefix("file://")
292            .ok_or_else(|| SnapshotError::InvalidData("Invalid file:// URL format".to_string()))?;
293
294        let source_path = Path::new(file_path);
295
296        // Validate path to prevent directory traversal
297        let canonical_path = source_path.canonicalize().map_err(SnapshotError::Io)?;
298
299        // Check if source file exists
300        if !canonical_path.exists() {
301            return Err(SnapshotError::Io(std::io::Error::new(
302                std::io::ErrorKind::NotFound,
303                format!("Local file not found: {file_path}"),
304            )));
305        }
306
307        // Check file size
308        let metadata = fs::metadata(canonical_path.clone())?;
309        if metadata.len() > self.config.max_size {
310            return Err(SnapshotError::TooLarge {
311                size: metadata.len(),
312                max_size: self.config.max_size,
313            });
314        }
315
316        // Create parent directory if it doesn't exist
317        if let Some(parent) = target_path.parent() {
318            fs::create_dir_all(parent)?;
319        }
320
321        // Copy the file using futures-io
322        let copied_bytes = fs::copy(canonical_path.clone(), target_path)? as u64;
323        info!(
324            %copied_bytes, from = %canonical_path.display(), to = %target_path.display(),
325            "Copied local snapshot file",
326        );
327
328        Ok(())
329    }
330
331    /// Checks if there's sufficient disk space available for download and extraction.
332    ///
333    /// Validates that the target directory has at least 3x the maximum download size
334    /// available to account for:
335    /// 1. The downloaded archive
336    /// 2. Extracted files
337    /// 3. Safety margin for system operations
338    ///
339    /// # Arguments
340    ///
341    /// * `dir` - Directory to check for available space
342    ///
343    /// # Errors
344    ///
345    /// Returns `SnapshotError::InsufficientSpace` if available space is below requirements
346    pub async fn check_disk_space(&self, dir: &Path) -> SnapshotResult<()> {
347        // Create directory if it doesn't exist
348        fs::create_dir_all(dir)?;
349
350        // Check if directory exists and is accessible
351        let metadata = fs::metadata(dir)?;
352        if !metadata.is_dir() {
353            return Err(SnapshotError::Io(std::io::Error::new(
354                std::io::ErrorKind::NotFound,
355                "Target directory does not exist",
356            )));
357        }
358
359        // Get available disk space
360        let available_bytes = get_available_disk_space(dir)?;
361
362        // We need at least 3x the max size to account for:
363        // 1. Downloaded archive
364        // 2. Extracted files
365        // 3. Safety margin for system operations
366        let required_bytes = self.config.max_size * 3;
367
368        if available_bytes < required_bytes {
369            return Err(SnapshotError::InsufficientSpace {
370                required: required_bytes / (1024 * 1024),
371                available: available_bytes / (1024 * 1024),
372            });
373        }
374
375        Ok(())
376    }
377}
378
379/// Gets available disk space in bytes for the given directory using cross-platform sysinfo.
380///
381/// This function uses the sysinfo crate to provide platform-independent disk space checking.
382/// It finds the disk/mount point that contains the specified directory and returns the
383/// available space on that disk.
384///
385/// # Arguments
386///
387/// * `dir` - Directory path to check (will be canonicalized)
388///
389/// # Returns
390///
391/// Available space in bytes on the disk containing the directory
392///
393/// # Errors
394///
395/// - `SnapshotError::Io` if the path cannot be canonicalized
396/// - `SnapshotError::InvalidData` if no disks are found on the system
397fn get_available_disk_space(dir: &Path) -> SnapshotResult<u64> {
398    // Find the disk that contains the given directory
399    let target_path = dir.canonicalize().map_err(SnapshotError::Io)?;
400
401    // Find the disk with the longest matching mount point
402    let disks = Disks::new_with_refreshed_list();
403
404    // Filter out disks with non matching mount points
405    let mut usable_disks = disks
406        .iter()
407        .filter(|d| target_path.starts_with(d.mount_point()))
408        .collect::<Vec<_>>();
409
410    // Sort disks by mount point length (longest first)
411    usable_disks.sort_by(|a, b| {
412        b.mount_point()
413            .as_os_str()
414            .len()
415            .cmp(&(a.mount_point().as_os_str().len()))
416    });
417
418    // If no usable disks found, return an error
419    usable_disks.first().map_or_else(
420        || {
421            Err(SnapshotError::InvalidData(format!(
422                "Could not determine disk space for path: {dir:?}"
423            )))
424        },
425        |disk| Ok(disk.available_space()),
426    )
427}
428
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// The disk space check succeeds for an existing directory and for a nested
    /// directory that does not exist yet (it is created by the check itself).
    /// Both assertions rely on the machine having enough free space for the
    /// default configuration.
    #[tokio::test]
    async fn test_disk_space_validation() {
        let scratch = TempDir::new().unwrap();
        let downloader = SnapshotDownloader::new().expect("Failed to create SnapshotDownloader");

        // Existing directory
        let existing_dir_check = downloader.check_disk_space(scratch.path()).await;
        assert!(existing_dir_check.is_ok());

        // Not yet existing directory: expected to be created, then checked
        let missing_dir = scratch.path().join("nonexistent/nested/path");
        let missing_dir_check = downloader.check_disk_space(&missing_dir).await;
        assert!(missing_dir_check.is_ok());
    }

    /// Downloads from an unsupported scheme and from an endpoint answering 404
    /// must both end in an error.
    #[tokio::test]
    async fn test_enhanced_error_messages() {
        let scratch = TempDir::new().unwrap();
        let downloader = SnapshotDownloader::new().expect("Failed to create SnapshotDownloader");

        // First an invalid URL, then a resource that is reported as not found
        for url in ["invalid://url", "https://httpbin.org/status/404"] {
            let outcome = downloader.download_snapshot(url, scratch.path()).await;
            assert!(outcome.is_err());
        }
    }
}