diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 24a45c99c6..72e3aa8903 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -298,6 +298,28 @@ impl RepairProtocol { | Self::AncestorHashes { .. } => true, } } + + fn max_response_packets(&self) -> usize { + match self { + RepairProtocol::WindowIndex { .. } + | RepairProtocol::LegacyWindowIndexWithNonce(_, _, _, _) + | RepairProtocol::HighestWindowIndex { .. } + | RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _) + | RepairProtocol::AncestorHashes { .. } + | RepairProtocol::LegacyAncestorHashes(_, _, _) => 1, + RepairProtocol::Orphan { .. } | RepairProtocol::LegacyOrphanWithNonce(_, _, _) => { + MAX_ORPHAN_REPAIR_RESPONSES + } + RepairProtocol::Pong(_) => 0, // no response + RepairProtocol::LegacyWindowIndex(_, _, _) + | RepairProtocol::LegacyHighestWindowIndex(_, _, _) + | RepairProtocol::LegacyOrphan(_, _) => 0, // unsupported + } + } + + fn max_response_bytes(&self) -> usize { + self.max_response_packets() * PACKET_DATA_SIZE + } } #[derive(Clone)] @@ -917,17 +939,17 @@ impl ServeRepair { let identity_keypair = self.cluster_info.keypair().clone(); let mut pending_pings = Vec::default(); - let requests_len = requests.len(); - for ( - i, - RepairRequestWithMeta { - request, - from_addr, - stake, - .. - }, - ) in requests.into_iter().enumerate() + for RepairRequestWithMeta { + request, + from_addr, + stake, + .. + } in requests.into_iter() { + if !data_budget.check(request.max_response_bytes()) { + stats.dropped_requests_outbound_bandwidth += 1; + continue; + } if !matches!(&request, RepairProtocol::Pong(_)) { let (check, ping_pkt) = Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair); @@ -958,9 +980,8 @@ impl ServeRepair { false => stats.total_response_bytes_unstaked += num_response_bytes, } } else { - stats.dropped_requests_outbound_bandwidth += requests_len - i; + stats.dropped_requests_outbound_bandwidth += 1; stats.total_dropped_response_packets += num_response_packets; - break; } } diff --git a/perf/src/data_budget.rs b/perf/src/data_budget.rs index 40bd559a2c..939627e3fc 100644 --- a/perf/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -81,6 +81,11 @@ impl DataBudget { } self.bytes.load(Ordering::Acquire) } + + #[must_use] + pub fn check(&self, size: usize) -> bool { + size <= self.bytes.load(Ordering::Acquire) + } } #[cfg(test)]