Move snapshot archive generation out of the SnapshotPackagerService

This commit is contained in:
Michael Vines 2020-01-23 11:20:37 -07:00
parent 6f5e0cd161
commit ce231602dc
5 changed files with 143 additions and 170 deletions

1
Cargo.lock generated
View File

@ -4005,6 +4005,7 @@ dependencies = [
"solana-sdk 0.23.0",
"solana-stake-program 0.23.0",
"solana-vote-program 0.23.0",
"symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -1,15 +1,7 @@
use solana_ledger::{
snapshot_package::{SnapshotPackage, SnapshotPackageReceiver},
snapshot_utils::{
serialize_status_cache, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR,
TAR_VERSION_FILE,
},
snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
};
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use std::{
fs,
process::ExitStatus,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
@ -18,32 +10,6 @@ use std::{
thread::{self, Builder, JoinHandle},
time::Duration,
};
use symlink;
use tempfile::TempDir;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SnapshotServiceError {
#[error("I/O error")]
IO(#[from] std::io::Error),
#[error("serialization error")]
Serialize(#[from] Box<bincode::ErrorKind>),
#[error("receive timeout error")]
RecvTimeoutError(#[from] RecvTimeoutError),
#[error("snapshot error")]
SnapshotError(#[from] SnapshotError),
#[error("archive generation failure {0}")]
ArchiveGenerationFailure(ExitStatus),
#[error("storage path symlink is invalid")]
StoragePathSymlinkInvalid,
}
type Result<T> = std::result::Result<T, SnapshotServiceError>;
pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>,
@ -58,14 +24,19 @@ impl SnapshotPackagerService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::run(&snapshot_package_receiver) {
match e {
SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut snapshot_package) => {
// Only package the latest
while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() {
snapshot_package = new_snapshot_package;
}
if let Err(err) = archive_snapshot_package(&snapshot_package) {
warn!("Failed to create snapshot archive: {}", err);
}
SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from package_snapshots: {:?}", e),
}
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
}
})
.unwrap();
@ -74,123 +45,6 @@ impl SnapshotPackagerService {
}
}
pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
info!(
"Generating snapshot tarball for root {}",
snapshot_package.root
);
serialize_status_cache(
snapshot_package.root,
&snapshot_package.slot_deltas,
&snapshot_package.snapshot_links,
)?;
let mut timer = Measure::start("snapshot_package-package_snapshots");
let tar_dir = snapshot_package
.tar_output_file
.parent()
.expect("Tar output path is invalid");
fs::create_dir_all(tar_dir)?;
// Create the staging directories
let staging_dir = TempDir::new()?;
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
fs::create_dir_all(&staging_accounts_dir)?;
// Add the snapshots to the staging directory
symlink::symlink_dir(
snapshot_package.snapshot_links.path(),
&staging_snapshots_dir,
)?;
// Add the AppendVecs into the compressible list
for storage in &snapshot_package.storage_entries {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(
storage_path
.file_name()
.expect("Invalid AppendVec file path"),
);
// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
let storage_path =
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_dir(storage_path, &output_path)?;
if !output_path.is_file() {
return Err(SnapshotServiceError::StoragePathSymlinkInvalid);
}
}
// Write version file
{
use std::io::Write;
let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
let mut f = std::fs::File::create(staging_version_file)?;
//f.write_all(&snapshot_version.to_string().into_bytes())?;
f.write_all(&snapshot_version.into_bytes())?;
}
// Tar the staging directory into the archive at `archive_path`
let archive_path = tar_dir.join("new_state.tar.bz2");
let args = vec![
"jcfhS",
archive_path.to_str().unwrap(),
"-C",
staging_dir.path().to_str().unwrap(),
TAR_ACCOUNTS_DIR,
TAR_SNAPSHOTS_DIR,
TAR_VERSION_FILE,
];
let output = std::process::Command::new("tar").args(&args).output()?;
if !output.status.success() {
warn!("tar command failed with exit code: {}", output.status);
use std::str::from_utf8;
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
return Err(SnapshotServiceError::ArchiveGenerationFailure(
output.status,
));
}
// Once everything is successful, overwrite the previous tarball so that other validators
// can fetch this newly packaged snapshot
let metadata = fs::metadata(&archive_path)?;
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
timer.stop();
info!(
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
snapshot_package.root,
timer.as_ms(),
metadata.len()
);
datapoint_info!(
"snapshot-package",
("slot", snapshot_package.root, i64),
("duration_ms", timer.as_ms(), i64),
("size", metadata.len(), i64)
);
Ok(())
}
fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
// Only package the latest
while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() {
snapshot_package = new_snapshot_package;
}
Self::package_snapshots(&snapshot_package)?;
Ok(())
}
pub fn join(self) -> thread::Result<()> {
self.t_snapshot_packager.join()
}
@ -200,14 +54,17 @@ impl SnapshotPackagerService {
mod tests {
use super::*;
use bincode::serialize_into;
use solana_ledger::snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME};
use solana_ledger::{
snapshot_package::SnapshotPackage,
snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME},
};
use solana_runtime::{
accounts_db::AccountStorageEntry, bank::MAX_SNAPSHOT_DATA_FILE_SIZE,
status_cache::SlotDelta,
};
use solana_sdk::transaction::Result as TransactionResult;
use solana_sdk::transaction;
use std::{
fs::{remove_dir_all, OpenOptions},
fs::{self, remove_dir_all, OpenOptions},
io::Write,
path::{Path, PathBuf},
};
@ -286,7 +143,7 @@ mod tests {
);
// Make tarball from packageable snapshot
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
// before we compare, stick an empty status_cache in this dir so that the package comparision works
// This is needed since the status_cache is added by the packager and is not collected from
@ -302,7 +159,7 @@ mod tests {
)
.unwrap();
// Check tarball is correct
snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
// Check archive is correct
snapshot_utils::verify_snapshot_archive(output_tar_path, snapshots_dir, accounts_dir);
}
}

View File

@ -140,7 +140,8 @@ mod tests {
&last_bank.src.roots(),
)
.unwrap();
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
restore_from_snapshot(bank_forks, vec![accounts_dir.path().to_path_buf()]);
}
@ -322,7 +323,7 @@ mod tests {
)
.unwrap();
snapshot_utils::verify_snapshot_tar(
snapshot_utils::verify_snapshot_archive(
saved_tar,
saved_snapshots_dir.path(),
saved_accounts_dir

View File

@ -43,6 +43,7 @@ solana-sdk = { path = "../sdk", version = "0.23.0" }
solana-stake-program = { path = "../programs/stake", version = "0.23.0" }
solana-vote-program = { path = "../programs/vote", version = "0.23.0" }
sys-info = "0.5.8"
symlink = "0.1.0"
tar = "0.4.26"
thiserror = "1.0"
tempfile = "3.1.0"

View File

@ -12,10 +12,10 @@ use solana_sdk::transaction::Result as TransactionResult;
use solana_sdk::{clock::Slot, transaction};
use std::{
cmp::Ordering,
fs,
fs::File,
fs::{self, File},
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
process::ExitStatus,
};
use tar::Archive;
use tempfile::TempDir;
@ -42,6 +42,12 @@ pub enum SnapshotError {
#[error("file system error")]
FsExtra(#[from] fs_extra::error::Error),
#[error("archive generation failure {0}")]
ArchiveGenerationFailure(ExitStatus),
#[error("storage path symlink is invalid")]
StoragePathSymlinkInvalid,
}
pub type Result<T> = std::result::Result<T, SnapshotError>;
@ -107,6 +113,110 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
Ok(package)
}
pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> {
info!(
"Generating snapshot archive for slot {}",
snapshot_package.root
);
serialize_status_cache(
snapshot_package.root,
&snapshot_package.slot_deltas,
&snapshot_package.snapshot_links,
)?;
let mut timer = Measure::start("snapshot_package-package_snapshots");
let tar_dir = snapshot_package
.tar_output_file
.parent()
.expect("Tar output path is invalid");
fs::create_dir_all(tar_dir)?;
// Create the staging directories
let staging_dir = TempDir::new()?;
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
fs::create_dir_all(&staging_accounts_dir)?;
// Add the snapshots to the staging directory
symlink::symlink_dir(
snapshot_package.snapshot_links.path(),
&staging_snapshots_dir,
)?;
// Add the AppendVecs into the compressible list
for storage in &snapshot_package.storage_entries {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(
storage_path
.file_name()
.expect("Invalid AppendVec file path"),
);
// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
let storage_path =
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_dir(storage_path, &output_path)?;
if !output_path.is_file() {
return Err(SnapshotError::StoragePathSymlinkInvalid);
}
}
// Write version file
{
let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
let mut f = std::fs::File::create(staging_version_file)?;
//f.write_all(&snapshot_version.to_string().into_bytes())?;
f.write_all(&snapshot_version.into_bytes())?;
}
// Tar the staging directory into the archive at `archive_path`
let archive_path = tar_dir.join("new_state.tar.bz2");
let args = vec![
"jcfhS",
archive_path.to_str().unwrap(),
"-C",
staging_dir.path().to_str().unwrap(),
TAR_ACCOUNTS_DIR,
TAR_SNAPSHOTS_DIR,
TAR_VERSION_FILE,
];
let output = std::process::Command::new("tar").args(&args).output()?;
if !output.status.success() {
warn!("tar command failed with exit code: {}", output.status);
use std::str::from_utf8;
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
return Err(SnapshotError::ArchiveGenerationFailure(output.status));
}
// Once everything is successful, overwrite the previous tarball so that other validators
// can fetch this newly packaged snapshot
let metadata = fs::metadata(&archive_path)?;
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
timer.stop();
info!(
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
snapshot_package.root,
timer.as_ms(),
metadata.len()
);
datapoint_info!(
"snapshot-package",
("slot", snapshot_package.root, i64),
("duration_ms", timer.as_ms(), i64),
("size", metadata.len(), i64)
);
Ok(())
}
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
where
P: std::fmt::Debug,
@ -469,8 +579,11 @@ fn get_io_error(error: &str) -> SnapshotError {
SnapshotError::IO(IOError::new(ErrorKind::Other, error))
}
pub fn verify_snapshot_tar<P, Q, R>(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R)
where
pub fn verify_snapshot_archive<P, Q, R>(
snapshot_tar: P,
snapshots_to_verify: Q,
storages_to_verify: R,
) where
P: AsRef<Path>,
Q: AsRef<Path>,
R: AsRef<Path>,