Use serialize_into to fill in shreds instead of writing one byte at a time (#5695)
automerge
This commit is contained in:
parent
ffc748becb
commit
47535b9ff1
|
@ -36,7 +36,6 @@ use std::sync::{Arc, RwLock};
|
||||||
pub use self::meta::*;
|
pub use self::meta::*;
|
||||||
pub use self::rooted_slot_iterator::*;
|
pub use self::rooted_slot_iterator::*;
|
||||||
use solana_sdk::timing::Slot;
|
use solana_sdk::timing::Slot;
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
mod db;
|
mod db;
|
||||||
mod meta;
|
mod meta;
|
||||||
|
@ -807,11 +806,8 @@ impl Blocktree {
|
||||||
remaining_ticks_in_slot -= 1;
|
remaining_ticks_in_slot -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let data = bincode::serialize(&vec![entry.borrow().clone()]).unwrap();
|
bincode::serialize_into(&mut shredder, &vec![entry.borrow().clone()])
|
||||||
let mut offset = 0;
|
.expect("Expect to write all entries to shreds");
|
||||||
while offset < data.len() {
|
|
||||||
offset += shredder.write(&data[offset..]).unwrap();
|
|
||||||
}
|
|
||||||
if remaining_ticks_in_slot == 0 {
|
if remaining_ticks_in_slot == 0 {
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
} else {
|
} else {
|
||||||
|
@ -2507,11 +2503,8 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
|
||||||
let mut shredder = Shredder::new(0, Some(0), 0.0, &Arc::new(Keypair::new()), 0)
|
let mut shredder = Shredder::new(0, Some(0), 0.0, &Arc::new(Keypair::new()), 0)
|
||||||
.expect("Failed to create entry shredder");
|
.expect("Failed to create entry shredder");
|
||||||
let last_hash = entries.last().unwrap().hash;
|
let last_hash = entries.last().unwrap().hash;
|
||||||
let data = bincode::serialize(&entries).unwrap();
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
let mut offset = 0;
|
.expect("Expect to write all entries to shreds");
|
||||||
while offset < data.len() {
|
|
||||||
offset += shredder.write(&data[offset..]).unwrap();
|
|
||||||
}
|
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
let shreds: Vec<Shred> = shredder
|
let shreds: Vec<Shred> = shredder
|
||||||
.shreds
|
.shreds
|
||||||
|
@ -4842,11 +4835,8 @@ pub mod tests {
|
||||||
)
|
)
|
||||||
.expect("Failed to create entry shredder");
|
.expect("Failed to create entry shredder");
|
||||||
|
|
||||||
let data = bincode::serialize(&entries).unwrap();
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
let mut offset = 0;
|
.expect("Expect to write all entries to shreds");
|
||||||
while offset < data.len() {
|
|
||||||
offset += shredder.write(&data[offset..]).unwrap();
|
|
||||||
}
|
|
||||||
if is_full_slot {
|
if is_full_slot {
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -2,7 +2,6 @@ use super::broadcast_utils;
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::shred::Shred;
|
use crate::shred::Shred;
|
||||||
use solana_sdk::timing::duration_as_ms;
|
use solana_sdk::timing::duration_as_ms;
|
||||||
use std::io::Write;
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct BroadcastStats {
|
struct BroadcastStats {
|
||||||
|
@ -99,12 +98,8 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||||
)
|
)
|
||||||
.expect("Expected to create a new shredder");
|
.expect("Expected to create a new shredder");
|
||||||
|
|
||||||
let data = bincode::serialize(&entries).unwrap();
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
let mut offset = 0;
|
.expect("Expect to write all entries to shreds");
|
||||||
while offset < data.len() {
|
|
||||||
offset += shredder.write(&data[offset..]).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
if i == (num_ventries - 1) && last_tick == bank.max_tick_height() {
|
if i == (num_ventries - 1) && last_tick == bank.max_tick_height() {
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -219,8 +219,8 @@ impl Default for CodingShred {
|
||||||
|
|
||||||
/// Common trait implemented by all types of shreds
|
/// Common trait implemented by all types of shreds
|
||||||
pub trait ShredCommon {
|
pub trait ShredCommon {
|
||||||
/// Write at a particular offset in the shred
|
/// Write at a particular offset in the shred. Returns amount written and leftover capacity
|
||||||
fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize;
|
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize);
|
||||||
/// Overhead of shred enum and headers
|
/// Overhead of shred enum and headers
|
||||||
fn overhead() -> usize;
|
fn overhead() -> usize;
|
||||||
/// Utility function to create an empty shred
|
/// Utility function to create an empty shred
|
||||||
|
@ -228,12 +228,14 @@ pub trait ShredCommon {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShredCommon for FirstDataShred {
|
impl ShredCommon for FirstDataShred {
|
||||||
fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize {
|
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
|
||||||
let slice_len = cmp::min(self.payload.len().saturating_sub(offset), buf.len());
|
let mut capacity = self.payload.len().saturating_sub(offset);
|
||||||
|
let slice_len = cmp::min(capacity, buf.len());
|
||||||
|
capacity -= slice_len;
|
||||||
if slice_len > 0 {
|
if slice_len > 0 {
|
||||||
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
||||||
}
|
}
|
||||||
slice_len
|
(slice_len, capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn overhead() -> usize {
|
fn overhead() -> usize {
|
||||||
|
@ -250,12 +252,14 @@ impl ShredCommon for FirstDataShred {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShredCommon for DataShred {
|
impl ShredCommon for DataShred {
|
||||||
fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize {
|
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
|
||||||
let slice_len = cmp::min(self.payload.len().saturating_sub(offset), buf.len());
|
let mut capacity = self.payload.len().saturating_sub(offset);
|
||||||
|
let slice_len = cmp::min(capacity, buf.len());
|
||||||
|
capacity -= slice_len;
|
||||||
if slice_len > 0 {
|
if slice_len > 0 {
|
||||||
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
||||||
}
|
}
|
||||||
slice_len
|
(slice_len, capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn overhead() -> usize {
|
fn overhead() -> usize {
|
||||||
|
@ -272,12 +276,14 @@ impl ShredCommon for DataShred {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShredCommon for CodingShred {
|
impl ShredCommon for CodingShred {
|
||||||
fn write_at(&mut self, offset: usize, buf: &[u8]) -> usize {
|
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
|
||||||
let slice_len = cmp::min(self.header.payload.len().saturating_sub(offset), buf.len());
|
let mut capacity = self.header.payload.len().saturating_sub(offset);
|
||||||
|
let slice_len = cmp::min(capacity, buf.len());
|
||||||
|
capacity -= slice_len;
|
||||||
if slice_len > 0 {
|
if slice_len > 0 {
|
||||||
self.header.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
self.header.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
|
||||||
}
|
}
|
||||||
slice_len
|
(slice_len, capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn overhead() -> usize {
|
fn overhead() -> usize {
|
||||||
|
@ -333,7 +339,7 @@ impl Write for Shredder {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let written = self.active_offset;
|
let written = self.active_offset;
|
||||||
let slice_len = match current_shred.borrow_mut() {
|
let (slice_len, left_capacity) = match current_shred.borrow_mut() {
|
||||||
Shred::FirstInSlot(s) => s.write_at(written, buf),
|
Shred::FirstInSlot(s) => s.write_at(written, buf),
|
||||||
Shred::FirstInFECSet(s)
|
Shred::FirstInFECSet(s)
|
||||||
| Shred::Data(s)
|
| Shred::Data(s)
|
||||||
|
@ -342,7 +348,7 @@ impl Write for Shredder {
|
||||||
Shred::Coding(s) => s.write_at(written, buf),
|
Shred::Coding(s) => s.write_at(written, buf),
|
||||||
};
|
};
|
||||||
|
|
||||||
let active_shred = if buf.len() > slice_len {
|
let active_shred = if buf.len() > slice_len || left_capacity == 0 {
|
||||||
self.finalize_data_shred(current_shred);
|
self.finalize_data_shred(current_shred);
|
||||||
// Continue generating more data shreds.
|
// Continue generating more data shreds.
|
||||||
// If the caller decides to finalize the FEC block or Slot, the data shred will
|
// If the caller decides to finalize the FEC block or Slot, the data shred will
|
||||||
|
|
|
@ -258,7 +258,6 @@ mod test {
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use std::fs::remove_dir_all;
|
use std::fs::remove_dir_all;
|
||||||
use std::io::Write;
|
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
|
@ -268,11 +267,8 @@ mod test {
|
||||||
fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> {
|
fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> {
|
||||||
let mut shredder =
|
let mut shredder =
|
||||||
Shredder::new(0, Some(0), 0.0, keypair, 0).expect("Failed to create entry shredder");
|
Shredder::new(0, Some(0), 0.0, keypair, 0).expect("Failed to create entry shredder");
|
||||||
let data = bincode::serialize(&entries).unwrap();
|
bincode::serialize_into(&mut shredder, &entries)
|
||||||
let mut offset = 0;
|
.expect("Expect to write all entries to shreds");
|
||||||
while offset < data.len() {
|
|
||||||
offset += shredder.write(&data[offset..]).unwrap();
|
|
||||||
}
|
|
||||||
shredder.finalize_slot();
|
shredder.finalize_slot();
|
||||||
shredder
|
shredder
|
||||||
.shreds
|
.shreds
|
||||||
|
|
Loading…
Reference in New Issue