Snapshot optimizations (#5525)
* Change serializing snapshot tar to use shell command
This commit is contained in:
parent
0c90c889cd
commit
4688f9821f
|
@ -3526,6 +3526,7 @@ dependencies = [
|
||||||
"solana-storage-api 0.18.0-pre1",
|
"solana-storage-api 0.18.0-pre1",
|
||||||
"solana-storage-program 0.18.0-pre1",
|
"solana-storage-program 0.18.0-pre1",
|
||||||
"solana-vote-api 0.18.0-pre1",
|
"solana-vote-api 0.18.0-pre1",
|
||||||
|
"symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -106,5 +106,6 @@ extern crate bzip2;
|
||||||
extern crate crossbeam_channel;
|
extern crate crossbeam_channel;
|
||||||
extern crate dir_diff;
|
extern crate dir_diff;
|
||||||
extern crate fs_extra;
|
extern crate fs_extra;
|
||||||
|
extern crate symlink;
|
||||||
extern crate tar;
|
extern crate tar;
|
||||||
extern crate tempfile;
|
extern crate tempfile;
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use bzip2::write::BzEncoder;
|
|
||||||
use solana_runtime::accounts_db::AccountStorageEntry;
|
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::Path;
|
use std::io::{Error as IOError, ErrorKind};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use symlink;
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
||||||
|
@ -73,45 +73,56 @@ impl SnapshotPackagerService {
|
||||||
|
|
||||||
fs::create_dir_all(tar_dir)?;
|
fs::create_dir_all(tar_dir)?;
|
||||||
|
|
||||||
// Create the tar builder
|
// Create the staging directories
|
||||||
let tar_gz = tempfile::Builder::new()
|
let staging_dir = TempDir::new()?;
|
||||||
.prefix("new_state")
|
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
||||||
.suffix(".tar.bz2")
|
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
||||||
.tempfile_in(tar_dir)?;
|
fs::create_dir_all(&staging_accounts_dir)?;
|
||||||
|
|
||||||
let temp_tar_path = tar_gz.path();
|
// Add the snapshots to the staging directory
|
||||||
let enc = BzEncoder::new(&tar_gz, bzip2::Compression::Default);
|
symlink::symlink_dir(
|
||||||
let mut tar = tar::Builder::new(enc);
|
|
||||||
|
|
||||||
// Create the list of paths to compress, starting with the snapshots
|
|
||||||
let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOTS_DIR);
|
|
||||||
|
|
||||||
// Add the snapshots to the tarball and delete the directory of hardlinks to the snapshots
|
|
||||||
// that was created to persist those snapshots while this package was being created
|
|
||||||
let res = tar.append_dir_all(
|
|
||||||
tar_output_snapshots_dir,
|
|
||||||
snapshot_package.snapshot_links.path(),
|
snapshot_package.snapshot_links.path(),
|
||||||
);
|
&staging_snapshots_dir,
|
||||||
res?;
|
)?;
|
||||||
|
|
||||||
// Add the AppendVecs into the compressible list
|
// Add the AppendVecs into the compressible list
|
||||||
let tar_output_accounts_dir = Path::new(&TAR_ACCOUNTS_DIR);
|
|
||||||
for storage in &snapshot_package.storage_entries {
|
for storage in &snapshot_package.storage_entries {
|
||||||
let storage_path = storage.get_path();
|
let storage_path = storage.get_path();
|
||||||
let output_path = tar_output_accounts_dir.join(
|
let output_path = staging_accounts_dir.join(
|
||||||
storage_path
|
storage_path
|
||||||
.file_name()
|
.file_name()
|
||||||
.expect("Invalid AppendVec file path"),
|
.expect("Invalid AppendVec file path"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// `output_path` - The directory where the AppendVec will be placed in the tarball.
|
|
||||||
// `storage_path` - The file path where the AppendVec itself is located
|
// `storage_path` - The file path where the AppendVec itself is located
|
||||||
tar.append_path_with_name(storage_path, output_path)?;
|
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
||||||
|
symlink::symlink_dir(storage_path, output_path)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tar the staging directory into the archive `temp_tar_gz`
|
||||||
|
let temp_tar_gz = tempfile::Builder::new()
|
||||||
|
.prefix("new_state")
|
||||||
|
.suffix(".tar.bz2")
|
||||||
|
.tempfile_in(tar_dir)?;
|
||||||
|
let temp_tar_path = temp_tar_gz.path();
|
||||||
|
let mut args = vec!["jcfhS"];
|
||||||
|
args.push(temp_tar_path.to_str().unwrap());
|
||||||
|
args.push("-C");
|
||||||
|
args.push(staging_dir.path().to_str().unwrap());
|
||||||
|
args.push(TAR_ACCOUNTS_DIR);
|
||||||
|
args.push(TAR_SNAPSHOTS_DIR);
|
||||||
|
|
||||||
|
let status = std::process::Command::new("tar").args(&args).status()?;
|
||||||
|
|
||||||
|
if !status.success() {
|
||||||
|
return Err(Self::get_io_error(&format!(
|
||||||
|
"Error trying to generate snapshot archive: {}",
|
||||||
|
status
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once everything is successful, overwrite the previous tarball so that other validators
|
// Once everything is successful, overwrite the previous tarball so that other validators
|
||||||
// can rsync this newly packaged snapshot
|
// can fetch this newly packaged snapshot
|
||||||
tar.finish()?;
|
|
||||||
let _ = fs::remove_file(&snapshot_package.tar_output_file);
|
let _ = fs::remove_file(&snapshot_package.tar_output_file);
|
||||||
fs::hard_link(&temp_tar_path, &snapshot_package.tar_output_file)?;
|
fs::hard_link(&temp_tar_path, &snapshot_package.tar_output_file)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -126,6 +137,11 @@ impl SnapshotPackagerService {
|
||||||
Self::package_snapshots(&snapshot_package)?;
|
Self::package_snapshots(&snapshot_package)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_io_error(error: &str) -> Error {
|
||||||
|
warn!("Snapshot Packaging Error: {:?}", error);
|
||||||
|
Error::IO(IOError::new(ErrorKind::Other, error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service for SnapshotPackagerService {
|
impl Service for SnapshotPackagerService {
|
||||||
|
|
|
@ -256,7 +256,7 @@ fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: u64) -> PathBuf {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_io_error(error: &str) -> Error {
|
fn get_io_error(error: &str) -> Error {
|
||||||
warn!("BankForks error: {:?}", error);
|
warn!("Snapshot Error: {:?}", error);
|
||||||
Error::IO(IOError::new(ErrorKind::Other, error))
|
Error::IO(IOError::new(ErrorKind::Other, error))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ solana-stake-api = { path = "../programs/stake_api", version = "0.18.0-pre1" }
|
||||||
solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre1" }
|
solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre1" }
|
||||||
solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre1" }
|
solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre1" }
|
||||||
solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre1" }
|
solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre1" }
|
||||||
|
symlink = "0.1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
serial_test = "0.2.0"
|
serial_test = "0.2.0"
|
||||||
|
|
Loading…
Reference in New Issue