diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 8373bfb23d..0b72fabf32 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -32,6 +32,7 @@ use { solana_runtime::bank_forks::BankForks, solana_sdk::{ clock::Slot, + genesis_config::ClusterType, hash::{Hash, HASH_BYTES}, packet::PACKET_DATA_SIZE, pubkey::{Pubkey, PUBKEY_BYTES}, @@ -44,7 +45,8 @@ use { streamer::{PacketBatchReceiver, PacketBatchSender}, }, std::{ - collections::{HashMap, HashSet}, + cmp::Reverse, + collections::HashSet, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -153,6 +155,7 @@ struct ServeRepairStats { unsigned_requests: usize, dropped_requests_outbound_bandwidth: usize, dropped_requests_load_shed: usize, + dropped_requests_low_stake: usize, total_dropped_response_packets: usize, total_response_packets: usize, total_response_bytes_staked: usize, @@ -168,6 +171,7 @@ struct ServeRepairStats { ancestor_hashes: usize, ping_cache_check_failed: usize, pings_sent: usize, + decode_time_us: u64, err_time_skew: usize, err_malformed: usize, err_sig_verify: usize, @@ -442,10 +446,22 @@ impl ServeRepair { const MAX_REQUESTS_PER_ITERATION: usize = 1024; let mut total_requests = reqs_v[0].len(); + let socket_addr_space = *self.cluster_info.socket_addr_space(); + let root_bank = self.bank_forks.read().unwrap().root_bank(); + let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch()); + let identity_keypair = self.cluster_info.keypair().clone(); + let my_id = identity_keypair.pubkey(); + + let max_buffered_packets = if root_bank.cluster_type() == ClusterType::Testnet { + 2 * MAX_REQUESTS_PER_ITERATION + } else { + MAX_REQUESTS_PER_ITERATION + }; + let mut dropped_requests = 0; while let Ok(more) = requests_receiver.try_recv() { total_requests += more.len(); - if total_requests > MAX_REQUESTS_PER_ITERATION { + if total_requests > max_buffered_packets { dropped_requests += more.len(); } else { reqs_v.push(more); @@ -455,20 +471,64 @@ 
impl ServeRepair { stats.dropped_requests_load_shed += dropped_requests; stats.total_requests += total_requests; - let root_bank = self.bank_forks.read().unwrap().root_bank(); - let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch()); - for reqs in reqs_v { - self.handle_packets( - ping_cache, - recycler, - blockstore, - reqs, - response_sender, - stats, - data_budget, - &epoch_staked_nodes, - ); + let decode_start = Instant::now(); + let mut decoded_reqs = Vec::default(); + for packet in reqs_v.iter().flatten() { + let request: RepairProtocol = match packet.deserialize_slice(..) { + Ok(request) => request, + Err(_) => { + stats.err_malformed += 1; + continue; + } + }; + + let from_addr = packet.meta.socket_addr(); + if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) { + stats.err_malformed += 1; + continue; + } + + if request.supports_signature() { + // collect stats for signature verification + Self::verify_signed_packet(&my_id, packet, &request, stats); + } else { + stats.unsigned_requests += 1; + } + + if request.sender() == &my_id { + stats.self_repair += 1; + continue; + } + + let stake = epoch_staked_nodes + .as_ref() + .and_then(|stakes| stakes.get(request.sender())) + .unwrap_or(&0); + if *stake == 0 { + stats.handle_requests_unstaked += 1; + } else { + stats.handle_requests_staked += 1; + } + decoded_reqs.push((request, from_addr, *stake)); } + stats.decode_time_us += decode_start.elapsed().as_micros() as u64; + + if decoded_reqs.len() > MAX_REQUESTS_PER_ITERATION { + stats.dropped_requests_low_stake += decoded_reqs.len() - MAX_REQUESTS_PER_ITERATION; + decoded_reqs.sort_unstable_by_key(|(_, _, stake)| Reverse(*stake)); + decoded_reqs.truncate(MAX_REQUESTS_PER_ITERATION); + } + + self.handle_packets( + ping_cache, + recycler, + blockstore, + decoded_reqs, + response_sender, + stats, + data_budget, + ); + Ok(()) } @@ -496,6 +556,11 @@ impl ServeRepair { stats.dropped_requests_load_shed, i64 ), + ( + 
"dropped_requests_low_stake", + stats.dropped_requests_low_stake, + i64 + ), ( "total_dropped_response_packets", stats.total_dropped_response_packets, @@ -539,6 +604,7 @@ impl ServeRepair { i64 ), ("pings_sent", stats.pings_sent, i64), + ("decode_time_us", stats.decode_time_us, i64), ("err_time_skew", stats.err_time_skew, i64), ("err_malformed", stats.err_malformed, i64), ("err_sig_verify", stats.err_sig_verify, i64), @@ -709,54 +775,16 @@ impl ServeRepair { ping_cache: &mut PingCache, recycler: &PacketBatchRecycler, blockstore: &Blockstore, - packet_batch: PacketBatch, + requests: Vec<(RepairProtocol, SocketAddr, /*stake*/ u64)>, response_sender: &PacketBatchSender, stats: &mut ServeRepairStats, data_budget: &DataBudget, - epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>, ) { let identity_keypair = self.cluster_info.keypair().clone(); - let my_id = identity_keypair.pubkey(); - let socket_addr_space = *self.cluster_info.socket_addr_space(); let mut pending_pings = Vec::default(); - // iter over the packets - for (i, packet) in packet_batch.iter().enumerate() { - let request: RepairProtocol = match packet.deserialize_slice(..)
{ - Ok(request) => request, - Err(_) => { - stats.err_malformed += 1; - continue; - } - }; - - let from_addr = packet.meta.socket_addr(); - if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) { - stats.err_malformed += 1; - continue; - } - - let staked = epoch_staked_nodes - .as_ref() - .map(|nodes| nodes.contains_key(request.sender())) - .unwrap_or_default(); - match staked { - true => stats.handle_requests_staked += 1, - false => stats.handle_requests_unstaked += 1, - } - - if request.sender() == &my_id { - stats.self_repair += 1; - continue; - } - - if request.supports_signature() { - // collect stats for signature verification - Self::verify_signed_packet(&my_id, packet, &request, stats); - } else { - stats.unsigned_requests += 1; - } - + let requests_len = requests.len(); + for (i, (request, from_addr, stake)) in requests.into_iter().enumerate() { if !matches!(&request, RepairProtocol::Pong(_)) { let (check, ping_pkt) = Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair); @@ -768,7 +796,6 @@ impl ServeRepair { stats.ping_cache_check_failed += 1; } } - stats.processed += 1; let rsp = match Self::handle_repair( recycler, &from_addr, blockstore, request, stats, ping_cache, @@ -780,12 +807,12 @@ impl ServeRepair { let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum(); if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() { stats.total_response_packets += num_response_packets; - match staked { + match stake > 0 { true => stats.total_response_bytes_staked += num_response_bytes, false => stats.total_response_bytes_unstaked += num_response_bytes, } } else { - stats.dropped_requests_outbound_bandwidth += packet_batch.len() - i; + stats.dropped_requests_outbound_bandwidth += requests_len - i; stats.total_dropped_response_packets += num_response_packets; break; }