Adds calculate_incremental_accounts_hash() (#29734)

**Node operators,** please delete your `<ledger>/calculate_accounts_hash_cache` directory (if you have one).  It has been moved/renamed to `<ledger>/accounts_hash_cache/full`.
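
If you script the cleanup, here is a minimal sketch (illustrative only; `remove_stale_accounts_hash_cache` and `ledger_path` are hypothetical names, not part of this commit):

```rust
use std::{fs, io, path::Path};

/// Removes the obsolete cache directory. The new caches live under
/// `<ledger>/accounts_hash_cache/full` and `<ledger>/accounts_hash_cache/incremental`.
fn remove_stale_accounts_hash_cache(ledger_path: &Path) -> io::Result<()> {
    let stale_dir = ledger_path.join("calculate_accounts_hash_cache");
    if stale_dir.exists() {
        // remove_dir_all deletes the directory and everything beneath it
        fs::remove_dir_all(&stale_dir)?;
    }
    Ok(())
}
```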
Brooks 2023-01-17 15:04:29 -05:00, committed by GitHub
parent 8daed75df9
commit 8c62927d59
2 changed files with 285 additions and 54 deletions

runtime/src/accounts_db.rs

@@ -26,7 +26,7 @@ use {
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{
AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate,
HashStats,
HashStats, ZeroLamportAccounts,
},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
@@ -1220,7 +1220,8 @@ pub struct AccountsDb {
/// Set of storage paths to pick from
pub(crate) paths: Vec<PathBuf>,
accounts_hash_cache_path: PathBuf,
full_accounts_hash_cache_path: PathBuf,
incremental_accounts_hash_cache_path: PathBuf,
// used by tests
// holds this until we are dropped
@@ -2184,7 +2185,7 @@ impl<'a> AppendVecScan for ScanState<'a> {
}
impl AccountsDb {
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "calculate_accounts_hash_cache";
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "accounts_hash_cache";
pub fn default_for_tests() -> Self {
Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
@@ -2234,7 +2235,8 @@ impl AccountsDb {
write_cache_limit_bytes: None,
write_version: AtomicU64::new(0),
paths: vec![],
accounts_hash_cache_path,
full_accounts_hash_cache_path: accounts_hash_cache_path.join("full"),
incremental_accounts_hash_cache_path: accounts_hash_cache_path.join("incremental"),
temp_accounts_hash_cache_path,
shrink_paths: RwLock::new(None),
temp_paths: None,
@@ -5845,7 +5847,6 @@ impl AccountsDb {
if lamports == 0 {
return Hash::default();
}
let mut hasher = blake3::Hasher::new();
hasher.update(&lamports.to_le_bytes());
@@ -7176,17 +7177,9 @@ impl AccountsDb {
);
}
fn get_cache_hash_data(
&self,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
) -> CacheHashData {
Self::_get_cache_hash_data(self.accounts_hash_cache_path.clone(), config, slot)
}
/// The normal code path returns the common cache path.
/// When called after a failure has been detected, redirect the cache storage to a separate folder for later debugging.
fn _get_cache_hash_data(
fn get_cache_hash_data(
accounts_hash_cache_path: PathBuf,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
@@ -7206,10 +7199,53 @@ impl AccountsDb {
// modeled after get_accounts_delta_hash
// intended to be faster than calculate_accounts_hash
pub fn calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
stats: HashStats,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Full,
self.full_accounts_hash_cache_path.clone(),
)
}
/// Calculate the incremental accounts hash
///
/// This calculation is intended to be used by incremental snapshots, and thus differs from a
/// "full" accounts hash in a few ways:
/// - Zero-lamport accounts are *included* in the hash because zero-lamport accounts are also
/// included in the incremental snapshot. This ensures reconstructing the AccountsDb is
/// still correct when using this incremental accounts hash.
/// - The slots in `storages` must all be *greater than* `base_slot`. This follows the same
///   requirements as incremental snapshots.
pub fn calculate_incremental_accounts_hash(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
base_slot: Slot,
stats: HashStats,
) -> Result<(AccountsHash, /* capitalization */ u64), BankHashVerificationError> {
assert!(storages.range().start > base_slot, "The storages for calculating an incremental accounts hash must all be higher than the base slot");
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Incremental,
self.incremental_accounts_hash_cache_path.clone(),
)
}
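// A hedged usage sketch (it mirrors the test added below; `db`, `full_base_slot`, and
// `latest_slot` are illustrative names, not part of this commit). The storages passed in
// must cover only slots strictly greater than the base slot, or the assert above fires:
//
//     let (storages, _) = db.get_snapshot_storages(full_base_slot + 1..=latest_slot, None);
//     let storages = SortedStorages::new(&storages);
//     let (incremental_hash, capitalization) = db.calculate_incremental_accounts_hash(
//         &CalcAccountsHashConfig::default(),
//         &storages,
//         full_base_slot,
//         HashStats::default(),
//     )?;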
fn _calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
mut stats: HashStats,
flavor: CalcAccountsHashFlavor,
accounts_hash_cache_path: PathBuf,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
let _guard = self.active_stats.activate(ActiveStatItem::Hash);
stats.oldest_root = storages.range().start;
@@ -7217,49 +7253,53 @@ impl AccountsDb {
self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);
let use_bg_thread_pool = config.use_bg_thread_pool;
let mut scan_and_hash = || {
let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive());
let scan_and_hash = || {
let cache_hash_data = Self::get_cache_hash_data(
accounts_hash_cache_path,
config,
storages.max_slot_inclusive(),
);
let bounds = Range {
start: 0,
end: PUBKEY_BINS_FOR_CALCULATING_HASHES,
};
let hash = AccountsHasher {
let accounts_hasher = AccountsHasher {
filler_account_suffix: if self.filler_accounts_config.count > 0 {
self.filler_account_suffix
} else {
None
},
zero_lamport_accounts: flavor.zero_lamport_accounts(),
};
// get raw data by scanning
let result = self.scan_snapshot_stores_with_cache(
let cache_hash_data_files = self.scan_snapshot_stores_with_cache(
&cache_hash_data,
storages,
&mut stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
config,
hash.filler_account_suffix.as_ref(),
accounts_hasher.filler_account_suffix.as_ref(),
)?;
// convert mmapped cache files into slices of data
let slices = result
let cache_hash_intermediates = cache_hash_data_files
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();
// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHasher::get_binned_data(
&slices,
&cache_hash_intermediates,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);
// turn raw data into merkle tree hashes and sum of lamports
let final_result = hash.rest_of_hash_calculation(result, &mut stats);
let final_result = accounts_hasher.rest_of_hash_calculation(result, &mut stats);
info!(
"calculate_accounts_hash_from_storages: slot: {} {:?}",
storages.max_slot_inclusive(),
@@ -8952,6 +8992,23 @@ pub enum CalcAccountsHashDataSource {
Storages,
}
/// Which accounts hash calculation is being performed?
#[derive(Debug)]
enum CalcAccountsHashFlavor {
Full,
Incremental,
}
impl CalcAccountsHashFlavor {
/// How should zero-lamport accounts be handled by this accounts hash calculation?
fn zero_lamport_accounts(&self) -> ZeroLamportAccounts {
match self {
CalcAccountsHashFlavor::Full => ZeroLamportAccounts::Excluded,
CalcAccountsHashFlavor::Incremental => ZeroLamportAccounts::Included,
}
}
}
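// A minimal check of the mapping above (illustrative, not part of this commit): a full
// accounts hash excludes zero-lamport accounts, an incremental one includes them.
//
//     assert_eq!(
//         CalcAccountsHashFlavor::Full.zero_lamport_accounts(),
//         ZeroLamportAccounts::Excluded,
//     );
//     assert_eq!(
//         CalcAccountsHashFlavor::Incremental.zero_lamport_accounts(),
//         ZeroLamportAccounts::Included,
//     );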
#[cfg(test)]
impl AccountsDb {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
@@ -17364,4 +17421,159 @@ pub mod tests {
make_ancient_append_vec_full(full.new_storage());
full
}
#[test]
fn test_calculate_incremental_accounts_hash() {
let accounts_db =
AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development);
let owner = Pubkey::new_unique();
let mut accounts: Vec<_> = (0..10)
.map(|_| (Pubkey::new_unique(), AccountSharedData::new(0, 0, &owner)))
.collect();
// store some accounts into slot 0
let slot = 0;
{
accounts[0].1.set_lamports(0);
accounts[1].1.set_lamports(1);
accounts[2].1.set_lamports(10);
accounts[3].1.set_lamports(100);
//accounts[4].1.set_lamports(1_000); <-- will be added next slot
let accounts = vec![
(&accounts[0].0, &accounts[0].1),
(&accounts[1].0, &accounts[1].1),
(&accounts[2].0, &accounts[2].1),
(&accounts[3].0, &accounts[3].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}
// store some accounts into slot 1
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
accounts[1].1.set_lamports(0); /* <-- drain account */
//accounts[2].1.set_lamports(10); <-- unchanged
//accounts[3].1.set_lamports(100); <-- unchanged
accounts[4].1.set_lamports(1_000); /* <-- add account */
let accounts = vec![
(&accounts[1].0, &accounts[1].1),
(&accounts[4].0, &accounts[4].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}
// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, None);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_accounts_hash_from_storages(
&CalcAccountsHashConfig::default(),
&storages,
HashStats::default(),
)
.unwrap()
};
assert_eq!(full_accounts_hash.1, 1_110);
let full_accounts_hash_slot = slot;
// Calculate the expected full accounts hash here and ensure it matches.
// Ensure the zero-lamport accounts are NOT included in the full accounts hash.
let full_account_hashes = [(2, 0), (3, 0), (4, 1)].into_iter().map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(full_account_hashes));
assert_eq!(full_accounts_hash.0, expected_accounts_hash);
// store accounts into slot 2
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
accounts[2].1.set_lamports(0); /* <-- drain account */
//accounts[3].1.set_lamports(100); <-- unchanged
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(10_000); /* <-- add account */
accounts[6].1.set_lamports(100_000); /* <-- add account */
//accounts[7].1.set_lamports(1_000_000); <-- will be added next slot
let accounts = vec![
(&accounts[2].0, &accounts[2].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[6].0, &accounts[6].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}
// store accounts into slot 3
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
//accounts[2].1.set_lamports(0); <-- unchanged
accounts[3].1.set_lamports(0); /* <-- drain account */
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(0); /* <-- drain account */
//accounts[6].1.set_lamports(100_000); <-- unchanged
accounts[7].1.set_lamports(1_000_000); /* <-- add account */
let accounts = vec![
(&accounts[3].0, &accounts[3].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[7].0, &accounts[7].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}
// calculate the incremental accounts hash
let incremental_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, Some(full_accounts_hash_slot));
let (storages, _) =
accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_incremental_accounts_hash(
&CalcAccountsHashConfig::default(),
&storages,
full_accounts_hash_slot,
HashStats::default(),
)
.unwrap()
};
assert_eq!(incremental_accounts_hash.1, 1_100_000);
// Ensure the zero-lamport accounts are included in the incremental accounts hash.
// Accounts 2, 3, and 5 all have zero lamports.
let incremental_account_hashes =
[(2, 2), (3, 3), (5, 3), (6, 2), (7, 3)]
.into_iter()
.map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
if account.is_zero_lamport() {
// For incremental accounts hash, the hash of a zero lamport account is the hash of its pubkey.
// Ensure this implementation detail remains in sync with AccountsHasher::de_dup_in_parallel().
let hash = blake3::hash(bytemuck::bytes_of(pubkey));
Hash::new_from_array(hash.into())
} else {
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
}
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(incremental_account_hashes));
assert_eq!(incremental_accounts_hash.0, expected_accounts_hash);
}
fn compute_merkle_root(hashes: impl IntoIterator<Item = Hash>) -> Hash {
let hashes = hashes.into_iter().collect();
AccountsHasher::compute_merkle_root_recurse(hashes, MERKLE_FANOUT)
}
}

runtime/src/accounts_hash.rs

@@ -495,9 +495,19 @@ impl CumulativeOffsets {
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct AccountsHasher {
pub filler_account_suffix: Option<Pubkey>,
pub zero_lamport_accounts: ZeroLamportAccounts,
}
impl Default for AccountsHasher {
fn default() -> Self {
Self {
filler_account_suffix: None,
zero_lamport_accounts: ZeroLamportAccounts::Excluded,
}
}
}
impl AccountsHasher {
@@ -862,7 +872,7 @@ impl AccountsHasher {
/// Vec, with one entry per bin
/// for each entry, Vec<Hash> in pubkey order
/// If return Vec<AccountHashesFile> was flattened, it would be all hashes, in pubkey order.
fn de_dup_and_eliminate_zeros<'a>(
fn de_dup_accounts<'a>(
&self,
sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>],
stats: &mut HashStats,
@@ -947,7 +957,7 @@ impl AccountsHasher {
// go through: [..][pubkey_bin][..] and return hashes and lamport sum
// slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version
// 1. eliminate zero lamport accounts
// 1. handle zero lamport accounts
// 2. pick the highest slot or (slot = and highest version) of each pubkey
// 3. produce this output:
// a. AccountHashesFile: individual account hashes in pubkey order
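// worked example for rule 2 (illustrative): given entries for the same pubkey at
// (slot 5, write_version 3) and (slot 7, write_version 0), the (slot 7, write_version 0)
// entry wins because the higher slot takes precedence; write_version only breaks ties
// within a single slot.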
@@ -1021,15 +1031,26 @@ impl AccountsHasher {
&mut first_item_to_pubkey_division,
);
// add lamports, get hash as long as the lamports are > 0
if item.lamports != 0
&& (!filler_accounts_enabled || !self.is_filler_account(&item.pubkey))
{
overall_sum = Self::checked_cast_for_capitalization(
item.lamports as u128 + overall_sum as u128,
);
hashes.write(&item.hash);
// add lamports and get hash
if item.lamports != 0 {
// do not include filler accounts in the hash
if !(filler_accounts_enabled && self.is_filler_account(&item.pubkey)) {
overall_sum = Self::checked_cast_for_capitalization(
item.lamports as u128 + overall_sum as u128,
);
hashes.write(&item.hash);
}
} else {
// if lamports == 0, check if they should be included
if self.zero_lamport_accounts == ZeroLamportAccounts::Included {
// For incremental accounts hash, the hash of a zero lamport account is
// the hash of its pubkey
let hash = blake3::hash(bytemuck::bytes_of(&item.pubkey));
let hash = Hash::new_from_array(hash.into());
hashes.write(&hash);
}
}
if !duplicate_pubkey_indexes.is_empty() {
// skip past duplicate keys in earlier slots
// reverse this list because get_item can remove first_items[*i] when *i is exhausted
@@ -1066,7 +1087,7 @@ impl AccountsHasher {
data_sections_by_pubkey: Vec<SortedDataByPubkey<'_>>,
mut stats: &mut HashStats,
) -> (Hash, u64) {
let (hashes, total_lamports) = self.de_dup_and_eliminate_zeros(
let (hashes, total_lamports) = self.de_dup_accounts(
&data_sections_by_pubkey,
stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
@@ -1088,7 +1109,14 @@ impl AccountsHasher {
}
}
/// Hash of all the accounts
/// How should zero-lamport accounts be treated by the accounts hasher?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ZeroLamportAccounts {
Excluded,
Included,
}
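// A hedged sketch of wiring the policy into the hasher (the types are from this diff;
// the construction itself is illustrative). Under `Included`, a zero-lamport account is
// committed to the hash as the blake3 hash of its pubkey, matching the de_dup branch above:
//
//     let hasher = AccountsHasher {
//         filler_account_suffix: None,
//         zero_lamport_accounts: ZeroLamportAccounts::Included,
//     };
//     let zero_lamport_hash =
//         Hash::new_from_array(blake3::hash(bytemuck::bytes_of(&pubkey)).into());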
/// Hash of accounts
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, AbiExample)]
pub struct AccountsHash(pub Hash);
@@ -1291,7 +1319,7 @@ pub mod tests {
let vec = vec![vec![], vec![]];
let (hashes, lamports) =
accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), one_range());
accounts_hash.de_dup_accounts(&vec, &mut HashStats::default(), one_range());
assert_eq!(
vec![Hash::default(); 0],
get_vec_vec(hashes)
@@ -1302,7 +1330,7 @@ pub mod tests {
assert_eq!(lamports, 0);
let vec = vec![];
let (hashes, lamports) =
accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), zero_range());
accounts_hash.de_dup_accounts(&vec, &mut HashStats::default(), zero_range());
let empty: Vec<Vec<Hash>> = Vec::default();
assert_eq!(empty, get_vec_vec(hashes));
assert_eq!(lamports, 0);
@@ -1399,25 +1427,16 @@ pub mod tests {
let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(&slice3, 0);
let vec = slice.to_vec();
let slice4 = convert_to_slice2(&vec);
let (hashes4, lamports4) = hash.de_dup_and_eliminate_zeros(
&slice4,
&mut HashStats::default(),
end - start,
);
let (hashes4, lamports4) =
hash.de_dup_accounts(&slice4, &mut HashStats::default(), end - start);
let vec = slice.to_vec();
let slice5 = convert_to_slice2(&vec);
let (hashes5, lamports5) = hash.de_dup_and_eliminate_zeros(
&slice5,
&mut HashStats::default(),
end - start,
);
let (hashes5, lamports5) =
hash.de_dup_accounts(&slice5, &mut HashStats::default(), end - start);
let vec = slice.to_vec();
let slice5 = convert_to_slice2(&vec);
let (hashes6, lamports6) = hash.de_dup_and_eliminate_zeros(
&slice5,
&mut HashStats::default(),
end - start,
);
let (hashes6, lamports6) =
hash.de_dup_accounts(&slice5, &mut HashStats::default(), end - start);
let hashes2 = get_vec(hashes2);
let hashes3 = get_vec(hashes3);
@@ -1936,7 +1955,7 @@ pub mod tests {
Pubkey::new_unique(),
)],
];
AccountsHasher::default().de_dup_and_eliminate_zeros(
AccountsHasher::default().de_dup_accounts(
&[convert_to_slice(&input)],
&mut HashStats::default(),
2, // accounts above are in 2 groups