Adds accounts hashes from full & incremental snapshots at startup (#30656)
This commit is contained in:
parent
505e3ff5c7
commit
263342a686
|
@ -100,6 +100,21 @@ pub struct SnapshotStreams<'a, R> {
|
|||
pub incremental_snapshot_stream: Option<&'a mut BufReader<R>>,
|
||||
}
|
||||
|
||||
/// Helper type to wrap BankFields when reconstructing Bank from either just a full
|
||||
/// snapshot, or both a full and incremental snapshot
|
||||
#[derive(Debug)]
|
||||
pub struct SnapshotBankFields {
|
||||
full: BankFieldsToDeserialize,
|
||||
incremental: Option<BankFieldsToDeserialize>,
|
||||
}
|
||||
|
||||
impl SnapshotBankFields {
|
||||
/// Collapse the SnapshotBankFields into a single (the latest) BankFieldsToDeserialize.
|
||||
pub fn collapse_into(self) -> BankFieldsToDeserialize {
|
||||
self.incremental.unwrap_or(self.full)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full
|
||||
/// snapshot, or both a full and incremental snapshot
|
||||
#[derive(Debug)]
|
||||
|
@ -277,34 +292,35 @@ pub(crate) fn fields_from_stream<R: Read>(
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn fields_from_streams<R: Read>(
|
||||
pub(crate) fn fields_from_streams(
|
||||
serde_style: SerdeStyle,
|
||||
snapshot_streams: &mut SnapshotStreams<R>,
|
||||
snapshot_streams: &mut SnapshotStreams<impl Read>,
|
||||
) -> std::result::Result<
|
||||
(
|
||||
BankFieldsToDeserialize,
|
||||
SnapshotBankFields,
|
||||
SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) =
|
||||
fields_from_stream(serde_style, snapshot_streams.full_snapshot_stream)?;
|
||||
let incremental_fields = snapshot_streams
|
||||
.incremental_snapshot_stream
|
||||
.as_mut()
|
||||
.map(|stream| fields_from_stream(serde_style, stream))
|
||||
.transpose()?;
|
||||
let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) =
|
||||
incremental_fields.unzip();
|
||||
snapshot_streams
|
||||
.incremental_snapshot_stream
|
||||
.as_mut()
|
||||
.map(|stream| fields_from_stream(serde_style, stream))
|
||||
.transpose()?
|
||||
.unzip();
|
||||
|
||||
let snapshot_bank_fields = SnapshotBankFields {
|
||||
full: full_snapshot_bank_fields,
|
||||
incremental: incremental_snapshot_bank_fields,
|
||||
};
|
||||
let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
|
||||
full_snapshot_accounts_db_fields,
|
||||
incremental_snapshot_accounts_db_fields,
|
||||
};
|
||||
Ok((
|
||||
incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields),
|
||||
snapshot_accounts_db_fields,
|
||||
))
|
||||
Ok((snapshot_bank_fields, snapshot_accounts_db_fields))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
@ -518,7 +534,7 @@ impl<'a, C> solana_frozen_abi::abi_example::IgnoreAsHelper for SerializableAccou
|
|||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn reconstruct_bank_from_fields<E>(
|
||||
bank_fields: BankFieldsToDeserialize,
|
||||
bank_fields: SnapshotBankFields,
|
||||
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
|
||||
genesis_config: &GenesisConfig,
|
||||
runtime_config: &RuntimeConfig,
|
||||
|
@ -537,6 +553,14 @@ fn reconstruct_bank_from_fields<E>(
|
|||
where
|
||||
E: SerializableStorage + std::marker::Sync,
|
||||
{
|
||||
let capitalizations = (
|
||||
bank_fields.full.capitalization,
|
||||
bank_fields
|
||||
.incremental
|
||||
.as_ref()
|
||||
.map(|bank_fields| bank_fields.capitalization),
|
||||
);
|
||||
let bank_fields = bank_fields.collapse_into();
|
||||
let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
|
||||
snapshot_accounts_db_fields,
|
||||
account_paths,
|
||||
|
@ -550,7 +574,7 @@ where
|
|||
accounts_update_notifier,
|
||||
exit,
|
||||
bank_fields.epoch_accounts_hash,
|
||||
bank_fields.capitalization,
|
||||
capitalizations,
|
||||
)?;
|
||||
|
||||
let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot);
|
||||
|
@ -673,7 +697,7 @@ fn reconstruct_accountsdb_from_fields<E>(
|
|||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
epoch_accounts_hash: Option<Hash>,
|
||||
capitalization: u64,
|
||||
capitalizations: (u64, Option<u64>),
|
||||
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
|
||||
where
|
||||
E: SerializableStorage + std::marker::Sync,
|
||||
|
@ -694,6 +718,42 @@ where
|
|||
.set_valid(EpochAccountsHash::new(epoch_accounts_hash), 0);
|
||||
}
|
||||
|
||||
// Store the accounts hash & capitalization, from the full snapshot, in the new AccountsDb
|
||||
{
|
||||
let AccountsDbFields(_, _, slot, bank_hash_info, _, _) =
|
||||
&snapshot_accounts_db_fields.full_snapshot_accounts_db_fields;
|
||||
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
|
||||
*slot,
|
||||
bank_hash_info.accounts_hash.clone(),
|
||||
capitalizations.0,
|
||||
);
|
||||
assert!(
|
||||
old_accounts_hash.is_none(),
|
||||
"There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}",
|
||||
);
|
||||
}
|
||||
|
||||
// Store the accounts hash & capitalization, from the incremental snapshot, in the new AccountsDb
|
||||
{
|
||||
if let Some(AccountsDbFields(_, _, slot, bank_hash_info, _, _)) =
|
||||
snapshot_accounts_db_fields
|
||||
.incremental_snapshot_accounts_db_fields
|
||||
.as_ref()
|
||||
{
|
||||
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
|
||||
*slot,
|
||||
bank_hash_info.accounts_hash.clone(),
|
||||
capitalizations
|
||||
.1
|
||||
.expect("capitalization from incremental snapshot"),
|
||||
);
|
||||
assert!(
|
||||
old_accounts_hash.is_none(),
|
||||
"There should not already be an AccountsHash at slot {slot}: {old_accounts_hash:?}",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let AccountsDbFields(
|
||||
_snapshot_storages,
|
||||
snapshot_version,
|
||||
|
@ -741,15 +801,6 @@ where
|
|||
old_accounts_delta_hash.is_none(),
|
||||
"There should not already be an AccountsDeltaHash at slot {snapshot_slot}: {old_accounts_delta_hash:?}",
|
||||
);
|
||||
let old_accounts_hash = accounts_db.set_accounts_hash_from_snapshot(
|
||||
snapshot_slot,
|
||||
snapshot_bank_hash_info.accounts_hash,
|
||||
capitalization,
|
||||
);
|
||||
assert!(
|
||||
old_accounts_hash.is_none(),
|
||||
"There should not already be an AccountsHash at slot {snapshot_slot}: {old_accounts_hash:?}",
|
||||
);
|
||||
let old_stats = accounts_db
|
||||
.update_bank_hash_stats_from_snapshot(snapshot_slot, snapshot_bank_hash_info.stats);
|
||||
assert!(
|
||||
|
|
|
@ -124,7 +124,7 @@ where
|
|||
None,
|
||||
&Arc::default(),
|
||||
None,
|
||||
u64::default(),
|
||||
(u64::default(), None),
|
||||
)
|
||||
.map(|(accounts_db, _)| accounts_db)
|
||||
}
|
||||
|
|
|
@ -2187,7 +2187,7 @@ fn bank_fields_from_snapshots(
|
|||
Ok(
|
||||
match incremental_snapshot_version.unwrap_or(full_snapshot_version) {
|
||||
SnapshotVersion::V1_2_0 => fields_from_streams(SerdeStyle::Newer, snapshot_streams)
|
||||
.map(|(bank_fields, _accountsdb_fields)| bank_fields),
|
||||
.map(|(bank_fields, _accountsdb_fields)| bank_fields.collapse_into()),
|
||||
}?,
|
||||
)
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue