Enforce a 12MB limit on outbound repair (#26493)

parent a0e160b5aa
commit f6d5b253fb
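This commit swaps the repair listener's adaptive packet-count throttle (DynamicPacketToProcessThreshold) for the byte-based DataBudget rate limiter that the banking stage already uses (first hunk below), capping outbound repair responses at 12MB per second. The sketch below is a minimal, single-threaded stand-in for solana_perf::data_budget::DataBudget, written only to show how the two calls this diff relies on, update and take, fit together; ByteBudget and its fields are hypothetical names, and the real type is atomic and shared across threads.

use std::time::Instant;

// Hypothetical, simplified stand-in for solana_perf::data_budget::DataBudget
// (the real type is lock-free; this sketch is single-threaded).
struct ByteBudget {
    bytes: usize,  // bytes still available to spend in the current interval
    asof: Instant, // last time the budget was refreshed
}

impl ByteBudget {
    fn new() -> Self {
        Self { bytes: 0, asof: Instant::now() }
    }

    // Refresh the budget once `interval_ms` has elapsed; the caller's closure
    // maps the leftover budget to the new one, which is how the diff expresses
    // reset (`|_bytes| MAX_BYTES_PER_INTERVAL`) versus carry-over policies.
    fn update(&mut self, interval_ms: u64, updater: impl Fn(usize) -> usize) {
        if self.asof.elapsed().as_millis() as u64 >= interval_ms {
            self.bytes = updater(self.bytes);
            self.asof = Instant::now();
        }
    }

    // Debit `size` bytes if the budget covers them; otherwise signal the
    // caller to drop the response.
    fn take(&mut self, size: usize) -> bool {
        if size <= self.bytes {
            self.bytes -= size;
            true
        } else {
            false
        }
    }
}

In the diff, the listener refreshes the budget once per pass through its loop via update, and handle_packets debits it with take before each response batch is handed to the response_sender.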
@@ -579,7 +579,8 @@ impl BankingStage {
         };
 
         const INTERVAL_MS: u64 = 100;
-        const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
+        // 12 MB outbound limit per second
+        const MAX_BYTES_PER_SECOND: usize = 12_000_000;
         const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
         const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
         data_budget.update(INTERVAL_MS, |bytes| {
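Note that the banking-stage hunk leaves the limit's value unchanged: 10_000 * 1200 is exactly 12_000_000, so the rewrite only states the 12MB figure directly. A hypothetical compile-time check (not part of the commit) makes the equivalence explicit:

// Hypothetical const assertion, not in the commit: old and new limits agree.
const OLD_MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const NEW_MAX_BYTES_PER_SECOND: usize = 12_000_000;
const _: () = assert!(OLD_MAX_BYTES_PER_SECOND == NEW_MAX_BYTES_PER_SECOND);

With INTERVAL_MS = 100 here, each refresh grants MAX_BYTES_PER_INTERVAL = 12_000_000 * 100 / 1000 = 1_200_000 bytes, and MAX_BYTES_BUDGET caps any carried-over balance at five intervals' worth.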
@@ -2,7 +2,6 @@ use {
     crate::{
         cluster_slots::ClusterSlots,
         duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
-        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response,
         repair_service::{OutstandingShredRepairs, RepairStats},
         request_response::RequestResponse,
@@ -25,7 +24,10 @@ use {
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
     solana_metrics::inc_new_counter_debug,
-    solana_perf::packet::{PacketBatch, PacketBatchRecycler},
+    solana_perf::{
+        data_budget::DataBudget,
+        packet::{PacketBatch, PacketBatchRecycler},
+    },
     solana_sdk::{
         clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
     },
@@ -144,7 +146,9 @@ impl RequestResponse for AncestorHashesRepairType {
 pub struct ServeRepairStats {
     pub total_requests: usize,
     pub dropped_requests: usize,
+    pub total_dropped_response_packets: usize,
     pub total_response_packets: usize,
+    pub total_response_bytes: usize,
     pub processed: usize,
     pub self_repair: usize,
     pub window_index: usize,
@@ -323,17 +327,18 @@ impl ServeRepair {
         requests_receiver: &PacketBatchReceiver,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
-        packet_threshold: &mut DynamicPacketToProcessThreshold,
+        data_budget: &DataBudget,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
         let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
+        const MAX_REQUESTS_PER_ITERATION: usize = 1024;
         let mut total_requests = reqs_v[0].len();
 
         let mut dropped_requests = 0;
         while let Ok(more) = requests_receiver.try_recv() {
             total_requests += more.len();
-            if packet_threshold.should_drop(total_requests) {
+            if total_requests > MAX_REQUESTS_PER_ITERATION {
                 dropped_requests += more.len();
             } else {
                 reqs_v.push(more);
@@ -343,11 +348,17 @@ impl ServeRepair {
         stats.dropped_requests += dropped_requests;
         stats.total_requests += total_requests;
 
-        let timer = Instant::now();
         for reqs in reqs_v {
-            Self::handle_packets(obj, recycler, blockstore, reqs, response_sender, stats);
+            Self::handle_packets(
+                obj,
+                recycler,
+                blockstore,
+                reqs,
+                response_sender,
+                stats,
+                data_budget,
+            );
         }
-        packet_threshold.update(total_requests, timer.elapsed());
         Ok(())
     }
 
@@ -365,7 +376,13 @@ impl ServeRepair {
             "serve_repair-requests_received",
             ("total_requests", stats.total_requests, i64),
             ("dropped_requests", stats.dropped_requests, i64),
+            (
+                "total_dropped_response_packets",
+                stats.total_dropped_response_packets,
+                i64
+            ),
             ("total_response_packets", stats.total_response_packets, i64),
+            ("total_response_bytes", stats.total_response_bytes, i64),
             ("self_repair", stats.self_repair, i64),
             ("window_index", stats.window_index, i64),
             (
@@ -391,6 +408,10 @@ impl ServeRepair {
         response_sender: PacketBatchSender,
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
+        const INTERVAL_MS: u64 = 1000;
+        const MAX_BYTES_PER_SECOND: usize = 12_000_000;
+        const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
+
         let exit = exit.clone();
         let recycler = PacketBatchRecycler::default();
         Builder::new()
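With INTERVAL_MS = 1000 in the listener, MAX_BYTES_PER_INTERVAL evaluates to 12_000_000 * 1000 / 1000 = 12_000_000 bytes: serve-repair is granted the full 12MB once per second, rather than in 100ms slices as in the banking stage.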
@@ -398,7 +419,7 @@ impl ServeRepair {
             .spawn(move || {
                 let mut last_print = Instant::now();
                 let mut stats = ServeRepairStats::default();
-                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
+                let data_budget = DataBudget::default();
                 loop {
                     let result = Self::run_listen(
                         &me,
@@ -407,7 +428,7 @@ impl ServeRepair {
                         &requests_receiver,
                         &response_sender,
                         &mut stats,
-                        &mut packet_threshold,
+                        &data_budget,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
@@ -420,6 +441,7 @@ impl ServeRepair {
                         Self::report_reset_stats(&me, &mut stats);
                         last_print = Instant::now();
                     }
+                    data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL);
                 }
             })
             .unwrap()
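Because the updater ignores its argument (|_bytes| MAX_BYTES_PER_INTERVAL), budget left unspent when a second elapses is discarded rather than rolled over. A carry-over policy, which the banking stage's MAX_BYTES_BUDGET cap suggests it uses (its closure body falls outside this diff), would look something like the hypothetical sketch below:

// Hypothetical carry-over updater, not what this commit does: accumulate
// unspent budget across intervals, bounded by a ceiling (MAX_BYTES_BUDGET
// is assumed in scope here).
data_budget.update(INTERVAL_MS, |bytes| {
    (bytes + MAX_BYTES_PER_INTERVAL).min(MAX_BYTES_BUDGET)
});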
@@ -432,19 +454,31 @@ impl ServeRepair {
         packet_batch: PacketBatch,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
+        data_budget: &DataBudget,
     ) {
         // iter over the packets
-        packet_batch.iter().for_each(|packet| {
+        for (i, packet) in packet_batch.iter().enumerate() {
             if let Ok(request) = packet.deserialize_slice(..) {
                 stats.processed += 1;
                 let from_addr = packet.meta.socket_addr();
-                let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
-                stats.total_response_packets += rsp.as_ref().map(PacketBatch::len).unwrap_or(0);
-                if let Some(rsp) = rsp {
-                    let _ignore_disconnect = response_sender.send(rsp);
+                let rsp =
+                    match Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats)
+                    {
+                        None => continue,
+                        Some(rsp) => rsp,
+                    };
+                let num_response_packets = rsp.len();
+                let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
+                if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
+                    stats.total_response_bytes += num_response_bytes;
+                    stats.total_response_packets += num_response_packets;
+                } else {
+                    stats.dropped_requests += packet_batch.len() - i;
+                    stats.total_dropped_response_packets += num_response_packets;
+                    break;
                 }
             }
-        });
+        }
     }
 
     fn window_index_request_bytes(
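Two details of the enforcement in handle_packets are worth noting. take runs before send, so a response that would exceed the remaining budget is never handed to the sender; and since the conditions short-circuit, a successful take followed by a failed send still consumes budget and lands in the drop counters. Once either fails, the rest of the batch is written off in one accounting step (packet_batch.len() - i added to dropped_requests) and the loop breaks, rather than probing each remaining packet against an already-exhausted budget.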