Add accounts hard-link files into the bank snapshot directory (#29496)
* Add accounts hard-link files into the bank snapshot directory * Small adjustments and fixes. * Address some of the review issues * Fix compilation issues * Change the latest slot snapshot storage from VecDeque to Option * IoWithSourceAndFile and expanded comments on accounts * last_slot_snapshot_storages in return value * Update comments following the review input * rename dir_accounts_hard_links to hard_link_path * Add dir_full_state flag for add_bank_snapshot * Let appendvec files hardlinking work with multiple accounts paths across multiple partitions * Fixes for rebasing * fix tests which generates account_path without adding run/ * rebasing fixes * fix account path test failures * fix test test_concurrent_snapshot_packaging * review comments. renamed the path setup function * Addressed most of the review comments * update with more review comments * handle error from create_accounts_run_and_snapshot_dirs * fix rebasing duplicate * minor accounts_dir path cleanup * minor cleanup, remove commented code * misc review comments * build error fix * Fix test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_startup * fix build error on MAX_BANK_SNAPSHOTS_TO_RETAIN * rebase fix, update hardlink filename * minor comment spelling fix * rebasing fixes * fix rebase issues; with_extension * comments changes for review * misc minor review issues * bank.fill_bank_with_ticks_for_tests * error handling on appendvec path * fix use_jit * minor comments refining * Remove type AccountStorages * get_account_path_from_appendvec_path return changed to Option * removed appendvec_path.to_path_buf in create_accounts_run_and_snapshot_dirs * add test_get_snapshot_accounts_hardlink_dir * update last_snapshot_storages comment * update last_snapshot_storages comment * symlink map_err * simplify test_get_snapshot_accounts_hardlink_dir with fake paths * log last_snapshot_storages at the end of the loop
This commit is contained in:
parent
eede50c868
commit
4909267c88
|
@ -6,6 +6,7 @@ use {
|
||||||
fs_extra::dir::CopyOptions,
|
fs_extra::dir::CopyOptions,
|
||||||
itertools::Itertools,
|
itertools::Itertools,
|
||||||
log::{info, trace},
|
log::{info, trace},
|
||||||
|
snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
|
||||||
solana_core::{
|
solana_core::{
|
||||||
accounts_hash_verifier::AccountsHashVerifier,
|
accounts_hash_verifier::AccountsHashVerifier,
|
||||||
snapshot_packager_service::SnapshotPackagerService,
|
snapshot_packager_service::SnapshotPackagerService,
|
||||||
|
@ -495,7 +496,7 @@ fn test_concurrent_snapshot_packaging(
|
||||||
|
|
||||||
// Purge all the outdated snapshots, including the ones needed to generate the package
|
// Purge all the outdated snapshots, including the ones needed to generate the package
|
||||||
// currently sitting in the channel
|
// currently sitting in the channel
|
||||||
snapshot_utils::purge_old_bank_snapshots(bank_snapshots_dir);
|
snapshot_utils::purge_old_bank_snapshots(bank_snapshots_dir, MAX_BANK_SNAPSHOTS_TO_RETAIN);
|
||||||
|
|
||||||
let mut bank_snapshots = snapshot_utils::get_bank_snapshots_pre(bank_snapshots_dir);
|
let mut bank_snapshots = snapshot_utils::get_bank_snapshots_pre(bank_snapshots_dir);
|
||||||
bank_snapshots.sort_unstable();
|
bank_snapshots.sort_unstable();
|
||||||
|
|
|
@ -1093,7 +1093,15 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
|
||||||
info!(
|
info!(
|
||||||
"Restarting the validator with full snapshot {validator_full_snapshot_slot_at_startup}..."
|
"Restarting the validator with full snapshot {validator_full_snapshot_slot_at_startup}..."
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Stop the test validator
|
||||||
let validator_info = cluster.exit_node(&validator_identity.pubkey());
|
let validator_info = cluster.exit_node(&validator_identity.pubkey());
|
||||||
|
|
||||||
|
// To restart, it is not enough to remove the old bank snapshot directories under snapshot/.
|
||||||
|
// The old hardlinks under <account_path>/snapshot/<slot> should also be removed.
|
||||||
|
// The purge call covers all of them.
|
||||||
|
snapshot_utils::purge_old_bank_snapshots(validator_snapshot_test_config.bank_snapshots_dir, 0);
|
||||||
|
|
||||||
cluster.restart_node(
|
cluster.restart_node(
|
||||||
&validator_identity.pubkey(),
|
&validator_identity.pubkey(),
|
||||||
validator_info,
|
validator_info,
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
mod stats;
|
mod stats;
|
||||||
use {
|
use {
|
||||||
crate::{
|
crate::{
|
||||||
accounts_db::CalcAccountsHashDataSource,
|
accounts_db::{AccountStorageEntry, CalcAccountsHashDataSource},
|
||||||
accounts_hash::CalcAccountsHashConfig,
|
accounts_hash::CalcAccountsHashConfig,
|
||||||
bank::{Bank, BankSlotDelta, DropCallback},
|
bank::{Bank, BankSlotDelta, DropCallback},
|
||||||
bank_forks::BankForks,
|
bank_forks::BankForks,
|
||||||
|
@ -16,6 +16,7 @@ use {
|
||||||
crossbeam_channel::{Receiver, SendError, Sender},
|
crossbeam_channel::{Receiver, SendError, Sender},
|
||||||
log::*,
|
log::*,
|
||||||
rand::{thread_rng, Rng},
|
rand::{thread_rng, Rng},
|
||||||
|
snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
|
||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
solana_sdk::clock::{BankId, Slot},
|
solana_sdk::clock::{BankId, Slot},
|
||||||
stats::StatsManager,
|
stats::StatsManager,
|
||||||
|
@ -142,13 +143,14 @@ pub struct SnapshotRequestHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SnapshotRequestHandler {
|
impl SnapshotRequestHandler {
|
||||||
// Returns the latest requested snapshot slot, if one exists
|
// Returns the latest requested snapshot block height and storages
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
pub fn handle_snapshot_requests(
|
pub fn handle_snapshot_requests(
|
||||||
&self,
|
&self,
|
||||||
test_hash_calculation: bool,
|
test_hash_calculation: bool,
|
||||||
non_snapshot_time_us: u128,
|
non_snapshot_time_us: u128,
|
||||||
last_full_snapshot_slot: &mut Option<Slot>,
|
last_full_snapshot_slot: &mut Option<Slot>,
|
||||||
) -> Option<Result<u64, SnapshotError>> {
|
) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
|
||||||
let (
|
let (
|
||||||
snapshot_request,
|
snapshot_request,
|
||||||
accounts_package_type,
|
accounts_package_type,
|
||||||
|
@ -265,7 +267,7 @@ impl SnapshotRequestHandler {
|
||||||
last_full_snapshot_slot: &mut Option<Slot>,
|
last_full_snapshot_slot: &mut Option<Slot>,
|
||||||
snapshot_request: SnapshotRequest,
|
snapshot_request: SnapshotRequest,
|
||||||
accounts_package_type: AccountsPackageType,
|
accounts_package_type: AccountsPackageType,
|
||||||
) -> Result<u64, SnapshotError> {
|
) -> Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError> {
|
||||||
debug!(
|
debug!(
|
||||||
"handling snapshot request: {:?}, {:?}",
|
"handling snapshot request: {:?}, {:?}",
|
||||||
snapshot_request, accounts_package_type
|
snapshot_request, accounts_package_type
|
||||||
|
@ -367,7 +369,7 @@ impl SnapshotRequestHandler {
|
||||||
&self.snapshot_config.bank_snapshots_dir,
|
&self.snapshot_config.bank_snapshots_dir,
|
||||||
&self.snapshot_config.full_snapshot_archives_dir,
|
&self.snapshot_config.full_snapshot_archives_dir,
|
||||||
&self.snapshot_config.incremental_snapshot_archives_dir,
|
&self.snapshot_config.incremental_snapshot_archives_dir,
|
||||||
snapshot_storages,
|
snapshot_storages.clone(),
|
||||||
self.snapshot_config.archive_format,
|
self.snapshot_config.archive_format,
|
||||||
self.snapshot_config.snapshot_version,
|
self.snapshot_config.snapshot_version,
|
||||||
accounts_hash_for_testing,
|
accounts_hash_for_testing,
|
||||||
|
@ -379,7 +381,7 @@ impl SnapshotRequestHandler {
|
||||||
AccountsPackage::new_for_epoch_accounts_hash(
|
AccountsPackage::new_for_epoch_accounts_hash(
|
||||||
accounts_package_type,
|
accounts_package_type,
|
||||||
&snapshot_root_bank,
|
&snapshot_root_bank,
|
||||||
snapshot_storages,
|
snapshot_storages.clone(),
|
||||||
accounts_hash_for_testing,
|
accounts_hash_for_testing,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -397,7 +399,10 @@ impl SnapshotRequestHandler {
|
||||||
|
|
||||||
// Cleanup outdated snapshots
|
// Cleanup outdated snapshots
|
||||||
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
|
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
|
||||||
snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
|
snapshot_utils::purge_old_bank_snapshots(
|
||||||
|
&self.snapshot_config.bank_snapshots_dir,
|
||||||
|
MAX_BANK_SNAPSHOTS_TO_RETAIN,
|
||||||
|
);
|
||||||
purge_old_snapshots_time.stop();
|
purge_old_snapshots_time.stop();
|
||||||
total_time.stop();
|
total_time.stop();
|
||||||
|
|
||||||
|
@ -419,7 +424,7 @@ impl SnapshotRequestHandler {
|
||||||
("total_us", total_time.as_us(), i64),
|
("total_us", total_time.as_us(), i64),
|
||||||
("non_snapshot_time_us", non_snapshot_time_us, i64),
|
("non_snapshot_time_us", non_snapshot_time_us, i64),
|
||||||
);
|
);
|
||||||
Ok(snapshot_root_bank.block_height())
|
Ok((snapshot_root_bank.block_height(), snapshot_storages))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,12 +506,13 @@ pub struct AbsRequestHandlers {
|
||||||
|
|
||||||
impl AbsRequestHandlers {
|
impl AbsRequestHandlers {
|
||||||
// Returns the latest requested snapshot block height, if one exists
|
// Returns the latest requested snapshot block height, if one exists
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
pub fn handle_snapshot_requests(
|
pub fn handle_snapshot_requests(
|
||||||
&self,
|
&self,
|
||||||
test_hash_calculation: bool,
|
test_hash_calculation: bool,
|
||||||
non_snapshot_time_us: u128,
|
non_snapshot_time_us: u128,
|
||||||
last_full_snapshot_slot: &mut Option<Slot>,
|
last_full_snapshot_slot: &mut Option<Slot>,
|
||||||
) -> Option<Result<u64, SnapshotError>> {
|
) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
|
||||||
self.snapshot_request_handler.handle_snapshot_requests(
|
self.snapshot_request_handler.handle_snapshot_requests(
|
||||||
test_hash_calculation,
|
test_hash_calculation,
|
||||||
non_snapshot_time_us,
|
non_snapshot_time_us,
|
||||||
|
@ -538,6 +544,11 @@ impl AccountsBackgroundService {
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut stats = StatsManager::new();
|
let mut stats = StatsManager::new();
|
||||||
let mut last_snapshot_end_time = None;
|
let mut last_snapshot_end_time = None;
|
||||||
|
|
||||||
|
// To support fastboot, we must ensure the storages used in the latest bank snapshot are
|
||||||
|
// not recycled nor removed early. Hold an Arc of their AppendVecs to prevent them from
|
||||||
|
// expiring.
|
||||||
|
let mut last_snapshot_storages: Option<Vec<Arc<AccountStorageEntry>>> = None;
|
||||||
loop {
|
loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
|
@ -586,7 +597,7 @@ impl AccountsBackgroundService {
|
||||||
// snapshot requests. This is because startup verification and snapshot
|
// snapshot requests. This is because startup verification and snapshot
|
||||||
// request handling can both kick off accounts hash calculations in background
|
// request handling can both kick off accounts hash calculations in background
|
||||||
// threads, and these must not happen concurrently.
|
// threads, and these must not happen concurrently.
|
||||||
let snapshot_block_height_option_result = bank
|
let snapshot_handle_result = bank
|
||||||
.is_startup_verification_complete()
|
.is_startup_verification_complete()
|
||||||
.then(|| {
|
.then(|| {
|
||||||
request_handlers.handle_snapshot_requests(
|
request_handlers.handle_snapshot_requests(
|
||||||
|
@ -596,7 +607,7 @@ impl AccountsBackgroundService {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.flatten();
|
.flatten();
|
||||||
if snapshot_block_height_option_result.is_some() {
|
if snapshot_handle_result.is_some() {
|
||||||
last_snapshot_end_time = Some(Instant::now());
|
last_snapshot_end_time = Some(Instant::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -606,12 +617,24 @@ impl AccountsBackgroundService {
|
||||||
// slots >= bank.slot()
|
// slots >= bank.slot()
|
||||||
bank.flush_accounts_cache_if_needed();
|
bank.flush_accounts_cache_if_needed();
|
||||||
|
|
||||||
if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
|
if let Some(snapshot_handle_result) = snapshot_handle_result {
|
||||||
{
|
|
||||||
// Safe, see proof above
|
// Safe, see proof above
|
||||||
if let Ok(snapshot_block_height) = snapshot_block_height_result {
|
|
||||||
|
if let Ok((snapshot_block_height, snapshot_storages)) =
|
||||||
|
snapshot_handle_result
|
||||||
|
{
|
||||||
assert!(last_cleaned_block_height <= snapshot_block_height);
|
assert!(last_cleaned_block_height <= snapshot_block_height);
|
||||||
last_cleaned_block_height = snapshot_block_height;
|
last_cleaned_block_height = snapshot_block_height;
|
||||||
|
// Update the option, so the older one is released, causing the release of
|
||||||
|
// its reference counts of the appendvecs
|
||||||
|
last_snapshot_storages = Some(snapshot_storages);
|
||||||
|
debug!(
|
||||||
|
"Number of snapshot storages kept alive for fastboot: {}",
|
||||||
|
last_snapshot_storages
|
||||||
|
.as_ref()
|
||||||
|
.map(|storages| storages.len())
|
||||||
|
.unwrap_or(0)
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
return;
|
return;
|
||||||
|
@ -633,8 +656,15 @@ impl AccountsBackgroundService {
|
||||||
stats.record_and_maybe_submit(start_time.elapsed());
|
stats.record_and_maybe_submit(start_time.elapsed());
|
||||||
sleep(Duration::from_millis(INTERVAL_MS));
|
sleep(Duration::from_millis(INTERVAL_MS));
|
||||||
}
|
}
|
||||||
|
info!(
|
||||||
|
"ABS loop done. Number of snapshot storages kept alive for fastboot: {}",
|
||||||
|
last_snapshot_storages
|
||||||
|
.map(|storages| storages.len())
|
||||||
|
.unwrap_or(0)
|
||||||
|
);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Self { t_background }
|
Self { t_background }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5487,11 +5487,15 @@ impl AccountsDb {
|
||||||
drop(recycle_stores);
|
drop(recycle_stores);
|
||||||
let old_id = ret.append_vec_id();
|
let old_id = ret.append_vec_id();
|
||||||
ret.recycle(slot, self.next_id());
|
ret.recycle(slot, self.next_id());
|
||||||
|
// This info show the appendvec history change history. It helps debugging
|
||||||
|
// the appendvec data corrupution issues related to recycling.
|
||||||
debug!(
|
debug!(
|
||||||
"recycling store: {} {:?} old_id: {}",
|
"recycling store: old slot {}, old_id: {}, new slot {}, new id{}, path {:?} ",
|
||||||
|
slot,
|
||||||
|
old_id,
|
||||||
|
ret.slot(),
|
||||||
ret.append_vec_id(),
|
ret.append_vec_id(),
|
||||||
ret.get_path(),
|
ret.get_path(),
|
||||||
old_id
|
|
||||||
);
|
);
|
||||||
self.stats
|
self.stats
|
||||||
.recycle_store_count
|
.recycle_store_count
|
||||||
|
|
|
@ -419,8 +419,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
|
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
|
||||||
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
|
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
|
||||||
let mut bank_pre = bank_post.clone();
|
let bank_pre = bank_post.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||||
bank_pre.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
|
||||||
|
|
||||||
let mut found = false;
|
let mut found = false;
|
||||||
{
|
{
|
||||||
|
|
|
@ -318,11 +318,10 @@ fn test_bank_serialize_style(
|
||||||
let temp_dir = TempDir::new().unwrap();
|
let temp_dir = TempDir::new().unwrap();
|
||||||
let slot_dir = temp_dir.path().join(slot.to_string());
|
let slot_dir = temp_dir.path().join(slot.to_string());
|
||||||
let post_path = slot_dir.join(slot.to_string());
|
let post_path = slot_dir.join(slot.to_string());
|
||||||
let mut pre_path = post_path.clone();
|
let pre_path = post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||||
pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
|
||||||
std::fs::create_dir(&slot_dir).unwrap();
|
std::fs::create_dir(&slot_dir).unwrap();
|
||||||
{
|
{
|
||||||
let mut f = std::fs::File::create(&pre_path).unwrap();
|
let mut f = std::fs::File::create(pre_path).unwrap();
|
||||||
f.write_all(&buf).unwrap();
|
f.write_all(&buf).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ use {
|
||||||
},
|
},
|
||||||
accounts_index::AccountSecondaryIndexes,
|
accounts_index::AccountSecondaryIndexes,
|
||||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||||
|
append_vec::AppendVec,
|
||||||
bank::{Bank, BankFieldsToDeserialize, BankSlotDelta},
|
bank::{Bank, BankFieldsToDeserialize, BankSlotDelta},
|
||||||
builtins::Builtins,
|
builtins::Builtins,
|
||||||
hardened_unpack::{
|
hardened_unpack::{
|
||||||
|
@ -164,8 +165,8 @@ impl BankSnapshotInfo {
|
||||||
// BankSnapshotPost file
|
// BankSnapshotPost file
|
||||||
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
||||||
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
|
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
|
||||||
let mut bank_snapshot_pre_path = bank_snapshot_post_path.clone();
|
let bank_snapshot_pre_path =
|
||||||
bank_snapshot_pre_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||||
|
|
||||||
if bank_snapshot_pre_path.is_file() {
|
if bank_snapshot_pre_path.is_file() {
|
||||||
return Some(BankSnapshotInfo {
|
return Some(BankSnapshotInfo {
|
||||||
|
@ -290,6 +291,9 @@ pub enum SnapshotError {
|
||||||
|
|
||||||
#[error("snapshot slot deltas are invalid: {0}")]
|
#[error("snapshot slot deltas are invalid: {0}")]
|
||||||
VerifySlotDeltas(#[from] VerifySlotDeltasError),
|
VerifySlotDeltas(#[from] VerifySlotDeltasError),
|
||||||
|
|
||||||
|
#[error("invalid AppendVec path: {}", .0.display())]
|
||||||
|
InvalidAppendVecPath(PathBuf),
|
||||||
}
|
}
|
||||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||||
|
|
||||||
|
@ -877,6 +881,105 @@ pub fn create_accounts_run_and_snapshot_dirs(
|
||||||
Ok((run_path, snapshot_path))
|
Ok((run_path, snapshot_path))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return account path from the appendvec path after checking its format.
|
||||||
|
fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
|
||||||
|
let run_path = appendvec_path.parent()?;
|
||||||
|
let run_file_name = run_path.file_name()?;
|
||||||
|
// All appendvec files should be under <account_path>/run/.
|
||||||
|
// When generating the bank snapshot directory, they are hardlinked to <account_path>/snapshot/<slot>/
|
||||||
|
if run_file_name != "run" {
|
||||||
|
error!(
|
||||||
|
"The account path {} does not have run/ as its immediate parent directory.",
|
||||||
|
run_path.display()
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let account_path = run_path.parent()?;
|
||||||
|
Some(account_path.to_path_buf())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// From an appendvec path, derive the snapshot hardlink path. If the corresponding snapshot hardlink
|
||||||
|
/// directory does not exist, create it.
|
||||||
|
fn get_snapshot_accounts_hardlink_dir(
|
||||||
|
appendvec_path: &Path,
|
||||||
|
bank_slot: Slot,
|
||||||
|
account_paths: &mut HashSet<PathBuf>,
|
||||||
|
hardlinks_dir: impl AsRef<Path>,
|
||||||
|
) -> Result<PathBuf> {
|
||||||
|
let account_path = get_account_path_from_appendvec_path(appendvec_path)
|
||||||
|
.ok_or_else(|| SnapshotError::InvalidAppendVecPath(appendvec_path.to_path_buf()))?;
|
||||||
|
|
||||||
|
let snapshot_hardlink_dir = account_path.join("snapshot").join(bank_slot.to_string());
|
||||||
|
|
||||||
|
// Use the hashset to track, to avoid checking the file system. Only set up the hardlink directory
|
||||||
|
// and the symlink to it at the first time of seeing the account_path.
|
||||||
|
if !account_paths.contains(&account_path) {
|
||||||
|
let idx = account_paths.len();
|
||||||
|
debug!(
|
||||||
|
"for appendvec_path {}, create hard-link path {}",
|
||||||
|
appendvec_path.display(),
|
||||||
|
snapshot_hardlink_dir.display()
|
||||||
|
);
|
||||||
|
fs::create_dir_all(&snapshot_hardlink_dir).map_err(|e| {
|
||||||
|
SnapshotError::IoWithSourceAndFile(
|
||||||
|
e,
|
||||||
|
"create hard-link dir",
|
||||||
|
snapshot_hardlink_dir.clone(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
|
||||||
|
symlink::symlink_dir(&snapshot_hardlink_dir, symlink_path).map_err(|e| {
|
||||||
|
SnapshotError::IoWithSourceAndFile(
|
||||||
|
e,
|
||||||
|
"simlink the hard-link dir",
|
||||||
|
snapshot_hardlink_dir.clone(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
account_paths.insert(account_path);
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(snapshot_hardlink_dir)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Hard-link the files from accounts/ to snapshot/<bank_slot>/accounts/
|
||||||
|
/// This keeps the appendvec files alive and with the bank snapshot. The slot and id
|
||||||
|
/// in the file names are also updated in case its file is a recycled one with inconsistent slot
|
||||||
|
/// and id.
|
||||||
|
fn hard_link_storages_to_snapshot(
|
||||||
|
bank_snapshot_dir: impl AsRef<Path>,
|
||||||
|
bank_slot: Slot,
|
||||||
|
snapshot_storages: &[Arc<AccountStorageEntry>],
|
||||||
|
) -> Result<()> {
|
||||||
|
let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join("accounts_hardlinks");
|
||||||
|
fs::create_dir_all(&accounts_hardlinks_dir)?;
|
||||||
|
|
||||||
|
let mut account_paths: HashSet<PathBuf> = HashSet::new();
|
||||||
|
for storage in snapshot_storages {
|
||||||
|
storage.flush()?;
|
||||||
|
let storage_path = storage.accounts.get_path();
|
||||||
|
let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
|
||||||
|
&storage_path,
|
||||||
|
bank_slot,
|
||||||
|
&mut account_paths,
|
||||||
|
&accounts_hardlinks_dir,
|
||||||
|
)?;
|
||||||
|
// The appendvec could be recycled, so its filename may not be consistent to the slot and id.
|
||||||
|
// Use the storage slot and id to compose a consistent file name for the hard-link file.
|
||||||
|
let hardlink_filename = AppendVec::file_name(storage.slot(), storage.append_vec_id());
|
||||||
|
let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
|
||||||
|
fs::hard_link(&storage_path, &hard_link_path).map_err(|e| {
|
||||||
|
let err_msg = format!(
|
||||||
|
"hard-link appendvec file {} to {} failed. Error: {}",
|
||||||
|
storage_path.display(),
|
||||||
|
hard_link_path.display(),
|
||||||
|
e,
|
||||||
|
);
|
||||||
|
SnapshotError::Io(IoError::new(ErrorKind::Other, err_msg))
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Serialize a bank to a snapshot
|
/// Serialize a bank to a snapshot
|
||||||
///
|
///
|
||||||
/// **DEVELOPER NOTE** Any error that is returned from this function may bring down the node! This
|
/// **DEVELOPER NOTE** Any error that is returned from this function may bring down the node! This
|
||||||
|
@ -893,12 +996,21 @@ pub fn add_bank_snapshot(
|
||||||
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
|
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
|
||||||
let slot = bank.slot();
|
let slot = bank.slot();
|
||||||
// bank_snapshots_dir/slot
|
// bank_snapshots_dir/slot
|
||||||
let bank_snapshot_dir = get_bank_snapshots_dir(bank_snapshots_dir, slot);
|
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
||||||
|
if bank_snapshot_dir.is_dir() {
|
||||||
|
// There is a time window from when a snapshot directory is created to when its content
|
||||||
|
// is fully filled to become a full state good to construct a bank from. At the init time,
|
||||||
|
// the system may not be booted from the latest snapshot directory, but an older and complete
|
||||||
|
// directory. Then, when adding new snapshots, the newer incomplete snapshot directory could
|
||||||
|
// be found. If so, it should be removed.
|
||||||
|
remove_bank_snapshot(slot, &bank_snapshots_dir)?;
|
||||||
|
}
|
||||||
fs::create_dir_all(&bank_snapshot_dir)?;
|
fs::create_dir_all(&bank_snapshot_dir)?;
|
||||||
|
|
||||||
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
|
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
|
||||||
let mut bank_snapshot_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
|
let bank_snapshot_path = bank_snapshot_dir
|
||||||
bank_snapshot_path.set_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
.join(get_snapshot_file_name(slot))
|
||||||
|
.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Creating bank snapshot for slot {}, path: {}",
|
"Creating bank snapshot for slot {}, path: {}",
|
||||||
|
@ -906,6 +1018,12 @@ pub fn add_bank_snapshot(
|
||||||
bank_snapshot_path.display(),
|
bank_snapshot_path.display(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// We are contructing the snapshot directory to contain the full snapshot state information to allow
|
||||||
|
// constructing a bank from this directory. It acts like an archive to include the full state.
|
||||||
|
// The set of the account appendvec files is the necessary part of this snapshot state. Hard-link them
|
||||||
|
// from the operational accounts/ directory to here.
|
||||||
|
hard_link_storages_to_snapshot(&bank_snapshot_dir, slot, snapshot_storages)?;
|
||||||
|
|
||||||
let mut bank_serialize = Measure::start("bank-serialize-ms");
|
let mut bank_serialize = Measure::start("bank-serialize-ms");
|
||||||
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
|
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
|
||||||
let serde_style = match snapshot_version {
|
let serde_style = match snapshot_version {
|
||||||
|
@ -994,6 +1112,15 @@ where
|
||||||
P: AsRef<Path>,
|
P: AsRef<Path>,
|
||||||
{
|
{
|
||||||
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
||||||
|
let accounts_hardlinks_dir = bank_snapshot_dir.join("accounts_hardlinks");
|
||||||
|
if fs::metadata(&accounts_hardlinks_dir).is_ok() {
|
||||||
|
// This directory contain symlinks to all accounts snapshot directories.
|
||||||
|
// They should all be removed.
|
||||||
|
for entry in fs::read_dir(accounts_hardlinks_dir)? {
|
||||||
|
let dst_path = fs::read_link(entry?.path())?;
|
||||||
|
fs::remove_dir_all(dst_path)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
fs::remove_dir_all(bank_snapshot_dir)?;
|
fs::remove_dir_all(bank_snapshot_dir)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -2186,9 +2313,9 @@ pub fn verify_snapshot_archive<P, Q, R>(
|
||||||
if let VerifyBank::NonDeterministic(slot) = verify_bank {
|
if let VerifyBank::NonDeterministic(slot) = verify_bank {
|
||||||
// file contents may be different, but deserialized structs should be equal
|
// file contents may be different, but deserialized structs should be equal
|
||||||
let slot = slot.to_string();
|
let slot = slot.to_string();
|
||||||
|
let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);
|
||||||
let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
|
let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
|
||||||
let p2 = unpacked_snapshots.join(&slot).join(&slot);
|
let p2 = unpacked_snapshots.join(&slot).join(&slot);
|
||||||
|
|
||||||
assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
|
assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
|
||||||
std::fs::remove_file(p1).unwrap();
|
std::fs::remove_file(p1).unwrap();
|
||||||
std::fs::remove_file(p2).unwrap();
|
std::fs::remove_file(p2).unwrap();
|
||||||
|
@ -2208,6 +2335,17 @@ pub fn verify_snapshot_archive<P, Q, R>(
|
||||||
new_unpacked_status_cache_file,
|
new_unpacked_status_cache_file,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let accounts_hardlinks_dir = snapshot_slot_dir.join("accounts_hardlinks");
|
||||||
|
if accounts_hardlinks_dir.is_dir() {
|
||||||
|
// This directory contain symlinks to all <account_path>/snapshot/<slot> directories.
|
||||||
|
// They should all be removed.
|
||||||
|
for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
|
||||||
|
let dst_path = fs::read_link(entry.unwrap().path()).unwrap();
|
||||||
|
fs::remove_dir_all(dst_path).unwrap();
|
||||||
|
}
|
||||||
|
std::fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
|
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
|
||||||
|
@ -2223,13 +2361,16 @@ pub fn verify_snapshot_archive<P, Q, R>(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove outdated bank snapshots
|
/// Remove outdated bank snapshots
|
||||||
pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
|
pub fn purge_old_bank_snapshots(
|
||||||
|
bank_snapshots_dir: impl AsRef<Path>,
|
||||||
|
num_bank_snapshots_to_retain: usize,
|
||||||
|
) {
|
||||||
let do_purge = |mut bank_snapshots: Vec<BankSnapshotInfo>| {
|
let do_purge = |mut bank_snapshots: Vec<BankSnapshotInfo>| {
|
||||||
bank_snapshots.sort_unstable();
|
bank_snapshots.sort_unstable();
|
||||||
bank_snapshots
|
bank_snapshots
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.rev()
|
.rev()
|
||||||
.skip(MAX_BANK_SNAPSHOTS_TO_RETAIN)
|
.skip(num_bank_snapshots_to_retain)
|
||||||
.for_each(|bank_snapshot| {
|
.for_each(|bank_snapshot| {
|
||||||
let r = remove_bank_snapshot(bank_snapshot.slot, &bank_snapshots_dir);
|
let r = remove_bank_snapshot(bank_snapshot.slot, &bank_snapshots_dir);
|
||||||
if r.is_err() {
|
if r.is_err() {
|
||||||
|
@ -3936,7 +4077,7 @@ mod tests {
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (deserialized_bank, _) = bank_from_snapshot_archives(
|
let (deserialized_bank, _) = bank_from_snapshot_archives(
|
||||||
&[accounts_dir.as_path().to_path_buf()],
|
&[accounts_dir.clone()],
|
||||||
bank_snapshots_dir.path(),
|
bank_snapshots_dir.path(),
|
||||||
&full_snapshot_archive_info,
|
&full_snapshot_archive_info,
|
||||||
Some(&incremental_snapshot_archive_info),
|
Some(&incremental_snapshot_archive_info),
|
||||||
|
@ -4001,7 +4142,7 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (deserialized_bank, _) = bank_from_snapshot_archives(
|
let (deserialized_bank, _) = bank_from_snapshot_archives(
|
||||||
&[accounts_dir.as_path().to_path_buf()],
|
&[accounts_dir],
|
||||||
bank_snapshots_dir.path(),
|
bank_snapshots_dir.path(),
|
||||||
&full_snapshot_archive_info,
|
&full_snapshot_archive_info,
|
||||||
Some(&incremental_snapshot_archive_info),
|
Some(&incremental_snapshot_archive_info),
|
||||||
|
@ -4249,4 +4390,90 @@ mod tests {
|
||||||
Err(VerifySlotDeltasError::SlotNotFoundInDeltas(333)),
|
Err(VerifySlotDeltasError::SlotNotFoundInDeltas(333)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_bank_snapshot_dir_accounts_hardlinks() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let genesis_config = GenesisConfig::default();
|
||||||
|
let bank = Bank::new_for_tests(&genesis_config);
|
||||||
|
|
||||||
|
bank.fill_bank_with_ticks_for_tests();
|
||||||
|
|
||||||
|
let bank_snapshots_dir = tempfile::TempDir::new().unwrap();
|
||||||
|
|
||||||
|
bank.squash();
|
||||||
|
bank.force_flush_accounts_cache();
|
||||||
|
|
||||||
|
let snapshot_version = SnapshotVersion::default();
|
||||||
|
let snapshot_storages = bank.get_snapshot_storages(None);
|
||||||
|
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
|
||||||
|
add_bank_snapshot(
|
||||||
|
&bank_snapshots_dir,
|
||||||
|
&bank,
|
||||||
|
&snapshot_storages,
|
||||||
|
snapshot_version,
|
||||||
|
slot_deltas,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let accounts_hardlinks_dir =
|
||||||
|
get_bank_snapshots_dir(&bank_snapshots_dir, bank.slot()).join("accounts_hardlinks");
|
||||||
|
assert!(fs::metadata(&accounts_hardlinks_dir).is_ok());
|
||||||
|
|
||||||
|
let mut hardlink_dirs: Vec<PathBuf> = Vec::new();
|
||||||
|
// This directory contain symlinks to all accounts snapshot directories.
|
||||||
|
for entry in fs::read_dir(accounts_hardlinks_dir).unwrap() {
|
||||||
|
let entry = entry.unwrap();
|
||||||
|
let symlink = entry.path();
|
||||||
|
let dst_path = fs::read_link(symlink).unwrap();
|
||||||
|
assert!(fs::metadata(&dst_path).is_ok());
|
||||||
|
hardlink_dirs.push(dst_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(remove_bank_snapshot(bank.slot(), bank_snapshots_dir).is_ok());
|
||||||
|
|
||||||
|
// When the bank snapshot is removed, all the snapshot hardlink directories should be removed.
|
||||||
|
assert!(hardlink_dirs.iter().all(|dir| fs::metadata(dir).is_err()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_snapshot_accounts_hardlink_dir() {
|
||||||
|
solana_logger::setup();
|
||||||
|
|
||||||
|
let slot: Slot = 1;
|
||||||
|
|
||||||
|
let mut account_paths_set: HashSet<PathBuf> = HashSet::new();
|
||||||
|
|
||||||
|
let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
|
||||||
|
let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
|
||||||
|
let accounts_hardlinks_dir = bank_snapshot_dir.join("accounts_hardlinks");
|
||||||
|
fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
|
||||||
|
|
||||||
|
let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
|
||||||
|
let appendvec_filename = format!("{slot}.0");
|
||||||
|
let appendvec_path = accounts_dir.join(appendvec_filename);
|
||||||
|
|
||||||
|
let ret = get_snapshot_accounts_hardlink_dir(
|
||||||
|
&appendvec_path,
|
||||||
|
slot,
|
||||||
|
&mut account_paths_set,
|
||||||
|
&accounts_hardlinks_dir,
|
||||||
|
);
|
||||||
|
assert!(ret.is_ok());
|
||||||
|
|
||||||
|
let wrong_appendvec_path = appendvec_path
|
||||||
|
.parent()
|
||||||
|
.unwrap()
|
||||||
|
.parent()
|
||||||
|
.unwrap()
|
||||||
|
.join(appendvec_path.file_name().unwrap());
|
||||||
|
let ret = get_snapshot_accounts_hardlink_dir(
|
||||||
|
&wrong_appendvec_path,
|
||||||
|
slot,
|
||||||
|
&mut account_paths_set,
|
||||||
|
accounts_hardlinks_dir,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(matches!(ret, Err(SnapshotError::InvalidAppendVecPath(_))));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue