From fb7ba97afce752327e4bd65490987513f65b6f2c Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Tue, 9 May 2023 16:39:23 -0500 Subject: [PATCH] refactor: move test and bench only code from main code to where they are needed (#31550) * refactor: move test and bench only code to specific location * remove inactive bench test --- core/Cargo.toml | 3 - core/benches/banking_stage.rs | 13 +- core/benches/unprocessed_packet_batches.rs | 162 --------------------- core/src/banking_stage/consumer.rs | 28 ++-- core/src/unprocessed_packet_batches.rs | 25 +--- 5 files changed, 31 insertions(+), 200 deletions(-) delete mode 100644 core/benches/unprocessed_packet_batches.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 05725e89b..01f20e3d2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -103,8 +103,5 @@ name = "sigverify_stage" [[bench]] name = "retransmit_stage" -[[bench]] -name = "unprocessed_packet_batches" - [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index e227a8b02..23483ca20 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -27,7 +27,10 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, }, - solana_perf::{packet::to_packet_batches, test_tx::test_tx}, + solana_perf::{ + packet::{to_packet_batches, Packet}, + test_tx::test_tx, + }, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_runtime::{ bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, @@ -88,7 +91,13 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let tx = test_tx(); let transactions = vec![tx; 4194304]; - let batches = transactions_to_deserialized_packets(&transactions).unwrap(); + let batches = transactions + .iter() + .filter_map(|transaction| { + let packet = Packet::from_data(None, transaction).ok().unwrap(); + DeserializedPacket::new(packet).ok() + }) + .collect::>(); let batches_len = batches.len(); let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len), diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs deleted file mode 100644 index dc07a5b57..000000000 --- a/core/benches/unprocessed_packet_batches.rs +++ /dev/null @@ -1,162 +0,0 @@ -#![allow(clippy::integer_arithmetic)] -#![feature(test)] - -extern crate test; - -use { - solana_core::{ - forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, - unprocessed_packet_batches::*, - unprocessed_transaction_storage::{ - ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE, - }, - }, - solana_perf::packet::{Packet, PacketBatch}, - solana_runtime::{ - bank::Bank, - bank_forks::BankForks, - genesis_utils::{create_genesis_config, GenesisConfigInfo}, - }, - solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, - std::sync::{Arc, RwLock}, - test::Bencher, -}; - -fn build_packet_batch( - packet_per_batch_count: usize, - recent_blockhash: Option, -) -> (PacketBatch, Vec) { - let packet_batch = PacketBatch::new( - (0..packet_per_batch_count) - .map(|_| { - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_sdk::pubkey::new_rand(), - 1, - recent_blockhash.unwrap_or_else(Hash::new_unique), - ); - Packet::from_data(None, tx).unwrap() - }) - .collect(), - ); - let packet_indexes: Vec = (0..packet_per_batch_count).collect(); - - (packet_batch, packet_indexes) -} - -fn insert_packet_batches( - buffer_max_size: usize, - batch_count: usize, - packet_per_batch_count: usize, -) { - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); - - (0..batch_count).for_each(|_| { - let (packet_batch, packet_indexes) = build_packet_batch(packet_per_batch_count, None); - let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes); - unprocessed_packet_batches.insert_batch(deserialized_packets); - }); -} - -#[bench] -#[allow(clippy::unit_arg)] -fn bench_packet_clone(bencher: &mut Bencher) { - let batch_count = 1000; - let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; - - let packet_batches: Vec = (0..batch_count) - .map(|_| build_packet_batch(packet_per_batch_count, None).0) - .collect(); - - bencher.iter(|| { - test::black_box(packet_batches.iter().for_each(|packet_batch| { - let mut outer_packet = Packet::default(); - - packet_batch.iter().for_each(|packet| { - outer_packet = packet.clone(); - }); - })); - }); -} - -//* -// v1, bench: 5,600,038,163 ns/iter (+/- 940,818,988) -// v2, bench: 5,265,382,750 ns/iter (+/- 153,623,264) -#[bench] -#[ignore] -fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; - let batch_count = 1_000; - let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; - - bencher.iter(|| { - insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count); - }); -} - -// v1, bench: 6,607,014,940 ns/iter (+/- 768,191,361) -// v2, bench: 5,692,753,323 ns/iter (+/- 548,959,624) -#[bench] -#[ignore] -fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; - let batch_count = 1_100; - let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; - - // this is the worst scenario testing: all batches are uniformly populated with packets from - // priority 100..228, so in order to drop a batch, algo will have to drop all packets that has - // priority < 228, plus one 228. That's 2000 batch * 127 packets + 1 - // Also, since all batches have same stake distribution, the new one is always the one got - // dropped. Tho it does not change algo complexity. - bencher.iter(|| { - insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count); - }); -} - -fn buffer_iter_desc_and_forward( - buffer_max_size: usize, - batch_count: usize, - packet_per_batch_count: usize, -) { - let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); - let bank = Bank::new_for_tests(&genesis_config); - let bank_forks = BankForks::new(bank); - let bank_forks = Arc::new(RwLock::new(bank_forks)); - let current_bank = bank_forks.read().unwrap().root_bank(); - // fill buffer - { - (0..batch_count).for_each(|_| { - let (packet_batch, packet_indexes) = - build_packet_batch(packet_per_batch_count, Some(genesis_config.hash())); - let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes); - unprocessed_packet_batches.insert_batch(deserialized_packets); - }); - } - - // forward whole buffer - { - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - unprocessed_packet_batches, - ThreadType::Transactions, - ); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let _ = transaction_storage.filter_forwardable_packets_and_add_batches( - current_bank, - &mut forward_packet_batches_by_accounts, - ); - } -} - -#[bench] -#[ignore] -fn bench_forwarding_unprocessed_packet_batches(bencher: &mut Bencher) { - let batch_count = 1_000; - let packet_per_batch_count = 64; - let buffer_capacity = batch_count * packet_per_batch_count; - - bencher.iter(|| { - buffer_iter_desc_and_forward(buffer_capacity, batch_count, packet_per_batch_count); - }); -} diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index d4d8051b8..c4a8bafb9 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -665,7 +665,8 @@ mod tests { super::*, crate::{ banking_stage::tests::{create_slow_genesis_config, simulate_poh}, - unprocessed_packet_batches::{self, UnprocessedPacketBatches}, + immutable_deserialized_packet::DeserializedPacketError, + unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, unprocessed_transaction_storage::ThreadType, }, crossbeam_channel::{unbounded, Receiver}, @@ -678,6 +679,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, + solana_perf::packet::Packet, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, @@ -842,6 +844,18 @@ mod tests { ) } + fn transactions_to_deserialized_packets( + transactions: &[Transaction], + ) -> Result, DeserializedPacketError> { + transactions + .iter() + .map(|transaction| { + let packet = Packet::from_data(None, transaction)?; + DeserializedPacket::new(packet) + }) + .collect() + } + #[test] fn test_bank_process_and_record_transactions() { solana_logger::setup(); @@ -1712,9 +1726,7 @@ mod tests { setup_conflicting_transactions(ledger_path.path()); let recorder = poh_recorder.read().unwrap().new_recorder(); let num_conflicting_transactions = transactions.len(); - let deserialized_packets = - unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) - .unwrap(); + let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( @@ -1792,9 +1804,7 @@ mod tests { .push(duplicate_account_key); // corrupt transaction let recorder = poh_recorder.read().unwrap().new_recorder(); let num_conflicting_transactions = transactions.len(); - let deserialized_packets = - unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) - .unwrap(); + let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let mut buffered_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( @@ -1845,9 +1855,7 @@ mod tests { setup_conflicting_transactions(ledger_path.path()); let recorder = poh_recorder.read().unwrap().new_recorder(); let num_conflicting_transactions = transactions.len(); - let deserialized_packets = - unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) - .unwrap(); + let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let retryable_packet = deserialized_packets[0].clone(); let mut buffered_packet_batches = diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 2878e599f..48a37a66d 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,8 +1,8 @@ use { crate::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, min_max_heap::MinMaxHeap, - solana_perf::packet::{Packet, PacketBatch}, - solana_sdk::{hash::Hash, transaction::Transaction}, + solana_perf::packet::Packet, + solana_sdk::hash::Hash, std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, @@ -290,27 +290,6 @@ impl UnprocessedPacketBatches { } } -pub fn deserialize_packets<'a>( - packet_batch: &'a PacketBatch, - packet_indexes: &'a [usize], -) -> impl Iterator + 'a { - packet_indexes.iter().filter_map(move |packet_index| { - DeserializedPacket::new(packet_batch[*packet_index].clone()).ok() - }) -} - -pub fn transactions_to_deserialized_packets( - transactions: &[Transaction], -) -> Result, DeserializedPacketError> { - transactions - .iter() - .map(|transaction| { - let packet = Packet::from_data(None, transaction)?; - DeserializedPacket::new(packet) - }) - .collect() -} - #[cfg(test)] mod tests { use {