Don't forward transactions that are expired or failed signature check (#4199)

This commit is contained in:
Pankaj Garg 2019-05-08 10:32:25 -07:00 committed by GitHub
parent 349306ddf7
commit 1a2b131ceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 245 additions and 160 deletions

View File

@ -16,6 +16,7 @@ use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize; use bincode::deserialize;
use itertools::Itertools; use itertools::Itertools;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_runtime::accounts_db::ErrorCounters;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -487,58 +488,59 @@ impl BankingStage {
Ok((chunk_start, unprocessed_txs)) Ok((chunk_start, unprocessed_txs))
} }
fn process_received_packets_using_closure<'a, F>( // This function returns a vector of transactions that are not None. It also returns a vector
bank: &'a Arc<Bank>, // with position of the transaction in the input list
poh: &'a Arc<Mutex<PohRecorder>>, fn filter_transaction_indexes(
transactions: Vec<Option<Transaction>>, transactions: Vec<Option<Transaction>>,
indexes: &[usize], indexes: &[usize],
f: F, ) -> (Vec<Transaction>, Vec<usize>) {
) -> Result<(usize, usize, Vec<usize>)> transactions
where
F: Fn(&'a Bank, &[Transaction], &'a Arc<Mutex<PohRecorder>>) -> Result<(usize, Vec<usize>)>,
{
debug!("banking-stage-tx bank {}", bank.slot());
debug!(
"bank: {} transactions received {}",
bank.slot(),
transactions.len()
);
let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions
.into_iter() .into_iter()
.zip(indexes) .zip(indexes)
.filter_map(|(tx, index)| match tx { .filter_map(|(tx, index)| match tx {
None => None, None => None,
Some(tx) => Some((tx, index)), Some(tx) => Some((tx, index)),
}) })
.unzip(); .unzip()
debug!(
"bank: {} verified transactions {}",
bank.slot(),
verified_transactions.len()
);
let tx_len = verified_transactions.len();
let (processed, unprocessed_txs) = f(bank, &verified_transactions, poh)?;
let unprocessed_indexes: Vec<_> = unprocessed_txs
.iter()
.map(|x| verified_indexes[*x])
.collect();
Ok((processed, tx_len, unprocessed_indexes))
} }
fn process_received_packets<'a>( // This function creates a filter of transaction results with Ok() for every pending
bank: &'a Arc<Bank>, // transaction. The non-pending transactions are marked with TransactionError
poh: &'a Arc<Mutex<PohRecorder>>, fn prepare_filter_for_pending_transactions(
transactions: &[Transaction],
pending_tx_indexes: &[usize],
) -> Vec<transaction::Result<()>> {
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()];
pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(()));
mask
}
// This function returns a vector containing index of all valid transactions. A valid
// transaction has result Ok() as the value
fn filter_valid_transaction_indexes(
valid_txs: &[transaction::Result<()>],
transaction_indexes: &[usize],
) -> Vec<usize> {
let valid_transactions = valid_txs
.iter()
.enumerate()
.filter_map(|(index, x)| if x.is_ok() { Some(index) } else { None })
.collect_vec();
valid_transactions
.iter()
.map(|x| transaction_indexes[*x])
.collect()
}
fn process_received_packets(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets, msgs: &Packets,
packet_indexes: Vec<usize>, transaction_indexes: Vec<usize>,
) -> Result<(usize, usize, Vec<usize>)> { ) -> Result<(usize, usize, Vec<usize>)> {
let packets = Packets::new( let packets = Packets::new(
packet_indexes transaction_indexes
.iter() .iter()
.map(|x| msgs.packets[*x].to_owned()) .map(|x| msgs.packets[*x].to_owned())
.collect_vec(), .collect_vec(),
@ -546,15 +548,42 @@ impl BankingStage {
let transactions = Self::deserialize_transactions(&packets); let transactions = Self::deserialize_transactions(&packets);
Self::process_received_packets_using_closure( debug!(
bank, "banking-stage-tx bank: {} transactions received {}",
poh, bank.slot(),
transactions, transactions.len()
&packet_indexes, );
|x: &'a Bank, y: &[Transaction], z: &'a Arc<Mutex<PohRecorder>>| {
Self::process_transactions(x, y, z) let (transactions, transaction_indexes) =
}, Self::filter_transaction_indexes(transactions, &transaction_indexes);
)
debug!(
"bank: {} filtered transactions {}",
bank.slot(),
transactions.len()
);
let tx_len = transactions.len();
let (processed, unprocessed_tx_indexes) =
Self::process_transactions(bank, &transactions, poh)?;
let filter =
Self::prepare_filter_for_pending_transactions(&transactions, &unprocessed_tx_indexes);
let mut error_counters = ErrorCounters::default();
let result = bank.check_transactions(
&transactions,
&filter,
MAX_RECENT_BLOCKHASHES / 2,
&mut error_counters,
);
Ok((
processed,
tx_len,
Self::filter_valid_transaction_indexes(&result, &transaction_indexes),
))
} }
/// Process the incoming packets /// Process the incoming packets
@ -1041,118 +1070,158 @@ mod tests {
} }
#[test] #[test]
fn test_bank_process_received_transactions() { fn test_bank_filter_transaction_indexes() {
let (genesis_block, mint_keypair) = create_genesis_block(10_000); let (genesis_block, mint_keypair) = create_genesis_block(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let pubkey = Pubkey::new_rand();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let pubkey = Pubkey::new_rand(); let transactions = vec![
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
None,
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
None,
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
None,
None,
];
let transactions = vec![ let filtered_transactions = vec![
Some(system_transaction::transfer( system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
&mint_keypair, system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
&pubkey, system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
1, system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
genesis_block.hash(), system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
0, system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
)), ];
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
];
assert_eq!( assert_eq!(
BankingStage::process_received_packets_using_closure( BankingStage::filter_transaction_indexes(
&bank, transactions.clone(),
&poh_recorder, &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
transactions.clone(), ),
&vec![0, 1, 2, 3, 4, 5], (filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10])
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(( );
y.len(),
vec![0, 1, 2, 3, 4, 5]
)),
)
.ok(),
Some((6, 6, vec![0, 1, 2, 3, 4, 5]))
);
assert_eq!( assert_eq!(
BankingStage::process_received_packets_using_closure( BankingStage::filter_transaction_indexes(
&bank, transactions,
&poh_recorder, &vec![1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15],
transactions.clone(), ),
&vec![0, 1, 2, 3, 4, 5], (filtered_transactions, vec![2, 4, 5, 9, 11, 13])
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(( );
y.len() - 1, }
vec![0, 1, 2, 4, 5]
)),
)
.ok(),
Some((5, 6, vec![0, 1, 2, 4, 5]))
);
assert_eq!( #[test]
BankingStage::process_received_packets_using_closure( fn test_bank_prepare_filter_for_pending_transaction() {
&bank, let (genesis_block, mint_keypair) = create_genesis_block(10_000);
&poh_recorder, let pubkey = Pubkey::new_rand();
transactions.clone(),
&vec![0, 1, 2, 3, 4, 5], let transactions = vec![
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(( system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
y.len() - 3, system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
vec![2, 4, 5] system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
)), system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
) system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
.ok(), system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
Some((3, 6, vec![2, 4, 5])) ];
);
} assert_eq!(
Blocktree::destroy(&ledger_path).unwrap(); BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![2, 4, 5],),
vec![
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(())
]
);
assert_eq!(
BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![0, 2, 3],),
vec![
Ok(()),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(()),
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
]
);
}
#[test]
fn test_bank_filter_valid_transaction_indexes() {
assert_eq!(
BankingStage::filter_valid_transaction_indexes(
&vec![
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(())
],
&vec![2, 4, 5, 9, 11, 13]
),
vec![5, 11, 13]
);
assert_eq!(
BankingStage::filter_valid_transaction_indexes(
&vec![
Ok(()),
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(()),
Ok(())
],
&vec![1, 6, 7, 9, 31, 43]
),
vec![1, 9, 31, 43]
);
} }
#[test] #[test]

View File

@ -558,11 +558,11 @@ impl Bank {
fn check_refs( fn check_refs(
&self, &self,
txs: &[Transaction], txs: &[Transaction],
lock_results: &LockedAccountsResults<Transaction>, lock_results: &[Result<()>],
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> { ) -> Vec<Result<()>> {
txs.iter() txs.iter()
.zip(lock_results.locked_accounts_results()) .zip(lock_results)
.map(|(tx, lock_res)| { .map(|(tx, lock_res)| {
if lock_res.is_ok() && !tx.verify_refs() { if lock_res.is_ok() && !tx.verify_refs() {
error_counters.invalid_account_index += 1; error_counters.invalid_account_index += 1;
@ -625,6 +625,19 @@ impl Bank {
}) })
.collect() .collect()
} }
pub fn check_transactions(
&self,
txs: &[Transaction],
lock_results: &[Result<()>],
max_age: usize,
mut error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
let refs_results = self.check_refs(txs, lock_results, &mut error_counters);
let age_results = self.check_age(txs, refs_results, max_age, &mut error_counters);
self.check_signatures(txs, age_results, &mut error_counters)
}
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions( pub fn load_and_execute_transactions(
&self, &self,
@ -638,9 +651,12 @@ impl Bank {
debug!("processing transactions: {}", txs.len()); debug!("processing transactions: {}", txs.len());
let mut error_counters = ErrorCounters::default(); let mut error_counters = ErrorCounters::default();
let now = Instant::now(); let now = Instant::now();
let refs_results = self.check_refs(txs, lock_results, &mut error_counters); let sig_results = self.check_transactions(
let age_results = self.check_age(txs, refs_results, max_age, &mut error_counters); txs,
let sig_results = self.check_signatures(txs, age_results, &mut error_counters); lock_results.locked_accounts_results(),
max_age,
&mut error_counters,
);
let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters);
let tick_height = self.tick_height(); let tick_height = self.tick_height();

View File

@ -1,5 +1,5 @@
mod accounts; mod accounts;
mod accounts_db; pub mod accounts_db;
mod accounts_index; mod accounts_index;
pub mod append_vec; pub mod append_vec;
pub mod bank; pub mod bank;