Trim extra shred bytes in blockstore (#16602)
Strip the zero-padding off of data shreds before insertion into blockstore Co-authored-by: Stephen Akridge <sakridge@gmail.com> Co-authored-by: Nathan Hawkins <utsl@utsl.org>
This commit is contained in:
parent
283f587afe
commit
bc31378797
|
@ -2944,7 +2944,6 @@ pub(crate) mod tests {
|
||||||
fn test_dead_fork_entry_deserialize_failure() {
|
fn test_dead_fork_entry_deserialize_failure() {
|
||||||
// Insert entry that causes deserialization failure
|
// Insert entry that causes deserialization failure
|
||||||
let res = check_dead_fork(|_, _| {
|
let res = check_dead_fork(|_, _| {
|
||||||
let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD;
|
|
||||||
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
|
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
|
||||||
let mut data_header = DataShredHeader::default();
|
let mut data_header = DataShredHeader::default();
|
||||||
data_header.flags |= DATA_COMPLETE_SHRED;
|
data_header.flags |= DATA_COMPLETE_SHRED;
|
||||||
|
@ -2957,7 +2956,7 @@ pub(crate) mod tests {
|
||||||
);
|
);
|
||||||
bincode::serialize_into(
|
bincode::serialize_into(
|
||||||
&mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
|
&mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
|
||||||
&gibberish[..payload_len],
|
&gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD],
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
vec![shred]
|
vec![shred]
|
||||||
|
|
|
@ -611,9 +611,7 @@ mod tests {
|
||||||
use solana_ledger::{
|
use solana_ledger::{
|
||||||
blockstore::make_many_slot_entries,
|
blockstore::make_many_slot_entries,
|
||||||
blockstore_processor::fill_blockstore_slot_with_ticks,
|
blockstore_processor::fill_blockstore_slot_with_ticks,
|
||||||
shred::{
|
shred::{max_ticks_per_n_shreds, Shred},
|
||||||
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use solana_perf::packet::Packet;
|
use solana_perf::packet::Packet;
|
||||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
|
use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
|
||||||
|
@ -726,23 +724,10 @@ mod tests {
|
||||||
nonce,
|
nonce,
|
||||||
);
|
);
|
||||||
assert!(rv.is_none());
|
assert!(rv.is_none());
|
||||||
let common_header = ShredCommonHeader {
|
let shred = Shred::new_from_data(slot, 1, 1, None, false, false, 0, 2, 0);
|
||||||
slot,
|
|
||||||
index: 1,
|
|
||||||
..ShredCommonHeader::default()
|
|
||||||
};
|
|
||||||
let data_header = DataShredHeader {
|
|
||||||
parent_offset: 1,
|
|
||||||
..DataShredHeader::default()
|
|
||||||
};
|
|
||||||
let shred_info = Shred::new_empty_from_header(
|
|
||||||
common_header,
|
|
||||||
data_header,
|
|
||||||
CodingShredHeader::default(),
|
|
||||||
);
|
|
||||||
|
|
||||||
blockstore
|
blockstore
|
||||||
.insert_shreds(vec![shred_info], None, false)
|
.insert_shreds(vec![shred], None, false)
|
||||||
.expect("Expect successful ledger write");
|
.expect("Expect successful ledger write");
|
||||||
|
|
||||||
let index = 1;
|
let index = 1;
|
||||||
|
@ -954,6 +939,7 @@ mod tests {
|
||||||
assert_eq!(shreds[0].slot(), 1);
|
assert_eq!(shreds[0].slot(), 1);
|
||||||
assert_eq!(shreds[0].index(), 0);
|
assert_eq!(shreds[0].index(), 0);
|
||||||
shreds[0].payload.push(10);
|
shreds[0].payload.push(10);
|
||||||
|
shreds[0].data_header.size = shreds[0].payload.len() as u16;
|
||||||
blockstore
|
blockstore
|
||||||
.insert_shreds(shreds, None, false)
|
.insert_shreds(shreds, None, false)
|
||||||
.expect("Expect successful ledger write");
|
.expect("Expect successful ledger write");
|
||||||
|
|
|
@ -1281,6 +1281,7 @@ impl Blockstore {
|
||||||
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
||||||
is_recovered: bool,
|
is_recovered: bool,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
|
use crate::shred::SHRED_PAYLOAD_SIZE;
|
||||||
let shred_index = u64::from(shred.index());
|
let shred_index = u64::from(shred.index());
|
||||||
let slot = shred.slot();
|
let slot = shred.slot();
|
||||||
let last_in_slot = if shred.last_in_slot() {
|
let last_in_slot = if shred.last_in_slot() {
|
||||||
|
@ -1290,6 +1291,13 @@ impl Blockstore {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if shred.data_header.size == 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if shred.payload.len() > SHRED_PAYLOAD_SIZE {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Check that we do not receive shred_index >= than the last_index
|
// Check that we do not receive shred_index >= than the last_index
|
||||||
// for the slot
|
// for the slot
|
||||||
let last_index = slot_meta.last_index;
|
let last_index = slot_meta.last_index;
|
||||||
|
@ -1410,7 +1418,12 @@ impl Blockstore {
|
||||||
|
|
||||||
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
||||||
// We don't want only a subset of these changes going through.
|
// We don't want only a subset of these changes going through.
|
||||||
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
|
write_batch.put_bytes::<cf::ShredData>(
|
||||||
|
(slot, index),
|
||||||
|
// Payload will be padded out to SHRED_PAYLOAD_SIZE
|
||||||
|
// But only need to store the bytes within data_header.size
|
||||||
|
&shred.payload[..shred.data_header.size as usize],
|
||||||
|
)?;
|
||||||
data_index.set_present(index, true);
|
data_index.set_present(index, true);
|
||||||
let newly_completed_data_sets = update_slot_meta(
|
let newly_completed_data_sets = update_slot_meta(
|
||||||
last_in_slot,
|
last_in_slot,
|
||||||
|
@ -1438,7 +1451,16 @@ impl Blockstore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
|
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
|
||||||
self.data_shred_cf.get_bytes((slot, index))
|
use crate::shred::SHRED_PAYLOAD_SIZE;
|
||||||
|
self.data_shred_cf.get_bytes((slot, index)).map(|data| {
|
||||||
|
data.map(|mut d| {
|
||||||
|
// Only data_header.size bytes stored in the blockstore so
|
||||||
|
// pad the payload out to SHRED_PAYLOAD_SIZE so that the
|
||||||
|
// erasure recovery works properly.
|
||||||
|
d.resize(cmp::max(d.len(), SHRED_PAYLOAD_SIZE), 0);
|
||||||
|
d
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_data_shreds_for_slot(
|
pub fn get_data_shreds_for_slot(
|
||||||
|
@ -2805,7 +2827,7 @@ impl Blockstore {
|
||||||
&self,
|
&self,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
index: u32,
|
index: u32,
|
||||||
new_shred: &[u8],
|
new_shred_raw: &[u8],
|
||||||
is_data: bool,
|
is_data: bool,
|
||||||
) -> Option<Vec<u8>> {
|
) -> Option<Vec<u8>> {
|
||||||
let res = if is_data {
|
let res = if is_data {
|
||||||
|
@ -2816,8 +2838,14 @@ impl Blockstore {
|
||||||
.expect("fetch from DuplicateSlots column family failed")
|
.expect("fetch from DuplicateSlots column family failed")
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut payload = new_shred_raw.to_vec();
|
||||||
|
payload.resize(
|
||||||
|
std::cmp::max(new_shred_raw.len(), crate::shred::SHRED_PAYLOAD_SIZE),
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
let new_shred = Shred::new_from_serialized_shred(payload).unwrap();
|
||||||
res.map(|existing_shred| {
|
res.map(|existing_shred| {
|
||||||
if existing_shred != new_shred {
|
if existing_shred != new_shred.payload {
|
||||||
Some(existing_shred)
|
Some(existing_shred)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -5270,6 +5298,23 @@ pub mod tests {
|
||||||
.insert_shreds(shreds[0..5].to_vec(), None, false)
|
.insert_shreds(shreds[0..5].to_vec(), None, false)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let slot_meta = blockstore.meta(0).unwrap().unwrap();
|
||||||
|
// Corrupt shred by making it too large
|
||||||
|
let mut shred5 = shreds[5].clone();
|
||||||
|
shred5.payload.push(10);
|
||||||
|
shred5.data_header.size = shred5.payload.len() as u16;
|
||||||
|
assert_eq!(
|
||||||
|
blockstore.should_insert_data_shred(
|
||||||
|
&shred5,
|
||||||
|
&slot_meta,
|
||||||
|
&HashMap::new(),
|
||||||
|
&last_root,
|
||||||
|
None,
|
||||||
|
false
|
||||||
|
),
|
||||||
|
false
|
||||||
|
);
|
||||||
|
|
||||||
// Trying to insert another "is_last" shred with index < the received index should fail
|
// Trying to insert another "is_last" shred with index < the received index should fail
|
||||||
// skip over shred 7
|
// skip over shred 7
|
||||||
blockstore
|
blockstore
|
||||||
|
@ -7722,7 +7767,7 @@ pub mod tests {
|
||||||
&duplicate_shred.payload,
|
&duplicate_shred.payload,
|
||||||
duplicate_shred.is_data()
|
duplicate_shred.is_data()
|
||||||
),
|
),
|
||||||
Some(shred.payload.clone())
|
Some(shred.payload.to_vec())
|
||||||
);
|
);
|
||||||
assert!(blockstore
|
assert!(blockstore
|
||||||
.is_shred_duplicate(
|
.is_shred_duplicate(
|
||||||
|
|
|
@ -216,11 +216,12 @@ impl Shred {
|
||||||
where
|
where
|
||||||
T: Deserialize<'de>,
|
T: Deserialize<'de>,
|
||||||
{
|
{
|
||||||
|
let end = std::cmp::min(*index + size, buf.len());
|
||||||
let ret = bincode::options()
|
let ret = bincode::options()
|
||||||
.with_limit(PACKET_DATA_SIZE as u64)
|
.with_limit(PACKET_DATA_SIZE as u64)
|
||||||
.with_fixint_encoding()
|
.with_fixint_encoding()
|
||||||
.allow_trailing_bytes()
|
.allow_trailing_bytes()
|
||||||
.deserialize(&buf[*index..*index + size])?;
|
.deserialize(&buf[*index..end])?;
|
||||||
*index += size;
|
*index += size;
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
@ -318,15 +319,10 @@ impl Shred {
|
||||||
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
|
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
|
||||||
|
|
||||||
let slot = common_header.slot;
|
let slot = common_header.slot;
|
||||||
let expected_data_size = SHRED_PAYLOAD_SIZE;
|
// Shreds should be padded out to SHRED_PAYLOAD_SIZE
|
||||||
// Safe because any payload from the network must have passed through
|
// so that erasure generation/recovery works correctly
|
||||||
// window service, which implies payload wll be of size
|
// But only the data_header.size is stored in blockstore.
|
||||||
// PACKET_DATA_SIZE, and `expected_data_size` <= PACKET_DATA_SIZE.
|
payload.resize(SHRED_PAYLOAD_SIZE, 0);
|
||||||
//
|
|
||||||
// On the other hand, if this function is called locally, the payload size should match
|
|
||||||
// the `expected_data_size`.
|
|
||||||
assert!(payload.len() >= expected_data_size);
|
|
||||||
payload.truncate(expected_data_size);
|
|
||||||
let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
|
let shred = if common_header.shred_type == ShredType(CODING_SHRED) {
|
||||||
let coding_header: CodingShredHeader =
|
let coding_header: CodingShredHeader =
|
||||||
Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
|
Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
|
||||||
|
|
Loading…
Reference in New Issue