From b05fb87f22658e6a8f1d5a2b1201007e6e6f46c6 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Thu, 29 Jul 2021 16:46:54 -0500 Subject: [PATCH] Add test_bank_forks_incremental_snapshot() (#18565) This commit builds on PR #18504 by adding a test to core/tests/snapshot.rs for Incremental Snapshots. The test adds banks to bank forks in a loop and takes both full snapshots and incremental snapshots at intervals, and validates they are rebuild-able. For background info about Incremental Snapshots, see #17088. Fixes #18829 and #18972 --- core/tests/snapshots.rs | 281 +++++++++++++++++++++++++++++++--- runtime/src/snapshot_utils.rs | 111 +++++++++++--- 2 files changed, 352 insertions(+), 40 deletions(-) diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 476e294365..5be4eb825f 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -29,6 +29,11 @@ macro_rules! DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS { fn test_slots_to_snapshot() { run_test_slots_to_snapshot(SNAPSHOT_VERSION, CLUSTER_TYPE) } + + #[test] + fn test_bank_forks_incremental_snapshot_n() { + run_test_bank_forks_incremental_snapshot_n(SNAPSHOT_VERSION, CLUSTER_TYPE) + } } }; } @@ -39,6 +44,7 @@ mod tests { use crossbeam_channel::unbounded; use fs_extra::dir::CopyOptions; use itertools::Itertools; + use log::{info, trace}; use solana_core::snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}; use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}; use solana_runtime::{ @@ -57,7 +63,7 @@ mod tests { use solana_sdk::{ clock::Slot, genesis_config::{ClusterType, GenesisConfig}, - hash::hashv, + hash::{hashv, Hash}, pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, @@ -66,6 +72,7 @@ mod tests { use std::{ collections::HashSet, fs, + io::{Error, ErrorKind}, path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, @@ -83,8 +90,8 @@ mod tests { struct SnapshotTestConfig { accounts_dir: TempDir, - snapshot_dir: TempDir, - _snapshot_output_path: TempDir, + bank_snapshots_dir: TempDir, + snapshot_archives_dir: TempDir, snapshot_config: SnapshotConfig, bank_forks: BankForks, genesis_config_info: GenesisConfigInfo, @@ -94,11 +101,12 @@ mod tests { fn new( snapshot_version: SnapshotVersion, cluster_type: ClusterType, - snapshot_interval_slots: u64, + accounts_hash_interval_slots: Slot, + snapshot_interval_slots: Slot, ) -> SnapshotTestConfig { let accounts_dir = TempDir::new().unwrap(); - let snapshot_dir = TempDir::new().unwrap(); - let snapshot_output_path = TempDir::new().unwrap(); + let bank_snapshots_dir = TempDir::new().unwrap(); + let snapshot_archives_dir = TempDir::new().unwrap(); let mut genesis_config_info = create_genesis_config(10_000); genesis_config_info.genesis_config.cluster_type = cluster_type; let bank0 = Bank::new_with_paths( @@ -114,12 +122,12 @@ mod tests { ); bank0.freeze(); let mut bank_forks = BankForks::new(bank0); - bank_forks.accounts_hash_interval_slots = snapshot_interval_slots; + bank_forks.accounts_hash_interval_slots = accounts_hash_interval_slots; let snapshot_config = SnapshotConfig { snapshot_interval_slots, - snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), - snapshot_path: PathBuf::from(snapshot_dir.path()), + snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(), + snapshot_path: bank_snapshots_dir.path().to_path_buf(), archive_format: ArchiveFormat::TarBzip2, snapshot_version, maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, @@ -127,8 +135,8 @@ mod tests { bank_forks.set_snapshot_config(Some(snapshot_config.clone())); SnapshotTestConfig { accounts_dir, - snapshot_dir, - _snapshot_output_path: snapshot_output_path, + bank_snapshots_dir, + snapshot_archives_dir, snapshot_config, bank_forks, genesis_config_info, @@ -185,9 +193,9 @@ mod tests { .clone(); assert_eq!(*bank, deserialized_bank); - let bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshot_path); + let bank_snapshots = snapshot_utils::get_bank_snapshots(&snapshot_path); - for p in bank_snapshot_infos { + for p in bank_snapshots { snapshot_utils::remove_bank_snapshot(p.slot, &snapshot_path).unwrap(); } } @@ -207,7 +215,12 @@ mod tests { { solana_logger::setup(); // Set up snapshotting config - let mut snapshot_test_config = SnapshotTestConfig::new(snapshot_version, cluster_type, 1); + let mut snapshot_test_config = SnapshotTestConfig::new( + snapshot_version, + cluster_type, + set_root_interval, + set_root_interval, + ); let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; @@ -316,10 +329,11 @@ mod tests { solana_logger::setup(); // Set up snapshotting config - let mut snapshot_test_config = SnapshotTestConfig::new(snapshot_version, cluster_type, 1); + let mut snapshot_test_config = + SnapshotTestConfig::new(snapshot_version, cluster_type, 1, 1); let bank_forks = &mut snapshot_test_config.bank_forks; - let snapshots_dir = &snapshot_test_config.snapshot_dir; + let bank_snapshots_dir = &snapshot_test_config.bank_snapshots_dir; let snapshot_config = &snapshot_test_config.snapshot_config; let snapshot_path = &snapshot_config.snapshot_path; let snapshot_package_output_path = &snapshot_config.snapshot_package_output_path; @@ -438,9 +452,9 @@ mod tests { // currently sitting in the channel snapshot_utils::purge_old_bank_snapshots(snapshot_path); - let mut bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshots_dir); - bank_snapshot_infos.sort_unstable(); - assert!(bank_snapshot_infos + let mut bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_dir); + bank_snapshots.sort_unstable(); + assert!(bank_snapshots .into_iter() .map(|path| path.slot) .eq(3..=snapshot_utils::MAX_BANK_SNAPSHOTS as u64 + 2)); @@ -541,7 +555,8 @@ mod tests { let mut snapshot_test_config = SnapshotTestConfig::new( snapshot_version, cluster_type, - (*add_root_interval * num_set_roots * 2) as u64, + (*add_root_interval * num_set_roots * 2) as Slot, + (*add_root_interval * num_set_roots * 2) as Slot, ); let mut current_bank = snapshot_test_config.bank_forks[0].clone(); let request_sender = AbsRequestSender::new(Some(snapshot_sender)); @@ -610,4 +625,230 @@ mod tests { ); } } + + fn run_test_bank_forks_incremental_snapshot_n( + snapshot_version: SnapshotVersion, + cluster_type: ClusterType, + ) { + solana_logger::setup(); + + const SET_ROOT_INTERVAL: Slot = 2; + const INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS: Slot = SET_ROOT_INTERVAL * 2; + const FULL_SNAPSHOT_INTERVAL_SLOTS: Slot = INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS * 5; + const LAST_SLOT: Slot = FULL_SNAPSHOT_INTERVAL_SLOTS * 2 - 1; + + info!("Running bank forks incremental snapshot test, full snapshot interval: {} slots, incremental snapshot interval: {} slots, last slot: {}, set root interval: {} slots", + FULL_SNAPSHOT_INTERVAL_SLOTS, INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS, LAST_SLOT, SET_ROOT_INTERVAL); + + let mut snapshot_test_config = SnapshotTestConfig::new( + snapshot_version, + cluster_type, + SET_ROOT_INTERVAL, + FULL_SNAPSHOT_INTERVAL_SLOTS, + ); + trace!("SnapshotTestConfig:\naccounts_dir: {}\nbank_snapshots_dir: {}\nsnapshot_archives_dir: {}", snapshot_test_config.accounts_dir.path().display(), snapshot_test_config.bank_snapshots_dir.path().display(), snapshot_test_config.snapshot_archives_dir.path().display()); + + let bank_forks = &mut snapshot_test_config.bank_forks; + let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; + + let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); + let (accounts_package_sender, _accounts_package_receiver) = channel(); + let request_sender = AbsRequestSender::new(Some(snapshot_request_sender)); + let snapshot_request_handler = SnapshotRequestHandler { + snapshot_config: snapshot_test_config.snapshot_config.clone(), + snapshot_request_receiver, + accounts_package_sender, + }; + + let mut last_full_snapshot_slot = None; + for slot in 1..=LAST_SLOT { + // Make a new bank and perform some transactions + let bank = { + let bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); + + let key = Keypair::new().pubkey(); + let tx = system_transaction::transfer(mint_keypair, &key, 1, bank.last_blockhash()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + + let key = Keypair::new().pubkey(); + let tx = system_transaction::transfer(mint_keypair, &key, 0, bank.last_blockhash()); + assert_eq!(bank.process_transaction(&tx), Ok(())); + + while !bank.is_complete() { + bank.register_tick(&Hash::new_unique()); + } + + bank_forks.insert(bank) + }; + + // Set root to make sure we don't end up with too many account storage entries + // and to allow snapshotting of bank and the purging logic on status_cache to + // kick in + if slot % SET_ROOT_INTERVAL == 0 { + // set_root sends a snapshot request + bank_forks.set_root(bank.slot(), &request_sender, None); + bank.update_accounts_hash(); + snapshot_request_handler.handle_snapshot_requests(false, false, false, 0); + } + + // Since AccountsBackgroundService isn't running, manually make a full snapshot archive + // at the right interval + if slot % FULL_SNAPSHOT_INTERVAL_SLOTS == 0 { + make_full_snapshot_archive(&bank, &snapshot_test_config.snapshot_config).unwrap(); + last_full_snapshot_slot = Some(slot); + } + // Similarly, make an incremental snapshot archive at the right interval, but only if + // there's been at least one full snapshot first, and a full snapshot wasn't already + // taken at this slot. + // + // Then, after making an incremental snapshot, restore the bank and verify it is correct + else if slot % INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS == 0 + && last_full_snapshot_slot.is_some() + && slot != last_full_snapshot_slot.unwrap() + { + make_incremental_snapshot_archive( + &bank, + last_full_snapshot_slot.unwrap(), + &snapshot_test_config.snapshot_config, + ) + .unwrap(); + + restore_from_incremental_snapshot_and_check_banks_are_equal( + &bank, + last_full_snapshot_slot.unwrap(), + &snapshot_test_config.snapshot_config, + snapshot_test_config.accounts_dir.path().to_path_buf(), + &snapshot_test_config.genesis_config_info.genesis_config, + ) + .unwrap(); + } + } + } + + fn make_full_snapshot_archive( + bank: &Bank, + snapshot_config: &SnapshotConfig, + ) -> snapshot_utils::Result<()> { + let slot = bank.slot(); + info!("Making full snapshot archive from bank at slot: {}", slot); + let bank_snapshot_info = snapshot_utils::get_bank_snapshots(&snapshot_config.snapshot_path) + .into_iter() + .find(|elem| elem.slot == slot) + .ok_or_else(|| Error::new(ErrorKind::Other, "did not find snapshot with this path"))?; + snapshot_utils::package_process_and_archive_full_snapshot( + bank, + &bank_snapshot_info, + &snapshot_config.snapshot_path, + &snapshot_config.snapshot_package_output_path, + bank.get_snapshot_storages(), + snapshot_config.archive_format, + snapshot_config.snapshot_version, + None, + snapshot_config.maximum_snapshots_to_retain, + )?; + + Ok(()) + } + + fn make_incremental_snapshot_archive( + bank: &Bank, + incremental_snapshot_base_slot: Slot, + snapshot_config: &SnapshotConfig, + ) -> snapshot_utils::Result<()> { + let slot = bank.slot(); + info!( + "Making incremental snapshot archive from bank at slot: {}, and base slot: {}", + slot, incremental_snapshot_base_slot, + ); + let bank_snapshot_info = snapshot_utils::get_bank_snapshots(&snapshot_config.snapshot_path) + .into_iter() + .find(|elem| elem.slot == slot) + .ok_or_else(|| Error::new(ErrorKind::Other, "did not find snapshot with this path"))?; + snapshot_utils::package_process_and_archive_incremental_snapshot( + bank, + incremental_snapshot_base_slot, + &bank_snapshot_info, + &snapshot_config.snapshot_path, + &snapshot_config.snapshot_package_output_path, + bank.get_incremental_snapshot_storages(incremental_snapshot_base_slot), + snapshot_config.archive_format, + snapshot_config.snapshot_version, + None, + snapshot_config.maximum_snapshots_to_retain, + )?; + + Ok(()) + } + + fn restore_from_incremental_snapshot_and_check_banks_are_equal( + bank: &Bank, + last_full_snapshot_slot: Slot, + snapshot_config: &SnapshotConfig, + accounts_dir: PathBuf, + genesis_config: &GenesisConfig, + ) -> snapshot_utils::Result<()> { + let ( + full_snapshot_archive_slot, + (incremental_snapshot_archive_base_slot, incremental_snapshot_archive_slot), + deserialized_bank, + ) = restore_from_incremental_snapshot(snapshot_config, accounts_dir, genesis_config)?; + + assert_eq!( + full_snapshot_archive_slot, + incremental_snapshot_archive_base_slot + ); + assert_eq!(full_snapshot_archive_slot, last_full_snapshot_slot); + assert_eq!(incremental_snapshot_archive_slot, bank.slot(),); + assert_eq!(*bank, deserialized_bank); + + Ok(()) + } + + fn restore_from_incremental_snapshot( + snapshot_config: &SnapshotConfig, + accounts_dir: PathBuf, + genesis_config: &GenesisConfig, + ) -> snapshot_utils::Result<(Slot, (Slot, Slot), Bank)> { + let full_snapshot_archive_info = snapshot_utils::get_highest_full_snapshot_archive_info( + &snapshot_config.snapshot_package_output_path, + ) + .ok_or_else(|| Error::new(ErrorKind::Other, "no full snapshot"))?; + + let incremental_snapshot_archive_info = + snapshot_utils::get_highest_incremental_snapshot_archive_info( + &snapshot_config.snapshot_package_output_path, + *full_snapshot_archive_info.slot(), + ) + .ok_or_else(|| Error::new(ErrorKind::Other, "no incremental snapshot"))?; + + info!("Restoring bank from full snapshot slot: {}, and incremental snapshot slot: {} (with base slot: {})", + full_snapshot_archive_info.slot(), incremental_snapshot_archive_info.slot(), incremental_snapshot_archive_info.base_slot()); + + let (deserialized_bank, _) = snapshot_utils::bank_from_snapshot_archives( + &[accounts_dir], + &[], + &snapshot_config.snapshot_path, + full_snapshot_archive_info.path(), + Some(incremental_snapshot_archive_info.path()), + snapshot_config.archive_format, + genesis_config, + None, + None, + AccountSecondaryIndexes::default(), + false, + None, + accounts_db::AccountShrinkThreshold::default(), + false, + false, + )?; + + Ok(( + *full_snapshot_archive_info.slot(), + ( + *incremental_snapshot_archive_info.base_slot(), + *incremental_snapshot_archive_info.slot(), + ), + deserialized_bank, + )) + } } diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 84f668fb9f..05f62205fb 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -133,15 +133,15 @@ impl IncrementalSnapshotArchiveInfo { }) } - fn path(&self) -> &PathBuf { + pub fn path(&self) -> &PathBuf { &self.inner.path } - fn base_slot(&self) -> &Slot { + pub fn base_slot(&self) -> &Slot { &self.base_slot } - fn slot(&self) -> &Slot { + pub fn slot(&self) -> &Slot { &self.inner.slot } @@ -368,7 +368,7 @@ where /// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot. #[allow(clippy::too_many_arguments)] -fn package_incremental_snapshot( +pub fn package_incremental_snapshot( bank: &Bank, incremental_snapshot_base_slot: Slot, bank_snapshot_info: &BankSnapshotInfo, @@ -420,6 +420,7 @@ where ) } +/// Create a snapshot package fn do_package_snapshot

( bank: &Bank, bank_snapshot_info: &BankSnapshotInfo, @@ -434,8 +435,6 @@ fn do_package_snapshot

( where P: AsRef, { - // Create a snapshot package - // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging. { let snapshot_hardlink_dir = snapshot_tmpdir @@ -1777,25 +1776,20 @@ pub fn bank_to_full_snapshot_archive, Q: AsRef>( bank.rehash(); // Bank accounts may have been manually modified by the caller let temp_dir = tempfile::tempdir_in(snapshots_dir)?; - let storages = bank.get_snapshot_storages(); let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; - let package = package_full_snapshot( + + package_process_and_archive_full_snapshot( bank, &bank_snapshot_info, &temp_dir, - bank.src.slot_deltas(&bank.src.roots()), snapshot_package_output_path, storages, archive_format, snapshot_version, - None, - )?; - - let package = process_accounts_package_pre(package, thread_pool, None); - - archive_snapshot_package(&package, maximum_snapshots_to_retain)?; - Ok(package.tar_output_file) + thread_pool, + maximum_snapshots_to_retain, + ) } /// Convenience function to create an incremental snapshot archive out of any Bank, regardless of @@ -1825,25 +1819,102 @@ pub fn bank_to_incremental_snapshot_archive, Q: AsRef>( bank.rehash(); // Bank accounts may have been manually modified by the caller let temp_dir = tempfile::tempdir_in(snapshots_dir)?; - let storages = bank.get_incremental_snapshot_storages(full_snapshot_slot); let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; - let package = package_incremental_snapshot( + + package_process_and_archive_incremental_snapshot( bank, full_snapshot_slot, &bank_snapshot_info, &temp_dir, - bank.src.slot_deltas(&bank.src.roots()), snapshot_package_output_path, storages, archive_format, snapshot_version, + thread_pool, + maximum_snapshots_to_retain, + ) +} + +/// Helper function to hold shared code to package, process, and archive full snapshots +pub fn package_process_and_archive_full_snapshot( + bank: &Bank, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: impl AsRef, + snapshot_package_output_path: impl AsRef, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + thread_pool: Option<&ThreadPool>, + maximum_snapshots_to_retain: usize, +) -> Result { + let package = package_full_snapshot( + bank, + bank_snapshot_info, + snapshots_dir, + bank.src.slot_deltas(&bank.src.roots()), + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, None, )?; - let package = process_accounts_package_pre(package, thread_pool, Some(full_snapshot_slot)); + process_and_archive_snapshot_package_pre( + package, + thread_pool, + None, + maximum_snapshots_to_retain, + ) +} + +/// Helper function to hold shared code to package, process, and archive incremental snapshots +#[allow(clippy::too_many_arguments)] +pub fn package_process_and_archive_incremental_snapshot( + bank: &Bank, + incremental_snapshot_base_slot: Slot, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: impl AsRef, + snapshot_package_output_path: impl AsRef, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + thread_pool: Option<&ThreadPool>, + maximum_snapshots_to_retain: usize, +) -> Result { + let package = package_incremental_snapshot( + bank, + incremental_snapshot_base_slot, + bank_snapshot_info, + snapshots_dir, + bank.src.slot_deltas(&bank.src.roots()), + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, + None, + )?; + + process_and_archive_snapshot_package_pre( + package, + thread_pool, + Some(incremental_snapshot_base_slot), + maximum_snapshots_to_retain, + ) +} + +/// Helper function to hold shared code to process and archive snapshot packages +fn process_and_archive_snapshot_package_pre( + package_pre: AccountsPackagePre, + thread_pool: Option<&ThreadPool>, + incremental_snapshot_base_slot: Option, + maximum_snapshots_to_retain: usize, +) -> Result { + let package = + process_accounts_package_pre(package_pre, thread_pool, incremental_snapshot_base_slot); archive_snapshot_package(&package, maximum_snapshots_to_retain)?; + Ok(package.tar_output_file) }