Optimizations to shred writing and signing (#5890)

* Optimizations to shred writing and signing

* fix broken tests

* fixes
Pankaj Garg 2019-09-14 21:05:54 -07:00 committed by GitHub
parent 140d4ccf77
commit ee791e2e3e
4 changed files with 114 additions and 108 deletions
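
The heart of this change is in shred.rs: shred-size constants that were recomputed with bincode::serialized_size on every write and every signature check are now computed once behind lazy_static (hence the new dependency added below), and shreds are buffered unsigned, then signed in a single pass per FEC set. A minimal sketch of the size-caching pattern, assuming bincode 1.x, lazy_static 1.4, and serde's derive feature, with a hypothetical Header type standing in for the patch's shred headers:

use bincode::serialized_size;
use lazy_static::lazy_static;
use serde::Serialize;
use solana_sdk::packet::PACKET_DATA_SIZE;

// Hypothetical stand-in for the shred header types in the patch.
#[derive(Serialize, Default)]
struct Header {
    slot: u64,
    index: u32,
}

lazy_static! {
    // serialized_size walks the whole value on every call; this pays that
    // cost once and makes every later lookup a cheap dereference.
    static ref SIZE_OF_HEADER: usize =
        serialized_size(&Header::default()).unwrap() as usize;
}

fn payload_capacity() -> usize {
    // The first deref initializes the static; subsequent derefs are a read.
    PACKET_DATA_SIZE - *SIZE_OF_HEADER
}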

Cargo.lock (generated)

@@ -3082,6 +3082,7 @@ dependencies = [
"jsonrpc-http-server 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-pubsub 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-ws-server 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",

Cargo.toml

@@ -36,6 +36,7 @@ jsonrpc-derive = "13.1.0"
jsonrpc-http-server = "13.1.0"
jsonrpc-pubsub = "13.1.0"
jsonrpc-ws-server = "13.1.0"
+ lazy_static = "1.4.0"
libc = "0.2.62"
log = "0.4.8"
memmap = { version = "0.7.0", optional = true }

blocktree.rs

@@ -2435,7 +2435,7 @@ pub mod tests {
pub fn test_handle_chaining_basic() {
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
{
- let entries_per_slot = 2;
+ let entries_per_slot = 5;
let num_slots = 3;
let blocktree = Blocktree::open(&blocktree_path).unwrap();
@@ -2453,7 +2453,7 @@
// Slot 1 is not trunk because slot 0 hasn't been inserted yet
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
- assert_eq!(s1.last_index, entries_per_slot - 1);
+ assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
// 2) Write to the second slot
let shreds2 = shreds
@@ -2465,7 +2465,7 @@
// Slot 2 is not trunk because slot 0 hasn't been inserted yet
assert!(!s2.is_connected);
assert_eq!(s2.parent_slot, 1);
- assert_eq!(s2.last_index, entries_per_slot - 1);
+ assert_eq!(s2.last_index, shreds_per_slot as u64 - 1);
// Check the first slot again, it should chain to the second slot,
// but still isn't part of the trunk
@@ -2473,7 +2473,7 @@
assert_eq!(s1.next_slots, vec![2]);
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
- assert_eq!(s1.last_index, entries_per_slot - 1);
+ assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
// 3) Write to the zeroth slot, check that every slot
// is now part of the trunk
@@ -2489,7 +2489,7 @@
} else {
assert_eq!(s.parent_slot, i - 1);
}
- assert_eq!(s.last_index, entries_per_slot - 1);
+ assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
@@ -2502,11 +2502,12 @@
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 30;
- let entries_per_slot = 2;
+ let entries_per_slot = 5;
// Separate every other slot into two separate vectors
let mut slots = vec![];
let mut missing_slots = vec![];
+ let mut shreds_per_slot = 2;
for slot in 0..num_slots {
let parent_slot = {
if slot == 0 {
@@ -2516,6 +2517,7 @@ pub mod tests {
}
};
let (slot_shreds, _) = make_slot_entries(slot, parent_slot, entries_per_slot);
+ shreds_per_slot = slot_shreds.len();
if slot % 2 == 1 {
slots.extend(slot_shreds);
@@ -2568,7 +2570,7 @@
} else {
assert_eq!(s.parent_slot, i - 1);
}
- assert_eq!(s.last_index, entries_per_slot - 1);
+ assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
@@ -2582,7 +2584,7 @@
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 15;
- let entries_per_slot = 2;
+ let entries_per_slot = 5;
assert!(entries_per_slot > 1);
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@@ -2617,7 +2619,7 @@
assert_eq!(s.parent_slot, i - 1);
}
- assert_eq!(s.last_index, entries_per_slot - 1);
+ assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
// Other than slot 0, no slots should be part of the trunk
if i != 0 {
@@ -2653,7 +2655,7 @@
assert_eq!(s.parent_slot, i - 1);
}
- assert_eq!(s.last_index, entries_per_slot - 1);
+ assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
}
}
}
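
All of the blocktree test changes above follow from one fact: entries no longer map one-to-one to shreds, so a slot's expected last_index must now be derived from the number of shreds actually produced (shreds_per_slot) rather than from entries_per_slot. Raising entries_per_slot from 2 to 5 presumably keeps each slot spanning more than one shred under the new packing.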

shred.rs

@@ -4,6 +4,7 @@ use crate::result;
use crate::result::Error;
use bincode::serialized_size;
use core::borrow::BorrowMut;
+ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
@@ -12,6 +13,21 @@ use std::io::{Error as IOError, ErrorKind, Write};
use std::sync::Arc;
use std::{cmp, io};
+ lazy_static! {
+ static ref SIZE_OF_EMPTY_CODING_SHRED: usize =
+ { serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
+ static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
+ { serialized_size(&DataShred::empty_shred()).unwrap() as usize };
+ static ref SIZE_OF_SHRED_CODING_SHRED: usize =
+ { serialized_size(&Shred::Coding(CodingShred::empty_shred())).unwrap() as usize };
+ static ref SIZE_OF_SHRED_DATA_SHRED: usize =
+ { serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap() as usize };
+ static ref SIZE_OF_SIGNATURE: usize =
+ { bincode::serialized_size(&Signature::default()).unwrap() as usize };
+ static ref SIZE_OF_EMPTY_VEC: usize =
+ { bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize };
+ }
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredMetaBuf {
pub slot: u64,
@@ -140,12 +156,8 @@ impl Shred {
| Shred::Data(_)
| Shred::DataComplete(_)
| Shred::LastInSlot(_) => CodingShred::overhead(),
- Shred::Coding(_) => {
- CodingShred::overhead()
- - serialized_size(&CodingShred::empty_shred()).unwrap() as usize
- }
- } + bincode::serialized_size(&Signature::default()).unwrap()
- as usize;
+ Shred::Coding(_) => CodingShred::overhead() - *SIZE_OF_EMPTY_CODING_SHRED,
+ } + *SIZE_OF_SIGNATURE;
self.signature()
.verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..])
}
@@ -200,8 +212,7 @@ pub struct CodingShred {
/// Default shred is sized correctly to meet MTU/Packet size requirements
impl Default for DataShred {
fn default() -> Self {
- let size =
- PACKET_DATA_SIZE - serialized_size(&Shred::Data(Self::empty_shred())).unwrap() as usize;
+ let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_DATA_SHRED;
DataShred {
header: DataShredHeader::default(),
payload: vec![0; size],
@@ -212,8 +223,7 @@
/// Default shred is sized correctly to meet MTU/Packet size requirements
impl Default for CodingShred {
fn default() -> Self {
- let size = PACKET_DATA_SIZE
- - serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize;
+ let size = PACKET_DATA_SIZE - *SIZE_OF_SHRED_CODING_SHRED;
CodingShred {
header: CodingShredHeader {
common_header: ShredCommonHeader::default(),
@@ -248,8 +258,7 @@ impl ShredCommon for DataShred {
}
fn overhead() -> usize {
- (bincode::serialized_size(&Shred::Data(Self::empty_shred())).unwrap()
- - bincode::serialized_size(&vec![0u8; 0]).unwrap()) as usize
+ *SIZE_OF_SHRED_DATA_SHRED - *SIZE_OF_EMPTY_VEC
}
fn empty_shred() -> Self {
@@ -272,7 +281,7 @@ impl ShredCommon for CodingShred {
}
fn overhead() -> usize {
- bincode::serialized_size(&Shred::Coding(Self::empty_shred())).unwrap() as usize
+ *SIZE_OF_SHRED_CODING_SHRED
}
fn empty_shred() -> Self {
@@ -282,7 +291,7 @@
}
}
- #[derive(Default, Debug)]
+ #[derive(Debug)]
pub struct Shredder {
slot: u64,
pub index: u32,
@@ -292,28 +301,14 @@ pub struct Shredder {
signer: Arc<Keypair>,
pub shred_tuples: Vec<(Shred, Vec<u8>)>,
fec_set_shred_start: usize,
- active_shred: Option<Shred>,
+ active_shred: Shred,
active_offset: usize,
}
impl Write for Shredder {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let mut current_shred = self
- .active_shred
- .take()
- .or_else(|| {
- Some(if self.index == 0 {
- // If index is 0, it's the first shred in slot
- Shred::FirstInSlot(self.new_data_shred())
- } else {
- // Else, it is the first shred in FEC set
- Shred::Data(self.new_data_shred())
- })
- })
- .unwrap();
let written = self.active_offset;
- let (slice_len, capacity) = match current_shred.borrow_mut() {
+ let (slice_len, capacity) = match self.active_shred.borrow_mut() {
Shred::FirstInSlot(s)
| Shred::Data(s)
| Shred::DataComplete(s)
@@ -321,20 +316,14 @@
Shred::Coding(s) => s.write_at(written, buf),
};
- let active_shred = if buf.len() > slice_len || capacity == 0 {
- self.finalize_data_shred(current_shred);
- // Continue generating more data shreds.
- // If the caller decides to finalize the FEC block or Slot, the data shred will
- // morph into appropriate shred accordingly
- Shred::Data(self.new_data_shred())
+ if buf.len() > slice_len || capacity == 0 {
+ self.finalize_data_shred();
} else {
self.active_offset += slice_len;
- current_shred
- };
+ }
- self.active_shred = Some(active_shred);
if self.index - self.fec_set_index >= MAX_DATA_SHREDS_PER_FEC_BLOCK {
- self.generate_coding_shreds();
+ self.sign_unsigned_shreds_and_generate_codes();
}
Ok(slice_len)
@@ -383,6 +372,16 @@
),
)))
} else {
+ let mut data_shred = DataShred::default();
+ data_shred.header.common_header.slot = slot;
+ data_shred.header.common_header.index = index;
+ data_shred.header.parent_offset = (slot - parent) as u16;
+ let active_shred = if index == 0 {
+ // If index is 0, it's the first shred in slot
+ Shred::FirstInSlot(data_shred)
+ } else {
+ Shred::Data(data_shred)
+ };
Ok(Shredder {
slot,
index,
@@ -390,37 +389,56 @@
parent_offset: (slot - parent) as u16,
fec_rate,
signer: signer.clone(),
- ..Shredder::default()
+ shred_tuples: vec![],
+ fec_set_shred_start: 0,
+ active_shred,
+ active_offset: 0,
})
}
}
/// Serialize the payload, sign it and store the signature in the shred
/// Store the signed shred in the vector of shreds
- fn finalize_shred(
- &mut self,
- mut shred: Shred,
- mut shred_buf: Vec<u8>,
+ fn sign_shred(
+ signer: &Arc<Keypair>,
+ shred: &mut Shred,
+ shred_buf: &mut [u8],
signature_offset: usize,
) {
- let data_offset =
- signature_offset + bincode::serialized_size(&Signature::default()).unwrap() as usize;
- let signature = self.signer.sign_message(&shred_buf[data_offset..]);
+ let data_offset = signature_offset + *SIZE_OF_SIGNATURE;
+ let signature = signer.sign_message(&shred_buf[data_offset..]);
let serialized_signature =
bincode::serialize(&signature).expect("Failed to generate serialized signature");
shred.set_signature(signature);
shred_buf[signature_offset..signature_offset + serialized_signature.len()]
.copy_from_slice(&serialized_signature);
- self.shred_tuples.push((shred, shred_buf));
}
+ fn sign_unsigned_shreds_and_generate_codes(&mut self) {
+ let signature_offset = CodingShred::overhead();
+ let signer = self.signer.clone();
+ self.shred_tuples[self.fec_set_shred_start..]
+ .iter_mut()
+ .for_each(|(s, d)| Self::sign_shred(&signer, s, d, signature_offset));
+ let unsigned_coding_shred_start = self.shred_tuples.len();
+ self.generate_coding_shreds();
+ let coding_header_offset = *SIZE_OF_SHRED_CODING_SHRED - *SIZE_OF_EMPTY_CODING_SHRED;
+ self.shred_tuples[unsigned_coding_shred_start..]
+ .iter_mut()
+ .for_each(|(s, d)| Self::sign_shred(&signer, s, d, coding_header_offset));
+ self.fec_set_shred_start = self.shred_tuples.len();
+ }
/// Finalize a data shred. Update the shred index for the next shred
- fn finalize_data_shred(&mut self, shred: Shred) {
- let data = bincode::serialize(&shred).expect("Failed to serialize shred");
+ fn finalize_data_shred(&mut self) {
+ let mut data = vec![0; PACKET_DATA_SIZE];
+ let mut wr = io::Cursor::new(&mut data[..]);
+ bincode::serialize_into(&mut wr, &self.active_shred).expect("Failed to serialize shred");
- self.finalize_shred(shred, data, CodingShred::overhead());
self.active_offset = 0;
self.index += 1;
+ let mut shred = Shred::Data(self.new_data_shred());
+ std::mem::swap(&mut shred, &mut self.active_shred);
+ self.shred_tuples.push((shred, data));
}
/// Creates a new data shred
@@ -489,54 +507,49 @@
.encode(&data_ptrs, coding_ptrs.as_mut_slice())
.expect("Failed in erasure encode");
- // Offset of coding shred header in the Coding Shred (i.e. overhead of enum variant)
- let coding_header_offset = (serialized_size(&Shred::Coding(CodingShred::empty_shred()))
- .unwrap()
- - serialized_size(&CodingShred::empty_shred()).unwrap())
- as usize;
- // Finalize the coding blocks (sign and append to the shred list)
+ // append to the shred list
coding_shreds.into_iter().for_each(|code| {
let shred: Shred = bincode::deserialize(&code).unwrap();
- self.finalize_shred(shred, code, coding_header_offset)
+ self.shred_tuples.push((shred, code));
});
self.fec_set_index = self.index;
- self.fec_set_shred_start = self.shred_tuples.len();
}
}
/// Create the final data shred for the current FEC set or slot
/// If there's an active data shred, morph it into the final shred
/// If the current active data shred is first in slot, finalize it and create a new shred
- fn make_final_data_shred(&mut self) -> DataShred {
- let mut shred = self
- .active_shred
- .take()
- .map_or(self.new_data_shred(), |current_shred| match current_shred {
- Shred::FirstInSlot(s) => {
- self.finalize_data_shred(Shred::FirstInSlot(s));
- self.new_data_shred()
+ fn make_final_data_shred(&mut self, last_in_slot: u8) {
+ if let Shred::FirstInSlot(_) = &self.active_shred {
+ self.finalize_data_shred();
+ }
+ self.active_shred = match self.active_shred.borrow_mut() {
+ Shred::FirstInSlot(s)
+ | Shred::Data(s)
+ | Shred::DataComplete(s)
+ | Shred::LastInSlot(s) => {
+ s.header.flags |= DATA_COMPLETE_SHRED;
+ if last_in_slot == LAST_SHRED_IN_SLOT {
+ s.header.flags |= LAST_SHRED_IN_SLOT;
+ Shred::LastInSlot(s.clone())
+ } else {
+ Shred::DataComplete(s.clone())
+ }
- Shred::Data(s) | Shred::DataComplete(s) | Shred::LastInSlot(s) => s,
- Shred::Coding(_) => self.new_data_shred(),
- });
- shred.header.flags |= DATA_COMPLETE_SHRED;
- shred
+ }
+ Shred::Coding(_) => unreachable!(),
+ };
+ self.finalize_data_shred();
+ self.sign_unsigned_shreds_and_generate_codes();
}
/// Finalize the current FEC block, and generate coding shreds
pub fn finalize_data(&mut self) {
- let final_shred = self.make_final_data_shred();
- self.finalize_data_shred(Shred::DataComplete(final_shred));
- self.generate_coding_shreds();
+ self.make_final_data_shred(0);
}
/// Finalize the current slot (i.e. add last slot shred) and generate coding shreds
pub fn finalize_slot(&mut self) {
- let mut final_shred = self.make_final_data_shred();
- final_shred.header.flags |= LAST_SHRED_IN_SLOT;
- self.finalize_data_shred(Shred::LastInSlot(final_shred));
- self.generate_coding_shreds();
+ self.make_final_data_shred(LAST_SHRED_IN_SLOT);
}
fn fill_in_missing_shreds(
@@ -738,8 +751,7 @@
data_shred_bufs[..num_data]
.iter()
.flat_map(|data| {
- let offset =
- bincode::serialized_size(&Shred::Data(DataShred::empty_shred())).unwrap();
+ let offset = *SIZE_OF_SHRED_DATA_SHRED;
data[offset as usize..].iter()
})
.cloned()
@@ -768,7 +780,6 @@ mod tests {
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
- assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
assert!(DataShred::overhead() < PACKET_DATA_SIZE);
@@ -778,7 +789,6 @@
let data: Vec<u8> = (0..25).collect();
assert_eq!(shredder.write(&data).unwrap(), data.len());
assert!(shredder.shred_tuples.is_empty());
- assert_ne!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 25);
// Test1: Write some more data to shred. Not enough to create a signed shred
@@ -793,8 +803,6 @@
assert_ne!(offset, data.len());
// Assert that we have atleast one signed shred
assert!(!shredder.shred_tuples.is_empty());
- // Assert that a new active shred was also created
- assert_ne!(shredder.active_shred, None);
// Assert that the new active shred was not populated
assert_eq!(shredder.active_offset, 0);
@@ -811,7 +819,8 @@
assert_eq!(deserialized_shred.index(), 0);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
- assert!(deserialized_shred.verify(&keypair.pubkey()));
+ // The shreds are not signed yet, as the data is not finalized
+ assert!(!deserialized_shred.verify(&keypair.pubkey()));
let seed0 = deserialized_shred.seed();
// Test that same seed is generated for a given shred
assert_eq!(seed0, deserialized_shred.seed());
@@ -860,7 +869,6 @@
assert_eq!(deserialized_shred.index(), 2);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
- assert!(deserialized_shred.verify(&keypair.pubkey()));
// Test8: Write more data to generate an intermediate data shred
let offset = shredder.write(&data).unwrap();
@@ -878,7 +886,6 @@
assert_eq!(deserialized_shred.index(), 3);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
- assert!(deserialized_shred.verify(&keypair.pubkey()));
// Test9: Write some data to shredder
let data: Vec<u8> = (0..25).collect();
@@ -899,7 +906,6 @@
assert_eq!(deserialized_shred.index(), 4);
assert_eq!(deserialized_shred.slot(), slot);
assert_eq!(deserialized_shred.parent(), slot - 5);
- assert!(deserialized_shred.verify(&keypair.pubkey()));
}
#[test]
@@ -911,7 +917,6 @@
Shredder::new(slot, slot - 5, 0.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
- assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..25).collect();
@@ -923,7 +928,7 @@
shredder.finalize_data();
- // We should have 2 shreds now (FirstInSlot, and LastInFECBlock)
+ // We should have 1 shred now
assert_eq!(shredder.shred_tuples.len(), 2);
let (_, shred) = shredder.shred_tuples.remove(0);
@@ -948,7 +953,6 @@
.expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
- assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..25).collect();
@@ -984,7 +988,6 @@
.expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
- assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
// Write enough data to create a shred (> PACKET_DATA_SIZE)
@@ -1060,7 +1063,6 @@
Shredder::new(slot, slot - 5, 1.0, &keypair, 0).expect("Failed in creating shredder");
assert!(shredder.shred_tuples.is_empty());
- assert_eq!(shredder.active_shred, None);
assert_eq!(shredder.active_offset, 0);
let data: Vec<_> = (0..5000).collect();
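
Taken together, the shred.rs changes replace sign-as-you-go with deferred batch signing: finalize_data_shred now only serializes and buffers the shred, and sign_unsigned_shreds_and_generate_codes later signs everything buffered since the last FEC-set boundary, generates the coding shreds, and signs those too. That is also why the mid-stream verify asserts in the tests above are removed or inverted: shreds stay unsigned until the set is finalized. A self-contained sketch of the buffer-then-batch-sign shape (the offsets and the sign closure are illustrative assumptions, not the patch's actual layout):

// Payloads are buffered unsigned; at each FEC-set boundary, everything since
// the last boundary is signed in one pass and each signature is written into
// the reserved slot at the front of its payload.
const SIG_SIZE: usize = 64; // ed25519 signature length (illustrative)

#[derive(Default)]
struct Batch {
    unsigned_start: usize, // index of the first payload not yet signed
    payloads: Vec<Vec<u8>>,
}

impl Batch {
    fn push(&mut self, payload: Vec<u8>) {
        self.payloads.push(payload);
    }

    // Called once per FEC set instead of once per shred.
    fn sign_pending(&mut self, sign: impl Fn(&[u8]) -> [u8; SIG_SIZE]) {
        for payload in &mut self.payloads[self.unsigned_start..] {
            // Sign only the bytes after the signature slot, as the patch does,
            // so the still-zeroed slot is not part of the signed message.
            let sig = sign(&payload[SIG_SIZE..]);
            payload[..SIG_SIZE].copy_from_slice(&sig);
        }
        self.unsigned_start = self.payloads.len();
    }
}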