From f107b8b6074c25aed4ceb97a6bf9751ad0e6cfba Mon Sep 17 00:00:00 2001 From: Xiang Zhu Date: Wed, 1 Feb 2023 16:51:32 -0800 Subject: [PATCH] Add slot deltas into the bank snapshot directory (#29409) --- core/src/snapshot_packager_service.rs | 1 - core/tests/snapshots.rs | 33 ++--- runtime/src/accounts_background_service.rs | 2 +- runtime/src/snapshot_package.rs | 23 ++-- runtime/src/snapshot_utils.rs | 152 +++++++++++++-------- 5 files changed, 122 insertions(+), 89 deletions(-) diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index ca8edd609..8740bd35e 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -331,7 +331,6 @@ mod tests { archive_format, }, block_height: slot, - slot_deltas: vec![], snapshot_links: link_snapshots_dir, snapshot_storages: storage_entries, snapshot_version: SnapshotVersion::default(), diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 68450a112..b8c21bac5 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -2,7 +2,6 @@ use { crate::snapshot_utils::create_tmp_accounts_dir_for_tests, - bincode::serialize_into, crossbeam_channel::unbounded, fs_extra::dir::CopyOptions, itertools::Itertools, @@ -22,7 +21,7 @@ use { accounts_db::{self, ACCOUNTS_DB_CONFIG_FOR_TESTING}, accounts_hash::AccountsHash, accounts_index::AccountSecondaryIndexes, - bank::{Bank, BankSlotDelta}, + bank::Bank, bank_forks::BankForks, epoch_accounts_hash::EpochAccountsHash, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, @@ -266,13 +265,11 @@ fn run_bank_forks_snapshot_n( let bank_snapshots_dir = &snapshot_config.bank_snapshots_dir; let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_pre(bank_snapshots_dir) .expect("no bank snapshots found in path"); - let slot_deltas = last_bank.status_cache.read().unwrap().root_slot_deltas(); let accounts_package = AccountsPackage::new_for_snapshot( AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), &last_bank, &last_bank_snapshot_info, bank_snapshots_dir, - slot_deltas, &snapshot_config.full_snapshot_archives_dir, &snapshot_config.incremental_snapshot_archives_dir, last_bank.get_snapshot_storages(None), @@ -371,8 +368,15 @@ fn test_concurrent_snapshot_packaging( // Take snapshot of zeroth bank let bank0 = bank_forks.get(0).unwrap(); let storages = bank0.get_snapshot_storages(None); - snapshot_utils::add_bank_snapshot(bank_snapshots_dir, &bank0, &storages, snapshot_version) - .unwrap(); + let slot_deltas = bank0.status_cache.read().unwrap().root_slot_deltas(); + snapshot_utils::add_bank_snapshot( + bank_snapshots_dir, + &bank0, + &storages, + snapshot_version, + slot_deltas, + ) + .unwrap(); // Set up snapshotting channels let (real_accounts_package_sender, real_accounts_package_receiver) = @@ -416,11 +420,13 @@ fn test_concurrent_snapshot_packaging( }; let snapshot_storages = bank.get_snapshot_storages(None); + let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); let bank_snapshot_info = snapshot_utils::add_bank_snapshot( bank_snapshots_dir, &bank, &snapshot_storages, snapshot_config.snapshot_version, + slot_deltas, ) .unwrap(); let accounts_package = AccountsPackage::new_for_snapshot( @@ -428,7 +434,6 @@ fn test_concurrent_snapshot_packaging( &bank, &bank_snapshot_info, bank_snapshots_dir, - vec![], full_snapshot_archives_dir, incremental_snapshot_archives_dir, snapshot_storages, @@ -561,20 +566,6 @@ fn test_concurrent_snapshot_packaging( // Check the archive we cached the state for earlier was generated correctly - // before we compare, stick an empty status_cache in this dir so that the package comparison works - // This is needed since the status_cache is added by the packager and is not collected from - // the source dir for snapshots - snapshot_utils::serialize_snapshot_data_file( - &saved_snapshots_dir - .path() - .join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILENAME), - |stream| { - serialize_into(stream, &[] as &[BankSlotDelta])?; - Ok(()) - }, - ) - .unwrap(); - // files were saved off before we reserialized the bank in the hacked up accounts_hash_verifier stand-in. solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash( saved_snapshots_dir.path(), diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 3ee9db024..1d5ba1d6c 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -357,6 +357,7 @@ impl SnapshotRequestHandler { &snapshot_root_bank, &snapshot_storages, self.snapshot_config.snapshot_version, + status_cache_slot_deltas, ) .expect("snapshot bank"); AccountsPackage::new_for_snapshot( @@ -364,7 +365,6 @@ impl SnapshotRequestHandler { &snapshot_root_bank, &bank_snapshot_info, &self.snapshot_config.bank_snapshots_dir, - status_cache_slot_deltas, &self.snapshot_config.full_snapshot_archives_dir, &self.snapshot_config.incremental_snapshot_archives_dir, snapshot_storages, diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index 00ba3b6be..c1c5513ac 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -3,14 +3,14 @@ use { accounts::Accounts, accounts_db::AccountStorageEntry, accounts_hash::AccountsHash, - bank::{Bank, BankSlotDelta}, + bank::Bank, epoch_accounts_hash::EpochAccountsHash, rent_collector::RentCollector, snapshot_archive_info::{SnapshotArchiveInfo, SnapshotArchiveInfoGetter}, snapshot_hash::SnapshotHash, snapshot_utils::{ self, ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion, - TMP_BANK_SNAPSHOT_PREFIX, + SNAPSHOT_STATUS_CACHE_FILENAME, TMP_BANK_SNAPSHOT_PREFIX, }, }, log::*, @@ -59,7 +59,6 @@ impl AccountsPackage { bank: &Bank, bank_snapshot_info: &BankSnapshotInfo, bank_snapshots_dir: impl AsRef, - slot_deltas: Vec, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, snapshot_storages: Vec>, @@ -92,16 +91,20 @@ impl AccountsPackage { .path() .join(bank_snapshot_info.slot.to_string()); fs::create_dir_all(&snapshot_hardlink_dir)?; - let file_name = - snapshot_utils::path_to_file_name_str(&bank_snapshot_info.snapshot_path)?; + let snapshot_path = bank_snapshot_info.snapshot_path(); + let file_name = snapshot_utils::path_to_file_name_str(&snapshot_path)?; + fs::hard_link(&snapshot_path, snapshot_hardlink_dir.join(file_name))?; + let status_cache_path = bank_snapshot_info + .snapshot_dir + .join(SNAPSHOT_STATUS_CACHE_FILENAME); + let status_cache_file_name = snapshot_utils::path_to_file_name_str(&status_cache_path)?; fs::hard_link( - &bank_snapshot_info.snapshot_path, - snapshot_hardlink_dir.join(file_name), + &status_cache_path, + snapshot_links.path().join(status_cache_file_name), )?; } let snapshot_info = SupplementalSnapshotInfo { - slot_deltas, snapshot_links, archive_format, snapshot_version, @@ -174,7 +177,6 @@ impl AccountsPackage { epoch_schedule: EpochSchedule::default(), rent_collector: RentCollector::default(), snapshot_info: Some(SupplementalSnapshotInfo { - slot_deltas: Vec::default(), snapshot_links: TempDir::new().unwrap(), archive_format: ArchiveFormat::Tar, snapshot_version: SnapshotVersion::default(), @@ -215,7 +217,6 @@ impl std::fmt::Debug for AccountsPackage { /// Supplemental information needed for snapshots pub struct SupplementalSnapshotInfo { - pub slot_deltas: Vec, pub snapshot_links: TempDir, pub archive_format: ArchiveFormat, pub snapshot_version: SnapshotVersion, @@ -237,7 +238,6 @@ pub enum AccountsPackageType { pub struct SnapshotPackage { pub snapshot_archive_info: SnapshotArchiveInfo, pub block_height: Slot, - pub slot_deltas: Vec, pub snapshot_links: TempDir, pub snapshot_storages: Vec>, pub snapshot_version: SnapshotVersion, @@ -286,7 +286,6 @@ impl SnapshotPackage { archive_format: snapshot_info.archive_format, }, block_height: accounts_package.block_height, - slot_deltas: snapshot_info.slot_deltas, snapshot_links: snapshot_info.snapshot_links, snapshot_storages, snapshot_version: snapshot_info.snapshot_version, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 2450edbc0..30f1ef2ab 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -136,10 +136,10 @@ impl SnapshotVersion { pub struct BankSnapshotInfo { /// Slot of the bank pub slot: Slot, - /// Path to the snapshot - pub snapshot_path: PathBuf, /// Type of the snapshot pub snapshot_type: BankSnapshotType, + /// Path to the bank snapshot directory + pub snapshot_dir: PathBuf, } impl PartialOrd for BankSnapshotInfo { @@ -155,6 +155,49 @@ impl Ord for BankSnapshotInfo { } } +impl BankSnapshotInfo { + pub fn new_from_dir( + bank_snapshots_dir: impl AsRef, + slot: Slot, + ) -> Option { + // check this directory to see if there is a BankSnapshotPre and/or + // BankSnapshotPost file + let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot); + let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot)); + let mut bank_snapshot_pre_path = bank_snapshot_post_path.clone(); + bank_snapshot_pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION); + + if bank_snapshot_pre_path.is_file() { + return Some(BankSnapshotInfo { + slot, + snapshot_type: BankSnapshotType::Pre, + snapshot_dir: bank_snapshot_dir, + }); + } + + if bank_snapshot_post_path.is_file() { + return Some(BankSnapshotInfo { + slot, + snapshot_type: BankSnapshotType::Post, + snapshot_dir: bank_snapshot_dir, + }); + } + + None + } + + pub fn snapshot_path(&self) -> PathBuf { + let mut bank_snapshot_path = self.snapshot_dir.join(get_snapshot_file_name(self.slot)); + + let ext = match self.snapshot_type { + BankSnapshotType::Pre => BANK_SNAPSHOT_PRE_FILENAME_EXTENSION, + BankSnapshotType::Post => "", + }; + bank_snapshot_path.set_extension(ext); + + bank_snapshot_path + } +} /// Bank snapshots traditionally had their accounts hash calculated prior to serialization. Since /// the hash calculation takes a long time, an optimization has been put in to offload the accounts /// hash calculation. The bank serialization format has not changed, so we need another way to @@ -396,15 +439,6 @@ pub fn archive_snapshot_package( snapshot_package.slot() ); - serialize_status_cache( - snapshot_package.slot(), - &snapshot_package.slot_deltas, - &snapshot_package - .snapshot_links - .path() - .join(SNAPSHOT_STATUS_CACHE_FILENAME), - )?; - let mut timer = Measure::start("snapshot_package-package_snapshots"); let tar_dir = snapshot_package .path() @@ -594,28 +628,10 @@ pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef) -> Vec], snapshot_version: SnapshotVersion, + slot_deltas: Vec, ) -> Result { let mut add_snapshot_time = Measure::start("add-snapshot-ms"); let slot = bank.slot(); // bank_snapshots_dir/slot - let bank_snapshots_dir = get_bank_snapshots_dir(bank_snapshots_dir, slot); - fs::create_dir_all(&bank_snapshots_dir)?; + let bank_snapshot_dir = get_bank_snapshots_dir(bank_snapshots_dir, slot); + fs::create_dir_all(&bank_snapshot_dir)?; // the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION - let mut bank_snapshot_path = bank_snapshots_dir.join(get_snapshot_file_name(slot)); + let mut bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot)); bank_snapshot_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION); info!( @@ -907,6 +924,9 @@ pub fn add_bank_snapshot( bank_serialize.stop(); add_snapshot_time.stop(); + let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME); + serialize_status_cache(slot, &slot_deltas, &status_cache_path)?; + // Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE datapoint_info!( "snapshot-bank-file", @@ -926,8 +946,8 @@ pub fn add_bank_snapshot( Ok(BankSnapshotInfo { slot, - snapshot_path: bank_snapshot_path, snapshot_type: BankSnapshotType::Pre, + snapshot_dir: bank_snapshot_dir, }) } @@ -1883,16 +1903,16 @@ fn bank_fields_from_snapshots( }; info!( "Loading bank from full snapshot {} and incremental snapshot {:?}", - full_snapshot_root_paths.snapshot_path.display(), + full_snapshot_root_paths.snapshot_path().display(), incremental_snapshot_root_paths .as_ref() - .map(|paths| paths.snapshot_path.display()), + .map(|paths| paths.snapshot_path()), ); let snapshot_root_paths = SnapshotRootPaths { - full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path, + full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path(), incremental_snapshot_root_file_path: incremental_snapshot_root_paths - .map(|root_paths| root_paths.snapshot_path), + .map(|root_paths| root_paths.snapshot_path()), }; deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| { @@ -1942,16 +1962,16 @@ fn rebuild_bank_from_snapshots( }; info!( "Loading bank from full snapshot {} and incremental snapshot {:?}", - full_snapshot_root_paths.snapshot_path.display(), + full_snapshot_root_paths.snapshot_path().display(), incremental_snapshot_root_paths .as_ref() - .map(|paths| paths.snapshot_path.display()), + .map(|paths| paths.snapshot_path()), ); let snapshot_root_paths = SnapshotRootPaths { - full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path, + full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path(), incremental_snapshot_root_file_path: incremental_snapshot_root_paths - .map(|root_paths| root_paths.snapshot_path), + .map(|root_paths| root_paths.snapshot_path()), }; let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| { @@ -2172,6 +2192,22 @@ pub fn verify_snapshot_archive( assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap()); std::fs::remove_file(p1).unwrap(); std::fs::remove_file(p2).unwrap(); + + // The new the status_cache file is inside the slot directory together with the snapshot file. + // When unpacking an archive, the status_cache file from the archive is one-level up outside of + // the slot direcotry. + // The unpacked status_cache file need to be put back into the slot directory for the directory + // comparison to pass. + let existing_unpacked_status_cache_file = + unpacked_snapshots.join(SNAPSHOT_STATUS_CACHE_FILENAME); + let new_unpacked_status_cache_file = unpacked_snapshots + .join(&slot) + .join(SNAPSHOT_STATUS_CACHE_FILENAME); + fs::rename( + existing_unpacked_status_cache_file, + new_unpacked_status_cache_file, + ) + .unwrap(); } assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); @@ -2199,7 +2235,7 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef) { if r.is_err() { warn!( "Couldn't remove bank snapshot at: {}", - bank_snapshot.snapshot_path.display() + bank_snapshot.snapshot_dir.display() ); } }) @@ -2253,8 +2289,14 @@ pub fn bank_to_full_snapshot_archive( let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?; let snapshot_storages = bank.get_snapshot_storages(None); - let bank_snapshot_info = - add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?; + let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); + let bank_snapshot_info = add_bank_snapshot( + &temp_dir, + bank, + &snapshot_storages, + snapshot_version, + slot_deltas, + )?; package_and_archive_full_snapshot( bank, @@ -2300,8 +2342,14 @@ pub fn bank_to_incremental_snapshot_archive( let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?; let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot)); - let bank_snapshot_info = - add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?; + let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); + let bank_snapshot_info = add_bank_snapshot( + &temp_dir, + bank, + &snapshot_storages, + snapshot_version, + slot_deltas, + )?; package_and_archive_incremental_snapshot( bank, @@ -2332,13 +2380,11 @@ pub fn package_and_archive_full_snapshot( maximum_full_snapshot_archives_to_retain: usize, maximum_incremental_snapshot_archives_to_retain: usize, ) -> Result { - let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); let accounts_package = AccountsPackage::new_for_snapshot( AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), bank, bank_snapshot_info, bank_snapshots_dir, - slot_deltas, &full_snapshot_archives_dir, &incremental_snapshot_archives_dir, snapshot_storages, @@ -2386,7 +2432,6 @@ pub fn package_and_archive_incremental_snapshot( maximum_full_snapshot_archives_to_retain: usize, maximum_incremental_snapshot_archives_to_retain: usize, ) -> Result { - let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas(); let accounts_package = AccountsPackage::new_for_snapshot( AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot( incremental_snapshot_base_slot, @@ -2394,7 +2439,6 @@ pub fn package_and_archive_incremental_snapshot( bank, bank_snapshot_info, bank_snapshots_dir, - slot_deltas, &full_snapshot_archives_dir, &incremental_snapshot_archives_dir, snapshot_storages,