serialize epoch_accounts_hash (#27516)

This commit is contained in:
Jeff Washington (jwash) 2022-09-07 10:07:00 -07:00 committed by GitHub
parent ebf10960b5
commit a31d4a597d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 144 additions and 26 deletions

View File

@ -214,6 +214,7 @@ impl AccountsHashVerifier {
accounts_package.slot,
&accounts_hash,
None,
None,
);
datapoint_info!(
"accounts_hash_verifier",

View File

@ -258,6 +258,7 @@ fn run_bank_forks_snapshot_n<F>(
accounts_package.slot,
&last_bank.get_accounts_hash(),
None,
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, last_bank.get_accounts_hash());
snapshot_utils::archive_snapshot_package(
@ -494,6 +495,7 @@ fn test_concurrent_snapshot_packaging(
accounts_package.slot,
&Hash::default(),
None,
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, Hash::default());
pending_snapshot_package
@ -538,6 +540,7 @@ fn test_concurrent_snapshot_packaging(
saved_slot,
&Hash::default(),
None,
None,
);
snapshot_utils::verify_snapshot_archive(

View File

@ -1190,7 +1190,11 @@ pub struct AccountsDb {
/// debug feature to scan every append vec and verify refcounts are equal
exhaustively_verify_refcounts: bool,
/// A special accounts hash that occurs once per epoch
/// the full accounts hash calculation as of a predetermined block height 'N'
/// to be included in the bank hash at a predetermined block height 'M'
/// The cadence is once per epoch, all nodes calculate a full accounts hash as of a known slot calculated using 'N'
/// Some time later (to allow for slow calculation time), the bank hash at a slot calculated using 'M' includes the full accounts hash.
/// Thus, the state of all accounts on a validator is known to be correct at least once per epoch.
pub(crate) epoch_accounts_hash: Mutex<Option<EpochAccountsHash>>,
}

View File

@ -2285,6 +2285,19 @@ impl Bank {
bank
}
/// if we were to serialize THIS bank, what value should be saved for the prior accounts hash?
/// This depends on the proximity to the time to take the snapshot and the time to use the snapshot.
pub(crate) fn get_epoch_accounts_hash_to_serialize(&self) -> Option<Hash> {
self.rc
.accounts
.accounts_db
.epoch_accounts_hash
.lock()
.unwrap()
.as_ref()
.map(|hash| *hash.as_ref())
}
/// Return subset of bank fields representing serializable state
pub(crate) fn get_fields_to_serialize<'a>(
&'a self,

View File

@ -23,6 +23,14 @@ impl AsRef<Hash> for EpochAccountsHash {
}
}
impl EpochAccountsHash {
/// Make an EpochAccountsHash from a regular accounts hash
#[must_use]
pub fn new(accounts_hash: Hash) -> Self {
Self(accounts_hash)
}
}
/// Calculation of the EAH occurs once per epoch. All nodes in the cluster must agree on which
/// slot the EAH is based on. This slot will be at an offset into the epoch, and referred to as
/// the "start" slot for the EAH calculation.

View File

@ -11,6 +11,7 @@ use {
bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_accounts_hash::EpochAccountsHash,
epoch_stakes::EpochStakes,
rent_collector::RentCollector,
runtime_config::RuntimeConfig,
@ -193,6 +194,7 @@ trait TypeContext<'a>: PartialEq {
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
epoch_accounts_hash: Option<&Hash>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
@ -390,6 +392,7 @@ fn reserialize_bank_fields_with_new_hash<W, R>(
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
epoch_accounts_hash: Option<&Hash>,
) -> Result<(), Error>
where
W: Write,
@ -400,6 +403,7 @@ where
stream_writer,
accounts_hash,
incremental_snapshot_persistence,
epoch_accounts_hash,
)
}
@ -413,6 +417,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
slot: Slot,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
epoch_accounts_hash: Option<&Hash>,
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
@ -431,6 +436,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
&mut BufWriter::new(file_out),
accounts_hash,
incremental_snapshot_persistence,
epoch_accounts_hash,
)
.unwrap();
}
@ -541,6 +547,7 @@ where
verify_index,
accounts_db_config,
accounts_update_notifier,
bank_fields.epoch_accounts_hash,
)?;
let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot);
@ -661,6 +668,7 @@ fn reconstruct_accountsdb_from_fields<E>(
verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
epoch_accounts_hash: Option<Hash>,
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
where
E: SerializableStorage + std::marker::Sync,
@ -674,6 +682,8 @@ where
accounts_db_config,
accounts_update_notifier,
);
*accounts_db.epoch_accounts_hash.lock().unwrap() =
epoch_accounts_hash.map(EpochAccountsHash::new);
let AccountsDbFields(
_snapshot_storages,

View File

@ -212,6 +212,9 @@ impl<'a> TypeContext<'a> for Context {
// TODO: if we do a snapshot version bump, consider moving this out.
lamports_per_signature,
None::<BankIncrementalSnapshotPersistence>,
serializable_bank
.bank
.get_epoch_accounts_hash_to_serialize(),
)
.serialize(serializer)
}
@ -344,6 +347,7 @@ impl<'a> TypeContext<'a> for Context {
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
epoch_accounts_hash: Option<&Hash>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
@ -356,6 +360,7 @@ impl<'a> TypeContext<'a> for Context {
let blockhash_queue = RwLock::new(rhs.blockhash_queue.clone());
let hard_forks = RwLock::new(rhs.hard_forks.clone());
let lamports_per_signature = rhs.fee_rate_governor.lamports_per_signature;
let epoch_accounts_hash = epoch_accounts_hash.or(rhs.epoch_accounts_hash.as_ref());
let bank = SerializableVersionedBank {
blockhash_queue: &blockhash_queue,
@ -399,6 +404,7 @@ impl<'a> TypeContext<'a> for Context {
accounts_db_fields,
lamports_per_signature,
incremental_snapshot_persistence,
epoch_accounts_hash,
),
)
}

View File

@ -116,6 +116,7 @@ where
false,
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None,
None,
)
.map(|(accounts_db, _)| accounts_db)
}
@ -217,6 +218,8 @@ fn test_bank_serialize_style(
reserialize_accounts_hash: bool,
update_accounts_hash: bool,
incremental_snapshot_persistence: bool,
epoch_accounts_hash: bool,
initial_epoch_accounts_hash: bool,
) {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
@ -245,6 +248,22 @@ fn test_bank_serialize_style(
let snapshot_storages = bank2.get_snapshot_storages(None);
let mut buf = vec![];
let mut writer = Cursor::new(&mut buf);
let mut expected_epoch_accounts_hash = None;
if initial_epoch_accounts_hash {
expected_epoch_accounts_hash = Some(Hash::new(&[7; 32]));
*bank2
.rc
.accounts
.accounts_db
.epoch_accounts_hash
.lock()
.unwrap() = Some(EpochAccountsHash::new(
expected_epoch_accounts_hash.unwrap(),
));
}
crate::serde_snapshot::bank_to_stream(
serde_style,
&mut std::io::BufWriter::new(&mut writer),
@ -274,7 +293,7 @@ fn test_bank_serialize_style(
incremental_capitalization: 32,
});
if reserialize_accounts_hash || incremental_snapshot_persistence {
if reserialize_accounts_hash || incremental_snapshot_persistence || epoch_accounts_hash {
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
@ -286,36 +305,71 @@ fn test_bank_serialize_style(
f.write_all(&buf).unwrap();
}
let reserialized_epoch_accounts_hash = if epoch_accounts_hash {
expected_epoch_accounts_hash = Some(Hash::new(&[3; 32]));
expected_epoch_accounts_hash
} else {
None
};
assert!(reserialize_bank_with_new_accounts_hash(
temp_dir.path(),
slot,
&accounts_hash,
incremental.as_ref(),
reserialized_epoch_accounts_hash.as_ref(),
));
let previous_len = buf.len();
// larger buffer than expected to make sure the file isn't larger than expected
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
let mut buf_reserialized =
vec![0; previous_len + sizeof_incremental_snapshot_persistence + 1];
let mut buf_reserialized;
{
let previous_len = buf.len();
let expected = previous_len
+ if incremental_snapshot_persistence {
// previously saved a none (size = sizeof_None), now added a Some
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
sizeof_incremental_snapshot_persistence - sizeof_none
} else {
// no change
0
}
+ if epoch_accounts_hash && !initial_epoch_accounts_hash {
// previously saved a none (size 1), now added a Some
let sizeof_epoch_accounts_hash_persistence =
std::mem::size_of::<Option<Hash>>();
sizeof_epoch_accounts_hash_persistence - 1
} else {
// no change
0
};
// +1: larger buffer than expected to make sure the file isn't larger than expected
buf_reserialized = vec![0; expected + 1];
let mut f = std::fs::File::open(post_path).unwrap();
let size = f.read(&mut buf_reserialized).unwrap();
let expected = if !incremental_snapshot_persistence {
previous_len
} else {
previous_len + sizeof_incremental_snapshot_persistence - sizeof_none
};
assert_eq!(size, expected);
assert_eq!(
size,
expected,
"(reserialize_accounts_hash, incremental_snapshot_persistence, epoch_accounts_hash, update_accounts_hash, initial_epoch_accounts_hash): {:?}, previous_len: {previous_len}",
(
reserialize_accounts_hash,
incremental_snapshot_persistence,
epoch_accounts_hash,
update_accounts_hash,
initial_epoch_accounts_hash,
)
);
buf_reserialized.truncate(size);
}
if update_accounts_hash || incremental_snapshot_persistence {
if update_accounts_hash {
// We cannot guarantee buffer contents are exactly the same if hash is the same.
// Things like hashsets/maps have randomness in their in-mem representations.
// This make serialized bytes not deterministic.
// This makes serialized bytes not deterministic.
// But, we can guarantee that the buffer is different if we change the hash!
assert_ne!(buf, buf_reserialized);
}
if update_accounts_hash || incremental_snapshot_persistence || epoch_accounts_hash {
buf = buf_reserialized;
}
}
@ -360,6 +414,16 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
assert_eq!(dbank.rc.accounts.accounts_db.epoch_accounts_hash.lock().unwrap().map(|hash| *hash.as_ref()), expected_epoch_accounts_hash,
"(reserialize_accounts_hash, incremental_snapshot_persistence, epoch_accounts_hash, update_accounts_hash, initial_epoch_accounts_hash): {:?}",
(
reserialize_accounts_hash,
incremental_snapshot_persistence,
epoch_accounts_hash,
update_accounts_hash,
initial_epoch_accounts_hash,
)
);
}
pub(crate) fn reconstruct_accounts_db_via_serialization(
@ -413,17 +477,24 @@ fn test_bank_serialize_newer() {
for (reserialize_accounts_hash, update_accounts_hash) in
[(false, false), (true, false), (true, true)]
{
for incremental_snapshot_persistence in if reserialize_accounts_hash {
let parameters = if reserialize_accounts_hash {
[false, true].to_vec()
} else {
[false].to_vec()
} {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
)
};
for incremental_snapshot_persistence in parameters.clone() {
for epoch_accounts_hash in parameters.clone() {
for initial_epoch_accounts_hash in parameters.clone() {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
epoch_accounts_hash,
initial_epoch_accounts_hash,
)
}
}
}
}
}
@ -616,7 +687,7 @@ mod test_bank_serialize {
// This some what long test harness is required to freeze the ABI of
// Bank's serialization due to versioned nature
#[frozen_abi(digest = "5py4Wkuj5fV2sLyA1MrPg4pGNwMEaygQLnpLyY8MMLGC")]
#[frozen_abi(digest = "7SZNRErAktC7sRcpChrcHfsr9Uw7XXoSzNbYzoNtoQCr")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperNewer {
#[serde(serialize_with = "wrapper_newer")]

View File

@ -2297,6 +2297,7 @@ pub fn package_and_archive_full_snapshot(
accounts_package.slot,
&bank.get_accounts_hash(),
None,
None, // todo: this needs to be passed through
);
let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
@ -2350,6 +2351,7 @@ pub fn package_and_archive_incremental_snapshot(
accounts_package.slot,
&bank.get_accounts_hash(),
None,
None, // todo: this needs to be passed through
);
let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());