From 47ff3cecc9cad527b208836f39a7321bf0511b5e Mon Sep 17 00:00:00 2001 From: Brooks Date: Thu, 15 Jun 2023 22:54:32 -0400 Subject: [PATCH] Enables creating snapshots after booting from local state (#32137) --- core/src/validator.rs | 3 + ledger/src/bank_forks_utils.rs | 155 ++++++++++------ local-cluster/src/validator_configs.rs | 1 + local-cluster/tests/local_cluster.rs | 243 +++++++++++++++++++++++++ runtime/src/serde_snapshot.rs | 46 ++++- runtime/src/serde_snapshot/tests.rs | 13 +- 6 files changed, 393 insertions(+), 68 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 1e5cbb8af3..61a0b87405 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -251,6 +251,7 @@ pub struct ValidatorConfig { pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, pub generator_config: Option, + pub boot_from_local_state: bool, } impl Default for ValidatorConfig { @@ -317,6 +318,7 @@ impl Default for ValidatorConfig { block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), generator_config: None, + boot_from_local_state: false, } } } @@ -1625,6 +1627,7 @@ fn load_blockstore( accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation, accounts_db_skip_shrink: config.accounts_db_skip_shrink, runtime_config: config.runtime_config.clone(), + boot_from_local_state: config.boot_from_local_state, ..blockstore_processor::ProcessOptions::default() }; diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index fa93c468e7..678a22423a 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -204,84 +204,125 @@ fn bank_forks_from_snapshot( process::exit(1); } - let (deserialized_bank, starting_snapshot_hashes) = if process_options.boot_from_local_state { - assert!( - !snapshot_config.should_generate_snapshots(), - "booting from local state does not support generating 
snapshots yet", - ); - - let bank = snapshot_utils::bank_from_latest_snapshot_dir( - &snapshot_config.bank_snapshots_dir, - genesis_config, - &process_options.runtime_config, - &account_paths, - process_options.debug_keys.clone(), - None, - process_options.account_indexes.clone(), - process_options.limit_load_slot_count_from_snapshot, - process_options.shrink_ratio, - process_options.verify_index, - process_options.accounts_db_config.clone(), - accounts_update_notifier, - exit, - ) - .expect("load bank from local state"); - - (bank, None) - } else { - // Given that we are going to boot from an archive, the accountvecs held in the snapshot dirs for fast-boot should - // be released. They will be released by the account_background_service anyway. But in the case of the account_paths - // using memory-mounted file system, they are not released early enough to give space for the new append-vecs from - // the archives, causing the out-of-memory problem. So, purge the snapshot dirs upfront before loading from the archive. 
- snapshot_utils::purge_old_bank_snapshots(&snapshot_config.bank_snapshots_dir, 0, None); - - let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) = - snapshot_utils::bank_from_latest_snapshot_archives( + let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) = + if process_options.boot_from_local_state { + let bank = snapshot_utils::bank_from_latest_snapshot_dir( &snapshot_config.bank_snapshots_dir, - &snapshot_config.full_snapshot_archives_dir, - &snapshot_config.incremental_snapshot_archives_dir, - &account_paths, genesis_config, &process_options.runtime_config, + &account_paths, process_options.debug_keys.clone(), None, process_options.account_indexes.clone(), process_options.limit_load_slot_count_from_snapshot, process_options.shrink_ratio, - process_options.accounts_db_test_hash_calculation, - process_options.accounts_db_skip_shrink, process_options.verify_index, process_options.accounts_db_config.clone(), accounts_update_notifier, exit, ) - .expect("load bank from snapshot archives"); + .expect("load bank from local state"); - let full_snapshot_hash = FullSnapshotHash(( - full_snapshot_archive_info.slot(), - *full_snapshot_archive_info.hash(), - )); - let starting_incremental_snapshot_hash = - incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| { - IncrementalSnapshotHash(( - incremental_snapshot_archive_info.slot(), - *incremental_snapshot_archive_info.hash(), - )) - }); - let starting_snapshot_hashes = StartingSnapshotHashes { - full: full_snapshot_hash, - incremental: starting_incremental_snapshot_hash, + // The highest snapshot *archives* are still needed, as they are + // the starting snapshot hashes, which are used by the background services + // when performing snapshot-related tasks. 
+ let full_snapshot_archive_info = + snapshot_utils::get_highest_full_snapshot_archive_info( + &snapshot_config.full_snapshot_archives_dir, + ) + // SAFETY: Calling `bank_forks_from_snapshot` requires at least a full snapshot + .expect("get highest full snapshot"); + + let incremental_snapshot_archive_info = + snapshot_utils::get_highest_incremental_snapshot_archive_info( + &snapshot_config.incremental_snapshot_archives_dir, + full_snapshot_archive_info.slot(), + ); + + // If a newer snapshot archive was downloaded, it is possible that the snapshot + // slot is higher than the local bank we just loaded. It is unlikely the user + // intended to still boot from local state in this scenario. + let latest_snapshot_archive_slot = std::cmp::max( + full_snapshot_archive_info.slot(), + incremental_snapshot_archive_info + .as_ref() + .map(SnapshotArchiveInfoGetter::slot) + .unwrap_or(0), + ); + if bank.slot() < latest_snapshot_archive_slot { + error!( + "Attempting to boot from local state at a slot {} that is *older* than the latest \ + snapshot archive slot {}, which is not supported. Either remove the snapshot archive \ + or remove the --boot-from-local-state CLI flag.", + bank.slot(), + latest_snapshot_archive_slot, + ); + process::exit(1); + } + + ( + bank, + full_snapshot_archive_info, + incremental_snapshot_archive_info, + ) + } else { + // Given that we are going to boot from an archive, the accountvecs held in the snapshot dirs for fast-boot should + // be released. They will be released by the account_background_service anyway. But in the case of the account_paths + // using memory-mounted file system, they are not released early enough to give space for the new append-vecs from + // the archives, causing the out-of-memory problem. So, purge the snapshot dirs upfront before loading from the archive. 
+ snapshot_utils::purge_old_bank_snapshots(&snapshot_config.bank_snapshots_dir, 0, None); + + let (deserialized_bank, full_snapshot_archive_info, incremental_snapshot_archive_info) = + snapshot_utils::bank_from_latest_snapshot_archives( + &snapshot_config.bank_snapshots_dir, + &snapshot_config.full_snapshot_archives_dir, + &snapshot_config.incremental_snapshot_archives_dir, + &account_paths, + genesis_config, + &process_options.runtime_config, + process_options.debug_keys.clone(), + None, + process_options.account_indexes.clone(), + process_options.limit_load_slot_count_from_snapshot, + process_options.shrink_ratio, + process_options.accounts_db_test_hash_calculation, + process_options.accounts_db_skip_shrink, + process_options.verify_index, + process_options.accounts_db_config.clone(), + accounts_update_notifier, + exit, + ) + .expect("load bank from snapshot archives"); + + ( + deserialized_bank, + full_snapshot_archive_info, + incremental_snapshot_archive_info, + ) }; - (deserialized_bank, Some(starting_snapshot_hashes)) - }; - if let Some(shrink_paths) = shrink_paths { deserialized_bank.set_shrink_paths(shrink_paths); } + let full_snapshot_hash = FullSnapshotHash(( + full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + )); + let incremental_snapshot_hash = + incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| { + IncrementalSnapshotHash(( + incremental_snapshot_archive_info.slot(), + *incremental_snapshot_archive_info.hash(), + )) + }); + let starting_snapshot_hashes = StartingSnapshotHashes { + full: full_snapshot_hash, + incremental: incremental_snapshot_hash, + }; + ( Arc::new(RwLock::new(BankForks::new(deserialized_bank))), - starting_snapshot_hashes, + Some(starting_snapshot_hashes), ) } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 7964a48ec4..4c3de6439b 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -67,6 
+67,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), generator_config: config.generator_config.clone(), + boot_from_local_state: config.boot_from_local_state, } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 30900acdcc..51be6997e1 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -4754,3 +4754,246 @@ fn test_duplicate_with_pruned_ancestor() { "test_duplicate_with_pruned_ancestor", ); } + +/// Test fastboot to ensure a node can boot from local state and still produce correct snapshots +/// +/// 1. Start node 1 and wait for it to take snapshots +/// 2. Start node 2 with the snapshots from (1) +/// 3. Wait for node 2 to take a bank snapshot +/// 4. Restart node 2 with the local state from (3) +/// 5. Wait for node 2 to take new snapshots +/// 6. Start node 3 with the snapshots from (5) +/// 7. Wait for node 3 to take new snapshots +/// 8. 
Ensure the snapshots from (7) match nodes 1 and 2 +#[test] +#[serial] +fn test_boot_from_local_state() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + const FULL_SNAPSHOT_INTERVAL: Slot = 100; + const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 10; + + let validator1_config = SnapshotValidatorConfig::new( + FULL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + 2, + ); + let validator2_config = SnapshotValidatorConfig::new( + FULL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + 4, + ); + let validator3_config = SnapshotValidatorConfig::new( + FULL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + INCREMENTAL_SNAPSHOT_INTERVAL, + 3, + ); + + let mut cluster_config = ClusterConfig { + node_stakes: vec![100 * DEFAULT_NODE_STAKE], + cluster_lamports: DEFAULT_CLUSTER_LAMPORTS, + validator_configs: make_identical_validator_configs(&validator1_config.validator_config, 1), + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut cluster_config, SocketAddrSpace::Unspecified); + + // in order to boot from local state, need to first have snapshot archives + info!("Waiting for validator1 to create snapshots..."); + let (incremental_snapshot_archive, full_snapshot_archive) = + LocalCluster::wait_for_next_incremental_snapshot( + &cluster, + &validator1_config.full_snapshot_archives_dir, + &validator1_config.incremental_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); + debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}"); + info!("Waiting for validator1 to create snapshots... 
DONE"); + + info!("Copying snapshots to validator2..."); + std::fs::copy( + full_snapshot_archive.path(), + validator2_config + .full_snapshot_archives_dir + .path() + .join(full_snapshot_archive.path().file_name().unwrap()), + ) + .unwrap(); + std::fs::copy( + incremental_snapshot_archive.path(), + validator2_config + .incremental_snapshot_archives_dir + .path() + .join(incremental_snapshot_archive.path().file_name().unwrap()), + ) + .unwrap(); + info!("Copying snapshots to validator2... DONE"); + + info!("Starting validator2..."); + let validator2_identity = Arc::new(Keypair::new()); + cluster.add_validator( + &validator2_config.validator_config, + DEFAULT_NODE_STAKE, + validator2_identity.clone(), + None, + SocketAddrSpace::Unspecified, + ); + info!("Starting validator2... DONE"); + + // wait for a new bank snapshot to fastboot from that is newer than its snapshot archives + info!("Waiting for validator2 to create a new bank snapshot..."); + let timer = Instant::now(); + let bank_snapshot = loop { + if let Some(full_snapshot_slot) = snapshot_utils::get_highest_full_snapshot_archive_slot( + &validator2_config.full_snapshot_archives_dir, + ) { + if let Some(incremental_snapshot_slot) = + snapshot_utils::get_highest_incremental_snapshot_archive_slot( + &validator2_config.incremental_snapshot_archives_dir, + full_snapshot_slot, + ) + { + if let Some(bank_snapshot) = snapshot_utils::get_highest_bank_snapshot_post( + &validator2_config.bank_snapshots_dir, + ) { + if bank_snapshot.slot > incremental_snapshot_slot { + break bank_snapshot; + } + } + } + } + assert!( + timer.elapsed() < Duration::from_secs(30), + "It should not take longer than 30 seconds to create a new bank snapshot" + ); + std::thread::yield_now(); + }; + debug!("bank snapshot: {bank_snapshot:?}"); + info!("Waiting for validator2 to create a new bank snapshot... 
DONE"); + + // restart WITH fastboot + info!("Restarting validator2 from local state..."); + let mut validator2_info = cluster.exit_node(&validator2_identity.pubkey()); + validator2_info.config.boot_from_local_state = true; + cluster.restart_node( + &validator2_identity.pubkey(), + validator2_info, + SocketAddrSpace::Unspecified, + ); + info!("Restarting validator2 from local state... DONE"); + + info!("Waiting for validator2 to create snapshots..."); + let (incremental_snapshot_archive, full_snapshot_archive) = + LocalCluster::wait_for_next_incremental_snapshot( + &cluster, + &validator2_config.full_snapshot_archives_dir, + &validator2_config.incremental_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); + debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}"); + info!("Waiting for validator2 to create snapshots... DONE"); + + info!("Copying snapshots to validator3..."); + std::fs::copy( + full_snapshot_archive.path(), + validator3_config + .full_snapshot_archives_dir + .path() + .join(full_snapshot_archive.path().file_name().unwrap()), + ) + .unwrap(); + std::fs::copy( + incremental_snapshot_archive.path(), + validator3_config + .incremental_snapshot_archives_dir + .path() + .join(incremental_snapshot_archive.path().file_name().unwrap()), + ) + .unwrap(); + info!("Copying snapshots to validator3... DONE"); + + info!("Starting validator3..."); + let validator3_identity = Arc::new(Keypair::new()); + cluster.add_validator( + &validator3_config.validator_config, + DEFAULT_NODE_STAKE, + validator3_identity, + None, + SocketAddrSpace::Unspecified, + ); + info!("Starting validator3... 
DONE"); + + // wait for a new snapshot to ensure the validator is making roots + info!("Waiting for validator3 to create snapshots..."); + let (incremental_snapshot_archive, full_snapshot_archive) = + LocalCluster::wait_for_next_incremental_snapshot( + &cluster, + &validator3_config.full_snapshot_archives_dir, + &validator3_config.incremental_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); + debug!("snapshot archives:\n\tfull: {full_snapshot_archive:?}\n\tincr: {incremental_snapshot_archive:?}"); + info!("Waiting for validator3 to create snapshots... DONE"); + + // ensure that all validators have the correct state by comparing snapshots + // - wait for the other validators to have high enough snapshots + // - ensure validator3's snapshot hashes match the other validators' snapshot hashes + // + // NOTE: There's a chance validators 1 or 2 have crossed the next full snapshot past what + // validator 3 has. If that happens, validators 1 or 2 may have purged the snapshots needed + // to compare with validator 3, and thus assert. If that happens, the full snapshot interval + // may need to be adjusted larger. 
+ for (i, other_validator_config) in [(1, &validator1_config), (2, &validator2_config)] { + info!("Checking if validator{i} has the same snapshots as validator3..."); + let timer = Instant::now(); + loop { + if let Some(other_full_snapshot_slot) = + snapshot_utils::get_highest_full_snapshot_archive_slot( + &other_validator_config.full_snapshot_archives_dir, + ) + { + let other_incremental_snapshot_slot = + snapshot_utils::get_highest_incremental_snapshot_archive_slot( + &other_validator_config.incremental_snapshot_archives_dir, + other_full_snapshot_slot, + ); + if other_full_snapshot_slot >= full_snapshot_archive.slot() + && other_incremental_snapshot_slot >= Some(incremental_snapshot_archive.slot()) + { + break; + } + } + assert!( + timer.elapsed() < Duration::from_secs(60), + "It should not take longer than 60 seconds to take snapshots" + ); + std::thread::yield_now(); + } + let other_full_snapshot_archives = snapshot_utils::get_full_snapshot_archives( + &other_validator_config.full_snapshot_archives_dir, + ); + debug!("validator{i} full snapshot archives: {other_full_snapshot_archives:?}"); + assert!(other_full_snapshot_archives + .iter() + .any( + |other_full_snapshot_archive| other_full_snapshot_archive.slot() + == full_snapshot_archive.slot() + && other_full_snapshot_archive.hash() == full_snapshot_archive.hash() + )); + + let other_incremental_snapshot_archives = snapshot_utils::get_incremental_snapshot_archives( + &other_validator_config.incremental_snapshot_archives_dir, + ); + debug!( + "validator{i} incremental snapshot archives: {other_incremental_snapshot_archives:?}" + ); + assert!(other_incremental_snapshot_archives.iter().any( + |other_incremental_snapshot_archive| other_incremental_snapshot_archive.base_slot() + == incremental_snapshot_archive.base_slot() + && other_incremental_snapshot_archive.slot() == incremental_snapshot_archive.slot() + && other_incremental_snapshot_archive.hash() == incremental_snapshot_archive.hash() + )); + info!("Checking 
if validator{i} has the same snapshots as validator3... DONE"); + } +} diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 2e337b3045..45aab9cd90 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -747,15 +747,43 @@ where { let AccountsDbFields(_, _, slot, bank_hash_info, _, _) = &snapshot_accounts_db_fields.full_snapshot_accounts_db_fields; - let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot( - *slot, - bank_hash_info.accounts_hash.clone(), - capitalizations.0, - ); - assert!( - old_accounts_hash.is_none(), - "There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}", - ); + + if let Some(incremental_snapshot_persistence) = incremental_snapshot_persistence { + // If we've booted from local state that was originally intended to be an incremental + // snapshot, then we will use the incremental snapshot persistence field to set the + // initial accounts hashes in accounts db. + let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot( + incremental_snapshot_persistence.full_slot, + incremental_snapshot_persistence.full_hash.clone(), + incremental_snapshot_persistence.full_capitalization, + ); + assert!( + old_accounts_hash.is_none(), + "There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}", + ); + let old_incremental_accounts_hash = accounts_db + .set_incremental_accounts_hash_from_snapshot( + *slot, + incremental_snapshot_persistence.incremental_hash.clone(), + incremental_snapshot_persistence.incremental_capitalization, + ); + assert!( + old_incremental_accounts_hash.is_none(), + "There should not already be an IncrementalAccountsHash at slot {slot}: {old_incremental_accounts_hash:?}", + ); + } else { + // Otherwise, we've booted from a snapshot archive, or from local state that was *not* + // intended to be an incremental snapshot. 
+ let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot( + *slot, + bank_hash_info.accounts_hash.clone(), + capitalizations.0, + ); + assert!( + old_accounts_hash.is_none(), + "There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}", + ); + } } // Store the accounts hash & capitalization, from the incremental snapshot, in the new AccountsDb diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index b7a930e749..c0b38d5eac 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -312,7 +312,7 @@ fn test_bank_serialize_style( let slot = bank2.slot(); let incremental = incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence { - full_slot: slot + 1, + full_slot: slot - 1, full_hash: SerdeAccountsHash(Hash::new(&[1; 32])), full_capitalization: 31, incremental_hash: SerdeIncrementalAccountsHash(Hash::new(&[2; 32])), @@ -418,7 +418,16 @@ fn test_bank_serialize_style( assert_eq!(dbank.get_balance(&key1.pubkey()), 0); assert_eq!(dbank.get_balance(&key2.pubkey()), 10); assert_eq!(dbank.get_balance(&key3.pubkey()), 0); - assert_eq!(dbank.get_accounts_hash(), Some(accounts_hash)); + if let Some(incremental_snapshot_persistence) = incremental.clone() { + assert_eq!(dbank.get_accounts_hash(), None,); + assert_eq!( + dbank.get_incremental_accounts_hash(), + Some(incremental_snapshot_persistence.incremental_hash.into()), + ); + } else { + assert_eq!(dbank.get_accounts_hash(), Some(accounts_hash)); + assert_eq!(dbank.get_incremental_accounts_hash(), None); + } assert!(bank2 == dbank); assert_eq!(dbank.incremental_snapshot_persistence, incremental); assert_eq!(dbank.get_epoch_accounts_hash_to_serialize().map(|epoch_accounts_hash| *epoch_accounts_hash.as_ref()), expected_epoch_accounts_hash,