Add test_bank_forks_incremental_snapshot() (#18565)

This commit builds on PR #18504 by adding a test to core/tests/snapshot.rs for Incremental Snapshots. The test adds banks to bank forks in a loop and takes both full snapshots and incremental snapshots at intervals, and validates they are rebuild-able.

For background info about Incremental Snapshots, see #17088.

Fixes #18829 and #18972
This commit is contained in:
Brooks Prumo 2021-07-29 16:46:54 -05:00 committed by GitHub
parent 46fdf8a4d2
commit b05fb87f22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 352 additions and 40 deletions

View File

@ -29,6 +29,11 @@ macro_rules! DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS {
fn test_slots_to_snapshot() {
run_test_slots_to_snapshot(SNAPSHOT_VERSION, CLUSTER_TYPE)
}
#[test]
fn test_bank_forks_incremental_snapshot_n() {
run_test_bank_forks_incremental_snapshot_n(SNAPSHOT_VERSION, CLUSTER_TYPE)
}
}
};
}
@ -39,6 +44,7 @@ mod tests {
use crossbeam_channel::unbounded;
use fs_extra::dir::CopyOptions;
use itertools::Itertools;
use log::{info, trace};
use solana_core::snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService};
use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo};
use solana_runtime::{
@ -57,7 +63,7 @@ mod tests {
use solana_sdk::{
clock::Slot,
genesis_config::{ClusterType, GenesisConfig},
hash::hashv,
hash::{hashv, Hash},
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
@ -66,6 +72,7 @@ mod tests {
use std::{
collections::HashSet,
fs,
io::{Error, ErrorKind},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
@ -83,8 +90,8 @@ mod tests {
struct SnapshotTestConfig {
accounts_dir: TempDir,
snapshot_dir: TempDir,
_snapshot_output_path: TempDir,
bank_snapshots_dir: TempDir,
snapshot_archives_dir: TempDir,
snapshot_config: SnapshotConfig,
bank_forks: BankForks,
genesis_config_info: GenesisConfigInfo,
@ -94,11 +101,12 @@ mod tests {
fn new(
snapshot_version: SnapshotVersion,
cluster_type: ClusterType,
snapshot_interval_slots: u64,
accounts_hash_interval_slots: Slot,
snapshot_interval_slots: Slot,
) -> SnapshotTestConfig {
let accounts_dir = TempDir::new().unwrap();
let snapshot_dir = TempDir::new().unwrap();
let snapshot_output_path = TempDir::new().unwrap();
let bank_snapshots_dir = TempDir::new().unwrap();
let snapshot_archives_dir = TempDir::new().unwrap();
let mut genesis_config_info = create_genesis_config(10_000);
genesis_config_info.genesis_config.cluster_type = cluster_type;
let bank0 = Bank::new_with_paths(
@ -114,12 +122,12 @@ mod tests {
);
bank0.freeze();
let mut bank_forks = BankForks::new(bank0);
bank_forks.accounts_hash_interval_slots = snapshot_interval_slots;
bank_forks.accounts_hash_interval_slots = accounts_hash_interval_slots;
let snapshot_config = SnapshotConfig {
snapshot_interval_slots,
snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
snapshot_path: PathBuf::from(snapshot_dir.path()),
snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(),
snapshot_path: bank_snapshots_dir.path().to_path_buf(),
archive_format: ArchiveFormat::TarBzip2,
snapshot_version,
maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
@ -127,8 +135,8 @@ mod tests {
bank_forks.set_snapshot_config(Some(snapshot_config.clone()));
SnapshotTestConfig {
accounts_dir,
snapshot_dir,
_snapshot_output_path: snapshot_output_path,
bank_snapshots_dir,
snapshot_archives_dir,
snapshot_config,
bank_forks,
genesis_config_info,
@ -185,9 +193,9 @@ mod tests {
.clone();
assert_eq!(*bank, deserialized_bank);
let bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshot_path);
let bank_snapshots = snapshot_utils::get_bank_snapshots(&snapshot_path);
for p in bank_snapshot_infos {
for p in bank_snapshots {
snapshot_utils::remove_bank_snapshot(p.slot, &snapshot_path).unwrap();
}
}
@ -207,7 +215,12 @@ mod tests {
{
solana_logger::setup();
// Set up snapshotting config
let mut snapshot_test_config = SnapshotTestConfig::new(snapshot_version, cluster_type, 1);
let mut snapshot_test_config = SnapshotTestConfig::new(
snapshot_version,
cluster_type,
set_root_interval,
set_root_interval,
);
let bank_forks = &mut snapshot_test_config.bank_forks;
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
@ -316,10 +329,11 @@ mod tests {
solana_logger::setup();
// Set up snapshotting config
let mut snapshot_test_config = SnapshotTestConfig::new(snapshot_version, cluster_type, 1);
let mut snapshot_test_config =
SnapshotTestConfig::new(snapshot_version, cluster_type, 1, 1);
let bank_forks = &mut snapshot_test_config.bank_forks;
let snapshots_dir = &snapshot_test_config.snapshot_dir;
let bank_snapshots_dir = &snapshot_test_config.bank_snapshots_dir;
let snapshot_config = &snapshot_test_config.snapshot_config;
let snapshot_path = &snapshot_config.snapshot_path;
let snapshot_package_output_path = &snapshot_config.snapshot_package_output_path;
@ -438,9 +452,9 @@ mod tests {
// currently sitting in the channel
snapshot_utils::purge_old_bank_snapshots(snapshot_path);
let mut bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshots_dir);
bank_snapshot_infos.sort_unstable();
assert!(bank_snapshot_infos
let mut bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_dir);
bank_snapshots.sort_unstable();
assert!(bank_snapshots
.into_iter()
.map(|path| path.slot)
.eq(3..=snapshot_utils::MAX_BANK_SNAPSHOTS as u64 + 2));
@ -541,7 +555,8 @@ mod tests {
let mut snapshot_test_config = SnapshotTestConfig::new(
snapshot_version,
cluster_type,
(*add_root_interval * num_set_roots * 2) as u64,
(*add_root_interval * num_set_roots * 2) as Slot,
(*add_root_interval * num_set_roots * 2) as Slot,
);
let mut current_bank = snapshot_test_config.bank_forks[0].clone();
let request_sender = AbsRequestSender::new(Some(snapshot_sender));
@ -610,4 +625,230 @@ mod tests {
);
}
}
fn run_test_bank_forks_incremental_snapshot_n(
snapshot_version: SnapshotVersion,
cluster_type: ClusterType,
) {
solana_logger::setup();
const SET_ROOT_INTERVAL: Slot = 2;
const INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS: Slot = SET_ROOT_INTERVAL * 2;
const FULL_SNAPSHOT_INTERVAL_SLOTS: Slot = INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS * 5;
const LAST_SLOT: Slot = FULL_SNAPSHOT_INTERVAL_SLOTS * 2 - 1;
info!("Running bank forks incremental snapshot test, full snapshot interval: {} slots, incremental snapshot interval: {} slots, last slot: {}, set root interval: {} slots",
FULL_SNAPSHOT_INTERVAL_SLOTS, INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS, LAST_SLOT, SET_ROOT_INTERVAL);
let mut snapshot_test_config = SnapshotTestConfig::new(
snapshot_version,
cluster_type,
SET_ROOT_INTERVAL,
FULL_SNAPSHOT_INTERVAL_SLOTS,
);
trace!("SnapshotTestConfig:\naccounts_dir: {}\nbank_snapshots_dir: {}\nsnapshot_archives_dir: {}", snapshot_test_config.accounts_dir.path().display(), snapshot_test_config.bank_snapshots_dir.path().display(), snapshot_test_config.snapshot_archives_dir.path().display());
let bank_forks = &mut snapshot_test_config.bank_forks;
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let (accounts_package_sender, _accounts_package_receiver) = channel();
let request_sender = AbsRequestSender::new(Some(snapshot_request_sender));
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
accounts_package_sender,
};
let mut last_full_snapshot_slot = None;
for slot in 1..=LAST_SLOT {
// Make a new bank and perform some transactions
let bank = {
let bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot);
let key = Keypair::new().pubkey();
let tx = system_transaction::transfer(mint_keypair, &key, 1, bank.last_blockhash());
assert_eq!(bank.process_transaction(&tx), Ok(()));
let key = Keypair::new().pubkey();
let tx = system_transaction::transfer(mint_keypair, &key, 0, bank.last_blockhash());
assert_eq!(bank.process_transaction(&tx), Ok(()));
while !bank.is_complete() {
bank.register_tick(&Hash::new_unique());
}
bank_forks.insert(bank)
};
// Set root to make sure we don't end up with too many account storage entries
// and to allow snapshotting of bank and the purging logic on status_cache to
// kick in
if slot % SET_ROOT_INTERVAL == 0 {
// set_root sends a snapshot request
bank_forks.set_root(bank.slot(), &request_sender, None);
bank.update_accounts_hash();
snapshot_request_handler.handle_snapshot_requests(false, false, false, 0);
}
// Since AccountsBackgroundService isn't running, manually make a full snapshot archive
// at the right interval
if slot % FULL_SNAPSHOT_INTERVAL_SLOTS == 0 {
make_full_snapshot_archive(&bank, &snapshot_test_config.snapshot_config).unwrap();
last_full_snapshot_slot = Some(slot);
}
// Similarly, make an incremental snapshot archive at the right interval, but only if
// there's been at least one full snapshot first, and a full snapshot wasn't already
// taken at this slot.
//
// Then, after making an incremental snapshot, restore the bank and verify it is correct
else if slot % INCREMENTAL_SNAPSHOT_INTERVAL_SLOTS == 0
&& last_full_snapshot_slot.is_some()
&& slot != last_full_snapshot_slot.unwrap()
{
make_incremental_snapshot_archive(
&bank,
last_full_snapshot_slot.unwrap(),
&snapshot_test_config.snapshot_config,
)
.unwrap();
restore_from_incremental_snapshot_and_check_banks_are_equal(
&bank,
last_full_snapshot_slot.unwrap(),
&snapshot_test_config.snapshot_config,
snapshot_test_config.accounts_dir.path().to_path_buf(),
&snapshot_test_config.genesis_config_info.genesis_config,
)
.unwrap();
}
}
}
fn make_full_snapshot_archive(
bank: &Bank,
snapshot_config: &SnapshotConfig,
) -> snapshot_utils::Result<()> {
let slot = bank.slot();
info!("Making full snapshot archive from bank at slot: {}", slot);
let bank_snapshot_info = snapshot_utils::get_bank_snapshots(&snapshot_config.snapshot_path)
.into_iter()
.find(|elem| elem.slot == slot)
.ok_or_else(|| Error::new(ErrorKind::Other, "did not find snapshot with this path"))?;
snapshot_utils::package_process_and_archive_full_snapshot(
bank,
&bank_snapshot_info,
&snapshot_config.snapshot_path,
&snapshot_config.snapshot_package_output_path,
bank.get_snapshot_storages(),
snapshot_config.archive_format,
snapshot_config.snapshot_version,
None,
snapshot_config.maximum_snapshots_to_retain,
)?;
Ok(())
}
fn make_incremental_snapshot_archive(
bank: &Bank,
incremental_snapshot_base_slot: Slot,
snapshot_config: &SnapshotConfig,
) -> snapshot_utils::Result<()> {
let slot = bank.slot();
info!(
"Making incremental snapshot archive from bank at slot: {}, and base slot: {}",
slot, incremental_snapshot_base_slot,
);
let bank_snapshot_info = snapshot_utils::get_bank_snapshots(&snapshot_config.snapshot_path)
.into_iter()
.find(|elem| elem.slot == slot)
.ok_or_else(|| Error::new(ErrorKind::Other, "did not find snapshot with this path"))?;
snapshot_utils::package_process_and_archive_incremental_snapshot(
bank,
incremental_snapshot_base_slot,
&bank_snapshot_info,
&snapshot_config.snapshot_path,
&snapshot_config.snapshot_package_output_path,
bank.get_incremental_snapshot_storages(incremental_snapshot_base_slot),
snapshot_config.archive_format,
snapshot_config.snapshot_version,
None,
snapshot_config.maximum_snapshots_to_retain,
)?;
Ok(())
}
fn restore_from_incremental_snapshot_and_check_banks_are_equal(
bank: &Bank,
last_full_snapshot_slot: Slot,
snapshot_config: &SnapshotConfig,
accounts_dir: PathBuf,
genesis_config: &GenesisConfig,
) -> snapshot_utils::Result<()> {
let (
full_snapshot_archive_slot,
(incremental_snapshot_archive_base_slot, incremental_snapshot_archive_slot),
deserialized_bank,
) = restore_from_incremental_snapshot(snapshot_config, accounts_dir, genesis_config)?;
assert_eq!(
full_snapshot_archive_slot,
incremental_snapshot_archive_base_slot
);
assert_eq!(full_snapshot_archive_slot, last_full_snapshot_slot);
assert_eq!(incremental_snapshot_archive_slot, bank.slot(),);
assert_eq!(*bank, deserialized_bank);
Ok(())
}
fn restore_from_incremental_snapshot(
snapshot_config: &SnapshotConfig,
accounts_dir: PathBuf,
genesis_config: &GenesisConfig,
) -> snapshot_utils::Result<(Slot, (Slot, Slot), Bank)> {
let full_snapshot_archive_info = snapshot_utils::get_highest_full_snapshot_archive_info(
&snapshot_config.snapshot_package_output_path,
)
.ok_or_else(|| Error::new(ErrorKind::Other, "no full snapshot"))?;
let incremental_snapshot_archive_info =
snapshot_utils::get_highest_incremental_snapshot_archive_info(
&snapshot_config.snapshot_package_output_path,
*full_snapshot_archive_info.slot(),
)
.ok_or_else(|| Error::new(ErrorKind::Other, "no incremental snapshot"))?;
info!("Restoring bank from full snapshot slot: {}, and incremental snapshot slot: {} (with base slot: {})",
full_snapshot_archive_info.slot(), incremental_snapshot_archive_info.slot(), incremental_snapshot_archive_info.base_slot());
let (deserialized_bank, _) = snapshot_utils::bank_from_snapshot_archives(
&[accounts_dir],
&[],
&snapshot_config.snapshot_path,
full_snapshot_archive_info.path(),
Some(incremental_snapshot_archive_info.path()),
snapshot_config.archive_format,
genesis_config,
None,
None,
AccountSecondaryIndexes::default(),
false,
None,
accounts_db::AccountShrinkThreshold::default(),
false,
false,
)?;
Ok((
*full_snapshot_archive_info.slot(),
(
*incremental_snapshot_archive_info.base_slot(),
*incremental_snapshot_archive_info.slot(),
),
deserialized_bank,
))
}
}

View File

@ -133,15 +133,15 @@ impl IncrementalSnapshotArchiveInfo {
})
}
fn path(&self) -> &PathBuf {
pub fn path(&self) -> &PathBuf {
&self.inner.path
}
fn base_slot(&self) -> &Slot {
pub fn base_slot(&self) -> &Slot {
&self.base_slot
}
fn slot(&self) -> &Slot {
pub fn slot(&self) -> &Slot {
&self.inner.slot
}
@ -368,7 +368,7 @@ where
/// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot.
#[allow(clippy::too_many_arguments)]
fn package_incremental_snapshot<P, Q>(
pub fn package_incremental_snapshot<P, Q>(
bank: &Bank,
incremental_snapshot_base_slot: Slot,
bank_snapshot_info: &BankSnapshotInfo,
@ -420,6 +420,7 @@ where
)
}
/// Create a snapshot package
fn do_package_snapshot<P>(
bank: &Bank,
bank_snapshot_info: &BankSnapshotInfo,
@ -434,8 +435,6 @@ fn do_package_snapshot<P>(
where
P: AsRef<Path>,
{
// Create a snapshot package
// Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging.
{
let snapshot_hardlink_dir = snapshot_tmpdir
@ -1777,25 +1776,20 @@ pub fn bank_to_full_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
bank.rehash(); // Bank accounts may have been manually modified by the caller
let temp_dir = tempfile::tempdir_in(snapshots_dir)?;
let storages = bank.get_snapshot_storages();
let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?;
let package = package_full_snapshot(
package_process_and_archive_full_snapshot(
bank,
&bank_snapshot_info,
&temp_dir,
bank.src.slot_deltas(&bank.src.roots()),
snapshot_package_output_path,
storages,
archive_format,
snapshot_version,
None,
)?;
let package = process_accounts_package_pre(package, thread_pool, None);
archive_snapshot_package(&package, maximum_snapshots_to_retain)?;
Ok(package.tar_output_file)
thread_pool,
maximum_snapshots_to_retain,
)
}
/// Convenience function to create an incremental snapshot archive out of any Bank, regardless of
@ -1825,25 +1819,102 @@ pub fn bank_to_incremental_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
bank.rehash(); // Bank accounts may have been manually modified by the caller
let temp_dir = tempfile::tempdir_in(snapshots_dir)?;
let storages = bank.get_incremental_snapshot_storages(full_snapshot_slot);
let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?;
let package = package_incremental_snapshot(
package_process_and_archive_incremental_snapshot(
bank,
full_snapshot_slot,
&bank_snapshot_info,
&temp_dir,
bank.src.slot_deltas(&bank.src.roots()),
snapshot_package_output_path,
storages,
archive_format,
snapshot_version,
thread_pool,
maximum_snapshots_to_retain,
)
}
/// Helper function to hold shared code to package, process, and archive full snapshots
pub fn package_process_and_archive_full_snapshot(
bank: &Bank,
bank_snapshot_info: &BankSnapshotInfo,
snapshots_dir: impl AsRef<Path>,
snapshot_package_output_path: impl AsRef<Path>,
snapshot_storages: SnapshotStorages,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<PathBuf> {
let package = package_full_snapshot(
bank,
bank_snapshot_info,
snapshots_dir,
bank.src.slot_deltas(&bank.src.roots()),
snapshot_package_output_path,
snapshot_storages,
archive_format,
snapshot_version,
None,
)?;
let package = process_accounts_package_pre(package, thread_pool, Some(full_snapshot_slot));
process_and_archive_snapshot_package_pre(
package,
thread_pool,
None,
maximum_snapshots_to_retain,
)
}
/// Helper function to hold shared code to package, process, and archive incremental snapshots
#[allow(clippy::too_many_arguments)]
pub fn package_process_and_archive_incremental_snapshot(
bank: &Bank,
incremental_snapshot_base_slot: Slot,
bank_snapshot_info: &BankSnapshotInfo,
snapshots_dir: impl AsRef<Path>,
snapshot_package_output_path: impl AsRef<Path>,
snapshot_storages: SnapshotStorages,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<PathBuf> {
let package = package_incremental_snapshot(
bank,
incremental_snapshot_base_slot,
bank_snapshot_info,
snapshots_dir,
bank.src.slot_deltas(&bank.src.roots()),
snapshot_package_output_path,
snapshot_storages,
archive_format,
snapshot_version,
None,
)?;
process_and_archive_snapshot_package_pre(
package,
thread_pool,
Some(incremental_snapshot_base_slot),
maximum_snapshots_to_retain,
)
}
/// Helper function to hold shared code to process and archive snapshot packages
fn process_and_archive_snapshot_package_pre(
package_pre: AccountsPackagePre,
thread_pool: Option<&ThreadPool>,
incremental_snapshot_base_slot: Option<Slot>,
maximum_snapshots_to_retain: usize,
) -> Result<PathBuf> {
let package =
process_accounts_package_pre(package_pre, thread_pool, incremental_snapshot_base_slot);
archive_snapshot_package(&package, maximum_snapshots_to_retain)?;
Ok(package.tar_output_file)
}