diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a61b37c82f..c81af40eb1 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -633,9 +633,11 @@ impl<'a> ReadableAccount for LoadedAccount<'a> { } } +pub type AccountStorageMap = DashMap; + #[derive(Clone, Default, Debug)] pub struct AccountStorage { - pub map: DashMap, + pub map: AccountStorageMap, } impl AccountStorage { diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 5fc800dcfa..4b6fe326f6 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -12,18 +12,16 @@ use { blockhash_queue::BlockhashQueue, builtins::Builtins, epoch_stakes::EpochStakes, - hardened_unpack::UnpackedAppendVecMap, rent_collector::RentCollector, runtime_config::RuntimeConfig, serde_snapshot::storage::SerializableAccountStorageEntry, - snapshot_utils::{self, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION}, + snapshot_utils::{self, StorageAndNextAppendVecId, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION}, stakes::Stakes, }, bincode::{self, config::Options, Error}, log::*, - rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, - solana_measure::{measure, measure::Measure}, + solana_measure::measure::Measure, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, deserialize_utils::default_on_eof, @@ -42,11 +40,11 @@ use { result::Result, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, RwLock, + Arc, }, thread::Builder, }, - storage::{SerializableStorage, SerializedAppendVecId}, + storage::SerializableStorage, }; mod newer; @@ -54,6 +52,7 @@ mod storage; mod tests; mod utils; +pub(crate) use storage::SerializedAppendVecId; // a number of test cases in accounts_db use this #[cfg(test)] pub(crate) use tests::reconstruct_accounts_db_via_serialization; @@ -229,6 +228,25 @@ pub(crate) fn compare_two_serialized_banks( Ok(fields1 == fields2) } +/// Get snapshot storage lengths from accounts_db_fields +pub(crate) fn snapshot_storage_lengths_from_fields( + accounts_db_fields: &AccountsDbFields, +) -> HashMap> { + let AccountsDbFields(snapshot_storage, ..) = &accounts_db_fields; + snapshot_storage + .iter() + .map(|(slot, slot_storage)| { + ( + *slot, + slot_storage + .iter() + .map(|storage_entry| (storage_entry.id(), storage_entry.current_len())) + .collect(), + ) + }) + .collect() +} + pub(crate) fn fields_from_stream( serde_style: SerdeStyle, snapshot_stream: &mut BufReader, @@ -285,7 +303,7 @@ pub(crate) fn bank_from_streams( serde_style: SerdeStyle, snapshot_streams: &mut SnapshotStreams, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, genesis_config: &GenesisConfig, runtime_config: &RuntimeConfig, debug_keys: Option>>, @@ -308,7 +326,7 @@ where genesis_config, runtime_config, account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, debug_keys, additional_builtins, account_secondary_indexes, @@ -497,7 +515,7 @@ fn reconstruct_bank_from_fields( genesis_config: &GenesisConfig, runtime_config: &RuntimeConfig, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, debug_keys: Option>>, additional_builtins: Option<&Builtins>, account_secondary_indexes: AccountSecondaryIndexes, @@ -514,7 +532,7 @@ where let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields( snapshot_accounts_db_fields, account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, genesis_config, account_secondary_indexes, caching_enabled, @@ -600,7 +618,7 @@ fn remap_append_vec_file( Ok((remapped_append_vec_id, remapped_append_vec_path)) } -fn remap_and_reconstruct_single_storage( +pub(crate) fn remap_and_reconstruct_single_storage( slot: Slot, old_append_vec_id: SerializedAppendVecId, current_len: usize, @@ -624,66 +642,6 @@ fn remap_and_reconstruct_single_storage( Ok(storage) } -fn remap_and_reconstruct_slot_storage( - slot: Slot, - slot_storage: &[E], - unpacked_append_vec_map: &UnpackedAppendVecMap, - next_append_vec_id: &AtomicAppendVecId, - num_collisions: &AtomicUsize, -) -> Result>, Error> -where - E: SerializableStorage, -{ - slot_storage - .iter() - .map(|storage_entry| { - let file_name = AppendVec::file_name(slot, storage_entry.id()); - let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - format!("{} not found in unpacked append vecs", file_name), - ) - })?; - - let new_storage_entry = remap_and_reconstruct_single_storage( - slot, - storage_entry.id(), - storage_entry.current_len(), - append_vec_path, - next_append_vec_id, - num_collisions, - )?; - Ok((new_storage_entry.append_vec_id(), new_storage_entry)) - }) - .collect::, Error>>() -} - -fn remap_and_reconstruct_storages( - snapshot_storages: Vec<(Slot, Vec)>, - unpacked_append_vec_map: &UnpackedAppendVecMap, - next_append_vec_id: &AtomicAppendVecId, - num_collisions: &AtomicUsize, -) -> Result>>, Error> -where - E: SerializableStorage + std::marker::Sync, -{ - snapshot_storages - .into_par_iter() - .map(|(slot, slot_storage)| { - Ok(( - *slot, - remap_and_reconstruct_slot_storage( - *slot, - slot_storage, - unpacked_append_vec_map, - next_append_vec_id, - num_collisions, - )?, - )) - }) - .collect::, Error>>() -} - /// This struct contains side-info while reconstructing the accounts DB from fields. #[derive(Debug, Default, Copy, Clone)] struct ReconstructedAccountsDbInfo { @@ -694,7 +652,7 @@ struct ReconstructedAccountsDbInfo { fn reconstruct_accountsdb_from_fields( snapshot_accounts_db_fields: SnapshotAccountsDbFields, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, genesis_config: &GenesisConfig, account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: bool, @@ -718,7 +676,7 @@ where ); let AccountsDbFields( - snapshot_storages, + _snapshot_storages, snapshot_version, snapshot_slot, snapshot_bank_hash_info, @@ -726,8 +684,6 @@ where snapshot_historical_roots_with_hash, ) = snapshot_accounts_db_fields.collapse_into()?; - let snapshot_storages = snapshot_storages.into_iter().collect::>(); - // Ensure all account paths exist for path in &accounts_db.paths { std::fs::create_dir_all(path) @@ -740,20 +696,15 @@ where snapshot_historical_roots_with_hash, ); - // Remap the deserialized AppendVec paths to point to correct local paths - let num_collisions = AtomicUsize::new(0); - let next_append_vec_id = AtomicAppendVecId::new(0); - let (mut storage, measure_remap) = measure!(remap_and_reconstruct_storages( - snapshot_storages, - &unpacked_append_vec_map, - &next_append_vec_id, - &num_collisions - )?); + let StorageAndNextAppendVecId { + storage, + next_append_vec_id, + } = storage_and_next_append_vec_id; // discard any slots with no storage entries // this can happen if a non-root slot was serialized // but non-root stores should not be included in the snapshot - storage.retain(|_slot, stores| !stores.is_empty()); + storage.retain(|_slot, stores| !stores.read().unwrap().is_empty()); assert!( !storage.is_empty(), "At least one storage entry must exist from deserializing stream" @@ -773,11 +724,7 @@ where .write() .unwrap() .insert(snapshot_slot, snapshot_bank_hash_info); - accounts_db.storage.map.extend( - storage - .into_iter() - .map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))), - ); + accounts_db.storage.map.extend(storage.into_iter()); accounts_db .next_id .store(next_append_vec_id, Ordering::Release); @@ -821,12 +768,6 @@ where datapoint_info!( "reconstruct_accountsdb_from_fields()", - ("remap-time-us", measure_remap.as_us(), i64), - ( - "remap-collisions", - num_collisions.load(Ordering::Relaxed), - i64 - ), ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), ); diff --git a/runtime/src/serde_snapshot/storage.rs b/runtime/src/serde_snapshot/storage.rs index be152d4147..a1d8d15e70 100644 --- a/runtime/src/serde_snapshot/storage.rs +++ b/runtime/src/serde_snapshot/storage.rs @@ -4,7 +4,7 @@ use { }; /// The serialized AppendVecId type is fixed as usize -pub(super) type SerializedAppendVecId = usize; +pub(crate) type SerializedAppendVecId = usize; // Serializable version of AccountStorageEntry for snapshot format #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 5834a23f96..8de4966576 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -1,12 +1,13 @@ #![cfg(test)] + use { super::*, crate::{ accounts::{test_utils::create_test_accounts, Accounts}, - accounts_db::{get_temp_accounts_paths, AccountShrinkThreshold}, + accounts_db::{get_temp_accounts_paths, AccountShrinkThreshold, AccountStorageMap}, + append_vec::AppendVec, bank::{Bank, Rewrites}, genesis_utils::{activate_all_features, activate_feature}, - hardened_unpack::UnpackedAppendVecMap, snapshot_utils::ArchiveFormat, status_cache::StatusCache, }, @@ -28,23 +29,48 @@ use { tempfile::TempDir, }; +/// Simulates the unpacking & storage reconstruction done during snapshot unpacking fn copy_append_vecs>( accounts_db: &AccountsDb, output_dir: P, -) -> std::io::Result { +) -> std::io::Result { let storage_entries = accounts_db .get_snapshot_storages(Slot::max_value(), None, None) .0; - let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); - for storage in storage_entries.iter().flatten() { - let storage_path = storage.get_path(); - let file_name = AppendVec::file_name(storage.slot(), storage.append_vec_id()); + let storage: AccountStorageMap = AccountStorageMap::with_capacity(storage_entries.len()); + let mut next_append_vec_id = 0; + for storage_entry in storage_entries.into_iter().flatten() { + // Copy file to new directory + let storage_path = storage_entry.get_path(); + let file_name = AppendVec::file_name(storage_entry.slot(), storage_entry.append_vec_id()); let output_path = output_dir.as_ref().join(&file_name); std::fs::copy(&storage_path, &output_path)?; - unpacked_append_vec_map.insert(file_name, output_path); + + // Read new file into append-vec and build new entry + let (append_vec, num_accounts) = + AppendVec::new_from_file(output_path, storage_entry.accounts.len())?; + let new_storage_entry = AccountStorageEntry::new_existing( + storage_entry.slot(), + storage_entry.append_vec_id(), + append_vec, + num_accounts, + ); + next_append_vec_id = next_append_vec_id.max(new_storage_entry.append_vec_id()); + storage + .entry(new_storage_entry.slot()) + .or_default() + .write() + .unwrap() + .insert( + new_storage_entry.append_vec_id(), + Arc::new(new_storage_entry), + ); } - Ok(unpacked_append_vec_map) + Ok(StorageAndNextAppendVecId { + storage, + next_append_vec_id: AtomicAppendVecId::new(next_append_vec_id + 1), + }) } fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) { @@ -63,7 +89,7 @@ fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) { fn context_accountsdb_from_stream<'a, C, R>( stream: &mut BufReader, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, ) -> Result where C: TypeContext<'a>, @@ -78,7 +104,7 @@ where reconstruct_accountsdb_from_fields( snapshot_accounts_db_fields, account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, &GenesisConfig { cluster_type: ClusterType::Development, ..GenesisConfig::default() @@ -98,7 +124,7 @@ fn accountsdb_from_stream( serde_style: SerdeStyle, stream: &mut BufReader, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, ) -> Result where R: Read, @@ -107,7 +133,7 @@ where SerdeStyle::Newer => context_accountsdb_from_stream::( stream, account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, ), } } @@ -164,7 +190,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { let copied_accounts = TempDir::new().unwrap(); // Simulate obtaining a copy of the AppendVecs from a tarball - let unpacked_append_vec_map = + let storage_and_next_append_vec_id = copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap(); let buf = writer.into_inner(); @@ -175,7 +201,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { serde_style, &mut reader, &daccounts_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, ) .unwrap(), ); @@ -303,7 +329,7 @@ fn test_bank_serialize_style( status_cache.add_root(2); // Create a directory to simulate AppendVecs unpackaged from a snapshot tar let copied_accounts = TempDir::new().unwrap(); - let unpacked_append_vec_map = + let storage_and_next_append_vec_id = copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); let mut snapshot_streams = SnapshotStreams { full_snapshot_stream: &mut reader, @@ -313,7 +339,7 @@ fn test_bank_serialize_style( serde_style, &mut snapshot_streams, &dbank_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, &genesis_config, &RuntimeConfig::default(), None, @@ -356,10 +382,15 @@ pub(crate) fn reconstruct_accounts_db_via_serialization( let copied_accounts = TempDir::new().unwrap(); // Simulate obtaining a copy of the AppendVecs from a tarball - let unpacked_append_vec_map = copy_append_vecs(accounts, copied_accounts.path()).unwrap(); - let mut accounts_db = - accountsdb_from_stream(SerdeStyle::Newer, &mut reader, &[], unpacked_append_vec_map) - .unwrap(); + let storage_and_next_append_vec_id = + copy_append_vecs(accounts, copied_accounts.path()).unwrap(); + let mut accounts_db = accountsdb_from_stream( + SerdeStyle::Newer, + &mut reader, + &[], + storage_and_next_append_vec_id, + ) + .unwrap(); // The append vecs will be used from `copied_accounts` directly by the new AccountsDb so keep // its TempDir alive @@ -431,13 +462,13 @@ fn test_extra_fields_eof() { }; let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap(); let copied_accounts = TempDir::new().unwrap(); - let unpacked_append_vec_map = + let storage_and_next_append_vec_id = copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); let dbank = crate::serde_snapshot::bank_from_streams( SerdeStyle::Newer, &mut snapshot_streams, &dbank_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, &genesis_config, &RuntimeConfig::default(), None, @@ -554,13 +585,13 @@ fn test_blank_extra_fields() { }; let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap(); let copied_accounts = TempDir::new().unwrap(); - let unpacked_append_vec_map = + let storage_and_next_append_vec_id = copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); let dbank = crate::serde_snapshot::bank_from_streams( SerdeStyle::Newer, &mut snapshot_streams, &dbank_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, &genesis_config, &RuntimeConfig::default(), None, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 3e60060547..a2ad674b99 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -19,6 +19,7 @@ use { snapshot_package::{ AccountsPackage, PendingAccountsPackage, SnapshotPackage, SnapshotType, }, + snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilder, status_cache, }, bincode::{config::Options, serialize_into}, @@ -28,7 +29,7 @@ use { log::*, rayon::prelude::*, regex::Regex, - solana_measure::measure::Measure, + solana_measure::{measure, measure::Measure}, solana_sdk::{ clock::Slot, genesis_config::GenesisConfig, @@ -45,7 +46,7 @@ use { path::{Path, PathBuf}, process::ExitStatus, str::FromStr, - sync::Arc, + sync::{atomic::AtomicU32, Arc}, }, tar::{self, Archive}, tempfile::TempDir, @@ -53,7 +54,16 @@ use { }; mod archive_format; +mod snapshot_storage_rebuilder; pub use archive_format::*; +use { + crate::{ + accounts_db::{AccountStorageMap, AtomicAppendVecId}, + hardened_unpack::streaming_unpack_snapshot, + }, + crossbeam_channel::Sender, + std::thread::{Builder, JoinHandle}, +}; pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache"; pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote"; @@ -180,7 +190,7 @@ struct SnapshotRootPaths { struct UnarchivedSnapshot { #[allow(dead_code)] unpack_dir: TempDir, - unpacked_append_vec_map: UnpackedAppendVecMap, + storage: AccountStorageMap, unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion, measure_untar: Measure, } @@ -192,6 +202,13 @@ struct UnpackedSnapshotsDirAndVersion { snapshot_version: String, } +/// Helper type for passing around account storage map and next append vec id +/// for reconstructing accounts from a snapshot +pub(crate) struct StorageAndNextAppendVecId { + pub storage: AccountStorageMap, + pub next_append_vec_id: AtomicAppendVecId, +} + #[derive(Error, Debug)] #[allow(clippy::large_enum_variant)] pub enum SnapshotError { @@ -840,7 +857,7 @@ fn verify_and_unarchive_snapshots( full_snapshot_archive_info: &FullSnapshotArchiveInfo, incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>, account_paths: &[PathBuf], -) -> Result<(UnarchivedSnapshot, Option)> { +) -> Result<(UnarchivedSnapshot, Option, AtomicU32)> { check_are_snapshots_compatible( full_snapshot_archive_info, incremental_snapshot_archive_info, @@ -851,6 +868,7 @@ fn verify_and_unarchive_snapshots( std::cmp::max(1, num_cpus::get() / 4), ); + let next_append_vec_id = Arc::new(AtomicU32::new(0)); let unarchived_full_snapshot = unarchive_snapshot( &bank_snapshots_dir, TMP_SNAPSHOT_ARCHIVE_PREFIX, @@ -859,6 +877,7 @@ fn verify_and_unarchive_snapshots( account_paths, full_snapshot_archive_info.archive_format(), parallel_divisions, + next_append_vec_id.clone(), )?; let unarchived_incremental_snapshot = @@ -871,13 +890,18 @@ fn verify_and_unarchive_snapshots( account_paths, incremental_snapshot_archive_info.archive_format(), parallel_divisions, + next_append_vec_id.clone(), )?; Some(unarchived_incremental_snapshot) } else { None }; - Ok((unarchived_full_snapshot, unarchived_incremental_snapshot)) + Ok(( + unarchived_full_snapshot, + unarchived_incremental_snapshot, + Arc::try_unwrap(next_append_vec_id).unwrap(), + )) } /// Utility for parsing out bank specific information from a snapshot archive. This utility can be used @@ -902,7 +926,7 @@ pub fn bank_fields_from_snapshot_archives( let account_paths = vec![temp_dir.path().to_path_buf()]; - let (unarchived_full_snapshot, unarchived_incremental_snapshot) = + let (unarchived_full_snapshot, unarchived_incremental_snapshot, _next_append_vec_id) = verify_and_unarchive_snapshots( &bank_snapshots_dir, &full_snapshot_archive_info, @@ -942,7 +966,7 @@ pub fn bank_from_snapshot_archives( accounts_db_config: Option, accounts_update_notifier: Option, ) -> Result<(Bank, BankFromArchiveTimings)> { - let (unarchived_full_snapshot, mut unarchived_incremental_snapshot) = + let (unarchived_full_snapshot, mut unarchived_incremental_snapshot, next_append_vec_id) = verify_and_unarchive_snapshots( bank_snapshots_dir, full_snapshot_archive_info, @@ -950,13 +974,18 @@ pub fn bank_from_snapshot_archives( account_paths, )?; - let mut unpacked_append_vec_map = unarchived_full_snapshot.unpacked_append_vec_map; + let mut storage = unarchived_full_snapshot.storage; if let Some(ref mut unarchive_preparation_result) = unarchived_incremental_snapshot { - let incremental_snapshot_unpacked_append_vec_map = - std::mem::take(&mut unarchive_preparation_result.unpacked_append_vec_map); - unpacked_append_vec_map.extend(incremental_snapshot_unpacked_append_vec_map.into_iter()); + let incremental_snapshot_storages = + std::mem::take(&mut unarchive_preparation_result.storage); + storage.extend(incremental_snapshot_storages.into_iter()); } + let storage_and_next_append_vec_id = StorageAndNextAppendVecId { + storage, + next_append_vec_id, + }; + let mut measure_rebuild = Measure::start("rebuild bank from snapshots"); let bank = rebuild_bank_from_snapshots( &unarchived_full_snapshot.unpacked_snapshots_dir_and_version, @@ -966,7 +995,7 @@ pub fn bank_from_snapshot_archives( &unarchive_preparation_result.unpacked_snapshots_dir_and_version }), account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, genesis_config, runtime_config, debug_keys, @@ -1135,9 +1164,69 @@ fn verify_bank_against_expected_slot_hash( Ok(()) } +/// Spawns a thread for unpacking a snapshot +fn spawn_unpack_snapshot_thread( + file_sender: Sender, + account_paths: Arc>, + ledger_dir: Arc, + mut archive: Archive, + parallel_selector: Option, + thread_index: usize, +) -> JoinHandle<()> { + Builder::new() + .name(format!( + "solana-streaming-unarchive-snapshot-{thread_index}" + )) + .spawn(move || { + streaming_unpack_snapshot( + &mut archive, + ledger_dir.as_path(), + &account_paths, + parallel_selector, + &file_sender, + ) + .unwrap(); + }) + .unwrap() +} + +/// Streams unpacked files across channel +fn streaming_unarchive_snapshot( + file_sender: Sender, + account_paths: Vec, + ledger_dir: PathBuf, + snapshot_archive_path: PathBuf, + archive_format: ArchiveFormat, + num_threads: usize, +) -> Vec> { + let account_paths = Arc::new(account_paths); + let ledger_dir = Arc::new(ledger_dir); + let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format); + + (0..num_threads) + .map(|thread_index| { + let parallel_selector = Some(ParallelSelector { + index: thread_index, + divisions: num_threads, + }); + + let reader = SharedBufferReader::new(&shared_buffer); + let archive = Archive::new(reader); + spawn_unpack_snapshot_thread( + file_sender.clone(), + account_paths.clone(), + ledger_dir.clone(), + archive, + parallel_selector, + thread_index, + ) + }) + .collect() +} + /// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary /// directories, untaring, reading the version file, and then returning those fields plus the -/// unpacked append vec map. +/// rebuilt storage fn unarchive_snapshot( bank_snapshots_dir: P, unpacked_snapshots_dir_prefix: &'static str, @@ -1146,6 +1235,7 @@ fn unarchive_snapshot( account_paths: &[PathBuf], archive_format: ArchiveFormat, parallel_divisions: usize, + next_append_vec_id: Arc, ) -> Result where P: AsRef, @@ -1155,24 +1245,36 @@ where .prefix(unpacked_snapshots_dir_prefix) .tempdir_in(bank_snapshots_dir)?; let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); + let unpacked_version_file = unpack_dir.path().join("version"); - let mut measure_untar = Measure::start(measure_name); - let unpacked_append_vec_map = untar_snapshot_in( - snapshot_archive_path, - unpack_dir.path(), - account_paths, + let (file_sender, file_receiver) = crossbeam_channel::unbounded(); + streaming_unarchive_snapshot( + file_sender, + account_paths.to_vec(), + unpack_dir.path().to_path_buf(), + snapshot_archive_path.as_ref().to_path_buf(), archive_format, parallel_divisions, - )?; - measure_untar.stop(); + ); + + let num_rebuilder_threads = num_cpus::get_physical() + .saturating_sub(parallel_divisions) + .max(1); + let (storage, measure_untar) = measure!( + SnapshotStorageRebuilder::rebuild_storage( + file_receiver, + num_rebuilder_threads, + next_append_vec_id + ), + measure_name + ); info!("{}", measure_untar); - let unpacked_version_file = unpack_dir.path().join("version"); let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?; Ok(UnarchivedSnapshot { unpack_dir, - unpacked_append_vec_map, + storage, unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion { unpacked_snapshots_dir, snapshot_version, @@ -1678,7 +1780,7 @@ fn rebuild_bank_from_snapshots( &UnpackedSnapshotsDirAndVersion, >, account_paths: &[PathBuf], - unpacked_append_vec_map: UnpackedAppendVecMap, + storage_and_next_append_vec_id: StorageAndNextAppendVecId, genesis_config: &GenesisConfig, runtime_config: &RuntimeConfig, debug_keys: Option>>, @@ -1727,7 +1829,7 @@ fn rebuild_bank_from_snapshots( SerdeStyle::Newer, snapshot_streams, account_paths, - unpacked_append_vec_map, + storage_and_next_append_vec_id, genesis_config, runtime_config, debug_keys, diff --git a/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs new file mode 100644 index 0000000000..8b8b926900 --- /dev/null +++ b/runtime/src/snapshot_utils/snapshot_storage_rebuilder.rs @@ -0,0 +1,363 @@ +//! Provides interfaces for rebuilding snapshot storages + +use { + crate::{ + accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId}, + serde_snapshot::{ + self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields, + SerdeStyle, SerializedAppendVecId, + }, + }, + crossbeam_channel::{select, unbounded, Receiver, Sender}, + dashmap::DashMap, + log::info, + rayon::{ + iter::{IntoParallelIterator, ParallelIterator}, + ThreadPool, ThreadPoolBuilder, + }, + solana_sdk::clock::Slot, + std::{ + collections::HashMap, + fs::File, + io::BufReader, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Instant, + }, +}; + +/// Stores state for rebuilding snapshot storages +#[derive(Debug)] +pub struct SnapshotStorageRebuilder { + /// Receiver for unpacked snapshot storage files + file_receiver: Receiver, + /// Number of threads to rebuild with + num_threads: usize, + /// Snapshot storage lengths - from the snapshot file + snapshot_storage_lengths: HashMap>, + /// Container for storing snapshot file paths + storage_paths: DashMap>>, + /// Container for storing rebuilt snapshot storages + storage: AccountStorageMap, + /// Tracks next append_vec_id + next_append_vec_id: Arc, + /// Tracker for number of processed slots + processed_slot_count: AtomicUsize, + /// Tracks the number of collisions in AppendVecId + num_collisions: AtomicUsize, +} + +impl SnapshotStorageRebuilder { + /// Synchronously spawns threads to rebuild snapshot storages + pub fn rebuild_storage( + file_receiver: Receiver, + num_threads: usize, + next_append_vec_id: Arc, + ) -> AccountStorageMap { + let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver); + let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap(); + Self::spawn_rebuilder_threads( + file_receiver, + num_threads, + next_append_vec_id, + snapshot_storage_lengths, + append_vec_files, + ) + } + + /// Create the SnapshotStorageRebuilder for storing state during rebuilding + /// - pre-allocates data for storage paths + fn new( + file_receiver: Receiver, + num_threads: usize, + next_append_vec_id: Arc, + snapshot_storage_lengths: HashMap>, + ) -> Self { + let storage = DashMap::with_capacity(snapshot_storage_lengths.len()); + let storage_paths: DashMap<_, _> = snapshot_storage_lengths + .iter() + .map(|(slot, storage_lengths)| { + (*slot, Mutex::new(Vec::with_capacity(storage_lengths.len()))) + }) + .collect(); + Self { + file_receiver, + num_threads, + snapshot_storage_lengths, + storage_paths, + storage, + next_append_vec_id, + processed_slot_count: AtomicUsize::new(0), + num_collisions: AtomicUsize::new(0), + } + } + + /// Waits for snapshot file + /// Due to parallel unpacking, we may receive some append_vec files before the snapshot file + /// This function will push append_vec files into a buffer until we receive the snapshot file + fn get_snapshot_file(file_receiver: &Receiver) -> (PathBuf, Vec) { + let mut append_vec_files = Vec::with_capacity(1024); + let snapshot_file_path = loop { + if let Ok(path) = file_receiver.recv() { + let filename = path.file_name().unwrap().to_str().unwrap(); + match get_snapshot_file_kind(filename) { + Some(SnapshotFileKind::SnapshotFile) => { + break path; + } + Some(SnapshotFileKind::StorageFile) => { + append_vec_files.push(path); + } + None => {} // do nothing for other kinds of files + } + } else { + panic!("did not receive snapshot file from unpacking threads"); + } + }; + + (snapshot_file_path, append_vec_files) + } + + /// Process the snapshot file to get the size of each snapshot storage file + fn process_snapshot_file( + snapshot_file_path: PathBuf, + ) -> Result>, bincode::Error> { + let snapshot_file = File::open(snapshot_file_path).unwrap(); + let mut snapshot_stream = BufReader::new(snapshot_file); + let (_bank_fields, accounts_fields) = + serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?; + + Ok(snapshot_storage_lengths_from_fields(&accounts_fields)) + } + + /// Spawn threads for processing buffered append_vec_files, and then received files + fn spawn_rebuilder_threads( + file_receiver: Receiver, + num_threads: usize, + next_append_vec_id: Arc, + snapshot_storage_lengths: HashMap>, + append_vec_files: Vec, + ) -> AccountStorageMap { + let rebuilder = Arc::new(SnapshotStorageRebuilder::new( + file_receiver, + num_threads, + next_append_vec_id, + snapshot_storage_lengths, + )); + + let thread_pool = rebuilder.build_thread_pool(); + + // Synchronously process buffered append_vec_files + thread_pool.install(|| { + rebuilder.process_buffered_files(append_vec_files).unwrap(); + }); + + // Asynchronously spawn threads to process received append_vec_files + let (exit_sender, exit_receiver) = unbounded(); + for _ in 0..rebuilder.num_threads { + Self::spawn_receiver_thread(&thread_pool, exit_sender.clone(), rebuilder.clone()); + } + drop(exit_sender); // drop otherwise loop below will never end + + // wait for asynchronous threads to complete + rebuilder.wait_for_completion(exit_receiver); + Arc::try_unwrap(rebuilder).unwrap().storage + } + + /// Processes buffered append_vec_files + fn process_buffered_files(&self, append_vec_files: Vec) -> Result<(), std::io::Error> { + append_vec_files + .into_par_iter() + .map(|path| self.process_append_vec_file(path)) + .collect::>() + } + + /// Spawn a single thread to process received append_vec_files + fn spawn_receiver_thread( + thread_pool: &ThreadPool, + exit_sender: Sender<()>, + rebuilder: Arc, + ) { + thread_pool.spawn(move || { + for path in rebuilder.file_receiver.iter() { + rebuilder.process_append_vec_file(path).unwrap(); + } + exit_sender.send(()).unwrap(); + }) + } + + /// Process an append_vec_file + fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> { + let filename = path.file_name().unwrap().to_str().unwrap().to_owned(); + if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) { + let (slot, slot_complete) = self.insert_slot_storage_file(path, filename); + if slot_complete { + self.process_complete_slot(slot)?; + self.processed_slot_count.fetch_add(1, Ordering::AcqRel); + } + } + Ok(()) + } + + /// Inserts single storage file, returns the slot and if the slot has all of its storage entries + fn insert_slot_storage_file(&self, path: PathBuf, filename: String) -> (Slot, bool) { + let (slot, _) = get_slot_and_append_vec_id(&filename); + let slot_storage_count = self.insert_storage_file(&slot, path); + + ( + slot, + slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len(), + ) + } + + /// Insert storage path into slot and return the number of storage files for the slot + fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize { + let slot_paths = self.storage_paths.get(slot).unwrap(); + let mut lock = slot_paths.lock().unwrap(); + lock.push(path); + lock.len() + } + + /// Process a slot that has received all storage entries + fn process_complete_slot(&self, slot: Slot) -> Result<(), std::io::Error> { + let slot_storage_paths = self.storage_paths.get(&slot).unwrap(); + let lock = slot_storage_paths.lock().unwrap(); + + let slot_stores = lock + .iter() + .map(|path| { + let filename = path.file_name().unwrap().to_str().unwrap(); + let (_, old_append_vec_id) = get_slot_and_append_vec_id(filename); + let current_len = *self + .snapshot_storage_lengths + .get(&slot) + .unwrap() + .get(&(old_append_vec_id as usize)) + .unwrap(); + + let storage_entry = remap_and_reconstruct_single_storage( + slot, + old_append_vec_id, + current_len, + path.as_path(), + &self.next_append_vec_id, + &self.num_collisions, + )?; + + Ok((storage_entry.append_vec_id(), storage_entry)) + }) + .collect::>, std::io::Error>>()?; + + let slot_entry = self.storage.entry(slot).or_default(); + let mut storage_lock = slot_entry.write().unwrap(); + *storage_lock = slot_stores; + Ok(()) + } + + /// Wait for the completion of the rebuilding threads + fn wait_for_completion(&self, exit_receiver: Receiver<()>) { + let num_slots = self.snapshot_storage_lengths.len(); + let mut last_log_time = Instant::now(); + loop { + select! { + recv(exit_receiver) -> maybe_thread_accounts_data_len => { + match maybe_thread_accounts_data_len { + Ok(_) => continue, + Err(_) => break, + } + } + default(std::time::Duration::from_millis(100)) => { + let now = Instant::now(); + if now.duration_since(last_log_time).as_millis() >= 2000 { + let num_processed_slots = self.processed_slot_count.load(Ordering::Relaxed); + let num_collisions = self.num_collisions.load(Ordering::Relaxed); + info!("rebuilt storages for {num_processed_slots}/{num_slots} slots with {num_collisions} collisions"); + last_log_time = now; + } + } + } + } + } + + /// Builds thread pool to rebuild with + fn build_thread_pool(&self) -> ThreadPool { + ThreadPoolBuilder::default() + .num_threads(self.num_threads) + .build() + .unwrap() + } +} + +/// Used to determine if a filename is structured like a snapshot file, storage file, or neither +#[derive(PartialEq, Debug)] +enum SnapshotFileKind { + SnapshotFile, + StorageFile, +} + +/// Determines `SnapshotFileKind` for `filename` if any +fn get_snapshot_file_kind(filename: &str) -> Option { + let mut periods = 0; + let mut saw_numbers = false; + for x in filename.chars() { + if !x.is_ascii_digit() { + if x == '.' { + if periods > 0 || !saw_numbers { + return None; + } + saw_numbers = false; + periods += 1; + } else { + return None; + } + } else { + saw_numbers = true; + } + } + + match (periods, saw_numbers) { + (0, true) => Some(SnapshotFileKind::SnapshotFile), + (1, true) => Some(SnapshotFileKind::StorageFile), + (_, _) => None, + } +} + +/// Get the slot and append vec id from the filename +fn get_slot_and_append_vec_id(filename: &str) -> (Slot, usize) { + let mut split = filename.split('.'); + let slot = split.next().unwrap().parse().unwrap(); + let append_vec_id = split.next().unwrap().parse().unwrap(); + assert!(split.next().is_none()); + + (slot, append_vec_id) +} + +#[cfg(test)] +mod tests { + use {super::*, crate::append_vec::AppendVec}; + + #[test] + fn test_get_snapshot_file_kind() { + assert_eq!(None, get_snapshot_file_kind("file.txt")); + assert_eq!( + Some(SnapshotFileKind::SnapshotFile), + get_snapshot_file_kind("1234") + ); + assert_eq!( + Some(SnapshotFileKind::StorageFile), + get_snapshot_file_kind("1000.999") + ); + } + + #[test] + fn test_get_slot_and_append_vec_id() { + let expected_slot = 12345; + let expected_id = 9987; + let (slot, id) = + get_slot_and_append_vec_id(&AppendVec::file_name(expected_slot, expected_id)); + assert_eq!(expected_slot, slot); + assert_eq!(expected_id, id); + } +}