Removes RwLock on AccountsDb::shrink_paths (#35027)

This commit is contained in:
Brooks 2024-02-01 09:35:34 -05:00 committed by GitHub
parent 0569304835
commit daa2449ad4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 39 additions and 68 deletions

View File

@ -494,6 +494,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
index: Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
base_working_path: None,
accounts_hash_cache_path: None,
shrink_paths: None,
write_cache_limit_bytes: None,
ancient_append_vec_offset: None,
skip_initial_hash_calc: false,
@ -506,6 +507,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig
index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
base_working_path: None,
accounts_hash_cache_path: None,
shrink_paths: None,
write_cache_limit_bytes: None,
ancient_append_vec_offset: None,
skip_initial_hash_calc: false,
@ -547,6 +549,7 @@ pub struct AccountsDbConfig {
/// Base directory for various necessary files
pub base_working_path: Option<PathBuf>,
pub accounts_hash_cache_path: Option<PathBuf>,
pub shrink_paths: Option<Vec<PathBuf>>,
pub write_cache_limit_bytes: Option<u64>,
/// if None, ancient append vecs are set to ANCIENT_APPEND_VEC_DEFAULT_OFFSET
/// Some(offset) means include slots up to (max_slot - (slots_per_epoch - 'offset'))
@ -1396,7 +1399,7 @@ pub struct AccountsDb {
accounts_hash_cache_path: PathBuf,
pub shrink_paths: RwLock<Option<Vec<PathBuf>>>,
shrink_paths: Vec<PathBuf>,
/// Directory of paths this accounts_db needs to hold/remove
#[allow(dead_code)]
@ -2433,7 +2436,7 @@ impl AccountsDb {
base_working_path,
base_working_temp_dir,
accounts_hash_cache_path,
shrink_paths: RwLock::new(None),
shrink_paths: Vec::default(),
temp_paths: None,
file_size: DEFAULT_FILE_SIZE,
thread_pool: rayon::ThreadPoolBuilder::new()
@ -2570,6 +2573,10 @@ impl AccountsDb {
new.paths = paths;
new.temp_paths = Some(temp_dirs);
};
new.shrink_paths = accounts_db_config
.as_ref()
.and_then(|config| config.shrink_paths.clone())
.unwrap_or_else(|| new.paths.clone());
new.start_background_hasher();
{
@ -2580,15 +2587,6 @@ impl AccountsDb {
new
}
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {
assert!(!paths.is_empty());
let mut shrink_paths = self.shrink_paths.write().unwrap();
for path in &paths {
std::fs::create_dir_all(path).expect("Create directory failed.");
}
*shrink_paths = Some(paths);
}
pub fn file_size(&self) -> u64 {
self.file_size
}
@ -4153,12 +4151,7 @@ impl AccountsDb {
let shrunken_store = self
.try_recycle_store(slot, aligned_total, aligned_total + 1024)
.unwrap_or_else(|| {
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
let (shrink_paths, from) = maybe_shrink_paths
.as_ref()
.map(|paths| (paths, "shrink-w-path"))
.unwrap_or_else(|| (&self.paths, "shrink"));
self.create_store(slot, aligned_total, from, shrink_paths)
self.create_store(slot, aligned_total, "shrink", self.shrink_paths.as_slice())
});
self.storage.shrinking_in_progress(slot, shrunken_store)
}

View File

@ -205,7 +205,6 @@ pub struct ValidatorConfig {
pub voting_disabled: bool,
pub account_paths: Vec<PathBuf>,
pub account_snapshot_paths: Vec<PathBuf>,
pub account_shrink_paths: Option<Vec<PathBuf>>,
pub rpc_config: JsonRpcConfig,
/// Specifies which plugins to start up with
pub on_start_geyser_plugin_config_files: Option<Vec<PathBuf>>,
@ -277,7 +276,6 @@ impl Default for ValidatorConfig {
max_ledger_shreds: None,
account_paths: Vec::new(),
account_snapshot_paths: Vec::new(),
account_shrink_paths: None,
rpc_config: JsonRpcConfig::default(),
on_start_geyser_plugin_config_files: None,
rpc_addrs: None,
@ -1838,7 +1836,6 @@ fn load_blockstore(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
Some(&config.snapshot_config),
&process_options,
transaction_history_services
@ -1865,11 +1862,6 @@ fn load_blockstore(
let mut bank_forks = bank_forks.write().unwrap();
bank_forks.set_snapshot_config(Some(config.snapshot_config.clone()));
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
if let Some(ref shrink_paths) = config.account_shrink_paths {
bank_forks
.working_bank()
.set_shrink_paths(shrink_paths.clone());
}
}
Ok((
@ -2448,12 +2440,16 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
}
fn cleanup_accounts_paths(config: &ValidatorConfig) {
for accounts_path in &config.account_paths {
move_and_async_delete_path_contents(accounts_path);
for account_path in &config.account_paths {
move_and_async_delete_path_contents(account_path);
}
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
move_and_async_delete_path_contents(accounts_path);
if let Some(shrink_paths) = config
.accounts_db_config
.as_ref()
.and_then(|config| config.shrink_paths.as_ref())
{
for shrink_path in shrink_paths {
move_and_async_delete_path_contents(shrink_path);
}
}
}

View File

@ -274,7 +274,6 @@ pub fn load_and_process_ledger(
genesis_config,
blockstore.as_ref(),
account_paths,
None,
snapshot_config.as_ref(),
&process_options,
None,

View File

@ -80,7 +80,6 @@ pub fn load(
genesis_config: &GenesisConfig,
blockstore: &Blockstore,
account_paths: Vec<PathBuf>,
shrink_paths: Option<Vec<PathBuf>>,
snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -93,7 +92,6 @@ pub fn load(
genesis_config,
blockstore,
account_paths,
shrink_paths,
snapshot_config,
&process_options,
cache_block_meta_sender,
@ -121,7 +119,6 @@ pub fn load_bank_forks(
genesis_config: &GenesisConfig,
blockstore: &Blockstore,
account_paths: Vec<PathBuf>,
shrink_paths: Option<Vec<PathBuf>>,
snapshot_config: Option<&SnapshotConfig>,
process_options: &ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
@ -181,7 +178,6 @@ pub fn load_bank_forks(
incremental_snapshot_archive_info,
genesis_config,
account_paths,
shrink_paths,
snapshot_config,
process_options,
accounts_update_notifier,
@ -231,7 +227,6 @@ fn bank_forks_from_snapshot(
incremental_snapshot_archive_info: Option<IncrementalSnapshotArchiveInfo>,
genesis_config: &GenesisConfig,
account_paths: Vec<PathBuf>,
shrink_paths: Option<Vec<PathBuf>>,
snapshot_config: &SnapshotConfig,
process_options: &ProcessOptions,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
@ -345,10 +340,6 @@ fn bank_forks_from_snapshot(
bank
};
if let Some(shrink_paths) = shrink_paths {
bank.set_shrink_paths(shrink_paths);
}
let full_snapshot_hash = FullSnapshotHash((
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),

View File

@ -745,7 +745,6 @@ pub fn test_process_blockstore(
blockstore,
Vec::new(),
None,
None,
opts,
None,
None,

View File

@ -13,7 +13,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
voting_disabled: config.voting_disabled,
account_paths: config.account_paths.clone(),
account_snapshot_paths: config.account_snapshot_paths.clone(),
account_shrink_paths: config.account_shrink_paths.clone(),
rpc_config: config.rpc_config.clone(),
on_start_geyser_plugin_config_files: config.on_start_geyser_plugin_config_files.clone(),
rpc_addrs: config.rpc_addrs,

View File

@ -2211,7 +2211,6 @@ fn create_snapshot_to_hard_fork(
.unwrap()
.0,
],
None,
Some(&snapshot_config),
process_options,
None,

View File

@ -4436,10 +4436,6 @@ impl Bank {
self.rc.accounts.accounts_db.remove_unrooted_slots(slots)
}
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {
self.rc.accounts.accounts_db.set_shrink_paths(paths);
}
fn check_age(
&self,
sanitized_txs: &[impl core::borrow::Borrow<SanitizedTransaction>],

View File

@ -1205,10 +1205,30 @@ pub fn main() {
.ok()
.map(|mb| mb * MB);
let account_shrink_paths: Option<Vec<PathBuf>> =
values_t!(matches, "account_shrink_path", String)
.map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
.ok();
let account_shrink_paths = account_shrink_paths.as_ref().map(|paths| {
create_and_canonicalize_directories(paths).unwrap_or_else(|err| {
eprintln!("Unable to access account shrink path: {err}");
exit(1);
})
});
let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
.map(|paths| {
create_all_accounts_run_and_snapshot_dirs(&paths).unwrap_or_else(|err| {
eprintln!("Error: {err}");
exit(1);
})
})
.unzip();
let accounts_db_config = AccountsDbConfig {
index: Some(accounts_index_config),
base_working_path: Some(ledger_path.clone()),
accounts_hash_cache_path: Some(accounts_hash_cache_path),
shrink_paths: account_shrink_run_paths,
write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
.ok()
.map(|mb| mb * MB as u64),
@ -1452,35 +1472,14 @@ pub fn main() {
exit(1);
});
let account_shrink_paths: Option<Vec<PathBuf>> =
values_t!(matches, "account_shrink_path", String)
.map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
.ok();
let account_shrink_paths = account_shrink_paths.as_ref().map(|paths| {
create_and_canonicalize_directories(paths).unwrap_or_else(|err| {
eprintln!("Unable to access account shrink path: {err}");
exit(1);
})
});
let (account_run_paths, account_snapshot_paths) =
create_all_accounts_run_and_snapshot_dirs(&account_paths).unwrap_or_else(|err| {
eprintln!("Error: {err}");
exit(1);
});
let (account_shrink_run_paths, account_shrink_snapshot_paths) = account_shrink_paths
.map(|paths| {
create_all_accounts_run_and_snapshot_dirs(&paths).unwrap_or_else(|err| {
eprintln!("Error: {err}");
exit(1);
})
})
.unzip();
// From now on, use run/ paths in the same way as the previous account_paths.
validator_config.account_paths = account_run_paths;
validator_config.account_shrink_paths = account_shrink_run_paths;
// These snapshot paths are only used for initial clean up, add in shrink paths if they exist.
validator_config.account_snapshot_paths =