Parse snapshot for bank fields (#26016)

This commit is contained in:
carllin 2022-07-06 17:30:30 -05:00 committed by GitHub
parent ef30f1729c
commit 90ef2cd02a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 249 additions and 58 deletions

View File

@ -939,7 +939,7 @@ impl NonceInfo for NonceFull {
// Sync fields with BankFieldsToSerialize! This is paired with it.
// All members are made public so that Bank's own members can remain private and so that the versioned deserializer can work with this struct
#[derive(Clone, Debug, Default, PartialEq)]
pub(crate) struct BankFieldsToDeserialize {
pub struct BankFieldsToDeserialize {
pub(crate) blockhash_queue: BlockhashQueue,
pub(crate) ancestors: AncestorsForSerialization,
pub(crate) hash: Hash,

View File

@ -14,6 +14,7 @@ use {
epoch_stakes::EpochStakes,
hardened_unpack::UnpackedAppendVecMap,
rent_collector::RentCollector,
serde_snapshot::storage::SerializableAccountStorageEntry,
snapshot_utils::{self, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
stakes::Stakes,
},
@ -64,7 +65,7 @@ pub(crate) enum SerdeStyle {
const MAX_STREAM_SIZE: u64 = 32 * 1024 * 1024 * 1024;
#[derive(Clone, Debug, Default, Deserialize, Serialize, AbiExample, PartialEq)]
struct AccountsDbFields<T>(
pub struct AccountsDbFields<T>(
HashMap<Slot, Vec<T>>,
StoredMetaWriteVersion,
Slot,
@ -87,7 +88,7 @@ pub struct SnapshotStreams<'a, R> {
/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full
/// snapshot, or both a full and incremental snapshot
#[derive(Debug)]
struct SnapshotAccountsDbFields<T> {
pub struct SnapshotAccountsDbFields<T> {
full_snapshot_accounts_db_fields: AccountsDbFields<T>,
incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
}
@ -226,23 +227,16 @@ pub(crate) fn compare_two_serialized_banks(
Ok(fields1 == fields2)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn bank_from_streams<R>(
pub(crate) fn fields_from_streams<R>(
serde_style: SerdeStyle,
snapshot_streams: &mut SnapshotStreams<R>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_secondary_indexes: AccountSecondaryIndexes,
caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> std::result::Result<Bank, Error>
) -> std::result::Result<
(
BankFieldsToDeserialize,
SnapshotAccountsDbFields<SerializableAccountStorageEntry>,
),
Error,
>
where
R: Read,
{
@ -278,9 +272,36 @@ where
full_snapshot_accounts_db_fields,
incremental_snapshot_accounts_db_fields,
};
reconstruct_bank_from_fields(
Ok((
incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields),
snapshot_accounts_db_fields,
))
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn bank_from_streams<R>(
serde_style: SerdeStyle,
snapshot_streams: &mut SnapshotStreams<R>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_secondary_indexes: AccountSecondaryIndexes,
caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> std::result::Result<Bank, Error>
where
R: Read,
{
let (bank_fields, accounts_db_fields) = fields_from_streams(serde_style, snapshot_streams)?;
reconstruct_bank_from_fields(
bank_fields,
accounts_db_fields,
genesis_config,
account_paths,
unpacked_append_vec_map,

View File

@ -13,7 +13,7 @@ use {
std::{cell::RefCell, collections::HashSet, sync::RwLock},
};
type AccountsDbFields = super::AccountsDbFields<SerializableAccountStorageEntry>;
pub(super) type AccountsDbFields = super::AccountsDbFields<SerializableAccountStorageEntry>;
#[derive(Default, Clone, PartialEq, Eq, Debug, Deserialize, Serialize, AbiExample)]
struct UnusedAccounts {

View File

@ -8,7 +8,7 @@ pub(super) type SerializedAppendVecId = usize;
// Serializable version of AccountStorageEntry for snapshot format
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub(super) struct SerializableAccountStorageEntry {
pub struct SerializableAccountStorageEntry {
id: SerializedAppendVecId,
accounts_current_len: usize,
}

View File

@ -5,10 +5,12 @@ use {
},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
bank::{Bank, BankSlotDelta},
bank::{Bank, BankFieldsToDeserialize, BankSlotDelta},
builtins::Builtins,
hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap},
serde_snapshot::{bank_from_streams, bank_to_stream, SerdeStyle, SnapshotStreams},
serde_snapshot::{
bank_from_streams, bank_to_stream, fields_from_streams, SerdeStyle, SnapshotStreams,
},
shared_buffer_reader::{SharedBuffer, SharedBufferReader},
snapshot_archive_info::{
FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter,
@ -795,6 +797,91 @@ pub struct BankFromArchiveTimings {
// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
/// Checks that the given snapshot archives are compatible, then unarchives them with
/// `unarchive_snapshot`.
///
/// The full snapshot archive is always unarchived; the incremental one is unarchived only when
/// `incremental_snapshot_archive_info` is `Some`. Both calls share the same
/// `parallel_divisions` value.
///
/// # Errors
///
/// Returns early with any error produced by `check_are_snapshots_compatible` or by either
/// `unarchive_snapshot` call.
fn verify_and_unarchive_snapshots(
    bank_snapshots_dir: impl AsRef<Path>,
    full_snapshot_archive_info: &FullSnapshotArchiveInfo,
    incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
    account_paths: &[PathBuf],
) -> Result<(UnarchivedSnapshot, Option<UnarchivedSnapshot>)> {
    check_are_snapshots_compatible(
        full_snapshot_archive_info,
        incremental_snapshot_archive_info,
    )?;

    // A quarter of the reported CPUs, but never fewer than one reader and never more than
    // PARALLEL_UNTAR_READERS_DEFAULT.
    let parallel_divisions = (num_cpus::get() / 4)
        .max(1)
        .min(PARALLEL_UNTAR_READERS_DEFAULT);

    let unarchived_full_snapshot = unarchive_snapshot(
        &bank_snapshots_dir,
        TMP_SNAPSHOT_ARCHIVE_PREFIX,
        full_snapshot_archive_info.path(),
        "snapshot untar",
        account_paths,
        full_snapshot_archive_info.archive_format(),
        parallel_divisions,
    )?;

    // `transpose` turns Option<Result<_>> into Result<Option<_>> so `?` can bail out on failure
    // while a missing incremental archive simply stays `None`.
    let unarchived_incremental_snapshot = incremental_snapshot_archive_info
        .map(|archive_info| {
            unarchive_snapshot(
                &bank_snapshots_dir,
                TMP_SNAPSHOT_ARCHIVE_PREFIX,
                archive_info.path(),
                "incremental snapshot untar",
                account_paths,
                archive_info.archive_format(),
                parallel_divisions,
            )
        })
        .transpose()?;

    Ok((unarchived_full_snapshot, unarchived_incremental_snapshot))
}
/// Parses the bank-specific fields out of the highest snapshot archives found on disk.
///
/// This can be used to read bank information such as the leader schedule or epoch schedule.
///
/// The highest full snapshot archive in `full_snapshot_archives_dir` is selected, and the
/// highest incremental snapshot archive in `incremental_snapshot_archives_dir` is looked up
/// using that full snapshot's slot. Both are verified and unarchived (with `bank_snapshots_dir`
/// passed through to the unarchive step) before the bank fields are deserialized.
///
/// # Errors
///
/// Returns `SnapshotError::NoSnapshotArchives` when no full snapshot archive is found, and
/// propagates failures from creating the temporary directory, from
/// `verify_and_unarchive_snapshots`, and from `bank_fields_from_snapshots`.
pub fn bank_fields_from_snapshot_archives(
    bank_snapshots_dir: impl AsRef<Path>,
    full_snapshot_archives_dir: impl AsRef<Path>,
    incremental_snapshot_archives_dir: impl AsRef<Path>,
) -> Result<BankFieldsToDeserialize> {
    let full_archive_info = get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir)
        .ok_or(SnapshotError::NoSnapshotArchives)?;
    let incremental_archive_info = get_highest_incremental_snapshot_archive_info(
        &incremental_snapshot_archives_dir,
        full_archive_info.slot(),
    );

    // A temporary directory serves as the single account path handed to the unarchive step.
    let scratch_dir = tempfile::Builder::new()
        .prefix("dummy-accounts-path")
        .tempdir()?;
    let account_paths = vec![scratch_dir.path().to_path_buf()];

    let (full_unarchived, incremental_unarchived) = verify_and_unarchive_snapshots(
        &bank_snapshots_dir,
        &full_archive_info,
        incremental_archive_info.as_ref(),
        &account_paths,
    )?;

    let incremental_dir_and_version = incremental_unarchived
        .as_ref()
        .map(|unarchived| &unarchived.unpacked_snapshots_dir_and_version);

    bank_fields_from_snapshots(
        &full_unarchived.unpacked_snapshots_dir_and_version,
        incremental_dir_and_version,
    )
}
/// Rebuild bank from snapshot archives. Handles either just a full snapshot, or both a full
/// snapshot and an incremental snapshot.
#[allow(clippy::too_many_arguments)]
@ -816,41 +903,13 @@ pub fn bank_from_snapshot_archives(
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> Result<(Bank, BankFromArchiveTimings)> {
check_are_snapshots_compatible(
full_snapshot_archive_info,
incremental_snapshot_archive_info,
)?;
let parallel_divisions = std::cmp::min(
PARALLEL_UNTAR_READERS_DEFAULT,
std::cmp::max(1, num_cpus::get() / 4),
);
let unarchived_full_snapshot = unarchive_snapshot(
&bank_snapshots_dir,
TMP_SNAPSHOT_ARCHIVE_PREFIX,
full_snapshot_archive_info.path(),
"snapshot untar",
account_paths,
full_snapshot_archive_info.archive_format(),
parallel_divisions,
)?;
let mut unarchived_incremental_snapshot =
if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
let unarchived_incremental_snapshot = unarchive_snapshot(
&bank_snapshots_dir,
TMP_SNAPSHOT_ARCHIVE_PREFIX,
incremental_snapshot_archive_info.path(),
"incremental snapshot untar",
account_paths,
incremental_snapshot_archive_info.archive_format(),
parallel_divisions,
)?;
Some(unarchived_incremental_snapshot)
} else {
None
};
let (unarchived_full_snapshot, mut unarchived_incremental_snapshot) =
verify_and_unarchive_snapshots(
bank_snapshots_dir,
full_snapshot_archive_info,
incremental_snapshot_archive_info,
account_paths,
)?;
let mut unpacked_append_vec_map = unarchived_full_snapshot.unpacked_append_vec_map;
if let Some(ref mut unarchive_preparation_result) = unarchived_incremental_snapshot {
@ -1557,6 +1616,51 @@ fn verify_unpacked_snapshots_dir_and_version(
Ok((snapshot_version, root_paths))
}
/// Deserializes only the bank fields from already-unpacked snapshot directories.
///
/// Each supplied directory is first checked with `verify_unpacked_snapshots_dir_and_version`.
/// The snapshot version that selects the deserializer is the incremental snapshot's when one is
/// supplied, otherwise the full snapshot's. The accounts-db fields that `fields_from_streams`
/// returns alongside the bank fields are discarded.
///
/// # Errors
///
/// Propagates errors from directory verification, from `deserialize_snapshot_data_files`, and
/// from `fields_from_streams`.
fn bank_fields_from_snapshots(
    full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
    incremental_snapshot_unpacked_snapshots_dir_and_version: Option<
        &UnpackedSnapshotsDirAndVersion,
    >,
) -> Result<BankFieldsToDeserialize> {
    let (full_snapshot_version, full_snapshot_root_paths) =
        verify_unpacked_snapshots_dir_and_version(
            full_snapshot_unpacked_snapshots_dir_and_version,
        )?;

    let (incremental_snapshot_version, incremental_snapshot_root_paths) =
        match incremental_snapshot_unpacked_snapshots_dir_and_version {
            Some(dir_and_version) => {
                let (version, root_paths) =
                    verify_unpacked_snapshots_dir_and_version(dir_and_version)?;
                (Some(version), Some(root_paths))
            }
            None => (None, None),
        };

    info!(
        "Loading bank from full snapshot {} and incremental snapshot {:?}",
        full_snapshot_root_paths.snapshot_path.display(),
        incremental_snapshot_root_paths
            .as_ref()
            .map(|paths| paths.snapshot_path.display()),
    );

    // Prefer the incremental snapshot's version when an incremental snapshot was given.
    let snapshot_version = incremental_snapshot_version.unwrap_or(full_snapshot_version);

    let snapshot_root_paths = SnapshotRootPaths {
        full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path,
        incremental_snapshot_root_file_path: incremental_snapshot_root_paths
            .map(|root_paths| root_paths.snapshot_path),
    };

    deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| {
        // `?` also converts the deserializer's error into this closure's error type.
        let (bank_fields, _accounts_db_fields) = match snapshot_version {
            SnapshotVersion::V1_2_0 => fields_from_streams(SerdeStyle::Newer, snapshot_streams),
        }?;
        Ok(bank_fields)
    })
}
#[allow(clippy::too_many_arguments)]
fn rebuild_bank_from_snapshots(
full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
@ -3599,6 +3703,72 @@ mod tests {
);
}
/// Writes a full snapshot archive at slot 1 and an incremental snapshot archive at slot 2 into
/// one directory, then checks that `bank_fields_from_snapshot_archives` reports the slot and
/// parent slot of the bank used for the incremental snapshot.
#[test]
fn test_bank_fields_from_snapshot() {
    /// Registers ticks on `bank` until it reports itself complete.
    fn fill_with_ticks(bank: &Bank) {
        while !bank.is_complete() {
            bank.register_tick(&Hash::new_unique());
        }
    }

    solana_logger::setup();
    let collector = Pubkey::new_unique();
    let key1 = Keypair::new();

    let (genesis_config, mint_keypair) = create_genesis_config(sol_to_lamports(1_000_000.));
    let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
    fill_with_ticks(&bank0);

    let full_snapshot_slot = 1;
    let bank1 = Arc::new(Bank::new_from_parent(
        &bank0,
        &collector,
        full_snapshot_slot,
    ));
    fill_with_ticks(&bank1);

    // A single temporary directory holds bank snapshots and both kinds of archives.
    let snapshots_dir = tempfile::TempDir::new().unwrap();
    let snapshot_archive_format = ArchiveFormat::Tar;

    bank_to_full_snapshot_archive(
        &snapshots_dir,
        &bank1,
        None,
        &snapshots_dir,
        &snapshots_dir,
        snapshot_archive_format,
        DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
    )
    .unwrap();

    let incremental_snapshot_slot = full_snapshot_slot + 1;
    let bank2 = Arc::new(Bank::new_from_parent(
        &bank1,
        &collector,
        incremental_snapshot_slot,
    ));
    bank2
        .transfer(sol_to_lamports(1.), &mint_keypair, &key1.pubkey())
        .unwrap();
    fill_with_ticks(&bank2);

    bank_to_incremental_snapshot_archive(
        &snapshots_dir,
        &bank2,
        full_snapshot_slot,
        None,
        &snapshots_dir,
        &snapshots_dir,
        snapshot_archive_format,
        DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
        DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
    )
    .unwrap();

    let bank_fields =
        bank_fields_from_snapshot_archives(&snapshots_dir, &snapshots_dir, &snapshots_dir)
            .unwrap();

    assert_eq!(bank_fields.slot, bank2.slot());
    assert_eq!(bank_fields.parent_slot, bank2.parent_slot());
}
/// All the permutations of `snapshot_type` for the new-and-old accounts packages:
///
/// new | old |