serialize incremental_snapshot_hash (#26839)

* serialize incremental_snapshot_hash

* pr feedback
This commit is contained in:
Jeff Washington (jwash) 2022-08-17 15:14:31 -05:00 committed by GitHub
parent c1111fa069
commit 225cddcffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 16 deletions

View File

@ -190,6 +190,7 @@ impl AccountsHashVerifier {
accounts_package.snapshot_links.path(),
accounts_package.slot,
&accounts_hash,
None,
);
datapoint_info!(
"accounts_hash_verifier",

View File

@ -256,6 +256,7 @@ fn run_bank_forks_snapshot_n<F>(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&last_bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, last_bank.get_accounts_hash());
snapshot_utils::archive_snapshot_package(
@ -491,6 +492,7 @@ fn test_concurrent_snapshot_packaging(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&Hash::default(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, Hash::default());
pending_snapshot_package
@ -534,6 +536,7 @@ fn test_concurrent_snapshot_packaging(
saved_snapshots_dir.path(),
saved_slot,
&Hash::default(),
None,
);
snapshot_utils::verify_snapshot_archive(

View File

@ -236,6 +236,25 @@ impl RentDebit {
}
}
/// Incremental snapshots only calculate their accounts hash based on the account changes WITHIN the incremental slot range.
/// So, we need to keep track of the full snapshot expected accounts hash results.
/// We also need to keep track of the hash and capitalization specific to the incremental snapshot slot range.
/// The capitalization we calculate for the incremental slot will NOT be consistent with the bank's capitalization.
/// It is not feasible to calculate a capitalization delta that is correct given just incremental slots account data and the full snapshot's capitalization.
#[derive(Serialize, Deserialize, AbiExample, Clone, Debug, Default, PartialEq, Eq)]
pub struct BankIncrementalSnapshotPersistence {
/// slot of full snapshot
pub full_slot: Slot,
/// accounts hash from the full snapshot
pub full_hash: Hash,
/// capitalization from the full snapshot
pub full_capitalization: u64,
/// hash of the accounts in the incremental snapshot slot range, including zero-lamport accounts
pub incremental_hash: Hash,
/// capitalization of the accounts in the incremental snapshot slot range
pub incremental_capitalization: u64,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RentDebits(HashMap<Pubkey, RentDebit>);
impl RentDebits {
@ -976,6 +995,7 @@ pub struct BankFieldsToDeserialize {
pub(crate) epoch_stakes: HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,
pub(crate) accounts_data_len: u64,
pub(crate) incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}
// Bank's common fields shared by all supported snapshot versions for serialization.
@ -1083,6 +1103,7 @@ impl PartialEq for Bank {
accounts_data_size_delta_on_chain: _,
accounts_data_size_delta_off_chain: _,
fee_structure: _,
incremental_snapshot_persistence: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this ParitalEq is accordingly updated.
@ -1336,6 +1357,8 @@ pub struct Bank {
/// Transaction fee structure
pub fee_structure: FeeStructure,
pub incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}
struct VoteWithStakeDelegations {
@ -1466,6 +1489,7 @@ impl Bank {
fn default_with_accounts(accounts: Accounts) -> Self {
let mut bank = Self {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc: BankRc::new(accounts, Slot::default()),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
@ -1765,6 +1789,7 @@ impl Bank {
let accounts_data_size_initial = parent.load_accounts_data_size();
let mut new = Bank {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc,
status_cache,
@ -2126,6 +2151,7 @@ impl Bank {
}
let feature_set = new();
let mut bank = Self {
incremental_snapshot_persistence: fields.incremental_snapshot_persistence,
rewrites_skipped_this_slot: Rewrites::default(),
rc: bank_rc,
status_cache: new(),

View File

@ -8,7 +8,7 @@ use {
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
append_vec::{AppendVec, StoredMetaWriteVersion},
bank::{Bank, BankFieldsToDeserialize, BankRc},
bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_stakes::EpochStakes,
@ -77,6 +77,7 @@ pub struct AccountsDbFields<T>(
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
Vec<(Slot, Hash)>,
// here?
);
/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
@ -193,6 +194,7 @@ trait TypeContext<'a>: PartialEq {
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
@ -370,12 +372,18 @@ fn reserialize_bank_fields_with_new_hash<W, R>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> Result<(), Error>
where
W: Write,
R: Read,
{
newer::Context::reserialize_bank_fields_with_hash(stream_reader, stream_writer, accounts_hash)
newer::Context::reserialize_bank_fields_with_hash(
stream_reader,
stream_writer,
accounts_hash,
incremental_snapshot_persistence,
)
}
/// effectively updates the accounts hash in the serialized bank file on disk
@ -387,6 +395,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
@ -404,6 +413,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
&mut BufReader::new(file),
&mut BufWriter::new(file_out),
accounts_hash,
incremental_snapshot_persistence,
)
.unwrap();
}

View File

@ -96,6 +96,7 @@ impl From<DeserializableVersionedBank> for BankFieldsToDeserialize {
stakes: dvb.stakes,
epoch_stakes: dvb.epoch_stakes,
is_delta: dvb.is_delta,
incremental_snapshot_persistence: None,
}
}
}
@ -209,6 +210,7 @@ impl<'a> TypeContext<'a> for Context {
// we can grab it on restart.
// TODO: if we do a snapshot version bump, consider moving this out.
lamports_per_signature,
None::<BankIncrementalSnapshotPersistence>,
)
.serialize(serializer)
}
@ -314,6 +316,10 @@ impl<'a> TypeContext<'a> for Context {
bank_fields.fee_rate_governor = bank_fields
.fee_rate_governor
.clone_with_lamports_per_signature(lamports_per_signature);
let incremental_snapshot_persistence = ignore_eof_error(deserialize_from(stream))?;
bank_fields.incremental_snapshot_persistence = incremental_snapshot_persistence;
Ok((bank_fields, accounts_db_fields))
}
@ -327,12 +333,13 @@ impl<'a> TypeContext<'a> for Context {
}
/// deserialize the bank from 'stream_reader'
/// modify the accounts_hash
/// modify the accounts_hash and incremental_snapshot_persistence
/// reserialize the bank to 'stream_writer'
fn reserialize_bank_fields_with_hash<R, W>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
@ -345,6 +352,7 @@ impl<'a> TypeContext<'a> for Context {
let blockhash_queue = RwLock::new(rhs.blockhash_queue.clone());
let hard_forks = RwLock::new(rhs.hard_forks.clone());
let lamports_per_signature = rhs.fee_rate_governor.lamports_per_signature;
let bank = SerializableVersionedBank {
blockhash_queue: &blockhash_queue,
ancestors: &rhs.ancestors,
@ -382,7 +390,12 @@ impl<'a> TypeContext<'a> for Context {
bincode::serialize_into(
stream_writer,
&(bank, accounts_db_fields, lamports_per_signature),
&(
bank,
accounts_db_fields,
lamports_per_signature,
incremental_snapshot_persistence,
),
)
}
}

View File

@ -190,6 +190,7 @@ fn test_bank_serialize_style(
serde_style: SerdeStyle,
reserialize_accounts_hash: bool,
update_accounts_hash: bool,
incremental_snapshot_persistence: bool,
) {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
@ -236,8 +237,18 @@ fn test_bank_serialize_style(
} else {
bank2.get_accounts_hash()
};
if reserialize_accounts_hash {
let slot = bank2.slot();
let slot = bank2.slot();
let incremental =
incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence {
full_slot: slot + 1,
full_hash: Hash::new(&[1; 32]),
full_capitalization: 31,
incremental_hash: Hash::new(&[2; 32]),
incremental_capitalization: 32,
});
if reserialize_accounts_hash || incremental_snapshot_persistence {
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
@ -248,21 +259,32 @@ fn test_bank_serialize_style(
let mut f = std::fs::File::create(&pre_path).unwrap();
f.write_all(&buf).unwrap();
}
assert!(reserialize_bank_with_new_accounts_hash(
temp_dir.path(),
slot,
&accounts_hash
&accounts_hash,
incremental.as_ref(),
));
let previous_len = buf.len();
// larger buffer than expected to make sure the file isn't larger than expected
let mut buf_reserialized = vec![0; previous_len + 1];
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
let mut buf_reserialized =
vec![0; previous_len + sizeof_incremental_snapshot_persistence + 1];
{
let mut f = std::fs::File::open(post_path).unwrap();
let size = f.read(&mut buf_reserialized).unwrap();
assert_eq!(size, previous_len);
let expected = if !incremental_snapshot_persistence {
previous_len
} else {
previous_len + sizeof_incremental_snapshot_persistence - sizeof_none
};
assert_eq!(size, expected);
buf_reserialized.truncate(size);
}
if update_accounts_hash {
if update_accounts_hash || incremental_snapshot_persistence {
// We cannot guarantee buffer contents are exactly the same if hash is the same.
// Things like hashsets/maps have randomness in their in-mem representations.
// This make serialized bytes not deterministic.
@ -311,6 +333,7 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
}
pub(crate) fn reconstruct_accounts_db_via_serialization(
@ -359,11 +382,18 @@ fn test_bank_serialize_newer() {
for (reserialize_accounts_hash, update_accounts_hash) in
[(false, false), (true, false), (true, true)]
{
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
)
for incremental_snapshot_persistence in if reserialize_accounts_hash {
[false, true].to_vec()
} else {
[false].to_vec()
} {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
)
}
}
}
@ -555,7 +585,7 @@ mod test_bank_serialize {
// This some what long test harness is required to freeze the ABI of
// Bank's serialization due to versioned nature
#[frozen_abi(digest = "9vGBt7YfymKUTPWLHVVpQbDtPD7dFDwXRMFkCzwujNqJ")]
#[frozen_abi(digest = "5py4Wkuj5fV2sLyA1MrPg4pGNwMEaygQLnpLyY8MMLGC")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperNewer {
#[serde(serialize_with = "wrapper_newer")]

View File

@ -2045,6 +2045,7 @@ pub fn package_and_archive_full_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
@ -2097,6 +2098,7 @@ pub fn package_and_archive_incremental_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());