diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index edcb824045..c62ea76edf 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2944,7 +2944,6 @@ pub(crate) mod tests { fn test_dead_fork_entry_deserialize_failure() { // Insert entry that causes deserialization failure let res = check_dead_fork(|_, _| { - let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD; let gibberish = [0xa5u8; PACKET_DATA_SIZE]; let mut data_header = DataShredHeader::default(); data_header.flags |= DATA_COMPLETE_SHRED; @@ -2957,7 +2956,7 @@ pub(crate) mod tests { ); bincode::serialize_into( &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..], - &gibberish[..payload_len], + &gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD], ) .unwrap(); vec![shred] diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index c0ca3e971d..17e9fa098b 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -611,9 +611,7 @@ mod tests { use solana_ledger::{ blockstore::make_many_slot_entries, blockstore_processor::fill_blockstore_slot_with_ticks, - shred::{ - max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, - }, + shred::{max_ticks_per_n_shreds, Shred}, }; use solana_perf::packet::Packet; use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp}; @@ -726,23 +724,10 @@ mod tests { nonce, ); assert!(rv.is_none()); - let common_header = ShredCommonHeader { - slot, - index: 1, - ..ShredCommonHeader::default() - }; - let data_header = DataShredHeader { - parent_offset: 1, - ..DataShredHeader::default() - }; - let shred_info = Shred::new_empty_from_header( - common_header, - data_header, - CodingShredHeader::default(), - ); + let shred = Shred::new_from_data(slot, 1, 1, None, false, false, 0, 2, 0); blockstore - .insert_shreds(vec![shred_info], None, false) + .insert_shreds(vec![shred], None, false) .expect("Expect successful ledger write"); let index = 1; @@ -954,6 +939,7 @@ mod tests { assert_eq!(shreds[0].slot(), 1); assert_eq!(shreds[0].index(), 0); shreds[0].payload.push(10); + shreds[0].data_header.size = shreds[0].payload.len() as u16; blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 0b33fb15ad..794d95df0b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1281,6 +1281,7 @@ impl Blockstore { leader_schedule: Option<&Arc>, is_recovered: bool, ) -> bool { + use crate::shred::SHRED_PAYLOAD_SIZE; let shred_index = u64::from(shred.index()); let slot = shred.slot(); let last_in_slot = if shred.last_in_slot() { @@ -1290,6 +1291,13 @@ impl Blockstore { false }; + if shred.data_header.size == 0 { + return false; + } + if shred.payload.len() > SHRED_PAYLOAD_SIZE { + return false; + } + // Check that we do not receive shred_index >= than the last_index // for the slot let last_index = slot_meta.last_index; @@ -1410,7 +1418,12 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, index), &shred.payload)?; + write_batch.put_bytes::( + (slot, index), + // Payload will be padded out to SHRED_PAYLOAD_SIZE + // But only need to store the bytes within data_header.size + &shred.payload[..shred.data_header.size as usize], + )?; data_index.set_present(index, true); let newly_completed_data_sets = update_slot_meta( last_in_slot, @@ -1438,7 +1451,16 @@ impl Blockstore { } pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { - self.data_shred_cf.get_bytes((slot, index)) + use crate::shred::SHRED_PAYLOAD_SIZE; + self.data_shred_cf.get_bytes((slot, index)).map(|data| { + data.map(|mut d| { + // Only data_header.size bytes stored in the blockstore so + // pad the payload out to SHRED_PAYLOAD_SIZE so that the + // erasure recovery works properly. + d.resize(cmp::max(d.len(), SHRED_PAYLOAD_SIZE), 0); + d + }) + }) } pub fn get_data_shreds_for_slot( @@ -2805,7 +2827,7 @@ impl Blockstore { &self, slot: u64, index: u32, - new_shred: &[u8], + new_shred_raw: &[u8], is_data: bool, ) -> Option> { let res = if is_data { @@ -2816,8 +2838,14 @@ impl Blockstore { .expect("fetch from DuplicateSlots column family failed") }; + let mut payload = new_shred_raw.to_vec(); + payload.resize( + std::cmp::max(new_shred_raw.len(), crate::shred::SHRED_PAYLOAD_SIZE), + 0, + ); + let new_shred = Shred::new_from_serialized_shred(payload).unwrap(); res.map(|existing_shred| { - if existing_shred != new_shred { + if existing_shred != new_shred.payload { Some(existing_shred) } else { None @@ -5270,6 +5298,23 @@ pub mod tests { .insert_shreds(shreds[0..5].to_vec(), None, false) .unwrap(); + let slot_meta = blockstore.meta(0).unwrap().unwrap(); + // Corrupt shred by making it too large + let mut shred5 = shreds[5].clone(); + shred5.payload.push(10); + shred5.data_header.size = shred5.payload.len() as u16; + assert_eq!( + blockstore.should_insert_data_shred( + &shred5, + &slot_meta, + &HashMap::new(), + &last_root, + None, + false + ), + false + ); + // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 blockstore @@ -7722,7 +7767,7 @@ pub mod tests { &duplicate_shred.payload, duplicate_shred.is_data() ), - Some(shred.payload.clone()) + Some(shred.payload.to_vec()) ); assert!(blockstore .is_shred_duplicate( diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index db76902b93..1d9fb96a98 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -216,11 +216,12 @@ impl Shred { where T: Deserialize<'de>, { + let end = std::cmp::min(*index + size, buf.len()); let ret = bincode::options() .with_limit(PACKET_DATA_SIZE as u64) .with_fixint_encoding() .allow_trailing_bytes() - .deserialize(&buf[*index..*index + size])?; + .deserialize(&buf[*index..end])?; *index += size; Ok(ret) } @@ -318,15 +319,10 @@ impl Shred { Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?; let slot = common_header.slot; - let expected_data_size = SHRED_PAYLOAD_SIZE; - // Safe because any payload from the network must have passed through - // window service, which implies payload wll be of size - // PACKET_DATA_SIZE, and `expected_data_size` <= PACKET_DATA_SIZE. - // - // On the other hand, if this function is called locally, the payload size should match - // the `expected_data_size`. - assert!(payload.len() >= expected_data_size); - payload.truncate(expected_data_size); + // Shreds should be padded out to SHRED_PAYLOAD_SIZE + // so that erasure generation/recovery works correctly + // But only the data_header.size is stored in blockstore. + payload.resize(SHRED_PAYLOAD_SIZE, 0); let shred = if common_header.shred_type == ShredType(CODING_SHRED) { let coding_header: CodingShredHeader = Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;