accounts shedding (#3078)

* accounts shedding

* fixup
This commit is contained in:
Rob Walker 2019-03-03 16:04:04 -08:00 committed by GitHub
parent 8ec10d4de9
commit e4dba03e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 157 additions and 120 deletions

View File

@ -4,7 +4,7 @@ extern crate test;
use bincode::{deserialize, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_runtime::appendvec::{
use solana_runtime::append_vec::{
deserialize_account, get_serialized_size, serialize_account, AppendVec,
};
use solana_sdk::account::Account;
@ -20,7 +20,7 @@ use test::Bencher;
const START_SIZE: u64 = 4 * 1024 * 1024;
const INC_SIZE: u64 = 1 * 1024 * 1024;
fn get_appendvec_bench_path(path: &str) -> PathBuf {
fn get_append_vec_bench_path(path: &str) -> PathBuf {
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let mut buf = PathBuf::new();
buf.push(&format!("{}/{}", out_dir, path));
@ -28,8 +28,8 @@ fn get_appendvec_bench_path(path: &str) -> PathBuf {
}
#[bench]
fn appendvec_atomic_append(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_append");
fn append_vec_atomic_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_append");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
bencher.iter(|| {
if vec.append(AtomicUsize::new(0)).is_none() {
@ -41,8 +41,8 @@ fn appendvec_atomic_append(bencher: &mut Bencher) {
}
#[bench]
fn appendvec_atomic_random_access(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_ra");
fn append_vec_atomic_random_access(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_ra");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
@ -59,8 +59,8 @@ fn appendvec_atomic_random_access(bencher: &mut Bencher) {
}
#[bench]
fn appendvec_atomic_random_change(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_rax");
fn append_vec_atomic_random_change(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_rax");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
@ -83,8 +83,8 @@ fn appendvec_atomic_random_change(bencher: &mut Bencher) {
}
#[bench]
fn appendvec_atomic_random_read(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_read");
fn append_vec_atomic_random_read(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_read");
let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
let size = 1_000_000;
for _ in 0..size {
@ -103,8 +103,8 @@ fn appendvec_atomic_random_read(bencher: &mut Bencher) {
}
#[bench]
fn appendvec_concurrent_lock_append(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_lock_append");
fn append_vec_concurrent_lock_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_lock_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));
@ -143,8 +143,8 @@ fn appendvec_concurrent_lock_append(bencher: &mut Bencher) {
}
#[bench]
fn appendvec_concurrent_get_append(bencher: &mut Bencher) {
let path = get_appendvec_bench_path("bench_get_append");
fn append_vec_concurrent_get_append(bencher: &mut Bencher) {
let path = get_append_vec_bench_path("bench_get_append");
let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
&path, true, START_SIZE, INC_SIZE,
)));

View File

@ -1,4 +1,4 @@
use crate::appendvec::AppendVec;
use crate::append_vec::AppendVec;
use crate::bank::{BankError, Result};
use crate::runtime::has_duplicates;
use bincode::serialize;
@ -76,9 +76,6 @@ type AppendVecId = usize;
type Fork = u64;
#[derive(Debug)]
struct AccountMap(RwLock<HashMap<Fork, (AppendVecId, u64)>>);
#[derive(Debug, PartialEq)]
enum AccountStorageStatus {
StorageAvailable = 0,
@ -96,20 +93,29 @@ impl From<usize> for AccountStorageStatus {
}
}
struct AccountIndexInfo {
/// For each Pubkey, the account for a specific fork is in a specific
/// AppendVec at a specific index
index: RwLock<HashMap<Pubkey, AccountMap>>,
// in a given a Fork, which AppendVecId and offset
type AccountMap = RwLock<HashMap<Fork, (AppendVecId, u64)>>;
/// information about where Accounts are stored and which vote accounts are present
/// keying hierarchy is:
///
/// pubkey->fork->append_vec->offset
///
struct AccountIndex {
/// For each Pubkey, the Account for a specific Fork is in a specific
/// AppendVec at a specific index. There may be an Account for Pubkey
/// in any number of Forks.
account_maps: RwLock<HashMap<Pubkey, AccountMap>>,
/// Cached index to vote accounts for performance reasons to avoid having
/// to iterate through the entire accounts each time
vote_index: RwLock<HashSet<Pubkey>>,
/// to iterate through the entire accounts each time
vote_accounts: RwLock<HashSet<Pubkey>>,
}
/// Persistent storage structure holding the accounts
struct AccountStorage {
/// storage holding the accounts
appendvec: Arc<RwLock<AppendVec<Account>>>,
accounts: Arc<RwLock<AppendVec<Account>>>,
/// Keeps track of the number of accounts stored in a specific AppendVec.
/// This is periodically checked to reuse the stores that do not have
@ -134,19 +140,18 @@ impl AccountStorage {
}
#[derive(Default, Debug)]
struct AccountsForkInfo {
/// The number of transactions the bank has processed without error since the
/// start of the ledger.
struct ForkInfo {
/// The number of transactions processed without error
transaction_count: u64,
/// List of all parents corresponding to this fork
/// List of all parents of this fork
parents: Vec<Fork>,
}
// This structure handles the load/store of the accounts
pub struct AccountsDB {
/// Keeps tracks of index into AppendVec on a per fork basis
index_info: AccountIndexInfo,
account_index: AccountIndex,
/// Account storage
storage: RwLock<Vec<AccountStorage>>,
@ -155,7 +160,7 @@ pub struct AccountsDB {
next_id: AtomicUsize,
/// Information related to the fork
fork_info: RwLock<HashMap<Fork, AccountsForkInfo>>,
fork_infos: RwLock<HashMap<Fork, ForkInfo>>,
}
/// This structure handles synchronization for db
@ -194,15 +199,15 @@ impl Drop for Accounts {
impl AccountsDB {
pub fn new(fork: Fork, paths: &str) -> Self {
let index_info = AccountIndexInfo {
index: RwLock::new(HashMap::new()),
vote_index: RwLock::new(HashSet::new()),
let account_index = AccountIndex {
account_maps: RwLock::new(HashMap::new()),
vote_accounts: RwLock::new(HashSet::new()),
};
let accounts_db = AccountsDB {
index_info,
account_index,
storage: RwLock::new(vec![]),
next_id: AtomicUsize::new(0),
fork_info: RwLock::new(HashMap::new()),
fork_infos: RwLock::new(HashMap::new()),
};
accounts_db.add_storage(paths);
accounts_db.add_fork(fork, None);
@ -210,15 +215,17 @@ impl AccountsDB {
}
pub fn add_fork(&self, fork: Fork, parent: Option<Fork>) {
let mut info = self.fork_info.write().unwrap();
let mut fork_info = AccountsForkInfo::default();
let mut fork_infos = self.fork_infos.write().unwrap();
let mut fork_info = ForkInfo::default();
if let Some(parent) = parent {
fork_info.parents.push(parent);
if let Some(list) = info.get(&parent) {
fork_info.parents.extend_from_slice(&list.parents);
if let Some(parent_fork_info) = fork_infos.get(&parent) {
fork_info
.parents
.extend_from_slice(&parent_fork_info.parents);
}
}
if let Some(old_fork_info) = info.insert(fork, fork_info) {
if let Some(old_fork_info) = fork_infos.insert(fork, fork_info) {
panic!("duplicate forks! {} {:?}", fork, old_fork_info);
}
}
@ -228,7 +235,7 @@ impl AccountsDB {
let mut stores: Vec<AccountStorage> = vec![];
paths.iter().for_each(|p| {
let storage = AccountStorage {
appendvec: self.new_account_storage(&p),
accounts: self.new_account_storage(&p),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
count: AtomicUsize::new(0),
path: p.to_string(),
@ -254,8 +261,8 @@ impl AccountsDB {
}
fn get_vote_accounts(&self, fork: Fork) -> HashMap<Pubkey, Account> {
self.index_info
.vote_index
self.account_index
.vote_accounts
.read()
.unwrap()
.iter()
@ -270,11 +277,10 @@ impl AccountsDB {
}
pub fn has_accounts(&self, fork: Fork) -> bool {
let index = self.index_info.index.read().unwrap();
let account_maps = self.account_index.account_maps.read().unwrap();
for entry in index.values() {
let account_map = entry.0.read().unwrap();
if account_map.contains_key(&fork) {
for account_map in account_maps.values() {
if account_map.read().unwrap().contains_key(&fork) {
return true;
}
}
@ -282,20 +288,29 @@ impl AccountsDB {
}
pub fn hash_internal_state(&self, fork: Fork) -> Option<Hash> {
let mut ordered_accounts = BTreeMap::new();
let rindex = self.index_info.index.read().unwrap();
rindex.iter().for_each(|(p, entry)| {
let forks = entry.0.read().unwrap();
if let Some((id, index)) = forks.get(&fork) {
let account = self.storage.read().unwrap()[*id]
.appendvec
.read()
.unwrap()
.get_account(*index)
.unwrap();
ordered_accounts.insert(*p, account);
}
});
let ordered_accounts: BTreeMap<_, _> = self
.account_index
.account_maps
.read()
.unwrap()
.iter()
.filter_map(|(pubkey, account_map)| {
let account_map = account_map.read().unwrap();
if let Some((vec_id, offset)) = account_map.get(&fork) {
Some((
*pubkey,
self.storage.read().unwrap()[*vec_id]
.accounts
.read()
.unwrap()
.get_account(*offset)
.unwrap(),
))
} else {
None
}
})
.collect();
if ordered_accounts.is_empty() {
return None;
@ -304,26 +319,26 @@ impl AccountsDB {
}
fn get_account(&self, id: AppendVecId, offset: u64) -> Account {
let appendvec = &self.storage.read().unwrap()[id].appendvec;
let av = appendvec.read().unwrap();
let accounts = &self.storage.read().unwrap()[id].accounts;
let av = accounts.read().unwrap();
av.get_account(offset).unwrap()
}
fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option<Account> {
let index = self.index_info.index.read().unwrap();
if let Some(map) = index.get(pubkey) {
let forks = map.0.read().unwrap();
let account_maps = self.account_index.account_maps.read().unwrap();
if let Some(account_map) = account_maps.get(pubkey) {
let account_map = account_map.read().unwrap();
// find most recent fork that is an ancestor of current_fork
if let Some((id, offset)) = forks.get(&fork) {
if let Some((id, offset)) = account_map.get(&fork) {
return Some(self.get_account(*id, *offset));
} else {
if !walk_back {
return None;
}
let fork_info = self.fork_info.read().unwrap();
if let Some(info) = fork_info.get(&fork) {
for parent_fork in info.parents.iter() {
if let Some((id, offset)) = forks.get(&parent_fork) {
let fork_infos = self.fork_infos.read().unwrap();
if let Some(fork_info) = fork_infos.get(&fork) {
for parent_fork in fork_info.parents.iter() {
if let Some((id, offset)) = account_map.get(&parent_fork) {
return Some(self.get_account(*id, *offset));
}
}
@ -363,7 +378,7 @@ impl AccountsDB {
// check if new store was already created
if stores.len() == len {
let storage = AccountStorage {
appendvec: self.new_account_storage(&stores[id].path),
accounts: self.new_account_storage(&stores[id].path),
count: AtomicUsize::new(0),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
path: stores[id].path.clone(),
@ -381,7 +396,7 @@ impl AccountsDB {
let mut id = self.get_storage_id(start, std::usize::MAX);
// Even if no tokens, need to preserve the account owner so
// we can update the vote_index correctly if this account is purged
// we can update the vote_accounts correctly if this account is purged
// when squashing.
let acc = &mut account.clone();
if account.tokens == 0 {
@ -391,7 +406,7 @@ impl AccountsDB {
loop {
let result: Option<u64>;
{
let av = &self.storage.read().unwrap()[id].appendvec;
let av = &self.storage.read().unwrap()[id].accounts;
result = av.read().unwrap().append_account(acc);
}
if let Some(val) = result {
@ -405,12 +420,12 @@ impl AccountsDB {
}
fn remove_account_entries(&self, entries: &[Fork], map: &AccountMap) -> bool {
let mut forks = map.0.write().unwrap();
let mut forks = map.write().unwrap();
for fork in entries.iter() {
if let Some((id, _)) = forks.remove(&fork) {
let stores = self.storage.read().unwrap();
if stores[id].count.fetch_sub(1, Ordering::Relaxed) == 1 {
stores[id].appendvec.write().unwrap().reset();
stores[id].accounts.write().unwrap().reset();
stores[id].set_status(AccountStorageStatus::StorageAvailable);
}
}
@ -418,9 +433,9 @@ impl AccountsDB {
forks.is_empty()
}
fn account_map_is_empty(pubkey: &Pubkey, index: &HashMap<Pubkey, AccountMap>) -> bool {
if let Some(account_map) = index.get(pubkey) {
if account_map.0.read().unwrap().len() == 0 {
fn account_map_is_empty(pubkey: &Pubkey, account_maps: &HashMap<Pubkey, AccountMap>) -> bool {
if let Some(account_map) = account_maps.get(pubkey) {
if account_map.read().unwrap().len() == 0 {
return true;
}
}
@ -430,25 +445,33 @@ impl AccountsDB {
fn update_vote_cache(
&self,
account: &Account,
index: &HashMap<Pubkey, AccountMap>,
account_maps: &HashMap<Pubkey, AccountMap>,
pubkey: &Pubkey,
) {
if solana_vote_api::check_id(&account.owner) {
if Self::account_map_is_empty(pubkey, index) {
self.index_info.vote_index.write().unwrap().remove(pubkey);
if Self::account_map_is_empty(pubkey, account_maps) {
self.account_index
.vote_accounts
.write()
.unwrap()
.remove(pubkey);
} else {
self.index_info.vote_index.write().unwrap().insert(*pubkey);
self.account_index
.vote_accounts
.write()
.unwrap()
.insert(*pubkey);
}
}
}
fn insert_account_entry(&self, fork: Fork, id: AppendVecId, offset: u64, map: &AccountMap) {
let mut forks = map.0.write().unwrap();
let mut forks = map.write().unwrap();
let stores = self.storage.read().unwrap();
stores[id].count.fetch_add(1, Ordering::Relaxed);
if let Some((old_id, _)) = forks.insert(fork, (id, offset)) {
if stores[old_id].count.fetch_sub(1, Ordering::Relaxed) == 1 {
stores[old_id].appendvec.write().unwrap().reset();
stores[old_id].accounts.write().unwrap().reset();
stores[old_id].set_status(AccountStorageStatus::StorageAvailable);
}
}
@ -458,26 +481,32 @@ impl AccountsDB {
fn store_account(&self, fork: Fork, pubkey: &Pubkey, account: &Account) {
if account.tokens == 0 && self.is_squashed(fork) {
// purge if balance is 0 and no checkpoints
let index = self.index_info.index.read().unwrap();
let map = index.get(&pubkey).unwrap();
let account_maps = self.account_index.account_maps.read().unwrap();
let map = account_maps.get(&pubkey).unwrap();
self.remove_account_entries(&[fork], &map);
self.update_vote_cache(account, &index, pubkey);
self.update_vote_cache(account, &account_maps, pubkey);
} else {
let (id, offset) = self.append_account(account);
let index = self.index_info.index.read().unwrap();
let account_maps = self.account_index.account_maps.read().unwrap();
let map = index.get(&pubkey).unwrap();
let map = account_maps.get(&pubkey).unwrap();
self.insert_account_entry(fork, id, offset, &map);
self.update_vote_cache(account, &index, pubkey);
self.update_vote_cache(account, &account_maps, pubkey);
}
}
pub fn store(&self, fork: Fork, pubkey: &Pubkey, account: &Account) {
{
if !self.index_info.index.read().unwrap().contains_key(&pubkey) {
let mut windex = self.index_info.index.write().unwrap();
windex.insert(*pubkey, AccountMap(RwLock::new(HashMap::new())));
if !self
.account_index
.account_maps
.read()
.unwrap()
.contains_key(&pubkey)
{
let mut waccount_maps = self.account_index.account_maps.write().unwrap();
waccount_maps.insert(*pubkey, RwLock::new(HashMap::new()));
}
}
self.store_account(fork, pubkey, account);
@ -492,23 +521,23 @@ impl AccountsDB {
) {
let mut keys = vec![];
{
let index = self.index_info.index.read().unwrap();
let account_maps = self.account_index.account_maps.read().unwrap();
for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || raccs.is_err() {
continue;
}
let tx = &txs[i];
for key in tx.account_keys.iter() {
if !index.contains_key(&key) {
if !account_maps.contains_key(&key) {
keys.push(*key);
}
}
}
}
if !keys.is_empty() {
let mut index = self.index_info.index.write().unwrap();
let mut account_maps = self.account_index.account_maps.write().unwrap();
for key in keys.iter() {
index.insert(*key, AccountMap(RwLock::new(HashMap::new())));
account_maps.insert(*key, RwLock::new(HashMap::new()));
}
}
for (i, raccs) in loaded.iter().enumerate() {
@ -640,28 +669,27 @@ impl AccountsDB {
}
pub fn increment_transaction_count(&self, fork: Fork, tx_count: usize) {
let mut info = self.fork_info.write().unwrap();
let entry = info.entry(fork).or_insert(AccountsForkInfo::default());
entry.transaction_count += tx_count as u64;
let mut fork_infos = self.fork_infos.write().unwrap();
let fork_info = fork_infos.entry(fork).or_insert(ForkInfo::default());
fork_info.transaction_count += tx_count as u64;
}
pub fn transaction_count(&self, fork: Fork) -> u64 {
let info = self.fork_info.read().unwrap();
if let Some(entry) = info.get(&fork) {
entry.transaction_count
} else {
0
}
self.fork_infos
.read()
.unwrap()
.get(&fork)
.map_or(0, |fork_info| fork_info.transaction_count)
}
fn remove_parents(&self, fork: Fork) -> Vec<Fork> {
let mut info = self.fork_info.write().unwrap();
let mut info = self.fork_infos.write().unwrap();
let fork_info = info.get_mut(&fork).unwrap();
fork_info.parents.split_off(0)
}
fn is_squashed(&self, fork: Fork) -> bool {
self.fork_info
self.fork_infos
.read()
.unwrap()
.get(&fork)
@ -670,13 +698,13 @@ impl AccountsDB {
.is_empty()
}
fn get_merged_index(
fn get_merged_account_map(
&self,
fork: Fork,
parents: &[Fork],
map: &AccountMap,
) -> Option<(Fork, AppendVecId, u64)> {
let forks = map.0.read().unwrap();
let forks = map.read().unwrap();
if let Some((id, offset)) = forks.get(&fork) {
return Some((fork, *id, *offset));
} else {
@ -701,9 +729,10 @@ impl AccountsDB {
// absent
let mut keys = vec![];
{
let index = self.index_info.index.read().unwrap();
index.iter().for_each(|(pubkey, map)| {
if let Some((parent_fork, id, offset)) = self.get_merged_index(fork, &parents, &map)
let account_maps = self.account_index.account_maps.read().unwrap();
account_maps.iter().for_each(|(pubkey, map)| {
if let Some((parent_fork, id, offset)) =
self.get_merged_account_map(fork, &parents, &map)
{
if parent_fork != fork {
self.insert_account_entry(fork, id, offset, &map);
@ -713,16 +742,16 @@ impl AccountsDB {
if self.remove_account_entries(&[fork], &map) {
keys.push(pubkey.clone());
}
self.update_vote_cache(&account, &index, pubkey);
self.update_vote_cache(&account, &account_maps, pubkey);
}
}
}
});
}
if !keys.is_empty() {
let mut index = self.index_info.index.write().unwrap();
let mut account_maps = self.account_index.account_maps.write().unwrap();
for key in keys.iter() {
index.remove(&key);
account_maps.remove(&key);
}
}
}
@ -1688,7 +1717,15 @@ mod tests {
accounts_db.squash(1);
accounts_db.squash(2);
assert_eq!(accounts_db.index_info.vote_index.read().unwrap().len(), 1);
assert_eq!(
accounts_db
.account_index
.vote_accounts
.read()
.unwrap()
.len(),
1
);
assert_eq!(accounts_db.get_vote_accounts(1).len(), 1);
assert_eq!(accounts_db.get_vote_accounts(2).len(), 1);

View File

@ -1,5 +1,5 @@
mod accounts;
pub mod appendvec;
pub mod append_vec;
pub mod bank;
pub mod bloom;
mod hash_queue;