async hash verify on load (#26208)

* verify accounts hash in bg on startup

* fix some tests and loading from genesis

* add extra state for when background thread has completed
This commit is contained in:
Jeff Washington (jwash) 2022-07-15 14:29:56 -05:00 committed by GitHub
parent 15d18a03e5
commit 47716a5e01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 338 additions and 74 deletions

View File

@ -5928,6 +5928,8 @@ pub(crate) mod tests {
let my_vote_pubkey = my_vote_keypair[0].pubkey();
let bank0 = bank_forks.read().unwrap().get(0).unwrap();
bank0.set_initial_accounts_hash_verification_completed();
let (voting_sender, voting_receiver) = unbounded();
// Simulate landing a vote for slot 0 landing in slot 1

View File

@ -207,6 +207,7 @@ fn run_bank_forks_snapshot_n<F>(
);
let bank_forks = &mut snapshot_test_config.bank_forks;
bank_forks.root_bank().set_startup_verification_complete();
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
@ -667,6 +668,7 @@ fn test_bank_forks_incremental_snapshot(
snapshot_test_config.accounts_dir.path().display(), snapshot_test_config.bank_snapshots_dir.path().display(), snapshot_test_config.full_snapshot_archives_dir.path().display(), snapshot_test_config.incremental_snapshot_archives_dir.path().display());
let bank_forks = &mut snapshot_test_config.bank_forks;
bank_forks.root_bank().set_startup_verification_complete();
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
@ -901,6 +903,10 @@ fn test_snapshots_with_background_services(
let pending_accounts_package = PendingAccountsPackage::default();
let pending_snapshot_package = PendingSnapshotPackage::default();
snapshot_test_config
.bank_forks
.root_bank()
.set_startup_verification_complete();
let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks));
let callback = bank_forks
.read()

View File

@ -135,17 +135,21 @@ pub fn load_bank_forks(
}
info!("Processing ledger from genesis");
(
blockstore_processor::process_blockstore_for_bank_0(
let bank_forks = blockstore_processor::process_blockstore_for_bank_0(
genesis_config,
blockstore,
account_paths,
process_options,
cache_block_meta_sender,
accounts_update_notifier,
),
None,
)
);
bank_forks
.read()
.unwrap()
.root_bank()
.set_startup_verification_complete();
(bank_forks, None)
};
let mut leader_schedule_cache =

View File

@ -1539,6 +1539,7 @@ fn load_frozen_forks(
can_cached_slot_be_unflushed,
ignore_mismatch: true,
require_rooted_bank: false,
run_in_background: false,
});
break;
}

View File

@ -54,7 +54,7 @@ impl RpcHealth {
}
}
if !self.startup_verification_complete.load(Ordering::Relaxed) {
if !self.startup_verification_complete.load(Ordering::Acquire) {
return RpcHealthStatus::Unknown;
}

View File

@ -52,6 +52,7 @@ use {
rent_paying_accounts_by_partition::RentPayingAccountsByPartition,
sorted_storages::SortedStorages,
storable_accounts::StorableAccounts,
verify_accounts_hash_in_background::VerifyAccountsHashInBackground,
},
blake3::traits::digest::Digest,
crossbeam_channel::{unbounded, Receiver, Sender},
@ -1135,8 +1136,6 @@ pub struct AccountsDb {
/// true if drop_callback is attached to the bank.
is_bank_drop_callback_enabled: AtomicBool,
pub startup_verification_complete: Arc<AtomicBool>,
/// Set of slots currently being flushed by `flush_slot_cache()` or removed
/// by `remove_unrooted_slot()`. Used to ensure `remove_unrooted_slots(slots)`
/// can safely clear the set of unrooted slots `slots`.
@ -1167,6 +1166,8 @@ pub struct AccountsDb {
/// number of slots remaining where filler accounts should be added
pub filler_account_slots_remaining: AtomicU64,
pub(crate) verify_accounts_hash_in_bg: VerifyAccountsHashInBackground,
// # of passes should be a function of the total # of accounts that are active.
// higher passes = slower total time, lower dynamic memory usage
// lower passes = faster total time, higher dynamic memory usage
@ -1922,12 +1923,8 @@ impl AccountsDb {
// rayon needs a lot of stack
const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
// this will be live shortly
// for now, this check occurs at startup, so it must always be true
let startup_verification_complete = Arc::new(AtomicBool::new(true));
AccountsDb {
startup_verification_complete,
verify_accounts_hash_in_bg: VerifyAccountsHashInBackground::default(),
filler_accounts_per_slot: AtomicU64::default(),
filler_account_slots_remaining: AtomicU64::default(),
active_stats: ActiveStats::default(),
@ -6927,6 +6924,7 @@ impl AccountsDb {
let use_index = false;
let check_hash = false; // this will not be supported anymore
// interesting to consider this
let is_startup = true;
let (calculated_hash, calculated_lamports) = self
.calculate_accounts_hash_helper_with_verify(

View File

@ -165,6 +165,7 @@ use {
},
Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
thread::Builder,
time::{Duration, Instant},
},
};
@ -175,6 +176,7 @@ pub struct VerifyBankHash {
pub can_cached_slot_be_unflushed: bool,
pub ignore_mismatch: bool,
pub require_rooted_bank: bool,
pub run_in_background: bool,
}
#[derive(Debug, Default)]
@ -3710,11 +3712,30 @@ impl Bank {
}
pub fn get_startup_verification_complete(&self) -> &Arc<AtomicBool> {
&self.rc.accounts.accounts_db.startup_verification_complete
&self
.rc
.accounts
.accounts_db
.verify_accounts_hash_in_bg
.verified
}
pub fn is_startup_verification_complete(&self) -> bool {
self.get_startup_verification_complete().load(Relaxed)
self.rc
.accounts
.accounts_db
.verify_accounts_hash_in_bg
.check_complete()
}
/// Mark startup accounts hash verification as complete for this bank's accounts db.
///
/// Callers use this either after a background verification has finished or when the
/// verification ran (or was skipped) on the foreground thread.
pub fn set_startup_verification_complete(&self) {
    let accounts_db = &self.rc.accounts.accounts_db;
    accounts_db.verify_accounts_hash_in_bg.verification_complete()
}
pub fn get_fee_for_message_with_lamports_per_signature(
@ -6815,13 +6836,20 @@ impl Bank {
/// Recalculate the hash_internal_state from the account stores. Would be used to verify a
/// snapshot.
/// return true if all is good
/// Only called from startup or test code.
#[must_use]
pub fn verify_bank_hash(&self, config: VerifyBankHash) -> bool {
let accounts = &self.rc.accounts;
// Wait until initial hash calc is complete before starting a new hash calc.
// This should only occur when we halt at a slot in ledger-tool.
accounts
.accounts_db
.verify_accounts_hash_in_bg
.wait_for_complete();
if config.require_rooted_bank
&& !self
.rc
.accounts
&& !accounts
.accounts_db
.accounts_index
.is_alive_root(self.slot())
@ -6835,26 +6863,79 @@ impl Bank {
panic!("cannot verify bank hash when bank is not a root");
}
}
self.rc.accounts.verify_bank_hash_and_lamports(
self.slot(),
&self.ancestors,
self.capitalization(),
let slot = self.slot();
let ancestors = &self.ancestors;
let cap = self.capitalization();
let epoch_schedule = self.epoch_schedule();
let rent_collector = self.rent_collector();
if config.run_in_background {
let ancestors = ancestors.clone();
let accounts = Arc::clone(accounts);
let epoch_schedule = *epoch_schedule;
let rent_collector = rent_collector.clone();
let accounts_ = Arc::clone(&accounts);
accounts.accounts_db.verify_accounts_hash_in_bg.start(|| {
Builder::new()
.name("solana-bg-hash-verifier".to_string())
.spawn(move || {
info!(
"running initial verification accounts hash calculation in background"
);
let result = accounts_.verify_bank_hash_and_lamports(
slot,
&ancestors,
cap,
config.test_hash_calculation,
self.epoch_schedule(),
&self.rent_collector,
&epoch_schedule,
&rent_collector,
config.can_cached_slot_be_unflushed,
config.ignore_mismatch,
)
);
accounts_
.accounts_db
.verify_accounts_hash_in_bg
.background_finished();
result
})
.unwrap()
});
true // initial result is true. We haven't failed yet. If verification fails, we'll panic from bg thread.
} else {
let result = accounts.verify_bank_hash_and_lamports(
slot,
&self.ancestors,
cap,
config.test_hash_calculation,
epoch_schedule,
rent_collector,
config.can_cached_slot_be_unflushed,
config.ignore_mismatch,
);
self.set_initial_accounts_hash_verification_completed();
result
}
}
/// Record that the initial accounts hash verification has completed.
///
/// Invoked internally once a foreground verification has run. Tests that never perform
/// startup verification also have to call this themselves.
pub fn set_initial_accounts_hash_verification_completed(&self) {
    let accounts_db = &self.rc.accounts.accounts_db;
    accounts_db.verify_accounts_hash_in_bg.verification_complete();
}
/// return true if bg hash verification is complete
/// return false if bg hash verification has not completed yet
/// if hash verification failed, a panic will occur
pub fn has_initial_accounts_hash_verification_completed(&self) -> bool {
// this will be live shortly
// for now, this check occurs at startup, so it must always be true
true
self.rc
.accounts
.accounts_db
.verify_accounts_hash_in_bg
.check_complete()
}
pub fn get_snapshot_storages(&self, base_slot: Option<Slot>) -> SnapshotStorages {
@ -7067,10 +7148,16 @@ impl Bank {
can_cached_slot_be_unflushed: false,
ignore_mismatch: false,
require_rooted_bank: false,
run_in_background: true,
});
verify_time.stop();
(verify, verify_time.as_us())
} else {
self.rc
.accounts
.accounts_db
.verify_accounts_hash_in_bg
.verification_complete();
(true, 0)
};
@ -10287,6 +10374,7 @@ pub(crate) mod tests {
can_cached_slot_be_unflushed: false,
ignore_mismatch: false,
require_rooted_bank: false,
run_in_background: false,
}
}
}

View File

@ -73,6 +73,7 @@ mod storable_accounts;
mod system_instruction_processor;
pub mod transaction_batch;
pub mod transaction_error_metrics;
mod verify_accounts_hash_in_background;
pub mod vote_account;
pub mod vote_parser;
pub mod vote_sender_types;

View File

@ -0,0 +1,165 @@
//! at startup, verify accounts hash in the background
use {
crate::waitable_condvar::WaitableCondvar,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::JoinHandle,
time::Duration,
},
};
/// State shared between the startup code and the optional background thread that verifies
/// the accounts hash.
///
/// The verification result is tracked in `verified`; the thread handle and completion flag
/// let callers either poll (`check_complete`) or block (`wait_for_complete`) for the outcome.
#[derive(Debug)]
pub(crate) struct VerifyAccountsHashInBackground {
/// true when verification has completed or never had to run in background.
/// Set by `verification_complete()` and reset to false by `start()`.
pub(crate) verified: Arc<AtomicBool>,
/// enable waiting for verification to become complete.
/// Notified by `background_finished()` and polled with a zero timeout by `check_complete()`.
complete: Arc<WaitableCondvar>,
/// thread doing verification.
/// `None` until `start()` stores a handle; `wait_for_complete()` takes the handle and joins it.
/// The thread's `bool` result is treated as "verification succeeded".
thread: Mutex<Option<JoinHandle<bool>>>,
/// set when background thread has completed (stored by `background_finished()`),
/// so `check_complete()` can tell the thread is done even if it missed the notification
background_completed: Arc<AtomicBool>,
}
impl Default for VerifyAccountsHashInBackground {
fn default() -> Self {
// initialize, expecting possible background verification to be started
Self {
complete: Arc::default(),
// with default initialization, 'verified' is false
verified: Arc::new(AtomicBool::new(false)),
// no thread to start with
thread: Mutex::new(None::<JoinHandle<bool>>),
background_completed: Arc::new(AtomicBool::new(false)),
}
}
}
impl VerifyAccountsHashInBackground {
    /// Begin a background verification.
    ///
    /// `start` is expected to spawn the verification thread and return its handle; the
    /// thread's `bool` result is inspected later by `wait_for_complete`.
    pub(crate) fn start(&self, start: impl FnOnce() -> JoinHandle<bool>) {
        // a verification is about to run, so nothing counts as verified yet
        self.verified.store(false, Ordering::Release);
        // spawn first, then hold the lock only long enough to store the handle
        let handle = start();
        *self.thread.lock().unwrap() = Some(handle);
    }

    /// Signal that the background work is done: wake any waiters on the condvar and
    /// set the `background_completed` flag.
    pub(crate) fn background_finished(&self) {
        self.complete.notify_all();
        self.background_completed.store(true, Ordering::Release);
    }

    /// Record that verification succeeded, whether it ran in the background or on the
    /// foreground thread.
    pub(crate) fn verification_complete(&self) {
        self.verified.store(true, Ordering::Release);
    }

    /// Block until the background thread (if any) has been joined.
    ///
    /// Returns immediately when no thread handle is stored.
    ///
    /// # Panics
    /// Panics with "initial hash verification failed" when the joined thread returned
    /// `false`, and also if the thread itself panicked or the handle mutex is poisoned.
    pub fn wait_for_complete(&self) {
        // the guard stays alive for the whole join so concurrent callers queue behind it
        let mut handle_slot = self.thread.lock().unwrap();
        if let Some(handle) = handle_slot.take() {
            let passed = handle.join().unwrap();
            assert!(passed, "initial hash verification failed");
            // the handle is gone and the result is recorded, so later checks are cheap
            self.verification_complete();
        }
    }

    /// Non-blocking poll of the verification state.
    ///
    /// Returns `true` once verification is known to be complete and `false` while the
    /// background work still appears to be running. A failed verification surfaces as a
    /// panic from `wait_for_complete`.
    pub(crate) fn check_complete(&self) -> bool {
        if self.verified.load(Ordering::Acquire) {
            // fast path: result already recorded
            return true;
        }
        // NOTE(review): `wait_timeout` presumably returns true when the wait timed out - confirm
        let still_running = self.complete.wait_timeout(Duration::default())
            && !self.background_completed.load(Ordering::Acquire);
        if still_running {
            return false;
        }
        // either notified or flagged as finished: join the thread to collect its result
        self.wait_for_complete();
        true
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use {super::*, std::thread::Builder};

    /// A default-constructed verifier is incomplete until it is explicitly marked verified.
    #[test]
    fn test_default() {
        let verifier = VerifyAccountsHashInBackground::default();
        assert!(!verifier.check_complete());
        assert!(!verifier.verified.load(Ordering::Acquire));
        assert!(verifier.thread.lock().unwrap().is_none());
        verifier.verification_complete();
        assert!(verifier.check_complete());
    }

    /// Start a named background thread through `verify.start`.
    ///
    /// The thread checks that verification is not yet complete, runs `action`, reports
    /// `background_finished`, and finally yields `result` as the verification outcome.
    fn start_thread_and_return(
        verify: &Arc<VerifyAccountsHashInBackground>,
        result: bool,
        action: impl FnOnce() + Send + 'static,
    ) {
        assert!(!verify.check_complete());
        let thread_verify = Arc::clone(verify);
        let thread_body = move || {
            // should have been marked not complete before thread started
            assert!(!thread_verify.check_complete());
            action();
            thread_verify.background_finished();
            result
        };
        verify.start(move || {
            Builder::new()
                .name("solana-bg-hash-verifier".to_string())
                .spawn(thread_body)
                .unwrap()
        });
    }

    /// A background thread that succeeds leaves the verifier complete after the join.
    #[test]
    fn test_real() {
        solana_logger::setup();
        let verifier = Arc::new(VerifyAccountsHashInBackground::default());
        start_thread_and_return(&verifier, true, || {});
        verifier.wait_for_complete();
        assert!(verifier.check_complete());
    }

    /// A background thread that reports failure makes `wait_for_complete` panic.
    #[test]
    #[should_panic(expected = "initial hash verification failed")]
    fn test_panic() {
        let verifier = Arc::new(VerifyAccountsHashInBackground::default());
        start_thread_and_return(&verifier, false, || {});
        verifier.wait_for_complete();
        assert!(!verifier.check_complete());
    }

    /// While the background thread is still busy the verifier reports "not complete";
    /// once released and joined it reports "complete".
    #[test]
    fn test_long_running() {
        solana_logger::setup();
        let verifier = Arc::new(VerifyAccountsHashInBackground::default());
        let release = Arc::new(AtomicBool::default());
        let release_in_thread = Arc::clone(&release);
        start_thread_and_return(&verifier, true, move || {
            // busy wait until atomic is set
            while !release_in_thread.load(Ordering::Relaxed) {}
        });
        assert!(!verifier.check_complete());
        release.store(true, Ordering::Relaxed);
        verifier.wait_for_complete();
        assert!(verifier.check_complete());
    }
}

View File

@ -527,6 +527,15 @@ impl TestValidator {
.expect("validator start failed")
}
/// Let a test declare that this validator's startup verification is complete by
/// flagging the root bank of its bank forks.
pub fn set_startup_verification_complete(&self) {
    let bank_forks = self.bank_forks();
    // keep the read guard alive while the root bank is flagged
    let forks = bank_forks.read().unwrap();
    forks.root_bank().set_startup_verification_complete();
}
/// Initialize the ledger directory
///
/// If `ledger_path` is `None`, a temporary ledger will be created. Otherwise the ledger will
@ -919,6 +928,7 @@ mod test {
/// The blocking RPC client reports healthy once startup verification is flagged complete.
#[test]
fn get_health() {
    let (test_validator, _payer) = TestValidatorGenesis::default().start();
    // flag verification as complete before asking the validator for its health
    test_validator.set_startup_verification_complete();
    test_validator
        .get_rpc_client()
        .get_health()
        .expect("health");
}
@ -926,6 +936,7 @@ mod test {
/// The async RPC client reports healthy once startup verification is flagged complete.
#[tokio::test]
async fn nonblocking_get_health() {
    let (test_validator, _payer) = TestValidatorGenesis::default().start_async().await;
    // flag verification as complete before asking the validator for its health
    test_validator.set_startup_verification_complete();
    test_validator
        .get_async_rpc_client()
        .get_health()
        .await
        .expect("health");
}

View File

@ -1248,8 +1248,7 @@ mod tests {
#[test]
fn test_process_token_allocations() {
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let test_validator = simple_test_validator_no_fees(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1259,19 +1258,24 @@ mod tests {
#[test]
fn test_process_transfer_amount_allocations() {
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let test_validator = simple_test_validator_no_fees(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
test_process_distribute_tokens_with_client(&client, alice, Some(sol_to_lamports(1.5)));
}
/// Start a no-fee `TestValidator` for `pubkey` and flag its startup verification as
/// complete before handing it to the test.
fn simple_test_validator_no_fees(pubkey: Pubkey) -> TestValidator {
    let validator = TestValidator::with_no_fees(pubkey, None, SocketAddrSpace::Unspecified);
    validator.set_startup_verification_complete();
    validator
}
#[test]
fn test_create_stake_allocations() {
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let test_validator = simple_test_validator_no_fees(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1281,8 +1285,7 @@ mod tests {
#[test]
fn test_process_stake_allocations() {
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let test_validator = simple_test_validator_no_fees(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1599,12 +1602,7 @@ mod tests {
#[test]
fn test_check_payer_balances_distribute_tokens_single_payer() {
let alice = Keypair::new();
let test_validator = TestValidator::with_custom_fees(
alice.pubkey(),
10_000,
None,
SocketAddrSpace::Unspecified,
);
let test_validator = simple_test_validator(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1693,13 +1691,9 @@ mod tests {
#[test]
fn test_check_payer_balances_distribute_tokens_separate_payers() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator = TestValidator::with_custom_fees(
alice.pubkey(),
10_000,
None,
SocketAddrSpace::Unspecified,
);
let test_validator = simple_test_validator(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1818,15 +1812,17 @@ mod tests {
}
}
/// Start a `TestValidator` for `alice` via `with_custom_fees` (passing `10_000` as the fee
/// argument) and flag its startup verification as complete before handing it to the test.
fn simple_test_validator(alice: Pubkey) -> TestValidator {
    let validator =
        TestValidator::with_custom_fees(alice, 10_000, None, SocketAddrSpace::Unspecified);
    validator.set_startup_verification_complete();
    validator
}
#[test]
fn test_check_payer_balances_distribute_stakes_single_payer() {
let alice = Keypair::new();
let test_validator = TestValidator::with_custom_fees(
alice.pubkey(),
10_000,
None,
SocketAddrSpace::Unspecified,
);
let test_validator = simple_test_validator(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -1946,13 +1942,9 @@ mod tests {
#[test]
fn test_check_payer_balances_distribute_stakes_separate_payers() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator = TestValidator::with_custom_fees(
alice.pubkey(),
10_000,
None,
SocketAddrSpace::Unspecified,
);
let test_validator = simple_test_validator(alice.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());
@ -2276,11 +2268,7 @@ mod tests {
#[test]
fn test_distribute_allocations_dump_db() {
let sender_keypair = Keypair::new();
let test_validator = TestValidator::with_no_fees(
sender_keypair.pubkey(),
None,
SocketAddrSpace::Unspecified,
);
let test_validator = simple_test_validator_no_fees(sender_keypair.pubkey());
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::processed());