From 6a46c0128b60c56e42ddc4f29b3d7d07bdccdd72 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 09:32:31 -0600 Subject: [PATCH] v1.17: BankingStage Forwarding Filter (backport of #685) (#696) * BankingStage Forwarding Filter (#685) * add PacketFlags::FROM_STAKED_NODE * Only forward packets from staked node * fix local-cluster test forwarding * review comment * tpu_votes get marked as from_staked_node (cherry picked from commit 1744e9efd74d83aeb15b384a8174949dbe753172) # Conflicts: # sdk/src/packet.rs * resolve conflict * remove test_ledger_cleanup_service * Revert "remove test_ledger_cleanup_service" This reverts commit 68a580d716151dfc0424d70d53f973e8d7dc400b. * revert: local-cluster test changes --------- Co-authored-by: Andrew Fitzgerald Co-authored-by: Trent Nelson --- bench-streamer/src/main.rs | 1 + core/src/banking_stage/forwarder.rs | 1 + core/src/fetch_stage.rs | 3 +++ core/src/repair/ancestor_hashes_service.rs | 2 ++ core/src/repair/serve_repair_service.rs | 1 + core/src/shred_fetch_stage.rs | 1 + gossip/src/gossip_service.rs | 1 + sdk/src/packet.rs | 12 ++++++++++++ streamer/src/nonblocking/quic.rs | 1 + streamer/src/streamer.rs | 9 ++++++++- 10 files changed, 31 insertions(+), 1 deletion(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 987df41134..a65e1f8a72 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -116,6 +116,7 @@ fn main() -> Result<()> { Duration::from_millis(1), // coalesce true, None, + false, )); } diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 1cb656f0dd..ebc352f115 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -161,6 +161,7 @@ impl Forwarder { self.update_data_budget(); let packet_vec: Vec<_> = forwardable_packets .filter(|p| !p.meta().forwarded()) + .filter(|p| p.meta().is_from_staked_node()) .filter(|p| self.data_budget.take(p.meta().size)) .filter_map(|p| p.data(..).map(|data| data.to_vec())) .collect(); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index b3eb36201f..7535d2a9ed 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -169,6 +169,7 @@ impl FetchStage { coalesce, true, in_vote_only_mode.clone(), + false, // unstaked connections ) }) .collect() @@ -190,6 +191,7 @@ impl FetchStage { coalesce, true, in_vote_only_mode.clone(), + false, // unstaked connections ) }) .collect() @@ -210,6 +212,7 @@ impl FetchStage { coalesce, true, None, + true, // only staked connections should be voting ) }) .collect(); diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index fc70dbab16..a1d1b9bb1c 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -170,6 +170,7 @@ impl AncestorHashesService { Duration::from_millis(1), // coalesce false, // use_pinned_memory None, // in_vote_only_mode + false, // is_staked_service ); let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded(); @@ -1299,6 +1300,7 @@ mod test { Duration::from_millis(1), // coalesce false, None, + false, ); let (remote_request_sender, remote_request_receiver) = unbounded(); let t_packet_adapter = Builder::new() diff --git a/core/src/repair/serve_repair_service.rs b/core/src/repair/serve_repair_service.rs index 9819d0ea43..e49de60840 100644 --- a/core/src/repair/serve_repair_service.rs +++ b/core/src/repair/serve_repair_service.rs @@ -46,6 +46,7 @@ impl ServeRepairService { Duration::from_millis(1), // coalesce false, // use_pinned_memory None, // in_vote_only_mode + false, // is_staked_service ); let t_packet_adapter = Builder::new() .name(String::from("solServRAdapt")) diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 703167b0b4..8bcf0e06b8 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -156,6 +156,7 @@ impl ShredFetchStage { PACKET_COALESCE_DURATION, true, // use_pinned_memory None, // in_vote_only_mode + false, ) }) .collect(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index b587a5e067..03ade4e709 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -58,6 +58,7 @@ impl GossipService { Duration::from_millis(1), // coalesce false, None, + false, ); let (consume_sender, listen_receiver) = unbounded(); let t_socket_consume = cluster_info.clone().start_socket_consume_thread( diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab475..276ba58da1 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For marking packets from staked nodes + const FROM_STAKED_NODE = 0b1000_0000; } } @@ -213,6 +215,11 @@ impl Meta { self.port = socket_addr.port(); } + pub fn set_from_staked_node(&mut self, from_staked_node: bool) { + self.flags + .set(PacketFlags::FROM_STAKED_NODE, from_staked_node); + } + #[inline] pub fn discard(&self) -> bool { self.flags.contains(PacketFlags::DISCARD) @@ -265,6 +272,11 @@ impl Meta { pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) } + + #[inline] + pub fn is_from_staked_node(&self) -> bool { + self.flags.contains(PacketFlags::FROM_STAKED_NODE) + } } impl Default for Meta { diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index fc585a6399..7c1ac060d0 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -908,6 +908,7 @@ async fn handle_chunk( if packet_accum.is_none() { let mut meta = Meta::default(); meta.set_socket_addr(remote_addr); + meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked)); *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 1fd7bfc974..3c89d2574e 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -110,6 +110,7 @@ fn recv_loop( coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option>, + is_staked_service: bool, ) -> Result<()> { loop { let mut packet_batch = if use_pinned_memory { @@ -147,7 +148,9 @@ fn recv_loop( if len == PACKETS_PER_BATCH { full_packet_batches_count.fetch_add(1, Ordering::Relaxed); } - + packet_batch + .iter_mut() + .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service)); packet_batch_sender.send(packet_batch)?; } break; @@ -156,6 +159,7 @@ fn recv_loop( } } +#[allow(clippy::too_many_arguments)] pub fn receiver( socket: Arc, exit: Arc, @@ -165,6 +169,7 @@ pub fn receiver( coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option>, + is_staked_service: bool, ) -> JoinHandle<()> { let res = socket.set_read_timeout(Some(Duration::new(1, 0))); assert!(res.is_ok(), "streamer::receiver set_read_timeout error"); @@ -180,6 +185,7 @@ pub fn receiver( coalesce, use_pinned_memory, in_vote_only_mode, + is_staked_service, ); }) .unwrap() @@ -488,6 +494,7 @@ mod test { Duration::from_millis(1), // coalesce true, None, + false, ); const NUM_PACKETS: usize = 5; let t_responder = {