revert-revert-erasure and erasure fixes (#3833)
* fix erasure, more tests for full blobs, more metrics
* Revert "Revert "Use Rust erasure library and turn on erasure (#3768)" (#3827)"
This reverts commit 4b8cb72977.

Parent: e03215c4c0
Commit: 6bef16a6a1
@@ -1877,6 +1877,17 @@ dependencies = [
  "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "reed-solomon-erasure"
+version = "3.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cc 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "regex"
 version = "1.1.2"

@@ -2170,6 +2181,7 @@ dependencies = [
  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "ring 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",

@@ -3573,6 +3585,7 @@ dependencies = [
 "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85"
 "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
 "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828"
+"checksum reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77cbbd4c02f53e345fe49e74255a1b10080731ffb2a03475e11df7fc8a043c37"
 "checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f"
 "checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861"
 "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
@@ -40,6 +40,7 @@ nix = "0.13.0"
 rand = "0.6.5"
 rand_chacha = "0.1.1"
 rayon = "1.0.0"
+reed-solomon-erasure = "3.1.1"
 reqwest = "0.9.11"
 ring = "0.13.2"
 rocksdb = "0.11.0"
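For orientation (not part of the commit): the new dependency is the pure-Rust `reed-solomon-erasure` crate, replacing the native Jerasure/gf_complete libraries that the build script used to link. A minimal round-trip sketch, assuming the 3.x API the diff itself calls (`encode_sep`, `reconstruct`):

```rust
// Illustrative sketch only, using reed-solomon-erasure 3.x.
use reed_solomon_erasure::ReedSolomon;

fn main() {
    // 3 data shards protected by 2 parity shards (tiny numbers for clarity).
    let rs = ReedSolomon::new(3, 2).expect("valid shard counts");

    let data: Vec<Vec<u8>> = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9, 10, 11]];
    let mut parity = vec![vec![0u8; 4]; 2];
    // Encode parity separately from the data shards.
    rs.encode_sep(&data, &mut parity).expect("encode");

    // Combine, lose one shard, and reconstruct it in place.
    let mut shards: Vec<Vec<u8>> = data.into_iter().chain(parity).collect();
    let mut present = vec![true; 5];
    shards[1] = vec![0u8; 4];
    present[1] = false;
    rs.reconstruct(&mut shards, &present).expect("reconstruct");
    assert_eq!(shards[1], vec![4, 5, 6, 7]);
}
```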
@@ -24,9 +24,8 @@ fn main() {
 
     let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err();
     let cuda = !env::var("CARGO_FEATURE_CUDA").is_err();
-    let erasure = !env::var("CARGO_FEATURE_ERASURE").is_err();
 
-    if chacha || cuda || erasure {
+    if chacha || cuda {
         println!("cargo:rerun-if-changed={}", perf_libs_dir);
         println!("cargo:rustc-link-search=native={}", perf_libs_dir);
     }

@@ -46,30 +45,4 @@ fn main() {
         println!("cargo:rustc-link-lib=dylib=cuda");
         println!("cargo:rustc-link-lib=dylib=cudadevrt");
     }
-    if erasure {
-        #[cfg(any(target_os = "macos", target_os = "ios"))]
-        {
-            println!(
-                "cargo:rerun-if-changed={}/libgf_complete.dylib",
-                perf_libs_dir
-            );
-            println!("cargo:rerun-if-changed={}/libJerasure.dylib", perf_libs_dir);
-        }
-        #[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))]
-        {
-            println!("cargo:rerun-if-changed={}/libgf_complete.so", perf_libs_dir);
-            println!("cargo:rerun-if-changed={}/libJerasure.so", perf_libs_dir);
-        }
-        #[cfg(windows)]
-        {
-            println!(
-                "cargo:rerun-if-changed={}/libgf_complete.dll",
-                perf_libs_dir
-            );
-            println!("cargo:rerun-if-changed={}/libJerasure.dll", perf_libs_dir);
-        }
-
-        println!("cargo:rustc-link-lib=dylib=Jerasure");
-        println!("cargo:rustc-link-lib=dylib=gf_complete");
-    }
 }
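A side note on the pattern above (sketch, not from the commit): cargo exports an environment variable `CARGO_FEATURE_<NAME>` to build scripts for every enabled feature, so the build script only needs to test for the variable's presence:

```rust
// Hypothetical build.rs fragment showing the same feature-detection idiom.
use std::env;

fn main() {
    // Set by cargo when the crate is built with `--features cuda`.
    let cuda = env::var("CARGO_FEATURE_CUDA").is_ok();
    if cuda {
        // Tell cargo where to find native libraries and when to rerun.
        println!("cargo:rustc-link-search=native=target/perf-libs");
        println!("cargo:rerun-if-changed=target/perf-libs");
    }
}
```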
@@ -3,7 +3,6 @@
 //! access read to a persistent file-based ledger.
 
 use crate::entry::Entry;
-#[cfg(feature = "erasure")]
 use crate::erasure;
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::result::{Error, Result};

@@ -17,7 +16,6 @@ use hashbrown::HashMap;
 #[cfg(not(feature = "kvstore"))]
 use rocksdb;
 
-#[cfg(feature = "erasure")]
 use solana_metrics::counter::Counter;
 
 use solana_sdk::genesis_block::GenesisBlock;

@@ -79,9 +77,9 @@ pub struct Blocktree {
     meta_cf: LedgerColumn<cf::SlotMeta>,
     data_cf: LedgerColumn<cf::Data>,
     erasure_cf: LedgerColumn<cf::Coding>,
-    #[cfg(feature = "erasure")]
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
+    session: Arc<erasure::Session>,
     pub new_blobs_signals: Vec<SyncSender<bool>>,
     pub root_slot: RwLock<u64>,
 }

@@ -92,7 +90,6 @@ pub const META_CF: &str = "meta";
 pub const DATA_CF: &str = "data";
 // Column family for erasure data
 pub const ERASURE_CF: &str = "erasure";
-#[cfg(feature = "erasure")]
 pub const ERASURE_META_CF: &str = "erasure_meta";
 // Column family for orphans data
 pub const ORPHANS_CF: &str = "orphans";

@@ -116,7 +113,7 @@ impl Blocktree {
 
         // Create the erasure column family
         let erasure_cf = LedgerColumn::new(&db);
-        #[cfg(feature = "erasure")]
+
         let erasure_meta_cf = LedgerColumn::new(&db);
 
         // Create the orphans column family. An "orphan" is defined as

@@ -124,14 +121,17 @@ impl Blocktree {
         // known parent
         let orphans_cf = LedgerColumn::new(&db);
 
+        // setup erasure
+        let session = Arc::new(erasure::Session::default());
+
         Ok(Blocktree {
             db,
             meta_cf,
             data_cf,
             erasure_cf,
-            #[cfg(feature = "erasure")]
             erasure_meta_cf,
             orphans_cf,
+            session,
             new_blobs_signals: vec![],
             root_slot: RwLock::new(0),
         })

@@ -259,7 +259,6 @@ impl Blocktree {
         // A map from slot to a 2-tuple of metadata: (working copy, backup copy),
         // so we can detect changes to the slot metadata later
         let mut slot_meta_working_set = HashMap::new();
-        #[cfg(feature = "erasure")]
        let mut erasure_meta_working_set = HashMap::new();
        let new_blobs: Vec<_> = new_blobs.into_iter().collect();
        let mut prev_inserted_blob_datas = HashMap::new();
@@ -301,20 +300,17 @@ impl Blocktree {
                 continue;
             }
 
-            #[cfg(feature = "erasure")]
-            {
-                let set_index = ErasureMeta::set_index_for(blob.index());
-                let erasure_meta_entry = erasure_meta_working_set
-                    .entry((blob_slot, set_index))
-                    .or_insert_with(|| {
-                        self.erasure_meta_cf
-                            .get((blob_slot, set_index))
-                            .expect("Expect database get to succeed")
-                            .unwrap_or_else(|| ErasureMeta::new(set_index))
-                    });
-
-                erasure_meta_entry.set_data_present(blob.index());
-            }
+            let set_index = ErasureMeta::set_index_for(blob.index());
+            let erasure_meta_entry = erasure_meta_working_set
+                .entry((blob_slot, set_index))
+                .or_insert_with(|| {
+                    self.erasure_meta_cf
+                        .get((blob_slot, set_index))
+                        .expect("Expect database get to succeed")
+                        .unwrap_or_else(|| ErasureMeta::new(set_index))
+                });
+
+            erasure_meta_entry.set_data_present(blob.index(), true);
 
             let _ = self.insert_data_blob(
                 blob,

@@ -339,11 +335,8 @@ impl Blocktree {
             }
         }
 
-        #[cfg(feature = "erasure")]
-        {
-            for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
-                write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
-            }
-        }
+        for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
+            write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
+        }
 
         self.db.write(write_batch)?;
@@ -354,36 +347,8 @@ impl Blocktree {
             }
         }
 
-        #[cfg(feature = "erasure")]
         for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() {
-            if erasure_meta.can_recover() {
-                match self.recover(slot, set_index) {
-                    Ok(recovered) => {
-                        inc_new_counter_info!("erasures-recovered", recovered);
-                    }
-                    Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
-                        let mut erasure_meta = self
-                            .erasure_meta_cf
-                            .get((slot, set_index))?
-                            .expect("erasure meta should exist");
-
-                        let mut batch = self.db.batch()?;
-
-                        let start_index = erasure_meta.start_index();
-                        let (_, coding_end_idx) = erasure_meta.end_indexes();
-
-                        erasure_meta.coding = 0;
-                        batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
-
-                        for idx in start_index..coding_end_idx {
-                            batch.delete::<cf::Coding>((slot, idx))?;
-                        }
-
-                        self.db.write(batch)?;
-                    }
-                    Err(e) => return Err(e),
-                }
-            }
+            self.try_erasure_recover(&erasure_meta, slot, set_index)?;
         }
 
         Ok(())
@@ -453,26 +418,42 @@ impl Blocktree {
     pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
         self.erasure_cf.get_bytes((slot, index))
     }
 
     pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
-        self.erasure_cf.delete((slot, index))
+        let set_index = ErasureMeta::set_index_for(index);
+
+        let mut erasure_meta = self
+            .erasure_meta_cf
+            .get((slot, set_index))?
+            .unwrap_or_else(|| ErasureMeta::new(set_index));
+
+        erasure_meta.set_coding_present(index, false);
+
+        let mut batch = self.db.batch()?;
+
+        batch.delete::<cf::Coding>((slot, index))?;
+        batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
+
+        self.db.write(batch)?;
+        Ok(())
     }
 
     pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
         self.data_cf.get_bytes((slot, index))
     }
 
     /// For benchmarks, testing, and setup.
     /// Does no metadata tracking. Use with care.
     pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
         self.data_cf.put_bytes((slot, index), bytes)
     }
 
     pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
         self.erasure_cf.put_bytes((slot, index), bytes)
     }
 
-    #[cfg(not(feature = "erasure"))]
-    #[inline]
-    pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
-        self.put_coding_blob_bytes_raw(slot, index, bytes)
-    }
-
     /// this function will insert coding blobs and also automatically track erasure-related
     /// metadata. If recovery is available it will be done
-    #[cfg(feature = "erasure")]
     pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
         let set_index = ErasureMeta::set_index_for(index);
         let mut erasure_meta = self
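Worth noting (editor's sketch, not from the commit): the new `delete_coding_blob` clears the presence bit and deletes the blob inside one write batch, so readers never observe metadata that disagrees with the coding column. The same read-modify-write pattern against the rocksdb crate directly (0.11-era API; key names here are hypothetical):

```rust
// Standalone sketch of an atomic delete-plus-metadata-update batch.
use rocksdb::{WriteBatch, DB};

fn delete_with_metadata(db: &DB) -> Result<(), rocksdb::Error> {
    // Stage both mutations, then commit them together so a crash or a
    // concurrent reader can't see the blob gone but its bit still set.
    let mut batch = WriteBatch::default();
    batch.delete(b"coding/blob/42")?;
    batch.put(b"erasure_meta/set/2", b"coding bit cleared")?;
    db.write(batch)
}
```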
@@ -480,7 +461,7 @@ impl Blocktree {
             .get((slot, set_index))?
             .unwrap_or_else(|| ErasureMeta::new(set_index));
 
-        erasure_meta.set_coding_present(index);
+        erasure_meta.set_coding_present(index, true);
 
         let mut writebatch = self.db.batch()?;
 
@@ -490,43 +471,28 @@ impl Blocktree {
 
         self.db.write(writebatch)?;
 
-        if erasure_meta.can_recover() {
-            match self.recover(slot, set_index) {
-                Ok(recovered) => {
-                    inc_new_counter_info!("erasures-recovered", recovered);
-                    return Ok(());
-                }
-                Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
-                    let start_index = erasure_meta.start_index();
-                    let (_, coding_end_idx) = erasure_meta.end_indexes();
-                    let mut batch = self.db.batch()?;
-
-                    erasure_meta.coding = 0;
-                    batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
-
-                    for idx in start_index..coding_end_idx {
-                        batch.delete::<cf::Coding>((slot, idx as u64))?;
-                    }
-
-                    self.db.write(batch)?;
-
-                    return Ok(());
-                }
-                Err(e) => return Err(e),
-            }
-        }
+        self.try_erasure_recover(&erasure_meta, slot, set_index)
+    }
+
+    fn try_erasure_recover(
+        &self,
+        erasure_meta: &ErasureMeta,
+        slot: u64,
+        set_index: u64,
+    ) -> Result<()> {
+        match erasure_meta.status() {
+            ErasureMetaStatus::CanRecover => {
+                let recovered = self.recover(slot, set_index)?;
+                inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered);
+            }
+            ErasureMetaStatus::StillNeed(needed) => {
+                inc_new_counter_info!("blocktree-erasure-blobs_needed", needed)
+            }
+            ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1),
+        }
+
+        Ok(())
+    }
+
+    pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> {
+        self.data_cf.put_bytes((slot, index), value)
+    }
 
     pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
         self.data_cf.put_bytes((slot, index), bytes)
     }
 
     pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> {
         let bytes = self.get_data_blob_bytes(slot, blob_index)?;
         Ok(bytes.map(|bytes| {
@@ -626,20 +592,6 @@ impl Blocktree {
         }
     }
 
-    pub fn find_missing_coding_indexes(
-        &self,
-        slot: u64,
-        start_index: u64,
-        end_index: u64,
-        max_missing: usize,
-    ) -> Vec<u64> {
-        if let Ok(mut db_iterator) = self.erasure_cf.cursor() {
-            Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
-        } else {
-            vec![]
-        }
-    }
-
     /// Returns the entry vector for the slot starting with `blob_start_index`
     pub fn get_slot_entries(
         &self,
@@ -1088,43 +1040,45 @@ impl Blocktree {
         Ok(())
     }
 
-    #[cfg(feature = "erasure")]
     /// Attempts recovery using erasure coding
     fn recover(&self, slot: u64, set_index: u64) -> Result<usize> {
-        use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA};
-        use crate::packet::BLOB_DATA_SIZE;
+        use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA};
 
         let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap();
 
         let start_idx = erasure_meta.start_index();
         let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes();
 
-        let mut erasures = Vec::with_capacity(NUM_CODING + 1);
-        let (mut data, mut coding) = (vec![], vec![]);
+        let present = &mut [true; ERASURE_SET_SIZE];
+        let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
         let mut size = 0;
 
         for i in start_idx..coding_end_idx {
             if erasure_meta.is_coding_present(i) {
-                let blob_bytes = self
+                let mut blob_bytes = self
                     .erasure_cf
                     .get_bytes((slot, i))?
                     .expect("erasure_meta must have no false positives");
 
+                blob_bytes.drain(..BLOB_HEADER_SIZE);
+
                 if size == 0 {
-                    size = blob_bytes.len() - BLOB_HEADER_SIZE;
+                    size = blob_bytes.len();
                 }
 
-                coding.push(blob_bytes);
+                blobs.push(blob_bytes);
             } else {
-                let set_relative_idx = (i - start_idx) + NUM_DATA as u64;
-                coding.push(vec![0; crate::packet::BLOB_SIZE]);
-                erasures.push(set_relative_idx as i32);
+                let set_relative_idx = (i - start_idx) as usize + NUM_DATA;
+                blobs.push(vec![0; size]);
+                present[set_relative_idx] = false;
             }
         }
 
+        assert_ne!(size, 0);
+
         for i in start_idx..data_end_idx {
+            let set_relative_idx = (i - start_idx) as usize;
+
             if erasure_meta.is_data_present(i) {
                 let mut blob_bytes = self
                     .data_cf
@@ -1132,90 +1086,28 @@ impl Blocktree {
                     .expect("erasure_meta must have no false positives");
 
                 // If data is too short, extend it with zeroes
-                if blob_bytes.len() < size {
-                    blob_bytes.resize(size, 0u8);
-                }
+                blob_bytes.resize(size, 0u8);
 
-                data.push(blob_bytes);
+                blobs.insert(set_relative_idx, blob_bytes);
             } else {
-                let set_relative_index = i - start_idx;
-                data.push(vec![0; size]);
-                // data erasures must come before any coding erasures if present
-                erasures.insert(0, set_relative_index as i32);
+                blobs.insert(set_relative_idx, vec![0u8; size]);
+                present[set_relative_idx] = false;
             }
         }
 
-        let mut coding_ptrs: Vec<_> = coding
-            .iter_mut()
-            .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size])
-            .collect();
-
-        let mut data_ptrs: Vec<_> = data
-            .iter_mut()
-            .map(|data_bytes| &mut data_bytes[..size])
-            .collect();
-
-        // Marks the end
-        erasures.push(-1);
-        trace!("erasures: {:?}, size: {}", erasures, size);
-
-        erasure::decode_blocks(
-            data_ptrs.as_mut_slice(),
-            coding_ptrs.as_mut_slice(),
-            &erasures,
-        )?;
-
-        // Create the missing blobs from the reconstructed data
-        let block_start_idx = erasure_meta.start_index();
-        let (mut recovered_data, mut recovered_coding) = (vec![], vec![]);
-
-        for i in &erasures[..erasures.len() - 1] {
-            let n = *i as usize;
-
-            let (data_size, idx, first_byte);
-
-            if n < NUM_DATA {
-                let mut blob = Blob::new(&data_ptrs[n]);
-
-                idx = n as u64 + block_start_idx;
-                data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
-                first_byte = blob.data[0];
-
-                if data_size > BLOB_DATA_SIZE {
-                    error!("corrupt data blob[{}] data_size: {}", idx, data_size);
-                    return Err(Error::ErasureError(ErasureError::CorruptCoding));
-                }
-
-                blob.set_slot(slot);
-                blob.set_index(idx);
-                blob.set_size(data_size);
-                recovered_data.push(blob);
-            } else {
-                let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]);
-
-                idx = (n - NUM_DATA) as u64 + block_start_idx;
-                data_size = size;
-                first_byte = blob.data[0];
-
-                if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
-                    error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
-                    return Err(Error::ErasureError(ErasureError::CorruptCoding));
-                }
-
-                blob.set_slot(slot);
-                blob.set_index(idx);
-                blob.set_data_size(data_size as u64);
-                recovered_coding.push(blob);
-            }
-
-            trace!(
-                "erasures[{}] ({}) size: {} data[0]: {}",
-                *i,
-                idx,
-                data_size,
-                first_byte,
-            );
-        }
+        let (recovered_data, recovered_coding) = self
+            .session
+            .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?;
+
+        let amount_recovered = recovered_data.len() + recovered_coding.len();
+
+        trace!(
+            "[recover] reconstruction OK slot: {}, indexes: [{},{})",
+            slot,
+            start_idx,
+            data_end_idx
+        );
 
         self.write_blobs(recovered_data)?;
 
@@ -1223,7 +1115,7 @@ impl Blocktree {
             self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?;
         }
 
-        Ok(erasures.len() - 1)
+        Ok(amount_recovered)
     }
 
     /// Returns the next consumed index and the number of ticks in the new consumed
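To make the `recover` rewrite above concrete (editor's sketch; `set_relative_position` is a hypothetical helper, not in the diff): all 20 blobs of a set are gathered into one array plus a `present` bitmap, with data blobs occupying positions `0..NUM_DATA` and coding blobs `NUM_DATA..ERASURE_SET_SIZE`:

```rust
// Toy model of the index mapping used when assembling an erasure set.
const NUM_DATA: usize = 16;
const NUM_CODING: usize = 4;
const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;

fn set_relative_position(blob_index: u64, start_idx: u64, is_coding: bool) -> usize {
    let offset = (blob_index - start_idx) as usize;
    if is_coding {
        NUM_DATA + offset
    } else {
        offset
    }
}

fn main() {
    // Set 2 starts at blob index 32; its coding blobs are also indexed 32..36.
    let start_idx = 2 * NUM_DATA as u64;
    assert_eq!(set_relative_position(35, start_idx, false), 3);
    assert_eq!(set_relative_position(33, start_idx, true), NUM_DATA + 1);

    // A set stays recoverable while at most NUM_CODING positions are absent.
    let mut present = [true; ERASURE_SET_SIZE];
    present[3] = false; // data blob 35 missing
    assert!(present.iter().filter(|p| !**p).count() <= NUM_CODING);
}
```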
@@ -1821,44 +1713,47 @@ pub mod tests {
         let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive");
         {
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
-            let slot = 0;
-            let parent_slot = 0;
-            // Write entries
-            let num_entries = 21 as u64;
-            let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
-
-            blocktree
-                .write_blobs(blobs.iter().skip(1).step_by(2))
-                .unwrap();
-
-            assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
-
-            let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
-            if num_entries % 2 == 0 {
-                assert_eq!(meta.received, num_entries);
-            } else {
-                assert_eq!(meta.received, num_entries - 1);
-            }
-            assert_eq!(meta.consumed, 0);
-            assert_eq!(meta.parent_slot, 0);
-            if num_entries % 2 == 0 {
-                assert_eq!(meta.last_index, num_entries - 1);
-            } else {
-                assert_eq!(meta.last_index, std::u64::MAX);
-            }
-
-            blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
-
-            assert_eq!(
-                blocktree.get_slot_entries(0, 0, None).unwrap(),
-                original_entries,
-            );
-
-            let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
-            assert_eq!(meta.received, num_entries);
-            assert_eq!(meta.consumed, num_entries);
-            assert_eq!(meta.parent_slot, 0);
-            assert_eq!(meta.last_index, num_entries - 1);
+            for i in 0..4 {
+                let slot = i;
+                let parent_slot = if i == 0 { 0 } else { i - 1 };
+                // Write entries
+                let num_entries = 21 as u64 * (i + 1);
+                let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
+
+                blocktree
+                    .write_blobs(blobs.iter().skip(1).step_by(2))
+                    .unwrap();
+
+                assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
+
+                let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
+                if num_entries % 2 == 0 {
+                    assert_eq!(meta.received, num_entries);
+                } else {
+                    debug!("got here");
+                    assert_eq!(meta.received, num_entries - 1);
+                }
+                assert_eq!(meta.consumed, 0);
+                assert_eq!(meta.parent_slot, parent_slot);
+                if num_entries % 2 == 0 {
+                    assert_eq!(meta.last_index, num_entries - 1);
+                } else {
+                    assert_eq!(meta.last_index, std::u64::MAX);
+                }
+
+                blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
+
+                assert_eq!(
+                    blocktree.get_slot_entries(slot, 0, None).unwrap(),
+                    original_entries,
+                );
+
+                let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
+                assert_eq!(meta.received, num_entries);
+                assert_eq!(meta.consumed, num_entries);
+                assert_eq!(meta.parent_slot, parent_slot);
+                assert_eq!(meta.last_index, num_entries - 1);
+            }
         }
 
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@@ -2665,7 +2560,6 @@ pub mod tests {
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
-    #[cfg(feature = "erasure")]
     mod erasure {
         use super::*;
        use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
@@ -2730,7 +2624,7 @@ pub mod tests {
             assert_eq!(erasure_meta.data, 0x00FF);
             assert_eq!(erasure_meta.coding, 0x0);
 
-            let mut coding_generator = CodingGenerator::new();
+            let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
             let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
 
             for shared_coding_blob in coding_blobs {
@@ -2749,6 +2643,23 @@ pub mod tests {
 
             assert_eq!(erasure_meta.data, 0xFFFF);
             assert_eq!(erasure_meta.coding, 0x0F);
+
+            let (start_idx, coding_end_idx) =
+                (erasure_meta.start_index(), erasure_meta.end_indexes().1);
+
+            for idx in start_idx..coding_end_idx {
+                blocktree.delete_coding_blob(slot, idx).unwrap();
+            }
+
+            let erasure_meta = blocktree
+                .erasure_meta_cf
+                .get((slot, 0))
+                .expect("DB get must succeed")
+                .unwrap();
+
+            assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull);
+            assert_eq!(erasure_meta.data, 0xFFFF);
+            assert_eq!(erasure_meta.coding, 0x0);
         }
 
         #[test]
@@ -2766,11 +2677,12 @@ pub mod tests {
                 .map(Blob::into)
                 .collect::<Vec<_>>();
 
-            let mut coding_generator = CodingGenerator::new();
+            let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
 
             for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
-                let focused_index = (set_index + 1) * NUM_DATA - 1;
                 let coding_blobs = coding_generator.next(&data_blobs);
 
                 assert_eq!(coding_blobs.len(), NUM_CODING);
+
+                let deleted_data = data_blobs[NUM_DATA - 1].clone();
@@ -2821,13 +2733,12 @@ pub mod tests {
             Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction");
         }
 
-        /// FIXME: JERASURE Threading: see Issue
-        /// [#3725](https://github.com/solana-labs/solana/issues/3725)
         #[test]
         fn test_recovery_multi_slot_multi_thread() {
             use rand::rngs::SmallRng;
             use rand::SeedableRng;
             use std::thread;
 
-            const USE_THREADS: bool = true;
             let slots = vec![0, 3, 5, 50, 100];
             let max_erasure_sets = 16;
             solana_logger::setup();
@@ -2837,7 +2748,7 @@ pub mod tests {
 
             // Specification should generate a ledger where each slot has a random number of
             // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones
-            // will have between 1-4 data blobs missing and all coding blobs
+            // will have between 1 data blob missing and 1 coding blob
             let specs = slots
                 .iter()
                 .map(|&slot| {
@@ -2848,7 +2759,7 @@ pub mod tests {
                     let (num_data, num_coding) = if set_index % 2 == 0 {
                         (NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
                     } else {
-                        (NUM_DATA, 0)
+                        (NUM_DATA - 1, NUM_CODING - 1)
                     };
                     ErasureSpec {
                         set_index,
@@ -2873,35 +2784,60 @@ pub mod tests {
             for slot_model in model.clone() {
                 let blocktree = Arc::clone(&blocktree);
                 let slot = slot_model.slot;
-                let closure = move || {
-                    for erasure_set in slot_model.chunks {
-                        blocktree
-                            .write_shared_blobs(erasure_set.data)
-                            .expect("Writing data blobs must succeed");
-                        debug!(
-                            "multislot: wrote data: slot: {}, erasure_set: {}",
-                            slot, erasure_set.set_index
-                        );
-
-                        for shared_coding_blob in erasure_set.coding {
-                            let blob = shared_coding_blob.read().unwrap();
-                            let size = blob.size() + BLOB_HEADER_SIZE;
-                            blocktree
-                                .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
-                                .expect("Writing coding blobs must succeed");
-                        }
-                        debug!(
-                            "multislot: wrote coding: slot: {}, erasure_set: {}",
-                            slot, erasure_set.set_index
-                        );
-                    }
-                };
-
-                if USE_THREADS {
-                    handles.push(thread::spawn(closure));
-                } else {
-                    closure();
-                }
+                let mut rng = SmallRng::from_rng(&mut rng).unwrap();
+                let handle = thread::spawn(move || {
+                    for erasure_set in slot_model.chunks {
+                        // for even sets, write data blobs first, then write coding blobs, which
+                        // should trigger recovery since all coding blobs will be inserted and
+                        // between 1-4 data blobs are missing
+                        if rng.gen() {
+                            blocktree
+                                .write_shared_blobs(erasure_set.data)
+                                .expect("Writing data blobs must succeed");
+                            debug!(
+                                "multislot: wrote data: slot: {}, erasure_set: {}",
+                                slot, erasure_set.set_index
+                            );
+
+                            for shared_coding_blob in erasure_set.coding {
+                                let blob = shared_coding_blob.read().unwrap();
+                                let size = blob.size() + BLOB_HEADER_SIZE;
+                                blocktree
+                                    .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
+                                    .expect("Writing coding blobs must succeed");
+                            }
+                            debug!(
+                                "multislot: wrote coding: slot: {}, erasure_set: {}",
+                                slot, erasure_set.set_index
+                            );
+                        } else {
+                            // for odd sets, write coding blobs first, then write the data blobs.
+                            // writing the data blobs should trigger recovery, since 3/4 coding and
+                            // 15/16 data blobs will be present
+                            for shared_coding_blob in erasure_set.coding {
+                                let blob = shared_coding_blob.read().unwrap();
+                                let size = blob.size() + BLOB_HEADER_SIZE;
+                                blocktree
+                                    .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
+                                    .expect("Writing coding blobs must succeed");
+                            }
+                            debug!(
+                                "multislot: wrote coding: slot: {}, erasure_set: {}",
+                                slot, erasure_set.set_index
+                            );
+
+                            blocktree
+                                .write_shared_blobs(erasure_set.data)
+                                .expect("Writing data blobs must succeed");
+                            debug!(
+                                "multislot: wrote data: slot: {}, erasure_set: {}",
+                                slot, erasure_set.set_index
+                            );
+                        }
+                    }
+                });
+
+                handles.push(handle);
             }
 
             handles
@@ -2926,7 +2862,7 @@ pub mod tests {
                 );
 
                 // all possibility for recovery should be exhausted
-                assert!(!erasure_meta.can_recover());
+                assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull);
                 // Should have all data
                 assert_eq!(erasure_meta.data, 0xFFFF);
                 if set_index % 2 == 0 {
@@ -28,7 +28,6 @@ pub mod columns {
     /// Data Column
     pub struct Data;
 
-    #[cfg(feature = "erasure")]
     #[derive(Debug)]
     /// The erasure meta column
     pub struct ErasureMeta;
@@ -138,7 +138,6 @@ impl TypedColumn<Kvs> for cf::SlotMeta {
     type Type = super::SlotMeta;
 }
 
-#[cfg(feature = "erasure")]
 impl Column<Kvs> for cf::ErasureMeta {
     const NAME: &'static str = super::ERASURE_META_CF;
     type Index = (u64, u64);

@@ -157,7 +156,6 @@ impl Column<Kvs> for cf::ErasureMeta {
     }
 }
 
-#[cfg(feature = "erasure")]
 impl TypedColumn<Kvs> for cf::ErasureMeta {
     type Type = super::ErasureMeta;
 }
@@ -1,4 +1,3 @@
-#[cfg(feature = "erasure")]
 use crate::erasure::{NUM_CODING, NUM_DATA};
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]

@@ -59,7 +58,6 @@ impl SlotMeta {
     }
 }
 
-#[cfg(feature = "erasure")]
 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 /// Erasure coding information
 pub struct ErasureMeta {

@@ -71,7 +69,13 @@ pub struct ErasureMeta {
     pub coding: u64,
 }
 
-#[cfg(feature = "erasure")]
+#[derive(Debug, PartialEq)]
+pub enum ErasureMetaStatus {
+    CanRecover,
+    DataFull,
+    StillNeed(usize),
+}
+
 impl ErasureMeta {
     pub fn new(set_index: u64) -> ErasureMeta {
         ErasureMeta {
@@ -81,46 +85,71 @@ impl ErasureMeta {
         }
     }
 
-    pub fn can_recover(&self) -> bool {
+    pub fn status(&self) -> ErasureMetaStatus {
         let (data_missing, coding_missing) = (
             NUM_DATA - self.data.count_ones() as usize,
             NUM_CODING - self.coding.count_ones() as usize,
         );
 
-        data_missing > 0 && data_missing + coding_missing <= NUM_CODING
+        if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
+            ErasureMetaStatus::CanRecover
+        } else if data_missing == 0 {
+            ErasureMetaStatus::DataFull
+        } else {
+            ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
+        }
     }
 
     pub fn is_coding_present(&self, index: u64) -> bool {
-        let set_index = Self::set_index_for(index);
-        let position = index - self.start_index();
+        let start = self.start_index();
+        let end = start + NUM_CODING as u64;
 
-        set_index == self.set_index && self.coding & (1 << position) != 0
+        if start <= index && index < end {
+            let position = index - start;
+
+            self.coding & (1 << position) != 0
+        } else {
+            false
+        }
     }
 
-    pub fn set_coding_present(&mut self, index: u64) {
+    pub fn set_coding_present(&mut self, index: u64, present: bool) {
         let set_index = Self::set_index_for(index);
 
         if set_index as u64 == self.set_index {
             let position = index - self.start_index();
 
-            self.coding |= 1 << position;
+            if present {
+                self.coding |= 1 << position;
+            } else {
+                self.coding &= !(1 << position);
+            }
         }
     }
 
     pub fn is_data_present(&self, index: u64) -> bool {
-        let set_index = Self::set_index_for(index);
-        let position = index - self.start_index();
+        let start = self.start_index();
+        let end = start + NUM_DATA as u64;
 
-        set_index == self.set_index && self.data & (1 << position) != 0
+        if start <= index && index < end {
+            let position = index - start;
+
+            self.data & (1 << position) != 0
+        } else {
+            false
+        }
     }
 
-    pub fn set_data_present(&mut self, index: u64) {
+    pub fn set_data_present(&mut self, index: u64, present: bool) {
        let set_index = Self::set_index_for(index);
 
        if set_index as u64 == self.set_index {
            let position = index - self.start_index();
 
-            self.data |= 1 << position;
+            if present {
+                self.data |= 1 << position;
+            } else {
+                self.data &= !(1 << position);
+            }
        }
    }
 
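To see how the bitmask bookkeeping above drives the recovery decision (standalone sketch mirroring the logic, not the crate's own types): presence of each blob in a 16+4 set is one bit, and `status()` just compares popcounts against the set dimensions.

```rust
// Worked example of the status computation over presence bitfields.
const NUM_DATA: usize = 16;
const NUM_CODING: usize = 4;

#[derive(Debug, PartialEq)]
enum Status {
    CanRecover,
    DataFull,
    StillNeed(usize),
}

fn status(data: u64, coding: u64) -> Status {
    let data_missing = NUM_DATA - data.count_ones() as usize;
    let coding_missing = NUM_CODING - coding.count_ones() as usize;

    if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
        Status::CanRecover
    } else if data_missing == 0 {
        Status::DataFull
    } else {
        Status::StillNeed(data_missing + coding_missing - NUM_CODING)
    }
}

fn main() {
    // 15/16 data blobs plus all 4 coding blobs: 1 erasure <= 4, recoverable.
    assert_eq!(status(0b0111_1111_1111_1111, 0x0F), Status::CanRecover);
    // All data present: nothing to recover, regardless of coding.
    assert_eq!(status(0xFFFF, 0x00), Status::DataFull);
    // 5 data missing, all coding present: one more blob still needed.
    assert_eq!(status(0b0000_0111_1111_1111, 0x0F), Status::StillNeed(1));
}
```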
@@ -139,7 +168,29 @@ impl ErasureMeta {
     }
 }
 
-#[cfg(feature = "erasure")]
 #[test]
+fn test_meta_indexes() {
+    use rand::{thread_rng, Rng};
+
+    let mut rng = thread_rng();
+
+    for _ in 0..100 {
+        let set_index = rng.gen_range(0, 1_000);
+        let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16);
+
+        assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
+        let e_meta = ErasureMeta::new(set_index);
+
+        assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64);
+        let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
+        assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64);
+        assert_eq!(
+            coding_end_idx,
+            set_index * NUM_DATA as u64 + NUM_CODING as u64
+        );
+    }
+}
+
+#[test]
 fn test_meta_coding_present() {
     let set_index = 0;
@@ -150,7 +201,7 @@ fn test_meta_coding_present() {
     };
 
     for i in 0..NUM_CODING as u64 {
-        e_meta.set_coding_present(i);
+        e_meta.set_coding_present(i, true);
         assert_eq!(e_meta.is_coding_present(i), true);
     }
     for i in NUM_CODING as u64..NUM_DATA as u64 {
@@ -160,7 +211,7 @@ fn test_meta_coding_present() {
     e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
 
     for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 {
-        e_meta.set_coding_present(i);
+        e_meta.set_coding_present(i, true);
         assert_eq!(e_meta.is_coding_present(i), true);
     }
     for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 {
@@ -168,9 +219,8 @@ fn test_meta_coding_present() {
     }
 }
 
-#[cfg(feature = "erasure")]
 #[test]
-fn test_can_recover() {
+fn test_erasure_meta_status() {
     let set_index = 0;
     let mut e_meta = ErasureMeta {
         set_index,
@@ -178,36 +228,63 @@ fn test_erasure_meta_status() {
         coding: 0,
     };
 
-    assert!(!e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA));
 
     e_meta.data = 0b1111_1111_1111_1111;
     e_meta.coding = 0x00;
 
-    assert!(!e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
 
     e_meta.coding = 0x0e;
-    assert_eq!(0x0fu8, 0b0000_1111u8);
-    assert!(!e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
 
     e_meta.data = 0b0111_1111_1111_1111;
-    assert!(e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 
     e_meta.data = 0b0111_1111_1111_1110;
-    assert!(e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 
     e_meta.data = 0b0111_1111_1011_1110;
-    assert!(e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 
     e_meta.data = 0b0111_1011_1011_1110;
-    assert!(!e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1));
 
     e_meta.data = 0b0111_1011_1011_1110;
-    assert!(!e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1));
 
     e_meta.coding = 0b0000_1110;
     e_meta.data = 0b1111_1111_1111_1100;
-    assert!(e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 
     e_meta.data = 0b1111_1111_1111_1000;
-    assert!(e_meta.can_recover());
+    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 }
 
+#[test]
+fn test_meta_data_present() {
+    let set_index = 0;
+    let mut e_meta = ErasureMeta {
+        set_index,
+        data: 0,
+        coding: 0,
+    };
+
+    for i in 0..NUM_DATA as u64 {
+        e_meta.set_data_present(i, true);
+        assert_eq!(e_meta.is_data_present(i), true);
+    }
+    for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
+        assert_eq!(e_meta.is_data_present(i), false);
+    }
+
+    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
+
+    for i in (NUM_DATA * 23) as u64..(NUM_DATA * 24) as u64 {
+        e_meta.set_data_present(i, true);
+        assert_eq!(e_meta.is_data_present(i), true);
+    }
+    for i in (NUM_DATA * 22) as u64..(NUM_DATA * 23) as u64 {
+        assert_eq!(e_meta.is_data_present(i), false);
+    }
+}
@@ -30,9 +30,7 @@ impl Backend for Rocks {
     type Error = rocksdb::Error;
 
     fn open(path: &Path) -> Result<Rocks> {
-        #[cfg(feature = "erasure")]
-        use crate::blocktree::db::columns::ErasureMeta;
-        use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
 
         fs::create_dir_all(&path)?;
 

@@ -43,7 +41,6 @@ impl Backend for Rocks {
         let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
         let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
         let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
-        #[cfg(feature = "erasure")]
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
         let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());

@@ -52,7 +49,6 @@ impl Backend for Rocks {
             meta_cf_descriptor,
             data_cf_descriptor,
             erasure_cf_descriptor,
-            #[cfg(feature = "erasure")]
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
         ];

@@ -64,13 +60,10 @@ impl Backend for Rocks {
     }
 
     fn columns(&self) -> Vec<&'static str> {
-        #[cfg(feature = "erasure")]
-        use crate::blocktree::db::columns::ErasureMeta;
-        use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
 
         vec![
             Coding::NAME,
-            #[cfg(feature = "erasure")]
             ErasureMeta::NAME,
             Data::NAME,
             Orphans::NAME,

@@ -196,7 +189,6 @@ impl TypedColumn<Rocks> for cf::SlotMeta {
     type Type = super::SlotMeta;
 }
 
-#[cfg(feature = "erasure")]
 impl Column<Rocks> for cf::ErasureMeta {
     const NAME: &'static str = super::ERASURE_META_CF;
     type Index = (u64, u64);

@@ -216,7 +208,6 @@ impl Column<Rocks> for cf::ErasureMeta {
     }
 }
 
-#[cfg(feature = "erasure")]
 impl TypedColumn<Rocks> for cf::ErasureMeta {
     type Type = super::ErasureMeta;
 }
@@ -3,7 +3,6 @@
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
 use crate::entry::{EntrySender, EntrySlice};
-#[cfg(feature = "erasure")]
 use crate::erasure::CodingGenerator;
 use crate::packet::index_blobs;
 use crate::poh_recorder::WorkingBankEntries;

@@ -29,8 +28,6 @@ pub enum BroadcastStageReturnType {
 
 struct Broadcast {
     id: Pubkey,
-
-    #[cfg(feature = "erasure")]
     coding_generator: CodingGenerator,
 }
 

@@ -119,7 +116,6 @@ impl Broadcast {
 
         blocktree.write_shared_blobs(&blobs)?;
 
-        #[cfg(feature = "erasure")]
         let coding = self.coding_generator.next(&blobs);
 
         let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

@@ -129,14 +125,10 @@ impl Broadcast {
         // Send out data
         ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
 
-        #[cfg(feature = "erasure")]
-        ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
-
         inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
 
-        // generate and transmit any erasure coding blobs. if erasure isn't supported, just send everything again
-        #[cfg(not(feature = "erasure"))]
-        ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
+        // send out erasures
+        ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
 
         let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
 

@@ -194,11 +186,11 @@ impl BroadcastStage {
         storage_entry_sender: EntrySender,
     ) -> BroadcastStageReturnType {
         let me = cluster_info.read().unwrap().my_data().clone();
+        let coding_generator = CodingGenerator::default();
 
         let mut broadcast = Broadcast {
             id: me.id,
-            #[cfg(feature = "erasure")]
-            coding_generator: CodingGenerator::new(),
+            coding_generator,
         };
 
         loop {

@@ -284,9 +276,9 @@ mod test {
     use crate::entry::create_ticks;
     use crate::service::Service;
     use solana_runtime::bank::Bank;
+    use solana_sdk::genesis_block::GenesisBlock;
     use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
-    use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
     use std::sync::atomic::AtomicBool;
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};

@@ -321,7 +313,9 @@ mod test {
 
         let exit_sender = Arc::new(AtomicBool::new(false));
         let (storage_sender, _receiver) = channel();
-        let bank = Arc::new(Bank::default());
+
+        let (genesis_block, _) = GenesisBlock::new(10_000);
+        let bank = Arc::new(Bank::new(&genesis_block));
 
         // Start up the broadcast stage
         let broadcast_service = BroadcastStage::new(

@@ -341,15 +335,13 @@ mod test {
     }
 
     #[test]
-    #[ignore]
-    //TODO this test won't work since broadcast stage no longer edits the ledger
     fn test_broadcast_ledger() {
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path("test_broadcast_ledger");
 
         {
             // Create the leader scheduler
             let leader_keypair = Keypair::new();
-            let start_tick_height = 0;
-            let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT;
 
             let (entry_sender, entry_receiver) = channel();
             let broadcast_service = setup_dummy_broadcast_service(

@@ -358,6 +350,9 @@ mod test {
                 entry_receiver,
             );
+            let bank = broadcast_service.bank.clone();
+            let start_tick_height = bank.tick_height();
+            let max_tick_height = bank.max_tick_height();
+            let ticks_per_slot = bank.ticks_per_slot();
 
             let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
             for (i, tick) in ticks.into_iter().enumerate() {

@@ -367,15 +362,23 @@ mod test {
             }
 
             sleep(Duration::from_millis(2000));
+
+            trace!(
+                "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
+                max_tick_height,
+                start_tick_height,
+                ticks_per_slot,
+            );
+
             let blocktree = broadcast_service.blocktree;
             let mut blob_index = 0;
             for i in 0..max_tick_height - start_tick_height {
-                let slot = (start_tick_height + i + 1) / DEFAULT_TICKS_PER_SLOT;
+                let slot = (start_tick_height + i + 1) / ticks_per_slot;
 
                 let result = blocktree.get_data_blob(slot, blob_index).unwrap();
 
                 blob_index += 1;
-                assert!(result.is_some());
+                result.expect("expect blob presence");
             }
 
             drop(entry_sender);
@@ -1,278 +1,217 @@
-// Support erasure coding
-use crate::packet::{Blob, SharedBlob};
-use crate::result::{Error, Result};
+//! # Erasure Coding and Recovery
+//!
+//! Blobs are logically grouped into erasure sets or blocks. Each set contains 16 sequential data
+//! blobs and 4 sequential coding blobs.
+//!
+//! Coding blobs in each set starting from `start_idx`:
+//!   For each erasure set:
+//!     generate `NUM_CODING` coding_blobs.
+//!     index the coding blobs from `start_idx` to `start_idx + NUM_CODING - 1`.
+//!
+//!  model of an erasure set, with top row being data blobs and second being coding
+//!  |<======================= NUM_DATA ==============================>|
+//!  |<==== NUM_CODING ===>|
+//!  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
+//!  | D | | D | | D | | D | | D |         | D | | D | | D | | D | | D |
+//!  +---+ +---+ +---+ +---+ +---+  . . .  +---+ +---+ +---+ +---+ +---+
+//!  | C | | C | | C | | C | |   |         |   | |   | |   | |   | |   |
+//!  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
+//!
+//!  blob structure for coding blobs
+//!
+//!   + ------- meta is set and used by transport, meta.size is actual length
+//!   |           of data in the byte array blob.data
+//!   |
+//!   |          + -- data is stuff shipped over the wire, and has an included
+//!   |          |       header
+//!   V          V
+//!  +----------+------------------------------------------------------------+
+//!  |  meta    |  data                                                      |
+//!  |+---+--   |+---+---+---+---+------------------------------------------+|
+//!  || s | .   || i |   | f | s |                                          ||
+//!  || i | .   || n | i | l | i |                                          ||
+//!  || z | .   || d | d | a | z |     blob.data(), or blob.data_mut()      ||
+//!  || e |     || e |   | g | e |                                          ||
+//!  |+---+--   || x |   | s |   |                                          ||
+//!  |          |+---+---+---+---+------------------------------------------+|
+//!  +----------+------------------------------------------------------------+
+//!             |                |<=== coding blob part for "coding"  =======>|
+//!             |                                                            |
+//!             |<============== data blob part for "coding"  ==============>|
+//!
+//!
+use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
+use crate::result::Result;
+use std::cmp;
+use std::convert::AsMut;
+use std::sync::{Arc, RwLock};
+
+use reed_solomon_erasure::ReedSolomon;
 
-//TODO(sakridge) pick these values
-pub const NUM_DATA: usize = 16; // number of data blobs
-pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
-pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs
+/// Number of data blobs
+pub const NUM_DATA: usize = 16;
+/// Number of coding blobs; also the maximum number that can go missing.
+pub const NUM_CODING: usize = 4;
+/// Total number of blobs in an erasure set; includes data and coding blobs
+pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
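An end-to-end illustration of the guarantee the module doc describes (editor's sketch against the underlying crate, not code from the diff): a set of `NUM_DATA` data blobs plus `NUM_CODING` coding blobs survives the loss of any `NUM_CODING` blobs.

```rust
// One erasure set: 16 data shards protected by 4 parity shards.
use reed_solomon_erasure::ReedSolomon;

const NUM_DATA: usize = 16;
const NUM_CODING: usize = 4;

fn main() {
    let session = ReedSolomon::new(NUM_DATA, NUM_CODING).expect("valid dimensions");

    // Data shards first, parity shards appended after them.
    let mut set: Vec<Vec<u8>> = (0..NUM_DATA).map(|i| vec![i as u8; 32]).collect();
    set.extend(vec![vec![0u8; 32]; NUM_CODING]);

    {
        let (data, parity) = set.split_at_mut(NUM_DATA);
        session.encode_sep(&*data, parity).expect("encode");
    }

    // Lose NUM_CODING blobs anywhere in the set (data and coding mixed)...
    let mut present = [true; NUM_DATA + NUM_CODING];
    for &lost in &[0usize, 5, 15, 18] {
        set[lost].iter_mut().for_each(|b| *b = 0);
        present[lost] = false;
    }

    // ...and the whole set still reconstructs.
    session.reconstruct(&mut set, &present).expect("recover");
    assert_eq!(set[5], vec![5u8; 32]);
}
```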
-macro_rules! align {
-    ($x:expr, $align:expr) => {
-        $x + ($align - 1) & !($align - 1)
-    };
-}
+/// Represents an erasure "session" with a particular configuration and number of data and coding
+/// blobs
+#[derive(Debug, Clone)]
+pub struct Session(ReedSolomon);
 
-#[derive(Debug, PartialEq, Eq)]
-pub enum ErasureError {
-    NotEnoughBlocksToDecode,
-    DecodeError,
-    EncodeError,
-    InvalidBlockSize,
-    InvalidBlobData,
-    CorruptCoding,
-}
-
-// k = number of data devices
-// m = number of coding devices
-// w = word size
-
-extern "C" {
-    fn jerasure_matrix_encode(
-        k: i32,
-        m: i32,
-        w: i32,
-        matrix: *const i32,
-        data_ptrs: *const *const u8,
-        coding_ptrs: *const *mut u8,
-        size: i32,
-    );
-    fn jerasure_matrix_decode(
-        k: i32,
-        m: i32,
-        w: i32,
-        matrix: *const i32,
-        row_k_ones: i32,
-        erasures: *const i32,
-        data_ptrs: *const *mut u8,
-        coding_ptrs: *const *mut u8,
-        size: i32,
-    ) -> i32;
-    fn galois_single_divide(a: i32, b: i32, w: i32) -> i32;
-    fn galois_init_default_field(w: i32) -> i32;
-}
-
-use std::sync::Once;
-static ERASURE_W_ONCE: Once = Once::new();
-
-// jerasure word size of 32
-fn w() -> i32 {
-    let w = 32;
-    unsafe {
-        ERASURE_W_ONCE.call_once(|| {
-            galois_init_default_field(w);
-            ()
-        });
-    }
-    w
-}
-
-// jerasure checks that arrays are a multiple of w()/8 in length
-fn wb() -> usize {
-    (w() / 8) as usize
-}
-
-fn get_matrix(m: i32, k: i32, w: i32) -> Vec<i32> {
-    let mut matrix = vec![0; (m * k) as usize];
-    for i in 0..m {
-        for j in 0..k {
-            unsafe {
-                matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w);
-            }
-        }
-    }
-    matrix
-}
-
-// Generate coding blocks into coding
-// There are some alignment restrictions, blocks should be aligned by 16 bytes
-// which means their size should be >= 16 bytes
-fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
-    if data.is_empty() {
-        return Ok(());
-    }
-    let k = data.len() as i32;
-    let m = coding.len() as i32;
-    let block_len = data[0].len() as i32;
-    let matrix: Vec<i32> = get_matrix(m, k, w());
-    let mut data_arg = Vec::with_capacity(data.len());
-    for block in data {
-        if block_len != block.len() as i32 {
-            error!(
-                "data block size incorrect {} expected {}",
-                block.len(),
-                block_len
-            );
-            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
-        }
-        data_arg.push(block.as_ptr());
-    }
-    let mut coding_arg = Vec::with_capacity(coding.len());
-    for block in coding {
-        if block_len != block.len() as i32 {
-            error!(
-                "coding block size incorrect {} expected {}",
-                block.len(),
-                block_len
-            );
-            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
-        }
-        coding_arg.push(block.as_mut_ptr());
-    }
-
-    unsafe {
-        jerasure_matrix_encode(
-            k,
-            m,
-            w(),
-            matrix.as_ptr(),
-            data_arg.as_ptr(),
-            coding_arg.as_ptr(),
-            block_len,
-        );
-    }
-    Ok(())
-}
 
-// Recover data + coding blocks into data blocks
-// data: array of blocks to recover into
-// coding: array of coding blocks
-// erasures: list of indices in data where blocks should be recovered
-pub fn decode_blocks(
-    data: &mut [&mut [u8]],
-    coding: &mut [&mut [u8]],
-    erasures: &[i32],
-) -> Result<()> {
-    if data.is_empty() {
-        return Ok(());
-    }
-    let block_len = data[0].len();
-    let matrix: Vec<i32> = get_matrix(coding.len() as i32, data.len() as i32, w());
-
-    // generate coding pointers, blocks should be the same size
-    let mut coding_arg: Vec<*mut u8> = Vec::new();
-    for x in coding.iter_mut() {
-        if x.len() != block_len {
-            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
-        }
-        coding_arg.push(x.as_mut_ptr());
-    }
-
-    // generate data pointers, blocks should be the same size
-    let mut data_arg: Vec<*mut u8> = Vec::new();
-    for x in data.iter_mut() {
-        if x.len() != block_len {
-            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
-        }
-        data_arg.push(x.as_mut_ptr());
-    }
-    let ret = unsafe {
-        jerasure_matrix_decode(
-            data.len() as i32,
-            coding.len() as i32,
-            w(),
-            matrix.as_ptr(),
-            0,
-            erasures.as_ptr(),
-            data_arg.as_ptr(),
-            coding_arg.as_ptr(),
-            data[0].len() as i32,
-        )
-    };
-    trace!("jerasure_matrix_decode ret: {}", ret);
-    for x in data[erasures[0] as usize][0..8].iter() {
-        trace!("{} ", x)
-    }
-    trace!("");
-    if ret < 0 {
-        return Err(Error::ErasureError(ErasureError::DecodeError));
-    }
-    Ok(())
-}
// Generate coding blocks in window starting from start_idx,
|
||||
// for num_blobs.. For each block place the coding blobs
|
||||
// at the start of the block like so:
|
||||
//
|
||||
// model of an erasure set, with top row being data blobs and second being coding
|
||||
// |<======================= NUM_DATA ==============================>|
|
||||
// |<==== NUM_CODING ===>|
|
||||
// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
|
||||
// | D | | D | | D | | D | | D | | D | | D | | D | | D | | D |
|
||||
// +---+ +---+ +---+ +---+ +---+ . . . +---+ +---+ +---+ +---+ +---+
|
||||
// | C | | C | | C | | C | | | | | | | | | | | | |
|
||||
// +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
|
||||
//
|
||||
// blob structure for coding, recover
|
||||
//
|
||||
// + ------- meta is set and used by transport, meta.size is actual length
|
||||
// | of data in the byte array blob.data
|
||||
// |
|
||||
// | + -- data is stuff shipped over the wire, and has an included
|
||||
// | | header
|
||||
// V V
|
||||
// +----------+------------------------------------------------------------+
|
||||
// | meta | data |
|
||||
// |+---+-- |+---+---+---+---+------------------------------------------+|
|
||||
// || s | . || i | | f | s | ||
|
||||
// || i | . || n | i | l | i | ||
|
||||
// || z | . || d | d | a | z | blob.data(), or blob.data_mut() ||
|
||||
// || e | || e | | g | e | ||
|
||||
// |+---+-- || x | | s | | ||
|
||||
// | |+---+---+---+---+------------------------------------------+|
|
||||
// +----------+------------------------------------------------------------+
|
||||
// | |<=== coding blob part for "coding" =======>|
|
||||
// | |
|
||||
// |<============== data blob part for "coding" ==============>|
|
||||
//
|
||||
//
|
||||
//
|
||||
/// Generates coding blobs on demand given data blobs
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CodingGenerator {
|
||||
leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
|
||||
/// SharedBlobs that couldn't be used in last call to next()
|
||||
leftover: Vec<SharedBlob>,
|
||||
session: Arc<Session>,
|
||||
}
|
||||
|
||||
impl Default for CodingGenerator {
    fn default() -> Self {
        CodingGenerator {
            leftover: Vec::with_capacity(NUM_DATA),

impl Session {
    pub fn new(data_count: usize, coding_count: usize) -> Result<Session> {
        let rs = ReedSolomon::new(data_count, coding_count)?;

        Ok(Session(rs))
    }

    /// Create coding blocks by overwriting `parity`
    pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
        self.0.encode_sep(data, parity)?;

        Ok(())
    }

    /// Recover data + coding blocks into data blocks
    /// # Arguments
    /// * `data` - array of data blocks to recover into
    /// * `coding` - array of coding blocks
    /// * `erasures` - list of indices in data where blocks should be recovered
    pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> {
        self.0.reconstruct(blocks, present)?;

        Ok(())
    }

    /// Returns `(number_of_data_blobs, number_of_coding_blobs)`
    pub fn dimensions(&self) -> (usize, usize) {
        (self.0.data_shard_count(), self.0.parity_shard_count())
    }

    /// Reconstruct any missing blobs in this erasure set if possible
    /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata
    /// Assumes that the user has sliced into the blobs appropriately already, else recovery will
    /// return an error or garbage data
    pub fn reconstruct_blobs<B>(
        &self,
        blobs: &mut [B],
        present: &[bool],
        size: usize,
        block_start_idx: u64,
        slot: u64,
    ) -> Result<(Vec<Blob>, Vec<Blob>)>
    where
        B: AsMut<[u8]>,
    {
        let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();

        trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);

        // Decode the blocks
        self.decode_blocks(blocks.as_mut_slice(), &present)?;

        let mut recovered_data = vec![];
        let mut recovered_coding = vec![];

        let erasures = present
            .iter()
            .enumerate()
            .filter_map(|(i, present)| if *present { None } else { Some(i) });

        // Create the missing blobs from the reconstructed data
        for n in erasures {
            let data_size;
            let idx;
            let first_byte;

            if n < NUM_DATA {
                let mut blob = Blob::new(&blocks[n]);

                data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
                idx = n as u64 + block_start_idx;
                first_byte = blob.data[0];

                blob.set_size(data_size);
                recovered_data.push(blob);
            } else {
                let mut blob = Blob::default();
                blob.data_mut()[..size].copy_from_slice(&blocks[n]);
                data_size = size;
                idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
                first_byte = blob.data[0];

                blob.set_slot(slot);
                blob.set_index(idx);
                blob.set_size(data_size);
                recovered_coding.push(blob);
            }

            trace!(
                "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
                n,
                idx,
                data_size,
                first_byte
            );
        }

        Ok((recovered_data, recovered_coding))
    }
}
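Review sketch of the Session API above, hedged and illustrative (session_round_trip is not part of this change): encode parity over equal-length blocks, erase one data and one coding block, then recover both with decode_blocks().

fn session_round_trip() -> Result<()> {
    let session = Session::default();
    let (num_data, num_coding) = session.dimensions();

    // Equal-length blocks: data first, then zeroed parity, the layout
    // decode_blocks() expects.
    let data: Vec<Vec<u8>> = (0..num_data as u8).map(|i| vec![i; 16]).collect();
    let mut blocks = data.clone();
    blocks.extend(vec![vec![0u8; 16]; num_coding]);

    {
        let data_refs: Vec<&[u8]> = data.iter().map(Vec::as_slice).collect();
        let mut parity_refs: Vec<&mut [u8]> = blocks[num_data..]
            .iter_mut()
            .map(Vec::as_mut_slice)
            .collect();
        session.encode(&data_refs, parity_refs.as_mut_slice())?;
    }

    // Pretend data block 1 and the first coding block were lost.
    let mut present = vec![true; num_data + num_coding];
    present[1] = false;
    present[num_data] = false;
    blocks[1] = vec![0u8; 16];
    blocks[num_data] = vec![0u8; 16];

    let mut refs: Vec<&mut [u8]> = blocks.iter_mut().map(Vec::as_mut_slice).collect();
    session.decode_blocks(refs.as_mut_slice(), &present)?;

    assert_eq!(blocks[1], data[1]); // erased data block restored
    Ok(())
}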
impl CodingGenerator {
    pub fn new() -> Self {
        Self::default()
    pub fn new(session: Arc<Session>) -> Self {
        CodingGenerator {
            leftover: Vec::with_capacity(session.0.data_shard_count()),
            session,
        }
    }

    // must be called with consecutive data blobs from previous invocation
    // blobs from a new slot must not start halfway through next_data
    /// Yields next set of coding blobs, if any.
    /// Must be called with consecutive data blobs within a slot.
    ///
    /// Passing in a slice with the first blob having a new slot will cause internal state to
    /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot
    /// must be consecutive.
    ///
    /// If used improperly, it may return garbage coding blobs, but will not give an
    /// error.
    pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
        let (num_data, num_coding) = self.session.dimensions();
        let mut next_coding =
            Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
            Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding);

        if self.leftover.len() > 0 && next_data.len() > 0 {
            if self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() {
                self.leftover.clear(); // reset on slot boundaries
            }
        if !self.leftover.is_empty()
            && !next_data.is_empty()
            && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot()
        {
            self.leftover.clear();
        }

        let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();

        for data_blobs in next_data.chunks(NUM_DATA) {
            if data_blobs.len() < NUM_DATA {
        for data_blobs in next_data.chunks(num_data) {
            if data_blobs.len() < num_data {
                self.leftover = data_blobs.to_vec();
                break;
            }
            self.leftover.clear();

            // find max_data_size for the chunk, round length up to a multiple of wb()
            let max_data_size = align!(
                data_blobs
                    .iter()
                    .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
                wb()
            );
            // find max_data_size for the erasure set
            let max_data_size = data_blobs
                .iter()
                .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max));

            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
            let data_ptrs: Vec<_> = data_locks

@ -280,9 +219,9 @@ impl CodingGenerator {
                .map(|l| &l.data[..max_data_size])
                .collect();

            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
            let mut coding_blobs = Vec::with_capacity(num_coding);

            for data_blob in &data_locks[..NUM_CODING] {
            for data_blob in &data_locks[..num_coding] {
                let index = data_blob.index();
                let slot = data_blob.slot();
                let id = data_blob.id();

@ -305,7 +244,7 @@ impl CodingGenerator {
                .map(|blob| &mut blob.data_mut()[..max_data_size])
                .collect();

            generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)
            self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
            }
                .is_ok()
            {

@ -320,12 +259,27 @@ impl CodingGenerator {
        }
    }

impl Default for Session {
    fn default() -> Session {
        Session::new(NUM_DATA, NUM_CODING).unwrap()
    }
}

impl Default for CodingGenerator {
    fn default() -> Self {
        let session = Session::default();
        CodingGenerator {
            leftover: Vec::with_capacity(session.0.data_shard_count()),
            session: Arc::new(session),
        }
    }
}
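A hedged usage sketch of the generator (inside this module; generate_test_blobs is the test helper defined below): a full set of NUM_DATA consecutive data blobs yields NUM_CODING coding blobs, while a partial set is buffered in leftover.

let mut coding_generator = CodingGenerator::default();

let data_blobs = generate_test_blobs(0, NUM_DATA);
let coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(coding_blobs.len(), NUM_CODING);

// A partial set produces nothing yet; it waits in `leftover`.
let partial = coding_generator.next(&data_blobs[..NUM_DATA - 1]);
assert!(partial.is_empty());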
#[cfg(test)]
pub mod test {
    use super::*;
    use crate::blocktree::get_tmp_ledger_path;
    use crate::blocktree::Blocktree;
    use crate::entry::{make_tiny_test_entries, EntrySlice};
    use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
    use solana_sdk::pubkey::Pubkey;
    use solana_sdk::signature::{Keypair, KeypairUtil};

@ -368,63 +322,63 @@ pub mod test {

    #[test]
    fn test_coding() {
        let zero_vec = vec![0; 16];
        let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
        const N_DATA: usize = 4;
        const N_CODING: usize = 2;

        let session = Session::new(N_DATA, N_CODING).unwrap();

        let mut vs: Vec<Vec<u8>> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect();
        let v_orig: Vec<u8> = vs[0].clone();

        let m = 2;
        let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect();
        let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect();

        {
            let mut coding_blocks_slices: Vec<_> =
                coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
            let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
            let mut coding_blocks_slices: Vec<_> =
                coding_blocks.iter_mut().map(Vec::as_mut_slice).collect();
            let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect();

            session
                .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice())
                .expect("encoding must succeed");

            assert!(generate_coding_blocks(
                coding_blocks_slices.as_mut_slice(),
                v_slices.as_slice(),
            )
            .is_ok());
        }
        trace!("test_coding: coding blocks:");
        for b in &coding_blocks {
            trace!("test_coding: {:?}", b);
        }
        let erasure: i32 = 1;
        let erasures = vec![erasure, -1];

        let erasure: usize = 1;
        let present = &mut [true; N_DATA + N_CODING];
        present[erasure] = false;
        let erased = vs[erasure].clone();

        // clear an entry
        vs[erasure as usize].copy_from_slice(zero_vec.as_slice());
        vs[erasure as usize].copy_from_slice(&[0; 16]);

        {
            let mut coding_blocks_slices: Vec<_> =
                coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
            let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
            let mut blocks: Vec<_> = vs
                .iter_mut()
                .chain(coding_blocks.iter_mut())
                .map(Vec::as_mut_slice)
                .collect();

            assert!(decode_blocks(
                v_slices.as_mut_slice(),
                coding_blocks_slices.as_mut_slice(),
                erasures.as_slice(),
            )
            .is_ok());
        }
            session
                .decode_blocks(blocks.as_mut_slice(), present)
                .expect("decoding must succeed");

        trace!("test_coding: vs:");
        for v in &vs {
            trace!("test_coding: {:?}", v);
        }
        assert_eq!(v_orig, vs[0]);
        assert_eq!(erased, vs[erasure]);
    }

    fn test_toss_and_recover(
        session: &Session,
        data_blobs: &[SharedBlob],
        coding_blobs: &[SharedBlob],
        block_start_idx: usize,
    ) {
        let size = coding_blobs[0].read().unwrap().size();

        // toss one data and one coding
        let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];

        let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);

        blobs.push(SharedBlob::default()); // empty data, erasure at zero

@ -432,14 +386,23 @@ pub mod test {
            // skip first blob
            blobs.push(blob.clone());
        }

        blobs.push(SharedBlob::default()); // empty coding, erasure at zero
        for blob in &coding_blobs[1..NUM_CODING] {
            blobs.push(blob.clone());
        }

        let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
        // toss one data and one coding
        let mut present = vec![true; blobs.len()];
        present[0] = false;
        present[NUM_DATA] = false;

        assert!(!corrupt);
        let (recovered_data, recovered_coding) = session
            .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0)
            .expect("reconstruction must succeed");

        assert_eq!(recovered_data.len(), 1);
        assert_eq!(recovered_coding.len(), 1);

        assert_eq!(
            blobs[1].read().unwrap().meta,

@ -450,15 +413,15 @@ pub mod test {
            data_blobs[block_start_idx + 1].read().unwrap().data()
        );
        assert_eq!(
            blobs[0].read().unwrap().meta,
            recovered_data[0].meta,
            data_blobs[block_start_idx].read().unwrap().meta
        );
        assert_eq!(
            blobs[0].read().unwrap().data(),
            recovered_data[0].data(),
            data_blobs[block_start_idx].read().unwrap().data()
        );
        assert_eq!(
            blobs[NUM_DATA].read().unwrap().data(),
            recovered_coding[0].data(),
            coding_blobs[0].read().unwrap().data()
        );
    }

@ -468,11 +431,11 @@ pub mod test {
        solana_logger::setup();

        // trivial case
        let mut coding_generator = CodingGenerator::new();
        let mut coding_generator = CodingGenerator::default();
        let blobs = Vec::new();
        for _ in 0..NUM_DATA * 2 {
            let coding = coding_generator.next(&blobs);
            assert_eq!(coding.len(), 0);
            assert!(coding.is_empty());
        }

        // test coding by iterating one blob at a time

@ -480,6 +443,7 @@ pub mod test {

        for (i, blob) in data_blobs.iter().cloned().enumerate() {
            let coding_blobs = coding_generator.next(&[blob]);

            if !coding_blobs.is_empty() {
                assert_eq!(i % NUM_DATA, NUM_DATA - 1);
                assert_eq!(coding_blobs.len(), NUM_CODING);

@ -490,7 +454,12 @@ pub mod test {
                        ((i / NUM_DATA) * NUM_DATA + j) as u64
                    );
                }
                test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA));
                test_toss_and_recover(
                    &coding_generator.session,
                    &data_blobs,
                    &coding_blobs,
                    i - (i % NUM_DATA),
                );
            }
        }
    }

@ -499,7 +468,7 @@ pub mod test {
    fn test_erasure_generate_coding_reset_on_new_slot() {
        solana_logger::setup();

        let mut coding_generator = CodingGenerator::new();
        let mut coding_generator = CodingGenerator::default();

        // test coding by iterating one blob at a time
        let data_blobs = generate_test_blobs(0, NUM_DATA * 2);

@ -509,13 +478,18 @@ pub mod test {
        }

        let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
        assert_eq!(coding_blobs.len(), 0);
        assert!(coding_blobs.is_empty());

        let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);

        assert_eq!(coding_blobs.len(), NUM_CODING);

        test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA);
        test_toss_and_recover(
            &coding_generator.session,
            &data_blobs,
            &coding_blobs,
            NUM_DATA,
        );
    }

    #[test]

@ -571,24 +545,17 @@ pub mod test {
        }
    }

    /// This test is ignored because if successful, it never stops running. It is useful for
    /// discovering an initialization race condition in the erasure FFI bindings. If this bug
    /// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly.
    #[ignore]
    #[test]
    fn test_recovery_with_model() {
        use std::env;
        use std::sync::{Arc, Mutex};
        use std::thread;

        const MAX_ERASURE_SETS: u64 = 16;
        solana_logger::setup();
        let n_threads: usize = env::var("Z_THREADS")
            .unwrap_or("1".to_string())
            .parse()
            .unwrap();
        const N_THREADS: usize = 2;
        const N_SLOTS: u64 = 10;

        let specs = (0..).map(|slot| {
        solana_logger::setup();

        let specs = (0..N_SLOTS).map(|slot| {
            let num_erasure_sets = slot % MAX_ERASURE_SETS;

            let set_specs = (0..num_erasure_sets)

@ -602,12 +569,12 @@ pub mod test {
            SlotSpec { slot, set_specs }
        });

        let decode_mutex = Arc::new(Mutex::new(()));
        let mut handles = vec![];
        let session = Arc::new(Session::default());

        for i in 0..n_threads {
        for i in 0..N_THREADS {
            let specs = specs.clone();
            let decode_mutex = Arc::clone(&decode_mutex);
            let session = Arc::clone(&session);

            let handle = thread::Builder::new()
                .name(i.to_string())

@ -617,55 +584,39 @@ pub mod test {
                    let erased_coding = erasure_set.coding[0].clone();
                    let erased_data = erasure_set.data[..3].to_vec();

                    let mut data = Vec::with_capacity(NUM_DATA);
                    let mut coding = Vec::with_capacity(NUM_CODING);
                    let erasures = vec![0, 1, 2, NUM_DATA as i32, -1];
                    let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);

                    data.push(SharedBlob::default());
                    data.push(SharedBlob::default());
                    data.push(SharedBlob::default());
                    blobs.push(SharedBlob::default());
                    blobs.push(SharedBlob::default());
                    blobs.push(SharedBlob::default());
                    for blob in erasure_set.data.into_iter().skip(3) {
                        data.push(blob);
                        blobs.push(blob);
                    }

                    coding.push(SharedBlob::default());
                    blobs.push(SharedBlob::default());
                    for blob in erasure_set.coding.into_iter().skip(1) {
                        coding.push(blob);
                        blobs.push(blob);
                    }

                    let size = erased_coding.read().unwrap().data_size() as usize;
                    let size = erased_coding.read().unwrap().size() as usize;

                    let mut data_locks: Vec<_> =
                        data.iter().map(|shared| shared.write().unwrap()).collect();
                    let mut coding_locks: Vec<_> = coding
                        .iter()
                        .map(|shared| shared.write().unwrap())
                        .collect();
                    let mut present = vec![true; ERASURE_SET_SIZE];
                    present[0] = false;
                    present[1] = false;
                    present[2] = false;
                    present[NUM_DATA] = false;

                    let mut data_ptrs: Vec<_> = data_locks
                        .iter_mut()
                        .map(|blob| &mut blob.data[..size])
                        .collect();
                    let mut coding_ptrs: Vec<_> = coding_locks
                        .iter_mut()
                        .map(|blob| &mut blob.data_mut()[..size])
                        .collect();

                    {
                        let _lock = decode_mutex.lock();

                        decode_blocks(
                            data_ptrs.as_mut_slice(),
                            coding_ptrs.as_mut_slice(),
                            &erasures,
                    session
                        .reconstruct_shared_blobs(
                            &mut blobs,
                            &present,
                            size,
                            erasure_set.set_index * NUM_DATA as u64,
                            slot_model.slot,
                        )
                        .expect("decoding must succeed");
                    }
                        .expect("reconstruction must succeed");

                    drop(coding_locks);
                    drop(data_locks);

                    for (expected, recovered) in erased_data.iter().zip(data.iter()) {
                    for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
                        let expected = expected.read().unwrap();
                        let mut recovered = recovered.write().unwrap();
                        let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;

@ -677,7 +628,7 @@ pub mod test {
                    assert_eq!(
                        erased_coding.read().unwrap().data(),
                        coding[0].read().unwrap().data()
                        blobs[NUM_DATA].read().unwrap().data()
                    );

                    debug!("passed set: {}", erasure_set.set_index);

@ -702,7 +653,9 @@ pub mod test {
        IntoIt: Iterator<Item = S> + Clone + 'a,
        S: Borrow<SlotSpec>,
    {
        specs.into_iter().map(|spec| {
        let mut coding_generator = CodingGenerator::default();

        specs.into_iter().map(move |spec| {
            let spec = spec.borrow();
            let slot = spec.slot;

@ -713,7 +666,7 @@ pub mod test {
                let set_index = erasure_spec.set_index as usize;
                let start_index = set_index * NUM_DATA;

                let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs();
                let mut blobs = generate_test_blobs(0, NUM_DATA);
                index_blobs(
                    &blobs,
                    &Keypair::new().pubkey(),

@ -722,7 +675,6 @@ pub mod test {
                    0,
                );

                let mut coding_generator = CodingGenerator::new();
                let mut coding_blobs = coding_generator.next(&blobs);

                blobs.drain(erasure_spec.num_data..);

@ -770,84 +722,60 @@ pub mod test {
        blocktree
    }

    // fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool {
    //     let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
    //
    //     blobs.iter().enumerate().all(|(i, blob)| {
    //         let blob = blob.read().unwrap();
    //         blob.index() as usize == i + offset && blob.data() == &data[..]
    //     })
    // }
    //
    fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
        let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs();
        let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();

        let blobs: Vec<_> = (0..num_blobs)
            .into_iter()
            .map(|_| {
                let mut blob = Blob::default();
                blob.data_mut()[..data.len()].copy_from_slice(&data);
                blob.set_size(data.len());
                Arc::new(RwLock::new(blob))
            })
            .collect();

        index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);

        blobs
    }

    fn decode_blobs(
        blobs: &[SharedBlob],
        erasures: &[i32],
        size: usize,
        block_start_idx: u64,
        slot: u64,
    ) -> Result<bool> {
        let mut locks = Vec::with_capacity(ERASURE_SET_SIZE);
        let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
        let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
    impl Session {
        fn reconstruct_shared_blobs(
            &self,
            blobs: &mut [SharedBlob],
            present: &[bool],
            size: usize,
            block_start_idx: u64,
            slot: u64,
        ) -> Result<(Vec<Blob>, Vec<Blob>)> {
            let mut locks: Vec<std::sync::RwLockWriteGuard<_>> = blobs
                .iter()
                .map(|shared_blob| shared_blob.write().unwrap())
                .collect();

        assert_eq!(blobs.len(), ERASURE_SET_SIZE);
        for b in blobs {
            locks.push(b.write().unwrap());
            let mut slices: Vec<_> = locks
                .iter_mut()
                .enumerate()
                .map(|(i, blob)| {
                    if i < NUM_DATA {
                        &mut blob.data[..size]
                    } else {
                        &mut blob.data_mut()[..size]
                    }
                })
                .collect();

            self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot)
        }

        for (i, l) in locks.iter_mut().enumerate() {
            if i < NUM_DATA {
                data_ptrs.push(&mut l.data[..size]);
            } else {
                coding_ptrs.push(&mut l.data_mut()[..size]);
            }
        }

        // Decode the blocks
        decode_blocks(
            data_ptrs.as_mut_slice(),
            coding_ptrs.as_mut_slice(),
            &erasures,
        )?;

        // Create the missing blobs from the reconstructed data
        let mut corrupt = false;

        for i in &erasures[..erasures.len() - 1] {
            let n = *i as usize;
            let mut idx = n as u64 + block_start_idx;

            let mut data_size;
            if n < NUM_DATA {
                data_size = locks[n].data_size() as usize;
                data_size -= BLOB_HEADER_SIZE;
                if data_size > BLOB_DATA_SIZE {
                    error!("corrupt data blob[{}] data_size: {}", idx, data_size);
                    corrupt = true;
                    break;
                }
            } else {
                data_size = size;
                idx -= NUM_DATA as u64;
                locks[n].set_slot(slot);
                locks[n].set_index(idx);

                if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
                    error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
                    corrupt = true;
                    break;
                }
            }

            locks[n].set_size(data_size);
            trace!(
                "erasures[{}] ({}) size: {} data[0]: {}",
                *i,
                idx,
                data_size,
                locks[n].data()[0]
            );
        }

        Ok(corrupt)
    }

    }

@ -31,7 +31,6 @@ pub mod cluster;
pub mod cluster_info;
pub mod cluster_tests;
pub mod entry;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;
pub mod fullnode;

@ -376,7 +376,11 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
impl Blob {
    pub fn new(data: &[u8]) -> Self {
        let mut blob = Self::default();

        assert!(data.len() <= blob.data.len());

        let data_len = cmp::min(data.len(), blob.data.len());

        let bytes = &data[..data_len];
        blob.data[..data_len].copy_from_slice(bytes);
        blob.meta.size = blob.data_size() as usize;

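Hedged illustration of the tightened contract (values are made up): Blob::new now asserts up front that the input fits in the blob's buffer, so oversized input fails loudly rather than relying on the bounded copy alone.

// Within this crate, where Blob's data buffer is visible:
let bytes = vec![1u8; 64];
let blob = Blob::new(&bytes); // panics if bytes.len() > blob.data.len()
assert_eq!(&blob.data[..bytes.len()], &bytes[..]);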
@ -463,8 +467,8 @@ impl Blob {
        LittleEndian::read_u64(&self.data[SIZE_RANGE])
    }

    pub fn set_data_size(&mut self, ix: u64) {
        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix);
    pub fn set_data_size(&mut self, size: u64) {
        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size);
    }

    pub fn data(&self) -> &[u8] {

@ -2,8 +2,6 @@

use crate::blocktree;
use crate::cluster_info;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::packet;
use crate::poh_recorder;
use bincode;

@ -25,8 +23,7 @@ pub enum Error {
    TransactionError(transaction::TransactionError),
    ClusterInfoError(cluster_info::ClusterInfoError),
    BlobError(packet::BlobError),
    #[cfg(feature = "erasure")]
    ErasureError(erasure::ErasureError),
    ErasureError(reed_solomon_erasure::Error),
    SendError,
    PohRecorderError(poh_recorder::PohRecorderError),
    BlocktreeError(blocktree::BlocktreeError),

@ -67,9 +64,8 @@ impl std::convert::From<cluster_info::ClusterInfoError> for Error {
        Error::ClusterInfoError(e)
    }
}
#[cfg(feature = "erasure")]
impl std::convert::From<erasure::ErasureError> for Error {
    fn from(e: erasure::ErasureError) -> Error {
impl std::convert::From<reed_solomon_erasure::Error> for Error {
    fn from(e: reed_solomon_erasure::Error) -> Error {
        Error::ErasureError(e)
    }
}
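With the From impl above in place, reed_solomon_erasure errors flow through the crate's Result via the ? operator; a minimal hedged sketch (make_session is illustrative, not part of this change):

use reed_solomon_erasure::ReedSolomon;

fn make_session(data: usize, coding: usize) -> Result<ReedSolomon> {
    // `?` converts reed_solomon_erasure::Error into Error::ErasureError here.
    let rs = ReedSolomon::new(data, coding)?;
    Ok(rs)
}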