From 9493de4443c02d4c670a62a595392d9df403d24f Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 3 Apr 2020 13:13:49 -0700 Subject: [PATCH] Add snapshot compression option (#9276) --- Cargo.lock | 32 +++++++ core/src/accounts_hash_verifier.rs | 2 + core/src/rpc_service.rs | 8 +- core/src/snapshot_packager_service.rs | 10 ++- core/tests/bank_forks.rs | 7 ++ download-utils/src/lib.rs | 56 ++++++++---- ledger-tool/src/main.rs | 3 + ledger/Cargo.toml | 2 + ledger/src/bank_forks.rs | 11 +++ ledger/src/bank_forks_utils.rs | 5 +- ledger/src/snapshot_package.rs | 4 + ledger/src/snapshot_utils.rs | 125 +++++++++++++++++--------- local-cluster/tests/local_cluster.rs | 9 +- validator/src/main.rs | 24 ++++- 14 files changed, 228 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20eadbeb6..7fba057cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4181,6 +4181,7 @@ dependencies = [ "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "dir-diff 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4214,6 +4215,7 @@ dependencies = [ "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", + "zstd 0.5.1+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -6249,6 +6251,33 @@ dependencies = [ "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "zstd" +version = "0.5.1+zstd.1.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "zstd-safe 2.0.3+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "zstd-safe" +version = "2.0.3+zstd.1.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", + "zstd-sys 1.4.15+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "zstd-sys" +version = "1.4.15+zstd.1.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.49 (registry+https://github.com/rust-lang/crates.io-index)", + "glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", +] + [metadata] "checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" @@ -6801,3 +6830,6 @@ dependencies = [ "checksum x25519-dalek 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ee1585dc1484373cbc1cee7aafda26634665cf449436fd6e24bfd1fad230538" "checksum xattr 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" "checksum yaml-rust 0.4.3 
(registry+https://github.com/rust-lang/crates.io-index)" = "65923dd1784f44da1d2c3dbbc5e822045628c590ba72123e1c73d3c230c4434d" +"checksum zstd 0.5.1+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c5d978b793ae64375b80baf652919b148f6a496ac8802922d9999f5a553194f" +"checksum zstd-safe 2.0.3+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bee25eac9753cfedd48133fa1736cbd23b774e253d89badbeac7d12b23848d3f" +"checksum zstd-sys 1.4.15+zstd.1.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "89719b034dc22d240d5b407fb0a3fe6d29952c181cff9a9f95c0bd40b4f8f7d8" diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 70d1df094..9817d0713 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -170,6 +170,7 @@ mod tests { use super::*; use crate::cluster_info::make_accounts_hashes_message; use crate::contact_info::ContactInfo; + use solana_ledger::bank_forks::CompressionType; use solana_sdk::{ hash::hash, signature::{Keypair, Signer}, @@ -231,6 +232,7 @@ mod tests { snapshot_links, tar_output_file: PathBuf::from("."), storages: vec![], + compression: CompressionType::Bzip2, }; AccountsHashVerifier::process_snapshot( diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 839e9e781..f3887f012 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -43,8 +43,10 @@ impl RpcRequestMiddleware { pub fn new(ledger_path: PathBuf, snapshot_config: Option) -> Self { Self { ledger_path, - snapshot_archive_path_regex: Regex::new(r"/snapshot-\d+-[[:alnum:]]+\.tar\.bz2$") - .unwrap(), + snapshot_archive_path_regex: Regex::new( + r"/snapshot-\d+-[[:alnum:]]+\.tar\.(bz2|zst|gz)$", + ) + .unwrap(), snapshot_config, } } @@ -249,6 +251,7 @@ impl JsonRpcService { mod tests { use super::*; use crate::{contact_info::ContactInfo, rpc::tests::create_validator_exit}; + use solana_ledger::bank_forks::CompressionType; use solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, @@ -319,6 +322,7 @@ mod tests { snapshot_interval_slots: 0, snapshot_package_output_path: PathBuf::from("/"), snapshot_path: PathBuf::from("/"), + compression: CompressionType::Bzip2, }), ); diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 0d6974b1b..11e6b715c 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -84,6 +84,7 @@ impl SnapshotPackagerService { mod tests { use super::*; use bincode::serialize_into; + use solana_ledger::bank_forks::CompressionType; use solana_ledger::{ snapshot_package::SnapshotPackage, snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME}, @@ -169,6 +170,7 @@ mod tests { let output_tar_path = snapshot_utils::get_snapshot_archive_path( &snapshot_package_output_path, &(42, Hash::default()), + &CompressionType::Bzip2, ); let snapshot_package = SnapshotPackage::new( 5, @@ -177,6 +179,7 @@ mod tests { vec![storage_entries], output_tar_path.clone(), Hash::default(), + CompressionType::Bzip2, ); // Make tarball from packageable snapshot @@ -197,6 +200,11 @@ mod tests { .unwrap(); // Check archive is correct - snapshot_utils::verify_snapshot_archive(output_tar_path, snapshots_dir, accounts_dir); + snapshot_utils::verify_snapshot_archive( + output_tar_path, + snapshots_dir, + accounts_dir, + CompressionType::Bzip2, + ); } } diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index dc42ab8d9..24c286354 100644 --- a/core/tests/bank_forks.rs 
+++ b/core/tests/bank_forks.rs @@ -8,6 +8,7 @@ mod tests { use solana_core::cluster_info::ClusterInfo; use solana_core::contact_info::ContactInfo; use solana_core::snapshot_packager_service::SnapshotPackagerService; + use solana_ledger::bank_forks::CompressionType; use solana_ledger::{ bank_forks::{BankForks, SnapshotConfig}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, @@ -54,6 +55,7 @@ mod tests { snapshot_interval_slots, snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), snapshot_path: PathBuf::from(snapshot_dir.path()), + compression: CompressionType::Bzip2, }; bank_forks.set_snapshot_config(Some(snapshot_config.clone())); SnapshotTestConfig { @@ -90,7 +92,9 @@ mod tests { snapshot_utils::get_snapshot_archive_path( snapshot_package_output_path, &(old_last_bank.slot(), old_last_bank.get_accounts_hash()), + &CompressionType::Bzip2, ), + CompressionType::Bzip2, ) .unwrap(); @@ -152,6 +156,7 @@ mod tests { &last_bank.src.roots(), &snapshot_config.snapshot_package_output_path, storages, + CompressionType::Bzip2, ) .unwrap(); @@ -290,6 +295,7 @@ mod tests { saved_archive_path = Some(snapshot_utils::get_snapshot_archive_path( &snapshot_config.snapshot_package_output_path, &(slot, accounts_hash), + &CompressionType::Bzip2, )); } } @@ -352,6 +358,7 @@ mod tests { saved_accounts_dir .path() .join(accounts_dir.path().file_name().unwrap()), + CompressionType::Bzip2, ); } diff --git a/download-utils/src/lib.rs b/download-utils/src/lib.rs index 57bab5db6..9d9aa0bd4 100644 --- a/download-utils/src/lib.rs +++ b/download-utils/src/lib.rs @@ -1,6 +1,7 @@ use console::Emoji; use indicatif::{ProgressBar, ProgressStyle}; use log::*; +use solana_ledger::bank_forks::CompressionType; use solana_sdk::clock::Slot; use solana_sdk::hash::Hash; use std::fs::{self, File}; @@ -133,32 +134,49 @@ pub fn download_snapshot( ) -> Result<(), String> { // Remove all snapshot not matching the desired hash let snapshot_packages = solana_ledger::snapshot_utils::get_snapshot_archives(ledger_path); - for (snapshot_package, snapshot_hash) in snapshot_packages.iter() { - if *snapshot_hash != desired_snapshot_hash { + let mut found_package = false; + for (snapshot_package, (snapshot_slot, snapshot_hash, _compression)) in snapshot_packages.iter() + { + if (*snapshot_slot, *snapshot_hash) != desired_snapshot_hash { info!("Removing old snapshot: {:?}", snapshot_package); fs::remove_file(snapshot_package) .unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err)); + } else { + found_package = true; } } - let desired_snapshot_package = solana_ledger::snapshot_utils::get_snapshot_archive_path( - ledger_path, - &desired_snapshot_hash, - ); - if desired_snapshot_package.exists() { + if found_package { Ok(()) } else { - download_file( - &format!( - "http://{}/{}", - rpc_addr, - desired_snapshot_package - .file_name() - .unwrap() - .to_str() - .unwrap() - ), - &desired_snapshot_package, - ) + for compression in &[ + CompressionType::Zstd, + CompressionType::Gzip, + CompressionType::Bzip2, + ] { + let desired_snapshot_package = solana_ledger::snapshot_utils::get_snapshot_archive_path( + ledger_path, + &desired_snapshot_hash, + compression, + ); + + if download_file( + &format!( + "http://{}/{}", + rpc_addr, + desired_snapshot_package + .file_name() + .unwrap() + .to_str() + .unwrap() + ), + &desired_snapshot_package, + ) + .is_ok() + { + return Ok(()); + } + } + Err("Snapshot couldn't be downloaded".to_string()) } } diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 
9e23766d9..ff84298f2 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -5,6 +5,7 @@ use clap::{ use histogram; use serde_json::json; use solana_clap_utils::input_validators::is_slot; +use solana_ledger::bank_forks::CompressionType; use solana_ledger::{ bank_forks::{BankForks, SnapshotConfig}, bank_forks_utils, @@ -615,6 +616,7 @@ fn load_bank_forks( snapshot_interval_slots: 0, // Value doesn't matter snapshot_package_output_path: ledger_path.clone(), snapshot_path: ledger_path.clone().join("snapshot"), + compression: CompressionType::Bzip2, }) }; let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { @@ -1043,6 +1045,7 @@ fn main() { &bank.src.roots(), output_directory, storages, + CompressionType::Bzip2, ) }) .and_then(|package| { diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 426c20dc5..bc629b1eb 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -16,6 +16,8 @@ chrono = { version = "0.4.11", features = ["serde"] } crossbeam-channel = "0.4" dir-diff = "0.3.2" sha2 = "0.8.1" +flate2 = "1.0.14" +zstd = "0.5.1" fs_extra = "1.1.0" itertools = "0.9.0" libc = "0.2.68" diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 0c904a935..097fd5f7d 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -16,6 +16,14 @@ use std::{ }; use thiserror::Error; +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum CompressionType { + Bzip2, + Gzip, + Zstd, + NoCompression, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct SnapshotConfig { // Generate a new snapshot every this many slots @@ -26,6 +34,8 @@ pub struct SnapshotConfig { // Where to place the snapshots for recent slots pub snapshot_path: PathBuf, + + pub compression: CompressionType, } #[derive(Error, Debug)] @@ -288,6 +298,7 @@ impl BankForks { slots_to_snapshot, &config.snapshot_package_output_path, storages, + config.compression.clone(), )?; // Send the package to the packaging thread diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 109a32ee4..1db862610 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -55,7 +55,7 @@ pub fn load( match snapshot_utils::get_highest_snapshot_archive_path( &snapshot_config.snapshot_package_output_path, ) { - Some((archive_filename, archive_snapshot_hash)) => { + Some((archive_filename, (archive_slot, archive_snapshot_hash, compression))) => { info!("Loading snapshot package: {:?}", archive_filename); // Fail hard here if snapshot fails to load, don't silently continue @@ -69,6 +69,7 @@ pub fn load( &process_options.frozen_accounts, &snapshot_config.snapshot_path, &archive_filename, + compression, ) .expect("Load from snapshot failed"); @@ -77,7 +78,7 @@ pub fn load( deserialized_bank.get_accounts_hash(), ); - if deserialized_snapshot_hash != archive_snapshot_hash { + if deserialized_snapshot_hash != (archive_slot, archive_snapshot_hash) { error!( "Snapshot has mismatch:\narchive: {:?}\ndeserialized: {:?}", archive_snapshot_hash, deserialized_snapshot_hash diff --git a/ledger/src/snapshot_package.rs b/ledger/src/snapshot_package.rs index dd875900e..32f12e34d 100644 --- a/ledger/src/snapshot_package.rs +++ b/ledger/src/snapshot_package.rs @@ -1,3 +1,4 @@ +use crate::bank_forks::CompressionType; use solana_runtime::{accounts_db::SnapshotStorages, bank::BankSlotDelta}; use solana_sdk::clock::Slot; use solana_sdk::hash::Hash; @@ -19,6 +20,7 @@ pub struct SnapshotPackage { pub storages: SnapshotStorages, pub tar_output_file: PathBuf, pub hash: Hash, + 
pub compression: CompressionType, } impl SnapshotPackage { @@ -29,6 +31,7 @@ impl SnapshotPackage { storages: SnapshotStorages, tar_output_file: PathBuf, hash: Hash, + compression: CompressionType, ) -> Self { Self { root, @@ -37,6 +40,7 @@ impl SnapshotPackage { storages, tar_output_file, hash, + compression, } } } diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index 80dc187aa..8adb93aeb 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -1,7 +1,9 @@ +use crate::bank_forks::CompressionType; use crate::hardened_unpack::{unpack_snapshot, UnpackError}; use crate::snapshot_package::SnapshotPackage; use bincode::serialize_into; use bzip2::bufread::BzDecoder; +use flate2::read::GzDecoder; use fs_extra::dir::CopyOptions; use log::*; use regex::Regex; @@ -16,7 +18,6 @@ use solana_runtime::{ use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use std::{ cmp::Ordering, - env, fs::{self, File}, io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, @@ -25,6 +26,7 @@ use std::{ use tar::Archive; use tempfile::TempDir; use thiserror::Error; +use zstd; pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; pub const TAR_SNAPSHOTS_DIR: &str = "snapshots"; @@ -90,6 +92,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( slots_to_snapshot: &[Slot], snapshot_package_output_path: P, snapshot_storages: SnapshotStorages, + compression: CompressionType, ) -> Result<SnapshotPackage> { // Hard link all the snapshots we need for this package let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; @@ -108,6 +111,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( let snapshot_package_output_file = get_snapshot_archive_path( &snapshot_package_output_path, &(bank.slot(), bank.get_accounts_hash()), + &compression, ); let package = SnapshotPackage::new( @@ -117,11 +121,21 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( snapshot_storages, snapshot_package_output_file, bank.get_accounts_hash(), + compression, ); Ok(package) } +fn get_compression_ext(compression: &CompressionType) -> (&'static str, &'static str) { + match compression { + CompressionType::Bzip2 => ("bzip2", ".tar.bz2"), + CompressionType::Gzip => ("gzip", ".tar.gz"), + CompressionType::Zstd => ("zstd", ".tar.zst"), + CompressionType::NoCompression => ("", ".tar"), + } +} + pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> { info!( "Generating snapshot archive for slot {}", @@ -181,23 +195,22 @@ pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<() f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?; } - let archive_compress_options = if is_snapshot_compression_disabled() { - "" - } else { - "j" - }; - let archive_options = format!("{}cfhS", archive_compress_options); + let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression); + let archive_options = "cfhS"; // Tar the staging directory into the archive at `archive_path` - let archive_path = tar_dir.join("new_state.tar.bz2"); + let archive_file = format!("new_state{}", file_ext); + let archive_path = tar_dir.join(archive_file); let args = vec![ - archive_options.as_str(), + archive_options, archive_path.to_str().unwrap(), "-C", staging_dir.path().to_str().unwrap(), TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR, TAR_VERSION_FILE, + "-I", + compression_option, ]; let output = std::process::Command::new("tar").args(&args).output()?; @@ -439,10 +452,11 @@ pub fn bank_from_archive<P: AsRef<Path>>( account_paths: &[PathBuf], frozen_account_pubkeys: &[Pubkey], snapshot_path:
&PathBuf, snapshot_tar: P, + compression: CompressionType, ) -> Result<Bank> { // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` let unpack_dir = tempfile::tempdir_in(snapshot_path)?; - untar_snapshot_in(&snapshot_tar, &unpack_dir)?; + untar_snapshot_in(&snapshot_tar, &unpack_dir, compression)?; let mut measure = Measure::start("bank rebuild from snapshot"); let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); @@ -483,33 +497,43 @@ pub fn bank_from_archive<P: AsRef<Path>>( Ok(bank) } -fn is_snapshot_compression_disabled() -> bool { - if let Ok(flag) = env::var("SOLANA_DISABLE_SNAPSHOT_COMPRESSION") { - !(flag == "0" || flag == "false") - } else { - false - } -} - pub fn get_snapshot_archive_path<P: AsRef<Path>>( snapshot_output_dir: P, snapshot_hash: &(Slot, Hash), + compression: &CompressionType, ) -> PathBuf { snapshot_output_dir.as_ref().join(format!( - "snapshot-{}-{}.tar.bz2", - snapshot_hash.0, snapshot_hash.1 + "snapshot-{}-{}{}", + snapshot_hash.0, + snapshot_hash.1, + get_compression_ext(compression).1, )) } -fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash)> { - let snapshot_filename_regex = Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.tar\.bz2$").unwrap(); +fn compression_type_from_str(compress: &str) -> Option<CompressionType> { + match compress { + "bz2" => Some(CompressionType::Bzip2), + "gz" => Some(CompressionType::Gzip), + "zst" => Some(CompressionType::Zstd), + _ => None, + } +} + +fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, CompressionType)> { + let snapshot_filename_regex = + Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.tar\.(bz2|zst|gz)$").unwrap(); if let Some(captures) = snapshot_filename_regex.captures(archive_filename) { let slot_str = captures.get(1).unwrap().as_str(); let hash_str = captures.get(2).unwrap().as_str(); + let ext = captures.get(3).unwrap().as_str(); - if let (Ok(slot), Ok(hash)) = (slot_str.parse::<Slot>(), hash_str.parse::<Hash>()) { - return Some((slot, hash)); + if let (Ok(slot), Ok(hash), Some(compression)) = ( + slot_str.parse::<Slot>(), + hash_str.parse::<Hash>(), + compression_type_from_str(ext), + ) { + return Some((slot, hash, compression)); } } None @@ -517,7 +541,7 @@ fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash)> { pub fn get_snapshot_archives<P: AsRef<Path>>( snapshot_output_dir: P, -) -> Vec<(PathBuf, (Slot, Hash))> { +) -> Vec<(PathBuf, (Slot, Hash, CompressionType))> { match fs::read_dir(&snapshot_output_dir) { Err(err) => { info!("Unable to read snapshot directory: {}", err); @@ -548,7 +572,7 @@ pub fn get_snapshot_archives<P: AsRef<Path>>( pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>( snapshot_output_dir: P, -) -> Option<(PathBuf, (Slot, Hash))> { +) -> Option<(PathBuf, (Slot, Hash, CompressionType))> { let archives = get_snapshot_archives(snapshot_output_dir); archives.into_iter().next() } @@ -556,23 +580,32 @@ pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>( pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>( snapshot_tar: P, unpack_dir: Q, + compression: CompressionType, ) -> Result<()> { let mut measure = Measure::start("snapshot untar"); - let tar_bz2 = File::open(&snapshot_tar)?; - let tar = BzDecoder::new(BufReader::new(tar_bz2)); - let mut archive = Archive::new(tar); - if !is_snapshot_compression_disabled() { - unpack_snapshot(&mut archive, unpack_dir)?; - } else if let Err(e) = archive.unpack(&unpack_dir) { - warn!( - "Trying to unpack as uncompressed tar because an error occurred: {:?}", - e - ); - let tar_bz2 = File::open(snapshot_tar)?; - let tar = BufReader::new(tar_bz2); - let mut archive = Archive::new(tar); - unpack_snapshot(&mut
archive, unpack_dir)?; - } + let tar_name = File::open(&snapshot_tar)?; + match compression { + CompressionType::Bzip2 => { + let tar = BzDecoder::new(BufReader::new(tar_name)); + let mut archive = Archive::new(tar); + unpack_snapshot(&mut archive, unpack_dir)?; + } + CompressionType::Gzip => { + let tar = GzDecoder::new(BufReader::new(tar_name)); + let mut archive = Archive::new(tar); + unpack_snapshot(&mut archive, unpack_dir)?; + } + CompressionType::Zstd => { + let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?; + let mut archive = Archive::new(tar); + unpack_snapshot(&mut archive, unpack_dir)?; + } + CompressionType::NoCompression => { + let tar = BufReader::new(tar_name); + let mut archive = Archive::new(tar); + unpack_snapshot(&mut archive, unpack_dir)?; + } + }; measure.stop(); info!("{}", measure); Ok(()) } @@ -661,6 +694,7 @@ pub fn verify_snapshot_archive<P, Q, R>( snapshot_archive: P, snapshots_to_verify: Q, storages_to_verify: R, + compression: CompressionType, ) where P: AsRef<Path>, Q: AsRef<Path>, R: AsRef<Path>, { let temp_dir = tempfile::TempDir::new().unwrap(); let unpack_dir = temp_dir.path(); - untar_snapshot_in(snapshot_archive, &unpack_dir).unwrap(); + untar_snapshot_in(snapshot_archive, &unpack_dir, compression).unwrap(); // Check snapshots are the same let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR); @@ -795,8 +829,13 @@ mod tests { fn test_snapshot_hash_of() { assert_eq!( snapshot_hash_of(&format!("snapshot-42-{}.tar.bz2", Hash::default())), - Some((42, Hash::default())) + Some((42, Hash::default(), CompressionType::Bzip2)) ); + assert_eq!( + snapshot_hash_of(&format!("snapshot-43-{}.tar.zst", Hash::default())), + Some((43, Hash::default(), CompressionType::Zstd)) + ); + assert!(snapshot_hash_of("invalid").is_none()); } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 64fa49ae3..2433259cf 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -8,6 +8,7 @@ use solana_core::{ gossip_service::discover_cluster, validator::ValidatorConfig, }; use solana_download_utils::download_snapshot; +use solana_ledger::bank_forks::CompressionType; use solana_ledger::{ bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule, leader_schedule::LeaderSchedule, snapshot_utils, }; @@ -840,6 +841,7 @@ fn test_snapshot_download() { let validator_archive_path = snapshot_utils::get_snapshot_archive_path( &validator_snapshot_test_config.snapshot_output_path, &archive_snapshot_hash, + &CompressionType::Bzip2, ); // Download the snapshot, then boot a validator from it.
@@ -906,6 +908,7 @@ fn test_snapshot_restart_tower() { let validator_archive_path = snapshot_utils::get_snapshot_archive_path( &validator_snapshot_test_config.snapshot_output_path, &archive_snapshot_hash, + &CompressionType::Bzip2, ); fs::hard_link(archive_filename, &validator_archive_path).unwrap(); @@ -956,7 +959,7 @@ fn test_snapshots_blockstore_floor() { trace!("Waiting for snapshot tar to be generated with slot",); - let (archive_filename, (archive_slot, archive_hash)) = loop { + let (archive_filename, (archive_slot, archive_hash, _)) = loop { let archive = snapshot_utils::get_highest_snapshot_archive_path(&snapshot_package_output_path); if archive.is_some() { @@ -970,6 +973,7 @@ fn test_snapshots_blockstore_floor() { let validator_archive_path = snapshot_utils::get_snapshot_archive_path( &validator_snapshot_test_config.snapshot_output_path, &(archive_slot, archive_hash), + &CompressionType::Bzip2, ); fs::hard_link(archive_filename, &validator_archive_path).unwrap(); let slot_floor = archive_slot; @@ -1224,7 +1228,7 @@ fn wait_for_next_snapshot( last_slot ); loop { - if let Some((filename, (slot, hash))) = + if let Some((filename, (slot, hash, _))) = snapshot_utils::get_highest_snapshot_archive_path(snapshot_package_output_path) { trace!("snapshot for slot {} exists", slot); @@ -1266,6 +1270,7 @@ fn setup_snapshot_validator_config( snapshot_interval_slots, snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), snapshot_path: PathBuf::from(snapshot_dir.path()), + compression: CompressionType::Bzip2, }; // Create the account paths diff --git a/validator/src/main.rs b/validator/src/main.rs index 449924f7a..22608cabd 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -19,7 +19,10 @@ use solana_core::{ validator::{Validator, ValidatorConfig}, }; use solana_download_utils::{download_genesis_if_missing, download_snapshot}; -use solana_ledger::{bank_forks::SnapshotConfig, hardened_unpack::unpack_genesis_archive}; +use solana_ledger::{ + bank_forks::{CompressionType, SnapshotConfig}, + hardened_unpack::unpack_genesis_archive, +}; use solana_perf::recycler::enable_recycler_warming; use solana_sdk::{ clock::Slot, @@ -713,6 +716,14 @@ pub fn main() { intentionally crash should any transaction modify the frozen account in any way \ other than increasing the account balance"), ) + .arg( + Arg::with_name("snapshot_compression") + .long("snapshot-compression") + .possible_values(&["bz2", "gzip", "zstd", "none"]) + .value_name("COMPRESSION_TYPE") + .takes_value(true) + .help("Type of snapshot compression to use."), + ) .get_matches(); let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new)); @@ -838,6 +849,16 @@ pub fn main() { exit(1); }); + let mut snapshot_compression = CompressionType::Bzip2; + if let Ok(compression_str) = value_t!(matches, "snapshot_compression", String) { + match compression_str.as_str() { + "bz2" => snapshot_compression = CompressionType::Bzip2, + "gzip" => snapshot_compression = CompressionType::Gzip, + "zstd" => snapshot_compression = CompressionType::Zstd, + "none" => snapshot_compression = CompressionType::NoCompression, + _ => panic!("Compression type not recognized: {}", compression_str), + } + } validator_config.snapshot_config = Some(SnapshotConfig { snapshot_interval_slots: if snapshot_interval_slots > 0 { snapshot_interval_slots @@ -846,6 +867,7 @@ pub fn main() { }, snapshot_path, snapshot_package_output_path: ledger_path.clone(), + compression: snapshot_compression, }); if 
matches.is_present("limit_ledger_size") {
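
Reader note (not part of the patch itself): the change threads one new knob, CompressionType, from the validator CLI through SnapshotConfig, the packaging/tar step, the archive file name, and the untar path. The standalone Rust sketch below mirrors that flow under stated assumptions: the flag-to-enum mapping copies the --snapshot-compression handling added in validator/src/main.rs, and the suffix table copies get_compression_ext() in ledger/src/snapshot_utils.rs; the function and variable names here are local to the example and are not APIs exported by the patch.

    // Minimal sketch of the flag -> CompressionType -> archive-name flow added by this patch.
    #[derive(Clone, Debug, PartialEq)]
    enum CompressionType {
        Bzip2,
        Gzip,
        Zstd,
        NoCompression,
    }

    // Values accepted by the new --snapshot-compression option (the default stays Bzip2).
    fn compression_from_flag(flag: &str) -> Option<CompressionType> {
        match flag {
            "bz2" => Some(CompressionType::Bzip2),
            "gzip" => Some(CompressionType::Gzip),
            "zstd" => Some(CompressionType::Zstd),
            "none" => Some(CompressionType::NoCompression),
            _ => None,
        }
    }

    // Archive suffix chosen for each variant, matching get_compression_ext().
    fn archive_suffix(compression: &CompressionType) -> &'static str {
        match compression {
            CompressionType::Bzip2 => ".tar.bz2",
            CompressionType::Gzip => ".tar.gz",
            CompressionType::Zstd => ".tar.zst",
            CompressionType::NoCompression => ".tar",
        }
    }

    fn main() {
        let compression = compression_from_flag("zstd").unwrap_or(CompressionType::Bzip2);
        // A slot-42 snapshot compressed with zstd is named like snapshot-42-<hash>.tar.zst,
        // which is what the widened regexes in rpc_service.rs and snapshot_hash_of() now accept.
        println!("snapshot-42-{}{}", "<hash>", archive_suffix(&compression));
    }

On the download side, download_snapshot() in download-utils now probes the zstd, gzip, and bzip2 archive names in that order, so a booting validator can fetch a snapshot regardless of which compression the serving node chose.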