Enables creating snapshots after booting from local state (#32137)

This commit is contained in:
Brooks 2023-06-15 22:54:32 -04:00 committed by GitHub
parent b1b7ae5e09
commit 47ff3cecc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 393 additions and 68 deletions

View File

@ -251,6 +251,7 @@ pub struct ValidatorConfig {
pub block_verification_method: BlockVerificationMethod,
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub boot_from_local_state: bool,
}
impl Default for ValidatorConfig {
@ -317,6 +318,7 @@ impl Default for ValidatorConfig {
block_verification_method: BlockVerificationMethod::default(),
block_production_method: BlockProductionMethod::default(),
generator_config: None,
boot_from_local_state: false,
}
}
}
@ -1625,6 +1627,7 @@ fn load_blockstore(
accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
accounts_db_skip_shrink: config.accounts_db_skip_shrink,
runtime_config: config.runtime_config.clone(),
boot_from_local_state: config.boot_from_local_state,
..blockstore_processor::ProcessOptions::default()
};

View File

@ -204,84 +204,125 @@ fn bank_forks_from_snapshot(
process::exit(1);
}
let (deserialized_bank, starting_snapshot_hashes) = if process_options.boot_from_local_state {
assert!(
!snapshot_config.should_generate_snapshots(),
"booting from local state does not support generating snapshots yet",
);
let bank = snapshot_utils::bank_from_latest_snapshot_dir(
&snapshot_config.bank_snapshots_dir,
genesis_config,
&process_options.runtime_config,
&account_paths,
process_options.debug_keys.clone(),
None,
process_options.account_indexes.clone(),
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.verify_index,
process_options.accounts_db_config.clone(),
accounts_update_notifier,
exit,
)
.expect("load bank from local state");
(bank, None)
} else {
// Given that we are going to boot from an archive, the accountvecs held in the snapshot dirs for fast-boot should
// be released. They will be released by the account_background_service anyway. But in the case of the account_paths
// using memory-mounted file system, they are not released early enough to give space for the new append-vecs from
// the archives, causing the out-of-memory problem. So, purge the snapshot dirs upfront before loading from the archive.
snapshot_utils::purge_old_bank_snapshots(&snapshot_config.bank_snapshots_dir, 0, None);
let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) =
snapshot_utils::bank_from_latest_snapshot_archives(
let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) =
if process_options.boot_from_local_state {
let bank = snapshot_utils::bank_from_latest_snapshot_dir(
&snapshot_config.bank_snapshots_dir,
&snapshot_config.full_snapshot_archives_dir,
&snapshot_config.incremental_snapshot_archives_dir,
&account_paths,
genesis_config,
&process_options.runtime_config,
&account_paths,
process_options.debug_keys.clone(),
None,
process_options.account_indexes.clone(),
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
process_options.accounts_db_skip_shrink,
process_options.verify_index,
process_options.accounts_db_config.clone(),
accounts_update_notifier,
exit,
)
.expect("load bank from snapshot archives");
.expect("load bank from local state");
let full_snapshot_hash = FullSnapshotHash((
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
));
let starting_incremental_snapshot_hash =
incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| {
IncrementalSnapshotHash((
incremental_snapshot_archive_info.slot(),
*incremental_snapshot_archive_info.hash(),
))
});
let starting_snapshot_hashes = StartingSnapshotHashes {
full: full_snapshot_hash,
incremental: starting_incremental_snapshot_hash,
// The highest snapshot *archives* are still needed, as they are
// the starting snapshot hashes, which are used by the background services
// when performing snapshot-related tasks.
let full_snapshot_archive_info =
snapshot_utils::get_highest_full_snapshot_archive_info(
&snapshot_config.full_snapshot_archives_dir,
)
// SAFETY: Calling `bank_forks_from_snapshot` requires at least a full snapshot
.expect("get highest full snapshot");
let incremental_snapshot_archive_info =
snapshot_utils::get_highest_incremental_snapshot_archive_info(
&snapshot_config.incremental_snapshot_archives_dir,
full_snapshot_archive_info.slot(),
);
// If a newer snapshot archive was downloaded, it is possible that the snapshot
// slot is higher than the local bank we just loaded. It is unlikely the user
// intended to still boot from local state in this scenario.
let latest_snapshot_archive_slot = std::cmp::max(
full_snapshot_archive_info.slot(),
incremental_snapshot_archive_info
.as_ref()
.map(SnapshotArchiveInfoGetter::slot)
.unwrap_or(0),
);
if bank.slot() < latest_snapshot_archive_slot {
error!(
"Attempting to boot from local state at a slot {} that is *older* than the latest \
snapshot archive slot {}, which is not supported. Either remove the snapshot archive \
or remove the --boot-from-local-state CLI flag.",
bank.slot(),
latest_snapshot_archive_slot,
);
process::exit(1);
}
(
bank,
full_snapshot_archive_info,
incremental_snapshot_archive_info,
)
} else {
// Given that we are going to boot from an archive, the accountvecs held in the snapshot dirs for fast-boot should
// be released. They will be released by the account_background_service anyway. But in the case of the account_paths
// using memory-mounted file system, they are not released early enough to give space for the new append-vecs from
// the archives, causing the out-of-memory problem. So, purge the snapshot dirs upfront before loading from the archive.
snapshot_utils::purge_old_bank_snapshots(&snapshot_config.bank_snapshots_dir, 0, None);
let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) =
snapshot_utils::bank_from_latest_snapshot_archives(
&snapshot_config.bank_snapshots_dir,
&snapshot_config.full_snapshot_archives_dir,
&snapshot_config.incremental_snapshot_archives_dir,
&account_paths,
genesis_config,
&process_options.runtime_config,
process_options.debug_keys.clone(),
None,
process_options.account_indexes.clone(),
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
process_options.accounts_db_skip_shrink,
process_options.verify_index,
process_options.accounts_db_config.clone(),
accounts_update_notifier,
exit,
)
.expect("load bank from snapshot archives");
(
deserialized_bank,
full_snapshot_archive_info,
incremental_snapshot_archive_info,
)
};
(deserialized_bank, Some(starting_snapshot_hashes))
};
if let Some(shrink_paths) = shrink_paths {
deserialized_bank.set_shrink_paths(shrink_paths);
}
let full_snapshot_hash = FullSnapshotHash((
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
));
let incremental_snapshot_hash =
incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| {
IncrementalSnapshotHash((
incremental_snapshot_archive_info.slot(),
*incremental_snapshot_archive_info.hash(),
))
});
let starting_snapshot_hashes = StartingSnapshotHashes {
full: full_snapshot_hash,
incremental: incremental_snapshot_hash,
};
(
Arc::new(RwLock::new(BankForks::new(deserialized_bank))),
starting_snapshot_hashes,
Some(starting_snapshot_hashes),
)
}

View File

@ -67,6 +67,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
block_verification_method: config.block_verification_method.clone(),
block_production_method: config.block_production_method.clone(),
generator_config: config.generator_config.clone(),
boot_from_local_state: config.boot_from_local_state,
}
}

View File

@ -4754,3 +4754,246 @@ fn test_duplicate_with_pruned_ancestor() {
"test_duplicate_with_pruned_ancestor",
);
}
/// Test fastboot to ensure a node can boot from local state and still produce correct snapshots
///
/// 1. Start node 1 and wait for it to take snapshots
/// 2. Start node 2 with the snapshots from (1)
/// 3. Wait for node 2 to take a bank snapshot
/// 4. Restart node 2 with the local state from (3)
/// 5. Wait for node 2 to take new snapshots
/// 6. Start node 3 with the snapshots from (5)
/// 7. Wait for node 3 to take new snapshots
/// 8. Ensure the snapshots from (7) match node's 1 and 2
#[test]
#[serial]
fn test_boot_from_local_state() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
const FULL_SNAPSHOT_INTERVAL: Slot = 100;
const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 10;
let validator1_config = SnapshotValidatorConfig::new(
FULL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
2,
);
let validator2_config = SnapshotValidatorConfig::new(
FULL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
4,
);
let validator3_config = SnapshotValidatorConfig::new(
FULL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
INCREMENTAL_SNAPSHOT_INTERVAL,
3,
);
let mut cluster_config = ClusterConfig {
node_stakes: vec![100 * DEFAULT_NODE_STAKE],
cluster_lamports: DEFAULT_CLUSTER_LAMPORTS,
validator_configs: make_identical_validator_configs(&validator1_config.validator_config, 1),
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut cluster_config, SocketAddrSpace::Unspecified);
// in order to boot from local state, need to first have snapshot archives
info!("Waiting for validator1 to create snapshots...");
let (incremental_snapshot_archive, full_snapshot_archive) =
LocalCluster::wait_for_next_incremental_snapshot(
&cluster,
&validator1_config.full_snapshot_archives_dir,
&validator1_config.incremental_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}");
info!("Waiting for validator1 to create snapshots... DONE");
info!("Copying snapshots to validator2...");
std::fs::copy(
full_snapshot_archive.path(),
validator2_config
.full_snapshot_archives_dir
.path()
.join(full_snapshot_archive.path().file_name().unwrap()),
)
.unwrap();
std::fs::copy(
incremental_snapshot_archive.path(),
validator2_config
.incremental_snapshot_archives_dir
.path()
.join(incremental_snapshot_archive.path().file_name().unwrap()),
)
.unwrap();
info!("Copying snapshots to validator2... DONE");
info!("Starting validator2...");
let validator2_identity = Arc::new(Keypair::new());
cluster.add_validator(
&validator2_config.validator_config,
DEFAULT_NODE_STAKE,
validator2_identity.clone(),
None,
SocketAddrSpace::Unspecified,
);
info!("Starting validator2... DONE");
// wait for a new bank snapshot to fastboot from that is newer than its snapshot archives
info!("Waiting for validator2 to create a new bank snapshot...");
let timer = Instant::now();
let bank_snapshot = loop {
if let Some(full_snapshot_slot) = snapshot_utils::get_highest_full_snapshot_archive_slot(
&validator2_config.full_snapshot_archives_dir,
) {
if let Some(incremental_snapshot_slot) =
snapshot_utils::get_highest_incremental_snapshot_archive_slot(
&validator2_config.incremental_snapshot_archives_dir,
full_snapshot_slot,
)
{
if let Some(bank_snapshot) = snapshot_utils::get_highest_bank_snapshot_post(
&validator2_config.bank_snapshots_dir,
) {
if bank_snapshot.slot > incremental_snapshot_slot {
break bank_snapshot;
}
}
}
}
assert!(
timer.elapsed() < Duration::from_secs(30),
"It should not take longer than 30 seconds to create a new bank snapshot"
);
std::thread::yield_now();
};
debug!("bank snapshot: {bank_snapshot:?}");
info!("Waiting for validator2 to create a new bank snapshot... DONE");
// restart WITH fastboot
info!("Restarting validator2 from local state...");
let mut validator2_info = cluster.exit_node(&validator2_identity.pubkey());
validator2_info.config.boot_from_local_state = true;
cluster.restart_node(
&validator2_identity.pubkey(),
validator2_info,
SocketAddrSpace::Unspecified,
);
info!("Restarting validator2 from local state... DONE");
info!("Waiting for validator2 to create snapshots...");
let (incremental_snapshot_archive, full_snapshot_archive) =
LocalCluster::wait_for_next_incremental_snapshot(
&cluster,
&validator2_config.full_snapshot_archives_dir,
&validator2_config.incremental_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}");
info!("Waiting for validator2 to create snapshots... DONE");
info!("Copying snapshots to validator3...");
std::fs::copy(
full_snapshot_archive.path(),
validator3_config
.full_snapshot_archives_dir
.path()
.join(full_snapshot_archive.path().file_name().unwrap()),
)
.unwrap();
std::fs::copy(
incremental_snapshot_archive.path(),
validator3_config
.incremental_snapshot_archives_dir
.path()
.join(incremental_snapshot_archive.path().file_name().unwrap()),
)
.unwrap();
info!("Copying snapshots to validator3... DONE");
info!("Starting validator3...");
let validator3_identity = Arc::new(Keypair::new());
cluster.add_validator(
&validator3_config.validator_config,
DEFAULT_NODE_STAKE,
validator3_identity,
None,
SocketAddrSpace::Unspecified,
);
info!("Starting validator3... DONE");
// wait for a new snapshot to ensure the validator is making roots
info!("Waiting for validator3 to create snapshots...");
let (incremental_snapshot_archive, full_snapshot_archive) =
LocalCluster::wait_for_next_incremental_snapshot(
&cluster,
&validator3_config.full_snapshot_archives_dir,
&validator3_config.incremental_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}");
info!("Waiting for validator3 to create snapshots... DONE");
// ensure that all validators have the correct state by comparing snapshots
// - wait for the other validators to have high enough snapshots
// - ensure validator3's snapshot hashes match the other validators' snapshot hashes
//
// NOTE: There's a chance validator's 1 or 2 have crossed the next full snapshot past what
// validator 3 has. If that happens, validator's 1 or 2 may have purged the snapshots needed
// to compare with validator 3, and thus assert. If that happens, the full snapshot interval
// may need to be adjusted larger.
for (i, other_validator_config) in [(1, &validator1_config), (2, &validator2_config)] {
info!("Checking if validator{i} has the same snapshots as validator3...");
let timer = Instant::now();
loop {
if let Some(other_full_snapshot_slot) =
snapshot_utils::get_highest_full_snapshot_archive_slot(
&other_validator_config.full_snapshot_archives_dir,
)
{
let other_incremental_snapshot_slot =
snapshot_utils::get_highest_incremental_snapshot_archive_slot(
&other_validator_config.incremental_snapshot_archives_dir,
other_full_snapshot_slot,
);
if other_full_snapshot_slot >= full_snapshot_archive.slot()
&& other_incremental_snapshot_slot >= Some(incremental_snapshot_archive.slot())
{
break;
}
}
assert!(
timer.elapsed() < Duration::from_secs(60),
"It should not take longer than 60 seconds to take snapshots"
);
std::thread::yield_now();
}
let other_full_snapshot_archives = snapshot_utils::get_full_snapshot_archives(
&other_validator_config.full_snapshot_archives_dir,
);
debug!("validator{i} full snapshot archives: {other_full_snapshot_archives:?}");
assert!(other_full_snapshot_archives
.iter()
.any(
|other_full_snapshot_archive| other_full_snapshot_archive.slot()
== full_snapshot_archive.slot()
&& other_full_snapshot_archive.hash() == full_snapshot_archive.hash()
));
let other_incremental_snapshot_archives = snapshot_utils::get_incremental_snapshot_archives(
&other_validator_config.incremental_snapshot_archives_dir,
);
debug!(
"validator{i} incremental snapshot archives: {other_incremental_snapshot_archives:?}"
);
assert!(other_incremental_snapshot_archives.iter().any(
|other_incremental_snapshot_archive| other_incremental_snapshot_archive.base_slot()
== incremental_snapshot_archive.base_slot()
&& other_incremental_snapshot_archive.slot() == incremental_snapshot_archive.slot()
&& other_incremental_snapshot_archive.hash() == incremental_snapshot_archive.hash()
));
info!("Checking if validator{i} has the same snapshots as validator3... DONE");
}
}

View File

@ -747,15 +747,43 @@ where
{
let AccountsDbFields(_, _, slot, bank_hash_info, _, _) =
&snapshot_accounts_db_fields.full_snapshot_accounts_db_fields;
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
*slot,
bank_hash_info.accounts_hash.clone(),
capitalizations.0,
);
assert!(
old_accounts_hash.is_none(),
"There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}",
);
if let Some(incremental_snapshot_persistence) = incremental_snapshot_persistence {
// If we've booted from local state that was originally intended to be an incremental
// snapshot, then we will use the incremental snapshot persistence field to set the
// initial accounts hashes in accounts db.
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
incremental_snapshot_persistence.full_slot,
incremental_snapshot_persistence.full_hash.clone(),
incremental_snapshot_persistence.full_capitalization,
);
assert!(
old_accounts_hash.is_none(),
"There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}",
);
let old_incremental_accounts_hash = accounts_db
.set_incremental_accounts_hash_from_snapshot(
*slot,
incremental_snapshot_persistence.incremental_hash.clone(),
incremental_snapshot_persistence.incremental_capitalization,
);
assert!(
old_incremental_accounts_hash.is_none(),
"There should not already be an IncrementalAccountsHash at slot {slot}: {old_incremental_accounts_hash:?}",
);
} else {
// Otherwise, we've booted from a snapshot archive, or from local state that was *not*
// intended to be an incremental snapshot.
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
*slot,
bank_hash_info.accounts_hash.clone(),
capitalizations.0,
);
assert!(
old_accounts_hash.is_none(),
"There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}",
);
}
}
// Store the accounts hash & capitalization, from the incremental snapshot, in the new AccountsDb

View File

@ -312,7 +312,7 @@ fn test_bank_serialize_style(
let slot = bank2.slot();
let incremental =
incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence {
full_slot: slot + 1,
full_slot: slot - 1,
full_hash: SerdeAccountsHash(Hash::new(&[1; 32])),
full_capitalization: 31,
incremental_hash: SerdeIncrementalAccountsHash(Hash::new(&[2; 32])),
@ -418,7 +418,16 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_balance(&key1.pubkey()), 0);
assert_eq!(dbank.get_balance(&key2.pubkey()), 10);
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
assert_eq!(dbank.get_accounts_hash(), Some(accounts_hash));
if let Some(incremental_snapshot_persistence) = incremental.clone() {
assert_eq!(dbank.get_accounts_hash(), None,);
assert_eq!(
dbank.get_incremental_accounts_hash(),
Some(incremental_snapshot_persistence.incremental_hash.into()),
);
} else {
assert_eq!(dbank.get_accounts_hash(), Some(accounts_hash));
assert_eq!(dbank.get_incremental_accounts_hash(), None);
}
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
assert_eq!(dbank.get_epoch_accounts_hash_to_serialize().map(|epoch_accounts_hash| *epoch_accounts_hash.as_ref()), expected_epoch_accounts_hash,