Add shrink paths (#14238)

This commit is contained in:
sakridge 2020-12-21 21:33:37 -08:00 committed by GitHub
parent 3316e7166c
commit baa9602411
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 21 deletions

View File

@ -79,6 +79,7 @@ pub struct ValidatorConfig {
pub expected_shred_version: Option<u16>, pub expected_shred_version: Option<u16>,
pub voting_disabled: bool, pub voting_disabled: bool,
pub account_paths: Vec<PathBuf>, pub account_paths: Vec<PathBuf>,
pub account_shrink_paths: Option<Vec<PathBuf>>,
pub rpc_config: JsonRpcConfig, pub rpc_config: JsonRpcConfig,
pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub)
pub pubsub_config: PubSubConfig, pub pubsub_config: PubSubConfig,
@ -120,6 +121,7 @@ impl Default for ValidatorConfig {
voting_disabled: false, voting_disabled: false,
max_ledger_shreds: None, max_ledger_shreds: None,
account_paths: Vec::new(), account_paths: Vec::new(),
account_shrink_paths: None,
rpc_config: JsonRpcConfig::default(), rpc_config: JsonRpcConfig::default(),
rpc_addrs: None, rpc_addrs: None,
pubsub_config: PubSubConfig::default(), pubsub_config: PubSubConfig::default(),
@ -272,6 +274,11 @@ impl Validator {
for accounts_path in &config.account_paths { for accounts_path in &config.account_paths {
cleanup_accounts_path(accounts_path); cleanup_accounts_path(accounts_path);
} }
if let Some(ref shrink_paths) = config.account_shrink_paths {
for accounts_path in shrink_paths {
cleanup_accounts_path(accounts_path);
}
}
start.stop(); start.stop();
info!("done. {}", start); info!("done. {}", start);
@ -311,6 +318,9 @@ impl Validator {
let leader_schedule_cache = Arc::new(leader_schedule_cache); let leader_schedule_cache = Arc::new(leader_schedule_cache);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
if let Some(ref shrink_paths) = config.account_shrink_paths {
bank.set_shrink_paths(shrink_paths.clone());
}
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let sample_performance_service = let sample_performance_service =
@ -907,6 +917,7 @@ fn new_banks_from_ledger(
&genesis_config, &genesis_config,
&blockstore, &blockstore,
config.account_paths.clone(), config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(), config.snapshot_config.as_ref(),
process_options, process_options,
transaction_history_services transaction_history_services

View File

@ -694,6 +694,7 @@ fn load_bank_forks(
&genesis_config, &genesis_config,
&blockstore, &blockstore,
account_paths, account_paths,
None,
snapshot_config.as_ref(), snapshot_config.as_ref(),
process_options, process_options,
None, None,

View File

@ -33,6 +33,7 @@ pub fn load(
genesis_config: &GenesisConfig, genesis_config: &GenesisConfig,
blockstore: &Blockstore, blockstore: &Blockstore,
account_paths: Vec<PathBuf>, account_paths: Vec<PathBuf>,
shrink_paths: Option<Vec<PathBuf>>,
snapshot_config: Option<&SnapshotConfig>, snapshot_config: Option<&SnapshotConfig>,
process_options: ProcessOptions, process_options: ProcessOptions,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
@ -69,6 +70,9 @@ pub fn load(
Some(&crate::builtins::get(process_options.bpf_jit)), Some(&crate::builtins::get(process_options.bpf_jit)),
) )
.expect("Load from snapshot failed"); .expect("Load from snapshot failed");
if let Some(shrink_paths) = shrink_paths {
deserialized_bank.set_shrink_paths(shrink_paths);
}
let deserialized_snapshot_hash = ( let deserialized_snapshot_hash = (
deserialized_bank.slot(), deserialized_bank.slot(),

View File

@ -925,7 +925,7 @@ fn load_frozen_forks(
leader_schedule_cache.set_root(&new_root_bank); leader_schedule_cache.set_root(&new_root_bank);
new_root_bank.squash(); new_root_bank.squash();
if last_free.elapsed() > Duration::from_secs(30) { if last_free.elapsed() > Duration::from_secs(10) {
// This could take few secs; so update last_free later // This could take few secs; so update last_free later
new_root_bank.exhaustively_free_unused_resource(); new_root_bank.exhaustively_free_unused_resource();
last_free = Instant::now(); last_free = Instant::now();

View File

@ -442,6 +442,8 @@ pub struct AccountsDB {
/// Set of storage paths to pick from /// Set of storage paths to pick from
pub(crate) paths: Vec<PathBuf>, pub(crate) paths: Vec<PathBuf>,
pub shrink_paths: RwLock<Option<Vec<PathBuf>>>,
/// Directory of paths this accounts_db needs to hold/remove /// Directory of paths this accounts_db needs to hold/remove
temp_paths: Option<Vec<TempDir>>, temp_paths: Option<Vec<TempDir>>,
@ -529,6 +531,7 @@ impl Default for AccountsDB {
shrink_candidate_slots: Mutex::new(Vec::new()), shrink_candidate_slots: Mutex::new(Vec::new()),
write_version: AtomicU64::new(0), write_version: AtomicU64::new(0),
paths: vec![], paths: vec![],
shrink_paths: RwLock::new(None),
temp_paths: None, temp_paths: None,
file_size: DEFAULT_FILE_SIZE, file_size: DEFAULT_FILE_SIZE,
thread_pool: rayon::ThreadPoolBuilder::new() thread_pool: rayon::ThreadPoolBuilder::new()
@ -574,6 +577,15 @@ impl AccountsDB {
new new
} }
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {
assert!(!paths.is_empty());
let mut shrink_paths = self.shrink_paths.write().unwrap();
for path in &paths {
std::fs::create_dir_all(path).expect("Create directory failed.");
}
*shrink_paths = Some(paths);
}
pub fn file_size(&self) -> u64 { pub fn file_size(&self) -> u64 {
self.file_size self.file_size
} }
@ -1004,11 +1016,28 @@ impl AccountsDB {
} }
fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize { fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
if let Some(slot) = self.do_next_shrink_slot(candidates) { let mut shrunken_account_total = 0;
self.do_shrink_stale_slot(slot) let mut shrunk_slot_count = 0;
} else { let start = Instant::now();
0 let num_roots = self.accounts_index.num_roots();
loop {
if let Some(slot) = self.do_next_shrink_slot(candidates) {
shrunken_account_total += self.do_shrink_stale_slot(slot);
} else {
return 0;
}
if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
debug!(
"do_shrink_stale_slot: {} {} {}us",
shrunk_slot_count,
candidates.len(),
start.elapsed().as_micros()
);
break;
}
shrunk_slot_count += 1;
} }
shrunken_account_total
} }
// Reads all accounts in given slot's AppendVecs and filter only to alive, // Reads all accounts in given slot's AppendVecs and filter only to alive,
@ -1043,14 +1072,15 @@ impl AccountsDB {
} else if !forced { } else if !forced {
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8; let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8; let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let skip_shrink = !sparse_by_count && !sparse_by_bytes; let not_sparse = !sparse_by_count && !sparse_by_bytes;
info!( let too_small_to_shrink = total_bytes <= PAGE_SIZE;
"shrink_stale_slot ({}): skip_shrink: {} count: {}/{} byte: {}/{}", if not_sparse || too_small_to_shrink {
slot, skip_shrink, alive_count, stored_count, written_bytes, total_bytes,
);
if skip_shrink {
return 0; return 0;
} }
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
} }
for store in stores.values() { for store in stores.values() {
let mut start = 0; let mut start = 0;
@ -1144,7 +1174,17 @@ impl AccountsDB {
{ {
new_store new_store
} else { } else {
self.create_and_insert_store(slot, aligned_total, "shrink") let maybe_shrink_paths = self.shrink_paths.read().unwrap();
if let Some(ref shrink_paths) = *maybe_shrink_paths {
self.create_and_insert_store_with_paths(
slot,
aligned_total,
"shrink-w-path",
shrink_paths,
)
} else {
self.create_and_insert_store(slot, aligned_total, "shrink")
}
}; };
start.stop(); start.stop();
create_and_insert_store_elapsed = start.as_us(); create_and_insert_store_elapsed = start.as_us();
@ -1591,7 +1631,7 @@ impl AccountsDB {
self.stats self.stats
.create_store_count .create_store_count
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
self.create_store(slot, self.file_size, "store") self.create_store(slot, self.file_size, "store", &self.paths)
}; };
// try_available is like taking a lock on the store, // try_available is like taking a lock on the store,
@ -1620,11 +1660,17 @@ impl AccountsDB {
false false
} }
fn create_store(&self, slot: Slot, size: u64, from: &str) -> Arc<AccountStorageEntry> { fn create_store(
let path_index = thread_rng().gen_range(0, self.paths.len()); &self,
slot: Slot,
size: u64,
from: &str,
paths: &[PathBuf],
) -> Arc<AccountStorageEntry> {
let path_index = thread_rng().gen_range(0, paths.len());
let store = Arc::new(self.new_storage_entry( let store = Arc::new(self.new_storage_entry(
slot, slot,
&Path::new(&self.paths[path_index]), &Path::new(&paths[path_index]),
self.page_align(size), self.page_align(size),
)); ));
@ -1647,7 +1693,17 @@ impl AccountsDB {
size: u64, size: u64,
from: &str, from: &str,
) -> Arc<AccountStorageEntry> { ) -> Arc<AccountStorageEntry> {
let store = self.create_store(slot, size, from); self.create_and_insert_store_with_paths(slot, size, from, &self.paths)
}
fn create_and_insert_store_with_paths(
&self,
slot: Slot,
size: u64,
from: &str,
paths: &[PathBuf],
) -> Arc<AccountStorageEntry> {
let store = self.create_store(slot, size, from, paths);
let store_for_index = store.clone(); let store_for_index = store.clone();
self.insert_store(slot, store_for_index); self.insert_store(slot, store_for_index);

View File

@ -712,6 +712,10 @@ impl<T: 'static + Clone> AccountsIndex<T> {
.contains(&slot) .contains(&slot)
} }
pub fn num_roots(&self) -> usize {
self.roots_tracker.read().unwrap().roots.len()
}
pub fn all_roots(&self) -> Vec<Slot> { pub fn all_roots(&self) -> Vec<Slot> {
self.roots_tracker self.roots_tracker
.read() .read()

View File

@ -2432,6 +2432,10 @@ impl Bank {
self.rc.accounts.accounts_db.remove_unrooted_slot(slot) self.rc.accounts.accounts_db.remove_unrooted_slot(slot)
} }
pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {
self.rc.accounts.accounts_db.set_shrink_paths(paths);
}
fn load_accounts( fn load_accounts(
&self, &self,
txs: &[Transaction], txs: &[Transaction],
@ -10100,12 +10104,11 @@ pub(crate) mod tests {
22 22
); );
let mut consumed_budgets = (0..3) let consumed_budgets: usize = (0..3)
.map(|_| bank.process_stale_slot_with_budget(0, force_to_return_alive_account)) .map(|_| bank.process_stale_slot_with_budget(0, force_to_return_alive_account))
.collect::<Vec<_>>(); .sum();
consumed_budgets.sort_unstable();
// consumed_budgets represents the count of alive accounts in the three slots 0,1,2 // consumed_budgets represents the count of alive accounts in the three slots 0,1,2
assert_eq!(consumed_budgets, vec![0, 1, 9]); assert_eq!(consumed_budgets, 10);
} }
#[test] #[test]

View File

@ -1011,6 +1011,14 @@ pub fn main() {
.multiple(true) .multiple(true)
.help("Comma separated persistent accounts location"), .help("Comma separated persistent accounts location"),
) )
.arg(
Arg::with_name("account_shrink_path")
.long("account-shrink-path")
.value_name("PATH")
.takes_value(true)
.multiple(true)
.help("Path to accounts shrink path which can hold a compacted account set."),
)
.arg( .arg(
Arg::with_name("gossip_port") Arg::with_name("gossip_port")
.long("gossip-port") .long("gossip-port")
@ -1540,6 +1548,10 @@ pub fn main() {
} else { } else {
vec![ledger_path.join("accounts")] vec![ledger_path.join("accounts")]
}; };
let account_shrink_paths: Option<Vec<PathBuf>> =
values_t!(matches, "account_shrink_path", String)
.map(|shrink_paths| shrink_paths.into_iter().map(PathBuf::from).collect())
.ok();
// Create and canonicalize account paths to avoid issues with symlink creation // Create and canonicalize account paths to avoid issues with symlink creation
validator_config.account_paths = account_paths validator_config.account_paths = account_paths
@ -1558,6 +1570,26 @@ pub fn main() {
}) })
.collect(); .collect();
validator_config.account_shrink_paths = account_shrink_paths.map(|paths| {
paths
.into_iter()
.map(|account_path| {
match fs::create_dir_all(&account_path)
.and_then(|_| fs::canonicalize(&account_path))
{
Ok(account_path) => account_path,
Err(err) => {
eprintln!(
"Unable to access account path: {:?}, err: {:?}",
account_path, err
);
exit(1);
}
}
})
.collect()
});
let snapshot_interval_slots = value_t_or_exit!(matches, "snapshot_interval_slots", u64); let snapshot_interval_slots = value_t_or_exit!(matches, "snapshot_interval_slots", u64);
let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64); let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64);
let snapshot_path = ledger_path.join("snapshot"); let snapshot_path = ledger_path.join("snapshot");