From b1d9a2e60e5045b9b99b14e576254e02ff1237c6 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Wed, 29 Dec 2021 12:34:31 -0600 Subject: [PATCH] Don't forward packets received from TPU forwards port (#22078) * Don't forward packets received from TPU forwards port * Add banking stage test --- core/src/banking_stage.rs | 204 ++++++++++++++++++++++++++++----- core/src/commitment_service.rs | 6 +- core/src/fetch_stage.rs | 16 ++- core/src/shred_fetch_stage.rs | 2 +- core/src/vote_simulator.rs | 2 +- perf/src/data_budget.rs | 8 ++ program-test/src/lib.rs | 1 + rpc/src/cluster_tpu_info.rs | 6 +- rpc/src/rpc.rs | 1 + runtime/src/bank.rs | 6 +- runtime/src/bank_forks.rs | 2 +- runtime/src/genesis_utils.rs | 6 +- sdk/src/packet.rs | 2 +- 13 files changed, 209 insertions(+), 53 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 6e32b6d4c..0034577bf 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -392,24 +392,35 @@ impl BankingStage { data_budget: &DataBudget, ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); - inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; data_budget.update(INTERVAL_MS, |bytes| { - std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) }); - let mut packet_vec = Vec::with_capacity(packets.len()); - for p in packets { - if data_budget.take(p.meta.size) { - packet_vec.push((&p.data[..p.meta.size], tpu_forwards)); + let packet_vec: Vec<_> = packets + .iter() + .filter_map(|p| { + if !p.meta.forwarded && data_budget.take(p.meta.size) { + Some((&p.data[..p.meta.size], tpu_forwards)) + } else { + None + } + }) + .collect(); + + if !packet_vec.is_empty() { + inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len()); + if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec) + { + return Err(ioerr); } } - if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec) { - return Err(ioerr); - } Ok(()) } @@ -1502,7 +1513,7 @@ mod tests { system_transaction, transaction::{Transaction, TransactionError}, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::TransactionWithStatusMeta, solana_vote_program::vote_transaction, std::{ @@ -2763,16 +2774,15 @@ mod tests { fn test_forwarder_budget() { solana_logger::setup(); // Create `PacketBatch` with 1 unprocessed packet - let single_packet_batch = PacketBatch::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(single_packet_batch, vec![0], false)] - .into_iter() - .collect(); - - let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let packet = Packet::from_data(None, &[0]).unwrap(); + let single_packet_batch = PacketBatch::new(vec![packet]); let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); let ledger_path = get_tmp_ledger_path!(); @@ -2791,17 +2801,155 @@ mod tests { let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data_budget = DataBudget::default(); - BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, - &cluster_info, - &mut unprocessed_packets, - &poh_recorder, - &socket, - false, - &data_budget, + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + + for (name, data_budget, expected_num_forwarded) in test_cases { + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(single_packet_batch.clone(), vec![0], false)] + .into_iter() + .collect(); + BankingStage::handle_forwarding( + &ForwardOption::ForwardTransaction, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + true, + &data_budget, + ); + + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{}", name); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_handle_forwarding() { + solana_logger::setup(); + + const FWD_PACKET: u8 = 1; + let forwarded_packet = { + let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap(); + packet.meta.forwarded = true; + packet + }; + + const NORMAL_PACKET: u8 = 2; + let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap(); + + let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(packet_batch, vec![0, 1], false)] + .into_iter() + .collect(); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("not-forward", ForwardOption::NotForward, true, vec![], 2), + ( + "fwd-normal", + ForwardOption::ForwardTransaction, + true, + vec![NORMAL_PACKET], + 2, + ), + ( + "fwd-no-op", + ForwardOption::ForwardTransaction, + true, + vec![], + 2, + ), + ( + "fwd-no-hold", + ForwardOption::ForwardTransaction, + false, + vec![], + 0, + ), + ]; + + for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + BankingStage::handle_forwarding( + &forward_option, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + hold, + &DataBudget::default(), + ); + + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{}", name); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta.size, 1); + assert_eq!(packets[i].data[0], *expected_id, "{}", name); + } + + let num_unprocessed_packets: usize = unprocessed_packet_batches + .iter() + .map(|(b, ..)| b.packets.len()) + .sum(); + assert_eq!( + num_unprocessed_packets, expected_num_unprocessed, + "{}", + name + ); + } + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index fe425fa2c..dcdc769f7 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -503,11 +503,7 @@ mod tests { let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![100; 1], diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 9a52e2f6b..1e78a0ddc 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -8,7 +8,7 @@ use { solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, - solana_sdk::clock::DEFAULT_TICKS_PER_SLOT, + solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet}, solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, @@ -83,10 +83,16 @@ impl FetchStage { sendr: &PacketBatchSender, poh_recorder: &Arc>, ) -> Result<()> { - let packet_batch = recvr.recv()?; + let mark_forwarded = |packet: &mut Packet| { + packet.meta.forwarded = true; + }; + + let mut packet_batch = recvr.recv()?; let mut num_packets = packet_batch.packets.len(); + packet_batch.packets.iter_mut().for_each(mark_forwarded); let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { + while let Ok(mut packet_batch) = recvr.try_recv() { + packet_batch.packets.iter_mut().for_each(mark_forwarded); num_packets += packet_batch.packets.len(); packet_batches.push(packet_batch); // Read at most 1K transactions in a loop @@ -115,7 +121,7 @@ impl FetchStage { } fn new_multi_socket( - sockets: Vec>, + tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, @@ -126,7 +132,7 @@ impl FetchStage { ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); - let tpu_threads = sockets.into_iter().map(|socket| { + let tpu_threads = tpu_sockets.into_iter().map(|socket| { streamer::receiver( socket, exit, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 8554116dd..787ae5c07 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -192,7 +192,7 @@ impl ShredFetchStage { recycler.clone(), bank_forks.clone(), "shred_fetch_tvu_forwards", - |p| p.meta.forward = true, + |p| p.meta.forwarded = true, ); let (repair_receiver, repair_handler) = Self::packet_modifier( diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index 0c9e4ca74..c3cb9286a 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -343,7 +343,7 @@ pub fn initialize_state( let GenesisConfigInfo { mut genesis_config, mint_keypair, - voting_keypair: _, + .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, diff --git a/perf/src/data_budget.rs b/perf/src/data_budget.rs index 24eb0bb84..4c35fc6ce 100644 --- a/perf/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -10,6 +10,14 @@ pub struct DataBudget { } impl DataBudget { + /// Create a data budget with max bytes, used for tests + pub fn restricted() -> Self { + Self { + bytes: AtomicUsize::default(), + last_timestamp_ms: AtomicU64::new(u64::MAX), + } + } + // If there are enough bytes in the budget, consumes from // the budget and returns true. Otherwise returns false. #[must_use] diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 9a8cfedfc..62a9fc84d 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -773,6 +773,7 @@ impl ProgramTest { genesis_config, mint_keypair, voting_keypair, + validator_pubkey: bootstrap_validator_pubkey, }, ) } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index cf567bd5a..5421bc7b2 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -91,11 +91,7 @@ mod test { &validator_vote_keypairs1, &validator_vote_keypairs2, ]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 3], diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 7a9738400..9c127c4d7 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -6302,6 +6302,7 @@ pub mod tests { mut genesis_config, mint_keypair, voting_keypair, + .. } = create_genesis_config(TEST_MINT_LAMPORTS); genesis_config.rent.lamports_per_byte_year = 50; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a9c86e8f8..27c9cca56 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -14300,11 +14300,7 @@ pub(crate) mod tests { let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 2], diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index a0b119b63..a34b17772 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -601,8 +601,8 @@ mod tests { let leader_keypair = Keypair::new(); let GenesisConfigInfo { mut genesis_config, - mint_keypair: _, voting_keypair, + .. } = create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1_000); let slots_in_epoch = 32; genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs index 71222fdf4..7db7a4f8a 100644 --- a/runtime/src/genesis_utils.rs +++ b/runtime/src/genesis_utils.rs @@ -52,6 +52,7 @@ pub struct GenesisConfigInfo { pub genesis_config: GenesisConfig, pub mint_keypair: Keypair, pub voting_keypair: Keypair, + pub validator_pubkey: Pubkey, } pub fn create_genesis_config(mint_lamports: u64) -> GenesisConfigInfo { @@ -84,10 +85,11 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( let voting_keypair = Keypair::from_bytes(&voting_keypairs[0].borrow().vote_keypair.to_bytes()).unwrap(); + let validator_pubkey = voting_keypairs[0].borrow().node_keypair.pubkey(); let genesis_config = create_genesis_config_with_leader_ex( mint_lamports, &mint_keypair.pubkey(), - &voting_keypairs[0].borrow().node_keypair.pubkey(), + &validator_pubkey, &voting_keypairs[0].borrow().vote_keypair.pubkey(), &voting_keypairs[0].borrow().stake_keypair.pubkey(), stakes[0], @@ -102,6 +104,7 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( genesis_config, mint_keypair, voting_keypair, + validator_pubkey, }; for (validator_voting_keypairs, stake) in voting_keypairs[1..].iter().zip(&stakes[1..]) { @@ -159,6 +162,7 @@ pub fn create_genesis_config_with_leader( genesis_config, mint_keypair, voting_keypair, + validator_pubkey: *validator_pubkey, } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 27435b3ae..19b0bae99 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -18,7 +18,7 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; #[repr(C)] pub struct Meta { pub size: usize, - pub forward: bool, + pub forwarded: bool, pub repair: bool, pub discard: bool, pub addr: [u16; 8],