diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs
index dd3e3eb23a..9804ba8be2 100644
--- a/core/src/ancestor_hashes_service.rs
+++ b/core/src/ancestor_hashes_service.rs
@@ -14,7 +14,7 @@ use {
     dashmap::{mapref::entry::Entry::Occupied, DashMap},
     solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
     solana_perf::{
-        packet::{limited_deserialize, Packet, PacketBatch},
+        packet::{Packet, PacketBatch},
         recycler::Recycler,
     },
     solana_runtime::bank::Bank,
@@ -322,55 +322,54 @@ impl AncestorHashesService {
         blockstore: &Blockstore,
     ) -> Option<(Slot, DuplicateAncestorDecision)> {
         let from_addr = packet.meta.socket_addr();
-        limited_deserialize(&packet.data[..packet.meta.size.saturating_sub(SIZE_OF_NONCE)])
-            .ok()
-            .and_then(|ancestor_hashes_response| {
-                // Verify the response
-                let request_slot = repair_response::nonce(&packet.data[..packet.meta.size])
-                    .and_then(|nonce| {
-                        outstanding_requests.write().unwrap().register_response(
-                            nonce,
-                            &ancestor_hashes_response,
-                            timestamp(),
-                            // If the response is valid, return the slot the request
-                            // was for
-                            |ancestor_hashes_request| ancestor_hashes_request.0,
-                        )
-                    });
+        let ancestor_hashes_response = packet
+            .deserialize_slice(..packet.meta.size.saturating_sub(SIZE_OF_NONCE))
+            .ok()?;
 
-                if request_slot.is_none() {
-                    stats.invalid_packets += 1;
-                    return None;
-                }
+        // Verify the response
+        let request_slot = repair_response::nonce(packet).and_then(|nonce| {
+            outstanding_requests.write().unwrap().register_response(
+                nonce,
+                &ancestor_hashes_response,
+                timestamp(),
+                // If the response is valid, return the slot the request
+                // was for
+                |ancestor_hashes_request| ancestor_hashes_request.0,
+            )
+        });
 
-                // If was a valid response, there must be a valid `request_slot`
-                let request_slot = request_slot.unwrap();
-                stats.processed += 1;
+        if request_slot.is_none() {
+            stats.invalid_packets += 1;
+            return None;
+        }
 
-                if let Occupied(mut ancestor_hashes_status_ref) =
-                    ancestor_hashes_request_statuses.entry(request_slot)
-                {
-                    let decision = ancestor_hashes_status_ref.get_mut().add_response(
-                        &from_addr,
-                        ancestor_hashes_response.into_slot_hashes(),
-                        blockstore,
-                    );
-                    if decision.is_some() {
-                        // Once a request is completed, remove it from the map so that new
-                        // requests for the same slot can be made again if necessary. It's
-                        // important to hold the `write` lock here via
-                        // `ancestor_hashes_status_ref` so that we don't race with deletion +
-                        // insertion from the `t_ancestor_requests` thread, which may
-                        // 1) Remove expired statuses from `ancestor_hashes_request_statuses`
-                        // 2) Insert another new one via `manage_ancestor_requests()`.
-                        // In which case we wouldn't want to delete the newly inserted entry here.
-                        ancestor_hashes_status_ref.remove();
-                    }
-                    decision.map(|decision| (request_slot, decision))
-                } else {
-                    None
-                }
-            })
+        // If was a valid response, there must be a valid `request_slot`
+        let request_slot = request_slot.unwrap();
+        stats.processed += 1;
+
+        if let Occupied(mut ancestor_hashes_status_ref) =
+            ancestor_hashes_request_statuses.entry(request_slot)
+        {
+            let decision = ancestor_hashes_status_ref.get_mut().add_response(
+                &from_addr,
+                ancestor_hashes_response.into_slot_hashes(),
+                blockstore,
+            );
+            if decision.is_some() {
+                // Once a request is completed, remove it from the map so that new
+                // requests for the same slot can be made again if necessary. It's
+                // important to hold the `write` lock here via
+                // `ancestor_hashes_status_ref` so that we don't race with deletion +
+                // insertion from the `t_ancestor_requests` thread, which may
+                // 1) Remove expired statuses from `ancestor_hashes_request_statuses`
+                // 2) Insert another new one via `manage_ancestor_requests()`.
+                // In which case we wouldn't want to delete the newly inserted entry here.
+                ancestor_hashes_status_ref.remove();
+            }
+            decision.map(|decision| (request_slot, decision))
+        } else {
+            None
+        }
     }
 
     fn handle_ancestor_request_decision(
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index b57e05d624..748bd60091 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -2179,7 +2179,7 @@ mod tests {
             get_tmp_ledger_path_auto_delete,
             leader_schedule_cache::LeaderScheduleCache,
         },
-        solana_perf::packet::{limited_deserialize, to_packet_batches, PacketFlags},
+        solana_perf::packet::{to_packet_batches, PacketFlags},
         solana_poh::{
            poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
            poh_service::PohService,
@@ -4185,7 +4185,7 @@ mod tests {
         for (i, expected_id) in expected_ids.iter().enumerate() {
             assert_eq!(packets[i].meta.size, 215);
             let recv_transaction: VersionedTransaction =
-                limited_deserialize(&packets[i].data[0..packets[i].meta.size]).unwrap();
+                packets[i].deserialize_slice(..).unwrap();
             assert_eq!(
                 recv_transaction.message.recent_blockhash(),
                 expected_id,
diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs
index ddc5973ade..e8f8347f3e 100644
--- a/core/src/repair_response.rs
+++ b/core/src/repair_response.rs
@@ -3,7 +3,6 @@ use {
         blockstore::Blockstore,
         shred::{Nonce, SIZE_OF_NONCE},
     },
-    solana_perf::packet::limited_deserialize,
     solana_sdk::{clock::Slot, packet::Packet},
     std::{io, net::SocketAddr},
 };
@@ -40,12 +39,9 @@ pub fn repair_response_packet_from_bytes(
     Some(packet)
 }
 
-pub fn nonce(buf: &[u8]) -> Option<Nonce> {
-    if buf.len() < SIZE_OF_NONCE {
-        None
-    } else {
-        limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok()
-    }
+pub fn nonce(packet: &Packet) -> Option<Nonce> {
+    let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?;
+    packet.deserialize_slice(nonce_start..).ok()
 }
 
 #[cfg(test)]
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 6f110c9487..b51d47e709 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -25,7 +25,7 @@ use {
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
     solana_metrics::inc_new_counter_debug,
-    solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
+    solana_perf::packet::{PacketBatch, PacketBatchRecycler},
     solana_sdk::{
         clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
     },
@@ -432,17 +432,14 @@
     ) {
         // iter over the packets
         packet_batch.iter().for_each(|packet| {
-            let from_addr = packet.meta.socket_addr();
-            limited_deserialize(&packet.data[..packet.meta.size])
-                .into_iter()
-                .for_each(|request| {
-                    stats.processed += 1;
-                    let rsp =
-                        Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
-                    if let Some(rsp) = rsp {
-                        let _ignore_disconnect = response_sender.send(rsp);
-                    }
-                });
+            if let Ok(request) = packet.deserialize_slice(..) {
+                stats.processed += 1;
+                let from_addr = packet.meta.socket_addr();
+                let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
+                if let Some(rsp) = rsp {
+                    let _ignore_disconnect = response_sender.send(rsp);
+                }
+            }
         });
     }
 
@@ -815,9 +812,9 @@
 
         let rv: Vec<Shred> = rv
             .into_iter()
-            .filter_map(|b| {
-                assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
-                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            .filter_map(|p| {
+                assert_eq!(repair_response::nonce(p).unwrap(), nonce);
+                Shred::new_from_serialized_shred(p.data.to_vec()).ok()
             })
             .collect();
         assert!(!rv.is_empty());
@@ -899,9 +896,9 @@
         verify_responses(&request, rv.iter());
         let rv: Vec<Shred> = rv
             .into_iter()
-            .filter_map(|b| {
-                assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
-                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            .filter_map(|p| {
+                assert_eq!(repair_response::nonce(p).unwrap(), nonce);
+                Shred::new_from_serialized_shred(p.data.to_vec()).ok()
             })
             .collect();
         assert_eq!(rv[0].index(), 1);
@@ -1148,7 +1145,9 @@
     #[test]
     fn test_run_ancestor_hashes() {
         fn deserialize_ancestor_hashes_response(packet: &Packet) -> AncestorHashesResponseVersion {
-            limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap()
+            packet
+                .deserialize_slice(..packet.meta.size - SIZE_OF_NONCE)
+                .unwrap()
         }
 
         solana_logger::setup();
diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs
index edf62358cf..3d44f60cdc 100644
--- a/core/src/unprocessed_packet_batches.rs
+++ b/core/src/unprocessed_packet_batches.rs
@@ -1,6 +1,6 @@
 use {
     min_max_heap::MinMaxHeap,
-    solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
+    solana_perf::packet::{Packet, PacketBatch},
     solana_program_runtime::compute_budget::ComputeBudget,
     solana_sdk::{
         hash::Hash,
@@ -91,8 +91,7 @@ impl DeserializedPacket {
         packet: Packet,
         priority: Option<u64>,
     ) -> Result<Self, DeserializedPacketError> {
-        let versioned_transaction: VersionedTransaction =
-            limited_deserialize(&packet.data[0..packet.meta.size])?;
+        let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
         let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
         let message_bytes = packet_message(&packet)?;
         let message_hash = Message::hash_raw_message(message_bytes);
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 004e3c1cb3..4998daec4b 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -376,7 +376,7 @@ where
             let repair_info = RepairMeta {
                 _from_addr: packet.meta.socket_addr(),
                 // If can't parse the nonce, dump the packet.
-                nonce: repair_response::nonce(&packet.data)?,
+                nonce: repair_response::nonce(packet)?,
             };
             Some((shred, Some(repair_info)))
         } else {
diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index 3aec6dd833..eb13d93691 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -53,7 +53,7 @@ use {
     },
     solana_perf::{
         data_budget::DataBudget,
-        packet::{limited_deserialize, Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
+        packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
     },
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::{bank_forks::BankForks, vote_parser},
@@ -2466,8 +2466,7 @@ impl ClusterInfo {
             .packets_received_count
             .add_relaxed(packets.len() as u64);
         let verify_packet = |packet: Packet| {
-            let data = &packet.data[..packet.meta.size];
-            let protocol: Protocol = limited_deserialize(data).ok()?;
+            let protocol: Protocol = packet.deserialize_slice(..).ok()?;
             protocol.sanitize().ok()?;
             let protocol = protocol.par_verify(&self.stats)?;
             Some((packet.meta.socket_addr(), protocol))
@@ -3235,7 +3234,7 @@ mod tests {
     ) {
         assert_eq!(packet.meta.socket_addr(), socket);
         let bytes = serialize(&pong).unwrap();
-        match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() {
+        match packet.deserialize_slice(..).unwrap() {
             Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
             _ => panic!("invalid packet!"),
         }
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 094dc08627..d65defdcda 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -56,7 +56,7 @@ use {
     num_enum::{IntoPrimitive, TryFromPrimitive},
     serde::{Deserialize, Serialize},
     solana_entry::entry::{create_ticks, Entry},
-    solana_perf::packet::{limited_deserialize, Packet},
+    solana_perf::packet::Packet,
     solana_sdk::{
         clock::Slot,
         hash::{hashv, Hash},
@@ -478,9 +478,10 @@ impl Shred {
     }
 
     // Get slot from a shred packet with partial deserialize
-    pub fn get_slot_from_packet(packet: &Packet) -> Option<Slot> {
-        let buffer = packet.data.get(OFFSET_OF_SHRED_SLOT..)?;
-        limited_deserialize(buffer).ok()
+    pub fn get_slot_from_packet(p: &Packet) -> Option<Slot> {
+        let slot_start = OFFSET_OF_SHRED_SLOT;
+        let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
+        p.deserialize_slice(slot_start..slot_end).ok()
     }
 
     pub(crate) fn reference_tick_from_data(data: &[u8]) -> Result<u8> {
@@ -553,7 +554,7 @@ pub fn get_shred_slot_index_type(
         return None;
     }
 
-    let index = match limited_deserialize::<u32>(&p.data[index_start..index_end]) {
+    let index = match p.deserialize_slice(index_start..index_end) {
         Ok(x) => x,
         Err(_e) => {
             stats.index_bad_deserialize += 1;
@@ -566,7 +567,7 @@ pub fn get_shred_slot_index_type(
         return None;
     }
 
-    let slot = match limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
+    let slot = match p.deserialize_slice(slot_start..slot_end) {
         Ok(x) => x,
         Err(_e) => {
             stats.slot_bad_deserialize += 1;
diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs
index e83c1de1b3..1b903920e0 100644
--- a/ledger/src/sigverify_shreds.rs
+++ b/ledger/src/sigverify_shreds.rs
@@ -12,7 +12,7 @@ use {
     solana_metrics::inc_new_counter_debug,
     solana_perf::{
         cuda_runtime::PinnedVec,
-        packet::{limited_deserialize, Packet, PacketBatch},
+        packet::{Packet, PacketBatch},
         perf_libs,
         recycler_cache::RecyclerCache,
         sigverify::{self, count_packets_in_batches, TxOffset},
@@ -54,10 +54,7 @@ pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>)
         return Some(0);
     }
     trace!("slot start and end {} {}", slot_start, slot_end);
-    if packet.meta.size < slot_end {
-        return Some(0);
-    }
-    let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?;
+    let slot: u64 = packet.deserialize_slice(slot_start..slot_end).ok()?;
     let msg_end = if packet.meta.repair() {
         packet.meta.size.saturating_sub(SIZE_OF_NONCE)
     } else {
@@ -115,16 +112,17 @@ fn slot_key_data_for_gpu<
     batch
         .iter()
         .map(|packet| {
-            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
-            let slot_end = slot_start + size_of::<u64>();
-            if packet.meta.size < slot_end || packet.meta.discard() {
-                return std::u64::MAX;
+            if packet.meta.discard() {
+                return Slot::MAX;
             }
-            let slot: Option<u64> =
-                limited_deserialize(&packet.data[slot_start..slot_end]).ok();
+
+            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
+            let slot_end = slot_start + size_of::<Slot>();
+            let slot: Option<Slot> =
+                packet.deserialize_slice(slot_start..slot_end).ok();
             match slot {
                 Some(slot) if slot_keys.get(&slot).is_some() => slot,
-                _ => std::u64::MAX,
+                _ => Slot::MAX,
             }
         })
         .collect()
diff --git a/perf/src/packet.rs b/perf/src/packet.rs
index f8bbefe26a..a103bab1d5 100644
--- a/perf/src/packet.rs
+++ b/perf/src/packet.rs
@@ -229,17 +229,6 @@ pub fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch
 
-pub fn limited_deserialize<T>(data: &[u8]) -> bincode::Result<T>
-where
-    T: serde::de::DeserializeOwned,
-{
-    bincode::options()
-        .with_limit(PACKET_DATA_SIZE as u64)
-        .with_fixint_encoding()
-        .allow_trailing_bytes()
-        .deserialize_from(data)
-}
-
 pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
 where
     R: Read,
     T: serde::de::DeserializeOwned,
 {
diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs
index 671d810b01..5866f192dd 100644
--- a/sdk/src/packet.rs
+++ b/sdk/src/packet.rs
@@ -1,5 +1,5 @@
 use {
-    bincode::Result,
+    bincode::{Options, Result},
     bitflags::bitflags,
     serde::Serialize,
     std::{
@@ -67,6 +67,20 @@ impl Packet {
         }
         Ok(())
     }
+
+    pub fn deserialize_slice<T, I>(&self, index: I) -> Result<T>
+    where
+        T: serde::de::DeserializeOwned,
+        I: std::slice::SliceIndex<[u8], Output = [u8]>,
+    {
+        let data = &self.data[0..self.meta.size];
+        let bytes = data.get(index).ok_or(bincode::ErrorKind::SizeLimit)?;
+        bincode::options()
+            .with_limit(PACKET_DATA_SIZE as u64)
+            .with_fixint_encoding()
+            .reject_trailing_bytes()
+            .deserialize(bytes)
+    }
 }
 
 impl fmt::Debug for Packet {
@@ -150,3 +164,47 @@ impl Default for Meta {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_deserialize_slice() {
+        let p = Packet::from_data(None, u32::MAX).unwrap();
+        assert_eq!(p.deserialize_slice(..).ok(), Some(u32::MAX));
+        assert_eq!(p.deserialize_slice(0..4).ok(), Some(u32::MAX));
+        assert_eq!(
+            p.deserialize_slice::<u16, _>(0..4)
+                .map_err(|e| e.to_string()),
+            Err("Slice had bytes remaining after deserialization".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..0)
+                .map_err(|e| e.to_string()),
+            Err("io error: unexpected end of file".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..1)
+                .map_err(|e| e.to_string()),
+            Err("io error: unexpected end of file".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..5)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+        #[allow(clippy::reversed_empty_ranges)]
+        let reversed_empty_range = 4..0;
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(reversed_empty_range)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(4..5)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+    }
+}
diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs
index fd809f987f..c528442d7b 100644
--- a/streamer/src/packet.rs
+++ b/streamer/src/packet.rs
@@ -9,8 +9,7 @@ use {
 };
 pub use {
     solana_perf::packet::{
-        limited_deserialize, to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS,
-        PACKETS_PER_BATCH,
+        to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
     },
     solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
 };
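
The caller-side effect of the API change is easiest to see in isolation. The following sketch is not part of the patch; it assumes a crate that depends on solana-sdk at a revision containing the new `Packet::deserialize_slice` method, and the trailing-nonce range in it is purely illustrative.

use solana_sdk::packet::Packet;

fn main() {
    // A packet whose payload is a single bincode-serialized u32, so
    // packet.meta.size == 4 (mirrors the new unit test in sdk/src/packet.rs).
    let packet = Packet::from_data(None, 42u32).unwrap();

    // Whole-payload deserialization: the bounds come from packet.meta.size,
    // so callers no longer write `&packet.data[..packet.meta.size]` by hand.
    let value: u32 = packet.deserialize_slice(..).unwrap();
    assert_eq!(value, 42);

    // Sub-range deserialization, as the repair path does for a trailing nonce.
    // A range that falls outside the payload returns Err(SizeLimit) instead
    // of reading stale bytes beyond meta.size from the fixed-size buffer.
    let nonce: Result<u32, _> = packet.deserialize_slice(4..8);
    assert!(nonce.is_err());
}

Two deliberate behavioral changes relative to the removed `limited_deserialize` are worth noting: `deserialize_slice` uses `reject_trailing_bytes()` where the old helper used `allow_trailing_bytes()`, so a read must consume its slice exactly, and every index is checked against `packet.meta.size` rather than against the full `PACKET_DATA_SIZE` buffer.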