From ee791e2e3e34a7d9f8564f6625dbf3852cd26578 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Sat, 14 Sep 2019 21:05:54 -0700
Subject: [PATCH] Optimizations to shred writing and signing (#5890)

* Optimizations to shred writing and signing

* fix broken tests

* fixes
---
 Cargo.lock            |   1 +
 core/Cargo.toml       |   1 +
 core/src/blocktree.rs |  22 ++---
 core/src/shred.rs     | 198 +++++++++++++++++++++---------------
 4 files changed, 114 insertions(+), 108 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 81ab0b691..8ef699760 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3082,6 +3082,7 @@ dependencies = [
  "jsonrpc-http-server 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "jsonrpc-pubsub 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "jsonrpc-ws-server 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 99018d4b2..3ce50684c 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -36,6 +36,7 @@ jsonrpc-derive = "13.1.0"
 jsonrpc-http-server = "13.1.0"
 jsonrpc-pubsub = "13.1.0"
 jsonrpc-ws-server = "13.1.0"
+lazy_static = "1.4.0"
 libc = "0.2.62"
 log = "0.4.8"
 memmap = { version = "0.7.0", optional = true }
diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index a97f2fbd4..bb4b722e0 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -2435,7 +2435,7 @@ pub mod tests {
     pub fn test_handle_chaining_basic() {
         let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
         {
-            let entries_per_slot = 2;
+            let entries_per_slot = 5;
             let num_slots = 3;
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
@@ -2453,7 +2453,7 @@ pub mod tests {
         // Slot 1 is not trunk because slot 0 hasn't been inserted yet
         assert!(!s1.is_connected);
         assert_eq!(s1.parent_slot, 0);
-        assert_eq!(s1.last_index, entries_per_slot - 1);
+        assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);

         // 2) Write to the second slot
         let shreds2 = shreds
@@ -2465,7 +2465,7 @@ pub mod tests {
         // Slot 2 is not trunk because slot 0 hasn't been inserted yet
         assert!(!s2.is_connected);
         assert_eq!(s2.parent_slot, 1);
-        assert_eq!(s2.last_index, entries_per_slot - 1);
+        assert_eq!(s2.last_index, shreds_per_slot as u64 - 1);

         // Check the first slot again, it should chain to the second slot,
         // but still isn't part of the trunk
@@ -2473,7 +2473,7 @@ pub mod tests {
         assert_eq!(s1.next_slots, vec![2]);
         assert!(!s1.is_connected);
         assert_eq!(s1.parent_slot, 0);
-        assert_eq!(s1.last_index, entries_per_slot - 1);
+        assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);

         // 3) Write to the zeroth slot, check that every slot
         // is now part of the trunk
@@ -2489,7 +2489,7 @@ pub mod tests {
             } else {
                 assert_eq!(s.parent_slot, i - 1);
             }
-            assert_eq!(s.last_index, entries_per_slot - 1);
+            assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
             assert!(s.is_connected);
         }
     }
@@ -2502,11 +2502,12 @@ pub mod tests {
         {
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
             let num_slots = 30;
-            let entries_per_slot = 2;
+            let entries_per_slot = 5;

             // Separate every other slot into two separate vectors
             let mut slots = vec![];
             let mut missing_slots = vec![];
+            let mut shreds_per_slot = 2;
             for slot in 0..num_slots {
                 let parent_slot = {
                     if slot == 0 {
@@ -2516,6 +2517,7 @@ pub mod tests {
                     }
                 };
                 let (slot_shreds, _) = make_slot_entries(slot, parent_slot, entries_per_slot);
+                shreds_per_slot = slot_shreds.len();

                 if slot % 2 == 1 {
                     slots.extend(slot_shreds);
@@ -2568,7 +2570,7 @@ pub mod tests {
             } else {
                 assert_eq!(s.parent_slot, i - 1);
             }
-            assert_eq!(s.last_index, entries_per_slot - 1);
+            assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
             assert!(s.is_connected);
         }
     }
@@ -2582,7 +2584,7 @@ pub mod tests {
         {
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
             let num_slots = 15;
-            let entries_per_slot = 2;
+            let entries_per_slot = 5;
             assert!(entries_per_slot > 1);

             let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@@ -2617,7 +2619,7 @@ pub mod tests {
                     assert_eq!(s.parent_slot, i - 1);
                 }

-                assert_eq!(s.last_index, entries_per_slot - 1);
+                assert_eq!(s.last_index, shreds_per_slot as u64 - 1);

                 // Other than slot 0, no slots should be part of the trunk
                 if i != 0 {
@@ -2653,7 +2655,7 @@ pub mod tests {
                     assert_eq!(s.parent_slot, i - 1);
                 }

-                assert_eq!(s.last_index, entries_per_slot - 1);
+                assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
             }
         }
     }
diff --git a/core/src/shred.rs b/core/src/shred.rs
index 9cae97e34..9e0a8909a 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -4,6 +4,7 @@ use crate::result;
 use crate::result::Error;
 use bincode::serialized_size;
 use core::borrow::BorrowMut;
+use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
 use solana_sdk::packet::PACKET_DATA_SIZE;
 use solana_sdk::pubkey::Pubkey;
@@ -12,6 +13,21 @@ use std::io::{Error as IOError, ErrorKind, Write};
 use std::sync::Arc;
 use std::{cmp, io};

+lazy_static! {
+    static ref SIZE_OF_EMPTY_CODING_SHRED: usize =
+        { serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
+    static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
+        { serialized_size(&DataShred::empty_shred()).unwrap() as usize };
+    static ref SIZE_OF_SHRED_CODING_SHRED: usize =
+        { serialized_size(&Shred::Coding(CodingShred::empty_shred())).unwrap() as usize };
+    static ref SIZE_OF_SHRED_DATA_SHRED: usize =
+        { serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap() as usize };
+    static ref SIZE_OF_SIGNATURE: usize =
+        { bincode::serialized_size(&Signature::default()).unwrap() as usize };
+    static ref SIZE_OF_EMPTY_VEC: usize =
+        { bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize };
+}
+
 #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
 pub struct ShredMetaBuf {
     pub slot: u64,
@@ -140,12 +156,8 @@ impl Shred {
             | Shred::Data(_)
             | Shred::DataComplete(_)
             | Shred::LastInSlot(_) => CodingShred::overhead(),
-            Shred::Coding(_) => {
-                CodingShred::overhead()
-                    - serialized_size(&CodingShred::empty_shred()).unwrap() as usize
-            }
-        } + bincode::serialized_size(&Signature::default()).unwrap()
-            as usize;
+            Shred::Coding(_) => CodingShred::overhead() - *SIZE_OF_EMPTY_CODING_SHRED,
+        } + *SIZE_OF_SIGNATURE;
         self.signature()
             .verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..])
     }
@@ -200,8 +212,7 @@ pub struct CodingShred {
 /// Default shred is sized correctly to meet MTU/Packet size requirements
 impl Default for DataShred {
     fn default() -> Self {
-        let size =
-            PACKET_DATA_SIZE - serialized_size(&Shred::Data(Self::empty_shred())).unwrap() as usize;
+        let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_DATA_SHRED;
         DataShred {
             header: DataShredHeader::default(),
             payload: vec![0; size],
@@ -212,8 +223,7 @@ impl Default for DataShred {
 /// Default shred is sized correctly to meet MTU/Packet size requirements
 impl Default for CodingShred {
     fn default() -> Self {
-        let size = PACKET_DATA_SIZE
-            - serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize;
+        let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED;
         CodingShred {
             header: CodingShredHeader {
                 common_header: ShredCommonHeader::default(),
@@ -248,8 +258,7 @@ impl ShredCommon for DataShred {
     }

     fn overhead() -> usize {
-        (bincode::serialized_size(&Shred::Data(Self::empty_shred())).unwrap()
-            - bincode::serialized_size(&vec![0u8; 0]).unwrap()) as usize
+        *SIZE_OF_SHRED_DATA_SHRED - *SIZE_OF_EMPTY_VEC
     }

     fn empty_shred() -> Self {
@@ -272,7 +281,7 @@ impl ShredCommon for CodingShred {
     }

     fn overhead() -> usize {
-        bincode::serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize
+        *SIZE_OF_SHRED_CODING_SHRED
     }

     fn empty_shred() -> Self {
@@ -282,7 +291,7 @@ impl ShredCommon for CodingShred {
     }
 }

-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct Shredder {
     slot: u64,
     pub index: u32,
@@ -292,28 +301,14 @@ pub struct Shredder {
     signer: Arc<Keypair>,
     pub shred_tuples: Vec<(Shred, Vec<u8>)>,
     fec_set_shred_start: usize,
-    active_shred: Option<Shred>,
+    active_shred: Shred,
     active_offset: usize,
 }

 impl Write for Shredder {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let mut current_shred = self
-            .active_shred
-            .take()
-            .or_else(|| {
-                Some(if self.index == 0 {
-                    // If index is 0, it's the first shred in slot
-                    Shred::FirstInSlot(self.new_data_shred())
-                } else {
-                    // Else, it is the first shred in FEC set
-                    Shred::Data(self.new_data_shred())
-                })
-            })
-            .unwrap();
-
         let written = self.active_offset;
-        let (slice_len, capacity) = match current_shred.borrow_mut() {
+        let (slice_len, capacity) = match self.active_shred.borrow_mut() {
             Shred::FirstInSlot(s)
             | Shred::Data(s)
             | Shred::DataComplete(s)
@@ -321,20 +316,14 @@ impl Write for Shredder {
             Shred::Coding(s) => s.write_at(written, buf),
         };

-        let active_shred = if buf.len() > slice_len || capacity == 0 {
-            self.finalize_data_shred(current_shred);
-            // Continue generating more data shreds.
-            // If the caller decides to finalize the FEC block or Slot, the data shred will
-            // morph into appropriate shred accordingly
-            Shred::Data(self.new_data_shred())
+        if buf.len() > slice_len || capacity == 0 {
+            self.finalize_data_shred();
         } else {
             self.active_offset += slice_len;
-            current_shred
-        };
+        }

-        self.active_shred = Some(active_shred);
         if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK {
-            self.generate_coding_shreds();
+            self.sign_unsigned_shreds_and_generate_codes();
         }

         Ok(slice_len)
@@ -383,6 +372,16 @@ impl Shredder {
                 ),
             )))
         } else {
+            let mut data_shred = DataShred::default();
+            data_shred.header.common_header.slot = slot;
+            data_shred.header.common_header.index = index;
+            data_shred.header.parent_offset = (slot - parent) as u16;
+            let active_shred = if index == 0 {
+                // If index is 0, it's the first shred in slot
+                Shred::FirstInSlot(data_shred)
+            } else {
+                Shred::Data(data_shred)
+            };
             Ok(Shredder {
                 slot,
                 index,
@@ -390,37 +389,56 @@ impl Shredder {
                 parent_offset: (slot - parent) as u16,
                 fec_rate,
                 signer: signer.clone(),
-                ..Shredder::default()
+                shred_tuples: vec![],
+                fec_set_shred_start: 0,
+                active_shred,
+                active_offset: 0,
             })
         }
     }

-    /// Serialize the payload, sign it and store the signature in the shred
-    /// Store the signed shred in the vector of shreds
-    fn finalize_shred(
-        &mut self,
-        mut shred: Shred,
-        mut shred_buf: Vec<u8>,
+    fn sign_shred(
+        signer: &Arc<Keypair>,
+        shred: &mut Shred,
+        shred_buf: &mut [u8],
         signature_offset: usize,
     ) {
-        let data_offset =
-            signature_offset + bincode::serialized_size(&Signature::default()).unwrap() as usize;
-        let signature = self.signer.sign_message(&shred_buf[data_offset..]);
+        let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
+        let signature = signer.sign_message(&shred_buf[data_offset..]);
         let serialized_signature =
             bincode::serialize(&signature).expect("Failed to generate serialized signature");
         shred.set_signature(signature);
         shred_buf[signature_offset..signature_offset + serialized_signature.len()]
             .copy_from_slice(&serialized_signature);
-        self.shred_tuples.push((shred, shred_buf));
+    }
+
+    fn sign_unsigned_shreds_and_generate_codes(&mut self) {
+        let signature_offset = CodingShred::overhead();
+        let signer = self.signer.clone();
+        self.shred_tuples[self.fec_set_shred_start..]
+            .iter_mut()
+            .for_each(|(s, d)| Self::sign_shred(&signer, s, d, signature_offset));
+        let unsigned_coding_shred_start = self.shred_tuples.len();
+        self.generate_coding_shreds();
+        let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
+        self.shred_tuples[unsigned_coding_shred_start..]
+            .iter_mut()
+            .for_each(|(s, d)| Self::sign_shred(&signer, s, d, coding_header_offset));
+        self.fec_set_shred_start = self.shred_tuples.len();
     }

     /// Finalize a data shred. Update the shred index for the next shred
-    fn finalize_data_shred(&mut self, shred: Shred) {
-        let data = bincode::serialize(&shred).expect("Failed to serialize shred");
+    fn finalize_data_shred(&mut self) {
+        let mut data = vec![0; PACKET_DATA_SIZE];
+        let mut wr = io::Cursor::new(&mut data[..]);
+        bincode::serialize_into(&mut wr, &self.active_shred).expect("Failed to serialize shred");

-        self.finalize_shred(shred, data, CodingShred::overhead());
         self.active_offset = 0;
         self.index += 1;
+
+        let mut shred = Shred::Data(self.new_data_shred());
+        std::mem::swap(&mut shred, &mut self.active_shred);
+        self.shred_tuples.push((shred, data));
     }

     /// Creates a new data shred
@@ -489,54 +507,49 @@ impl Shredder {
                 .encode(&data_ptrs, coding_ptrs.as_mut_slice())
                 .expect("Failed in erasure encode");

-            // Offset of coding shred header in the Coding Shred (i.e. overhead of enum variant)
-            let coding_header_offset = (serialized_size(&Shred::Coding(CodingShred::empty_shred()))
-                .unwrap()
-                - serialized_size(&CodingShred::empty_shred()).unwrap())
-                as usize;
-
-            // Finalize the coding blocks (sign and append to the shred list)
+            // append to the shred list
             coding_shreds.into_iter().for_each(|code| {
                 let shred: Shred = bincode::deserialize(&code).unwrap();
-                self.finalize_shred(shred, code, coding_header_offset)
+                self.shred_tuples.push((shred, code));
             });

             self.fec_set_index = self.index;
-            self.fec_set_shred_start = self.shred_tuples.len();
         }
     }

     /// Create the final data shred for the current FEC set or slot
     /// If there's an active data shred, morph it into the final shred
     /// If the current active data shred is first in slot, finalize it and create a new shred
-    fn make_final_data_shred(&mut self) -> DataShred {
-        let mut shred = self
-            .active_shred
-            .take()
-            .map_or(self.new_data_shred(), |current_shred| match current_shred {
-                Shred::FirstInSlot(s) => {
-                    self.finalize_data_shred(Shred::FirstInSlot(s));
-                    self.new_data_shred()
+    fn make_final_data_shred(&mut self, last_in_slot: u8) {
+        if let Shred::FirstInSlot(_) = &self.active_shred {
+            self.finalize_data_shred();
+        }
+        self.active_shred = match self.active_shred.borrow_mut() {
+            Shred::FirstInSlot(s)
+            | Shred::Data(s)
+            | Shred::DataComplete(s)
+            | Shred::LastInSlot(s) => {
+                s.header.flags |= DATA_COMPLETE_SHRED;
+                if last_in_slot == LAST_SHRED_IN_SLOT {
+                    s.header.flags |= LAST_SHRED_IN_SLOT;
+                    Shred::LastInSlot(s.clone())
+                } else {
+                    Shred::DataComplete(s.clone())
                 }
-                Shred::Data(s) | Shred::DataComplete(s) | Shred::LastInSlot(s) => s,
-                Shred::Coding(_) => self.new_data_shred(),
-            });
-        shred.header.flags |= DATA_COMPLETE_SHRED;
-        shred
+            }
+            Shred::Coding(_) => unreachable!(),
+        };
+        self.finalize_data_shred();
+        self.sign_unsigned_shreds_and_generate_codes();
     }

     /// Finalize the current FEC block, and generate coding shreds
     pub fn finalize_data(&mut self) {
-        let final_shred = self.make_final_data_shred();
-        self.finalize_data_shred(Shred::DataComplete(final_shred));
-        self.generate_coding_shreds();
+        self.make_final_data_shred(0);
     }

     /// Finalize the current slot (i.e. add last slot shred) and generate coding shreds
     pub fn finalize_slot(&mut self) {
-        let mut final_shred = self.make_final_data_shred();
-        final_shred.header.flags |= LAST_SHRED_IN_SLOT;
-        self.finalize_data_shred(Shred::LastInSlot(final_shred));
-        self.generate_coding_shreds();
+        self.make_final_data_shred(LAST_SHRED_IN_SLOT);
     }

     fn fill_in_missing_shreds(
@@ -738,8 +751,7 @@ impl Shredder {
         data_shred_bufs[..num_data]
             .iter()
             .flat_map(|data| {
-                let offset =
-                    bincode::serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap();
+                let offset = *SIZE_OF_SHRED_DATA_SHRED;
                 data[offset as usize..].iter()
             })
             .cloned()
@@ -768,7 +780,6 @@ mod tests {
            Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");

         assert!(shredder.shred_tuples.is_empty());
-        assert_eq!(shredder.active_shred, None);
         assert_eq!(shredder.active_offset, 0);

         assert!(DataShred::overhead() < PACKET_DATA_SIZE);
@@ -778,7 +789,6 @@ mod tests {
         let data: Vec<u8> = (0..25).collect();
         assert_eq!(shredder.write(&data).unwrap(), data.len());
         assert!(shredder.shred_tuples.is_empty());
-        assert_ne!(shredder.active_shred, None);
         assert_eq!(shredder.active_offset, 25);

         // Test1: Write some more data to shred. Not enough to create a signed shred
@@ -793,8 +803,6 @@ mod tests {
         assert_ne!(offset, data.len());
         // Assert that we have atleast one signed shred
         assert!(!shredder.shred_tuples.is_empty());
-        // Assert that a new active shred was also created
-        assert_ne!(shredder.active_shred, None);
         // Assert that the new active shred was not populated
         assert_eq!(shredder.active_offset, 0);

@@ -811,7 +819,8 @@ mod tests {
         assert_eq!(deserialized_shred.index(), 0);
         assert_eq!(deserialized_shred.slot(), slot);
         assert_eq!(deserialized_shred.parent(), slot - 5);
-        assert!(deserialized_shred.verify(&keypair.pubkey()));
+        // The shreds are not signed yet, as the data is not finalized
+        assert!(!deserialized_shred.verify(&keypair.pubkey()));
         let seed0 = deserialized_shred.seed();
         // Test that same seed is generated for a given shred
         assert_eq!(seed0, deserialized_shred.seed());
@@ -860,7 +869,6 @@ mod tests {
         assert_eq!(deserialized_shred.index(), 2);
         assert_eq!(deserialized_shred.slot(), slot);
         assert_eq!(deserialized_shred.parent(), slot - 5);
-        assert!(deserialized_shred.verify(&keypair.pubkey()));

         // Test8: Write more data to generate an intermediate data shred
         let offset = shredder.write(&data).unwrap();
@@ -878,7 +886,6 @@ mod tests {
         assert_eq!(deserialized_shred.index(), 3);
         assert_eq!(deserialized_shred.slot(), slot);
         assert_eq!(deserialized_shred.parent(), slot - 5);
-        assert!(deserialized_shred.verify(&keypair.pubkey()));

         // Test9: Write some data to shredder
         let data: Vec<u8> = (0..25).collect();
@@ -899,7 +906,6 @@ mod tests {
         assert_eq!(deserialized_shred.index(), 4);
         assert_eq!(deserialized_shred.slot(), slot);
         assert_eq!(deserialized_shred.parent(), slot - 5);
-        assert!(deserialized_shred.verify(&keypair.pubkey()));
     }

     #[test]
@@ -911,7 +917,6 @@ mod tests {
            Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");

         assert!(shredder.shred_tuples.is_empty());
-        assert_eq!(shredder.active_shred, None);
         assert_eq!(shredder.active_offset, 0);

         let data: Vec<_> = (0..25).collect();
@@ -923,7 +928,7 @@ mod tests {

         shredder.finalize_data();

-        // We should have 2 shreds now (FirstInSlot, and LastInFECBlock)
+        // We should have 2 shreds now (FirstInSlot, and DataComplete)
         assert_eq!(shredder.shred_tuples.len(), 2);

         let (_, shred) = shredder.shred_tuples.remove(0);
@@ -948,7 +953,6 @@ mod tests {
shredder"); assert!(shredder.shred_tuples.is_empty()); - assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..25).collect(); @@ -984,7 +988,6 @@ mod tests { .expect("Failed in creating shredder"); assert!(shredder.shred_tuples.is_empty()); - assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); // Write enough data to create a shred (> PACKET_DATA_SIZE) @@ -1060,7 +1063,6 @@ mod tests { Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder"); assert!(shredder.shred_tuples.is_empty()); - assert_eq!(shredder.active_shred, None); assert_eq!(shredder.active_offset, 0); let data: Vec<_> = (0..5000).collect();