From 1c0758e3bdf4fb897d063294aec9793e947db1da Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 29 Jan 2019 16:33:28 -0800 Subject: [PATCH] Accounts refactoring for forking. * Move last_id and signature dup handling out of Accounts. * Accounts that handle overlays. --- src/accounts.rs | 335 ++++++++++++++++++++++-------------------------- src/bank.rs | 65 +++++++--- 2 files changed, 201 insertions(+), 199 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index 70baaf276..e6b7d1497 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -1,7 +1,6 @@ use crate::bank::BankError; use crate::bank::Result; use crate::counter::Counter; -use crate::status_deque::{StatusDeque, StatusDequeError}; use bincode::serialize; use hashbrown::{HashMap, HashSet}; use log::Level; @@ -10,6 +9,7 @@ use solana_sdk::hash::{hash, Hash}; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; use std::collections::BTreeMap; +use std::ops::Deref; use std::sync::atomic::AtomicUsize; use std::sync::{Mutex, RwLock}; @@ -21,6 +21,7 @@ pub struct ErrorCounters { pub account_not_found: usize, pub account_in_use: usize, pub last_id_not_found: usize, + pub last_id_too_old: usize, pub reserve_last_id: usize, pub insufficient_funds: usize, pub duplicate_signature: usize, @@ -65,10 +66,6 @@ impl Default for Accounts { } impl AccountsDB { - pub fn keys(&self) -> Vec { - self.accounts.keys().cloned().collect() - } - pub fn hash_internal_state(&self) -> Hash { let mut ordered_accounts = BTreeMap::new(); @@ -81,16 +78,28 @@ impl AccountsDB { hash(&serialize(&ordered_accounts).unwrap()) } - fn load(&self, pubkey: &Pubkey) -> Option<&Account> { - if let Some(account) = self.accounts.get(pubkey) { - return Some(account); + fn load(checkpoints: &[U], pubkey: &Pubkey) -> Option + where + U: Deref, + { + for db in checkpoints { + if let Some(account) = db.accounts.get(pubkey) { + return Some(account.clone()); + } } None } - - pub fn store(&mut self, pubkey: &Pubkey, account: &Account) { + /// Store the account update. If the update is to delete the account because the token balance + /// is 0, purge needs to be set to true for the delete to occur in place. + pub fn store(&mut self, purge: bool, pubkey: &Pubkey, account: &Account) { if account.tokens == 0 { - self.accounts.remove(pubkey); + if purge { + // purge if balance is 0 and no checkpoints + self.accounts.remove(pubkey); + } else { + // store default account if balance is 0 and there's a checkpoint + self.accounts.insert(pubkey.clone(), Account::default()); + } } else { self.accounts.insert(pubkey.clone(), account.clone()); } @@ -98,6 +107,7 @@ impl AccountsDB { pub fn store_accounts( &mut self, + purge: bool, txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], @@ -108,65 +118,51 @@ impl AccountsDB { } let tx = &txs[i]; - let accs = raccs.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { - self.store(key, account); + let acc = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { + self.store(purge, key, account); } } } - - fn load_tx_accounts( - &self, + fn load_tx_accounts( + checkpoints: &[U], tx: &Transaction, - last_ids: &mut StatusDeque>, - max_age: usize, error_counters: &mut ErrorCounters, - ) -> Result> { + ) -> Result> + where + U: Deref, + { // Copy all the accounts if tx.signatures.is_empty() && tx.fee != 0 { Err(BankError::MissingSignatureForFee) - } else if tx.account_keys.is_empty() || self.load(&tx.account_keys[0]).is_none() { - error_counters.account_not_found += 1; - Err(BankError::AccountNotFound) - } else if self.load(&tx.account_keys[0]).unwrap().tokens < tx.fee { - error_counters.insufficient_funds += 1; - Err(BankError::InsufficientFundsForFee) } else { - if !last_ids.check_entry_id_age(tx.last_id, max_age) { - error_counters.last_id_not_found += 1; - return Err(BankError::LastIdNotFound); - } - // There is no way to predict what program will execute without an error // If a fee can pay for execution then the program will be scheduled - last_ids - .reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) - .map_err(|err| match err { - StatusDequeError::LastIdNotFound => { - error_counters.reserve_last_id += 1; - BankError::LastIdNotFound - } - StatusDequeError::DuplicateSignature => { - error_counters.duplicate_signature += 1; - BankError::DuplicateSignature - } - })?; - - let mut called_accounts: Vec = tx - .account_keys - .iter() - .map(|key| self.load(key).cloned().unwrap_or_default()) - .collect(); - called_accounts[0].tokens -= tx.fee; - Ok(called_accounts) + let mut called_accounts: Vec = vec![]; + for key in &tx.account_keys { + called_accounts.push(Self::load(checkpoints, key).unwrap_or_default()); + } + if called_accounts.is_empty() || called_accounts[0].tokens == 0 { + error_counters.account_not_found += 1; + Err(BankError::AccountNotFound) + } else if called_accounts[0].tokens < tx.fee { + error_counters.insufficient_funds += 1; + Err(BankError::InsufficientFundsForFee) + } else { + called_accounts[0].tokens -= tx.fee; + Ok(called_accounts) + } } } - fn load_executable_accounts( - &self, + fn load_executable_accounts( + checkpoints: &[U], mut program_id: Pubkey, error_counters: &mut ErrorCounters, - ) -> Result> { + ) -> Result> + where + U: Deref, + { let mut accounts = Vec::new(); let mut depth = 0; loop { @@ -181,7 +177,7 @@ impl AccountsDB { } depth += 1; - let program = match self.load(&program_id) { + let program = match Self::load(checkpoints, &program_id) { Some(program) => program, None => { error_counters.account_not_found += 1; @@ -202,11 +198,14 @@ impl AccountsDB { } /// For each program_id in the transaction, load its loaders. - fn load_loaders( - &self, + fn load_loaders( + checkpoints: &[U], tx: &Transaction, error_counters: &mut ErrorCounters, - ) -> Result>> { + ) -> Result>> + where + U: Deref, + { tx.instructions .iter() .map(|ix| { @@ -215,25 +214,26 @@ impl AccountsDB { return Err(BankError::AccountNotFound); } let program_id = tx.program_ids[ix.program_ids_index as usize]; - self.load_executable_accounts(program_id, error_counters) + Self::load_executable_accounts(checkpoints, program_id, error_counters) }) .collect() } - fn load_accounts( - &self, + fn load_accounts( + checkpoints: &[U], txs: &[Transaction], - last_ids: &mut StatusDeque>, lock_results: Vec>, - max_age: usize, error_counters: &mut ErrorCounters, - ) -> Vec> { + ) -> Vec> + where + U: Deref, + { txs.iter() .zip(lock_results.into_iter()) .map(|etx| match etx { (tx, Ok(())) => { - let accounts = self.load_tx_accounts(tx, last_ids, max_age, error_counters)?; - let loaders = self.load_loaders(tx, error_counters)?; + let accounts = Self::load_tx_accounts(checkpoints, tx, error_counters)?; + let loaders = Self::load_loaders(checkpoints, tx, error_counters)?; Ok((accounts, loaders)) } (_, Err(e)) => Err(e), @@ -248,36 +248,35 @@ impl AccountsDB { pub fn transaction_count(&self) -> u64 { self.transaction_count } + pub fn account_values_slow(&self) -> Vec<(Pubkey, solana_sdk::account::Account)> { + self.accounts.iter().map(|(x, y)| (*x, y.clone())).collect() + } + fn merge(&mut self, other: Self) { + self.transaction_count += other.transaction_count; + self.accounts.extend(other.accounts) + } } impl Accounts { - // TODO use a fork - pub fn copy_for_tpu(&self) -> Self { - let copy = Accounts::default(); - - { - let mut accounts_db = copy.accounts_db.write().unwrap(); - for (key, val) in self.accounts_db.read().unwrap().accounts.iter() { - accounts_db.accounts.insert(key.clone(), val.clone()); - } - accounts_db.transaction_count = self.transaction_count(); - } - - copy + /// Slow because lock is held for 1 operation insted of many + pub fn load_slow(checkpoints: &[U], pubkey: &Pubkey) -> Option + where + U: Deref, + { + let dbs: Vec<_> = checkpoints + .iter() + .map(|obj| obj.accounts_db.read().unwrap()) + .collect(); + AccountsDB::load(&dbs, pubkey) } - - pub fn keys(&self) -> Vec { - self.accounts_db.read().unwrap().keys() - } - - /// Slow because lock is held for 1 operation instead of many - pub fn load_slow(&self, pubkey: &Pubkey) -> Option { - self.accounts_db.read().unwrap().load(pubkey).cloned() - } - - /// Slow because lock is held for 1 operation instead of many - pub fn store_slow(&self, pubkey: &Pubkey, account: &Account) { - self.accounts_db.write().unwrap().store(pubkey, account) + /// Slow because lock is held for 1 operation insted of many + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. + pub fn store_slow(&self, purge: bool, pubkey: &Pubkey, account: &Account) { + self.accounts_db + .write() + .unwrap() + .store(purge, pubkey, account) } fn lock_account( @@ -341,25 +340,28 @@ impl Accounts { .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); } - pub fn load_accounts( - &self, + pub fn load_accounts( + checkpoints: &[U], txs: &[Transaction], - last_ids: &mut StatusDeque>, - lock_results: Vec>, - max_age: usize, + results: Vec>, error_counters: &mut ErrorCounters, - ) -> Vec> { - self.accounts_db.read().unwrap().load_accounts( - txs, - last_ids, - lock_results, - max_age, - error_counters, - ) + ) -> Vec> + where + U: Deref, + { + let dbs: Vec<_> = checkpoints + .iter() + .map(|obj| obj.accounts_db.read().unwrap()) + .collect(); + AccountsDB::load_accounts(&dbs, txs, results, error_counters) } + /// Store the accounts into the DB + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. pub fn store_accounts( &self, + purge: bool, txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], @@ -367,7 +369,7 @@ impl Accounts { self.accounts_db .write() .unwrap() - .store_accounts(txs, res, loaded) + .store_accounts(purge, txs, res, loaded) } pub fn increment_transaction_count(&self, tx_count: usize) { @@ -380,6 +382,27 @@ impl Accounts { pub fn transaction_count(&self) -> u64 { self.accounts_db.read().unwrap().transaction_count() } + /// accounts starts with an empty data structure for every fork + /// self is root, merge the fork into self + pub fn merge_into_root(&self, other: Self) { + assert!(other.account_locks.lock().unwrap().is_empty()); + let db = other.accounts_db.into_inner().unwrap(); + let mut mydb = self.accounts_db.write().unwrap(); + mydb.merge(db) + } + pub fn copy_for_tpu(&self) -> Self { + //TODO: deprecate this in favor of forks and merge_into_root + let copy = Accounts::default(); + + { + let mut accounts_db = copy.accounts_db.write().unwrap(); + for (key, val) in self.accounts_db.read().unwrap().accounts.iter() { + accounts_db.accounts.insert(key.clone(), val.clone()); + } + accounts_db.transaction_count = self.transaction_count(); + } + copy + } } #[cfg(test)] @@ -394,22 +417,30 @@ mod tests { use solana_sdk::transaction::Instruction; use solana_sdk::transaction::Transaction; + #[test] + fn test_purge() { + let mut db = AccountsDB::default(); + let key = Pubkey::default(); + let account = Account::new(0, 0, Pubkey::default()); + // accounts are deleted when their token value is 0 and purge is true + db.store(false, &key, &account); + assert_eq!(AccountsDB::load(&[&db], &key), Some(account.clone())); + // purge should be set to true for the root checkpoint + db.store(true, &key, &account); + assert_eq!(AccountsDB::load(&[&db], &key), None); + } + fn load_accounts( tx: Transaction, ka: &Vec<(Pubkey, Account)>, error_counters: &mut ErrorCounters, - max_age: usize, ) -> Vec> { let accounts = Accounts::default(); for ka in ka.iter() { - accounts.store_slow(&ka.0, &ka.1); + accounts.store_slow(true, &ka.0, &ka.1); } - let id = Default::default(); - let mut last_ids: StatusDeque> = StatusDeque::default(); - last_ids.register_tick(&id); - - accounts.load_accounts(&[tx], &mut last_ids, vec![Ok(())], max_age, error_counters) + Accounts::load_accounts(&[&accounts], &[tx], vec![Ok(())], error_counters) } fn assert_counters(error_counters: &ErrorCounters, expected: [usize; 8]) { @@ -423,34 +454,6 @@ mod tests { assert_eq!(error_counters.missing_signature_for_fee, expected[7]); } - #[test] - fn test_load_accounts_index_out_of_bounds() { - let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); - let mut error_counters = ErrorCounters::default(); - - let keypair = Keypair::new(); - let key0 = keypair.pubkey(); - - let account = Account::new(1, 1, Pubkey::default()); - accounts.push((key0, account)); - - let instructions = vec![Instruction::new(1, &(), vec![0, 1])]; - let tx = Transaction::new_with_instructions( - &[&keypair], - &[], // TODO this should contain a key, should fail - Hash::default(), - 0, - vec![solana_native_loader::id()], - instructions, - ); - - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); - - assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); - assert_eq!(loaded_accounts.len(), 1); - assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); - } - #[test] fn test_load_accounts_no_key() { let accounts: Vec<(Pubkey, Account)> = Vec::new(); @@ -466,7 +469,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -490,7 +493,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -522,45 +525,13 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); } - #[test] - fn test_load_accounts_max_age() { - let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); - let mut error_counters = ErrorCounters::default(); - - let keypair = Keypair::new(); - let key0 = keypair.pubkey(); - let key1 = Pubkey::new(&[5u8; 32]); - - let account = Account::new(1, 1, Pubkey::default()); - accounts.push((key0, account)); - - let account = Account::new(2, 1, Pubkey::default()); - accounts.push((key1, account)); - - let instructions = vec![Instruction::new(1, &(), vec![0])]; - let tx = Transaction::new_with_instructions( - &[&keypair], - &[], - Hash::default(), - 0, - vec![solana_native_loader::id()], - instructions, - ); - - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 0); - - assert_counters(&error_counters, [0, 0, 1, 0, 0, 0, 0, 0]); - assert_eq!(loaded_accounts.len(), 1); - assert_eq!(loaded_accounts[0], Err(BankError::LastIdNotFound)); - } - #[test] fn test_load_accounts_insufficient_funds() { let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); @@ -582,7 +553,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [0, 0, 0, 0, 1, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -614,7 +585,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [0, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -686,7 +657,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [0, 0, 0, 0, 0, 0, 1, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -720,7 +691,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -753,7 +724,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [1, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); @@ -802,7 +773,7 @@ mod tests { instructions, ); - let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters, 10); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); assert_counters(&error_counters, [0, 0, 0, 0, 0, 0, 0, 0]); assert_eq!(loaded_accounts.len(), 1); diff --git a/src/bank.rs b/src/bank.rs index 98f1054eb..7bfd7eef4 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -11,7 +11,7 @@ use crate::genesis_block::GenesisBlock; use crate::leader_scheduler::LeaderScheduler; use crate::poh_recorder::PohRecorder; use crate::runtime::{self, RuntimeError}; -use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS}; +use crate::status_deque::{Status, StatusDeque, StatusDequeError, MAX_ENTRY_IDS}; use bincode::deserialize; use itertools::Itertools; use log::Level; @@ -159,13 +159,14 @@ impl Bank { mint_account.tokens -= genesis_block.bootstrap_leader_tokens; bootstrap_leader_account.tokens += genesis_block.bootstrap_leader_tokens; self.accounts.store_slow( + true, &genesis_block.bootstrap_leader_id, &bootstrap_leader_account, ); }; self.accounts - .store_slow(&genesis_block.mint_id, &mint_account); + .store_slow(true, &genesis_block.mint_id, &mint_account); self.register_tick(&genesis_block.last_id()); } @@ -178,7 +179,7 @@ impl Bank { loader: solana_native_loader::id(), }; self.accounts - .store_slow(&system_program::id(), &system_program_account); + .store_slow(true, &system_program::id(), &system_program_account); } fn add_builtin_programs(&self) { @@ -193,7 +194,7 @@ impl Bank { loader: solana_native_loader::id(), }; self.accounts - .store_slow(&vote_program::id(), &vote_program_account); + .store_slow(true, &vote_program::id(), &vote_program_account); // Storage program let storage_program_account = Account { @@ -204,7 +205,7 @@ impl Bank { loader: solana_native_loader::id(), }; self.accounts - .store_slow(&storage_program::id(), &storage_program_account); + .store_slow(true, &storage_program::id(), &storage_program_account); let storage_system_account = Account { tokens: 1, @@ -214,7 +215,7 @@ impl Bank { loader: Pubkey::default(), }; self.accounts - .store_slow(&storage_program::system_id(), &storage_system_account); + .store_slow(true, &storage_program::system_id(), &storage_system_account); // Bpf Loader let bpf_loader_account = Account { @@ -226,7 +227,7 @@ impl Bank { }; self.accounts - .store_slow(&bpf_loader::id(), &bpf_loader_account); + .store_slow(true, &bpf_loader::id(), &bpf_loader_account); // Budget program let budget_program_account = Account { @@ -237,7 +238,7 @@ impl Bank { loader: solana_native_loader::id(), }; self.accounts - .store_slow(&budget_program::id(), &budget_program_account); + .store_slow(true, &budget_program::id(), &budget_program_account); // Erc20 token program let erc20_account = Account { @@ -249,7 +250,7 @@ impl Bank { }; self.accounts - .store_slow(&token_program::id(), &erc20_account); + .store_slow(true, &token_program::id(), &erc20_account); } /// Return the last entry ID registered. @@ -428,17 +429,46 @@ impl Bank { } fn load_accounts( + &self, + txs: &[Transaction], + results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + Accounts::load_accounts(&[&self.accounts], txs, results, error_counters) + } + fn check_signatures( &self, txs: &[Transaction], lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, - ) -> Vec> { + ) -> Vec> { let mut last_ids = self.last_ids.write().unwrap(); - self.accounts - .load_accounts(txs, &mut last_ids, lock_results, max_age, error_counters) + txs.iter() + .zip(lock_results.into_iter()) + .map(|(tx, lock_res)| { + if lock_res.is_ok() { + let r = if !last_ids.check_entry_id_age(tx.last_id, max_age) { + Err(StatusDequeError::LastIdNotFound) + } else { + last_ids.reserve_signature_with_last_id(&tx.last_id, &tx.signatures[0]) + }; + r.map_err(|err| match err { + StatusDequeError::LastIdNotFound => { + error_counters.reserve_last_id += 1; + BankError::LastIdNotFound + } + StatusDequeError::DuplicateSignature => { + error_counters.duplicate_signature += 1; + BankError::DuplicateSignature + } + }) + } else { + lock_res + } + }) + .collect() } - #[allow(clippy::type_complexity)] fn load_and_execute_transactions( &self, @@ -452,8 +482,8 @@ impl Bank { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); - let mut loaded_accounts = - self.load_accounts(txs, lock_results, max_age, &mut error_counters); + let sig_results = self.check_signatures(txs, lock_results, max_age, &mut error_counters); + let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); let tick_height = self.tick_height(); let load_elapsed = now.elapsed(); @@ -539,7 +569,8 @@ impl Bank { executed: &[Result<()>], ) { let now = Instant::now(); - self.accounts.store_accounts(txs, executed, loaded_accounts); + self.accounts + .store_accounts(true, txs, executed, loaded_accounts); // Check account subscriptions and send notifications self.send_account_notifications(txs, executed, loaded_accounts); @@ -753,7 +784,7 @@ impl Bank { } pub fn get_account(&self, pubkey: &Pubkey) -> Option { - self.accounts.load_slow(pubkey) + Accounts::load_slow(&[&self.accounts], pubkey) } pub fn transaction_count(&self) -> u64 {