From f62293918d3462417ff94e4439c427c257d5c20c Mon Sep 17 00:00:00 2001 From: Brooks Date: Fri, 2 Feb 2024 09:21:26 -0500 Subject: [PATCH] Moves the async deleter code to accounts-db (#35040) --- accounts-db/src/utils.rs | 81 +++++++++++++++++++++++++++++++++ core/src/validator.rs | 7 ++- ledger-tool/src/ledger_utils.rs | 7 ++- runtime/src/snapshot_utils.rs | 80 +------------------------------- 4 files changed, 89 insertions(+), 86 deletions(-) diff --git a/accounts-db/src/utils.rs b/accounts-db/src/utils.rs index 7a38d23b0..6ac1674a3 100644 --- a/accounts-db/src/utils.rs +++ b/accounts-db/src/utils.rs @@ -1,8 +1,13 @@ use { + lazy_static, log::*, + solana_measure::measure, std::{ + collections::HashSet, fs, path::{Path, PathBuf}, + sync::Mutex, + thread, }, }; @@ -54,6 +59,82 @@ pub fn create_accounts_run_and_snapshot_dirs( Ok((run_path, snapshot_path)) } +/// Moves and asynchronously deletes the contents of a directory to avoid blocking on it. +/// The directory is re-created after the move, and should now be empty. +pub fn move_and_async_delete_path_contents(path: impl AsRef) { + move_and_async_delete_path(&path); + // The following could fail if the rename failed. + // If that happens, the directory should be left as is. + // So we ignore errors here. + _ = std::fs::create_dir(path); +} + +/// Delete directories/files asynchronously to avoid blocking on it. +/// First, in sync context, check if the original path exists, if it +/// does, rename the original path to *_to_be_deleted. +/// If there's an in-progress deleting thread for this path, return. +/// Then spawn a thread to delete the renamed path. +pub fn move_and_async_delete_path(path: impl AsRef) { + lazy_static! { + static ref IN_PROGRESS_DELETES: Mutex> = Mutex::new(HashSet::new()); + }; + + // Grab the mutex so no new async delete threads can be spawned for this path. + let mut lock = IN_PROGRESS_DELETES.lock().unwrap(); + + // If the path does not exist, there's nothing to delete. + if !path.as_ref().exists() { + return; + } + + // If the original path (`pathbuf` here) is already being deleted, + // then the path should not be moved and deleted again. + if lock.contains(path.as_ref()) { + return; + } + + let mut path_delete = path.as_ref().to_path_buf(); + path_delete.set_file_name(format!( + "{}{}", + path_delete.file_name().unwrap().to_str().unwrap(), + "_to_be_deleted" + )); + if let Err(err) = fs::rename(&path, &path_delete) { + warn!( + "Cannot async delete, retrying in sync mode: failed to rename '{}' to '{}': {err}", + path.as_ref().display(), + path_delete.display(), + ); + // Although the delete here is synchronous, we want to prevent another thread + // from moving & deleting this directory via `move_and_async_delete_path`. + lock.insert(path.as_ref().to_path_buf()); + drop(lock); // unlock before doing sync delete + + delete_contents_of_path(&path); + IN_PROGRESS_DELETES.lock().unwrap().remove(path.as_ref()); + return; + } + + lock.insert(path_delete.clone()); + drop(lock); + thread::Builder::new() + .name("solDeletePath".to_string()) + .spawn(move || { + trace!("background deleting {}...", path_delete.display()); + let (result, measure_delete) = measure!(fs::remove_dir_all(&path_delete)); + if let Err(err) = result { + panic!("Failed to async delete '{}': {err}", path_delete.display()); + } + trace!( + "background deleting {}... Done, and{measure_delete}", + path_delete.display() + ); + + IN_PROGRESS_DELETES.lock().unwrap().remove(&path_delete); + }) + .expect("spawn background delete thread"); +} + /// Delete the files and subdirectories in a directory. /// This is useful if the process does not have permission /// to delete the top level directory it might be able to diff --git a/core/src/validator.rs b/core/src/validator.rs index 730155249..3adaa699b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -35,6 +35,7 @@ use { accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + utils::{move_and_async_delete_path, move_and_async_delete_path_contents}, }, solana_client::connection_cache::{ConnectionCache, Protocol}, solana_entry::poh::compute_hash_time_ns, @@ -100,9 +101,7 @@ use { snapshot_bank_utils::{self, DISABLED_SNAPSHOT_ARCHIVE_INTERVAL}, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, - snapshot_utils::{ - self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path_contents, - }, + snapshot_utils::{self, clean_orphaned_account_snapshot_dirs}, }, solana_sdk::{ clock::Slot, @@ -623,7 +622,7 @@ impl Validator { ]; for old_accounts_hash_cache_dir in old_accounts_hash_cache_dirs { if old_accounts_hash_cache_dir.exists() { - snapshot_utils::move_and_async_delete_path(old_accounts_hash_cache_dir); + move_and_async_delete_path(old_accounts_hash_cache_dir); } } diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index ba6ac1ebe..2663a205f 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -4,7 +4,8 @@ use { crossbeam_channel::unbounded, log::*, solana_accounts_db::{ - hardened_unpack::open_genesis_config, utils::create_all_accounts_run_and_snapshot_dirs, + hardened_unpack::open_genesis_config, + utils::{create_all_accounts_run_and_snapshot_dirs, move_and_async_delete_path_contents}, }, solana_core::{ accounts_hash_verifier::AccountsHashVerifier, validator::BlockVerificationMethod, @@ -35,9 +36,7 @@ use { prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, - snapshot_utils::{ - self, clean_orphaned_account_snapshot_dirs, move_and_async_delete_path_contents, - }, + snapshot_utils::{self, clean_orphaned_account_snapshot_dirs}, }, solana_sdk::{ clock::Slot, genesis_config::GenesisConfig, pubkey::Pubkey, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index ff0afc1e7..1bd9c4d25 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -23,7 +23,7 @@ use { append_vec::AppendVec, hardened_unpack::{self, ParallelSelector, UnpackError}, shared_buffer_reader::{SharedBuffer, SharedBufferReader}, - utils::{delete_contents_of_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR}, + utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR}, }, solana_measure::{measure, measure::Measure}, solana_sdk::{clock::Slot, hash::Hash}, @@ -36,7 +36,7 @@ use { path::{Path, PathBuf}, process::ExitStatus, str::FromStr, - sync::{Arc, Mutex}, + sync::Arc, thread::{Builder, JoinHandle}, }, tar::{self, Archive}, @@ -529,82 +529,6 @@ pub enum GetSnapshotAccountsHardLinkDirError { }, } -/// Moves and asynchronously deletes the contents of a directory to avoid blocking on it. -/// The directory is re-created after the move, and should now be empty. -pub fn move_and_async_delete_path_contents(path: impl AsRef) { - move_and_async_delete_path(&path); - // The following could fail if the rename failed. - // If that happens, the directory should be left as is. - // So we ignore errors here. - _ = std::fs::create_dir(path); -} - -/// Delete directories/files asynchronously to avoid blocking on it. -/// First, in sync context, check if the original path exists, if it -/// does, rename the original path to *_to_be_deleted. -/// If there's an in-progress deleting thread for this path, return. -/// Then spawn a thread to delete the renamed path. -pub fn move_and_async_delete_path(path: impl AsRef) { - lazy_static! { - static ref IN_PROGRESS_DELETES: Mutex> = Mutex::new(HashSet::new()); - }; - - // Grab the mutex so no new async delete threads can be spawned for this path. - let mut lock = IN_PROGRESS_DELETES.lock().unwrap(); - - // If the path does not exist, there's nothing to delete. - if !path.as_ref().exists() { - return; - } - - // If the original path (`pathbuf` here) is already being deleted, - // then the path should not be moved and deleted again. - if lock.contains(path.as_ref()) { - return; - } - - let mut path_delete = path.as_ref().to_path_buf(); - path_delete.set_file_name(format!( - "{}{}", - path_delete.file_name().unwrap().to_str().unwrap(), - "_to_be_deleted" - )); - if let Err(err) = fs::rename(&path, &path_delete) { - warn!( - "Cannot async delete, retrying in sync mode: failed to rename '{}' to '{}': {err}", - path.as_ref().display(), - path_delete.display(), - ); - // Although the delete here is synchronous, we want to prevent another thread - // from moving & deleting this directory via `move_and_async_delete_path`. - lock.insert(path.as_ref().to_path_buf()); - drop(lock); // unlock before doing sync delete - - delete_contents_of_path(&path); - IN_PROGRESS_DELETES.lock().unwrap().remove(path.as_ref()); - return; - } - - lock.insert(path_delete.clone()); - drop(lock); - Builder::new() - .name("solDeletePath".to_string()) - .spawn(move || { - trace!("background deleting {}...", path_delete.display()); - let (result, measure_delete) = measure!(fs::remove_dir_all(&path_delete)); - if let Err(err) = result { - panic!("Failed to async delete '{}': {err}", path_delete.display()); - } - trace!( - "background deleting {}... Done, and{measure_delete}", - path_delete.display() - ); - - IN_PROGRESS_DELETES.lock().unwrap().remove(&path_delete); - }) - .expect("spawn background delete thread"); -} - /// The account snapshot directories under /snapshot/ contain account files hardlinked /// from /run taken at snapshot time. They are referenced by the symlinks from the /// bank snapshot dir snapshot//accounts_hardlinks/. We observed that sometimes the bank snapshot dir