From baa96024119cb93fb699a2b7b21055d9133c29ed Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 21 Dec 2020 21:33:37 -0800 Subject: [PATCH] Add shrink paths (#14238) --- core/src/validator.rs | 11 ++++ ledger-tool/src/main.rs | 1 + ledger/src/bank_forks_utils.rs | 4 ++ ledger/src/blockstore_processor.rs | 2 +- runtime/src/accounts_db.rs | 88 ++++++++++++++++++++++++------ runtime/src/accounts_index.rs | 4 ++ runtime/src/bank.rs | 11 ++-- validator/src/main.rs | 32 +++++++++++ 8 files changed, 132 insertions(+), 21 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 4c23f5fafb..1f5bae1654 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -79,6 +79,7 @@ pub struct ValidatorConfig { pub expected_shred_version: Option, pub voting_disabled: bool, pub account_paths: Vec, + pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, @@ -120,6 +121,7 @@ impl Default for ValidatorConfig { voting_disabled: false, max_ledger_shreds: None, account_paths: Vec::new(), + account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), rpc_addrs: None, pubsub_config: PubSubConfig::default(), @@ -272,6 +274,11 @@ impl Validator { for accounts_path in &config.account_paths { cleanup_accounts_path(accounts_path); } + if let Some(ref shrink_paths) = config.account_shrink_paths { + for accounts_path in shrink_paths { + cleanup_accounts_path(accounts_path); + } + } start.stop(); info!("done. {}", start); @@ -311,6 +318,9 @@ impl Validator { let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); + if let Some(ref shrink_paths) = config.account_shrink_paths { + bank.set_shrink_paths(shrink_paths.clone()); + } let bank_forks = Arc::new(RwLock::new(bank_forks)); let sample_performance_service = @@ -907,6 +917,7 @@ fn new_banks_from_ledger( &genesis_config, &blockstore, config.account_paths.clone(), + config.account_shrink_paths.clone(), config.snapshot_config.as_ref(), process_options, transaction_history_services diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 0eff05fa4e..69fad0c1b2 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -694,6 +694,7 @@ fn load_bank_forks( &genesis_config, &blockstore, account_paths, + None, snapshot_config.as_ref(), process_options, None, diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 68a2dc16b4..62d87d0e4c 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -33,6 +33,7 @@ pub fn load( genesis_config: &GenesisConfig, blockstore: &Blockstore, account_paths: Vec, + shrink_paths: Option>, snapshot_config: Option<&SnapshotConfig>, process_options: ProcessOptions, transaction_status_sender: Option, @@ -69,6 +70,9 @@ pub fn load( Some(&crate::builtins::get(process_options.bpf_jit)), ) .expect("Load from snapshot failed"); + if let Some(shrink_paths) = shrink_paths { + deserialized_bank.set_shrink_paths(shrink_paths); + } let deserialized_snapshot_hash = ( deserialized_bank.slot(), diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index bde0c59164..55eef03e71 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -925,7 +925,7 @@ fn load_frozen_forks( leader_schedule_cache.set_root(&new_root_bank); new_root_bank.squash(); - if last_free.elapsed() > Duration::from_secs(30) { + if last_free.elapsed() > Duration::from_secs(10) { // This could take few secs; so update last_free later new_root_bank.exhaustively_free_unused_resource(); last_free = Instant::now(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index c92f39895b..194be1bbd9 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -442,6 +442,8 @@ pub struct AccountsDB { /// Set of storage paths to pick from pub(crate) paths: Vec, + pub shrink_paths: RwLock>>, + /// Directory of paths this accounts_db needs to hold/remove temp_paths: Option>, @@ -529,6 +531,7 @@ impl Default for AccountsDB { shrink_candidate_slots: Mutex::new(Vec::new()), write_version: AtomicU64::new(0), paths: vec![], + shrink_paths: RwLock::new(None), temp_paths: None, file_size: DEFAULT_FILE_SIZE, thread_pool: rayon::ThreadPoolBuilder::new() @@ -574,6 +577,15 @@ impl AccountsDB { new } + pub fn set_shrink_paths(&self, paths: Vec) { + assert!(!paths.is_empty()); + let mut shrink_paths = self.shrink_paths.write().unwrap(); + for path in &paths { + std::fs::create_dir_all(path).expect("Create directory failed."); + } + *shrink_paths = Some(paths); + } + pub fn file_size(&self) -> u64 { self.file_size } @@ -1004,11 +1016,28 @@ impl AccountsDB { } fn shrink_stale_slot(&self, candidates: &mut MutexGuard>) -> usize { - if let Some(slot) = self.do_next_shrink_slot(candidates) { - self.do_shrink_stale_slot(slot) - } else { - 0 + let mut shrunken_account_total = 0; + let mut shrunk_slot_count = 0; + let start = Instant::now(); + let num_roots = self.accounts_index.num_roots(); + loop { + if let Some(slot) = self.do_next_shrink_slot(candidates) { + shrunken_account_total += self.do_shrink_stale_slot(slot); + } else { + return 0; + } + if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 { + debug!( + "do_shrink_stale_slot: {} {} {}us", + shrunk_slot_count, + candidates.len(), + start.elapsed().as_micros() + ); + break; + } + shrunk_slot_count += 1; } + shrunken_account_total } // Reads all accounts in given slot's AppendVecs and filter only to alive, @@ -1043,14 +1072,15 @@ impl AccountsDB { } else if !forced { let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8; let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8; - let skip_shrink = !sparse_by_count && !sparse_by_bytes; - info!( - "shrink_stale_slot ({}): skip_shrink: {} count: {}/{} byte: {}/{}", - slot, skip_shrink, alive_count, stored_count, written_bytes, total_bytes, - ); - if skip_shrink { + let not_sparse = !sparse_by_count && !sparse_by_bytes; + let too_small_to_shrink = total_bytes <= PAGE_SIZE; + if not_sparse || too_small_to_shrink { return 0; } + info!( + "shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}", + slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes, + ); } for store in stores.values() { let mut start = 0; @@ -1144,7 +1174,17 @@ impl AccountsDB { { new_store } else { - self.create_and_insert_store(slot, aligned_total, "shrink") + let maybe_shrink_paths = self.shrink_paths.read().unwrap(); + if let Some(ref shrink_paths) = *maybe_shrink_paths { + self.create_and_insert_store_with_paths( + slot, + aligned_total, + "shrink-w-path", + shrink_paths, + ) + } else { + self.create_and_insert_store(slot, aligned_total, "shrink") + } }; start.stop(); create_and_insert_store_elapsed = start.as_us(); @@ -1591,7 +1631,7 @@ impl AccountsDB { self.stats .create_store_count .fetch_add(1, Ordering::Relaxed); - self.create_store(slot, self.file_size, "store") + self.create_store(slot, self.file_size, "store", &self.paths) }; // try_available is like taking a lock on the store, @@ -1620,11 +1660,17 @@ impl AccountsDB { false } - fn create_store(&self, slot: Slot, size: u64, from: &str) -> Arc { - let path_index = thread_rng().gen_range(0, self.paths.len()); + fn create_store( + &self, + slot: Slot, + size: u64, + from: &str, + paths: &[PathBuf], + ) -> Arc { + let path_index = thread_rng().gen_range(0, paths.len()); let store = Arc::new(self.new_storage_entry( slot, - &Path::new(&self.paths[path_index]), + &Path::new(&paths[path_index]), self.page_align(size), )); @@ -1647,7 +1693,17 @@ impl AccountsDB { size: u64, from: &str, ) -> Arc { - let store = self.create_store(slot, size, from); + self.create_and_insert_store_with_paths(slot, size, from, &self.paths) + } + + fn create_and_insert_store_with_paths( + &self, + slot: Slot, + size: u64, + from: &str, + paths: &[PathBuf], + ) -> Arc { + let store = self.create_store(slot, size, from, paths); let store_for_index = store.clone(); self.insert_store(slot, store_for_index); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0a96dcf173..0a952c4b3e 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -712,6 +712,10 @@ impl AccountsIndex { .contains(&slot) } + pub fn num_roots(&self) -> usize { + self.roots_tracker.read().unwrap().roots.len() + } + pub fn all_roots(&self) -> Vec { self.roots_tracker .read() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 47e6923b72..fe786c6ae7 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2432,6 +2432,10 @@ impl Bank { self.rc.accounts.accounts_db.remove_unrooted_slot(slot) } + pub fn set_shrink_paths(&self, paths: Vec) { + self.rc.accounts.accounts_db.set_shrink_paths(paths); + } + fn load_accounts( &self, txs: &[Transaction], @@ -10100,12 +10104,11 @@ pub(crate) mod tests { 22 ); - let mut consumed_budgets = (0..3) + let consumed_budgets: usize = (0..3) .map(|_| bank.process_stale_slot_with_budget(0, force_to_return_alive_account)) - .collect::>(); - consumed_budgets.sort_unstable(); + .sum(); // consumed_budgets represents the count of alive accounts in the three slots 0,1,2 - assert_eq!(consumed_budgets, vec![0, 1, 9]); + assert_eq!(consumed_budgets, 10); } #[test] diff --git a/validator/src/main.rs b/validator/src/main.rs index 1769303ca5..591e6f414a 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1011,6 +1011,14 @@ pub fn main() { .multiple(true) .help("Comma separated persistent accounts location"), ) + .arg( + Arg::with_name("account_shrink_path") + .long("account-shrink-path") + .value_name("PATH") + .takes_value(true) + .multiple(true) + .help("Path to accounts shrink path which can hold a compacted account set."), + ) .arg( Arg::with_name("gossip_port") .long("gossip-port") @@ -1540,6 +1548,10 @@ pub fn main() { } else { vec![ledger_path.join("accounts")] }; + let account_shrink_paths: Option> = + values_t!(matches, "account_shrink_path", String) + .map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect()) + .ok(); // Create and canonicalize account paths to avoid issues with symlink creation validator_config.account_paths = account_paths @@ -1558,6 +1570,26 @@ pub fn main() { }) .collect(); + validator_config.account_shrink_paths = account_shrink_paths.map(|paths| { + paths + .into_iter() + .map(|account_path| { + match fs::create_dir_all(&account_path) + .and_then(|_| fs::canonicalize(&account_path)) + { + Ok(account_path) => account_path, + Err(err) => { + eprintln!( + "Unable to access account path: {:?}, err: {:?}", + account_path, err + ); + exit(1); + } + } + }) + .collect() + }); + let snapshot_interval_slots = value_t_or_exit!(matches, "snapshot_interval_slots", u64); let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64); let snapshot_path = ledger_path.join("snapshot");