From d3750b47d2de59dc65a8d15dfe0948366ac04d64 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Sun, 6 Sep 2020 17:25:49 -0700 Subject: [PATCH] Compress snapshot archive within the validator to reduce system dependencies --- runtime/src/snapshot_utils.rs | 99 +++++++++++++++++++++++------------ 1 file changed, 66 insertions(+), 33 deletions(-) diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 1774dd189b..cb74aa582c 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -19,9 +19,9 @@ use std::{ cmp::Ordering, fmt, fs::{self, File}, - io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, + io::{self, BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, - process::ExitStatus, + process::{self, ExitStatus}, str::FromStr, }; use tar::Archive; @@ -193,12 +193,12 @@ pub fn package_snapshot, Q: AsRef>( Ok(package) } -fn get_compression_ext(compression: &CompressionType) -> (&'static str, &'static str) { +fn get_compression_ext(compression: &CompressionType) -> &'static str { match compression { - CompressionType::Bzip2 => ("bzip2", ".tar.bz2"), - CompressionType::Gzip => ("gzip", ".tar.gz"), - CompressionType::Zstd => ("zstd", ".tar.zst"), - CompressionType::NoCompression => ("", ".tar"), + CompressionType::Bzip2 => ".tar.bz2", + CompressionType::Gzip => ".tar.gz", + CompressionType::Zstd => ".tar.zst", + CompressionType::NoCompression => ".tar", } } @@ -257,40 +257,73 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<() // Write version file { - let mut f = std::fs::File::create(staging_version_file)?; + let mut f = fs::File::create(staging_version_file)?; f.write_all(snapshot_package.snapshot_version.as_str().as_bytes())?; } - let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression); - let archive_options = "cfhS"; + let file_ext = get_compression_ext(&snapshot_package.compression); // Tar the staging directory into the archive at `archive_path` - let archive_file = format!("new_state{}", file_ext); - let archive_path = tar_dir.join(archive_file); - let args = vec![ - archive_options, - archive_path.to_str().unwrap(), - "--use-compress-program", - compression_option, - "-C", - staging_dir.path().to_str().unwrap(), - TAR_ACCOUNTS_DIR, - TAR_SNAPSHOTS_DIR, - TAR_VERSION_FILE, - ]; + // + // system `tar` program is used for -S (sparse file support) + let archive_path = tar_dir.join(format!("new_state{}", file_ext)); - let output = std::process::Command::new("tar").args(&args).output()?; - if !output.status.success() { - warn!("tar command failed with exit code: {}", output.status); - use std::str::from_utf8; - info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?")); - info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?")); + let mut tar = process::Command::new("tar") + .args(&[ + "chS", + "-C", + staging_dir.path().to_str().unwrap(), + TAR_ACCOUNTS_DIR, + TAR_SNAPSHOTS_DIR, + TAR_VERSION_FILE, + ]) + .stdin(process::Stdio::null()) + .stdout(process::Stdio::piped()) + .stderr(process::Stdio::inherit()) + .spawn()?; - return Err(SnapshotError::ArchiveGenerationFailure(output.status)); + match &mut tar.stdout { + None => { + return Err(SnapshotError::IO(IOError::new( + ErrorKind::Other, + "tar stdout unavailable".to_string(), + ))); + } + Some(tar_output) => { + let mut archive_file = fs::File::create(&archive_path)?; + + match snapshot_package.compression { + CompressionType::Bzip2 => { + let mut encoder = + bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::Best); + io::copy(tar_output, &mut encoder)?; + let _ = encoder.finish()?; + } + CompressionType::Gzip => { + let mut encoder = + flate2::write::GzEncoder::new(archive_file, flate2::Compression::default()); + io::copy(tar_output, &mut encoder)?; + let _ = encoder.finish()?; + } + CompressionType::NoCompression => { + io::copy(tar_output, &mut archive_file)?; + } + CompressionType::Zstd => { + let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?; + io::copy(tar_output, &mut encoder)?; + let _ = encoder.finish()?; + } + }; + } } - // Once everything is successful, overwrite the previous tarball so that other validators - // can fetch this newly packaged snapshot + let tar_exit_status = tar.wait()?; + if !tar_exit_status.success() { + warn!("tar command failed with exit code: {}", tar_exit_status); + return Err(SnapshotError::ArchiveGenerationFailure(tar_exit_status)); + } + + // Atomically move the archive into position for other validators to find let metadata = fs::metadata(&archive_path)?; fs::rename(&archive_path, &snapshot_package.tar_output_file)?; @@ -587,7 +620,7 @@ pub fn get_snapshot_archive_path>( "snapshot-{}-{}{}", snapshot_hash.0, snapshot_hash.1, - get_compression_ext(compression).1, + get_compression_ext(compression), )) }