Bank::freeze() waits for EAH calculation to complete (#28170)
This commit is contained in:
parent
63c00e7f5e
commit
a8c6a9e5fc
|
@ -5870,6 +5870,7 @@ dependencies = [
|
|||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"chrono-humanize",
|
||||
"crossbeam-channel",
|
||||
"log",
|
||||
"serde",
|
||||
"solana-banks-client",
|
||||
|
|
|
@ -234,20 +234,16 @@ impl AccountsHashVerifier {
|
|||
|
||||
fn save_epoch_accounts_hash(accounts_package: &AccountsPackage, accounts_hash: Hash) {
|
||||
if accounts_package.package_type == AccountsPackageType::EpochAccountsHash {
|
||||
debug!(
|
||||
info!(
|
||||
"saving epoch accounts hash, slot: {}, hash: {}",
|
||||
accounts_package.slot, accounts_hash
|
||||
);
|
||||
let new_epoch_accounts_hash = EpochAccountsHash::new(accounts_hash);
|
||||
let old_epoch_accounts_hash = accounts_package
|
||||
let epoch_accounts_hash = EpochAccountsHash::new(accounts_hash);
|
||||
accounts_package
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace(new_epoch_accounts_hash);
|
||||
// Old epoch accounts hash must be NONE, because a previous bank must have taken it to hash into itself
|
||||
assert!(old_epoch_accounts_hash.is_none());
|
||||
.epoch_accounts_hash_manager
|
||||
.set_valid(epoch_accounts_hash, accounts_package.slot);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -211,13 +211,31 @@ fn run_bank_forks_snapshot_n<F>(
|
|||
let bank_forks = &mut snapshot_test_config.bank_forks;
|
||||
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
|
||||
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let node_id = Arc::new(Keypair::new());
|
||||
let cluster_info = Arc::new(ClusterInfo::new(
|
||||
ContactInfo::new_localhost(&node_id.pubkey(), timestamp()),
|
||||
Arc::clone(&node_id),
|
||||
SocketAddrSpace::Unspecified,
|
||||
));
|
||||
let accounts_hash_verifier = AccountsHashVerifier::new(
|
||||
Arc::clone(&pending_accounts_package),
|
||||
None,
|
||||
&exit,
|
||||
&cluster_info,
|
||||
None,
|
||||
false,
|
||||
0,
|
||||
Some(snapshot_test_config.snapshot_config.clone()),
|
||||
);
|
||||
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
|
||||
let request_sender = AbsRequestSender::new(snapshot_request_sender);
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let snapshot_request_handler = SnapshotRequestHandler {
|
||||
snapshot_config: snapshot_test_config.snapshot_config.clone(),
|
||||
snapshot_request_receiver,
|
||||
pending_accounts_package: pending_accounts_package.clone(),
|
||||
pending_accounts_package,
|
||||
};
|
||||
for slot in 1..=last_slot {
|
||||
let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot);
|
||||
|
@ -231,13 +249,6 @@ fn run_bank_forks_snapshot_n<F>(
|
|||
bank_forks.set_root(bank.slot(), &request_sender, None);
|
||||
bank.update_accounts_hash();
|
||||
snapshot_request_handler.handle_snapshot_requests(false, false, 0, &mut None);
|
||||
|
||||
// Clear out any pending accounts package. Since `set_root()` can trigger an Epoch
|
||||
// Accounts Hash request, we must ensure that there is not already a pending EAH
|
||||
// accounts package, otherwise ABS will panic when trying to submit a second EAH
|
||||
// accounts package. The most straight forward way is to clear the pending accounts
|
||||
// package every time.
|
||||
pending_accounts_package.lock().unwrap().take();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,6 +294,9 @@ fn run_bank_forks_snapshot_n<F>(
|
|||
let account_paths = &[snapshot_test_config.accounts_dir.path().to_path_buf()];
|
||||
let genesis_config = &snapshot_test_config.genesis_config_info.genesis_config;
|
||||
restore_from_snapshot(bank_forks, last_slot, genesis_config, account_paths);
|
||||
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
accounts_hash_verifier.join().unwrap();
|
||||
}
|
||||
|
||||
#[test_case(V1_2_0, Development)]
|
||||
|
@ -591,6 +605,16 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
|
|||
snapshot_test_config
|
||||
.bank_forks
|
||||
.set_root(current_bank.slot(), &request_sender, None);
|
||||
|
||||
// Since the accounts background services are not running, EpochAccountsHash
|
||||
// calculation requests will not be handled. To prevent banks from hanging during
|
||||
// Bank::freeze() due to waiting for EAH to complete, just set the EAH to Invalid.
|
||||
current_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_invalid_for_tests();
|
||||
}
|
||||
|
||||
let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1;
|
||||
|
@ -685,12 +709,31 @@ fn test_bank_forks_incremental_snapshot(
|
|||
let bank_forks = &mut snapshot_test_config.bank_forks;
|
||||
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
|
||||
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let node_id = Arc::new(Keypair::new());
|
||||
let cluster_info = Arc::new(ClusterInfo::new(
|
||||
ContactInfo::new_localhost(&node_id.pubkey(), timestamp()),
|
||||
Arc::clone(&node_id),
|
||||
SocketAddrSpace::Unspecified,
|
||||
));
|
||||
let accounts_hash_verifier = AccountsHashVerifier::new(
|
||||
Arc::clone(&pending_accounts_package),
|
||||
None,
|
||||
&exit,
|
||||
&cluster_info,
|
||||
None,
|
||||
false,
|
||||
0,
|
||||
Some(snapshot_test_config.snapshot_config.clone()),
|
||||
);
|
||||
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
|
||||
let request_sender = AbsRequestSender::new(snapshot_request_sender);
|
||||
let snapshot_request_handler = SnapshotRequestHandler {
|
||||
snapshot_config: snapshot_test_config.snapshot_config.clone(),
|
||||
snapshot_request_receiver,
|
||||
pending_accounts_package: PendingAccountsPackage::default(),
|
||||
pending_accounts_package,
|
||||
};
|
||||
|
||||
let mut last_full_snapshot_slot = None;
|
||||
|
@ -761,6 +804,8 @@ fn test_bank_forks_incremental_snapshot(
|
|||
.unwrap();
|
||||
}
|
||||
}
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
accounts_hash_verifier.join().unwrap();
|
||||
}
|
||||
|
||||
fn make_full_snapshot_archive(
|
||||
|
|
|
@ -18,7 +18,7 @@ use {
|
|||
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings, ThreadExecuteTimings},
|
||||
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
|
||||
solana_runtime::{
|
||||
accounts_background_service::AbsRequestSender,
|
||||
accounts_background_service::{AbsRequestSender, SnapshotRequestType},
|
||||
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
|
@ -56,7 +56,10 @@ use {
|
|||
collections::{HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
result,
|
||||
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering::Relaxed},
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
thiserror::Error,
|
||||
|
@ -727,6 +730,35 @@ pub fn test_process_blockstore(
|
|||
opts: &ProcessOptions,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> (Arc<RwLock<BankForks>>, LeaderScheduleCache) {
|
||||
// Spin up a thread to be a fake Accounts Background Service. Need to intercept and handle
|
||||
// (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not hang
|
||||
// in Bank::freeze() waiting for an in-flight EAH calculation to complete.
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
|
||||
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
|
||||
let bg_exit = Arc::new(AtomicBool::new(false));
|
||||
let bg_thread = {
|
||||
let exit = Arc::clone(&bg_exit);
|
||||
std::thread::spawn(move || {
|
||||
while !exit.load(Relaxed) {
|
||||
snapshot_request_receiver
|
||||
.try_iter()
|
||||
.filter(|snapshot_request| {
|
||||
snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash
|
||||
})
|
||||
.for_each(|snapshot_request| {
|
||||
snapshot_request
|
||||
.snapshot_root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_invalid_for_tests();
|
||||
});
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
|
@ -738,6 +770,7 @@ pub fn test_process_blockstore(
|
|||
None,
|
||||
exit,
|
||||
);
|
||||
|
||||
process_blockstore_from_root(
|
||||
blockstore,
|
||||
&bank_forks,
|
||||
|
@ -745,9 +778,13 @@ pub fn test_process_blockstore(
|
|||
opts,
|
||||
None,
|
||||
None,
|
||||
&AbsRequestSender::default(),
|
||||
&abs_request_sender,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
bg_exit.store(true, Relaxed);
|
||||
bg_thread.join().unwrap();
|
||||
|
||||
(bank_forks, leader_schedule_cache)
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ use {
|
|||
clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
|
||||
commitment_config::CommitmentConfig,
|
||||
epoch_schedule::EpochSchedule,
|
||||
feature_set,
|
||||
genesis_config::{ClusterType, GenesisConfig},
|
||||
message::Message,
|
||||
poh_config::PohConfig,
|
||||
|
@ -258,11 +257,6 @@ impl LocalCluster {
|
|||
),
|
||||
);
|
||||
|
||||
// Do not enable Epoch Accounts Hash in local-cluster tests yet
|
||||
genesis_config
|
||||
.accounts
|
||||
.remove(&feature_set::epoch_accounts_hash::id());
|
||||
|
||||
let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
|
||||
let leader_contact_info = leader_node.info.clone();
|
||||
let mut leader_config = safe_clone_config(&config.validator_configs[0]);
|
||||
|
|
|
@ -723,7 +723,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
|
|||
// If these intervals change, also make sure to change the loop timers accordingly.
|
||||
let accounts_hash_interval = 3;
|
||||
let incremental_snapshot_interval = accounts_hash_interval * 3;
|
||||
let full_snapshot_interval = incremental_snapshot_interval * 3;
|
||||
let full_snapshot_interval = incremental_snapshot_interval * 5;
|
||||
|
||||
let num_account_paths = 3;
|
||||
let leader_snapshot_test_config = SnapshotValidatorConfig::new(
|
||||
|
@ -1302,7 +1302,7 @@ fn test_snapshot_restart_tower() {
|
|||
fn test_snapshots_blockstore_floor() {
|
||||
solana_logger::setup_with_default(RUST_LOG_FILTER);
|
||||
// First set up the cluster with 1 snapshotting leader
|
||||
let snapshot_interval_slots = 10;
|
||||
let snapshot_interval_slots = 100;
|
||||
let num_account_paths = 4;
|
||||
|
||||
let leader_snapshot_test_config =
|
||||
|
@ -1408,7 +1408,7 @@ fn test_snapshots_blockstore_floor() {
|
|||
#[serial]
|
||||
fn test_snapshots_restart_validity() {
|
||||
solana_logger::setup_with_default(RUST_LOG_FILTER);
|
||||
let snapshot_interval_slots = 10;
|
||||
let snapshot_interval_slots = 100;
|
||||
let num_account_paths = 1;
|
||||
let mut snapshot_test_config =
|
||||
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
|
||||
|
|
|
@ -13,6 +13,7 @@ async-trait = "0.1.57"
|
|||
base64 = "0.13.0"
|
||||
bincode = "1.3.3"
|
||||
chrono-humanize = "0.2.1"
|
||||
crossbeam-channel = "0.5"
|
||||
log = "0.4.17"
|
||||
serde = "1.0.144"
|
||||
solana-banks-client = { path = "../banks-client", version = "=1.15.0" }
|
||||
|
|
|
@ -15,6 +15,7 @@ use {
|
|||
stable_log, timings::ExecuteTimings,
|
||||
},
|
||||
solana_runtime::{
|
||||
accounts_background_service::{AbsRequestSender, SnapshotRequestType},
|
||||
bank::Bank,
|
||||
bank_forks::BankForks,
|
||||
builtins::Builtin,
|
||||
|
@ -1129,11 +1130,29 @@ impl ProgramTestContext {
|
|||
pre_warp_slot,
|
||||
))
|
||||
};
|
||||
bank_forks.set_root(
|
||||
pre_warp_slot,
|
||||
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
|
||||
Some(pre_warp_slot),
|
||||
);
|
||||
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
|
||||
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
|
||||
|
||||
bank_forks.set_root(pre_warp_slot, &abs_request_sender, Some(pre_warp_slot));
|
||||
|
||||
// The call to `set_root()` above will send an EAH request. Need to intercept and handle
|
||||
// (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not
|
||||
// hang in Bank::freeze() waiting for an in-flight EAH calculation to complete.
|
||||
snapshot_request_receiver
|
||||
.try_iter()
|
||||
.filter(|snapshot_request| {
|
||||
snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash
|
||||
})
|
||||
.for_each(|snapshot_request| {
|
||||
snapshot_request
|
||||
.snapshot_root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_invalid_for_tests();
|
||||
});
|
||||
|
||||
// warp_bank is frozen so go forward to get unfrozen bank at warp_slot
|
||||
bank_forks.insert(Bank::new_from_parent(
|
||||
|
|
|
@ -5264,6 +5264,7 @@ dependencies = [
|
|||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"chrono-humanize",
|
||||
"crossbeam-channel",
|
||||
"log",
|
||||
"serde",
|
||||
"solana-banks-client",
|
||||
|
|
|
@ -45,7 +45,7 @@ use {
|
|||
bank::Rewrites,
|
||||
cache_hash_data::CacheHashData,
|
||||
contains::Contains,
|
||||
epoch_accounts_hash::EpochAccountsHash,
|
||||
epoch_accounts_hash::EpochAccountsHashManager,
|
||||
expected_rent_collection::{ExpectedRentCollection, SlotInfoInEpoch},
|
||||
pubkey_bins::PubkeyBinCalculator24,
|
||||
read_only_accounts_cache::ReadOnlyAccountsCache,
|
||||
|
@ -1198,7 +1198,7 @@ pub struct AccountsDb {
|
|||
/// The cadence is once per epoch, all nodes calculate a full accounts hash as of a known slot calculated using 'N'
|
||||
/// Some time later (to allow for slow calculation time), the bank hash at a slot calculated using 'M' includes the full accounts hash.
|
||||
/// Thus, the state of all accounts on a validator is known to be correct at least once per epoch.
|
||||
pub epoch_accounts_hash: Mutex<Option<EpochAccountsHash>>,
|
||||
pub epoch_accounts_hash_manager: EpochAccountsHashManager,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
@ -2087,7 +2087,7 @@ impl AccountsDb {
|
|||
num_hash_scan_passes,
|
||||
log_dead_slots: AtomicBool::new(true),
|
||||
exhaustively_verify_refcounts: false,
|
||||
epoch_accounts_hash: Mutex::new(None),
|
||||
epoch_accounts_hash_manager: EpochAccountsHashManager::new_invalid(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6869,28 +6869,11 @@ impl Bank {
|
|||
self.last_blockhash().as_ref(),
|
||||
]);
|
||||
|
||||
let epoch_accounts_hash = self.epoch_accounts_hash();
|
||||
let should_include_epoch_accounts_hash = self.should_include_epoch_accounts_hash();
|
||||
if should_include_epoch_accounts_hash {
|
||||
// Nothing is writing a value into the epoch accounts hash yet—this is not a problem
|
||||
// for normal clusters, as the feature gating this `if` block is always false.
|
||||
// However, some tests enable all features, so this `if` block can be true.
|
||||
//
|
||||
// For now, check to see if the epoch accounts hash is `Some` before hashing. Once the
|
||||
// writer-side is implemented, change this to be an `.expect()` or `.unwrap()`, as it
|
||||
// will be required for the epoch accounts hash calculation to have completed and
|
||||
// for this value to be `Some`.
|
||||
if let Some(epoch_accounts_hash) = epoch_accounts_hash {
|
||||
debug!(
|
||||
"including epoch accounts hash, slot: {}, hash: {:?}",
|
||||
self.slot(),
|
||||
epoch_accounts_hash
|
||||
);
|
||||
hash = hashv(&[hash.as_ref(), epoch_accounts_hash.as_ref().as_ref()]);
|
||||
} else {
|
||||
warn!("bank {}: epoch_accounts_hash was None but should have been included in this bank's hash!", self.slot());
|
||||
}
|
||||
}
|
||||
let epoch_accounts_hash = self.should_include_epoch_accounts_hash().then(|| {
|
||||
let epoch_accounts_hash = self.wait_get_epoch_accounts_hash();
|
||||
hash = hashv(&[hash.as_ref(), epoch_accounts_hash.as_ref().as_ref()]);
|
||||
epoch_accounts_hash
|
||||
});
|
||||
|
||||
let buf = self
|
||||
.hard_forks
|
||||
|
@ -6917,8 +6900,8 @@ impl Bank {
|
|||
self.signature_count(),
|
||||
self.last_blockhash(),
|
||||
self.capitalization(),
|
||||
if should_include_epoch_accounts_hash {
|
||||
format!(", epoch_accounts_hash: {:?}", epoch_accounts_hash)
|
||||
if let Some(epoch_accounts_hash) = epoch_accounts_hash {
|
||||
format!(", epoch_accounts_hash: {:?}", epoch_accounts_hash.as_ref())
|
||||
} else {
|
||||
"".to_string()
|
||||
}
|
||||
|
@ -6946,6 +6929,29 @@ impl Bank {
|
|||
self.parent_slot() < stop_slot && self.slot() >= stop_slot
|
||||
}
|
||||
|
||||
/// If the epoch accounts hash should be included in this Bank, then fetch it. If the EAH
|
||||
/// calculation has not completed yet, this fn will block until it does complete.
|
||||
fn wait_get_epoch_accounts_hash(&self) -> EpochAccountsHash {
|
||||
let (epoch_accounts_hash, measure) = measure!(self
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.wait_get_epoch_accounts_hash());
|
||||
|
||||
datapoint_info!(
|
||||
"bank-wait_get_epoch_accounts_hash",
|
||||
("slot", self.slot() as i64, i64),
|
||||
(
|
||||
"epoch_accounts_hash",
|
||||
epoch_accounts_hash.as_ref().to_string(),
|
||||
String
|
||||
),
|
||||
("waiting-time-us", measure.as_us() as i64, i64),
|
||||
);
|
||||
epoch_accounts_hash
|
||||
}
|
||||
|
||||
/// Recalculate the hash_internal_state from the account stores. Would be used to verify a
|
||||
/// snapshot.
|
||||
/// return true if all is good
|
||||
|
@ -7927,13 +7933,11 @@ impl Bank {
|
|||
|
||||
/// Convenience fn to get the Epoch Accounts Hash
|
||||
pub fn epoch_accounts_hash(&self) -> Option<EpochAccountsHash> {
|
||||
*self
|
||||
.rc
|
||||
self.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash
|
||||
.lock()
|
||||
.unwrap()
|
||||
.epoch_accounts_hash_manager
|
||||
.try_get_epoch_accounts_hash()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -296,6 +296,13 @@ impl BankForks {
|
|||
"sending epoch accounts hash request, slot: {}",
|
||||
eah_bank.slot()
|
||||
);
|
||||
eah_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_in_flight(eah_bank.slot());
|
||||
|
||||
self.last_accounts_hash_slot = eah_bank.slot();
|
||||
let squash_timing = eah_bank.squash();
|
||||
total_squash_accounts_ms += squash_timing.squash_accounts_ms as i64;
|
||||
|
@ -305,16 +312,6 @@ impl BankForks {
|
|||
total_squash_cache_ms += squash_timing.squash_cache_ms as i64;
|
||||
is_root_bank_squashed = eah_bank.slot() == root;
|
||||
|
||||
// Clear any existing EAH before requesting a new one
|
||||
_ = eah_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take();
|
||||
|
||||
accounts_background_request_sender
|
||||
.send_snapshot_request(SnapshotRequest {
|
||||
snapshot_root_bank: Arc::clone(eah_bank),
|
||||
|
@ -637,6 +634,7 @@ mod tests {
|
|||
signature::{Keypair, Signer},
|
||||
},
|
||||
solana_vote_program::vote_state::BlockTimestamp,
|
||||
std::{sync::atomic::Ordering::Relaxed, time::Duration},
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
@ -734,9 +732,38 @@ mod tests {
|
|||
let slots_in_epoch = 32;
|
||||
genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch);
|
||||
|
||||
// Spin up a thread to be a fake Accounts Background Service. Need to intercept and handle
|
||||
// (i.e. skip/make invalid) all EpochAccountsHash requests so future rooted banks do not hang
|
||||
// in Bank::freeze() waiting for an in-flight EAH calculation to complete.
|
||||
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
|
||||
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
|
||||
let bg_exit = Arc::new(AtomicBool::new(false));
|
||||
let bg_thread = {
|
||||
let exit = Arc::clone(&bg_exit);
|
||||
std::thread::spawn(move || {
|
||||
while !exit.load(Relaxed) {
|
||||
snapshot_request_receiver
|
||||
.try_iter()
|
||||
.filter(|snapshot_request| {
|
||||
snapshot_request.request_type == SnapshotRequestType::EpochAccountsHash
|
||||
})
|
||||
.for_each(|snapshot_request| {
|
||||
snapshot_request
|
||||
.snapshot_root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_invalid_for_tests();
|
||||
});
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let bank0 = Bank::new_for_tests(&genesis_config);
|
||||
let mut bank_forks0 = BankForks::new(bank0);
|
||||
bank_forks0.set_root(0, &AbsRequestSender::default(), None);
|
||||
bank_forks0.set_root(0, &abs_request_sender, None);
|
||||
|
||||
let bank1 = Bank::new_for_tests(&genesis_config);
|
||||
let mut bank_forks1 = BankForks::new(bank1);
|
||||
|
@ -768,7 +795,7 @@ mod tests {
|
|||
|
||||
// Set root in bank_forks0 to truncate the ancestor history
|
||||
bank_forks0.insert(child1);
|
||||
bank_forks0.set_root(slot, &AbsRequestSender::default(), None);
|
||||
bank_forks0.set_root(slot, &abs_request_sender, None);
|
||||
|
||||
// Don't set root in bank_forks1 to keep the ancestor history
|
||||
bank_forks1.insert(child2);
|
||||
|
@ -782,6 +809,9 @@ mod tests {
|
|||
info!("child0.ancestors: {:?}", child1.ancestors);
|
||||
info!("child1.ancestors: {:?}", child2.ancestors);
|
||||
assert_eq!(child1.hash(), child2.hash());
|
||||
|
||||
bg_exit.store(true, Relaxed);
|
||||
bg_thread.join().unwrap();
|
||||
}
|
||||
|
||||
fn make_hash_map(data: Vec<(Slot, Vec<Slot>)>) -> HashMap<Slot, HashSet<Slot>> {
|
||||
|
|
|
@ -16,6 +16,9 @@ use {
|
|||
},
|
||||
};
|
||||
|
||||
mod manager;
|
||||
pub use manager::Manager as EpochAccountsHashManager;
|
||||
|
||||
/// The EpochAccountsHash holds the result after calculating the accounts hash once per epoch
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct EpochAccountsHash(Hash);
|
||||
|
@ -29,7 +32,7 @@ impl AsRef<Hash> for EpochAccountsHash {
|
|||
impl EpochAccountsHash {
|
||||
/// Make an EpochAccountsHash from a regular accounts hash
|
||||
#[must_use]
|
||||
pub fn new(accounts_hash: Hash) -> Self {
|
||||
pub const fn new(accounts_hash: Hash) -> Self {
|
||||
Self(accounts_hash)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
use {
|
||||
super::EpochAccountsHash,
|
||||
solana_sdk::{clock::Slot, hash::Hash},
|
||||
std::sync::{Condvar, Mutex},
|
||||
};
|
||||
|
||||
/// Manage the epoch accounts hash
|
||||
///
|
||||
/// Handles setting when an EAH calculation is requested and when it completes. Also handles
|
||||
/// waiting for in-flight calculations to complete when the "stop" Bank must include it.
|
||||
#[derive(Debug)]
|
||||
pub struct Manager {
|
||||
/// Current state of the epoch accounts hash
|
||||
state: Mutex<State>,
|
||||
/// This condition variable is used to wait for an in-flight EAH calculation to complete
|
||||
cvar: Condvar,
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
#[must_use]
|
||||
fn _new(state: State) -> Self {
|
||||
Self {
|
||||
state: Mutex::new(state),
|
||||
cvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new epoch accounts hash manager, with the initial state set to Invalid
|
||||
#[must_use]
|
||||
pub fn new_invalid() -> Self {
|
||||
Self::_new(State::Invalid)
|
||||
}
|
||||
|
||||
/// Create a new epoch accounts hash manager, with the initial state set to Valid
|
||||
#[must_use]
|
||||
pub fn new_valid(epoch_accounts_hash: EpochAccountsHash, slot: Slot) -> Self {
|
||||
Self::_new(State::Valid(epoch_accounts_hash, slot))
|
||||
}
|
||||
|
||||
/// An epoch accounts hash calculation has been requested; update our state
|
||||
pub fn set_in_flight(&self, slot: Slot) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if let State::InFlight(old_slot) = &*state {
|
||||
panic!("An epoch accounts hash calculation is already in-flight from slot {old_slot}!");
|
||||
}
|
||||
*state = State::InFlight(slot);
|
||||
}
|
||||
|
||||
/// An epoch accounts hash calculation has completed; update our state
|
||||
pub fn set_valid(&self, epoch_accounts_hash: EpochAccountsHash, slot: Slot) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if let State::Valid(old_epoch_accounts_hash, old_slot) = &*state {
|
||||
panic!(
|
||||
"The epoch accounts hash is already valid! \
|
||||
\nold slot: {old_slot}, epoch accounts hash: {old_epoch_accounts_hash:?} \
|
||||
\nnew slot: {slot}, epoch accounts hash: {epoch_accounts_hash:?}"
|
||||
);
|
||||
}
|
||||
*state = State::Valid(epoch_accounts_hash, slot);
|
||||
self.cvar.notify_all();
|
||||
}
|
||||
|
||||
/// Get the epoch accounts hash
|
||||
///
|
||||
/// If an EAH calculation is in-flight, then this call will block until it completes.
|
||||
pub fn wait_get_epoch_accounts_hash(&self) -> EpochAccountsHash {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
loop {
|
||||
match &*state {
|
||||
State::Valid(epoch_accounts_hash, _slot) => break *epoch_accounts_hash,
|
||||
State::Invalid => break SENTINEL_EPOCH_ACCOUNTS_HASH,
|
||||
State::InFlight(_slot) => state = self.cvar.wait(state).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the epoch accounts hash
|
||||
///
|
||||
/// This fn does not block, and will only yield an EAH if the state is `Valid`
|
||||
pub fn try_get_epoch_accounts_hash(&self) -> Option<EpochAccountsHash> {
|
||||
let state = self.state.lock().unwrap();
|
||||
match &*state {
|
||||
State::Valid(epoch_accounts_hash, _slot) => Some(*epoch_accounts_hash),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// **FOR TESTS ONLY**
|
||||
/// Set the state to Invalid
|
||||
/// This is needed by tests that do not fully startup all the accounts background services.
|
||||
/// **FOR TESTS ONLY**
|
||||
pub fn set_invalid_for_tests(&self) {
|
||||
*self.state.lock().unwrap() = State::Invalid;
|
||||
}
|
||||
}
|
||||
|
||||
/// The EpochAccountsHash is calculated in the background via AccountsBackgroundService. This enum
|
||||
/// is used to track the state of that calculation, and queried when saving the EAH into a Bank.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum State {
|
||||
/// On startup from genesis/slot0, the initial state of the EAH is invalid since one has not
|
||||
/// yet been requested. This state should only really occur for tests and new clusters; not
|
||||
/// for established running clusters.
|
||||
Invalid,
|
||||
/// An EAH calculation has been requested (for `Slot`) and is in flight. The Bank that should
|
||||
/// save the EAH must wait until the calculation has completed.
|
||||
InFlight(Slot),
|
||||
/// The EAH calculation is complete (for `Slot`) and the EAH value is valid to read/use.
|
||||
Valid(EpochAccountsHash, Slot),
|
||||
}
|
||||
|
||||
/// Sentinel epoch accounts hash value; used when getting an Invalid EAH
|
||||
///
|
||||
/// Displays as "Sentine1EpochAccountsHash111111111111111111"
|
||||
const SENTINEL_EPOCH_ACCOUNTS_HASH: EpochAccountsHash =
|
||||
EpochAccountsHash::new(Hash::new_from_array([
|
||||
0x06, 0x92, 0x40, 0x3b, 0xee, 0xea, 0x7e, 0xe2, 0x7d, 0xf4, 0x90, 0x7f, 0xbd, 0x9e, 0xd0,
|
||||
0xd2, 0x1c, 0x2b, 0x66, 0x9a, 0xc4, 0xda, 0xce, 0xd7, 0x23, 0x41, 0x69, 0xab, 0xb7, 0x80,
|
||||
0x00, 0x00,
|
||||
]));
|
||||
|
||||
#[cfg(test)]
mod tests {
    use {super::*, std::time::Duration};

    /// A manager constructed as Valid yields its hash from both getters
    #[test]
    fn test_new_valid() {
        let expected = EpochAccountsHash::new(Hash::new_unique());
        let manager = Manager::new_valid(expected, 5678);
        assert_eq!(manager.try_get_epoch_accounts_hash(), Some(expected));
        assert_eq!(manager.wait_get_epoch_accounts_hash(), expected);
    }

    /// A manager constructed as Invalid has no hash to try-get, and wait-get yields the sentinel
    #[test]
    fn test_new_invalid() {
        let manager = Manager::new_invalid();
        assert!(manager.try_get_epoch_accounts_hash().is_none());
        assert_eq!(
            manager.wait_get_epoch_accounts_hash(),
            SENTINEL_EPOCH_ACCOUNTS_HASH,
        );
    }

    /// The non-blocking getter only yields a hash when the state is Valid
    #[test]
    fn test_try_get_epoch_accounts_hash() {
        let valid_hash = EpochAccountsHash::new(Hash::new_unique());
        let cases = [
            (State::Invalid, None),
            (State::InFlight(123), None),
            (State::Valid(valid_hash, 5678), Some(valid_hash)),
        ];
        for (initial_state, expected) in cases {
            assert_eq!(
                Manager::_new(initial_state).try_get_epoch_accounts_hash(),
                expected,
            );
        }
    }

    /// The blocking getter returns immediately unless a calculation is in flight
    #[test]
    fn test_wait_epoch_accounts_hash() {
        // Test: State is Invalid, no need to wait
        {
            let manager = Manager::new_invalid();
            assert_eq!(
                manager.wait_get_epoch_accounts_hash(),
                SENTINEL_EPOCH_ACCOUNTS_HASH,
            );
        }

        // Test: State is Valid, no need to wait
        {
            let expected = EpochAccountsHash::new(Hash::new_unique());
            let manager = Manager::new_valid(expected, 5678);
            assert_eq!(manager.wait_get_epoch_accounts_hash(), expected);
        }

        // Test: State is InFlight, must wait
        {
            let completed_hash = EpochAccountsHash::new(Hash::new_unique());
            let manager = Manager::new_invalid();
            manager.set_in_flight(123);

            std::thread::scope(|scope| {
                // Finish the "calculation" from another thread after a delay
                scope.spawn(|| {
                    std::thread::sleep(Duration::from_secs(1));
                    manager.set_valid(completed_hash, 5678)
                });
                assert!(manager.try_get_epoch_accounts_hash().is_none());
                assert_eq!(manager.wait_get_epoch_accounts_hash(), completed_hash);
                assert!(manager.try_get_epoch_accounts_hash().is_some());
            });
        }
    }
}
|
|
@ -688,8 +688,12 @@ where
|
|||
accounts_update_notifier,
|
||||
exit,
|
||||
);
|
||||
*accounts_db.epoch_accounts_hash.lock().unwrap() =
|
||||
epoch_accounts_hash.map(EpochAccountsHash::new);
|
||||
|
||||
if let Some(epoch_accounts_hash) = epoch_accounts_hash {
|
||||
accounts_db
|
||||
.epoch_accounts_hash_manager
|
||||
.set_valid(EpochAccountsHash::new(epoch_accounts_hash), 0);
|
||||
}
|
||||
|
||||
let AccountsDbFields(
|
||||
_snapshot_storages,
|
||||
|
|
|
@ -254,15 +254,15 @@ fn test_bank_serialize_style(
|
|||
|
||||
if initial_epoch_accounts_hash {
|
||||
expected_epoch_accounts_hash = Some(Hash::new(&[7; 32]));
|
||||
*bank2
|
||||
bank2
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.epoch_accounts_hash
|
||||
.lock()
|
||||
.unwrap() = Some(EpochAccountsHash::new(
|
||||
expected_epoch_accounts_hash.unwrap(),
|
||||
));
|
||||
.epoch_accounts_hash_manager
|
||||
.set_valid(
|
||||
EpochAccountsHash::new(expected_epoch_accounts_hash.unwrap()),
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
crate::serde_snapshot::bank_to_stream(
|
||||
|
@ -416,7 +416,7 @@ fn test_bank_serialize_style(
|
|||
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
|
||||
assert!(bank2 == dbank);
|
||||
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
|
||||
assert_eq!(dbank.rc.accounts.accounts_db.epoch_accounts_hash.lock().unwrap().map(|hash| *hash.as_ref()), expected_epoch_accounts_hash,
|
||||
assert_eq!(dbank.rc.accounts.accounts_db.epoch_accounts_hash_manager.try_get_epoch_accounts_hash().map(|hash| *hash.as_ref()), expected_epoch_accounts_hash,
|
||||
"(reserialize_accounts_hash, incremental_snapshot_persistence, epoch_accounts_hash, update_accounts_hash, initial_epoch_accounts_hash): {:?}",
|
||||
(
|
||||
reserialize_accounts_hash,
|
||||
|
|
Loading…
Reference in New Issue