diff --git a/core/tests/epoch_accounts_hash.rs b/core/tests/epoch_accounts_hash.rs new file mode 100755 index 0000000000..eeea8ac420 --- /dev/null +++ b/core/tests/epoch_accounts_hash.rs @@ -0,0 +1,330 @@ +#![allow(clippy::integer_arithmetic)] +use { + log::*, + solana_core::{ + accounts_hash_verifier::AccountsHashVerifier, + snapshot_packager_service::SnapshotPackagerService, + }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_runtime::{ + accounts_background_service::{ + AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver, + PrunedBanksRequestHandler, SnapshotRequestHandler, + }, + accounts_hash::CalcAccountsHashConfig, + bank::Bank, + bank_forks::BankForks, + epoch_accounts_hash::{self, EpochAccountsHash}, + genesis_utils::{self, GenesisConfigInfo}, + snapshot_config::SnapshotConfig, + snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage}, + }, + solana_sdk::{ + clock::Slot, + epoch_schedule::EpochSchedule, + feature_set, + native_token::LAMPORTS_PER_SOL, + pubkey::Pubkey, + signature::{Keypair, Signer}, + system_transaction, + timing::timestamp, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + mem::ManuallyDrop, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + time::Duration, + }, + tempfile::TempDir, +}; + +struct TestEnvironment { + bank_forks: Arc>, + + genesis_config_info: GenesisConfigInfo, + _bank_snapshots_dir: TempDir, + _full_snapshot_archives_dir: TempDir, + _incremental_snapshot_archives_dir: TempDir, + _snapshot_config: SnapshotConfig, + + // NOTE: This field must come after bank_forks because it must be dropped after + background_services: BackgroundServices, +} + +impl TestEnvironment { + /// A small, round number to make the tests run quickly, and easy to debug + const SLOTS_PER_EPOCH: u64 = 100; + + /// A small, round number to ensure accounts packages are sent to the background services + const ACCOUNTS_HASH_INTERVAL: u64 = 10; + + 
#[must_use] + fn new() -> TestEnvironment { + let bank_snapshots_dir = TempDir::new().unwrap(); + let full_snapshot_archives_dir = TempDir::new().unwrap(); + let incremental_snapshot_archives_dir = TempDir::new().unwrap(); + let mut genesis_config_info = genesis_utils::create_genesis_config_with_leader( + 100_000 * LAMPORTS_PER_SOL, // mint_lamports + &Pubkey::new_unique(), // validator_pubkey + 100 * LAMPORTS_PER_SOL, // validator_stake_lamports + ); + genesis_config_info.genesis_config.epoch_schedule = + EpochSchedule::custom(Self::SLOTS_PER_EPOCH, Self::SLOTS_PER_EPOCH, false); + let snapshot_config = SnapshotConfig { + full_snapshot_archives_dir: full_snapshot_archives_dir.path().to_path_buf(), + incremental_snapshot_archives_dir: incremental_snapshot_archives_dir + .path() + .to_path_buf(), + bank_snapshots_dir: bank_snapshots_dir.path().to_path_buf(), + ..SnapshotConfig::new_load_only() + }; + + let mut bank_forks = + BankForks::new(Bank::new_for_tests(&genesis_config_info.genesis_config)); + bank_forks.set_snapshot_config(Some(snapshot_config.clone())); + bank_forks.set_accounts_hash_interval_slots(Self::ACCOUNTS_HASH_INTERVAL); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + + let exit = Arc::new(AtomicBool::new(false)); + let node_id = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_id.pubkey(), timestamp()), + Arc::clone(&node_id), + SocketAddrSpace::Unspecified, + )); + + let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded(); + let background_services = BackgroundServices::new( + Arc::clone(&exit), + Arc::clone(&cluster_info), + &snapshot_config, + pruned_banks_receiver, + Arc::clone(&bank_forks), + ); + let bank = bank_forks.read().unwrap().working_bank(); + bank.set_callback(Some(Box::new( + bank.rc + .accounts + .accounts_db + .create_drop_bank_callback(pruned_banks_sender), + ))); + assert!(bank + .feature_set + 
.is_active(&feature_set::epoch_accounts_hash::id())); + + bank.set_startup_verification_complete(); + + TestEnvironment { + bank_forks, + genesis_config_info, + _bank_snapshots_dir: bank_snapshots_dir, + _full_snapshot_archives_dir: full_snapshot_archives_dir, + _incremental_snapshot_archives_dir: incremental_snapshot_archives_dir, + _snapshot_config: snapshot_config, + background_services, + } + } +} + +/// In order to shut down the background services correctly, each service's thread must be joined. +/// However, since `.join()` takes a `self` and `drop()` takes a `&mut self`, it means a "normal" +/// implementation of drop will not work. Instead, we must handle drop ourselves. +struct BackgroundServices { + exit: Arc, + accounts_background_service: ManuallyDrop, + accounts_background_request_sender: AbsRequestSender, + accounts_hash_verifier: ManuallyDrop, + snapshot_packager_service: ManuallyDrop, +} + +impl BackgroundServices { + #[must_use] + fn new( + exit: Arc, + cluster_info: Arc, + snapshot_config: &SnapshotConfig, + pruned_banks_receiver: DroppedSlotsReceiver, + bank_forks: Arc>, + ) -> Self { + info!("Starting background services..."); + + let pending_snapshot_package = PendingSnapshotPackage::default(); + let snapshot_packager_service = SnapshotPackagerService::new( + pending_snapshot_package.clone(), + None, + &exit, + &cluster_info, + snapshot_config.clone(), + false, + ); + + let pending_accounts_package = PendingAccountsPackage::default(); + let accounts_hash_verifier = AccountsHashVerifier::new( + Arc::clone(&pending_accounts_package), + Some(pending_snapshot_package), + &exit, + &cluster_info, + None, + false, + 0, + Some(snapshot_config.clone()), + ); + + let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender); + let snapshot_request_handler = SnapshotRequestHandler { + snapshot_config: snapshot_config.clone(), + 
snapshot_request_receiver, + pending_accounts_package, + }; + let pruned_banks_request_handler = PrunedBanksRequestHandler { + pruned_banks_receiver, + }; + let accounts_background_service = AccountsBackgroundService::new( + bank_forks, + &exit, + AbsRequestHandlers { + snapshot_request_handler, + pruned_banks_request_handler, + }, + false, + false, + None, + ); + + info!("Starting background services... DONE"); + Self { + exit, + accounts_background_service: ManuallyDrop::new(accounts_background_service), + accounts_background_request_sender, + accounts_hash_verifier: ManuallyDrop::new(accounts_hash_verifier), + snapshot_packager_service: ManuallyDrop::new(snapshot_packager_service), + } + } +} + +impl Drop for BackgroundServices { + fn drop(&mut self) { + info!("Stopping background services..."); + self.exit.store(true, Ordering::Relaxed); + + unsafe { ManuallyDrop::take(&mut self.accounts_background_service) } + .join() + .expect("stop ABS"); + + unsafe { ManuallyDrop::take(&mut self.accounts_hash_verifier) } + .join() + .expect("stop AHV"); + + unsafe { ManuallyDrop::take(&mut self.snapshot_packager_service) } + .join() + .expect("stop SPS"); + + info!("Stopping background services... 
DONE"); + } +} + +/// Run through a few epochs and ensure the Epoch Accounts Hash is calculated correctly +#[test] +fn test_epoch_accounts_hash() { + solana_logger::setup(); + + const NUM_EPOCHS_TO_TEST: u64 = 2; + const SET_ROOT_INTERVAL: Slot = 3; + + let test_config = TestEnvironment::new(); + let bank_forks = &test_config.bank_forks; + + let mut expected_epoch_accounts_hash = None; + + let slots_per_epoch = test_config + .genesis_config_info + .genesis_config + .epoch_schedule + .slots_per_epoch; + for _ in 0..slots_per_epoch * NUM_EPOCHS_TO_TEST { + let bank = { + let parent = bank_forks.read().unwrap().working_bank(); + let bank = bank_forks.write().unwrap().insert(Bank::new_from_parent( + &parent, + &Pubkey::default(), + parent.slot() + 1, + )); + + let transaction = system_transaction::transfer( + &test_config.genesis_config_info.mint_keypair, + &Pubkey::new_unique(), + 1, + bank.last_blockhash(), + ); + bank.process_transaction(&transaction).unwrap(); + bank.fill_bank_with_ticks_for_tests(); + + bank + }; + trace!("new bank {}", bank.slot()); + + // Set roots so that ABS requests are sent (this is what requests EAH calculations) + if bank.slot() % SET_ROOT_INTERVAL == 0 { + trace!("rooting bank {}", bank.slot()); + bank_forks.write().unwrap().set_root( + bank.slot(), + &test_config + .background_services + .accounts_background_request_sender, + None, + ); + } + + // To ensure EAH calculations are correct, calculate the accounts hash here, in-band. + // This will be the expected EAH that gets saved into the "stop" bank. 
+ if bank.slot() == epoch_accounts_hash::calculation_start(&bank) { + bank.freeze(); + let (accounts_hash, _) = bank + .rc + .accounts + .accounts_db + .calculate_accounts_hash( + bank.slot(), + &CalcAccountsHashConfig { + use_bg_thread_pool: false, + check_hash: false, + ancestors: Some(&bank.ancestors), + use_write_cache: true, + epoch_schedule: bank.epoch_schedule(), + rent_collector: bank.rent_collector(), + store_detailed_debug_info_on_failure: false, + full_snapshot: None, + enable_rehashing: true, + }, + ) + .unwrap(); + expected_epoch_accounts_hash = Some(EpochAccountsHash::new(accounts_hash)); + debug!( + "slot {}, expected epoch accounts hash: {:?}", + bank.slot(), + expected_epoch_accounts_hash + ); + } + + // Test: Ensure that the "stop" bank has the correct EAH + if bank.slot() == epoch_accounts_hash::calculation_stop(&bank) { + // Sometimes AHV does not get scheduled to run, which causes the test to fail + // spuriously. Sleep a bit here to ensure AHV gets a chance to run. 
+ std::thread::sleep(Duration::from_secs(1)); + let actual_epoch_accounts_hash = bank.epoch_accounts_hash(); + debug!( + "slot {}, actual epoch accounts hash: {:?}", + bank.slot(), + actual_epoch_accounts_hash, + ); + assert_eq!(expected_epoch_accounts_hash, actual_epoch_accounts_hash); + } + + // Give the background services a chance to run + std::thread::yield_now(); + } +} diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 48cb2808f0..f351a077ea 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -213,10 +213,11 @@ fn run_bank_forks_snapshot_n( let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); let request_sender = AbsRequestSender::new(snapshot_request_sender); + let pending_accounts_package = PendingAccountsPackage::default(); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package: PendingAccountsPackage::default(), + pending_accounts_package: pending_accounts_package.clone(), }; for slot in 1..=last_slot { let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); @@ -230,6 +231,13 @@ fn run_bank_forks_snapshot_n( bank_forks.set_root(bank.slot(), &request_sender, None); bank.update_accounts_hash(); snapshot_request_handler.handle_snapshot_requests(false, false, 0, &mut None); + + // Clear out any pending accounts package. Since `set_root()` can trigger an Epoch + // Accounts Hash request, we must ensure that there is not already a pending EAH + // accounts package, otherwise ABS will panic when trying to submit a second EAH + // accounts package. The most straight forward way is to clear the pending accounts + // package every time. 
+ pending_accounts_package.lock().unwrap().take(); } } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 6d0deaded7..14fd48f29b 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -27,6 +27,7 @@ use { clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, + feature_set, genesis_config::{ClusterType, GenesisConfig}, message::Message, poh_config::PohConfig, @@ -257,6 +258,11 @@ impl LocalCluster { ), ); + // Do not enable Epoch Accounts Hash in local-cluster tests yet + genesis_config + .accounts + .remove(&feature_set::epoch_accounts_hash::id()); + let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config); let leader_contact_info = leader_node.info.clone(); let mut leader_config = safe_clone_config(&config.validator_configs[0]); diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index bc57c79968..08a0f87043 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -164,11 +164,11 @@ impl SnapshotRequestHandler { let SnapshotRequest { snapshot_root_bank, status_cache_slot_deltas, - request_type: _request_type, + request_type, } = snapshot_request; - // we should not rely on the state of this validator until startup verification is complete - assert!(snapshot_root_bank.is_startup_verification_complete()); + // we should not rely on the state of this validator until startup verification is complete (unless handling an EAH request) + assert!(snapshot_root_bank.is_startup_verification_complete() || request_type == SnapshotRequestType::EpochAccountsHash); let previous_hash = if test_hash_calculation { // We have to use the index version here. 
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index b2a840e1d9..ece485d097 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6881,6 +6881,11 @@ impl Bank { // will be required for the epoch accounts hash calculation to have completed and // for this value to be `Some`. if let Some(epoch_accounts_hash) = epoch_accounts_hash { + debug!( + "including epoch accounts hash, slot: {}, hash: {:?}", + self.slot(), + epoch_accounts_hash + ); hash = hashv(&[hash.as_ref(), epoch_accounts_hash.as_ref().as_ref()]); } else { warn!("bank {}: epoch_accounts_hash was None but should have been included in this bank's hash!", self.slot()); @@ -7921,7 +7926,7 @@ impl Bank { } /// Convenience fn to get the Epoch Accounts Hash - fn epoch_accounts_hash(&self) -> Option { + pub fn epoch_accounts_hash(&self) -> Option { *self .rc .accounts diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 09c60eee74..a9533fa0b5 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -4,11 +4,12 @@ use { crate::{ accounts_background_service::{AbsRequestSender, SnapshotRequest, SnapshotRequestType}, bank::Bank, + epoch_accounts_hash, snapshot_config::SnapshotConfig, }, log::*, solana_measure::measure::Measure, - solana_sdk::{clock::Slot, hash::Hash, timing}, + solana_sdk::{clock::Slot, feature_set, hash::Hash, timing}, std::{ collections::{hash_map::Entry, HashMap, HashSet}, ops::Index, @@ -273,7 +274,55 @@ impl BankForks { let mut total_squash_accounts_store_ms = 0; let mut total_squash_cache_ms = 0; let mut total_snapshot_ms = 0; - if let Some(bank) = banks.iter().find(|bank| { + + // handle epoch accounts hash + // go through all the banks, oldest first + // find the newest bank where we should do EAH + // NOTE: Instead of filter-collect-assert, `.find()` could be used instead. Once + // sufficient testing guarantees only one bank will ever request an EAH, change to + // `.find()`. 
+ let eah_banks: Vec<_> = banks + .iter() + .filter(|&&bank| self.should_request_epoch_accounts_hash(bank)) + .collect(); + assert!( + eah_banks.len() <= 1, + "At most one bank should request an epoch accounts hash calculation! num banks: {}, bank slots: {:?}", + eah_banks.len(), + eah_banks.iter().map(|bank| bank.slot()).collect::>(), + ); + if let Some(eah_bank) = eah_banks.first() { + debug!( + "sending epoch accounts hash request, slot: {}", + eah_bank.slot() + ); + self.last_accounts_hash_slot = eah_bank.slot(); + let squash_timing = eah_bank.squash(); + total_squash_accounts_ms += squash_timing.squash_accounts_ms as i64; + total_squash_accounts_index_ms += squash_timing.squash_accounts_index_ms as i64; + total_squash_accounts_cache_ms += squash_timing.squash_accounts_cache_ms as i64; + total_squash_accounts_store_ms += squash_timing.squash_accounts_store_ms as i64; + total_squash_cache_ms += squash_timing.squash_cache_ms as i64; + is_root_bank_squashed = eah_bank.slot() == root; + + // Clear any existing EAH before requesting a new one + _ = eah_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash + .lock() + .unwrap() + .take(); + + accounts_background_request_sender + .send_snapshot_request(SnapshotRequest { + snapshot_root_bank: Arc::clone(eah_bank), + status_cache_slot_deltas: Vec::default(), + request_type: SnapshotRequestType::EpochAccountsHash, + }) + .expect("send epoch accounts hash request"); + } else if let Some(bank) = banks.iter().find(|bank| { bank.slot() > self.last_accounts_hash_slot && bank.block_height() % self.accounts_hash_interval_slots == 0 }) { @@ -315,6 +364,9 @@ impl BankForks { snapshot_time.stop(); total_snapshot_ms += snapshot_time.as_ms() as i64; } + + drop(eah_banks); + if !is_root_bank_squashed { let squash_timing = root_bank.squash(); total_squash_accounts_ms += squash_timing.squash_accounts_ms as i64; @@ -549,6 +601,22 @@ impl BankForks { pub fn set_accounts_hash_interval_slots(&mut self, accounts_interval_slots: u64) 
{ self.accounts_hash_interval_slots = accounts_interval_slots; } + + /// Determine if this bank should request an epoch accounts hash + #[must_use] + fn should_request_epoch_accounts_hash(&self, bank: &Bank) -> bool { + if !bank + .feature_set + .is_active(&feature_set::epoch_accounts_hash::id()) + { + return false; + } + + let start_slot = epoch_accounts_hash::calculation_start(bank); + bank.slot() > self.last_accounts_hash_slot + && bank.parent_slot() < start_slot + && bank.slot() >= start_slot + } } #[cfg(test)] @@ -563,10 +631,10 @@ mod tests { }, solana_sdk::{ clock::UnixTimestamp, + epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, - sysvar::epoch_schedule::EpochSchedule, }, solana_vote_program::vote_state::BlockTimestamp, };