Change check_txs to ignore recv errors and re-enable test (#4593)

Use more chunks to avoid duplicate signature failures:
Duplicate signatures can occur because bank.clear_signatures()
can occur before the bank has actually committed the signatures
to the status cache and then error out on the next set of transactions.
This commit is contained in:
sakridge 2019-06-17 19:04:21 -07:00 committed by GitHub
parent 7fe10ba060
commit 9cafd1f85e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 15 deletions

View File

@ -22,7 +22,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use solana_sdk::timing::{ use solana_sdk::timing::{
duration_as_ms, timestamp, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES, duration_as_us, timestamp, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES,
}; };
use std::iter; use std::iter;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -33,18 +33,20 @@ use test::Bencher;
fn check_txs(receiver: &Arc<Receiver<WorkingBankEntries>>, ref_tx_count: usize) { fn check_txs(receiver: &Arc<Receiver<WorkingBankEntries>>, ref_tx_count: usize) {
let mut total = 0; let mut total = 0;
let now = Instant::now();
loop { loop {
let entries = receiver.recv_timeout(Duration::new(1, 0)); let entries = receiver.recv_timeout(Duration::new(1, 0));
if let Ok((_, entries)) = entries { if let Ok((_, entries)) = entries {
for (entry, _) in &entries { for (entry, _) in &entries {
total += entry.transactions.len(); total += entry.transactions.len();
} }
} else {
break;
} }
if total >= ref_tx_count { if total >= ref_tx_count {
break; break;
} }
if now.elapsed().as_secs() > 60 {
break;
}
} }
assert_eq!(total, ref_tx_count); assert_eq!(total, ref_tx_count);
} }
@ -85,12 +87,12 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
} }
#[bench] #[bench]
#[ignore]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
solana_logger::setup(); solana_logger::setup();
let num_threads = BankingStage::num_threads() as usize; let num_threads = BankingStage::num_threads() as usize;
// a multiple of packet chunk 2X duplicates to avoid races // a multiple of packet chunk 2X duplicates to avoid races
let txes = 192 * num_threads * 2; const CHUNKS: usize = 32;
let txes = 192 * num_threads * CHUNKS;
let mint_total = 1_000_000_000_000; let mint_total = 1_000_000_000_000;
let GenesisBlockInfo { let GenesisBlockInfo {
mut genesis_block, mut genesis_block,
@ -168,7 +170,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
); );
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
let half_len = verified.len() / 2; let chunk_len = verified.len() / CHUNKS;
let mut start = 0; let mut start = 0;
// This is so that the signal_receiver does not go out of scope after the closure. // This is so that the signal_receiver does not go out of scope after the closure.
@ -178,18 +180,33 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let signal_receiver2 = signal_receiver.clone(); let signal_receiver2 = signal_receiver.clone();
bencher.iter(move || { bencher.iter(move || {
let now = Instant::now(); let now = Instant::now();
for v in verified[start..start + half_len].chunks(verified.len() / num_threads) { let mut sent = 0;
trace!("sending... {}..{} {}", start, start + half_len, timestamp());
for v in verified[start..start + chunk_len].chunks(verified.len() / num_threads) {
trace!(
"sending... {}..{} {}",
start,
start + chunk_len,
timestamp()
);
for xv in v {
sent += xv.0.packets.len();
}
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
} }
check_txs(&signal_receiver2, txes / 2); check_txs(&signal_receiver2, txes / CHUNKS);
trace!(
"time: {} checked: {}", // This signature clear may not actually clear the signatures
duration_as_ms(&now.elapsed()), // in this chunk, but since we rotate between 32 chunks then
txes / 2 // we should clear them by the time we come around again to re-use that chunk.
);
bank.clear_signatures(); bank.clear_signatures();
start += half_len; trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),
txes / CHUNKS,
sent,
);
start += chunk_len;
start %= verified.len(); start %= verified.len();
}); });
drop(vote_sender); drop(vote_sender);