Persistent account storage across directories

This commit is contained in:
Sathish Ambley 2018-12-24 16:11:20 -08:00 committed by sakridge
parent 130563cd4c
commit c276375a0e
6 changed files with 700 additions and 198 deletions

View File

@ -200,6 +200,14 @@ fn main() {
.takes_value(true)
.help("Rendezvous with the vote signer at this RPC end point"),
)
.arg(
Arg::with_name("accounts")
.short("a")
.long("accounts")
.value_name("PATHS")
.takes_value(true)
.help("Comma separated persistent accounts location"),
)
.get_matches();
let mut fullnode_config = FullnodeConfig::default();
@ -209,6 +217,9 @@ fn main() {
let use_only_bootstrap_leader = matches.is_present("no_leader_rotation");
let (keypair, gossip) = parse_identity(&matches);
let ledger_path = matches.value_of("ledger").unwrap();
if let Some(paths) = matches.value_of("accounts") {
fullnode_config.account_paths = paths.to_string();
}
let cluster_entrypoint = matches
.value_of("network")
.map(|network| network.parse().expect("failed to parse network address"));

File diff suppressed because it is too large Load Diff

View File

@ -80,7 +80,7 @@ type BankStatusCache = StatusCache<BankError>;
/// Manager for the state of all accounts and programs after processing its entries.
#[derive(Default)]
pub struct Bank {
accounts: Accounts,
accounts: Option<Arc<Accounts>>,
/// A cache of signature statuses
status_cache: RwLock<BankStatusCache>,
@ -115,7 +115,12 @@ pub struct Bank {
impl Bank {
pub fn new(genesis_block: &GenesisBlock) -> Self {
Self::new_with_paths(&genesis_block, "")
}
pub fn new_with_paths(genesis_block: &GenesisBlock, paths: &str) -> Self {
let mut bank = Self::default();
bank.accounts = Some(Arc::new(Accounts::new(&paths)));
bank.process_genesis_block(genesis_block);
bank.add_builtin_programs();
bank
@ -136,6 +141,7 @@ impl Bank {
bank.parent_hash = parent.hash();
bank.collector_id = collector_id;
bank.accounts = Some(parent.accounts());
bank
}
@ -175,8 +181,8 @@ impl Bank {
let parents = self.parents();
*self.parent.write().unwrap() = None;
let parent_accounts: Vec<_> = parents.iter().map(|b| &b.accounts).collect();
self.accounts.squash(&parent_accounts);
let parent_accounts: Vec<_> = parents.iter().map(|b| b.accounts()).collect();
self.accounts().squash(&parent_accounts);
let parent_caches: Vec<_> = parents
.iter()
@ -230,7 +236,8 @@ impl Bank {
.serialize(&mut bootstrap_leader_vote_account.userdata)
.unwrap();
self.accounts.store_slow(
self.accounts().store_slow(
self.id,
self.is_root(),
&genesis_block.bootstrap_leader_vote_account_id,
&bootstrap_leader_vote_account,
@ -248,8 +255,8 @@ impl Bank {
pub fn add_native_program(&self, name: &str, program_id: &Pubkey) {
let account = native_loader::create_program_account(name);
self.accounts
.store_slow(self.is_root(), program_id, &account);
self.accounts()
.store_slow(self.id, self.is_root(), program_id, &account);
}
fn add_builtin_programs(&self) {
@ -338,11 +345,11 @@ impl Bank {
}
// TODO: put this assert back in
// assert!(!self.is_frozen());
self.accounts.lock_accounts(txs)
self.accounts().lock_accounts(txs)
}
pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) {
self.accounts.unlock_accounts(txs, results)
self.accounts().unlock_accounts(txs, results)
}
fn load_accounts(
@ -351,10 +358,8 @@ impl Bank {
results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
let parents = self.parents();
let mut accounts = vec![&self.accounts];
accounts.extend(parents.iter().map(|b| &b.accounts));
Accounts::load_accounts(&accounts, txs, results, error_counters)
self.accounts()
.load_accounts(self.id, txs, results, error_counters)
}
fn check_age(
&self,
@ -461,7 +466,7 @@ impl Bank {
inc_new_counter_info!("bank-process_transactions-error_count", err_count);
}
self.accounts.increment_transaction_count(tx_count);
self.accounts().increment_transaction_count(tx_count);
inc_new_counter_info!("bank-process_transactions-txs", tx_count);
if 0 != error_counters.last_id_not_found {
@ -536,8 +541,8 @@ impl Bank {
// TODO: put this assert back in
// assert!(!self.is_frozen());
let now = Instant::now();
self.accounts
.store_accounts(self.is_root(), txs, executed, loaded_accounts);
self.accounts()
.store_accounts(self.id, self.is_root(), txs, executed, loaded_accounts);
// once committed there is no way to unroll
let write_elapsed = now.elapsed();
@ -622,7 +627,7 @@ impl Bank {
}
account.tokens -= tokens;
self.accounts.store_slow(true, pubkey, &account);
self.accounts().store_slow(self.id, true, pubkey, &account);
Ok(())
}
None => Err(BankError::AccountNotFound),
@ -632,22 +637,27 @@ impl Bank {
pub fn deposit(&self, pubkey: &Pubkey, tokens: u64) {
let mut account = self.get_account(pubkey).unwrap_or_default();
account.tokens += tokens;
self.accounts.store_slow(self.is_root(), pubkey, &account);
self.accounts().store_slow(self.id, self.is_root(), pubkey, &account);
}
fn accounts(&self) -> Arc<Accounts> {
if let Some(accounts) = &self.accounts {
accounts.clone()
} else {
Arc::new(Accounts::new(""))
}
}
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
let parents = self.parents();
let mut accounts = vec![&self.accounts];
accounts.extend(parents.iter().map(|b| &b.accounts));
Accounts::load_slow(&accounts, pubkey)
self.accounts().load_slow(self.id, pubkey)
}
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<Account> {
Accounts::load_slow(&[&self.accounts], pubkey)
self.accounts().load_slow(self.id, pubkey)
}
pub fn transaction_count(&self) -> u64 {
self.accounts.transaction_count()
self.accounts().transaction_count()
}
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
@ -669,12 +679,11 @@ impl Bank {
fn hash_internal_state(&self) -> Hash {
// If there are no accounts, return the same hash as we did before
// checkpointing.
let accounts = &self.accounts.accounts_db.read().unwrap().accounts;
if accounts.is_empty() {
if !self.accounts().has_accounts(self.id) {
return self.parent_hash;
}
let accounts_delta_hash = self.accounts.hash_internal_state();
let accounts_delta_hash = self.accounts().hash_internal_state(self.id);
extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap())
}
@ -682,34 +691,17 @@ impl Bank {
where
F: Fn(&VoteState) -> bool,
{
let parents = self.parents();
let mut accounts = vec![&self.accounts];
accounts.extend(parents.iter().map(|b| &b.accounts));
let mut exists = HashSet::new();
accounts
self.accounts()
.accounts_db
.get_vote_accounts(self.id)
.iter()
.flat_map(|account| {
let accounts_db = account.accounts_db.read().unwrap();
let vote_states: Vec<_> = accounts_db
.accounts
.iter()
.filter_map(|(key, account)| {
if exists.contains(key) {
None
} else {
exists.insert(key.clone());
if vote_program::check_id(&account.owner) {
if let Ok(vote_state) = VoteState::deserialize(&account.userdata) {
if cond(&vote_state) {
return Some(vote_state);
}
}
}
None
}
})
.collect();
vote_states
.filter_map(|account| {
if let Ok(vote_state) = VoteState::deserialize(&account.userdata) {
if cond(&vote_state) {
return Some(vote_state);
}
}
None
})
.collect()
}

View File

@ -71,6 +71,7 @@ pub struct FullnodeConfig {
pub blockstream: Option<String>,
pub storage_rotate_count: u64,
pub tick_config: PohServiceConfig,
pub account_paths: String,
}
impl Default for FullnodeConfig {
fn default() -> Self {
@ -84,6 +85,7 @@ impl Default for FullnodeConfig {
blockstream: None,
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
tick_config: PohServiceConfig::default(),
account_paths: "0,1,2,3".to_string(),
}
}
}
@ -123,7 +125,7 @@ impl Fullnode {
assert_eq!(id, node.info.id);
let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path);
new_banks_from_blocktree(ledger_path, &config.account_paths);
let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0];
@ -405,6 +407,7 @@ impl Fullnode {
pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: &str,
) -> (BankForks, Vec<BankForksInfo>, Blocktree, Receiver<bool>) {
let genesis_block =
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
@ -743,7 +746,7 @@ mod tests {
// Close the validator so that rocksdb has locks available
validator_exit();
let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree(&validator_ledger_path);
let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree(&validator_ledger_path, "accounts");
let bank = bank_forks.working_bank();
let entry_height = bank_forks_info[0].entry_height;

View File

@ -499,7 +499,7 @@ mod test {
let (to_leader_sender, _to_leader_receiver) = channel();
{
let (bank_forks, bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path);
new_banks_from_blocktree(&my_ledger_path, "");
let bank = bank_forks.working_bank();
let last_entry_id = bank_forks_info[0].last_entry_id;

View File

@ -911,7 +911,7 @@ fn test_leader_to_validator_transition() {
leader_exit();
info!("Check the ledger to make sure it's the right height...");
let bank_forks = new_banks_from_blocktree(&leader_ledger_path).0;
let bank_forks = new_banks_from_blocktree(&leader_ledger_path, "").0;
let _bank = bank_forks.working_bank();
remove_dir_all(leader_ledger_path).unwrap();