Feature: Interleaved Snapshot Untar and Indexing - Stage 1 (#26590)

* Add snapshot_storage_lengths_from_fields

* Add SnapshotUnpacker

* Add SnapshotStorageRebuilder

* Rebuild snapshot storage during unpack

* move snapshot unpacker back into snapshot_utils

* use SerializedAppendVecId in storage rebuilder

* create AccountStorageMap type alias

* Wrap storage and next_append_vec_id in a type

* fixed typo

* move use StorageAndNextAppendVecId

* calculate num_rebuilder_threads more concisely
This commit is contained in:
apfitzge 2022-08-29 13:17:27 -05:00 committed by GitHub
parent 3c7cd62030
commit 49411aaae0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 586 additions and 147 deletions

View File

@ -633,9 +633,11 @@ impl<'a> ReadableAccount for LoadedAccount<'a> {
}
}
pub type AccountStorageMap = DashMap<Slot, SlotStores>;
#[derive(Clone, Default, Debug)]
pub struct AccountStorage {
pub map: DashMap<Slot, SlotStores>,
pub map: AccountStorageMap,
}
impl AccountStorage {

View File

@ -12,18 +12,16 @@ use {
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_stakes::EpochStakes,
hardened_unpack::UnpackedAppendVecMap,
rent_collector::RentCollector,
runtime_config::RuntimeConfig,
serde_snapshot::storage::SerializableAccountStorageEntry,
snapshot_utils::{self, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
snapshot_utils::{self, StorageAndNextAppendVecId, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
stakes::Stakes,
},
bincode::{self, config::Options, Error},
log::*,
rayon::prelude::*,
serde::{de::DeserializeOwned, Deserialize, Serialize},
solana_measure::{measure, measure::Measure},
solana_measure::measure::Measure,
solana_sdk::{
clock::{Epoch, Slot, UnixTimestamp},
deserialize_utils::default_on_eof,
@ -42,11 +40,11 @@ use {
result::Result,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
Arc,
},
thread::Builder,
},
storage::{SerializableStorage, SerializedAppendVecId},
storage::SerializableStorage,
};
mod newer;
@ -54,6 +52,7 @@ mod storage;
mod tests;
mod utils;
pub(crate) use storage::SerializedAppendVecId;
// a number of test cases in accounts_db use this
#[cfg(test)]
pub(crate) use tests::reconstruct_accounts_db_via_serialization;
@ -229,6 +228,25 @@ pub(crate) fn compare_two_serialized_banks(
Ok(fields1 == fields2)
}
/// Get snapshot storage lengths from accounts_db_fields
pub(crate) fn snapshot_storage_lengths_from_fields(
accounts_db_fields: &AccountsDbFields<SerializableAccountStorageEntry>,
) -> HashMap<Slot, HashMap<SerializedAppendVecId, usize>> {
let AccountsDbFields(snapshot_storage, ..) = &accounts_db_fields;
snapshot_storage
.iter()
.map(|(slot, slot_storage)| {
(
*slot,
slot_storage
.iter()
.map(|storage_entry| (storage_entry.id(), storage_entry.current_len()))
.collect(),
)
})
.collect()
}
pub(crate) fn fields_from_stream<R: Read>(
serde_style: SerdeStyle,
snapshot_stream: &mut BufReader<R>,
@ -285,7 +303,7 @@ pub(crate) fn bank_from_streams<R>(
serde_style: SerdeStyle,
snapshot_streams: &mut SnapshotStreams<R>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
genesis_config: &GenesisConfig,
runtime_config: &RuntimeConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
@ -308,7 +326,7 @@ where
genesis_config,
runtime_config,
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
debug_keys,
additional_builtins,
account_secondary_indexes,
@ -497,7 +515,7 @@ fn reconstruct_bank_from_fields<E>(
genesis_config: &GenesisConfig,
runtime_config: &RuntimeConfig,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_secondary_indexes: AccountSecondaryIndexes,
@ -514,7 +532,7 @@ where
let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
snapshot_accounts_db_fields,
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
genesis_config,
account_secondary_indexes,
caching_enabled,
@ -600,7 +618,7 @@ fn remap_append_vec_file(
Ok((remapped_append_vec_id, remapped_append_vec_path))
}
fn remap_and_reconstruct_single_storage(
pub(crate) fn remap_and_reconstruct_single_storage(
slot: Slot,
old_append_vec_id: SerializedAppendVecId,
current_len: usize,
@ -624,66 +642,6 @@ fn remap_and_reconstruct_single_storage(
Ok(storage)
}
fn remap_and_reconstruct_slot_storage<E>(
slot: Slot,
slot_storage: &[E],
unpacked_append_vec_map: &UnpackedAppendVecMap,
next_append_vec_id: &AtomicAppendVecId,
num_collisions: &AtomicUsize,
) -> Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, Error>
where
E: SerializableStorage,
{
slot_storage
.iter()
.map(|storage_entry| {
let file_name = AppendVec::file_name(slot, storage_entry.id());
let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
format!("{} not found in unpacked append vecs", file_name),
)
})?;
let new_storage_entry = remap_and_reconstruct_single_storage(
slot,
storage_entry.id(),
storage_entry.current_len(),
append_vec_path,
next_append_vec_id,
num_collisions,
)?;
Ok((new_storage_entry.append_vec_id(), new_storage_entry))
})
.collect::<Result<HashMap<AppendVecId, _>, Error>>()
}
fn remap_and_reconstruct_storages<E>(
snapshot_storages: Vec<(Slot, Vec<E>)>,
unpacked_append_vec_map: &UnpackedAppendVecMap,
next_append_vec_id: &AtomicAppendVecId,
num_collisions: &AtomicUsize,
) -> Result<HashMap<Slot, HashMap<AppendVecId, Arc<AccountStorageEntry>>>, Error>
where
E: SerializableStorage + std::marker::Sync,
{
snapshot_storages
.into_par_iter()
.map(|(slot, slot_storage)| {
Ok((
*slot,
remap_and_reconstruct_slot_storage(
*slot,
slot_storage,
unpacked_append_vec_map,
next_append_vec_id,
num_collisions,
)?,
))
})
.collect::<Result<HashMap<Slot, _>, Error>>()
}
/// This struct contains side-info while reconstructing the accounts DB from fields.
#[derive(Debug, Default, Copy, Clone)]
struct ReconstructedAccountsDbInfo {
@ -694,7 +652,7 @@ struct ReconstructedAccountsDbInfo {
fn reconstruct_accountsdb_from_fields<E>(
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
genesis_config: &GenesisConfig,
account_secondary_indexes: AccountSecondaryIndexes,
caching_enabled: bool,
@ -718,7 +676,7 @@ where
);
let AccountsDbFields(
snapshot_storages,
_snapshot_storages,
snapshot_version,
snapshot_slot,
snapshot_bank_hash_info,
@ -726,8 +684,6 @@ where
snapshot_historical_roots_with_hash,
) = snapshot_accounts_db_fields.collapse_into()?;
let snapshot_storages = snapshot_storages.into_iter().collect::<Vec<_>>();
// Ensure all account paths exist
for path in &accounts_db.paths {
std::fs::create_dir_all(path)
@ -740,20 +696,15 @@ where
snapshot_historical_roots_with_hash,
);
// Remap the deserialized AppendVec paths to point to correct local paths
let num_collisions = AtomicUsize::new(0);
let next_append_vec_id = AtomicAppendVecId::new(0);
let (mut storage, measure_remap) = measure!(remap_and_reconstruct_storages(
snapshot_storages,
&unpacked_append_vec_map,
&next_append_vec_id,
&num_collisions
)?);
let StorageAndNextAppendVecId {
storage,
next_append_vec_id,
} = storage_and_next_append_vec_id;
// discard any slots with no storage entries
// this can happen if a non-root slot was serialized
// but non-root stores should not be included in the snapshot
storage.retain(|_slot, stores| !stores.is_empty());
storage.retain(|_slot, stores| !stores.read().unwrap().is_empty());
assert!(
!storage.is_empty(),
"At least one storage entry must exist from deserializing stream"
@ -773,11 +724,7 @@ where
.write()
.unwrap()
.insert(snapshot_slot, snapshot_bank_hash_info);
accounts_db.storage.map.extend(
storage
.into_iter()
.map(|(slot, slot_storage_entry)| (slot, Arc::new(RwLock::new(slot_storage_entry)))),
);
accounts_db.storage.map.extend(storage.into_iter());
accounts_db
.next_id
.store(next_append_vec_id, Ordering::Release);
@ -821,12 +768,6 @@ where
datapoint_info!(
"reconstruct_accountsdb_from_fields()",
("remap-time-us", measure_remap.as_us(), i64),
(
"remap-collisions",
num_collisions.load(Ordering::Relaxed),
i64
),
("accountsdb-notify-at-start-us", measure_notify.as_us(), i64),
);

View File

@ -4,7 +4,7 @@ use {
};
/// The serialized AppendVecId type is fixed as usize
pub(super) type SerializedAppendVecId = usize;
pub(crate) type SerializedAppendVecId = usize;
// Serializable version of AccountStorageEntry for snapshot format
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]

View File

@ -1,12 +1,13 @@
#![cfg(test)]
use {
super::*,
crate::{
accounts::{test_utils::create_test_accounts, Accounts},
accounts_db::{get_temp_accounts_paths, AccountShrinkThreshold},
accounts_db::{get_temp_accounts_paths, AccountShrinkThreshold, AccountStorageMap},
append_vec::AppendVec,
bank::{Bank, Rewrites},
genesis_utils::{activate_all_features, activate_feature},
hardened_unpack::UnpackedAppendVecMap,
snapshot_utils::ArchiveFormat,
status_cache::StatusCache,
},
@ -28,23 +29,48 @@ use {
tempfile::TempDir,
};
/// Simulates the unpacking & storage reconstruction done during snapshot unpacking
fn copy_append_vecs<P: AsRef<Path>>(
accounts_db: &AccountsDb,
output_dir: P,
) -> std::io::Result<UnpackedAppendVecMap> {
) -> std::io::Result<StorageAndNextAppendVecId> {
let storage_entries = accounts_db
.get_snapshot_storages(Slot::max_value(), None, None)
.0;
let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path();
let file_name = AppendVec::file_name(storage.slot(), storage.append_vec_id());
let storage: AccountStorageMap = AccountStorageMap::with_capacity(storage_entries.len());
let mut next_append_vec_id = 0;
for storage_entry in storage_entries.into_iter().flatten() {
// Copy file to new directory
let storage_path = storage_entry.get_path();
let file_name = AppendVec::file_name(storage_entry.slot(), storage_entry.append_vec_id());
let output_path = output_dir.as_ref().join(&file_name);
std::fs::copy(&storage_path, &output_path)?;
unpacked_append_vec_map.insert(file_name, output_path);
// Read new file into append-vec and build new entry
let (append_vec, num_accounts) =
AppendVec::new_from_file(output_path, storage_entry.accounts.len())?;
let new_storage_entry = AccountStorageEntry::new_existing(
storage_entry.slot(),
storage_entry.append_vec_id(),
append_vec,
num_accounts,
);
next_append_vec_id = next_append_vec_id.max(new_storage_entry.append_vec_id());
storage
.entry(new_storage_entry.slot())
.or_default()
.write()
.unwrap()
.insert(
new_storage_entry.append_vec_id(),
Arc::new(new_storage_entry),
);
}
Ok(unpacked_append_vec_map)
Ok(StorageAndNextAppendVecId {
storage,
next_append_vec_id: AtomicAppendVecId::new(next_append_vec_id + 1),
})
}
fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) {
@ -63,7 +89,7 @@ fn check_accounts(accounts: &Accounts, pubkeys: &[Pubkey], num: usize) {
fn context_accountsdb_from_stream<'a, C, R>(
stream: &mut BufReader<R>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
) -> Result<AccountsDb, Error>
where
C: TypeContext<'a>,
@ -78,7 +104,7 @@ where
reconstruct_accountsdb_from_fields(
snapshot_accounts_db_fields,
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
&GenesisConfig {
cluster_type: ClusterType::Development,
..GenesisConfig::default()
@ -98,7 +124,7 @@ fn accountsdb_from_stream<R>(
serde_style: SerdeStyle,
stream: &mut BufReader<R>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
) -> Result<AccountsDb, Error>
where
R: Read,
@ -107,7 +133,7 @@ where
SerdeStyle::Newer => context_accountsdb_from_stream::<newer::Context, R>(
stream,
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
),
}
}
@ -164,7 +190,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) {
let copied_accounts = TempDir::new().unwrap();
// Simulate obtaining a copy of the AppendVecs from a tarball
let unpacked_append_vec_map =
let storage_and_next_append_vec_id =
copy_append_vecs(&accounts.accounts_db, copied_accounts.path()).unwrap();
let buf = writer.into_inner();
@ -175,7 +201,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) {
serde_style,
&mut reader,
&daccounts_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
)
.unwrap(),
);
@ -303,7 +329,7 @@ fn test_bank_serialize_style(
status_cache.add_root(2);
// Create a directory to simulate AppendVecs unpackaged from a snapshot tar
let copied_accounts = TempDir::new().unwrap();
let unpacked_append_vec_map =
let storage_and_next_append_vec_id =
copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
let mut snapshot_streams = SnapshotStreams {
full_snapshot_stream: &mut reader,
@ -313,7 +339,7 @@ fn test_bank_serialize_style(
serde_style,
&mut snapshot_streams,
&dbank_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
&genesis_config,
&RuntimeConfig::default(),
None,
@ -356,10 +382,15 @@ pub(crate) fn reconstruct_accounts_db_via_serialization(
let copied_accounts = TempDir::new().unwrap();
// Simulate obtaining a copy of the AppendVecs from a tarball
let unpacked_append_vec_map = copy_append_vecs(accounts, copied_accounts.path()).unwrap();
let mut accounts_db =
accountsdb_from_stream(SerdeStyle::Newer, &mut reader, &[], unpacked_append_vec_map)
.unwrap();
let storage_and_next_append_vec_id =
copy_append_vecs(accounts, copied_accounts.path()).unwrap();
let mut accounts_db = accountsdb_from_stream(
SerdeStyle::Newer,
&mut reader,
&[],
storage_and_next_append_vec_id,
)
.unwrap();
// The append vecs will be used from `copied_accounts` directly by the new AccountsDb so keep
// its TempDir alive
@ -431,13 +462,13 @@ fn test_extra_fields_eof() {
};
let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();
let copied_accounts = TempDir::new().unwrap();
let unpacked_append_vec_map =
let storage_and_next_append_vec_id =
copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
let dbank = crate::serde_snapshot::bank_from_streams(
SerdeStyle::Newer,
&mut snapshot_streams,
&dbank_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
&genesis_config,
&RuntimeConfig::default(),
None,
@ -554,13 +585,13 @@ fn test_blank_extra_fields() {
};
let (_accounts_dir, dbank_paths) = get_temp_accounts_paths(4).unwrap();
let copied_accounts = TempDir::new().unwrap();
let unpacked_append_vec_map =
let storage_and_next_append_vec_id =
copy_append_vecs(&bank.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
let dbank = crate::serde_snapshot::bank_from_streams(
SerdeStyle::Newer,
&mut snapshot_streams,
&dbank_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
&genesis_config,
&RuntimeConfig::default(),
None,

View File

@ -19,6 +19,7 @@ use {
snapshot_package::{
AccountsPackage, PendingAccountsPackage, SnapshotPackage, SnapshotType,
},
snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilder,
status_cache,
},
bincode::{config::Options, serialize_into},
@ -28,7 +29,7 @@ use {
log::*,
rayon::prelude::*,
regex::Regex,
solana_measure::measure::Measure,
solana_measure::{measure, measure::Measure},
solana_sdk::{
clock::Slot,
genesis_config::GenesisConfig,
@ -45,7 +46,7 @@ use {
path::{Path, PathBuf},
process::ExitStatus,
str::FromStr,
sync::Arc,
sync::{atomic::AtomicU32, Arc},
},
tar::{self, Archive},
tempfile::TempDir,
@ -53,7 +54,16 @@ use {
};
mod archive_format;
mod snapshot_storage_rebuilder;
pub use archive_format::*;
use {
crate::{
accounts_db::{AccountStorageMap, AtomicAppendVecId},
hardened_unpack::streaming_unpack_snapshot,
},
crossbeam_channel::Sender,
std::thread::{Builder, JoinHandle},
};
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
@ -180,7 +190,7 @@ struct SnapshotRootPaths {
struct UnarchivedSnapshot {
#[allow(dead_code)]
unpack_dir: TempDir,
unpacked_append_vec_map: UnpackedAppendVecMap,
storage: AccountStorageMap,
unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion,
measure_untar: Measure,
}
@ -192,6 +202,13 @@ struct UnpackedSnapshotsDirAndVersion {
snapshot_version: String,
}
/// Helper type for passing around account storage map and next append vec id
/// for reconstructing accounts from a snapshot
pub(crate) struct StorageAndNextAppendVecId {
pub storage: AccountStorageMap,
pub next_append_vec_id: AtomicAppendVecId,
}
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SnapshotError {
@ -840,7 +857,7 @@ fn verify_and_unarchive_snapshots(
full_snapshot_archive_info: &FullSnapshotArchiveInfo,
incremental_snapshot_archive_info: Option<&IncrementalSnapshotArchiveInfo>,
account_paths: &[PathBuf],
) -> Result<(UnarchivedSnapshot, Option<UnarchivedSnapshot>)> {
) -> Result<(UnarchivedSnapshot, Option<UnarchivedSnapshot>, AtomicU32)> {
check_are_snapshots_compatible(
full_snapshot_archive_info,
incremental_snapshot_archive_info,
@ -851,6 +868,7 @@ fn verify_and_unarchive_snapshots(
std::cmp::max(1, num_cpus::get() / 4),
);
let next_append_vec_id = Arc::new(AtomicU32::new(0));
let unarchived_full_snapshot = unarchive_snapshot(
&bank_snapshots_dir,
TMP_SNAPSHOT_ARCHIVE_PREFIX,
@ -859,6 +877,7 @@ fn verify_and_unarchive_snapshots(
account_paths,
full_snapshot_archive_info.archive_format(),
parallel_divisions,
next_append_vec_id.clone(),
)?;
let unarchived_incremental_snapshot =
@ -871,13 +890,18 @@ fn verify_and_unarchive_snapshots(
account_paths,
incremental_snapshot_archive_info.archive_format(),
parallel_divisions,
next_append_vec_id.clone(),
)?;
Some(unarchived_incremental_snapshot)
} else {
None
};
Ok((unarchived_full_snapshot, unarchived_incremental_snapshot))
Ok((
unarchived_full_snapshot,
unarchived_incremental_snapshot,
Arc::try_unwrap(next_append_vec_id).unwrap(),
))
}
/// Utility for parsing out bank specific information from a snapshot archive. This utility can be used
@ -902,7 +926,7 @@ pub fn bank_fields_from_snapshot_archives(
let account_paths = vec![temp_dir.path().to_path_buf()];
let (unarchived_full_snapshot, unarchived_incremental_snapshot) =
let (unarchived_full_snapshot, unarchived_incremental_snapshot, _next_append_vec_id) =
verify_and_unarchive_snapshots(
&bank_snapshots_dir,
&full_snapshot_archive_info,
@ -942,7 +966,7 @@ pub fn bank_from_snapshot_archives(
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> Result<(Bank, BankFromArchiveTimings)> {
let (unarchived_full_snapshot, mut unarchived_incremental_snapshot) =
let (unarchived_full_snapshot, mut unarchived_incremental_snapshot, next_append_vec_id) =
verify_and_unarchive_snapshots(
bank_snapshots_dir,
full_snapshot_archive_info,
@ -950,13 +974,18 @@ pub fn bank_from_snapshot_archives(
account_paths,
)?;
let mut unpacked_append_vec_map = unarchived_full_snapshot.unpacked_append_vec_map;
let mut storage = unarchived_full_snapshot.storage;
if let Some(ref mut unarchive_preparation_result) = unarchived_incremental_snapshot {
let incremental_snapshot_unpacked_append_vec_map =
std::mem::take(&mut unarchive_preparation_result.unpacked_append_vec_map);
unpacked_append_vec_map.extend(incremental_snapshot_unpacked_append_vec_map.into_iter());
let incremental_snapshot_storages =
std::mem::take(&mut unarchive_preparation_result.storage);
storage.extend(incremental_snapshot_storages.into_iter());
}
let storage_and_next_append_vec_id = StorageAndNextAppendVecId {
storage,
next_append_vec_id,
};
let mut measure_rebuild = Measure::start("rebuild bank from snapshots");
let bank = rebuild_bank_from_snapshots(
&unarchived_full_snapshot.unpacked_snapshots_dir_and_version,
@ -966,7 +995,7 @@ pub fn bank_from_snapshot_archives(
&unarchive_preparation_result.unpacked_snapshots_dir_and_version
}),
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
genesis_config,
runtime_config,
debug_keys,
@ -1135,9 +1164,69 @@ fn verify_bank_against_expected_slot_hash(
Ok(())
}
/// Spawns a thread for unpacking a snapshot
fn spawn_unpack_snapshot_thread(
file_sender: Sender<PathBuf>,
account_paths: Arc<Vec<PathBuf>>,
ledger_dir: Arc<PathBuf>,
mut archive: Archive<SharedBufferReader>,
parallel_selector: Option<ParallelSelector>,
thread_index: usize,
) -> JoinHandle<()> {
Builder::new()
.name(format!(
"solana-streaming-unarchive-snapshot-{thread_index}"
))
.spawn(move || {
streaming_unpack_snapshot(
&mut archive,
ledger_dir.as_path(),
&account_paths,
parallel_selector,
&file_sender,
)
.unwrap();
})
.unwrap()
}
/// Streams unpacked files across channel
fn streaming_unarchive_snapshot(
file_sender: Sender<PathBuf>,
account_paths: Vec<PathBuf>,
ledger_dir: PathBuf,
snapshot_archive_path: PathBuf,
archive_format: ArchiveFormat,
num_threads: usize,
) -> Vec<JoinHandle<()>> {
let account_paths = Arc::new(account_paths);
let ledger_dir = Arc::new(ledger_dir);
let shared_buffer = untar_snapshot_create_shared_buffer(&snapshot_archive_path, archive_format);
(0..num_threads)
.map(|thread_index| {
let parallel_selector = Some(ParallelSelector {
index: thread_index,
divisions: num_threads,
});
let reader = SharedBufferReader::new(&shared_buffer);
let archive = Archive::new(reader);
spawn_unpack_snapshot_thread(
file_sender.clone(),
account_paths.clone(),
ledger_dir.clone(),
archive,
parallel_selector,
thread_index,
)
})
.collect()
}
/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary
/// directories, untaring, reading the version file, and then returning those fields plus the
/// unpacked append vec map.
/// rebuilt storage
fn unarchive_snapshot<P, Q>(
bank_snapshots_dir: P,
unpacked_snapshots_dir_prefix: &'static str,
@ -1146,6 +1235,7 @@ fn unarchive_snapshot<P, Q>(
account_paths: &[PathBuf],
archive_format: ArchiveFormat,
parallel_divisions: usize,
next_append_vec_id: Arc<AtomicU32>,
) -> Result<UnarchivedSnapshot>
where
P: AsRef<Path>,
@ -1155,24 +1245,36 @@ where
.prefix(unpacked_snapshots_dir_prefix)
.tempdir_in(bank_snapshots_dir)?;
let unpacked_snapshots_dir = unpack_dir.path().join("snapshots");
let unpacked_version_file = unpack_dir.path().join("version");
let mut measure_untar = Measure::start(measure_name);
let unpacked_append_vec_map = untar_snapshot_in(
snapshot_archive_path,
unpack_dir.path(),
account_paths,
let (file_sender, file_receiver) = crossbeam_channel::unbounded();
streaming_unarchive_snapshot(
file_sender,
account_paths.to_vec(),
unpack_dir.path().to_path_buf(),
snapshot_archive_path.as_ref().to_path_buf(),
archive_format,
parallel_divisions,
)?;
measure_untar.stop();
);
let num_rebuilder_threads = num_cpus::get_physical()
.saturating_sub(parallel_divisions)
.max(1);
let (storage, measure_untar) = measure!(
SnapshotStorageRebuilder::rebuild_storage(
file_receiver,
num_rebuilder_threads,
next_append_vec_id
),
measure_name
);
info!("{}", measure_untar);
let unpacked_version_file = unpack_dir.path().join("version");
let snapshot_version = snapshot_version_from_file(&unpacked_version_file)?;
Ok(UnarchivedSnapshot {
unpack_dir,
unpacked_append_vec_map,
storage,
unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion {
unpacked_snapshots_dir,
snapshot_version,
@ -1678,7 +1780,7 @@ fn rebuild_bank_from_snapshots(
&UnpackedSnapshotsDirAndVersion,
>,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
genesis_config: &GenesisConfig,
runtime_config: &RuntimeConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
@ -1727,7 +1829,7 @@ fn rebuild_bank_from_snapshots(
SerdeStyle::Newer,
snapshot_streams,
account_paths,
unpacked_append_vec_map,
storage_and_next_append_vec_id,
genesis_config,
runtime_config,
debug_keys,

View File

@ -0,0 +1,363 @@
//! Provides interfaces for rebuilding snapshot storages
use {
crate::{
accounts_db::{AccountStorageEntry, AccountStorageMap, AppendVecId, AtomicAppendVecId},
serde_snapshot::{
self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields,
SerdeStyle, SerializedAppendVecId,
},
},
crossbeam_channel::{select, unbounded, Receiver, Sender},
dashmap::DashMap,
log::info,
rayon::{
iter::{IntoParallelIterator, ParallelIterator},
ThreadPool, ThreadPoolBuilder,
},
solana_sdk::clock::Slot,
std::{
collections::HashMap,
fs::File,
io::BufReader,
path::PathBuf,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Instant,
},
};
/// Stores state for rebuilding snapshot storages
#[derive(Debug)]
pub struct SnapshotStorageRebuilder {
/// Receiver for unpacked snapshot storage files
file_receiver: Receiver<PathBuf>,
/// Number of threads to rebuild with
num_threads: usize,
/// Snapshot storage lengths - from the snapshot file
snapshot_storage_lengths: HashMap<Slot, HashMap<SerializedAppendVecId, usize>>,
/// Container for storing snapshot file paths
storage_paths: DashMap<Slot, Mutex<Vec<PathBuf>>>,
/// Container for storing rebuilt snapshot storages
storage: AccountStorageMap,
/// Tracks next append_vec_id
next_append_vec_id: Arc<AtomicAppendVecId>,
/// Tracker for number of processed slots
processed_slot_count: AtomicUsize,
/// Tracks the number of collisions in AppendVecId
num_collisions: AtomicUsize,
}
impl SnapshotStorageRebuilder {
/// Synchronously spawns threads to rebuild snapshot storages
pub fn rebuild_storage(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
) -> AccountStorageMap {
let (snapshot_file_path, append_vec_files) = Self::get_snapshot_file(&file_receiver);
let snapshot_storage_lengths = Self::process_snapshot_file(snapshot_file_path).unwrap();
Self::spawn_rebuilder_threads(
file_receiver,
num_threads,
next_append_vec_id,
snapshot_storage_lengths,
append_vec_files,
)
}
/// Create the SnapshotStorageRebuilder for storing state during rebuilding
/// - pre-allocates data for storage paths
fn new(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
) -> Self {
let storage = DashMap::with_capacity(snapshot_storage_lengths.len());
let storage_paths: DashMap<_, _> = snapshot_storage_lengths
.iter()
.map(|(slot, storage_lengths)| {
(*slot, Mutex::new(Vec::with_capacity(storage_lengths.len())))
})
.collect();
Self {
file_receiver,
num_threads,
snapshot_storage_lengths,
storage_paths,
storage,
next_append_vec_id,
processed_slot_count: AtomicUsize::new(0),
num_collisions: AtomicUsize::new(0),
}
}
/// Waits for snapshot file
/// Due to parallel unpacking, we may receive some append_vec files before the snapshot file
/// This function will push append_vec files into a buffer until we receive the snapshot file
fn get_snapshot_file(file_receiver: &Receiver<PathBuf>) -> (PathBuf, Vec<PathBuf>) {
let mut append_vec_files = Vec::with_capacity(1024);
let snapshot_file_path = loop {
if let Ok(path) = file_receiver.recv() {
let filename = path.file_name().unwrap().to_str().unwrap();
match get_snapshot_file_kind(filename) {
Some(SnapshotFileKind::SnapshotFile) => {
break path;
}
Some(SnapshotFileKind::StorageFile) => {
append_vec_files.push(path);
}
None => {} // do nothing for other kinds of files
}
} else {
panic!("did not receive snapshot file from unpacking threads");
}
};
(snapshot_file_path, append_vec_files)
}
/// Process the snapshot file to get the size of each snapshot storage file
fn process_snapshot_file(
snapshot_file_path: PathBuf,
) -> Result<HashMap<Slot, HashMap<usize, usize>>, bincode::Error> {
let snapshot_file = File::open(snapshot_file_path).unwrap();
let mut snapshot_stream = BufReader::new(snapshot_file);
let (_bank_fields, accounts_fields) =
serde_snapshot::fields_from_stream(SerdeStyle::Newer, &mut snapshot_stream)?;
Ok(snapshot_storage_lengths_from_fields(&accounts_fields))
}
/// Spawn threads for processing buffered append_vec_files, and then received files
fn spawn_rebuilder_threads(
file_receiver: Receiver<PathBuf>,
num_threads: usize,
next_append_vec_id: Arc<AtomicAppendVecId>,
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
append_vec_files: Vec<PathBuf>,
) -> AccountStorageMap {
let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
file_receiver,
num_threads,
next_append_vec_id,
snapshot_storage_lengths,
));
let thread_pool = rebuilder.build_thread_pool();
// Synchronously process buffered append_vec_files
thread_pool.install(|| {
rebuilder.process_buffered_files(append_vec_files).unwrap();
});
// Asynchronously spawn threads to process received append_vec_files
let (exit_sender, exit_receiver) = unbounded();
for _ in 0..rebuilder.num_threads {
Self::spawn_receiver_thread(&thread_pool, exit_sender.clone(), rebuilder.clone());
}
drop(exit_sender); // drop otherwise loop below will never end
// wait for asynchronous threads to complete
rebuilder.wait_for_completion(exit_receiver);
Arc::try_unwrap(rebuilder).unwrap().storage
}
/// Processes buffered append_vec_files
fn process_buffered_files(&self, append_vec_files: Vec<PathBuf>) -> Result<(), std::io::Error> {
append_vec_files
.into_par_iter()
.map(|path| self.process_append_vec_file(path))
.collect::<Result<(), std::io::Error>>()
}
/// Spawn a single thread to process received append_vec_files
fn spawn_receiver_thread(
thread_pool: &ThreadPool,
exit_sender: Sender<()>,
rebuilder: Arc<SnapshotStorageRebuilder>,
) {
thread_pool.spawn(move || {
for path in rebuilder.file_receiver.iter() {
rebuilder.process_append_vec_file(path).unwrap();
}
exit_sender.send(()).unwrap();
})
}
/// Process an append_vec_file
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
if let Some(SnapshotFileKind::StorageFile) = get_snapshot_file_kind(&filename) {
let (slot, slot_complete) = self.insert_slot_storage_file(path, filename);
if slot_complete {
self.process_complete_slot(slot)?;
self.processed_slot_count.fetch_add(1, Ordering::AcqRel);
}
}
Ok(())
}
/// Inserts single storage file, returns the slot and if the slot has all of its storage entries
fn insert_slot_storage_file(&self, path: PathBuf, filename: String) -> (Slot, bool) {
let (slot, _) = get_slot_and_append_vec_id(&filename);
let slot_storage_count = self.insert_storage_file(&slot, path);
(
slot,
slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len(),
)
}
/// Insert storage path into slot and return the number of storage files for the slot
fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize {
let slot_paths = self.storage_paths.get(slot).unwrap();
let mut lock = slot_paths.lock().unwrap();
lock.push(path);
lock.len()
}
/// Process a slot that has received all storage entries
fn process_complete_slot(&self, slot: Slot) -> Result<(), std::io::Error> {
let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
let lock = slot_storage_paths.lock().unwrap();
let slot_stores = lock
.iter()
.map(|path| {
let filename = path.file_name().unwrap().to_str().unwrap();
let (_, old_append_vec_id) = get_slot_and_append_vec_id(filename);
let current_len = *self
.snapshot_storage_lengths
.get(&slot)
.unwrap()
.get(&(old_append_vec_id as usize))
.unwrap();
let storage_entry = remap_and_reconstruct_single_storage(
slot,
old_append_vec_id,
current_len,
path.as_path(),
&self.next_append_vec_id,
&self.num_collisions,
)?;
Ok((storage_entry.append_vec_id(), storage_entry))
})
.collect::<Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, std::io::Error>>()?;
let slot_entry = self.storage.entry(slot).or_default();
let mut storage_lock = slot_entry.write().unwrap();
*storage_lock = slot_stores;
Ok(())
}
/// Wait for the completion of the rebuilding threads
fn wait_for_completion(&self, exit_receiver: Receiver<()>) {
let num_slots = self.snapshot_storage_lengths.len();
let mut last_log_time = Instant::now();
loop {
select! {
recv(exit_receiver) -> maybe_thread_accounts_data_len => {
match maybe_thread_accounts_data_len {
Ok(_) => continue,
Err(_) => break,
}
}
default(std::time::Duration::from_millis(100)) => {
let now = Instant::now();
if now.duration_since(last_log_time).as_millis() >= 2000 {
let num_processed_slots = self.processed_slot_count.load(Ordering::Relaxed);
let num_collisions = self.num_collisions.load(Ordering::Relaxed);
info!("rebuilt storages for {num_processed_slots}/{num_slots} slots with {num_collisions} collisions");
last_log_time = now;
}
}
}
}
}
/// Builds thread pool to rebuild with
fn build_thread_pool(&self) -> ThreadPool {
ThreadPoolBuilder::default()
.num_threads(self.num_threads)
.build()
.unwrap()
}
}
/// Used to determine if a filename is structured like a snapshot file, storage file, or neither
#[derive(PartialEq, Debug)]
enum SnapshotFileKind {
SnapshotFile,
StorageFile,
}
/// Determines `SnapshotFileKind` for `filename` if any
fn get_snapshot_file_kind(filename: &str) -> Option<SnapshotFileKind> {
let mut periods = 0;
let mut saw_numbers = false;
for x in filename.chars() {
if !x.is_ascii_digit() {
if x == '.' {
if periods > 0 || !saw_numbers {
return None;
}
saw_numbers = false;
periods += 1;
} else {
return None;
}
} else {
saw_numbers = true;
}
}
match (periods, saw_numbers) {
(0, true) => Some(SnapshotFileKind::SnapshotFile),
(1, true) => Some(SnapshotFileKind::StorageFile),
(_, _) => None,
}
}
/// Get the slot and append vec id from the filename
fn get_slot_and_append_vec_id(filename: &str) -> (Slot, usize) {
let mut split = filename.split('.');
let slot = split.next().unwrap().parse().unwrap();
let append_vec_id = split.next().unwrap().parse().unwrap();
assert!(split.next().is_none());
(slot, append_vec_id)
}
#[cfg(test)]
mod tests {
use {super::*, crate::append_vec::AppendVec};
#[test]
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));
assert_eq!(
Some(SnapshotFileKind::SnapshotFile),
get_snapshot_file_kind("1234")
);
assert_eq!(
Some(SnapshotFileKind::StorageFile),
get_snapshot_file_kind("1000.999")
);
}
#[test]
fn test_get_slot_and_append_vec_id() {
let expected_slot = 12345;
let expected_id = 9987;
let (slot, id) =
get_slot_and_append_vec_id(&AppendVec::file_name(expected_slot, expected_id));
assert_eq!(expected_slot, slot);
assert_eq!(expected_id, id);
}
}