Filter out all unprocessed transactions before forwarding them (#4266)

* Filter out all unprocessed transactions before forwarding them

* fix clippy
This commit is contained in:
Pankaj Garg 2019-05-13 14:40:05 -07:00 committed by GitHub
parent a4fb01b42b
commit c9b86018c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 49 deletions

View File

@ -145,16 +145,10 @@ impl BankingStage {
buffered_packets: &[(Packets, Vec<usize>)],
) -> Result<UnprocessedPackets> {
let mut unprocessed_packets = vec![];
let mut bank_shutdown = false;
let mut rebuffered_packets = 0;
let mut new_tx_count = 0;
for (msgs, unprocessed_indexes) in buffered_packets {
if bank_shutdown {
rebuffered_packets += unprocessed_indexes.len();
unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned()));
continue;
}
let mut buffered_packets_iter = buffered_packets.iter();
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() {
rebuffered_packets += unprocessed_indexes.len();
@ -173,14 +167,22 @@ impl BankingStage {
new_tx_count += processed;
if processed < verified_txs_len {
bank_shutdown = true;
}
// Collect any unprocessed transactions in this batch for forwarding
if !new_unprocessed_indexes.is_empty() {
rebuffered_packets += new_unprocessed_indexes.len();
unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes));
}
if processed < verified_txs_len {
// Walk through the rest of the transactions and filter out the invalid ones (e.g. too old)
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let unprocessed_indexes =
Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes);
if !unprocessed_indexes.is_empty() {
unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes));
}
}
}
}
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
@ -544,12 +546,11 @@ impl BankingStage {
.collect()
}
fn process_received_packets(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
// This function deserializes packets into transactions and returns non-None transactions
fn transactions_from_packets(
msgs: &Packets,
transaction_indexes: Vec<usize>,
) -> Result<(usize, usize, Vec<usize>)> {
transaction_indexes: &[usize],
) -> (Vec<Transaction>, Vec<usize>) {
let packets = Packets::new(
transaction_indexes
.iter()
@ -559,15 +560,37 @@ impl BankingStage {
let transactions = Self::deserialize_transactions(&packets);
debug!(
"banking-stage-tx bank: {} transactions received {}",
bank.slot(),
transactions.len()
Self::filter_transaction_indexes(transactions, &transaction_indexes)
}
// This function filters pending transactions that are still valid
fn filter_pending_transactions(
bank: &Arc<Bank>,
transactions: &[Transaction],
transaction_indexes: &[usize],
pending_indexes: &[usize],
) -> Vec<usize> {
let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes);
let mut error_counters = ErrorCounters::default();
let result = bank.check_transactions(
transactions,
&filter,
(MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2,
&mut error_counters,
);
let (transactions, transaction_indexes) =
Self::filter_transaction_indexes(transactions, &transaction_indexes);
Self::filter_valid_transaction_indexes(&result, transaction_indexes)
}
fn process_received_packets(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
transaction_indexes: Vec<usize>,
) -> Result<(usize, usize, Vec<usize>)> {
let (transactions, transaction_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes);
debug!(
"bank: {} filtered transactions {}",
bank.slot(),
@ -579,22 +602,46 @@ impl BankingStage {
let (processed, unprocessed_tx_indexes) =
Self::process_transactions(bank, &transactions, poh)?;
let filter =
Self::prepare_filter_for_pending_transactions(&transactions, &unprocessed_tx_indexes);
let unprocessed_tx_count = unprocessed_tx_indexes.len();
let mut error_counters = ErrorCounters::default();
let result = bank.check_transactions(
let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions(
bank,
&transactions,
&filter,
(MAX_RECENT_BLOCKHASHES - MAX_TRANSACTION_FORWARDING_DELAY) / 2,
&mut error_counters,
&transaction_indexes,
&unprocessed_tx_indexes,
);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len())
);
Ok((
processed,
tx_len,
Self::filter_valid_transaction_indexes(&result, &transaction_indexes),
))
Ok((processed, tx_len, filtered_unprocessed_tx_indexes))
}
fn filter_unprocessed_packets(
bank: &Arc<Bank>,
msgs: &Packets,
transaction_indexes: &[usize],
) -> Vec<usize> {
let (transactions, transaction_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes);
let tx_count = transaction_indexes.len();
let unprocessed_tx_indexes = (0..transactions.len()).collect_vec();
let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions(
bank,
&transactions,
&transaction_indexes,
&unprocessed_tx_indexes,
);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len())
);
filtered_unprocessed_tx_indexes
}
/// Process the incoming packets
@ -620,22 +667,14 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-transactions_received", count);
let proc_start = Instant::now();
let mut new_tx_count = 0;
let mut mms_iter = mms.into_iter();
let mut unprocessed_packets = vec![];
let mut bank_shutdown = false;
for (msgs, vers) in mms {
while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes: Vec<usize> = vers
.iter()
.enumerate()
.filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
.collect();
if bank_shutdown {
if !packet_indexes.is_empty() {
unprocessed_packets.push((msgs, packet_indexes));
}
continue;
}
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
if !packet_indexes.is_empty() {
@ -648,15 +687,28 @@ impl BankingStage {
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?;
if processed < verified_txs_len {
bank_shutdown = true;
}
new_tx_count += processed;
// Collect any unprocessed transactions in this batch for forwarding
if !unprocessed_indexes.is_empty() {
unprocessed_packets.push((msgs, unprocessed_indexes));
}
new_tx_count += processed;
if processed < verified_txs_len {
// Walk through the rest of the transactions and filter out the invalid ones (e.g. too old)
while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes: Vec<usize> = vers
.iter()
.enumerate()
.filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None })
.collect();
let unprocessed_indexes =
Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes);
if !unprocessed_indexes.is_empty() {
unprocessed_packets.push((msgs, unprocessed_indexes));
}
}
}
}
inc_new_counter_info!(

View File

@ -26,7 +26,12 @@ pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS;
/// This is the maximum time consumed in forwarding a transaction from one node to the next, before
/// it can be processed in the target node
pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 3;
#[cfg(feature = "cuda")]
pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 4;
/// More delay is expected if CUDA is not enabled (as signature verification takes longer)
#[cfg(not(feature = "cuda"))]
pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 12;
pub fn duration_as_ns(d: &Duration) -> u64 {
d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos())