Add slot deltas into the bank snapshot directory (#29409)

This commit is contained in:
Xiang Zhu 2023-02-01 16:51:32 -08:00 committed by GitHub
parent c549129974
commit f107b8b607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 89 deletions

View File

@ -331,7 +331,6 @@ mod tests {
archive_format,
},
block_height: slot,
slot_deltas: vec![],
snapshot_links: link_snapshots_dir,
snapshot_storages: storage_entries,
snapshot_version: SnapshotVersion::default(),

View File

@ -2,7 +2,6 @@
use {
crate::snapshot_utils::create_tmp_accounts_dir_for_tests,
bincode::serialize_into,
crossbeam_channel::unbounded,
fs_extra::dir::CopyOptions,
itertools::Itertools,
@ -22,7 +21,7 @@ use {
accounts_db::{self, ACCOUNTS_DB_CONFIG_FOR_TESTING},
accounts_hash::AccountsHash,
accounts_index::AccountSecondaryIndexes,
bank::{Bank, BankSlotDelta},
bank::Bank,
bank_forks::BankForks,
epoch_accounts_hash::EpochAccountsHash,
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
@ -266,13 +265,11 @@ fn run_bank_forks_snapshot_n<F>(
let bank_snapshots_dir = &snapshot_config.bank_snapshots_dir;
let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_pre(bank_snapshots_dir)
.expect("no bank snapshots found in path");
let slot_deltas = last_bank.status_cache.read().unwrap().root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
&last_bank,
&last_bank_snapshot_info,
bank_snapshots_dir,
slot_deltas,
&snapshot_config.full_snapshot_archives_dir,
&snapshot_config.incremental_snapshot_archives_dir,
last_bank.get_snapshot_storages(None),
@ -371,8 +368,15 @@ fn test_concurrent_snapshot_packaging(
// Take snapshot of zeroth bank
let bank0 = bank_forks.get(0).unwrap();
let storages = bank0.get_snapshot_storages(None);
snapshot_utils::add_bank_snapshot(bank_snapshots_dir, &bank0, &storages, snapshot_version)
.unwrap();
let slot_deltas = bank0.status_cache.read().unwrap().root_slot_deltas();
snapshot_utils::add_bank_snapshot(
bank_snapshots_dir,
&bank0,
&storages,
snapshot_version,
slot_deltas,
)
.unwrap();
// Set up snapshotting channels
let (real_accounts_package_sender, real_accounts_package_receiver) =
@ -416,11 +420,13 @@ fn test_concurrent_snapshot_packaging(
};
let snapshot_storages = bank.get_snapshot_storages(None);
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let bank_snapshot_info = snapshot_utils::add_bank_snapshot(
bank_snapshots_dir,
&bank,
&snapshot_storages,
snapshot_config.snapshot_version,
slot_deltas,
)
.unwrap();
let accounts_package = AccountsPackage::new_for_snapshot(
@ -428,7 +434,6 @@ fn test_concurrent_snapshot_packaging(
&bank,
&bank_snapshot_info,
bank_snapshots_dir,
vec![],
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
snapshot_storages,
@ -561,20 +566,6 @@ fn test_concurrent_snapshot_packaging(
// Check the archive we cached the state for earlier was generated correctly
// before we compare, stick an empty status_cache in this dir so that the package comparison works
// This is needed since the status_cache is added by the packager and is not collected from
// the source dir for snapshots
snapshot_utils::serialize_snapshot_data_file(
&saved_snapshots_dir
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME),
|stream| {
serialize_into(stream, &[] as &[BankSlotDelta])?;
Ok(())
},
)
.unwrap();
// files were saved off before we reserialized the bank in the hacked up accounts_hash_verifier stand-in.
solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash(
saved_snapshots_dir.path(),

View File

@ -357,6 +357,7 @@ impl SnapshotRequestHandler {
&snapshot_root_bank,
&snapshot_storages,
self.snapshot_config.snapshot_version,
status_cache_slot_deltas,
)
.expect("snapshot bank");
AccountsPackage::new_for_snapshot(
@ -364,7 +365,6 @@ impl SnapshotRequestHandler {
&snapshot_root_bank,
&bank_snapshot_info,
&self.snapshot_config.bank_snapshots_dir,
status_cache_slot_deltas,
&self.snapshot_config.full_snapshot_archives_dir,
&self.snapshot_config.incremental_snapshot_archives_dir,
snapshot_storages,

View File

@ -3,14 +3,14 @@ use {
accounts::Accounts,
accounts_db::AccountStorageEntry,
accounts_hash::AccountsHash,
bank::{Bank, BankSlotDelta},
bank::Bank,
epoch_accounts_hash::EpochAccountsHash,
rent_collector::RentCollector,
snapshot_archive_info::{SnapshotArchiveInfo, SnapshotArchiveInfoGetter},
snapshot_hash::SnapshotHash,
snapshot_utils::{
self, ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion,
TMP_BANK_SNAPSHOT_PREFIX,
SNAPSHOT_STATUS_CACHE_FILENAME, TMP_BANK_SNAPSHOT_PREFIX,
},
},
log::*,
@ -59,7 +59,6 @@ impl AccountsPackage {
bank: &Bank,
bank_snapshot_info: &BankSnapshotInfo,
bank_snapshots_dir: impl AsRef<Path>,
slot_deltas: Vec<BankSlotDelta>,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
snapshot_storages: Vec<Arc<AccountStorageEntry>>,
@ -92,16 +91,20 @@ impl AccountsPackage {
.path()
.join(bank_snapshot_info.slot.to_string());
fs::create_dir_all(&snapshot_hardlink_dir)?;
let file_name =
snapshot_utils::path_to_file_name_str(&bank_snapshot_info.snapshot_path)?;
let snapshot_path = bank_snapshot_info.snapshot_path();
let file_name = snapshot_utils::path_to_file_name_str(&snapshot_path)?;
fs::hard_link(&snapshot_path, snapshot_hardlink_dir.join(file_name))?;
let status_cache_path = bank_snapshot_info
.snapshot_dir
.join(SNAPSHOT_STATUS_CACHE_FILENAME);
let status_cache_file_name = snapshot_utils::path_to_file_name_str(&status_cache_path)?;
fs::hard_link(
&bank_snapshot_info.snapshot_path,
snapshot_hardlink_dir.join(file_name),
&status_cache_path,
snapshot_links.path().join(status_cache_file_name),
)?;
}
let snapshot_info = SupplementalSnapshotInfo {
slot_deltas,
snapshot_links,
archive_format,
snapshot_version,
@ -174,7 +177,6 @@ impl AccountsPackage {
epoch_schedule: EpochSchedule::default(),
rent_collector: RentCollector::default(),
snapshot_info: Some(SupplementalSnapshotInfo {
slot_deltas: Vec::default(),
snapshot_links: TempDir::new().unwrap(),
archive_format: ArchiveFormat::Tar,
snapshot_version: SnapshotVersion::default(),
@ -215,7 +217,6 @@ impl std::fmt::Debug for AccountsPackage {
/// Supplemental information needed for snapshots
pub struct SupplementalSnapshotInfo {
pub slot_deltas: Vec<BankSlotDelta>,
pub snapshot_links: TempDir,
pub archive_format: ArchiveFormat,
pub snapshot_version: SnapshotVersion,
@ -237,7 +238,6 @@ pub enum AccountsPackageType {
pub struct SnapshotPackage {
pub snapshot_archive_info: SnapshotArchiveInfo,
pub block_height: Slot,
pub slot_deltas: Vec<BankSlotDelta>,
pub snapshot_links: TempDir,
pub snapshot_storages: Vec<Arc<AccountStorageEntry>>,
pub snapshot_version: SnapshotVersion,
@ -286,7 +286,6 @@ impl SnapshotPackage {
archive_format: snapshot_info.archive_format,
},
block_height: accounts_package.block_height,
slot_deltas: snapshot_info.slot_deltas,
snapshot_links: snapshot_info.snapshot_links,
snapshot_storages,
snapshot_version: snapshot_info.snapshot_version,

View File

@ -136,10 +136,10 @@ impl SnapshotVersion {
pub struct BankSnapshotInfo {
/// Slot of the bank
pub slot: Slot,
/// Path to the snapshot
pub snapshot_path: PathBuf,
/// Type of the snapshot
pub snapshot_type: BankSnapshotType,
/// Path to the bank snapshot directory
pub snapshot_dir: PathBuf,
}
impl PartialOrd for BankSnapshotInfo {
@ -155,6 +155,49 @@ impl Ord for BankSnapshotInfo {
}
}
impl BankSnapshotInfo {
pub fn new_from_dir(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
) -> Option<BankSnapshotInfo> {
// check this directory to see if there is a BankSnapshotPre and/or
// BankSnapshotPost file
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
let mut bank_snapshot_pre_path = bank_snapshot_post_path.clone();
bank_snapshot_pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
if bank_snapshot_pre_path.is_file() {
return Some(BankSnapshotInfo {
slot,
snapshot_type: BankSnapshotType::Pre,
snapshot_dir: bank_snapshot_dir,
});
}
if bank_snapshot_post_path.is_file() {
return Some(BankSnapshotInfo {
slot,
snapshot_type: BankSnapshotType::Post,
snapshot_dir: bank_snapshot_dir,
});
}
None
}
pub fn snapshot_path(&self) -> PathBuf {
let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot));
let ext = match self.snapshot_type {
BankSnapshotType::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
BankSnapshotType::Post => "",
};
bank_snapshot_path.set_extension(ext);
bank_snapshot_path
}
}
/// Bank snapshots traditionally had their accounts hash calculated prior to serialization. Since
/// the hash calculation takes a long time, an optimization has been put in to offload the accounts
/// hash calculation. The bank serialization format has not changed, so we need another way to
@ -396,15 +439,6 @@ pub fn archive_snapshot_package(
snapshot_package.slot()
);
serialize_status_cache(
snapshot_package.slot(),
&snapshot_package.slot_deltas,
&snapshot_package
.snapshot_links
.path()
.join(SNAPSHOT_STATUS_CACHE_FILENAME),
)?;
let mut timer = Measure::start("snapshot_package-package_snapshots");
let tar_dir = snapshot_package
.path()
@ -594,28 +628,10 @@ pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnaps
})
})
.for_each(|slot| {
// check this directory to see if there is a BankSnapshotPre and/or
// BankSnapshotPost file
let bank_snapshot_outer_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
let bank_snapshot_post_path =
bank_snapshot_outer_dir.join(get_snapshot_file_name(slot));
let mut bank_snapshot_pre_path = bank_snapshot_post_path.clone();
bank_snapshot_pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
if bank_snapshot_pre_path.is_file() {
bank_snapshots.push(BankSnapshotInfo {
slot,
snapshot_path: bank_snapshot_pre_path,
snapshot_type: BankSnapshotType::Pre,
});
}
if bank_snapshot_post_path.is_file() {
bank_snapshots.push(BankSnapshotInfo {
slot,
snapshot_path: bank_snapshot_post_path,
snapshot_type: BankSnapshotType::Post,
});
if let Some(snapshot_info) =
BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot)
{
bank_snapshots.push(snapshot_info);
}
}),
}
@ -872,15 +888,16 @@ pub fn add_bank_snapshot(
bank: &Bank,
snapshot_storages: &[Arc<AccountStorageEntry>],
snapshot_version: SnapshotVersion,
slot_deltas: Vec<BankSlotDelta>,
) -> Result<BankSnapshotInfo> {
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
let slot = bank.slot();
// bank_snapshots_dir/slot
let bank_snapshots_dir = get_bank_snapshots_dir(bank_snapshots_dir, slot);
fs::create_dir_all(&bank_snapshots_dir)?;
let bank_snapshot_dir = get_bank_snapshots_dir(bank_snapshots_dir, slot);
fs::create_dir_all(&bank_snapshot_dir)?;
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
let mut bank_snapshot_path = bank_snapshots_dir.join(get_snapshot_file_name(slot));
let mut bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
bank_snapshot_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
info!(
@ -907,6 +924,9 @@ pub fn add_bank_snapshot(
bank_serialize.stop();
add_snapshot_time.stop();
let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
serialize_status_cache(slot, &slot_deltas, &status_cache_path)?;
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
datapoint_info!(
"snapshot-bank-file",
@ -926,8 +946,8 @@ pub fn add_bank_snapshot(
Ok(BankSnapshotInfo {
slot,
snapshot_path: bank_snapshot_path,
snapshot_type: BankSnapshotType::Pre,
snapshot_dir: bank_snapshot_dir,
})
}
@ -1883,16 +1903,16 @@ fn bank_fields_from_snapshots(
};
info!(
"Loading bank from full snapshot {} and incremental snapshot {:?}",
full_snapshot_root_paths.snapshot_path.display(),
full_snapshot_root_paths.snapshot_path().display(),
incremental_snapshot_root_paths
.as_ref()
.map(|paths| paths.snapshot_path.display()),
.map(|paths| paths.snapshot_path()),
);
let snapshot_root_paths = SnapshotRootPaths {
full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path,
full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path(),
incremental_snapshot_root_file_path: incremental_snapshot_root_paths
.map(|root_paths| root_paths.snapshot_path),
.map(|root_paths| root_paths.snapshot_path()),
};
deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| {
@ -1942,16 +1962,16 @@ fn rebuild_bank_from_snapshots(
};
info!(
"Loading bank from full snapshot {} and incremental snapshot {:?}",
full_snapshot_root_paths.snapshot_path.display(),
full_snapshot_root_paths.snapshot_path().display(),
incremental_snapshot_root_paths
.as_ref()
.map(|paths| paths.snapshot_path.display()),
.map(|paths| paths.snapshot_path()),
);
let snapshot_root_paths = SnapshotRootPaths {
full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path,
full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path(),
incremental_snapshot_root_file_path: incremental_snapshot_root_paths
.map(|root_paths| root_paths.snapshot_path),
.map(|root_paths| root_paths.snapshot_path()),
};
let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| {
@ -2172,6 +2192,22 @@ pub fn verify_snapshot_archive<P, Q, R>(
assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
std::fs::remove_file(p1).unwrap();
std::fs::remove_file(p2).unwrap();
// The new the status_cache file is inside the slot directory together with the snapshot file.
// When unpacking an archive, the status_cache file from the archive is one-level up outside of
// the slot direcotry.
// The unpacked status_cache file need to be put back into the slot directory for the directory
// comparison to pass.
let existing_unpacked_status_cache_file =
unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME);
let new_unpacked_status_cache_file = unpacked_snapshots
.join(&slot)
.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::rename(
existing_unpacked_status_cache_file,
new_unpacked_status_cache_file,
)
.unwrap();
}
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
@ -2199,7 +2235,7 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
if r.is_err() {
warn!(
"Couldn't remove bank snapshot at: {}",
bank_snapshot.snapshot_path.display()
bank_snapshot.snapshot_dir.display()
);
}
})
@ -2253,8 +2289,14 @@ pub fn bank_to_full_snapshot_archive(
let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?;
let snapshot_storages = bank.get_snapshot_storages(None);
let bank_snapshot_info =
add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?;
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let bank_snapshot_info = add_bank_snapshot(
&temp_dir,
bank,
&snapshot_storages,
snapshot_version,
slot_deltas,
)?;
package_and_archive_full_snapshot(
bank,
@ -2300,8 +2342,14 @@ pub fn bank_to_incremental_snapshot_archive(
let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?;
let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot));
let bank_snapshot_info =
add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?;
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let bank_snapshot_info = add_bank_snapshot(
&temp_dir,
bank,
&snapshot_storages,
snapshot_version,
slot_deltas,
)?;
package_and_archive_incremental_snapshot(
bank,
@ -2332,13 +2380,11 @@ pub fn package_and_archive_full_snapshot(
maximum_full_snapshot_archives_to_retain: usize,
maximum_incremental_snapshot_archives_to_retain: usize,
) -> Result<FullSnapshotArchiveInfo> {
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
bank,
bank_snapshot_info,
bank_snapshots_dir,
slot_deltas,
&full_snapshot_archives_dir,
&incremental_snapshot_archives_dir,
snapshot_storages,
@ -2386,7 +2432,6 @@ pub fn package_and_archive_incremental_snapshot(
maximum_full_snapshot_archives_to_retain: usize,
maximum_incremental_snapshot_archives_to_retain: usize,
) -> Result<IncrementalSnapshotArchiveInfo> {
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(
incremental_snapshot_base_slot,
@ -2394,7 +2439,6 @@ pub fn package_and_archive_incremental_snapshot(
bank,
bank_snapshot_info,
bank_snapshots_dir,
slot_deltas,
&full_snapshot_archives_dir,
&incremental_snapshot_archives_dir,
snapshot_storages,