Index buffered transactions at the correct offset (#4126)

* tests
This commit is contained in:
Pankaj Garg 2019-05-02 19:05:53 -07:00 committed by GitHub
parent c2dfb9900e
commit 441e76ebeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 150 additions and 16 deletions

View File

@ -158,13 +158,13 @@ impl BankingStage {
}
let bank = bank.unwrap();
let (processed, verified_txs, verified_indexes) =
let (processed, verified_txs_len, verified_indexes) =
Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?;
new_tx_count += processed;
if processed < verified_txs.len() {
rebuffered_packets += verified_txs.len() - processed;
if processed < verified_txs_len {
rebuffered_packets += verified_txs_len - processed;
bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((
@ -461,17 +461,18 @@ impl BankingStage {
Ok(chunk_start)
}
fn process_received_packets(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
fn process_received_packets_using_closure<'a, F>(
bank: &'a Arc<Bank>,
poh: &'a Arc<Mutex<PohRecorder>>,
transactions: Vec<Option<Transaction>>,
vers: &[u8],
offset: usize,
) -> Result<(usize, Vec<Transaction>, Vec<usize>)> {
f: F,
) -> Result<(usize, usize, Vec<usize>)>
where
F: Fn(&'a Bank, &[Transaction], &'a Arc<Mutex<PohRecorder>>) -> Result<(usize)>,
{
debug!("banking-stage-tx bank {}", bank.slot());
let transactions =
Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned()));
let vers = vers[offset..].to_owned();
debug!(
@ -482,7 +483,7 @@ impl BankingStage {
let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions
.into_iter()
.zip(vers)
.zip(0..)
.zip(offset..)
.filter_map(|((tx, ver), index)| match tx {
None => None,
Some(tx) => {
@ -501,9 +502,33 @@ impl BankingStage {
verified_transactions.len()
);
let processed = Self::process_transactions(&bank, &verified_transactions, poh)?;
let tx_len = verified_transactions.len();
Ok((processed, verified_transactions, verified_indexes))
let processed = f(bank, &verified_transactions, poh)?;
Ok((processed, tx_len, verified_indexes))
}
fn process_received_packets<'a>(
bank: &'a Arc<Bank>,
poh: &'a Arc<Mutex<PohRecorder>>,
msgs: &Packets,
vers: &[u8],
offset: usize,
) -> Result<(usize, usize, Vec<usize>)> {
let transactions =
Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned()));
Self::process_received_packets_using_closure(
bank,
poh,
transactions,
vers,
offset,
|x: &'a Bank, y: &[Transaction], z: &'a Arc<Mutex<PohRecorder>>| {
Self::process_transactions(x, y, z)
},
)
}
/// Process the incoming packets
@ -545,10 +570,10 @@ impl BankingStage {
}
let bank = bank.unwrap();
let (processed, verified_txs, verified_indexes) =
let (processed, verified_txs_len, verified_indexes) =
Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?;
if processed < verified_txs.len() {
if processed < verified_txs_len {
bank_shutdown = true;
// Collect any unprocessed transactions in this batch for forwarding
unprocessed_packets.push((msgs, verified_indexes[processed], vers));
@ -963,6 +988,115 @@ mod tests {
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_bank_process_received_transactions() {
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block));
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let pubkey = Pubkey::new_rand();
let transactions = vec![
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
Some(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_block.hash(),
0,
)),
];
assert_eq!(
BankingStage::process_received_packets_using_closure(
&bank,
&poh_recorder,
transactions.clone(),
&vec![1, 1, 1, 1, 1, 1],
0,
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()),
)
.ok(),
Some((6, 6, vec![0, 1, 2, 3, 4, 5]))
);
assert_eq!(
BankingStage::process_received_packets_using_closure(
&bank,
&poh_recorder,
transactions.clone(),
&vec![1, 1, 1, 0, 1, 1],
0,
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()),
)
.ok(),
Some((5, 5, vec![0, 1, 2, 4, 5]))
);
assert_eq!(
BankingStage::process_received_packets_using_closure(
&bank,
&poh_recorder,
transactions.clone(),
&vec![1, 1, 1, 0, 1, 1],
2,
|_x: &Bank, y: &[Transaction], _z: &Arc<Mutex<PohRecorder>>| Ok(y.len()),
)
.ok(),
Some((3, 3, vec![2, 4, 5]))
);
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_should_process_or_forward_packets() {
let my_id = Pubkey::new_rand();