hides implementation details of shred from its public interface (#24563)

Working toward embedding versioning into the shred binary format, so that
a new variant of the shred struct can include Merkle tree hashes of the
erasure set.
behzad nouri 2022-04-25 12:43:22 +00:00 committed by GitHub
parent 758fcd383d
commit 895f76a93c
19 changed files with 365 additions and 389 deletions
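
At the API level, direct field accesses on `Shred` throughout the codebase are replaced by the accessor methods this diff introduces. A minimal sketch of the new surface, using only methods that appear in the hunks below (the construction and values here are illustrative, not taken from any one call site):

```rust
use solana_ledger::shred::Shred;

fn demo(slot: u64) {
    let mut shred = Shred::new_empty_data_shred();
    // Setters replace direct writes to the now-private common header:
    shred.set_slot(slot); // was: shred.common_header.slot = slot;
    shred.set_index(0);   // was: shred.common_header.index = 0;
    // Accessors replace direct reads of the now-private payload:
    let _borrowed: &Vec<u8> = shred.payload();  // was: &shred.payload
    let _owned: Vec<u8> = shred.into_payload(); // was: moving shred.payload out
}
```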


@ -37,7 +37,7 @@ fn get_retransmit_peers_deterministic(
num_simulated_shreds: usize,
) {
for i in 0..num_simulated_shreds {
shred.common_header.index = i as u32;
shred.set_index(i as u32);
let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
*slot_leader,
shred,
@ -55,7 +55,7 @@ fn get_retransmit_peers_deterministic_wrapper(b: &mut Bencher, unstaked_ratio: O
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
let slot = rand::random::<u64>();
let mut shred = Shred::new_empty_data_shred();
shred.common_header.slot = slot;
shred.set_slot(slot);
b.iter(|| {
get_retransmit_peers_deterministic(
&cluster_nodes,


@ -9,14 +9,21 @@ use {
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_CODING_SHRED_HEADERS,
SIZE_OF_DATA_SHRED_PAYLOAD,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, signature::Keypair},
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
test::Bencher,
};
// Copied these values here to avoid exposing shreds
// internals only for the sake of benchmarks.
// size of nonce: 4
// size of common shred header: 83
// size of coding shred header: 6
const VALID_SHRED_DATA_LEN: usize = PACKET_DATA_SIZE - 4 - 83 - 6;
fn make_test_entry(txs_per_entry: u64) -> Entry {
Entry {
num_hashes: 100_000,
@ -54,11 +61,10 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
let data_shreds = make_shreds(num_shreds);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
let mut data: Vec<u8> = vec![0; num_shreds * VALID_SHRED_DATA_LEN];
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
.copy_from_slice(&shred.payload[..valid_shred_data_len]);
data[i * VALID_SHRED_DATA_LEN..(i + 1) * VALID_SHRED_DATA_LEN]
.copy_from_slice(&shred.payload()[..VALID_SHRED_DATA_LEN]);
}
data
@ -120,7 +126,7 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0, 1);
bencher.iter(|| {
let payload = shred.payload.clone();
let payload = shred.payload().clone();
let _ = Shred::new_from_serialized_shred(payload).unwrap();
})
}
@ -157,9 +163,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
bencher.iter(|| {
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
encoder.get_encoded_packets(symbol_count);
})
}
@ -168,8 +173,7 @@ fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
let data = make_concatenated_shreds(symbol_count as usize);
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
let encoder = Encoder::with_defaults(&data, VALID_SHRED_DATA_LEN as u16);
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
packets.shuffle(&mut rand::thread_rng());
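
As a cross-check on the hand-copied sizes behind the new `VALID_SHRED_DATA_LEN` constant: assuming `PACKET_DATA_SIZE` is 1232 bytes (the IPv6 minimum MTU of 1280, minus 40 bytes of IPv6 header and 8 bytes of UDP header), the new constant equals the old `SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS` expression it replaces:

```rust
// Assumption: PACKET_DATA_SIZE = 1280 (IPv6 minimum MTU) - 40 (IPv6 header) - 8 (UDP header).
const PACKET_DATA_SIZE: usize = 1232;

fn main() {
    // New: packet size minus nonce (4), common shred header (83), coding shred header (6).
    let valid_shred_data_len = PACKET_DATA_SIZE - 4 - 83 - 6;
    // Old: SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS,
    // i.e. (PACKET_DATA_SIZE - SIZE_OF_NONCE) - (83 + 6).
    let old_expression = (PACKET_DATA_SIZE - 4) - (83 + 6);
    assert_eq!(valid_shred_data_len, old_expression);
    assert_eq!(valid_shred_data_len, 1139);
}
```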


@ -424,7 +424,7 @@ pub fn broadcast_shreds(
update_peer_stats(&cluster_nodes, last_datapoint_submit);
let root_bank = root_bank.clone();
shreds.flat_map(move |shred| {
repeat(&shred.payload).zip(cluster_nodes.get_broadcast_addrs(
repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
shred,
&root_bank,
DATA_PLANE_FANOUT,


@ -325,13 +325,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.filter_map(|pubkey| {
let tvu = cluster_info
.lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?;
Some((&shred.payload, tvu))
Some((shred.payload(), tvu))
})
.collect(),
);
}
Some(vec![(&shred.payload, node.tvu)])
Some(vec![(shred.payload(), node.tvu)])
})
.flatten()
.collect();


@ -132,7 +132,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
if fake == (i <= self.partition) {
// Send fake shreds to the first N peers
data_shreds.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
sock.send_to(b.payload(), &peer.tvu_forwards).unwrap();
});
}
});


@ -31,7 +31,7 @@ impl PacketHasher {
}
pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(&shred.payload)
self.hash_data(shred.payload())
}
fn hash_data(&self, data: &[u8]) -> u64 {


@ -79,10 +79,10 @@ mod test {
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
shred.sign(&keypair);
trace!("signature {}", shred.common_header.signature);
trace!("signature {}", shred.signature());
let nonce = 9;
let mut packet = repair_response_packet_from_bytes(
shred.payload,
shred.into_payload(),
&SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
nonce,
)


@ -3144,10 +3144,7 @@ pub mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
shred::{
CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED,
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
},
shred::{Shred, SIZE_OF_DATA_SHRED_PAYLOAD},
},
solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
@ -3163,7 +3160,6 @@ pub mod tests {
genesis_config,
hash::{hash, Hash},
instruction::InstructionError,
packet::PACKET_DATA_SIZE,
poh_config::PohConfig,
signature::{Keypair, Signer},
system_transaction,
@ -3747,26 +3743,19 @@ pub mod tests {
fn test_dead_fork_entry_deserialize_failure() {
// Insert entry that causes deserialization failure
let res = check_dead_fork(|_, bank| {
let gibberish = [0xa5u8; PACKET_DATA_SIZE];
let mut data_header = DataShredHeader::default();
data_header.flags |= DATA_COMPLETE_SHRED;
// Need to provide the right size for Shredder::deshred.
data_header.size = SIZE_OF_DATA_SHRED_PAYLOAD as u16;
data_header.parent_offset = (bank.slot() - bank.parent_slot()) as u16;
let shred_common_header = ShredCommonHeader {
slot: bank.slot(),
..ShredCommonHeader::default()
};
let mut shred = Shred::new_empty_from_header(
shred_common_header,
data_header,
CodingShredHeader::default(),
let gibberish = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let parent_offset = bank.slot() - bank.parent_slot();
let shred = Shred::new_from_data(
bank.slot(),
0, // index,
parent_offset as u16,
Some(&gibberish),
true, // is_last_data
false, // is_last_in_slot
0, // reference_tick
0, // version
0, // fec_set_index
);
bincode::serialize_into(
&mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..],
&gibberish[..SIZE_OF_DATA_SHRED_PAYLOAD],
)
.unwrap();
vec![shred]
});


@ -295,7 +295,7 @@ fn retransmit(
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
let mut retransmit_time = Measure::start("retransmit_to");
let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
stats


@ -1095,11 +1095,12 @@ mod tests {
// Create slots [1, 2] with 1 shred apiece
let (mut shreds, _) = make_many_slot_entries(1, 2, 1);
// Make shred for slot 1 too large
assert_eq!(shreds[0].slot(), 1);
assert_eq!(shreds[0].index(), 0);
shreds[0].payload.push(10);
shreds[0].data_header.size = shreds[0].payload.len() as u16;
// TODO: The test previously relied on corrupting shred payload
// size which we no longer want to expose. Current test no longer
// covers packet size check in repair_response_packet_from_bytes.
shreds.remove(0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");


@ -94,8 +94,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();
let mut shred = Shred::new_from_data(
0xc0de_dead,
@ -110,8 +110,8 @@ pub mod tests {
);
shred.sign(&keypair);
batches[1].packets.resize(1, Packet::default());
batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[1].packets[0].meta.size = shred.payload.len();
batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[1].packets[0].meta.size = shred.payload().len();
let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
@ -143,8 +143,8 @@ pub mod tests {
0xc0de,
);
shred.sign(&leader_keypair);
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();
let mut shred = Shred::new_from_data(
0,
@ -159,8 +159,8 @@ pub mod tests {
);
let wrong_keypair = Keypair::new();
shred.sign(&wrong_keypair);
batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[1].meta.size = shred.payload.len();
batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[1].meta.size = shred.payload().len();
let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
let rv = verifier.verify_batches(batches, num_packets);


@ -193,8 +193,8 @@ pub(crate) fn should_retransmit_and_persist(
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else if shred.data_header.size as usize > shred.payload.len() {
inc_new_counter_warn!("streamer-recv_window-shred_bad_meta_size", 1);
} else if !shred.sanitize() {
inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {
true
@ -215,13 +215,13 @@ fn run_check_duplicate(
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) =
blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
blockstore.is_shred_duplicate(shred.id(), shred.payload().clone())
{
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred_slot,
existing_shred_payload,
shred.payload,
shred.into_payload(),
)?;
duplicate_slot_sender.send(shred_slot)?;
@ -717,7 +717,7 @@ mod test {
blockstore::{make_many_slot_entries, Blockstore},
genesis_utils::create_genesis_config_with_leader,
get_tmp_ledger_path,
shred::{DataShredHeader, Shredder},
shred::Shredder,
},
solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
@ -825,21 +825,9 @@ mod test {
0
));
// with a bad header size
let mut bad_header_shred = shreds[0].clone();
bad_header_shred.data_header.size = (bad_header_shred.payload.len() + 1) as u16;
assert!(!should_retransmit_and_persist(
&bad_header_shred,
Some(bank.clone()),
&cache,
&me_id,
0,
0
));
// with an invalid index, shred gets thrown out
let mut bad_index_shred = shreds[0].clone();
bad_index_shred.common_header.index = (MAX_DATA_SHREDS_PER_SLOT + 1) as u32;
bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32);
assert!(!should_retransmit_and_persist(
&bad_index_shred,
Some(bank.clone()),
@ -875,7 +863,7 @@ mod test {
));
// coding shreds don't contain parent slot information, test that slot >= root
let (common, coding) = Shred::new_coding_shred_header(
let mut coding_shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
@ -884,8 +872,6 @@ mod test {
3, // position
0, // version
);
let mut coding_shred =
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
coding_shred.sign(&leader_keypair);
// shred.slot() > root, shred continues
assert!(should_retransmit_and_persist(
@ -959,7 +945,7 @@ mod test {
std::net::{IpAddr, Ipv4Addr},
};
solana_logger::setup();
let (common, coding) = Shred::new_coding_shred_header(
let shred = Shred::new_empty_coding(
5, // slot
5, // index
5, // fec_set_index
@ -968,7 +954,6 @@ mod test {
4, // position
0, // version
);
let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
let mut shreds = vec![shred.clone(), shred.clone(), shred];
let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let repair_meta = RepairMeta {


@ -3343,9 +3343,12 @@ mod tests {
let keypair = Keypair::new();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let next_shred_index = rng.gen_range(0, 32_000);
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload;
let other_payload = {
let other_shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
other_shred.into_payload()
};
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())


@ -91,7 +91,7 @@ fn check_shreds(
Err(Error::ShredIndexMismatch)
} else if shred1.shred_type() != shred2.shred_type() {
Err(Error::ShredTypeMismatch)
} else if shred1.payload == shred2.payload {
} else if shred1.payload() == shred2.payload() {
Err(Error::InvalidDuplicateShreds)
} else {
if let Some(leader_schedule) = leader_schedule {
@ -152,14 +152,14 @@ pub(crate) fn from_shred(
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
if shred.payload == other_payload {
if shred.payload() == &other_payload {
return Err(Error::InvalidDuplicateShreds);
}
let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type());
let proof = DuplicateSlotProof {
shred1: shred.payload,
shred1: shred.into_payload(),
shred2: other_payload,
};
let data = bincode::serialize(&proof)?;
@ -259,7 +259,7 @@ pub fn into_shreds(
Err(Error::ShredIndexMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
} else if shred1.payload == shred2.payload {
} else if shred1.payload() == shred2.payload() {
Err(Error::InvalidDuplicateShreds)
} else if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
Err(Error::InvalidSignature)
@ -352,7 +352,7 @@ pub(crate) mod tests {
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let next_shred_index = rng.gen_range(0, 32_000);
let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
let leader_schedule = |s| {
@ -365,7 +365,7 @@ pub(crate) mod tests {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload.clone(),
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size


@ -1120,12 +1120,6 @@ impl Blockstore {
)
}
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
|| shred1.first_coding_index() != shred2.first_coding_index()
}
#[allow(clippy::too_many_arguments)]
fn check_insert_coding_shred<F>(
&self,
@ -1186,7 +1180,11 @@ impl Blockstore {
);
if let Some(conflicting_shred) = conflicting_shred {
if self
.store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone())
.store_duplicate_if_not_existing(
slot,
conflicting_shred,
shred.payload().clone(),
)
.is_err()
{
warn!("bad duplicate store..");
@ -1198,10 +1196,15 @@ impl Blockstore {
// ToDo: This is a potential slashing condition
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, erasure_set: {:?}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), erasure_set, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
"Slot: {}, shred index: {}, erasure_set: {:?}, \
is_duplicate: {}, stored config: {:#?}, new shred: {:#?}",
slot,
shred.index(),
erasure_set,
self.has_duplicate_shreds_in_slot(slot),
erasure_meta.config(),
shred,
);
return false;
}
@ -1239,15 +1242,15 @@ impl Blockstore {
let maybe_shred = self.get_coding_shred(slot, coding_index);
if let Ok(Some(shred_data)) = maybe_shred {
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
if Self::erasure_mismatch(&potential_shred, shred) {
return Some(potential_shred.payload);
if shred.erasure_mismatch(&potential_shred).unwrap() {
return Some(potential_shred.into_payload());
}
} else if let Some(potential_shred) = {
let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code);
just_received_shreds.get(&key)
} {
if Self::erasure_mismatch(potential_shred, shred) {
return Some(potential_shred.payload.clone());
if shred.erasure_mismatch(potential_shred).unwrap() {
return Some(potential_shred.payload().clone());
}
}
}
@ -1397,7 +1400,7 @@ impl Blockstore {
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), shred.payload())?;
index_meta.coding_mut().insert(shred_index);
Ok(())
@ -1417,7 +1420,7 @@ impl Blockstore {
) -> Cow<'a, Vec<u8>> {
let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data);
if let Some(shred) = just_inserted_shreds.get(&key) {
Cow::Borrowed(&shred.payload)
Cow::Borrowed(shred.payload())
} else {
// If it doesn't exist in the just inserted set, it must exist in
// the backing store
@ -1442,8 +1445,7 @@ impl Blockstore {
} else {
false
};
if shred.data_header.size == 0 {
if !shred.sanitize() {
let leader_pubkey = leader_schedule
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
@ -1452,32 +1454,14 @@ impl Blockstore {
(
"error",
format!(
"Leader {:?}, slot {}: received index {} is empty",
leader_pubkey, slot, shred_index,
"Leader {:?}, slot {}: received invalid shred",
leader_pubkey, slot,
),
String
)
);
return false;
}
if shred.payload.len() > SHRED_PAYLOAD_SIZE {
let leader_pubkey = leader_schedule
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
datapoint_error!(
"blockstore_error",
(
"error",
format!(
"Leader {:?}, slot {}: received index {} shred.payload.len() > SHRED_PAYLOAD_SIZE",
leader_pubkey, slot, shred_index,
),
String
)
);
return false;
}
// Check that we do not receive shred_index >= than the last_index
// for the slot
let last_index = slot_meta.last_index;
@ -1495,7 +1479,7 @@ impl Blockstore {
.store_duplicate_if_not_existing(
slot,
ending_shred.into_owned(),
shred.payload.clone(),
shred.payload().clone(),
)
.is_err()
{
@ -1531,7 +1515,7 @@ impl Blockstore {
.store_duplicate_if_not_existing(
slot,
ending_shred.into_owned(),
shred.payload.clone(),
shred.payload().clone(),
)
.is_err()
{
@ -1616,12 +1600,7 @@ impl Blockstore {
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>(
(slot, index),
// Payload will be padded out to SHRED_PAYLOAD_SIZE
// But only need to store the bytes within data_header.size
&shred.payload[..shred.data_header.size as usize],
)?;
write_batch.put_bytes::<cf::ShredData>((slot, index), shred.bytes_to_store())?;
data_index.insert(index);
let newly_completed_data_sets = update_slot_meta(
last_in_slot,
@ -3132,7 +3111,7 @@ impl Blockstore {
let size = payload.len().max(SHRED_PAYLOAD_SIZE);
payload.resize(size, 0u8);
let new_shred = Shred::new_from_serialized_shred(payload).unwrap();
(existing_shred != new_shred.payload).then(|| existing_shred)
(existing_shred != *new_shred.payload()).then(|| existing_shred)
}
pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
@ -4294,7 +4273,7 @@ pub mod tests {
blockstore_db::BlockstoreRocksFifoOptions,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
shred::{max_ticks_per_n_shreds, DataShredHeader},
shred::max_ticks_per_n_shreds,
},
assert_matches::assert_matches,
bincode::serialize,
@ -4577,7 +4556,7 @@ pub mod tests {
let slot = 0;
let (shreds, _) = make_slot_entries(slot, 0, 100);
let num_shreds = shreds.len() as u64;
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect();
let shred_bufs: Vec<_> = shreds.iter().map(Shred::payload).cloned().collect();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
@ -5846,31 +5825,23 @@ pub mod tests {
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
// Corrupt shred by making it too large
let mut shred5 = shreds[5].clone();
shred5.payload.push(10);
shred5.data_header.size = shred5.payload.len() as u16;
assert!(!blockstore.should_insert_data_shred(
&shred5,
&slot_meta,
&HashMap::new(),
&last_root,
None,
ShredSource::Turbine
));
let shred5 = shreds[5].clone();
// Ensure that an empty shred (one with no data) would get inserted. Such shreds
// may be used as signals (broadcast does so to indicate a slot was interrupted)
// Reuse shred5's header values to avoid a false negative result
let mut empty_shred = Shred::new_from_data(
shred5.common_header.slot,
shred5.common_header.index,
shred5.data_header.parent_offset,
let empty_shred = Shred::new_from_data(
shred5.slot(),
shred5.index(),
{
let parent_offset = shred5.slot() - shred5.parent().unwrap();
parent_offset as u16
},
None, // data
true, // is_last_data
true, // is_last_in_slot
0, // reference_tick
shred5.common_header.version,
shred5.version(),
shred5.fec_set_index(),
);
assert!(blockstore.should_insert_data_shred(
@ -5881,16 +5852,6 @@ pub mod tests {
None,
ShredSource::Repaired,
));
empty_shred.data_header.size = 0;
assert!(!blockstore.should_insert_data_shred(
&empty_shred,
&slot_meta,
&HashMap::new(),
&last_root,
None,
ShredSource::Recovered,
));
// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blockstore
@ -5977,7 +5938,7 @@ pub mod tests {
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let slot = 1;
let (shred, coding) = Shred::new_coding_shred_header(
let coding_shred = Shred::new_empty_coding(
slot, 11, // index
11, // fec_set_index
11, // num_data_shreds
@ -5985,7 +5946,6 @@ pub mod tests {
8, // position
0, // version
);
let coding_shred = Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
let mut erasure_metas = HashMap::new();
let mut index_working_set = HashMap::new();
@ -6034,7 +5994,7 @@ pub mod tests {
let last_root = RwLock::new(0);
let slot = 1;
let (mut shred, coding) = Shred::new_coding_shred_header(
let mut coding_shred = Shred::new_empty_coding(
slot, 11, // index
11, // fec_set_index
11, // num_data_shreds
@ -6042,8 +6002,6 @@ pub mod tests {
8, // position
0, // version
);
let coding_shred =
Shred::new_empty_from_header(shred.clone(), DataShredHeader::default(), coding.clone());
// Insert a good coding shred
assert!(Blockstore::should_insert_coding_shred(
@ -6065,28 +6023,16 @@ pub mod tests {
));
}
shred.index += 1;
// Establish a baseline that works
{
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
coding_shred.set_index(coding_shred.index() + 1);
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
// Trying to insert a shred with index < position should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let mut coding_shred = coding_shred.clone();
let index = coding_shred.index() - coding_shred.fec_set_index() - 1;
coding_shred.set_index(index as u32);
@ -6096,76 +6042,9 @@ pub mod tests {
));
}
// Trying to insert shred with num_coding == 0 should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
// Trying to insert shred with pos >= num_coding should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index();
coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
// Trying to insert with set_index with num_coding that would imply the last shred
// has index > u32::MAX should fail
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.common_header.fec_set_index = std::u32::MAX - 1;
coding_shred.coding_header.num_data_shreds = 2;
coding_shred.coding_header.num_coding_shreds = 4;
coding_shred.coding_header.position = 1;
coding_shred.common_header.index = std::u32::MAX - 1;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
coding_shred.coding_header.num_coding_shreds = 2000;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
// Decreasing the number of num_coding_shreds will put it within the allowed limit
coding_shred.coding_header.num_coding_shreds = 2;
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
// Insertion should succeed
blockstore
.insert_shreds(vec![coding_shred], None, false)
.unwrap();
}
// Trying to insert value into slot <= than last root should fail
{
let mut coding_shred =
Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
let mut coding_shred = coding_shred.clone();
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
@ -8350,10 +8229,7 @@ pub mod tests {
blockstore
.insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
.unwrap();
let shred_bufs: Vec<_> = data_shreds
.iter()
.map(|shred| shred.payload.clone())
.collect();
let shred_bufs: Vec<_> = data_shreds.iter().map(Shred::payload).cloned().collect();
// Check all the data shreds were recovered
for (s, buf) in data_shreds.iter().zip(shred_bufs) {
@ -8606,20 +8482,24 @@ pub mod tests {
assert_eq!(
blockstore.is_shred_duplicate(
ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()),
duplicate_shred.payload.clone(),
duplicate_shred.payload().clone(),
),
Some(shred.payload.to_vec())
Some(shred.payload().clone())
);
assert!(blockstore
.is_shred_duplicate(
ShredId::new(slot, /*index:*/ 0, non_duplicate_shred.shred_type()),
non_duplicate_shred.payload,
non_duplicate_shred.into_payload(),
)
.is_none());
// Store a duplicate shred
blockstore
.store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone())
.store_duplicate_slot(
slot,
shred.payload().clone(),
duplicate_shred.payload().clone(),
)
.unwrap();
// Slot is now marked as duplicate
@ -8627,8 +8507,8 @@ pub mod tests {
// Check ability to fetch the duplicates
let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
assert_eq!(duplicate_proof.shred1, shred.payload);
assert_eq!(duplicate_proof.shred2, duplicate_shred.payload);
assert_eq!(duplicate_proof.shred1, *shred.payload());
assert_eq!(duplicate_proof.shred2, *duplicate_shred.payload());
}
#[test]
@ -8926,30 +8806,6 @@ pub mod tests {
assert!(blockstore.has_duplicate_shreds_in_slot(slot));
}
#[test]
fn test_large_num_coding() {
solana_logger::setup();
let slot = 1;
let (_data_shreds, mut coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100);
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
coding_shreds[1].coding_header.num_coding_shreds = u16::MAX;
blockstore
.insert_shreds(
vec![coding_shreds[1].clone()],
Some(&leader_schedule_cache),
false,
)
.unwrap();
// Check no coding shreds are inserted
let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap();
assert!(res.is_empty());
}
#[test]
pub fn test_insert_data_shreds_same_slot_last_index() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -9074,7 +8930,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
shreds[i as usize].payload
*shreds[i as usize].payload()
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
@ -9087,11 +8943,14 @@ pub mod tests {
// Case 2: Inserting a duplicate with an even smaller last shred index should not
// mark the slot as dead since the Slotmeta is full.
let mut even_smaller_last_shred_duplicate = shreds[smaller_last_shred_index - 1].clone();
even_smaller_last_shred_duplicate.set_last_in_slot();
// Flip a byte to create a duplicate shred
even_smaller_last_shred_duplicate.payload[0] =
std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
let even_smaller_last_shred_duplicate = {
let mut payload = shreds[smaller_last_shred_index - 1].payload().clone();
// Flip a byte to create a duplicate shred
payload[0] = std::u8::MAX - payload[0];
let mut shred = Shred::new_from_serialized_shred(payload).unwrap();
shred.set_last_in_slot();
shred
};
assert!(blockstore
.is_shred_duplicate(
ShredId::new(
@ -9099,7 +8958,7 @@ pub mod tests {
even_smaller_last_shred_duplicate.index(),
ShredType::Data
),
even_smaller_last_shred_duplicate.payload.clone(),
even_smaller_last_shred_duplicate.payload().clone(),
)
.is_some());
blockstore
@ -9110,7 +8969,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
shreds[i as usize].payload
*shreds[i as usize].payload()
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
@ -9146,7 +9005,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
shred_to_check.payload
*shred_to_check.payload()
);
} else {
assert!(blockstore
@ -9177,7 +9036,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
shred_to_check.payload
*shred_to_check.payload()
);
} else {
assert!(blockstore


@ -234,8 +234,8 @@ impl ErasureMeta {
ShredType::Data => None,
ShredType::Code => {
let config = ErasureConfig::new(
usize::from(shred.coding_header.num_data_shreds),
usize::from(shred.coding_header.num_coding_shreds),
usize::from(shred.num_data_shreds().ok()?),
usize::from(shred.num_coding_shreds().ok()?),
);
let first_coding_index = u64::from(shred.first_coding_index()?);
let erasure_meta = ErasureMeta {


@ -67,7 +67,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
std::{cell::RefCell, mem::size_of},
std::{cell::RefCell, mem::size_of, ops::RangeInclusive},
thiserror::Error,
};
@ -76,25 +76,32 @@ pub type Nonce = u32;
/// The following constants are computed by hand, and hardcoded.
/// `test_shred_constants` ensures that the values are correct.
/// Constants are used over lazy_static for performance reasons.
pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
pub const SIZE_OF_DATA_SHRED_HEADER: usize = 5;
pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
pub const SIZE_OF_SIGNATURE: usize = 64;
pub const SIZE_OF_SHRED_TYPE: usize = 1;
pub const SIZE_OF_SHRED_SLOT: usize = 8;
pub const SIZE_OF_SHRED_INDEX: usize = 4;
const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADER: usize = 5;
const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
const SIZE_OF_SIGNATURE: usize = 64;
const SIZE_OF_SHRED_TYPE: usize = 1;
const SIZE_OF_SHRED_SLOT: usize = 8;
const SIZE_OF_SHRED_INDEX: usize = 4;
pub const SIZE_OF_NONCE: usize = 4;
pub const SIZE_OF_CODING_SHRED_HEADERS: usize =
const SIZE_OF_CODING_SHRED_HEADERS: usize =
SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
- SIZE_OF_COMMON_SHRED_HEADER
- SIZE_OF_DATA_SHRED_HEADER
- SIZE_OF_CODING_SHRED_HEADERS
- SIZE_OF_NONCE;
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
// DataShredHeader.size is sum of common-shred-header, data-shred-header and
// data.len(). Broadcast stage may create zero length data shreds when the
// previous slot was interrupted:
// https://github.com/solana-labs/solana/blob/2d4defa47/core/src/broadcast_stage/standard_broadcast_run.rs#L79
const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD;
pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@ -107,7 +114,7 @@ pub const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;
pub const SHRED_TICK_REFERENCE_MASK: u8 = 0b0011_1111;
const LAST_SHRED_IN_SLOT: u8 = 0b1000_0000;
pub const DATA_COMPLETE_SHRED: u8 = 0b0100_0000;
const DATA_COMPLETE_SHRED: u8 = 0b0100_0000;
#[derive(Error, Debug)]
pub enum ShredError {
@ -163,37 +170,37 @@ impl Default for ShredType {
/// A common header that is present in data and code shred headers
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
pub shred_type: ShredType,
pub slot: Slot,
pub index: u32,
pub version: u16,
pub fec_set_index: u32,
struct ShredCommonHeader {
signature: Signature,
shred_type: ShredType,
slot: Slot,
index: u32,
version: u16,
fec_set_index: u32,
}
/// The data shred header has parent offset and flags
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub parent_offset: u16,
pub flags: u8,
pub size: u16,
struct DataShredHeader {
parent_offset: u16,
flags: u8,
size: u16, // common shred header + data shred header + data
}
/// The coding shred header has FEC information
#[derive(Serialize, Clone, Default, Deserialize, PartialEq, Debug)]
pub struct CodingShredHeader {
pub num_data_shreds: u16,
pub num_coding_shreds: u16,
pub position: u16,
struct CodingShredHeader {
num_data_shreds: u16,
num_coding_shreds: u16,
position: u16,
}
#[derive(Clone, Debug, PartialEq)]
pub struct Shred {
pub common_header: ShredCommonHeader,
pub data_header: DataShredHeader,
pub coding_header: CodingShredHeader,
pub payload: Vec<u8>,
common_header: ShredCommonHeader,
data_header: DataShredHeader,
coding_header: CodingShredHeader,
payload: Vec<u8>,
}
/// Tuple which uniquely identifies a shred should it exist.
@ -260,6 +267,7 @@ impl Shred {
packet.meta.size = len;
}
// TODO: Should this sanitize output?
pub fn new_from_data(
slot: Slot,
index: u32,
@ -314,7 +322,7 @@ impl Shred {
&data_header,
)
.expect("Failed to write data header into shred buffer");
// TODO: Need to check if data is too large!
if let Some(data) = data {
payload[start..start + data.len()].clone_from_slice(data);
}
@ -354,13 +362,14 @@ impl Shred {
coding_header,
payload,
};
// TODO: Should return why sanitize failed.
shred
.sanitize()
.then(|| shred)
.ok_or(ShredError::InvalidPayload)
}
pub fn new_coding_shred_header(
pub fn new_empty_coding(
slot: Slot,
index: u32,
fec_set_index: u32,
@ -368,7 +377,7 @@ impl Shred {
num_coding_shreds: u16,
position: u16,
version: u16,
) -> (ShredCommonHeader, CodingShredHeader) {
) -> Self {
let header = ShredCommonHeader {
shred_type: ShredType::Code,
index,
@ -377,38 +386,15 @@ impl Shred {
fec_set_index,
..ShredCommonHeader::default()
};
(
header,
CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position,
},
)
}
pub fn new_empty_coding(
slot: Slot,
index: u32,
fec_set_index: u32,
num_data: u16,
num_code: u16,
position: u16,
version: u16,
) -> Self {
let (header, coding_header) = Self::new_coding_shred_header(
slot,
index,
fec_set_index,
num_data,
num_code,
let coding_header = CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position,
version,
);
};
Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header)
}
pub fn new_empty_from_header(
fn new_empty_from_header(
common_header: ShredCommonHeader,
data_header: DataShredHeader,
coding_header: CodingShredHeader,
@ -488,7 +474,29 @@ impl Shred {
self.common_header.index
}
pub(crate) fn fec_set_index(&self) -> u32 {
#[inline]
pub fn payload(&self) -> &Vec<u8> {
&self.payload
}
// Possibly trimmed payload;
// Should only be used when storing shreds to blockstore.
pub(crate) fn bytes_to_store(&self) -> &[u8] {
match self.shred_type() {
ShredType::Code => &self.payload,
ShredType::Data => {
// Payload will be padded out to SHRED_PAYLOAD_SIZE.
// But only need to store the bytes within data_header.size.
&self.payload[..self.data_header.size as usize]
}
}
}
pub fn into_payload(self) -> Vec<u8> {
self.payload
}
pub fn fec_set_index(&self) -> u32 {
self.common_header.fec_set_index
}
@ -503,12 +511,17 @@ impl Shred {
}
// Returns true if the shred passes sanity checks.
pub(crate) fn sanitize(&self) -> bool {
self.erasure_block_index().is_some()
// TODO: Should return why sanitize failed!
pub fn sanitize(&self) -> bool {
self.payload.len() <= SHRED_PAYLOAD_SIZE
&& self.erasure_block_index().is_some()
&& match self.shred_type() {
ShredType::Data => {
self.parent().is_ok()
&& usize::from(self.data_header.size) <= self.payload.len()
let size = usize::from(self.data_header.size);
self.index() < MAX_DATA_SHREDS_PER_SLOT as u32
&& self.parent().is_ok()
&& size <= self.payload.len()
&& DATA_SHRED_SIZE_RANGE.contains(&size)
}
ShredType::Code => {
u32::from(self.coding_header.num_coding_shreds)
@ -645,7 +658,7 @@ impl Shred {
}
#[cfg(test)]
pub fn unset_data_complete(&mut self) {
pub(crate) fn unset_data_complete(&mut self) {
if self.is_data() {
self.data_header.flags &= !DATA_COMPLETE_SHRED;
}
@ -670,7 +683,7 @@ impl Shred {
}
}
pub fn reference_tick(&self) -> u8 {
pub(crate) fn reference_tick(&self) -> u8 {
if self.is_data() {
self.data_header.flags & SHRED_TICK_REFERENCE_MASK
} else {
@ -690,7 +703,7 @@ impl Shred {
limited_deserialize::<Slot>(&p.data[slot_start..slot_end]).ok()
}
pub fn reference_tick_from_data(data: &[u8]) -> u8 {
pub(crate) fn reference_tick_from_data(data: &[u8]) -> u8 {
let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER
- size_of::<u8>()
- size_of::<u16>()];
@ -701,12 +714,43 @@ impl Shred {
self.signature()
.verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
}
// Returns true if the erasure coding of the two shreds mismatch.
pub(crate) fn erasure_mismatch(self: &Shred, other: &Shred) -> Result<bool> {
match (self.shred_type(), other.shred_type()) {
(ShredType::Code, ShredType::Code) => {
let CodingShredHeader {
num_data_shreds,
num_coding_shreds,
position: _,
} = self.coding_header;
Ok(num_coding_shreds != other.coding_header.num_coding_shreds
|| num_data_shreds != other.coding_header.num_data_shreds
|| self.first_coding_index() != other.first_coding_index())
}
_ => Err(ShredError::InvalidShredType),
}
}
pub(crate) fn num_data_shreds(self: &Shred) -> Result<u16> {
match self.shred_type() {
ShredType::Data => Err(ShredError::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_data_shreds),
}
}
pub(crate) fn num_coding_shreds(self: &Shred) -> Result<u16> {
match self.shred_type() {
ShredType::Data => Err(ShredError::InvalidShredType),
ShredType::Code => Ok(self.coding_header.num_coding_shreds),
}
}
}
#[derive(Debug)]
pub struct Shredder {
pub slot: Slot,
pub parent_slot: Slot,
slot: Slot,
parent_slot: Slot,
version: u16,
reference_tick: u8,
}
@ -1008,7 +1052,6 @@ impl Shredder {
/// Combines all shreds to recreate the original buffer
pub fn deshred(shreds: &[Shred]) -> std::result::Result<Vec<u8>, reed_solomon_erasure::Error> {
use reed_solomon_erasure::Error::TooFewDataShards;
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER;
Self::verify_consistent_shred_payload_sizes("deshred()", shreds)?;
let index = shreds.first().ok_or(TooFewDataShards)?.index();
let aligned = shreds.iter().zip(index..).all(|(s, i)| s.index() == i);
@ -1160,7 +1203,7 @@ pub fn verify_test_data_shred(
}
#[cfg(test)]
pub mod tests {
mod tests {
use {
super::*,
bincode::serialized_size,
@ -1985,7 +2028,7 @@ pub mod tests {
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert_eq!(1, stats.index_out_of_bounds);
let (header, coding_header) = Shred::new_coding_shred_header(
let shred = Shred::new_empty_coding(
8, // slot
2, // index
10, // fec_set_index
@ -1994,7 +2037,6 @@ pub mod tests {
3, // position
200, // version
);
let shred = Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
shred.copy_to_packet(&mut packet);
packet.data[OFFSET_OF_SHRED_TYPE] = u8::MAX;
@ -2029,4 +2071,97 @@ pub mod tests {
Ok(ShredType::Code)
);
}
#[test]
fn test_sanitize_data_shred() {
let data = [0xa5u8; SIZE_OF_DATA_SHRED_PAYLOAD];
let mut shred = Shred::new_from_data(
420, // slot
19, // index
5, // parent_offset
Some(&data),
true, // is_last_data
false, // is_last_in_slot
3, // reference_tick
1, // version
16, // fec_set_index
);
assert!(shred.sanitize());
// Corrupt shred by making it too large
{
let mut shred = shred.clone();
shred.payload.push(10u8);
assert!(!shred.sanitize());
}
{
let mut shred = shred.clone();
shred.data_header.size += 1;
assert!(!shred.sanitize());
}
{
let mut shred = shred.clone();
shred.data_header.size = 0;
assert!(!shred.sanitize());
}
{
shred.data_header.size = shred.payload().len() as u16 + 1;
assert!(!shred.sanitize());
}
}
#[test]
fn test_sanitize_coding_shred() {
let mut shred = Shred::new_empty_coding(
1, // slot
12, // index
11, // fec_set_index
11, // num_data_shreds
11, // num_coding_shreds
8, // position
0, // version
);
assert!(shred.sanitize());
// index < position is invalid.
{
let mut shred = shred.clone();
let index = shred.index() - shred.fec_set_index() - 1;
shred.set_index(index as u32);
assert!(!shred.sanitize());
}
{
let mut shred = shred.clone();
shred.coding_header.num_coding_shreds = 0;
assert!(!shred.sanitize());
}
// pos >= num_coding is invalid.
{
let mut shred = shred.clone();
let num_coding_shreds = shred.index() - shred.fec_set_index();
shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
assert!(!shred.sanitize());
}
// set_index with num_coding that would imply the last
// shred has index > u32::MAX should fail.
{
let mut shred = shred.clone();
shred.common_header.fec_set_index = std::u32::MAX - 1;
shred.coding_header.num_data_shreds = 2;
shred.coding_header.num_coding_shreds = 4;
shred.coding_header.position = 1;
shred.common_header.index = std::u32::MAX - 1;
assert!(!shred.sanitize());
shred.coding_header.num_coding_shreds = 2000;
assert!(!shred.sanitize());
// Decreasing the number of num_coding_shreds will put it within
// the allowed limit.
shred.coding_header.num_coding_shreds = 2;
assert!(shred.sanitize());
}
{
shred.coding_header.num_coding_shreds = u16::MAX;
assert!(!shred.sanitize());
}
}
}
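
Plugging concrete numbers into the constants above (again assuming `PACKET_DATA_SIZE` is 1232 bytes), the new `DATA_SHRED_SIZE_RANGE` that `sanitize()` enforces on `data_header.size` works out to `88..=1139`: 88 bytes for the two headers alone (a zero-length data shred, as broadcast may emit when a slot is interrupted), up to headers plus a full data payload. A standalone sketch of that arithmetic:

```rust
use std::ops::RangeInclusive;

// Assumption: PACKET_DATA_SIZE = 1280 (IPv6 minimum MTU) - 40 - 8 = 1232 bytes.
const PACKET_DATA_SIZE: usize = 1232;
const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
const SIZE_OF_DATA_SHRED_HEADER: usize = 5;
const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
const SIZE_OF_NONCE: usize = 4;
const SIZE_OF_CODING_SHRED_HEADERS: usize =
    SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; // 89
const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
    - SIZE_OF_COMMON_SHRED_HEADER
    - SIZE_OF_DATA_SHRED_HEADER
    - SIZE_OF_CODING_SHRED_HEADERS
    - SIZE_OF_NONCE; // 1051
const SHRED_DATA_OFFSET: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; // 88
const DATA_SHRED_SIZE_RANGE: RangeInclusive<usize> =
    SHRED_DATA_OFFSET..=SHRED_DATA_OFFSET + SIZE_OF_DATA_SHRED_PAYLOAD;

fn main() {
    // Headers-only (zero-length data) shreds are the smallest legal size...
    assert_eq!(*DATA_SHRED_SIZE_RANGE.start(), 88);
    // ...and headers plus a full data payload are the largest.
    assert_eq!(*DATA_SHRED_SIZE_RANGE.end(), 1139);
}
```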


@ -480,9 +480,9 @@ pub mod tests {
assert_eq!(shred.slot(), slot);
let keypair = Keypair::new();
shred.sign(&keypair);
trace!("signature {}", shred.common_header.signature);
packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload);
packet.meta.size = shred.payload.len();
trace!("signature {}", shred.signature());
packet.data[0..shred.payload().len()].copy_from_slice(shred.payload());
packet.meta.size = shred.payload().len();
let leader_slots = [(slot, keypair.pubkey().to_bytes())]
.iter()
@ -526,8 +526,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();
let leader_slots = [(slot, keypair.pubkey().to_bytes())]
.iter()
@ -581,8 +581,8 @@ pub mod tests {
let keypair = Keypair::new();
shred.sign(&keypair);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();
let leader_slots = [
(std::u64::MAX, Pubkey::default().to_bytes()),
@ -693,8 +693,8 @@ pub mod tests {
0xc0de,
);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload());
batches[0].packets[0].meta.size = shred.payload().len();
let pubkeys = [
(slot, keypair.pubkey().to_bytes()),
(std::u64::MAX, Pubkey::default().to_bytes()),


@ -163,7 +163,7 @@ fn sort_data_coding_into_fec_sets(
assert!(!data_slot_and_index.contains(&key));
data_slot_and_index.insert(key);
let fec_entry = fec_data
.entry(shred.common_header.fec_set_index)
.entry(shred.fec_set_index())
.or_insert_with(Vec::new);
fec_entry.push(shred);
}
@ -174,7 +174,7 @@ fn sort_data_coding_into_fec_sets(
assert!(!coding_slot_and_index.contains(&key));
coding_slot_and_index.insert(key);
let fec_entry = fec_coding
.entry(shred.common_header.fec_set_index)
.entry(shred.fec_set_index())
.or_insert_with(Vec::new);
fec_entry.push(shred);
}