From 4688f9821f9d8ecf18d90a30a547304be6952e1f Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 14 Aug 2019 23:14:40 -0700 Subject: [PATCH] Snapshot optimizations (#5525) * Change serializing snapshot tar to use shell command --- Cargo.lock | 1 + core/src/lib.rs | 1 + core/src/snapshot_package.rs | 68 ++++++++++++++++++++++-------------- core/src/snapshot_utils.rs | 2 +- local_cluster/Cargo.toml | 1 + 5 files changed, 46 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b0f5cbe6..4a0db80a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3526,6 +3526,7 @@ dependencies = [ "solana-storage-api 0.18.0-pre1", "solana-storage-program 0.18.0-pre1", "solana-vote-api 0.18.0-pre1", + "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/src/lib.rs b/core/src/lib.rs index 5ba3c1e12..baaa1d3d9 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -106,5 +106,6 @@ extern crate bzip2; extern crate crossbeam_channel; extern crate dir_diff; extern crate fs_extra; +extern crate symlink; extern crate tar; extern crate tempfile; diff --git a/core/src/snapshot_package.rs b/core/src/snapshot_package.rs index ace39fb11..85ded3dd9 100644 --- a/core/src/snapshot_package.rs +++ b/core/src/snapshot_package.rs @@ -1,15 +1,15 @@ use crate::result::{Error, Result}; use crate::service::Service; -use bzip2::write::BzEncoder; use solana_runtime::accounts_db::AccountStorageEntry; use std::fs; -use std::path::Path; +use std::io::{Error as IOError, ErrorKind}; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; +use symlink; use tempfile::TempDir; pub type SnapshotPackageSender = Sender; @@ -73,45 +73,56 @@ impl SnapshotPackagerService { fs::create_dir_all(tar_dir)?; - // Create the tar builder - let tar_gz = tempfile::Builder::new() - .prefix("new_state") - .suffix(".tar.bz2") - .tempfile_in(tar_dir)?; + // Create the staging directories + let staging_dir = TempDir::new()?; + let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR); + let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR); + fs::create_dir_all(&staging_accounts_dir)?; - let temp_tar_path = tar_gz.path(); - let enc = BzEncoder::new(&tar_gz, bzip2::Compression::Default); - let mut tar = tar::Builder::new(enc); - - // Create the list of paths to compress, starting with the snapshots - let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOTS_DIR); - - // Add the snapshots to the tarball and delete the directory of hardlinks to the snapshots - // that was created to persist those snapshots while this package was being created - let res = tar.append_dir_all( - tar_output_snapshots_dir, + // Add the snapshots to the staging directory + symlink::symlink_dir( snapshot_package.snapshot_links.path(), - ); - res?; + &staging_snapshots_dir, + )?; // Add the AppendVecs into the compressible list - let tar_output_accounts_dir = Path::new(&TAR_ACCOUNTS_DIR); for storage in &snapshot_package.storage_entries { let storage_path = storage.get_path(); - let output_path = tar_output_accounts_dir.join( + let output_path = staging_accounts_dir.join( storage_path .file_name() .expect("Invalid AppendVec file path"), ); - // `output_path` - The directory where the AppendVec will be placed in the tarball. // `storage_path` - The file path where the AppendVec itself is located - tar.append_path_with_name(storage_path, output_path)?; + // `output_path` - The directory where the AppendVec will be placed in the staging directory. + symlink::symlink_dir(storage_path, output_path)?; + } + + // Tar the staging directory into the archive `temp_tar_gz` + let temp_tar_gz = tempfile::Builder::new() + .prefix("new_state") + .suffix(".tar.bz2") + .tempfile_in(tar_dir)?; + let temp_tar_path = temp_tar_gz.path(); + let mut args = vec!["jcfhS"]; + args.push(temp_tar_path.to_str().unwrap()); + args.push("-C"); + args.push(staging_dir.path().to_str().unwrap()); + args.push(TAR_ACCOUNTS_DIR); + args.push(TAR_SNAPSHOTS_DIR); + + let status = std::process::Command::new("tar").args(&args).status()?; + + if !status.success() { + return Err(Self::get_io_error(&format!( + "Error trying to generate snapshot archive: {}", + status + ))); } // Once everything is successful, overwrite the previous tarball so that other validators - // can rsync this newly packaged snapshot - tar.finish()?; + // can fetch this newly packaged snapshot let _ = fs::remove_file(&snapshot_package.tar_output_file); fs::hard_link(&temp_tar_path, &snapshot_package.tar_output_file)?; Ok(()) @@ -126,6 +137,11 @@ impl SnapshotPackagerService { Self::package_snapshots(&snapshot_package)?; Ok(()) } + + fn get_io_error(error: &str) -> Error { + warn!("Snapshot Packaging Error: {:?}", error); + Error::IO(IOError::new(ErrorKind::Other, error)) + } } impl Service for SnapshotPackagerService { diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 6bcb57a29..99af2a47b 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -256,7 +256,7 @@ fn get_bank_snapshot_dir>(path: P, slot: u64) -> PathBuf { } fn get_io_error(error: &str) -> Error { - warn!("BankForks error: {:?}", error); + warn!("Snapshot Error: {:?}", error); Error::IO(IOError::new(ErrorKind::Other, error)) } diff --git a/local_cluster/Cargo.toml b/local_cluster/Cargo.toml index a995bfb99..1185d5d89 100644 --- a/local_cluster/Cargo.toml +++ b/local_cluster/Cargo.toml @@ -19,6 +19,7 @@ solana-stake-api = { path = "../programs/stake_api", version = "0.18.0-pre1" } solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre1" } solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre1" } solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre1" } +symlink = "0.1.0" [dev-dependencies] serial_test = "0.2.0"