v1.17: BankingStage Forwarding Filter (backport of #685) (#696)

* BankingStage Forwarding Filter (#685)

* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node

(cherry picked from commit 1744e9efd74d83aeb15b384a8174949dbe753172)

# Conflicts:
#	sdk/src/packet.rs

* resolve conflict

* remove test_ledger_cleanup_service

* Revert "remove test_ledger_cleanup_service"

This reverts commit 68a580d716151dfc0424d70d53f973e8d7dc400b.

* revert: local-cluster test changes

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
Co-authored-by: Trent Nelson <trent@solana.com>
This commit is contained in:
mergify[bot] 2024-04-11 09:32:31 -06:00 committed by GitHub
parent c38894eed3
commit 6a46c0128b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 31 additions and 1 deletions

View File

@ -116,6 +116,7 @@ fn main() -> Result<()> {
Duration::from_millis(1), // coalesce
true,
None,
false,
));
}

View File

@ -161,6 +161,7 @@ impl Forwarder {
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| p.meta().is_from_staked_node())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();

View File

@ -169,6 +169,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
@ -190,6 +191,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
@ -210,6 +212,7 @@ impl FetchStage {
coalesce,
true,
None,
true, // only staked connections should be voting
)
})
.collect();

View File

@ -170,6 +170,7 @@ impl AncestorHashesService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);
let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
@ -1299,6 +1300,7 @@ mod test {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (remote_request_sender, remote_request_receiver) = unbounded();
let t_packet_adapter = Builder::new()

View File

@ -46,6 +46,7 @@ impl ServeRepairService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);
let t_packet_adapter = Builder::new()
.name(String::from("solServRAdapt"))

View File

@ -156,6 +156,7 @@ impl ShredFetchStage {
PACKET_COALESCE_DURATION,
true, // use_pinned_memory
None, // in_vote_only_mode
false,
)
})
.collect();

View File

@ -58,6 +58,7 @@ impl GossipService {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (consume_sender, listen_receiver) = unbounded();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(

View File

@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For marking packets from staked nodes
const FROM_STAKED_NODE = 0b1000_0000;
}
}
@ -213,6 +215,11 @@ impl Meta {
self.port = socket_addr.port();
}
pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
self.flags
.set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
}
#[inline]
pub fn discard(&self) -> bool {
self.flags.contains(PacketFlags::DISCARD)
@ -265,6 +272,11 @@ impl Meta {
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}
#[inline]
pub fn is_from_staked_node(&self) -> bool {
self.flags.contains(PacketFlags::FROM_STAKED_NODE)
}
}
impl Default for Meta {

View File

@ -908,6 +908,7 @@ async fn handle_chunk(
if packet_accum.is_none() {
let mut meta = Meta::default();
meta.set_socket_addr(remote_addr);
meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked));
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),

View File

@ -110,6 +110,7 @@ fn recv_loop(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
@ -147,7 +148,9 @@ fn recv_loop(
if len == PACKETS_PER_BATCH {
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
}
packet_batch
.iter_mut()
.for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
packet_batch_sender.send(packet_batch)?;
}
break;
@ -156,6 +159,7 @@ fn recv_loop(
}
}
#[allow(clippy::too_many_arguments)]
pub fn receiver(
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
@ -165,6 +169,7 @@ pub fn receiver(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
@ -180,6 +185,7 @@ pub fn receiver(
coalesce,
use_pinned_memory,
in_vote_only_mode,
is_staked_service,
);
})
.unwrap()
@ -488,6 +494,7 @@ mod test {
Duration::from_millis(1), // coalesce
true,
None,
false,
);
const NUM_PACKETS: usize = 5;
let t_responder = {