From 47535b9ff1d86617df3a77ef6e0e3f5c5bd08e06 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 27 Aug 2019 17:11:24 -0700 Subject: [PATCH] Use serialize_into to fill in shreds instead of writing one byte at a time (#5695) automerge --- core/src/blocktree.rs | 22 ++++--------- .../broadcast_stage/standard_broadcast_run.rs | 9 ++---- core/src/shred.rs | 32 +++++++++++-------- core/src/window_service.rs | 8 ++--- 4 files changed, 29 insertions(+), 42 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 03a51e400b..0a8512bf94 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -36,7 +36,6 @@ use std::sync::{Arc, RwLock}; pub use self::meta::*; pub use self::rooted_slot_iterator::*; use solana_sdk::timing::Slot; -use std::io::Write; mod db; mod meta; @@ -807,11 +806,8 @@ impl Blocktree { remaining_ticks_in_slot -= 1; } - let data = bincode::serialize(&vec![entry.borrow().clone()]).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } + bincode::serialize_into(&mut shredder, &vec![entry.borrow().clone()]) + .expect("Expect to write all entries to shreds"); if remaining_ticks_in_slot == 0 { shredder.finalize_slot(); } else { @@ -2507,11 +2503,8 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re let mut shredder = Shredder::new(0, Some(0), 0.0, &Arc::new(Keypair::new()), 0) .expect("Failed to create entry shredder"); let last_hash = entries.last().unwrap().hash; - let data = bincode::serialize(&entries).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } + bincode::serialize_into(&mut shredder, &entries) + .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); let shreds: Vec = shredder .shreds @@ -4842,11 +4835,8 @@ pub mod tests { ) .expect("Failed to create entry shredder"); - let data = bincode::serialize(&entries).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } + bincode::serialize_into(&mut shredder, &entries) + .expect("Expect to write all entries to shreds"); if is_full_slot { shredder.finalize_slot(); } else { diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index e11ddd83ee..de043a0e8e 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -2,7 +2,6 @@ use super::broadcast_utils; use super::*; use crate::shred::Shred; use solana_sdk::timing::duration_as_ms; -use std::io::Write; #[derive(Default)] struct BroadcastStats { @@ -99,12 +98,8 @@ impl BroadcastRun for StandardBroadcastRun { ) .expect("Expected to create a new shredder"); - let data = bincode::serialize(&entries).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } - + bincode::serialize_into(&mut shredder, &entries) + .expect("Expect to write all entries to shreds"); if i == (num_ventries - 1) && last_tick == bank.max_tick_height() { shredder.finalize_slot(); } else { diff --git a/core/src/shred.rs b/core/src/shred.rs index 99bdc9740c..31cb10b063 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -219,8 +219,8 @@ impl Default for CodingShred { /// Common trait implemented by all types of shreds pub trait ShredCommon { - /// Write at a particular offset in the shred - fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize; + /// Write at a particular offset in the shred. Returns amount written and leftover capacity + fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize); /// Overhead of shred enum and headers fn overhead() -> usize; /// Utility function to create an empty shred @@ -228,12 +228,14 @@ pub trait ShredCommon { } impl ShredCommon for FirstDataShred { - fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize { - let slice_len = cmp::min(self.payload.len().saturating_sub(offset), buf.len()); + fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { + let mut capacity = self.payload.len().saturating_sub(offset); + let slice_len = cmp::min(capacity, buf.len()); + capacity -= slice_len; if slice_len > 0 { self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); } - slice_len + (slice_len, capacity) } fn overhead() -> usize { @@ -250,12 +252,14 @@ impl ShredCommon for FirstDataShred { } impl ShredCommon for DataShred { - fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize { - let slice_len = cmp::min(self.payload.len().saturating_sub(offset), buf.len()); + fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { + let mut capacity = self.payload.len().saturating_sub(offset); + let slice_len = cmp::min(capacity, buf.len()); + capacity -= slice_len; if slice_len > 0 { self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); } - slice_len + (slice_len, capacity) } fn overhead() -> usize { @@ -272,12 +276,14 @@ impl ShredCommon for DataShred { } impl ShredCommon for CodingShred { - fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize { - let slice_len = cmp::min(self.header.payload.len().saturating_sub(offset), buf.len()); + fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) { + let mut capacity = self.header.payload.len().saturating_sub(offset); + let slice_len = cmp::min(capacity, buf.len()); + capacity -= slice_len; if slice_len > 0 { self.header.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]); } - slice_len + (slice_len, capacity) } fn overhead() -> usize { @@ -333,7 +339,7 @@ impl Write for Shredder { .unwrap(); let written = self.active_offset; - let slice_len = match current_shred.borrow_mut() { + let (slice_len, left_capacity) = match current_shred.borrow_mut() { Shred::FirstInSlot(s) => s.write_at(written, buf), Shred::FirstInFECSet(s) | Shred::Data(s) @@ -342,7 +348,7 @@ impl Write for Shredder { Shred::Coding(s) => s.write_at(written, buf), }; - let active_shred = if buf.len() > slice_len { + let active_shred = if buf.len() > slice_len || left_capacity == 0 { self.finalize_data_shred(current_shred); // Continue generating more data shreds. // If the caller decides to finalize the FEC block or Slot, the data shred will diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 03511fff2b..7abda6c75a 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -258,7 +258,6 @@ mod test { use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; - use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -268,11 +267,8 @@ mod test { fn local_entries_to_shred(entries: Vec, keypair: &Arc) -> Vec { let mut shredder = Shredder::new(0, Some(0), 0.0, keypair, 0).expect("Failed to create entry shredder"); - let data = bincode::serialize(&entries).unwrap(); - let mut offset = 0; - while offset < data.len() { - offset += shredder.write(&data[offset..]).unwrap(); - } + bincode::serialize_into(&mut shredder, &entries) + .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); shredder .shreds