Use a single structure for last_ids and last_ids_sigs

This commit is contained in:
Anatoly Yakovenko 2018-10-12 15:20:53 -07:00 committed by Grimes
parent e5ab9a856c
commit bba6437ea9
2 changed files with 58 additions and 55 deletions

View File

@ -4,9 +4,7 @@ extern crate rayon;
extern crate solana; extern crate solana;
extern crate test; extern crate test;
use bincode::serialize;
use solana::bank::*; use solana::bank::*;
use solana::hash::hash;
use solana::mint::Mint; use solana::mint::Mint;
use solana::signature::{Keypair, KeypairUtil}; use solana::signature::{Keypair, KeypairUtil};
use solana::system_transaction::SystemTransaction; use solana::system_transaction::SystemTransaction;
@ -21,7 +19,7 @@ fn bench_process_transaction(bencher: &mut Bencher) {
// Create transactions between unrelated parties. // Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096) let transactions: Vec<_> = (0..4096)
.into_iter() .into_iter()
.map(|i| { .map(|_| {
// Seed the 'from' account. // Seed the 'from' account.
let rando0 = Keypair::new(); let rando0 = Keypair::new();
let tx = Transaction::system_move( let tx = Transaction::system_move(

View File

@ -114,6 +114,18 @@ struct ErrorCounters {
insufficient_funds: usize, insufficient_funds: usize,
duplicate_signature: usize, duplicate_signature: usize,
} }
pub struct LastIdsQ {
/// A FIFO queue of `last_id` items, where each item is a set of signatures
/// that have been processed using that `last_id`. Rejected `last_id`
/// values are so old that the `last_id` has been pulled out of the queue.
last_ids: VecDeque<Hash>,
/// Mapping of hashes to signature sets along with timestamp. The bank uses this data to
/// reject transactions with signatures its seen before
last_ids_sigs: HashMap<Hash, (SignatureStatusMap, u64)>,
}
/// The state of all accounts and contracts after processing its entries. /// The state of all accounts and contracts after processing its entries.
pub struct Bank { pub struct Bank {
/// A map of account public keys to the balance in that account. /// A map of account public keys to the balance in that account.
@ -122,14 +134,8 @@ pub struct Bank {
/// set of accounts which are currently in the pipeline /// set of accounts which are currently in the pipeline
account_locks: Mutex<HashSet<Pubkey>>, account_locks: Mutex<HashSet<Pubkey>>,
/// A FIFO queue of `last_id` items, where each item is a set of signatures /// A FIFO queue of `last_id` items
/// that have been processed using that `last_id`. Rejected `last_id` last_ids_q: RwLock<LastIdsQ>,
/// values are so old that the `last_id` has been pulled out of the queue.
last_ids: RwLock<VecDeque<Hash>>,
/// Mapping of hashes to signature sets along with timestamp. The bank uses this data to
/// reject transactions with signatures its seen before
last_ids_sigs: RwLock<HashMap<Hash, (SignatureStatusMap, u64)>>,
/// The number of transactions the bank has processed without error since the /// The number of transactions the bank has processed without error since the
/// start of the ledger. /// start of the ledger.
@ -152,13 +158,21 @@ pub struct Bank {
signature_subscriptions: RwLock<HashMap<Signature, HashMap<Pubkey, Sink<RpcSignatureStatus>>>>, signature_subscriptions: RwLock<HashMap<Signature, HashMap<Pubkey, Sink<RpcSignatureStatus>>>>,
} }
impl Default for LastIdsQ {
fn default() -> Self {
LastIdsQ {
last_ids: VecDeque::new(),
last_ids_sigs: HashMap::new(),
}
}
}
impl Default for Bank { impl Default for Bank {
fn default() -> Self { fn default() -> Self {
Bank { Bank {
accounts: RwLock::new(HashMap::new()), accounts: RwLock::new(HashMap::new()),
account_locks: Mutex::new(HashSet::new()), account_locks: Mutex::new(HashSet::new()),
last_ids: RwLock::new(VecDeque::new()), last_ids_q: RwLock::new(LastIdsQ::default()),
last_ids_sigs: RwLock::new(HashMap::new()),
transaction_count: AtomicUsize::new(0), transaction_count: AtomicUsize::new(0),
is_leader: true, is_leader: true,
finality_time: AtomicUsize::new(std::usize::MAX), finality_time: AtomicUsize::new(std::usize::MAX),
@ -206,8 +220,9 @@ impl Bank {
/// Return the last entry ID registered. /// Return the last entry ID registered.
pub fn last_id(&self) -> Hash { pub fn last_id(&self) -> Hash {
let last_ids = self.last_ids.read().expect("'last_ids' read lock"); let last_ids_q = self.last_ids_q.read().unwrap();
let last_item = last_ids let last_item = last_ids_q
.last_ids
.iter() .iter()
.last() .last()
.expect("get last item from 'last_ids' list"); .expect("get last item from 'last_ids' list");
@ -225,7 +240,7 @@ impl Bank {
/// Forget all signatures. Useful for benchmarking. /// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) { pub fn clear_signatures(&self) {
for (_, sigs) in self.last_ids_sigs.write().unwrap().iter_mut() { for sigs in &mut self.last_ids_q.write().unwrap().last_ids_sigs.values_mut() {
sigs.0.clear(); sigs.0.clear();
} }
} }
@ -262,8 +277,8 @@ impl Bank {
#[cfg(test)] #[cfg(test)]
fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> { fn reserve_signature_with_last_id_test(&self, sig: &Signature, last_id: &Hash) -> Result<()> {
let mut last_ids_sigs = self.last_ids_sigs.write().unwrap(); let mut last_ids_q = self.last_ids_q.write().unwrap();
Self::reserve_signature_with_last_id(&mut last_ids_sigs, last_id, sig) Self::reserve_signature_with_last_id(&mut last_ids_q.last_ids_sigs, last_id, sig)
} }
fn update_signature_status( fn update_signature_status(
@ -287,10 +302,10 @@ impl Bank {
} }
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut last_ids = self.last_ids_sigs.write().unwrap(); let mut last_ids_q = self.last_ids_q.write().unwrap();
for (i, tx) in txs.iter().enumerate() { for (i, tx) in txs.iter().enumerate() {
Self::update_signature_status_with_last_id( Self::update_signature_status_with_last_id(
&mut last_ids, &mut last_ids_q.last_ids_sigs,
&tx.signature, &tx.signature,
&res[i], &res[i],
&tx.last_id, &tx.last_id,
@ -316,10 +331,10 @@ impl Bank {
/// Return a vec of tuple of (valid index, timestamp) /// Return a vec of tuple of (valid index, timestamp)
/// index is into the passed ids slice to avoid copying hashes /// index is into the passed ids slice to avoid copying hashes
pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> { pub fn count_valid_ids(&self, ids: &[Hash]) -> Vec<(usize, u64)> {
let last_ids = self.last_ids_sigs.read().unwrap(); let last_ids_q = self.last_ids_q.read().unwrap();
let mut ret = Vec::new(); let mut ret = Vec::new();
for (i, id) in ids.iter().enumerate() { for (i, id) in ids.iter().enumerate() {
if let Some(entry) = last_ids.get(id) { if let Some(entry) = last_ids_q.last_ids_sigs.get(id) {
ret.push((i, entry.1)); ret.push((i, entry.1));
} }
} }
@ -331,23 +346,17 @@ impl Bank {
/// the oldest ones once its internal cache is full. Once boot, the /// the oldest ones once its internal cache is full. Once boot, the
/// bank will reject transactions using that `last_id`. /// bank will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) { pub fn register_entry_id(&self, last_id: &Hash) {
// this must be locked first! let mut last_ids_q = self.last_ids_q.write().unwrap();
let mut last_ids = self if last_ids_q.last_ids.len() >= MAX_ENTRY_IDS {
.last_ids let id = last_ids_q.last_ids.pop_front().unwrap();
.write()
.expect("'last_ids' write lock in register_entry_id");
let mut last_ids_sigs = self
.last_ids_sigs
.write()
.expect("last_ids_sigs write lock");
if last_ids.len() >= MAX_ENTRY_IDS {
let id = last_ids.pop_front().unwrap();
info!("removing last_id {}", id); info!("removing last_id {}", id);
last_ids_sigs.remove(&id); last_ids_q.last_ids_sigs.remove(&id);
} }
inc_new_counter_info!("bank-register_entry_id-registered", 1); inc_new_counter_info!("bank-register_entry_id-registered", 1);
last_ids_sigs.insert(*last_id, (HashMap::new(), timestamp())); last_ids_q
last_ids.push_back(*last_id); .last_ids_sigs
.insert(*last_id, (HashMap::new(), timestamp()));
last_ids_q.last_ids.push_back(*last_id);
} }
/// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method. /// Process a Transaction. This is used for unit tests and simply calls the vector Bank::process_transactions method.
@ -392,8 +401,7 @@ impl Bank {
&self, &self,
tx: &Transaction, tx: &Transaction,
accounts: &HashMap<Pubkey, Account>, accounts: &HashMap<Pubkey, Account>,
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>, last_ids_q: &mut LastIdsQ,
last_ids: &VecDeque<Hash>,
max_age: usize, max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Result<Vec<Account>> { ) -> Result<Vec<Account>> {
@ -405,15 +413,18 @@ impl Bank {
error_counters.insufficient_funds += 1; error_counters.insufficient_funds += 1;
Err(BankError::InsufficientFundsForFee) Err(BankError::InsufficientFundsForFee)
} else { } else {
if !Self::check_entry_id_age(last_ids, tx.last_id, max_age) { if !Self::check_entry_id_age(&last_ids_q.last_ids, tx.last_id, max_age) {
error_counters.last_id_not_found += 1; error_counters.last_id_not_found += 1;
return Err(BankError::LastIdNotFound); return Err(BankError::LastIdNotFound);
} }
// There is no way to predict what contract will execute without an error // There is no way to predict what contract will execute without an error
// If a fee can pay for execution then the contract will be scheduled // If a fee can pay for execution then the contract will be scheduled
let err = let err = Self::reserve_signature_with_last_id(
Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature); &mut last_ids_q.last_ids_sigs,
&tx.last_id,
&tx.signature,
);
if let Err(BankError::LastIdNotFound) = err { if let Err(BankError::LastIdNotFound) = err {
error_counters.reserve_last_id += 1; error_counters.reserve_last_id += 1;
} else if let Err(BankError::DuplicateSignature) = err { } else if let Err(BankError::DuplicateSignature) = err {
@ -466,20 +477,13 @@ impl Bank {
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<(Result<Vec<Account>>)> { ) -> Vec<(Result<Vec<Account>>)> {
let accounts = self.accounts.read().unwrap(); let accounts = self.accounts.read().unwrap();
// this must be locked first! let mut last_ids_q = self.last_ids_q.write().unwrap();
let last_ids = self.last_ids.read().unwrap();
let mut last_sigs = self.last_ids_sigs.write().unwrap();
txs.iter() txs.iter()
.zip(results.into_iter()) .zip(results.into_iter())
.map(|etx| match etx { .map(|etx| match etx {
(tx, Ok(())) => self.load_account( (tx, Ok(())) => {
tx, self.load_account(tx, &accounts, &mut last_ids_q, max_age, error_counters)
&accounts, }
&mut last_sigs,
&last_ids,
max_age,
error_counters,
),
(_, Err(e)) => Err(e), (_, Err(e)) => Err(e),
}).collect() }).collect()
} }
@ -1059,8 +1063,8 @@ impl Bank {
} }
pub fn get_signature_status(&self, signature: &Signature) -> Result<()> { pub fn get_signature_status(&self, signature: &Signature) -> Result<()> {
let last_ids_sigs = self.last_ids_sigs.read().unwrap(); let last_ids_q = self.last_ids_q.read().unwrap();
for (_hash, (signatures, _)) in last_ids_sigs.iter() { for (signatures, _) in last_ids_q.last_ids_sigs.values() {
if let Some(res) = signatures.get(signature) { if let Some(res) = signatures.get(signature) {
return res.clone(); return res.clone();
} }
@ -1073,9 +1077,10 @@ impl Bank {
} }
pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option<Result<()>> { pub fn get_signature(&self, last_id: &Hash, signature: &Signature) -> Option<Result<()>> {
self.last_ids_sigs self.last_ids_q
.read() .read()
.unwrap() .unwrap()
.last_ids_sigs
.get(last_id) .get(last_id)
.and_then(|sigs| sigs.0.get(signature).cloned()) .and_then(|sigs| sigs.0.get(signature).cloned())
} }