diff --git a/Cargo.lock b/Cargo.lock index fa31b33977..fde4a19427 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5870,6 +5870,7 @@ dependencies = [ "base64 0.13.0", "bincode", "chrono-humanize", + "crossbeam-channel", "log", "serde", "solana-banks-client", diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index bbe8ba06c4..c594fa9038 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -234,20 +234,16 @@ impl AccountsHashVerifier { fn save_epoch_accounts_hash(accounts_package: &AccountsPackage, accounts_hash: Hash) { if accounts_package.package_type == AccountsPackageType::EpochAccountsHash { - debug!( + info!( "saving epoch accounts hash, slot: {}, hash: {}", accounts_package.slot, accounts_hash ); - let new_epoch_accounts_hash = EpochAccountsHash::new(accounts_hash); - let old_epoch_accounts_hash = accounts_package + let epoch_accounts_hash = EpochAccountsHash::new(accounts_hash); + accounts_package .accounts .accounts_db - .epoch_accounts_hash - .lock() - .unwrap() - .replace(new_epoch_accounts_hash); - // Old epoch accounts hash must be NONE, because a previous bank must have taken it to hash into itself - assert!(old_epoch_accounts_hash.is_none()); + .epoch_accounts_hash_manager + .set_valid(epoch_accounts_hash, accounts_package.slot); } } diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index f351a077ea..21b169a84b 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -211,13 +211,31 @@ fn run_bank_forks_snapshot_n( let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; + let pending_accounts_package = PendingAccountsPackage::default(); + let exit = Arc::new(AtomicBool::new(false)); + let node_id = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_id.pubkey(), timestamp()), + Arc::clone(&node_id), + SocketAddrSpace::Unspecified, + )); + let accounts_hash_verifier = AccountsHashVerifier::new( + Arc::clone(&pending_accounts_package), + None, + &exit, + &cluster_info, + None, + false, + 0, + Some(snapshot_test_config.snapshot_config.clone()), + ); + let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); let request_sender = AbsRequestSender::new(snapshot_request_sender); - let pending_accounts_package = PendingAccountsPackage::default(); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package: pending_accounts_package.clone(), + pending_accounts_package, }; for slot in 1..=last_slot { let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); @@ -231,13 +249,6 @@ fn run_bank_forks_snapshot_n( bank_forks.set_root(bank.slot(), &request_sender, None); bank.update_accounts_hash(); snapshot_request_handler.handle_snapshot_requests(false, false, 0, &mut None); - - // Clear out any pending accounts package. Since `set_root()` can trigger an Epoch - // Accounts Hash request, we must ensure that there is not already a pending EAH - // accounts package, otherwise ABS will panic when trying to submit a second EAH - // accounts package. The most straight forward way is to clear the pending accounts - // package every time. - pending_accounts_package.lock().unwrap().take(); } } @@ -283,6 +294,9 @@ fn run_bank_forks_snapshot_n( let account_paths = &[snapshot_test_config.accounts_dir.path().to_path_buf()]; let genesis_config = &snapshot_test_config.genesis_config_info.genesis_config; restore_from_snapshot(bank_forks, last_slot, genesis_config, account_paths); + + exit.store(true, Ordering::Relaxed); + accounts_hash_verifier.join().unwrap(); } #[test_case(V1_2_0, Development)] @@ -591,6 +605,16 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust snapshot_test_config .bank_forks .set_root(current_bank.slot(), &request_sender, None); + + // Since the accounts background services are not runnning, EpochAccountsHash + // calculation requests will not be handled. To prevent banks from hanging during + // Bank::freeze() due to waiting for EAH to complete, just set the EAH to Invalid. + current_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .set_invalid_for_tests(); } let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1; @@ -685,12 +709,31 @@ fn test_bank_forks_incremental_snapshot( let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; + let pending_accounts_package = PendingAccountsPackage::default(); + let exit = Arc::new(AtomicBool::new(false)); + let node_id = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_id.pubkey(), timestamp()), + Arc::clone(&node_id), + SocketAddrSpace::Unspecified, + )); + let accounts_hash_verifier = AccountsHashVerifier::new( + Arc::clone(&pending_accounts_package), + None, + &exit, + &cluster_info, + None, + false, + 0, + Some(snapshot_test_config.snapshot_config.clone()), + ); + let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); let request_sender = AbsRequestSender::new(snapshot_request_sender); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package: PendingAccountsPackage::default(), + pending_accounts_package, }; let mut last_full_snapshot_slot = None; @@ -761,6 +804,8 @@ fn test_bank_forks_incremental_snapshot( .unwrap(); } } + exit.store(true, Ordering::Relaxed); + accounts_hash_verifier.join().unwrap(); } fn make_full_snapshot_archive( diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 3a40a3df7a..7670dc8130 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -18,7 +18,7 @@ use { solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings, ThreadExecuteTimings}, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_runtime::{ - accounts_background_service::AbsRequestSender, + accounts_background_service::{AbsRequestSender, SnapshotRequestType}, accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -56,7 +56,10 @@ use { collections::{HashMap, HashSet}, path::PathBuf, result, - sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, Mutex, RwLock, + }, time::{Duration, Instant}, }, thiserror::Error, @@ -727,6 +730,35 @@ pub fn test_process_blockstore( opts: &ProcessOptions, exit: &Arc, ) -> (Arc>, LeaderScheduleCache) { + // Spin up a thread to be a fake Accounts Background Service. Need to intercept and handle + // (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not hang + // in Bank::freeze() waiting for an in-flight EAH calculation to complete. + let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let abs_request_sender = AbsRequestSender::new(snapshot_request_sender); + let bg_exit = Arc::new(AtomicBool::new(false)); + let bg_thread = { + let exit = Arc::clone(&bg_exit); + std::thread::spawn(move || { + while !exit.load(Relaxed) { + snapshot_request_receiver + .try_iter() + .filter(|snapshot_request| { + snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash + }) + .for_each(|snapshot_request| { + snapshot_request + .snapshot_root_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .set_invalid_for_tests(); + }); + std::thread::sleep(Duration::from_millis(100)); + } + }) + }; + let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks( genesis_config, blockstore, @@ -738,6 +770,7 @@ pub fn test_process_blockstore( None, exit, ); + process_blockstore_from_root( blockstore, &bank_forks, @@ -745,9 +778,13 @@ pub fn test_process_blockstore( opts, None, None, - &AbsRequestSender::default(), + &abs_request_sender, ) .unwrap(); + + bg_exit.store(true, Relaxed); + bg_thread.join().unwrap(); + (bank_forks, leader_schedule_cache) } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 14fd48f29b..6d0deaded7 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -27,7 +27,6 @@ use { clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, - feature_set, genesis_config::{ClusterType, GenesisConfig}, message::Message, poh_config::PohConfig, @@ -258,11 +257,6 @@ impl LocalCluster { ), ); - // Do not enable Epoch Accounts Hash in local-cluster tests yet - genesis_config - .accounts - .remove(&feature_set::epoch_accounts_hash::id()); - let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config); let leader_contact_info = leader_node.info.clone(); let mut leader_config = safe_clone_config(&config.validator_configs[0]); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 3d1df45ffd..0670a231a0 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -723,7 +723,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st // If these intervals change, also make sure to change the loop timers accordingly. let accounts_hash_interval = 3; let incremental_snapshot_interval = accounts_hash_interval * 3; - let full_snapshot_interval = incremental_snapshot_interval * 3; + let full_snapshot_interval = incremental_snapshot_interval * 5; let num_account_paths = 3; let leader_snapshot_test_config = SnapshotValidatorConfig::new( @@ -1302,7 +1302,7 @@ fn test_snapshot_restart_tower() { fn test_snapshots_blockstore_floor() { solana_logger::setup_with_default(RUST_LOG_FILTER); // First set up the cluster with 1 snapshotting leader - let snapshot_interval_slots = 10; + let snapshot_interval_slots = 100; let num_account_paths = 4; let leader_snapshot_test_config = @@ -1408,7 +1408,7 @@ fn test_snapshots_blockstore_floor() { #[serial] fn test_snapshots_restart_validity() { solana_logger::setup_with_default(RUST_LOG_FILTER); - let snapshot_interval_slots = 10; + let snapshot_interval_slots = 100; let num_account_paths = 1; let mut snapshot_test_config = setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); diff --git a/program-test/Cargo.toml b/program-test/Cargo.toml index 27b8592df2..51e9c8c16e 100644 --- a/program-test/Cargo.toml +++ b/program-test/Cargo.toml @@ -13,6 +13,7 @@ async-trait = "0.1.57" base64 = "0.13.0" bincode = "1.3.3" chrono-humanize = "0.2.1" +crossbeam-channel = "0.5" log = "0.4.17" serde = "1.0.144" solana-banks-client = { path = "../banks-client", version = "=1.15.0" } diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 55f37a988b..6ecf346421 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -15,6 +15,7 @@ use { stable_log, timings::ExecuteTimings, }, solana_runtime::{ + accounts_background_service::{AbsRequestSender, SnapshotRequestType}, bank::Bank, bank_forks::BankForks, builtins::Builtin, @@ -1129,11 +1130,29 @@ impl ProgramTestContext { pre_warp_slot, )) }; - bank_forks.set_root( - pre_warp_slot, - &solana_runtime::accounts_background_service::AbsRequestSender::default(), - Some(pre_warp_slot), - ); + + let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let abs_request_sender = AbsRequestSender::new(snapshot_request_sender); + + bank_forks.set_root(pre_warp_slot, &abs_request_sender, Some(pre_warp_slot)); + + // The call to `set_root()` above will send an EAH request. Need to intercept and handle + // (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not + // hang in Bank::freeze() waiting for an in-flight EAH calculation to complete. + snapshot_request_receiver + .try_iter() + .filter(|snapshot_request| { + snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash + }) + .for_each(|snapshot_request| { + snapshot_request + .snapshot_root_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .set_invalid_for_tests(); + }); // warp_bank is frozen so go forward to get unfrozen bank at warp_slot bank_forks.insert(Bank::new_from_parent( diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 5f8c6f383e..bb945eb925 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -5264,6 +5264,7 @@ dependencies = [ "base64 0.13.0", "bincode", "chrono-humanize", + "crossbeam-channel", "log", "serde", "solana-banks-client", diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5b2fc47e6a..2e4dbb2f6f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -45,7 +45,7 @@ use { bank::Rewrites, cache_hash_data::CacheHashData, contains::Contains, - epoch_accounts_hash::EpochAccountsHash, + epoch_accounts_hash::EpochAccountsHashManager, expected_rent_collection::{ExpectedRentCollection, SlotInfoInEpoch}, pubkey_bins::PubkeyBinCalculator24, read_only_accounts_cache::ReadOnlyAccountsCache, @@ -1198,7 +1198,7 @@ pub struct AccountsDb { /// The cadence is once per epoch, all nodes calculate a full accounts hash as of a known slot calculated using 'N' /// Some time later (to allow for slow calculation time), the bank hash at a slot calculated using 'M' includes the full accounts hash. /// Thus, the state of all accounts on a validator is known to be correct at least once per epoch. - pub epoch_accounts_hash: Mutex>, + pub epoch_accounts_hash_manager: EpochAccountsHashManager, } #[derive(Debug, Default)] @@ -2087,7 +2087,7 @@ impl AccountsDb { num_hash_scan_passes, log_dead_slots: AtomicBool::new(true), exhaustively_verify_refcounts: false, - epoch_accounts_hash: Mutex::new(None), + epoch_accounts_hash_manager: EpochAccountsHashManager::new_invalid(), } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index ece485d097..4b894ccb22 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6869,28 +6869,11 @@ impl Bank { self.last_blockhash().as_ref(), ]); - let epoch_accounts_hash = self.epoch_accounts_hash(); - let should_include_epoch_accounts_hash = self.should_include_epoch_accounts_hash(); - if should_include_epoch_accounts_hash { - // Nothing is writing a value into the epoch accounts hash yet—this is not a problem - // for normal clusters, as the feature gating this `if` block is always false. - // However, some tests enable all features, so this `if` block can be true. - // - // For now, check to see if the epoch accounts hash is `Some` before hashing. Once the - // writer-side is implemented, change this to be an `.expect()` or `.unwrap()`, as it - // will be required for the epoch accounts hash calculation to have completed and - // for this value to be `Some`. - if let Some(epoch_accounts_hash) = epoch_accounts_hash { - debug!( - "including epoch accounts hash, slot: {}, hash: {:?}", - self.slot(), - epoch_accounts_hash - ); - hash = hashv(&[hash.as_ref(), epoch_accounts_hash.as_ref().as_ref()]); - } else { - warn!("bank {}: epoch_accounts_hash was None but should have been included in this bank's hash!", self.slot()); - } - } + let epoch_accounts_hash = self.should_include_epoch_accounts_hash().then(|| { + let epoch_accounts_hash = self.wait_get_epoch_accounts_hash(); + hash = hashv(&[hash.as_ref(), epoch_accounts_hash.as_ref().as_ref()]); + epoch_accounts_hash + }); let buf = self .hard_forks @@ -6917,8 +6900,8 @@ impl Bank { self.signature_count(), self.last_blockhash(), self.capitalization(), - if should_include_epoch_accounts_hash { - format!(", epoch_accounts_hash: {:?}", epoch_accounts_hash) + if let Some(epoch_accounts_hash) = epoch_accounts_hash { + format!(", epoch_accounts_hash: {:?}", epoch_accounts_hash.as_ref()) } else { "".to_string() } @@ -6946,6 +6929,29 @@ impl Bank { self.parent_slot() < stop_slot && self.slot() >= stop_slot } + /// If the epoch accounts hash should be included in this Bank, then fetch it. If the EAH + /// calculation has not completed yet, this fn will block until it does complete. + fn wait_get_epoch_accounts_hash(&self) -> EpochAccountsHash { + let (epoch_accounts_hash, measure) = measure!(self + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .wait_get_epoch_accounts_hash()); + + datapoint_info!( + "bank-wait_get_epoch_accounts_hash", + ("slot", self.slot() as i64, i64), + ( + "epoch_accounts_hash", + epoch_accounts_hash.as_ref().to_string(), + String + ), + ("waiting-time-us", measure.as_us() as i64, i64), + ); + epoch_accounts_hash + } + /// Recalculate the hash_internal_state from the account stores. Would be used to verify a /// snapshot. /// return true if all is good @@ -7927,13 +7933,11 @@ impl Bank { /// Convenience fn to get the Epoch Accounts Hash pub fn epoch_accounts_hash(&self) -> Option { - *self - .rc + self.rc .accounts .accounts_db - .epoch_accounts_hash - .lock() - .unwrap() + .epoch_accounts_hash_manager + .try_get_epoch_accounts_hash() } } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index a9533fa0b5..15cdfe4918 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -296,6 +296,13 @@ impl BankForks { "sending epoch accounts hash request, slot: {}", eah_bank.slot() ); + eah_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .set_in_flight(eah_bank.slot()); + self.last_accounts_hash_slot = eah_bank.slot(); let squash_timing = eah_bank.squash(); total_squash_accounts_ms += squash_timing.squash_accounts_ms as i64; @@ -305,16 +312,6 @@ impl BankForks { total_squash_cache_ms += squash_timing.squash_cache_ms as i64; is_root_bank_squashed = eah_bank.slot() == root; - // Clear any existing EAH before requesting a new one - _ = eah_bank - .rc - .accounts - .accounts_db - .epoch_accounts_hash - .lock() - .unwrap() - .take(); - accounts_background_request_sender .send_snapshot_request(SnapshotRequest { snapshot_root_bank: Arc::clone(eah_bank), @@ -637,6 +634,7 @@ mod tests { signature::{Keypair, Signer}, }, solana_vote_program::vote_state::BlockTimestamp, + std::{sync::atomic::Ordering::Relaxed, time::Duration}, }; #[test] @@ -734,9 +732,38 @@ mod tests { let slots_in_epoch = 32; genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); + // Spin up a thread to be a fake Accounts Background Service. Need to intercept and handle + // (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not hang + // in Bank::freeze() waiting for an in-flight EAH calculation to complete. + let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let abs_request_sender = AbsRequestSender::new(snapshot_request_sender); + let bg_exit = Arc::new(AtomicBool::new(false)); + let bg_thread = { + let exit = Arc::clone(&bg_exit); + std::thread::spawn(move || { + while !exit.load(Relaxed) { + snapshot_request_receiver + .try_iter() + .filter(|snapshot_request| { + snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash + }) + .for_each(|snapshot_request| { + snapshot_request + .snapshot_root_bank + .rc + .accounts + .accounts_db + .epoch_accounts_hash_manager + .set_invalid_for_tests(); + }); + std::thread::sleep(Duration::from_millis(100)); + } + }) + }; + let bank0 = Bank::new_for_tests(&genesis_config); let mut bank_forks0 = BankForks::new(bank0); - bank_forks0.set_root(0, &AbsRequestSender::default(), None); + bank_forks0.set_root(0, &abs_request_sender, None); let bank1 = Bank::new_for_tests(&genesis_config); let mut bank_forks1 = BankForks::new(bank1); @@ -768,7 +795,7 @@ mod tests { // Set root in bank_forks0 to truncate the ancestor history bank_forks0.insert(child1); - bank_forks0.set_root(slot, &AbsRequestSender::default(), None); + bank_forks0.set_root(slot, &abs_request_sender, None); // Don't set root in bank_forks1 to keep the ancestor history bank_forks1.insert(child2); @@ -782,6 +809,9 @@ mod tests { info!("child0.ancestors: {:?}", child1.ancestors); info!("child1.ancestors: {:?}", child2.ancestors); assert_eq!(child1.hash(), child2.hash()); + + bg_exit.store(true, Relaxed); + bg_thread.join().unwrap(); } fn make_hash_map(data: Vec<(Slot, Vec)>) -> HashMap> { diff --git a/runtime/src/epoch_accounts_hash.rs b/runtime/src/epoch_accounts_hash.rs index 9b8689e153..010d8a1640 100644 --- a/runtime/src/epoch_accounts_hash.rs +++ b/runtime/src/epoch_accounts_hash.rs @@ -16,6 +16,9 @@ use { }, }; +mod manager; +pub use manager::Manager as EpochAccountsHashManager; + /// The EpochAccountsHash holds the result after calculating the accounts hash once per epoch #[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, Copy)] pub struct EpochAccountsHash(Hash); @@ -29,7 +32,7 @@ impl AsRef for EpochAccountsHash { impl EpochAccountsHash { /// Make an EpochAccountsHash from a regular accounts hash #[must_use] - pub fn new(accounts_hash: Hash) -> Self { + pub const fn new(accounts_hash: Hash) -> Self { Self(accounts_hash) } } diff --git a/runtime/src/epoch_accounts_hash/manager.rs b/runtime/src/epoch_accounts_hash/manager.rs new file mode 100644 index 0000000000..0cfa34f92a --- /dev/null +++ b/runtime/src/epoch_accounts_hash/manager.rs @@ -0,0 +1,199 @@ +use { + super::EpochAccountsHash, + solana_sdk::{clock::Slot, hash::Hash}, + std::sync::{Condvar, Mutex}, +}; + +/// Manage the epoch accounts hash +/// +/// Handles setting when an EAH calculation is requested and when it completes. Also handles +/// waiting for in-flight calculations to complete when the "stop" Bank must include it. +#[derive(Debug)] +pub struct Manager { + /// Current state of the epoch accounts hash + state: Mutex, + /// This condition variable is used to wait for an in-flight EAH calculation to complete + cvar: Condvar, +} + +impl Manager { + #[must_use] + fn _new(state: State) -> Self { + Self { + state: Mutex::new(state), + cvar: Condvar::new(), + } + } + + /// Create a new epoch accounts hash manager, with the initial state set to Invalid + #[must_use] + pub fn new_invalid() -> Self { + Self::_new(State::Invalid) + } + + /// Create a new epoch accounts hash manager, with the initial state set to Valid + #[must_use] + pub fn new_valid(epoch_accounts_hash: EpochAccountsHash, slot: Slot) -> Self { + Self::_new(State::Valid(epoch_accounts_hash, slot)) + } + + /// An epoch accounts hash calculation has been requested; update our state + pub fn set_in_flight(&self, slot: Slot) { + let mut state = self.state.lock().unwrap(); + if let State::InFlight(old_slot) = &*state { + panic!("An epoch accounts hash calculation is already in-flight from slot {old_slot}!"); + } + *state = State::InFlight(slot); + } + + /// An epoch accounts hash calculation has completed; update our state + pub fn set_valid(&self, epoch_accounts_hash: EpochAccountsHash, slot: Slot) { + let mut state = self.state.lock().unwrap(); + if let State::Valid(old_epoch_accounts_hash, old_slot) = &*state { + panic!( + "The epoch accounts hash is already valid! \ + \nold slot: {old_slot}, epoch accounts hash: {old_epoch_accounts_hash:?} \ + \nnew slot: {slot}, epoch accounts hash: {epoch_accounts_hash:?}" + ); + } + *state = State::Valid(epoch_accounts_hash, slot); + self.cvar.notify_all(); + } + + /// Get the epoch accounts hash + /// + /// If an EAH calculation is in-flight, then this call will block until it completes. + pub fn wait_get_epoch_accounts_hash(&self) -> EpochAccountsHash { + let mut state = self.state.lock().unwrap(); + loop { + match &*state { + State::Valid(epoch_accounts_hash, _slot) => break *epoch_accounts_hash, + State::Invalid => break SENTINEL_EPOCH_ACCOUNTS_HASH, + State::InFlight(_slot) => state = self.cvar.wait(state).unwrap(), + } + } + } + + /// Get the epoch accounts hash + /// + /// This fn does not block, and will only yield an EAH if the state is `Valid` + pub fn try_get_epoch_accounts_hash(&self) -> Option { + let state = self.state.lock().unwrap(); + match &*state { + State::Valid(epoch_accounts_hash, _slot) => Some(*epoch_accounts_hash), + _ => None, + } + } + + /// **FOR TESTS ONLY** + /// Set the state to Invalid + /// This is needed by tests that do not fully startup all the accounts background services. + /// **FOR TESTS ONLY** + pub fn set_invalid_for_tests(&self) { + *self.state.lock().unwrap() = State::Invalid; + } +} + +/// The EpochAccountsHash is calculated in the background via AccountsBackgroundService. This enum +/// is used to track the state of that calculation, and queried when saving the EAH into a Bank. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum State { + /// On startup from genesis/slot0, the initial state of the EAH is invalid since one has not + /// yet been requested. This state should only really occur for tests and new clusters; not + /// for established running clusters. + Invalid, + /// An EAH calculation has been requested (for `Slot`) and is in flight. The Bank that should + /// save the EAH must wait until the calculation has completed. + InFlight(Slot), + /// The EAH calculation is complete (for `Slot`) and the EAH value is valid to read/use. + Valid(EpochAccountsHash, Slot), +} + +/// Sentinel epoch accounts hash value; used when getting an Invalid EAH +/// +/// Displays as "Sentine1EpochAccountsHash111111111111111111" +const SENTINEL_EPOCH_ACCOUNTS_HASH: EpochAccountsHash = + EpochAccountsHash::new(Hash::new_from_array([ + 0x06, 0x92, 0x40, 0x3b, 0xee, 0xea, 0x7e, 0xe2, 0x7d, 0xf4, 0x90, 0x7f, 0xbd, 0x9e, 0xd0, + 0xd2, 0x1c, 0x2b, 0x66, 0x9a, 0xc4, 0xda, 0xce, 0xd7, 0x23, 0x41, 0x69, 0xab, 0xb7, 0x80, + 0x00, 0x00, + ])); + +#[cfg(test)] +mod tests { + use {super::*, std::time::Duration}; + + #[test] + fn test_new_valid() { + let epoch_accounts_hash = EpochAccountsHash::new(Hash::new_unique()); + let manager = Manager::new_valid(epoch_accounts_hash, 5678); + assert_eq!( + manager.try_get_epoch_accounts_hash(), + Some(epoch_accounts_hash), + ); + assert_eq!(manager.wait_get_epoch_accounts_hash(), epoch_accounts_hash); + } + + #[test] + fn test_new_invalid() { + let manager = Manager::new_invalid(); + assert!(manager.try_get_epoch_accounts_hash().is_none()); + assert_eq!( + manager.wait_get_epoch_accounts_hash(), + SENTINEL_EPOCH_ACCOUNTS_HASH, + ); + } + + #[test] + fn test_try_get_epoch_accounts_hash() { + let epoch_accounts_hash = EpochAccountsHash::new(Hash::new_unique()); + for (state, expected) in [ + (State::Invalid, None), + (State::InFlight(123), None), + ( + State::Valid(epoch_accounts_hash, 5678), + Some(epoch_accounts_hash), + ), + ] { + let manager = Manager::_new(state); + let actual = manager.try_get_epoch_accounts_hash(); + assert_eq!(actual, expected); + } + } + + #[test] + fn test_wait_epoch_accounts_hash() { + // Test: State is Invalid, no need to wait + { + let manager = Manager::new_invalid(); + assert_eq!( + manager.wait_get_epoch_accounts_hash(), + SENTINEL_EPOCH_ACCOUNTS_HASH, + ); + } + + // Test: State is Valid, no need to wait + { + let epoch_accounts_hash = EpochAccountsHash::new(Hash::new_unique()); + let manager = Manager::new_valid(epoch_accounts_hash, 5678); + assert_eq!(manager.wait_get_epoch_accounts_hash(), epoch_accounts_hash); + } + + // Test: State is InFlight, must wait + { + let epoch_accounts_hash = EpochAccountsHash::new(Hash::new_unique()); + let manager = Manager::new_invalid(); + manager.set_in_flight(123); + + std::thread::scope(|s| { + s.spawn(|| { + std::thread::sleep(Duration::from_secs(1)); + manager.set_valid(epoch_accounts_hash, 5678) + }); + assert!(manager.try_get_epoch_accounts_hash().is_none()); + assert_eq!(manager.wait_get_epoch_accounts_hash(), epoch_accounts_hash); + assert!(manager.try_get_epoch_accounts_hash().is_some()); + }); + } + } +} diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index c8459df9b9..fa3fee4a08 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -688,8 +688,12 @@ where accounts_update_notifier, exit, ); - *accounts_db.epoch_accounts_hash.lock().unwrap() = - epoch_accounts_hash.map(EpochAccountsHash::new); + + if let Some(epoch_accounts_hash) = epoch_accounts_hash { + accounts_db + .epoch_accounts_hash_manager + .set_valid(EpochAccountsHash::new(epoch_accounts_hash), 0); + } let AccountsDbFields( _snapshot_storages, diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 4eee544aab..d9a875c94a 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -254,15 +254,15 @@ fn test_bank_serialize_style( if initial_epoch_accounts_hash { expected_epoch_accounts_hash = Some(Hash::new(&[7; 32])); - *bank2 + bank2 .rc .accounts .accounts_db - .epoch_accounts_hash - .lock() - .unwrap() = Some(EpochAccountsHash::new( - expected_epoch_accounts_hash.unwrap(), - )); + .epoch_accounts_hash_manager + .set_valid( + EpochAccountsHash::new(expected_epoch_accounts_hash.unwrap()), + 0, + ); } crate::serde_snapshot::bank_to_stream( @@ -416,7 +416,7 @@ fn test_bank_serialize_style( assert_eq!(dbank.get_accounts_hash(), accounts_hash); assert!(bank2 == dbank); assert_eq!(dbank.incremental_snapshot_persistence, incremental); - assert_eq!(dbank.rc.accounts.accounts_db.epoch_accounts_hash.lock().unwrap().map(|hash| *hash.as_ref()), expected_epoch_accounts_hash, + assert_eq!(dbank.rc.accounts.accounts_db.epoch_accounts_hash_manager.try_get_epoch_accounts_hash().map(|hash| *hash.as_ref()), expected_epoch_accounts_hash, "(reserialize_accounts_hash, incremental_snapshot_persistence, epoch_accounts_hash, update_accounts_hash, initial_epoch_accounts_hash): {:?}", ( reserialize_accounts_hash,