Add test for startup processing new roots past full snapshot interval (#19876)

This commit is contained in:
Brooks Prumo 2021-09-20 18:50:29 -05:00 committed by GitHub
parent 94330de843
commit 4895c69fea
1 changed file with 396 additions and 0 deletions

@@ -2,6 +2,7 @@
use {
assert_matches::assert_matches,
crossbeam_channel::{unbounded, Receiver},
fs_extra::dir::CopyOptions,
gag::BufferRedirect,
log::*,
serial_test::serial,
@@ -1876,6 +1877,401 @@ fn test_incremental_snapshot_download() {
);
}
/// Test the scenario where a node starts up from a snapshot and its blockstore has enough new
/// roots that cross the full snapshot interval. In this scenario, the node needs to take a full
/// snapshot while processing the blockstore so that once the background services start up, there
/// is the correct full snapshot available to take subsequent incremental snapshots.
///
/// For this test...
/// - Start a leader node and run it long enough to take a full and incremental snapshot
/// - Download those snapshots to a validator node
/// - Copy the validator snapshots to a backup directory
/// - Start up the validator node
/// - Wait for the validator node to see enough root slots to cross the full snapshot interval
/// - Delete the snapshots on the validator node and restore the ones from the backup
/// - Restart the validator node to trigger the scenario we're trying to test
/// - Wait for the validator node to generate a new incremental snapshot
/// - Copy the new incremental snapshot (and its associated full snapshot) to another new validator
/// - Start up this new validator to ensure the snapshots from ^^^ are good
#[test]
#[serial]
fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_startup() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// If these intervals change, also make sure to change the loop timers accordingly.
let accounts_hash_interval = 3;
let incremental_snapshot_interval = accounts_hash_interval * 3;
let full_snapshot_interval = incremental_snapshot_interval * 3;
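// With these values, an accounts hash is calculated every 3 slots, an incremental snapshot
// is taken every 9 slots, and a full snapshot is taken every 27 slots.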
let num_account_paths = 3;
let leader_snapshot_test_config = SnapshotValidatorConfig::new(
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval,
num_account_paths,
);
let validator_snapshot_test_config = SnapshotValidatorConfig::new(
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval,
num_account_paths,
);
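// The leader and the validator are given identical snapshot configs so that their snapshot
// archives are taken at the same slots, which the leader-vs-validator comparison later in
// this test depends on.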
let stake = 10_000;
let mut config = ClusterConfig {
node_stakes: vec![stake],
cluster_lamports: 1_000_000,
validator_configs: make_identical_validator_configs(
&leader_snapshot_test_config.validator_config,
1,
),
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
debug!("snapshot config:\n\tfull snapshot interval: {}\n\tincremental snapshot interval: {}\n\taccounts hash interval: {}",
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval);
debug!(
"leader config:\n\tbank snapshots dir: {}\n\tsnapshot archives dir: {}",
leader_snapshot_test_config
.bank_snapshots_dir
.path()
.display(),
leader_snapshot_test_config
.snapshot_archives_dir
.path()
.display(),
);
debug!(
"validator config:\n\tbank snapshots dir: {}\n\tsnapshot archives dir: {}",
validator_snapshot_test_config
.bank_snapshots_dir
.path()
.display(),
validator_snapshot_test_config
.snapshot_archives_dir
.path()
.display(),
);
info!("Waiting for leader to create snapshots...");
let (incremental_snapshot_archive_info, full_snapshot_archive_info) =
LocalCluster::wait_for_next_incremental_snapshot(
&cluster,
leader_snapshot_test_config.snapshot_archives_dir.path(),
);
debug!(
"Found snapshots:\n\tfull snapshot: {}\n\tincremental snapshot: {}",
full_snapshot_archive_info.path().display(),
incremental_snapshot_archive_info.path().display()
);
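// Sanity check: the incremental snapshot must be based on the full snapshot found above.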
assert_eq!(
full_snapshot_archive_info.slot(),
incremental_snapshot_archive_info.base_slot()
);
// Download the snapshots, then boot a validator from them.
info!("Downloading full snapshot to validator...");
download_snapshot_archive(
&cluster.entry_point_info.rpc,
validator_snapshot_test_config.snapshot_archives_dir.path(),
(
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
SnapshotType::FullSnapshot,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_full_snapshot_archives_to_retain,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();
let downloaded_full_snapshot_archive_info =
snapshot_utils::get_highest_full_snapshot_archive_info(
validator_snapshot_test_config.snapshot_archives_dir.path(),
)
.unwrap();
debug!(
"Downloaded full snapshot, slot: {}",
downloaded_full_snapshot_archive_info.slot()
);
info!("Downloading incremental snapshot to validator...");
download_snapshot_archive(
&cluster.entry_point_info.rpc,
validator_snapshot_test_config.snapshot_archives_dir.path(),
(
incremental_snapshot_archive_info.slot(),
*incremental_snapshot_archive_info.hash(),
),
SnapshotType::IncrementalSnapshot(incremental_snapshot_archive_info.base_slot()),
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_full_snapshot_archives_to_retain,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();
let downloaded_incremental_snapshot_archive_info =
snapshot_utils::get_highest_incremental_snapshot_archive_info(
validator_snapshot_test_config.snapshot_archives_dir.path(),
full_snapshot_archive_info.slot(),
)
.unwrap();
debug!(
"Downloaded incremental snapshot, slot: {}, base slot: {}",
downloaded_incremental_snapshot_archive_info.slot(),
downloaded_incremental_snapshot_archive_info.base_slot(),
);
assert_eq!(
downloaded_full_snapshot_archive_info.slot(),
downloaded_incremental_snapshot_archive_info.base_slot()
);
// Closure to delete all the files in a directory, skipping any subdirectories. Used below
// to clear out the validator's snapshot archives before restoring the backup copies.
let delete_files = |dir: &Path| {
trace!("deleting files in dir {}", dir.display());
for entry in fs::read_dir(dir).unwrap() {
let entry = entry.unwrap();
if entry.file_type().unwrap().is_dir() {
continue;
}
let file_path = entry.path();
trace!("\t\tdeleting file {}...", file_path.display());
fs::remove_file(file_path).unwrap();
}
};
// After downloading the snapshots, copy them over to a backup directory. Later we'll need to
// restart the node and guarantee that the only snapshots present are these initial ones. So,
// the easiest way to do that is to create a backup now, delete the ones on the node before
// restart, then copy the backup ones over again.
let backup_validator_snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap();
trace!(
"Backing up validator snapshots to dir: {}...",
backup_validator_snapshot_archives_dir.path().display()
);
let copy_options = CopyOptions {
content_only: true,
depth: 1,
..CopyOptions::default()
};
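// `content_only: true` copies the directory's contents (not the directory itself) into the
// destination, and `depth: 1` limits the copy to the top level of the directory.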
fs_extra::dir::copy(
validator_snapshot_test_config.snapshot_archives_dir.path(),
backup_validator_snapshot_archives_dir.path(),
&copy_options,
)
.unwrap();
info!("Starting a new validator...");
let validator_identity = Arc::new(Keypair::new());
cluster.add_validator(
&validator_snapshot_test_config.validator_config,
stake,
validator_identity.clone(),
None,
SocketAddrSpace::Unspecified,
);
// To ensure that a snapshot will be taken during startup, the blockstore needs to have roots
// that cross a full snapshot interval.
info!("Waiting for the validator to see enough slots to cross a full snapshot interval...");
let starting_slot = incremental_snapshot_archive_info.slot();
let timer = Instant::now();
loop {
let validator_current_slot = cluster
.get_validator_client(&validator_identity.pubkey())
.unwrap()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap();
if validator_current_slot > (starting_slot + full_snapshot_interval) {
break;
}
assert!(
timer.elapsed() < Duration::from_secs(30),
"It should not take longer than 30 seconds to cross the next full snapshot interval."
);
std::thread::yield_now();
}
trace!("Waited {:?}", timer.elapsed());
// Get the highest full snapshot archive info for the validator, now that it has crossed the
// next full snapshot interval. We are going to use this to look up the same snapshot on the
// leader, which we'll then use to compare to the full snapshot the validator will create
// during startup. This ensures the snapshot creation process during startup is correct.
//
// Putting this all in its own block so it's clear we only intend to keep the leader's info
let leader_full_snapshot_archive_info_for_comparison = {
let validator_full_snapshot = snapshot_utils::get_highest_full_snapshot_archive_info(
validator_snapshot_test_config.snapshot_archives_dir.path(),
)
.unwrap();
// Now get the same full snapshot on the LEADER that we just got from the validator
let mut leader_full_snapshots = snapshot_utils::get_full_snapshot_archives(
leader_snapshot_test_config.snapshot_archives_dir.path(),
);
leader_full_snapshots.retain(|full_snapshot| {
full_snapshot.slot() == validator_full_snapshot.slot()
&& full_snapshot.hash() == validator_full_snapshot.hash()
});
// NOTE: If this unwrap() ever fails, it may be that the leader's old full snapshot archives
// were purged. If that happens, increase maximum_full_snapshot_archives_to_retain
// in the leader's SnapshotConfig.
let leader_full_snapshot = leader_full_snapshots.first().unwrap();
// And for sanity, the full snapshot from the leader and the validator MUST be the same
assert_eq!(
(
validator_full_snapshot.slot(),
validator_full_snapshot.hash()
),
(leader_full_snapshot.slot(), leader_full_snapshot.hash())
);
leader_full_snapshot.clone()
};
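// `leader_full_snapshot_archive_info_for_comparison` is now the ground truth that the full
// snapshot created during the validator's startup must match.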
trace!(
"Delete all the snapshots on the validator and restore the originals from the backup..."
);
delete_files(validator_snapshot_test_config.snapshot_archives_dir.path());
fs_extra::dir::copy(
backup_validator_snapshot_archives_dir.path(),
validator_snapshot_test_config.snapshot_archives_dir.path(),
&copy_options,
)
.unwrap();
// Get the highest full snapshot slot *before* restarting, as a comparison
let validator_full_snapshot_slot_at_startup =
snapshot_utils::get_highest_full_snapshot_archive_slot(
validator_snapshot_test_config.snapshot_archives_dir.path(),
)
.unwrap();
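// Since the archives were just restored from the backup, any full snapshot newer than this
// slot must have been created after the restart.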
info!("Restarting the validator...");
let validator_info = cluster.exit_node(&validator_identity.pubkey());
cluster.restart_node(
&validator_identity.pubkey(),
validator_info,
SocketAddrSpace::Unspecified,
);
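// The restarted validator reuses its blockstore, which now contains roots that cross a full
// snapshot interval, so it must take a full snapshot while processing the blockstore during
// startup. This is the scenario this test exercises.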
// Now, we want to ensure that the validator can make a new incremental snapshot based on the
// new full snapshot that was created during the restart.
let timer = Instant::now();
let (
validator_highest_full_snapshot_archive_info,
_validator_highest_incremental_snapshot_archive_info,
) = loop {
if let Some(highest_full_snapshot_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(
validator_snapshot_test_config.snapshot_archives_dir.path(),
)
{
if highest_full_snapshot_info.slot() > validator_full_snapshot_slot_at_startup {
if let Some(highest_incremental_snapshot_info) =
snapshot_utils::get_highest_incremental_snapshot_archive_info(
validator_snapshot_test_config.snapshot_archives_dir.path(),
highest_full_snapshot_info.slot(),
)
{
info!("Success! Made new full and incremental snapshots!");
trace!(
"Full snapshot slot: {}, incremental snapshot slot: {}",
highest_full_snapshot_info.slot(),
highest_incremental_snapshot_info.slot(),
);
break (
highest_full_snapshot_info,
highest_incremental_snapshot_info,
);
}
}
}
assert!(
timer.elapsed() < Duration::from_secs(10),
"It should not take longer than 10 seconds to cross the next incremental snapshot interval."
);
std::thread::yield_now();
};
trace!("Waited {:?}", timer.elapsed());
// Check to make sure that the full snapshot the validator created during startup is the same
// as the snapshot the leader created.
// NOTE: If the assert fires and the _slots_ don't match (specifically, are off by a full
// snapshot interval), then that means the loop that fetched the
// `validator_highest_full_snapshot_archive_info` saw the wrong one, which may have been due
// to some weird scheduling/delays on the machine running the test. Run the test again. If
// this ever fails repeatedly then the test will need to be modified to handle this case.
assert_eq!(
(
validator_highest_full_snapshot_archive_info.slot(),
validator_highest_full_snapshot_archive_info.hash()
),
(
leader_full_snapshot_archive_info_for_comparison.slot(),
leader_full_snapshot_archive_info_for_comparison.hash()
)
);
// And lastly, start up another node with the new snapshots to ensure they work
let final_validator_snapshot_test_config = SnapshotValidatorConfig::new(
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval,
num_account_paths,
);
// Copy the snapshots over to the new node, but first remove the tmp snapshot archives so
// they don't break the directory copy.
snapshot_utils::remove_tmp_snapshot_archives(
validator_snapshot_test_config.snapshot_archives_dir.path(),
);
fs_extra::dir::copy(
validator_snapshot_test_config.snapshot_archives_dir.path(),
final_validator_snapshot_test_config
.snapshot_archives_dir
.path(),
&copy_options,
)
.unwrap();
info!("Starting final validator...");
let final_validator_identity = Arc::new(Keypair::new());
cluster.add_validator(
&final_validator_snapshot_test_config.validator_config,
stake,
final_validator_identity,
None,
SocketAddrSpace::Unspecified,
);
// Success!
}
#[allow(unused_attributes)]
#[test]
#[serial]