From 895f76a93c1fff61cfa4686d9925d87638dfdeb1 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 25 Apr 2022 12:43:22 +0000 Subject: [PATCH] hides implementation details of shred from its public interface (#24563) Working towards embedding versioning into shreds binary, so that a new variant of shred struct can include merkle tree hashes of the erasure set. --- core/benches/cluster_nodes.rs | 4 +- core/benches/shredder.rs | 28 +- core/src/broadcast_stage.rs | 2 +- .../broadcast_duplicates_run.rs | 4 +- .../broadcast_fake_shreds_run.rs | 2 +- core/src/packet_hasher.rs | 2 +- core/src/repair_response.rs | 4 +- core/src/replay_stage.rs | 37 +-- core/src/retransmit_stage.rs | 2 +- core/src/serve_repair.rs | 7 +- core/src/sigverify_shreds.rs | 16 +- core/src/window_service.rs | 31 +- gossip/src/cluster_info.rs | 7 +- gossip/src/duplicate_shred.rs | 12 +- ledger/src/blockstore.rs | 281 +++++------------ ledger/src/blockstore_meta.rs | 4 +- ledger/src/shred.rs | 289 +++++++++++++----- ledger/src/sigverify_shreds.rs | 18 +- ledger/tests/shred.rs | 4 +- 19 files changed, 365 insertions(+), 389 deletions(-) diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs index c2302fcabc..9b63a40c67 100644 --- a/core/benches/cluster_nodes.rs +++ b/core/benches/cluster_nodes.rs @@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic( num_simulated_shreds: usize, ) { for i in 0..num_simulated_shreds { - shred.common_header.index = i as u32; + shred.set_index(i as u32); let (_neighbors, _children) = cluster_nodes.get_retransmit_peers( *slot_leader, shred, @@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O let slot_leader = nodes[1..].choose(&mut rng).unwrap().id; let slot = rand::random::(); let mut shred = Shred::new_empty_data_shred(); - shred.common_header.slot = slot; + shred.set_slot(slot); b.iter(|| { get_retransmit_peers_deterministic( &cluster_nodes, diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 565f8ced2d..89571b9c91 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -9,14 +9,21 @@ use { solana_entry::entry::{create_ticks, Entry}, solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder, - MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS, - SIZE_OF_DATA_SHRED_PAYLOAD, + MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD, }, solana_perf::test_tx, - solana_sdk::{hash::Hash, signature::Keypair}, + solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair}, test::Bencher, }; +// Copied these values here to avoid exposing shreds +// internals only for the sake of benchmarks. + +// size of nonce: 4 +// size of common shred header: 83 +// size of coding shred header: 6 +const VALID_SHRED_DATA_LEN: usize = PACKET_DATA_SIZE - 4 - 83 - 6; + fn make_test_entry(txs_per_entry: u64) -> Entry { Entry { num_hashes: 100_000, @@ -54,11 +61,10 @@ fn make_shreds(num_shreds: usize) -> Vec { fn make_concatenated_shreds(num_shreds: usize) -> Vec { let data_shreds = make_shreds(num_shreds); - let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize; - let mut data: Vec = vec![0; num_shreds * valid_shred_data_len]; + let mut data: Vec = vec![0; num_shreds * VALID_SHRED_DATA_LEN]; for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() { - data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len] - .copy_from_slice(&shred.payload[..valid_shred_data_len]); + data[i * VALID_SHRED_DATA_LEN..(i + 1) * VALID_SHRED_DATA_LEN] + .copy_from_slice(&shred.payload()[..VALID_SHRED_DATA_LEN]); } data @@ -120,7 +126,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) { let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1); bencher.iter(|| { - let payload = shred.payload.clone(); + let payload = shred.payload().clone(); let _ = Shred::new_from_serialized_shred(payload).unwrap(); }) } @@ -157,9 +163,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { fn bench_shredder_coding_raptorq(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK; let data = make_concatenated_shreds(symbol_count as usize); - let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize; bencher.iter(|| { - let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16); + let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16); encoder.get_encoded_packets(symbol_count); }) } @@ -168,8 +173,7 @@ fn bench_shredder_coding_raptorq(bencher: &mut Bencher) { fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) { let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK; let data = make_concatenated_shreds(symbol_count as usize); - let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize; - let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16); + let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16); let mut packets = encoder.get_encoded_packets(symbol_count as u32); packets.shuffle(&mut rand::thread_rng()); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 84cdb9fa22..11e7bf979a 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -424,7 +424,7 @@ pub fn broadcast_shreds( update_peer_stats(&cluster_nodes, last_datapoint_submit); let root_bank = root_bank.clone(); shreds.flat_map(move |shred| { - repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs( + repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs( shred, &root_bank, DATA_PLANE_FANOUT, diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index f50943c24e..e4ba405193 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun { .filter_map(|pubkey| { let tvu = cluster_info .lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?; - Some((&shred.payload, tvu)) + Some((shred.payload(), tvu)) }) .collect(), ); } - Some(vec![(&shred.payload, node.tvu)]) + Some(vec![(shred.payload(), node.tvu)]) }) .flatten() .collect(); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index a0bf77153a..90f404933c 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { if fake == (i <= self.partition) { // Send fake shreds to the first N peers data_shreds.iter().for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + sock.send_to(b.payload(), &peer.tvu_forwards).unwrap(); }); } }); diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs index 575c9733fd..51746445bd 100644 --- a/core/src/packet_hasher.rs +++ b/core/src/packet_hasher.rs @@ -31,7 +31,7 @@ impl PacketHasher { } pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 { - self.hash_data(&shred.payload) + self.hash_data(shred.payload()) } fn hash_data(&self, data: &[u8]) -> u64 { diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs index 201efc4554..b2235c0870 100644 --- a/core/src/repair_response.rs +++ b/core/src/repair_response.rs @@ -79,10 +79,10 @@ mod test { assert_eq!(shred.slot(), slot); let keypair = Keypair::new(); shred.sign(&keypair); - trace!("signature {}", shred.common_header.signature); + trace!("signature {}", shred.signature()); let nonce = 9; let mut packet = repair_response_packet_from_bytes( - shred.payload, + shred.into_payload(), &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), nonce, ) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 84a355eea7..1a99a1e527 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3144,10 +3144,7 @@ pub mod tests { create_new_tmp_ledger, genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, get_tmp_ledger_path, - shred::{ - CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED, - SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, - }, + shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD}, }, solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -3163,7 +3160,6 @@ pub mod tests { genesis_config, hash::{hash, Hash}, instruction::InstructionError, - packet::PACKET_DATA_SIZE, poh_config::PohConfig, signature::{Keypair, Signer}, system_transaction, @@ -3747,26 +3743,19 @@ pub mod tests { fn test_dead_fork_entry_deserialize_failure() { // Insert entry that causes deserialization failure let res = check_dead_fork(|_, bank| { - let gibberish = [0xa5u8; PACKET_DATA_SIZE]; - let mut data_header = DataShredHeader::default(); - data_header.flags |= DATA_COMPLETE_SHRED; - // Need to provide the right size for Shredder::deshred. - data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16; - data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16; - let shred_common_header = ShredCommonHeader { - slot: bank.slot(), - ..ShredCommonHeader::default() - }; - let mut shred = Shred::new_empty_from_header( - shred_common_header, - data_header, - CodingShredHeader::default(), + let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let parent_offset = bank.slot() - bank.parent_slot(); + let shred = Shred::new_from_data( + bank.slot(), + 0, // index, + parent_offset as u16, + Some(&gibberish), + true, // is_last_data + false, // is_last_in_slot + 0, // reference_tick + 0, // version + 0, // fec_set_index ); - bincode::serialize_into( - &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..], - &gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD], - ) - .unwrap(); vec![shred] }); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index e89fe32bb0..b3b51df34b 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -295,7 +295,7 @@ fn retransmit( .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed); let mut retransmit_time = Measure::start("retransmit_to"); - let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) { + let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) { Ok(()) => addrs.len(), Err(SendPktsError::IoError(ioerr, num_failed)) => { stats diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 137cdbaa0c..6227d142bd 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1095,11 +1095,12 @@ mod tests { // Create slots [1, 2] with 1 shred apiece let (mut shreds, _) = make_many_slot_entries(1, 2, 1); - // Make shred for slot 1 too large assert_eq!(shreds[0].slot(), 1); assert_eq!(shreds[0].index(), 0); - shreds[0].payload.push(10); - shreds[0].data_header.size = shreds[0].payload.len() as u16; + // TODO: The test previously relied on corrupting shred payload + // size which we no longer want to expose. Current test no longer + // covers packet size check in repair_response_packet_from_bytes. + shreds.remove(0); blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 4ac60a2971..2175fe5113 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -94,8 +94,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0xc0de_dead, @@ -110,8 +110,8 @@ pub mod tests { ); shred.sign(&keypair); batches[1].packets.resize(1, Packet::default()); - batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[1].packets[0].meta.size = shred.payload.len(); + batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[1].packets[0].meta.size = shred.payload().len(); let expected: HashSet = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); assert_eq!(ShredSigVerifier::read_slots(&batches), expected); @@ -143,8 +143,8 @@ pub mod tests { 0xc0de, ); shred.sign(&leader_keypair); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0, @@ -159,8 +159,8 @@ pub mod tests { ); let wrong_keypair = Keypair::new(); shred.sign(&wrong_keypair); - batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[1].meta.size = shred.payload.len(); + batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[1].meta.size = shred.payload().len(); let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); let rv = verifier.verify_batches(batches, num_packets); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f5eaa11864..5b12d6f441 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist( } else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 { inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1); false - } else if shred.data_header.size as usize > shred.payload.len() { - inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1); + } else if !shred.sanitize() { + inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1); false } else { true @@ -215,13 +215,13 @@ fn run_check_duplicate( let shred_slot = shred.slot(); if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { if let Some(existing_shred_payload) = - blockstore.is_shred_duplicate(shred.id(), shred.payload.clone()) + blockstore.is_shred_duplicate(shred.id(), shred.payload().clone()) { cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; blockstore.store_duplicate_slot( shred_slot, existing_shred_payload, - shred.payload, + shred.into_payload(), )?; duplicate_slot_sender.send(shred_slot)?; @@ -717,7 +717,7 @@ mod test { blockstore::{make_many_slot_entries, Blockstore}, genesis_utils::create_genesis_config_with_leader, get_tmp_ledger_path, - shred::{DataShredHeader, Shredder}, + shred::Shredder, }, solana_sdk::{ epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, @@ -825,21 +825,9 @@ mod test { 0 )); - // with a bad header size - let mut bad_header_shred = shreds[0].clone(); - bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16; - assert!(!should_retransmit_and_persist( - &bad_header_shred, - Some(bank.clone()), - &cache, - &me_id, - 0, - 0 - )); - // with an invalid index, shred gets thrown out let mut bad_index_shred = shreds[0].clone(); - bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32; + bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32); assert!(!should_retransmit_and_persist( &bad_index_shred, Some(bank.clone()), @@ -875,7 +863,7 @@ mod test { )); // coding shreds don't contain parent slot information, test that slot >= root - let (common, coding) = Shred::new_coding_shred_header( + let mut coding_shred = Shred::new_empty_coding( 5, // slot 5, // index 5, // fec_set_index @@ -884,8 +872,6 @@ mod test { 3, // position 0, // version ); - let mut coding_shred = - Shred::new_empty_from_header(common, DataShredHeader::default(), coding); coding_shred.sign(&leader_keypair); // shred.slot() > root, shred continues assert!(should_retransmit_and_persist( @@ -959,7 +945,7 @@ mod test { std::net::{IpAddr, Ipv4Addr}, }; solana_logger::setup(); - let (common, coding) = Shred::new_coding_shred_header( + let shred = Shred::new_empty_coding( 5, // slot 5, // index 5, // fec_set_index @@ -968,7 +954,6 @@ mod test { 4, // position 0, // version ); - let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding); let mut shreds = vec![shred.clone(), shred.clone(), shred]; let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let repair_meta = RepairMeta { diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 426829e89d..e70d1b6d34 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3343,9 +3343,12 @@ mod tests { let keypair = Keypair::new(); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); - let next_shred_index = rng.gen(); + let next_shred_index = rng.gen_range(0, 32_000); let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload; + let other_payload = { + let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + other_shred.into_payload() + }; let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 9a8d0437dc..33c81f5fb6 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -91,7 +91,7 @@ fn check_shreds( Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred2.shred_type() { Err(Error::ShredTypeMismatch) - } else if shred1.payload == shred2.payload { + } else if shred1.payload() == shred2.payload() { Err(Error::InvalidDuplicateShreds) } else { if let Some(leader_schedule) = leader_schedule { @@ -152,14 +152,14 @@ pub(crate) fn from_shred( wallclock: u64, max_size: usize, // Maximum serialized size of each DuplicateShred. ) -> Result, Error> { - if shred.payload == other_payload { + if shred.payload() == &other_payload { return Err(Error::InvalidDuplicateShreds); } let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?; check_shreds(leader_schedule, &shred, &other_shred)?; let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type()); let proof = DuplicateSlotProof { - shred1: shred.payload, + shred1: shred.into_payload(), shred2: other_payload, }; let data = bincode::serialize(&proof)?; @@ -259,7 +259,7 @@ pub fn into_shreds( Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type { Err(Error::ShredTypeMismatch) - } else if shred1.payload == shred2.payload { + } else if shred1.payload() == shred2.payload() { Err(Error::InvalidDuplicateShreds) } else if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { Err(Error::InvalidSignature) @@ -352,7 +352,7 @@ pub(crate) mod tests { let leader = Arc::new(Keypair::new()); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); - let next_shred_index = rng.gen(); + let next_shred_index = rng.gen_range(0, 32_000); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let leader_schedule = |s| { @@ -365,7 +365,7 @@ pub(crate) mod tests { let chunks: Vec<_> = from_shred( shred1.clone(), Pubkey::new_unique(), // self_pubkey - shred2.payload.clone(), + shred2.payload().clone(), Some(leader_schedule), rng.gen(), // wallclock 512, // max_size diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index ff4194e907..3057ca4a9b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1120,12 +1120,6 @@ impl Blockstore { ) } - fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool { - shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds - || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds - || shred1.first_coding_index() != shred2.first_coding_index() - } - #[allow(clippy::too_many_arguments)] fn check_insert_coding_shred( &self, @@ -1186,7 +1180,11 @@ impl Blockstore { ); if let Some(conflicting_shred) = conflicting_shred { if self - .store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone()) + .store_duplicate_if_not_existing( + slot, + conflicting_shred, + shred.payload().clone(), + ) .is_err() { warn!("bad duplicate store.."); @@ -1198,10 +1196,15 @@ impl Blockstore { // ToDo: This is a potential slashing condition warn!("Received multiple erasure configs for the same erasure set!!!"); warn!( - "Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}", - slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header, + "Slot: {}, shred index: {}, erasure_set: {:?}, \ + is_duplicate: {}, stored config: {:#?}, new shred: {:#?}", + slot, + shred.index(), + erasure_set, + self.has_duplicate_shreds_in_slot(slot), + erasure_meta.config(), + shred, ); - return false; } @@ -1239,15 +1242,15 @@ impl Blockstore { let maybe_shred = self.get_coding_shred(slot, coding_index); if let Ok(Some(shred_data)) = maybe_shred { let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap(); - if Self::erasure_mismatch(&potential_shred, shred) { - return Some(potential_shred.payload); + if shred.erasure_mismatch(&potential_shred).unwrap() { + return Some(potential_shred.into_payload()); } } else if let Some(potential_shred) = { let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code); just_received_shreds.get(&key) } { - if Self::erasure_mismatch(potential_shred, shred) { - return Some(potential_shred.payload.clone()); + if shred.erasure_mismatch(potential_shred).unwrap() { + return Some(potential_shred.payload().clone()); } } } @@ -1397,7 +1400,7 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::((slot, shred_index), &shred.payload)?; + write_batch.put_bytes::((slot, shred_index), shred.payload())?; index_meta.coding_mut().insert(shred_index); Ok(()) @@ -1417,7 +1420,7 @@ impl Blockstore { ) -> Cow<'a, Vec> { let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data); if let Some(shred) = just_inserted_shreds.get(&key) { - Cow::Borrowed(&shred.payload) + Cow::Borrowed(shred.payload()) } else { // If it doesn't exist in the just inserted set, it must exist in // the backing store @@ -1442,8 +1445,7 @@ impl Blockstore { } else { false }; - - if shred.data_header.size == 0 { + if !shred.sanitize() { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); @@ -1452,32 +1454,14 @@ impl Blockstore { ( "error", format!( - "Leader {:?}, slot {}: received index {} is empty", - leader_pubkey, slot, shred_index, + "Leader {:?}, slot {}: received invalid shred", + leader_pubkey, slot, ), String ) ); return false; } - if shred.payload.len() > SHRED_PAYLOAD_SIZE { - let leader_pubkey = leader_schedule - .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - - datapoint_error!( - "blockstore_error", - ( - "error", - format!( - "Leader {:?}, slot {}: received index {} shred.payload.len() > SHRED_PAYLOAD_SIZE", - leader_pubkey, slot, shred_index, - ), - String - ) - ); - return false; - } - // Check that we do not receive shred_index >= than the last_index // for the slot let last_index = slot_meta.last_index; @@ -1495,7 +1479,7 @@ impl Blockstore { .store_duplicate_if_not_existing( slot, ending_shred.into_owned(), - shred.payload.clone(), + shred.payload().clone(), ) .is_err() { @@ -1531,7 +1515,7 @@ impl Blockstore { .store_duplicate_if_not_existing( slot, ending_shred.into_owned(), - shred.payload.clone(), + shred.payload().clone(), ) .is_err() { @@ -1616,12 +1600,7 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. - write_batch.put_bytes::( - (slot, index), - // Payload will be padded out to SHRED_PAYLOAD_SIZE - // But only need to store the bytes within data_header.size - &shred.payload[..shred.data_header.size as usize], - )?; + write_batch.put_bytes::((slot, index), shred.bytes_to_store())?; data_index.insert(index); let newly_completed_data_sets = update_slot_meta( last_in_slot, @@ -3132,7 +3111,7 @@ impl Blockstore { let size = payload.len().max(SHRED_PAYLOAD_SIZE); payload.resize(size, 0u8); let new_shred = Shred::new_from_serialized_shred(payload).unwrap(); - (existing_shred != new_shred.payload).then(|| existing_shred) + (existing_shred != *new_shred.payload()).then(|| existing_shred) } pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool { @@ -4294,7 +4273,7 @@ pub mod tests { blockstore_db::BlockstoreRocksFifoOptions, genesis_utils::{create_genesis_config, GenesisConfigInfo}, leader_schedule::{FixedSchedule, LeaderSchedule}, - shred::{max_ticks_per_n_shreds, DataShredHeader}, + shred::max_ticks_per_n_shreds, }, assert_matches::assert_matches, bincode::serialize, @@ -4577,7 +4556,7 @@ pub mod tests { let slot = 0; let (shreds, _) = make_slot_entries(slot, 0, 100); let num_shreds = shreds.len() as u64; - let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect(); + let shred_bufs: Vec<_> = shreds.iter().map(Shred::payload).cloned().collect(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -5846,31 +5825,23 @@ pub mod tests { .unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap(); - // Corrupt shred by making it too large - let mut shred5 = shreds[5].clone(); - shred5.payload.push(10); - shred5.data_header.size = shred5.payload.len() as u16; - assert!(!blockstore.should_insert_data_shred( - &shred5, - &slot_meta, - &HashMap::new(), - &last_root, - None, - ShredSource::Turbine - )); + let shred5 = shreds[5].clone(); // Ensure that an empty shred (one with no data) would get inserted. Such shreds // may be used as signals (broadcast does so to indicate a slot was interrupted) // Reuse shred5's header values to avoid a false negative result - let mut empty_shred = Shred::new_from_data( - shred5.common_header.slot, - shred5.common_header.index, - shred5.data_header.parent_offset, + let empty_shred = Shred::new_from_data( + shred5.slot(), + shred5.index(), + { + let parent_offset = shred5.slot() - shred5.parent().unwrap(); + parent_offset as u16 + }, None, // data true, // is_last_data true, // is_last_in_slot 0, // reference_tick - shred5.common_header.version, + shred5.version(), shred5.fec_set_index(), ); assert!(blockstore.should_insert_data_shred( @@ -5881,16 +5852,6 @@ pub mod tests { None, ShredSource::Repaired, )); - empty_shred.data_header.size = 0; - assert!(!blockstore.should_insert_data_shred( - &empty_shred, - &slot_meta, - &HashMap::new(), - &last_root, - None, - ShredSource::Recovered, - )); - // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 blockstore @@ -5977,7 +5938,7 @@ pub mod tests { let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let slot = 1; - let (shred, coding) = Shred::new_coding_shred_header( + let coding_shred = Shred::new_empty_coding( slot, 11, // index 11, // fec_set_index 11, // num_data_shreds @@ -5985,7 +5946,6 @@ pub mod tests { 8, // position 0, // version ); - let coding_shred = Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); let mut erasure_metas = HashMap::new(); let mut index_working_set = HashMap::new(); @@ -6034,7 +5994,7 @@ pub mod tests { let last_root = RwLock::new(0); let slot = 1; - let (mut shred, coding) = Shred::new_coding_shred_header( + let mut coding_shred = Shred::new_empty_coding( slot, 11, // index 11, // fec_set_index 11, // num_data_shreds @@ -6042,8 +6002,6 @@ pub mod tests { 8, // position 0, // version ); - let coding_shred = - Shred::new_empty_from_header(shred.clone(), DataShredHeader::default(), coding.clone()); // Insert a good coding shred assert!(Blockstore::should_insert_coding_shred( @@ -6065,28 +6023,16 @@ pub mod tests { )); } - shred.index += 1; - // Establish a baseline that works - { - let coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - assert!(Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } + coding_shred.set_index(coding_shred.index() + 1); + assert!(Blockstore::should_insert_coding_shred( + &coding_shred, + &last_root + )); // Trying to insert a shred with index < position should fail { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); + let mut coding_shred = coding_shred.clone(); let index = coding_shred.index() - coding_shred.fec_set_index() - 1; coding_shred.set_index(index as u32); @@ -6096,76 +6042,9 @@ pub mod tests { )); } - // Trying to insert shred with num_coding == 0 should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - coding_shred.coding_header.num_coding_shreds = 0; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } - - // Trying to insert shred with pos >= num_coding should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index(); - coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - } - - // Trying to insert with set_index with num_coding that would imply the last shred - // has index > u32::MAX should fail - { - let mut coding_shred = Shred::new_empty_from_header( - shred.clone(), - DataShredHeader::default(), - coding.clone(), - ); - coding_shred.common_header.fec_set_index = std::u32::MAX - 1; - coding_shred.coding_header.num_data_shreds = 2; - coding_shred.coding_header.num_coding_shreds = 4; - coding_shred.coding_header.position = 1; - coding_shred.common_header.index = std::u32::MAX - 1; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - coding_shred.coding_header.num_coding_shreds = 2000; - assert!(!Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - // Decreasing the number of num_coding_shreds will put it within the allowed limit - coding_shred.coding_header.num_coding_shreds = 2; - assert!(Blockstore::should_insert_coding_shred( - &coding_shred, - &last_root - )); - - // Insertion should succeed - blockstore - .insert_shreds(vec![coding_shred], None, false) - .unwrap(); - } - // Trying to insert value into slot <= than last root should fail { - let mut coding_shred = - Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); + let mut coding_shred = coding_shred.clone(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blockstore::should_insert_coding_shred( &coding_shred, @@ -8350,10 +8229,7 @@ pub mod tests { blockstore .insert_shreds(coding_shreds, Some(&leader_schedule_cache), false) .unwrap(); - let shred_bufs: Vec<_> = data_shreds - .iter() - .map(|shred| shred.payload.clone()) - .collect(); + let shred_bufs: Vec<_> = data_shreds.iter().map(Shred::payload).cloned().collect(); // Check all the data shreds were recovered for (s, buf) in data_shreds.iter().zip(shred_bufs) { @@ -8606,20 +8482,24 @@ pub mod tests { assert_eq!( blockstore.is_shred_duplicate( ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()), - duplicate_shred.payload.clone(), + duplicate_shred.payload().clone(), ), - Some(shred.payload.to_vec()) + Some(shred.payload().clone()) ); assert!(blockstore .is_shred_duplicate( ShredId::new(slot, /*index:*/ 0, non_duplicate_shred.shred_type()), - non_duplicate_shred.payload, + non_duplicate_shred.into_payload(), ) .is_none()); // Store a duplicate shred blockstore - .store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone()) + .store_duplicate_slot( + slot, + shred.payload().clone(), + duplicate_shred.payload().clone(), + ) .unwrap(); // Slot is now marked as duplicate @@ -8627,8 +8507,8 @@ pub mod tests { // Check ability to fetch the duplicates let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap(); - assert_eq!(duplicate_proof.shred1, shred.payload); - assert_eq!(duplicate_proof.shred2, duplicate_shred.payload); + assert_eq!(duplicate_proof.shred1, *shred.payload()); + assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload()); } #[test] @@ -8926,30 +8806,6 @@ pub mod tests { assert!(blockstore.has_duplicate_shreds_in_slot(slot)); } - #[test] - fn test_large_num_coding() { - solana_logger::setup(); - let slot = 1; - let (_data_shreds, mut coding_shreds, leader_schedule_cache) = - setup_erasure_shreds(slot, 0, 100); - - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - coding_shreds[1].coding_header.num_coding_shreds = u16::MAX; - blockstore - .insert_shreds( - vec![coding_shreds[1].clone()], - Some(&leader_schedule_cache), - false, - ) - .unwrap(); - - // Check no coding shreds are inserted - let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap(); - assert!(res.is_empty()); - } - #[test] pub fn test_insert_data_shreds_same_slot_last_index() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -9074,7 +8930,7 @@ pub mod tests { if i <= smaller_last_shred_index as u64 { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), - shreds[i as usize].payload + *shreds[i as usize].payload() ); } else { assert!(blockstore.get_data_shred(slot, i).unwrap().is_none()); @@ -9087,11 +8943,14 @@ pub mod tests { // Case 2: Inserting a duplicate with an even smaller last shred index should not // mark the slot as dead since the Slotmeta is full. - let mut even_smaller_last_shred_duplicate = shreds[smaller_last_shred_index - 1].clone(); - even_smaller_last_shred_duplicate.set_last_in_slot(); - // Flip a byte to create a duplicate shred - even_smaller_last_shred_duplicate.payload[0] = - std::u8::MAX - even_smaller_last_shred_duplicate.payload[0]; + let even_smaller_last_shred_duplicate = { + let mut payload = shreds[smaller_last_shred_index - 1].payload().clone(); + // Flip a byte to create a duplicate shred + payload[0] = std::u8::MAX - payload[0]; + let mut shred = Shred::new_from_serialized_shred(payload).unwrap(); + shred.set_last_in_slot(); + shred + }; assert!(blockstore .is_shred_duplicate( ShredId::new( @@ -9099,7 +8958,7 @@ pub mod tests { even_smaller_last_shred_duplicate.index(), ShredType::Data ), - even_smaller_last_shred_duplicate.payload.clone(), + even_smaller_last_shred_duplicate.payload().clone(), ) .is_some()); blockstore @@ -9110,7 +8969,7 @@ pub mod tests { if i <= smaller_last_shred_index as u64 { assert_eq!( blockstore.get_data_shred(slot, i).unwrap().unwrap(), - shreds[i as usize].payload + *shreds[i as usize].payload() ); } else { assert!(blockstore.get_data_shred(slot, i).unwrap().is_none()); @@ -9146,7 +9005,7 @@ pub mod tests { .get_data_shred(slot, shred_index) .unwrap() .unwrap(), - shred_to_check.payload + *shred_to_check.payload() ); } else { assert!(blockstore @@ -9177,7 +9036,7 @@ pub mod tests { .get_data_shred(slot, shred_index) .unwrap() .unwrap(), - shred_to_check.payload + *shred_to_check.payload() ); } else { assert!(blockstore diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 7544c7b24f..41b6ffb179 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -234,8 +234,8 @@ impl ErasureMeta { ShredType::Data => None, ShredType::Code => { let config = ErasureConfig::new( - usize::from(shred.coding_header.num_data_shreds), - usize::from(shred.coding_header.num_coding_shreds), + usize::from(shred.num_data_shreds().ok()?), + usize::from(shred.num_coding_shreds().ok()?), ); let first_coding_index = u64::from(shred.first_coding_index()?); let erasure_meta = ErasureMeta { diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 202ef2ef8b..c6ccc52abc 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -67,7 +67,7 @@ use { pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }, - std::{cell::RefCell, mem::size_of}, + std::{cell::RefCell, mem::size_of, ops::RangeInclusive}, thiserror::Error, }; @@ -76,25 +76,32 @@ pub type Nonce = u32; /// The following constants are computed by hand, and hardcoded. /// `test_shred_constants` ensures that the values are correct. /// Constants are used over lazy_static for performance reasons. -pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; -pub const SIZE_OF_DATA_SHRED_HEADER: usize = 5; -pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6; -pub const SIZE_OF_SIGNATURE: usize = 64; -pub const SIZE_OF_SHRED_TYPE: usize = 1; -pub const SIZE_OF_SHRED_SLOT: usize = 8; -pub const SIZE_OF_SHRED_INDEX: usize = 4; +const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; +const SIZE_OF_DATA_SHRED_HEADER: usize = 5; +const SIZE_OF_CODING_SHRED_HEADER: usize = 6; +const SIZE_OF_SIGNATURE: usize = 64; +const SIZE_OF_SHRED_TYPE: usize = 1; +const SIZE_OF_SHRED_SLOT: usize = 8; +const SIZE_OF_SHRED_INDEX: usize = 4; pub const SIZE_OF_NONCE: usize = 4; -pub const SIZE_OF_CODING_SHRED_HEADERS: usize = +const SIZE_OF_CODING_SHRED_HEADERS: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_COMMON_SHRED_HEADER - SIZE_OF_DATA_SHRED_HEADER - SIZE_OF_CODING_SHRED_HEADERS - SIZE_OF_NONCE; +const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; +// DataShredHeader.size is sum of common-shred-header, data-shred-header and +// data.len(). Broadcast stage may create zero length data shreds when the +// previous slot was interrupted: +// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79 +const DATA_SHRED_SIZE_RANGE: RangeInclusive = + SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD; -pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; -pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; -pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; +const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; +const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; +const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() @@ -107,7 +114,7 @@ pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32; pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111; const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000; -pub const DATA_COMPLETE_SHRED: u8 = 0b0100_0000; +const DATA_COMPLETE_SHRED: u8 = 0b0100_0000; #[derive(Error, Debug)] pub enum ShredError { @@ -163,37 +170,37 @@ impl Default for ShredType { /// A common header that is present in data and code shred headers #[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)] -pub struct ShredCommonHeader { - pub signature: Signature, - pub shred_type: ShredType, - pub slot: Slot, - pub index: u32, - pub version: u16, - pub fec_set_index: u32, +struct ShredCommonHeader { + signature: Signature, + shred_type: ShredType, + slot: Slot, + index: u32, + version: u16, + fec_set_index: u32, } /// The data shred header has parent offset and flags #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] -pub struct DataShredHeader { - pub parent_offset: u16, - pub flags: u8, - pub size: u16, +struct DataShredHeader { + parent_offset: u16, + flags: u8, + size: u16, // common shred header + data shred header + data } /// The coding shred header has FEC information #[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)] -pub struct CodingShredHeader { - pub num_data_shreds: u16, - pub num_coding_shreds: u16, - pub position: u16, +struct CodingShredHeader { + num_data_shreds: u16, + num_coding_shreds: u16, + position: u16, } #[derive(Clone, Debug, PartialEq)] pub struct Shred { - pub common_header: ShredCommonHeader, - pub data_header: DataShredHeader, - pub coding_header: CodingShredHeader, - pub payload: Vec, + common_header: ShredCommonHeader, + data_header: DataShredHeader, + coding_header: CodingShredHeader, + payload: Vec, } /// Tuple which uniquely identifies a shred should it exists. @@ -260,6 +267,7 @@ impl Shred { packet.meta.size = len; } + // TODO: Should this sanitize output? pub fn new_from_data( slot: Slot, index: u32, @@ -314,7 +322,7 @@ impl Shred { &data_header, ) .expect("Failed to write data header into shred buffer"); - + // TODO: Need to check if data is too large! if let Some(data) = data { payload[start..start + data.len()].clone_from_slice(data); } @@ -354,13 +362,14 @@ impl Shred { coding_header, payload, }; + // TODO: Should return why sanitize failed. shred .sanitize() .then(|| shred) .ok_or(ShredError::InvalidPayload) } - pub fn new_coding_shred_header( + pub fn new_empty_coding( slot: Slot, index: u32, fec_set_index: u32, @@ -368,7 +377,7 @@ impl Shred { num_coding_shreds: u16, position: u16, version: u16, - ) -> (ShredCommonHeader, CodingShredHeader) { + ) -> Self { let header = ShredCommonHeader { shred_type: ShredType::Code, index, @@ -377,38 +386,15 @@ impl Shred { fec_set_index, ..ShredCommonHeader::default() }; - ( - header, - CodingShredHeader { - num_data_shreds, - num_coding_shreds, - position, - }, - ) - } - - pub fn new_empty_coding( - slot: Slot, - index: u32, - fec_set_index: u32, - num_data: u16, - num_code: u16, - position: u16, - version: u16, - ) -> Self { - let (header, coding_header) = Self::new_coding_shred_header( - slot, - index, - fec_set_index, - num_data, - num_code, + let coding_header = CodingShredHeader { + num_data_shreds, + num_coding_shreds, position, - version, - ); + }; Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header) } - pub fn new_empty_from_header( + fn new_empty_from_header( common_header: ShredCommonHeader, data_header: DataShredHeader, coding_header: CodingShredHeader, @@ -488,7 +474,29 @@ impl Shred { self.common_header.index } - pub(crate) fn fec_set_index(&self) -> u32 { + #[inline] + pub fn payload(&self) -> &Vec { + &self.payload + } + + // Possibly trimmed payload; + // Should only be used when storing shreds to blockstore. + pub(crate) fn bytes_to_store(&self) -> &[u8] { + match self.shred_type() { + ShredType::Code => &self.payload, + ShredType::Data => { + // Payload will be padded out to SHRED_PAYLOAD_SIZE. + // But only need to store the bytes within data_header.size. + &self.payload[..self.data_header.size as usize] + } + } + } + + pub fn into_payload(self) -> Vec { + self.payload + } + + pub fn fec_set_index(&self) -> u32 { self.common_header.fec_set_index } @@ -503,12 +511,17 @@ impl Shred { } // Returns true if the shred passes sanity checks. - pub(crate) fn sanitize(&self) -> bool { - self.erasure_block_index().is_some() + // TODO: Should return why sanitize failed! + pub fn sanitize(&self) -> bool { + self.payload.len() <= SHRED_PAYLOAD_SIZE + && self.erasure_block_index().is_some() && match self.shred_type() { ShredType::Data => { - self.parent().is_ok() - && usize::from(self.data_header.size) <= self.payload.len() + let size = usize::from(self.data_header.size); + self.index() < MAX_DATA_SHREDS_PER_SLOT as u32 + && self.parent().is_ok() + && size <= self.payload.len() + && DATA_SHRED_SIZE_RANGE.contains(&size) } ShredType::Code => { u32::from(self.coding_header.num_coding_shreds) @@ -645,7 +658,7 @@ impl Shred { } #[cfg(test)] - pub fn unset_data_complete(&mut self) { + pub(crate) fn unset_data_complete(&mut self) { if self.is_data() { self.data_header.flags &= !DATA_COMPLETE_SHRED; } @@ -670,7 +683,7 @@ impl Shred { } } - pub fn reference_tick(&self) -> u8 { + pub(crate) fn reference_tick(&self) -> u8 { if self.is_data() { self.data_header.flags & SHRED_TICK_REFERENCE_MASK } else { @@ -690,7 +703,7 @@ impl Shred { limited_deserialize::(&p.data[slot_start..slot_end]).ok() } - pub fn reference_tick_from_data(data: &[u8]) -> u8 { + pub(crate) fn reference_tick_from_data(data: &[u8]) -> u8 { let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::() - size_of::()]; @@ -701,12 +714,43 @@ impl Shred { self.signature() .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..]) } + + // Returns true if the erasure coding of the two shreds mismatch. + pub(crate) fn erasure_mismatch(self: &Shred, other: &Shred) -> Result { + match (self.shred_type(), other.shred_type()) { + (ShredType::Code, ShredType::Code) => { + let CodingShredHeader { + num_data_shreds, + num_coding_shreds, + position: _, + } = self.coding_header; + Ok(num_coding_shreds != other.coding_header.num_coding_shreds + || num_data_shreds != other.coding_header.num_data_shreds + || self.first_coding_index() != other.first_coding_index()) + } + _ => Err(ShredError::InvalidShredType), + } + } + + pub(crate) fn num_data_shreds(self: &Shred) -> Result { + match self.shred_type() { + ShredType::Data => Err(ShredError::InvalidShredType), + ShredType::Code => Ok(self.coding_header.num_data_shreds), + } + } + + pub(crate) fn num_coding_shreds(self: &Shred) -> Result { + match self.shred_type() { + ShredType::Data => Err(ShredError::InvalidShredType), + ShredType::Code => Ok(self.coding_header.num_coding_shreds), + } + } } #[derive(Debug)] pub struct Shredder { - pub slot: Slot, - pub parent_slot: Slot, + slot: Slot, + parent_slot: Slot, version: u16, reference_tick: u8, } @@ -1008,7 +1052,6 @@ impl Shredder { /// Combines all shreds to recreate the original buffer pub fn deshred(shreds: &[Shred]) -> std::result::Result, reed_solomon_erasure::Error> { use reed_solomon_erasure::Error::TooFewDataShards; - const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?; let index = shreds.first().ok_or(TooFewDataShards)?.index(); let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i); @@ -1160,7 +1203,7 @@ pub fn verify_test_data_shred( } #[cfg(test)] -pub mod tests { +mod tests { use { super::*, bincode::serialized_size, @@ -1985,7 +2028,7 @@ pub mod tests { assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); assert_eq!(1, stats.index_out_of_bounds); - let (header, coding_header) = Shred::new_coding_shred_header( + let shred = Shred::new_empty_coding( 8, // slot 2, // index 10, // fec_set_index @@ -1994,7 +2037,6 @@ pub mod tests { 3, // position 200, // version ); - let shred = Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header); shred.copy_to_packet(&mut packet); packet.data[OFFSET_OF_SHRED_TYPE] = u8::MAX; @@ -2029,4 +2071,97 @@ pub mod tests { Ok(ShredType::Code) ); } + + #[test] + fn test_sanitize_data_shred() { + let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD]; + let mut shred = Shred::new_from_data( + 420, // slot + 19, // index + 5, // parent_offset + Some(&data), + true, // is_last_data + false, // is_last_in_slot + 3, // reference_tick + 1, // version + 16, // fec_set_index + ); + assert!(shred.sanitize()); + // Corrupt shred by making it too large + { + let mut shred = shred.clone(); + shred.payload.push(10u8); + assert!(!shred.sanitize()); + } + { + let mut shred = shred.clone(); + shred.data_header.size += 1; + assert!(!shred.sanitize()); + } + { + let mut shred = shred.clone(); + shred.data_header.size = 0; + assert!(!shred.sanitize()); + } + { + shred.data_header.size = shred.payload().len() as u16 + 1; + assert!(!shred.sanitize()); + } + } + + #[test] + fn test_sanitize_coding_shred() { + let mut shred = Shred::new_empty_coding( + 1, // slot + 12, // index + 11, // fec_set_index + 11, // num_data_shreds + 11, // num_coding_shreds + 8, // position + 0, // version + ); + assert!(shred.sanitize()); + // index < position is invalid. + { + let mut shred = shred.clone(); + let index = shred.index() - shred.fec_set_index() - 1; + shred.set_index(index as u32); + assert!(!shred.sanitize()); + } + { + let mut shred = shred.clone(); + shred.coding_header.num_coding_shreds = 0; + assert!(!shred.sanitize()); + } + // pos >= num_coding is invalid. + { + let mut shred = shred.clone(); + let num_coding_shreds = shred.index() - shred.fec_set_index(); + shred.coding_header.num_coding_shreds = num_coding_shreds as u16; + assert!(!shred.sanitize()); + } + // set_index with num_coding that would imply the last + // shred has index > u32::MAX should fail. + { + let mut shred = shred.clone(); + shred.common_header.fec_set_index = std::u32::MAX - 1; + shred.coding_header.num_data_shreds = 2; + shred.coding_header.num_coding_shreds = 4; + shred.coding_header.position = 1; + shred.common_header.index = std::u32::MAX - 1; + assert!(!shred.sanitize()); + + shred.coding_header.num_coding_shreds = 2000; + assert!(!shred.sanitize()); + + // Decreasing the number of num_coding_shreds will put it within + // the allowed limit. + shred.coding_header.num_coding_shreds = 2; + assert!(shred.sanitize()); + } + { + shred.coding_header.num_coding_shreds = u16::MAX; + assert!(!shred.sanitize()); + } + } } diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 7bd7a5b727..e5c9dabbe9 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -480,9 +480,9 @@ pub mod tests { assert_eq!(shred.slot(), slot); let keypair = Keypair::new(); shred.sign(&keypair); - trace!("signature {}", shred.common_header.signature); - packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload); - packet.meta.size = shred.payload.len(); + trace!("signature {}", shred.signature()); + packet.data[0..shred.payload().len()].copy_from_slice(shred.payload()); + packet.meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -526,8 +526,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -581,8 +581,8 @@ pub mod tests { let keypair = Keypair::new(); shred.sign(&keypair); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -693,8 +693,8 @@ pub mod tests { 0xc0de, ); batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batches[0].packets[0].meta.size = shred.payload.len(); + batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0].packets[0].meta.size = shred.payload().len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 7a1cf976d8..cdc25189e7 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -163,7 +163,7 @@ fn sort_data_coding_into_fec_sets( assert!(!data_slot_and_index.contains(&key)); data_slot_and_index.insert(key); let fec_entry = fec_data - .entry(shred.common_header.fec_set_index) + .entry(shred.fec_set_index()) .or_insert_with(Vec::new); fec_entry.push(shred); } @@ -174,7 +174,7 @@ fn sort_data_coding_into_fec_sets( assert!(!coding_slot_and_index.contains(&key)); coding_slot_and_index.insert(key); let fec_entry = fec_coding - .entry(shred.common_header.fec_set_index) + .entry(shred.fec_set_index()) .or_insert_with(Vec::new); fec_entry.push(shred); }