split load+execute from commit in bank, insert record between them in TPU code (#2487)

* split load+execute from commit in bank, insert record between them in TPU code
* clippy
* remove clear_signatures() race with commit_transactions()
* add #[test] back
Rob Walker 2019-01-21 10:17:04 -08:00 committed by GitHub
parent 6611188edf
commit e6030d66eb
2 changed files with 142 additions and 34 deletions
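
In outline, the TPU path previously called execute_and_commit_transactions and only afterwards recorded the batch; with this change it loads and executes first, records the executed batch, and commits account state only if recording succeeds. The sketch below is a standalone illustration of that ordering only; ToyBank, load_and_execute, record, and commit are stand-in names, not the real Bank/PohRecorder API in the diff.

```rust
// Toy sketch of the reordered pipeline: load+execute, then record, then commit.
type TxResult = Result<(), String>;

struct ToyBank;

impl ToyBank {
    // Stage 1: load accounts and run the transactions, but write nothing back yet.
    fn load_and_execute(&self, txs: &[&str]) -> (Vec<String>, Vec<TxResult>) {
        let loaded = txs.iter().map(|t| format!("accounts for {}", t)).collect();
        let results = txs.iter().map(|_| Ok(())).collect();
        (loaded, results)
    }

    // Stage 2: record the executed batch (in the real code, into the PoH stream).
    fn record(&self, txs: &[&str], results: &[TxResult]) -> Result<(), String> {
        let ok = results.iter().filter(|r| r.is_ok()).count();
        println!("recording {} of {} executed transactions", ok, txs.len());
        Ok(())
    }

    // Stage 3: only after recording succeeds, store the new account state.
    fn commit(&self, _txs: &[&str], loaded: &[String], results: &[TxResult]) {
        for (l, r) in loaded.iter().zip(results) {
            if r.is_ok() {
                println!("storing {}", l);
            }
        }
    }
}

fn main() -> Result<(), String> {
    let bank = ToyBank;
    let txs = ["tx-a", "tx-b"];
    // Old order was execute+commit, then record; after the split, nothing is
    // committed unless the record step succeeds.
    let (loaded, results) = bank.load_and_execute(&txs);
    bank.record(&txs, &results)?;
    bank.commit(&txs, &loaded, &results);
    Ok(())
}
```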

File 1 of 2:

@@ -42,7 +42,8 @@ fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
 #[bench]
 fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     let num_threads = BankingStage::num_threads() as usize;
-    let txes = 1000 * num_threads;
+    //  a multiple of packet chunk 2X duplicates to avoid races
+    let txes = 192 * 50 * num_threads * 2;
 
     let mint_total = 1_000_000_000_000;
     let mint = Mint::new(mint_total);
@@ -115,14 +116,18 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
         bank.register_tick(&id);
     }
 
+    let half_len = verified.len() / 2;
+    let mut start = 0;
     bencher.iter(move || {
-        // make sure the tx last id is still registered
+        // make sure the transactions are still valid
         bank.register_tick(&mint.last_id());
-        for v in verified.chunks(verified.len() / num_threads) {
+        for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
             verified_sender.send(v.to_vec()).unwrap();
         }
-        check_txs(&signal_receiver, txes);
+        check_txs(&signal_receiver, txes / 2);
         bank.clear_signatures();
+        start += half_len;
+        start %= verified.len();
     });
 }
 
@@ -130,7 +135,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
 fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
     let progs = 4;
     let num_threads = BankingStage::num_threads() as usize;
-    let txes = 1000 * num_threads;
+    //  a multiple of packet chunk 2X duplicates to avoid races
+    let txes = 96 * 100 * num_threads * 2;
 
     let mint_total = 1_000_000_000_000;
     let mint = Mint::new(mint_total);
@@ -218,13 +224,17 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
         bank.register_tick(&id);
     }
 
+    let half_len = verified.len() / 2;
+    let mut start = 0;
     bencher.iter(move || {
         // make sure the transactions are still valid
         bank.register_tick(&mint.last_id());
-        for v in verified.chunks(verified.len() / num_threads) {
+        for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
             verified_sender.send(v.to_vec()).unwrap();
         }
-        check_txs(&signal_receiver, txes);
+        check_txs(&signal_receiver, txes / 2);
         bank.clear_signatures();
+        start += half_len;
+        start %= verified.len();
     });
 }
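
The benchmark changes above prebuild twice as many transactions and alternate which half is submitted on each bencher iteration, so back-to-back iterations do not resend the same signatures while an earlier commit may still be in flight (the clear_signatures()/commit_transactions() race noted in the commit message). Below is a self-contained sketch of that rotation pattern; the names batch and num_threads and the fixed loop are illustrative, not the benchmark code.

```rust
// Illustrative only: rotate through two halves of a prebuilt batch so that
// consecutive iterations never resubmit the same items.
fn main() {
    let batch: Vec<u32> = (0..8).collect(); // stand-in for the prebuilt `verified` batches
    let num_threads: usize = 2;
    let half_len = batch.len() / 2;
    let mut start = 0;

    for iteration in 0..4 {
        // Send one half per iteration, chunked across worker threads,
        // mirroring verified[start..start + half_len].chunks(...).
        for chunk in batch[start..start + half_len].chunks(batch.len() / num_threads) {
            println!("iteration {}: sending {:?}", iteration, chunk);
        }
        // Advance to the other half for the next iteration.
        start += half_len;
        start %= batch.len();
    }
}
```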

File 2 of 2:

@@ -404,26 +404,38 @@ impl Bank {
         // same account state
         let lock_results = self.lock_accounts(txs);
         let lock_time = now.elapsed();
         let now = Instant::now();
         // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
         // the likelihood of any single thread getting starved and processing old ids.
         // TODO: Banking stage threads should be prioritized to complete faster then this queue
         // expires.
-        let results =
-            self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
-        let process_time = now.elapsed();
-        let now = Instant::now();
-        self.record_transactions(txs, &results, poh)?;
-        let record_time = now.elapsed();
+        let (loaded_accounts, results) =
+            self.load_and_execute_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
+        let load_execute_time = now.elapsed();
+
+        let record_time = {
+            let now = Instant::now();
+            self.record_transactions(txs, &results, poh)?;
+            now.elapsed()
+        };
+
+        let commit_time = {
+            let now = Instant::now();
+            self.commit_transactions(txs, &loaded_accounts, &results);
+            now.elapsed()
+        };
+
         let now = Instant::now();
         // Once the accounts are new transactions can enter the pipeline to process them
         self.unlock_accounts(&txs, &results);
         let unlock_time = now.elapsed();
+
         debug!(
-            "lock: {}us process: {}us record: {}us unlock: {}us txs_len={}",
+            "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}",
             duration_as_us(&lock_time),
-            duration_as_us(&process_time),
+            duration_as_us(&load_execute_time),
             duration_as_us(&record_time),
+            duration_as_us(&commit_time),
             duration_as_us(&unlock_time),
             txs.len(),
         );
@@ -447,10 +459,10 @@ impl Bank {
                 }
             })
             .collect();
-        debug!("processed: {} ", processed_transactions.len());
         // unlock all the accounts with errors which are filtered by the above `filter_map`
         if !processed_transactions.is_empty() {
             let hash = Transaction::hash(&processed_transactions);
+            debug!("processed ok: {} {}", processed_transactions.len(), hash);
             // record and unlock will unlock all the successfull transactions
             poh.record(hash, processed_transactions).map_err(|e| {
                 warn!("record failure: {:?}", e);
@@ -472,14 +484,16 @@ impl Bank {
             .load_accounts(txs, &mut last_ids, lock_results, max_age, error_counters)
     }
 
-    /// Process a batch of transactions.
-    #[must_use]
-    pub fn execute_and_commit_transactions(
+    #[allow(clippy::type_complexity)]
+    fn load_and_execute_transactions(
         &self,
         txs: &[Transaction],
         lock_results: Vec<Result<()>>,
         max_age: usize,
-    ) -> Vec<Result<()>> {
+    ) -> (
+        Vec<Result<(InstructionAccounts, InstructionLoaders)>>,
+        Vec<Result<()>>,
+    ) {
         debug!("processing transactions: {}", txs.len());
         let mut error_counters = ErrorCounters::default();
         let now = Instant::now();
@@ -505,23 +519,13 @@ impl Bank {
             .collect();
         let execution_elapsed = now.elapsed();
 
-        let now = Instant::now();
-        self.accounts
-            .store_accounts(txs, &executed, &loaded_accounts);
-        // Check account subscriptions and send notifications
-        self.send_account_notifications(txs, &executed, &loaded_accounts);
-        // once committed there is no way to unroll
-        let write_elapsed = now.elapsed();
-
         debug!(
-            "load: {}us execute: {}us store: {}us txs_len={}",
+            "load: {}us execute: {}us txs_len={}",
             duration_as_us(&load_elapsed),
             duration_as_us(&execution_elapsed),
-            duration_as_us(&write_elapsed),
             txs.len(),
         );
-        self.update_transaction_statuses(txs, &executed);
 
         let mut tx_count = 0;
         let mut err_count = 0;
         for (r, tx) in executed.iter().zip(txs.iter()) {
@@ -570,13 +574,50 @@ impl Bank {
                 error_counters.insufficient_funds
             );
         }
+        (loaded_accounts, executed)
+    }
+
+    fn commit_transactions(
+        &self,
+        txs: &[Transaction],
+        loaded_accounts: &[Result<(InstructionAccounts, InstructionLoaders)>],
+        executed: &[Result<()>],
+    ) {
+        let now = Instant::now();
+        self.accounts.store_accounts(txs, executed, loaded_accounts);
+
+        // Check account subscriptions and send notifications
+        self.send_account_notifications(txs, executed, loaded_accounts);
+
+        // once committed there is no way to unroll
+        let write_elapsed = now.elapsed();
+        debug!(
+            "store: {}us txs_len={}",
+            duration_as_us(&write_elapsed),
+            txs.len(),
+        );
+        self.update_transaction_statuses(txs, &executed);
+    }
+
+    /// Process a batch of transactions.
+    #[must_use]
+    pub fn load_execute_and_commit_transactions(
+        &self,
+        txs: &[Transaction],
+        lock_results: Vec<Result<()>>,
+        max_age: usize,
+    ) -> Vec<Result<()>> {
+        let (loaded_accounts, executed) =
+            self.load_and_execute_transactions(txs, lock_results, max_age);
+        self.commit_transactions(txs, &loaded_accounts, &executed);
         executed
     }
 
     #[must_use]
     pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
         let lock_results = self.lock_accounts(txs);
-        let results = self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
+        let results = self.load_execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
         self.unlock_accounts(txs, &results);
         results
     }
@@ -618,7 +659,7 @@ impl Bank {
         let results: Vec<Result<()>> = entries
             .into_par_iter()
             .map(|(e, lock_results)| {
-                let results = self.execute_and_commit_transactions(
+                let results = self.load_execute_and_commit_transactions(
                     &e.transactions,
                     lock_results.to_vec(),
                     MAX_ENTRY_IDS,
@@ -1354,7 +1395,7 @@ mod tests {
         let lock_result = bank.lock_accounts(&pay_alice);
         let results_alice =
-            bank.execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
+            bank.load_execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
         assert_eq!(results_alice[0], Ok(()));
 
         // try executing an interleaved transfer twice
@@ -1871,4 +1912,61 @@ mod tests {
         assert_eq!(bank.get_storage_last_id(), storage_last_id);
         assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]);
     }
+
+    #[test]
+    fn test_bank_process_and_record_transactions() {
+        let mint = Mint::new(10_000);
+        let bank = Arc::new(Bank::new(&mint));
+        let pubkey = Keypair::new().pubkey();
+
+        let transactions = vec![Transaction::system_move(
+            &mint.keypair(),
+            pubkey,
+            1,
+            mint.last_id(),
+            0,
+        )];
+
+        let (entry_sender, entry_receiver) = channel();
+        let mut poh_recorder = PohRecorder::new(
+            bank.clone(),
+            entry_sender,
+            bank.last_id(),
+            Some(bank.tick_height() + 1),
+        );
+
+        bank.process_and_record_transactions(&transactions, &poh_recorder)
+            .unwrap();
+        poh_recorder.tick().unwrap();
+
+        let mut need_tick = true;
+        // read entries until I find mine, might be ticks...
+        while need_tick {
+            let entries = entry_receiver.recv().unwrap();
+            for entry in entries {
+                if !entry.is_tick() {
+                    assert_eq!(entry.transactions.len(), transactions.len());
+                    assert_eq!(bank.get_balance(&pubkey), 1);
+                } else {
+                    need_tick = false;
+                }
+            }
+        }
+
+        let transactions = vec![Transaction::system_move(
+            &mint.keypair(),
+            pubkey,
+            2,
+            mint.last_id(),
+            0,
+        )];
+
+        assert_eq!(
+            bank.process_and_record_transactions(&transactions, &poh_recorder),
+            Err(BankError::RecordFailure)
+        );
+        assert_eq!(bank.get_balance(&pubkey), 1);
+    }
 }