Rework Accounts for fast squash, hashing state and checkpoint recovery. (#3613)

* accounts rewrite

* ignore grow tests

* skip duplicate roots

* allow for a root race

* logger

* accounts_index tests

* tests

* tests
This commit is contained in:
anatoly yakovenko 2019-04-15 17:15:50 -07:00 committed by GitHub
parent 2bbed7727f
commit 68fc303b9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 838 additions and 641 deletions

1
Cargo.lock generated
View File

@ -2535,6 +2535,7 @@ dependencies = [
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -92,6 +92,7 @@ fn test_leader_failure_4() {
}
#[test]
fn test_two_unbalanced_stakes() {
solana_logger::setup();
let mut fullnode_config = FullnodeConfig::default();
let num_ticks_per_second = 100;
let num_ticks_per_slot = 160;

View File

@ -19,6 +19,7 @@ libloading = "0.5.0"
log = "0.4.2"
memmap = "0.6.2"
rand = "0.6.5"
rayon = "1.0.0"
serde = "1.0.88"
serde_derive = "1.0.88"
serde_json = "1.0.38"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,224 @@
use hashbrown::{HashMap, HashSet};
use log::*;
use solana_sdk::pubkey::Pubkey;
/// Identifier of a fork; a plain `u64` alias used as the fork key throughout the index.
pub type Fork = u64;
/// Index from an account's pubkey to the per-fork values recorded for it.
///
/// `T` is an opaque payload stored next to each fork id; the methods on the
/// index require `T: Clone`.
#[derive(Default)]
pub struct AccountsIndex<T> {
/// For every pubkey, the `(fork, value)` entries currently tracked for it.
account_maps: HashMap<Pubkey, Vec<(Fork, T)>>,
/// Forks that have been registered as roots through `add_root` and not yet
/// removed by `cleanup_dead_fork`.
roots: HashSet<Fork>,
/// Highest fork ever passed to `add_root` (it is not lowered by `cleanup_dead_fork`).
/// This value needs to be stored in order to recover the index from AppendVec.
last_root: Fork,
}
impl<T: Clone> AccountsIndex<T> {
    /// Look up the value recorded for `pubkey` that is visible from `ancestors`.
    ///
    /// An entry is visible when its fork is a key of `ancestors` or is a root.
    /// Among the visible entries the one with the highest fork is returned.
    /// Returns `None` when the pubkey is unknown or none of its entries is visible.
    pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap<Fork, usize>) -> Option<&T> {
        let list = self.account_maps.get(pubkey)?;
        let mut max = 0;
        let mut rv = None;
        for (fork, info) in list.iter().rev() {
            if *fork >= max && (ancestors.contains_key(fork) || self.is_root(*fork)) {
                trace!("GET {} {:?}", fork, ancestors);
                rv = Some(info);
                max = *fork;
            }
        }
        rv
    }

    /// Record `account_info` as the value of `pubkey` on `fork`.
    ///
    /// Any previous entry for the same `(pubkey, fork)` is replaced, and every
    /// entry of this pubkey that sits on a purged fork (see `is_purged`) is dropped
    /// from the index. Note that this includes the entry being inserted when `fork`
    /// itself is purged.
    ///
    /// Returns the entries that were removed and can be freed from storage:
    /// first the replaced entry for `fork` (if any), then the purged entries in
    /// their stored order.
    pub fn insert(&mut self, fork: Fork, pubkey: &Pubkey, account_info: T) -> Vec<(Fork, T)> {
        // Take the list out of the map so `self` can be borrowed immutably by
        // `is_purged` while the list is being rebuilt.
        let previous = self.account_maps.remove(pubkey).unwrap_or_default();

        // Entries are moved, not cloned: each one ends up either in the list that
        // stays in the index or in the list handed back to the caller.
        let (mut reclaimed, mut kept): (Vec<_>, Vec<_>) =
            previous.into_iter().partition(|(f, _)| *f == fork);
        kept.push((fork, account_info));

        let (purged, live): (Vec<_>, Vec<_>) =
            kept.into_iter().partition(|(f, _)| self.is_purged(*f));
        reclaimed.extend(purged);

        // The key is always re-inserted, even when no live entry is left.
        self.account_maps.insert(*pubkey, live);
        reclaimed
    }

    /// A fork is purged when it is older than the latest root and is not a root itself.
    fn is_purged(&self, fork: Fork) -> bool {
        fork < self.last_root && !self.is_root(fork)
    }

    /// Whether `fork` is currently registered as a root.
    pub fn is_root(&self, fork: Fork) -> bool {
        self.roots.contains(&fork)
    }

    /// Register `fork` as a root and raise `last_root` if `fork` is newer.
    pub fn add_root(&mut self, fork: Fork) {
        self.last_root = self.last_root.max(fork);
        self.roots.insert(fork);
    }

    /// Remove the fork when the storage for the fork is freed.
    /// Accounts no longer reference this fork.
    ///
    /// Only the root set is updated; `last_root` is left untouched.
    pub fn cleanup_dead_fork(&mut self, fork: Fork) {
        self.roots.remove(&fork);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use solana_sdk::signature::{Keypair, KeypairUtil};

    /// A pubkey that was never inserted cannot be found.
    #[test]
    fn test_get_empty() {
        let pubkey = Keypair::new().pubkey();
        let index = AccountsIndex::<bool>::default();
        let ancestors = HashMap::new();
        assert_eq!(index.get(&pubkey, &ancestors), None);
    }

    /// An entry is invisible when the ancestor set is empty and nothing is rooted.
    #[test]
    fn test_insert_no_ancestors() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());

        let ancestors = HashMap::new();
        assert_eq!(index.get(&pubkey, &ancestors), None);
    }

    /// An entry on fork 0 is invisible when only fork 1 is an ancestor.
    #[test]
    fn test_insert_wrong_ancestors() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());

        let ancestors: HashMap<Fork, usize> = std::iter::once((1, 1)).collect();
        assert_eq!(index.get(&pubkey, &ancestors), None);
    }

    /// An entry is visible when its fork is listed as an ancestor.
    #[test]
    fn test_insert_with_ancestors() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());

        let ancestors: HashMap<Fork, usize> = std::iter::once((0, 0)).collect();
        assert_eq!(index.get(&pubkey, &ancestors), Some(&true));
    }

    /// A fork only counts as a root after `add_root`.
    #[test]
    fn test_is_root() {
        let mut index = AccountsIndex::<bool>::default();
        assert!(!index.is_root(0));
        index.add_root(0);
        assert!(index.is_root(0));
    }

    /// An entry on a rooted fork is visible even with no ancestors at all.
    #[test]
    fn test_insert_with_root() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());

        index.add_root(0);
        let ancestors = HashMap::new();
        assert_eq!(index.get(&pubkey, &ancestors), Some(&true));
    }

    /// A non-root fork older than the latest root is purged.
    #[test]
    fn test_is_purged() {
        let mut index = AccountsIndex::<bool>::default();
        assert!(!index.is_purged(0));
        index.add_root(1);
        assert!(index.is_purged(0));
    }

    /// Adding an older root afterwards does not lower `last_root`.
    #[test]
    fn test_max_last_root() {
        let mut index = AccountsIndex::<bool>::default();
        for root in vec![1, 0] {
            index.add_root(root);
        }
        assert_eq!(index.last_root, 1);
    }

    /// Cleaning up the older root leaves the newer one in place.
    #[test]
    fn test_cleanup_first() {
        let mut index = AccountsIndex::<bool>::default();
        for root in vec![1, 0] {
            index.add_root(root);
        }
        index.cleanup_dead_fork(0);
        assert!(!index.is_root(0));
        assert!(index.is_root(1));
    }

    /// Cleaning up the newest root.
    #[test]
    fn test_cleanup_last() {
        // This behavior might be undefined: cleanup should only occur on older forks.
        let mut index = AccountsIndex::<bool>::default();
        for root in vec![1, 0] {
            index.add_root(root);
        }
        index.cleanup_dead_fork(1);
        assert!(index.is_root(0));
        assert!(!index.is_root(1));
    }

    /// Re-inserting on the same fork replaces the value and returns the old one.
    #[test]
    fn test_update_last_wins() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        let ancestors: HashMap<Fork, usize> = std::iter::once((0, 0)).collect();

        assert!(index.insert(0, &pubkey, true).is_empty());
        assert_eq!(index.get(&pubkey, &ancestors), Some(&true));

        let reclaimed = index.insert(0, &pubkey, false);
        assert_eq!(reclaimed, vec![(0, true)]);
        assert_eq!(index.get(&pubkey, &ancestors), Some(&false));
    }

    /// Values on different forks coexist; the ancestor set selects which one is seen.
    #[test]
    fn test_update_new_fork() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());
        assert!(index.insert(1, &pubkey, false).is_empty());

        let on_fork_zero: HashMap<Fork, usize> = std::iter::once((0, 0)).collect();
        assert_eq!(index.get(&pubkey, &on_fork_zero), Some(&true));

        let on_fork_one: HashMap<Fork, usize> = std::iter::once((1, 0)).collect();
        assert_eq!(index.get(&pubkey, &on_fork_one), Some(&false));
    }

    /// Inserting after a newer root exists hands back the entry on the purged fork.
    #[test]
    fn test_update_gc_purged_fork() {
        let pubkey = Keypair::new().pubkey();
        let mut index = AccountsIndex::<bool>::default();
        assert!(index.insert(0, &pubkey, true).is_empty());

        index.add_root(1);
        let reclaimed = index.insert(1, &pubkey, false);
        assert_eq!(reclaimed, vec![(0, true)]);

        let ancestors = HashMap::new();
        assert_eq!(index.get(&pubkey, &ancestors), Some(&false));
    }
}

View File

@ -1,5 +1,6 @@
use memmap::MmapMut;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::mem;
@ -15,6 +16,18 @@ macro_rules! align_up {
};
}
//TODO: This structure should contain references
/// StoredAccount contains enough context to recover the index from storage itself.
///
/// This is the record type accepted by `AppendVec::append_account` and handed
/// back by reference from `AppendVec::get_account` and `AppendVec::accounts`.
#[derive(Clone, PartialEq, Debug)]
pub struct StoredAccount {
/// Global write version.
/// NOTE(review): presumably used to order writes of the same pubkey when the
/// index is rebuilt from storage; confirm against the `Accounts` implementation.
pub write_version: u64,
/// Key (pubkey) of the account.
pub pubkey: Pubkey,
/// The account itself, including its `data` bytes.
pub account: Account,
}
pub struct AppendVec {
map: MmapMut,
// This mutex forces append to be single threaded, but concurrent with reads
@ -119,31 +132,40 @@ impl AppendVec {
Some(pos)
}
//TODO: Make this safer
//StoredAccount should be a struct of references with the same lifetime as &self
//The structure should have a method to clone the account out
#[allow(clippy::transmute_ptr_to_ptr)]
pub fn get_account(&self, offset: usize) -> &Account {
let account: *mut Account = {
let data = self.get_slice(offset, mem::size_of::<Account>());
unsafe { std::mem::transmute::<*const u8, *mut Account>(data.as_ptr()) }
pub fn get_account(&self, offset: usize) -> &StoredAccount {
let account: *mut StoredAccount = {
let data = self.get_slice(offset, mem::size_of::<StoredAccount>());
unsafe { std::mem::transmute::<*const u8, *mut StoredAccount>(data.as_ptr()) }
};
//Data is aligned at the next 64-bit (8-byte) boundary. Without alignment loading the memory may
//crash on some architectures.
let data_at = align_up!(offset + mem::size_of::<Account>(), mem::size_of::<u64>());
let account_ref: &mut Account = unsafe { &mut *account };
let data = self.get_slice(data_at, account_ref.data.len());
let data_at = align_up!(
offset + mem::size_of::<StoredAccount>(),
mem::size_of::<u64>()
);
let account_ref: &mut StoredAccount = unsafe { &mut *account };
let data = self.get_slice(data_at, account_ref.account.data.len());
unsafe {
let mut new_data = Vec::from_raw_parts(data.as_mut_ptr(), data.len(), data.len());
std::mem::swap(&mut account_ref.data, &mut new_data);
std::mem::swap(&mut account_ref.account.data, &mut new_data);
std::mem::forget(new_data);
};
account_ref
}
pub fn accounts(&self, mut start: usize) -> Vec<&Account> {
pub fn accounts(&self, mut start: usize) -> Vec<&StoredAccount> {
let mut accounts = vec![];
loop {
//Data is aligned at the next 64-bit (8-byte) boundary. Without alignment loading the memory may
//crash on some architectures.
let end = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
let end = align_up!(
start + mem::size_of::<StoredAccount>(),
mem::size_of::<u64>()
);
if end > self.len() {
break;
}
@ -151,19 +173,22 @@ impl AppendVec {
accounts.push(first);
//Data is aligned at the next 64-bit (8-byte) boundary. Without alignment loading the memory may
//crash on some architectures.
let data_at = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
let next = align_up!(data_at + first.data.len(), mem::size_of::<u64>());
let data_at = align_up!(
start + mem::size_of::<StoredAccount>(),
mem::size_of::<u64>()
);
let next = align_up!(data_at + first.account.data.len(), mem::size_of::<u64>());
start = next;
}
accounts
}
pub fn append_account(&self, account: &Account) -> Option<usize> {
let acc_ptr = account as *const Account;
let data_len = account.data.len();
let data_ptr = account.data.as_ptr();
pub fn append_account(&self, account: &StoredAccount) -> Option<usize> {
let acc_ptr = account as *const StoredAccount;
let data_len = account.account.data.len();
let data_ptr = account.account.data.as_ptr();
let ptrs = [
(acc_ptr as *const u8, mem::size_of::<Account>()),
(acc_ptr as *const u8, mem::size_of::<StoredAccount>()),
(data_ptr, data_len),
];
self.append_ptrs(&ptrs)
@ -171,6 +196,7 @@ impl AppendVec {
}
pub mod test_utils {
use super::StoredAccount;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
@ -200,11 +226,15 @@ pub mod test_utils {
TempFile { path: buf }
}
pub fn create_test_account(sample: usize) -> Account {
pub fn create_test_account(sample: usize) -> StoredAccount {
let data_len = sample % 256;
let mut account = Account::new(sample as u64, 0, &Pubkey::default());
account.data = (0..data_len).map(|_| data_len as u8).collect();
account
StoredAccount {
write_version: 0,
pubkey: Pubkey::default(),
account,
}
}
}

View File

@ -112,9 +112,6 @@ pub struct Bank {
/// where all the Accounts are stored
accounts: Arc<Accounts>,
/// Bank accounts fork id
accounts_id: u64,
/// A cache of signature statuses
status_cache: Arc<RwLock<BankStatusCache>>,
@ -124,6 +121,9 @@ pub struct Bank {
/// Previous checkpoint of this bank
parent: RwLock<Option<Arc<Bank>>>,
/// The set of parents including this bank
ancestors: HashMap<u64, usize>,
/// Hash of this Bank's state. Only meaningful after freezing.
hash: RwLock<Hash>,
@ -182,7 +182,8 @@ impl Bank {
pub fn new_with_paths(genesis_block: &GenesisBlock, paths: Option<String>) -> Self {
let mut bank = Self::default();
bank.accounts = Arc::new(Accounts::new(bank.slot, paths));
bank.ancestors.insert(bank.slot(), 0);
bank.accounts = Arc::new(Accounts::new(paths));
bank.process_genesis_block(genesis_block);
// genesis needs stakes for all epochs up to the epoch implied by
// slot = 0 and genesis configuration
@ -218,12 +219,7 @@ impl Bank {
bank.parent_hash = parent.hash();
bank.collector_id = *collector_id;
// Accounts needs a unique id
static BANK_ACCOUNTS_ID: AtomicUsize = AtomicUsize::new(1);
bank.accounts_id = BANK_ACCOUNTS_ID.fetch_add(1, Ordering::Relaxed) as u64;
bank.accounts = parent.accounts.clone();
bank.accounts
.new_from_parent(bank.accounts_id, parent.accounts_id);
bank.accounts = Arc::new(Accounts::new_from_parent(&parent.accounts));
bank.epoch_vote_accounts = {
let mut epoch_vote_accounts = parent.epoch_vote_accounts.clone();
@ -236,6 +232,10 @@ impl Bank {
}
epoch_vote_accounts
};
bank.ancestors.insert(bank.slot(), 0);
bank.parents().iter().enumerate().for_each(|(i, p)| {
bank.ancestors.insert(p.slot(), i + 1);
});
bank
}
@ -274,7 +274,10 @@ impl Bank {
*self.parent.write().unwrap() = None;
let squash_accounts_start = Instant::now();
self.accounts.squash(self.accounts_id);
for p in &parents {
// root forks cannot be purged
self.accounts.add_root(p.slot());
}
let squash_accounts_ms = duration_as_ms(&squash_accounts_start.elapsed());
let squash_cache_start = Instant::now();
@ -503,7 +506,7 @@ impl Bank {
}
// TODO: put this assert back in
// assert!(!self.is_frozen());
let results = self.accounts.lock_accounts(self.accounts_id, txs);
let results = self.accounts.lock_accounts(txs);
LockedAccountsResults::new(results, &self, txs)
}
@ -511,7 +514,6 @@ impl Bank {
if locked_accounts_results.needs_unlock {
locked_accounts_results.needs_unlock = false;
self.accounts.unlock_accounts(
self.accounts_id,
locked_accounts_results.transactions(),
locked_accounts_results.locked_accounts_results(),
)
@ -525,7 +527,7 @@ impl Bank {
error_counters: &mut ErrorCounters,
) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
self.accounts.load_accounts(
self.accounts_id,
&self.ancestors,
txs,
results,
&self.fee_calculator,
@ -578,12 +580,6 @@ impl Bank {
lock_results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
let mut ancestors = HashMap::new();
ancestors.insert(self.slot(), 0);
self.parents().iter().enumerate().for_each(|(i, p)| {
ancestors.insert(p.slot(), i + 1);
});
let rcache = self.status_cache.read().unwrap();
txs.iter()
.zip(lock_results.into_iter())
@ -596,7 +592,7 @@ impl Bank {
.get_signature_status(
&tx.signatures[0],
&tx.message().recent_blockhash,
&ancestors,
&self.ancestors,
)
.is_some()
{
@ -761,7 +757,7 @@ impl Bank {
// assert!(!self.is_frozen());
let now = Instant::now();
self.accounts
.store_accounts(self.accounts_id, txs, executed, loaded_accounts);
.store_accounts(self.slot(), txs, executed, loaded_accounts);
self.store_vote_accounts(txs, executed, loaded_accounts);
@ -828,8 +824,7 @@ impl Bank {
}
fn store(&self, pubkey: &Pubkey, account: &Account) {
self.accounts.store_slow(self.accounts_id, pubkey, &account);
self.accounts.store_slow(self.slot(), pubkey, account);
if solana_vote_api::check_id(&account.owner) {
let mut vote_accounts = self.vote_accounts.write().unwrap();
if account.lamports != 0 {
@ -863,19 +858,19 @@ impl Bank {
}
pub fn get_account(&self, pubkey: &Pubkey) -> Option<Account> {
self.accounts.load_slow(self.accounts_id, pubkey)
self.accounts.load_slow(&self.ancestors, pubkey)
}
pub fn get_program_accounts_modified_since_parent(
&self,
program_id: &Pubkey,
) -> Vec<(Pubkey, Account)> {
self.accounts
.load_by_program_slow_no_parent(self.accounts_id, program_id)
self.accounts.load_by_program(self.slot(), program_id)
}
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<Account> {
self.accounts.load_slow_no_parent(self.accounts_id, pubkey)
let just_self: HashMap<u64, usize> = vec![(self.slot(), 0)].into_iter().collect();
self.accounts.load_slow(&just_self, pubkey)
}
pub fn transaction_count(&self) -> u64 {
@ -890,13 +885,8 @@ impl Bank {
&self,
signature: &Signature,
) -> Option<(usize, Result<()>)> {
let mut ancestors = HashMap::new();
ancestors.insert(self.slot(), 0);
self.parents().iter().enumerate().for_each(|(i, p)| {
ancestors.insert(p.slot(), i + 1);
});
let rcache = self.status_cache.read().unwrap();
rcache.get_signature_status_slow(signature, &ancestors)
rcache.get_signature_status_slow(signature, &self.ancestors)
}
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
@ -913,11 +903,11 @@ impl Bank {
fn hash_internal_state(&self) -> Hash {
// If there are no accounts, return the same hash as we did before
// checkpointing.
if !self.accounts.has_accounts(self.accounts_id) {
if !self.accounts.has_accounts(self.slot()) {
return self.parent_hash;
}
let accounts_delta_hash = self.accounts.hash_internal_state(self.accounts_id);
let accounts_delta_hash = self.accounts.hash_internal_state(self.slot());
extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap())
}
@ -1039,7 +1029,8 @@ impl Bank {
impl Drop for Bank {
fn drop(&mut self) {
self.accounts.remove_accounts(self.accounts_id);
// For root forks this is a noop
self.accounts.purge_fork(self.slot());
}
}
@ -1056,10 +1047,14 @@ mod tests {
use solana_vote_api::vote_state::VoteState;
#[test]
fn test_bank_new() {
fn test_bank_new_no_parent() {
solana_logger::setup();
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
assert_eq!(bank.get_balance(&genesis_block.mint_id), 10_000);
trace!("get balance {}", genesis_block.mint_id);
let bal = bank.get_balance(&genesis_block.mint_id);
trace!("done get balance {}", bal);
assert_eq!(bal, 10_000);
}
#[test]

View File

@ -1,4 +1,5 @@
mod accounts;
mod accounts_index;
pub mod append_vec;
pub mod bank;
pub mod bank_client;