Construct bank from snapshot dir (#30171)
* Construct a bank from a snapshot directory * Clean up. Remove archiving related data structures * Fix auto checks * remove ArchiveFormat::None * fix merge error * remove incremental snapshot dir * Minor cleanup, remove unused function defs * remove pub fn bank_from_latest_snapshot_dir * rename bank_from_snapshot_dir to bank_from_snapshot * Clean up invalid comments * A few minor review changes * Removed insert_slot_storage_file * Add comment explaining hardlink symlink * Skip the whole verify_snapshot_bank call for the from_dir case * Add bank.set_initial_accounts_hash_verification_completed() * address review issues: appendvec to append_vec, replace unwrap with expect, etc * AtomicAppendVecId, remove arc on bank etc * slice, CI error on &snapshot_version_path * measure_build_storage * move snapshot_from * remove measure_name from build_storage_from_snapshot_dir * remove from_dir specific next_append_vec_id logic * revert insert_slot_storage_file change * init next_append_vec_id to fix the subtraction underflow * remove measure from build_storage_from_snapshot_dir * make measure name more specific * refactor status_cache deserialization into a function * remove reference to pass the ci check * track next appendvec id * verify that the next_append_vec_id tracking is correct * clean up usize * in build_storage_from_snapshot_dir remove expect * test max appendvec id tracking with multiple banks in the test * cleared expect and unwrap in streaming_snapshot_dir_files * rebase cleanup * change to measure! * dereference arc in the right way
This commit is contained in:
parent
809041b151
commit
d69f60229d
|
@ -598,7 +598,7 @@ where
|
|||
Ok(bank)
|
||||
}
|
||||
|
||||
fn reconstruct_single_storage(
|
||||
pub(crate) fn reconstruct_single_storage(
|
||||
slot: &Slot,
|
||||
append_vec_path: &Path,
|
||||
current_len: usize,
|
||||
|
|
|
@ -249,6 +249,17 @@ pub enum BankSnapshotType {
|
|||
Post,
|
||||
}
|
||||
|
||||
/// When constructing a bank from a snapshot, traditionally the snapshot came from a snapshot
|
||||
/// archive. Now the snapshot can come from either a snapshot directory or a snapshot archive.
|
||||
/// This flag indicates which of the two sources is being used.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum SnapshotFrom {
|
||||
/// Build from the snapshot archive
|
||||
Archive,
|
||||
/// Build directly from the bank snapshot directory (no archive involved)
|
||||
Dir,
|
||||
}
|
||||
|
||||
/// Helper type when rebuilding from snapshots. Designed to handle when rebuilding from just a
|
||||
/// full snapshot, or from both a full snapshot and an incremental snapshot.
|
||||
#[derive(Debug)]
|
||||
|
@ -290,6 +301,9 @@ pub enum SnapshotError {
|
|||
#[error("serialization error: {0}")]
|
||||
Serialize(#[from] bincode::Error),
|
||||
|
||||
#[error("crossbeam send error: {0}")]
|
||||
CrossbeamSend(#[from] crossbeam_channel::SendError<PathBuf>),
|
||||
|
||||
#[error("archive generation failure {0}")]
|
||||
ArchiveGenerationFailure(ExitStatus),
|
||||
|
||||
|
@ -337,7 +351,11 @@ pub enum SnapshotError {
|
|||
|
||||
#[error("no valid snapshot dir found under {}", .0.display())]
|
||||
NoSnapshotSlotDir(PathBuf),
|
||||
|
||||
#[error("snapshot dir account paths mismatching")]
|
||||
AccountPathsMismatch,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SnapshotNewFromDirError {
|
||||
#[error("I/O error: {0}")]
|
||||
|
@ -1324,7 +1342,7 @@ fn verify_and_unarchive_snapshots(
|
|||
|
||||
let parallel_divisions = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT);
|
||||
|
||||
let next_append_vec_id = Arc::new(AtomicU32::new(0));
|
||||
let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0));
|
||||
let unarchived_full_snapshot = unarchive_snapshot(
|
||||
&bank_snapshots_dir,
|
||||
TMP_SNAPSHOT_ARCHIVE_PREFIX,
|
||||
|
@ -1443,7 +1461,7 @@ pub fn bank_from_snapshot_archives(
|
|||
};
|
||||
|
||||
let mut measure_rebuild = Measure::start("rebuild bank from snapshots");
|
||||
let bank = rebuild_bank_from_snapshots(
|
||||
let bank = rebuild_bank_from_unarchived_snapshots(
|
||||
&unarchived_full_snapshot.unpacked_snapshots_dir_and_version,
|
||||
unarchived_incremental_snapshot
|
||||
.as_ref()
|
||||
|
@ -1617,6 +1635,70 @@ pub fn bank_from_latest_snapshot_archives(
|
|||
))
|
||||
}
|
||||
|
||||
/// Build bank from a snapshot (a snapshot directory, not a snapshot archive)
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn bank_from_snapshot_dir(
|
||||
account_paths: &[PathBuf],
|
||||
bank_snapshot: &BankSnapshotInfo,
|
||||
genesis_config: &GenesisConfig,
|
||||
runtime_config: &RuntimeConfig,
|
||||
debug_keys: Option<Arc<HashSet<Pubkey>>>,
|
||||
additional_builtins: Option<&Builtins>,
|
||||
account_secondary_indexes: AccountSecondaryIndexes,
|
||||
limit_load_slot_count_from_snapshot: Option<usize>,
|
||||
shrink_ratio: AccountShrinkThreshold,
|
||||
verify_index: bool,
|
||||
accounts_db_config: Option<AccountsDbConfig>,
|
||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> Result<(Bank, BankFromArchiveTimings)> {
|
||||
let next_append_vec_id = Arc::new(AtomicAppendVecId::new(0));
|
||||
|
||||
let (storage, measure_build_storage) = measure!(
|
||||
build_storage_from_snapshot_dir(bank_snapshot, account_paths, next_append_vec_id.clone())?,
|
||||
"build storage from snapshot dir"
|
||||
);
|
||||
info!("{}", measure_build_storage);
|
||||
|
||||
let next_append_vec_id =
|
||||
Arc::try_unwrap(next_append_vec_id).expect("this is the only strong reference");
|
||||
let storage_and_next_append_vec_id = StorageAndNextAppendVecId {
|
||||
storage,
|
||||
next_append_vec_id,
|
||||
};
|
||||
let mut measure_rebuild = Measure::start("rebuild bank from snapshots");
|
||||
let bank = rebuild_bank_from_snapshot(
|
||||
bank_snapshot,
|
||||
account_paths,
|
||||
storage_and_next_append_vec_id,
|
||||
genesis_config,
|
||||
runtime_config,
|
||||
debug_keys,
|
||||
additional_builtins,
|
||||
account_secondary_indexes,
|
||||
limit_load_slot_count_from_snapshot,
|
||||
shrink_ratio,
|
||||
verify_index,
|
||||
accounts_db_config,
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)?;
|
||||
measure_rebuild.stop();
|
||||
info!("{}", measure_rebuild);
|
||||
|
||||
// Skip bank.verify_snapshot_bank. Subsequent snapshot requests/accounts hash verification requests
|
||||
// will calculate and check the accounts hash, so we will still have safety/correctness there.
|
||||
bank.set_initial_accounts_hash_verification_completed();
|
||||
|
||||
let timings = BankFromArchiveTimings {
|
||||
rebuild_bank_from_snapshots_us: measure_rebuild.as_us(),
|
||||
full_snapshot_untar_us: measure_build_storage.as_us(),
|
||||
incremental_snapshot_untar_us: 0,
|
||||
verify_snapshot_bank_us: 0,
|
||||
};
|
||||
Ok((bank, timings))
|
||||
}
|
||||
|
||||
/// Check to make sure the deserialized bank's slot and hash matches the snapshot archive's slot
|
||||
/// and hash
|
||||
fn verify_bank_against_expected_slot_hash(
|
||||
|
@ -1748,7 +1830,7 @@ fn unarchive_snapshot<P, Q>(
|
|||
account_paths: &[PathBuf],
|
||||
archive_format: ArchiveFormat,
|
||||
parallel_divisions: usize,
|
||||
next_append_vec_id: Arc<AtomicU32>,
|
||||
next_append_vec_id: Arc<AtomicAppendVecId>,
|
||||
) -> Result<UnarchivedSnapshot>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
|
@ -1776,7 +1858,8 @@ where
|
|||
SnapshotStorageRebuilder::rebuild_storage(
|
||||
file_receiver,
|
||||
num_rebuilder_threads,
|
||||
next_append_vec_id
|
||||
next_append_vec_id,
|
||||
SnapshotFrom::Archive,
|
||||
)?,
|
||||
measure_name
|
||||
);
|
||||
|
@ -1799,6 +1882,99 @@ where
|
|||
})
|
||||
}
|
||||
|
||||
/// Streams snapshot dir files across channel
|
||||
/// Follow the flow of streaming_unarchive_snapshot(), but handle the from_dir case.
|
||||
fn streaming_snapshot_dir_files(
|
||||
file_sender: Sender<PathBuf>,
|
||||
snapshot_file_path: impl Into<PathBuf>,
|
||||
snapshot_version_path: impl Into<PathBuf>,
|
||||
account_paths: &[PathBuf],
|
||||
) -> Result<()> {
|
||||
file_sender.send(snapshot_file_path.into())?;
|
||||
file_sender.send(snapshot_version_path.into())?;
|
||||
|
||||
for account_path in account_paths {
|
||||
for file in fs::read_dir(account_path)? {
|
||||
file_sender.send(file?.path())?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform the common tasks when deserialize a snapshot. Handles reading snapshot file, reading the version file,
|
||||
/// and then returning those fields plus the rebuilt storage
|
||||
fn build_storage_from_snapshot_dir(
|
||||
snapshot_info: &BankSnapshotInfo,
|
||||
account_paths: &[PathBuf],
|
||||
next_append_vec_id: Arc<AtomicAppendVecId>,
|
||||
) -> Result<AccountStorageMap> {
|
||||
let bank_snapshot_dir = &snapshot_info.snapshot_dir;
|
||||
let snapshot_file_path = &snapshot_info.snapshot_path();
|
||||
let snapshot_version_path = bank_snapshot_dir.join("version");
|
||||
let (file_sender, file_receiver) = crossbeam_channel::unbounded();
|
||||
|
||||
let accounts_hardlinks = bank_snapshot_dir.join("accounts_hardlinks");
|
||||
|
||||
let account_paths_set: HashSet<_> = HashSet::from_iter(account_paths.iter());
|
||||
|
||||
for dir_symlink in fs::read_dir(accounts_hardlinks)? {
|
||||
// The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
|
||||
// The corresponding run path should be <account_path>/run/
|
||||
let snapshot_account_path = fs::read_link(dir_symlink?.path())?;
|
||||
let account_run_path = snapshot_account_path
|
||||
.parent()
|
||||
.ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))?
|
||||
.parent()
|
||||
.ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))?
|
||||
.join("run");
|
||||
if !account_paths_set.contains(&account_run_path) {
|
||||
// The appendvec from the bank snapshot stoarge does not match any of the provided account_paths set.
|
||||
// The accout paths have changed so the snapshot is no longer usable.
|
||||
return Err(SnapshotError::AccountPathsMismatch);
|
||||
}
|
||||
// Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
|
||||
// paths be in accounts/
|
||||
for file in fs::read_dir(snapshot_account_path)? {
|
||||
let file_path = file?.path().to_path_buf();
|
||||
let file_name = file_path
|
||||
.file_name()
|
||||
.ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.clone()))?;
|
||||
let dest_path = account_run_path.clone().join(file_name);
|
||||
fs::hard_link(&file_path, &dest_path).map_err(|e| {
|
||||
let err_msg = format!(
|
||||
"Error: {}. Failed to hard-link {} to {}",
|
||||
e,
|
||||
file_path.display(),
|
||||
dest_path.display()
|
||||
);
|
||||
SnapshotError::Io(IoError::new(ErrorKind::Other, err_msg))
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
streaming_snapshot_dir_files(
|
||||
file_sender,
|
||||
snapshot_file_path,
|
||||
snapshot_version_path,
|
||||
account_paths,
|
||||
)?;
|
||||
|
||||
let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1);
|
||||
let version_and_storages = SnapshotStorageRebuilder::rebuild_storage(
|
||||
file_receiver,
|
||||
num_rebuilder_threads,
|
||||
next_append_vec_id,
|
||||
SnapshotFrom::Dir,
|
||||
)?;
|
||||
|
||||
let RebuiltSnapshotStorage {
|
||||
snapshot_version: _,
|
||||
storage,
|
||||
} = version_and_storages;
|
||||
Ok(storage)
|
||||
}
|
||||
|
||||
/// Reads the `snapshot_version` from a file. Before opening the file, its size
|
||||
/// is compared to `MAX_SNAPSHOT_VERSION_FILE_SIZE`. If the size exceeds this
|
||||
/// threshold, it is not opened and an error is returned.
|
||||
|
@ -2279,8 +2455,23 @@ fn bank_fields_from_snapshots(
|
|||
})
|
||||
}
|
||||
|
||||
fn deserialize_status_cache(status_cache_path: &Path) -> Result<Vec<BankSlotDelta>> {
|
||||
deserialize_snapshot_data_file(status_cache_path, |stream| {
|
||||
info!(
|
||||
"Rebuilding status cache from {}",
|
||||
status_cache_path.display()
|
||||
);
|
||||
let slot_delta: Vec<BankSlotDelta> = bincode::options()
|
||||
.with_limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
|
||||
.with_fixint_encoding()
|
||||
.allow_trailing_bytes()
|
||||
.deserialize_from(stream)?;
|
||||
Ok(slot_delta)
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn rebuild_bank_from_snapshots(
|
||||
fn rebuild_bank_from_unarchived_snapshots(
|
||||
full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion,
|
||||
incremental_snapshot_unpacked_snapshots_dir_and_version: Option<
|
||||
&UnpackedSnapshotsDirAndVersion,
|
||||
|
@ -2315,7 +2506,7 @@ fn rebuild_bank_from_snapshots(
|
|||
(None, None)
|
||||
};
|
||||
info!(
|
||||
"Loading bank from full snapshot {} and incremental snapshot {:?}",
|
||||
"Rebuiding bank from full snapshot {} and incremental snapshot {:?}",
|
||||
full_snapshot_root_paths.snapshot_path().display(),
|
||||
incremental_snapshot_root_paths
|
||||
.as_ref()
|
||||
|
@ -2368,24 +2559,73 @@ fn rebuild_bank_from_snapshots(
|
|||
},
|
||||
)
|
||||
.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| {
|
||||
info!(
|
||||
"Rebuilding status cache from {}",
|
||||
status_cache_path.display()
|
||||
);
|
||||
let slot_deltas: Vec<BankSlotDelta> = bincode::options()
|
||||
.with_limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
|
||||
.with_fixint_encoding()
|
||||
.allow_trailing_bytes()
|
||||
.deserialize_from(stream)?;
|
||||
Ok(slot_deltas)
|
||||
})?;
|
||||
let slot_deltas = deserialize_status_cache(&status_cache_path)?;
|
||||
|
||||
verify_slot_deltas(slot_deltas.as_slice(), &bank)?;
|
||||
|
||||
bank.status_cache.write().unwrap().append(&slot_deltas);
|
||||
|
||||
info!("Loaded bank for slot: {}", bank.slot());
|
||||
info!("Rebuilt bank for slot: {}", bank.slot());
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn rebuild_bank_from_snapshot(
|
||||
bank_snapshot: &BankSnapshotInfo,
|
||||
account_paths: &[PathBuf],
|
||||
storage_and_next_append_vec_id: StorageAndNextAppendVecId,
|
||||
genesis_config: &GenesisConfig,
|
||||
runtime_config: &RuntimeConfig,
|
||||
debug_keys: Option<Arc<HashSet<Pubkey>>>,
|
||||
additional_builtins: Option<&Builtins>,
|
||||
account_secondary_indexes: AccountSecondaryIndexes,
|
||||
limit_load_slot_count_from_snapshot: Option<usize>,
|
||||
shrink_ratio: AccountShrinkThreshold,
|
||||
verify_index: bool,
|
||||
accounts_db_config: Option<AccountsDbConfig>,
|
||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> Result<Bank> {
|
||||
info!(
|
||||
"Rebuilding bank from snapshot {}",
|
||||
bank_snapshot.snapshot_dir.display(),
|
||||
);
|
||||
|
||||
let snapshot_root_paths = SnapshotRootPaths {
|
||||
full_snapshot_root_file_path: bank_snapshot.snapshot_path(),
|
||||
incremental_snapshot_root_file_path: None,
|
||||
};
|
||||
|
||||
let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |snapshot_streams| {
|
||||
Ok(bank_from_streams(
|
||||
SerdeStyle::Newer,
|
||||
snapshot_streams,
|
||||
account_paths,
|
||||
storage_and_next_append_vec_id,
|
||||
genesis_config,
|
||||
runtime_config,
|
||||
debug_keys,
|
||||
additional_builtins,
|
||||
account_secondary_indexes,
|
||||
limit_load_slot_count_from_snapshot,
|
||||
shrink_ratio,
|
||||
verify_index,
|
||||
accounts_db_config,
|
||||
accounts_update_notifier,
|
||||
exit,
|
||||
)?)
|
||||
})?;
|
||||
|
||||
let status_cache_path = bank_snapshot
|
||||
.snapshot_dir
|
||||
.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
let slot_deltas = deserialize_status_cache(&status_cache_path)?;
|
||||
|
||||
verify_slot_deltas(slot_deltas.as_slice(), &bank)?;
|
||||
|
||||
bank.status_cache.write().unwrap().append(&slot_deltas);
|
||||
|
||||
info!("Rebuilt bank for slot: {}", bank.slot());
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
|
@ -2920,6 +3160,7 @@ mod tests {
|
|||
accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING,
|
||||
accounts_hash::{CalcAccountsHashConfig, HashStats},
|
||||
genesis_utils,
|
||||
snapshot_utils::snapshot_storage_rebuilder::get_slot_and_append_vec_id,
|
||||
sorted_storages::SortedStorages,
|
||||
status_cache::Status,
|
||||
},
|
||||
|
@ -2933,10 +3174,14 @@ mod tests {
|
|||
system_transaction,
|
||||
transaction::SanitizedTransaction,
|
||||
},
|
||||
std::{convert::TryFrom, mem::size_of, os::unix::fs::PermissionsExt, sync::Arc},
|
||||
std::{
|
||||
convert::TryFrom,
|
||||
mem::size_of,
|
||||
os::unix::fs::PermissionsExt,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
},
|
||||
tempfile::NamedTempFile,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_serialize_snapshot_data_file_under_limit() {
|
||||
let temp_dir = tempfile::TempDir::new().unwrap();
|
||||
|
@ -5103,4 +5348,78 @@ mod tests {
|
|||
.unwrap();
|
||||
assert_eq!(other_incremental_accounts_hash, incremental_accounts_hash);
|
||||
}
|
||||
|
||||
/// Takes bank snapshots for three consecutive slots, rebuilds a bank from the highest bank
/// snapshot directory, and checks that it equals the original bank and that the append vec id
/// tracking matches the files on disk.
#[test]
fn test_bank_from_snapshot_dir() {
    solana_logger::setup();
    let genesis_config = GenesisConfig::default();
    let mut bank = Arc::new(Bank::new_for_tests(&genesis_config));

    let tmp_dir = tempfile::TempDir::new().unwrap();
    let bank_snapshots_dir = tmp_dir.path();
    let collecter_id = Pubkey::new_unique();
    let snapshot_version = SnapshotVersion::default();

    for _ in 0..3 {
        // prepare the bank for the next slot
        let next_slot = bank.slot() + 1;
        bank = Arc::new(Bank::new_from_parent(&bank, &collecter_id, next_slot));
        bank.fill_bank_with_ticks_for_tests();
        bank.squash();
        bank.force_flush_accounts_cache();

        // generate the bank snapshot directory for that slot
        let snapshot_storages = bank.get_snapshot_storages(None);
        let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
        add_bank_snapshot(
            bank_snapshots_dir,
            &bank,
            &snapshot_storages,
            snapshot_version,
            slot_deltas,
        )
        .unwrap();
    }

    let bank_snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
    let account_paths = &bank.rc.accounts.accounts_db.paths;

    // Clear the contents of the account paths run directories. When constructing the bank,
    // the appendvec files will be extracted from the snapshot hardlink directories into these
    // run/ directories.
    account_paths.iter().for_each(|run_dir| {
        delete_contents_of_path(run_dir);
    });

    let (bank_constructed, ..) = bank_from_snapshot_dir(
        account_paths,
        &bank_snapshot,
        &genesis_config,
        &RuntimeConfig::default(),
        None,
        None,
        AccountSecondaryIndexes::default(),
        None,
        AccountShrinkThreshold::default(),
        false,
        Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
        None,
        &Arc::default(),
    )
    .unwrap();

    bank_constructed.wait_for_initial_accounts_hash_verification_completed_for_tests();
    assert_eq!(bank_constructed, *bank);

    // Verify that the next_append_vec_id tracking is correct: the largest id found among the
    // files in the account paths must be one less than the next id to hand out.
    let max_id = account_paths
        .iter()
        .flat_map(|run_dir| fs::read_dir(run_dir).unwrap())
        .map(|entry| {
            let file_path = entry.unwrap().path();
            let filename = file_path.file_name().unwrap().to_str().unwrap();
            let (_slot, append_vec_id) = get_slot_and_append_vec_id(filename);
            append_vec_id
        })
        .max()
        .unwrap_or(0);
    // NOTE(review): this reads next_id from the original `bank`, not from `bank_constructed`;
    // confirm whether the rebuilt bank's counter was the intended subject of the check.
    let next_id = bank.accounts().accounts_db.next_id.load(Ordering::Relaxed) as usize;
    assert_eq!(max_id, next_id - 1);
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
//! Provides interfaces for rebuilding snapshot storages
|
||||
|
||||
use {
|
||||
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
|
||||
super::{
|
||||
get_io_error, snapshot_version_from_file, SnapshotError, SnapshotFrom, SnapshotVersion,
|
||||
},
|
||||
crate::{
|
||||
account_storage::{AccountStorageMap, AccountStorageReference},
|
||||
accounts_db::{AccountStorageEntry, AccountsDb, AppendVecId, AtomicAppendVecId},
|
||||
append_vec::AppendVec,
|
||||
serde_snapshot::{
|
||||
self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields,
|
||||
SerdeStyle, SerializedAppendVecId,
|
||||
self, reconstruct_single_storage, remap_and_reconstruct_single_storage,
|
||||
snapshot_storage_lengths_from_fields, SerdeStyle, SerializedAppendVecId,
|
||||
},
|
||||
},
|
||||
crossbeam_channel::{select, unbounded, Receiver, Sender},
|
||||
|
@ -67,6 +69,8 @@ pub(crate) struct SnapshotStorageRebuilder {
|
|||
processed_slot_count: AtomicUsize,
|
||||
/// Tracks the number of collisions in AppendVecId
|
||||
num_collisions: AtomicUsize,
|
||||
/// Rebuild from the snapshot files or archives
|
||||
snapshot_from: SnapshotFrom,
|
||||
}
|
||||
|
||||
impl SnapshotStorageRebuilder {
|
||||
|
@ -75,6 +79,7 @@ impl SnapshotStorageRebuilder {
|
|||
file_receiver: Receiver<PathBuf>,
|
||||
num_threads: usize,
|
||||
next_append_vec_id: Arc<AtomicAppendVecId>,
|
||||
snapshot_from: SnapshotFrom,
|
||||
) -> Result<RebuiltSnapshotStorage, SnapshotError> {
|
||||
let (snapshot_version_path, snapshot_file_path, append_vec_files) =
|
||||
Self::get_version_and_snapshot_files(&file_receiver);
|
||||
|
@ -93,6 +98,7 @@ impl SnapshotStorageRebuilder {
|
|||
next_append_vec_id,
|
||||
snapshot_storage_lengths,
|
||||
append_vec_files,
|
||||
snapshot_from,
|
||||
)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "rebuild snapshot storages"))?;
|
||||
|
||||
|
@ -109,6 +115,7 @@ impl SnapshotStorageRebuilder {
|
|||
num_threads: usize,
|
||||
next_append_vec_id: Arc<AtomicAppendVecId>,
|
||||
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
|
||||
snapshot_from: SnapshotFrom,
|
||||
) -> Self {
|
||||
let storage = DashMap::with_capacity(snapshot_storage_lengths.len());
|
||||
let storage_paths: DashMap<_, _> = snapshot_storage_lengths
|
||||
|
@ -126,6 +133,7 @@ impl SnapshotStorageRebuilder {
|
|||
next_append_vec_id,
|
||||
processed_slot_count: AtomicUsize::new(0),
|
||||
num_collisions: AtomicUsize::new(0),
|
||||
snapshot_from,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -198,18 +206,22 @@ impl SnapshotStorageRebuilder {
|
|||
next_append_vec_id: Arc<AtomicAppendVecId>,
|
||||
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
|
||||
append_vec_files: Vec<PathBuf>,
|
||||
snapshot_from: SnapshotFrom,
|
||||
) -> Result<AccountStorageMap, std::io::Error> {
|
||||
let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
|
||||
file_receiver,
|
||||
num_threads,
|
||||
next_append_vec_id,
|
||||
snapshot_storage_lengths,
|
||||
snapshot_from,
|
||||
));
|
||||
|
||||
let thread_pool = rebuilder.build_thread_pool();
|
||||
|
||||
// Synchronously process buffered append_vec_files
|
||||
thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?;
|
||||
if snapshot_from == SnapshotFrom::Archive {
|
||||
// Synchronously process buffered append_vec_files
|
||||
thread_pool.install(|| rebuilder.process_buffered_files(append_vec_files))?;
|
||||
}
|
||||
|
||||
// Asynchronously spawn threads to process received append_vec_files
|
||||
let (exit_sender, exit_receiver) = unbounded();
|
||||
|
@ -260,8 +272,18 @@ impl SnapshotStorageRebuilder {
|
|||
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
|
||||
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
|
||||
let (slot, slot_complete) = self.insert_slot_storage_file(path, filename);
|
||||
if slot_complete {
|
||||
let (slot, append_vec_id) = get_slot_and_append_vec_id(&filename);
|
||||
if self.snapshot_from == SnapshotFrom::Dir {
|
||||
// Keep track of the highest append_vec_id in the system, so the future append_vecs
|
||||
// can be assigned to unique IDs. This is only needed when loading from a snapshot
|
||||
// dir. When loading from a snapshot archive, the max of the appendvec IDs is
|
||||
// updated in remap_append_vec_file(), which is not in the from_dir route.
|
||||
self.next_append_vec_id
|
||||
.fetch_max((append_vec_id + 1) as AppendVecId, Ordering::Relaxed);
|
||||
}
|
||||
let slot_storage_count = self.insert_storage_file(&slot, path);
|
||||
if slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len() {
|
||||
// slot_complete
|
||||
self.process_complete_slot(slot)?;
|
||||
self.processed_slot_count.fetch_add(1, Ordering::AcqRel);
|
||||
}
|
||||
|
@ -269,17 +291,6 @@ impl SnapshotStorageRebuilder {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Inserts single storage file, returns the slot and if the slot has all of its storage entries
|
||||
fn insert_slot_storage_file(&self, path: PathBuf, filename: String) -> (Slot, bool) {
|
||||
let (slot, _) = get_slot_and_append_vec_id(&filename);
|
||||
let slot_storage_count = self.insert_storage_file(&slot, path);
|
||||
|
||||
(
|
||||
slot,
|
||||
slot_storage_count == self.snapshot_storage_lengths.get(&slot).unwrap().len(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Insert storage path into slot and return the number of storage files for the slot
|
||||
fn insert_storage_file(&self, slot: &Slot, path: PathBuf) -> usize {
|
||||
let slot_paths = self.storage_paths.get(slot).unwrap();
|
||||
|
@ -305,14 +316,22 @@ impl SnapshotStorageRebuilder {
|
|||
.get(&old_append_vec_id)
|
||||
.unwrap();
|
||||
|
||||
let storage_entry = remap_and_reconstruct_single_storage(
|
||||
slot,
|
||||
old_append_vec_id,
|
||||
current_len,
|
||||
path.as_path(),
|
||||
&self.next_append_vec_id,
|
||||
&self.num_collisions,
|
||||
)?;
|
||||
let storage_entry = match &self.snapshot_from {
|
||||
SnapshotFrom::Archive => remap_and_reconstruct_single_storage(
|
||||
slot,
|
||||
old_append_vec_id,
|
||||
current_len,
|
||||
path.as_path(),
|
||||
&self.next_append_vec_id,
|
||||
&self.num_collisions,
|
||||
)?,
|
||||
SnapshotFrom::Dir => reconstruct_single_storage(
|
||||
&slot,
|
||||
path.as_path(),
|
||||
current_len,
|
||||
old_append_vec_id as AppendVecId,
|
||||
)?,
|
||||
};
|
||||
|
||||
Ok((storage_entry.append_vec_id(), storage_entry))
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue