From 013e1d9d498af374eaaa2ad83070b107de63b948 Mon Sep 17 00:00:00 2001 From: sakridge Date: Tue, 21 Sep 2021 18:49:41 +0300 Subject: [PATCH] Limit transaction forwarding from banking_stage (#19940) --- core/src/banking_stage.rs | 73 ++++++++++++++++++++++++++++- gossip/src/cluster_info.rs | 2 +- gossip/src/lib.rs | 1 - {gossip => perf}/src/data_budget.rs | 4 +- perf/src/lib.rs | 1 + 5 files changed, 75 insertions(+), 6 deletions(-) rename {gossip => perf}/src/data_budget.rs (96%) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 31a8cb0a43..0de26fb645 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -13,6 +13,7 @@ use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; use solana_perf::{ cuda_runtime::PinnedVec, + data_budget::DataBudget, packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, perf_libs, }; @@ -305,6 +306,7 @@ impl BankingStage { LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); + let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { @@ -322,6 +324,7 @@ impl BankingStage { let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); let cost_tracker = cost_tracker.clone(); + let data_budget = data_budget.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -337,6 +340,7 @@ impl BankingStage { gossip_vote_sender, &duplicates, &cost_tracker, + &data_budget, ); }) .unwrap() @@ -360,11 +364,21 @@ impl BankingStage { socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, unprocessed_packets: &UnprocessedPackets, + data_budget: &DataBudget, ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); + const INTERVAL_MS: u64 = 100; + const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; + const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; + const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; + data_budget.update(INTERVAL_MS, |bytes| { + std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + }); for p in packets { - socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; + if data_budget.take(p.meta.size) { + socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; + } } Ok(()) @@ -559,6 +573,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, cost_tracker: &Arc>, + data_budget: &DataBudget, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -618,6 +633,7 @@ impl BankingStage { poh_recorder, socket, false, + data_budget, ); } BufferedPacketsDecision::ForwardAndHold => { @@ -628,6 +644,7 @@ impl BankingStage { poh_recorder, socket, true, + data_budget, ); } _ => (), @@ -642,6 +659,7 @@ impl BankingStage { poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, + data_budget: &DataBudget, ) { if !enable_forwarding { if !hold { @@ -654,7 +672,7 @@ impl BankingStage { Some(addr) => addr, None => return, }; - let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets); + let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget); if hold { buffered_packets.retain(|(_, index, _)| !index.is_empty()); for (_, _, forwarded) in buffered_packets.iter_mut() { @@ -678,6 +696,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, cost_tracker: &Arc>, + data_budget: &DataBudget, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -698,6 +717,7 @@ impl BankingStage { &banking_stage_stats, &recorder, cost_tracker, + data_budget, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -2872,6 +2892,55 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } + #[test] + fn test_forwarder_budget() { + solana_logger::setup(); + // Create `Packets` with 1 unprocessed element + let single_element_packets = Packets::new(vec![Packet::default()]); + let mut unprocessed_packets: UnprocessedPackets = + vec![(single_element_packets, vec![0], false)] + .into_iter() + .collect(); + + let cluster_info = new_test_cluster_info(Node::new_localhost().info); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let data_budget = DataBudget::default(); + BankingStage::handle_forwarding( + true, + &cluster_info, + &mut unprocessed_packets, + &poh_recorder, + &socket, + false, + &data_budget, + ); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + #[test] fn test_push_unprocessed_batch_limit() { solana_logger::setup(); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 9fc81244bb..47f3592fbc 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -26,7 +26,6 @@ use { self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHash, Version, Vote, MAX_WALLCLOCK, }, - data_budget::DataBudget, epoch_slots::EpochSlots, gossip_error::GossipError, ping_pong::{self, PingCache, Pong}, @@ -45,6 +44,7 @@ use { bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, }, + solana_perf::data_budget::DataBudget, solana_perf::packet::{ limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE, diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 1c5f7ccecd..bbb8c5be05 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -13,7 +13,6 @@ pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_shards; pub mod crds_value; -pub mod data_budget; pub mod deprecated; pub mod duplicate_shred; pub mod epoch_slots; diff --git a/gossip/src/data_budget.rs b/perf/src/data_budget.rs similarity index 96% rename from gossip/src/data_budget.rs rename to perf/src/data_budget.rs index cab20bd6fd..24eb0bb84e 100644 --- a/gossip/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -21,7 +21,7 @@ impl DataBudget { } match self.bytes.compare_exchange_weak( budget, - budget - size, + budget.saturating_sub(size), Ordering::AcqRel, Ordering::Acquire, ) { @@ -37,7 +37,7 @@ impl DataBudget { let now = solana_sdk::timing::timestamp(); let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire); loop { - if now < last_timestamp + duration_millis { + if now < last_timestamp.saturating_add(duration_millis) { return false; } match self.last_timestamp_ms.compare_exchange_weak( diff --git a/perf/src/lib.rs b/perf/src/lib.rs index 82feefaa70..d27bb6fb96 100644 --- a/perf/src/lib.rs +++ b/perf/src/lib.rs @@ -1,4 +1,5 @@ pub mod cuda_runtime; +pub mod data_budget; pub mod packet; pub mod perf_libs; pub mod recycler;