From 748220c9d3569f87c3b541dd6cbc303d5e179c19 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 19 Apr 2023 09:08:13 -0700 Subject: [PATCH] Forwarder: Add common setup for tests (#31232) --- core/src/banking_stage/forwarder.rs | 289 +++++++++++++++------------- 1 file changed, 151 insertions(+), 138 deletions(-) diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 39ff472125..d9a45a23d9 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -271,24 +271,83 @@ mod tests { unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, unprocessed_transaction_storage::ThreadType, }, - solana_ledger::{ - blockstore::Blockstore, genesis_utils::GenesisConfigInfo, - get_tmp_ledger_path_auto_delete, - }, + solana_gossip::cluster_info::Node, + solana_ledger::{blockstore::Blockstore, genesis_utils::GenesisConfigInfo}, solana_perf::packet::PacketFlags, - solana_poh::poh_recorder::create_test_recorder, + solana_poh::{poh_recorder::create_test_recorder, poh_service::PohService}, solana_runtime::bank::Bank, solana_sdk::{ hash::Hash, poh_config::PohConfig, signature::Keypair, signer::Signer, system_transaction, transaction::VersionedTransaction, }, solana_streamer::recvmmsg::recv_mmsg, + std::sync::atomic::AtomicBool, + tempfile::TempDir, }; + struct TestSetup { + _ledger_dir: TempDir, + bank_forks: Arc>, + poh_recorder: Arc>, + exit: Arc, + poh_service: PohService, + cluster_info: Arc, + local_node: Node, + } + + fn setup() -> TestSetup { + let validator_keypair = Arc::new(Keypair::new()); + let genesis_config_info = + create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); + let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + + let bank: Bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().working_bank(); + + let ledger_path = TempDir::new().unwrap(); + let blockstore = Arc::new( + Blockstore::open(ledger_path.as_ref()) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config), None); + + let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); + let cluster_info = Arc::new(cluster_info); + + TestSetup { + _ledger_dir: ledger_path, + bank_forks, + poh_recorder, + exit, + poh_service, + cluster_info, + local_node, + } + } + #[test] #[ignore] fn test_forwarder_budget() { solana_logger::setup(); + let TestSetup { + bank_forks, + poh_recorder, + exit, + poh_service, + cluster_info, + local_node, + .. + } = setup(); + // Create `PacketBatch` with 1 unprocessed packet let tx = system_transaction::transfer( &Keypair::new(), @@ -299,82 +358,63 @@ mod tests { let packet = Packet::from_data(None, tx).unwrap(); let deserialized_packet = DeserializedPacket::new(packet).unwrap(); - let validator_keypair = Arc::new(Keypair::new()); - let genesis_config_info = - create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; - - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + for (name, data_budget, expected_num_forwarded) in test_cases { + let forwarder = Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info.clone(), + Arc::new(ConnectionCache::default()), + Arc::new(data_budget), + ); + let unprocessed_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter( + vec![deserialized_packet.clone()].into_iter(), + 1, + ); + let stats = BankingStageStats::default(); + forwarder.handle_forwarding( + &mut UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ), + true, + &mut LeaderSlotMetricsTracker::new(0), + &stats, + &mut TracerPacketStats::new(0), ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); - let cluster_info = Arc::new(cluster_info); let recv_socket = &local_node.sockets.tpu_forwards[0]; + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); - let test_cases = vec![ - ("budget-restricted", DataBudget::restricted(), 0), - ("budget-available", DataBudget::default(), 1), - ]; - for (name, data_budget, expected_num_forwarded) in test_cases { - let forwarder = Forwarder::new( - poh_recorder.clone(), - bank_forks.clone(), - cluster_info.clone(), - Arc::new(ConnectionCache::default()), - Arc::new(data_budget), - ); - let unprocessed_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - vec![deserialized_packet.clone()].into_iter(), - 1, - ); - let stats = BankingStageStats::default(); - forwarder.handle_forwarding( - &mut UnprocessedTransactionStorage::new_transaction_storage( - unprocessed_packet_batches, - ThreadType::Transactions, - ), - true, - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &mut TracerPacketStats::new(0), - ); - - recv_socket - .set_nonblocking(expected_num_forwarded == 0) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_num_forwarded, "{name}"); - } - - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); + let mut packets = vec![Packet::default(); 2]; + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{name}"); } - Blockstore::destroy(ledger_path.path()).unwrap(); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); } #[test] #[ignore] fn test_handle_forwarding() { solana_logger::setup(); + let TestSetup { + bank_forks, + poh_recorder, + exit, + poh_service, + cluster_info, + local_node, + .. + } = setup(); + // packets are deserialized upon receiving, failed packets will not be // forwarded; Therefore need to create real packets here. let keypair = Keypair::new(); @@ -402,82 +442,55 @@ mod tests { ), ThreadType::Transactions, ); + let connection_cache = ConnectionCache::default(); - let validator_keypair = Arc::new(Keypair::new()); - let genesis_config_info = - create_slow_genesis_config_with_leader(10_000, &validator_keypair.pubkey()); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; - let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); - let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - { - let blockstore = Arc::new( - Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"), + let test_cases = vec![ + ("fwd-normal", true, vec![normal_block_hash], 2), + ("fwd-no-op", true, vec![], 2), + ("fwd-no-hold", false, vec![], 0), + ]; + + let forwarder = Forwarder::new( + poh_recorder, + bank_forks, + cluster_info, + Arc::new(connection_cache), + Arc::new(DataBudget::default()), + ); + for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { + let stats = BankingStageStats::default(); + forwarder.handle_forwarding( + &mut unprocessed_packet_batches, + hold, + &mut LeaderSlotMetricsTracker::new(0), + &stats, + &mut TracerPacketStats::new(0), ); - let poh_config = PohConfig { - // limit tick count to avoid clearing working_bank at - // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage - target_tick_count: Some(bank.max_tick_height() - 1), - ..PohConfig::default() - }; - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blockstore, Some(poh_config), None); - - let (local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); let recv_socket = &local_node.sockets.tpu_forwards[0]; - let connection_cache = ConnectionCache::default(); + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); - let test_cases = vec![ - ("fwd-normal", true, vec![normal_block_hash], 2), - ("fwd-no-op", true, vec![], 2), - ("fwd-no-hold", false, vec![], 0), - ]; - - let forwarder = Forwarder::new( - poh_recorder, - bank_forks, - Arc::new(cluster_info), - Arc::new(connection_cache), - Arc::new(DataBudget::default()), - ); - for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { - let stats = BankingStageStats::default(); - forwarder.handle_forwarding( - &mut unprocessed_packet_batches, - hold, - &mut LeaderSlotMetricsTracker::new(0), - &stats, - &mut TracerPacketStats::new(0), + let mut packets = vec![Packet::default(); 2]; + let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{name}"); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta().size, 215); + let recv_transaction: VersionedTransaction = + packets[i].deserialize_slice(..).unwrap(); + assert_eq!( + recv_transaction.message.recent_blockhash(), + expected_id, + "{name}" ); - - recv_socket - .set_nonblocking(expected_ids.is_empty()) - .unwrap(); - - let mut packets = vec![Packet::default(); 2]; - let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); - assert_eq!(num_received, expected_ids.len(), "{name}"); - for (i, expected_id) in expected_ids.iter().enumerate() { - assert_eq!(packets[i].meta().size, 215); - let recv_transaction: VersionedTransaction = - packets[i].deserialize_slice(..).unwrap(); - assert_eq!( - recv_transaction.message.recent_blockhash(), - expected_id, - "{name}" - ); - } - - let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); - assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}"); } - exit.store(true, Ordering::Relaxed); - poh_service.join().unwrap(); + let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); + assert_eq!(num_unprocessed_packets, expected_num_unprocessed, "{name}"); } - Blockstore::destroy(ledger_path.path()).unwrap(); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); } }