Clean orphaned account snapshot dirs (#30645)
* Clean up orphaned account snapshot hardlink dirs * fix compilation issues * debugged, now working. seeing the orphaned directories deleted * change back to eprintln + exit for account_path error * changed eprintln to panic for now * add test_clean_orphaned_account_snapshot_dirs for codecov check * address a few comments and nit issues * directly unzip, skipped the intermediate array of tuples * let set_up_account_run_and_snapshot_paths return Result * 'proper' typo, and comment on return * use map_err * use for loop in clean_orphaned_account_snapshot_dirs, removed panic * add test_set_up_account_run_and_snapshot_paths * minor, replace .for_each with .all * rename set_up_account_run_and_snapshot_paths to create_all_accounts_run_and_snapshot_dirs * remove unnecessary closure return type * change to for loop * change match to unwrap_or_else * remove create_dir_all(&account_path) in create_all * minor comment cleanup
This commit is contained in:
parent
c449a15c30
commit
8e3a30c22c
|
@ -89,7 +89,7 @@ use {
|
|||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_hash::StartingSnapshotHashes,
|
||||
snapshot_utils::{self, move_and_async_delete_path},
|
||||
snapshot_utils::{self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path},
|
||||
},
|
||||
solana_sdk::{
|
||||
clock::Slot,
|
||||
|
@ -128,6 +128,7 @@ pub struct ValidatorConfig {
|
|||
pub expected_shred_version: Option<u16>,
|
||||
pub voting_disabled: bool,
|
||||
pub account_paths: Vec<PathBuf>,
|
||||
pub account_snapshot_paths: Vec<PathBuf>,
|
||||
pub account_shrink_paths: Option<Vec<PathBuf>>,
|
||||
pub rpc_config: JsonRpcConfig,
|
||||
/// Specifies which plugins to start up with
|
||||
|
@ -193,6 +194,7 @@ impl Default for ValidatorConfig {
|
|||
voting_disabled: false,
|
||||
max_ledger_shreds: None,
|
||||
account_paths: Vec::new(),
|
||||
account_snapshot_paths: Vec::new(),
|
||||
account_shrink_paths: None,
|
||||
rpc_config: JsonRpcConfig::default(),
|
||||
on_start_geyser_plugin_config_files: None,
|
||||
|
@ -494,6 +496,17 @@ impl Validator {
|
|||
start.stop();
|
||||
info!("done. {}", start);
|
||||
|
||||
info!("Cleaning orphaned account snapshot directories..");
|
||||
if let Err(e) = clean_orphaned_account_snapshot_dirs(
|
||||
&config.snapshot_config.bank_snapshots_dir,
|
||||
&config.account_snapshot_paths,
|
||||
) {
|
||||
return Err(format!(
|
||||
"Failed to clean orphaned account snapshot directories: {e:?}"
|
||||
));
|
||||
}
|
||||
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
{
|
||||
let exit = exit.clone();
|
||||
config
|
||||
|
|
|
@ -68,8 +68,9 @@ use {
|
|||
snapshot_hash::StartingSnapshotHashes,
|
||||
snapshot_minimizer::SnapshotMinimizer,
|
||||
snapshot_utils::{
|
||||
self, create_accounts_run_and_snapshot_dirs, move_and_async_delete_path, ArchiveFormat,
|
||||
SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
|
||||
self, clean_orphaned_account_snapshot_dirs, create_all_accounts_run_and_snapshot_dirs,
|
||||
move_and_async_delete_path, ArchiveFormat, SnapshotVersion,
|
||||
DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION,
|
||||
},
|
||||
},
|
||||
solana_sdk::{
|
||||
|
@ -1112,7 +1113,7 @@ fn load_bank_forks(
|
|||
Some(SnapshotConfig {
|
||||
full_snapshot_archives_dir,
|
||||
incremental_snapshot_archives_dir,
|
||||
bank_snapshots_dir,
|
||||
bank_snapshots_dir: bank_snapshots_dir.clone(),
|
||||
..SnapshotConfig::new_load_only()
|
||||
})
|
||||
};
|
||||
|
@ -1179,18 +1180,11 @@ fn load_bank_forks(
|
|||
vec![non_primary_accounts_path]
|
||||
};
|
||||
|
||||
// For all account_paths, set up the run/ and snapshot/ sub directories.
|
||||
// If the sub directories do not exist, the account_path will be cleaned because older version put account files there
|
||||
let account_run_paths: Vec<PathBuf> = account_paths.into_iter().map(
|
||||
|account_path| {
|
||||
match create_accounts_run_and_snapshot_dirs(&account_path) {
|
||||
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
|
||||
Err(err) => {
|
||||
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
let (account_run_paths, account_snapshot_paths) =
|
||||
create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| {
|
||||
eprintln!("Error: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
|
||||
// From now on, use run/ paths in the same way as the previous account_paths.
|
||||
let account_paths = account_run_paths;
|
||||
|
@ -1205,6 +1199,17 @@ fn load_bank_forks(
|
|||
measure.stop();
|
||||
info!("done. {}", measure);
|
||||
|
||||
info!(
|
||||
"Cleaning contents of account snapshot paths: {:?}",
|
||||
account_snapshot_paths
|
||||
);
|
||||
if let Err(e) =
|
||||
clean_orphaned_account_snapshot_dirs(&bank_snapshots_dir, &account_snapshot_paths)
|
||||
{
|
||||
eprintln!("Failed to clean orphaned account snapshot dirs. Error: {e:?}");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
let mut accounts_update_notifier = Option::<AccountsUpdateNotifier>::default();
|
||||
let mut transaction_notifier = Option::<TransactionNotifierLock>::default();
|
||||
if arg_matches.is_present("geyser_plugin_config") {
|
||||
|
|
|
@ -12,6 +12,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
|
|||
expected_shred_version: config.expected_shred_version,
|
||||
voting_disabled: config.voting_disabled,
|
||||
account_paths: config.account_paths.clone(),
|
||||
account_snapshot_paths: config.account_snapshot_paths.clone(),
|
||||
account_shrink_paths: config.account_shrink_paths.clone(),
|
||||
rpc_config: config.rpc_config.clone(),
|
||||
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
|
||||
|
|
|
@ -72,6 +72,7 @@ pub use archive_format::*;
|
|||
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
|
||||
pub const SNAPSHOT_VERSION_FILENAME: &str = "version";
|
||||
pub const SNAPSHOT_STATE_COMPLETE_FILENAME: &str = "state_complete";
|
||||
pub const SNAPSHOT_ACCOUNTS_HARDLINKS: &str = "accounts_hardlinks";
|
||||
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
|
||||
pub const DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 25_000;
|
||||
pub const DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 100;
|
||||
|
@ -471,6 +472,48 @@ pub fn move_and_async_delete_path(path: impl AsRef<Path> + Copy) {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
|
||||
/// from <account_path>/run taken at snapshot <slot> time. They are referenced by the symlinks from the
|
||||
/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/. We observed that sometimes the bank snapshot dir
|
||||
/// could be deleted but the account snapshot directories were left behind, possibly by some manual operations
|
||||
/// or some legacy code not using the symlinks to clean up the acccount snapshot hardlink directories.
|
||||
/// This function cleans up any account snapshot directories that are no longer referenced by the bank
|
||||
/// snapshot dirs, to ensure proper snapshot operations.
|
||||
pub fn clean_orphaned_account_snapshot_dirs(
|
||||
bank_snapshots_dir: impl AsRef<Path>,
|
||||
account_snapshot_paths: &[PathBuf],
|
||||
) -> Result<()> {
|
||||
// Create the HashSet of the account snapshot hardlink directories referenced by the snapshot dirs.
|
||||
// This is used to clean up any hardlinks that are no longer referenced by the snapshot dirs.
|
||||
let mut account_snapshot_dirs_referenced = HashSet::new();
|
||||
let snapshots = get_bank_snapshots(bank_snapshots_dir);
|
||||
for snapshot in snapshots {
|
||||
let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
|
||||
// loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
|
||||
for entry in fs::read_dir(&account_hardlinks_dir)? {
|
||||
let path = entry?.path();
|
||||
let target = fs::read_link(&path)?;
|
||||
account_snapshot_dirs_referenced.insert(target);
|
||||
}
|
||||
}
|
||||
|
||||
// loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
|
||||
for account_snapshot_path in account_snapshot_paths {
|
||||
for entry in fs::read_dir(account_snapshot_path)? {
|
||||
let path = entry?.path();
|
||||
if !account_snapshot_dirs_referenced.contains(&path) {
|
||||
info!(
|
||||
"Removing orphaned account snapshot hardlink directory: {}",
|
||||
path.display()
|
||||
);
|
||||
move_and_async_delete_path(&path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
|
||||
/// directory won't be cleaned up. Call this function to clean them up.
|
||||
pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
|
||||
|
@ -963,6 +1006,30 @@ pub fn create_accounts_run_and_snapshot_dirs(
|
|||
Ok((run_path, snapshot_path))
|
||||
}
|
||||
|
||||
/// For all account_paths, create the run/ and snapshot/ sub directories.
|
||||
/// If an account_path directory does not exist, create it.
|
||||
/// It returns (account_run_paths, account_snapshot_paths) or error
|
||||
pub fn create_all_accounts_run_and_snapshot_dirs(
|
||||
account_paths: &[PathBuf],
|
||||
) -> Result<(Vec<PathBuf>, Vec<PathBuf>)> {
|
||||
let mut run_dirs = Vec::with_capacity(account_paths.len());
|
||||
let mut snapshot_dirs = Vec::with_capacity(account_paths.len());
|
||||
for account_path in account_paths {
|
||||
// create the run/ and snapshot/ sub directories for each account_path
|
||||
let (run_dir, snapshot_dir) =
|
||||
create_accounts_run_and_snapshot_dirs(account_path).map_err(|err| {
|
||||
SnapshotError::IoWithSourceAndFile(
|
||||
err,
|
||||
"Unable to create account run and snapshot directories",
|
||||
account_path.to_path_buf(),
|
||||
)
|
||||
})?;
|
||||
run_dirs.push(run_dir);
|
||||
snapshot_dirs.push(snapshot_dir);
|
||||
}
|
||||
Ok((run_dirs, snapshot_dirs))
|
||||
}
|
||||
|
||||
/// Return account path from the appendvec path after checking its format.
|
||||
fn get_account_path_from_appendvec_path(appendvec_path: &Path) -> Option<PathBuf> {
|
||||
let run_path = appendvec_path.parent()?;
|
||||
|
@ -2801,7 +2868,7 @@ mod tests {
|
|||
system_transaction,
|
||||
transaction::SanitizedTransaction,
|
||||
},
|
||||
std::{convert::TryFrom, mem::size_of},
|
||||
std::{convert::TryFrom, mem::size_of, os::unix::fs::PermissionsExt, sync::Arc},
|
||||
tempfile::NamedTempFile,
|
||||
};
|
||||
|
||||
|
@ -4688,4 +4755,117 @@ mod tests {
|
|||
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
|
||||
assert_eq!(snapshot.slot, 1);
|
||||
}
|
||||
|
||||
/// Checks `create_all_accounts_run_and_snapshot_dirs` on four temporary account paths:
/// it must fail while the first account path cannot be created or written to, and on success
/// every returned run/ and snapshot/ path must be an existing directory.
#[test]
pub fn test_create_all_accounts_run_and_snapshot_dirs() {
    solana_logger::setup();

    // The TempDir guards are kept alive until the end of the test; the account paths themselves
    // ("<tmp>/accounts") do not exist yet.
    let (_tmp_dirs, account_paths): (Vec<TempDir>, Vec<PathBuf>) = (0..4)
        .map(|_| {
            let tmp_dir = tempfile::TempDir::new().unwrap();
            let account_path = tmp_dir.path().join("accounts");
            (tmp_dir, account_path)
        })
        .unzip();

    // Set the parent directory of the first account path to be readonly, so that
    // create_dir_all in create_all_accounts_run_and_snapshot_dirs fails.
    let account_path_first = &account_paths[0];
    let parent = account_path_first.parent().unwrap();
    let mut parent_permissions = fs::metadata(parent).unwrap().permissions();
    parent_permissions.set_readonly(true);
    fs::set_permissions(parent, parent_permissions.clone()).unwrap();

    // assert that create_all_accounts_run_and_snapshot_dirs returns error when the first account path
    // is readonly.
    assert!(create_all_accounts_run_and_snapshot_dirs(&account_paths).is_err());

    // Set the parent directory of the first account path to be writable, so that
    // create_all_accounts_run_and_snapshot_dirs returns Ok.
    parent_permissions.set_mode(0o744);
    fs::set_permissions(parent, parent_permissions.clone()).unwrap();
    let result = create_all_accounts_run_and_snapshot_dirs(&account_paths);
    assert!(result.is_ok());

    // The results of `all` must be asserted; evaluating them alone verifies nothing.
    let (account_run_paths, account_snapshot_paths) = result.unwrap();
    assert!(account_run_paths.iter().all(|path| path.is_dir()));
    assert!(account_snapshot_paths.iter().all(|path| path.is_dir()));

    delete_contents_of_path(account_path_first);
    assert!(account_path_first.exists());
    let mut permissions = fs::metadata(account_path_first).unwrap().permissions();
    permissions.set_readonly(true);
    fs::set_permissions(account_path_first, permissions.clone()).unwrap();
    parent_permissions.set_readonly(true);
    fs::set_permissions(parent, parent_permissions.clone()).unwrap();
    // assert that create_all_accounts_run_and_snapshot_dirs returns error when the first account path
    // and its parent are readonly. This exercises the case where the first account path is readonly,
    // causing create_accounts_run_and_snapshot_dirs to fail.
    assert!(create_all_accounts_run_and_snapshot_dirs(&account_paths).is_err());

    // Restore write access so that dropping `_tmp_dirs` is able to remove the temporary
    // directories instead of silently leaving them behind.
    parent_permissions.set_mode(0o744);
    fs::set_permissions(parent, parent_permissions).unwrap();
    permissions.set_mode(0o744);
    fs::set_permissions(account_path_first, permissions).unwrap();
}
|
||||
|
||||
/// Creates bank snapshots for slots 1 and 2, removes the bank snapshot directory of slot 2 and
/// checks that `clean_orphaned_account_snapshot_dirs` deletes the account snapshot hardlink
/// directories of slot 2 while keeping the ones still referenced by slot 1.
#[test]
fn test_clean_orphaned_account_snapshot_dirs() {
    solana_logger::setup();
    let genesis_config = GenesisConfig::default();
    let mut bank = Arc::new(Bank::new_for_tests(&genesis_config));

    let tmp_dir = tempfile::TempDir::new().unwrap();
    let bank_snapshots_dir = tmp_dir.path();
    let collector_id = Pubkey::new_unique();
    let snapshot_version = SnapshotVersion::default();

    for _ in 0..2 {
        // prepare the bank
        bank = Arc::new(Bank::new_from_parent(&bank, &collector_id, bank.slot() + 1));
        bank.fill_bank_with_ticks_for_tests();
        bank.squash();
        bank.force_flush_accounts_cache();

        // generate the bank snapshot directory for slot+1
        let snapshot_storages = bank.get_snapshot_storages(None);
        let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
        add_bank_snapshot(
            bank_snapshots_dir,
            &bank,
            &snapshot_storages,
            snapshot_version,
            slot_deltas,
        )
        .unwrap();
    }

    // the symlinks point to the account snapshot hardlink directories <account_path>/snapshot/<slot>/
    // get them via read_link
    let read_symlink_targets = |accounts_link_dir: PathBuf| -> Vec<PathBuf> {
        fs::read_dir(accounts_link_dir)
            .unwrap()
            .map(|entry| {
                let symlink = entry.unwrap().path();
                fs::read_link(symlink).unwrap()
            })
            .collect()
    };

    let snapshot_dir_slot_1 = bank_snapshots_dir.join("1");
    let hardlink_dirs_slot_1 = read_symlink_targets(snapshot_dir_slot_1.join("accounts_hardlinks"));

    let snapshot_dir_slot_2 = bank_snapshots_dir.join("2");
    let hardlink_dirs_slot_2 = read_symlink_targets(snapshot_dir_slot_2.join("accounts_hardlinks"));

    // remove the bank snapshot directory for slot 2, so the account snapshot slot 2 directories become orphaned
    fs::remove_dir_all(snapshot_dir_slot_2).unwrap();

    // verify the orphaned account snapshot hardlink directories are still there
    assert!(hardlink_dirs_slot_2
        .iter()
        .all(|dir| fs::metadata(dir).is_ok()));

    // The cleaner lists the entries directly under each account snapshot path and compares them
    // with the symlink targets, so the path to pass is the direct parent of
    // <account_path>/snapshot/<slot>, i.e. <account_path>/snapshot. Going up two levels would hand
    // over <account_path>, under which no entry can match a target, so everything gets deleted.
    let account_snapshot_paths: Vec<PathBuf> = hardlink_dirs_slot_2
        .iter()
        .map(|dir| dir.parent().unwrap().to_path_buf())
        .collect();
    // clean the orphaned hardlink directories
    clean_orphaned_account_snapshot_dirs(bank_snapshots_dir, &account_snapshot_paths).unwrap();

    // verify the orphaned hardlink directories are gone
    assert!(hardlink_dirs_slot_2
        .iter()
        .all(|dir| fs::metadata(dir).is_err()));

    // verify the hardlink directories still referenced by the slot 1 bank snapshot are kept
    assert!(hardlink_dirs_slot_1
        .iter()
        .all(|dir| fs::metadata(dir).is_ok()));
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ use {
|
|||
runtime_config::RuntimeConfig,
|
||||
snapshot_config::{SnapshotConfig, SnapshotUsage},
|
||||
snapshot_utils::{
|
||||
self, create_accounts_run_and_snapshot_dirs, ArchiveFormat, SnapshotVersion,
|
||||
self, create_all_accounts_run_and_snapshot_dirs, ArchiveFormat, SnapshotVersion,
|
||||
},
|
||||
},
|
||||
solana_sdk::{
|
||||
|
@ -1351,32 +1351,26 @@ pub fn main() {
|
|||
.ok();
|
||||
|
||||
// Create and canonicalize account paths to avoid issues with symlink creation
|
||||
let account_run_paths: Vec<PathBuf> = account_paths
|
||||
.into_iter()
|
||||
.map(|account_path| {
|
||||
match fs::create_dir_all(&account_path).and_then(|_| fs::canonicalize(&account_path)) {
|
||||
Ok(account_path) => account_path,
|
||||
Err(err) => {
|
||||
eprintln!("Unable to access account path: {account_path:?}, err: {err:?}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}).map(
|
||||
|account_path| {
|
||||
// For all account_paths, set up the run/ and snapshot/ sub directories.
|
||||
// If the sub directories do not exist, the account_path will be cleaned because older version put account files there
|
||||
match create_accounts_run_and_snapshot_dirs(&account_path) {
|
||||
Ok((account_run_path, _account_snapshot_path)) => account_run_path,
|
||||
Err(err) => {
|
||||
eprintln!("Unable to create account run and snapshot sub directories: {}, err: {err:?}", account_path.display());
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
account_paths.iter().for_each(|account_path| {
|
||||
fs::create_dir_all(account_path)
|
||||
.and_then(|_| fs::canonicalize(account_path))
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Unable to access account path: {account_path:?}, err: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
let (account_run_paths, account_snapshot_paths) =
|
||||
create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| {
|
||||
eprintln!("Error: {err:?}");
|
||||
exit(1);
|
||||
});
|
||||
|
||||
// From now on, use run/ paths in the same way as the previous account_paths.
|
||||
validator_config.account_paths = account_run_paths;
|
||||
|
||||
validator_config.account_snapshot_paths = account_snapshot_paths;
|
||||
|
||||
validator_config.account_shrink_paths = account_shrink_paths.map(|paths| {
|
||||
paths
|
||||
.into_iter()
|
||||
|
|
Loading…
Reference in New Issue