diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 37d180aa19..279d35d6a0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -158,13 +158,13 @@ impl BankingStage { } let bank = bank.unwrap(); - let (processed, verified_txs, verified_indexes) = + let (processed, verified_txs_len, verified_indexes) = Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; new_tx_count += processed; - if processed < verified_txs.len() { - rebuffered_packets += verified_txs.len() - processed; + if processed < verified_txs_len { + rebuffered_packets += verified_txs_len - processed; bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding unprocessed_packets.push(( @@ -461,17 +461,18 @@ impl BankingStage { Ok(chunk_start) } - fn process_received_packets( - bank: &Arc, - poh: &Arc>, - msgs: &Packets, + fn process_received_packets_using_closure<'a, F>( + bank: &'a Arc, + poh: &'a Arc>, + transactions: Vec>, vers: &[u8], offset: usize, - ) -> Result<(usize, Vec, Vec)> { + f: F, + ) -> Result<(usize, usize, Vec)> + where + F: Fn(&'a Bank, &[Transaction], &'a Arc>) -> Result<(usize)>, + { debug!("banking-stage-tx bank {}", bank.slot()); - let transactions = - Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned())); - let vers = vers[offset..].to_owned(); debug!( @@ -482,7 +483,7 @@ impl BankingStage { let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions .into_iter() .zip(vers) - .zip(0..) + .zip(offset..) .filter_map(|((tx, ver), index)| match tx { None => None, Some(tx) => { @@ -501,9 +502,33 @@ impl BankingStage { verified_transactions.len() ); - let processed = Self::process_transactions(&bank, &verified_transactions, poh)?; + let tx_len = verified_transactions.len(); - Ok((processed, verified_transactions, verified_indexes)) + let processed = f(bank, &verified_transactions, poh)?; + + Ok((processed, tx_len, verified_indexes)) + } + + fn process_received_packets<'a>( + bank: &'a Arc, + poh: &'a Arc>, + msgs: &Packets, + vers: &[u8], + offset: usize, + ) -> Result<(usize, usize, Vec)> { + let transactions = + Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned())); + + Self::process_received_packets_using_closure( + bank, + poh, + transactions, + vers, + offset, + |x: &'a Bank, y: &[Transaction], z: &'a Arc>| { + Self::process_transactions(x, y, z) + }, + ) } /// Process the incoming packets @@ -545,10 +570,10 @@ impl BankingStage { } let bank = bank.unwrap(); - let (processed, verified_txs, verified_indexes) = + let (processed, verified_txs_len, verified_indexes) = Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?; - if processed < verified_txs.len() { + if processed < verified_txs_len { bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding unprocessed_packets.push((msgs, verified_indexes[processed], vers)); @@ -963,6 +988,115 @@ mod tests { Blocktree::destroy(&ledger_path).unwrap(); } + #[test] + fn test_bank_process_received_transactions() { + let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + let pubkey = Pubkey::new_rand(); + + let transactions = vec![ + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + ]; + + assert_eq!( + BankingStage::process_received_packets_using_closure( + &bank, + &poh_recorder, + transactions.clone(), + &vec![1, 1, 1, 1, 1, 1], + 0, + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + ) + .ok(), + Some((6, 6, vec![0, 1, 2, 3, 4, 5])) + ); + + assert_eq!( + BankingStage::process_received_packets_using_closure( + &bank, + &poh_recorder, + transactions.clone(), + &vec![1, 1, 1, 0, 1, 1], + 0, + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + ) + .ok(), + Some((5, 5, vec![0, 1, 2, 4, 5])) + ); + + assert_eq!( + BankingStage::process_received_packets_using_closure( + &bank, + &poh_recorder, + transactions.clone(), + &vec![1, 1, 1, 0, 1, 1], + 2, + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + ) + .ok(), + Some((3, 3, vec![2, 4, 5])) + ); + } + Blocktree::destroy(&ledger_path).unwrap(); + } + #[test] fn test_should_process_or_forward_packets() { let my_id = Pubkey::new_rand();