Shred entries in parallel (#6180)

* Make shredding more parallel

* Fix erasure tests

* Fix replicator test

* Remove UnfinishedSlotInfo
carllin 2019-10-08 00:42:51 -07:00 committed by GitHub
parent 667f9e0d79
commit ac2374e9a1
14 changed files with 779 additions and 779 deletions
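
At a high level, this change replaces the old streaming Shredder API (writing serialized entries into the shredder via `bincode::serialize_into`, then calling `finalize_data`/`finalize_slot`) with a single batch call, `Shredder::entries_to_shreds`, which shreds a slice of entries in parallel and returns the data shreds, the coding shreds, and the next shred index. A minimal sketch of the new call pattern, matching its usage in the hunks below (slot numbers and tick counts here are arbitrary placeholders):

```rust
use solana_core::entry::create_ticks;
use solana_core::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;

fn shred_one_slot() {
    let keypair = Arc::new(Keypair::new());
    // Shredder::new now takes (slot, parent_slot, fec_rate, keypair);
    // the start index has moved to entries_to_shreds.
    let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, keypair)
        .expect("Failed to create entry shredder");
    let entries = create_ticks(64, Hash::default());
    // Returns (data shreds, coding shreds, next shred index); `true`
    // marks the slot complete, 0 is the index of the first data shred.
    let (data_shreds, coding_shreds, next_index) =
        shredder.entries_to_shreds(&entries, true, 0);
    assert!(!data_shreds.is_empty());
    let _ = (coding_shreds, next_index);
}
```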


@ -2,7 +2,12 @@
extern crate test;
use solana_core::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_core::entry::create_ticks;
use solana_core::shred::{
max_ticks_per_shred, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
};
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
use test::Bencher;
@ -10,24 +15,29 @@ use test::Bencher;
#[bench]
fn bench_shredder(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
// 1Mb
let data = vec![0u8; 1000 * 1000];
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
let num_ticks = max_ticks_per_shred() * num_shreds as u64;
let entries = create_ticks(num_ticks, Hash::default());
bencher.iter(|| {
let mut shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, &kp, 0).unwrap();
bincode::serialize_into(&mut shredder, &data).unwrap();
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
shredder.entries_to_shreds(&entries, true, 0);
})
}
#[bench]
fn bench_deshredder(bencher: &mut Bencher) {
let kp = Arc::new(Keypair::new());
// 10MB
let data = vec![0u8; 10000 * 1000];
let mut shredded = Shredder::new(1, 0, 0.0, &kp, 0).unwrap();
let _ = bincode::serialize_into(&mut shredded, &data);
shredded.finalize_data();
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_shred() * num_shreds as u64;
let entries = create_ticks(num_ticks, Hash::default());
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp).unwrap();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
bencher.iter(|| {
let raw = &mut Shredder::deshred(&shredded.shreds).unwrap();
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
assert_ne!(raw.len(), 0);
})
}
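
Both benchmarks size their workload with a ceiling division: the number of data shreds required to hold the payload at `shred_size` bytes each, then `max_ticks_per_shred()` ticks per shred to fill them. The same arithmetic in isolation (the 1228-byte shred size is a made-up figure for illustration):

```rust
// Ceiling division: how many fixed-size shreds a payload occupies.
fn shreds_needed(data_len: usize, shred_size: usize) -> usize {
    (data_len + shred_size - 1) / shred_size
}

fn main() {
    let shred_size = 1228; // stand-in for PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER
    assert_eq!(shreds_needed(1000 * 1000, shred_size), 815);
    assert_eq!(shreds_needed(shred_size, shred_size), 1);
    assert_eq!(shreds_needed(shred_size + 1, shred_size), 2);
}
```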


@ -169,7 +169,7 @@ mod test {
None,
true,
&Arc::new(Keypair::new()),
&entries,
entries,
)
.unwrap();


@ -18,7 +18,6 @@ use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
use std::cell::RefCell;
use std::cmp;
use std::fs;
@ -806,21 +805,17 @@ impl Blocktree {
self.code_shred_cf.get_bytes((slot, index))
}
pub fn write_entries<I>(
pub fn write_entries(
&self,
start_slot: u64,
num_ticks_in_start_slot: u64,
start_index: u64,
start_index: u32,
ticks_per_slot: u64,
parent: Option<u64>,
is_full_slot: bool,
keypair: &Arc<Keypair>,
entries: I,
) -> Result<usize>
where
I: IntoIterator,
I::Item: Borrow<Entry>,
{
entries: Vec<Entry>,
) -> Result<usize> {
assert!(num_ticks_in_start_slot < ticks_per_slot);
let mut remaining_ticks_in_slot = ticks_per_slot - num_ticks_in_start_slot;
@ -833,40 +828,45 @@ impl Blocktree {
},
|v| v,
);
let mut shredder =
Shredder::new(current_slot, parent_slot, 0.0, keypair, start_index as u32)
.expect("Failed to create entry shredder");
let mut shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
let mut all_shreds = vec![];
let mut slot_entries = vec![];
// Find all the entries for start_slot
for entry in entries {
for entry in entries.into_iter() {
if remaining_ticks_in_slot == 0 {
current_slot += 1;
parent_slot = current_slot - 1;
remaining_ticks_in_slot = ticks_per_slot;
shredder.finalize_slot();
all_shreds.append(&mut shredder.shreds);
shredder =
Shredder::new(current_slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0)
.expect("Failed to create entry shredder");
let mut current_entries = vec![];
std::mem::swap(&mut slot_entries, &mut current_entries);
let start_index = {
if all_shreds.is_empty() {
start_index
} else {
0
}
};
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(&current_entries, true, start_index);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
shredder = Shredder::new(current_slot, parent_slot, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
}
if entry.borrow().is_tick() {
if entry.is_tick() {
remaining_ticks_in_slot -= 1;
}
bincode::serialize_into(&mut shredder, &vec![entry.borrow().clone()])
.expect("Expect to write all entries to shreds");
if remaining_ticks_in_slot == 0 {
shredder.finalize_slot();
} else {
shredder.finalize_data();
}
slot_entries.push(entry);
}
if is_full_slot && remaining_ticks_in_slot != 0 {
shredder.finalize_slot();
if !slot_entries.is_empty() {
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(&slot_entries, is_full_slot, 0);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
}
all_shreds.append(&mut shredder.shreds);
let num_shreds = all_shreds.len();
self.insert_shreds(all_shreds, None)?;
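
The rewritten loop above batches entries per slot instead of serializing them into the shredder one at a time: entries accumulate in `slot_entries` until the slot's ticks are exhausted, the whole batch is shredded at once, and a trailing partial batch is flushed after the loop. A reduced model of that control flow, with entries replaced by indices and shredding replaced by collecting the batch (a sketch, not the actual Blocktree code):

```rust
// Every tick entry consumes one of the slot's remaining ticks; when the
// slot is exhausted, the accumulated batch is "shredded" (collected).
fn batch_by_slot(is_tick: &[bool], ticks_per_slot: u64) -> Vec<Vec<usize>> {
    let mut batches = vec![];
    let mut slot_entries = vec![];
    let mut remaining_ticks_in_slot = ticks_per_slot;
    for (i, &tick) in is_tick.iter().enumerate() {
        if remaining_ticks_in_slot == 0 {
            // Slot boundary: flush everything collected so far.
            batches.push(std::mem::take(&mut slot_entries));
            remaining_ticks_in_slot = ticks_per_slot;
        }
        if tick {
            remaining_ticks_in_slot -= 1;
        }
        slot_entries.push(i);
    }
    if !slot_entries.is_empty() {
        batches.push(slot_entries); // final, possibly partial, slot
    }
    batches
}

fn main() {
    // Four ticks at two per slot -> two batches of two entries each.
    assert_eq!(
        batch_by_slot(&[true, true, true, true], 2),
        vec![vec![0, 1], vec![2, 3]]
    );
}
```

`std::mem::take` here is the same trick the real code performs with `std::mem::swap` against a fresh vector.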
@ -919,6 +919,7 @@ impl Blocktree {
break;
}
let (current_slot, index) = db_iterator.key().expect("Expect a valid key");
let current_index = {
if current_slot > slot {
end_index
@ -926,6 +927,7 @@ impl Blocktree {
index
}
};
let upper_index = cmp::min(current_index, end_index);
for i in prev_index..upper_index {
@ -982,9 +984,9 @@ impl Blocktree {
) -> Result<(Vec<Entry>, usize)> {
// Find the next consecutive block of shreds.
let mut serialized_shreds: Vec<Vec<u8>> = vec![];
let data_cf = self.db.column::<cf::ShredData>();
let data_shred_cf = self.db.column::<cf::ShredData>();
while let Some(serialized_shred) = data_cf.get_bytes((slot, start_index))? {
while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, start_index))? {
serialized_shreds.push(serialized_shred);
start_index += 1;
}
@ -994,6 +996,7 @@ impl Blocktree {
serialized_shreds.len(),
slot
);
let mut shreds: Vec<Shred> = serialized_shreds
.into_iter()
.filter_map(|serialized_shred| Shred::new_from_serialized_shred(serialized_shred).ok())
@ -1036,7 +1039,6 @@ impl Blocktree {
}
trace!("Found {:?} entries", all_entries.len());
Ok((all_entries, num))
}
@ -1551,15 +1553,14 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
// Fill slot 0 with ticks that link back to the genesis_block to bootstrap the ledger.
let blocktree = Blocktree::open(ledger_path)?;
let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.hash());
let mut shredder = Shredder::new(0, 0, 0.0, &Arc::new(Keypair::new()), 0)
.expect("Failed to create entry shredder");
let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.hash());
let last_hash = entries.last().unwrap().hash;
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
let shreds: Vec<Shred> = shredder.shreds.drain(..).collect();
let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()))
.expect("Failed to create entry shredder");
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());
blocktree.insert_shreds(shreds, None)?;
blocktree.set_roots(&[0])?;
@ -1641,24 +1642,18 @@ pub fn entries_to_test_shreds(
parent_slot: u64,
is_full_slot: bool,
) -> Vec<Shred> {
let mut shredder = Shredder::new(slot, parent_slot, 0.0, &Arc::new(Keypair::new()), 0 as u32)
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()))
.expect("Failed to create entry shredder");
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
if is_full_slot {
shredder.finalize_slot();
} else {
shredder.finalize_data();
}
shredder.shreds.drain(..).collect()
shredder.entries_to_shreds(&entries, is_full_slot, 0).0
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::entry::{create_ticks, Entry};
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::shred::max_ticks_per_shred;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -1667,6 +1662,54 @@ pub mod tests {
use std::iter::FromIterator;
use std::time::Duration;
#[test]
fn test_create_new_ledger() {
let mint_total = 1_000_000_000_000;
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(mint_total);
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let ledger = Blocktree::open(&ledger_path).unwrap();
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
let entries = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(ticks, entries);
// Destroying database without closing it first is undefined behavior
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_get_bytes() {
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
assert!(num_entries > 1);
let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
let ledger_path = get_tmp_ledger_path("test_insert_data_shreds_basic");
let ledger = Blocktree::open(&ledger_path).unwrap();
// Insert last shred, test we can retrieve it
let last_shred = shreds.pop().unwrap();
assert!(last_shred.index() > 0);
ledger
.insert_shreds(vec![last_shred.clone()], None)
.unwrap();
let serialized_shred = ledger
.data_shred_cf
.get_bytes((0, last_shred.index() as u64))
.unwrap()
.unwrap();
let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
assert_eq!(last_shred, deserialized_shred);
// Destroying database without closing it first is undefined behavior
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_write_entries() {
solana_logger::setup();
@ -1877,7 +1920,8 @@ pub mod tests {
#[test]
fn test_insert_data_shreds_basic() {
let num_entries = 5;
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
assert!(num_entries > 1);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
@ -1888,6 +1932,7 @@ pub mod tests {
// Insert last shred, we're missing the other shreds, so no consecutive
// shreds starting from slot 0, index 0 should exist.
assert!(shreds.len() > 1);
let last_shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![last_shred], None).unwrap();
assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty());
@ -2098,21 +2143,28 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path("test_insert_data_shreds_consecutive");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Create enough entries to ensure there are at least two shreds created
let min_entries = max_ticks_per_shred() + 1;
for i in 0..4 {
let slot = i;
let parent_slot = if i == 0 { 0 } else { i - 1 };
// Write entries
let num_entries = 21 as u64 * (i + 1);
let (mut shreds, original_entries) =
make_slot_entries(slot, parent_slot, num_entries);
let num_entries = min_entries * (i + 1);
let (shreds, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
let num_shreds = shreds.len() as u64;
assert!(num_shreds > 1);
let mut even_shreds = vec![];
let mut odd_shreds = vec![];
for i in (0..num_shreds).rev() {
if i % 2 != 0 {
odd_shreds.insert(0, shreds.remove(i as usize));
for (i, shred) in shreds.into_iter().enumerate() {
if i % 2 == 0 {
even_shreds.push(shred);
} else {
odd_shreds.push(shred);
}
}
blocktree.insert_shreds(odd_shreds, None).unwrap();
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
@ -2121,7 +2173,7 @@ pub mod tests {
if num_shreds % 2 == 0 {
assert_eq!(meta.received, num_shreds);
} else {
debug!("got here");
trace!("got here");
assert_eq!(meta.received, num_shreds - 1);
}
assert_eq!(meta.consumed, 0);
@ -2131,7 +2183,7 @@ pub mod tests {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(even_shreds, None).unwrap();
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap(),
@ -2504,11 +2556,13 @@ pub mod tests {
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 15;
let entries_per_slot = 5;
// Create enough entries to ensure there are at least two shreds created
let entries_per_slot = max_ticks_per_shred() + 1;
assert!(entries_per_slot > 1);
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() / num_slots as usize;
assert!(shreds_per_slot > 1);
// Write the shreds such that every 3rd slot has a gap in the beginning
let mut missing_shreds = vec![];
@ -2852,13 +2906,15 @@ pub mod tests {
// Write entries
let gap: u64 = 10;
assert!(gap > 3);
let num_entries = 10;
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
let entries = create_ticks(num_entries, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
let num_shreds = shreds.len();
for (i, b) in shreds.iter_mut().enumerate() {
b.set_index(i as u32 * gap as u32);
b.set_slot(slot);
assert!(num_shreds > 1);
for (i, s) in shreds.iter_mut().enumerate() {
s.set_index(i as u32 * gap as u32);
s.set_slot(slot);
}
blocktree.insert_shreds(shreds, None).unwrap();
@ -2892,7 +2948,8 @@ pub mod tests {
vec![1],
);
// Test with end indexes that are greater than the last item in the ledger
// Test with a range that encompasses a shred with index == gap which was
// already inserted.
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
@ -2943,8 +3000,9 @@ pub mod tests {
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
let entries = create_ticks(20, Hash::default());
let entries = create_ticks(100, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
assert!(shreds.len() > 2);
shreds.drain(2..);
const ONE: u64 = 1;


@ -456,7 +456,7 @@ pub mod tests {
Some(parent_slot),
true,
&Arc::new(Keypair::new()),
&entries,
entries,
)
.unwrap();
@ -849,7 +849,7 @@ pub mod tests {
// Fill up the rest of slot 1 with ticks
entries.extend(create_ticks(genesis_block.ticks_per_slot, last_entry_hash));
let last_blockhash = entries.last().unwrap().hash;
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
blocktree
@ -861,7 +861,7 @@ pub mod tests {
None,
true,
&Arc::new(Keypair::new()),
&entries,
entries,
)
.unwrap();
let (bank_forks, bank_forks_info, _) =
@ -877,7 +877,7 @@ pub mod tests {
mint - deducted_from_mint
);
assert_eq!(bank.tick_height(), 2 * genesis_block.ticks_per_slot - 1);
assert_eq!(bank.last_blockhash(), entries.last().unwrap().hash);
assert_eq!(bank.last_blockhash(), last_blockhash);
}
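
A side effect of `write_entries` now taking `Vec<Entry>` by value: the vector is moved into the call, so this test captures `last_blockhash` before handing the entries over rather than borrowing them afterwards. A minimal illustration of that ownership rule (with a hypothetical stand-in for `Entry`):

```rust
struct Entry {
    hash: u64, // stand-in for the real Entry's blockhash
}

fn write_entries(entries: Vec<Entry>) -> usize {
    entries.len() // takes ownership; the caller's binding is gone
}

fn main() {
    let entries = vec![Entry { hash: 1 }, Entry { hash: 2 }];
    // Capture before the move; using `entries.last()` below would not compile.
    let last_hash = entries.last().unwrap().hash;
    let n = write_entries(entries);
    assert_eq!((n, last_hash), (2, 2));
}
```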
#[test]


@ -1,5 +1,6 @@
use super::*;
use crate::entry::Entry;
use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
pub(super) struct BroadcastFakeBlobsRun {
@ -30,22 +31,26 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
let last_tick = receive_results.last_tick;
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_blob_index = blocktree
let next_shred_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
.unwrap_or(0) as u32;
let num_entries = receive_results.entries.len();
let (shred_bufs, _) = broadcast_utils::entries_to_shreds(
receive_results.entries,
let shredder = Shredder::new(
bank.slot(),
receive_results.last_tick,
bank.max_tick_height(),
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
None,
RECOMMENDED_FEC_RATE,
keypair.clone(),
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
&receive_results.entries,
last_tick == bank.max_tick_height(),
next_shred_index,
);
// If the last blockhash is default, a new block is being created
@ -58,15 +63,10 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
.map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
.collect();
let (fake_shred_bufs, _) = broadcast_utils::entries_to_shreds(
fake_entries,
receive_results.last_tick,
bank.slot(),
bank.max_tick_height(),
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
None,
let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds(
&fake_entries,
last_tick == bank.max_tick_height(),
next_shred_index,
);
// If it's the last tick, reset the last block hash to default
@ -75,19 +75,27 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
self.last_blockhash = Hash::default();
}
blocktree.insert_shreds(shred_bufs.clone(), None)?;
blocktree.insert_shreds(data_shreds.clone(), None)?;
blocktree.insert_shreds(coding_shreds.clone(), None)?;
// 3) Start broadcast step
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition {
// Send fake blobs to the first N peers
fake_shred_bufs.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
fake_data_shreds
.iter()
.chain(fake_coding_shreds.iter())
.for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
} else {
shred_bufs.iter().for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
data_shreds
.iter()
.chain(coding_shreds.iter())
.for_each(|b| {
sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
});
}
});


@ -1,9 +1,7 @@
use crate::entry::Entry;
use crate::poh_recorder::WorkingBankEntry;
use crate::result::Result;
use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
use solana_runtime::bank::Bank;
use solana_sdk::signature::Keypair;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -71,79 +69,6 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
})
}
pub(super) fn entries_to_shreds(
entries: Vec<Entry>,
last_tick: u64,
slot: u64,
bank_max_tick: u64,
keypair: &Arc<Keypair>,
latest_shred_index: u64,
parent_slot: u64,
last_unfinished_slot: Option<UnfinishedSlotInfo>,
) -> (Vec<Shred>, Option<UnfinishedSlotInfo>) {
let mut shreds = if let Some(unfinished_slot) = last_unfinished_slot {
if unfinished_slot.slot != slot {
let mut shredder = Shredder::new(
unfinished_slot.slot,
unfinished_slot.parent,
RECOMMENDED_FEC_RATE,
keypair,
unfinished_slot.next_index as u32,
)
.expect("Expected to create a new shredder");
shredder.finalize_slot();
shredder.shreds.drain(..).collect()
} else {
vec![]
}
} else {
vec![]
};
let mut shredder = Shredder::new(
slot,
parent_slot,
RECOMMENDED_FEC_RATE,
keypair,
latest_shred_index as u32,
)
.expect("Expected to create a new shredder");
let now = Instant::now();
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
let elapsed = now.elapsed().as_millis();
let unfinished_slot = if last_tick == bank_max_tick {
shredder.finalize_slot();
None
} else {
shredder.finalize_data();
Some(UnfinishedSlotInfo {
next_index: u64::from(shredder.index),
slot,
parent: parent_slot,
})
};
let num_shreds = shredder.shreds.len();
shreds.append(&mut shredder.shreds);
datapoint_info!(
"shredding-stats",
("slot", slot as i64, i64),
("num_shreds", num_shreds as i64, i64),
("signing_coding", shredder.signing_coding_time as i64, i64),
(
"copying_serializing",
(elapsed - shredder.signing_coding_time) as i64,
i64
),
);
(shreds, unfinished_slot)
}
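
With `broadcast_utils::entries_to_shreds` and `UnfinishedSlotInfo` removed, each broadcast run now builds a `Shredder` directly, derives the starting index from the blocktree's `meta.consumed`, and chains the data and coding shreds together for insertion and broadcast. A sketch of that replacement pattern, with the bank and blocktree lookups reduced to placeholder values:

```rust
use solana_core::entry::create_ticks;
use solana_core::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;

fn shreds_to_broadcast() -> Vec<Shred> {
    let keypair = Arc::new(Keypair::new());
    let (slot, parent_slot) = (2, 1); // placeholders for bank.slot() and its parent
    let next_shred_index = 0u32; // placeholder for meta.consumed as u32
    let is_last_in_slot = true; // placeholder for last_tick == bank.max_tick_height()
    let shredder = Shredder::new(slot, parent_slot, RECOMMENDED_FEC_RATE, keypair)
        .expect("Expected to create a new shredder");
    let entries = create_ticks(8, Hash::default());
    let (data_shreds, coding_shreds, _next_index) =
        shredder.entries_to_shreds(&entries, is_last_in_slot, next_shred_index);
    // Data and coding shreds travel together from here on.
    data_shreds.into_iter().chain(coding_shreds.into_iter()).collect()
}
```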
#[cfg(test)]
mod tests {
use super::*;


@ -1,4 +1,5 @@
use super::*;
use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
pub(super) struct FailEntryVerificationBroadcastRun {}
@ -29,38 +30,52 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
last_entry.hash = Hash::default();
}
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_blob_index = blocktree
let keypair = cluster_info.read().unwrap().keypair.clone();
let next_shred_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
.unwrap_or(0) as u32;
let (shred_infos, _) = broadcast_utils::entries_to_shreds(
receive_results.entries,
last_tick,
let shredder = Shredder::new(
bank.slot(),
bank.max_tick_height(),
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
None,
RECOMMENDED_FEC_RATE,
keypair.clone(),
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
&receive_results.entries,
last_tick == bank.max_tick_height(),
next_shred_index,
);
let seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
blocktree.insert_shreds(shred_infos.clone(), None)?;
let all_shreds = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect::<Vec<_>>();
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
blocktree
.insert_shreds(all_shreds, None)
.expect("Failed to insert shreds in blocktree");
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
// Broadcast data + erasures
let all_shred_bufs: Vec<Vec<u8>> = data_shreds
.into_iter()
.chain(coding_shreds.into_iter())
.map(|s| s.payload)
.collect();
// Broadcast data
cluster_info.read().unwrap().broadcast_shreds(
sock,
&shred_bufs,
&seeds,
&all_shred_bufs,
&all_seeds,
stakes.as_ref(),
)?;


@ -1,6 +1,6 @@
use super::broadcast_utils;
use super::*;
use crate::broadcast_stage::broadcast_utils::{entries_to_shreds, UnfinishedSlotInfo};
use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::timing::duration_as_ms;
#[derive(Default)]
@ -12,7 +12,6 @@ struct BroadcastStats {
pub(super) struct StandardBroadcastRun {
stats: BroadcastStats,
unfinished_slot: Option<UnfinishedSlotInfo>,
current_slot: Option<u64>,
shredding_elapsed: u128,
insertion_elapsed: u128,
@ -24,7 +23,6 @@ impl StandardBroadcastRun {
pub(super) fn new() -> Self {
Self {
stats: BroadcastStats::default(),
unfinished_slot: None,
current_slot: None,
shredding_elapsed: 0,
insertion_elapsed: 0,
@ -42,7 +40,7 @@ impl StandardBroadcastRun {
run_elapsed: u64,
num_entries: usize,
num_shreds: usize,
blob_index: u64,
shred_index: u32,
) {
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
@ -67,7 +65,7 @@ impl StandardBroadcastRun {
("shredding_time", shredding_elapsed as i64, i64),
("insert_shred_time", insert_shreds_elapsed as i64, i64),
("broadcast_time", broadcast_elapsed as i64, i64),
("transmit-index", blob_index as i64, i64),
("transmit-index", i64::from(shred_index), i64),
);
}
}
@ -95,11 +93,11 @@ impl BroadcastRun for StandardBroadcastRun {
// 2) Convert entries to blobs + generate coding blobs
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_shred_index = blocktree
let next_shred_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
.unwrap_or(0) as u32;
let parent_slot = if let Some(parent_bank) = bank.parent() {
parent_bank.slot()
@ -107,25 +105,35 @@ impl BroadcastRun for StandardBroadcastRun {
0
};
// Create shreds from entries
let to_shreds_start = Instant::now();
let (shred_infos, unfinished_slot) = entries_to_shreds(
receive_results.entries,
last_tick,
let shredder = Shredder::new(
bank.slot(),
bank.max_tick_height(),
keypair,
latest_shred_index,
parent_slot,
self.unfinished_slot,
RECOMMENDED_FEC_RATE,
keypair.clone(),
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, latest_shred_index) = shredder.entries_to_shreds(
&receive_results.entries,
last_tick == bank.max_tick_height(),
next_shred_index,
);
let to_shreds_elapsed = to_shreds_start.elapsed();
self.unfinished_slot = unfinished_slot;
let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
let num_shreds = shred_infos.len();
let all_shreds = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect::<Vec<_>>();
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
let num_shreds = all_shreds.len();
// Insert shreds into blocktree
let insert_shreds_start = Instant::now();
blocktree
.insert_shreds(shred_infos.clone(), None)
.insert_shreds(all_shreds, None)
.expect("Failed to insert shreds in blocktree");
let insert_shreds_elapsed = insert_shreds_start.elapsed();
@ -134,7 +142,11 @@ impl BroadcastRun for StandardBroadcastRun {
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let all_shred_bufs: Vec<Vec<u8>> = shred_infos.into_iter().map(|s| s.payload).collect();
let all_shred_bufs: Vec<Vec<u8>> = data_shreds
.into_iter()
.chain(coding_shreds.into_iter())
.map(|s| s.payload)
.collect();
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
cluster_info.read().unwrap().broadcast_shreds(
@ -145,13 +157,6 @@ impl BroadcastRun for StandardBroadcastRun {
)?;
let broadcast_elapsed = broadcast_start.elapsed();
let latest_shred_index = unfinished_slot.map(|s| s.next_index).unwrap_or_else(|| {
blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0)
});
self.insertion_elapsed += insert_shreds_elapsed.as_millis();
self.shredding_elapsed += to_shreds_elapsed.as_millis();
@ -164,7 +169,7 @@ impl BroadcastRun for StandardBroadcastRun {
("shredding_time", self.shredding_elapsed as i64, i64),
("insertion_time", self.insertion_elapsed as i64, i64),
("broadcast_time", self.broadcast_elapsed as i64, i64),
("num_shreds", latest_shred_index as i64, i64),
("num_shreds", i64::from(latest_shred_index), i64),
(
"slot_broadcast_time",
self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
@ -186,7 +191,7 @@ impl BroadcastRun for StandardBroadcastRun {
),
num_entries,
num_shreds,
latest_shred_index,
next_shred_index,
);
Ok(())


@ -136,7 +136,7 @@ mod tests {
None,
true,
&Arc::new(keypair),
&entries,
entries,
)
.unwrap();
@ -153,7 +153,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
let golden: Hash = "CLGvEayebjdgnLdttFAweZE9rqVkehXqEStUifG9kiU9"
let golden: Hash = "CGL4L6Q2QwiZQDCMwzshqj3S9riroUQuDjx8bS7ra2PU"
.parse()
.unwrap();


@ -146,7 +146,7 @@ mod tests {
Some(0),
true,
&Arc::new(Keypair::new()),
&entries,
entries,
)
.unwrap();
@ -193,10 +193,10 @@ mod tests {
return;
}
let entries = create_ticks(32, Hash::default());
let ledger_dir = "test_encrypt_file_many_keys_multiple";
let ledger_path = get_tmp_ledger_path(ledger_dir);
let ticks_per_slot = 16;
let ticks_per_slot = 90;
let entries = create_ticks(2 * ticks_per_slot, Hash::default());
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
blocktree
.write_entries(
@ -207,7 +207,7 @@ mod tests {
Some(0),
true,
&Arc::new(Keypair::new()),
&entries,
entries,
)
.unwrap();


@ -1775,9 +1775,9 @@ mod tests {
use crate::crds_value::CrdsValueLabel;
use crate::repair_service::RepairType;
use crate::result::Error;
use crate::shred::max_ticks_per_shred;
use crate::shred::{DataShredHeader, Shred};
use crate::test_tx::test_tx;
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
@ -1980,7 +1980,7 @@ mod tests {
let _ = fill_blocktree_slot_with_ticks(
&blocktree,
DEFAULT_TICKS_PER_SLOT,
max_ticks_per_shred() + 1,
2,
1,
Hash::default(),


@ -404,6 +404,7 @@ mod test {
};
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::Node;
use crate::shred::max_ticks_per_shred;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
@ -535,7 +536,7 @@ mod test {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = 10;
let num_entries_per_slot = max_ticks_per_shred() + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {

File diff suppressed because it is too large.


@ -311,17 +311,14 @@ mod test {
};
fn local_entries_to_shred(
entries: Vec<Entry>,
entries: &[Entry],
slot: u64,
parent: u64,
keypair: &Arc<Keypair>,
) -> Vec<Shred> {
let mut shredder =
Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
shredder.finalize_slot();
shredder.shreds.drain(..).collect()
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone())
.expect("Failed to create entry shredder");
shredder.entries_to_shreds(&entries, true, 0).0
}
#[test]
@ -330,8 +327,7 @@ mod test {
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
let num_entries = 10;
let original_entries = create_ticks(num_entries, Hash::default());
let mut shreds =
local_entries_to_shred(original_entries.clone(), 0, 0, &Arc::new(Keypair::new()));
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
shreds.reverse();
blocktree
.insert_shreds(shreds, None)
@ -356,7 +352,7 @@ mod test {
));
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair);
let mut shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);
// with a Bank for slot 0, blob continues
assert_eq!(
@ -408,8 +404,7 @@ mod test {
// with a shred where shred.slot() == root, blob gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair);
let shreds = local_entries_to_shred(&[Entry::default()], slot, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
false
@ -418,7 +413,7 @@ mod test {
// with a shred where shred.parent() < root, blob gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair);
local_entries_to_shred(&[Entry::default()], slot + 1, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
false