Fix early exit clearing all buffered packets (#4810)

This commit is contained in:
carllin 2019-06-26 22:39:50 -07:00 committed by GitHub
parent b8ae025f90
commit 97c97db97e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 50 deletions

View File

@ -28,7 +28,7 @@ use solana_sdk::timing::{
};
use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
@ -85,7 +85,6 @@ impl BankingStage {
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
let exit = Arc::new(AtomicBool::new(false));
let my_pubkey = cluster_info.read().unwrap().id();
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
@ -99,7 +98,6 @@ impl BankingStage {
let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone();
let exit = exit.clone();
let mut recv_start = Instant::now();
Builder::new()
.name("solana-banking-stage-tx".to_string())
@ -113,7 +111,6 @@ impl BankingStage {
enable_forwarding,
i,
);
exit.store(true, Ordering::Relaxed);
})
.unwrap()
})
@ -171,7 +168,7 @@ impl BankingStage {
&poh_recorder,
&msgs,
unprocessed_indexes.to_owned(),
)?;
);
new_tx_count += processed;
@ -358,8 +355,7 @@ impl BankingStage {
buffered_packets.append(&mut unprocessed_packets);
}
Err(err) => {
debug!("solana-banking-stage-tx: exit due to {:?}", err);
break;
debug!("solana-banking-stage-tx error: {:?}", err);
}
}
}
@ -377,16 +373,22 @@ impl BankingStage {
.collect()
}
#[allow(clippy::match_wild_err_arm)]
fn record_transactions(
bank_slot: u64,
txs: &[Transaction],
results: &[transaction::Result<()>],
poh: &Arc<Mutex<PohRecorder>>,
) -> Result<()> {
) -> (Result<()>, Vec<usize>) {
let mut ok_txs = vec![];
let processed_transactions: Vec<_> = results
.iter()
.zip(txs.iter())
.filter_map(|(r, x)| {
.enumerate()
.filter_map(|(i, (r, x))| {
if r.is_ok() {
ok_txs.push(i);
}
if Bank::can_commit(r) {
Some(x.clone())
} else {
@ -394,6 +396,7 @@ impl BankingStage {
}
})
.collect();
debug!("processed: {} ", processed_transactions.len());
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
@ -401,13 +404,24 @@ impl BankingStage {
"banking_stage-record_transactions",
processed_transactions.len()
);
let hash = hash_transactions(&processed_transactions);
let hash = hash_transactions(&processed_transactions[..]);
// record and unlock will unlock all the successful transactions
poh.lock()
let res = poh
.lock()
.unwrap()
.record(bank_slot, hash, processed_transactions)?;
.record(bank_slot, hash, processed_transactions);
match res {
Ok(()) => (),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
// If record errors, return all the ok transactions as retryable, filter out
// all the transactions we know were errors
return (res, ok_txs);
}
Err(_) => panic!("Poh recorder returned unexpected error"),
}
}
Ok(())
(Ok(()), vec![])
}
fn process_and_record_transactions_locked(
@ -415,13 +429,13 @@ impl BankingStage {
txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
lock_results: &LockedAccountsResults,
) -> Result<()> {
) -> (Result<()>, Vec<usize>) {
let now = Instant::now();
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (loaded_accounts, results) =
let (loaded_accounts, results, mut retryable_txs) =
bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE);
let load_execute_time = now.elapsed();
@ -429,7 +443,12 @@ impl BankingStage {
let record_time = {
let now = Instant::now();
Self::record_transactions(bank.slot(), txs, &results, poh)?;
let (res, retryable_record_txs) =
Self::record_transactions(bank.slot(), txs, &results, poh);
retryable_txs.extend(retryable_record_txs);
if res.is_err() {
return (res, retryable_txs);
}
now.elapsed()
};
@ -450,7 +469,7 @@ impl BankingStage {
txs.len(),
);
Ok(())
(Ok(()), retryable_txs)
}
pub fn process_and_record_transactions(
@ -465,18 +484,9 @@ impl BankingStage {
let lock_results = bank.lock_accounts(txs);
let lock_time = now.elapsed();
let unprocessed_txs: Vec<_> = lock_results
.locked_accounts_results()
.iter()
.zip(chunk_offset..)
.filter_map(|(res, index)| match res {
Err(TransactionError::AccountInUse) => Some(index),
Ok(_) => None,
Err(_) => None,
})
.collect();
let results = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results);
let (result, mut retryable_txs) =
Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
let now = Instant::now();
// Once the accounts are new transactions can enter the pipeline to process them
@ -491,7 +501,7 @@ impl BankingStage {
txs.len(),
);
(results, unprocessed_txs)
(result, retryable_txs)
}
/// Sends transactions to the bank.
@ -502,7 +512,7 @@ impl BankingStage {
bank: &Bank,
transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>,
) -> Result<(usize, Vec<usize>)> {
) -> (usize, Vec<usize>) {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
while chunk_start != transactions.len() {
@ -513,30 +523,38 @@ impl BankingStage {
&Entry::serialized_to_blob_size,
);
let (result, unprocessed_txs_in_chunk) = Self::process_and_record_transactions(
let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions(
bank,
&transactions[chunk_start..chunk_end],
poh,
chunk_start,
);
trace!("process_transactions: {:?}", result);
unprocessed_txs.extend_from_slice(&unprocessed_txs_in_chunk);
trace!("process_transactions result: {:?}", result);
// Add the retryable txs (transactions that errored in a way that warrants a retry)
// to the list of unprocessed txs.
unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk);
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
info!(
"process transactions: max height reached slot: {} height: {}",
bank.slot(),
bank.tick_height()
);
let range: Vec<usize> = (chunk_start..chunk_end).collect();
// process_and_record_transactions has returned all retryable errors in
// transactions[chunk_start..chunk_end], so we just need to push the remaining
// transactions into the unprocessed queue.
let range: Vec<usize> = (chunk_end..transactions.len()).collect();
unprocessed_txs.extend_from_slice(&range);
unprocessed_txs.sort_unstable();
unprocessed_txs.dedup();
break;
}
result?;
// Don't exit early on any other type of error, continue processing...
chunk_start = chunk_end;
}
Ok((chunk_start, unprocessed_txs))
(chunk_start, unprocessed_txs)
}
// This function returns a vector of transactions that are not None. It also returns a vector
@ -601,7 +619,7 @@ impl BankingStage {
Self::filter_transaction_indexes(transactions, &transaction_indexes)
}
// This function filters pending transactions that are still valid
// This function filters pending transactions that are still valid
fn filter_pending_transactions(
bank: &Arc<Bank>,
transactions: &[Transaction],
@ -636,7 +654,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
transaction_indexes: Vec<usize>,
) -> Result<(usize, usize, Vec<usize>)> {
) -> (usize, usize, Vec<usize>) {
let (transactions, transaction_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes);
debug!(
@ -648,7 +666,7 @@ impl BankingStage {
let tx_len = transactions.len();
let (processed, unprocessed_tx_indexes) =
Self::process_transactions(bank, &transactions, poh)?;
Self::process_transactions(bank, &transactions, poh);
let unprocessed_tx_count = unprocessed_tx_indexes.len();
@ -663,7 +681,7 @@ impl BankingStage {
unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len())
);
Ok((processed, tx_len, filtered_unprocessed_tx_indexes))
(processed, tx_len, filtered_unprocessed_tx_indexes)
}
fn filter_unprocessed_packets(
@ -746,7 +764,7 @@ impl BankingStage {
let bank = bank.unwrap();
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?;
Self::process_received_packets(&bank, &poh, &msgs, packet_indexes);
new_tx_count += processed;
@ -1191,8 +1209,7 @@ mod tests {
];
let mut results = vec![Ok(()), Ok(())];
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
.unwrap();
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder);
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
@ -1201,17 +1218,48 @@ mod tests {
1,
InstructionError::new_result_with_negative_lamports(),
));
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
.unwrap();
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
res.unwrap();
assert!(retryable.is_empty());
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// Other TransactionErrors should not be recorded
results[0] = Err(TransactionError::AccountNotFound);
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
.unwrap();
let (res, retryable) = BankingStage::record_transactions(
bank.slot(),
&transactions,
&results,
&poh_recorder,
);
res.unwrap();
assert!(retryable.is_empty());
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
// Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
// record_transactions should throw MaxHeightReached and return the set of retryable
// txs
let (res, retryable) = BankingStage::record_transactions(
bank.slot() + 1,
&transactions,
&results,
&poh_recorder,
);
assert_matches!(
res,
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
);
// The first result was an error so it's filtered out. The second result was Ok(),
// so it should be marked as retryable
assert_eq!(retryable, vec![1]);
// Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err());
}
Blocktree::destroy(&ledger_path).unwrap();
}
@ -1617,4 +1665,68 @@ mod tests {
})
.collect_vec();
}
#[test]
fn test_process_transactions_returns_unprocessed_txs() {
solana_logger::setup();
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let mut transactions = vec![];
loop {
let pubkey = Pubkey::new_rand();
// Make enough transactions to span multiple entries
transactions.push(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
));
if entry::num_will_fit(
&transactions[0..],
packet::BLOB_DATA_SIZE as u64,
&Entry::serialized_to_blob_size,
) < transactions.len()
{
break;
}
}
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some(4),
bank.ticks_per_slot(),
&Pubkey::new_rand(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
// Poh Recorder has not working bank, so should throw MaxHeightReached error on
// record
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(&bank, &transactions, &poh_recorder);
assert_eq!(processed_transactions_count, 0,);
retryable_txs.sort();
let expected: Vec<usize> = (0..transactions.len()).collect();
assert_eq!(retryable_txs, expected);
}
Blocktree::destroy(&ledger_path).unwrap();
}
}

View File

@ -939,10 +939,23 @@ impl Bank {
) -> (
Vec<Result<(InstructionAccounts, InstructionLoaders, InstructionCredits)>>,
Vec<Result<()>>,
Vec<usize>,
) {
debug!("processing transactions: {}", txs.len());
let mut error_counters = ErrorCounters::default();
let now = Instant::now();
let retryable_txs: Vec<_> = lock_results
.locked_accounts_results()
.iter()
.enumerate()
.filter_map(|(index, res)| match res {
Err(TransactionError::AccountInUse) => Some(index),
Ok(_) => None,
Err(_) => None,
})
.collect();
let sig_results = self.check_transactions(
txs,
lock_results.locked_accounts_results(),
@ -1004,7 +1017,7 @@ impl Bank {
inc_new_counter_info!("bank-process_transactions-txs", tx_count, 0, 1000);
inc_new_counter_info!("bank-process_transactions-sigs", signature_count, 0, 1000);
Self::update_error_counters(&error_counters);
(loaded_accounts, executed)
(loaded_accounts, executed, retryable_txs)
}
fn filter_program_errors_and_collect_fee(
@ -1089,7 +1102,7 @@ impl Bank {
lock_results: &LockedAccountsResults,
max_age: usize,
) -> Vec<Result<()>> {
let (loaded_accounts, executed) =
let (loaded_accounts, executed, _) =
self.load_and_execute_transactions(txs, lock_results, max_age);
self.commit_transactions(txs, &loaded_accounts, &executed)