Interleaved snapshot unpack versioning (#27484)

* Issue #27346 - deserialize and check snapshot version before account fields

* Update comment on SnapshotFileKind

Co-authored-by: Brooks Prumo <brooks@prumo.org>

* SnapshotStorageRebuilderResult to RebuiltSnapshotStorage

* better error propagation from rebuild_storage

Co-authored-by: apfitzge <apfitzge@users.noreply.github.com>
Co-authored-by: Brooks Prumo <brooks@prumo.org>
This commit is contained in:
apfitzge 2022-09-22 11:44:25 -05:00 committed by GitHub
parent 91d556dc66
commit a846d50b6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 43 deletions

View File

@ -64,6 +64,7 @@ use {
crate::{
accounts_db::{AccountStorageMap, AtomicAppendVecId},
hardened_unpack::streaming_unpack_snapshot,
snapshot_utils::snapshot_storage_rebuilder::RebuiltSnapshotStorage,
},
crossbeam_channel::Sender,
std::thread::{Builder, JoinHandle},
@ -134,10 +135,6 @@ impl SnapshotVersion {
pub fn as_str(self) -> &'static str {
<&str as From<Self>>::from(self)
}
fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
version_string.parse::<Self>().ok()
}
}
/// Information about a bank snapshot. Namely the slot of the bank, the path to the snapshot, and
@ -203,7 +200,7 @@ struct UnarchivedSnapshot {
#[derive(Debug)]
struct UnpackedSnapshotsDirAndVersion {
unpacked_snapshots_dir: PathBuf,
snapshot_version: String,
snapshot_version: SnapshotVersion,
}
/// Helper type for passing around account storage map and next append vec id
@ -1262,7 +1259,6 @@ where
.prefix(unpacked_snapshots_dir_prefix)
.tempdir_in(bank_snapshots_dir)?;
let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
let unpacked_version_file = unpack_dir.path().join("version");
let (file_sender, file_receiver) = crossbeam_channel::unbounded();
streaming_unarchive_snapshot(
@ -1277,18 +1273,20 @@ where
let num_rebuilder_threads = num_cpus::get_physical()
.saturating_sub(parallel_divisions)
.max(1);
let (storage, measure_untar) = measure!(
let (version_and_storages, measure_untar) = measure!(
SnapshotStorageRebuilder::rebuild_storage(
file_receiver,
num_rebuilder_threads,
next_append_vec_id
),
)?,
measure_name
);
info!("{}", measure_untar);
let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?;
let RebuiltSnapshotStorage {
snapshot_version,
storage,
} = version_and_storages;
Ok(UnarchivedSnapshot {
unpack_dir,
storage,
@ -1726,14 +1724,7 @@ fn verify_unpacked_snapshots_dir_and_version(
&unpacked_snapshots_dir_and_version.snapshot_version
);
let snapshot_version =
SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version)
.ok_or_else(|| {
get_io_error(&format!(
"unsupported snapshot version: {}",
&unpacked_snapshots_dir_and_version.snapshot_version,
))
})?;
let snapshot_version = unpacked_snapshots_dir_and_version.snapshot_version;
let mut bank_snapshots =
get_bank_snapshots_post(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir);
if bank_snapshots.len() > 1 {

View File

@ -1,6 +1,7 @@
//! Provides interfaces for rebuilding snapshot storages
use {
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
crate::{
accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId},
serde_snapshot::{
@ -28,10 +29,17 @@ use {
time::Instant,
},
};
/// Convenient wrapper for snapshot version and rebuilt storages
pub(crate) struct RebuiltSnapshotStorage {
/// Snapshot version
pub snapshot_version: SnapshotVersion,
/// Rebuilt storages
pub storage: AccountStorageMap,
}
/// Stores state for rebuilding snapshot storages
#[derive(Debug)]
pub struct SnapshotStorageRebuilder {
pub(crate) struct SnapshotStorageRebuilder {
/// Receiver for unpacked snapshot storage files
file_receiver: Receiver<PathBuf>,
/// Number of threads to rebuild with
@ -52,20 +60,35 @@ pub struct SnapshotStorageRebuilder {
impl SnapshotStorageRebuilder {
/// Synchronously spawns threads to rebuild snapshot storages
pub fn rebuild_storage(
pub(crate) fn rebuild_storage(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
) -> AccountStorageMap {
let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver);
let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap();
Self::spawn_rebuilder_threads(
) -> Result<RebuiltSnapshotStorage, SnapshotError> {
let (snapshot_version_path, snapshot_file_path, append_vec_files) =
Self::get_version_and_snapshot_files(&file_receiver);
let snapshot_version_str = snapshot_version_from_file(&snapshot_version_path)?;
let snapshot_version = snapshot_version_str.parse().map_err(|_| {
get_io_error(&format!(
"unsupported snapshot version: {}",
snapshot_version_str,
))
})?;
let snapshot_storage_lengths =
Self::process_snapshot_file(snapshot_version, snapshot_file_path)?;
let account_storage_map = Self::spawn_rebuilder_threads(
file_receiver,
num_threads,
next_append_vec_id,
snapshot_storage_lengths,
append_vec_files,
)
);
Ok(RebuiltSnapshotStorage {
snapshot_version,
storage: account_storage_map,
})
}
/// Create the SnapshotStorageRebuilder for storing state during rebuilding
@ -98,16 +121,34 @@ impl SnapshotStorageRebuilder {
/// Waits for snapshot file
/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
/// This function will push append_vec files into a buffer until we receive the snapshot file
fn get_snapshot_file(file_receiver: &Receiver<PathBuf>) -> (PathBuf, Vec<PathBuf>) {
fn get_version_and_snapshot_files(
file_receiver: &Receiver<PathBuf>,
) -> (PathBuf, PathBuf, Vec<PathBuf>) {
let mut append_vec_files = Vec::with_capacity(1024);
let snapshot_file_path = loop {
let mut snapshot_version_path = None;
let mut snapshot_file_path = None;
loop {
if let Ok(path) = file_receiver.recv() {
let filename = path.file_name().unwrap().to_str().unwrap();
match get_snapshot_file_kind(filename) {
Some(SnapshotFileKind::SnapshotFile) => {
break path;
Some(SnapshotFileKind::Version) => {
snapshot_version_path = Some(path);
// break if we have both the snapshot file and the version file
if snapshot_file_path.is_some() {
break;
}
}
Some(SnapshotFileKind::StorageFile) => {
Some(SnapshotFileKind::BankFields) => {
snapshot_file_path = Some(path);
// break if we have both the snapshot file and the version file
if snapshot_version_path.is_some() {
break;
}
}
Some(SnapshotFileKind::Storage) => {
append_vec_files.push(path);
}
None => {} // do nothing for other kinds of files
@ -115,21 +156,28 @@ impl SnapshotStorageRebuilder {
} else {
panic!("did not receive snapshot file from unpacking threads");
}
};
}
let snapshot_version_path = snapshot_version_path.unwrap();
let snapshot_file_path = snapshot_file_path.unwrap();
(snapshot_file_path, append_vec_files)
(snapshot_version_path, snapshot_file_path, append_vec_files)
}
/// Process the snapshot file to get the size of each snapshot storage file
fn process_snapshot_file(
snapshot_version: SnapshotVersion,
snapshot_file_path: PathBuf,
) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
let snapshot_file = File::open(snapshot_file_path).unwrap();
let mut snapshot_stream = BufReader::new(snapshot_file);
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;
match snapshot_version {
SnapshotVersion::V1_2_0 => {
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;
Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
}
}
}
/// Spawn threads for processing buffered append_vec_files, and then received files
@ -191,7 +239,7 @@ impl SnapshotStorageRebuilder {
/// Process an append_vec_file
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) {
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
let (slot, slot_complete) = self.insert_slot_storage_file(path, filename);
if slot_complete {
self.process_complete_slot(slot)?;
@ -290,15 +338,20 @@ impl SnapshotStorageRebuilder {
}
}
/// Used to determine if a filename is structured like a snapshot file, storage file, or neither
/// Used to determine if a filename is structured like a version file, bank file, or storage file
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
SnapshotFile,
StorageFile,
Version,
BankFields,
Storage,
}
/// Determines `SnapshotFileKind` for `filename` if any
fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
if filename == "version" {
return Some(SnapshotFileKind::Version);
}
let mut periods = 0;
let mut saw_numbers = false;
for x in filename.chars() {
@ -318,8 +371,8 @@ fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
}
match (periods, saw_numbers) {
(0, true) => Some(SnapshotFileKind::SnapshotFile),
(1, true) => Some(SnapshotFileKind::StorageFile),
(0, true) => Some(SnapshotFileKind::BankFields),
(1, true) => Some(SnapshotFileKind::Storage),
(_, _) => None,
}
}
@ -342,11 +395,15 @@ mod tests {
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));
assert_eq!(
Some(SnapshotFileKind::SnapshotFile),
Some(SnapshotFileKind::Version),
get_snapshot_file_kind("version")
);
assert_eq!(
Some(SnapshotFileKind::BankFields),
get_snapshot_file_kind("1234")
);
assert_eq!(
Some(SnapshotFileKind::StorageFile),
Some(SnapshotFileKind::Storage),
get_snapshot_file_kind("1000.999")
);
}