2019-07-31 17:58:10 -07:00
|
|
|
use crate::snapshot_package::SnapshotPackage;
|
2020-01-09 16:49:36 -08:00
|
|
|
use bincode::serialize_into;
|
2019-08-14 19:25:22 -07:00
|
|
|
use bzip2::bufread::BzDecoder;
|
2019-08-13 17:20:14 -07:00
|
|
|
use fs_extra::dir::CopyOptions;
|
2019-10-20 08:54:38 -07:00
|
|
|
use log::*;
|
2020-02-24 12:37:14 -08:00
|
|
|
use regex::Regex;
|
2019-09-25 18:07:41 -07:00
|
|
|
use solana_measure::measure::Measure;
|
2020-02-20 22:27:55 -08:00
|
|
|
use solana_runtime::{
|
|
|
|
accounts_db::{SnapshotStorage, SnapshotStorages},
|
|
|
|
bank::{
|
|
|
|
self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta,
|
|
|
|
MAX_SNAPSHOT_DATA_FILE_SIZE,
|
|
|
|
},
|
2020-01-09 16:49:36 -08:00
|
|
|
};
|
2020-02-24 12:37:14 -08:00
|
|
|
use solana_sdk::{clock::Slot, hash::Hash};
|
2019-11-02 00:38:30 -07:00
|
|
|
use std::{
|
|
|
|
cmp::Ordering,
|
2020-02-20 17:19:45 -08:00
|
|
|
env,
|
2020-01-23 10:20:37 -08:00
|
|
|
fs::{self, File},
|
2020-01-09 16:49:36 -08:00
|
|
|
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
|
2019-11-02 00:38:30 -07:00
|
|
|
path::{Path, PathBuf},
|
2020-01-23 10:20:37 -08:00
|
|
|
process::ExitStatus,
|
2019-11-02 00:38:30 -07:00
|
|
|
};
|
2019-07-31 17:58:10 -07:00
|
|
|
use tar::Archive;
|
2020-01-09 16:49:36 -08:00
|
|
|
use tempfile::TempDir;
|
2019-12-02 14:42:05 -08:00
|
|
|
use thiserror::Error;
|
2019-07-31 17:58:10 -07:00
|
|
|
|
2019-09-25 13:42:19 -07:00
|
|
|
// File name (under the snapshots dir and inside the archive) of the serialized status cache.
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";

// Top-level entries inside a snapshot tar archive.
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
// Archive entry containing the snapshot serialization-format version string.
pub const TAR_VERSION_FILE: &str = "version";

// Current snapshot serialization format version; written to `TAR_VERSION_FILE` when
// packaging and checked against it when rebuilding a bank from an archive.
pub const SNAPSHOT_VERSION: &str = "1.0.0";
|
|
|
|
|
2019-08-06 18:47:30 -07:00
|
|
|
// Identity and on-disk location of a single per-slot bank snapshot.
#[derive(PartialEq, Ord, Eq, Debug)]
pub struct SlotSnapshotPaths {
    // Slot this bank snapshot was taken at.
    pub slot: Slot,
    // Serialized bank file for `slot`, i.e. `<snapshot_path>/<slot>/<slot>`.
    pub snapshot_file_path: PathBuf,
}
|
|
|
|
|
2019-12-02 14:42:05 -08:00
|
|
|
/// Errors that can occur while creating, packaging, archiving, or restoring snapshots.
#[derive(Error, Debug)]
pub enum SnapshotError {
    #[error("I/O error")]
    IO(#[from] std::io::Error),

    #[error("serialization error")]
    Serialize(#[from] Box<bincode::ErrorKind>),

    #[error("file system error")]
    FsExtra(#[from] fs_extra::error::Error),

    // The external `tar` process exited with a non-zero status (see
    // `archive_snapshot_package`); carries that exit status.
    #[error("archive generation failure {0}")]
    ArchiveGenerationFailure(ExitStatus),

    // A symlink created in the archive staging directory did not resolve to a
    // regular file, so the AppendVec would be missing from the archive.
    #[error("storage path symlink is invalid")]
    StoragePathSymlinkInvalid,
}
|
2019-12-02 14:42:05 -08:00
|
|
|
// Module-wide result alias; note this shadows the prelude `Result` inside this module.
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
2019-10-18 18:16:06 -07:00
|
|
|
|
2019-08-06 18:47:30 -07:00
|
|
|
impl PartialOrd for SlotSnapshotPaths {
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
|
|
Some(self.slot.cmp(&other.slot))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl SlotSnapshotPaths {
|
2019-09-23 20:12:16 -07:00
|
|
|
fn copy_snapshot_directory<P: AsRef<Path>>(&self, snapshot_hardlink_dir: P) -> Result<()> {
|
2019-08-06 18:47:30 -07:00
|
|
|
// Create a new directory in snapshot_hardlink_dir
|
|
|
|
let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string());
|
|
|
|
let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
|
|
|
|
fs::create_dir_all(&new_slot_hardlink_dir)?;
|
|
|
|
|
2019-09-23 20:12:16 -07:00
|
|
|
// Copy the snapshot
|
|
|
|
fs::copy(
|
2019-08-06 18:47:30 -07:00
|
|
|
&self.snapshot_file_path,
|
|
|
|
&new_slot_hardlink_dir.join(self.slot.to_string()),
|
|
|
|
)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-07 13:12:53 -07:00
|
|
|
/// Assemble a `SnapshotPackage` describing everything needed to archive `bank`'s
/// state: a copied bank snapshot (placed in a tempdir under `snapshot_path`), the
/// status-cache slot deltas for `slots_to_snapshot`, the account storages, and the
/// final archive path under `snapshot_package_output_path`.
///
/// No archive is written here; `archive_snapshot_package` consumes the package later.
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
    bank: &Bank,
    snapshot_files: &SlotSnapshotPaths,
    snapshot_path: Q,
    slots_to_snapshot: &[Slot],
    snapshot_package_output_path: P,
    snapshot_storages: SnapshotStorages,
) -> Result<SnapshotPackage> {
    // Hard link all the snapshots we need for this package
    let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;

    // Create a snapshot package
    info!(
        "Snapshot for bank: {} has {} account storage entries",
        bank.slot(),
        snapshot_storages.len()
    );

    // Any errors from this point on will cause the above SnapshotPackage to drop, clearing
    // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
    snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;

    // Final archive name encodes the slot and the accounts hash: snapshot-<slot>-<hash>.tar.bz2
    let snapshot_package_output_file = get_snapshot_archive_path(
        &snapshot_package_output_path,
        &(bank.slot(), bank.get_accounts_hash()),
    );

    let package = SnapshotPackage::new(
        bank.slot(),
        bank.src.slot_deltas(slots_to_snapshot),
        snapshot_hard_links_dir,
        snapshot_storages,
        snapshot_package_output_file,
        bank.get_accounts_hash(),
    );

    Ok(package)
}
|
|
|
|
|
2020-01-23 10:20:37 -08:00
|
|
|
/// Turn a prepared `SnapshotPackage` into the final `.tar.bz2` archive at
/// `snapshot_package.tar_output_file`.
///
/// Steps: serialize the status cache next to the snapshots, build a staging dir of
/// symlinks (`accounts/`, `snapshots/`, `version`), shell out to the system `tar`
/// (with `h` so symlinks are followed), then atomically rename the finished archive
/// into place and prune all but the two newest archives in that directory.
///
/// # Errors
/// Any I/O failure, a broken storage symlink (`StoragePathSymlinkInvalid`), or a
/// non-zero `tar` exit status (`ArchiveGenerationFailure`).
pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> {
    info!(
        "Generating snapshot archive for slot {}",
        snapshot_package.root
    );

    serialize_status_cache(
        snapshot_package.root,
        &snapshot_package.slot_deltas,
        &snapshot_package.snapshot_links,
    )?;

    let mut timer = Measure::start("snapshot_package-package_snapshots");
    let tar_dir = snapshot_package
        .tar_output_file
        .parent()
        .expect("Tar output path is invalid");

    fs::create_dir_all(tar_dir)?;

    // Create the staging directories
    let staging_dir = TempDir::new()?;
    let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
    let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
    let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
    fs::create_dir_all(&staging_accounts_dir)?;

    // Add the snapshots to the staging directory
    symlink::symlink_dir(
        snapshot_package.snapshot_links.path(),
        &staging_snapshots_dir,
    )?;

    // Add the AppendVecs into the compressible list
    for storage in snapshot_package.storages.iter().flatten() {
        // Flush so the on-disk file is complete before it is linked into the archive.
        storage.flush()?;
        let storage_path = storage.get_path();
        let output_path = staging_accounts_dir.join(
            storage_path
                .file_name()
                .expect("Invalid AppendVec file path"),
        );

        // `storage_path` - The file path where the AppendVec itself is located
        // `output_path` - The directory where the AppendVec will be placed in the staging directory.
        let storage_path =
            fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
        symlink::symlink_dir(storage_path, &output_path)?;
        // A symlink that doesn't resolve to a regular file would silently produce a
        // broken archive; fail loudly instead.
        if !output_path.is_file() {
            return Err(SnapshotError::StoragePathSymlinkInvalid);
        }
    }

    // Write version file
    {
        let mut f = std::fs::File::create(staging_version_file)?;
        f.write_all(&SNAPSHOT_VERSION.to_string().into_bytes())?;
    }

    // `j` selects bzip2 compression; omitted entirely when compression is disabled.
    let archive_compress_options = if is_snapshot_compression_disabled() {
        ""
    } else {
        "j"
    };
    // `c`reate, `f`ile, de`h`reference symlinks, `S`parse-aware.
    let archive_options = format!("{}cfhS", archive_compress_options);

    // Tar the staging directory into the archive at `archive_path`
    let archive_path = tar_dir.join("new_state.tar.bz2");
    let args = vec![
        archive_options.as_str(),
        archive_path.to_str().unwrap(),
        "-C",
        staging_dir.path().to_str().unwrap(),
        TAR_ACCOUNTS_DIR,
        TAR_SNAPSHOTS_DIR,
        TAR_VERSION_FILE,
    ];

    let output = std::process::Command::new("tar").args(&args).output()?;
    if !output.status.success() {
        warn!("tar command failed with exit code: {}", output.status);
        use std::str::from_utf8;
        info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
        info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));

        return Err(SnapshotError::ArchiveGenerationFailure(output.status));
    }

    // Once everything is successful, overwrite the previous tarball so that other validators
    // can fetch this newly packaged snapshot
    let metadata = fs::metadata(&archive_path)?;
    fs::rename(&archive_path, &snapshot_package.tar_output_file)?;

    // Keep around at most two snapshot archives
    let archives = get_snapshot_archives(snapshot_package.tar_output_file.parent().unwrap());
    for old_archive in archives.into_iter().skip(2) {
        // Pruning failures are non-fatal; just log them.
        fs::remove_file(old_archive.0)
            .unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err));
    }

    timer.stop();
    info!(
        "Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
        snapshot_package.tar_output_file,
        snapshot_package.root,
        timer.as_ms(),
        metadata.len()
    );
    datapoint_info!(
        "snapshot-package",
        ("slot", snapshot_package.root, i64),
        ("duration_ms", timer.as_ms(), i64),
        ("size", metadata.len(), i64)
    );
    Ok(())
}
|
|
|
|
|
2019-08-23 13:02:07 -07:00
|
|
|
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
|
|
|
|
where
|
|
|
|
P: std::fmt::Debug,
|
|
|
|
{
|
|
|
|
match fs::read_dir(&snapshot_path) {
|
|
|
|
Ok(paths) => {
|
|
|
|
let mut names = paths
|
|
|
|
.filter_map(|entry| {
|
|
|
|
entry.ok().and_then(|e| {
|
|
|
|
e.path()
|
|
|
|
.file_name()
|
|
|
|
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
|
|
|
|
.unwrap_or(None)
|
|
|
|
})
|
|
|
|
})
|
|
|
|
.map(|slot| {
|
|
|
|
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
|
|
|
SlotSnapshotPaths {
|
|
|
|
slot,
|
|
|
|
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.collect::<Vec<SlotSnapshotPaths>>();
|
|
|
|
|
|
|
|
names.sort();
|
|
|
|
names
|
|
|
|
}
|
|
|
|
Err(err) => {
|
|
|
|
info!(
|
|
|
|
"Unable to read snapshot directory {:?}: {}",
|
|
|
|
snapshot_path, err
|
|
|
|
);
|
|
|
|
vec![]
|
|
|
|
}
|
|
|
|
}
|
2019-07-31 17:58:10 -07:00
|
|
|
}
|
|
|
|
|
2020-01-09 16:49:36 -08:00
|
|
|
pub fn serialize_snapshot_data_file<F>(
|
|
|
|
data_file_path: &Path,
|
|
|
|
maximum_file_size: u64,
|
|
|
|
mut serializer: F,
|
|
|
|
) -> Result<u64>
|
|
|
|
where
|
|
|
|
F: FnMut(&mut BufWriter<File>) -> Result<()>,
|
|
|
|
{
|
|
|
|
let data_file = File::create(data_file_path)?;
|
|
|
|
let mut data_file_stream = BufWriter::new(data_file);
|
|
|
|
serializer(&mut data_file_stream)?;
|
|
|
|
data_file_stream.flush()?;
|
|
|
|
|
|
|
|
let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?;
|
|
|
|
if consumed_size > maximum_file_size {
|
|
|
|
let error_message = format!(
|
|
|
|
"too large snapshot data file to serialize: {:?} has {} bytes",
|
|
|
|
data_file_path, consumed_size
|
|
|
|
);
|
|
|
|
return Err(get_io_error(&error_message));
|
|
|
|
}
|
|
|
|
Ok(consumed_size)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn deserialize_snapshot_data_file<F, T>(
|
|
|
|
data_file_path: &Path,
|
|
|
|
maximum_file_size: u64,
|
|
|
|
mut deserializer: F,
|
|
|
|
) -> Result<T>
|
|
|
|
where
|
|
|
|
F: FnMut(&mut BufReader<File>) -> Result<T>,
|
|
|
|
{
|
|
|
|
let file_size = fs::metadata(&data_file_path)?.len();
|
|
|
|
|
|
|
|
if file_size > maximum_file_size {
|
|
|
|
let error_message = format!(
|
|
|
|
"too large snapshot data file to deserialize: {:?} has {} bytes",
|
|
|
|
data_file_path, file_size
|
|
|
|
);
|
|
|
|
return Err(get_io_error(&error_message));
|
|
|
|
}
|
|
|
|
|
|
|
|
let data_file = File::open(data_file_path)?;
|
|
|
|
let mut data_file_stream = BufReader::new(data_file);
|
|
|
|
|
|
|
|
let ret = deserializer(&mut data_file_stream)?;
|
|
|
|
|
|
|
|
let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?;
|
|
|
|
|
|
|
|
if file_size != consumed_size {
|
|
|
|
let error_message = format!(
|
|
|
|
"invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize",
|
|
|
|
data_file_path, file_size, consumed_size
|
|
|
|
);
|
|
|
|
return Err(get_io_error(&error_message));
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(ret)
|
|
|
|
}
|
|
|
|
|
2020-02-20 22:27:55 -08:00
|
|
|
/// Serialize `bank` (plus its `BankRc` with `snapshot_storages`) to
/// `<snapshot_path>/<slot>/<slot>` and return the resulting `SlotSnapshotPaths`.
///
/// Cleans and re-hashes accounts first so the serialized state is consistent.
/// The file is capped at `MAX_SNAPSHOT_DATA_FILE_SIZE`.
pub fn add_snapshot<P: AsRef<Path>>(
    snapshot_path: P,
    bank: &Bank,
    snapshot_storages: &[SnapshotStorage],
) -> Result<SlotSnapshotPaths> {
    bank.clean_accounts();
    bank.update_accounts_hash();
    let slot = bank.slot();
    // snapshot_path/slot
    let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
    fs::create_dir_all(slot_snapshot_dir.clone())?;

    // the bank snapshot is stored as snapshot_path/slot/slot
    let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
    info!(
        "Creating snapshot for slot {}, path: {:?}",
        slot, snapshot_bank_file_path,
    );

    let mut bank_serialize = Measure::start("bank-serialize-ms");
    let consumed_size = serialize_snapshot_data_file(
        &snapshot_bank_file_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        |stream| {
            // Order matters: the bank itself first, then its RC/storages; the
            // deserializer in rebuild_bank_from_snapshots reads them back in the
            // same order.
            serialize_into(stream.by_ref(), &*bank)?;
            serialize_into(
                stream.by_ref(),
                &BankRcSerialize {
                    bank_rc: &bank.rc,
                    snapshot_storages,
                },
            )?;
            Ok(())
        },
    )?;
    bank_serialize.stop();

    // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
    datapoint_info!(
        "snapshot-bank-file",
        ("slot", slot, i64),
        ("size", consumed_size, i64)
    );

    inc_new_counter_info!("bank-serialize-ms", bank_serialize.as_ms() as usize);

    info!(
        "{} for slot {} at {:?}",
        bank_serialize, slot, snapshot_bank_file_path,
    );

    Ok(SlotSnapshotPaths {
        slot,
        snapshot_file_path: snapshot_bank_file_path,
    })
}
|
|
|
|
|
2020-01-09 16:49:36 -08:00
|
|
|
/// Serialize `slot_deltas` (the status-cache contents being snapshotted for
/// `slot`) into `<snapshot_links>/status_cache`, capped at
/// `MAX_SNAPSHOT_DATA_FILE_SIZE`, and emit size/timing metrics.
pub fn serialize_status_cache(
    slot: Slot,
    slot_deltas: &[BankSlotDelta],
    snapshot_links: &TempDir,
) -> Result<()> {
    // the status cache is stored as snapshot_path/status_cache
    let snapshot_status_cache_file_path =
        snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);

    let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
    let consumed_size = serialize_snapshot_data_file(
        &snapshot_status_cache_file_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        |stream| {
            serialize_into(stream, slot_deltas)?;
            Ok(())
        },
    )?;
    status_cache_serialize.stop();

    // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
    datapoint_info!(
        "snapshot-status-cache-file",
        ("slot", slot, i64),
        ("size", consumed_size, i64)
    );

    inc_new_counter_info!(
        "serialize-status-cache-ms",
        status_cache_serialize.as_ms() as usize
    );
    Ok(())
}
|
|
|
|
|
2019-11-02 00:38:30 -07:00
|
|
|
pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<()> {
|
2019-07-31 17:58:10 -07:00
|
|
|
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
|
|
|
|
// Remove the snapshot directory for this slot
|
|
|
|
fs::remove_dir_all(slot_snapshot_dir)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-08-13 17:20:14 -07:00
|
|
|
/// Reconstruct a `Bank` from a snapshot tar archive: untar into a tempdir under
/// `snapshot_path`, check the version file, rebuild the bank + accounts + status
/// cache, verify it, then move the unpacked per-slot snapshots into
/// `snapshot_path` for reuse.
///
/// # Panics
/// Panics if the rebuilt bank fails `verify_snapshot_bank`, or if the unpacked
/// snapshots directory cannot be read.
pub fn bank_from_archive<P: AsRef<Path>>(
    account_paths: &[PathBuf],
    snapshot_path: &PathBuf,
    snapshot_tar: P,
) -> Result<Bank> {
    // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
    let unpack_dir = tempfile::tempdir_in(snapshot_path)?;
    untar_snapshot_in(&snapshot_tar, &unpack_dir)?;

    let mut measure = Measure::start("bank rebuild from snapshot");
    let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
    let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
    let unpacked_version_file = unpack_dir.as_ref().join(TAR_VERSION_FILE);

    // Read the archive's format version; rebuild_bank_from_snapshots rejects
    // versions it doesn't understand.
    let mut snapshot_version = String::new();
    File::open(unpacked_version_file).and_then(|mut f| f.read_to_string(&mut snapshot_version))?;

    let bank = rebuild_bank_from_snapshots(
        snapshot_version.trim(),
        account_paths,
        &unpacked_snapshots_dir,
        unpacked_accounts_dir,
    )?;

    if !bank.verify_snapshot_bank() {
        panic!("Snapshot bank for slot {} failed to verify", bank.slot());
    }
    measure.stop();
    info!("{}", measure);

    // Move the unpacked snapshots into `snapshot_path`
    let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
        panic!(
            "Invalid snapshot path {:?}: {}",
            unpacked_snapshots_dir, err
        )
    });
    let paths: Vec<PathBuf> = dir_files
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .collect();
    let mut copy_options = CopyOptions::new();
    copy_options.overwrite = true;
    fs_extra::move_items(&paths, &snapshot_path, &copy_options)?;

    Ok(bank)
}
|
|
|
|
|
2020-02-20 17:19:45 -08:00
|
|
|
/// True when the `SOLANA_DISABLE_SNAPSHOT_COMPRESSION` env var is set to anything
/// other than `"0"` or `"false"`. Unset (or unreadable) means compression stays on.
fn is_snapshot_compression_disabled() -> bool {
    match env::var("SOLANA_DISABLE_SNAPSHOT_COMPRESSION") {
        Ok(flag) => flag != "0" && flag != "false",
        Err(_) => false,
    }
}
|
|
|
|
|
2020-02-24 12:37:14 -08:00
|
|
|
pub fn get_snapshot_archive_path<P: AsRef<Path>>(
|
|
|
|
snapshot_output_dir: P,
|
|
|
|
snapshot_hash: &(Slot, Hash),
|
|
|
|
) -> PathBuf {
|
|
|
|
snapshot_output_dir.as_ref().join(format!(
|
|
|
|
"snapshot-{}-{}.tar.bz2",
|
|
|
|
snapshot_hash.0, snapshot_hash.1
|
|
|
|
))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash)> {
|
|
|
|
let snapshot_filename_regex = Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.tar\.bz2$").unwrap();
|
|
|
|
|
|
|
|
if let Some(captures) = snapshot_filename_regex.captures(archive_filename) {
|
|
|
|
let slot_str = captures.get(1).unwrap().as_str();
|
|
|
|
let hash_str = captures.get(2).unwrap().as_str();
|
|
|
|
|
|
|
|
if let (Ok(slot), Ok(hash)) = (slot_str.parse::<Slot>(), hash_str.parse::<Hash>()) {
|
|
|
|
return Some((slot, hash));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_snapshot_archives<P: AsRef<Path>>(snapshot_output_dir: P) -> Vec<(PathBuf, (Slot, Hash))> {
|
|
|
|
let files = fs::read_dir(&snapshot_output_dir)
|
|
|
|
.unwrap_or_else(|err| panic!("Unable to read snapshot directory: {}", err));
|
|
|
|
|
|
|
|
let mut archives: Vec<_> = files
|
|
|
|
.filter_map(|entry| {
|
|
|
|
if let Ok(entry) = entry {
|
|
|
|
let path = entry.path();
|
|
|
|
if path.is_file() {
|
|
|
|
if let Some(snapshot_hash) =
|
|
|
|
snapshot_hash_of(path.file_name().unwrap().to_str().unwrap())
|
|
|
|
{
|
|
|
|
return Some((path, snapshot_hash));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
archives.sort_by(|a, b| (b.1).0.cmp(&(a.1).0)); // reverse sort by slot
|
|
|
|
archives
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
|
|
|
|
snapshot_output_dir: P,
|
2020-02-26 18:04:18 -08:00
|
|
|
) -> Option<(PathBuf, (Slot, Hash))> {
|
2020-02-24 12:37:14 -08:00
|
|
|
let archives = get_snapshot_archives(snapshot_output_dir);
|
2020-02-26 18:04:18 -08:00
|
|
|
archives.into_iter().next()
|
2019-08-13 17:20:14 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Unpack the archive at `snapshot_tar` into `unpack_dir`.
///
/// The archive is first treated as bzip2-compressed tar. Note the asymmetry:
/// when compression is enabled, a decode error is propagated; when compression is
/// disabled, a failed bzip2 unpack falls back to reading the file as a plain,
/// uncompressed tar (matching how `archive_snapshot_package` writes it).
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
    snapshot_tar: P,
    unpack_dir: Q,
) -> Result<()> {
    let mut measure = Measure::start("snapshot untar");
    let tar_bz2 = File::open(&snapshot_tar)?;
    let tar = BzDecoder::new(BufReader::new(tar_bz2));
    let mut archive = Archive::new(tar);
    if !is_snapshot_compression_disabled() {
        archive.unpack(&unpack_dir)?;
    } else if let Err(e) = archive.unpack(&unpack_dir) {
        warn!(
            "Trying to unpack as uncompressed tar because an error occurred: {:?}",
            e
        );
        // Reopen and retry without the bzip2 decoder.
        let tar_bz2 = File::open(snapshot_tar)?;
        let tar = BufReader::new(tar_bz2);
        let mut archive = Archive::new(tar);
        archive.unpack(&unpack_dir)?;
    }
    measure.stop();
    info!("{}", measure);
    Ok(())
}
|
|
|
|
|
|
|
|
/// Rebuild a `Bank` from an unpacked snapshot directory.
///
/// Expects exactly one per-slot snapshot under `unpacked_snapshots_dir`; its data
/// file is read in the same order `add_snapshot` wrote it (bank, then accounts),
/// after checking `snapshot_version` against the supported `SNAPSHOT_VERSION`.
/// Finally the status cache is deserialized and appended to the bank.
fn rebuild_bank_from_snapshots<P>(
    snapshot_version: &str,
    account_paths: &[PathBuf],
    unpacked_snapshots_dir: &PathBuf,
    append_vecs_path: P,
) -> Result<Bank>
where
    P: AsRef<Path>,
{
    info!("snapshot version: {}", snapshot_version);

    let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
    // A valid archive carries a single rooted snapshot.
    if snapshot_paths.len() > 1 {
        return Err(get_io_error("invalid snapshot format"));
    }
    let root_paths = snapshot_paths
        .pop()
        .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;

    info!("Loading bank from {:?}", &root_paths.snapshot_file_path);
    let bank = deserialize_snapshot_data_file(
        &root_paths.snapshot_file_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        |stream| {
            // Matches against the SNAPSHOT_VERSION const pattern; anything else
            // is an unsupported format.
            let mut bank: Bank = match snapshot_version {
                SNAPSHOT_VERSION => deserialize_from_snapshot(stream.by_ref())?,
                _ => {
                    return Err(get_io_error(&format!(
                        "unsupported snapshot version: {}",
                        snapshot_version
                    )));
                }
            };
            info!("Rebuilding accounts...");
            bank.set_bank_rc(
                bank::BankRc::new(account_paths.to_vec(), 0, bank.slot()),
                bank::StatusCacheRc::default(),
            );
            // Consumes the remainder of the stream; deserialize_snapshot_data_file
            // verifies the file was consumed exactly.
            bank.rc
                .accounts_from_stream(stream.by_ref(), account_paths, &append_vecs_path)?;
            Ok(bank)
        },
    )?;

    let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
    let slot_deltas = deserialize_snapshot_data_file(
        &status_cache_path,
        MAX_SNAPSHOT_DATA_FILE_SIZE,
        |stream| {
            info!("Rebuilding status cache...");
            let slot_deltas: Vec<BankSlotDelta> = deserialize_from_snapshot(stream)?;
            Ok(slot_deltas)
        },
    )?;

    bank.src.append(&slot_deltas);

    info!("Loaded bank for slot: {}", bank.slot());
    Ok(bank)
}
|
|
|
|
|
2019-11-02 00:38:30 -07:00
|
|
|
// The bank snapshot file inside a slot directory is named after the slot itself.
fn get_snapshot_file_name(slot: Slot) -> String {
    slot.to_string()
}
|
|
|
|
|
2019-11-02 00:38:30 -07:00
|
|
|
fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: Slot) -> PathBuf {
|
2019-07-31 17:58:10 -07:00
|
|
|
path.as_ref().join(slot.to_string())
|
|
|
|
}
|
|
|
|
|
2019-10-18 18:16:06 -07:00
|
|
|
fn get_io_error(error: &str) -> SnapshotError {
|
2019-08-14 23:14:40 -07:00
|
|
|
warn!("Snapshot Error: {:?}", error);
|
2019-10-18 18:16:06 -07:00
|
|
|
SnapshotError::IO(IOError::new(ErrorKind::Other, error))
|
2019-07-31 17:58:10 -07:00
|
|
|
}
|
|
|
|
|
2020-01-23 10:20:37 -08:00
|
|
|
/// Test helper: unpack `snapshot_archive` into a temp dir and assert that its
/// `snapshots/` and `accounts/` contents are identical to the supplied
/// directories.
///
/// # Panics
/// Panics (via `assert!`/`unwrap`) on any content mismatch or I/O failure.
pub fn verify_snapshot_archive<P, Q, R>(
    snapshot_archive: P,
    snapshots_to_verify: Q,
    storages_to_verify: R,
) where
    P: AsRef<Path>,
    Q: AsRef<Path>,
    R: AsRef<Path>,
{
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    untar_snapshot_in(snapshot_archive, &unpack_dir).unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Check the account entries are the same
    let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
    assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
}
|
2020-01-09 16:49:36 -08:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    //! Unit tests for the size-capped snapshot data file helpers and archive
    //! name parsing.
    use super::*;
    use bincode::{deserialize_from, serialize_into};
    use matches::assert_matches;
    use std::mem::size_of;

    // Writing exactly at the size limit succeeds and reports the bytes written.
    #[test]
    fn test_serialize_snapshot_data_file_under_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let consumed_size = serialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        )
        .unwrap();
        assert_eq!(consumed_size, expected_consumed_size);
    }

    // Writing one byte more than the limit yields the "too large ... serialize" error.
    #[test]
    fn test_serialize_snapshot_data_file_over_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let result = serialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size - 1,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
    }

    // Round-trip: a value written at the limit deserializes back intact.
    #[test]
    fn test_deserialize_snapshot_data_file_under_limit() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let actual_data = deserialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        )
        .unwrap();
        assert_eq!(actual_data, expected_data);
    }

    // A file larger than the read limit is rejected before deserialization.
    #[test]
    fn test_deserialize_snapshot_data_file_over_limit() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let result = deserialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size - 1,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
    }

    // Trailing unread bytes after deserialization are treated as corruption.
    #[test]
    fn test_deserialize_snapshot_data_file_extra_data() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size * 2,
            |stream| {
                serialize_into(stream.by_ref(), &expected_data)?;
                serialize_into(stream.by_ref(), &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let result = deserialize_snapshot_data_file(
            &temp_dir.path().join("data-file"),
            expected_consumed_size * 2,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
    }

    // Archive file-name parsing: valid names round-trip, junk yields None.
    #[test]
    fn test_snapshot_hash_of() {
        assert_eq!(
            snapshot_hash_of(&format!("snapshot-42-{}.tar.bz2", Hash::default())),
            Some((42, Hash::default()))
        );
        assert!(snapshot_hash_of("invalid").is_none());
    }
}
|