limit repairs to top staked requests in batch (#28673)

This commit is contained in:
Jeff Biseda 2022-11-16 16:30:41 -08:00 committed by GitHub
parent ae48ac97dd
commit 17ee3349f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 86 additions and 59 deletions

View File

@ -32,6 +32,7 @@ use {
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::Slot,
genesis_config::ClusterType,
hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES},
@ -44,7 +45,8 @@ use {
streamer::{PacketBatchReceiver, PacketBatchSender},
},
std::{
collections::{HashMap, HashSet},
cmp::Reverse,
collections::HashSet,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@ -153,6 +155,7 @@ struct ServeRepairStats {
unsigned_requests: usize,
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
total_dropped_response_packets: usize,
total_response_packets: usize,
total_response_bytes_staked: usize,
@ -168,6 +171,7 @@ struct ServeRepairStats {
ancestor_hashes: usize,
ping_cache_check_failed: usize,
pings_sent: usize,
decode_time_us: u64,
err_time_skew: usize,
err_malformed: usize,
err_sig_verify: usize,
@ -442,10 +446,22 @@ impl ServeRepair {
const MAX_REQUESTS_PER_ITERATION: usize = 1024;
let mut total_requests = reqs_v[0].len();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let root_bank = self.bank_forks.read().unwrap().root_bank();
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let max_buffered_packets = if root_bank.cluster_type() == ClusterType::Testnet {
2 * MAX_REQUESTS_PER_ITERATION
} else {
MAX_REQUESTS_PER_ITERATION
};
let mut dropped_requests = 0;
while let Ok(more) = requests_receiver.try_recv() {
total_requests += more.len();
if total_requests > MAX_REQUESTS_PER_ITERATION {
if total_requests > max_buffered_packets {
dropped_requests += more.len();
} else {
reqs_v.push(more);
@ -455,20 +471,64 @@ impl ServeRepair {
stats.dropped_requests_load_shed += dropped_requests;
stats.total_requests += total_requests;
let root_bank = self.bank_forks.read().unwrap().root_bank();
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
for reqs in reqs_v {
self.handle_packets(
ping_cache,
recycler,
blockstore,
reqs,
response_sender,
stats,
data_budget,
&epoch_staked_nodes,
);
let decode_start = Instant::now();
let mut decoded_reqs = Vec::default();
for packet in reqs_v.iter().flatten() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};
let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}
if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}
let stake = epoch_staked_nodes
.as_ref()
.and_then(|stakes| stakes.get(request.sender()))
.unwrap_or(&0);
if *stake == 0 {
stats.handle_requests_unstaked += 1;
} else {
stats.handle_requests_staked += 1;
}
decoded_reqs.push((request, from_addr, *stake));
}
stats.decode_time_us += decode_start.elapsed().as_micros() as u64;
if decoded_reqs.len() > MAX_REQUESTS_PER_ITERATION {
stats.dropped_requests_low_stake += decoded_reqs.len() - MAX_REQUESTS_PER_ITERATION;
decoded_reqs.sort_unstable_by_key(|(_, _, stake)| Reverse(*stake));
decoded_reqs.truncate(MAX_REQUESTS_PER_ITERATION);
}
self.handle_packets(
ping_cache,
recycler,
blockstore,
decoded_reqs,
response_sender,
stats,
data_budget,
);
Ok(())
}
@ -496,6 +556,11 @@ impl ServeRepair {
stats.dropped_requests_load_shed,
i64
),
(
"dropped_requests_low_stake",
stats.dropped_requests_low_stake,
i64
),
(
"total_dropped_response_packets",
stats.total_dropped_response_packets,
@ -539,6 +604,7 @@ impl ServeRepair {
i64
),
("pings_sent", stats.pings_sent, i64),
("decode_time_us", stats.decode_time_us, i64),
("err_time_skew", stats.err_time_skew, i64),
("err_malformed", stats.err_malformed, i64),
("err_sig_verify", stats.err_sig_verify, i64),
@ -709,54 +775,16 @@ impl ServeRepair {
ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler,
blockstore: &Blockstore,
packet_batch: PacketBatch,
requests: Vec<(RepairProtocol, SocketAddr, /*stake*/ u64)>,
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
) {
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let mut pending_pings = Vec::default();
// iter over the packets
for (i, packet) in packet_batch.iter().enumerate() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};
let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
let staked = epoch_staked_nodes
.as_ref()
.map(|nodes| nodes.contains_key(request.sender()))
.unwrap_or_default();
match staked {
true => stats.handle_requests_staked += 1,
false => stats.handle_requests_unstaked += 1,
}
if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}
if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}
let requests_len = requests.len();
for (i, (request, from_addr, stake)) in requests.into_iter().enumerate() {
if !matches!(&request, RepairProtocol::Pong(_)) {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
@ -768,7 +796,6 @@ impl ServeRepair {
stats.ping_cache_check_failed += 1;
}
}
stats.processed += 1;
let rsp = match Self::handle_repair(
recycler, &from_addr, blockstore, request, stats, ping_cache,
@ -780,12 +807,12 @@ impl ServeRepair {
let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
stats.total_response_packets += num_response_packets;
match staked {
match stake > 0 {
true => stats.total_response_bytes_staked += num_response_bytes,
false => stats.total_response_bytes_unstaked += num_response_bytes,
}
} else {
stats.dropped_requests_outbound_bandwidth += packet_batch.len() - i;
stats.dropped_requests_outbound_bandwidth += requests_len - i;
stats.total_dropped_response_packets += num_response_packets;
break;
}