Tick entry ids as only valid last_ids (#1441)

Generate tick entry ids and only register ticks as the last_id expected by the bank.  Since the bank is MT, the in-flight pipeline of transactions cannot be close to the end of the queue or there is a high possibility that a starved thread will encode an expired last_id into the ledger.  The banking_stage therefore uses a shorter age limit for encoded last_ids then the validators.

Bench client doesn't send transactions that are older then 30 seconds.
This commit is contained in:
anatoly yakovenko 2018-10-10 17:23:06 -07:00 committed by GitHub
parent 5c523716aa
commit 5c85e037f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 307 additions and 167 deletions

View File

@ -28,17 +28,14 @@ fn bench_process_transaction(bencher: &mut Bencher) {
&mint.keypair(), &mint.keypair(),
rando0.pubkey(), rando0.pubkey(),
10_000, 10_000,
mint.last_id(), bank.last_id(),
0, 0,
); );
assert_eq!(bank.process_transaction(&tx), Ok(())); assert_eq!(bank.process_transaction(&tx), Ok(()));
// Seed the 'to' account and a cell for its signature. // Seed the 'to' account and a cell for its signature.
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id);
let rando1 = Keypair::new(); let rando1 = Keypair::new();
let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, last_id, 0); let tx = Transaction::system_move(&rando0, rando1.pubkey(), 1, bank.last_id(), 0);
assert_eq!(bank.process_transaction(&tx), Ok(())); assert_eq!(bank.process_transaction(&tx), Ok(()));
// Finally, return the transaction to the benchmark. // Finally, return the transaction to the benchmark.

View File

@ -47,7 +47,8 @@ use window::WINDOW_SIZE;
/// but requires clients to update its `last_id` more frequently. Raising the value /// but requires clients to update its `last_id` more frequently. Raising the value
/// lengthens the time a client must wait to be certain a missing transaction will /// lengthens the time a client must wait to be certain a missing transaction will
/// not be processed by the network. /// not be processed by the network.
pub const MAX_ENTRY_IDS: usize = 1024 * 32; pub const NUM_TICKS_PER_SECOND: usize = 10;
pub const MAX_ENTRY_IDS: usize = NUM_TICKS_PER_SECOND * 120;
pub const VERIFY_BLOCK_SIZE: usize = 16; pub const VERIFY_BLOCK_SIZE: usize = 16;
@ -106,9 +107,12 @@ type SignatureStatusMap = HashMap<Signature, Result<()>>;
#[derive(Default)] #[derive(Default)]
struct ErrorCounters { struct ErrorCounters {
account_not_found_validator: usize, account_not_found: usize,
account_not_found_leader: usize,
account_in_use: usize, account_in_use: usize,
last_id_not_found: usize,
reserve_last_id: usize,
insufficient_funds: usize,
duplicate_signature: usize,
} }
/// The state of all accounts and contracts after processing its entries. /// The state of all accounts and contracts after processing its entries.
pub struct Bank { pub struct Bank {
@ -226,6 +230,25 @@ impl Bank {
} }
} }
/// Return the position of the last_id in the last_id_queue starting from the back
/// If the last_id is not found last_id_queue.len() is returned
fn compute_entry_id_age(last_id_queue: &VecDeque<Hash>, entry_id: Hash) -> Option<usize> {
for (i, id) in last_id_queue.iter().rev().enumerate() {
if *id == entry_id {
return Some(i);
}
}
None
}
/// Check if the age of the entry_id is within the max_age
/// return false for any entries with an age equal to or above max_age
fn check_entry_id_age(last_id_queue: &VecDeque<Hash>, entry_id: Hash, max_age: usize) -> bool {
match Self::compute_entry_id_age(last_id_queue, entry_id) {
Some(age) if age < max_age => true,
_ => false,
}
}
fn reserve_signature_with_last_id( fn reserve_signature_with_last_id(
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>, last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>,
last_id: &Hash, last_id: &Hash,
@ -308,6 +331,7 @@ impl Bank {
/// the oldest ones once its internal cache is full. Once boot, the /// the oldest ones once its internal cache is full. Once boot, the
/// bank will reject transactions using that `last_id`. /// bank will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) { pub fn register_entry_id(&self, last_id: &Hash) {
// this must be locked first!
let mut last_ids = self let mut last_ids = self
.last_ids .last_ids
.write() .write()
@ -318,8 +342,10 @@ impl Bank {
.expect("last_ids_sigs write lock"); .expect("last_ids_sigs write lock");
if last_ids.len() >= MAX_ENTRY_IDS { if last_ids.len() >= MAX_ENTRY_IDS {
let id = last_ids.pop_front().unwrap(); let id = last_ids.pop_front().unwrap();
info!("removing last_id {}", id);
last_ids_sigs.remove(&id); last_ids_sigs.remove(&id);
} }
inc_new_counter_info!("bank-register_entry_id-registered", 1);
last_ids_sigs.insert(*last_id, (HashMap::new(), timestamp())); last_ids_sigs.insert(*last_id, (HashMap::new(), timestamp()));
last_ids.push_back(*last_id); last_ids.push_back(*last_id);
} }
@ -367,29 +393,38 @@ impl Bank {
tx: &Transaction, tx: &Transaction,
accounts: &HashMap<Pubkey, Account>, accounts: &HashMap<Pubkey, Account>,
last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>, last_ids_sigs: &mut HashMap<Hash, (SignatureStatusMap, u64)>,
last_ids: &VecDeque<Hash>,
max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Result<Vec<Account>> { ) -> Result<Vec<Account>> {
// Copy all the accounts // Copy all the accounts
if accounts.get(&tx.account_keys[0]).is_none() { if accounts.get(&tx.account_keys[0]).is_none() {
if !self.is_leader { error_counters.account_not_found += 1;
error_counters.account_not_found_validator += 1;
} else {
error_counters.account_not_found_leader += 1;
}
Err(BankError::AccountNotFound) Err(BankError::AccountNotFound)
} else if accounts.get(&tx.account_keys[0]).unwrap().tokens < tx.fee { } else if accounts.get(&tx.account_keys[0]).unwrap().tokens < tx.fee {
error_counters.insufficient_funds += 1;
Err(BankError::InsufficientFundsForFee) Err(BankError::InsufficientFundsForFee)
} else { } else {
if !Self::check_entry_id_age(last_ids, tx.last_id, max_age) {
error_counters.last_id_not_found += 1;
return Err(BankError::LastIdNotFound);
}
// There is no way to predict what contract will execute without an error
// If a fee can pay for execution then the contract will be scheduled
let err =
Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature);
if let Err(BankError::LastIdNotFound) = err {
error_counters.reserve_last_id += 1;
} else if let Err(BankError::DuplicateSignature) = err {
error_counters.duplicate_signature += 1;
}
err?;
let mut called_accounts: Vec<Account> = tx let mut called_accounts: Vec<Account> = tx
.account_keys .account_keys
.iter() .iter()
.map(|key| accounts.get(key).cloned().unwrap_or_default()) .map(|key| accounts.get(key).cloned().unwrap_or_default())
.collect(); .collect();
// There is no way to predict what contract will execute without an error
// If a fee can pay for execution then the contract will be scheduled
let err =
Self::reserve_signature_with_last_id(last_ids_sigs, &tx.last_id, &tx.signature);
err?;
called_accounts[0].tokens -= tx.fee; called_accounts[0].tokens -= tx.fee;
Ok(called_accounts) Ok(called_accounts)
} }
@ -405,10 +440,12 @@ impl Bank {
.iter() .iter()
.map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters))
.collect(); .collect();
inc_new_counter_info!( if error_counters.account_in_use != 0 {
"bank-process_transactions-account_in_use", inc_new_counter_info!(
error_counters.account_in_use "bank-process_transactions-account_in_use",
); error_counters.account_in_use
);
}
rv rv
} }
@ -425,14 +462,24 @@ impl Bank {
&self, &self,
txs: &[Transaction], txs: &[Transaction],
results: Vec<Result<()>>, results: Vec<Result<()>>,
max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<(Result<Vec<Account>>)> { ) -> Vec<(Result<Vec<Account>>)> {
let accounts = self.accounts.read().unwrap(); let accounts = self.accounts.read().unwrap();
// this must be locked first!
let last_ids = self.last_ids.read().unwrap();
let mut last_sigs = self.last_ids_sigs.write().unwrap(); let mut last_sigs = self.last_ids_sigs.write().unwrap();
txs.iter() txs.iter()
.zip(results.into_iter()) .zip(results.into_iter())
.map(|etx| match etx { .map(|etx| match etx {
(tx, Ok(())) => self.load_account(tx, &accounts, &mut last_sigs, error_counters), (tx, Ok(())) => self.load_account(
tx,
&accounts,
&mut last_sigs,
&last_ids,
max_age,
error_counters,
),
(_, Err(e)) => Err(e), (_, Err(e)) => Err(e),
}).collect() }).collect()
} }
@ -657,7 +704,11 @@ impl Bank {
let locked_accounts = self.lock_accounts(txs); let locked_accounts = self.lock_accounts(txs);
let lock_time = now.elapsed(); let lock_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
let results = self.execute_and_commit_transactions(txs, locked_accounts); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelyhood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioratized to complete faster then this queue
// expires.
let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS / 2);
let process_time = now.elapsed(); let process_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
self.record_transactions(txs, &results, poh)?; self.record_transactions(txs, &results, poh)?;
@ -712,11 +763,13 @@ impl Bank {
&self, &self,
txs: &[Transaction], txs: &[Transaction],
locked_accounts: Vec<Result<()>>, locked_accounts: Vec<Result<()>>,
max_age: usize,
) -> Vec<Result<()>> { ) -> Vec<Result<()>> {
debug!("processing transactions: {}", txs.len()); debug!("processing transactions: {}", txs.len());
let mut error_counters = ErrorCounters::default(); let mut error_counters = ErrorCounters::default();
let now = Instant::now(); let now = Instant::now();
let mut loaded_accounts = self.load_accounts(txs, locked_accounts, &mut error_counters); let mut loaded_accounts =
self.load_accounts(txs, locked_accounts, max_age, &mut error_counters);
let load_elapsed = now.elapsed(); let load_elapsed = now.elapsed();
let now = Instant::now(); let now = Instant::now();
let executed: Vec<Result<()>> = loaded_accounts let executed: Vec<Result<()>> = loaded_accounts
@ -741,44 +794,59 @@ impl Bank {
self.update_transaction_statuses(txs, &executed); self.update_transaction_statuses(txs, &executed);
let mut tx_count = 0; let mut tx_count = 0;
let mut err_count = 0; let mut err_count = 0;
for r in &executed { for (r, tx) in executed.iter().zip(txs.iter()) {
if r.is_ok() { if r.is_ok() {
tx_count += 1; tx_count += 1;
} else { } else {
if err_count == 0 { if err_count == 0 {
debug!("tx error: {:?}", r); info!("tx error: {:?} {:?}", r, tx);
} }
err_count += 1; err_count += 1;
} }
} }
if err_count > 0 { if err_count > 0 {
info!("{} errors of {} txs", err_count, err_count + tx_count); info!("{} errors of {} txs", err_count, err_count + tx_count);
if !self.is_leader { inc_new_counter_info!(
inc_new_counter_info!("bank-process_transactions_err-validator", err_count); "bank-process_transactions-account_not_found",
inc_new_counter_info!( error_counters.account_not_found
"bank-appy_debits-account_not_found-validator", );
error_counters.account_not_found_validator
);
} else {
inc_new_counter_info!("bank-process_transactions_err-leader", err_count);
inc_new_counter_info!(
"bank-appy_debits-account_not_found-leader",
error_counters.account_not_found_leader
);
}
inc_new_counter_info!("bank-process_transactions-error_count", err_count); inc_new_counter_info!("bank-process_transactions-error_count", err_count);
} }
self.transaction_count self.transaction_count
.fetch_add(tx_count, Ordering::Relaxed); .fetch_add(tx_count, Ordering::Relaxed);
inc_new_counter_info!("bank-process_transactions-txs", tx_count); inc_new_counter_info!("bank-process_transactions-txs", tx_count);
if 0 != error_counters.last_id_not_found {
inc_new_counter_info!(
"bank-process_transactions-error-last_id_not_found",
error_counters.last_id_not_found
);
}
if 0 != error_counters.reserve_last_id {
inc_new_counter_info!(
"bank-process_transactions-error-reserve_last_id",
error_counters.reserve_last_id
);
}
if 0 != error_counters.duplicate_signature {
inc_new_counter_info!(
"bank-process_transactions-error-duplicate_signature",
error_counters.duplicate_signature
);
}
if 0 != error_counters.insufficient_funds {
inc_new_counter_info!(
"bank-process_transactions-error-insufficient_funds",
error_counters.insufficient_funds
);
}
executed executed
} }
#[must_use] #[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> { pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let locked_accounts = self.lock_accounts(txs); let locked_accounts = self.lock_accounts(txs);
let results = self.execute_and_commit_transactions(txs, locked_accounts); let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS);
self.unlock_accounts(txs, &results); self.unlock_accounts(txs, &results);
results results
} }
@ -804,8 +872,9 @@ impl Bank {
for result in self.process_transactions(&entry.transactions) { for result in self.process_transactions(&entry.transactions) {
result?; result?;
} }
} else {
self.register_entry_id(&entry.id);
} }
self.register_entry_id(&entry.id);
Ok(()) Ok(())
} }
@ -1235,28 +1304,11 @@ mod tests {
let key1 = Keypair::new().pubkey(); let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey(); let key2 = Keypair::new().pubkey();
let bank = Bank::new(&mint); let bank = Bank::new(&mint);
let t1 = Transaction::system_move_many(
let spend = SystemProgram::Move { tokens: 1 };
let instructions = vec![
Instruction {
program_ids_index: 0,
userdata: serialize(&spend).unwrap(),
accounts: vec![0, 1],
},
Instruction {
program_ids_index: 0,
userdata: serialize(&spend).unwrap(),
accounts: vec![0, 2],
},
];
let t1 = Transaction::new_with_instructions(
&mint.keypair(), &mint.keypair(),
&[key1, key2], &[(key1, 1), (key2, 1)],
mint.last_id(), mint.last_id(),
0, 0,
vec![SystemProgram::id()],
instructions,
); );
let res = bank.process_transactions(&vec![t1.clone()]); let res = bank.process_transactions(&vec![t1.clone()]);
assert_eq!(res.len(), 1); assert_eq!(res.len(), 1);
@ -1488,39 +1540,55 @@ mod tests {
mint: &Mint, mint: &Mint,
keypairs: &[Keypair], keypairs: &[Keypair],
) -> impl Iterator<Item = Entry> { ) -> impl Iterator<Item = Entry> {
let mut last_id = mint.last_id();
let mut hash = mint.last_id(); let mut hash = mint.last_id();
let mut entries: Vec<Entry> = vec![]; let mut entries: Vec<Entry> = vec![];
let mut num_hashes = 0;
for k in keypairs { for k in keypairs {
let txs = vec![Transaction::system_new( let txs = vec![Transaction::system_new(
&mint.keypair(), &mint.keypair(),
k.pubkey(), k.pubkey(),
1, 1,
hash, last_id,
)]; )];
let mut e = ledger::next_entries(&hash, 0, txs); let mut e = ledger::next_entries(&hash, 0, txs);
entries.append(&mut e); entries.append(&mut e);
hash = entries.last().unwrap().id; hash = entries.last().unwrap().id;
let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]);
last_id = hash;
entries.push(tick);
} }
entries.into_iter() entries.into_iter()
} }
fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator<Item = Entry> { // create a ledger with tick entries every `ticks` entries
fn create_sample_block_with_ticks(
mint: &Mint,
length: usize,
ticks: usize,
) -> impl Iterator<Item = Entry> {
let mut entries = Vec::with_capacity(length); let mut entries = Vec::with_capacity(length);
let mut hash = mint.last_id(); let mut hash = mint.last_id();
let mut last_id = mint.last_id();
let mut num_hashes = 0; let mut num_hashes = 0;
for _ in 0..length { for i in 0..length {
let keypair = Keypair::new(); let keypair = Keypair::new();
let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, hash); let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, last_id);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]); let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]);
entries.push(entry); entries.push(entry);
if (i + 1) % ticks == 0 {
let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]);
last_id = hash;
entries.push(tick);
}
} }
entries.into_iter() entries.into_iter()
} }
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, Pubkey) { fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, Pubkey) {
let mint = Mint::new(1 + length as i64); let mint = Mint::new(length as i64 + 1);
let genesis = mint.create_entries(); let genesis = mint.create_entries();
let block = create_sample_block(&mint, length); let block = create_sample_block_with_ticks(&mint, length, length);
(genesis.into_iter().chain(block), mint.pubkey()) (genesis.into_iter().chain(block), mint.pubkey())
} }
@ -1534,7 +1602,7 @@ mod tests {
} }
#[test] #[test]
fn test_process_ledger() { fn test_process_ledger_simple() {
let (ledger, pubkey) = create_sample_ledger(1); let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee(); let (ledger, dup) = ledger.tee();
let bank = Bank::default(); let bank = Bank::default();
@ -1542,10 +1610,13 @@ mod tests {
.process_ledger(ledger, &mut LeaderScheduler::default()) .process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap(); .unwrap();
assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3); assert_eq!(ledger_height, 4);
assert_eq!(tail.len(), 3); assert_eq!(tail.len(), 4);
assert_eq!(tail, dup.collect_vec()); assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1]; let last_entry = &tail[tail.len() - 1];
// last entry is a tick
assert_eq!(0, last_entry.transactions.len());
// tick is registered
assert_eq!(bank.last_id(), last_entry.id); assert_eq!(bank.last_id(), last_entry.id);
} }
@ -1566,7 +1637,7 @@ mod tests {
.process_ledger(ledger, &mut LeaderScheduler::default()) .process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap(); .unwrap();
assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2); assert_eq!(ledger_height, entry_count as u64 + 3);
assert!(tail.len() <= window_size); assert!(tail.len() <= window_size);
let last_entry = &tail[tail.len() - 1]; let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id); assert_eq!(bank.last_id(), last_entry.id);
@ -1598,7 +1669,7 @@ mod tests {
fn test_process_ledger_from_files() { fn test_process_ledger_from_files() {
let mint = Mint::new(2); let mint = Mint::new(2);
let genesis = to_file_iter(mint.create_entries().into_iter()); let genesis = to_file_iter(mint.create_entries().into_iter());
let block = to_file_iter(create_sample_block(&mint, 1)); let block = to_file_iter(create_sample_block_with_ticks(&mint, 1, 1));
let bank = Bank::default(); let bank = Bank::default();
bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default()) bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default())
@ -1665,7 +1736,8 @@ mod tests {
let pay_alice = vec![tx1]; let pay_alice = vec![tx1];
let locked_alice = bank.lock_accounts(&pay_alice); let locked_alice = bank.lock_accounts(&pay_alice);
let results_alice = bank.execute_and_commit_transactions(&pay_alice, locked_alice); let results_alice =
bank.execute_and_commit_transactions(&pay_alice, locked_alice, MAX_ENTRY_IDS);
assert_eq!(results_alice[0], Ok(())); assert_eq!(results_alice[0], Ok(()));
// try executing an interleaved transfer twice // try executing an interleaved transfer twice
@ -1777,4 +1849,33 @@ mod tests {
.contains_key(&signature) .contains_key(&signature)
); );
} }
#[test]
fn test_entry_id_age() {
let mut q = VecDeque::new();
let hash1 = Hash::default();
let hash2 = hash(hash1.as_ref());
let hash3 = hash(hash2.as_ref());
assert_eq!(Bank::compute_entry_id_age(&q, hash1), None);
q.push_back(hash1);
assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(0));
q.push_back(hash2);
assert_eq!(Bank::compute_entry_id_age(&q, hash1), Some(1));
assert_eq!(Bank::compute_entry_id_age(&q, hash2), Some(0));
assert_eq!(Bank::compute_entry_id_age(&q, hash3), None);
// all are below 2
assert_eq!(Bank::check_entry_id_age(&q, hash2, 2), true);
assert_eq!(Bank::check_entry_id_age(&q, hash1, 2), true);
// hash2 is most recent with age 0, max is 1, anything equal to max or above is rejected
assert_eq!(Bank::check_entry_id_age(&q, hash2, 1), true);
assert_eq!(Bank::check_entry_id_age(&q, hash1, 1), false);
// max_age 0 is always rejected
assert_eq!(Bank::check_entry_id_age(&q, hash1, 0), false);
assert_eq!(Bank::check_entry_id_age(&q, hash2, 0), false);
// hash3 is not in the q
assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false);
}
} }

View File

@ -2,7 +2,7 @@
//! to contruct a software pipeline. The stage uses all available CPU cores and //! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU. //! can do its processing in parallel with signature verification on the GPU.
use bank::Bank; use bank::{Bank, NUM_TICKS_PER_SECOND};
use bincode::deserialize; use bincode::deserialize;
use counter::Counter; use counter::Counter;
use entry::Entry; use entry::Entry;
@ -44,7 +44,7 @@ pub enum Config {
impl Default for Config { impl Default for Config {
fn default() -> Config { fn default() -> Config {
// TODO: Change this to Tick to enable PoH // TODO: Change this to Tick to enable PoH
Config::Sleep(Duration::from_millis(500)) Config::Sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64))
} }
} }
impl BankingStage { impl BankingStage {

View File

@ -2,13 +2,17 @@ extern crate bincode;
#[macro_use] #[macro_use]
extern crate clap; extern crate clap;
extern crate influx_db_client; extern crate influx_db_client;
extern crate rand;
extern crate rayon; extern crate rayon;
#[macro_use]
extern crate log;
extern crate serde_json; extern crate serde_json;
#[macro_use] #[macro_use]
extern crate solana; extern crate solana;
use clap::{App, Arg}; use clap::{App, Arg};
use influx_db_client as influxdb; use influx_db_client as influxdb;
use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use solana::client::mk_client; use solana::client::mk_client;
use solana::cluster_info::{ClusterInfo, NodeInfo}; use solana::cluster_info::{ClusterInfo, NodeInfo};
@ -21,6 +25,7 @@ use solana::service::Service;
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil}; use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
use solana::system_transaction::SystemTransaction; use solana::system_transaction::SystemTransaction;
use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::timing::timestamp;
use solana::timing::{duration_as_ms, duration_as_s}; use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction; use solana::transaction::Transaction;
use solana::wallet::request_airdrop; use solana::wallet::request_airdrop;
@ -173,27 +178,34 @@ fn send_barrier_transaction(barrier_client: &mut ThinClient, last_id: &mut Hash,
} }
} }
type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
fn generate_txs( fn generate_txs(
shared_txs: &Arc<RwLock<VecDeque<Vec<Transaction>>>>, shared_txs: &SharedTransactions,
source: &[Keypair], source: &[Keypair],
dest: &[Keypair], dest: &[Keypair],
last_id: &Hash,
threads: usize, threads: usize,
reclaim: bool, reclaim: bool,
leader: &NodeInfo,
) { ) {
let mut client = mk_client(leader);
let last_id = client.get_last_id();
info!("last_id: {} {:?}", last_id, Instant::now());
let tx_count = source.len(); let tx_count = source.len();
println!("Signing transactions... {} (reclaim={})", tx_count, reclaim); println!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
let signing_start = Instant::now(); let signing_start = Instant::now();
let pairs: Vec<_> = source.iter().zip(dest.iter()).collect(); let pairs: Vec<_> = if !reclaim {
source.iter().zip(dest.iter()).collect()
} else {
dest.iter().zip(source.iter()).collect()
};
let transactions: Vec<_> = pairs let transactions: Vec<_> = pairs
.par_iter() .par_iter()
.map(|(id, keypair)| { .map(|(id, keypair)| {
if !reclaim { (
Transaction::system_new(id, keypair.pubkey(), 1, *last_id) Transaction::system_new(id, keypair.pubkey(), 1, last_id),
} else { timestamp(),
Transaction::system_new(keypair, id.pubkey(), 1, *last_id) )
}
}).collect(); }).collect();
let duration = signing_start.elapsed(); let duration = signing_start.elapsed();
@ -201,10 +213,11 @@ fn generate_txs(
let bsps = (tx_count) as f64 / ns as f64; let bsps = (tx_count) as f64 / ns as f64;
let nsps = ns as f64 / (tx_count) as f64; let nsps = ns as f64 / (tx_count) as f64;
println!( println!(
"Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time", "Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {}",
bsps * 1_000_000_f64, bsps * 1_000_000_f64,
nsps / 1_000_f64, nsps / 1_000_f64,
duration_as_ms(&duration), duration_as_ms(&duration),
last_id,
); );
metrics::submit( metrics::submit(
influxdb::Point::new("bench-tps") influxdb::Point::new("bench-tps")
@ -227,7 +240,7 @@ fn generate_txs(
fn do_tx_transfers( fn do_tx_transfers(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
shared_txs: &Arc<RwLock<VecDeque<Vec<Transaction>>>>, shared_txs: &SharedTransactions,
leader: &NodeInfo, leader: &NodeInfo,
shared_tx_thread_count: &Arc<AtomicIsize>, shared_tx_thread_count: &Arc<AtomicIsize>,
total_tx_sent_count: &Arc<AtomicUsize>, total_tx_sent_count: &Arc<AtomicUsize>,
@ -249,7 +262,11 @@ fn do_tx_transfers(
let tx_len = txs0.len(); let tx_len = txs0.len();
let transfer_start = Instant::now(); let transfer_start = Instant::now();
for tx in txs0 { for tx in txs0 {
client.transfer_signed(&tx).unwrap(); let now = timestamp();
if now > tx.1 && now - tx.1 > 1000 * 30 {
continue;
}
client.transfer_signed(&tx.0).unwrap();
} }
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
@ -274,28 +291,45 @@ fn do_tx_transfers(
} }
} }
fn split_tokens(tokens: i64, per_unit: i64, max_units: usize) -> (usize, i64) { const MAX_SPENDS_PER_TX: usize = 5;
let total_blocks = tokens / per_unit; fn verify_transfer(client: &mut ThinClient, tx: &Transaction) -> bool {
let max_keys_to_fund = cmp::min(total_blocks - 1, max_units as i64); if client.poll_for_signature(&tx.signature).is_err() {
let blocks_per_unit = total_blocks / (max_keys_to_fund + 1); println!("no signature");
(max_keys_to_fund as usize, blocks_per_unit * per_unit) return false;
}
for a in &tx.account_keys[1..] {
if client.poll_get_balance(a).unwrap_or(0) == 0 {
println!(
"no balance {} source bal: {} {:?}",
a,
client.poll_get_balance(&tx.account_keys[0]).unwrap_or(0),
tx
);
return false;
}
}
true
} }
/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
/// on every iteration. This allows us to replay the transfers because the source is either empty,
/// or full
fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], tokens: i64) { fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], tokens: i64) {
let max_per_move = 5; let total = tokens * dests.len() as i64;
let total = tokens * (dests.len() as i64 + 1);
let mut funded: Vec<(&Keypair, i64)> = vec![(source, total)]; let mut funded: Vec<(&Keypair, i64)> = vec![(source, total)];
let mut notfunded: Vec<&Keypair> = dests.iter().collect(); let mut notfunded: Vec<&Keypair> = dests.iter().collect();
println!("funding keys {}", dests.len()); println!("funding keys {}", dests.len());
while !notfunded.is_empty() { while !notfunded.is_empty() {
let last_id = client.get_last_id();
let mut new_funded: Vec<(&Keypair, i64)> = vec![]; let mut new_funded: Vec<(&Keypair, i64)> = vec![];
let mut to_fund = vec![]; let mut to_fund = vec![];
println!("creating from... {}", funded.len()); println!("creating from... {}", funded.len());
for f in &mut funded { for f in &mut funded {
let max_units = cmp::min(notfunded.len(), max_per_move); let max_units = cmp::min(notfunded.len(), MAX_SPENDS_PER_TX);
let (num, per_unit) = split_tokens(f.1, tokens, max_units); if max_units == 0 {
let start = notfunded.len() - num; break;
}
let start = notfunded.len() - max_units;
let per_unit = f.1 / (max_units as i64);
let moves: Vec<_> = notfunded[start..] let moves: Vec<_> = notfunded[start..]
.iter() .iter()
.map(|k| (k.pubkey(), per_unit)) .map(|k| (k.pubkey(), per_unit))
@ -307,25 +341,38 @@ fn fund_keys(client: &mut ThinClient, source: &Keypair, dests: &[Keypair], token
if !moves.is_empty() { if !moves.is_empty() {
to_fund.push((f.0, moves)); to_fund.push((f.0, moves));
} }
f.1 -= per_unit * (num as i64);
assert!(f.1 >= per_unit);
} }
println!("generating... {}", to_fund.len()); println!("sending... {}", to_fund.len());
let to_fund_txs: Vec<_> = to_fund // try to transfer a few at a time with recent last_id
.par_iter() to_fund.chunks(10_000).for_each(|chunk| {
.map(|(k, m)| Transaction::system_move_many(k, &m, last_id, 0)) loop {
.collect(); let last_id = client.get_last_id();
println!("transfering... {}", to_fund.len()); println!("generating... {} {}", chunk.len(), last_id);
to_fund_txs.iter().for_each(|tx| { let mut to_fund_txs: Vec<_> = chunk
let _ = client.transfer_signed(&tx).expect("transfer"); .par_iter()
.map(|(k, m)| Transaction::system_move_many(k, &m, last_id, 0))
.collect();
// with randomly distributed the failures
// most of the account pairs should have some funding in one of the pairs
// durring generate_tx step
thread_rng().shuffle(&mut to_fund_txs);
println!("transfering... {}", chunk.len());
to_fund_txs.iter().for_each(|tx| {
let _ = client.transfer_signed(&tx).expect("transfer");
});
// randomly sample some of the transfers
thread_rng().shuffle(&mut to_fund_txs);
let max = cmp::min(10, to_fund_txs.len());
if to_fund_txs[..max]
.iter()
.all(|tx| verify_transfer(client, tx))
{
break;
}
}
}); });
println!( println!("funded: {} left: {}", new_funded.len(), notfunded.len());
"funded {} total: {} left: {}", funded = new_funded;
new_funded.len(),
funded.len() + new_funded.len(),
notfunded.len()
);
funded.append(&mut new_funded);
} }
} }
@ -600,7 +647,13 @@ fn main() {
let mut rnd = GenKeys::new(seed); let mut rnd = GenKeys::new(seed);
println!("Creating {} keypairs...", tx_count * 2); println!("Creating {} keypairs...", tx_count * 2);
let keypairs = rnd.gen_n_keypairs(tx_count * 2); let mut total_keys = 0;
let mut target = tx_count * 2;
while target > 0 {
total_keys += target;
target /= MAX_SPENDS_PER_TX;
}
let gen_keypairs = rnd.gen_n_keypairs(total_keys as i64);
let barrier_id = rnd.gen_n_keypairs(1).pop().unwrap(); let barrier_id = rnd.gen_n_keypairs(1).pop().unwrap();
println!("Get tokens..."); println!("Get tokens...");
@ -608,13 +661,19 @@ fn main() {
// Sample the first keypair, see if it has tokens, if so then resume // Sample the first keypair, see if it has tokens, if so then resume
// to avoid token loss // to avoid token loss
let keypair0_balance = client.poll_get_balance(&keypairs[0].pubkey()).unwrap_or(0); let keypair0_balance = client
.poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
.unwrap_or(0);
if num_tokens_per_account > keypair0_balance { if num_tokens_per_account > keypair0_balance {
let extra = (num_tokens_per_account - keypair0_balance) * (keypairs.len() as i64); let extra = num_tokens_per_account - keypair0_balance;
airdrop_tokens(&mut client, &leader, &id, extra); let total = extra * (gen_keypairs.len() as i64);
fund_keys(&mut client, &id, &keypairs, num_tokens_per_account); airdrop_tokens(&mut client, &leader, &id, total);
println!("adding more tokens {}", extra);
fund_keys(&mut client, &id, &gen_keypairs, extra);
} }
let start = gen_keypairs.len() - (tx_count * 2) as usize;
let keypairs = &gen_keypairs[start..];
airdrop_tokens(&mut barrier_client, &leader, &barrier_id, 1); airdrop_tokens(&mut barrier_client, &leader, &barrier_id, 1);
println!("Get last ID..."); println!("Get last ID...");
@ -641,8 +700,7 @@ fn main() {
}).unwrap() }).unwrap()
}).collect(); }).collect();
let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> = let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0)); let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
@ -683,9 +741,9 @@ fn main() {
&shared_txs, &shared_txs,
&keypairs[..len], &keypairs[..len],
&keypairs[len..], &keypairs[len..],
&last_id,
threads, threads,
reclaim_tokens_back_to_source_account, reclaim_tokens_back_to_source_account,
&leader,
); );
// In sustained mode overlap the transfers with generation // In sustained mode overlap the transfers with generation
// this has higher average performance but lower peak performance // this has higher average performance but lower peak performance
@ -788,16 +846,6 @@ fn converge(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_split_tokens() {
assert_eq!(split_tokens(3, 2, 5), (0, 2));
assert_eq!(split_tokens(4, 2, 5), (1, 2));
assert_eq!(split_tokens(5, 2, 5), (1, 2));
assert_eq!(split_tokens(6, 2, 5), (2, 2));
assert_eq!(split_tokens(20, 2, 5), (5, 2));
assert_eq!(split_tokens(30, 2, 5), (5, 4));
}
#[test] #[test]
fn test_switch_directions() { fn test_switch_directions() {
assert_eq!(should_switch_directions(20, 0), false); assert_eq!(should_switch_directions(20, 0), false);

View File

@ -205,7 +205,6 @@ impl BudgetTransaction for Transaction {
fn vote(&self) -> Option<(Pubkey, Vote, Hash)> { fn vote(&self) -> Option<(Pubkey, Vote, Hash)> {
if self.instructions.len() > 1 { if self.instructions.len() > 1 {
error!("expecting only 1 Instruction per vote");
None None
} else if let Some(Instruction::NewVote(vote)) = self.instruction(0) { } else if let Some(Instruction::NewVote(vote)) = self.instruction(0) {
Some((self.account_keys[0], vote, self.last_id)) Some((self.account_keys[0], vote, self.last_id))

View File

@ -2,24 +2,15 @@
//! writes entries to the given writer, which is typically a file or //! writes entries to the given writer, which is typically a file or
//! stdout, and then sends the Entry to its output channel. //! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use bincode; use bincode;
use entry::Entry; use entry::Entry;
use std::io::{self, BufRead, Error, ErrorKind, Write}; use std::io::{self, BufRead, Error, ErrorKind, Write};
use std::mem::size_of; use std::mem::size_of;
pub struct EntryWriter<'a, W> { pub struct EntryWriter {}
bank: &'a Bank,
writer: W,
}
impl<'a, W: Write> EntryWriter<'a, W> { impl EntryWriter {
/// Create a new Tpu that wraps the given Bank. fn write_entry<W: Write>(writer: &mut W, entry: &Entry) -> io::Result<()> {
pub fn new(bank: &'a Bank, writer: W) -> Self {
EntryWriter { bank, writer }
}
fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> {
let entry_bytes = let entry_bytes =
bincode::serialize(&entry).map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; bincode::serialize(&entry).map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?;
@ -32,7 +23,7 @@ impl<'a, W: Write> EntryWriter<'a, W> {
writer.flush() writer.flush()
} }
pub fn write_entries<I>(writer: &mut W, entries: I) -> io::Result<()> pub fn write_entries<W: Write, I>(writer: &mut W, entries: I) -> io::Result<()>
where where
I: IntoIterator<Item = Entry>, I: IntoIterator<Item = Entry>,
{ {
@ -41,20 +32,6 @@ impl<'a, W: Write> EntryWriter<'a, W> {
} }
Ok(()) Ok(())
} }
fn write_and_register_entry(&mut self, entry: &Entry) -> io::Result<()> {
trace!("write_and_register_entry entry");
self.bank.register_entry_id(&entry.id);
Self::write_entry(&mut self.writer, entry)
}
pub fn write_and_register_entries(&mut self, entries: &[Entry]) -> io::Result<()> {
for entry in entries {
self.write_and_register_entry(&entry)?;
}
Ok(())
}
} }
struct EntryReader<R: BufRead> { struct EntryReader<R: BufRead> {

View File

@ -53,7 +53,7 @@ impl PohRecorder {
// This guarantees PoH order and Entry production and banks LastId queue is the same. // This guarantees PoH order and Entry production and banks LastId queue is the same.
let mut poh = self.poh.lock().unwrap(); let mut poh = self.poh.lock().unwrap();
let tick = poh.record(mixin); let tick = poh.record(mixin);
self.bank.register_entry_id(&tick.id); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
let entry = Entry { let entry = Entry {
num_hashes: tick.num_hashes, num_hashes: tick.num_hashes,
id: tick.id, id: tick.id,
@ -71,6 +71,7 @@ mod tests {
use mint::Mint; use mint::Mint;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use system_transaction::test_tx;
#[test] #[test]
fn test_poh() { fn test_poh() {
@ -81,7 +82,8 @@ mod tests {
//send some data //send some data
let h1 = hash(b"hello world!"); let h1 = hash(b"hello world!");
assert!(poh_recorder.record(h1, vec![]).is_ok()); let tx = test_tx();
assert!(poh_recorder.record(h1, vec![tx]).is_ok());
assert!(poh_recorder.tick().is_ok()); assert!(poh_recorder.tick().is_ok());
//get some events //get some events

View File

@ -135,7 +135,7 @@ impl SystemTransaction for Transaction {
Instruction { Instruction {
program_ids_index: 0, program_ids_index: 0,
userdata: serialize(&spend).unwrap(), userdata: serialize(&spend).unwrap(),
accounts: vec![0, i as u8], accounts: vec![0, i as u8 + 1],
} }
}).collect(); }).collect();
let to_keys: Vec<_> = moves.iter().map(|(to_key, _)| *to_key).collect(); let to_keys: Vec<_> = moves.iter().map(|(to_key, _)| *to_key).collect();
@ -215,4 +215,19 @@ mod tests {
let sign_data0b = tx0.get_sign_data(); let sign_data0b = tx0.get_sign_data();
assert_ne!(sign_data0a, sign_data0b); assert_ne!(sign_data0a, sign_data0b);
} }
#[test]
fn test_move_many() {
let from = Keypair::new();
let t1 = Keypair::new();
let t2 = Keypair::new();
let moves = vec![(t1.pubkey(), 1), (t2.pubkey(), 2)];
let tx = Transaction::system_move_many(&from, &moves, Default::default(), 0);
assert_eq!(tx.account_keys[0], from.pubkey());
assert_eq!(tx.account_keys[1], t1.pubkey());
assert_eq!(tx.account_keys[2], t2.pubkey());
assert_eq!(tx.instructions.len(), 2);
assert_eq!(tx.instructions[0].accounts, vec![0, 1]);
assert_eq!(tx.instructions[1].accounts, vec![0, 2]);
}
} }

View File

@ -18,8 +18,10 @@ use solana::ncp::Ncp;
use solana::result; use solana::result;
use solana::service::Service; use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil}; use solana::signature::{Keypair, KeypairUtil};
use solana::system_transaction::SystemTransaction;
use solana::thin_client::ThinClient; use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s}; use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use solana::window::{default_window, WINDOW_SIZE}; use solana::window::{default_window, WINDOW_SIZE};
use solana_program_interface::pubkey::Pubkey; use solana_program_interface::pubkey::Pubkey;
use std::collections::{HashSet, VecDeque}; use std::collections::{HashSet, VecDeque};
@ -1469,10 +1471,9 @@ fn send_tx_and_retry_get_balance(
let mut client = mk_client(leader); let mut client = mk_client(leader);
trace!("getting leader last_id"); trace!("getting leader last_id");
let last_id = client.get_last_id(); let last_id = client.get_last_id();
let tx = Transaction::system_new(&alice.keypair(), *bob_pubkey, transfer_amount, last_id);
info!("executing leader transfer"); info!("executing leader transfer");
let _sig = client let _res = client.retry_transfer_signed(&tx, 30);
.transfer(transfer_amount, &alice.keypair(), *bob_pubkey, &last_id)
.unwrap();
retry_get_balance(&mut client, bob_pubkey, expected) retry_get_balance(&mut client, bob_pubkey, expected)
} }