From 0dbf7995b50b43237b1fd239eb92f25dae59ba27 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 18 Sep 2019 20:08:27 -0700 Subject: [PATCH] Remove unnecessary serialize of shred data (#5967) * Remove unnecessary serialize of shred data * remove obsolete code * fix golden hash --- core/src/blocktree.rs | 35 ++---- core/src/chacha.rs | 2 +- core/src/shred.rs | 254 +++++++++++++----------------------------- 3 files changed, 93 insertions(+), 198 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0388d3cde..d39e024f9 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -1667,7 +1667,6 @@ pub fn entries_to_test_shreds( pub mod tests { use super::*; use crate::entry::{create_ticks, Entry}; - use crate::shred::{DataShredHeader, CODING_SHRED}; use itertools::Itertools; use rand::seq::SliceRandom; use rand::thread_rng; @@ -3107,13 +3106,8 @@ pub mod tests { let index_cf = blocktree.db.column::(); let last_root = RwLock::new(0); - let mut shred = DataShredHeader::default(); let slot = 1; - shred.common_header.header.shred_type = CODING_SHRED; - shred.common_header.header.position = 10; - shred.common_header.header.coding_header.index = 11; - shred.common_header.header.coding_header.slot = 1; - shred.common_header.header.num_coding_shreds = shred.common_header.header.position + 1; + let mut shred = Shredder::new_coding_shred_header(slot, 11, 11, 11, 10); let coding_shred = Shred::new_empty_from_header(shred.clone()); // Insert a good coding shred @@ -3131,7 +3125,7 @@ pub mod tests { // Trying to insert the same shred again should fail { let index = index_cf - .get(shred.common_header.header.coding_header.slot) + .get(shred.common_header.coding_header.slot) .unwrap() .unwrap(); assert!(!Blocktree::should_insert_coding_shred( @@ -3141,13 +3135,13 @@ pub mod tests { )); } - shred.common_header.header.coding_header.index += 1; + shred.common_header.coding_header.index += 1; // Establish a baseline that works { let coding_shred = Shred::new_empty_from_header(shred.clone()); let index = index_cf - .get(shred.common_header.header.coding_header.slot) + .get(shred.common_header.coding_header.slot) .unwrap() .unwrap(); assert!(Blocktree::should_insert_coding_shred( @@ -3160,7 +3154,7 @@ pub mod tests { // Trying to insert a shred with index < position should fail { let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - let index = coding_shred.headers.common_header.header.position - 1; + let index = coding_shred.headers.common_header.position - 1; coding_shred.set_index(index as u32); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); @@ -3174,7 +3168,7 @@ pub mod tests { // Trying to insert shred with num_coding == 0 should fail { let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.common_header.header.num_coding_shreds = 0; + coding_shred.headers.common_header.num_coding_shreds = 0; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3186,8 +3180,8 @@ pub mod tests { // Trying to insert shred with pos >= num_coding should fail { let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.common_header.header.num_coding_shreds = - coding_shred.headers.common_header.header.position; + coding_shred.headers.common_header.num_coding_shreds = + coding_shred.headers.common_header.position; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3200,14 +3194,9 @@ pub mod tests { // has index > u32::MAX should fail { let mut coding_shred = Shred::new_empty_from_header(shred.clone()); - coding_shred.headers.common_header.header.num_coding_shreds = 3; - coding_shred - .headers - .common_header - .header - .coding_header - .index = std::u32::MAX - 1; - coding_shred.headers.common_header.header.position = 0; + coding_shred.headers.common_header.num_coding_shreds = 3; + coding_shred.headers.common_header.coding_header.index = std::u32::MAX - 1; + coding_shred.headers.common_header.position = 0; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); assert!(!Blocktree::should_insert_coding_shred( &coding_shred, @@ -3216,7 +3205,7 @@ pub mod tests { )); // Decreasing the number of num_coding_shreds will put it within the allowed limit - coding_shred.headers.common_header.header.num_coding_shreds = 2; + coding_shred.headers.common_header.num_coding_shreds = 2; assert!(Blocktree::should_insert_coding_shred( &coding_shred, index.coding(), diff --git a/core/src/chacha.rs b/core/src/chacha.rs index e6e553cde..c668d6410 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -153,7 +153,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "3LWNjNqC6HncoWUhXbk6cUH8NSM675aZqRPGUC4Zq21H" + let golden: Hash = "CLGvEayebjdgnLdttFAweZE9rqVkehXqEStUifG9kiU9" .parse() .unwrap(); diff --git a/core/src/shred.rs b/core/src/shred.rs index ea268ade2..651b9a2af 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -12,19 +12,15 @@ use solana_rayon_threadlimit::get_thread_count; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use std::io; use std::io::{Error as IOError, ErrorKind, Write}; use std::sync::Arc; -use std::{cmp, io}; lazy_static! { static ref SIZE_OF_CODING_SHRED_HEADER: usize = { serialized_size(&CodingShredHeader::default()).unwrap() as usize }; static ref SIZE_OF_DATA_SHRED_HEADER: usize = { serialized_size(&DataShredHeader::default()).unwrap() as usize }; - static ref SIZE_OF_EMPTY_CODING_SHRED: usize = - { serialized_size(&CodingShred::empty_shred()).unwrap() as usize }; - static ref SIZE_OF_EMPTY_DATA_SHRED: usize = - { serialized_size(&DataShred::empty_shred()).unwrap() as usize }; static ref SIZE_OF_SIGNATURE: usize = { bincode::serialized_size(&Signature::default()).unwrap() as usize }; static ref SIZE_OF_EMPTY_VEC: usize = @@ -60,7 +56,7 @@ impl Shred { let header = if shred_type == CODING_SHRED { let end = *SIZE_OF_CODING_SHRED_HEADER; let mut header = DataShredHeader::default(); - header.common_header.header = bincode::deserialize(&shred_buf[..end])?; + header.common_header = bincode::deserialize(&shred_buf[..end])?; header } else { let end = *SIZE_OF_DATA_SHRED_HEADER; @@ -80,7 +76,7 @@ impl Shred { if self.is_data() { &self.headers.data_header } else { - &self.headers.common_header.header.coding_header + &self.headers.common_header.coding_header } } @@ -88,7 +84,7 @@ impl Shred { if self.is_data() { &mut self.headers.data_header } else { - &mut self.headers.common_header.header.coding_header + &mut self.headers.common_header.coding_header } } @@ -133,7 +129,7 @@ impl Shred { } pub fn is_data(&self) -> bool { - self.headers.common_header.header.shred_type == DATA_SHRED + self.headers.common_header.shred_type == DATA_SHRED } pub fn last_in_slot(&self) -> bool { @@ -162,7 +158,7 @@ impl Shred { pub fn coding_params(&self) -> Option<(u16, u16, u16)> { if !self.is_data() { - let header = &self.headers.common_header.header; + let header = &self.headers.common_header; Some(( header.num_data_shreds, header.num_coding_shreds, @@ -175,7 +171,7 @@ impl Shred { pub fn verify(&self, pubkey: &Pubkey) -> bool { let signed_payload_offset = if self.is_data() { - CodingShred::overhead() + *SIZE_OF_CODING_SHRED_HEADER } else { *SIZE_OF_SHRED_TYPE } + *SIZE_OF_SIGNATURE; @@ -205,7 +201,7 @@ pub struct ShredCommonHeader { /// A common header that is present at start of every data shred #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] pub struct DataShredHeader { - pub common_header: CodingShred, + pub common_header: CodingShredHeader, pub data_header: ShredCommonHeader, pub parent_offset: u16, pub flags: u8, @@ -221,27 +217,12 @@ pub struct CodingShredHeader { pub position: u16, } -#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] -pub struct DataShred { - pub header: DataShredHeader, - pub payload: Vec, -} - -#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)] -pub struct CodingShred { - pub header: CodingShredHeader, - pub payload: Vec, -} - impl Default for DataShredHeader { fn default() -> Self { DataShredHeader { - common_header: CodingShred { - header: CodingShredHeader { - shred_type: DATA_SHRED, - ..CodingShredHeader::default() - }, - payload: vec![], + common_header: CodingShredHeader { + shred_type: DATA_SHRED, + ..CodingShredHeader::default() }, data_header: ShredCommonHeader::default(), parent_offset: 0, @@ -262,84 +243,6 @@ impl Default for CodingShredHeader { } } -/// Default shred is sized correctly to meet MTU/Packet size requirements -impl Default for DataShred { - fn default() -> Self { - let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_DATA_SHRED; - DataShred { - header: DataShredHeader::default(), - payload: vec![0; size], - } - } -} - -/// Default shred is sized correctly to meet MTU/Packet size requirements -impl Default for CodingShred { - fn default() -> Self { - let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_CODING_SHRED; - CodingShred { - header: CodingShredHeader::default(), - payload: vec![0; size], - } - } -} - -/// Common trait implemented by all types of shreds -pub trait ShredCommon { - /// Write at a particular offset in the shred. Returns amount written and leftover capacity - fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize); - /// Overhead of shred enum and headers - fn overhead() -> usize; - /// Utility function to create an empty shred - fn empty_shred() -> Self; -} - -impl ShredCommon for DataShred { - fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { - let mut capacity = self.payload.len().saturating_sub(offset); - let slice_len = cmp::min(capacity, buf.len()); - capacity -= slice_len; - if slice_len > 0 { - self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); - } - (slice_len, capacity) - } - - fn overhead() -> usize { - *SIZE_OF_EMPTY_DATA_SHRED - *SIZE_OF_EMPTY_VEC - } - - fn empty_shred() -> Self { - DataShred { - header: DataShredHeader::default(), - payload: vec![], - } - } -} - -impl ShredCommon for CodingShred { - fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { - let mut capacity = self.payload.len().saturating_sub(offset); - let slice_len = cmp::min(capacity, buf.len()); - capacity -= slice_len; - if slice_len > 0 { - self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); - } - (slice_len, capacity) - } - - fn overhead() -> usize { - *SIZE_OF_EMPTY_CODING_SHRED - } - - fn empty_shred() -> Self { - CodingShred { - header: CodingShredHeader::default(), - payload: vec![], - } - } -} - #[derive(Debug)] pub struct Shredder { slot: u64, @@ -350,14 +253,17 @@ pub struct Shredder { signer: Arc, pub shreds: Vec, fec_set_shred_start: usize, - active_shred: DataShred, + active_shred: Vec, + active_shred_header: DataShredHeader, active_offset: usize, } impl Write for Shredder { fn write(&mut self, buf: &[u8]) -> io::Result { - let written = self.active_offset; - let (slice_len, capacity) = self.active_shred.write_at(written, buf); + let offset = self.active_offset + *SIZE_OF_DATA_SHRED_HEADER; + let slice_len = std::cmp::min(buf.len(), PACKET_DATA_SIZE - offset); + self.active_shred[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); + let capacity = PACKET_DATA_SIZE - offset - slice_len; if buf.len() > slice_len || capacity == 0 { self.finalize_data_shred(); @@ -408,11 +314,11 @@ impl Shredder { ), ))) } else { - let mut data_shred = DataShred::default(); - data_shred.header.data_header.slot = slot; - data_shred.header.data_header.index = index; - data_shred.header.parent_offset = (slot - parent) as u16; - let active_shred = data_shred; + let mut header = DataShredHeader::default(); + header.data_header.slot = slot; + header.data_header.index = index; + header.parent_offset = (slot - parent) as u16; + let active_shred = vec![0; PACKET_DATA_SIZE]; Ok(Shredder { slot, index, @@ -423,6 +329,7 @@ impl Shredder { shreds: vec![], fec_set_shred_start: 0, active_shred, + active_shred_header: header, active_offset: 0, }) } @@ -439,7 +346,7 @@ impl Shredder { } fn sign_unsigned_shreds_and_generate_codes(&mut self) { - let signature_offset = CodingShred::overhead(); + let signature_offset = *SIZE_OF_CODING_SHRED_HEADER; let signer = self.signer.clone(); PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { @@ -465,41 +372,43 @@ impl Shredder { /// Finalize a data shred. Update the shred index for the next shred fn finalize_data_shred(&mut self) { - let mut data = Vec::with_capacity(PACKET_DATA_SIZE); - bincode::serialize_into(&mut data, &self.active_shred).expect("Failed to serialize shred"); - self.active_offset = 0; self.index += 1; - let mut shred = self.new_data_shred(); - std::mem::swap(&mut shred, &mut self.active_shred); - let shred_info = Shred::new(shred.header, data); - self.shreds.push(shred_info); + // Swap header + let mut header = DataShredHeader::default(); + header.data_header.slot = self.slot; + header.data_header.index = self.index; + header.parent_offset = self.parent_offset; + std::mem::swap(&mut header, &mut self.active_shred_header); + + // Swap shred buffer + let mut shred_buf = vec![0; PACKET_DATA_SIZE]; + std::mem::swap(&mut shred_buf, &mut self.active_shred); + + let mut wr = io::Cursor::new(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER]); + bincode::serialize_into(&mut wr, &header) + .expect("Failed to write header into shred buffer"); + + let shred = Shred::new(header, shred_buf); + self.shreds.push(shred); } - /// Creates a new data shred - fn new_data_shred(&self) -> DataShred { - let mut data_shred = DataShred::default(); - data_shred.header.data_header.slot = self.slot; - data_shred.header.data_header.index = self.index; - data_shred.header.parent_offset = self.parent_offset; - data_shred - } - - pub fn new_coding_shred( + pub fn new_coding_shred_header( slot: u64, index: u32, num_data: usize, num_code: usize, position: usize, - ) -> CodingShred { - let mut coding_shred = CodingShred::default(); - coding_shred.header.coding_header.slot = slot; - coding_shred.header.coding_header.index = index; - coding_shred.header.num_data_shreds = num_data as u16; - coding_shred.header.num_coding_shreds = num_code as u16; - coding_shred.header.position = position as u16; - coding_shred + ) -> DataShredHeader { + let mut header = DataShredHeader::default(); + header.common_header.shred_type = CODING_SHRED; + header.common_header.coding_header.index = index; + header.common_header.coding_header.slot = slot; + header.common_header.num_coding_shreds = num_code as u16; + header.common_header.num_data_shreds = num_data as u16; + header.common_header.position = position as u16; + header } /// Generates coding shreds for the data shreds in the current FEC set @@ -513,7 +422,7 @@ impl Shredder { let start_index = self.index - num_data as u32; // All information after coding shred field in a data shred is encoded - let coding_block_offset = CodingShred::overhead(); + let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..] .iter() .map(|data| &data.payload[coding_block_offset..]) @@ -522,15 +431,15 @@ impl Shredder { // Create empty coding shreds, with correctly populated headers let mut coding_shreds = Vec::with_capacity(num_coding); (0..num_coding).for_each(|i| { - let shred = bincode::serialize(&Self::new_coding_shred( + let header = Self::new_coding_shred_header( self.slot, start_index + i as u32, num_data, num_coding, i, - )) - .unwrap(); - coding_shreds.push(shred); + ); + let shred = Shred::new_empty_from_header(header); + coding_shreds.push(shred.payload); }); // Grab pointers for the coding blocks @@ -546,15 +455,14 @@ impl Shredder { // append to the shred list coding_shreds.into_iter().enumerate().for_each(|(i, code)| { - let mut header = DataShredHeader::default(); - header.common_header.header.shred_type = CODING_SHRED; - header.common_header.header.coding_header.index = start_index + i as u32; - header.common_header.header.coding_header.slot = self.slot; - header.common_header.header.num_coding_shreds = num_coding as u16; - header.common_header.header.num_data_shreds = num_data as u16; - header.common_header.header.position = i as u16; - let shred_info = Shred::new(header, code); - self.shreds.push(shred_info); + let header = Self::new_coding_shred_header( + self.slot, + start_index + i as u32, + num_data, + num_coding, + i, + ); + self.shreds.push(Shred::new(header, code)); }); self.fec_set_index = self.index; } @@ -564,12 +472,12 @@ impl Shredder { /// If there's an active data shred, morph it into the final shred /// If the current active data shred is first in slot, finalize it and create a new shred fn make_final_data_shred(&mut self, last_in_slot: u8) { - if self.active_shred.header.data_header.index == 0 { + if self.active_shred_header.data_header.index == 0 { self.finalize_data_shred(); } - self.active_shred.header.flags |= DATA_COMPLETE_SHRED; + self.active_shred_header.flags |= DATA_COMPLETE_SHRED; if last_in_slot == LAST_SHRED_IN_SLOT { - self.active_shred.header.flags |= LAST_SHRED_IN_SLOT; + self.active_shred_header.flags |= LAST_SHRED_IN_SLOT; } self.finalize_data_shred(); self.sign_unsigned_shreds_and_generate_codes(); @@ -618,21 +526,22 @@ impl Shredder { first_index: usize, missing: usize, ) -> Vec { - if missing < first_index + num_data { - let mut data_shred = DataShred::default(); - data_shred.header.data_header.slot = slot; - data_shred.header.data_header.index = missing as u32; - bincode::serialize(&data_shred).unwrap() + let header = if missing < first_index + num_data { + let mut header = DataShredHeader::default(); + header.data_header.slot = slot; + header.data_header.index = missing as u32; + header } else { - bincode::serialize(&Self::new_coding_shred( + Self::new_coding_shred_header( slot, missing.saturating_sub(num_data) as u32, num_data, num_coding, missing - first_index - num_data, - )) - .unwrap() - } + ) + }; + let shred = Shred::new_empty_from_header(header); + shred.payload } pub fn try_recovery( @@ -646,7 +555,7 @@ impl Shredder { let mut recovered_code = vec![]; let fec_set_size = num_data + num_coding; if num_coding > 0 && shreds.len() < fec_set_size { - let coding_block_offset = CodingShred::overhead(); + let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER; // Let's try recovering missing shreds using erasure let mut present = &mut vec![true; fec_set_size]; @@ -764,7 +673,7 @@ impl Shredder { data_shred_bufs[..num_data] .iter() .flat_map(|data| { - let offset = *SIZE_OF_EMPTY_DATA_SHRED; + let offset = *SIZE_OF_DATA_SHRED_HEADER; data[offset as usize..].iter() }) .cloned() @@ -819,9 +728,6 @@ mod tests { assert!(shredder.shreds.is_empty()); assert_eq!(shredder.active_offset, 0); - assert!(DataShred::overhead() < PACKET_DATA_SIZE); - assert!(CodingShred::overhead() < PACKET_DATA_SIZE); - // Test0: Write some data to shred. Not enough to create a signed shred let data: Vec = (0..25).collect(); assert_eq!(shredder.write(&data).unwrap(), data.len()); @@ -1372,7 +1278,7 @@ mod tests { let mut index = 0; while index < shredder.shreds.len() { - let num_data_shreds = cmp::min( + let num_data_shreds = std::cmp::min( MAX_DATA_SHREDS_PER_FEC_BLOCK as usize, (shredder.shreds.len() - index) / 2, );