From c276375a0ef8e9200629241a91c35e52d78c112b Mon Sep 17 00:00:00 2001 From: Sathish Ambley Date: Mon, 24 Dec 2018 16:11:20 -0800 Subject: [PATCH] Persistent account storage across directories --- fullnode/src/main.rs | 11 + runtime/src/accounts.rs | 778 ++++++++++++++++++++++++++++++++-------- runtime/src/bank.rs | 98 +++-- src/fullnode.rs | 7 +- src/replay_stage.rs | 2 +- tests/multinode.rs | 2 +- 6 files changed, 700 insertions(+), 198 deletions(-) diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index 048a4be9b6..e4ad98dfc8 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -200,6 +200,14 @@ fn main() { .takes_value(true) .help("Rendezvous with the vote signer at this RPC end point"), ) + .arg( + Arg::with_name("accounts") + .short("a") + .long("accounts") + .value_name("PATHS") + .takes_value(true) + .help("Comma separated persistent accounts location"), + ) .get_matches(); let mut fullnode_config = FullnodeConfig::default(); @@ -209,6 +217,9 @@ fn main() { let use_only_bootstrap_leader = matches.is_present("no_leader_rotation"); let (keypair, gossip) = parse_identity(&matches); let ledger_path = matches.value_of("ledger").unwrap(); + if let Some(paths) = matches.value_of("accounts") { + fullnode_config.account_paths = paths.to_string(); + } let cluster_entrypoint = matches .value_of("network") .map(|network| network.parse().expect("failed to parse network address")); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index df456cb301..2545d2aa92 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,3 +1,4 @@ +use crate::appendvec::AppendVec; use crate::bank::{BankError, Result}; use crate::runtime::has_duplicates; use bincode::serialize; @@ -9,9 +10,12 @@ use solana_sdk::hash::{hash, Hash}; use solana_sdk::native_loader; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; +use solana_sdk::vote_program; use std::collections::BTreeMap; -use std::ops::Deref; -use std::sync::{Mutex, RwLock}; +use std::fs::{create_dir_all, remove_dir_all}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; @@ -30,84 +34,389 @@ pub struct ErrorCounters { pub missing_signature_for_fee: usize, } -/// This structure handles the load/store of the accounts +// +// Persistent accounts are stored in below path location: +// //data/ +// +// Each account is stored in below format: +// +// +// The persistent store would allow for this mode of operation: +// - Concurrent single thread append with many concurrent readers. +// - Exclusive resize or truncate from the start. +// +// The underlying memory is memory mapped to a file. The accounts would be +// stored across multiple files and the mappings of file and offset of a +// particular account would be stored in a shared index. This will allow for +// concurrent commits without blocking reads, which will sequentially write +// to memory, ssd or disk, and should be as fast as the hardware allow for. +// The only required in memory data structure with a write lock is the index, +// which should be fast to update. +// +// To garbage collect, data can be re-appended to defragmnted and truncated from +// the start. The AccountsDB data structure would allow for +// - multiple readers +// - multiple writers +// - persistent backed memory +// +// To bootstrap the index from a persistent store of AppendVec's, the entries should +// also include a "commit counter". A single global atomic that tracks the number +// of commits to the entire data store. So the latest commit for each fork entry +// would be indexed. (TODO) + +const ACCOUNT_DATA_FILE_SIZE: u64 = 64 * 1024 * 1024; +const ACCOUNT_DATA_FILE: &str = "data"; +const NUM_ACCOUNT_DIRS: usize = 4; + +/// An offset into the AccountsDB::storage vector +type AppendVecId = usize; + +type Fork = u64; + +struct AccountMap(Vec<(Fork, (AppendVecId, u64))>); + +#[derive(Debug, PartialEq)] +enum AccountStorageStatus { + StorageAvailable = 0, + StorageFull = 1, +} + +impl From for AccountStorageStatus { + fn from(status: usize) -> Self { + use self::AccountStorageStatus::*; + match status { + 0 => StorageAvailable, + 1 => StorageFull, + _ => unreachable!(), + } + } +} + +struct AccountIndexInfo { + /// For each Pubkey, the account for a specific fork is in a specific + /// AppendVec at a specific index + index: RwLock>, + + /// Cached index to vote accounts for performance reasons to avoid having + /// to iterate through the entire accounts each time + vote_index: RwLock>, +} + +/// Persistent storage structure holding the accounts +struct AccountStorage { + /// storage holding the accounts + appendvec: Arc>>, + + /// Keeps track of the number of accounts stored in a specific AppendVec. + /// This is periodically checked to reuse the stores that do not have + /// any accounts in it. + count: AtomicUsize, + + /// status corresponding to the storage + status: AtomicUsize, + + /// Path to the persistent store + path: String, +} + +impl AccountStorage { + pub fn set_status(&self, status: AccountStorageStatus) { + self.status.store(status as usize, Ordering::Relaxed); + } + + pub fn get_status(&self) -> AccountStorageStatus { + self.status.load(Ordering::Relaxed).into() + } +} + +// This structure handles the load/store of the accounts pub struct AccountsDB { - /// Mapping of known public keys/IDs to accounts - pub accounts: HashMap, + /// Keeps tracks of index into AppendVec on a per fork basis + index_info: AccountIndexInfo, + + /// Account storage + storage: RwLock>, + + /// distribute the accounts across storage lists + next_id: AtomicUsize, /// The number of transactions the bank has processed without error since the /// start of the ledger. - transaction_count: u64, -} - -/// This structure handles synchronization for db -pub struct Accounts { - pub accounts_db: RwLock, - - /// set of accounts which are currently in the pipeline - account_locks: Mutex>, + transaction_count: RwLock, } impl Default for AccountsDB { fn default() -> Self { - Self { - accounts: HashMap::new(), - transaction_count: 0, + let index_info = AccountIndexInfo { + index: RwLock::new(HashMap::new()), + vote_index: RwLock::new(HashSet::new()), + }; + AccountsDB { + index_info, + storage: RwLock::new(vec![]), + next_id: AtomicUsize::new(0), + transaction_count: RwLock::new(0), } } } -impl Default for Accounts { - fn default() -> Self { - Self { - account_locks: Mutex::new(HashSet::new()), - accounts_db: RwLock::new(AccountsDB::default()), - } +/// This structure handles synchronization for db +pub struct Accounts { + pub accounts_db: AccountsDB, + + /// set of accounts which are currently in the pipeline + account_locks: Mutex>, + + /// List of persistent stores + paths: String, +} + +impl Drop for Accounts { + fn drop(&mut self) { + let paths: Vec = self.paths.split(',').map(|s| s.to_string()).collect(); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }) } } impl AccountsDB { - pub fn hash_internal_state(&self) -> Hash { - let mut ordered_accounts = BTreeMap::new(); - - // only hash internal state of the part being voted upon, i.e. since last - // checkpoint - for (pubkey, account) in &self.accounts { - ordered_accounts.insert(*pubkey, account.clone()); - } - - hash(&serialize(&ordered_accounts).unwrap()) + pub fn add_storage(&self, paths: &str) { + let paths: Vec = paths.split(',').map(|s| s.to_string()).collect(); + let mut stores: Vec = vec![]; + paths.iter().for_each(|p| { + let path = format!("{}/{}", p, std::process::id()); + let storage = AccountStorage { + appendvec: self.new_account_storage(&path), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + count: AtomicUsize::new(0), + path: path.to_string(), + }; + stores.push(storage); + }); + let _ignored = stores[0].appendvec.write().unwrap().grow_file(); + let mut storage = self.storage.write().unwrap(); + storage.append(&mut stores); } - fn load(checkpoints: &[U], pubkey: &Pubkey) -> Option - where - U: Deref, - { - for db in checkpoints { - if let Some(account) = db.accounts.get(pubkey) { - return Some(account.clone()); + fn new_account_storage(&self, p: &str) -> Arc>> { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let p = format!("{}/{}", p, id); + let path = Path::new(&p); + let _ignored = remove_dir_all(path); + create_dir_all(path).expect("Create directory failed"); + Arc::new(RwLock::new(AppendVec::::new( + &path.join(ACCOUNT_DATA_FILE), + true, + ACCOUNT_DATA_FILE_SIZE, + 0, + ))) + } + + pub fn get_vote_accounts(&self, fork: Fork) -> Vec { + let mut accounts: Vec = vec![]; + self.index_info + .vote_index + .read() + .unwrap() + .iter() + .for_each(|p| { + if let Some(forks) = self.index_info.index.read().unwrap().get(p) { + for (v_fork, (id, index)) in forks.0.iter() { + if fork == *v_fork { + accounts.push( + self.storage.read().unwrap()[*id] + .appendvec + .read() + .unwrap() + .get_account(*index) + .unwrap(), + ); + } + if fork > *v_fork { + break; + } + } + } + }); + accounts + } + + pub fn hash_internal_state(&self, fork: Fork) -> Option { + let mut ordered_accounts = BTreeMap::new(); + let rindex = self.index_info.index.read().unwrap(); + rindex.iter().for_each(|(p, forks)| { + for (v_fork, (id, index)) in forks.0.iter() { + if fork == *v_fork { + let account = self.storage.read().unwrap()[*id] + .appendvec + .read() + .unwrap() + .get_account(*index) + .unwrap(); + ordered_accounts.insert(*p, account); + } + if fork > *v_fork { + break; + } + } + }); + + if ordered_accounts.is_empty() { + return None; + } + Some(hash(&serialize(&ordered_accounts).unwrap())) + } + + fn load(&self, fork: Fork, pubkey: &Pubkey) -> Option { + let index = self.index_info.index.read().unwrap(); + if let Some(forks) = index.get(pubkey) { + // find most recent fork that is an ancestor of current_fork + for (v_fork, (id, offset)) in forks.0.iter() { + if *v_fork > fork { + continue; + } else { + let appendvec = &self.storage.read().unwrap()[*id].appendvec; + let av = appendvec.read().unwrap(); + return Some(av.get_account(*offset).unwrap()); + } } } None } + + fn get_storage_id(&self, start: usize, current: usize) -> usize { + let mut id = current; + let len: usize; + { + let stores = self.storage.read().unwrap(); + len = stores.len(); + if id == std::usize::MAX { + id = start % len; + if stores[id].get_status() == AccountStorageStatus::StorageAvailable { + return id; + } + } else { + stores[id].set_status(AccountStorageStatus::StorageFull); + } + + loop { + id = (id + 1) % len; + if stores[id].get_status() == AccountStorageStatus::StorageAvailable { + break; + } + if id == start % len { + break; + } + } + } + if id == start % len { + let mut stores = self.storage.write().unwrap(); + // check if new store was already created + if stores.len() == len { + let storage = AccountStorage { + appendvec: self.new_account_storage(&stores[id].path), + count: AtomicUsize::new(0), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + path: stores[id].path.clone(), + }; + stores.push(storage); + } + id = stores.len() - 1; + } + id + } + + fn append_account(&self, account: &Account) -> (usize, u64) { + let offset: u64; + let start = self.next_id.fetch_add(1, Ordering::Relaxed); + let mut id = self.get_storage_id(start, std::usize::MAX); + let mut acc = &Account::default(); + loop { + let result: Option; + { + if account.tokens != 0 { + acc = account; + } + let av = &self.storage.read().unwrap()[id].appendvec; + result = av.read().unwrap().append_account(&acc); + } + if let Some(val) = result { + offset = val; + break; + } else { + id = self.get_storage_id(start, id); + } + } + (id, offset) + } + /// Store the account update. If the update is to delete the account because the token balance /// is 0, purge needs to be set to true for the delete to occur in place. - pub fn store(&mut self, purge: bool, pubkey: &Pubkey, account: &Account) { - if account.tokens == 0 { - if purge { - // purge if balance is 0 and no checkpoints - self.accounts.remove(pubkey); - } else { - // store default account if balance is 0 and there's a checkpoint - self.accounts.insert(pubkey.clone(), Account::default()); + pub fn store(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + if account.tokens == 0 && purge { + // purge if balance is 0 and no checkpoints + let mut index = self.index_info.index.write().unwrap(); + if let Some(forks) = index.remove(pubkey) { + let stores = self.storage.read().unwrap(); + for (_, (id, _)) in forks.0.iter() { + stores[*id].count.fetch_sub(1, Ordering::Relaxed); + } + } + if vote_program::check_id(&account.owner) { + self.index_info.vote_index.write().unwrap().remove(pubkey); } } else { - self.accounts.insert(pubkey.clone(), account.clone()); + let (id, offset) = self.append_account(&account); + + if vote_program::check_id(&account.owner) { + let mut index = self.index_info.vote_index.write().unwrap(); + if account.tokens == 0 { + index.remove(pubkey); + } else { + index.insert(*pubkey); + } + } + + let mut result: Option = None; + { + let mut insert: Option = None; + let mut windex = self.index_info.index.write().unwrap(); + let forks = windex.entry(*pubkey).or_insert(AccountMap(vec![])); + for (i, (v_fork, (v_id, _))) in forks.0.iter().enumerate() { + if *v_fork > fork { + continue; + } + if *v_fork == fork { + result = Some(*v_id); + forks.0[i] = (fork, (id, offset)); + break; + } + insert = Some(i); + break; + } + if result.is_none() { + if let Some(index) = insert { + forks.0.insert(index, (fork, (id, offset))); + } else { + forks.0.push((fork, (id, offset))); + } + } + } + let stores = self.storage.read().unwrap(); + stores[id].count.fetch_add(1, Ordering::Relaxed); + if let Some(old_id) = result { + if stores[old_id].count.fetch_sub(1, Ordering::Relaxed) == 1 { + stores[old_id].appendvec.write().unwrap().reset(); + stores[old_id].set_status(AccountStorageStatus::StorageAvailable); + } + } } } pub fn store_accounts( - &mut self, + &self, + fork: Fork, purge: bool, txs: &[Transaction], res: &[Result<()>], @@ -121,18 +430,17 @@ impl AccountsDB { let tx = &txs[i]; let acc = raccs.as_ref().unwrap(); for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { - self.store(purge, key, account); + self.store(fork, purge, key, account); } } } - fn load_tx_accounts( - checkpoints: &[U], + + fn load_tx_accounts( + &self, + fork: Fork, tx: &Transaction, error_counters: &mut ErrorCounters, - ) -> Result> - where - U: Deref, - { + ) -> Result> { // Copy all the accounts if tx.signatures.is_empty() && tx.fee != 0 { Err(BankError::MissingSignatureForFee) @@ -147,7 +455,7 @@ impl AccountsDB { // If a fee can pay for execution then the program will be scheduled let mut called_accounts: Vec = vec![]; for key in &tx.account_keys { - called_accounts.push(Self::load(checkpoints, key).unwrap_or_default()); + called_accounts.push(self.load(fork, key).unwrap_or_default()); } if called_accounts.is_empty() || called_accounts[0].tokens == 0 { error_counters.account_not_found += 1; @@ -162,14 +470,12 @@ impl AccountsDB { } } - fn load_executable_accounts( - checkpoints: &[U], + fn load_executable_accounts( + &self, + fork: Fork, mut program_id: Pubkey, error_counters: &mut ErrorCounters, - ) -> Result> - where - U: Deref, - { + ) -> Result> { let mut accounts = Vec::new(); let mut depth = 0; loop { @@ -184,7 +490,7 @@ impl AccountsDB { } depth += 1; - let program = match Self::load(checkpoints, &program_id) { + let program = match self.load(fork, &program_id) { Some(program) => program, None => { error_counters.account_not_found += 1; @@ -205,14 +511,12 @@ impl AccountsDB { } /// For each program_id in the transaction, load its loaders. - fn load_loaders( - checkpoints: &[U], + fn load_loaders( + &self, + fork: Fork, tx: &Transaction, error_counters: &mut ErrorCounters, - ) -> Result>> - where - U: Deref, - { + ) -> Result>> { tx.instructions .iter() .map(|ix| { @@ -221,26 +525,24 @@ impl AccountsDB { return Err(BankError::AccountNotFound); } let program_id = tx.program_ids[ix.program_ids_index as usize]; - Self::load_executable_accounts(checkpoints, program_id, error_counters) + self.load_executable_accounts(fork, program_id, error_counters) }) .collect() } - fn load_accounts( - checkpoints: &[U], + fn load_accounts( + &self, + fork: Fork, txs: &[Transaction], lock_results: Vec>, error_counters: &mut ErrorCounters, - ) -> Vec> - where - U: Deref, - { + ) -> Vec> { txs.iter() .zip(lock_results.into_iter()) .map(|etx| match etx { (tx, Ok(())) => { - let accounts = Self::load_tx_accounts(checkpoints, tx, error_counters)?; - let loaders = Self::load_loaders(checkpoints, tx, error_counters)?; + let accounts = self.load_tx_accounts(fork, tx, error_counters)?; + let loaders = self.load_loaders(fork, tx, error_counters)?; Ok((accounts, loaders)) } (_, Err(e)) => Err(e), @@ -248,19 +550,22 @@ impl AccountsDB { .collect() } - pub fn increment_transaction_count(&mut self, tx_count: usize) { - self.transaction_count += tx_count as u64 + pub fn increment_transaction_count(&self, tx_count: usize) { + let mut tx = self.transaction_count.write().unwrap(); + *tx += tx_count as u64; } pub fn transaction_count(&self) -> u64 { - self.transaction_count + let tx = self.transaction_count.read().unwrap(); + *tx } /// become the root accountsDB fn squash(&mut self, parents: &[U]) where - U: Deref, + U: std::ops::Deref, { + /* self.transaction_count += parents .iter() .fold(0, |sum, parent| sum + parent.transaction_count); @@ -277,29 +582,44 @@ impl AccountsDB { // toss any zero-balance accounts, since self is root now self.accounts.retain(|_, account| account.tokens != 0); + */ } } impl Accounts { - /// Slow because lock is held for 1 operation insted of many - pub fn load_slow(checkpoints: &[U], pubkey: &Pubkey) -> Option - where - U: Deref, - { - let dbs: Vec<_> = checkpoints - .iter() - .map(|obj| obj.accounts_db.read().unwrap()) - .collect(); - AccountsDB::load(&dbs, pubkey).filter(|acc| acc.tokens != 0) + pub fn new(in_paths: &str) -> Self { + static ACCOUNT_DIR: AtomicUsize = AtomicUsize::new(0); + let paths = if !in_paths.is_empty() { + in_paths.to_string() + } else { + let mut dir: usize; + dir = ACCOUNT_DIR.fetch_add(1, Ordering::Relaxed); + let mut paths = dir.to_string(); + for _ in 1..NUM_ACCOUNT_DIRS { + dir = ACCOUNT_DIR.fetch_add(1, Ordering::Relaxed); + paths = format!("{},{}", paths, dir.to_string()); + } + paths + }; + let accounts_db = AccountsDB::default(); + accounts_db.add_storage(&paths); + Accounts { + accounts_db, + account_locks: Mutex::new(HashSet::new()), + paths, + } } + + /// Slow because lock is held for 1 operation insted of many + pub fn load_slow(&self, fork: Fork, pubkey: &Pubkey) -> Option { + self.accounts_db.load(fork, pubkey).filter(|acc| acc.tokens != 0) + } + /// Slow because lock is held for 1 operation insted of many /// * purge - if the account token value is 0 and purge is true then delete the account. /// purge should be set to false for overlays, and true for the root checkpoint. - pub fn store_slow(&self, purge: bool, pubkey: &Pubkey, account: &Account) { - self.accounts_db - .write() - .unwrap() - .store(purge, pubkey, account) + pub fn store_slow(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + self.accounts_db.store(fork, purge, pubkey, account) } fn lock_account( @@ -331,8 +651,8 @@ impl Accounts { } } - pub fn hash_internal_state(&self) -> Hash { - self.accounts_db.read().unwrap().hash_internal_state() + pub fn hash_internal_state(&self, fork: Fork) -> Option { + self.accounts_db.hash_internal_state(fork) } /// This function will prevent multiple threads from modifying the same account state at the @@ -363,20 +683,15 @@ impl Accounts { .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); } - pub fn load_accounts( - checkpoints: &[U], + pub fn load_accounts( + &self, + fork: Fork, txs: &[Transaction], results: Vec>, error_counters: &mut ErrorCounters, - ) -> Vec> - where - U: Deref, - { - let dbs: Vec<_> = checkpoints - .iter() - .map(|obj| obj.accounts_db.read().unwrap()) - .collect(); - AccountsDB::load_accounts(&dbs, txs, results, error_counters) + ) -> Vec> { + self.accounts_db + .load_accounts(fork, txs, results, error_counters) } /// Store the accounts into the DB @@ -384,42 +699,44 @@ impl Accounts { /// purge should be set to false for overlays, and true for the root checkpoint. pub fn store_accounts( &self, + fork: Fork, purge: bool, txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], ) { self.accounts_db - .write() - .unwrap() - .store_accounts(purge, txs, res, loaded) + .store_accounts(fork, purge, txs, res, loaded) } pub fn increment_transaction_count(&self, tx_count: usize) { - self.accounts_db - .write() - .unwrap() - .increment_transaction_count(tx_count) + self.accounts_db.increment_transaction_count(tx_count) } pub fn transaction_count(&self) -> u64 { - self.accounts_db.read().unwrap().transaction_count() + self.accounts_db.transaction_count() } /// accounts starts with an empty data structure for every child/fork /// this function squashes all the parents into this instance pub fn squash(&self, parents: &[U]) where - U: Deref, + U: std::ops::Deref, { assert!(self.account_locks.lock().unwrap().is_empty()); + /* let dbs: Vec<_> = parents .iter() .map(|obj| obj.accounts_db.read().unwrap()) .collect(); self.accounts_db.write().unwrap().squash(&dbs); + */ + } + + pub fn has_accounts(&self, fork: Fork) -> bool { + false } } @@ -428,6 +745,7 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use rand::{thread_rng, Rng}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; @@ -437,15 +755,18 @@ mod tests { #[test] fn test_purge() { - let mut db = AccountsDB::default(); + let paths = "purge".to_string(); + let db = AccountsDB::default(); + db.add_storage(&paths); let key = Pubkey::default(); let account = Account::new(0, 0, Pubkey::default()); // accounts are deleted when their token value is 0 and purge is true - db.store(false, &key, &account); - assert_eq!(AccountsDB::load(&[&db], &key), Some(account.clone())); + db.store(0, false, &key, &account); + assert_eq!(db.load(0, &key), Some(account.clone())); // purge should be set to true for the root checkpoint - db.store(true, &key, &account); - assert_eq!(AccountsDB::load(&[&db], &key), None); + db.store(0, true, &key, &account); + assert_eq!(db.load(0, &key), None); + cleanup_dirs(&paths); } fn load_accounts( @@ -453,12 +774,13 @@ mod tests { ka: &Vec<(Pubkey, Account)>, error_counters: &mut ErrorCounters, ) -> Vec> { - let accounts = Accounts::default(); + let accounts = Accounts::new(""); for ka in ka.iter() { - accounts.store_slow(true, &ka.0, &ka.1); + accounts.store_slow(0, true, &ka.0, &ka.1); } - Accounts::load_accounts(&[&accounts], &[tx], vec![Ok(())], error_counters) + let res = accounts.load_accounts(0, &[tx], vec![Ok(())], error_counters); + res } #[test] @@ -838,20 +1160,20 @@ mod tests { let account0 = Account::new(1, 0, key); // store value 1 in the "root", i.e. db zero - db0.store(true, &key, &account0); + db0.store(0, true, &key, &account0); // store value 0 in the child, but don't purge (see purge test above) let mut db1 = AccountsDB::default(); let account1 = Account::new(0, 0, key); - db1.store(false, &key, &account1); + db1.store(1, false, &key, &account1); // masking accounts is done at the Accounts level, at accountsDB we see // original account - assert_eq!(AccountsDB::load(&[&db1, &db0], &key), Some(account1)); + assert_eq!(db1.load(1, &key), Some(account1)); // squash, which should whack key's account db1.squash(&[&db0]); - assert_eq!(AccountsDB::load(&[&db1], &key), None); + assert_eq!(db1.load(1, &key), None); } #[test] @@ -861,22 +1183,196 @@ mod tests { // 1 token in the "root", i.e. db zero let mut db0 = AccountsDB::default(); let account0 = Account::new(1, 0, key); - db0.store(true, &key, &account0); + db0.store(0, true, &key, &account0); // 0 tokens in the child let mut db1 = AccountsDB::default(); let account1 = Account::new(0, 0, key); - db1.store(false, &key, &account1); + db1.store(1, false, &key, &account1); // masking accounts is done at the Accounts level, at accountsDB we see // original account - assert_eq!(AccountsDB::load(&[&db1, &db0], &key), Some(account1)); + assert_eq!(db1.load(1, &key), Some(account1)); - let mut accounts0 = Accounts::default(); - accounts0.accounts_db = RwLock::new(db0); - let mut accounts1 = Accounts::default(); - accounts1.accounts_db = RwLock::new(db1); - assert_eq!(Accounts::load_slow(&[&accounts1, &accounts0], &key), None); + let mut accounts0 = Accounts::new(""); + accounts0.accounts_db = db0; + let mut accounts1 = Accounts::new(""); + accounts1.accounts_db = db1; + assert_eq!(accounts1.load_slow(1, &key), None); } + fn create_account( + accounts: &AccountsDB, + pubkeys: &mut Vec, + num: usize, + num_vote: usize, + ) { + let mut nvote = num_vote; + for t in 0..num { + let pubkey = Keypair::new().pubkey(); + let mut default_account = Account::default(); + pubkeys.push(pubkey.clone()); + default_account.tokens = (t + 1) as u64; + if nvote > 0 && (t + 1) % nvote == 0 { + default_account.owner = vote_program::id(); + nvote -= 1; + } + assert!(accounts.load(0, &pubkey).is_none()); + accounts.store(0, true, &pubkey, &default_account); + } + } + + fn update_accounts(accounts: &AccountsDB, pubkeys: Vec, range: usize) { + for _ in 1..1000 { + let idx = thread_rng().gen_range(0, range); + if let Some(mut account) = accounts.load(0, &pubkeys[idx]) { + account.tokens = account.tokens + 1; + accounts.store(0, true, &pubkeys[idx], &account); + if account.tokens == 0 { + assert!(accounts.load(0, &pubkeys[idx]).is_none()); + } else { + let mut default_account = Account::default(); + default_account.tokens = account.tokens; + assert_eq!(compare_account(&default_account, &account), true); + } + } + } + } + + fn compare_account(account1: &Account, account2: &Account) -> bool { + if account1.userdata != account2.userdata + || account1.owner != account2.owner + || account1.executable != account2.executable + || account1.tokens != account2.tokens + { + return false; + } + true + } + + fn cleanup_dirs(paths: &str) { + let paths: Vec = paths.split(',').map(|s| s.to_string()).collect(); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }) + } + + #[test] + fn test_account_one() { + let paths = "one".to_string(); + let accounts = AccountsDB::default(); + accounts.add_storage(&paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 1, 0); + let account = accounts.load(0, &pubkeys[0]).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = 1; + assert_eq!(compare_account(&default_account, &account), true); + cleanup_dirs(&paths); + } + + #[test] + fn test_account_many() { + let paths = "many0,many1".to_string(); + let accounts = AccountsDB::default(); + accounts.add_storage(&paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 0); + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let account = accounts.load(0, &pubkeys[idx]).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = (idx + 1) as u64; + assert_eq!(compare_account(&default_account, &account), true); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_update() { + let paths = "update0".to_string(); + let accounts = AccountsDB::default(); + accounts.add_storage(&paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 0); + update_accounts(&accounts, pubkeys, 99); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 100); + assert_eq!( + stores[0].get_status(), + AccountStorageStatus::StorageAvailable + ); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_grow() { + let paths = "grow0".to_string(); + let accounts = AccountsDB::default(); + accounts.add_storage(&paths); + let count = [0, 1]; + let status = [ + AccountStorageStatus::StorageAvailable, + AccountStorageStatus::StorageFull, + ]; + let pubkey1 = Keypair::new().pubkey(); + let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, pubkey1); + accounts.store(0, true, &pubkey1, &account1); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[0]); + } + + let pubkey2 = Keypair::new().pubkey(); + let account2 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, pubkey2); + accounts.store(0, true, &pubkey2, &account2); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 2); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[1]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[0]); + } + assert_eq!(accounts.load(0, &pubkey1).unwrap(), account1); + assert_eq!(accounts.load(0, &pubkey2).unwrap(), account2); + + for i in 0..25 { + let index = i % 2; + accounts.store(0, true, &pubkey1, &account1); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 3); + assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); + assert_eq!(stores[0].get_status(), status[0]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[1]); + assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); + assert_eq!(stores[2].get_status(), status[0]); + } + assert_eq!(accounts.load(0, &pubkey1).unwrap(), account1); + assert_eq!(accounts.load(0, &pubkey2).unwrap(), account2); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_vote() { + let paths = "vote0".to_string(); + let accounts = AccountsDB::default(); + accounts.add_storage(&paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 6); + let accounts = accounts.get_vote_accounts(0); + assert_eq!(accounts.len(), 6); + accounts.iter().for_each(|account| { + assert_eq!(account.owner, vote_program::id()); + }); + cleanup_dirs(&paths); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 29b7a2915a..118c2a9c15 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -80,7 +80,7 @@ type BankStatusCache = StatusCache; /// Manager for the state of all accounts and programs after processing its entries. #[derive(Default)] pub struct Bank { - accounts: Accounts, + accounts: Option>, /// A cache of signature statuses status_cache: RwLock, @@ -115,7 +115,12 @@ pub struct Bank { impl Bank { pub fn new(genesis_block: &GenesisBlock) -> Self { + Self::new_with_paths(&genesis_block, "") + } + + pub fn new_with_paths(genesis_block: &GenesisBlock, paths: &str) -> Self { let mut bank = Self::default(); + bank.accounts = Some(Arc::new(Accounts::new(&paths))); bank.process_genesis_block(genesis_block); bank.add_builtin_programs(); bank @@ -136,6 +141,7 @@ impl Bank { bank.parent_hash = parent.hash(); bank.collector_id = collector_id; + bank.accounts = Some(parent.accounts()); bank } @@ -175,8 +181,8 @@ impl Bank { let parents = self.parents(); *self.parent.write().unwrap() = None; - let parent_accounts: Vec<_> = parents.iter().map(|b| &b.accounts).collect(); - self.accounts.squash(&parent_accounts); + let parent_accounts: Vec<_> = parents.iter().map(|b| b.accounts()).collect(); + self.accounts().squash(&parent_accounts); let parent_caches: Vec<_> = parents .iter() @@ -230,7 +236,8 @@ impl Bank { .serialize(&mut bootstrap_leader_vote_account.userdata) .unwrap(); - self.accounts.store_slow( + self.accounts().store_slow( + self.id, self.is_root(), &genesis_block.bootstrap_leader_vote_account_id, &bootstrap_leader_vote_account, @@ -248,8 +255,8 @@ impl Bank { pub fn add_native_program(&self, name: &str, program_id: &Pubkey) { let account = native_loader::create_program_account(name); - self.accounts - .store_slow(self.is_root(), program_id, &account); + self.accounts() + .store_slow(self.id, self.is_root(), program_id, &account); } fn add_builtin_programs(&self) { @@ -338,11 +345,11 @@ impl Bank { } // TODO: put this assert back in // assert!(!self.is_frozen()); - self.accounts.lock_accounts(txs) + self.accounts().lock_accounts(txs) } pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { - self.accounts.unlock_accounts(txs, results) + self.accounts().unlock_accounts(txs, results) } fn load_accounts( @@ -351,10 +358,8 @@ impl Bank { results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - let parents = self.parents(); - let mut accounts = vec![&self.accounts]; - accounts.extend(parents.iter().map(|b| &b.accounts)); - Accounts::load_accounts(&accounts, txs, results, error_counters) + self.accounts() + .load_accounts(self.id, txs, results, error_counters) } fn check_age( &self, @@ -461,7 +466,7 @@ impl Bank { inc_new_counter_info!("bank-process_transactions-error_count", err_count); } - self.accounts.increment_transaction_count(tx_count); + self.accounts().increment_transaction_count(tx_count); inc_new_counter_info!("bank-process_transactions-txs", tx_count); if 0 != error_counters.last_id_not_found { @@ -536,8 +541,8 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); let now = Instant::now(); - self.accounts - .store_accounts(self.is_root(), txs, executed, loaded_accounts); + self.accounts() + .store_accounts(self.id, self.is_root(), txs, executed, loaded_accounts); // once committed there is no way to unroll let write_elapsed = now.elapsed(); @@ -622,7 +627,7 @@ impl Bank { } account.tokens -= tokens; - self.accounts.store_slow(true, pubkey, &account); + self.accounts().store_slow(self.id, true, pubkey, &account); Ok(()) } None => Err(BankError::AccountNotFound), @@ -632,22 +637,27 @@ impl Bank { pub fn deposit(&self, pubkey: &Pubkey, tokens: u64) { let mut account = self.get_account(pubkey).unwrap_or_default(); account.tokens += tokens; - self.accounts.store_slow(self.is_root(), pubkey, &account); + self.accounts().store_slow(self.id, self.is_root(), pubkey, &account); + } + + fn accounts(&self) -> Arc { + if let Some(accounts) = &self.accounts { + accounts.clone() + } else { + Arc::new(Accounts::new("")) + } } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - let parents = self.parents(); - let mut accounts = vec![&self.accounts]; - accounts.extend(parents.iter().map(|b| &b.accounts)); - Accounts::load_slow(&accounts, pubkey) + self.accounts().load_slow(self.id, pubkey) } pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option { - Accounts::load_slow(&[&self.accounts], pubkey) + self.accounts().load_slow(self.id, pubkey) } pub fn transaction_count(&self) -> u64 { - self.accounts.transaction_count() + self.accounts().transaction_count() } pub fn get_signature_status(&self, signature: &Signature) -> Option> { @@ -669,12 +679,11 @@ impl Bank { fn hash_internal_state(&self) -> Hash { // If there are no accounts, return the same hash as we did before // checkpointing. - let accounts = &self.accounts.accounts_db.read().unwrap().accounts; - if accounts.is_empty() { + if !self.accounts().has_accounts(self.id) { return self.parent_hash; } - let accounts_delta_hash = self.accounts.hash_internal_state(); + let accounts_delta_hash = self.accounts().hash_internal_state(self.id); extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap()) } @@ -682,34 +691,17 @@ impl Bank { where F: Fn(&VoteState) -> bool, { - let parents = self.parents(); - let mut accounts = vec![&self.accounts]; - accounts.extend(parents.iter().map(|b| &b.accounts)); - let mut exists = HashSet::new(); - accounts + self.accounts() + .accounts_db + .get_vote_accounts(self.id) .iter() - .flat_map(|account| { - let accounts_db = account.accounts_db.read().unwrap(); - let vote_states: Vec<_> = accounts_db - .accounts - .iter() - .filter_map(|(key, account)| { - if exists.contains(key) { - None - } else { - exists.insert(key.clone()); - if vote_program::check_id(&account.owner) { - if let Ok(vote_state) = VoteState::deserialize(&account.userdata) { - if cond(&vote_state) { - return Some(vote_state); - } - } - } - None - } - }) - .collect(); - vote_states + .filter_map(|account| { + if let Ok(vote_state) = VoteState::deserialize(&account.userdata) { + if cond(&vote_state) { + return Some(vote_state); + } + } + None }) .collect() } diff --git a/src/fullnode.rs b/src/fullnode.rs index 592f82bea0..8850cd14a5 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -71,6 +71,7 @@ pub struct FullnodeConfig { pub blockstream: Option, pub storage_rotate_count: u64, pub tick_config: PohServiceConfig, + pub account_paths: String, } impl Default for FullnodeConfig { fn default() -> Self { @@ -84,6 +85,7 @@ impl Default for FullnodeConfig { blockstream: None, storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, tick_config: PohServiceConfig::default(), + account_paths: "0,1,2,3".to_string(), } } } @@ -123,7 +125,7 @@ impl Fullnode { assert_eq!(id, node.info.id); let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = - new_banks_from_blocktree(ledger_path); + new_banks_from_blocktree(ledger_path, &config.account_paths); let exit = Arc::new(AtomicBool::new(false)); let bank_info = &bank_forks_info[0]; @@ -405,6 +407,7 @@ impl Fullnode { pub fn new_banks_from_blocktree( blocktree_path: &str, + account_paths: &str, ) -> (BankForks, Vec, Blocktree, Receiver) { let genesis_block = GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block"); @@ -743,7 +746,7 @@ mod tests { // Close the validator so that rocksdb has locks available validator_exit(); - let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree(&validator_ledger_path); + let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree(&validator_ledger_path, "accounts"); let bank = bank_forks.working_bank(); let entry_height = bank_forks_info[0].entry_height; diff --git a/src/replay_stage.rs b/src/replay_stage.rs index ea383fe497..1aaa298314 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -499,7 +499,7 @@ mod test { let (to_leader_sender, _to_leader_receiver) = channel(); { let (bank_forks, bank_forks_info, blocktree, l_receiver) = - new_banks_from_blocktree(&my_ledger_path); + new_banks_from_blocktree(&my_ledger_path, ""); let bank = bank_forks.working_bank(); let last_entry_id = bank_forks_info[0].last_entry_id; diff --git a/tests/multinode.rs b/tests/multinode.rs index 3e32684491..a0aab038cb 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -911,7 +911,7 @@ fn test_leader_to_validator_transition() { leader_exit(); info!("Check the ledger to make sure it's the right height..."); - let bank_forks = new_banks_from_blocktree(&leader_ledger_path).0; + let bank_forks = new_banks_from_blocktree(&leader_ledger_path, "").0; let _bank = bank_forks.working_bank(); remove_dir_all(leader_ledger_path).unwrap();