diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 3b8fccb45a..b3cd01f828 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -579,7 +579,8 @@ impl BankingStage {
         };
         const INTERVAL_MS: u64 = 100;
-        const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
+        // 12 MB outbound limit per second
+        const MAX_BYTES_PER_SECOND: usize = 12_000_000;
         const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
         const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
         data_budget.update(INTERVAL_MS, |bytes| {
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 56039cb8e6..5271aff0fa 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -2,7 +2,6 @@ use {
     crate::{
         cluster_slots::ClusterSlots,
         duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
-        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response,
         repair_service::{OutstandingShredRepairs, RepairStats},
         request_response::RequestResponse,
@@ -25,7 +24,10 @@ use {
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
     solana_metrics::inc_new_counter_debug,
-    solana_perf::packet::{PacketBatch, PacketBatchRecycler},
+    solana_perf::{
+        data_budget::DataBudget,
+        packet::{PacketBatch, PacketBatchRecycler},
+    },
     solana_sdk::{
         clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
     },
@@ -144,7 +146,9 @@ impl RequestResponse for AncestorHashesRepairType {
 pub struct ServeRepairStats {
     pub total_requests: usize,
     pub dropped_requests: usize,
+    pub total_dropped_response_packets: usize,
     pub total_response_packets: usize,
+    pub total_response_bytes: usize,
     pub processed: usize,
     pub self_repair: usize,
     pub window_index: usize,
@@ -323,17 +327,18 @@ impl ServeRepair {
         requests_receiver: &PacketBatchReceiver,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
-        packet_threshold: &mut DynamicPacketToProcessThreshold,
+        data_budget: &DataBudget,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
         let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
+        const MAX_REQUESTS_PER_ITERATION: usize = 1024;
         let mut total_requests = reqs_v[0].len();
         let mut dropped_requests = 0;
         while let Ok(more) = requests_receiver.try_recv() {
             total_requests += more.len();
-            if packet_threshold.should_drop(total_requests) {
+            if total_requests > MAX_REQUESTS_PER_ITERATION {
                 dropped_requests += more.len();
             } else {
                 reqs_v.push(more);
             }
@@ -340,10 +345,16 @@
         }
         stats.dropped_requests += dropped_requests;
         stats.total_requests += total_requests;
-        let timer = Instant::now();
         for reqs in reqs_v {
-            Self::handle_packets(obj, recycler, blockstore, reqs, response_sender, stats);
+            Self::handle_packets(
+                obj,
+                recycler,
+                blockstore,
+                reqs,
+                response_sender,
+                stats,
+                data_budget,
+            );
         }
-        packet_threshold.update(total_requests, timer.elapsed());
         Ok(())
     }
@@ -365,7 +376,13 @@
             "serve_repair-requests_received",
             ("total_requests", stats.total_requests, i64),
             ("dropped_requests", stats.dropped_requests, i64),
+            (
+                "total_dropped_response_packets",
+                stats.total_dropped_response_packets,
+                i64
+            ),
             ("total_response_packets", stats.total_response_packets, i64),
+            ("total_response_bytes", stats.total_response_bytes, i64),
             ("self_repair", stats.self_repair, i64),
             ("window_index", stats.window_index, i64),
             (
@@ -391,6 +408,10 @@
         response_sender: PacketBatchSender,
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
+        const INTERVAL_MS: u64 = 1000;
+        const MAX_BYTES_PER_SECOND: usize = 12_000_000;
+        const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
+
         let exit = exit.clone();
         let recycler = PacketBatchRecycler::default();
         Builder::new()
@@ -398,7 +419,7 @@
             .spawn(move || {
                 let mut last_print = Instant::now();
                 let mut stats = ServeRepairStats::default();
-                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
+                let data_budget = DataBudget::default();
                 loop {
                     let result = Self::run_listen(
                         &me,
@@ -407,7 +428,7 @@
                         &requests_receiver,
                         &response_sender,
                         &mut stats,
-                        &mut packet_threshold,
+                        &data_budget,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
@@ -420,6 +441,7 @@
                         Self::report_reset_stats(&me, &mut stats);
                         last_print = Instant::now();
                     }
+                    data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL);
                 }
             })
             .unwrap()
@@ -432,19 +454,31 @@
         packet_batch: PacketBatch,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
+        data_budget: &DataBudget,
     ) {
         // iter over the packets
-        packet_batch.iter().for_each(|packet| {
+        for (i, packet) in packet_batch.iter().enumerate() {
             if let Ok(request) = packet.deserialize_slice(..) {
                 stats.processed += 1;
                 let from_addr = packet.meta.socket_addr();
-                let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
-                stats.total_response_packets += rsp.as_ref().map(PacketBatch::len).unwrap_or(0);
-                if let Some(rsp) = rsp {
-                    let _ignore_disconnect = response_sender.send(rsp);
+                let rsp =
+                    match Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats)
+                    {
+                        None => continue,
+                        Some(rsp) => rsp,
+                    };
+                let num_response_packets = rsp.len();
+                let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
+                if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
+                    stats.total_response_bytes += num_response_bytes;
+                    stats.total_response_packets += num_response_packets;
+                } else {
+                    stats.dropped_requests += packet_batch.len() - i;
+                    stats.total_dropped_response_packets += num_response_packets;
+                    break;
                 }
             }
-        });
+        }
     }
 
     fn window_index_request_bytes(
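The throttling pattern both call sites rely on is a token bucket: `update` refills the budget at most once per interval, and `take` deducts bytes only when enough budget remains, telling the caller to drop the outgoing packets otherwise. Below is a minimal single-threaded sketch of that contract; `BudgetSketch` is a hypothetical stand-in for illustration, not the actual `solana_perf::data_budget::DataBudget`, which is atomic-based so `take` can be called concurrently.

```rust
use std::time::Instant;

/// Hypothetical stand-in for DataBudget, sketching the token-bucket
/// contract used above (the real solana_perf type uses atomics).
struct BudgetSketch {
    bytes: usize,         // bytes still available to spend
    last_update: Instant, // last time the budget was refilled
}

impl BudgetSketch {
    fn new() -> Self {
        Self {
            bytes: 0,
            last_update: Instant::now(),
        }
    }

    /// Refill the budget if at least `interval_ms` has elapsed.
    /// `updater` maps the current budget to the new one, so callers
    /// choose their own refill policy.
    fn update(&mut self, interval_ms: u64, updater: impl Fn(usize) -> usize) {
        if self.last_update.elapsed().as_millis() as u64 >= interval_ms {
            self.bytes = updater(self.bytes);
            self.last_update = Instant::now();
        }
    }

    /// Spend `size` bytes if the budget allows; a `false` return is the
    /// signal to drop the response.
    fn take(&mut self, size: usize) -> bool {
        if size <= self.bytes {
            self.bytes -= size;
            true
        } else {
            false
        }
    }
}

fn main() {
    const MAX_BYTES_PER_INTERVAL: usize = 12_000_000;
    let mut budget = BudgetSketch::new();
    // As in the serve-repair loop: refill to the fixed per-interval cap.
    budget.update(0, |_bytes| MAX_BYTES_PER_INTERVAL);
    assert!(budget.take(1200)); // a shred-sized response fits
    assert!(!budget.take(MAX_BYTES_PER_INTERVAL)); // over budget: dropped
}
```

Note the asymmetry the diff encodes: serve_repair's `|_bytes| MAX_BYTES_PER_INTERVAL` updater discards any unspent budget each second, while the banking_stage hunk defines a `MAX_BYTES_BUDGET` of five intervals' worth, letting unused budget carry over to absorb short bursts.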