Moves the async deleter code to accounts-db (#35040)
This commit is contained in:
parent
5dd9609aea
commit
f62293918d
|
@ -1,8 +1,13 @@
|
|||
use {
|
||||
lazy_static,
|
||||
log::*,
|
||||
solana_measure::measure,
|
||||
std::{
|
||||
collections::HashSet,
|
||||
fs,
|
||||
path::{Path, PathBuf},
|
||||
sync::Mutex,
|
||||
thread,
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -54,6 +59,82 @@ pub fn create_accounts_run_and_snapshot_dirs(
|
|||
Ok((run_path, snapshot_path))
|
||||
}
|
||||
|
||||
/// Moves and asynchronously deletes the contents of a directory to avoid blocking on it.
|
||||
/// The directory is re-created after the move, and should now be empty.
|
||||
pub fn move_and_async_delete_path_contents(path: impl AsRef<Path>) {
|
||||
move_and_async_delete_path(&path);
|
||||
// The following could fail if the rename failed.
|
||||
// If that happens, the directory should be left as is.
|
||||
// So we ignore errors here.
|
||||
_ = std::fs::create_dir(path);
|
||||
}
|
||||
|
||||
/// Delete directories/files asynchronously to avoid blocking on it.
|
||||
/// First, in sync context, check if the original path exists, if it
|
||||
/// does, rename the original path to *_to_be_deleted.
|
||||
/// If there's an in-progress deleting thread for this path, return.
|
||||
/// Then spawn a thread to delete the renamed path.
|
||||
pub fn move_and_async_delete_path(path: impl AsRef<Path>) {
|
||||
lazy_static! {
|
||||
static ref IN_PROGRESS_DELETES: Mutex<HashSet<PathBuf>> = Mutex::new(HashSet::new());
|
||||
};
|
||||
|
||||
// Grab the mutex so no new async delete threads can be spawned for this path.
|
||||
let mut lock = IN_PROGRESS_DELETES.lock().unwrap();
|
||||
|
||||
// If the path does not exist, there's nothing to delete.
|
||||
if !path.as_ref().exists() {
|
||||
return;
|
||||
}
|
||||
|
||||
// If the original path (`pathbuf` here) is already being deleted,
|
||||
// then the path should not be moved and deleted again.
|
||||
if lock.contains(path.as_ref()) {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut path_delete = path.as_ref().to_path_buf();
|
||||
path_delete.set_file_name(format!(
|
||||
"{}{}",
|
||||
path_delete.file_name().unwrap().to_str().unwrap(),
|
||||
"_to_be_deleted"
|
||||
));
|
||||
if let Err(err) = fs::rename(&path, &path_delete) {
|
||||
warn!(
|
||||
"Cannot async delete, retrying in sync mode: failed to rename '{}' to '{}': {err}",
|
||||
path.as_ref().display(),
|
||||
path_delete.display(),
|
||||
);
|
||||
// Although the delete here is synchronous, we want to prevent another thread
|
||||
// from moving & deleting this directory via `move_and_async_delete_path`.
|
||||
lock.insert(path.as_ref().to_path_buf());
|
||||
drop(lock); // unlock before doing sync delete
|
||||
|
||||
delete_contents_of_path(&path);
|
||||
IN_PROGRESS_DELETES.lock().unwrap().remove(path.as_ref());
|
||||
return;
|
||||
}
|
||||
|
||||
lock.insert(path_delete.clone());
|
||||
drop(lock);
|
||||
thread::Builder::new()
|
||||
.name("solDeletePath".to_string())
|
||||
.spawn(move || {
|
||||
trace!("background deleting {}...", path_delete.display());
|
||||
let (result, measure_delete) = measure!(fs::remove_dir_all(&path_delete));
|
||||
if let Err(err) = result {
|
||||
panic!("Failed to async delete '{}': {err}", path_delete.display());
|
||||
}
|
||||
trace!(
|
||||
"background deleting {}... Done, and{measure_delete}",
|
||||
path_delete.display()
|
||||
);
|
||||
|
||||
IN_PROGRESS_DELETES.lock().unwrap().remove(&path_delete);
|
||||
})
|
||||
.expect("spawn background delete thread");
|
||||
}
|
||||
|
||||
/// Delete the files and subdirectories in a directory.
|
||||
/// This is useful if the process does not have permission
|
||||
/// to delete the top level directory it might be able to
|
||||
|
|
|
@ -35,6 +35,7 @@ use {
|
|||
accounts_index::AccountSecondaryIndexes,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
|
||||
utils::{move_and_async_delete_path, move_and_async_delete_path_contents},
|
||||
},
|
||||
solana_client::connection_cache::{ConnectionCache, Protocol},
|
||||
solana_entry::poh::compute_hash_time_ns,
|
||||
|
@ -100,9 +101,7 @@ use {
|
|||
snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL},
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_hash::StartingSnapshotHashes,
|
||||
snapshot_utils::{
|
||||
self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path_contents,
|
||||
},
|
||||
snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
|
||||
},
|
||||
solana_sdk::{
|
||||
clock::Slot,
|
||||
|
@ -623,7 +622,7 @@ impl Validator {
|
|||
];
|
||||
for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs {
|
||||
if old_accounts_hash_cache_dir.exists() {
|
||||
snapshot_utils::move_and_async_delete_path(old_accounts_hash_cache_dir);
|
||||
move_and_async_delete_path(old_accounts_hash_cache_dir);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,8 @@ use {
|
|||
crossbeam_channel::unbounded,
|
||||
log::*,
|
||||
solana_accounts_db::{
|
||||
hardened_unpack::open_genesis_config, utils::create_all_accounts_run_and_snapshot_dirs,
|
||||
hardened_unpack::open_genesis_config,
|
||||
utils::{create_all_accounts_run_and_snapshot_dirs, move_and_async_delete_path_contents},
|
||||
},
|
||||
solana_core::{
|
||||
accounts_hash_verifier::AccountsHashVerifier, validator::BlockVerificationMethod,
|
||||
|
@ -35,9 +36,7 @@ use {
|
|||
prioritization_fee_cache::PrioritizationFeeCache,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_hash::StartingSnapshotHashes,
|
||||
snapshot_utils::{
|
||||
self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path_contents,
|
||||
},
|
||||
snapshot_utils::{self, clean_orphaned_account_snapshot_dirs},
|
||||
},
|
||||
solana_sdk::{
|
||||
clock::Slot, genesis_config::GenesisConfig, pubkey::Pubkey,
|
||||
|
|
|
@ -23,7 +23,7 @@ use {
|
|||
append_vec::AppendVec,
|
||||
hardened_unpack::{self, ParallelSelector, UnpackError},
|
||||
shared_buffer_reader::{SharedBuffer, SharedBufferReader},
|
||||
utils::{delete_contents_of_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
|
||||
utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR},
|
||||
},
|
||||
solana_measure::{measure, measure::Measure},
|
||||
solana_sdk::{clock::Slot, hash::Hash},
|
||||
|
@ -36,7 +36,7 @@ use {
|
|||
path::{Path, PathBuf},
|
||||
process::ExitStatus,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex},
|
||||
sync::Arc,
|
||||
thread::{Builder, JoinHandle},
|
||||
},
|
||||
tar::{self, Archive},
|
||||
|
@ -529,82 +529,6 @@ pub enum GetSnapshotAccountsHardLinkDirError {
|
|||
},
|
||||
}
|
||||
|
||||
/// Moves and asynchronously deletes the contents of a directory to avoid blocking on it.
|
||||
/// The directory is re-created after the move, and should now be empty.
|
||||
pub fn move_and_async_delete_path_contents(path: impl AsRef<Path>) {
|
||||
move_and_async_delete_path(&path);
|
||||
// The following could fail if the rename failed.
|
||||
// If that happens, the directory should be left as is.
|
||||
// So we ignore errors here.
|
||||
_ = std::fs::create_dir(path);
|
||||
}
|
||||
|
||||
/// Delete directories/files asynchronously to avoid blocking on it.
|
||||
/// First, in sync context, check if the original path exists, if it
|
||||
/// does, rename the original path to *_to_be_deleted.
|
||||
/// If there's an in-progress deleting thread for this path, return.
|
||||
/// Then spawn a thread to delete the renamed path.
|
||||
pub fn move_and_async_delete_path(path: impl AsRef<Path>) {
|
||||
lazy_static! {
|
||||
static ref IN_PROGRESS_DELETES: Mutex<HashSet<PathBuf>> = Mutex::new(HashSet::new());
|
||||
};
|
||||
|
||||
// Grab the mutex so no new async delete threads can be spawned for this path.
|
||||
let mut lock = IN_PROGRESS_DELETES.lock().unwrap();
|
||||
|
||||
// If the path does not exist, there's nothing to delete.
|
||||
if !path.as_ref().exists() {
|
||||
return;
|
||||
}
|
||||
|
||||
// If the original path (`pathbuf` here) is already being deleted,
|
||||
// then the path should not be moved and deleted again.
|
||||
if lock.contains(path.as_ref()) {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut path_delete = path.as_ref().to_path_buf();
|
||||
path_delete.set_file_name(format!(
|
||||
"{}{}",
|
||||
path_delete.file_name().unwrap().to_str().unwrap(),
|
||||
"_to_be_deleted"
|
||||
));
|
||||
if let Err(err) = fs::rename(&path, &path_delete) {
|
||||
warn!(
|
||||
"Cannot async delete, retrying in sync mode: failed to rename '{}' to '{}': {err}",
|
||||
path.as_ref().display(),
|
||||
path_delete.display(),
|
||||
);
|
||||
// Although the delete here is synchronous, we want to prevent another thread
|
||||
// from moving & deleting this directory via `move_and_async_delete_path`.
|
||||
lock.insert(path.as_ref().to_path_buf());
|
||||
drop(lock); // unlock before doing sync delete
|
||||
|
||||
delete_contents_of_path(&path);
|
||||
IN_PROGRESS_DELETES.lock().unwrap().remove(path.as_ref());
|
||||
return;
|
||||
}
|
||||
|
||||
lock.insert(path_delete.clone());
|
||||
drop(lock);
|
||||
Builder::new()
|
||||
.name("solDeletePath".to_string())
|
||||
.spawn(move || {
|
||||
trace!("background deleting {}...", path_delete.display());
|
||||
let (result, measure_delete) = measure!(fs::remove_dir_all(&path_delete));
|
||||
if let Err(err) = result {
|
||||
panic!("Failed to async delete '{}': {err}", path_delete.display());
|
||||
}
|
||||
trace!(
|
||||
"background deleting {}... Done, and{measure_delete}",
|
||||
path_delete.display()
|
||||
);
|
||||
|
||||
IN_PROGRESS_DELETES.lock().unwrap().remove(&path_delete);
|
||||
})
|
||||
.expect("spawn background delete thread");
|
||||
}
|
||||
|
||||
/// The account snapshot directories under <account_path>/snapshot/<slot> contain account files hardlinked
|
||||
/// from <account_path>/run taken at snapshot <slot> time. They are referenced by the symlinks from the
|
||||
/// bank snapshot dir snapshot/<slot>/accounts_hardlinks/. We observed that sometimes the bank snapshot dir
|
||||
|
|
Loading…
Reference in New Issue