diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index fa7b814a5e..fe76d80404 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -258,6 +258,25 @@ pub enum RepairProtocol { }, } +const REPAIR_REQUEST_PONG_SERIALIZED_BYTES: usize = PUBKEY_BYTES + HASH_BYTES + SIGNATURE_BYTES; +const REPAIR_REQUEST_MIN_BYTES: usize = REPAIR_REQUEST_PONG_SERIALIZED_BYTES; + +fn discard_malformed_repair_requests( + batch: &mut PacketBatch, + stats: &mut ServeRepairStats, +) -> usize { + let mut well_formed_requests = 0; + for packet in batch.iter_mut() { + if packet.meta().size < REPAIR_REQUEST_MIN_BYTES { + stats.err_malformed += 1; + packet.meta_mut().set_discard(true); + } else { + well_formed_requests += 1; + } + } + well_formed_requests +} + #[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize)] #[frozen_abi(digest = "CkffjyMPCwuJgk9NiCMELXLCecAnTPZqpKEnUCb3VyVf")] pub(crate) enum RepairResponse { @@ -603,7 +622,12 @@ impl ServeRepair { } result.ok() }; - reqs_v.iter().flatten().filter_map(decode_packet).collect() + reqs_v + .iter() + .flatten() + .filter(|packet| !packet.meta().discard()) + .filter_map(decode_packet) + .collect() } /// Process messages from the network @@ -637,12 +661,20 @@ impl ServeRepair { }; let mut dropped_requests = 0; - while let Ok(more) = requests_receiver.try_recv() { + let mut well_formed_requests = discard_malformed_repair_requests(&mut reqs_v[0], stats); + for mut more in requests_receiver.try_iter() { total_requests += more.len(); - if total_requests > max_buffered_packets { + if well_formed_requests > max_buffered_packets { + // Already exceeded max. Don't waste time discarding dropped_requests += more.len(); - } else { + continue; + } + let retained = discard_malformed_repair_requests(&mut more, stats); + well_formed_requests += retained; + if retained > 0 && well_formed_requests <= max_buffered_packets { reqs_v.push(more); + } else { + dropped_requests += more.len(); } } @@ -1402,6 +1434,82 @@ mod tests { } } + fn repair_request_header_for_tests() -> RepairRequestHeader { + RepairRequestHeader { + signature: Signature::default(), + sender: Pubkey::default(), + recipient: Pubkey::default(), + timestamp: timestamp(), + nonce: Nonce::default(), + } + } + + #[test] + fn test_check_well_formed_repair_request() { + let mut rng = rand::thread_rng(); + let keypair = Keypair::new(); + let ping = ping_pong::Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap(); + let pong = Pong::new(&ping, &keypair).unwrap(); + let request = RepairProtocol::Pong(pong); + let mut pkt = Packet::from_data(None, &request).unwrap(); + let mut batch = PacketBatch::new(vec![pkt.clone()]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 1); + pkt.meta_mut().size = 5; + let mut batch = PacketBatch::new(vec![pkt]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 0); + assert_eq!(stats.err_malformed, 1); + + let request = RepairProtocol::WindowIndex { + header: repair_request_header_for_tests(), + slot: 123, + shred_index: 456, + }; + let mut pkt = Packet::from_data(None, &request).unwrap(); + let mut batch = PacketBatch::new(vec![pkt.clone()]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 1); + pkt.meta_mut().size = 8; + let mut batch = PacketBatch::new(vec![pkt]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 0); + assert_eq!(stats.err_malformed, 1); + + let request = RepairProtocol::AncestorHashes { + header: repair_request_header_for_tests(), + slot: 123, + }; + let mut pkt = Packet::from_data(None, &request).unwrap(); + let mut batch = PacketBatch::new(vec![pkt.clone()]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 1); + pkt.meta_mut().size = 1; + let mut batch = PacketBatch::new(vec![pkt]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 0); + assert_eq!(stats.err_malformed, 1); + + let request = RepairProtocol::LegacyOrphan(LegacyContactInfo::default(), 123); + let mut pkt = Packet::from_data(None, &request).unwrap(); + let mut batch = PacketBatch::new(vec![pkt.clone()]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 1); + pkt.meta_mut().size = 3; + let mut batch = PacketBatch::new(vec![pkt]); + let mut stats = ServeRepairStats::default(); + let num_well_formed = discard_malformed_repair_requests(&mut batch, &mut stats); + assert_eq!(num_well_formed, 0); + assert_eq!(stats.err_malformed, 1); + } + #[test] fn test_serialize_deserialize_signed_request() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);