Add Packet::deserialize_slice convenience method

Justin Starry 2022-05-24 11:15:20 +08:00
parent 79df1954eb
commit cad1c41ce2
12 changed files with 152 additions and 115 deletions

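The new method folds the old free function `limited_deserialize` into `Packet` itself: it first bounds the buffer by `packet.meta.size`, then applies a caller-supplied slice index, and deserializes with the same `PACKET_DATA_SIZE` bincode limit. One behavioral change rides along: `deserialize_slice` rejects trailing bytes where `limited_deserialize` allowed them. A minimal sketch of the call-site migration, with `MyMessage` as a stand-in payload type (not from this commit):

    use solana_perf::packet::Packet;

    type MyMessage = u64; // stand-in for any DeserializeOwned payload type

    // Before this commit, callers sliced the raw buffer by hand:
    //     limited_deserialize(&packet.data[..packet.meta.size]).ok()
    fn parse(packet: &Packet) -> Option<MyMessage> {
        // After: `..` indexes the live payload; meta.size is applied internally.
        packet.deserialize_slice(..).ok()
    }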
View File

@@ -14,7 +14,7 @@ use {
     dashmap::{mapref::entry::Entry::Occupied, DashMap},
     solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
     solana_perf::{
-        packet::{limited_deserialize, Packet, PacketBatch},
+        packet::{Packet, PacketBatch},
         recycler::Recycler,
     },
     solana_runtime::bank::Bank,
@@ -322,12 +322,12 @@ impl AncestorHashesService {
         blockstore: &Blockstore,
     ) -> Option<(Slot, DuplicateAncestorDecision)> {
         let from_addr = packet.meta.socket_addr();
-        limited_deserialize(&packet.data[..packet.meta.size.saturating_sub(SIZE_OF_NONCE)])
-            .ok()
-            .and_then(|ancestor_hashes_response| {
-                // Verify the response
-                let request_slot = repair_response::nonce(&packet.data[..packet.meta.size])
-                    .and_then(|nonce| {
+        let ancestor_hashes_response = packet
+            .deserialize_slice(..packet.meta.size.saturating_sub(SIZE_OF_NONCE))
+            .ok()?;
+
+        // Verify the response
+        let request_slot = repair_response::nonce(packet).and_then(|nonce| {
             outstanding_requests.write().unwrap().register_response(
                 nonce,
                 &ancestor_hashes_response,
@@ -370,7 +370,6 @@ impl AncestorHashesService {
         } else {
             None
         }
-        })
     }

     fn handle_ancestor_request_decision(

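The rewrite above also flattens one level of closure nesting: because the enclosing function returns `Option`, `.ok()?` converts the bincode `Result` into an `Option` and early-returns on failure, which the old `.ok().and_then(|ancestor_hashes_response| { ... })` chain expressed less directly. A standalone sketch of the pattern (hypothetical helper, not from this commit):

    use solana_sdk::packet::Packet;

    fn parse_count(packet: &Packet) -> Option<u32> {
        // `.ok()?` bails out of the Option-returning function on a bad packet,
        // replacing a nested `.ok().and_then(|count| ...)` closure.
        let count: u32 = packet.deserialize_slice(..).ok()?;
        Some(count)
    }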
View File

@@ -2179,7 +2179,7 @@ mod tests {
             get_tmp_ledger_path_auto_delete,
             leader_schedule_cache::LeaderScheduleCache,
         },
-        solana_perf::packet::{limited_deserialize, to_packet_batches, PacketFlags},
+        solana_perf::packet::{to_packet_batches, PacketFlags},
         solana_poh::{
             poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
             poh_service::PohService,
@@ -4185,7 +4185,7 @@ mod tests {
         for (i, expected_id) in expected_ids.iter().enumerate() {
             assert_eq!(packets[i].meta.size, 215);
             let recv_transaction: VersionedTransaction =
-                limited_deserialize(&packets[i].data[0..packets[i].meta.size]).unwrap();
+                packets[i].deserialize_slice(..).unwrap();
             assert_eq!(
                 recv_transaction.message.recent_blockhash(),
                 expected_id,

View File

@@ -3,7 +3,6 @@ use {
         blockstore::Blockstore,
         shred::{Nonce, SIZE_OF_NONCE},
     },
-    solana_perf::packet::limited_deserialize,
     solana_sdk::{clock::Slot, packet::Packet},
     std::{io, net::SocketAddr},
 };
@@ -40,12 +39,9 @@ pub fn repair_response_packet_from_bytes(
     Some(packet)
 }

-pub fn nonce(buf: &[u8]) -> Option<Nonce> {
-    if buf.len() < SIZE_OF_NONCE {
-        None
-    } else {
-        limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok()
-    }
+pub fn nonce(packet: &Packet) -> Option<Nonce> {
+    let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?;
+    packet.deserialize_slice(nonce_start..).ok()
 }

 #[cfg(test)]

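The rewritten helper reads the nonce from the tail of the live payload instead of taking a pre-sliced buffer: `checked_sub` returns `None` for packets shorter than a nonce, subsuming the old explicit length check. A standalone sketch of the same shape (assuming `Nonce` is a `u32`, as in the ledger crate):

    use solana_sdk::packet::Packet;

    type Nonce = u32;
    const SIZE_OF_NONCE: usize = std::mem::size_of::<Nonce>();

    fn trailing_nonce(packet: &Packet) -> Option<Nonce> {
        // None for runt packets: checked_sub catches meta.size < SIZE_OF_NONCE.
        let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?;
        // Open-ended range: deserialize the last SIZE_OF_NONCE payload bytes.
        packet.deserialize_slice(nonce_start..).ok()
    }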
View File

@@ -25,7 +25,7 @@ use {
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
     solana_metrics::inc_new_counter_debug,
-    solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
+    solana_perf::packet::{PacketBatch, PacketBatchRecycler},
     solana_sdk::{
         clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
     },
@@ -432,17 +432,14 @@ impl ServeRepair {
     ) {
         // iter over the packets
         packet_batch.iter().for_each(|packet| {
-            let from_addr = packet.meta.socket_addr();
-            limited_deserialize(&packet.data[..packet.meta.size])
-                .into_iter()
-                .for_each(|request| {
-                    stats.processed += 1;
-                    let rsp =
-                        Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
-                    if let Some(rsp) = rsp {
-                        let _ignore_disconnect = response_sender.send(rsp);
-                    }
-                });
+            if let Ok(request) = packet.deserialize_slice(..) {
+                stats.processed += 1;
+                let from_addr = packet.meta.socket_addr();
+                let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
+                if let Some(rsp) = rsp {
+                    let _ignore_disconnect = response_sender.send(rsp);
+                }
+            }
         });
     }
@@ -815,9 +812,9 @@ mod tests {
         let rv: Vec<Shred> = rv
             .into_iter()
-            .filter_map(|b| {
-                assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
-                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            .filter_map(|p| {
+                assert_eq!(repair_response::nonce(&p).unwrap(), nonce);
+                Shred::new_from_serialized_shred(p.data.to_vec()).ok()
             })
             .collect();
         assert!(!rv.is_empty());
@@ -899,9 +896,9 @@ mod tests {
         verify_responses(&request, rv.iter());
         let rv: Vec<Shred> = rv
             .into_iter()
-            .filter_map(|b| {
-                assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce);
-                Shred::new_from_serialized_shred(b.data.to_vec()).ok()
+            .filter_map(|p| {
+                assert_eq!(repair_response::nonce(&p).unwrap(), nonce);
+                Shred::new_from_serialized_shred(p.data.to_vec()).ok()
             })
             .collect();
         assert_eq!(rv[0].index(), 1);
@@ -1148,7 +1145,9 @@ mod tests {
     #[test]
     fn test_run_ancestor_hashes() {
         fn deserialize_ancestor_hashes_response(packet: &Packet) -> AncestorHashesResponseVersion {
-            limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]).unwrap()
+            packet
+                .deserialize_slice(..packet.meta.size - SIZE_OF_NONCE)
+                .unwrap()
         }

         solana_logger::setup();

View File

@@ -1,6 +1,6 @@
 use {
     min_max_heap::MinMaxHeap,
-    solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
+    solana_perf::packet::{Packet, PacketBatch},
     solana_program_runtime::compute_budget::ComputeBudget,
     solana_sdk::{
         hash::Hash,
@@ -91,8 +91,7 @@ impl DeserializedPacket {
         packet: Packet,
         priority: Option<u64>,
     ) -> Result<Self, DeserializedPacketError> {
-        let versioned_transaction: VersionedTransaction =
-            limited_deserialize(&packet.data[0..packet.meta.size])?;
+        let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
         let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
         let message_bytes = packet_message(&packet)?;
         let message_hash = Message::hash_raw_message(message_bytes);

View File

@@ -376,7 +376,7 @@ where
         let repair_info = RepairMeta {
             _from_addr: packet.meta.socket_addr(),
             // If can't parse the nonce, dump the packet.
-            nonce: repair_response::nonce(&packet.data)?,
+            nonce: repair_response::nonce(packet)?,
         };
         Some((shred, Some(repair_info)))
     } else {

View File

@@ -53,7 +53,7 @@ use {
     },
     solana_perf::{
         data_budget::DataBudget,
-        packet::{limited_deserialize, Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
+        packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
     },
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::{bank_forks::BankForks, vote_parser},
@@ -2466,8 +2466,7 @@ impl ClusterInfo {
             .packets_received_count
             .add_relaxed(packets.len() as u64);
         let verify_packet = |packet: Packet| {
-            let data = &packet.data[..packet.meta.size];
-            let protocol: Protocol = limited_deserialize(data).ok()?;
+            let protocol: Protocol = packet.deserialize_slice(..).ok()?;
             protocol.sanitize().ok()?;
             let protocol = protocol.par_verify(&self.stats)?;
             Some((packet.meta.socket_addr(), protocol))
@@ -3235,7 +3234,7 @@ mod tests {
     ) {
         assert_eq!(packet.meta.socket_addr(), socket);
         let bytes = serialize(&pong).unwrap();
-        match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() {
+        match packet.deserialize_slice(..).unwrap() {
             Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
             _ => panic!("invalid packet!"),
         }

View File

@@ -56,7 +56,7 @@ use {
     num_enum::{IntoPrimitive, TryFromPrimitive},
     serde::{Deserialize, Serialize},
     solana_entry::entry::{create_ticks, Entry},
-    solana_perf::packet::{limited_deserialize, Packet},
+    solana_perf::packet::Packet,
     solana_sdk::{
         clock::Slot,
         hash::{hashv, Hash},
@@ -478,9 +478,10 @@ impl Shred {
     }

     // Get slot from a shred packet with partial deserialize
-    pub fn get_slot_from_packet(packet: &Packet) -> Option<Slot> {
-        let buffer = packet.data.get(OFFSET_OF_SHRED_SLOT..)?;
-        limited_deserialize(buffer).ok()
+    pub fn get_slot_from_packet(p: &Packet) -> Option<Slot> {
+        let slot_start = OFFSET_OF_SHRED_SLOT;
+        let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
+        p.deserialize_slice(slot_start..slot_end).ok()
     }

     pub(crate) fn reference_tick_from_data(data: &[u8]) -> Result<u8, Error> {
@@ -553,7 +554,7 @@ pub fn get_shred_slot_index_type(
         return None;
     }

-    let index = match limited_deserialize::<u32>(&p.data[index_start..index_end]) {
+    let index = match p.deserialize_slice(index_start..index_end) {
         Ok(x) => x,
         Err(_e) => {
             stats.index_bad_deserialize += 1;
@@ -566,7 +567,7 @@ pub fn get_shred_slot_index_type(
         return None;
     }

-    let slot = match limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
+    let slot = match p.deserialize_slice(slot_start..slot_end) {
         Ok(x) => x,
         Err(_e) => {
             stats.slot_bad_deserialize += 1;

View File

@@ -12,7 +12,7 @@ use {
     solana_metrics::inc_new_counter_debug,
     solana_perf::{
         cuda_runtime::PinnedVec,
-        packet::{limited_deserialize, Packet, PacketBatch},
+        packet::{Packet, PacketBatch},
         perf_libs,
         recycler_cache::RecyclerCache,
         sigverify::{self, count_packets_in_batches, TxOffset},
@@ -54,10 +54,7 @@ pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>)
         return Some(0);
     }
     trace!("slot start and end {} {}", slot_start, slot_end);
-    if packet.meta.size < slot_end {
-        return Some(0);
-    }
-    let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?;
+    let slot: u64 = packet.deserialize_slice(slot_start..slot_end).ok()?;
     let msg_end = if packet.meta.repair() {
         packet.meta.size.saturating_sub(SIZE_OF_NONCE)
     } else {
@@ -115,16 +112,17 @@ fn slot_key_data_for_gpu<
     batch
         .iter()
         .map(|packet| {
-            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
-            let slot_end = slot_start + size_of::<u64>();
-            if packet.meta.size < slot_end || packet.meta.discard() {
-                return std::u64::MAX;
-            }
-            let slot: Option<u64> =
-                limited_deserialize(&packet.data[slot_start..slot_end]).ok();
+            if packet.meta.discard() {
+                return Slot::MAX;
+            }
+
+            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
+            let slot_end = slot_start + size_of::<Slot>();
+            let slot: Option<Slot> =
+                packet.deserialize_slice(slot_start..slot_end).ok();
             match slot {
                 Some(slot) if slot_keys.get(&slot).is_some() => slot,
-                _ => std::u64::MAX,
+                _ => Slot::MAX,
             }
         })
         .collect()

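In both hunks above, the explicit `packet.meta.size < slot_end` guard disappears because `deserialize_slice` bounds the buffer by `meta.size` before indexing: an out-of-range `slot_start..slot_end` now surfaces as a size-limit error and is absorbed by `.ok()?` or the sentinel arm. A minimal sketch of that fallback, using the sentinel from the GPU path above:

    use solana_sdk::{clock::Slot, packet::Packet};

    fn slot_or_sentinel(packet: &Packet, slot_start: usize, slot_end: usize) -> Slot {
        // A short packet makes the range fall outside &data[..meta.size],
        // so deserialize_slice returns Err instead of panicking on a slice.
        packet
            .deserialize_slice(slot_start..slot_end)
            .unwrap_or(Slot::MAX)
    }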
View File

@@ -229,17 +229,6 @@ pub fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch
     to_packet_batches(items, NUM_PACKETS)
 }

-pub fn limited_deserialize<T>(data: &[u8]) -> bincode::Result<T>
-where
-    T: serde::de::DeserializeOwned,
-{
-    bincode::options()
-        .with_limit(PACKET_DATA_SIZE as u64)
-        .with_fixint_encoding()
-        .allow_trailing_bytes()
-        .deserialize_from(data)
-}
-
 pub fn deserialize_from_with_limit<R, T>(reader: R) -> bincode::Result<T>
 where
     R: Read,

View File

@@ -1,5 +1,5 @@
 use {
-    bincode::Result,
+    bincode::{Options, Result},
     bitflags::bitflags,
     serde::Serialize,
     std::{
@@ -67,6 +67,20 @@ impl Packet {
         }
         Ok(())
     }
+
+    pub fn deserialize_slice<T, I>(&self, index: I) -> Result<T>
+    where
+        T: serde::de::DeserializeOwned,
+        I: std::slice::SliceIndex<[u8], Output = [u8]>,
+    {
+        let data = &self.data[0..self.meta.size];
+        let bytes = data.get(index).ok_or(bincode::ErrorKind::SizeLimit)?;
+        bincode::options()
+            .with_limit(PACKET_DATA_SIZE as u64)
+            .with_fixint_encoding()
+            .reject_trailing_bytes()
+            .deserialize(bytes)
+    }
 }

 impl fmt::Debug for Packet {
@@ -150,3 +164,47 @@ impl Default for Meta {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_deserialize_slice() {
+        let p = Packet::from_data(None, u32::MAX).unwrap();
+        assert_eq!(p.deserialize_slice(..).ok(), Some(u32::MAX));
+        assert_eq!(p.deserialize_slice(0..4).ok(), Some(u32::MAX));
+        assert_eq!(
+            p.deserialize_slice::<u16, _>(0..4)
+                .map_err(|e| e.to_string()),
+            Err("Slice had bytes remaining after deserialization".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..0)
+                .map_err(|e| e.to_string()),
+            Err("io error: unexpected end of file".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..1)
+                .map_err(|e| e.to_string()),
+            Err("io error: unexpected end of file".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(0..5)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+        #[allow(clippy::reversed_empty_ranges)]
+        let reversed_empty_range = 4..0;
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(reversed_empty_range)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+        assert_eq!(
+            p.deserialize_slice::<u32, _>(4..5)
+                .map_err(|e| e.to_string()),
+            Err("the size limit has been reached".to_string()),
+        );
+    }
+}

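Note the deliberate tightening relative to the deleted helper: `limited_deserialize` used `allow_trailing_bytes`, while `deserialize_slice` uses `reject_trailing_bytes`, so a value must consume its slice exactly. The tests above pin this down: reading a `u16` from the four-byte `u32` payload fails with "Slice had bytes remaining after deserialization". Callers that want only a prefix of the payload therefore pass an explicit subrange, as the shred code does with `slot_start..slot_end`. A small sketch (hypothetical helper, not from this commit):

    use solana_sdk::packet::Packet;

    // Reads a two-byte field: the subrange must fit the type exactly,
    // since trailing bytes within the indexed slice are rejected.
    fn header_tag(packet: &Packet) -> Option<u16> {
        packet.deserialize_slice(0..2).ok()
    }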
View File

@@ -9,8 +9,7 @@ use {
 };
 pub use {
     solana_perf::packet::{
-        limited_deserialize, to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS,
-        PACKETS_PER_BATCH,
+        to_packet_batches, PacketBatch, PacketBatchRecycler, NUM_PACKETS, PACKETS_PER_BATCH,
     },
     solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE},
 };