From f73d38739addd1bfd110cfe8cfd2d0f3d9ca2495 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Tue, 16 Apr 2019 13:32:22 -0700 Subject: [PATCH] Split AccountsDB from Accounts (#3808) Split AccountsDB from Accounts --- runtime/src/accounts.rs | 1286 +++++++----------------------------- runtime/src/accounts_db.rs | 834 +++++++++++++++++++++++ runtime/src/bank.rs | 3 +- runtime/src/lib.rs | 1 + 4 files changed, 1062 insertions(+), 1062 deletions(-) create mode 100644 runtime/src/accounts_db.rs diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index d296c76045..57c54d6cc3 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,31 +1,13 @@ -//! Persistent accounts are stored in below path location: -//! //data/ -//! -//! The persistent store would allow for this mode of operation: -//! - Concurrent single thread append with many concurrent readers. -//! -//! The underlying memory is memory mapped to a file. The accounts would be -//! stored across multiple files and the mappings of file and offset of a -//! particular account would be stored in a shared index. This will allow for -//! concurrent commits without blocking reads, which will sequentially write -//! to memory, ssd or disk, and should be as fast as the hardware allow for. -//! The only required in memory data structure with a write lock is the index, -//! which should be fast to update. -//! -//! AppendVec's only store accounts for single forks. To bootstrap the -//! index from a persistent store of AppendVec's, the entries include -//! a "write_version". A single global atomic `AccountsDB::write_version` -//! tracks the number of commits to the entire data store. So the latest -//! commit for each fork entry would be indexed. - +use crate::accounts_db::{ + get_paths_vec, AccountInfo, AccountStorageSlice, AccountsDB, ErrorCounters, + InstructionAccounts, InstructionLoaders, +}; use crate::accounts_index::{AccountsIndex, Fork}; -use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; +use crate::append_vec::StoredAccount; use crate::message_processor::has_duplicates; use bincode::serialize; use hashbrown::{HashMap, HashSet}; use log::*; -use rand::{thread_rng, Rng}; -use rayon::prelude::*; use solana_metrics::counter::Counter; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -36,148 +18,15 @@ use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Result; use solana_sdk::transaction::{Transaction, TransactionError}; use std::env; -use std::fs::{create_dir_all, remove_dir_all}; +use std::fs::remove_dir_all; use std::ops::Neg; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; -pub type InstructionAccounts = Vec; -pub type InstructionLoaders = Vec>; - -#[derive(Debug, Default)] -pub struct ErrorCounters { - pub account_not_found: usize, - pub account_in_use: usize, - pub account_loaded_twice: usize, - pub blockhash_not_found: usize, - pub blockhash_too_old: usize, - pub reserve_blockhash: usize, - pub insufficient_funds: usize, - pub invalid_account_index: usize, - pub duplicate_signature: usize, - pub call_chain_too_deep: usize, - pub missing_signature_for_fee: usize, -} - -const ACCOUNT_DATA_FILE_SIZE: u64 = 64 * 1024 * 1024; -const ACCOUNT_DATA_FILE: &str = "data"; const ACCOUNTSDB_DIR: &str = "accountsdb"; const NUM_ACCOUNT_DIRS: usize = 4; -/// An offset into the AccountsDB::storage vector -type AppendVecId = usize; - -#[derive(Debug, PartialEq)] -enum AccountStorageStatus { - StorageAvailable = 0, - StorageFull = 1, -} - -impl From for AccountStorageStatus { - fn from(status: usize) -> Self { - use self::AccountStorageStatus::*; - match status { - 0 => StorageAvailable, - 1 => StorageFull, - _ => unreachable!(), - } - } -} - -#[derive(Default, Clone)] -struct AccountInfo { - /// index identifying the append storage - id: AppendVecId, - - /// offset into the storage - offset: usize, - - /// lamports in the account used when squashing kept for optimization - /// purposes to remove accounts with zero balance. - lamports: u64, -} - -/// Persistent storage structure holding the accounts -struct AccountStorageEntry { - fork_id: Fork, - - /// storage holding the accounts - accounts: AppendVec, - - /// Keeps track of the number of accounts stored in a specific AppendVec. - /// This is periodically checked to reuse the stores that do not have - /// any accounts in it. - count: AtomicUsize, - - /// status corresponding to the storage - status: AtomicUsize, -} - -impl AccountStorageEntry { - pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self { - let p = format!("{}/{}", path, id); - let path = Path::new(&p); - let _ignored = remove_dir_all(path); - create_dir_all(path).expect("Create directory failed"); - let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize); - - AccountStorageEntry { - fork_id, - accounts, - count: AtomicUsize::new(0), - status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), - } - } - - pub fn set_status(&self, status: AccountStorageStatus) { - self.status.store(status as usize, Ordering::Relaxed); - } - - pub fn get_status(&self) -> AccountStorageStatus { - self.status.load(Ordering::Relaxed).into() - } - - fn add_account(&self) { - self.count.fetch_add(1, Ordering::Relaxed); - } - - fn remove_account(&self) -> bool { - if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { - self.accounts.reset(); - self.set_status(AccountStorageStatus::StorageAvailable); - true - } else { - false - } - } -} - -type AccountStorage = Vec>; -type AccountStorageSlice = [Arc]; - -// This structure handles the load/store of the accounts -#[derive(Default)] -pub struct AccountsDB { - /// Keeps tracks of index into AppendVec on a per fork basis - accounts_index: RwLock>, - - /// Account storage - storage: RwLock, - - /// distribute the accounts across storage lists - next_id: AtomicUsize, - - /// write version - write_version: AtomicUsize, - - /// Set of storage paths to pick from - paths: Vec, - - /// Starting file size of appendvecs - file_size: u64, -} - /// This structure handles synchronization for db #[derive(Default)] pub struct Accounts { @@ -195,10 +44,6 @@ pub struct Accounts { own_paths: bool, } -fn get_paths_vec(paths: &str) -> Vec { - paths.split(',').map(ToString::to_string).collect() -} - impl Drop for Accounts { fn drop(&mut self) { let paths = get_paths_vec(&self.paths); @@ -214,431 +59,6 @@ impl Drop for Accounts { } } -impl AccountsDB { - pub fn new_with_file_size(paths: &str, file_size: u64) -> Self { - let paths = get_paths_vec(&paths); - AccountsDB { - accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(vec![]), - next_id: AtomicUsize::new(0), - write_version: AtomicUsize::new(0), - paths, - file_size, - } - } - - pub fn new(paths: &str) -> Self { - Self::new_with_file_size(paths, ACCOUNT_DATA_FILE_SIZE) - } - - fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry { - AccountStorageEntry::new( - path, - fork_id, - self.next_id.fetch_add(1, Ordering::Relaxed), - self.file_size, - ) - } - - pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().iter() { - if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { - return true; - } - } - false - } - - /// Scan a specific fork through all the account storage in parallel with sequential read - // PERF: Sequentially read each storage entry in parallel - pub fn scan_account_storage(&self, fork_id: Fork, scan_func: F) -> Vec - where - F: Fn(&StoredAccount, &mut B) -> (), - F: Send + Sync, - B: Send + Default, - { - let storage_maps: Vec> = self - .storage - .read() - .unwrap() - .iter() - .filter(|store| store.fork_id == fork_id) - .cloned() - .collect(); - storage_maps - .into_par_iter() - .map(|storage| { - let accounts = storage.accounts.accounts(0); - let mut retval = B::default(); - accounts - .iter() - .for_each(|stored_account| scan_func(stored_account, &mut retval)); - retval - }) - .collect() - } - - fn hash_account(stored_account: &StoredAccount) -> Hash { - let mut hasher = Hasher::default(); - hasher.hash(&serialize(&stored_account.balance).unwrap()); - hasher.hash(stored_account.data); - hasher.result() - } - - pub fn hash_internal_state(&self, fork_id: Fork) -> Option { - let accumulator: Vec> = self.scan_account_storage( - fork_id, - |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| { - accum.push(( - stored_account.meta.pubkey, - stored_account.meta.write_version, - Self::hash_account(stored_account), - )); - }, - ); - let mut account_hashes: Vec<_> = accumulator.into_iter().flat_map(|x| x).collect(); - account_hashes.sort_by_key(|s| (s.0, (s.1 as i64).neg())); - account_hashes.dedup_by_key(|s| s.0); - if account_hashes.is_empty() { - None - } else { - let mut hasher = Hasher::default(); - for (_, _, hash) in account_hashes { - hasher.hash(hash.as_ref()); - } - Some(hasher.result()) - } - } - - fn load( - storage: &AccountStorageSlice, - ancestors: &HashMap, - accounts_index: &AccountsIndex, - pubkey: &Pubkey, - ) -> Option { - let info = accounts_index.get(pubkey, ancestors)?; - //TODO: thread this as a ref - storage - .get(info.id) - .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) - } - - fn load_tx_accounts( - storage: &AccountStorageSlice, - ancestors: &HashMap, - accounts_index: &AccountsIndex, - tx: &Transaction, - fee: u64, - error_counters: &mut ErrorCounters, - ) -> Result> { - // Copy all the accounts - let message = tx.message(); - if tx.signatures.is_empty() && fee != 0 { - Err(TransactionError::MissingSignatureForFee) - } else { - // Check for unique account keys - if has_duplicates(&message.account_keys) { - error_counters.account_loaded_twice += 1; - return Err(TransactionError::AccountLoadedTwice); - } - - // There is no way to predict what program will execute without an error - // If a fee can pay for execution then the program will be scheduled - let mut called_accounts: Vec = vec![]; - for key in &message.account_keys { - called_accounts - .push(Self::load(storage, ancestors, accounts_index, key).unwrap_or_default()); - } - if called_accounts.is_empty() || called_accounts[0].lamports == 0 { - error_counters.account_not_found += 1; - Err(TransactionError::AccountNotFound) - } else if called_accounts[0].lamports < fee { - error_counters.insufficient_funds += 1; - Err(TransactionError::InsufficientFundsForFee) - } else { - called_accounts[0].lamports -= fee; - Ok(called_accounts) - } - } - } - - fn load_executable_accounts( - storage: &AccountStorageSlice, - ancestors: &HashMap, - accounts_index: &AccountsIndex, - program_id: &Pubkey, - error_counters: &mut ErrorCounters, - ) -> Result> { - let mut accounts = Vec::new(); - let mut depth = 0; - let mut program_id = *program_id; - loop { - if native_loader::check_id(&program_id) { - // at the root of the chain, ready to dispatch - break; - } - - if depth >= 5 { - error_counters.call_chain_too_deep += 1; - return Err(TransactionError::CallChainTooDeep); - } - depth += 1; - - let program = match Self::load(storage, ancestors, accounts_index, &program_id) { - Some(program) => program, - None => { - error_counters.account_not_found += 1; - return Err(TransactionError::AccountNotFound); - } - }; - if !program.executable || program.owner == Pubkey::default() { - error_counters.account_not_found += 1; - return Err(TransactionError::AccountNotFound); - } - - // add loader to chain - program_id = program.owner; - accounts.insert(0, (program_id, program)); - } - Ok(accounts) - } - - /// For each program_id in the transaction, load its loaders. - fn load_loaders( - storage: &AccountStorageSlice, - ancestors: &HashMap, - accounts_index: &AccountsIndex, - tx: &Transaction, - error_counters: &mut ErrorCounters, - ) -> Result>> { - let message = tx.message(); - message - .instructions - .iter() - .map(|ix| { - if message.program_ids().len() <= ix.program_ids_index as usize { - error_counters.account_not_found += 1; - return Err(TransactionError::AccountNotFound); - } - let program_id = message.program_ids()[ix.program_ids_index as usize]; - Self::load_executable_accounts( - storage, - ancestors, - accounts_index, - &program_id, - error_counters, - ) - }) - .collect() - } - - fn load_accounts( - &self, - ancestors: &HashMap, - txs: &[Transaction], - lock_results: Vec>, - fee_calculator: &FeeCalculator, - error_counters: &mut ErrorCounters, - ) -> Vec> { - //PERF: hold the lock to scan for the references, but not to clone the accounts - //TODO: two locks usually leads to deadlocks, should this be one structure? - let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); - txs.iter() - .zip(lock_results.into_iter()) - .map(|etx| match etx { - (tx, Ok(())) => { - let fee = fee_calculator.calculate_fee(tx.message()); - let accounts = Self::load_tx_accounts( - &storage, - ancestors, - &accounts_index, - tx, - fee, - error_counters, - )?; - let loaders = Self::load_loaders( - &storage, - ancestors, - &accounts_index, - tx, - error_counters, - )?; - Ok((accounts, loaders)) - } - (_, Err(e)) => Err(e), - }) - .collect() - } - - fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { - let accounts_index = self.accounts_index.read().unwrap(); - let storage = self.storage.read().unwrap(); - Self::load(&storage, ancestors, &accounts_index, pubkey) - } - - fn get_storage_id(&self, fork_id: Fork, start: usize, current: usize) -> usize { - let mut id = current; - let len: usize; - { - let stores = self.storage.read().unwrap(); - len = stores.len(); - if len > 0 { - if id == std::usize::MAX { - id = start % len; - if stores[id].get_status() == AccountStorageStatus::StorageAvailable { - return id; - } - } else { - stores[id].set_status(AccountStorageStatus::StorageFull); - } - - loop { - id = (id + 1) % len; - if fork_id == stores[id].fork_id - && stores[id].get_status() == AccountStorageStatus::StorageAvailable - { - break; - } - if id == start % len { - break; - } - } - } - } - if len == 0 || id == start % len { - let mut stores = self.storage.write().unwrap(); - // check if new store was already created - if stores.len() == len { - let path_idx = thread_rng().gen_range(0, self.paths.len()); - let storage = self.new_storage_entry(fork_id, &self.paths[path_idx]); - stores.push(Arc::new(storage)); - } - id = stores.len() - 1; - } - id - } - - fn append_account(&self, fork_id: Fork, pubkey: &Pubkey, account: &Account) -> (usize, usize) { - let offset: usize; - let start = self.next_id.fetch_add(1, Ordering::Relaxed); - let mut id = self.get_storage_id(fork_id, start, std::usize::MAX); - - // Even if no lamports, need to preserve the account owner so - // we can update the vote_accounts correctly if this account is purged - // when squashing. - let acc = &mut account.clone(); - if account.lamports == 0 { - acc.data.resize(0, 0); - } - - loop { - let result: Option; - { - let accounts = &self.storage.read().unwrap()[id]; - let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; - let meta = StorageMeta { - write_version, - pubkey: *pubkey, - data_len: account.data.len() as u64, - }; - result = accounts.accounts.append_account(meta, account); - accounts.add_account(); - } - if let Some(val) = result { - offset = val; - break; - } else { - id = self.get_storage_id(fork_id, start, id); - } - } - (id, offset) - } - - pub fn purge_fork(&self, fork: Fork) { - //add_root should be called first - let is_root = self.accounts_index.read().unwrap().is_root(fork); - trace!("PURGING {} {}", fork, is_root); - if !is_root { - self.storage.write().unwrap().retain(|x| { - trace!("PURGING {} {}", x.fork_id, fork); - x.fork_id != fork - }); - } - } - - /// Store the account update. - pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) { - //TODO; these blocks should be separate functions and unit tested - let infos: Vec<_> = accounts - .iter() - .map(|(pubkey, account)| { - let (id, offset) = self.append_account(fork_id, pubkey, account); - AccountInfo { - id, - offset, - lamports: account.lamports, - } - }) - .collect(); - - let reclaims: Vec<(Fork, AccountInfo)> = { - let mut index = self.accounts_index.write().unwrap(); - let mut reclaims = vec![]; - for (i, info) in infos.into_iter().enumerate() { - let key = &accounts[i].0; - reclaims.extend(index.insert(fork_id, key, info).into_iter()) - } - reclaims - }; - - let dead_forks: HashSet = { - let stores = self.storage.read().unwrap(); - let mut cleared_forks: HashSet = HashSet::new(); - for (fork_id, account_info) in reclaims { - let cleared = stores[account_info.id].remove_account(); - if cleared { - cleared_forks.insert(fork_id); - } - } - let live_forks: HashSet = stores.iter().map(|x| x.fork_id).collect(); - cleared_forks.difference(&live_forks).cloned().collect() - }; - { - let mut index = self.accounts_index.write().unwrap(); - for fork in dead_forks { - index.cleanup_dead_fork(fork); - } - } - } - - pub fn store_accounts( - &self, - fork: Fork, - txs: &[Transaction], - res: &[Result<()>], - loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], - ) { - let mut accounts: Vec<(&Pubkey, &Account)> = vec![]; - for (i, raccs) in loaded.iter().enumerate() { - if res[i].is_err() || raccs.is_err() { - continue; - } - - let message = &txs[i].message(); - let acc = raccs.as_ref().unwrap(); - for (key, account) in message.account_keys.iter().zip(acc.0.iter()) { - accounts.push((key, account)); - } - } - self.store(fork, &accounts); - } - - pub fn add_root(&self, fork: Fork) { - self.accounts_index.write().unwrap().add_root(fork) - } -} - impl Accounts { fn make_new_dir() -> String { static ACCOUNT_DIR: AtomicUsize = AtomicUsize::new(0); @@ -689,6 +109,155 @@ impl Accounts { } } + fn load_tx_accounts( + storage: &AccountStorageSlice, + ancestors: &HashMap, + accounts_index: &AccountsIndex, + tx: &Transaction, + fee: u64, + error_counters: &mut ErrorCounters, + ) -> Result> { + // Copy all the accounts + let message = tx.message(); + if tx.signatures.is_empty() && fee != 0 { + Err(TransactionError::MissingSignatureForFee) + } else { + // Check for unique account keys + if has_duplicates(&message.account_keys) { + error_counters.account_loaded_twice += 1; + return Err(TransactionError::AccountLoadedTwice); + } + + // There is no way to predict what program will execute without an error + // If a fee can pay for execution then the program will be scheduled + let mut called_accounts: Vec = vec![]; + for key in &message.account_keys { + called_accounts.push( + AccountsDB::load(storage, ancestors, accounts_index, key).unwrap_or_default(), + ); + } + if called_accounts.is_empty() || called_accounts[0].lamports == 0 { + error_counters.account_not_found += 1; + Err(TransactionError::AccountNotFound) + } else if called_accounts[0].lamports < fee { + error_counters.insufficient_funds += 1; + Err(TransactionError::InsufficientFundsForFee) + } else { + called_accounts[0].lamports -= fee; + Ok(called_accounts) + } + } + } + + fn load_executable_accounts( + storage: &AccountStorageSlice, + ancestors: &HashMap, + accounts_index: &AccountsIndex, + program_id: &Pubkey, + error_counters: &mut ErrorCounters, + ) -> Result> { + let mut accounts = Vec::new(); + let mut depth = 0; + let mut program_id = *program_id; + loop { + if native_loader::check_id(&program_id) { + // at the root of the chain, ready to dispatch + break; + } + + if depth >= 5 { + error_counters.call_chain_too_deep += 1; + return Err(TransactionError::CallChainTooDeep); + } + depth += 1; + + let program = match AccountsDB::load(storage, ancestors, accounts_index, &program_id) { + Some(program) => program, + None => { + error_counters.account_not_found += 1; + return Err(TransactionError::AccountNotFound); + } + }; + if !program.executable || program.owner == Pubkey::default() { + error_counters.account_not_found += 1; + return Err(TransactionError::AccountNotFound); + } + + // add loader to chain + program_id = program.owner; + accounts.insert(0, (program_id, program)); + } + Ok(accounts) + } + + /// For each program_id in the transaction, load its loaders. + fn load_loaders( + storage: &AccountStorageSlice, + ancestors: &HashMap, + accounts_index: &AccountsIndex, + tx: &Transaction, + error_counters: &mut ErrorCounters, + ) -> Result>> { + let message = tx.message(); + message + .instructions + .iter() + .map(|ix| { + if message.program_ids().len() <= ix.program_ids_index as usize { + error_counters.account_not_found += 1; + return Err(TransactionError::AccountNotFound); + } + let program_id = message.program_ids()[ix.program_ids_index as usize]; + Self::load_executable_accounts( + storage, + ancestors, + accounts_index, + &program_id, + error_counters, + ) + }) + .collect() + } + + fn load_accounts_internal( + &self, + ancestors: &HashMap, + txs: &[Transaction], + lock_results: Vec>, + fee_calculator: &FeeCalculator, + error_counters: &mut ErrorCounters, + ) -> Vec> { + //PERF: hold the lock to scan for the references, but not to clone the accounts + //TODO: two locks usually leads to deadlocks, should this be one structure? + let accounts_index = self.accounts_db.accounts_index.read().unwrap(); + let storage = self.accounts_db.storage.read().unwrap(); + txs.iter() + .zip(lock_results.into_iter()) + .map(|etx| match etx { + (tx, Ok(())) => { + let fee = fee_calculator.calculate_fee(tx.message()); + let accounts = Self::load_tx_accounts( + &storage, + ancestors, + &accounts_index, + tx, + fee, + error_counters, + )?; + let loaders = Self::load_loaders( + &storage, + ancestors, + &accounts_index, + tx, + error_counters, + )?; + Ok((accounts, loaders)) + } + (_, Err(e)) => Err(e), + }) + .collect() + } + /// Slow because lock is held for 1 operation instead of many pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { self.accounts_db @@ -752,8 +321,36 @@ impl Accounts { } } - pub fn hash_internal_state(&self, fork: Fork) -> Option { - self.accounts_db.hash_internal_state(fork) + fn hash_account(stored_account: &StoredAccount) -> Hash { + let mut hasher = Hasher::default(); + hasher.hash(&serialize(&stored_account.balance).unwrap()); + hasher.hash(stored_account.data); + hasher.result() + } + + pub fn hash_internal_state(&self, fork_id: Fork) -> Option { + let accumulator: Vec> = self.accounts_db.scan_account_storage( + fork_id, + |stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| { + accum.push(( + stored_account.meta.pubkey, + stored_account.meta.write_version, + Self::hash_account(stored_account), + )); + }, + ); + let mut account_hashes: Vec<_> = accumulator.into_iter().flat_map(|x| x).collect(); + account_hashes.sort_by_key(|s| (s.0, (s.1 as i64).neg())); + account_hashes.dedup_by_key(|s| s.0); + if account_hashes.is_empty() { + None + } else { + let mut hasher = Hasher::default(); + for (_, _, hash) in account_hashes { + hasher.hash(hash.as_ref()); + } + Some(hasher.result()) + } } /// This function will prevent multiple threads from modifying the same account state at the @@ -802,8 +399,7 @@ impl Accounts { fee_calculator: &FeeCalculator, error_counters: &mut ErrorCounters, ) -> Vec> { - self.accounts_db - .load_accounts(ancestors, txs, results, fee_calculator, error_counters) + self.load_accounts_internal(ancestors, txs, results, fee_calculator, error_counters) } /// Store the accounts into the DB @@ -814,7 +410,19 @@ impl Accounts { res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], ) { - self.accounts_db.store_accounts(fork, txs, res, loaded) + let mut accounts: Vec<(&Pubkey, &Account)> = vec![]; + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { + continue; + } + + let message = &txs[i].message(); + let acc = raccs.as_ref().unwrap(); + for (key, account) in message.account_keys.iter().zip(acc.0.iter()) { + accounts.push((key, account)); + } + } + self.accounts_db.store(fork, &accounts); } /// Purge a fork if it is not a root @@ -833,20 +441,12 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; - use rand::{thread_rng, Rng}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; - fn cleanup_paths(paths: &str) { - let paths = get_paths_vec(&paths); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); - }); - } - fn load_accounts_with_fee( tx: Transaction, ka: &Vec<(Pubkey, Account)>, @@ -1248,456 +848,39 @@ mod tests { ); } - #[macro_export] - macro_rules! tmp_accounts_name { - () => { - &format!("{}-{}", file!(), line!()) - }; - } - - #[macro_export] - macro_rules! get_tmp_accounts_path { - () => { - get_tmp_accounts_path(tmp_accounts_name!()) - }; - } - - struct TempPaths { - pub paths: String, - } - - impl Drop for TempPaths { - fn drop(&mut self) { - cleanup_paths(&self.paths); - } - } - - fn get_tmp_accounts_path(paths: &str) -> TempPaths { - let vpaths = get_paths_vec(paths); - let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); - let vpaths: Vec<_> = vpaths - .iter() - .map(|path| format!("{}/{}", out_dir, path)) - .collect(); - TempPaths { - paths: vpaths.join(","), - } - } - #[test] - fn test_accountsdb_add_root() { - solana_logger::setup(); - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - let key = Pubkey::default(); - let account0 = Account::new(1, 0, &key); + fn test_load_by_program() { + let accounts = Accounts::new(None); - db.store(0, &[(&key, &account0)]); - db.add_root(0); - let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); - } - - #[test] - fn test_accountsdb_latest_ancestor() { - solana_logger::setup(); - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - let key = Pubkey::default(); - let account0 = Account::new(1, 0, &key); - - db.store(0, &[(&key, &account0)]); - - let account1 = Account::new(0, 0, &key); - db.store(1, &[(&key, &account1)]); - - let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); - - let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); - } - - #[test] - fn test_accountsdb_latest_ancestor_with_root() { - solana_logger::setup(); - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - let key = Pubkey::default(); - let account0 = Account::new(1, 0, &key); - - db.store(0, &[(&key, &account0)]); - - let account1 = Account::new(0, 0, &key); - db.store(1, &[(&key, &account1)]); - db.add_root(0); - - let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); - - let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); - } - - #[test] - fn test_accountsdb_root_one_fork() { - solana_logger::setup(); - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - let key = Pubkey::default(); - let account0 = Account::new(1, 0, &key); - - // store value 1 in the "root", i.e. db zero - db.store(0, &[(&key, &account0)]); - - // now we have: - // - // root0 -> key.lamports==1 - // / \ - // / \ - // key.lamports==0 <- fork1 \ - // fork2 -> key.lamports==1 - // (via root0) - - // store value 0 in one child - let account1 = Account::new(0, 0, &key); - db.store(1, &[(&key, &account1)]); - - // masking accounts is done at the Accounts level, at accountsDB we see - // original account (but could also accept "None", which is implemented - // at the Accounts level) - let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); - - // we should see 1 token in fork 2 - let ancestors = vec![(0, 0), (2, 2)].into_iter().collect(); - assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account0); - - db.add_root(0); - - let ancestors = vec![(1, 1)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account1)); - let ancestors = vec![(2, 2)].into_iter().collect(); - assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); // original value - } - - #[test] - fn test_accountsdb_add_root_many() { - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - - let mut pubkeys: Vec = vec![]; - create_account(&db, &mut pubkeys, 0, 100, 0, 0); - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); - let ancestors = vec![(0, 0)].into_iter().collect(); - let account = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!(default_account, account); - } - - db.add_root(0); - - // check that all the accounts appear with a new root - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); - let ancestors = vec![(0, 0)].into_iter().collect(); - let account0 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let ancestors = vec![(1, 1)].into_iter().collect(); - let account1 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!(&default_account, &account0); - assert_eq!(&default_account, &account1); - } - } - - #[test] - #[ignore] - fn test_accountsdb_count_stores() { - let paths = get_tmp_accounts_path!(); - let db = AccountsDB::new(&paths.paths); - - let mut pubkeys: Vec = vec![]; - create_account( - &db, - &mut pubkeys, - 0, - 2, - ACCOUNT_DATA_FILE_SIZE as usize / 3, - 0, - ); - assert!(check_storage(&db, 2)); - - let pubkey = Pubkey::new_rand(); - let account = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 3, &pubkey); - db.store(1, &[(&pubkey, &account)]); - db.store(1, &[(&pubkeys[0], &account)]); - { - let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); - } - db.add_root(1); - { - let stores = db.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); - } - } - - #[test] - fn test_accounts_unsquashed() { - let key = Pubkey::default(); - - // 1 token in the "root", i.e. db zero - let paths = get_tmp_accounts_path!(); - let db0 = AccountsDB::new(&paths.paths); - let account0 = Account::new(1, 0, &key); - db0.store(0, &[(&key, &account0)]); - - // 0 lamports in the child - let account1 = Account::new(0, 0, &key); - db0.store(1, &[(&key, &account1)]); - - // masking accounts is done at the Accounts level, at accountsDB we see - // original account - let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); - assert_eq!(db0.load_slow(&ancestors, &key), Some(account1)); - - let mut accounts1 = Accounts::new(None); - accounts1.accounts_db = Arc::new(db0); - assert_eq!(accounts1.load_slow(&ancestors, &key), None); - let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(accounts1.load_slow(&ancestors, &key), Some(account0)); - } - - fn create_account( - accounts: &AccountsDB, - pubkeys: &mut Vec, - fork: Fork, - num: usize, - space: usize, - num_vote: usize, - ) { - for t in 0..num { - let pubkey = Pubkey::new_rand(); - let account = Account::new((t + 1) as u64, space, &Account::default().owner); - pubkeys.push(pubkey.clone()); - let ancestors = vec![(fork, 0)].into_iter().collect(); - assert!(accounts.load_slow(&ancestors, &pubkey).is_none()); - accounts.store(fork, &[(&pubkey, &account)]); - } - for t in 0..num_vote { - let pubkey = Pubkey::new_rand(); - let account = Account::new((num + t + 1) as u64, space, &solana_vote_api::id()); - pubkeys.push(pubkey.clone()); - let ancestors = vec![(fork, 0)].into_iter().collect(); - assert!(accounts.load_slow(&ancestors, &pubkey).is_none()); - accounts.store(fork, &[(&pubkey, &account)]); - } - } - - fn update_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork, range: usize) { - for _ in 1..1000 { - let idx = thread_rng().gen_range(0, range); - let ancestors = vec![(fork, 0)].into_iter().collect(); - if let Some(mut account) = accounts.load_slow(&ancestors, &pubkeys[idx]) { - account.lamports = account.lamports + 1; - accounts.store(fork, &[(&pubkeys[idx], &account)]); - if account.lamports == 0 { - let ancestors = vec![(fork, 0)].into_iter().collect(); - assert!(accounts.load_slow(&ancestors, &pubkeys[idx]).is_none()); - } else { - let mut default_account = Account::default(); - default_account.lamports = account.lamports; - assert_eq!(default_account, account); - } - } - } - } - - fn check_storage(accounts: &AccountsDB, count: usize) -> bool { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!( - stores[0].get_status(), - AccountStorageStatus::StorageAvailable - ); - stores[0].count.load(Ordering::Relaxed) == count - } - - fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); - let ancestors = vec![(fork, 0)].into_iter().collect(); - let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!(default_account, account); - } - } - - #[test] - fn test_account_one() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - let mut pubkeys: Vec = vec![]; - create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); - let ancestors = vec![(0, 0)].into_iter().collect(); - let account = accounts.load_slow(&ancestors, &pubkeys[0]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = 1; - assert_eq!(default_account, account); - } - - #[test] - fn test_account_many() { - let paths = get_tmp_accounts_path("many0,many1"); - let accounts = AccountsDB::new(&paths.paths); - let mut pubkeys: Vec = vec![]; - create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - check_accounts(&accounts, &pubkeys, 0); - } - - #[test] - fn test_account_update() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - let mut pubkeys: Vec = vec![]; - create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - update_accounts(&accounts, &pubkeys, 0, 99); - assert_eq!(check_storage(&accounts, 100), true); - } - - #[test] - fn test_account_grow_many() { - let paths = get_tmp_accounts_path("many2,many3"); - let size = 4096; - let accounts = AccountsDB::new_with_file_size(&paths.paths, size); - let mut keys = vec![]; - for i in 0..9 { - let key = Pubkey::new_rand(); - let account = Account::new(i + 1, size as usize / 4, &key); - accounts.store(0, &[(&key, &account)]); - keys.push(key); - } - for (i, key) in keys.iter().enumerate() { - let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!( - accounts.load_slow(&ancestors, &key).unwrap().lamports, - (i as u64) + 1 - ); - } - - let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().iter() { - *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; - } - for count in append_vec_histogram.values() { - assert!(*count >= 2); - } - } - - #[test] - #[ignore] - fn test_account_grow() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - let count = [0, 1]; - let status = [ - AccountStorageStatus::StorageAvailable, - AccountStorageStatus::StorageFull, - ]; + // Load accounts owned by various programs into AccountsDB + let pubkey0 = Pubkey::new_rand(); + let account0 = Account::new(1, 0, &Pubkey::new(&[2; 32])); + accounts.store_slow(0, &pubkey0, &account0); let pubkey1 = Pubkey::new_rand(); - let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey1); - accounts.store(0, &[(&pubkey1, &account1)]); - { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 1); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[0]); - } - + let account1 = Account::new(1, 0, &Pubkey::new(&[2; 32])); + accounts.store_slow(0, &pubkey1, &account1); let pubkey2 = Pubkey::new_rand(); - let account2 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey2); - accounts.store(0, &[(&pubkey2, &account2)]); - { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[1]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[0]); - } - let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); - assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + let account2 = Account::new(1, 0, &Pubkey::new(&[3; 32])); + accounts.store_slow(0, &pubkey2, &account2); - for i in 0..25 { - let index = i % 2; - accounts.store(0, &[(&pubkey1, &account1)]); - { - let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.len(), 3); - assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); - assert_eq!(stores[0].get_status(), status[0]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[1]); - assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); - assert_eq!(stores[2].get_status(), status[0]); - } - let ancestors = vec![(0, 0)].into_iter().collect(); - assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); - assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); - } + let loaded = accounts.load_by_program(0, &Pubkey::new(&[2; 32])); + assert_eq!(loaded.len(), 2); + let loaded = accounts.load_by_program(0, &Pubkey::new(&[3; 32])); + assert_eq!(loaded, vec![(pubkey2, account2)]); + let loaded = accounts.load_by_program(0, &Pubkey::new(&[4; 32])); + assert_eq!(loaded, vec![]); } #[test] - fn test_purge_fork_not_root() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - let mut pubkeys: Vec = vec![]; - create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); - let ancestors = vec![(0, 0)].into_iter().collect(); - assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some());; - accounts.purge_fork(0); - assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_none());; - } - - #[test] - fn test_purge_fork_after_root() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - let mut pubkeys: Vec = vec![]; - create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); - let ancestors = vec![(0, 0)].into_iter().collect(); - accounts.add_root(0); - accounts.purge_fork(0); - assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some()); - } - - #[test] - fn test_accounts_empty_hash_internal_state() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); - assert_eq!(accounts.hash_internal_state(0), None); - } - - #[test] - fn test_accountsdb_account_not_found() { - let paths = get_tmp_accounts_path!(); - let accounts = AccountsDB::new(&paths.paths); + fn test_accounts_account_not_found() { + let accounts = Accounts::new(None); let mut error_counters = ErrorCounters::default(); let ancestors = vec![(0, 0)].into_iter().collect(); - let accounts_index = accounts.accounts_index.read().unwrap(); - let storage = accounts.storage.read().unwrap(); + let accounts_index = accounts.accounts_db.accounts_index.read().unwrap(); + let storage = accounts.accounts_db.storage.read().unwrap(); assert_eq!( - AccountsDB::load_executable_accounts( + Accounts::load_executable_accounts( &storage, &ancestors, &accounts_index, @@ -1710,28 +893,9 @@ mod tests { } #[test] - fn test_load_by_program() { - let paths = get_tmp_accounts_path!(); - let accounts_db = AccountsDB::new(&paths.paths); - - // Load accounts owned by various programs into AccountsDB - let pubkey0 = Pubkey::new_rand(); - let account0 = Account::new(1, 0, &Pubkey::new(&[2; 32])); - accounts_db.store(0, &[(&pubkey0, &account0)]); - let pubkey1 = Pubkey::new_rand(); - let account1 = Account::new(1, 0, &Pubkey::new(&[2; 32])); - accounts_db.store(0, &[(&pubkey1, &account1)]); - let pubkey2 = Pubkey::new_rand(); - let account2 = Account::new(1, 0, &Pubkey::new(&[3; 32])); - accounts_db.store(0, &[(&pubkey2, &account2)]); - - let mut accounts_proper = Accounts::new(None); - accounts_proper.accounts_db = Arc::new(accounts_db); - let accounts = accounts_proper.load_by_program(0, &Pubkey::new(&[2; 32])); - assert_eq!(accounts.len(), 2); - let accounts = accounts_proper.load_by_program(0, &Pubkey::new(&[3; 32])); - assert_eq!(accounts, vec![(pubkey2, account2)]); - let accounts = accounts_proper.load_by_program(0, &Pubkey::new(&[4; 32])); - assert_eq!(accounts, vec![]); + fn test_accounts_empty_hash_internal_state() { + let accounts = Accounts::new(None); + assert_eq!(accounts.hash_internal_state(0), None); } + } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs new file mode 100644 index 0000000000..5f4955d7c5 --- /dev/null +++ b/runtime/src/accounts_db.rs @@ -0,0 +1,834 @@ +//! Persistent accounts are stored in below path location: +//! //data/ +//! +//! The persistent store would allow for this mode of operation: +//! - Concurrent single thread append with many concurrent readers. +//! +//! The underlying memory is memory mapped to a file. The accounts would be +//! stored across multiple files and the mappings of file and offset of a +//! particular account would be stored in a shared index. This will allow for +//! concurrent commits without blocking reads, which will sequentially write +//! to memory, ssd or disk, and should be as fast as the hardware allow for. +//! The only required in memory data structure with a write lock is the index, +//! which should be fast to update. +//! +//! AppendVec's only store accounts for single forks. To bootstrap the +//! index from a persistent store of AppendVec's, the entries include +//! a "write_version". A single global atomic `AccountsDB::write_version` +//! tracks the number of commits to the entire data store. So the latest +//! commit for each fork entry would be indexed. + +use crate::accounts_index::{AccountsIndex, Fork}; +use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; +use hashbrown::{HashMap, HashSet}; +use log::*; +use rand::{thread_rng, Rng}; +use rayon::prelude::*; +use solana_sdk::account::Account; +use solana_sdk::pubkey::Pubkey; +use std::fs::{create_dir_all, remove_dir_all}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; + +const ACCOUNT_DATA_FILE_SIZE: u64 = 64 * 1024 * 1024; +const ACCOUNT_DATA_FILE: &str = "data"; + +#[derive(Debug, Default)] +pub struct ErrorCounters { + pub account_not_found: usize, + pub account_in_use: usize, + pub account_loaded_twice: usize, + pub blockhash_not_found: usize, + pub blockhash_too_old: usize, + pub reserve_blockhash: usize, + pub insufficient_funds: usize, + pub invalid_account_index: usize, + pub duplicate_signature: usize, + pub call_chain_too_deep: usize, + pub missing_signature_for_fee: usize, +} + +#[derive(Default, Clone)] +pub struct AccountInfo { + /// index identifying the append storage + id: AppendVecId, + + /// offset into the storage + offset: usize, + + /// lamports in the account used when squashing kept for optimization + /// purposes to remove accounts with zero balance. + lamports: u64, +} +/// An offset into the AccountsDB::storage vector +type AppendVecId = usize; +type AccountStorage = Vec>; +pub type AccountStorageSlice = [Arc]; +pub type InstructionAccounts = Vec; +pub type InstructionLoaders = Vec>; + +#[derive(Debug, PartialEq)] +pub enum AccountStorageStatus { + StorageAvailable = 0, + StorageFull = 1, +} + +impl From for AccountStorageStatus { + fn from(status: usize) -> Self { + use self::AccountStorageStatus::*; + match status { + 0 => StorageAvailable, + 1 => StorageFull, + _ => unreachable!(), + } + } +} + +/// Persistent storage structure holding the accounts +pub struct AccountStorageEntry { + fork_id: Fork, + + /// storage holding the accounts + accounts: AppendVec, + + /// Keeps track of the number of accounts stored in a specific AppendVec. + /// This is periodically checked to reuse the stores that do not have + /// any accounts in it. + count: AtomicUsize, + + /// status corresponding to the storage + status: AtomicUsize, +} + +impl AccountStorageEntry { + pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self { + let p = format!("{}/{}", path, id); + let path = Path::new(&p); + let _ignored = remove_dir_all(path); + create_dir_all(path).expect("Create directory failed"); + let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize); + + AccountStorageEntry { + fork_id, + accounts, + count: AtomicUsize::new(0), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + } + } + + pub fn set_status(&self, status: AccountStorageStatus) { + self.status.store(status as usize, Ordering::Relaxed); + } + + pub fn get_status(&self) -> AccountStorageStatus { + self.status.load(Ordering::Relaxed).into() + } + + fn add_account(&self) { + self.count.fetch_add(1, Ordering::Relaxed); + } + + fn remove_account(&self) -> bool { + if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { + self.accounts.reset(); + self.set_status(AccountStorageStatus::StorageAvailable); + true + } else { + false + } + } +} + +// This structure handles the load/store of the accounts +#[derive(Default)] +pub struct AccountsDB { + /// Keeps tracks of index into AppendVec on a per fork basis + pub accounts_index: RwLock>, + + /// Account storage + pub storage: RwLock, + + /// distribute the accounts across storage lists + next_id: AtomicUsize, + + /// write version + write_version: AtomicUsize, + + /// Set of storage paths to pick from + paths: Vec, + + /// Starting file size of appendvecs + file_size: u64, +} + +pub fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(ToString::to_string).collect() +} + +impl AccountsDB { + pub fn new_with_file_size(paths: &str, file_size: u64) -> Self { + let paths = get_paths_vec(&paths); + AccountsDB { + accounts_index: RwLock::new(AccountsIndex::default()), + storage: RwLock::new(vec![]), + next_id: AtomicUsize::new(0), + write_version: AtomicUsize::new(0), + paths, + file_size, + } + } + + pub fn new(paths: &str) -> Self { + Self::new_with_file_size(paths, ACCOUNT_DATA_FILE_SIZE) + } + + fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry { + AccountStorageEntry::new( + path, + fork_id, + self.next_id.fetch_add(1, Ordering::Relaxed), + self.file_size, + ) + } + + pub fn has_accounts(&self, fork: Fork) -> bool { + for x in self.storage.read().unwrap().iter() { + if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { + return true; + } + } + false + } + + /// Scan a specific fork through all the account storage in parallel with sequential read + // PERF: Sequentially read each storage entry in parallel + pub fn scan_account_storage(&self, fork_id: Fork, scan_func: F) -> Vec + where + F: Fn(&StoredAccount, &mut B) -> (), + F: Send + Sync, + B: Send + Default, + { + let storage_maps: Vec> = self + .storage + .read() + .unwrap() + .iter() + .filter(|store| store.fork_id == fork_id) + .cloned() + .collect(); + storage_maps + .into_par_iter() + .map(|storage| { + let accounts = storage.accounts.accounts(0); + let mut retval = B::default(); + accounts + .iter() + .for_each(|stored_account| scan_func(stored_account, &mut retval)); + retval + }) + .collect() + } + + pub fn load( + storage: &AccountStorageSlice, + ancestors: &HashMap, + accounts_index: &AccountsIndex, + pubkey: &Pubkey, + ) -> Option { + let info = accounts_index.get(pubkey, ancestors)?; + //TODO: thread this as a ref + storage + .get(info.id) + .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) + } + + pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { + let accounts_index = self.accounts_index.read().unwrap(); + let storage = self.storage.read().unwrap(); + Self::load(&storage, ancestors, &accounts_index, pubkey) + } + + fn get_storage_id(&self, fork_id: Fork, start: usize, current: usize) -> usize { + let mut id = current; + let len: usize; + { + let stores = self.storage.read().unwrap(); + len = stores.len(); + if len > 0 { + if id == std::usize::MAX { + id = start % len; + if stores[id].get_status() == AccountStorageStatus::StorageAvailable { + return id; + } + } else { + stores[id].set_status(AccountStorageStatus::StorageFull); + } + + loop { + id = (id + 1) % len; + if fork_id == stores[id].fork_id + && stores[id].get_status() == AccountStorageStatus::StorageAvailable + { + break; + } + if id == start % len { + break; + } + } + } + } + if len == 0 || id == start % len { + let mut stores = self.storage.write().unwrap(); + // check if new store was already created + if stores.len() == len { + let path_idx = thread_rng().gen_range(0, self.paths.len()); + let storage = self.new_storage_entry(fork_id, &self.paths[path_idx]); + stores.push(Arc::new(storage)); + } + id = stores.len() - 1; + } + id + } + + fn append_account(&self, fork_id: Fork, pubkey: &Pubkey, account: &Account) -> (usize, usize) { + let offset: usize; + let start = self.next_id.fetch_add(1, Ordering::Relaxed); + let mut id = self.get_storage_id(fork_id, start, std::usize::MAX); + + // Even if no lamports, need to preserve the account owner so + // we can update the vote_accounts correctly if this account is purged + // when squashing. + let acc = &mut account.clone(); + if account.lamports == 0 { + acc.data.resize(0, 0); + } + + loop { + let result: Option; + { + let accounts = &self.storage.read().unwrap()[id]; + let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; + let meta = StorageMeta { + write_version, + pubkey: *pubkey, + data_len: account.data.len() as u64, + }; + result = accounts.accounts.append_account(meta, account); + accounts.add_account(); + } + if let Some(val) = result { + offset = val; + break; + } else { + id = self.get_storage_id(fork_id, start, id); + } + } + (id, offset) + } + + pub fn purge_fork(&self, fork: Fork) { + //add_root should be called first + let is_root = self.accounts_index.read().unwrap().is_root(fork); + trace!("PURGING {} {}", fork, is_root); + if !is_root { + self.storage.write().unwrap().retain(|x| { + trace!("PURGING {} {}", x.fork_id, fork); + x.fork_id != fork + }); + } + } + + /// Store the account update. + pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) { + //TODO; these blocks should be separate functions and unit tested + let infos: Vec<_> = accounts + .iter() + .map(|(pubkey, account)| { + let (id, offset) = self.append_account(fork_id, pubkey, account); + AccountInfo { + id, + offset, + lamports: account.lamports, + } + }) + .collect(); + + let reclaims: Vec<(Fork, AccountInfo)> = { + let mut index = self.accounts_index.write().unwrap(); + let mut reclaims = vec![]; + for (i, info) in infos.into_iter().enumerate() { + let key = &accounts[i].0; + reclaims.extend(index.insert(fork_id, key, info).into_iter()) + } + reclaims + }; + + let dead_forks: HashSet = { + let stores = self.storage.read().unwrap(); + let mut cleared_forks: HashSet = HashSet::new(); + for (fork_id, account_info) in reclaims { + let cleared = stores[account_info.id].remove_account(); + if cleared { + cleared_forks.insert(fork_id); + } + } + let live_forks: HashSet = stores.iter().map(|x| x.fork_id).collect(); + cleared_forks.difference(&live_forks).cloned().collect() + }; + { + let mut index = self.accounts_index.write().unwrap(); + for fork in dead_forks { + index.cleanup_dead_fork(fork); + } + } + } + + pub fn add_root(&self, fork: Fork) { + self.accounts_index.write().unwrap().add_root(fork) + } +} + +#[cfg(test)] +mod tests { + // TODO: all the bank tests are bank specific, issue: 2194 + use super::*; + use rand::{thread_rng, Rng}; + use solana_sdk::account::Account; + + fn cleanup_paths(paths: &str) { + let paths = get_paths_vec(&paths); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }); + } + + struct TempPaths { + pub paths: String, + } + + impl Drop for TempPaths { + fn drop(&mut self) { + cleanup_paths(&self.paths); + } + } + + fn get_tmp_accounts_path(paths: &str) -> TempPaths { + let vpaths = get_paths_vec(paths); + let out_dir = std::env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let vpaths: Vec<_> = vpaths + .iter() + .map(|path| format!("{}/{}", out_dir, path)) + .collect(); + TempPaths { + paths: vpaths.join(","), + } + } + + #[macro_export] + macro_rules! tmp_accounts_name { + () => { + &format!("{}-{}", file!(), line!()) + }; + } + + #[macro_export] + macro_rules! get_tmp_accounts_path { + () => { + get_tmp_accounts_path(tmp_accounts_name!()) + }; + } + + #[test] + fn test_accountsdb_add_root() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + let key = Pubkey::default(); + let account0 = Account::new(1, 0, &key); + + db.store(0, &[(&key, &account0)]); + db.add_root(0); + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); + } + + #[test] + fn test_accountsdb_latest_ancestor() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + let key = Pubkey::default(); + let account0 = Account::new(1, 0, &key); + + db.store(0, &[(&key, &account0)]); + + let account1 = Account::new(0, 0, &key); + db.store(1, &[(&key, &account1)]); + + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + + let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + } + + #[test] + fn test_accountsdb_latest_ancestor_with_root() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + let key = Pubkey::default(); + let account0 = Account::new(1, 0, &key); + + db.store(0, &[(&key, &account0)]); + + let account1 = Account::new(0, 0, &key); + db.store(1, &[(&key, &account1)]); + db.add_root(0); + + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + + let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + } + + #[test] + fn test_accountsdb_root_one_fork() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + let key = Pubkey::default(); + let account0 = Account::new(1, 0, &key); + + // store value 1 in the "root", i.e. db zero + db.store(0, &[(&key, &account0)]); + + // now we have: + // + // root0 -> key.lamports==1 + // / \ + // / \ + // key.lamports==0 <- fork1 \ + // fork2 -> key.lamports==1 + // (via root0) + + // store value 0 in one child + let account1 = Account::new(0, 0, &key); + db.store(1, &[(&key, &account1)]); + + // masking accounts is done at the Accounts level, at accountsDB we see + // original account (but could also accept "None", which is implemented + // at the Accounts level) + let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account1); + + // we should see 1 token in fork 2 + let ancestors = vec![(0, 0), (2, 2)].into_iter().collect(); + assert_eq!(&db.load_slow(&ancestors, &key).unwrap(), &account0); + + db.add_root(0); + + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(db.load_slow(&ancestors, &key), Some(account1)); + let ancestors = vec![(2, 2)].into_iter().collect(); + assert_eq!(db.load_slow(&ancestors, &key), Some(account0)); // original value + } + + #[test] + fn test_accountsdb_add_root_many() { + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + + let mut pubkeys: Vec = vec![]; + create_account(&db, &mut pubkeys, 0, 100, 0, 0); + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let mut default_account = Account::default(); + default_account.lamports = (idx + 1) as u64; + assert_eq!(default_account, account); + } + + db.add_root(0); + + // check that all the accounts appear with a new root + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account0 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let ancestors = vec![(1, 1)].into_iter().collect(); + let account1 = db.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let mut default_account = Account::default(); + default_account.lamports = (idx + 1) as u64; + assert_eq!(&default_account, &account0); + assert_eq!(&default_account, &account1); + } + } + + #[test] + #[ignore] + fn test_accountsdb_count_stores() { + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + + let mut pubkeys: Vec = vec![]; + create_account( + &db, + &mut pubkeys, + 0, + 2, + ACCOUNT_DATA_FILE_SIZE as usize / 3, + 0, + ); + assert!(check_storage(&db, 2)); + + let pubkey = Pubkey::new_rand(); + let account = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 3, &pubkey); + db.store(1, &[(&pubkey, &account)]); + db.store(1, &[(&pubkeys[0], &account)]); + { + let stores = db.storage.read().unwrap(); + assert_eq!(stores.len(), 2); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + } + db.add_root(1); + { + let stores = db.storage.read().unwrap(); + assert_eq!(stores.len(), 2); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + } + } + + #[test] + fn test_accounts_unsquashed() { + let key = Pubkey::default(); + + // 1 token in the "root", i.e. db zero + let paths = get_tmp_accounts_path!(); + let db0 = AccountsDB::new(&paths.paths); + let account0 = Account::new(1, 0, &key); + db0.store(0, &[(&key, &account0)]); + + // 0 lamports in the child + let account1 = Account::new(0, 0, &key); + db0.store(1, &[(&key, &account1)]); + + // masking accounts is done at the Accounts level, at accountsDB we see + // original account + let ancestors = vec![(0, 0), (1, 1)].into_iter().collect(); + assert_eq!(db0.load_slow(&ancestors, &key), Some(account1)); + let ancestors = vec![(0, 0)].into_iter().collect(); + assert_eq!(db0.load_slow(&ancestors, &key), Some(account0)); + } + + fn create_account( + accounts: &AccountsDB, + pubkeys: &mut Vec, + fork: Fork, + num: usize, + space: usize, + num_vote: usize, + ) { + for t in 0..num { + let pubkey = Pubkey::new_rand(); + let account = Account::new((t + 1) as u64, space, &Account::default().owner); + pubkeys.push(pubkey.clone()); + let ancestors = vec![(fork, 0)].into_iter().collect(); + assert!(accounts.load_slow(&ancestors, &pubkey).is_none()); + accounts.store(fork, &[(&pubkey, &account)]); + } + for t in 0..num_vote { + let pubkey = Pubkey::new_rand(); + let account = Account::new((num + t + 1) as u64, space, &solana_vote_api::id()); + pubkeys.push(pubkey.clone()); + let ancestors = vec![(fork, 0)].into_iter().collect(); + assert!(accounts.load_slow(&ancestors, &pubkey).is_none()); + accounts.store(fork, &[(&pubkey, &account)]); + } + } + + fn update_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork, range: usize) { + for _ in 1..1000 { + let idx = thread_rng().gen_range(0, range); + let ancestors = vec![(fork, 0)].into_iter().collect(); + if let Some(mut account) = accounts.load_slow(&ancestors, &pubkeys[idx]) { + account.lamports = account.lamports + 1; + accounts.store(fork, &[(&pubkeys[idx], &account)]); + if account.lamports == 0 { + let ancestors = vec![(fork, 0)].into_iter().collect(); + assert!(accounts.load_slow(&ancestors, &pubkeys[idx]).is_none()); + } else { + let mut default_account = Account::default(); + default_account.lamports = account.lamports; + assert_eq!(default_account, account); + } + } + } + } + + fn check_storage(accounts: &AccountsDB, count: usize) -> bool { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!( + stores[0].get_status(), + AccountStorageStatus::StorageAvailable + ); + stores[0].count.load(Ordering::Relaxed) == count + } + + fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let ancestors = vec![(fork, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let mut default_account = Account::default(); + default_account.lamports = (idx + 1) as u64; + assert_eq!(default_account, account); + } + } + + #[test] + fn test_account_one() { + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[0]).unwrap(); + let mut default_account = Account::default(); + default_account.lamports = 1; + assert_eq!(default_account, account); + } + + #[test] + fn test_account_many() { + let paths = get_tmp_accounts_path("many0,many1"); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); + check_accounts(&accounts, &pubkeys, 0); + } + + #[test] + fn test_account_update() { + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); + update_accounts(&accounts, &pubkeys, 0, 99); + assert_eq!(check_storage(&accounts, 100), true); + } + + #[test] + fn test_account_grow_many() { + let paths = get_tmp_accounts_path("many2,many3"); + let size = 4096; + let accounts = AccountsDB::new_with_file_size(&paths.paths, size); + let mut keys = vec![]; + for i in 0..9 { + let key = Pubkey::new_rand(); + let account = Account::new(i + 1, size as usize / 4, &key); + accounts.store(0, &[(&key, &account)]); + keys.push(key); + } + for (i, key) in keys.iter().enumerate() { + let ancestors = vec![(0, 0)].into_iter().collect(); + assert_eq!( + accounts.load_slow(&ancestors, &key).unwrap().lamports, + (i as u64) + 1 + ); + } + + let mut append_vec_histogram = HashMap::new(); + for storage in accounts.storage.read().unwrap().iter() { + *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; + } + for count in append_vec_histogram.values() { + assert!(*count >= 2); + } + } + + #[test] + #[ignore] + fn test_account_grow() { + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let count = [0, 1]; + let status = [ + AccountStorageStatus::StorageAvailable, + AccountStorageStatus::StorageFull, + ]; + let pubkey1 = Pubkey::new_rand(); + let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey1); + accounts.store(0, &[(&pubkey1, &account1)]); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[0]); + } + + let pubkey2 = Pubkey::new_rand(); + let account2 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey2); + accounts.store(0, &[(&pubkey2, &account2)]); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 2); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[1]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[0]); + } + let ancestors = vec![(0, 0)].into_iter().collect(); + assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); + assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + + for i in 0..25 { + let index = i % 2; + accounts.store(0, &[(&pubkey1, &account1)]); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 3); + assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); + assert_eq!(stores[0].get_status(), status[0]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[1]); + assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); + assert_eq!(stores[2].get_status(), status[0]); + } + let ancestors = vec![(0, 0)].into_iter().collect(); + assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); + assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2); + } + } + + #[test] + fn test_purge_fork_not_root() { + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); + let ancestors = vec![(0, 0)].into_iter().collect(); + assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some());; + accounts.purge_fork(0); + assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_none());; + } + + #[test] + fn test_purge_fork_after_root() { + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 1, 0, 0); + let ancestors = vec![(0, 0)].into_iter().collect(); + accounts.add_root(0); + accounts.purge_fork(0); + assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some()); + } + +} diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index fb3018b55c..49a0973fb8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3,7 +3,8 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. -use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; +use crate::accounts::Accounts; +use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders}; use crate::blockhash_queue::BlockhashQueue; use crate::locked_accounts_results::LockedAccountsResults; use crate::message_processor::{MessageProcessor, ProcessInstruction}; diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index b7b14fcf51..3a91d1b110 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,4 +1,5 @@ mod accounts; +mod accounts_db; mod accounts_index; pub mod append_vec; pub mod bank;