Remove blocktree blob references (#5691)

* Remove blocktree blob references

* fixes and cleanup

* replace uninitialized() call with MaybeUninit

* fix bench
This commit is contained in:
Pankaj Garg 2019-09-03 21:32:51 -07:00 committed by GitHub
parent 2b696ac8dc
commit 3b0d48e3b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 691 additions and 3292 deletions

View File

@ -6,89 +6,60 @@ extern crate test;
#[macro_use]
extern crate solana_core;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use solana_core::blocktree::{get_tmp_ledger_path, Blocktree};
use solana_core::entry::{make_large_test_entries, make_tiny_test_entries, EntrySlice};
use solana_core::packet::{Blob, BLOB_HEADER_SIZE};
use rand::Rng;
use solana_core::blocktree::{entries_to_test_shreds, get_tmp_ledger_path, Blocktree};
use solana_core::entry::{make_large_test_entries, make_tiny_test_entries, Entry};
use std::path::Path;
use test::Bencher;
// Given some blobs and a ledger at ledger_path, benchmark writing the blobs to the ledger
fn bench_write_blobs(bench: &mut Bencher, blobs: &mut Vec<Blob>, ledger_path: &Path) {
// Given some shreds and a ledger at ledger_path, benchmark writing the shreds to the ledger
fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Path) {
let blocktree =
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
let num_blobs = blobs.len();
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index();
blocktree
.put_data_blob_bytes(
blob.slot(),
index,
&blob.data[..BLOB_HEADER_SIZE + blob.size()],
)
.unwrap();
blob.set_index(index + num_blobs as u64);
}
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds).unwrap();
});
Blocktree::destroy(ledger_path).expect("Expected successful database destruction");
}
// Insert some blobs into the ledger in preparation for read benchmarks
// Insert some shreds into the ledger in preparation for read benchmarks
fn setup_read_bench(
blocktree: &mut Blocktree,
num_small_blobs: u64,
num_large_blobs: u64,
num_small_shreds: u64,
num_large_shreds: u64,
slot: u64,
) {
// Make some big and small entries
let mut entries = make_large_test_entries(num_large_blobs as usize);
entries.extend(make_tiny_test_entries(num_small_blobs as usize));
let mut entries = make_large_test_entries(num_large_shreds as usize);
entries.extend(make_tiny_test_entries(num_small_shreds as usize));
// Convert the entries to blobs, write the blobs to the ledger
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
b.set_slot(slot);
}
// Convert the entries to shreds, write the shreds to the ledger
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true);
blocktree
.write_blobs(&blobs)
.expect("Expectd successful insertion of blobs into ledger");
.insert_shreds(shreds)
.expect("Expectd successful insertion of shreds into ledger");
}
// Write small blobs to the ledger
// Write small shreds to the ledger
#[bench]
#[ignore]
fn bench_write_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
bench_write_shreds(bench, entries, &ledger_path);
}
// Write big blobs to the ledger
// Write big shreds to the ledger
#[bench]
#[ignore]
fn bench_write_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let mut blobs = entries.to_blobs();
for (index, b) in blobs.iter_mut().enumerate() {
b.set_index(index as u64);
}
bench_write_blobs(bench, &mut blobs, &ledger_path);
bench_write_shreds(bench, entries, &ledger_path);
}
#[bench]
@ -98,20 +69,20 @@ fn bench_read_sequential(bench: &mut Bencher) {
let mut blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
// Insert some big and small shreds into the ledger
let num_small_shreds = 32 * 1024;
let num_large_shreds = 32 * 1024;
let total_shreds = num_small_shreds + num_large_shreds;
let slot = 0;
setup_read_bench(&mut blocktree, num_small_blobs, num_large_blobs, slot);
setup_read_bench(&mut blocktree, num_small_shreds, num_large_shreds, slot);
let num_reads = total_blobs / 15;
let num_reads = total_shreds / 15;
let mut rng = rand::thread_rng();
bench.iter(move || {
// Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially
let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs);
// Generate random starting point in the range [0, total_shreds - 1], read num_reads shreds sequentially
let start_index = rng.gen_range(0, num_small_shreds + num_large_shreds);
for i in start_index..start_index + num_reads {
let _ = blocktree.get_data_shred_as_blob(slot, i as u64 % total_blobs);
let _ = blocktree.get_data_shred(slot, i as u64 % total_shreds);
}
});
@ -125,24 +96,24 @@ fn bench_read_random(bench: &mut Bencher) {
let mut blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
// Insert some big and small shreds into the ledger
let num_small_shreds = 32 * 1024;
let num_large_shreds = 32 * 1024;
let total_shreds = num_small_shreds + num_large_shreds;
let slot = 0;
setup_read_bench(&mut blocktree, num_small_blobs, num_large_blobs, slot);
setup_read_bench(&mut blocktree, num_small_shreds, num_large_shreds, slot);
let num_reads = total_blobs / 15;
let num_reads = total_shreds / 15;
// Generate a num_reads sized random sample of indexes in range [0, total_blobs - 1],
// Generate a num_reads sized random sample of indexes in range [0, total_shreds - 1],
// simulating random reads
let mut rng = rand::thread_rng();
let indexes: Vec<usize> = (0..num_reads)
.map(|_| rng.gen_range(0, total_blobs) as usize)
.map(|_| rng.gen_range(0, total_shreds) as usize)
.collect();
bench.iter(move || {
for i in indexes.iter() {
let _ = blocktree.get_data_shred_as_blob(slot, *i as u64);
let _ = blocktree.get_data_shred(slot, *i as u64);
}
});
@ -151,45 +122,30 @@ fn bench_read_random(bench: &mut Bencher) {
#[bench]
#[ignore]
fn bench_insert_data_blob_small(bench: &mut Bencher) {
fn bench_insert_data_shred_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let mut blobs = entries.to_blobs();
blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in blobs.iter_mut() {
let index = blob.index();
blob.set_index(index + num_entries as u64);
}
blocktree.write_blobs(&blobs).unwrap();
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[bench]
#[ignore]
fn bench_insert_data_blob_big(bench: &mut Bencher) {
fn bench_insert_data_shred_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let mut shared_blobs = entries.to_shared_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {
for blob in shared_blobs.iter_mut() {
let index = blob.read().unwrap().index();
blocktree.write_shared_blobs(vec![blob.clone()]).unwrap();
blob.write().unwrap().set_index(index + num_entries as u64);
}
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}

View File

@ -161,7 +161,7 @@ mod test {
let expected_tick_heights = [5, 6, 7, 8, 8, 9];
blocktree
.write_entries_using_shreds(
.write_entries(
1,
0,
0,

File diff suppressed because it is too large Load Diff

View File

@ -442,7 +442,6 @@ fn process_pending_slots(
pub mod tests {
use super::*;
use crate::blocktree::create_new_tmp_ledger;
use crate::blocktree::tests::entries_to_blobs;
use crate::entry::{create_ticks, next_entry, next_entry_mut, Entry};
use crate::genesis_utils::{
create_genesis_block, create_genesis_block_with_leader, GenesisBlockInfo,
@ -468,7 +467,7 @@ pub mod tests {
let last_entry_hash = entries.last().unwrap().hash;
blocktree
.write_entries_using_shreds(
.write_entries(
slot,
0,
0,
@ -520,8 +519,18 @@ pub mod tests {
// throw away last one
entries.pop();
let blobs = entries_to_blobs(&entries, slot, parent_slot, false);
blocktree.insert_data_blobs(blobs.iter()).unwrap();
blocktree
.write_entries(
slot,
0,
0,
ticks_per_slot,
Some(parent_slot),
false,
&Arc::new(Keypair::new()),
entries,
)
.expect("Expected to write shredded entries to blocktree");
}
// slot 2, points at slot 1
@ -863,7 +872,7 @@ pub mod tests {
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
blocktree
.write_entries_using_shreds(
.write_entries(
1,
0,
0,

View File

@ -1,17 +1,14 @@
//! A stage to broadcast data from a leader node to validators
use self::broadcast_bad_blob_sizes::BroadcastBadBlobSizes;
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::erasure::{CodingGenerator, ErasureConfig};
use crate::erasure::ErasureConfig;
use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shredder;
use crate::staking_utils;
use rayon::ThreadPool;
use solana_metrics::{datapoint, inc_new_counter_error, inc_new_counter_info};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
@ -20,7 +17,6 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
mod broadcast_bad_blob_sizes;
mod broadcast_fake_blobs_run;
pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
@ -38,7 +34,6 @@ pub enum BroadcastStageType {
Standard,
FailEntryVerification,
BroadcastFakeBlobs,
BroadcastBadBlobSizes,
}
impl BroadcastStageType {
@ -81,16 +76,6 @@ impl BroadcastStageType {
BroadcastFakeBlobsRun::new(0),
erasure_config,
),
BroadcastStageType::BroadcastBadBlobSizes => BroadcastStage::new(
sock,
cluster_info,
receiver,
exit_sender,
blocktree,
BroadcastBadBlobSizes::new(),
erasure_config,
),
}
}
}
@ -98,7 +83,6 @@ impl BroadcastStageType {
trait BroadcastRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
@ -106,11 +90,6 @@ trait BroadcastRun {
) -> Result<()>;
}
struct Broadcast {
coding_generator: CodingGenerator,
thread_pool: ThreadPool,
}
// Implement a destructor for the BroadcastStage thread to signal it exited
// even on panics
struct Finalizer {
@ -141,22 +120,9 @@ impl BroadcastStage {
receiver: &Receiver<WorkingBankEntries>,
blocktree: &Arc<Blocktree>,
mut broadcast_stage_run: impl BroadcastRun,
erasure_config: &ErasureConfig,
) -> BroadcastStageReturnType {
let coding_generator = CodingGenerator::new_from_config(erasure_config);
let mut broadcast = Broadcast {
coding_generator,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.build()
.unwrap(),
};
loop {
if let Err(e) =
broadcast_stage_run.run(&mut broadcast, &cluster_info, receiver, sock, blocktree)
{
if let Err(e) = broadcast_stage_run.run(&cluster_info, receiver, sock, blocktree) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
return BroadcastStageReturnType::ChannelDisconnected;
@ -195,11 +161,10 @@ impl BroadcastStage {
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
broadcast_stage_run: impl BroadcastRun + Send + 'static,
erasure_config: &ErasureConfig,
_erasure_config: &ErasureConfig,
) -> Self {
let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone();
let erasure_config = *erasure_config;
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
@ -210,7 +175,6 @@ impl BroadcastStage {
&receiver,
&blocktree,
broadcast_stage_run,
&erasure_config,
)
})
.unwrap();

View File

@ -1,84 +0,0 @@
use super::*;
use crate::packet::BLOB_HEADER_SIZE;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signable;
/// Broadcast run whose `BroadcastRun` implementation overwrites the size stored
/// in every blob header with a bogus value before the blobs are written and
/// broadcast. The struct has no fields.
pub(super) struct BroadcastBadBlobSizes {}
impl BroadcastBadBlobSizes {
/// Creates a new, stateless `BroadcastBadBlobSizes`.
pub(super) fn new() -> Self {
Self {}
}
}
impl BroadcastRun for BroadcastBadBlobSizes {
/// Runs one broadcast iteration with deliberately corrupted blob headers.
///
/// Receives entries via `broadcast_utils::recv_slot_blobs`, converts them to
/// data and coding blobs, sets the header size of every blob to
/// `std::usize::MAX - BLOB_HEADER_SIZE`, re-signs each blob, writes the blobs
/// to `blocktree`, and finally broadcasts data and coding blobs over `sock`.
///
/// # Errors
/// Propagates (via `?`) errors from receiving entries, from the two blocktree
/// writes, and from `ClusterInfo::broadcast`.
///
/// # Panics
/// Panics if a lock is poisoned (`unwrap` on `read()`/`write()`), if
/// `blocktree.meta` fails ("Database error"), or if `ventries` (or its last
/// inner vector) is empty when the last tick of the slot is reached.
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
// 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
if last_tick == bank.max_tick_height() {
// Both `unwrap`s assume at least one entry was received; they panic otherwise.
let mut last_entry = receive_results
.ventries
.last_mut()
.unwrap()
.last_mut()
.unwrap();
// Replace the hash of the final entry with the default hash.
last_entry.0.hash = Hash::default();
}
let keypair = &cluster_info.read().unwrap().keypair.clone();
// Starting blob index handed to `entries_to_blobs`: the slot meta's `consumed`
// value, or 0 when the blocktree has no meta for this slot.
let latest_blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
receive_results.ventries,
&broadcast.thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
);
// Corrupt every data and coding blob. The order below matters: the header size
// is changed first, the blob is signed afterwards, and only then is `meta.size`
// restored to the value saved in `real_size`.
for b in data_blobs.iter().chain(coding_blobs.iter()) {
let mut w_b = b.write().unwrap();
let real_size = w_b.meta.size;
// corrupt the size in the header
w_b.set_size(std::usize::MAX - BLOB_HEADER_SIZE);
// resign the blob
w_b.sign(&keypair);
// don't corrupt the size in the meta so that broadcast will still work
w_b.meta.size = real_size;
}
// Persist the (corrupted) data and coding blobs before broadcasting them.
blocktree.write_shared_blobs(data_blobs.iter())?;
blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
// Broadcast data + erasures
cluster_info.read().unwrap().broadcast(
sock,
data_blobs.iter().chain(coding_blobs.iter()),
stakes.as_ref(),
)?;
Ok(())
}
}

View File

@ -19,14 +19,13 @@ impl BroadcastFakeBlobsRun {
impl BroadcastRun for BroadcastFakeBlobsRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
@ -37,14 +36,14 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
.map(|meta| meta.consumed)
.unwrap_or(0);
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds(
receive_results.ventries,
&broadcast.thread_pool,
bank.slot(),
receive_results.last_tick,
bank.max_tick_height(),
keypair,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
bank.parent().unwrap().slot(),
);
// If the last blockhash is default, a new block is being created
@ -57,14 +56,14 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
.map(|_| vec![(Entry::new(&self.last_blockhash, 0, vec![]), 0)])
.collect();
let (fake_data_blobs, fake_coding_blobs) = broadcast_utils::entries_to_blobs(
let (_fake_shreds, fake_shred_bufs, _) = broadcast_utils::entries_to_shreds(
fake_ventries,
&broadcast.thread_pool,
bank.slot(),
receive_results.last_tick,
bank.max_tick_height(),
keypair,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
bank.parent().unwrap().slot(),
);
// If it's the last tick, reset the last block hash to default
@ -73,48 +72,18 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
self.last_blockhash = Hash::default();
}
blocktree.write_shared_blobs(data_blobs.iter())?;
blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
// Set the forwarded flag to true, so that the blobs won't be forwarded to peers
data_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
coding_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
fake_data_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
fake_coding_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
blocktree.insert_shreds(shreds)?;
// 3) Start broadcast step
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition {
// Send fake blobs to the first N peers
fake_data_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
fake_coding_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
fake_shred_bufs.iter().for_each(|b| {
sock.send_to(b, &peer.tvu_forwards).unwrap();
});
} else {
data_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
coding_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
shred_bufs.iter().for_each(|b| {
sock.send_to(b, &peer.tvu_forwards).unwrap();
});
}
});

View File

@ -1,13 +1,9 @@
use crate::entry::Entry;
use crate::entry::EntrySlice;
use crate::erasure::CodingGenerator;
use crate::packet::{self, SharedBlob};
use crate::poh_recorder::WorkingBankEntries;
use crate::result::Result;
use rayon::prelude::*;
use rayon::ThreadPool;
use crate::shred::{Shred, Shredder};
use solana_runtime::bank::Bank;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable};
use solana_sdk::signature::Keypair;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -38,7 +34,7 @@ impl ReceiveResults {
}
}
pub(super) fn recv_slot_blobs(receiver: &Receiver<WorkingBankEntries>) -> Result<ReceiveResults> {
pub(super) fn recv_slot_shreds(receiver: &Receiver<WorkingBankEntries>) -> Result<ReceiveResults> {
let timer = Duration::new(1, 0);
let (mut bank, entries) = receiver.recv_timeout(timer)?;
let recv_start = Instant::now();
@ -74,83 +70,52 @@ pub(super) fn recv_slot_blobs(receiver: &Receiver<WorkingBankEntries>) -> Result
Ok(receive_results)
}
pub(super) fn entries_to_blobs(
pub(super) fn entries_to_shreds(
ventries: Vec<Vec<(Entry, u64)>>,
thread_pool: &ThreadPool,
latest_blob_index: u64,
slot: u64,
last_tick: u64,
bank: &Bank,
keypair: &Keypair,
coding_generator: &mut CodingGenerator,
) -> (Vec<SharedBlob>, Vec<SharedBlob>) {
let blobs = generate_data_blobs(
ventries,
thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
);
bank_max_tick: u64,
keypair: &Arc<Keypair>,
mut latest_shred_index: u64,
parent_slot: u64,
) -> (Vec<Shred>, Vec<Vec<u8>>, u64) {
let mut all_shred_bufs = vec![];
let mut all_shreds = vec![];
let num_ventries = ventries.len();
ventries
.into_iter()
.enumerate()
.for_each(|(i, entries_tuple)| {
let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip();
//entries
let mut shredder = Shredder::new(
slot,
Some(parent_slot),
1.0,
keypair,
latest_shred_index as u32,
)
.expect("Expected to create a new shredder");
let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair);
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
(blobs, coding)
}
pub(super) fn generate_data_blobs(
ventries: Vec<Vec<(Entry, u64)>>,
thread_pool: &ThreadPool,
latest_blob_index: u64,
last_tick: u64,
bank: &Bank,
keypair: &Keypair,
) -> Vec<SharedBlob> {
let blobs: Vec<SharedBlob> = thread_pool.install(|| {
ventries
.into_par_iter()
.map(|p| {
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
entries.to_shared_blobs()
})
.flatten()
.collect()
});
packet::index_blobs(
&blobs,
&keypair.pubkey(),
latest_blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);
if last_tick == bank.max_tick_height() {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
}
// Make sure not to modify the blob header or data after signing it here
thread_pool.install(|| {
blobs.par_iter().for_each(|b| {
b.write().unwrap().sign(keypair);
})
});
blobs
}
pub(super) fn generate_coding_blobs(
blobs: &[SharedBlob],
thread_pool: &ThreadPool,
coding_generator: &mut CodingGenerator,
keypair: &Keypair,
) -> Vec<SharedBlob> {
let coding = coding_generator.next(&blobs);
thread_pool.install(|| {
coding.par_iter().for_each(|c| {
c.write().unwrap().sign(keypair);
})
});
coding
if i == (num_ventries - 1) && last_tick == bank_max_tick {
shredder.finalize_slot();
} else {
shredder.finalize_fec_block();
}
let mut shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
trace!("Inserting {:?} shreds in blocktree", shreds.len());
latest_shred_index = u64::from(shredder.index);
all_shreds.append(&mut shreds);
all_shred_bufs.append(&mut shredder.shreds);
});
(all_shreds, all_shred_bufs, latest_shred_index)
}

View File

@ -12,14 +12,13 @@ impl FailEntryVerificationBroadcastRun {
impl BroadcastRun for FailEntryVerificationBroadcastRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let mut receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
@ -42,27 +41,29 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
.map(|meta| meta.consumed)
.unwrap_or(0);
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
let (shreds, shred_bufs, _) = broadcast_utils::entries_to_shreds(
receive_results.ventries,
&broadcast.thread_pool,
latest_blob_index,
bank.slot(),
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
bank.max_tick_height(),
keypair,
latest_blob_index,
bank.parent().unwrap().slot(),
);
blocktree.write_shared_blobs(data_blobs.iter())?;
blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
blocktree.insert_shreds(shreds)?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
// Broadcast data + erasures
cluster_info.read().unwrap().broadcast(
cluster_info.read().unwrap().broadcast_shreds(
sock,
data_blobs.iter().chain(coding_blobs.iter()),
&shred_bufs,
&seeds,
stakes.as_ref(),
)?;

View File

@ -1,6 +1,6 @@
use super::broadcast_utils;
use super::*;
use crate::shred::Shred;
use crate::broadcast_stage::broadcast_utils::entries_to_shreds;
use solana_sdk::timing::duration_as_ms;
#[derive(Default)]
@ -51,14 +51,13 @@ impl StandardBroadcastRun {
impl BroadcastRun for StandardBroadcastRun {
fn run(
&mut self,
_broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
let receive_elapsed = receive_results.time_elapsed;
let num_entries = receive_results.num_entries;
let bank = receive_results.bank.clone();
@ -68,7 +67,7 @@ impl BroadcastRun for StandardBroadcastRun {
// 2) Convert entries to blobs + generate coding blobs
let to_blobs_start = Instant::now();
let keypair = &cluster_info.read().unwrap().keypair.clone();
let mut latest_blob_index = blocktree
let latest_shred_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
@ -79,48 +78,22 @@ impl BroadcastRun for StandardBroadcastRun {
} else {
0
};
let mut all_shreds = vec![];
let mut all_seeds = vec![];
let num_ventries = receive_results.ventries.len();
receive_results
.ventries
.into_iter()
.enumerate()
.for_each(|(i, entries_tuple)| {
let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip();
//entries
let mut shredder = Shredder::new(
bank.slot(),
Some(parent_slot),
1.0,
keypair,
latest_blob_index as u32,
)
.expect("Expected to create a new shredder");
bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds");
if i == (num_ventries - 1) && last_tick == bank.max_tick_height() {
shredder.finalize_slot();
} else {
shredder.finalize_fec_block();
}
let (all_shreds, all_shred_bufs, latest_shred_index) = entries_to_shreds(
receive_results.ventries,
bank.slot(),
last_tick,
bank.max_tick_height(),
keypair,
latest_shred_index,
parent_slot,
);
let shreds: Vec<Shred> = shredder
.shreds
.iter()
.map(|s| bincode::deserialize(s).unwrap())
.collect();
let mut seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
trace!("Inserting {:?} shreds in blocktree", shreds.len());
blocktree
.insert_shreds(shreds)
.expect("Failed to insert shreds in blocktree");
latest_blob_index = u64::from(shredder.index);
all_shreds.append(&mut shredder.shreds);
all_seeds.append(&mut seeds);
});
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
let num_shreds = all_shreds.len();
blocktree
.insert_shreds(all_shreds)
.expect("Failed to insert shreds in blocktree");
let to_blobs_elapsed = to_blobs_start.elapsed();
@ -129,15 +102,15 @@ impl BroadcastRun for StandardBroadcastRun {
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
trace!("Broadcasting {:?} shreds", all_shreds.len());
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
cluster_info.read().unwrap().broadcast_shreds(
sock,
&all_shreds,
&all_shred_bufs,
&all_seeds,
stakes.as_ref(),
)?;
inc_new_counter_debug!("streamer-broadcast-sent", all_shreds.len());
inc_new_counter_debug!("streamer-broadcast-sent", num_shreds);
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(
@ -145,7 +118,7 @@ impl BroadcastRun for StandardBroadcastRun {
duration_as_ms(&(receive_elapsed + to_blobs_elapsed + broadcast_elapsed)),
num_entries,
duration_as_ms(&to_blobs_elapsed),
latest_blob_index,
latest_shred_index,
);
Ok(())

View File

@ -27,7 +27,7 @@ pub fn chacha_cbc_encrypt_ledger(
let mut current_slot = start_slot;
let mut start_index = 0;
loop {
match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) {
match blocktree.get_data_shreds(current_slot, start_index, std::u64::MAX, &mut buffer) {
Ok((last_index, mut size)) => {
debug!(
"chacha: encrypting slice: {} num_shreds: {} data_len: {}",
@ -128,7 +128,7 @@ mod tests {
let entries = make_tiny_deterministic_test_entries(slots_per_segment);
blocktree
.write_entries_using_shreds(
.write_entries(
0,
0,
0,

View File

@ -48,7 +48,7 @@ pub fn chacha_cbc_encrypt_file_many_keys(
chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32);
}
loop {
match blocktree.get_data_shreds(current_slot, start_index, &mut buffer) {
match blocktree.get_data_shreds(current_slot, start_index, std::u64::MAX, &mut buffer) {
Ok((last_index, mut size)) => {
debug!(
"chacha_cuda: encrypting segment: {} num_shreds: {} data_len: {}",
@ -139,7 +139,7 @@ mod tests {
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
blocktree
.write_entries_using_shreds(
.write_entries(
0,
0,
0,
@ -196,7 +196,7 @@ mod tests {
let ticks_per_slot = 16;
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
blocktree
.write_entries_using_shreds(
.write_entries(
0,
0,
0,

View File

@ -1752,7 +1752,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
mod tests {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_many_slot_entries_using_shreds;
use crate::blocktree::tests::make_many_slot_entries;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::tests::fill_blocktree_slot_with_ticks;
use crate::crds_value::CrdsValueLabel;
@ -1998,7 +1998,7 @@ mod tests {
assert!(rv.is_empty());
// Create slots 1, 2, 3 with 5 blobs apiece
let (blobs, _) = make_many_slot_entries_using_shreds(1, 3, 5);
let (blobs, _) = make_many_slot_entries(1, 3, 5);
blocktree
.insert_shreds(blobs)

View File

@ -315,7 +315,7 @@ impl ClusterInfoRepairListener {
// sending the blobs in this slot for repair, we expect these slots
// to be full.
if let Some(blob_data) = blocktree
.get_data_shred_bytes(slot, blob_index as u64)
.get_data_shred(slot, blob_index as u64)
.expect("Failed to read data blob from blocktree")
{
socket.send_to(&blob_data[..], repairee_tvu)?;
@ -323,7 +323,7 @@ impl ClusterInfoRepairListener {
}
if let Some(coding_bytes) = blocktree
.get_coding_blob_bytes(slot, blob_index as u64)
.get_coding_shred(slot, blob_index as u64)
.expect("Failed to read coding blob from blocktree")
{
socket.send_to(&coding_bytes[..], repairee_tvu)?;
@ -479,7 +479,7 @@ impl Service for ClusterInfoRepairListener {
mod tests {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_many_slot_entries_using_shreds;
use crate::blocktree::tests::make_many_slot_entries;
use crate::cluster_info::Node;
use crate::packet::{Blob, SharedBlob};
use crate::streamer;
@ -623,7 +623,7 @@ mod tests {
let entries_per_slot = 5;
let num_slots = 10;
assert_eq!(num_slots % 2, 0);
let (shreds, _) = make_many_slot_entries_using_shreds(0, num_slots, entries_per_slot);
let (shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let num_shreds_per_slot = shreds.len() as u64 / num_slots;
// Write slots in the range [0, num_slots] to blocktree
@ -703,7 +703,7 @@ mod tests {
// Create blobs for first two epochs and write them to blocktree
let total_slots = slots_per_epoch * 2;
let (shreds, _) = make_many_slot_entries_using_shreds(0, total_slots, 1);
let (shreds, _) = make_many_slot_entries(0, total_slots, 1);
blocktree.insert_shreds(shreds).unwrap();
// Write roots so that these slots will qualify to be sent by the repairman

View File

@ -40,10 +40,6 @@
//! |<============== data blob part for "coding" ==============>|
//!
//!
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use std::cmp;
use std::convert::AsMut;
use std::sync::{Arc, RwLock};
use reed_solomon_erasure::ReedSolomon;
@ -52,8 +48,6 @@ use reed_solomon_erasure::ReedSolomon;
pub const NUM_DATA: usize = 8;
/// Number of coding blobs; also the maximum number that can go missing.
pub const NUM_CODING: usize = 8;
/// Total number of blobs in an erasure set; includes data and coding blobs
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ErasureConfig {
@ -94,14 +88,6 @@ type Result<T> = std::result::Result<T, reed_solomon_erasure::Error>;
#[derive(Debug, Clone)]
pub struct Session(ReedSolomon);
/// Generates coding blobs on demand given data blobs
#[derive(Debug, Clone)]
pub struct CodingGenerator {
/// SharedBlobs that couldn't be used in last call to next()
leftover: Vec<SharedBlob>,
session: Arc<Session>,
}
impl Session {
pub fn new(data_count: usize, coding_count: usize) -> Result<Session> {
let rs = ReedSolomon::new(data_count, coding_count)?;
@ -132,180 +118,6 @@ impl Session {
Ok(())
}
/// Reports the shape of this erasure session as a
/// `(number_of_data_blobs, number_of_coding_blobs)` pair.
pub fn dimensions(&self) -> (usize, usize) {
    let codec = &self.0;
    let data_blobs = codec.data_shard_count();
    let coding_blobs = codec.parity_shard_count();
    (data_blobs, coding_blobs)
}
/// Reconstruct any missing blobs in this erasure set if possible.
///
/// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata.
/// Assumes that the user has sliced into the blobs appropriately already; otherwise recovery
/// will return an error or garbage data.
///
/// * `blobs` - one byte buffer per block of the set, data blocks first, coding blocks after
/// * `present` - `present[i]` is `false` for every block `i` that has to be rebuilt
/// * `size` - number of payload bytes copied into each recovered coding blob
/// * `block_start_idx` - index given to the first block of the set; recovered blobs are
///   indexed relative to it
/// * `slot` - slot stamped onto recovered coding blobs
///
/// Returns `(recovered_data, recovered_coding)`, or the error reported by the decoder.
pub fn reconstruct_blobs<B>(
    &self,
    blobs: &mut [B],
    present: &[bool],
    size: usize,
    block_start_idx: u64,
    slot: u64,
) -> Result<(Vec<Blob>, Vec<Blob>)>
where
    B: AsMut<[u8]>,
{
    // The data/coding boundary is a property of this session, not of the crate-wide default,
    // so every computation below uses the session's own data shard count.
    let num_data = self.0.data_shard_count();

    let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();

    trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);

    // Decode the blocks
    self.decode_blocks(blocks.as_mut_slice(), &present)?;

    let mut recovered_data = vec![];
    let mut recovered_coding = vec![];

    let erasures = present
        .iter()
        .enumerate()
        .filter_map(|(i, present)| if *present { None } else { Some(i) });

    // Create the missing blobs from the reconstructed data
    for n in erasures {
        let data_size;
        let idx;
        let first_byte;

        if n < num_data {
            // A data block already carries its own header, so the blob is rebuilt from the
            // raw bytes and only the metadata size needs fixing up.
            let mut blob = Blob::new(&blocks[n]);
            blob.meta.size = blob.data_size() as usize;

            data_size = blob.data_size() as usize;
            idx = n as u64 + block_start_idx;
            first_byte = blob.data[0];

            recovered_data.push(blob);
        } else {
            // A coding block holds payload only; the header is written here.
            let mut blob = Blob::default();
            blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].copy_from_slice(&blocks[n]);
            blob.meta.size = size;

            data_size = size;
            // `n >= num_data` in this branch, so the subtraction cannot underflow. Using the
            // session's count (instead of the `NUM_DATA` constant) keeps indexes correct for
            // sessions that were not built with the default configuration.
            idx = (n - num_data) as u64 + block_start_idx;
            first_byte = blob.data[0];

            blob.set_slot(slot);
            blob.set_index(idx);
            blob.set_coding();
            recovered_coding.push(blob);
        }

        trace!(
            "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
            n,
            idx,
            data_size,
            first_byte
        );
    }

    Ok((recovered_data, recovered_coding))
}
}
impl CodingGenerator {
    /// Builds a generator that encodes with the given shared `session`.
    pub fn new(session: Arc<Session>) -> Self {
        let leftover = Vec::with_capacity(session.0.data_shard_count());
        CodingGenerator { leftover, session }
    }

    /// Builds a generator with a fresh session created from `config`.
    ///
    /// Panics if no session can be created from `config`.
    pub fn new_from_config(config: &ErasureConfig) -> Self {
        let leftover = Vec::with_capacity(config.num_data);
        let session = Session::new_from_config(config).unwrap();
        CodingGenerator {
            leftover,
            session: Arc::new(session),
        }
    }

    /// Yields next set of coding blobs, if any.
    /// Must be called with consecutive data blobs within a slot.
    ///
    /// Passing in a slice with the first blob having a new slot will cause internal state to
    /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot
    /// must be consecutive.
    ///
    /// If used improperly, it may return garbage coding blobs, but will not give an
    /// error.
    ///
    /// Data blobs that do not fill a complete set are kept and prepended to the input of the
    /// next call. Sets whose encoding fails are silently skipped.
    ///
    /// NOTE(review): the headers of the coding blobs are copied from the first `num_coding` data
    /// blobs of a set, so a session with more coding than data blobs panics on the slice below.
    pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
        let (num_data, num_coding) = self.session.dimensions();
        let complete_sets = (self.leftover.len() + next_data.len()) / num_data;
        let mut next_coding = Vec::with_capacity(complete_sets * num_coding);

        // Blobs left over from another slot can never complete a set with the incoming ones
        let slot_changed = match (self.leftover.first(), next_data.first()) {
            (Some(pending), Some(incoming)) => {
                pending.read().unwrap().slot() != incoming.read().unwrap().slot()
            }
            _ => false,
        };
        if slot_changed {
            self.leftover.clear();
        }

        let pending: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();

        for data_blobs in pending.chunks(num_data) {
            if data_blobs.len() < num_data {
                // Incomplete tail: remember it for the next call
                self.leftover = data_blobs.to_vec();
                break;
            }
            self.leftover.clear();

            // find max_data_size for the erasure set
            let max_data_size = data_blobs
                .iter()
                .map(|blob| blob.read().unwrap().meta.size)
                .max()
                .unwrap_or(0);

            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
            let data_ptrs: Vec<_> = data_locks
                .iter()
                .map(|l| &l.data[..max_data_size])
                .collect();

            // One coding blob per leading data blob, inheriting that blob's header fields
            let mut coding_blobs: Vec<_> = data_locks[..num_coding]
                .iter()
                .map(|data_blob| {
                    let mut coding_blob = Blob::default();
                    coding_blob.set_index(data_blob.index());
                    coding_blob.set_slot(data_blob.slot());
                    coding_blob.set_id(&data_blob.id());
                    coding_blob.set_version(data_blob.version());
                    coding_blob.set_size(max_data_size);
                    coding_blob.set_coding();
                    coding_blob.set_erasure_config(&data_blob.erasure_config());
                    coding_blob
                })
                .collect();

            // The mutable views into the coding blobs must be gone before the blobs are moved
            let encoded = {
                let mut coding_ptrs: Vec<_> = coding_blobs
                    .iter_mut()
                    .map(|blob| &mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + max_data_size])
                    .collect();

                self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
            };

            if encoded.is_ok() {
                next_coding.append(&mut coding_blobs);
            }
        }

        next_coding
            .into_iter()
            .map(|blob| Arc::new(RwLock::new(blob)))
            .collect()
    }
}
impl Default for Session {
@ -314,27 +126,9 @@ impl Default for Session {
}
}
impl Default for CodingGenerator {
    /// Creates a generator backed by a default `Session`, with room reserved for one set of
    /// leftover data blobs.
    fn default() -> Self {
        Self::new(Arc::new(Session::default()))
    }
}
#[cfg(test)]
pub mod test {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::Blocktree;
use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signable;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
use std::path::Path;
/// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set
/// Exists to be passed to `generate_blocktree_with_coding`
@ -354,23 +148,6 @@ pub mod test {
pub set_specs: Vec<ErasureSpec>,
}
/// Model of a slot in 16-blob chunks with varying amounts of erasure and coding blobs
/// present
#[derive(Debug, Clone)]
pub struct SlotModel {
/// Slot number; `generate_ledger_model` copies it from the `SlotSpec` it was built from
pub slot: u64,
/// Erasure sets of the slot; `generate_ledger_model` produces one per `ErasureSpec`
pub chunks: Vec<ErasureSetModel>,
}
/// Model of 16-blob chunk
#[derive(Debug, Clone)]
pub struct ErasureSetModel {
/// Position of this set within its slot, taken from `ErasureSpec::set_index`
pub set_index: u64,
/// Blob index of the first blob in the set (`set_index * NUM_DATA` in `generate_ledger_model`)
pub start_index: u64,
/// Coding blobs of the set, truncated to the number requested by the spec
pub coding: Vec<SharedBlob>,
/// Data blobs of the set, truncated to the number requested by the spec
pub data: Vec<SharedBlob>,
}
#[test]
fn test_coding() {
const N_DATA: usize = 4;
@ -421,416 +198,4 @@ pub mod test {
assert_eq!(v_orig, vs[0]);
assert_eq!(erased, vs[erasure]);
}
/// Drops the first data blob and the first coding blob of the erasure set starting at
/// `block_start_idx`, reconstructs both through `session`, and asserts that the recovered blobs
/// equal the originals.
fn test_toss_and_recover(
    session: &Session,
    data_blobs: &[SharedBlob],
    coding_blobs: &[SharedBlob],
    block_start_idx: usize,
) {
    let size = coding_blobs[0].read().unwrap().size();

    // Lay the set out as [data.., coding..] with empty placeholders for the two tossed blobs
    let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
    blobs.push(SharedBlob::default()); // erased data blob
    blobs.extend_from_slice(&data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA]);
    blobs.push(SharedBlob::default()); // erased coding blob
    blobs.extend_from_slice(&coding_blobs[1..NUM_CODING]);

    // Everything is present except the first data and the first coding blob
    let present: Vec<bool> = (0..blobs.len())
        .map(|i| i != 0 && i != NUM_DATA)
        .collect();

    let (recovered_data, recovered_coding) = session
        .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0)
        .expect("reconstruction must succeed");

    assert_eq!(recovered_data.len(), 1);
    assert_eq!(recovered_coding.len(), 1);

    // A blob that was never erased must come through untouched
    {
        let survivor = blobs[1].read().unwrap();
        let original = data_blobs[block_start_idx + 1].read().unwrap();
        assert_eq!(survivor.meta, original.meta);
        assert_eq!(survivor.data(), original.data());
    }

    // The erased data blob must be restored, metadata included
    {
        let original = data_blobs[block_start_idx].read().unwrap();
        assert_eq!(recovered_data[0].meta, original.meta);
        assert_eq!(recovered_data[0].data(), original.data());
    }

    // The erased coding blob must be restored as well
    assert_eq!(
        recovered_coding[0].data(),
        coding_blobs[0].read().unwrap().data()
    );
}
#[test]
fn test_generate_coding() {
    solana_logger::setup();

    // trivial case: feeding nothing never yields coding blobs
    let mut coding_generator = CodingGenerator::default();
    let no_blobs: Vec<SharedBlob> = Vec::new();
    for _ in 0..NUM_DATA * 2 {
        assert!(coding_generator.next(&no_blobs).is_empty());
    }

    // test coding by iterating one blob at a time
    let data_blobs = generate_test_blobs(0, NUM_DATA * 2);

    for (i, blob) in data_blobs.iter().cloned().enumerate() {
        let coding_blobs = coding_generator.next(&[blob]);
        if coding_blobs.is_empty() {
            continue;
        }

        // Coding blobs may only appear once a full set of data blobs has been fed in
        assert_eq!(i % NUM_DATA, NUM_DATA - 1);
        assert_eq!(coding_blobs.len(), NUM_CODING);

        // NOTE: the indexes of the coding blobs are not checked here
        for coding_blob in coding_blobs.iter().take(NUM_CODING) {
            assert!(coding_blob.read().unwrap().is_coding());
        }

        let set_start = i / NUM_DATA * NUM_DATA;
        test_toss_and_recover(
            &coding_generator.session,
            &data_blobs,
            &coding_blobs,
            set_start,
        );
    }
}
#[test]
fn test_erasure_generate_coding_reset_on_new_slot() {
    solana_logger::setup();

    let mut coding_generator = CodingGenerator::default();

    // Two sets worth of data blobs; the second set is moved to slot 1
    let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
    data_blobs
        .iter()
        .skip(NUM_DATA)
        .for_each(|blob| blob.write().unwrap().set_slot(1));

    // One blob short of a full set: nothing can be encoded yet
    let incomplete = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
    assert!(incomplete.is_empty());

    // A full set from the new slot discards the leftovers and is encoded on its own
    let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
    assert_eq!(coding_blobs.len(), NUM_CODING);

    test_toss_and_recover(
        &coding_generator.session,
        &data_blobs,
        &coding_blobs,
        NUM_DATA,
    );
}
#[test]
#[ignore]
fn test_erasure_generate_blocktree_with_coding() {
    // (data blobs per set, coding blobs per set, slots, sets per slot)
    let cases = vec![
        (NUM_DATA, NUM_CODING, 7, 5),
        (NUM_DATA - 6, NUM_CODING - 1, 5, 7),
    ];

    for (num_data, num_coding, num_slots, num_sets_per_slot) in cases {
        let ledger_path = get_tmp_ledger_path!();

        let mut specs = Vec::new();
        for slot in 0..num_slots {
            let set_specs = (0..num_sets_per_slot)
                .map(|set_index| ErasureSpec {
                    set_index,
                    num_data,
                    num_coding,
                })
                .collect();
            specs.push(SlotSpec { slot, set_specs });
        }

        let blocktree = generate_blocktree_with_coding(&ledger_path, &specs);

        // Every blob requested by a spec must be readable back from the ledger
        for spec in &specs {
            let slot = spec.slot;

            for erasure_spec in &spec.set_specs {
                let start_index = erasure_spec.set_index * NUM_DATA as u64;
                let data_end = start_index + erasure_spec.num_data as u64;
                let coding_end = start_index + erasure_spec.num_coding as u64;

                for idx in start_index..data_end {
                    assert!(blocktree
                        .get_data_shred_bytes(slot, idx)
                        .unwrap()
                        .is_some());
                }

                for idx in start_index..coding_end {
                    assert!(blocktree
                        .get_coding_blob_bytes(slot, idx)
                        .unwrap()
                        .is_some());
                }
            }
        }

        drop(blocktree);
        Blocktree::destroy(&ledger_path).expect("Expect successful blocktree destruction");
    }
}
/// Builds a ledger model on `N_THREADS` threads, erases the first three data blobs and the
/// first coding blob of every erasure set, and checks that reconstruction through one shared
/// `Session` restores them.
#[test]
fn test_recovery_with_model() {
use std::thread;
const MAX_ERASURE_SETS: u64 = 16;
const N_THREADS: usize = 2;
const N_SLOTS: u64 = 10;
solana_logger::setup();
// Lazy iterator of slot specs: slot `s` gets `s % MAX_ERASURE_SETS` full-sized erasure sets,
// so slot 0 has none
let specs = (0..N_SLOTS).map(|slot| {
let num_erasure_sets = slot % MAX_ERASURE_SETS;
let set_specs = (0..num_erasure_sets)
.map(|set_index| ErasureSpec {
set_index,
num_data: NUM_DATA,
num_coding: NUM_CODING,
})
.collect();
SlotSpec { slot, set_specs }
});
let mut handles = vec![];
let session = Arc::new(Session::default());
for i in 0..N_THREADS {
// Each thread gets its own copy of the spec iterator and a handle to the shared session
let specs = specs.clone();
let session = Arc::clone(&session);
let handle = thread::Builder::new()
.name(i.to_string())
.spawn(move || {
for slot_model in generate_ledger_model(specs) {
for erasure_set in slot_model.chunks {
// Keep the originals of everything that is about to be erased
let erased_coding = erasure_set.coding[0].clone();
let erased_data = erasure_set.data[..3].to_vec();
// Rebuild the set as [data.., coding..] with empty placeholders in the erased positions
let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
blobs.push(SharedBlob::default());
for blob in erasure_set.data.into_iter().skip(3) {
blobs.push(blob);
}
blobs.push(SharedBlob::default());
for blob in erasure_set.coding.into_iter().skip(1) {
blobs.push(blob);
}
let size = erased_coding.read().unwrap().size() as usize;
// Mark data blobs 0..3 and the first coding blob as missing
let mut present = vec![true; ERASURE_SET_SIZE];
present[0] = false;
present[1] = false;
present[2] = false;
present[NUM_DATA] = false;
session
.reconstruct_shared_blobs(
&mut blobs,
&present,
size,
erasure_set.set_index * NUM_DATA as u64,
slot_model.slot,
)
.expect("reconstruction must succeed");
// The returned blobs are ignored; the placeholders in `blobs` are compared instead.
// NOTE(review): this relies on the decoder filling the placeholder buffers in place — confirm
for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
let expected = expected.read().unwrap();
let mut recovered = recovered.write().unwrap();
// Fix up the size of the recovered blob from its header before comparing whole blobs
let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
recovered.set_size(data_size);
let corrupt = data_size > BLOB_DATA_SIZE;
assert!(!corrupt, "CORRUPTION");
assert_eq!(&*expected, &*recovered);
}
assert_eq!(
erased_coding.read().unwrap().data(),
blobs[NUM_DATA].read().unwrap().data()
);
debug!("passed set: {}", erasure_set.set_index);
}
debug!("passed slot: {}", slot_model.slot);
}
})
.expect("thread build error");
handles.push(handle);
}
// A panic (failed assertion) in any worker surfaces here through `join().unwrap()`
handles.into_iter().for_each(|h| h.join().unwrap());
}
/// Generates a model of a ledger containing certain data and coding blobs according to a spec
///
/// Returns a lazy iterator: each `SlotModel` is built only when the iterator is advanced.
/// For every `ErasureSpec` a full set of `NUM_DATA` data blobs is generated, indexed, signed
/// and encoded, and the data/coding blobs are then truncated to the counts the spec asks for.
pub fn generate_ledger_model<'a, I, IntoIt, S>(
specs: I,
) -> impl Iterator<Item = SlotModel> + Clone + 'a
where
I: IntoIterator<Item = S, IntoIter = IntoIt>,
IntoIt: Iterator<Item = S> + Clone + 'a,
S: Borrow<SlotSpec>,
{
let mut coding_generator = CodingGenerator::default();
let keypair = Keypair::new();
// Only the raw key bytes are captured by the closure below; the keypair is rebuilt per set.
// NOTE(review): presumably because the returned iterator must be `Clone` — confirm
let bytes = keypair.to_bytes();
specs.into_iter().map(move |spec| {
let spec = spec.borrow();
let slot = spec.slot;
let chunks = spec
.set_specs
.iter()
.map(|erasure_spec| {
let set_index = erasure_spec.set_index as usize;
let start_index = set_index * NUM_DATA;
// Always generate a complete set so the coding generator emits coding blobs for it
let mut blobs = generate_test_blobs(start_index, NUM_DATA);
let keypair = Keypair::from_bytes(&bytes).unwrap();
index_blobs(&blobs, &keypair.pubkey(), start_index as u64, slot, 0);
// Signing has to be deferred until all data/header fields are set correctly
blobs.iter().for_each(|blob| {
blob.write().unwrap().sign(&keypair);
});
let mut coding_blobs = coding_generator.next(&blobs);
// Drop everything beyond what the spec says should be present
blobs.drain(erasure_spec.num_data..);
coding_blobs.drain(erasure_spec.num_coding..);
ErasureSetModel {
start_index: start_index as u64,
set_index: set_index as u64,
data: blobs,
coding: coding_blobs,
}
})
.collect();
SlotModel { slot, chunks }
})
}
/// Genarates a ledger according to the given specs.
/// Blocktree should have correct SlotMeta and ErasureMeta and so on but will not have done any
/// possible recovery.
pub fn generate_blocktree_with_coding(ledger_path: &Path, specs: &[SlotSpec]) -> Blocktree {
let blocktree = Blocktree::open(ledger_path).unwrap();
let model = generate_ledger_model(specs);
for slot_model in model {
let slot = slot_model.slot;
for erasure_set in slot_model.chunks {
blocktree.write_shared_blobs(erasure_set.data).unwrap();
for shared_coding_blob in erasure_set.coding.into_iter() {
let blob = shared_coding_blob.read().unwrap();
blocktree
.put_coding_blob_bytes_raw(
slot,
blob.index(),
&blob.data[..blob.size() + BLOB_HEADER_SIZE],
)
.unwrap();
}
}
}
blocktree
}
// fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool {
// let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
//
// blobs.iter().enumerate().all(|(i, blob)| {
// let blob = blob.read().unwrap();
// blob.index() as usize == i + offset && blob.data() == &data[..]
// })
// }
//
/// Creates `num_blobs` shared data blobs that all carry the same payload (the byte pattern
/// `0, 1, 2, ...` truncated to `u8`) and the default erasure config, then indexes them
/// starting at `offset` in slot 0 under a random id.
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
    let payload: Vec<u8> = (0..BLOB_DATA_SIZE).map(|i| i as u8).collect();

    let mut blobs = Vec::with_capacity(num_blobs);
    for _ in 0..num_blobs {
        let mut blob = Blob::default();
        blob.data_mut()[..].copy_from_slice(&payload);
        blob.set_size(BLOB_DATA_SIZE);
        blob.set_erasure_config(&ErasureConfig::default());
        blobs.push(Arc::new(RwLock::new(blob)));
    }

    index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);

    blobs
}
impl Session {
    /// Test helper around `reconstruct_blobs` for shared blobs.
    ///
    /// Write-locks every blob and hands the decoder the first `size` bytes of each data blob
    /// (header included) and the `size` payload bytes after the header of each coding blob.
    /// The first `NUM_DATA` entries of `blobs` are treated as data blobs.
    fn reconstruct_shared_blobs(
        &self,
        blobs: &mut [SharedBlob],
        present: &[bool],
        size: usize,
        block_start_idx: u64,
        slot: u64,
    ) -> Result<(Vec<Blob>, Vec<Blob>)> {
        // The guards must outlive the slices borrowed from them
        let mut guards: Vec<std::sync::RwLockWriteGuard<_>> = blobs
            .iter()
            .map(|shared_blob| shared_blob.write().unwrap())
            .collect();

        let mut slices = Vec::with_capacity(guards.len());
        for (i, guard) in guards.iter_mut().enumerate() {
            let start = if i < NUM_DATA { 0 } else { BLOB_HEADER_SIZE };
            slices.push(&mut guard.data[start..start + size]);
        }

        self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot)
    }
}
}

View File

@ -399,8 +399,8 @@ mod tests {
// Write a blob into slot 2 that chains to slot 1,
// but slot 1 is empty so should not be skipped
let (blobs, _) = make_slot_entries(2, 1, 1);
blocktree.write_blobs(&blobs[..]).unwrap();
let (shreds, _) = make_slot_entries(2, 1, 1);
blocktree.insert_shreds(shreds).unwrap();
assert_eq!(
cache
.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))
@ -410,10 +410,10 @@ mod tests {
);
// Write a blob into slot 1
let (blobs, _) = make_slot_entries(1, 0, 1);
let (shreds, _) = make_slot_entries(1, 0, 1);
// Check that slot 1 and 2 are skipped
blocktree.write_blobs(&blobs[..]).unwrap();
blocktree.insert_shreds(shreds).unwrap();
assert_eq!(
cache
.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))

View File

@ -83,8 +83,8 @@ mod tests {
fn test_cleanup() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (blobs, _) = make_many_slot_entries(0, 50, 5);
blocktree.write_blobs(blobs).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blocktree.insert_shreds(shreds).unwrap();
let blocktree = Arc::new(blocktree);
let (sender, receiver) = channel();

View File

@ -73,7 +73,7 @@ impl fmt::Debug for Packet {
impl Default for Packet {
fn default() -> Packet {
Packet {
data: unsafe { std::mem::uninitialized() },
data: unsafe { std::mem::MaybeUninit::uninit().assume_init() },
meta: Meta::default(),
}
}

View File

@ -403,14 +403,13 @@ impl Service for RepairService {
mod test {
use super::*;
use crate::blocktree::tests::{
make_chaining_slot_entries, make_chaining_slot_entries_using_shreds,
make_many_slot_entries_using_shreds, make_slot_entries,
make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
};
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::Node;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::cmp::min;
use std::sync::mpsc::channel;
use std::thread::Builder;
@ -421,10 +420,10 @@ mod test {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Create some orphan slots
let (mut blobs, _) = make_slot_entries(1, 0, 1);
let (blobs2, _) = make_slot_entries(5, 2, 1);
blobs.extend(blobs2);
blocktree.write_blobs(&blobs).unwrap();
let (mut shreds, _) = make_slot_entries(1, 0, 1);
let (shreds2, _) = make_slot_entries(5, 2, 1);
shreds.extend(shreds2);
blocktree.insert_shreds(shreds).unwrap();
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)]
@ -440,11 +439,11 @@ mod test {
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (blobs, _) = make_slot_entries(2, 0, 1);
let (shreds, _) = make_slot_entries(2, 0, 1);
// Write this blob to slot 2, should chain to slot 0, which we haven't received
// any blobs for
blocktree.write_blobs(&blobs).unwrap();
blocktree.insert_shreds(shreds).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
@ -465,8 +464,7 @@ mod test {
let num_slots = 2;
// Create some blobs
let (mut shreds, _) =
make_many_slot_entries_using_shreds(0, num_slots as u64, 50 as u64);
let (mut shreds, _) = make_many_slot_entries(0, num_slots as u64, 50 as u64);
let num_shreds = shreds.len() as u64;
let num_shreds_per_slot = num_shreds / num_slots;
@ -510,18 +508,20 @@ mod test {
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries_per_slot = 10;
let num_entries_per_slot = 100;
// Create some blobs
let (mut blobs, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
let (mut shreds, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
let num_shreds_per_slot = shreds.len() as u64;
// Remove is_last flag on last blob
blobs.last_mut().unwrap().set_flags(0);
// Remove last shred (which is also last in slot) so that slot is not complete
shreds.pop();
blocktree.write_blobs(&blobs).unwrap();
blocktree.insert_shreds(shreds).unwrap();
// We didn't get the last blob for this slot, so ask for the highest blob for that slot
let expected: Vec<RepairType> = vec![RepairType::HighestBlob(0, num_entries_per_slot)];
let expected: Vec<RepairType> =
vec![RepairType::HighestBlob(0, num_shreds_per_slot - 1)];
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
@ -540,7 +540,7 @@ mod test {
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = 10;
let shreds = make_chaining_slot_entries_using_shreds(&slots, num_entries_per_slot);
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {
slot_shreds.remove(0);
blocktree.insert_shreds(slot_shreds).unwrap();
@ -593,9 +593,9 @@ mod test {
// Create some blobs in slots 0..num_slots
for i in start..start + num_slots {
let parent = if i > 0 { i - 1 } else { 0 };
let (blobs, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
blocktree.write_blobs(&blobs).unwrap();
blocktree.insert_shreds(shreds).unwrap();
}
let end = 4;
@ -631,25 +631,25 @@ mod test {
let root = 10;
let fork1 = vec![5, 7, root, 15, 20, 21];
let fork1_blobs: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
let fork1_shreds: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.flat_map(|(shreds, _)| shreds)
.collect();
let fork2 = vec![8, 12];
let fork2_blobs = make_chaining_slot_entries(&fork2, num_entries_per_slot);
let fork2_shreds = make_chaining_slot_entries(&fork2, num_entries_per_slot);
// Remove the last blob from each slot to make an incomplete slot
let fork2_incomplete_blobs: Vec<_> = fork2_blobs
let fork2_incomplete_shreds: Vec<_> = fork2_shreds
.into_iter()
.flat_map(|(mut blobs, _)| {
blobs.pop();
blobs
.flat_map(|(mut shreds, _)| {
shreds.pop();
shreds
})
.collect();
let mut full_slots = BTreeSet::new();
blocktree.write_blobs(&fork1_blobs).unwrap();
blocktree.write_blobs(&fork2_incomplete_blobs).unwrap();
blocktree.insert_shreds(fork1_shreds).unwrap();
blocktree.insert_shreds(fork2_incomplete_shreds).unwrap();
// Test that only slots > root from fork1 were included
let epoch_schedule = EpochSchedule::new(32, 32, false);
@ -668,11 +668,11 @@ mod test {
let last_epoch = epoch_schedule.get_stakers_epoch(root);
let last_slot = epoch_schedule.get_last_slot_in_epoch(last_epoch);
let fork3 = vec![last_slot, last_slot + 1];
let fork3_blobs: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
let fork3_shreds: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.flat_map(|(shreds, _)| shreds)
.collect();
blocktree.write_blobs(&fork3_blobs).unwrap();
blocktree.insert_shreds(fork3_shreds).unwrap();
RepairService::get_completed_slots_past_root(
&blocktree,
&mut full_slots,
@ -705,22 +705,23 @@ mod test {
.name("writer".to_string())
.spawn(move || {
let slots: Vec<_> = (1..num_slots + 1).collect();
let mut blobs: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
let mut shreds: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
.into_iter()
.flat_map(|(blobs, _)| blobs)
.flat_map(|(shreds, _)| shreds)
.collect();
blobs.shuffle(&mut thread_rng());
shreds.shuffle(&mut thread_rng());
let mut i = 0;
let max_step = entries_per_slot * 4;
let repair_interval_ms = 10;
let mut rng = rand::thread_rng();
while i < blobs.len() as usize {
let step = rng.gen_range(1, max_step + 1);
blocktree_
.insert_data_blobs(&blobs[i..min(i + max_step as usize, blobs.len())])
.unwrap();
let num_shreds = shreds.len();
while i < num_shreds {
let step = rng.gen_range(1, max_step + 1) as usize;
let step = std::cmp::min(step, num_shreds - i);
let shreds_to_insert = shreds.drain(..step).collect_vec();
blocktree_.insert_shreds(shreds_to_insert).unwrap();
sleep(Duration::from_millis(repair_interval_ms));
i += step as usize;
i += step;
}
})
.unwrap();
@ -747,8 +748,8 @@ mod test {
// Update with new root, should filter out the slots <= root
root = num_slots / 2;
let (blobs, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
blocktree.insert_data_blobs(&blobs).unwrap();
let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
blocktree.insert_shreds(shreds).unwrap();
RepairService::update_epoch_slots(
Pubkey::default(),
root,

View File

@ -841,12 +841,10 @@ fn aggregate_stake_lockouts(
mod test {
use super::*;
use crate::bank_forks::Confidence;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::entries_to_test_shreds;
use crate::blocktree::tests::make_slot_entries;
use crate::blocktree::{entries_to_test_shreds, get_tmp_ledger_path};
use crate::entry;
use crate::erasure::ErasureConfig;
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
use crate::packet::Blob;
use crate::replay_stage::ReplayStage;
use crate::shred::Shred;
use solana_runtime::genesis_utils::GenesisBlockInfo;
@ -873,11 +871,8 @@ mod test {
bank_forks.working_bank().freeze();
// Insert blob for slot 1, generate new forks, check result
let mut blob_slot_1 = Blob::default();
blob_slot_1.set_slot(1);
blob_slot_1.set_parent(0);
blob_slot_1.set_erasure_config(&ErasureConfig::default());
blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap();
let (shreds, _) = make_slot_entries(1, 0, 8);
blocktree.insert_shreds(shreds).unwrap();
assert!(bank_forks.get(1).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
@ -887,11 +882,8 @@ mod test {
assert!(bank_forks.get(1).is_some());
// Insert blob for slot 2, generate new forks, check result
let mut blob_slot_2 = Blob::default();
blob_slot_2.set_slot(2);
blob_slot_2.set_parent(0);
blob_slot_2.set_erasure_config(&ErasureConfig::default());
blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap();
let (shreds, _) = make_slot_entries(2, 0, 8);
blocktree.insert_shreds(shreds).unwrap();
assert!(bank_forks.get(2).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,

View File

@ -343,7 +343,7 @@ impl Write for Shredder {
.unwrap();
let written = self.active_offset;
let (slice_len, left_capacity) = match current_shred.borrow_mut() {
let (slice_len, capacity) = match current_shred.borrow_mut() {
Shred::FirstInSlot(s) => s.write_at(written, buf),
Shred::FirstInFECSet(s)
| Shred::Data(s)
@ -352,7 +352,7 @@ impl Write for Shredder {
Shred::Coding(s) => s.write_at(written, buf),
};
let active_shred = if buf.len() > slice_len || left_capacity == 0 {
let active_shred = if buf.len() > slice_len || capacity == 0 {
self.finalize_data_shred(current_shred);
// Continue generating more data shreds.
// If the caller decides to finalize the FEC block or Slot, the data shred will

View File

@ -234,7 +234,7 @@ impl Validator {
bank.ticks_per_slot(),
&id,
&blocktree,
blocktree.new_blobs_signals.first().cloned(),
blocktree.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
);
@ -245,7 +245,7 @@ impl Validator {
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &poh_config, &exit);
assert_eq!(
blocktree.new_blobs_signals.len(),
blocktree.new_shreds_signals.len(),
1,
"New blob signal for the TVU should be the same as the clear bank signal."
);

View File

@ -491,14 +491,6 @@ fn test_fail_entry_verification_leader() {
test_faulty_node(BroadcastStageType::FailEntryVerification);
}
// Runs `test_faulty_node` with the `BroadcastStageType::BroadcastBadBlobSizes` broadcast stage.
// Marked `#[ignore]`, so it only runs when ignored tests are requested explicitly.
// NOTE(review): `#[serial]` presumably keeps this from running alongside other tests — confirm.
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_bad_blob_size_leader() {
test_faulty_node(BroadcastStageType::BroadcastBadBlobSizes);
}
#[test]
#[ignore]
fn test_fake_blobs_broadcast_leader() {